Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions .github/workflows/test-rust.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,15 @@ jobs:
with:
workspaces: "pkg/data_cache"

- name: Setup Python
uses: actions/setup-python@v6
with:
python-version: "3.12"

- name: Install local Iceberg fixture dependencies
run: |
python -m pip install --upgrade pip
python -m pip install pyiceberg pyarrow sqlalchemy

- name: Run Rust unit tests
run: make test-rust
8 changes: 6 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -205,9 +205,13 @@ test-python-integration: ## Run Python integration test.

PYTHONPATH=$(PROJECT_DIR) pytest ./test/integration/initializers

.PHONY: generate-data-cache-fixture
generate-data-cache-fixture: ## Generate local on-disk Iceberg table for data cache dev/CI.
python3 ./hack/data_cache/generate_local_iceberg_fixture.py

.PHONY: test-rust
test-rust: ## Run Rust unit test.
cargo test --lib --bins --manifest-path ./pkg/data_cache/Cargo.toml
test-rust: ## Run Rust unit and integration tests (includes local Iceberg fixture test).
cargo test --manifest-path ./pkg/data_cache/Cargo.toml

.PHONY: test-e2e-setup-cluster
test-e2e-setup-cluster: kind ## Setup Kind cluster for e2e test. (Set GPU_CLUSTER=gpu for GPU nodes)
Expand Down
108 changes: 108 additions & 0 deletions hack/data_cache/generate_local_iceberg_fixture.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
#!/usr/bin/env python3
# Copyright The Kubeflow Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Generate a minimal on-disk Iceberg table for local data-cache development and CI.

Requires: pip install pyiceberg pyarrow sqlalchemy

Output layout (under pkg/data_cache/testdata/local_iceberg/warehouse):
- Iceberg namespace: local
- Iceberg table: demo
- Columns: id (int), value (string) — no cache_index column

