feat(cache): support local Iceberg tables for dev and CI #3564
Enable file:// metadata URIs with validated FileIO setup, a regenerable local fixture, a dev startup script that needs no AWS, and a Rust integration test so the data cache can be validated without S3. Closes kubeflow#3174
Pull request overview
Note
Copilot was unable to run its full agentic suite in this review.
Adds a local on-disk Iceberg table fixture to enable data-cache development/CI without S3, and centralizes Iceberg FileIO construction/validation.
Changes:
- Introduce a Python generator + bash runner for a local Iceberg fixture, and wire CI to install Python deps.
- Add a Rust integration test that loads a local Iceberg metadata file and scans planned files.
- Centralize `FileIO` construction behind `config::file_io::build_file_io` and enable Iceberg FS storage support.
Reviewed changes
Copilot reviewed 13 out of 13 changed files in this pull request and generated 10 comments.
| File | Description |
|---|---|
| pkg/data_cache/tests/local_iceberg_fixture.rs | New integration test that generates/uses a local Iceberg fixture and validates scan planning. |
| pkg/data_cache/testdata/local_iceberg/README.md | Documents fixture generation and expected metadata location. |
| pkg/data_cache/testdata/local_iceberg/.gitignore | Prevents committing generated fixture artifacts. |
| pkg/data_cache/src/worker/worker_datasource.rs | Switches FileIO construction to shared helper. |
| pkg/data_cache/src/head/provider.rs | Switches FileIO construction to shared helper. |
| pkg/data_cache/src/config/mod.rs | Exposes new file_io module. |
| pkg/data_cache/src/config/file_io.rs | Adds build_file_io + validation and unit tests. |
| pkg/data_cache/README.md | Adds local-table workflow docs and updates prerequisites/testing notes. |
| pkg/data_cache/Cargo.toml | Enables Iceberg storage-fs and registers integration test. |
| hack/data_cache/run_with_local_table.sh | New helper script to run head/workers against local fixture. |
| hack/data_cache/generate_local_iceberg_fixture.py | New fixture generator script using PyIceberg/PyArrow. |
| Makefile | Adds fixture generation target; broadens Rust test target to include integration tests. |
| .github/workflows/test-rust.yaml | Installs Python + fixture deps so Rust tests can generate fixture in CI. |
| # Copyright The Kubeflow Authors. | ||
| # | ||
| # Licensed under the Apache License, Version 2.0 (the "License"); | ||
| # you may not use this file except in compliance with the License. | ||
| # You may obtain a copy of the License at | ||
| # | ||
| # http://www.apache.org/licenses/LICENSE-2.0 | ||
| # | ||
| # Unless required by applicable law or agreed to in writing, software | ||
| # distributed under the License is distributed on an "AS IS" BASIS, | ||
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| # See the License for the specific language governing permissions and | ||
| # limitations under the License. | ||
|
|
||
| #!/bin/bash | ||
| # Start data-cache head and workers against a local on-disk Iceberg table (file://). |
| output_dir: Path = args.output_dir | ||
| warehouse = output_dir / "warehouse" | ||
| if output_dir.exists(): | ||
| shutil.rmtree(output_dir) | ||
| warehouse.mkdir(parents=True) |
| for port in 8080 8081 8082 50051 50052 50053; do | ||
| pid=$(lsof -ti :"${port}" 2>/dev/null || true) | ||
| if [[ -n "${pid}" ]]; then | ||
| kill -9 "${pid}" 2>/dev/null || true | ||
| fi | ||
| done |
| for port in 8080 8081 8082 50051 50052 50053; do | ||
| pid=$(lsof -ti :"${port}" 2>/dev/null || true) | ||
| if [[ -n "${pid}" ]]; then | ||
| kill -9 "${pid}" 2>/dev/null || true | ||
| sleep 1 | ||
| fi | ||
| done |
| pub fn build_file_io(metadata_loc: &str) -> Result<FileIO, String> { | ||
| validate_metadata_loc(metadata_loc)?; | ||
| FileIO::from_path(metadata_loc) | ||
| .map_err(|e| format!("Failed to create FileIO: {}", e))? | ||
| .build() | ||
| .map_err(|e| format!("Failed to build FileIO: {}", e)) | ||
| } |
| let table_indent = TableIdent::from_strs([schema_name, table_name]) | ||
| .map_err(|e| format!("Failed to create table ident: {}", e))?; | ||
| let static_table = |
| let table_indent = TableIdent::from_strs([schema_name, table_name]) | ||
| .map_err(|e| format!("Failed to create table ident: {}", e))?; | ||
| let static_table = StaticTable::from_metadata_file(metadata_loc, table_indent, file_io) |
| let scheme = trimmed | ||
| .split("://") | ||
| .next() | ||
| .filter(|_| trimmed.contains("://")) | ||
| .ok_or_else(|| { | ||
| format!( | ||
| "METADATA_LOC must be an absolute URI with a scheme (e.g. file://, s3://, s3a://), got: {}", | ||
| metadata_loc | ||
| ) | ||
| })?; |
| fn generate_fixture() { | ||
| let script = repo_root().join("hack/data_cache/generate_local_iceberg_fixture.py"); | ||
| let status = Command::new("python3") | ||
| .arg(&script) | ||
| .status() | ||
| .expect("failed to run python3"); | ||
| assert!( | ||
| status.success(), | ||
| "generate_local_iceberg_fixture.py failed; install pyiceberg pyarrow sqlalchemy" | ||
| ); | ||
| } |
| let mut files: Vec<PathBuf> = std::fs::read_dir(&dir) | ||
| .ok()? | ||
| .filter_map(|e| e.ok()) | ||
| .map(|e| e.path()) | ||
| .filter(|p| { | ||
| p.file_name() | ||
| .and_then(|n| n.to_str()) | ||
| .is_some_and(|n| n.ends_with(".metadata.json")) | ||
| }) | ||
| .collect(); | ||
| files.sort(); | ||
| files.pop() |
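The snippet above picks the newest metadata file by sorting paths lexicographically, which works as long as the version prefix is zero-padded to a fixed width. A minimal sketch of a numeric comparison follows, assuming file names shaped like `<version>-<uuid>.metadata.json` (an assumption about the generator's output, not something the PR states); `latest_metadata` is a hypothetical helper.

```rust
/// Hypothetical helper: pick the metadata file with the highest numeric version prefix.
/// Assumes names shaped like `<version>-<uuid>.metadata.json`; other names are skipped.
fn latest_metadata<'a>(names: &[&'a str]) -> Option<&'a str> {
    names
        .iter()
        .copied()
        .filter_map(|name| {
            let stem = name.strip_suffix(".metadata.json")?;
            // Text before the first '-' is treated as the version counter.
            let (version, _uuid) = stem.split_once('-')?;
            let version: u64 = version.parse().ok()?;
            Some((version, name))
        })
        // Tuples compare by numeric version first, then by name as a tie-break.
        .max()
        .map(|(_, name)| name)
}
```

With unpadded prefixes such as `9-…` and `10-…`, a plain string sort would rank `9-…` last, while this version selects `10-…`.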
Summary
- `build_file_io()` with scheme validation (`file://`, `s3://`, `s3a://`) and enable iceberg-rs `storage-fs` for local filesystem access.
- `hack/data_cache/generate_local_iceberg_fixture.py` and `run_with_local_table.sh` to develop and validate data cache without AWS/S3.
- `local_iceberg_fixture` (generates fixture via Python when missing, loads Iceberg metadata, plans Parquet files).
- `test-rust` CI with Python deps for fixture generation.

Closes #3174

Test plan
- `make test-rust` (local)
- `pre-commit run --files` on changed paths
- `./hack/data_cache/run_with_local_table.sh` + Flight client (manual smoke)

Notes
- `torch-distributed-with-cache` is intentionally deferred to a follow-up PR.
- `pkg/data_cache/testdata/local_iceberg/`.