Skip to content

Commit 73dfc16

Browse files
committed
I/O: Apache Iceberg (save)
1 parent 38cb22d commit 73dfc16

File tree

5 files changed

+133
-5
lines changed

5 files changed

+133
-5
lines changed

cratedb_toolkit/cli.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,8 @@
1010
from .dms.cli import cli as dms_cli
1111
from .docs.cli import cli as docs_cli
1212
from .info.cli import cli as info_cli
13-
from .io.cli import cli as io_cli
13+
from .io.cli import cli_load as io_cli_load
14+
from .io.cli import cli_save as io_cli_save
1415
from .query.cli import cli as query_cli
1516
from .settings.cli import cli as settings_cli
1617
from .shell.cli import cli as shell_cli
@@ -32,7 +33,8 @@ def cli(ctx: click.Context, verbose: bool, debug: bool):
3233
cli.add_command(cloud_cli, name="cluster")
3334
cli.add_command(docs_cli, name="docs")
3435
cli.add_command(dms_cli, name="dms")
35-
cli.add_command(io_cli, name="load")
36+
cli.add_command(io_cli_load, name="load")
37+
cli.add_command(io_cli_save, name="save")
3638
cli.add_command(query_cli, name="query")
3739
cli.add_command(rockset_cli, name="rockset")
3840
cli.add_command(shell_cli, name="shell")

