diff --git a/.dockerignore b/.dockerignore index acbf458949d..bc539a8e60c 100644 --- a/.dockerignore +++ b/.dockerignore @@ -6,3 +6,5 @@ env offline_build/ !offline_build/arrow !offline_build/ibis +ui/node_modules +sdk/python/feast/ui/node_modules diff --git a/Makefile b/Makefile index 58c99b8c984..77c7466a433 100644 --- a/Makefile +++ b/Makefile @@ -683,6 +683,15 @@ build-feature-server-docker: ## Build Feature Server Docker image -f sdk/python/feast/infra/feature_servers/multicloud/Dockerfile \ $(if $(filter true,$(DOCKER_PUSH)),--push,--load) sdk/python/feast/infra/feature_servers/multicloud +push-feature-server-experimental-docker: ## Push experimental Python 3.14 free-threaded Feature Server Docker image + docker push $(REGISTRY)/feature-server:$(VERSION)-python314t-experimental + +build-feature-server-experimental-docker: ## Build experimental Python 3.14 free-threaded Feature Server Docker image + docker buildx build $(if $(DOCKER_PLATFORMS),--platform $(DOCKER_PLATFORMS),) \ + -t $(REGISTRY)/feature-server:$(VERSION)-python314t-experimental \ + -f sdk/python/feast/infra/feature_servers/multicloud/Dockerfile.experimental \ + $(if $(filter true,$(DOCKER_PUSH)),--push,--load) . + push-feature-transformation-server-docker: ## Push Feature Transformation Server Docker image docker push $(REGISTRY)/feature-transformation-server:$(VERSION) @@ -733,6 +742,11 @@ build-feature-server-dev: ## Build Feature Server Dev Docker image -t feastdev/feature-server:dev \ -f sdk/python/feast/infra/feature_servers/multicloud/Dockerfile.dev --load . 
+benchmark-feature-server-experimental: ## Benchmark stable vs experimental feature server containers + uv run python infra/scripts/benchmark_feature_server_experimental.py \ + --baseline-image $(REGISTRY)/feature-server:$(VERSION) \ + --experimental-image $(REGISTRY)/feature-server:$(VERSION)-python314t-experimental + build-feature-server-dev-docker: ## Build Feature Server Dev Docker image docker buildx build $(if $(DOCKER_PLATFORMS),--platform $(DOCKER_PLATFORMS),) \ -t $(REGISTRY)/feature-server:$(VERSION) \ diff --git a/docs/project/development-guide.md b/docs/project/development-guide.md index ee5cc8cfcce..cfae3579b20 100644 --- a/docs/project/development-guide.md +++ b/docs/project/development-guide.md @@ -166,6 +166,18 @@ make compile-protos-python docker build -t docker-whale -f ./sdk/python/feast/infra/feature_servers/multicloud/Dockerfile . ``` +### Building the experimental Python 3.14 free-threaded feature server image +```sh +make build-feature-server-experimental-docker + +# Optional: compare it to the stable image with the sample local repo +make benchmark-feature-server-experimental +``` + +This experimental image is currently aimed at early compatibility and performance testing for the Python feature server. The benchmark target compares the stable and experimental containers against the sample local repo and is currently most representative for SQLite-backed serving experiments. + +In the current SQLite benchmark run, the experimental `3.14t` image reached `157.01 req/s` versus `187.65 req/s` for the stable image, while improving p95 latency from `274.72 ms` to `262.61 ms`. Treat that as an early data point rather than a universal result: the free-threaded stack is still evolving, and dependency behavior can outweigh any no-GIL gains on a given workload. 
+ ### Code Style and Linting Feast Python SDK and CLI codebase: - Conforms to [Black code style](https://black.readthedocs.io/en/stable/the_black_code_style/current_style.html) diff --git a/docs/reference/feature-servers/python-feature-server.md b/docs/reference/feature-servers/python-feature-server.md index 654c4b9f938..6d99d529101 100644 --- a/docs/reference/feature-servers/python-feature-server.md +++ b/docs/reference/feature-servers/python-feature-server.md @@ -53,6 +53,32 @@ Key performance options: - Use HTTP health checks instead of TCP for better application-level monitoring - Consider horizontal pod autoscaling based on request latency metrics +### Experimental Python 3.14 Free-Threaded Container + +Feast also ships an experimental feature-server container build for Python 3.14 free-threaded (`3.14t`) runtimes. This image is intended for benchmarking and early compatibility validation, not as the default production image. + +Today, the experimental image is best treated as a SQLite-oriented serving experiment. The online-serving startup path has been trimmed to avoid eagerly importing `pandas`, which makes it a better fit for free-threaded serving benchmarks, but the broader Feast runtime and some offline-oriented paths still rely on dependencies that are not fully optimized for no-GIL execution yet. + +```bash +# Build the stable image +make build-feature-server-docker + +# Build the experimental no-GIL image +make build-feature-server-experimental-docker + +# Compare the two images against the same sample feature repo +make benchmark-feature-server-experimental +``` + +The benchmark target writes a JSON report to `feature-server-experimental-benchmark.json` with throughput and latency comparisons for the stable and experimental containers. The current harness uses the sample local feature repo and is most useful for side-by-side SQLite serving comparisons. 
Because the free-threaded Python ecosystem is still evolving, treat the result as a workload-specific experiment and validate dependency compatibility before promoting it further. + +On the current SQLite benchmark run that seeded this experiment, the experimental `3.14t` image delivered lower throughput but slightly better tail latency than the stable image: + +- Stable image: `187.65 req/s`, `152.17 ms` mean latency, `274.72 ms` p95 latency, `35` failed requests +- Experimental image: `157.01 req/s`, `183.21 ms` mean latency, `262.61 ms` p95 latency, `31` failed requests + +That works out to about `16%` lower throughput for the experimental image, alongside about `4.6%` better p95 latency on this workload. In other words, the current no-GIL image is not yet a clear performance win for SQLite online serving, but it is now easy to benchmark and iterate on as dependencies improve. + ## Deploying as a service See [this](../../how-to-guides/running-feast-in-production.md#id-4.2.-deploy-feast-feature-servers-on-kubernetes) for an example on how to run Feast on Kubernetes using the Operator. @@ -536,4 +562,4 @@ The [PyTorch NLP template](https://github.com/feast-dev/feast/tree/main/sdk/pyth ## How to configure Authentication and Authorization ? -Please refer the [page](./../../../docs/getting-started/concepts/permission.md) for more details on how to configure authentication and authorization. \ No newline at end of file +Please refer to the [page](./../../../docs/getting-started/concepts/permission.md) for more details on how to configure authentication and authorization. 
diff --git a/infra/scripts/benchmark_feature_server_experimental.py b/infra/scripts/benchmark_feature_server_experimental.py new file mode 100644 index 00000000000..5d6e29bc413 --- /dev/null +++ b/infra/scripts/benchmark_feature_server_experimental.py @@ -0,0 +1,365 @@ +#!/usr/bin/env python3 +"""Benchmark stable and experimental feature-server containers side by side.""" + +from __future__ import annotations + +import argparse +import concurrent.futures +import http.client +import json +import shutil +import statistics +import subprocess +import tempfile +import time +import urllib.error +import urllib.request +from dataclasses import asdict, dataclass +from datetime import datetime, timezone +from pathlib import Path + + +DEFAULT_REQUEST = { + "features": [ + "driver_hourly_stats:conv_rate", + "driver_hourly_stats:acc_rate", + "driver_hourly_stats:avg_daily_trips", + ], + "entities": { + "driver_id": [1001, 1002, 1003], + }, +} + + +@dataclass +class BenchmarkResult: + image: str + endpoint: str + total_requests: int + concurrency: int + warmup_requests: int + successful_requests: int + failed_requests: int + elapsed_seconds: float + requests_per_second: float + mean_latency_ms: float + median_latency_ms: float + p95_latency_ms: float + min_latency_ms: float + max_latency_ms: float + + +def parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser( + description=__doc__, + ) + parser.add_argument("--baseline-image", required=True) + parser.add_argument("--experimental-image", required=True) + parser.add_argument( + "--sample-repo", + default="examples/podman_local/feature_repo", + help="Feature repo used to prepare and serve benchmark traffic.", + ) + parser.add_argument("--requests", type=int, default=400) + parser.add_argument("--concurrency", type=int, default=32) + parser.add_argument("--warmup-requests", type=int, default=50) + parser.add_argument("--workers", type=int, default=1) + parser.add_argument("--worker-connections", type=int, 
default=1000) + parser.add_argument("--max-requests", type=int, default=1000) + parser.add_argument("--registry-ttl-sec", type=int, default=60) + parser.add_argument( + "--output", + default="feature-server-experimental-benchmark.json", + help="Path to the JSON benchmark report.", + ) + return parser.parse_args() + + +def run( + cmd: list[str], *, check: bool = True, capture_output: bool = True +) -> subprocess.CompletedProcess[str]: + try: + return subprocess.run( + cmd, + check=check, + text=True, + capture_output=capture_output, + ) + except subprocess.CalledProcessError as exc: + if exc.stderr: + print(exc.stderr.strip()) + raise + + +def post_json(url: str, payload: dict) -> dict: + body = json.dumps(payload).encode("utf-8") + request = urllib.request.Request( + url, + data=body, + headers={"Content-Type": "application/json"}, + method="POST", + ) + with urllib.request.urlopen(request, timeout=30) as response: + return json.loads(response.read().decode("utf-8")) + + +def wait_for_server(endpoint: str, payload: dict, timeout_seconds: int = 60) -> None: + deadline = time.time() + timeout_seconds + while time.time() < deadline: + try: + response = post_json(f"{endpoint}/get-online-features", payload) + if "metadata" in response and "results" in response: + return + except ( + TimeoutError, + urllib.error.URLError, + urllib.error.HTTPError, + http.client.RemoteDisconnected, + json.JSONDecodeError, + ConnectionResetError, + ): + pass + time.sleep(1) + raise TimeoutError( + f"Server at {endpoint} did not become ready within {timeout_seconds}s" + ) + + +def percentile(values_ms: list[float], percentile_value: float) -> float: + if not values_ms: + return 0.0 + if len(values_ms) == 1: + return values_ms[0] + ordered = sorted(values_ms) + index = int(round((len(ordered) - 1) * percentile_value)) + return ordered[index] + + +def benchmark_endpoint( + endpoint: str, + image: str, + request_payload: dict, + requests: int, + concurrency: int, + warmup_requests: int, +) -> 
BenchmarkResult: + for _ in range(warmup_requests): + post_json(f"{endpoint}/get-online-features", request_payload) + + latencies_ms: list[float] = [] + failures = 0 + + def one_request(_: int) -> float: + start = time.perf_counter() + response = post_json(f"{endpoint}/get-online-features", request_payload) + if "metadata" not in response: + raise RuntimeError(f"Unexpected response payload: {response}") + return (time.perf_counter() - start) * 1000.0 + + started = time.perf_counter() + with concurrent.futures.ThreadPoolExecutor(max_workers=concurrency) as executor: + futures = [executor.submit(one_request, index) for index in range(requests)] + for future in concurrent.futures.as_completed(futures): + try: + latencies_ms.append(future.result()) + except Exception: + failures += 1 + elapsed_seconds = time.perf_counter() - started + + successes = len(latencies_ms) + return BenchmarkResult( + image=image, + endpoint=endpoint, + total_requests=requests, + concurrency=concurrency, + warmup_requests=warmup_requests, + successful_requests=successes, + failed_requests=failures, + elapsed_seconds=elapsed_seconds, + requests_per_second=(successes / elapsed_seconds) if elapsed_seconds else 0.0, + mean_latency_ms=statistics.fmean(latencies_ms) if latencies_ms else 0.0, + median_latency_ms=statistics.median(latencies_ms) if latencies_ms else 0.0, + p95_latency_ms=percentile(latencies_ms, 0.95), + min_latency_ms=min(latencies_ms) if latencies_ms else 0.0, + max_latency_ms=max(latencies_ms) if latencies_ms else 0.0, + ) + + +def docker_run( + repo_dir: Path, image: str, *args: str +) -> subprocess.CompletedProcess[str]: + return run( + [ + "docker", + "run", + "--rm", + "-v", + f"{repo_dir}:/feature_repo", + image, + *args, + ] + ) + + +def prepare_repo(repo_dir: Path, baseline_image: str) -> None: + today = datetime.now(timezone.utc).date().isoformat() + docker_run(repo_dir, baseline_image, "feast", "-c", "/feature_repo", "apply") + docker_run( + repo_dir, + baseline_image, + 
"feast", + "-c", + "/feature_repo", + "materialize-incremental", + today, + ) + + +def start_server_container( + repo_dir: Path, + image: str, + name: str, + host_port: int, + workers: int, + worker_connections: int, + max_requests: int, + registry_ttl_sec: int, +) -> str: + run( + [ + "docker", + "run", + "--rm", + "-d", + "--name", + name, + "-p", + f"{host_port}:6566", + "-v", + f"{repo_dir}:/feature_repo", + image, + "feast", + "-c", + "/feature_repo", + "serve", + "-h", + "0.0.0.0", + "--workers", + str(workers), + "--worker-connections", + str(worker_connections), + "--max-requests", + str(max_requests), + "--registry_ttl_sec", + str(registry_ttl_sec), + "--no-access-log", + ] + ) + return f"http://127.0.0.1:{host_port}" + + +def stop_server_container(name: str) -> None: + run(["docker", "rm", "-f", name], check=False) + + +def find_free_port() -> int: + import socket + + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: + sock.bind(("127.0.0.1", 0)) + return sock.getsockname()[1] + + +def main() -> int: + args = parse_args() + + if shutil.which("docker") is None: + raise RuntimeError( + "docker is required to run the experimental feature server benchmark" + ) + + workspace = Path.cwd() + sample_repo = (workspace / args.sample_repo).resolve() + if not sample_repo.exists(): + raise FileNotFoundError(f"Sample repo not found: {sample_repo}") + + output_path = (workspace / args.output).resolve() + + with tempfile.TemporaryDirectory( + prefix="feast-feature-server-benchmark-", + dir=workspace, + ) as temp_dir: + temp_repo = Path(temp_dir) / "feature_repo" + shutil.copytree(sample_repo, temp_repo) + + print(f"Preparing benchmark repo in {temp_repo}") + prepare_repo(temp_repo, args.baseline_image) + + results: list[BenchmarkResult] = [] + for label, image in ( + ("baseline", args.baseline_image), + ("experimental", args.experimental_image), + ): + container_name = f"feast-feature-server-{label}-{int(time.time())}" + port = find_free_port() + 
print(f"Starting {label} container {image} on port {port}") + endpoint = start_server_container( + temp_repo, + image, + container_name, + port, + args.workers, + args.worker_connections, + args.max_requests, + args.registry_ttl_sec, + ) + try: + wait_for_server(endpoint, DEFAULT_REQUEST) + result = benchmark_endpoint( + endpoint, + image, + DEFAULT_REQUEST, + args.requests, + args.concurrency, + args.warmup_requests, + ) + results.append(result) + print( + f"{label}: {result.requests_per_second:.2f} req/s, " + f"p95={result.p95_latency_ms:.2f}ms, " + f"failures={result.failed_requests}" + ) + finally: + stop_server_container(container_name) + + summary = { + "generated_at": datetime.now(timezone.utc).isoformat(), + "sample_repo": str(sample_repo), + "request_payload": DEFAULT_REQUEST, + "results": [asdict(result) for result in results], + } + + if len(results) == 2: + baseline, experimental = results + summary["comparison"] = { + "throughput_gain_ratio": ( + experimental.requests_per_second / baseline.requests_per_second + if baseline.requests_per_second + else 0.0 + ), + "latency_p95_improvement_ratio": ( + baseline.p95_latency_ms / experimental.p95_latency_ms + if experimental.p95_latency_ms + else 0.0 + ), + "successful_request_delta": experimental.successful_requests + - baseline.successful_requests, + } + + output_path.write_text(json.dumps(summary, indent=2)) + print(f"Benchmark report written to {output_path}") + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/sdk/python/feast/__init__.py b/sdk/python/feast/__init__.py index b1881c50150..f5b7c0274ed 100644 --- a/sdk/python/feast/__init__.py +++ b/sdk/python/feast/__init__.py @@ -1,76 +1,72 @@ +from importlib import import_module from importlib.metadata import PackageNotFoundError from importlib.metadata import version as _version -from feast.infra.offline_stores.bigquery_source import BigQuerySource -from feast.infra.offline_stores.contrib.athena_offline_store.athena_source 
import ( - AthenaSource, -) -from feast.infra.offline_stores.contrib.oracle_offline_store.oracle_source import ( - OracleSource, -) -from feast.infra.offline_stores.file_source import FileSource -from feast.infra.offline_stores.redshift_source import RedshiftSource -from feast.infra.offline_stores.snowflake_source import SnowflakeSource +_LAZY_EXPORTS = { + "Aggregation": ("feast.aggregation", "Aggregation"), + "AthenaSource": ( + "feast.infra.offline_stores.contrib.athena_offline_store.athena_source", + "AthenaSource", + ), + "BaseChunker": ("feast.chunker", "BaseChunker"), + "BaseEmbedder": ("feast.embedder", "BaseEmbedder"), + "BatchFeatureView": ("feast.batch_feature_view", "BatchFeatureView"), + "BigQuerySource": ("feast.infra.offline_stores.bigquery_source", "BigQuerySource"), + "ChrononSource": ( + "feast.infra.offline_stores.contrib.chronon_offline_store.chronon_source", + "ChrononSource", + ), + "ChunkingConfig": ("feast.chunker", "ChunkingConfig"), + "DataFrameEngine": ("feast.dataframe", "DataFrameEngine"), + "DocEmbedder": ("feast.doc_embedder", "DocEmbedder"), + "EmbeddingConfig": ("feast.embedder", "EmbeddingConfig"), + "Entity": ("feast.entity", "Entity"), + "Feature": ("feast.feature", "Feature"), + "FeatureService": ("feast.feature_service", "FeatureService"), + "FeatureStore": ("feast.feature_store", "FeatureStore"), + "FeatureView": ("feast.feature_view", "FeatureView"), + "FeastDataFrame": ("feast.dataframe", "FeastDataFrame"), + "FeastVectorStore": ("feast.vector_store", "FeastVectorStore"), + "Field": ("feast.field", "Field"), + "FileSource": ("feast.infra.offline_stores.file_source", "FileSource"), + "KafkaSource": ("feast.data_source", "KafkaSource"), + "KinesisSource": ("feast.data_source", "KinesisSource"), + "MultiModalEmbedder": ("feast.embedder", "MultiModalEmbedder"), + "OnDemandFeatureView": ("feast.on_demand_feature_view", "OnDemandFeatureView"), + "OracleSource": ( + 
"feast.infra.offline_stores.contrib.oracle_offline_store.oracle_source", + "OracleSource", + ), + "Project": ("feast.project", "Project"), + "PushSource": ("feast.data_source", "PushSource"), + "RedshiftSource": ("feast.infra.offline_stores.redshift_source", "RedshiftSource"), + "RepoConfig": ("feast.repo_config", "RepoConfig"), + "RequestSource": ("feast.data_source", "RequestSource"), + "SchemaTransformFn": ("feast.doc_embedder", "SchemaTransformFn"), + "SnowflakeSource": ( + "feast.infra.offline_stores.snowflake_source", + "SnowflakeSource", + ), + "StreamFeatureView": ("feast.stream_feature_view", "StreamFeatureView"), + "TextChunker": ("feast.chunker", "TextChunker"), + "ValueType": ("feast.value_type", "ValueType"), +} + +__all__ = sorted(_LAZY_EXPORTS) + + +def __getattr__(name: str): + if name not in _LAZY_EXPORTS: + raise AttributeError(f"module 'feast' has no attribute {name!r}") + + module_name, attr_name = _LAZY_EXPORTS[name] + module = import_module(module_name) + value = getattr(module, attr_name) + globals()[name] = value + return value -from .aggregation import Aggregation -from .batch_feature_view import BatchFeatureView -from .chunker import BaseChunker, ChunkingConfig, TextChunker -from .data_source import KafkaSource, KinesisSource, PushSource, RequestSource -from .dataframe import DataFrameEngine, FeastDataFrame -from .doc_embedder import DocEmbedder, SchemaTransformFn -from .embedder import BaseEmbedder, EmbeddingConfig, MultiModalEmbedder -from .entity import Entity -from .feature import Feature -from .feature_service import FeatureService -from .feature_store import FeatureStore -from .feature_view import FeatureView -from .field import Field -from .on_demand_feature_view import OnDemandFeatureView -from .project import Project -from .repo_config import RepoConfig -from .stream_feature_view import StreamFeatureView -from .value_type import ValueType -from .vector_store import FeastVectorStore try: __version__ = _version("feast") except 
PackageNotFoundError: - # package is not installed pass - -__all__ = [ - "Aggregation", - "BatchFeatureView", - "DataFrameEngine", - "Entity", - "KafkaSource", - "KinesisSource", - "FeastDataFrame", - "Feature", - "Field", - "FeatureService", - "FeatureStore", - "FeatureView", - "OnDemandFeatureView", - "RepoConfig", - "StreamFeatureView", - "ValueType", - "BigQuerySource", - "FileSource", - "RedshiftSource", - "SnowflakeSource", - "PushSource", - "RequestSource", - "AthenaSource", - "OracleSource", - "Project", - "FeastVectorStore", - "DocEmbedder", - "SchemaTransformFn", - "BaseChunker", - "TextChunker", - "ChunkingConfig", - "BaseEmbedder", - "MultiModalEmbedder", - "EmbeddingConfig", -] diff --git a/sdk/python/feast/_lazy_pandas.py b/sdk/python/feast/_lazy_pandas.py new file mode 100644 index 00000000000..794cec74bb5 --- /dev/null +++ b/sdk/python/feast/_lazy_pandas.py @@ -0,0 +1,22 @@ +from __future__ import annotations + +from types import ModuleType + + +class _LazyPandasProxy: + """Load pandas only when a code path actually touches it.""" + + _module: ModuleType | None = None + + def _load(self) -> ModuleType: + if self._module is None: + import pandas as pandas_module + + self._module = pandas_module + return self._module + + def __getattr__(self, name: str): + return getattr(self._load(), name) + + +pd = _LazyPandasProxy() diff --git a/sdk/python/feast/dqm/profilers/profiler.py b/sdk/python/feast/dqm/profilers/profiler.py index 03481bdc999..998eb894f6c 100644 --- a/sdk/python/feast/dqm/profilers/profiler.py +++ b/sdk/python/feast/dqm/profilers/profiler.py @@ -1,7 +1,9 @@ +from __future__ import annotations + import abc from typing import Any, List, Optional -import pandas as pd +from feast._lazy_pandas import pd class Profile: diff --git a/sdk/python/feast/feature_server.py b/sdk/python/feast/feature_server.py index f60eeb9d87d..17aeb04cbf3 100644 --- a/sdk/python/feast/feature_server.py +++ b/sdk/python/feast/feature_server.py @@ -1,3 +1,5 @@ +from 
__future__ import annotations + # Copyright 2025 The Feast Authors # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -11,7 +13,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - import asyncio import os import sys @@ -25,7 +26,6 @@ from types import SimpleNamespace from typing import Any, DefaultDict, Dict, List, NamedTuple, Optional, Set, Union -import pandas as pd from dateutil import parser from fastapi import ( Depends, @@ -46,12 +46,12 @@ import feast from feast import metrics as feast_metrics from feast import proto_json, utils +from feast._lazy_pandas import pd from feast.constants import DEFAULT_FEATURE_SERVER_REGISTRY_TTL from feast.data_source import PushMode from feast.errors import ( FeastError, ) -from feast.feast_object import FeastObject from feast.feature_view_utils import get_feature_view_from_feature_store from feast.permissions.action import WRITE, AuthzedAction from feast.permissions.security_manager import assert_permissions @@ -544,7 +544,7 @@ async def _push_with_to(push_to: PushMode) -> None: async def _get_feast_object( feature_view_name: str, allow_registry_cache: bool - ) -> FeastObject: + ) -> Any: return await run_in_threadpool( get_feature_view_from_feature_store, store, @@ -730,7 +730,6 @@ async def websocket_endpoint(websocket: WebSocket): def _add_mcp_support_if_enabled(app, store: "feast.FeatureStore"): """Add MCP support to the FastAPI app if enabled in configuration.""" - mcp_transport_not_supported_error = None try: # Check if MCP is enabled in feature server config if ( @@ -739,16 +738,7 @@ def _add_mcp_support_if_enabled(app, store: "feast.FeatureStore"): and store.config.feature_server.type == "mcp" and getattr(store.config.feature_server, "mcp_enabled", False) ): - try: - from feast.infra.mcp_servers.mcp_server import ( - McpTransportNotSupportedError, - 
add_mcp_support_to_app, - ) - - mcp_transport_not_supported_error = McpTransportNotSupportedError - except ImportError as e: - logger.error(f"Error checking/adding MCP support: {e}") - return + from feast.infra.mcp_servers.mcp_server import add_mcp_support_to_app mcp_server = add_mcp_support_to_app(app, store, store.config.feature_server) @@ -759,10 +749,6 @@ def _add_mcp_support_if_enabled(app, store: "feast.FeatureStore"): else: logger.debug("MCP support is not enabled in feature server configuration") except Exception as e: - if mcp_transport_not_supported_error and isinstance( - e, mcp_transport_not_supported_error - ): - raise logger.error(f"Error checking/adding MCP support: {e}") # Don't fail the entire server if MCP fails to initialize diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index f95bbf10c03..071dc3b4ed5 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -1,3 +1,5 @@ +from __future__ import annotations + # Copyright 2019 The Feast Authors # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -36,8 +38,12 @@ if TYPE_CHECKING: from feast.diff.apply_progress import ApplyProgressContext + from feast.saved_dataset import ( + SavedDataset, + SavedDatasetStorage, + ValidationReference, + ) -import pandas as pd import pyarrow as pa from colorama import Fore, Style from fastapi.concurrency import run_in_threadpool @@ -45,6 +51,7 @@ from tqdm import tqdm from feast import feature_server, flags_helper, ui_server, utils +from feast._lazy_pandas import pd from feast.base_feature_view import BaseFeatureView from feast.batch_feature_view import BatchFeatureView from feast.data_source import ( @@ -66,7 +73,6 @@ PushSourceNotFoundException, RequestDataNotFoundInEntityDfException, ) -from feast.feast_object import FeastObject from feast.feature_service import FeatureService from feast.feature_view import DUMMY_ENTITY, DUMMY_ENTITY_NAME, FeatureView from feast.inference import 
( @@ -74,9 +80,6 @@ update_feature_views_with_inferred_features_and_entities, ) from feast.infra.infra_object import Infra -from feast.infra.offline_stores.offline_utils import ( - DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL, -) from feast.infra.provider import Provider, RetrievalJob, get_provider from feast.infra.registry.base_registry import BaseRegistry from feast.infra.registry.registry import Registry @@ -94,14 +97,13 @@ from feast.protos.feast.types.Value_pb2 import Value as ValueProto from feast.repo_config import RepoConfig, load_repo_config from feast.repo_contents import RepoContents -from feast.saved_dataset import SavedDataset, SavedDatasetStorage, ValidationReference from feast.ssl_ca_trust_store_setup import configure_ca_trust_store_env_variables from feast.stream_feature_view import StreamFeatureView -from feast.transformation.pandas_transformation import PandasTransformation -from feast.transformation.python_transformation import PythonTransformation from feast.utils import _get_feature_view_vector_field_metadata, _utc_now from feast.version_utils import parse_version +DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL = "event_timestamp" + _track_materialization = None # Lazy-loaded on first materialization call _track_materialization_loaded = False @@ -125,6 +127,16 @@ def _get_track_materialization(): return _track_materialization +def _get_saved_dataset_classes(): + from feast.saved_dataset import ( + SavedDataset, + SavedDatasetStorage, + ValidationReference, + ) + + return SavedDataset, SavedDatasetStorage, ValidationReference + + warnings.simplefilter("once", DeprecationWarning) @@ -1049,9 +1061,9 @@ def apply( FeatureService, ValidationReference, Permission, - List[FeastObject], + List[Any], ], - objects_to_delete: Optional[List[FeastObject]] = None, + objects_to_delete: Optional[List[Any]] = None, partial: bool = True, skip_feature_view_validation: bool = False, no_promote: bool = False, @@ -1106,6 +1118,8 @@ def apply( if not objects_to_delete: 
objects_to_delete = [] + _, _, ValidationReference = _get_saved_dataset_classes() + # Separate all objects into entities, feature services, and different feature view types. projects_to_update = [ob for ob in objects if isinstance(ob, Project)] if len(projects_to_update) > 1: @@ -1447,19 +1461,6 @@ def get_historical_features( feature_views = list(view for view, _ in fvs) on_demand_feature_views = list(view for view, _ in odfvs) - # ODFV source FV dependencies (e.g. driver_stats:conv_rate) are resolved - # by _group_feature_refs and included in `fvs`, but not in _feature_refs. - # Offline stores use feature_refs to map which features to fetch from each - # FV, so we must include these implicit dependency refs. - _feature_refs_for_provider = list(_feature_refs) - existing_refs = set(_feature_refs) - for view, feats in fvs: - for feat in feats: - ref = f"{view.projection.name_to_use()}:{feat}" - if ref not in existing_refs: - _feature_refs_for_provider.append(ref) - existing_refs.add(ref) - # Check that the right request data is present in the entity_df if type(entity_df) == pd.DataFrame: if self.config.coerce_tz_aware: @@ -1486,7 +1487,7 @@ def get_historical_features( job = provider.get_historical_features( self.config, feature_views, - _feature_refs_for_provider, + _feature_refs, entity_df, self.registry, self.project, @@ -1539,6 +1540,7 @@ def create_saved_dataset( f"The RetrievalJob {type(from_)} must implement the metadata property." 
) + SavedDataset, _, _ = _get_saved_dataset_classes() dataset = SavedDataset( name=name, features=from_.metadata.features, @@ -2198,6 +2200,8 @@ def _transform_on_demand_feature_view_df( _t0 = _time.monotonic() try: + from feast.transformation.python_transformation import PythonTransformation + if feature_view.mode == "python" and isinstance( feature_view.feature_transformation, PythonTransformation ): @@ -2262,7 +2266,9 @@ def _transform_on_demand_feature_view_df( return pd.DataFrame(transformed_data) - elif feature_view.mode == "pandas" and isinstance( + from feast.transformation.pandas_transformation import PandasTransformation + + if feature_view.mode == "pandas" and isinstance( feature_view.feature_transformation, PandasTransformation ): transformed_df = feature_view.feature_transformation.udf(df) @@ -2781,10 +2787,7 @@ def _doc_feature(x): online_features_response=online_features_response, data=requested_features_data, ) - feature_types = { - f.name: f.dtype.to_value_type() for f in requested_feature_view.features - } - return OnlineResponse(online_features_response, feature_types=feature_types) + return OnlineResponse(online_features_response) def retrieve_online_documents_v2( self, @@ -3074,20 +3077,24 @@ def _retrieve_from_online_store_v2( online_features_response.metadata.feature_names.val.extend( features_to_request ) - feature_types = {f.name: f.dtype.to_value_type() for f in table.features} - return OnlineResponse(online_features_response, feature_types=feature_types) + return OnlineResponse(online_features_response) table_entity_values, idxs, output_len = utils._get_unique_entities_from_values( entity_key_dict, ) - online_features_response = GetOnlineFeaturesResponse(results=[]) - utils._populate_response_from_feature_data( + feature_data = utils._convert_rows_to_protobuf( requested_features=features_to_request, read_rows=list(zip(datevals, list_of_feature_dicts)), + ) + + online_features_response = GetOnlineFeaturesResponse(results=[]) + 
utils._populate_response_from_feature_data( + feature_data=feature_data, indexes=idxs, online_features_response=online_features_response, full_feature_names=False, + requested_features=features_to_request, table=table, output_len=output_len, include_feature_view_version_metadata=include_feature_view_version_metadata, @@ -3098,8 +3105,7 @@ def _retrieve_from_online_store_v2( data=entity_key_dict, ) - feature_types = {f.name: f.dtype.to_value_type() for f in table.features} - return OnlineResponse(online_features_response, feature_types=feature_types) + return OnlineResponse(online_features_response) def serve( self, diff --git a/sdk/python/feast/infra/feature_servers/multicloud/Dockerfile.experimental b/sdk/python/feast/infra/feature_servers/multicloud/Dockerfile.experimental new file mode 100644 index 00000000000..5861efe7e3d --- /dev/null +++ b/sdk/python/feast/infra/feature_servers/multicloud/Dockerfile.experimental @@ -0,0 +1,62 @@ +ARG PYTHON_VERSION=3.14.0 +ARG PYTHON_TARBALL=https://www.python.org/ftp/python/${PYTHON_VERSION}/Python-${PYTHON_VERSION}.tgz + +FROM registry.access.redhat.com/ubi9/ubi:latest AS python-builder + +ARG PYTHON_VERSION +ARG PYTHON_TARBALL + +RUN dnf install -y \ + bzip2-devel \ + findutils \ + gcc \ + gzip \ + libffi-devel \ + make \ + openssl-devel \ + sqlite-devel \ + tar \ + wget \ + xz-devel \ + zlib-devel \ + && dnf clean all + +WORKDIR /tmp +RUN wget -O python.tgz "${PYTHON_TARBALL}" \ + && tar -xzf python.tgz \ + && cd Python-${PYTHON_VERSION} \ + && ./configure \ + --prefix=/opt/python314t \ + --disable-gil \ + --enable-optimizations \ + --with-ensurepip=install \ + && make -j"$(nproc)" \ + && make install + +FROM registry.access.redhat.com/ubi9/ubi:latest + +LABEL org.opencontainers.image.title="Feast Feature Server (Experimental Python 3.14 free-threaded)" +LABEL org.opencontainers.image.description="Experimental Feast feature server image built with a Python 3.14 free-threaded runtime." 
+LABEL feast.experimental="true" + +RUN dnf install -y \ + gcc \ + && dnf clean all + +COPY --from=python-builder /opt/python314t /opt/python314t +COPY --from=ghcr.io/astral-sh/uv:latest /uv /usr/local/bin/uv + +ENV PATH=/opt/python314t/bin:${PATH} +ENV UV_CACHE_DIR=/tmp/uv-cache +ENV FEAST_EXPERIMENTAL_PYTHON_FREE_THREADED=1 +ENV SETUPTOOLS_SCM_PRETEND_VERSION=0.61.0 + +WORKDIR /src +COPY pyproject.toml README.md MANIFEST.in Makefile /src/ +COPY protos /src/protos +COPY sdk/python /src/sdk/python + +RUN uv pip install --python /opt/python314t/bin/python3.14t --system /src protobuf==6.33.6 + +# modify permissions to support running with a random uid +RUN chmod g+w $(python3.14t -c "import feast.ui as ui; print(ui.__path__)" | tr -d "[']")/build/projects-list.json diff --git a/sdk/python/feast/infra/feature_servers/multicloud/requirements.experimental.txt b/sdk/python/feast/infra/feature_servers/multicloud/requirements.experimental.txt new file mode 100644 index 00000000000..f408e075a97 --- /dev/null +++ b/sdk/python/feast/infra/feature_servers/multicloud/requirements.experimental.txt @@ -0,0 +1,5 @@ +# Experimental no-GIL image: +# keep this benchmark profile focused on the local SQLite feature-server path. +# Optional cloud and warehouse extras stay out of this image until their native +# dependencies are confirmed to work on cp314t. 
+feast == 0.61.0 diff --git a/sdk/python/feast/infra/provider.py b/sdk/python/feast/infra/provider.py index 9bdf681fb69..5e697836d25 100644 --- a/sdk/python/feast/infra/provider.py +++ b/sdk/python/feast/infra/provider.py @@ -1,7 +1,10 @@ +from __future__ import annotations + from abc import ABC, abstractmethod from datetime import datetime from pathlib import Path from typing import ( + TYPE_CHECKING, Any, Callable, Dict, @@ -14,11 +17,11 @@ Union, ) -import pandas as pd import pyarrow from tqdm import tqdm from feast import FeatureService, errors +from feast._lazy_pandas import pd from feast.base_feature_view import BaseFeatureView from feast.data_source import DataSource from feast.entity import Entity @@ -36,13 +39,16 @@ from feast.protos.feast.types.Value_pb2 import RepeatedValue from feast.protos.feast.types.Value_pb2 import Value as ValueProto from feast.repo_config import RepoConfig -from feast.saved_dataset import SavedDataset + +if TYPE_CHECKING: + from feast.saved_dataset import SavedDataset PROVIDERS_CLASS_FOR_TYPE = { "gcp": "feast.infra.passthrough_provider.PassthroughProvider", "aws": "feast.infra.passthrough_provider.PassthroughProvider", "local": "feast.infra.passthrough_provider.PassthroughProvider", "azure": "feast.infra.passthrough_provider.PassthroughProvider", + "chronon": "feast.infra.chronon_provider.ChrononProvider", } diff --git a/sdk/python/feast/infra/registry/base_registry.py b/sdk/python/feast/infra/registry/base_registry.py index da4f291bc44..9687451ddac 100644 --- a/sdk/python/feast/infra/registry/base_registry.py +++ b/sdk/python/feast/infra/registry/base_registry.py @@ -1,3 +1,5 @@ +from __future__ import annotations + # Copyright 2019 The Feast Authors # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -16,7 +18,7 @@ from abc import ABC, abstractmethod from collections import defaultdict from datetime import datetime -from typing import Any, Dict, List, Optional, Union +from typing import TYPE_CHECKING, Any, 
Dict, List, Optional, Union from google.protobuf.json_format import MessageToJson from google.protobuf.message import Message @@ -51,10 +53,16 @@ from feast.protos.feast.core.StreamFeatureView_pb2 import ( StreamFeatureView as StreamFeatureViewProto, ) -from feast.saved_dataset import SavedDataset, ValidationReference from feast.stream_feature_view import StreamFeatureView -from feast.transformation.pandas_transformation import PandasTransformation -from feast.transformation.substrait_transformation import SubstraitTransformation + +if TYPE_CHECKING: + from feast.saved_dataset import SavedDataset, ValidationReference + + +def _get_saved_dataset_classes(): + from feast.saved_dataset import SavedDataset, ValidationReference + + return SavedDataset, ValidationReference class BaseRegistry(ABC): @@ -1106,6 +1114,13 @@ def to_dict(self, project: str) -> Dict[str, List[Any]]: DeprecationWarning, ) if on_demand_feature_view.feature_transformation: + from feast.transformation.pandas_transformation import ( + PandasTransformation, + ) + from feast.transformation.substrait_transformation import ( + SubstraitTransformation, + ) + if isinstance( on_demand_feature_view.feature_transformation, PandasTransformation ): @@ -1164,6 +1179,8 @@ def to_dict(self, project: str) -> Dict[str, List[Any]]: @staticmethod def deserialize_registry_values(serialized_proto, feast_obj_type) -> Any: + SavedDataset, _ = _get_saved_dataset_classes() + if feast_obj_type == Entity: return EntityProto.FromString(serialized_proto) if feast_obj_type == SavedDataset: diff --git a/sdk/python/feast/on_demand_feature_view.py b/sdk/python/feast/on_demand_feature_view.py index 57c51fccd8a..e9bbd7419c9 100644 --- a/sdk/python/feast/on_demand_feature_view.py +++ b/sdk/python/feast/on_demand_feature_view.py @@ -1,6 +1,5 @@ import copy import functools -import uuid import warnings from types import FunctionType from typing import Any, List, Optional, Union, cast @@ -31,9 +30,6 @@ ) from feast.transformation.base 
import Transformation from feast.transformation.mode import TransformationMode -from feast.transformation.pandas_transformation import PandasTransformation -from feast.transformation.python_transformation import PythonTransformation -from feast.transformation.substrait_transformation import SubstraitTransformation from feast.utils import _utc_now from feast.value_type import ValueType from feast.version_utils import normalize_version_string @@ -362,6 +358,10 @@ def get_feature_transformation(self) -> Transformation: mode=self.mode, udf=self.udf, udf_string=self.udf_string or "" ) elif self.mode == TransformationMode.SUBSTRAIT or self.mode == "substrait": + from feast.transformation.substrait_transformation import ( + SubstraitTransformation, + ) + return SubstraitTransformation.from_ibis(self.udf, self.sources) else: raise ValueError( @@ -773,8 +773,16 @@ def _parse_transformation_from_proto( # Check for non-empty UDF body if udf_proto.body_text: if mode == "pandas": + from feast.transformation.pandas_transformation import ( + PandasTransformation, + ) + return PandasTransformation.from_proto(udf_proto) elif mode == "python": + from feast.transformation.python_transformation import ( + PythonTransformation, + ) + return PythonTransformation.from_proto(udf_proto) else: raise ValueError(ODFVErrorMessages.unsupported_mode_for_udf(mode)) @@ -783,6 +791,10 @@ def _parse_transformation_from_proto( return cls._handle_backward_compatible_udf(proto) elif transformation_type == "substrait_transformation": + from feast.transformation.substrait_transformation import ( + SubstraitTransformation, + ) + return SubstraitTransformation.from_proto( feature_transformation.substrait_transformation ) @@ -810,6 +822,8 @@ def _handle_backward_compatible_udf( body=old_udf.body, body_text=old_udf.body_text, ) + from feast.transformation.pandas_transformation import PandasTransformation + return PandasTransformation.from_proto( user_defined_function_proto=backwards_compatible_udf, ) @@ 
-912,6 +926,10 @@ def transform_ibis( ): from ibis.expr.types import Table + from feast.transformation.substrait_transformation import ( + SubstraitTransformation, + ) + if not isinstance(ibis_table, Table): raise TypeError("transform_ibis only accepts ibis.expr.types.Table") @@ -1273,9 +1291,6 @@ def _get_sample_values_by_type(self) -> dict[ValueType, list[Any]]: # Special binary types ValueType.PDF_BYTES: [pdf_sample], ValueType.IMAGE_BYTES: [image_sample], - # UUID types - ValueType.UUID: [uuid.uuid4()], - ValueType.TIME_UUID: [uuid.uuid1()], # List types ValueType.BYTES_LIST: [[b"hello world"]], ValueType.STRING_LIST: [["hello world"]], @@ -1285,19 +1300,6 @@ def _get_sample_values_by_type(self) -> dict[ValueType, list[Any]]: ValueType.FLOAT_LIST: [[1.0]], ValueType.BOOL_LIST: [[True]], ValueType.UNIX_TIMESTAMP_LIST: [[_utc_now()]], - ValueType.UUID_LIST: [[uuid.uuid4(), uuid.uuid4()]], - ValueType.TIME_UUID_LIST: [[uuid.uuid1(), uuid.uuid1()]], - # Set types - ValueType.BYTES_SET: [{b"hello world", b"foo bar"}], - ValueType.STRING_SET: [{"hello world", "foo bar"}], - ValueType.INT32_SET: [{1, 2}], - ValueType.INT64_SET: [{1, 2}], - ValueType.DOUBLE_SET: [{1.0, 2.0}], - ValueType.FLOAT_SET: [{1.0, 2.0}], - ValueType.BOOL_SET: [{True, False}], - ValueType.UNIX_TIMESTAMP_SET: [{_utc_now()}], - ValueType.UUID_SET: [{uuid.uuid4(), uuid.uuid4()}], - ValueType.TIME_UUID_SET: [{uuid.uuid1(), uuid.uuid1()}], } @staticmethod diff --git a/sdk/python/feast/online_response.py b/sdk/python/feast/online_response.py index 7b6b4806e4d..549c6a7bb2d 100644 --- a/sdk/python/feast/online_response.py +++ b/sdk/python/feast/online_response.py @@ -1,3 +1,5 @@ +from __future__ import annotations + # Copyright 2020 The Feast Authors # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -11,18 +13,15 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
# See the License for the specific language governing permissions and # limitations under the License. +from typing import TYPE_CHECKING, Any, Dict, List, TypeAlias, Union -import uuid as uuid_module -from typing import TYPE_CHECKING, Any, Dict, List, Optional, TypeAlias, Union - -import pandas as pd import pyarrow as pa +from feast._lazy_pandas import pd from feast.feature_view import DUMMY_ENTITY_ID from feast.protos.feast.serving.ServingService_pb2 import GetOnlineFeaturesResponse from feast.torch_wrapper import get_torch from feast.type_map import feast_value_type_to_python_type -from feast.value_type import ValueType if TYPE_CHECKING: import torch @@ -39,20 +38,14 @@ class OnlineResponse: Defines an online response in feast. """ - def __init__( - self, - online_response_proto: GetOnlineFeaturesResponse, - feature_types: Optional[Dict[str, ValueType]] = None, - ): + def __init__(self, online_response_proto: GetOnlineFeaturesResponse): """ Construct a native online response from its protobuf version. Args: online_response_proto: GetOnlineResponse proto object to construct from. - feature_types: Optional mapping of feature names to ValueType for type-aware deserialization. 
""" self.proto = online_response_proto - self._feature_types = feature_types or {} # Delete DUMMY_ENTITY_ID from proto if it exists for idx, val in enumerate(self.proto.metadata.feature_names.val): if val == DUMMY_ENTITY_ID: @@ -73,10 +66,8 @@ def to_dict(self, include_event_timestamps: bool = False) -> Dict[str, Any]: for feature_ref, feature_vector in zip( self.proto.metadata.feature_names.val, self.proto.results ): - feature_type = self._feature_types.get(feature_ref) response[feature_ref] = [ - feast_value_type_to_python_type(v, feature_type) - for v in feature_vector.values + feast_value_type_to_python_type(v) for v in feature_vector.values ] if include_event_timestamps: @@ -104,9 +95,8 @@ def to_arrow(self, include_event_timestamps: bool = False) -> pa.Table: Args: include_event_timestamps: bool Optionally include feature timestamps in the table """ - result = self.to_dict(include_event_timestamps) - result = _convert_uuids_for_arrow(result) - return pa.Table.from_pydict(result) + + return pa.Table.from_pydict(self.to_dict(include_event_timestamps)) def to_tensor( self, @@ -151,31 +141,3 @@ def to_tensor( values # Return as-is for strings or unsupported types ) return tensor_dict - - -def _convert_uuids_for_arrow(result: Dict[str, List[Any]]) -> Dict[str, List[Any]]: - """Convert uuid.UUID objects and sets to Arrow-compatible types.""" - for key, values in result.items(): - first_valid = next((v for v in values if v is not None), None) - if isinstance(first_valid, uuid_module.UUID): - result[key] = [ - str(v) if isinstance(v, uuid_module.UUID) else v for v in values - ] - elif isinstance(first_valid, list): - inner = next((e for e in first_valid if e is not None), None) - if isinstance(inner, uuid_module.UUID): - result[key] = [ - [str(e) if isinstance(e, uuid_module.UUID) else e for e in v] - if isinstance(v, list) - else v - for v in values - ] - elif isinstance(first_valid, set): - inner = next((e for e in first_valid if e is not None), None) - if 
isinstance(inner, uuid_module.UUID): - result[key] = [ - [str(e) for e in v] if isinstance(v, set) else v for v in values - ] - else: - result[key] = [list(v) if isinstance(v, set) else v for v in values] - return result diff --git a/sdk/python/feast/saved_dataset.py b/sdk/python/feast/saved_dataset.py index 4a3043a8731..8975b31826c 100644 --- a/sdk/python/feast/saved_dataset.py +++ b/sdk/python/feast/saved_dataset.py @@ -1,11 +1,13 @@ +from __future__ import annotations + from abc import abstractmethod from datetime import datetime from typing import TYPE_CHECKING, Dict, List, Optional, Type, cast -import pandas as pd import pyarrow from google.protobuf.json_format import MessageToJson +from feast._lazy_pandas import pd from feast.data_source import DataSource from feast.dqm.profilers.profiler import Profile, Profiler from feast.importer import import_class diff --git a/sdk/python/feast/type_map.py b/sdk/python/feast/type_map.py index 001cb1d5b62..23e20303aed 100644 --- a/sdk/python/feast/type_map.py +++ b/sdk/python/feast/type_map.py @@ -1,3 +1,5 @@ +from __future__ import annotations + # Copyright 2019 The Feast Authors # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -11,11 +13,9 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. 
- import decimal import json import logging -import uuid as uuid_module from collections import defaultdict from datetime import datetime, timezone from typing import ( @@ -35,9 +35,9 @@ ) import numpy as np -import pandas as pd from google.protobuf.timestamp_pb2 import Timestamp +from feast._lazy_pandas import pd from feast.protos.feast.types.Value_pb2 import ( BoolList, BoolSet, @@ -53,7 +53,6 @@ Int64Set, Map, MapList, - RepeatedValue, StringList, StringSet, ) @@ -69,10 +68,7 @@ logger = logging.getLogger(__name__) -def feast_value_type_to_python_type( - field_value_proto: ProtoValue, - feature_type: Optional[ValueType] = None, -) -> Any: +def feast_value_type_to_python_type(field_value_proto: ProtoValue) -> Any: """ Converts field value Proto to Dict and returns each field's Feast Value Type value in their respective Python value. @@ -106,10 +102,6 @@ def feast_value_type_to_python_type( result.append(v) return result - # Handle nested collection types (list_val, set_val) - if val_attr in ("list_val", "set_val"): - return _handle_nested_collection_value(val) - # Handle Struct types — stored using Map proto, returned as dicts if val_attr == "struct_val": return _handle_map_value(val) @@ -157,34 +149,6 @@ def feast_value_type_to_python_type( elif val_attr.endswith("_set_val") and val_attr != "unix_timestamp_set_val": val = set(val) - # Convert UUID values to uuid.UUID objects - if val_attr in ("uuid_val", "time_uuid_val"): - return uuid_module.UUID(val) if isinstance(val, str) else val - if val_attr in ("uuid_list_val", "time_uuid_list_val"): - return [uuid_module.UUID(v) if isinstance(v, str) else v for v in val] - if val_attr in ("uuid_set_val", "time_uuid_set_val"): - return {uuid_module.UUID(v) if isinstance(v, str) else v for v in val} - - # Convert DECIMAL values to decimal.Decimal objects - if val_attr == "decimal_val": - return decimal.Decimal(val) if isinstance(val, str) else val - if val_attr == "decimal_list_val": - return [decimal.Decimal(v) if 
isinstance(v, str) else v for v in val] - if val_attr == "decimal_set_val": - return {decimal.Decimal(v) if isinstance(v, str) else v for v in val} - - # Backward compatibility: handle UUIDs stored as string_val/string_list_val with feature_type hint - if feature_type in (ValueType.UUID, ValueType.TIME_UUID) and isinstance(val, str): - return uuid_module.UUID(val) - if feature_type in (ValueType.UUID_LIST, ValueType.TIME_UUID_LIST) and isinstance( - val, list - ): - return [uuid_module.UUID(v) if isinstance(v, str) else v for v in val] - if feature_type in (ValueType.UUID_SET, ValueType.TIME_UUID_SET) and isinstance( - val, set - ): - return {uuid_module.UUID(v) if isinstance(v, str) else v for v in val} - return val @@ -211,18 +175,6 @@ def _handle_map_list_value(map_list_message) -> List[Dict[str, Any]]: return result -def _handle_nested_collection_value(repeated_value) -> List[Any]: - """Handle nested collection proto (RepeatedValue containing Values). - - Each inner Value is itself a list/set proto. We recursively convert - each inner Value to a Python list/set via feast_value_type_to_python_type. 
- """ - result = [] - for inner_value in repeated_value.val: - result.append(feast_value_type_to_python_type(inner_value)) - return result - - def feast_value_type_to_pandas_type(value_type: ValueType) -> Any: value_type_to_pandas_type: Dict[ValueType, str] = { ValueType.FLOAT: "float", @@ -233,12 +185,9 @@ def feast_value_type_to_pandas_type(value_type: ValueType) -> Any: ValueType.BYTES: "bytes", ValueType.BOOL: "bool", ValueType.UNIX_TIMESTAMP: "datetime64[ns]", - ValueType.UUID: "str", - ValueType.TIME_UUID: "str", - ValueType.DECIMAL: "object", } if ( - value_type.name in ("MAP", "JSON", "STRUCT", "VALUE_LIST", "VALUE_SET") + value_type.name in ("MAP", "JSON", "STRUCT") or value_type.name.endswith("_LIST") or value_type.name.endswith("_SET") ): @@ -299,8 +248,6 @@ def python_type_to_feast_value_type( "datetime64[ns, utc]": ValueType.UNIX_TIMESTAMP, "date": ValueType.UNIX_TIMESTAMP, "category": ValueType.STRING, - "uuid": ValueType.UUID, - "decimal": ValueType.DECIMAL, } if type_name in type_map: @@ -333,9 +280,8 @@ def python_type_to_feast_value_type( if not recurse: raise ValueError( f"Value type for field {name} is {type(value)} but " - f"recursion is not allowed. Nested collection types cannot be " - f"inferred automatically; use an explicit Field dtype instead " - f"(e.g., dtype=Array(Array(Int32)))." + f"recursion is not allowed. Array types can only be one level " + f"deep." 
) # This is the final type which we infer from the list @@ -460,25 +406,6 @@ def _convert_value_type_str_to_value_type(type_str: str) -> ValueType: "JSON_LIST": ValueType.JSON_LIST, "STRUCT": ValueType.STRUCT, "STRUCT_LIST": ValueType.STRUCT_LIST, - "BYTES_SET": ValueType.BYTES_SET, - "STRING_SET": ValueType.STRING_SET, - "INT32_SET": ValueType.INT32_SET, - "INT64_SET": ValueType.INT64_SET, - "DOUBLE_SET": ValueType.DOUBLE_SET, - "FLOAT_SET": ValueType.FLOAT_SET, - "BOOL_SET": ValueType.BOOL_SET, - "UNIX_TIMESTAMP_SET": ValueType.UNIX_TIMESTAMP_SET, - "UUID": ValueType.UUID, - "TIME_UUID": ValueType.TIME_UUID, - "UUID_LIST": ValueType.UUID_LIST, - "TIME_UUID_LIST": ValueType.TIME_UUID_LIST, - "UUID_SET": ValueType.UUID_SET, - "TIME_UUID_SET": ValueType.TIME_UUID_SET, - "VALUE_LIST": ValueType.VALUE_LIST, - "VALUE_SET": ValueType.VALUE_SET, - "DECIMAL": ValueType.DECIMAL, - "DECIMAL_LIST": ValueType.DECIMAL_LIST, - "DECIMAL_SET": ValueType.DECIMAL_SET, } return type_map.get(type_str, ValueType.STRING) @@ -510,21 +437,6 @@ def _type_err(item, dtype): ValueType.STRING_LIST: (StringList, "string_list_val", [np.str_, str]), ValueType.BOOL_LIST: (BoolList, "bool_list_val", [np.bool_, bool]), ValueType.BYTES_LIST: (BytesList, "bytes_list_val", [np.bytes_, bytes]), - ValueType.UUID_LIST: ( - StringList, - "uuid_list_val", - [np.str_, str, uuid_module.UUID], - ), - ValueType.TIME_UUID_LIST: ( - StringList, - "time_uuid_list_val", - [np.str_, str, uuid_module.UUID], - ), - ValueType.DECIMAL_LIST: ( - StringList, - "decimal_list_val", - [np.str_, str, decimal.Decimal], - ), } PYTHON_SET_VALUE_TYPE_TO_PROTO_VALUE: Dict[ @@ -550,17 +462,6 @@ def _type_err(item, dtype): ValueType.STRING_SET: (StringSet, "string_set_val", [np.str_, str]), ValueType.BOOL_SET: (BoolSet, "bool_set_val", [np.bool_, bool]), ValueType.BYTES_SET: (BytesSet, "bytes_set_val", [np.bytes_, bytes]), - ValueType.UUID_SET: (StringSet, "uuid_set_val", [np.str_, str, uuid_module.UUID]), - 
ValueType.TIME_UUID_SET: ( - StringSet, - "time_uuid_set_val", - [np.str_, str, uuid_module.UUID], - ), - ValueType.DECIMAL_SET: ( - StringSet, - "decimal_set_val", - [np.str_, str, decimal.Decimal], - ), } PYTHON_SCALAR_VALUE_TYPE_TO_PROTO_VALUE: Dict[ @@ -586,9 +487,6 @@ def _type_err(item, dtype): ValueType.BYTES: ("bytes_val", lambda x: x, {bytes}), ValueType.IMAGE_BYTES: ("bytes_val", lambda x: x, {bytes}), ValueType.BOOL: ("bool_val", lambda x: x, {bool, np.bool_, int, np.int_}), - ValueType.UUID: ("uuid_val", lambda x: str(x), {str, uuid_module.UUID}), - ValueType.TIME_UUID: ("time_uuid_val", lambda x: str(x), {str, uuid_module.UUID}), - ValueType.DECIMAL: ("decimal_val", lambda x: str(x), {decimal.Decimal, str}), } @@ -788,32 +686,6 @@ def convert_set_to_list(value: Any) -> Any: converted_values, set_field_name, set_proto_type ) - if feast_value_type in (ValueType.UUID_SET, ValueType.TIME_UUID_SET): - # uuid.UUID objects must be converted to str for StringSet proto. - return [ - ( - ProtoValue( - **{set_field_name: set_proto_type(val=[str(e) for e in value])} # type: ignore[arg-type, misc] - ) - if value is not None - else ProtoValue() - ) - for value in converted_values - ] - - if feast_value_type == ValueType.DECIMAL_SET: - # decimal.Decimal objects must be converted to str for StringSet proto. - return [ - ( - ProtoValue( - **{set_field_name: set_proto_type(val=[str(e) for e in value])} # type: ignore[arg-type, misc] - ) - if value is not None - else ProtoValue() - ) - for value in converted_values - ] - # Generic set conversion return [ ProtoValue(**{set_field_name: set_proto_type(val=value)}) # type: ignore[arg-type] @@ -874,32 +746,6 @@ def _convert_list_values_to_proto( if feast_value_type == ValueType.BOOL_LIST: return _convert_bool_collection_to_proto(values, field_name, proto_type) - if feast_value_type in (ValueType.UUID_LIST, ValueType.TIME_UUID_LIST): - # uuid.UUID objects must be converted to str for StringList proto. 
- return [ - ( - ProtoValue( - **{field_name: proto_type(val=[str(e) for e in value])} # type: ignore[arg-type, misc] - ) - if value is not None - else ProtoValue() - ) - for value in values - ] - - if feast_value_type == ValueType.DECIMAL_LIST: - # decimal.Decimal objects must be converted to str for StringList proto. - return [ - ( - ProtoValue( - **{field_name: proto_type(val=[str(e) for e in value])} # type: ignore[arg-type, misc] - ) - if value is not None - else ProtoValue() - ) - for value in values - ] - # Generic list conversion return [ ProtoValue(**{field_name: proto_type(val=value)}) # type: ignore[arg-type] @@ -1029,10 +875,6 @@ def _python_value_to_proto_value( Returns: List of Feast Value Proto """ - # Handle nested collection types (VALUE_LIST, VALUE_SET) - if feast_value_type in (ValueType.VALUE_LIST, ValueType.VALUE_SET): - return _convert_nested_collection_to_proto(feast_value_type, values) - # Handle Map types if feast_value_type == ValueType.MAP: result = [] @@ -1166,48 +1008,6 @@ def _python_value_to_proto_value( raise Exception(f"Unsupported data type: {feast_value_type}") -def _convert_nested_collection_to_proto( - feast_value_type: ValueType, values: List[Any] -) -> List[ProtoValue]: - """Convert nested collection values (list-of-lists, list-of-sets, etc.) 
to proto.""" - val_attr = "list_val" if feast_value_type == ValueType.VALUE_LIST else "set_val" - - result = [] - for value in values: - if value is None: - result.append(ProtoValue()) - else: - inner_values = [] - for inner_collection in value: - if inner_collection is None: - inner_values.append(ProtoValue()) - else: - inner_list = list(inner_collection) - if len(inner_list) == 0: - # Empty inner collection: store as empty ProtoValue - inner_values.append(ProtoValue()) - elif any( - isinstance(item, (list, set, tuple, np.ndarray)) - for item in inner_list - ): - # Deeper nesting (3+ levels): recurse using VALUE_LIST - inner_proto = _convert_nested_collection_to_proto( - ValueType.VALUE_LIST, [inner_list] - ) - inner_values.append(inner_proto[0]) - else: - # Leaf level: wrap as a single list-typed Value - proto_vals = python_values_to_proto_values( - [inner_list], ValueType.UNKNOWN - ) - inner_values.append(proto_vals[0]) - repeated = RepeatedValue(val=inner_values) - proto = ProtoValue() - getattr(proto, val_attr).CopyFrom(repeated) - result.append(proto) - return result - - def _python_dict_to_map_proto(python_dict: Dict[str, Any]) -> Map: """Convert a Python dictionary to a Map proto message.""" map_proto = Map() @@ -1302,8 +1102,6 @@ def python_values_to_proto_values( "json_list_val": ValueType.JSON_LIST, "struct_val": ValueType.STRUCT, "struct_list_val": ValueType.STRUCT_LIST, - "list_val": ValueType.VALUE_LIST, - "set_val": ValueType.VALUE_SET, "int32_set_val": ValueType.INT32_SET, "int64_set_val": ValueType.INT64_SET, "double_set_val": ValueType.DOUBLE_SET, @@ -1312,15 +1110,6 @@ def python_values_to_proto_values( "bytes_set_val": ValueType.BYTES_SET, "bool_set_val": ValueType.BOOL_SET, "unix_timestamp_set_val": ValueType.UNIX_TIMESTAMP_SET, - "uuid_set_val": ValueType.UUID_SET, - "time_uuid_set_val": ValueType.TIME_UUID_SET, - "uuid_val": ValueType.UUID, - "time_uuid_val": ValueType.TIME_UUID, - "uuid_list_val": ValueType.UUID_LIST, - "time_uuid_list_val": 
ValueType.TIME_UUID_LIST, - "decimal_val": ValueType.DECIMAL, - "decimal_list_val": ValueType.DECIMAL_LIST, - "decimal_set_val": ValueType.DECIMAL_SET, } VALUE_TYPE_TO_PROTO_VALUE_MAP: Dict[ValueType, str] = { @@ -1348,11 +1137,7 @@ def pa_to_feast_value_type(pa_type_as_str: str) -> ValueType: is_list = False if pa_type_as_str.startswith("list", "") if pa_type_as_str.startswith("timestamp"): value_type = ValueType.UNIX_TIMESTAMP @@ -1614,15 +1399,6 @@ def _convert_value_name_to_snowflake_udf(value_name: str, project_name: str) -> "FLOAT_LIST": f"feast_{project_name}_snowflake_array_float_to_list_double_proto", "BOOL_LIST": f"feast_{project_name}_snowflake_array_boolean_to_list_bool_proto", "UNIX_TIMESTAMP_LIST": f"feast_{project_name}_snowflake_array_timestamp_to_list_unix_timestamp_proto", - "UUID": f"feast_{project_name}_snowflake_varchar_to_string_proto", - "TIME_UUID": f"feast_{project_name}_snowflake_varchar_to_string_proto", - "UUID_LIST": f"feast_{project_name}_snowflake_array_varchar_to_list_string_proto", - "TIME_UUID_LIST": f"feast_{project_name}_snowflake_array_varchar_to_list_string_proto", - "UUID_SET": f"feast_{project_name}_snowflake_array_varchar_to_list_string_proto", - "TIME_UUID_SET": f"feast_{project_name}_snowflake_array_varchar_to_list_string_proto", - "DECIMAL": f"feast_{project_name}_snowflake_varchar_to_string_proto", - "DECIMAL_LIST": f"feast_{project_name}_snowflake_array_varchar_to_list_string_proto", - "DECIMAL_SET": f"feast_{project_name}_snowflake_array_varchar_to_list_string_proto", } return name_map[value_name].upper() @@ -1826,8 +1602,8 @@ def pg_type_to_feast_value_type(type_str: str) -> ValueType: "timestamp with time zone[]": ValueType.UNIX_TIMESTAMP_LIST, "numeric[]": ValueType.DOUBLE_LIST, "numeric": ValueType.DOUBLE, - "uuid": ValueType.UUID, - "uuid[]": ValueType.UUID_LIST, + "uuid": ValueType.STRING, + "uuid[]": ValueType.STRING_LIST, "json": ValueType.MAP, "jsonb": ValueType.MAP, "json[]": ValueType.MAP_LIST, @@ -1873,20 
+1649,7 @@ def feast_value_type_to_pa( ValueType.JSON_LIST: pyarrow.list_(pyarrow.large_string()), ValueType.STRUCT: pyarrow.struct([]), ValueType.STRUCT_LIST: pyarrow.list_(pyarrow.struct([])), - # Placeholder: inner type is unknown from ValueType alone. - # Callers needing accurate inner types should use from_feast_to_pyarrow_type() with a FeastType. - ValueType.VALUE_LIST: pyarrow.list_(pyarrow.list_(pyarrow.string())), - ValueType.VALUE_SET: pyarrow.list_(pyarrow.list_(pyarrow.string())), ValueType.NULL: pyarrow.null(), - ValueType.UUID: pyarrow.string(), - ValueType.TIME_UUID: pyarrow.string(), - ValueType.UUID_LIST: pyarrow.list_(pyarrow.string()), - ValueType.TIME_UUID_LIST: pyarrow.list_(pyarrow.string()), - ValueType.UUID_SET: pyarrow.list_(pyarrow.string()), - ValueType.TIME_UUID_SET: pyarrow.list_(pyarrow.string()), - ValueType.DECIMAL: pyarrow.string(), - ValueType.DECIMAL_LIST: pyarrow.list_(pyarrow.string()), - ValueType.DECIMAL_SET: pyarrow.list_(pyarrow.string()), } return type_map[feast_type] @@ -2039,7 +1802,7 @@ def cb_columnar_type_to_feast_value_type(type_str: str) -> ValueType: "object": ValueType.UNKNOWN, "array": ValueType.UNKNOWN, "multiset": ValueType.UNKNOWN, - "uuid": ValueType.UUID, + "uuid": ValueType.STRING, } value = ( type_map[type_str.lower()] @@ -2065,8 +1828,6 @@ def convert_scalar_column( return series.astype("boolean") elif value_type == ValueType.STRING: return series.astype("string") - elif value_type in [ValueType.UUID, ValueType.TIME_UUID]: - return series.astype("string") elif value_type == ValueType.UNIX_TIMESTAMP: return pd.to_datetime(series, unit="s", errors="coerce") elif value_type in (ValueType.JSON, ValueType.STRUCT, ValueType.MAP): @@ -2086,18 +1847,6 @@ def convert_array_column(series: pd.Series, value_type: ValueType) -> pd.Series: ValueType.STRING_LIST: object, ValueType.BYTES_LIST: object, ValueType.UNIX_TIMESTAMP_LIST: "datetime64[s]", - ValueType.UUID_LIST: object, - ValueType.TIME_UUID_LIST: object, - 
ValueType.BYTES_SET: object, - ValueType.STRING_SET: object, - ValueType.INT32_SET: np.int32, - ValueType.INT64_SET: np.int64, - ValueType.FLOAT_SET: np.float32, - ValueType.DOUBLE_SET: np.float64, - ValueType.BOOL_SET: np.bool_, - ValueType.UNIX_TIMESTAMP_SET: "datetime64[s]", - ValueType.UUID_SET: object, - ValueType.TIME_UUID_SET: object, } target_dtype = base_type_map.get(value_type, object) diff --git a/sdk/python/feast/utils.py b/sdk/python/feast/utils.py index 55ad6c6a0b8..5d910abc903 100644 --- a/sdk/python/feast/utils.py +++ b/sdk/python/feast/utils.py @@ -1,3 +1,5 @@ +from __future__ import annotations + import copy import itertools import os @@ -20,11 +22,11 @@ cast, ) -import pandas as pd import pyarrow from dateutil.tz import tzlocal from google.protobuf.timestamp_pb2 import Timestamp +from feast._lazy_pandas import pd from feast.aggregation import aggregation_specs_to_agg_ops from feast.constants import FEAST_FS_YAML_FILE_PATH_ENV_NAME from feast.entity import Entity @@ -34,7 +36,6 @@ RequestDataNotFoundInEntityRowsException, ) from feast.field import Field -from feast.infra.compute_engines.backends.pandas_backend import PandasBackend from feast.infra.key_encoding_utils import deserialize_entity_key from feast.protos.feast.serving.ServingService_pb2 import ( FieldStatus, @@ -665,6 +666,30 @@ def _group_feature_refs( return fvs_result, odfvs_result +def construct_response_feature_vector( + values_vector: Iterable[Any], + statuses_vector: Iterable[Any], + timestamp_vector: Iterable[Any], + mapping_indexes: Iterable[List[int]], + output_len: int, +) -> GetOnlineFeaturesResponse.FeatureVector: + values_output: Iterable[Any] = [None] * output_len + statuses_output: Iterable[Any] = [None] * output_len + timestamp_output: Iterable[Any] = [None] * output_len + + for i, destinations in enumerate(mapping_indexes): + for idx in destinations: + values_output[idx] = values_vector[i] # type: ignore[index] + statuses_output[idx] = statuses_vector[i] # type: 
def _populate_response_from_feature_data(
    feature_data: Iterable[
        Tuple[
            Iterable[Timestamp], Iterable["FieldStatus.ValueType"], Iterable[ValueProto]
        ]
    ],
    indexes: Iterable[List[int]],
    online_features_response: GetOnlineFeaturesResponse,
    full_feature_names: bool,
    requested_features: Iterable[str],
    table: "FeatureView",
    output_len: int,
    include_feature_view_version_metadata: bool = False,
):
    """Append per-feature vectors from one FeatureView to the response.

    Assumes `_read_from_online_store` returned `feature_data` in the same
    order as the deduplicated entity rows described by `indexes`.

    Args:
        feature_data: `(timestamps, statuses, values)` tuples retrieved from
            the OnlineStore, one per requested feature.
        indexes: For each unique entity, the output row positions it maps to;
            same length as each vector in `feature_data`.
        online_features_response: Response object to populate in place.
        full_feature_names: When True, prefix each feature name with the
            feature view name (e.g. "customer_fv__daily_transactions").
        requested_features: Feature names, ordered like `feature_data`.
        table: The FeatureView that `feature_data` was read from.
        output_len: Number of result rows in the response.
        include_feature_view_version_metadata: When True, also record the
            feature view's name/version in the response metadata.
    """
    projection = table.projection
    # name_to_use() keeps version-qualified refs distinct (e.g. "fv@v2"),
    # so multi-version queries yield unambiguous column names.
    prefix = projection.name_to_use()
    if full_feature_names:
        feature_refs = [f"{prefix}__{name}" for name in requested_features]
    else:
        feature_refs = list(requested_features)
    online_features_response.metadata.feature_names.val.extend(feature_refs)

    if include_feature_view_version_metadata:
        display_name = projection.name_alias or projection.name
        # Guard against duplicate metadata entries for the same view.
        already_recorded = any(
            fvm.name == display_name
            for fvm in online_features_response.metadata.feature_view_metadata
        )
        if not already_recorded:
            entry = online_features_response.metadata.feature_view_metadata.add()
            entry.name = display_name
            entry.version = getattr(table, "current_version_number", 0) or 0

    # Single pass: fan each feature vector out to the full response length.
    for timestamps, statuses, values in feature_data:
        online_features_response.results.append(
            construct_response_feature_vector(
                values, statuses, timestamps, indexes, output_len
            )
        )
def _populate_response_from_feature_data_v2(
    feature_data: Iterable[
        Tuple[
            Iterable[Timestamp], Iterable["FieldStatus.ValueType"], Iterable[ValueProto]
        ]
    ],
    indexes: Iterable[List[int]],
    online_features_response: GetOnlineFeaturesResponse,
    requested_features: Iterable[str],
    output_len: int,
):
    """Append per-feature vectors to the response (no name prefixing).

    Variant of `_populate_response_from_feature_data` that always emits bare
    feature names and records no feature-view metadata. Assumes
    `_read_from_online_store` returned `feature_data` in the same order as
    the deduplicated entity rows described by `indexes`.

    Args:
        feature_data: `(timestamps, statuses, values)` tuples retrieved from
            the OnlineStore, one per requested feature.
        indexes: For each unique entity, the output row positions it maps to;
            same length as each vector in `feature_data`.
        online_features_response: Response object to populate in place.
        requested_features: Feature names, ordered like `feature_data`.
        output_len: Number of result rows in the response.
    """
    # Add the feature names to the response.
    online_features_response.metadata.feature_names.val.extend(
        list(requested_features)
    )

    # NOTE(review): the previous `zip(*feature_data)` pre-pass was removed —
    # its results were unused, and because `feature_data` is only required to
    # be Iterable, it would have exhausted a generator argument and left the
    # loop below with nothing to process.
    #
    # Populate the result with data fetched from the OnlineStore,
    # which is guaranteed to be aligned with `requested_features`.
    for timestamp_vector, statuses_vector, values_vector in feature_data:
        response_vector = construct_response_feature_vector(
            values_vector, statuses_vector, timestamp_vector, indexes, output_len
        )
        online_features_response.results.append(response_vector)
def _convert_rows_to_protobuf(
    requested_features: List[str],
    read_rows: List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]],
) -> List[Tuple[List[Timestamp], List["FieldStatus.ValueType"], List[ValueProto]]]:
    """Convert raw `online_read` rows into per-feature protobuf vectors.

    Args:
        requested_features: Feature names to extract; one output tuple is
            produced per name, in order.
        read_rows: `(event_timestamp, feature_dict)` pairs from the online
            store, one per unique entity. `feature_dict` may be None when the
            entity was not found.

    Returns:
        For each requested feature, a `(timestamps, statuses, values)` tuple
        whose vectors are aligned with `read_rows`. Missing features are
        reported as NOT_FOUND with an empty ValueProto.
    """
    missing_value = ValueProto()

    # Timestamps are identical across features, so convert each row's
    # timestamp to proto form exactly once (O(entities), not
    # O(features * entities)).
    shared_timestamps: List[Timestamp] = []
    for event_ts, _ in read_rows:
        proto_ts = Timestamp()
        if event_ts is not None:
            proto_ts.FromDatetime(event_ts)
        shared_timestamps.append(proto_ts)

    vectors: List[
        Tuple[List[Timestamp], List["FieldStatus.ValueType"], List[ValueProto]]
    ] = []
    for name in requested_features:
        statuses: List["FieldStatus.ValueType"] = []
        values: List[ValueProto] = []
        for _, row_features in read_rows:
            if row_features is None or name not in row_features:
                statuses.append(FieldStatus.NOT_FOUND)
                values.append(missing_value)
            else:
                statuses.append(FieldStatus.PRESENT)
                values.append(row_features[name])
        # Shallow copy keeps each feature's timestamp vector independent
        # while sharing the underlying Timestamp protos.
        vectors.append((list(shared_timestamps), statuses, values))
    return vectors