Skip to content
Open
6 changes: 5 additions & 1 deletion protos/feast/core/DataSource.proto
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import "feast/types/Value.proto";
import "feast/core/Feature.proto";

// Defines a Data Source that can be used source Feature data
// Next available id: 28
// Next available id: 29
message DataSource {
// Field indexes should *not* be reused. Not sure if fields 6-10 were used previously or not,
// but they are going to be reserved for backwards compatibility.
Expand Down Expand Up @@ -81,6 +81,10 @@ message DataSource {
// Must specify creation timestamp column name
string created_timestamp_column = 5;

// (Optional) Type of the timestamp_field column ("TIMESTAMP" or "DATE").
// When set to "DATE", SQL generation uses date-only comparisons.
string timestamp_field_type = 28;

// This is an internal field that is represents the python class for the data source object a proto object represents.
// This should be set by feast, and not by users.
// The field is used primarily by custom data sources and is mandatory for them to set. Feast may set it for
Expand Down
7 changes: 7 additions & 0 deletions sdk/python/feast/data_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,7 @@ class DataSource(ABC):
tags: Dict[str, str]
owner: str
date_partition_column: str
timestamp_field_type: str
created_timestamp: Optional[datetime]
last_updated_timestamp: Optional[datetime]

Expand All @@ -219,6 +220,7 @@ def __init__(
tags: Optional[Dict[str, str]] = None,
owner: Optional[str] = "",
date_partition_column: Optional[str] = None,
timestamp_field_type: Optional[str] = None,
):
"""
Creates a DataSource object.
Expand All @@ -237,6 +239,9 @@ def __init__(
owner (optional): The owner of the data source, typically the email of the primary
maintainer.
date_partition_column (optional): Timestamp column used for partitioning. Not supported by all stores
timestamp_field_type (optional): Type of the timestamp_field column.
Defaults to "TIMESTAMP". Set to "DATE" when the event timestamp column
is a DATE type, so SQL generation uses date-only comparisons.
"""
self.name = name
self.timestamp_field = timestamp_field or ""
Expand All @@ -257,6 +262,7 @@ def __init__(
self.date_partition_column = (
date_partition_column if date_partition_column else ""
)
self.timestamp_field_type = timestamp_field_type if timestamp_field_type else ""
now = _utc_now()
self.created_timestamp = now
self.last_updated_timestamp = now
Expand All @@ -280,6 +286,7 @@ def __eq__(self, other):
or self.created_timestamp_column != other.created_timestamp_column
or self.field_mapping != other.field_mapping
or self.date_partition_column != other.date_partition_column
or self.timestamp_field_type != other.timestamp_field_type
or self.description != other.description
or self.tags != other.tags
or self.owner != other.owner
Expand Down
21 changes: 19 additions & 2 deletions sdk/python/feast/infra/offline_stores/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,13 +161,18 @@ def pull_latest_from_table_or_query(
project=project_id,
location=config.offline_store.location,
)
cast_style: Literal["date_func", "timestamp_func"] = (
"date_func"
if data_source.timestamp_field_type == "DATE"
else "timestamp_func"
)
timestamp_filter = get_timestamp_filter_sql(
start_date,
end_date,
timestamp_field,
date_partition_column=data_source.date_partition_column,
quote_fields=False,
cast_style="timestamp_func",
cast_style=cast_style,
)
query = f"""
SELECT
Expand Down Expand Up @@ -220,13 +225,18 @@ def pull_all_from_table_or_query(
+ BigQueryOfflineStore._escape_query_columns(feature_name_columns)
+ timestamp_fields
)
cast_style: Literal["date_func", "timestamp_func"] = (
"date_func"
if data_source.timestamp_field_type == "DATE"
else "timestamp_func"
)
timestamp_filter = get_timestamp_filter_sql(
start_date,
end_date,
timestamp_field,
date_partition_column=data_source.date_partition_column,
quote_fields=False,
cast_style="timestamp_func",
cast_style=cast_style,
)
query = f"""
SELECT {field_string}
Expand Down Expand Up @@ -934,10 +944,17 @@ def arrow_schema_to_bq_schema(arrow_schema: pyarrow.Schema) -> List[SchemaField]
{% if loop.last %}{% else %}, {% endif %}
{% endfor %}
FROM {{ featureview.table_subquery }}
{% if featureview.timestamp_field_type == "DATE" %}
WHERE {{ featureview.timestamp_field }} <= DATE('{{ featureview.max_event_timestamp[:10] }}')
{% if featureview.ttl == 0 %}{% else %}
AND {{ featureview.timestamp_field }} >= DATE('{{ featureview.min_event_timestamp[:10] }}')
{% endif %}
{% else %}
WHERE {{ featureview.timestamp_field }} <= '{{ featureview.max_event_timestamp }}'
{% if featureview.ttl == 0 %}{% else %}
AND {{ featureview.timestamp_field }} >= '{{ featureview.min_event_timestamp }}'
{% endif %}
{% endif %}
{% if featureview.date_partition_column %}
AND {{ featureview.date_partition_column | backticks }} <= '{{ featureview.max_event_timestamp[:10] }}'
{% if featureview.min_event_timestamp %}
Expand Down
7 changes: 7 additions & 0 deletions sdk/python/feast/infra/offline_stores/bigquery_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ def __init__(
created_timestamp_column: Optional[str] = "",
field_mapping: Optional[Dict[str, str]] = None,
date_partition_column: Optional[str] = None,
timestamp_field_type: Optional[str] = None,
query: Optional[str] = None,
description: Optional[str] = "",
tags: Optional[Dict[str, str]] = None,
Expand All @@ -54,6 +55,9 @@ def __init__(
field_mapping (optional): A dictionary mapping of column names in this data source to feature names in a feature table
or view. Only used for feature columns, not entities or timestamp columns.
date_partition_column (optional): Timestamp column used for partitioning.
timestamp_field_type (optional): Type of the timestamp_field column.
Set to "DATE" when the event timestamp column is a DATE type,
so SQL generation uses date-only comparisons instead of TIMESTAMP().
query (optional): The query to be executed to obtain the features. When both 'table'
and 'query' are provided, 'query' takes priority for reads.
description (optional): A human-readable description.
Expand Down Expand Up @@ -81,6 +85,7 @@ def __init__(
created_timestamp_column=created_timestamp_column,
field_mapping=field_mapping,
date_partition_column=date_partition_column,
timestamp_field_type=timestamp_field_type,
description=description,
tags=tags,
owner=owner,
Expand Down Expand Up @@ -121,6 +126,7 @@ def from_proto(data_source: DataSourceProto):
timestamp_field=data_source.timestamp_field,
created_timestamp_column=data_source.created_timestamp_column,
date_partition_column=data_source.date_partition_column,
timestamp_field_type=data_source.timestamp_field_type or None,
query=data_source.bigquery_options.query,
description=data_source.description,
tags=dict(data_source.tags),
Expand All @@ -139,6 +145,7 @@ def _to_proto_impl(self) -> DataSourceProto:
timestamp_field=self.timestamp_field,
created_timestamp_column=self.created_timestamp_column,
date_partition_column=self.date_partition_column,
timestamp_field_type=self.timestamp_field_type,
)

return data_source_proto
Expand Down
14 changes: 12 additions & 2 deletions sdk/python/feast/infra/offline_stores/offline_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ class FeatureViewQueryContext:
date_partition_column: Optional[
str
] # this attribute is added because partition pruning affects Athena's query performance.
timestamp_field_type: Optional[str]


def get_feature_view_query_context(
Expand Down Expand Up @@ -160,6 +161,10 @@ def get_feature_view_query_context(
feature_view.batch_source.date_partition_column,
)

timestamp_field_type = getattr(
feature_view.batch_source, "timestamp_field_type", ""
)

max_event_timestamp = to_naive_utc(entity_df_timestamp_range[1]).isoformat()
min_event_timestamp = None
if feature_view.ttl:
Expand All @@ -181,6 +186,7 @@ def get_feature_view_query_context(
min_event_timestamp=min_event_timestamp,
max_event_timestamp=max_event_timestamp,
date_partition_column=date_partition_column,
timestamp_field_type=timestamp_field_type or None,
)
query_context.append(context)

Expand Down Expand Up @@ -340,7 +346,7 @@ def get_timestamp_filter_sql(
date_partition_column: Optional[str] = None,
tz: Optional[timezone] = None,
cast_style: Literal[
"timestamp", "timestamp_func", "timestamptz", "raw"
"timestamp", "timestamp_func", "timestamptz", "raw", "date_func"
] = "timestamp",
date_time_separator: str = "T",
quote_fields: bool = True,
Expand All @@ -355,10 +361,11 @@ def get_timestamp_filter_sql(
date_partition_column: optional partition column (for pruning)
tz: optional timezone for datetime inputs
cast_style: one of:
- "timestamp": TIMESTAMP '...' → Common Sql engine Snowflake, Redshift etc.
- "timestamp": TIMESTAMP '...' → Common Sql engine Snowflake, Redshift etc.
- "timestamp_func": TIMESTAMP('...') → BigQuery, Couchbase etc.
- "timestamptz": '...'::timestamptz → PostgreSQL
- "raw": '...' → no cast, string only
- "date_func": DATE('...') → BigQuery DATE columns
date_time_separator: separator for datetime strings (default is "T")
(e.g. "2023-10-01T00:00:00" or "2023-10-01 00:00:00")
quote_fields: whether to quote the timestamp and partition column names
Expand All @@ -384,6 +391,9 @@ def format_casted_ts(val: Union[str, datetime]) -> str:
return f"TIMESTAMP '{val_str}'"
elif cast_style == "timestamp_func":
return f"TIMESTAMP('{val_str}')"
elif cast_style == "date_func":
date_str = val_str[:10] if len(val_str) >= 10 else val_str
return f"DATE('{date_str}')"
elif cast_style == "timestamptz":
return f"'{val_str}'::{cast_style}"
else:
Expand Down
Loading
Loading