Skip to content

Commit dc2f4a0

Browse files
committed
I/O: Apache Iceberg (tests)
1 parent 039c127 commit dc2f4a0

File tree

1 file changed

+100
-0
lines changed

1 file changed

+100
-0
lines changed

tests/io/test_iceberg.py

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
from pathlib import Path
2+
3+
import polars as pl
4+
import pytest
5+
from click.testing import CliRunner
6+
from pueblo.testing.dataframe import DataFrameFactory
7+
from pyiceberg.catalog import load_catalog
8+
9+
from cratedb_toolkit.cli import cli
10+
11+
12+
@pytest.fixture
13+
def example_iceberg(tmp_path) -> Path:
14+
catalog_properties = {
15+
"uri": f"sqlite:///{tmp_path}/pyiceberg_catalog.db",
16+
"warehouse": str(tmp_path),
17+
}
18+
catalog = load_catalog("default", **catalog_properties)
19+
catalog.create_namespace_if_not_exists("demo")
20+
21+
dff = DataFrameFactory(rows=42)
22+
df = dff.make_mixed()
23+
df.to_iceberg(
24+
"demo.mixed",
25+
catalog_name="default",
26+
catalog_properties=catalog_properties,
27+
)
28+
table = catalog.load_table("demo.mixed")
29+
metadata_location = find_iceberg_data_metadata_location(Path(table.location()))
30+
31+
catalog.close()
32+
return metadata_location
33+
34+
35+
def find_iceberg_data_metadata_location(table_path: Path) -> Path:
36+
"""
37+
Resolve path to metadata.json file in Iceberg table.
38+
This path is needed for `polars.scan_iceberg()`.
39+
"""
40+
return sorted((table_path / "metadata").glob("*.json"))[-1]
41+
42+
43+
def test_load_iceberg_table(caplog, cratedb, example_iceberg):
44+
"""
45+
Verify loading data from an Iceberg table into CrateDB.
46+
"""
47+
48+
# Source and target URLs.
49+
source_url = f"file+iceberg://{example_iceberg}"
50+
target_url = f"{cratedb.get_connection_url()}/testdrive/demo"
51+
52+
# Run transfer command.
53+
runner = CliRunner(env={"CRATEDB_CLUSTER_URL": target_url})
54+
result = runner.invoke(
55+
cli,
56+
args=f"load table {source_url}",
57+
catch_exceptions=False,
58+
)
59+
assert result.exit_code == 0
60+
61+
# Verify data in target database.
62+
db = cratedb.database
63+
assert db.table_exists("testdrive.demo") is True, "Table `testdrive.demo` does not exist"
64+
assert db.refresh_table("testdrive.demo") is True, "Refreshing table `testdrive.demo` failed"
65+
assert db.count_records("testdrive.demo") == 5, "Table `testdrive.demo` does not include expected amount of records"
66+
67+
68+
def test_save_iceberg_table(caplog, cratedb, tmp_path):
69+
"""
70+
Verify saving data from CrateDB into an Iceberg table.
71+
"""
72+
73+
# Source and target URLs.
74+
source_url = f"{cratedb.get_connection_url()}/sys/summits"
75+
target_url = f"file+iceberg://{tmp_path}/?catalog=default&namespace=sys&table=summits"
76+
77+
# Run transfer command.
78+
runner = CliRunner(env={"CRATEDB_CLUSTER_URL": source_url})
79+
result = runner.invoke(
80+
cli,
81+
args=f"save table {target_url}",
82+
catch_exceptions=False,
83+
)
84+
assert result.exit_code == 0
85+
86+
# Verify data in Iceberg table.
87+
metadata_location = find_iceberg_data_metadata_location(tmp_path / "sys" / "summits")
88+
table = pl.scan_iceberg(str(metadata_location))
89+
assert table.collect_schema().names() == [
90+
"classification",
91+
"coordinates",
92+
"country",
93+
"first_ascent",
94+
"height",
95+
"mountain",
96+
"prominence",
97+
"range",
98+
"region",
99+
]
100+
assert table.collect().height == 1605, "Iceberg table does not include expected amount of records"

0 commit comments

Comments
 (0)