diff --git a/.github/workflows/test-rust.yaml b/.github/workflows/test-rust.yaml index c259c96785..d2b370b802 100644 --- a/.github/workflows/test-rust.yaml +++ b/.github/workflows/test-rust.yaml @@ -23,5 +23,15 @@ jobs: with: workspaces: "pkg/data_cache" + - name: Setup Python + uses: actions/setup-python@v6 + with: + python-version: "3.12" + + - name: Install local Iceberg fixture dependencies + run: | + python -m pip install --upgrade pip + python -m pip install pyiceberg pyarrow sqlalchemy + - name: Run Rust unit tests run: make test-rust diff --git a/Makefile b/Makefile index e474cad909..65154046b3 100644 --- a/Makefile +++ b/Makefile @@ -205,9 +205,13 @@ test-python-integration: ## Run Python integration test. PYTHONPATH=$(PROJECT_DIR) pytest ./test/integration/initializers +.PHONY: generate-data-cache-fixture +generate-data-cache-fixture: ## Generate local on-disk Iceberg table for data cache dev/CI. + python3 ./hack/data_cache/generate_local_iceberg_fixture.py + .PHONY: test-rust -test-rust: ## Run Rust unit test. - cargo test --lib --bins --manifest-path ./pkg/data_cache/Cargo.toml +test-rust: ## Run Rust unit and integration tests (includes local Iceberg fixture test). + cargo test --manifest-path ./pkg/data_cache/Cargo.toml .PHONY: test-e2e-setup-cluster test-e2e-setup-cluster: kind ## Setup Kind cluster for e2e test. (Set GPU_CLUSTER=gpu for GPU nodes) diff --git a/hack/data_cache/generate_local_iceberg_fixture.py b/hack/data_cache/generate_local_iceberg_fixture.py new file mode 100755 index 0000000000..ef6feee974 --- /dev/null +++ b/hack/data_cache/generate_local_iceberg_fixture.py @@ -0,0 +1,108 @@ +#!/usr/bin/env python3 +# Copyright The Kubeflow Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Generate a minimal on-disk Iceberg table for local data-cache development and CI.

Requires: pip install pyiceberg pyarrow sqlalchemy

Output layout (under pkg/data_cache/testdata/local_iceberg/warehouse):
  - Iceberg namespace: local
  - Iceberg table: demo
  - Columns: id (int), value (string) — no cache_index column

Only the generated artifacts (the ``warehouse`` directory and ``catalog.db``) are
replaced on each run; any other file in the output directory is left untouched.

