Commit ff16125

I/O: Apache Iceberg (support more catalogs and storage backends)

1 parent 7e44958

4 files changed: +90 −22 lines changed

cratedb_toolkit/io/iceberg.py

19 additions & 11 deletions

```diff
@@ -9,7 +9,7 @@
 import logging
 import tempfile
 from copy import copy
-from typing import Optional
+from typing import Dict, List, Optional
 
 import pandas as pd
 import polars as pl
@@ -83,37 +83,45 @@ def load_catalog(self) -> Catalog:
         """
         Load the Iceberg catalog with appropriate configuration.
         """
-        # TODO: Consider accepting catalog configuration as parameters
-        # to support different catalog types (Hive, REST, etc.).
         return load_catalog(self.catalog, **self.catalog_properties)
 
     @property
     def catalog_properties(self):
         """
         Provide Iceberg catalog properties.
+        https://py.iceberg.apache.org/configuration/#catalogs
         https://py.iceberg.apache.org/reference/pyiceberg/catalog/
         """
-        return {
+        opts = {
             "uri": self.url.query_params.get(
                 "catalog-uri", f"sqlite:///{self.temporary_catalog_location}/pyiceberg_catalog.db"
             ),
+            "credential": self.url.query_params.get("catalog-credential"),
             "token": self.url.query_params.get("catalog-token"),
+            "type": self.url.query_params.get("catalog-type"),
             "warehouse": self.location,  # TODO: Is the `file://` prefix faster when accessing the local filesystem?
         }
+        prefixes = ["dynamodb.", "gcp.", "glue."]
+        opts.update(self.collect_properties(self.url.query_params, prefixes))
+        return opts
 
     @property
     def storage_options(self):
-        opts = {
-            "s3.endpoint": self.url.query_params.get("s3.endpoint"),
-            "s3.region": self.url.query_params.get("s3.region"),
-            "s3.access-key-id": self.url.query_params.get("s3.access-key-id"),
-            "s3.secret-access-key": self.url.query_params.get("s3.secret-access-key"),
-        }
-        return {k: v for k, v in opts.items() if v is not None}
         """
         Provide Iceberg storage properties.
         https://py.iceberg.apache.org/configuration/#fileio
         """
+        prefixes = ["adls.", "gcs.", "hdfs.", "hf.", "s3."]
+        return self.collect_properties(self.url.query_params, prefixes)
+
+    @staticmethod
+    def collect_properties(query_params: Dict, prefixes: List) -> Dict[str, str]:
+        opts = {}
+        for name, value in query_params.items():
+            for prefix in prefixes:
+                if name.startswith(prefix) and value is not None:
+                    opts[name] = value
+        return opts
 
     @property
     def identifier(self):
```
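The core of this change is the prefix-forwarding idea in `collect_properties`: any URL query parameter whose name starts with a known backend prefix is passed through verbatim as a PyIceberg property. A standalone sketch of that idea, reimplemented independently of the toolkit for illustration:

```python
from typing import Dict, List


def collect_properties(query_params: Dict[str, str], prefixes: List[str]) -> Dict[str, str]:
    """Forward only those query parameters that match a known prefix."""
    opts = {}
    for name, value in query_params.items():
        for prefix in prefixes:
            if name.startswith(prefix) and value is not None:
                opts[name] = value
    return opts


params = {
    "s3.endpoint": "http://localhost:9000",
    "s3.region": "us-east-1",
    "catalog-token": "foo",  # not storage-related, so it is filtered out
}
# The same storage prefixes as in the diff above.
storage = collect_properties(params, ["adls.", "gcs.", "hdfs.", "hf.", "s3."])
print(storage)
# {'s3.endpoint': 'http://localhost:9000', 's3.region': 'us-east-1'}
```

Because options pass through unmodified, new backend parameters work without further code changes, as long as their prefix is registered.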

doc/backlog/io.md

4 additions & 0 deletions

```diff
@@ -17,3 +17,7 @@
 
 - Auth!
   https://github.com/apache/iceberg/issues/13550
+
+- Can the postgresql catalog implementation be used with CrateDB?
+
+- Verify with other catalogs: https://py.iceberg.apache.org/configuration/#common-integrations-examples
```

doc/io/iceberg/index.md

66 additions & 10 deletions

````diff
@@ -6,17 +6,17 @@
 
 Import and export data into/from [Apache Iceberg] tables, for humans and machines.
 
-Iceberg works with the concept of a [FileIO] which is a pluggable module for
-reading, writing, and deleting files. It supports different backends like
-S3, HDFS, Azure Data Lake, Google Cloud Storage, Alibaba Cloud Object Storage,
-and Hugging Face.
-
 ## Synopsis
 
-- Load from Iceberg table: `ctk load table file+iceberg://...`,
-  `ctk load table s3+iceberg://...`
+Load from Iceberg table:
+```shell
+ctk load table {file,s3,abfs,gs,hdfs}+iceberg://...
+```
 
-- Export to Iceberg table: `ctk save table file+iceberg://...`
+Export to Iceberg table:
+```shell
+ctk save table {file,s3,abfs,gs,hdfs}+iceberg://...
+```
 
 ## Install
 
@@ -32,6 +32,14 @@ other operating systems.
 
 ## Usage
 
+Iceberg works with the concept of a [FileIO] which is a pluggable module for
+reading, writing, and deleting files. It supports different backends like
+S3, HDFS, Azure Data Lake, Google Cloud Storage, Alibaba Cloud Object Storage,
+and Hugging Face.
+
+Please look up available configuration parameters in the reference documentation,
+otherwise derive your ETL commands from the examples shared below.
+
 ### Load
 
 Load from metadata file on filesystem.
@@ -48,18 +56,55 @@ ctk load table \
     --cluster-url="crate://crate:crate@localhost:4200/demo/taxi-tiny"
 ```
 
-Load from REST catalog and AWS S3 storage.
+Use REST catalog and AWS S3 storage.
 ```shell
 ctk load table \
     "s3+iceberg://bucket1/?catalog-uri=http://iceberg-catalog.example.org:5000&catalog-token=foo&catalog=default&namespace=demo&table=taxi-tiny&s3.access-key-id=<your_access_key_id>&s3.secret-access-key=<your_secret_access_key>&s3.endpoint=<endpoint_url>&s3.region=<s3-region>" \
     --cluster-url="crate://crate:crate@localhost:4200/demo/taxi-tiny"
 ```
 
-Query data in CrateDB.
+Use catalog in Apache Hive.
+```shell
+ctk load table "s3+iceberg://bucket1/?catalog-uri=thrift://localhost:9083/&catalog-credential=t-1234:secret&..."
+```
+
+Use catalog in AWS Glue.
+```shell
+ctk load table "s3+iceberg://bucket1/?catalog-type=glue&glue.id=foo&glue.profile-name=bar&glue.region=region&glue.access-key-id=key&glue.secret-access-key=secret&..."
+```
+
+Use catalog in Google BigQuery.
+```shell
+ctk load table "s3+iceberg://bucket1/?catalog-type=bigquery&gcp.bigquery.project-id=foo&..."
+```
+
+Use catalog in DynamoDB.
+```shell
+ctk load table "s3+iceberg://bucket1/?catalog-type=dynamodb&dynamodb.profile-name=foo&dynamodb.region=bar&dynamodb.access-key-id=key&dynamodb.secret-access-key=secret&..."
+```
+
+Load data from Azure Data Lake Storage.
+```shell
+ctk load table "abfs+iceberg://container/path/?adls.account-name=devstoreaccount1&adls.account-key=foo&..."
+```
+
+Load data from Google Cloud Storage.
+```shell
+ctk load table "gs+iceberg://bucket?gcs.project-id=..."
+```
+
+Load data from HDFS storage.
+```shell
+ctk load table "hdfs+iceberg://path?hdfs.host=https://10.0.19.25/&hdfs.port=9000&hdfs.user=&hdfs.kerberos_ticket="
+```
+
+:::{tip}
+After loading your data into CrateDB, query it.
 ```shell
 ctk shell --command 'SELECT * FROM demo."taxi-tiny";'
 ctk show table 'demo."taxi-tiny"'
 ```
+:::
 
 ### Save
 
@@ -77,6 +122,8 @@ ctk save table \
     "s3+iceberg://bucket1/?catalog=default&namespace=demo&table=taxi-tiny&s3.access-key-id=<your_access_key_id>&s3.secret-access-key=<your_secret_access_key>&s3.endpoint=<endpoint_url>&s3.region=<s3-region>"
 ```
 
+For other target URL variants, see the "Load" section above.
+
 ### Cloud
 
 A canonical invocation for copying data from an Iceberg table on AWS S3 to CrateDB Cloud.
@@ -141,7 +188,16 @@ to a truthy value, save operations will append to an existing table.
 ctk save table "file+iceberg://./var/lib/iceberg/?...&append=true"
 ```
 
+#### PyIceberg
+
+The PyIceberg I/O adapters accept a plethora of options that can be used 1:1.
+For a list of all available options, please consult the [FileIO] documentation.
+For I/O adapters not yet covered by the documentation, please consult the source
+code about [catalog options] and [storage options].
+
 
 [Apache Iceberg]: https://iceberg.apache.org/
+[catalog options]: https://github.com/apache/iceberg-python/tree/main/pyiceberg/catalog
 [FileIO]: https://py.iceberg.apache.org/configuration/#fileio
+[storage options]: https://github.com/apache/iceberg-python/tree/main/pyiceberg/io
 [uv]: https://docs.astral.sh/uv/
````
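The documentation above passes every catalog and storage option inside a single source URL. A sketch of how such a URL decomposes into PyIceberg catalog vs. storage properties, using only the standard library. The splitting rule here (storage prefixes `adls.`, `gcs.`, `hdfs.`, `hf.`, `s3.`; `catalog-` options with the prefix stripped) is an illustrative assumption modeled on the examples, not the toolkit's exact parser:

```python
from urllib.parse import parse_qsl, urlsplit

# Storage option prefixes as documented for PyIceberg FileIO backends.
STORAGE_PREFIXES = ("adls.", "gcs.", "hdfs.", "hf.", "s3.")


def split_options(url: str):
    """Split a ctk-style Iceberg URL into catalog and storage property dicts."""
    params = dict(parse_qsl(urlsplit(url).query))
    storage = {k: v for k, v in params.items() if k.startswith(STORAGE_PREFIXES)}
    catalog = {
        k.removeprefix("catalog-"): v
        for k, v in params.items()
        if k.startswith("catalog-")
    }
    return catalog, storage


url = (
    "s3+iceberg://bucket1/?catalog-uri=http://iceberg-catalog.example.org:5000"
    "&catalog-token=foo&namespace=demo&table=taxi-tiny"
    "&s3.endpoint=http://localhost:9000&s3.region=us-east-1"
)
catalog, storage = split_options(url)
print(catalog)  # {'uri': 'http://iceberg-catalog.example.org:5000', 'token': 'foo'}
print(storage)  # {'s3.endpoint': 'http://localhost:9000', 's3.region': 'us-east-1'}
```

Parameters such as `namespace` and `table` fall into neither bucket; they address the Iceberg table itself rather than configure a backend.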

pyproject.toml

1 addition & 1 deletion

```diff
@@ -182,7 +182,7 @@ optional-dependencies.io-base = [
   "fsspec[http,s3]",
   "pandas>=1,<3.1",
   "polars<1.39",
-  "pyiceberg[adlfs,gcsfs,hive,pyarrow,s3fs]<0.12",
+  "pyiceberg[adlfs,bigquery,dynamodb,gcsfs,glue,hive,pyarrow,s3fs]<0.12",
   "universal-pathlib<0.4",
 ]
 optional-dependencies.io-ingestr = [
```
