|
| 1 | +from pathlib import Path |
| 2 | + |
| 3 | +import pandas as pd |
| 4 | +import polars as pl |
| 5 | +import pytest |
| 6 | +from click.testing import CliRunner |
| 7 | +from pueblo.testing.dataframe import DataFrameFactory |
| 8 | +from pyiceberg.catalog import load_catalog |
| 9 | + |
| 10 | +from cratedb_toolkit.cli import cli |
| 11 | + |
# The `example_iceberg` fixture relies on `DataFrame.to_iceberg()`, so skip
# the whole module when the installed pandas does not provide that method.
if not hasattr(pd.DataFrame, "to_iceberg"):
    raise pytest.skip("Older pandas releases do not support Apache Iceberg", allow_module_level=True)
| 14 | + |
| 15 | + |
@pytest.fixture
def example_iceberg(tmp_path) -> Path:
    """
    Provide an Iceberg table populated with example data.

    Creates an SQLite-backed catalog named `default` below `tmp_path`, writes
    the mixed-type example DataFrame to the table `demo.mixed`, and returns
    the path to a metadata JSON file of that table.
    """
    catalog_properties = {
        "uri": f"sqlite:///{tmp_path}/pyiceberg_catalog.db",
        "warehouse": str(tmp_path),
    }
    catalog = load_catalog("default", **catalog_properties)
    try:
        catalog.create_namespace_if_not_exists("demo")

        df = DataFrameFactory(rows=42).make_mixed()
        df.to_iceberg(
            "demo.mixed",
            catalog_name="default",
            catalog_properties=catalog_properties,
        )
        table = catalog.load_table("demo.mixed")
        return find_iceberg_data_metadata_location(Path(table.location()))
    finally:
        # Release the catalog even when populating the table fails.
        catalog.close()
| 37 | + |
| 38 | + |
def find_iceberg_data_metadata_location(table_path: Path) -> Path:
    """
    Locate a metadata JSON file within an Iceberg table directory.

    Of all `*.json` files in the table's `metadata` folder, the one sorting
    last is returned. `polars.scan_iceberg()` needs this path.
    """
    metadata_dir = table_path / "metadata"
    candidates = list(metadata_dir.glob("*.json"))
    candidates.sort()
    return candidates[-1]
| 45 | + |
| 46 | + |
def test_load_iceberg_table(caplog, cratedb, example_iceberg):
    """
    Load an Iceberg table into CrateDB through the CLI and check the outcome.
    """

    # Where to read from, and where to write to.
    iceberg_url = f"file+iceberg://{example_iceberg}"
    cratedb_url = f"{cratedb.get_connection_url()}/testdrive/demo"

    # Invoke `load table`; the target is handed over by environment variable.
    outcome = CliRunner(env={"CRATEDB_CLUSTER_URL": cratedb_url}).invoke(
        cli,
        args=f"load table {iceberg_url}",
        catch_exceptions=False,
    )
    assert outcome.exit_code == 0

    # Inspect the target table.
    # NOTE(review): the fixture asks the factory for `rows=42`, yet 5 records
    # are expected here; presumably `make_mixed()` yields a fixed-size frame.
    # Confirm.
    database = cratedb.database
    assert database.table_exists("testdrive.demo") is True, "Table `testdrive.demo` does not exist"
    assert database.refresh_table("testdrive.demo") is True, "Refreshing table `testdrive.demo` failed"
    assert (
        database.count_records("testdrive.demo") == 5
    ), "Table `testdrive.demo` does not include expected amount of records"
| 70 | + |
| 71 | + |
def test_save_iceberg_table(caplog, cratedb, tmp_path):
    """
    Export a CrateDB table into an Iceberg table through the CLI and check the outcome.
    """

    # Where to read from, and where to write to.
    cratedb_url = f"{cratedb.get_connection_url()}/sys/summits"
    iceberg_url = f"file+iceberg://{tmp_path}/?catalog=default&namespace=sys&table=summits"

    # Invoke `save table`; the source is handed over by environment variable.
    outcome = CliRunner(env={"CRATEDB_CLUSTER_URL": cratedb_url}).invoke(
        cli,
        args=f"save table {iceberg_url}",
        catch_exceptions=False,
    )
    assert outcome.exit_code == 0

    # Column names the exported table is expected to expose, in order.
    expected_columns = [
        "classification",
        "coordinates",
        "country",
        "first_ascent",
        "height",
        "mountain",
        "prominence",
        "range",
        "region",
    ]

    # Read the Iceberg table back using Polars and compare schema and size.
    metadata_file = find_iceberg_data_metadata_location(tmp_path / "sys" / "summits")
    lazy_frame = pl.scan_iceberg(str(metadata_file))
    assert lazy_frame.collect_schema().names() == expected_columns
    assert lazy_frame.collect().height == 1605, "Iceberg table does not include expected amount of records"
0 commit comments