After generation, use METADATA_LOC pointing at the latest metadata JSON file.
"""

from __future__ import annotations

import argparse
import shutil
import sys
from pathlib import Path

# Artifacts owned by this script inside the output directory.
WAREHOUSE_DIRNAME = "warehouse"
CATALOG_DB_FILENAME = "catalog.db"

# Table coordinates; the launcher script and the Rust integration test look for
# metadata under <warehouse>/<NAMESPACE>/<TABLE>/metadata.
NAMESPACE = "local"
TABLE = "demo"


def reset_generated_artifacts(output_dir: Path) -> Path:
    """Remove previously generated fixture artifacts and recreate an empty warehouse.

    Only ``<output_dir>/warehouse`` and ``<output_dir>/catalog.db`` are deleted.
    The output directory itself is never removed, so files that live next to the
    generated data (for example a ``.gitignore`` or ``README.md``) survive a
    regeneration, and pointing ``--output-dir`` at an existing directory cannot
    wipe unrelated content.

    Args:
        output_dir: Directory that holds (or will hold) the fixture.

    Returns:
        Path of the freshly created, empty warehouse directory.
    """
    warehouse = output_dir / WAREHOUSE_DIRNAME
    if warehouse.exists():
        shutil.rmtree(warehouse)
    (output_dir / CATALOG_DB_FILENAME).unlink(missing_ok=True)
    warehouse.mkdir(parents=True)
    return warehouse


def main() -> int:
    """Parse CLI arguments, rebuild the fixture table, and print its coordinates.

    Returns:
        0 on success; 1 when a required package is missing or no metadata file
        was produced.
    """
    parser = argparse.ArgumentParser(description=__doc__)
    default_out = (
        Path(__file__).resolve().parents[2] / "pkg/data_cache/testdata/local_iceberg"
    )
    parser.add_argument(
        "--output-dir",
        type=Path,
        default=default_out,
        help="Directory to write the warehouse and catalog DB",
    )
    args = parser.parse_args()

    # Imported lazily so that `--help` works, and a readable hint is printed,
    # when the optional fixture dependencies are not installed.
    try:
        import pyarrow as pa
        from pyiceberg.catalog.sql import SqlCatalog
        from pyiceberg.schema import Schema
        from pyiceberg.types import IntegerType, NestedField, StringType
    except ImportError as exc:
        print(
            f"ERROR: {exc}; install with: pip install pyiceberg pyarrow sqlalchemy",
            file=sys.stderr,
        )
        return 1

    output_dir: Path = args.output_dir
    warehouse = reset_generated_artifacts(output_dir)

    warehouse_uri = warehouse.resolve().as_uri()
    catalog_db = output_dir / CATALOG_DB_FILENAME

    catalog = SqlCatalog(
        "local_cache_fixture",
        **{
            "uri": f"sqlite:///{catalog_db}",
            "warehouse": warehouse_uri,
        },
    )

    schema = Schema(
        NestedField(field_id=1, name="id", field_type=IntegerType(), required=True),
        NestedField(field_id=2, name="value", field_type=StringType(), required=False),
    )

    catalog.create_namespace(NAMESPACE)
    # Pin the table location explicitly so the on-disk layout matches what the
    # launcher script and the Rust test expect, independent of the catalog's
    # default directory naming.
    # NOTE(review): some PyIceberg releases appear to default to
    # "<namespace>.db/<table>" — confirm against the installed version.
    table = catalog.create_table(
        f"{NAMESPACE}.{TABLE}",
        schema=schema,
        location=f"{warehouse_uri}/{NAMESPACE}/{TABLE}",
    )

    data = pa.Table.from_pydict(
        {
            "id": [1, 2, 3],
            "value": ["alpha", "beta", "gamma"],
        },
        schema=pa.schema(
            [
                pa.field("id", pa.int32(), nullable=False),
                pa.field("value", pa.string(), nullable=True),
            ]
        ),
    )
    table.append(data)

    # The newest metadata file is taken to be the lexicographically last one.
    metadata_files = sorted(
        (warehouse / NAMESPACE / TABLE / "metadata").glob("*.metadata.json")
    )
    if not metadata_files:
        print("ERROR: no metadata.json produced", file=sys.stderr)
        return 1

    latest = metadata_files[-1].resolve().as_uri()
    print(f"Generated local Iceberg table at {warehouse}")
    print("SCHEMA_NAME=local TABLE_NAME=demo")
    print(f"METADATA_LOC={latest}")
    return 0


if __name__ == "__main__":
    sys.exit(main())
#!/bin/bash
# Start data-cache head and workers against a local on-disk Iceberg table (file://).
#
# Usage:
#   ./hack/data_cache/run_with_local_table.sh [environment]
#
# If the fixture is missing, runs hack/data_cache/generate_local_iceberg_fixture.py first.
# Requires: Rust/cargo, nc, curl, lsof, python3 + pyiceberg/pyarrow/sqlalchemy for generation.
#
# NOTE: the interpreter line must be the very first line of the file; placed
# after a license header it is only a comment and the script may be run by a
# non-bash shell, which breaks the bash-only constructs used below.

set -euo pipefail

SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
REPO_ROOT="$(cd "${SCRIPT_DIR}/../.." && pwd)"
FIXTURE_DIR="${REPO_ROOT}/pkg/data_cache/testdata/local_iceberg"
METADATA_DIR="${FIXTURE_DIR}/warehouse/local/demo/metadata"
ENVIRONMENT="${1:-LOCAL}"

# Health and gRPC ports used by the head and the two workers.
SERVICE_PORTS=(8080 8081 8082 50051 50052 50053)

# Print the last *.metadata.json under METADATA_DIR in glob (sorted) order,
# or nothing when no such file exists.
latest_metadata_file() {
  local candidate latest=""
  for candidate in "${METADATA_DIR}"/*.metadata.json; do
    # An unmatched glob stays literal; -e filters that case out.
    if [[ -e "${candidate}" ]]; then
      latest="${candidate}"
    fi
  done
  printf '%s' "${latest}"
}

# Print the file:// URI of the newest metadata file, generating the fixture
# first when it is missing.
#
# This function is called inside $(...), so stdout must carry the URI and
# nothing else: progress messages and the generator's own output go to stderr.
ensure_fixture() {
  local meta
  meta="$(latest_metadata_file)"
  if [[ -z "${meta}" ]]; then
    echo "Local Iceberg fixture not found; generating..." >&2
    if ! python3 "${SCRIPT_DIR}/generate_local_iceberg_fixture.py" >&2; then
      echo "ERROR: fixture generation failed" >&2
      return 1
    fi
    meta="$(latest_metadata_file)"
  fi
  if [[ -z "${meta}" ]]; then
    echo "ERROR: failed to find metadata file under ${METADATA_DIR}" >&2
    return 1
  fi
  # Pass the path as an argument instead of splicing it into Python source,
  # so quotes or backslashes in the path cannot break the command.
  python3 -c 'import pathlib, sys; print(pathlib.Path(sys.argv[1]).resolve().as_uri())' "${meta}"
}

# Force-kill every process listening on one of SERVICE_PORTS.
# Returns 0 when at least one process was signalled, 1 otherwise.
kill_port_listeners() {
  local port pid killed=1
  for port in "${SERVICE_PORTS[@]}"; do
    # lsof prints one PID per line and may report several for a single port.
    for pid in $(lsof -ti :"${port}" 2>/dev/null || true); do
      kill -9 "${pid}" 2>/dev/null || true
      killed=0
    done
  done
  return "${killed}"
}

METADATA_LOC="$(ensure_fixture)"
TABLE_NAME="demo"
SCHEMA_NAME="local"

echo "Metadata Location: ${METADATA_LOC}"
echo "Table Name: ${TABLE_NAME}"
echo "Schema Name: ${SCHEMA_NAME}"
echo "Environment: ${ENVIRONMENT}"

export METADATA_LOC
export TABLE_NAME
export SCHEMA_NAME
export RUNTIME_ENV="${ENVIRONMENT}"

# Stop the started services and free their ports, preserving the exit status
# the script was terminating with.
cleanup() {
  local status=$?
  trap - EXIT
  echo ""
  echo "Stopping services..."
  local pid
  for pid in "${WORKER1_PID:-}" "${WORKER2_PID:-}" "${HEAD_PID:-}"; do
    if [[ -n "${pid}" ]]; then
      kill -9 "${pid}" 2>/dev/null || true
      wait "${pid}" 2>/dev/null || true
    fi
  done
  kill_port_listeners || true
  exit "${status}"
}

# Run cleanup on every exit path (including a failed readiness check under
# `set -e`); Ctrl+C / SIGTERM still end the script with status 0.
trap cleanup EXIT
trap 'exit 0' SIGINT SIGTERM

cd "${REPO_ROOT}/pkg/data_cache"

# Free the ports from any earlier run before starting.
if kill_port_listeners; then
  sleep 1
fi

# Block until host:port accepts TCP connections.
# Optional 4th/5th arguments: the PID of the process expected to open the port
# and its log file; when that process exits first, fail instead of waiting forever.
check_service_port() {
  local host=$1 port=$2 name=$3 pid=${4:-} log=${5:-}
  echo "Waiting for ${name} on ${host}:${port}..."
  while ! nc -z "${host}" "${port}" 2>/dev/null; do
    if [[ -n "${pid}" ]] && ! kill -0 "${pid}" 2>/dev/null; then
      echo "ERROR: ${name} exited before listening on ${host}:${port}${log:+ (see ${log})}" >&2
      return 1
    fi
    sleep 2
  done
}

# Poll http://host:health_port/ready until it answers 200 (60 attempts, 2s apart).
check_service_ready() {
  local host=$1 health_port=$2 name=$3
  local count=0 http_code
  echo "Checking ${name} readiness on :${health_port}..."
  while [[ ${count} -lt 60 ]]; do
    http_code=$(curl -s -o /dev/null -w "%{http_code}" --max-time 5 "http://${host}:${health_port}/ready" 2>/dev/null || true)
    if [[ "${http_code}" == "200" ]]; then
      echo " ${name} is ready"
      return 0
    fi
    count=$((count + 1))
    sleep 2
  done
  echo "ERROR: ${name} not ready" >&2
  return 1
}

echo "Starting worker 1..."
HEALTH_PORT=8081 cargo run --bin worker -- 0.0.0.0 50052 >"${REPO_ROOT}/worker1.log" 2>&1 &
WORKER1_PID=$!

echo "Starting worker 2..."
HEALTH_PORT=8082 cargo run --bin worker -- 0.0.0.0 50053 >"${REPO_ROOT}/worker2.log" 2>&1 &
WORKER2_PID=$!

check_service_port localhost 50052 worker1 "${WORKER1_PID}" "${REPO_ROOT}/worker1.log"
check_service_port localhost 50053 worker2 "${WORKER2_PID}" "${REPO_ROOT}/worker2.log"

echo "Starting head..."
HEALTH_PORT=8080 cargo run --bin head -- 0.0.0.0 50051 >"${REPO_ROOT}/head.log" 2>&1 &
HEAD_PID=$!

check_service_port localhost 50051 head "${HEAD_PID}" "${REPO_ROOT}/head.log"
check_service_ready localhost 8081 worker1
check_service_ready localhost 8082 worker2
check_service_ready localhost 8080 head

echo ""
echo "All services ready. Example client:"
echo " cd pkg/data_cache/test && cargo run -- --endpoint http://localhost:50051 --local-rank 0 --world-size 2"
echo "Press Ctrl+C to stop."
wait
+ +**Generate the test fixture once** (from repository root): + +```bash +python3 hack/data_cache/generate_local_iceberg_fixture.py +``` + +**Run head and workers** (generates the fixture automatically if missing): + +```bash +./hack/data_cache/run_with_local_table.sh +``` + +Default table identifiers: `SCHEMA_NAME=local`, `TABLE_NAME=demo`. The script sets `METADATA_LOC` to the latest `*.metadata.json` under `pkg/data_cache/testdata/local_iceberg/`. + +`METADATA_LOC` must be an absolute URI (`file://`, `s3://`, or `s3a://`). + ## Testing +### Run unit and integration tests + +From repository root: + +```bash +make test-rust +``` + +This includes a local Iceberg fixture integration test (`tests/local_iceberg_fixture.rs`), which regenerates the fixture via Python when needed. + ### Run Client Test + +With services running (remote or local script): + ```bash cd test cargo run --bin client -- --endpoint http://localhost:50051 --local-rank 2 --world-size 4 diff --git a/pkg/data_cache/src/config/file_io.rs b/pkg/data_cache/src/config/file_io.rs new file mode 100644 index 0000000000..4f285134a1 --- /dev/null +++ b/pkg/data_cache/src/config/file_io.rs @@ -0,0 +1,91 @@ +// Copyright The Kubeflow Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Helpers for constructing Iceberg [`FileIO`] from dataset metadata locations. + +use iceberg::io::FileIO; + +/// Supported URI schemes for Iceberg metadata and data file locations. 
+const ALLOWED_SCHEMES: &[&str] = &["file", "s3", "s3a"]; + +/// Builds an Iceberg [`FileIO`] for the given metadata location. +/// +/// The location must be an absolute URI with an allowed scheme (`file://`, `s3://`, or `s3a://`). +/// Scheme inference is delegated to [`FileIO::from_path`], which selects the appropriate +/// storage backend (local filesystem, S3, etc.). +pub fn build_file_io(metadata_loc: &str) -> Result { + validate_metadata_loc(metadata_loc)?; + FileIO::from_path(metadata_loc) + .map_err(|e| format!("Failed to create FileIO: {}", e))? + .build() + .map_err(|e| format!("Failed to build FileIO: {}", e)) +} + +/// Validates that `metadata_loc` is a non-empty absolute URI with an allowed scheme. +pub fn validate_metadata_loc(metadata_loc: &str) -> Result<(), String> { + let trimmed = metadata_loc.trim(); + if trimmed.is_empty() { + return Err("METADATA_LOC must not be empty".to_string()); + } + + let scheme = trimmed + .split("://") + .next() + .filter(|_| trimmed.contains("://")) + .ok_or_else(|| { + format!( + "METADATA_LOC must be an absolute URI with a scheme (e.g. 
file://, s3://, s3a://), got: {}", + metadata_loc + ) + })?; + + let scheme_lower = scheme.to_ascii_lowercase(); + if !ALLOWED_SCHEMES.contains(&scheme_lower.as_str()) { + return Err(format!( + "METADATA_LOC scheme '{}' is not supported; allowed schemes: {}", + scheme, + ALLOWED_SCHEMES.join(", ") + )); + } + + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn accepts_file_scheme() { + assert!(validate_metadata_loc("file:///tmp/warehouse/metadata/v1.metadata.json").is_ok()); + } + + #[test] + fn accepts_s3_schemes() { + assert!(validate_metadata_loc("s3://bucket/metadata/v1.metadata.json").is_ok()); + assert!(validate_metadata_loc("s3a://bucket/metadata/v1.metadata.json").is_ok()); + } + + #[test] + fn rejects_empty_and_relative_paths() { + assert!(validate_metadata_loc("").is_err()); + assert!(validate_metadata_loc("/tmp/metadata.json").is_err()); + assert!(validate_metadata_loc("relative/path").is_err()); + } + + #[test] + fn rejects_unsupported_scheme() { + assert!(validate_metadata_loc("http://example.com/meta.json").is_err()); + } +} diff --git a/pkg/data_cache/src/config/mod.rs b/pkg/data_cache/src/config/mod.rs index b178453d73..46794db02a 100644 --- a/pkg/data_cache/src/config/mod.rs +++ b/pkg/data_cache/src/config/mod.rs @@ -13,3 +13,4 @@ // limitations under the License. pub mod config; +pub mod file_io; diff --git a/pkg/data_cache/src/head/provider.rs b/pkg/data_cache/src/head/provider.rs index f3a5fbfa70..5eceb0b0b5 100644 --- a/pkg/data_cache/src/head/provider.rs +++ b/pkg/data_cache/src/head/provider.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. 
+use crate::config::file_io::build_file_io; use arrow::array::{ArrayRef, GenericListBuilder, RecordBatch, StringViewBuilder, UInt64Array}; use arrow_schema::SchemaRef; use async_trait::async_trait; @@ -30,7 +31,6 @@ use futures::StreamExt; use futures::stream::iter; use iceberg::TableIdent; use iceberg::expr::Predicate; -use iceberg::io::FileIO; use iceberg::scan::{FileScanTask, FileScanTaskStream}; use iceberg::table::{StaticTable, Table}; use std::any::Any; @@ -91,10 +91,7 @@ impl DataFileTableProvider { arrow_schema: SchemaRef, num_workers: usize, ) -> Result> { - let file_io = FileIO::from_path(metadata_loc) - .map_err(|e| format!("Failed to create FileIO: {}", e))? - .build() - .map_err(|e| format!("Failed to build FileIO: {}", e))?; + let file_io = build_file_io(metadata_loc)?; let table_indent = TableIdent::from_strs([schema_name, table_name]) .map_err(|e| format!("Failed to create table ident: {}", e))?; let static_table = StaticTable::from_metadata_file(metadata_loc, table_indent, file_io) diff --git a/pkg/data_cache/src/worker/worker_datasource.rs b/pkg/data_cache/src/worker/worker_datasource.rs index 7e0cf64b31..741577c64e 100644 --- a/pkg/data_cache/src/worker/worker_datasource.rs +++ b/pkg/data_cache/src/worker/worker_datasource.rs @@ -13,6 +13,7 @@ // limitations under the License. 
use crate::config::config::CACHE_INDEX_COLUMN; +use crate::config::file_io::build_file_io; use arrow::array::UInt64Array; use arrow::record_batch::RecordBatch; use arrow_schema::{DataType, Field, Schema, SchemaBuilder, SchemaRef}; @@ -31,7 +32,6 @@ use datafusion::physical_plan::{ use futures::{Stream, StreamExt, TryStreamExt}; use iceberg::TableIdent; use iceberg::arrow::schema_to_arrow_schema; -use iceberg::io::FileIO; use iceberg::scan::{FileScanTask, FileScanTaskStream}; use iceberg::table::{StaticTable, Table}; use iceberg_datafusion::{from_datafusion_error, to_datafusion_error}; @@ -107,10 +107,7 @@ impl WorkerDataSource { file_urls: Vec, start_index: u64, ) -> Result> { - let file_io = FileIO::from_path(&metadata_loc) - .map_err(|e| format!("Failed to create FileIO: {}", e))? - .build() - .map_err(|e| format!("Failed to build FileIO: {}", e))?; + let file_io = build_file_io(&metadata_loc)?; let table_indent = TableIdent::from_strs([schema_name, table_name]) .map_err(|e| format!("Failed to create table ident: {}", e))?; let static_table = diff --git a/pkg/data_cache/testdata/local_iceberg/.gitignore b/pkg/data_cache/testdata/local_iceberg/.gitignore new file mode 100644 index 0000000000..7c9d611b59 --- /dev/null +++ b/pkg/data_cache/testdata/local_iceberg/.gitignore @@ -0,0 +1,3 @@ +* +!.gitignore +!README.md diff --git a/pkg/data_cache/testdata/local_iceberg/README.md b/pkg/data_cache/testdata/local_iceberg/README.md new file mode 100644 index 0000000000..ea08817f36 --- /dev/null +++ b/pkg/data_cache/testdata/local_iceberg/README.md @@ -0,0 +1,17 @@ +# Local Iceberg fixture for data cache + +This directory holds a generated on-disk Iceberg table used for local development and CI. + +**Do not commit generated files** (warehouse, `catalog.db`). 
Regenerate with: + +```bash +python3 hack/data_cache/generate_local_iceberg_fixture.py +``` + +Requires: `pip install pyiceberg pyarrow sqlalchemy` + +After generation: + +- `SCHEMA_NAME=local` +- `TABLE_NAME=demo` +- `METADATA_LOC` — printed by the script (latest `*.metadata.json` under `warehouse/local/demo/metadata/`) diff --git a/pkg/data_cache/tests/local_iceberg_fixture.rs b/pkg/data_cache/tests/local_iceberg_fixture.rs new file mode 100644 index 0000000000..4039594e92 --- /dev/null +++ b/pkg/data_cache/tests/local_iceberg_fixture.rs @@ -0,0 +1,105 @@ +// Copyright The Kubeflow Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Integration tests for local on-disk Iceberg tables (issue #3174). 
+ +use futures::StreamExt; +use iceberg::TableIdent; +use iceberg::table::StaticTable; +use kubeflow_data_cache::config::file_io::build_file_io; +use std::path::{Path, PathBuf}; +use std::process::Command; + +fn repo_root() -> PathBuf { + PathBuf::from(env!("CARGO_MANIFEST_DIR")) + .parent() + .and_then(|p| p.parent()) + .expect("repo root") + .to_path_buf() +} + +fn fixture_metadata_dir() -> PathBuf { + PathBuf::from(env!("CARGO_MANIFEST_DIR")) + .join("testdata/local_iceberg/warehouse/local/demo/metadata") +} + +fn generate_fixture() { + let script = repo_root().join("hack/data_cache/generate_local_iceberg_fixture.py"); + let status = Command::new("python3") + .arg(&script) + .status() + .expect("failed to run python3"); + assert!( + status.success(), + "generate_local_iceberg_fixture.py failed; install pyiceberg pyarrow sqlalchemy" + ); +} + +fn latest_metadata_path() -> Option { + let dir = fixture_metadata_dir(); + if !dir.is_dir() { + return None; + } + let mut files: Vec = std::fs::read_dir(&dir) + .ok()? 
+ .filter_map(|e| e.ok()) + .map(|e| e.path()) + .filter(|p| { + p.file_name() + .and_then(|n| n.to_str()) + .is_some_and(|n| n.ends_with(".metadata.json")) + }) + .collect(); + files.sort(); + files.pop() +} + +fn metadata_loc_uri(path: &Path) -> String { + format!( + "file://{}", + path.canonicalize() + .expect("canonicalize metadata") + .display() + ) +} + +fn ensure_fixture_metadata() -> String { + if latest_metadata_path().is_none() { + generate_fixture(); + } + let path = latest_metadata_path().expect("metadata file after generation"); + metadata_loc_uri(&path) +} + +#[tokio::test] +async fn loads_local_iceberg_metadata_file() { + let metadata_loc = ensure_fixture_metadata(); + let file_io = build_file_io(&metadata_loc).expect("build FileIO"); + let table_ident = TableIdent::from_strs(["local", "demo"]).expect("table ident"); + let static_table = StaticTable::from_metadata_file(&metadata_loc, table_ident, file_io) + .await + .expect("load static table"); + let table = static_table.into_table(); + let scan = table.scan().build().expect("scan"); + let mut file_count = 0u32; + let mut stream = scan.plan_files().await.expect("plan files"); + while let Some(task) = stream.next().await { + let _task = task.expect("file scan task"); + file_count += 1; + } + assert!( + file_count > 0, + "expected at least one data file in local fixture" + ); +}