Skip to content

Commit 82c3124

Browse files
committed
I/O: Apache Iceberg (avoid OOM in from_iceberg)
1 parent 0634661 commit 82c3124

File tree

1 file changed

+13
-9
lines changed

1 file changed

+13
-9
lines changed

cratedb_toolkit/io/iceberg.py

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -177,17 +177,21 @@ def from_iceberg(source_url, target_url, progress: bool = False):
177177
# https://github.com/pola-rs/polars/issues/7852
178178
# Note: This code also uses the most efficient `insert_bulk` method with CrateDB.
179179
# https://cratedb.com/docs/sqlalchemy-cratedb/dataframe.html#efficient-insert-operations-with-pandas
180+
# Note: `collect_batches()` is marked as unstable and slower than native sinks;
181+
# consider native Polars sinks (e.g., write_database) as a faster alternative if available.
182+
# https://github.com/crate/cratedb-toolkit/pull/444#discussion_r2825382887
180183
with pl.Config(streaming_chunk_size=chunksize):
181184
table = iceberg_address.load_table()
182-
table.collect(engine="streaming").to_pandas().to_sql(
183-
name=cratedb_table.table,
184-
schema=cratedb_table.schema,
185-
con=engine,
186-
if_exists=if_exists,
187-
index=False,
188-
chunksize=chunksize,
189-
method=insert_bulk,
190-
)
185+
for batch in table.collect_batches(engine="streaming", chunk_size=chunksize):
186+
batch.to_pandas().to_sql(
187+
name=cratedb_table.table,
188+
schema=cratedb_table.schema,
189+
con=engine,
190+
if_exists=if_exists,
191+
index=False,
192+
chunksize=chunksize,
193+
method=insert_bulk,
194+
)
191195

192196
# Note: This variant was much slower.
193197
"""

0 commit comments

Comments
 (0)