Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 67 additions & 12 deletions sdk/python/feast/feature_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,28 +148,72 @@ class ChatRequest(BaseModel):
messages: List[ChatMessage]


def _parse_feature_info(
    features: Union[List[str], "feast.FeatureService"],
) -> tuple:
    """Return ``(feature_view_names, feature_count)`` from resolved features.

    ``features`` is either a list of ``"feature_view:feature"`` reference
    strings or a ``FeatureService`` carrying ``feature_view_projections``.

    Returns:
        (fv_names, feat_count) where fv_names is a list of unique feature
        view name strings and feat_count is the total number of features.
    """
    # Imported lazily to avoid a circular import at module load time.
    from feast.feature_service import FeatureService

    if isinstance(features, FeatureService):
        projections = features.feature_view_projections
        view_names = [projection.name for projection in projections]
        total = sum(len(projection.features) for projection in projections)
        return view_names, total
    if isinstance(features, list):
        # A reference looks like "view:feature" or "view@version:feature";
        # strip both the feature part and any "@version" suffix.
        unique_views = {
            ref.split(":")[0].split("@")[0] for ref in features if ":" in ref
        }
        return list(unique_views), len(features)
    # Unknown input shape: report nothing rather than raising.
    return [], 0


def _resolve_feature_counts(
    features: Union[List[str], "feast.FeatureService"],
) -> tuple:
    """Return ``(feature_count_str, feature_view_count_str)`` for Prometheus labels."""
    view_names, total_features = _parse_feature_info(features)
    # Prometheus label values must be strings.
    return str(total_features), str(len(view_names))


def _emit_online_audit(
    request: GetOnlineFeaturesRequest,
    features: Union[List[str], "feast.FeatureService"],
    entity_count: int,
    status: str,
    latency_ms: float,
):
    """Best-effort audit log emission for online feature requests.

    Never raises: any failure is logged at WARNING level and swallowed so
    audit problems cannot break the serving path.
    """
    try:
        # Imported lazily; the security manager is optional at runtime.
        from feast.permissions.security_manager import get_security_manager

        security_manager = get_security_manager()
        current_user = security_manager.current_user if security_manager else None
        requestor_id = (current_user.username if current_user else None) or "anonymous"

        view_names, total_features = _parse_feature_info(features)

        feast_metrics.emit_online_audit_log(
            requestor_id=requestor_id,
            entity_keys=list(request.entities.keys()),
            entity_count=entity_count,
            feature_views=view_names,
            feature_count=total_features,
            status=status,
            latency_ms=latency_ms,
        )
    except Exception:
        logger.warning("Failed to emit online audit log", exc_info=True)


async def _get_features(
Expand Down Expand Up @@ -387,11 +431,22 @@ async def get_online_features(request: GetOnlineFeaturesRequest) -> Any:
include_feature_view_version_metadata=request.include_feature_view_version_metadata,
)