After generation, use METADATA_LOC pointing at the latest metadata JSON file.
"""

from __future__ import annotations

import argparse
import shutil
import sys
from pathlib import Path

import pyarrow as pa
from pyiceberg.catalog.sql import SqlCatalog
from pyiceberg.schema import Schema
from pyiceberg.types import IntegerType, NestedField, StringType


def main() -> int:
    """Generate the local Iceberg fixture and print how to point the cache at it.

    Creates a SQLite-backed catalog (``catalog.db``) and a ``warehouse``
    directory under ``--output-dir``, registers the ``local.demo`` table
    (``id`` int, ``value`` string) and appends three rows to it.

    Any ``warehouse`` directory or ``catalog.db`` file already present in the
    output directory is replaced; other content of that directory is left
    untouched.

    Returns:
        0 on success, 1 if no ``*.metadata.json`` file was produced.
    """
    parser = argparse.ArgumentParser(description=__doc__)
    default_out = (
        Path(__file__).resolve().parents[2] / "pkg/data_cache/testdata/local_iceberg"
    )
    parser.add_argument(
        "--output-dir",
        type=Path,
        default=default_out,
        help="Directory to write the warehouse and catalog DB",
    )
    args = parser.parse_args()

    # Resolve up front so the SQLite URL and the printed paths are absolute
    # and do not depend on the caller's working directory.
    output_dir: Path = args.output_dir.resolve()
    warehouse = output_dir / "warehouse"
    catalog_db = output_dir / "catalog.db"

    # Remove only the artifacts this script owns. Deleting the whole
    # --output-dir would wipe unrelated data if the flag pointed at an
    # existing directory (e.g. a home or repository directory).
    if warehouse.exists():
        shutil.rmtree(warehouse)
    catalog_db.unlink(missing_ok=True)
    warehouse.mkdir(parents=True)

    warehouse_uri = warehouse.as_uri()

    catalog = SqlCatalog(
        "local_cache_fixture",
        **{
            "uri": f"sqlite:///{catalog_db}",
            "warehouse": warehouse_uri,
        },
    )

    schema = Schema(
        NestedField(field_id=1, name="id", field_type=IntegerType(), required=True),
        NestedField(field_id=2, name="value", field_type=StringType(), required=False),
    )

    catalog.create_namespace("local")
    table = catalog.create_table("local.demo", schema=schema)

    # The Arrow schema mirrors the Iceberg schema above (int32 / required id,
    # nullable string value).
    data = pa.Table.from_pydict(
        {
            "id": [1, 2, 3],
            "value": ["alpha", "beta", "gamma"],
        },
        schema=pa.schema(
            [
                pa.field("id", pa.int32(), nullable=False),
                pa.field("value", pa.string(), nullable=True),
            ]
        ),
    )
    table.append(data)

    # Pick the last metadata file in sorted (name) order as the latest one.
    metadata_files = sorted(
        (warehouse / "local" / "demo" / "metadata").glob("*.metadata.json")
    )
    if not metadata_files:
        print("ERROR: no metadata.json produced", file=sys.stderr)
        return 1

    latest = metadata_files[-1].resolve().as_uri()
    print(f"Generated local Iceberg table at {warehouse}")
    print("SCHEMA_NAME=local TABLE_NAME=demo")
    print(f"METADATA_LOC={latest}")
    return 0


# Script entry point: propagate main()'s return value as the process exit code.
if __name__ == "__main__":
    raise SystemExit(main())
139 changes: 139 additions & 0 deletions hack/data_cache/run_with_local_table.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
#!/bin/bash
# Copyright The Kubeflow Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Start data-cache head and workers against a local on-disk Iceberg table (file://).
#
# Usage:
# ./hack/data_cache/run_with_local_table.sh [environment]
#
# If the fixture is missing, runs hack/data_cache/generate_local_iceberg_fixture.py first.
# Requires: Rust/cargo, nc, curl, lsof, python3 + pyiceberg/pyarrow/sqlalchemy for generation.

# Fail fast: abort on command errors, on unset variables, and on a failure
# anywhere in a pipeline.
set -o errexit -o nounset -o pipefail

# Optional first argument names the runtime environment; defaults to LOCAL.
ENVIRONMENT="${1:-LOCAL}"

# Resolve everything relative to this script so it can be run from any directory.
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
REPO_ROOT="$(cd "${SCRIPT_DIR}/../.." && pwd)"

# Fixture root and the directory holding the demo table's metadata JSON files.
FIXTURE_DIR="${REPO_ROOT}/pkg/data_cache/testdata/local_iceberg"
METADATA_DIR="${FIXTURE_DIR}/warehouse/local/demo/metadata"

latest_metadata_file() {
  # Print the last *.metadata.json path under METADATA_DIR in `sort` order.
  # Prints nothing when no metadata file exists (ls errors are discarded).
  ls -1 "${METADATA_DIR}"/*.metadata.json 2>/dev/null \
    | sort \
    | tail -n 1
}

ensure_fixture() {
  # Make sure the local Iceberg fixture exists (generating it when missing)
  # and print the file:// URI of its latest metadata file on stdout.
  #
  # The caller captures stdout via $(ensure_fixture), so every progress or
  # diagnostic message, including the generator's own output, must go to
  # stderr; otherwise it would end up inside METADATA_LOC.
  #
  # Exits with status 1 when generation fails or no metadata file is found.
  if [[ -z "$(latest_metadata_file || true)" ]]; then
    echo "Local Iceberg fixture not found; generating..." >&2
    if ! python3 "${SCRIPT_DIR}/generate_local_iceberg_fixture.py" >&2; then
      echo "ERROR: fixture generation failed" >&2
      exit 1
    fi
  fi

  local meta
  meta="$(latest_metadata_file || true)"
  if [[ -z "${meta}" ]]; then
    echo "ERROR: failed to find metadata file under ${METADATA_DIR}" >&2
    exit 1
  fi

  # Pass the path as an argument instead of splicing it into the Python
  # source, so quotes or backslashes in the path cannot break the snippet.
  python3 -c 'import pathlib, sys; print(pathlib.Path(sys.argv[1]).resolve().as_uri())' "${meta}"
}

# Table coordinates of the fixture. METADATA_LOC is taken from the stdout of
# ensure_fixture; the assignment is kept separate from `export` so that a
# failure inside the command substitution still aborts the script.
SCHEMA_NAME="local"
TABLE_NAME="demo"
METADATA_LOC="$(ensure_fixture)"

echo "Metadata Location: ${METADATA_LOC}"
echo "Table Name: ${TABLE_NAME}"
echo "Schema Name: ${SCHEMA_NAME}"
echo "Environment: ${ENVIRONMENT}"

# Hand the settings to the head/worker processes through the environment.
export METADATA_LOC TABLE_NAME SCHEMA_NAME
export RUNTIME_ENV="${ENVIRONMENT}"

cleanup() {
  # Signal handler: force-stop the processes started by this script, sweep
  # the service ports for anything still listening, then exit 0.
  echo ""
  echo "Stopping services..."

  local pid port
  local -a launcher_pids=()

  # Collect only the PIDs that were actually recorded; the handler can fire
  # before every service has been started.
  for pid in "${WORKER1_PID:-}" "${WORKER2_PID:-}" "${HEAD_PID:-}"; do
    if [[ -n "${pid}" ]]; then
      launcher_pids+=("${pid}")
    fi
  done
  if [[ ${#launcher_pids[@]} -gt 0 ]]; then
    kill -9 "${launcher_pids[@]}" 2>/dev/null || true
    wait "${launcher_pids[@]}" 2>/dev/null || true
  fi

  # The recorded PIDs belong to `cargo run`; presumably the service binaries
  # it spawned can outlive it, so also stop whatever still listens on the
  # service ports.
  for port in 8080 8081 8082 50051 50052 50053; do
    # -sTCP:LISTEN restricts the match to listeners so that unrelated
    # processes that merely hold a connection on the port are not killed.
    # lsof -t may print several PIDs (one per line); iterate over each.
    for pid in $(lsof -t -iTCP:"${port}" -sTCP:LISTEN 2>/dev/null || true); do
      kill -9 "${pid}" 2>/dev/null || true
    done
  done
  exit 0
}

# Stop all services when the user presses Ctrl+C or the script is terminated.
trap cleanup SIGINT SIGTERM

cd "${REPO_ROOT}/pkg/data_cache"

# Free the service ports before starting, in case a previous run left
# processes behind.
for port in 8080 8081 8082 50051 50052 50053; do
  # Match listeners only (-sTCP:LISTEN): a plain `lsof -i :PORT` also reports
  # processes that merely have a connection on that port number, and those
  # must not be killed.
  stale_pids="$(lsof -t -iTCP:"${port}" -sTCP:LISTEN 2>/dev/null || true)"
  if [[ -n "${stale_pids}" ]]; then
    echo "Port ${port} is in use; killing listener PID(s): ${stale_pids//$'\n'/ }" >&2
    # Intentionally unquoted: lsof prints one PID per line and each one has
    # to be passed to kill as a separate argument.
    for pid in ${stale_pids}; do
      kill -9 "${pid}" 2>/dev/null || true
    done
    sleep 1
  fi
done

check_service_port() {
  # Wait until a TCP connection to host:port succeeds.
  #
  # Arguments:
  #   $1 host, $2 port, $3 display name used in messages,
  #   $4 optional timeout in seconds (default 900).
  # Returns:
  #   0 once the port accepts connections, 1 when the timeout elapses.
  #
  # The default timeout is generous because the services are launched with
  # `cargo run`, which may have to compile before the port opens. Without a
  # limit, a service that fails to start would make the script hang forever.
  local host=$1 port=$2 name=$3
  local timeout=${4:-900}
  local interval=2 waited=0

  echo "Waiting for ${name} on ${host}:${port}..."
  while ! nc -z "${host}" "${port}" 2>/dev/null; do
    if (( waited >= timeout )); then
      echo "ERROR: ${name} did not open ${host}:${port} within ${timeout}s" >&2
      return 1
    fi
    sleep "${interval}"
    waited=$((waited + interval))
  done
}

check_service_ready() {
  # Poll http://host:health_port/ready until it answers with HTTP 200.
  #
  # Arguments:
  #   $1 host, $2 health-check port, $3 display name used in messages.
  # Returns:
  #   0 as soon as the endpoint returns 200; 1 after 60 unsuccessful attempts
  #   (2 seconds apart, each request limited to 5 seconds).
  local host=$1 health_port=$2 name=$3
  local max_attempts=60 attempt=0
  local http_code  # kept local so the status does not leak into the script's globals

  echo "Checking ${name} readiness on :${health_port}..."
  while (( attempt < max_attempts )); do
    # With -w "%{http_code}" curl itself prints 000 when it gets no HTTP
    # response, so `|| true` is only there to keep `set -e` from aborting;
    # echoing another "000" would yield "000000".
    http_code="$(curl -s -o /dev/null -w "%{http_code}" --max-time 5 "http://${host}:${health_port}/ready" 2>/dev/null || true)"
    if [[ "${http_code}" == "200" ]]; then
      echo " ${name} is ready"
      return 0
    fi
    attempt=$((attempt + 1))
    sleep 2
  done
  echo "ERROR: ${name} not ready" >&2
  return 1
}

# Launch both workers in the background. Each one gets its own health port
# (via HEALTH_PORT) and service port, and logs to a file in the repository root.
echo "Starting worker 1..."
HEALTH_PORT=8081 cargo run --bin worker -- 0.0.0.0 50052 >"${REPO_ROOT}/worker1.log" 2>&1 &
# $! is the PID of the background `cargo run` job just started.
WORKER1_PID=$!

echo "Starting worker 2..."
HEALTH_PORT=8082 cargo run --bin worker -- 0.0.0.0 50053 >"${REPO_ROOT}/worker2.log" 2>&1 &
WORKER2_PID=$!

# Head is started only after both worker ports accept connections.
# NOTE(review): presumably head connects to the workers at startup; confirm.
check_service_port localhost 50052 worker1
check_service_port localhost 50053 worker2

echo "Starting head..."
HEALTH_PORT=8080 cargo run --bin head -- 0.0.0.0 50051 >"${REPO_ROOT}/head.log" 2>&1 &
HEAD_PID=$!

# Wait for head's service port, then for every /ready endpoint to report 200.
check_service_port localhost 50051 head
check_service_ready localhost 8081 worker1
check_service_ready localhost 8082 worker2
check_service_ready localhost 8080 head

echo ""
echo "All services ready. Example client:"
echo " cd pkg/data_cache/test && cargo run -- --endpoint http://localhost:50051 --local-rank 0 --world-size 2"
echo "Press Ctrl+C to stop."
# Block on the background jobs; SIGINT/SIGTERM runs the cleanup trap.
wait
7 changes: 6 additions & 1 deletion pkg/data_cache/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ arrow-schema = "55.0.0"
serde = { version = "1.0.228", features = ["derive"] }
async-trait = "0.1.89"
iceberg-datafusion = "0.6.0"
iceberg = "0.6.0"
iceberg = { version = "0.6.0", features = ["storage-fs"] }
futures = "0.3.32"
arrow-flight = "55.0.0"
tonic = "0.12.3"
Expand All @@ -25,8 +25,13 @@ tower = "0.5"
hyper = "1.9"

[dev-dependencies]
futures = "0.3.32"
tokio = { version = "1.52.3", features = ["full"] }

[[test]]
name = "local_iceberg_fixture"
path = "tests/local_iceberg_fixture.rs"

[[bin]]
name = "head"
path = "cmd/head/main.rs"
Expand Down
39 changes: 36 additions & 3 deletions pkg/data_cache/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@
## Prerequisites

- Rust and Cargo
- AWS CLI configured with appropriate credentials
- `jq` for JSON parsing
- `nc` (netcat) for service health checks
- For **remote (S3) tables**: AWS CLI configured with appropriate credentials and `jq`
- For **local tables**: Python 3 with `pyiceberg`, `pyarrow`, and `sqlalchemy` (fixture generation only)
- `nc` (netcat) and `curl` for service health checks


## Development Setup
Expand Down Expand Up @@ -75,9 +75,42 @@ This script will:

Press `Ctrl+C` to stop all services.

### Option 2: Local Iceberg table (file://)

Use an on-disk Iceberg table with Parquet files for local validation and CI (no AWS credentials).

**Generate the test fixture once** (from repository root):

```bash
python3 hack/data_cache/generate_local_iceberg_fixture.py
```

**Run head and workers** (generates the fixture automatically if missing):

```bash
./hack/data_cache/run_with_local_table.sh
```

Default table identifiers: `SCHEMA_NAME=local`, `TABLE_NAME=demo`. The script sets `METADATA_LOC` to the `file://` URI of the latest `*.metadata.json` under `pkg/data_cache/testdata/local_iceberg/warehouse/local/demo/metadata/`.

`METADATA_LOC` must be an absolute URI (`file://`, `s3://`, or `s3a://`).

## Testing

### Run unit and integration tests

From repository root:

```bash
make test-rust
```

This includes a local Iceberg fixture integration test (`tests/local_iceberg_fixture.rs`), which regenerates the fixture via Python when needed.

### Run Client Test

With services running (remote or local script):

```bash
cd test
cargo run --bin client -- --endpoint http://localhost:50051 --local-rank 2 --world-size 4
Expand Down
Loading