Skip to content

Commit 3fefa82

Browse files
committed
I/O: Apache Iceberg (suggestions by CodeRabbit)
1 parent d42a985 commit 3fefa82

File tree

4 files changed

+56
-24
lines changed

4 files changed

+56
-24
lines changed

cratedb_toolkit/cluster/core.py

Lines changed: 7 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -20,7 +20,6 @@
2020
DatabaseAddressMissingError,
2121
OperationFailed,
2222
)
23-
from cratedb_toolkit.io.iceberg import from_iceberg, to_iceberg
2423
from cratedb_toolkit.io.ingestr.api import ingestr_copy, ingestr_select
2524
from cratedb_toolkit.model import ClusterAddressOptions, DatabaseAddress, InputOutputResource, TableAddress
2625
from cratedb_toolkit.util.client import jwt_token_patch
@@ -617,7 +616,10 @@ def load_table(
617616
self._load_table_result = False
618617

619618
elif source_url_obj.scheme.startswith("iceberg") or source_url_obj.scheme.endswith("iceberg"):
620-
return from_iceberg(str(source_url_obj), target_url)
619+
from cratedb_toolkit.io.iceberg import from_iceberg
620+
621+
if from_iceberg(str(source_url_obj), target_url):
622+
self._load_table_result = True
621623

622624
elif ingestr_select(source_url):
623625
if ingestr_copy(source_url, self.address, progress=True):
@@ -648,7 +650,9 @@ def save_table(
648650
target_url_obj = URL(target.url)
649651

650652
if target_url_obj.scheme.startswith("iceberg") or target_url_obj.scheme.endswith("iceberg"):
651-
return to_iceberg(source_url, target.url)
653+
from cratedb_toolkit.io.iceberg import to_iceberg
654+
655+
to_iceberg(source_url, target.url)
652656
else:
653657
raise NotImplementedError(f"Exporting resource not implemented yet: {target_url_obj}")
654658

cratedb_toolkit/io/cli.py

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -116,7 +116,7 @@ def save_table(
116116
source = TableAddress(schema=schema, table=table)
117117
target = InputOutputResource(url=url, format=format_, compression=compression)
118118

119-
# Dispatch "load table" operation.
119+
# Dispatch "save table" operation.
120120
cluster = DatabaseCluster.create(
121121
cluster_id=cluster_id,
122122
cluster_name=cluster_name,

cratedb_toolkit/io/iceberg.py

Lines changed: 32 additions & 11 deletions
Original file line number | Diff line number | Diff line change
@@ -1,3 +1,10 @@
1+
"""
2+
Apache Iceberg integration for CrateDB Toolkit.
3+
4+
This module provides functionality to transfer data between Iceberg tables
5+
and CrateDB databases, supporting both import and export operations.
6+
"""
7+
18
import dataclasses
29
import logging
310
import tempfile
@@ -44,7 +51,7 @@ def from_url(cls, url: str):
4451
if iceberg_url.scheme.endswith("+iceberg"):
4552
iceberg_url.scheme = iceberg_url.scheme.replace("+iceberg", "")
4653
u2 = copy(iceberg_url)
47-
u2._query = ""
54+
u2.query_params.clear()
4855
location = str(u2)
4956
return cls(
5057
url=iceberg_url,
@@ -55,6 +62,11 @@ def from_url(cls, url: str):
5562
)
5663

5764
def load_catalog(self) -> Catalog:
65+
"""
66+
Load the Iceberg catalog with appropriate configuration.
67+
"""
68+
# TODO: Consider accepting catalog configuration as parameters
69+
# to support different catalog types (Hive, REST, etc.).
5870
return load_catalog(self.catalog, **self.catalog_properties)
5971

6072
@property
@@ -69,29 +81,34 @@ def catalog_properties(self):
6981

7082
@property
7183
def storage_options(self):
72-
return {
84+
opts = {
7385
"s3.endpoint": self.url.query_params.get("s3.endpoint"),
7486
"s3.region": self.url.query_params.get("s3.region"),
7587
"s3.access-key-id": self.url.query_params.get("s3.access-key-id"),
7688
"s3.secret-access-key": self.url.query_params.get("s3.secret-access-key"),
7789
}
90+
return {k: v for k, v in opts.items() if v is not None}
7891

7992
@property
8093
def identifier(self):
94+
"""
95+
Return the catalog-table identifier tuple.
96+
"""
8197
return (self.namespace, self.table)
8298

8399
def load_table(self) -> pl.LazyFrame:
84100
"""
85-
Load a table from a catalog, or by scanning the filesystem.
101+
Load the Iceberg table as a Polars LazyFrame.
102+
103+
Either load a table from a catalog, or by scanning the filesystem.
86104
"""
87105
if self.catalog is not None:
88106
catalog = self.load_catalog()
89107
return catalog.load_table(self.identifier).to_polars()
90-
else:
91-
return pl.scan_iceberg(self.location, storage_options=self.storage_options)
108+
return pl.scan_iceberg(self.location, storage_options=self.storage_options)
92109

93110

94-
def from_iceberg(source_url, cratedb_url, progress: bool = False):
111+
def from_iceberg(source_url, target_url, progress: bool = False):
95112
"""
96113
Scan an Iceberg table from local filesystem or object store, and load into CrateDB.
97114
https://docs.pola.rs/api/python/stable/reference/api/polars.scan_iceberg.html
@@ -121,11 +138,11 @@ def from_iceberg(source_url, cratedb_url, progress: bool = False):
121138
# Display parameters.
122139
logger.info(f"Iceberg address: Path: {iceberg_address.location}")
123140

124-
cratedb_address = DatabaseAddress.from_string(cratedb_url)
141+
cratedb_address = DatabaseAddress.from_string(target_url)
125142
cratedb_url, cratedb_table = cratedb_address.decode()
126143
if cratedb_table.table is None:
127144
raise ValueError("Table name is missing. Please adjust CrateDB database URL.")
128-
logger.info(f"Target address: {cratedb_address}")
145+
logger.info("Target address: %s", cratedb_address)
129146

130147
# Invoke copy operation.
131148
logger.info("Running Iceberg copy")
@@ -139,7 +156,7 @@ def from_iceberg(source_url, cratedb_url, progress: bool = False):
139156
# https://github.com/pola-rs/polars/issues/7852
140157
# Note: This code also uses the most efficient `insert_bulk` method with CrateDB.
141158
# https://cratedb.com/docs/sqlalchemy-cratedb/dataframe.html#efficient-insert-operations-with-pandas
142-
table.collect(streaming=True).to_pandas().to_sql(
159+
table.collect(engine="streaming").to_pandas().to_sql(
143160
name=cratedb_table.table,
144161
schema=cratedb_table.schema,
145162
con=engine,
@@ -153,8 +170,10 @@ def from_iceberg(source_url, cratedb_url, progress: bool = False):
153170
# table.to_polars().collect(streaming=True) \ # noqa: ERA001
154171
# .write_database(table_name=table_address.fullname, connection=engine, if_table_exists="replace")
155172

173+
return True
156174

157-
def to_iceberg(cratedb_url, target_url, progress: bool = False):
175+
176+
def to_iceberg(source_url, target_url, progress: bool = False):
158177
"""
159178
Synopsis
160179
--------
@@ -167,7 +186,7 @@ def to_iceberg(cratedb_url, target_url, progress: bool = False):
167186
"s3+iceberg://bucket1/?catalog=default&namespace=demo&table=taxi-tiny&s3.access-key-id=<your_access_key_id>&s3.secret-access-key=<your_secret_access_key>&s3.endpoint=<endpoint_url>&s3.region=<s3-region>"
168187
""" # noqa:E501
169188

170-
cratedb_address = DatabaseAddress.from_string(cratedb_url)
189+
cratedb_address = DatabaseAddress.from_string(source_url)
171190
cratedb_url, cratedb_table = cratedb_address.decode()
172191
if cratedb_table.table is None:
173192
raise ValueError("Table name is missing. Please adjust CrateDB database URL.")
@@ -197,3 +216,5 @@ def to_iceberg(cratedb_url, target_url, progress: bool = False):
197216
catalog_properties=catalog_properties,
198217
append=False, # TODO: Make available via parameter.
199218
)
219+
220+
return True

tests/io/test_iceberg.py

Lines changed: 16 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -1,28 +1,32 @@
11
from pathlib import Path
22

33
import pandas as pd
4-
import polars as pl
54
import pytest
65
from click.testing import CliRunner
76
from pueblo.testing.dataframe import DataFrameFactory
8-
from pyiceberg.catalog import load_catalog
97

108
from cratedb_toolkit.cli import cli
119

12-
if not hasattr(pd.DataFrame, "to_iceberg2"):
10+
polars = pytest.importorskip("polars", reason="Skipping Iceberg tests because 'polars' package is not installed")
11+
12+
import polars as pl # noqa: E402
13+
14+
if not hasattr(pd.DataFrame, "to_iceberg"):
1315
raise pytest.skip("Older pandas releases do not support Apache Iceberg", allow_module_level=True)
1416

1517

1618
@pytest.fixture
1719
def example_iceberg(tmp_path) -> Path:
20+
from pyiceberg.catalog import load_catalog # noqa: E402
21+
1822
catalog_properties = {
1923
"uri": f"sqlite:///{tmp_path}/pyiceberg_catalog.db",
2024
"warehouse": str(tmp_path),
2125
}
2226
catalog = load_catalog("default", **catalog_properties)
2327
catalog.create_namespace_if_not_exists("demo")
2428

25-
dff = DataFrameFactory(rows=42)
29+
dff = DataFrameFactory()
2630
df = dff.make_mixed()
2731
df.to_iceberg(
2832
"demo.mixed",
@@ -41,10 +45,13 @@ def find_iceberg_data_metadata_location(table_path: Path) -> Path:
4145
Resolve path to metadata.json file in Iceberg table.
4246
This path is needed for `polars.scan_iceberg()`.
4347
"""
44-
return sorted((table_path / "metadata").glob("*.json"))[-1]
48+
files = sorted((table_path / "metadata").glob("*.json"))
49+
if not files:
50+
raise FileNotFoundError(f"No Iceberg metadata JSON found under {table_path / 'metadata'}")
51+
return files[-1]
4552

4653

47-
def test_load_iceberg_table(caplog, cratedb, example_iceberg):
54+
def test_load_iceberg_table(cratedb, example_iceberg):
4855
"""
4956
Verify loading data from an Iceberg table into CrateDB.
5057
"""
@@ -69,7 +76,7 @@ def test_load_iceberg_table(caplog, cratedb, example_iceberg):
6976
assert db.count_records("testdrive.demo") == 5, "Table `testdrive.demo` does not include expected amount of records"
7077

7178

72-
def test_save_iceberg_table(caplog, cratedb, tmp_path):
79+
def test_save_iceberg_table(cratedb, tmp_path):
7380
"""
7481
Verify saving data from CrateDB into an Iceberg table.
7582
"""
@@ -90,7 +97,7 @@ def test_save_iceberg_table(caplog, cratedb, tmp_path):
9097
# Verify data in Iceberg table.
9198
metadata_location = find_iceberg_data_metadata_location(tmp_path / "sys" / "summits")
9299
table = pl.scan_iceberg(str(metadata_location))
93-
assert table.collect_schema().names() == [
100+
assert sorted(table.collect_schema().names()) == [
94101
"classification",
95102
"coordinates",
96103
"country",
@@ -101,4 +108,4 @@ def test_save_iceberg_table(caplog, cratedb, tmp_path):
101108
"range",
102109
"region",
103110
]
104-
assert table.collect().height == 1605, "Iceberg table does not include expected amount of records"
111+
assert table.collect().height >= 1600, "Iceberg table does not include expected amount of records"

0 commit comments

Comments (0)