Skip to content

Commit a3d7292

Browse files
committed
I/O: Apache Iceberg (inline comments)
1 parent 64fc69a commit a3d7292

2 files changed

Lines changed: 16 additions & 19 deletions

File tree

cratedb_toolkit/cluster/core.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -640,6 +640,9 @@ def save_table(
640640
"""
641641
Export data from a database table on a standalone CrateDB Server.
642642
643+
Note: The `transformation` parameter is not respected yet, but required by contract.
644+
In this spirit, it is reserved for later use.
645+
643646
Synopsis
644647
--------
645648
export CRATEDB_CLUSTER_URL=crate://crate@localhost:4200/testdrive/demo
@@ -653,11 +656,8 @@ def save_table(
653656
if target_url_obj.scheme.startswith("iceberg") or target_url_obj.scheme.endswith("iceberg"):
654657
from cratedb_toolkit.io.iceberg import to_iceberg
655658

656-
if to_iceberg(source_url, target.url):
657-
self._load_table_result = True
658-
else:
659-
logger.error("Data loading failed or incomplete")
660-
self._load_table_result = False
659+
if not to_iceberg(source_url, target.url):
660+
raise IOError("Data loading failed or incomplete")
661661

662662
else:
663663
raise NotImplementedError(f"Exporting resource not implemented yet: {target_url_obj}")

cratedb_toolkit/io/iceberg.py

Lines changed: 11 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -172,14 +172,19 @@ def from_iceberg(source_url, target_url, progress: bool = False):
172172
logger.info(f"Running Iceberg copy with chunksize={chunksize}")
173173
engine = sa.create_engine(str(cratedb_url))
174174

175-
# This conversion to pandas is zero-copy,
176-
# so we can utilize their SQL utils for free.
177-
# https://github.com/pola-rs/polars/issues/7852
175+
# Note: The conversion to pandas is zero-copy,
176+
# so we can utilize their SQL utils for free.
177+
# https://github.com/pola-rs/polars/issues/7852
178178
# Note: This code also uses the most efficient `insert_bulk` method with CrateDB.
179-
# https://cratedb.com/docs/sqlalchemy-cratedb/dataframe.html#efficient-insert-operations-with-pandas
179+
# https://cratedb.com/docs/sqlalchemy-cratedb/dataframe.html#efficient-insert-operations-with-pandas
180180
# Note: `collect_batches()` is marked as unstable and slower than native sinks;
181-
# consider native Polars sinks (e.g., write_database) as a faster alternative if available.
182-
# https://github.com/crate/cratedb-toolkit/pull/444#discussion_r2825382887
181+
# consider native Polars sinks (e.g., write_database) as a faster alternative if available.
182+
# https://github.com/crate/cratedb-toolkit/pull/444#discussion_r2825382887
183+
# Note: This variant appeared to be much slower; revisit and investigate why.
184+
# table.to_polars().collect(streaming=True).write_database(
185+
# table_name=cratedb_table.fullname, connection=engine, if_table_exists="replace" # noqa: ERA001
186+
# Note: When `collect_batches` yields more than one batch, the first batch must use the
187+
# user-specified `if_exists`, but subsequent batches must use "append".
183188
with pl.Config(streaming_chunk_size=chunksize):
184189
table = iceberg_address.load_table()
185190
for batch in table.collect_batches(engine="streaming", chunk_size=chunksize):
@@ -192,14 +197,6 @@ def from_iceberg(source_url, target_url, progress: bool = False):
192197
chunksize=chunksize,
193198
method=insert_bulk,
194199
)
195-
196-
# Note: This variant was much slower.
197-
"""
198-
table.to_polars().collect(streaming=True).write_database(
199-
table_name=cratedb_table.fullname, connection=engine, if_table_exists="replace"
200-
)
201-
"""
202-
203200
return True
204201

205202

0 commit comments

Comments (0)