Skip to content

Commit 039c127

Browse files
committed
I/O: Apache Iceberg (blob storage compatibility)
1 parent b6a74fd commit 039c127

File tree

2 files changed

+68
-14
lines changed

2 files changed

+68
-14
lines changed

cratedb_toolkit/io/iceberg.py

Lines changed: 66 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
import dataclasses
22
import logging
3+
import tempfile
4+
from copy import copy
35

46
import pandas as pd
57
import polars as pl
@@ -18,18 +20,35 @@
1820

1921
@dataclasses.dataclass
2022
class IcebergAddress:
21-
path: str
23+
url: URL
24+
location: str
2225
catalog: str
2326
namespace: str
2427
table: str
2528

29+
def __post_init__(self):
30+
self.tmpdir = tempfile.TemporaryDirectory()
31+
self.temporary_catalog_location = self.tmpdir.name
32+
33+
def __del__(self):
34+
self.tmpdir.cleanup()
35+
2636
@classmethod
2737
def from_url(cls, url: str):
2838
iceberg_url = URL(url)
29-
if iceberg_url.host == ".":
30-
iceberg_url.path = iceberg_url.path.lstrip("/")
39+
if iceberg_url.scheme.startswith("file"):
40+
if iceberg_url.host == ".":
41+
iceberg_url.path = iceberg_url.path.lstrip("/")
42+
location = iceberg_url.path
43+
else:
44+
if iceberg_url.scheme.endswith("+iceberg"):
45+
iceberg_url.scheme = iceberg_url.scheme.replace("+iceberg", "")
46+
u2 = copy(iceberg_url)
47+
u2._query = ""
48+
location = str(u2)
3149
return cls(
32-
path=iceberg_url.path,
50+
url=iceberg_url,
51+
location=location,
3352
catalog=iceberg_url.query_params.get("catalog"),
3453
namespace=iceberg_url.query_params.get("namespace"),
3554
table=iceberg_url.query_params.get("table"),
@@ -41,21 +60,35 @@ def load_catalog(self) -> Catalog:
4160
@property
4261
def catalog_properties(self):
4362
return {
44-
"type": "sql",
45-
"uri": f"sqlite:///{self.path}/pyiceberg_catalog.db", # TODO: Make it configurable?
46-
"warehouse": f"file://{self.path}",
63+
"uri": self.url.query_params.get(
64+
"catalog-uri", f"sqlite:///{self.temporary_catalog_location}/pyiceberg_catalog.db"
65+
),
66+
"token": self.url.query_params.get("catalog-token"),
67+
"warehouse": self.location, # TODO: Is the `file://` prefix faster when accessing the local filesystem?
68+
}
69+
70+
@property
71+
def storage_options(self):
72+
return {
73+
"s3.endpoint": self.url.query_params.get("s3.endpoint"),
74+
"s3.region": self.url.query_params.get("s3.region"),
75+
"s3.access-key-id": self.url.query_params.get("s3.access-key-id"),
76+
"s3.secret-access-key": self.url.query_params.get("s3.secret-access-key"),
4777
}
4878

4979
@property
5080
def identifier(self):
5181
return (self.namespace, self.table)
5282

5383
def load_table(self) -> pl.LazyFrame:
84+
"""
85+
Load a table from a catalog, or by scanning the filesystem.
86+
"""
5487
if self.catalog is not None:
5588
catalog = self.load_catalog()
5689
return catalog.load_table(self.identifier).to_polars()
5790
else:
58-
return pl.scan_iceberg(self.path)
91+
return pl.scan_iceberg(self.location, storage_options=self.storage_options)
5992

6093

6194
def from_iceberg(source_url, cratedb_url, progress: bool = False):
@@ -65,15 +98,28 @@ def from_iceberg(source_url, cratedb_url, progress: bool = False):
6598
6699
Synopsis
67100
--------
101+
102+
# Load from metadata file on filesystem.
68103
ctk load table \
69104
"file+iceberg://./var/lib/iceberg/demo/taxi_dataset/metadata/00001-79d5b044-8bce-46dd-b21c-83679a01c986.metadata.json" \
70105
--cluster-url="crate://crate@localhost:4200/demo/taxi_dataset"
106+
107+
# Load from metadata file on AWS S3.
108+
ctk load table \
109+
"s3+iceberg://bucket1/demo/taxi-tiny/metadata/00003-dd9223cb-6d11-474b-8d09-3182d45862f4.metadata.json?s3.access-key-id=<your_access_key_id>&s3.secret-access-key=<your_secret_access_key>&s3.endpoint=<endpoint_url>&s3.region=<s3-region>" \
110+
--cluster-url="crate://crate@localhost:4200/demo/taxi-tiny"
111+
112+
# Load from REST catalog on AWS S3.
113+
ctk load table \
114+
"s3+iceberg://bucket1/?catalog-uri=http://localhost:5001&catalog-token=foo&catalog=default&namespace=demo&table=taxi-tiny&s3.access-key-id=<your_access_key_id>&s3.secret-access-key=<your_secret_access_key>&s3.endpoint=<endpoint_url>&s3.region=<s3-region>" \
115+
--cluster-url="crate://crate@localhost:4200/demo/taxi-tiny3"
116+
71117
""" # noqa:E501
72118

73119
iceberg_address = IcebergAddress.from_url(source_url)
74120

75121
# Display parameters.
76-
logger.info(f"Iceberg address: Path: {iceberg_address.path}")
122+
logger.info(f"Iceberg address: Path: {iceberg_address.location}")
77123

78124
cratedb_address = DatabaseAddress.from_string(cratedb_url)
79125
cratedb_url, cratedb_table = cratedb_address.decode()
@@ -115,7 +161,11 @@ def to_iceberg(cratedb_url, target_url, progress: bool = False):
115161
ctk save table \
116162
--cluster-url="crate://crate@localhost:4200/demo/taxi_dataset" \
117163
"file+iceberg://./var/lib/iceberg/?catalog=default&namespace=demo&table=taxi_dataset"
118-
"""
164+
165+
ctk save table \
166+
--cluster-url="crate://crate@localhost:4200/demo/taxi-tiny" \
167+
"s3+iceberg://bucket1/?catalog=default&namespace=demo&table=taxi-tiny&s3.access-key-id=<your_access_key_id>&s3.secret-access-key=<your_secret_access_key>&s3.endpoint=<endpoint_url>&s3.region=<s3-region>"
168+
""" # noqa:E501
119169

120170
cratedb_address = DatabaseAddress.from_string(cratedb_url)
121171
cratedb_url, cratedb_table = cratedb_address.decode()
@@ -125,7 +175,7 @@ def to_iceberg(cratedb_url, target_url, progress: bool = False):
125175

126176
iceberg_address = IcebergAddress.from_url(target_url)
127177
logger.info(
128-
f"Iceberg address: Path: {iceberg_address.path}, "
178+
f"Iceberg address: Path: {iceberg_address.location}, "
129179
f"catalog: {iceberg_address.catalog}, namespace: {iceberg_address.namespace}, table: {iceberg_address.table}"
130180
)
131181

@@ -136,10 +186,14 @@ def to_iceberg(cratedb_url, target_url, progress: bool = False):
136186

137187
# Invoke copy operation.
138188
logger.info("Running Iceberg copy")
189+
catalog_properties = {}
190+
catalog_properties.update(iceberg_address.catalog_properties)
191+
catalog_properties.update(iceberg_address.storage_options)
139192
engine = sa.create_engine(str(cratedb_url))
140193
with engine.connect() as connection:
141194
pd.read_sql_table(table_name=cratedb_table.table, schema=cratedb_table.schema, con=connection).to_iceberg(
142195
iceberg_address.namespace + "." + iceberg_address.table,
143196
catalog_name=iceberg_address.catalog,
144-
catalog_properties=iceberg_address.catalog_properties,
197+
catalog_properties=catalog_properties,
198+
append=False, # TODO: Make available via parameter.
145199
)

pyproject.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -181,8 +181,8 @@ optional-dependencies.io-base = [
181181
"dask[dataframe]>=2020",
182182
"fsspec[http,s3]",
183183
"pandas>=1,<3.1",
184-
"polars<1.32", # Higher versions fail `ctk load table file+iceberg:`.
185-
"pyiceberg[pyarrow]<0.12",
184+
"polars<1.32", # Higher versions fail `ctk load table file+iceberg:`.
185+
"pyiceberg[adlfs,gcsfs,hive,pyarrow,s3fs]<0.12",
186186
"universal-pathlib<0.4",
187187
]
188188
optional-dependencies.io-ingestr = [

0 commit comments

Comments
 (0)