if store._get_provider().async_supported.online.read:
response = await store.get_online_features_async(**read_params) # type: ignore
else:
response = await run_in_threadpool(
lambda: store.get_online_features(**read_params) # type: ignore
audit_start_ms = time.monotonic() * 1000
audit_status = "success"
try:
if store._get_provider().async_supported.online.read:
response = await store.get_online_features_async(**read_params) # type: ignore
else:
response = await run_in_threadpool(
lambda: store.get_online_features(**read_params) # type: ignore
)
except Exception:
audit_status = "error"
raise
finally:
audit_latency_ms = time.monotonic() * 1000 - audit_start_ms
_emit_online_audit(
request, features, entity_count, audit_status, audit_latency_ms
)

response_dict = await run_in_threadpool(
Expand Down
11 changes: 11 additions & 0 deletions sdk/python/feast/infra/feature_servers/base_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,17 @@ class MetricsConfig(FeastConfigBaseModel):
"""Emit per-feature-view freshness gauges
(feast_feature_freshness_seconds)."""

offline_features: StrictBool = True
"""Emit offline store retrieval metrics
(feast_offline_store_request_total,
feast_offline_store_request_latency_seconds,
feast_offline_store_row_count)."""

audit_logging: StrictBool = False
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

document both in infra/feast-operator/config/samples/v1_featurestore_serving.yaml

"""Emit structured JSON audit log entries for online and offline
feature requests via the ``feast.audit`` logger. Captures requestor
identity, entity keys, feature views, row counts, and latency."""


class BaseFeatureServerConfig(FeastConfigBaseModel):
"""Base Feature Server config that should be extended"""
Expand Down
65 changes: 63 additions & 2 deletions sdk/python/feast/infra/offline_stores/offline_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,11 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import time
import warnings
from abc import ABC
from datetime import datetime
from datetime import datetime, timezone
from pathlib import Path
from typing import (
TYPE_CHECKING,
Expand Down Expand Up @@ -70,6 +72,21 @@ def __init__(
self.max_event_timestamp = max_event_timestamp


def _extract_retrieval_metadata(job: "RetrievalJob") -> tuple:
"""Return ``(feature_view_names, feature_count)`` from a RetrievalJob's metadata."""
try:
meta = job.metadata
if meta:
feature_count = len(meta.features)
feature_views = list(
{ref.split(":")[0] for ref in meta.features if ":" in ref}
)
return feature_views, feature_count
except (NotImplementedError, AttributeError):
pass
return [], 0


class RetrievalJob(ABC):
"""A RetrievalJob manages the execution of a query to retrieve data from the offline store."""

Expand Down Expand Up @@ -152,7 +169,51 @@ def to_arrow(
validation_reference (optional): The validation to apply against the retrieved dataframe.
timeout (optional): The query timeout if applicable.
"""
features_table = self._to_arrow_internal(timeout=timeout)
start_wall = time.monotonic()
status_label = "success"
row_count = 0
try:
features_table = self._to_arrow_internal(timeout=timeout)
row_count = features_table.num_rows
except Exception:
status_label = "error"
raise
finally:
try:
from feast import metrics as feast_metrics

elapsed = time.monotonic() - start_wall

if feast_metrics._config.offline_features:
feast_metrics.offline_store_request_total.labels(
method="to_arrow", status=status_label
).inc()
feast_metrics.offline_store_request_latency_seconds.labels(
method="to_arrow"
).observe(elapsed)
if row_count > 0:
feast_metrics.offline_store_row_count.labels(
method="to_arrow"
).observe(row_count)

if feast_metrics._config.audit_logging:
feature_views, feature_count = _extract_retrieval_metadata(self)
now_iso = datetime.now(tz=timezone.utc).isoformat()
feast_metrics.emit_offline_audit_log(
method="to_arrow",
feature_views=feature_views,
feature_count=feature_count,
row_count=row_count,
status=status_label,
start_time=now_iso,
end_time=now_iso,
Comment on lines +201 to +209
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 Offline audit log start_time and end_time are always identical

In RetrievalJob.to_arrow(), the now_iso timestamp used for both start_time and end_time in the audit log is captured at a single point in the finally block (line 201), after the query has already completed. This means both fields always contain the same value, making start_time meaningless. The start_time should instead be derived from now - elapsed (e.g., datetime.now(tz=timezone.utc) - timedelta(seconds=elapsed)) to reflect when the operation actually began.

Suggested change
now_iso = datetime.now(tz=timezone.utc).isoformat()
feast_metrics.emit_offline_audit_log(
method="to_arrow",
feature_views=feature_views,
feature_count=feature_count,
row_count=row_count,
status=status_label,
start_time=now_iso,
end_time=now_iso,
end_time = datetime.now(tz=timezone.utc)
start_time = end_time - __import__('datetime').timedelta(seconds=elapsed)
feast_metrics.emit_offline_audit_log(
method="to_arrow",
feature_views=feature_views,
feature_count=feature_count,
row_count=row_count,
status=status_label,
start_time=start_time.isoformat(),
end_time=end_time.isoformat(),
Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.

duration_ms=elapsed * 1000,
)
except Exception:
logging.getLogger(__name__).debug(
"Failed to record offline store metrics", exc_info=True
)

if self.on_demand_feature_views:
# Build a mapping of ODFV name to requested feature names
# This ensures we only return the features that were explicitly requested
Expand Down
103 changes: 102 additions & 1 deletion sdk/python/feast/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
"""

import atexit
import json
import logging
import os
import shutil
Expand All @@ -51,7 +52,7 @@
from contextlib import contextmanager
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import TYPE_CHECKING, Optional
from typing import TYPE_CHECKING, List, Optional

import psutil

Expand Down Expand Up @@ -123,6 +124,8 @@ class _MetricsFlags:
push: bool = False
materialization: bool = False
freshness: bool = False
offline_features: bool = False
audit_logging: bool = False


_config = _MetricsFlags()
Expand All @@ -144,6 +147,8 @@ def build_metrics_flags(metrics_config: Optional[object] = None) -> _MetricsFlag
push=True,
materialization=True,
freshness=True,
offline_features=True,
audit_logging=False,
)
return _MetricsFlags(
enabled=True,
Expand All @@ -153,6 +158,8 @@ def build_metrics_flags(metrics_config: Optional[object] = None) -> _MetricsFlag
push=getattr(metrics_config, "push", True),
materialization=getattr(metrics_config, "materialization", True),
freshness=getattr(metrics_config, "freshness", True),
offline_features=getattr(metrics_config, "offline_features", True),
audit_logging=getattr(metrics_config, "audit_logging", False),
)


Expand Down Expand Up @@ -260,6 +267,33 @@ def build_metrics_flags(metrics_config: Optional[object] = None) -> _MetricsFlag
multiprocess_mode="max",
)

# ---------------------------------------------------------------------------
# Offline store retrieval metrics
# ---------------------------------------------------------------------------
# ---------------------------------------------------------------------------
# Offline store retrieval metrics
# ---------------------------------------------------------------------------
# Counts retrievals per RetrievalJob method (e.g. "to_arrow") and outcome
# ("success"/"error").
offline_store_request_total = Counter(
    "feast_offline_store_request_total",
    "Total offline store retrieval requests",
    ["method", "status"],
)
# Offline queries can run for minutes, hence the coarse bucket boundaries
# reaching up to 10 minutes.
offline_store_request_latency_seconds = Histogram(
    "feast_offline_store_request_latency_seconds",
    "Latency of offline store retrieval operations in seconds",
    ["method"],
    buckets=(0.1, 0.5, 1.0, 5.0, 10.0, 30.0, 60.0, 120.0, 300.0, 600.0),
)
# Row counts span several orders of magnitude, so buckets grow roughly 10x.
offline_store_row_count = Histogram(
    "feast_offline_store_row_count",
    "Number of rows returned by offline store retrieval",
    ["method"],
    buckets=(100, 1000, 10000, 100000, 500000, 1000000, 5000000),
)

# ---------------------------------------------------------------------------
# Audit logger — separate from the main feast logger so operators can
# route SOX-style audit entries to a dedicated sink.
# ---------------------------------------------------------------------------
audit_logger = logging.getLogger("feast.audit")

# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
Expand Down Expand Up @@ -388,6 +422,71 @@ def track_materialization(
)


def emit_online_audit_log(
    *,
    requestor_id: str,
    entity_keys: List[str],
    entity_count: int,
    feature_views: List[str],
    feature_count: int,
    status: str,
    latency_ms: float,
):
    """Emit a structured JSON audit log entry for an online feature request.

    No-op unless audit logging is enabled in the metrics configuration.
    """
    if not _config.audit_logging:
        return
    entry = {
        "event": "online_feature_request",
        "timestamp": datetime.now(tz=timezone.utc).isoformat(),
        "requestor_id": requestor_id,
        "entity_keys": entity_keys,
        "entity_count": entity_count,
        "feature_views": feature_views,
        "feature_count": feature_count,
        "status": status,
        "latency_ms": round(latency_ms, 2),
    }
    audit_logger.info(_json_dumps(entry))


def emit_offline_audit_log(
    *,
    method: str,
    feature_views: List[str],
    feature_count: int,
    row_count: int,
    status: str,
    start_time: str,
    end_time: str,
    duration_ms: float,
):
    """Emit a structured JSON audit log entry for an offline feature retrieval.

    Args:
        method: RetrievalJob method that produced the data (e.g. ``"to_arrow"``).
        feature_views: Unique feature view names involved in the retrieval.
        feature_count: Total number of feature references requested.
        row_count: Number of rows returned (0 on failure).
        status: ``"success"`` or ``"error"``.
        start_time: ISO-8601 timestamp marking the start of the query.
        end_time: ISO-8601 timestamp marking the end of the query.
        duration_ms: Wall-clock duration of the retrieval in milliseconds.

    No-op unless audit logging is enabled in the metrics configuration.
    """
    if not _config.audit_logging:
        return
    audit_logger.info(
        _json_dumps(
            {
                "event": "offline_feature_retrieval",
                # Emission timestamp, mirroring emit_online_audit_log, so audit
                # sinks can order entries independently of the caller-supplied
                # start/end times.
                "timestamp": datetime.now(tz=timezone.utc).isoformat(),
                "method": method,
                "start_time": start_time,
                "end_time": end_time,
                "feature_views": feature_views,
                "feature_count": feature_count,
                "row_count": row_count,
                "status": status,
                "duration_ms": round(duration_ms, 2),
            }
        )
    )


def _json_dumps(obj: dict) -> str:
return json.dumps(obj, separators=(",", ":"))


def update_feature_freshness(
store: "FeatureStore",
) -> None:
Expand Down Expand Up @@ -507,6 +606,8 @@ def start_metrics_server(
push=True,
materialization=True,
freshness=True,
offline_features=True,
audit_logging=False,
)

from prometheus_client import CollectorRegistry, make_wsgi_app
Expand Down
Loading
Loading