cratedb_toolkit/cluster/core.py

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
DatabaseAddressMissingError,
2121
OperationFailed,
2222
)
23-
from cratedb_toolkit.io.iceberg import from_iceberg
23+
from cratedb_toolkit.io.iceberg import from_iceberg, to_iceberg
2424
from cratedb_toolkit.io.ingestr.api import ingestr_copy, ingestr_select
2525
from cratedb_toolkit.model import ClusterAddressOptions, DatabaseAddress, InputOutputResource, TableAddress
2626
from cratedb_toolkit.util.client import jwt_token_patch
@@ -396,6 +396,14 @@ def load_table(
396396

397397
return self
398398

399+
def save_table(
400+
self, source: TableAddress, target: InputOutputResource, transformation: t.Union[Path, None] = None
401+
) -> "ManagedCluster":
402+
"""
403+
Export data from a database table on a managed CrateDB Server.
404+
"""
405+
raise NotImplementedError("Not implemented for CrateDB Cloud yet")
406+
399407
@property
400408
def adapter(self) -> DatabaseAdapter:
401409
"""
@@ -623,6 +631,29 @@ def load_table(
623631

624632
return self
625633

634+
def save_table(
635+
self, source: TableAddress, target: InputOutputResource, transformation: t.Union[Path, None] = None
636+
) -> "StandaloneCluster":
637+
"""
638+
Export data from a database table on a standalone CrateDB Server.
639+
640+
Synopsis
641+
--------
642+
export CRATEDB_CLUSTER_URL=crate://crate@localhost:4200/testdrive/demo
643+
644+
ctk load table influxdb2://example:token@localhost:8086/testdrive/demo
645+
ctk load table mongodb://localhost:27017/testdrive/demo
646+
"""
647+
source_url = self.address.dburi
648+
target_url_obj = URL(target.url)
649+
650+
if target_url_obj.scheme.startswith("iceberg") or target_url_obj.scheme.endswith("iceberg"):
651+
return to_iceberg(source_url, target.url)
652+
else:
653+
raise NotImplementedError(f"Exporting resource not implemented yet: {target_url_obj}")
654+
655+
return self
656+
626657

627658
class DatabaseCluster:
628659
"""

cratedb_toolkit/io/cli.py

Lines changed: 58 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,14 @@
1818
@click.option("--debug", is_flag=True, required=False, help="Turn on logging with debug level")
1919
@click.version_option()
2020
@click.pass_context
21-
def cli(ctx: click.Context, verbose: bool, debug: bool):
21+
def cli_load(ctx: click.Context, verbose: bool, debug: bool):
2222
"""
2323
Load data into CrateDB.
2424
"""
2525
return boot_click(ctx, verbose, debug)
2626

2727

28-
@make_command(cli, name="table")
28+
@make_command(cli_load, name="table")
2929
@click.argument("url")
3030
@option_cluster_id
3131
@option_cluster_name
@@ -67,3 +67,59 @@ def load_table(
6767
cluster_url=cluster_url,
6868
)
6969
cluster.load_table(source=source, target=target, transformation=transformation)
70+
71+
72+
@click.group(cls=ClickAliasedGroup) # type: ignore[arg-type]
73+
@click.option("--verbose", is_flag=True, required=False, help="Turn on logging")
74+
@click.option("--debug", is_flag=True, required=False, help="Turn on logging with debug level")
75+
@click.version_option()
76+
@click.pass_context
77+
def cli_save(ctx: click.Context, verbose: bool, debug: bool):
78+
"""
79+
Export data from CrateDB.
80+
"""
81+
return boot_click(ctx, verbose, debug)
82+
83+
84+
@make_command(cli_save, name="table")
85+
@click.argument("url")
86+
@option_cluster_id
87+
@option_cluster_name
88+
@option_cluster_url
89+
@click.option("--schema", envvar="CRATEDB_SCHEMA", type=str, required=False, help="Schema where to import the data")
90+
@click.option("--table", envvar="CRATEDB_TABLE", type=str, required=False, help="Table where to import the data")
91+
@click.option("--format", "format_", type=str, required=False, help="File format of the import resource")
92+
@click.option("--compression", type=str, required=False, help="Compression format of the import resource")
93+
@click.option("--transformation", type=Path, required=False, help="Path to Zyp transformation file")
94+
@click.pass_context
95+
def save_table(
96+
ctx: click.Context,
97+
url: str,
98+
cluster_id: str,
99+
cluster_name: str,
100+
cluster_url: str,
101+
schema: str,
102+
table: str,
103+
format_: str,
104+
compression: str,
105+
transformation: t.Union[Path, None],
106+
):
107+
"""
108+
Export data from CrateDB and CrateDB Cloud clusters.
109+
"""
110+
111+
# When `--transformation` is given, but empty, fix it.
112+
if transformation is not None and transformation.name == "":
113+
transformation = None
114+
115+
# Encapsulate source and target parameters.
116+
source = TableAddress(schema=schema, table=table)
117+
target = InputOutputResource(url=url, format=format_, compression=compression)
118+
119+
# Dispatch "load table" operation.
120+
cluster = DatabaseCluster.create(
121+
cluster_id=cluster_id,
122+
cluster_name=cluster_name,
123+
cluster_url=cluster_url,
124+
)
125+
cluster.save_table(source=source, target=target, transformation=transformation)

cratedb_toolkit/io/iceberg.py

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import dataclasses
22
import logging
33

4+
import pandas as pd
45
import polars as pl
56
import sqlalchemy as sa
67
from boltons.urlutils import URL
@@ -105,3 +106,40 @@ def from_iceberg(source_url, cratedb_url, progress: bool = False):
105106
# Note: This was much slower.
106107
# table.to_polars().collect(streaming=True) \ # noqa: ERA001
107108
# .write_database(table_name=table_address.fullname, connection=engine, if_table_exists="replace")
109+
110+
111+
def to_iceberg(cratedb_url, target_url, progress: bool = False):
112+
"""
113+
Synopsis
114+
--------
115+
ctk save table \
116+
--cluster-url="crate://crate@localhost:4200/demo/taxi_dataset" \
117+
"file+iceberg://./var/lib/iceberg/?catalog=default&namespace=demo&table=taxi_dataset"
118+
"""
119+
120+
cratedb_address = DatabaseAddress.from_string(cratedb_url)
121+
cratedb_url, cratedb_table = cratedb_address.decode()
122+
if cratedb_table.table is None:
123+
raise ValueError("Table name is missing. Please adjust CrateDB database URL.")
124+
logger.info(f"Source address: {cratedb_address}")
125+
126+
iceberg_address = IcebergAddress.from_url(target_url)
127+
logger.info(
128+
f"Iceberg address: Path: {iceberg_address.path}, "
129+
f"catalog: {iceberg_address.catalog}, namespace: {iceberg_address.namespace}, table: {iceberg_address.table}"
130+
)
131+
132+
# Prepare namespace.
133+
catalog = iceberg_address.load_catalog()
134+
catalog.create_namespace_if_not_exists(iceberg_address.namespace)
135+
catalog.close()
136+
137+
# Invoke copy operation.
138+
logger.info("Running Iceberg copy")
139+
engine = sa.create_engine(str(cratedb_url))
140+
with engine.connect() as connection:
141+
pd.read_sql_table(table_name=cratedb_table.table, schema=cratedb_table.schema, con=connection).to_iceberg(
142+
iceberg_address.namespace + "." + iceberg_address.table,
143+
catalog_name=iceberg_address.catalog,
144+
catalog_properties=iceberg_address.catalog_properties,
145+
)

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,7 @@ optional-dependencies.io = [
176176
"sqlalchemy>=2",
177177
]
178178
optional-dependencies.io-base = [
179+
"connectorx<0.5",
179180
"cr8",
180181
"dask[dataframe]>=2020",
181182
"fsspec[http,s3]",

0 commit comments

Comments
 (0)