diff --git a/protos/feast/core/DataSource.proto b/protos/feast/core/DataSource.proto index b91296dca31..2889fba042f 100644 --- a/protos/feast/core/DataSource.proto +++ b/protos/feast/core/DataSource.proto @@ -29,7 +29,7 @@ import "feast/types/Value.proto"; import "feast/core/Feature.proto"; // Defines a Data Source that can be used source Feature data -// Next available id: 28 +// Next available id: 29 message DataSource { // Field indexes should *not* be reused. Not sure if fields 6-10 were used previously or not, // but they are going to be reserved for backwards compatibility. @@ -81,6 +81,10 @@ message DataSource { // Must specify creation timestamp column name string created_timestamp_column = 5; + // (Optional) Type of the timestamp_field column ("TIMESTAMP" or "DATE"). + // When set to "DATE", SQL generation uses date-only comparisons. + string timestamp_field_type = 28; + // This is an internal field that is represents the python class for the data source object a proto object represents. // This should be set by feast, and not by users. // The field is used primarily by custom data sources and is mandatory for them to set. Feast may set it for diff --git a/sdk/python/feast/data_source.py b/sdk/python/feast/data_source.py index 2d4997ae786..e14f2a49383 100644 --- a/sdk/python/feast/data_source.py +++ b/sdk/python/feast/data_source.py @@ -205,6 +205,7 @@ class DataSource(ABC): tags: Dict[str, str] owner: str date_partition_column: str + timestamp_field_type: str created_timestamp: Optional[datetime] last_updated_timestamp: Optional[datetime] @@ -219,6 +220,7 @@ def __init__( tags: Optional[Dict[str, str]] = None, owner: Optional[str] = "", date_partition_column: Optional[str] = None, + timestamp_field_type: Optional[str] = None, ): """ Creates a DataSource object. @@ -237,6 +239,9 @@ def __init__( owner (optional): The owner of the data source, typically the email of the primary maintainer. date_partition_column (optional): Timestamp column used for partitioning. Not supported by all stores + timestamp_field_type (optional): Type of the timestamp_field column. + Defaults to "TIMESTAMP". Set to "DATE" when the event timestamp column + is a DATE type, so SQL generation uses date-only comparisons. """ self.name = name self.timestamp_field = timestamp_field or "" @@ -257,6 +262,7 @@ def __init__( self.date_partition_column = ( date_partition_column if date_partition_column else "" ) + self.timestamp_field_type = timestamp_field_type if timestamp_field_type else "" now = _utc_now() self.created_timestamp = now self.last_updated_timestamp = now @@ -280,6 +286,7 @@ def __eq__(self, other): or self.created_timestamp_column != other.created_timestamp_column or self.field_mapping != other.field_mapping or self.date_partition_column != other.date_partition_column + or self.timestamp_field_type != other.timestamp_field_type or self.description != other.description or self.tags != other.tags or self.owner != other.owner diff --git a/sdk/python/feast/infra/offline_stores/bigquery.py b/sdk/python/feast/infra/offline_stores/bigquery.py index 948cfcf1ff0..eda17a34cd0 100644 --- a/sdk/python/feast/infra/offline_stores/bigquery.py +++ b/sdk/python/feast/infra/offline_stores/bigquery.py @@ -161,13 +161,18 @@ def pull_latest_from_table_or_query( project=project_id, location=config.offline_store.location, ) + cast_style: Literal["date_func", "timestamp_func"] = ( + "date_func" + if data_source.timestamp_field_type == "DATE" + else "timestamp_func" + ) timestamp_filter = get_timestamp_filter_sql( start_date, end_date, timestamp_field, date_partition_column=data_source.date_partition_column, quote_fields=False, - cast_style="timestamp_func", + cast_style=cast_style, ) query = f""" SELECT @@ -220,13 +225,18 @@ def pull_all_from_table_or_query( + BigQueryOfflineStore._escape_query_columns(feature_name_columns) + timestamp_fields ) + cast_style: Literal["date_func", "timestamp_func"] = ( + "date_func" + if data_source.timestamp_field_type == "DATE" + else "timestamp_func" + ) timestamp_filter = get_timestamp_filter_sql( start_date, end_date, timestamp_field, date_partition_column=data_source.date_partition_column, quote_fields=False, - cast_style="timestamp_func", + cast_style=cast_style, ) query = f""" SELECT {field_string} @@ -934,10 +944,17 @@ def arrow_schema_to_bq_schema(arrow_schema: pyarrow.Schema) -> List[SchemaField] {% if loop.last %}{% else %}, {% endif %} {% endfor %} FROM {{ featureview.table_subquery }} + {% if featureview.timestamp_field_type == "DATE" %} + WHERE {{ featureview.timestamp_field }} <= DATE('{{ featureview.max_event_timestamp[:10] }}') + {% if featureview.ttl == 0 %}{% else %} + AND {{ featureview.timestamp_field }} >= DATE('{{ featureview.min_event_timestamp[:10] }}') + {% endif %} + {% else %} WHERE {{ featureview.timestamp_field }} <= '{{ featureview.max_event_timestamp }}' {% if featureview.ttl == 0 %}{% else %} AND {{ featureview.timestamp_field }} >= '{{ featureview.min_event_timestamp }}' {% endif %} + {% endif %} {% if featureview.date_partition_column %} AND {{ featureview.date_partition_column | backticks }} <= '{{ featureview.max_event_timestamp[:10] }}' {% if featureview.min_event_timestamp %} diff --git a/sdk/python/feast/infra/offline_stores/bigquery_source.py b/sdk/python/feast/infra/offline_stores/bigquery_source.py index 7b476afdcda..5fdc29a19fb 100644 --- a/sdk/python/feast/infra/offline_stores/bigquery_source.py +++ b/sdk/python/feast/infra/offline_stores/bigquery_source.py @@ -35,6 +35,7 @@ def __init__( created_timestamp_column: Optional[str] = "", field_mapping: Optional[Dict[str, str]] = None, date_partition_column: Optional[str] = None, + timestamp_field_type: Optional[str] = None, query: Optional[str] = None, description: Optional[str] = "", tags: Optional[Dict[str, str]] = None, @@ -54,6 +55,9 @@ def __init__( field_mapping (optional): A dictionary mapping of column names in this data source to feature names in a feature table or view. Only used for feature columns, not entities or timestamp columns. date_partition_column (optional): Timestamp column used for partitioning. + timestamp_field_type (optional): Type of the timestamp_field column. + Set to "DATE" when the event timestamp column is a DATE type, + so SQL generation uses date-only comparisons instead of TIMESTAMP(). query (optional): The query to be executed to obtain the features. When both 'table' and 'query' are provided, 'query' takes priority for reads. description (optional): A human-readable description. @@ -81,6 +85,7 @@ def __init__( created_timestamp_column=created_timestamp_column, field_mapping=field_mapping, date_partition_column=date_partition_column, + timestamp_field_type=timestamp_field_type, description=description, tags=tags, owner=owner, @@ -121,6 +126,7 @@ def from_proto(data_source: DataSourceProto): timestamp_field=data_source.timestamp_field, created_timestamp_column=data_source.created_timestamp_column, date_partition_column=data_source.date_partition_column, + timestamp_field_type=data_source.timestamp_field_type or None, query=data_source.bigquery_options.query, description=data_source.description, tags=dict(data_source.tags), @@ -139,6 +145,7 @@ def _to_proto_impl(self) -> DataSourceProto: timestamp_field=self.timestamp_field, created_timestamp_column=self.created_timestamp_column, date_partition_column=self.date_partition_column, + timestamp_field_type=self.timestamp_field_type, ) return data_source_proto diff --git a/sdk/python/feast/infra/offline_stores/offline_utils.py b/sdk/python/feast/infra/offline_stores/offline_utils.py index 5664e6f45a6..fee87dde595 100644 --- a/sdk/python/feast/infra/offline_stores/offline_utils.py +++ b/sdk/python/feast/infra/offline_stores/offline_utils.py @@ -98,6 +98,7 @@ class FeatureViewQueryContext: date_partition_column: Optional[ str ] # this attribute is added because partition pruning affects Athena's query performance. + timestamp_field_type: Optional[str] def get_feature_view_query_context( @@ -160,6 +161,10 @@ def get_feature_view_query_context( feature_view.batch_source.date_partition_column, ) + timestamp_field_type = getattr( + feature_view.batch_source, "timestamp_field_type", "" + ) + max_event_timestamp = to_naive_utc(entity_df_timestamp_range[1]).isoformat() min_event_timestamp = None if feature_view.ttl: @@ -181,6 +186,7 @@ def get_feature_view_query_context( min_event_timestamp=min_event_timestamp, max_event_timestamp=max_event_timestamp, date_partition_column=date_partition_column, + timestamp_field_type=timestamp_field_type or None, ) query_context.append(context) @@ -340,7 +346,7 @@ def get_timestamp_filter_sql( date_partition_column: Optional[str] = None, tz: Optional[timezone] = None, cast_style: Literal[ - "timestamp", "timestamp_func", "timestamptz", "raw" + "timestamp", "timestamp_func", "timestamptz", "raw", "date_func" ] = "timestamp", date_time_separator: str = "T", quote_fields: bool = True, @@ -355,10 +361,11 @@ def get_timestamp_filter_sql( date_partition_column: optional partition column (for pruning) tz: optional timezone for datetime inputs cast_style: one of: - - "timestamp": TIMESTAMP '...' → Common Sql engine Snowflake, Redshift etc. + - "timestamp": TIMESTAMP '...' → Common Sql engine Snowflake, Redshift etc. - "timestamp_func": TIMESTAMP('...') → BigQuery, Couchbase etc. - "timestamptz": '...'::timestamptz → PostgreSQL - "raw": '...' → no cast, string only + - "date_func": DATE('...') → BigQuery DATE columns date_time_separator: separator for datetime strings (default is "T") (e.g. "2023-10-01T00:00:00" or "2023-10-01 00:00:00") quote_fields: whether to quote the timestamp and partition column names @@ -384,6 +391,9 @@ def format_casted_ts(val: Union[str, datetime]) -> str: return f"TIMESTAMP '{val_str}'" elif cast_style == "timestamp_func": return f"TIMESTAMP('{val_str}')" + elif cast_style == "date_func": + date_str = val_str[:10] if len(val_str) >= 10 else val_str + return f"DATE('{date_str}')" elif cast_style == "timestamptz": return f"'{val_str}'::{cast_style}" else: diff --git a/sdk/python/feast/protos/feast/core/DataSource_pb2.py b/sdk/python/feast/protos/feast/core/DataSource_pb2.py index f3086233584..51dee5652a2 100644 --- a/sdk/python/feast/protos/feast/core/DataSource_pb2.py +++ b/sdk/python/feast/protos/feast/core/DataSource_pb2.py @@ -19,7 +19,7 @@ from feast.protos.feast.core import Feature_pb2 as feast_dot_core_dot_Feature__pb2 -DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x1b\x66\x65\x61st/core/DataSource.proto\x12\nfeast.core\x1a\x1egoogle/protobuf/duration.proto\x1a\x1fgoogle/protobuf/timestamp.proto\x1a\x1b\x66\x65\x61st/core/DataFormat.proto\x1a\x17\x66\x65\x61st/types/Value.proto\x1a\x18\x66\x65\x61st/core/Feature.proto\"\x89\x18\n\nDataSource\x12\x0c\n\x04name\x18\x14 \x01(\t\x12\x0f\n\x07project\x18\x15 \x01(\t\x12\x13\n\x0b\x64\x65scription\x18\x17 \x01(\t\x12.\n\x04tags\x18\x18 \x03(\x0b\x32 .feast.core.DataSource.TagsEntry\x12\r\n\x05owner\x18\x19 \x01(\t\x12/\n\x04type\x18\x01 \x01(\x0e\x32!.feast.core.DataSource.SourceType\x12?\n\rfield_mapping\x18\x02 \x03(\x0b\x32(.feast.core.DataSource.FieldMappingEntry\x12\x17\n\x0ftimestamp_field\x18\x03 \x01(\t\x12\x1d\n\x15\x64\x61te_partition_column\x18\x04 \x01(\t\x12 \n\x18\x63reated_timestamp_column\x18\x05 \x01(\t\x12\x1e\n\x16\x64\x61ta_source_class_type\x18\x11 \x01(\t\x12,\n\x0c\x62\x61tch_source\x18\x1a \x01(\x0b\x32\x16.feast.core.DataSource\x12/\n\x04meta\x18\x32 \x01(\x0b\x32!.feast.core.DataSource.SourceMeta\x12:\n\x0c\x66ile_options\x18\x0b \x01(\x0b\x32\".feast.core.DataSource.FileOptionsH\x00\x12\x42\n\x10\x62igquery_options\x18\x0c \x01(\x0b\x32&.feast.core.DataSource.BigQueryOptionsH\x00\x12<\n\rkafka_options\x18\r \x01(\x0b\x32#.feast.core.DataSource.KafkaOptionsH\x00\x12@\n\x0fkinesis_options\x18\x0e \x01(\x0b\x32%.feast.core.DataSource.KinesisOptionsH\x00\x12\x42\n\x10redshift_options\x18\x0f \x01(\x0b\x32&.feast.core.DataSource.RedshiftOptionsH\x00\x12I\n\x14request_data_options\x18\x12 \x01(\x0b\x32).feast.core.DataSource.RequestDataOptionsH\x00\x12\x44\n\x0e\x63ustom_options\x18\x10 \x01(\x0b\x32*.feast.core.DataSource.CustomSourceOptionsH\x00\x12\x44\n\x11snowflake_options\x18\x13 \x01(\x0b\x32\'.feast.core.DataSource.SnowflakeOptionsH\x00\x12:\n\x0cpush_options\x18\x16 \x01(\x0b\x32\".feast.core.DataSource.PushOptionsH\x00\x12<\n\rspark_options\x18\x1b \x01(\x0b\x32#.feast.core.DataSource.SparkOptionsH\x00\x12<\n\rtrino_options\x18\x1e \x01(\x0b\x32#.feast.core.DataSource.TrinoOptionsH\x00\x12>\n\x0e\x61thena_options\x18# \x01(\x0b\x32$.feast.core.DataSource.AthenaOptionsH\x00\x1a+\n\tTagsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\x1a\x33\n\x11\x46ieldMappingEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\x1a\xf5\x01\n\nSourceMeta\x12:\n\x16\x65\x61rliestEventTimestamp\x18\x01 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12\x38\n\x14latestEventTimestamp\x18\x02 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12\x35\n\x11\x63reated_timestamp\x18\x03 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12:\n\x16last_updated_timestamp\x18\x04 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x1a\x65\n\x0b\x46ileOptions\x12+\n\x0b\x66ile_format\x18\x01 \x01(\x0b\x32\x16.feast.core.FileFormat\x12\x0b\n\x03uri\x18\x02 \x01(\t\x12\x1c\n\x14s3_endpoint_override\x18\x03 \x01(\t\x1a/\n\x0f\x42igQueryOptions\x12\r\n\x05table\x18\x01 \x01(\t\x12\r\n\x05query\x18\x02 \x01(\t\x1a,\n\x0cTrinoOptions\x12\r\n\x05table\x18\x01 \x01(\t\x12\r\n\x05query\x18\x02 \x01(\t\x1a\xae\x01\n\x0cKafkaOptions\x12\x1f\n\x17kafka_bootstrap_servers\x18\x01 \x01(\t\x12\r\n\x05topic\x18\x02 \x01(\t\x12\x30\n\x0emessage_format\x18\x03 \x01(\x0b\x32\x18.feast.core.StreamFormat\x12<\n\x19watermark_delay_threshold\x18\x04 \x01(\x0b\x32\x19.google.protobuf.Duration\x1a\x66\n\x0eKinesisOptions\x12\x0e\n\x06region\x18\x01 \x01(\t\x12\x13\n\x0bstream_name\x18\x02 \x01(\t\x12/\n\rrecord_format\x18\x03 \x01(\x0b\x32\x18.feast.core.StreamFormat\x1aQ\n\x0fRedshiftOptions\x12\r\n\x05table\x18\x01 \x01(\t\x12\r\n\x05query\x18\x02 \x01(\t\x12\x0e\n\x06schema\x18\x03 \x01(\t\x12\x10\n\x08\x64\x61tabase\x18\x04 \x01(\t\x1aT\n\rAthenaOptions\x12\r\n\x05table\x18\x01 \x01(\t\x12\r\n\x05query\x18\x02 \x01(\t\x12\x10\n\x08\x64\x61tabase\x18\x03 \x01(\t\x12\x13\n\x0b\x64\x61ta_source\x18\x04 \x01(\t\x1aX\n\x10SnowflakeOptions\x12\r\n\x05table\x18\x01 \x01(\t\x12\r\n\x05query\x18\x02 \x01(\t\x12\x0e\n\x06schema\x18\x03 \x01(\t\x12\x10\n\x08\x64\x61tabase\x18\x04 \x01(\tJ\x04\x08\x05\x10\x06\x1a\xa4\x01\n\x0cSparkOptions\x12\r\n\x05table\x18\x01 \x01(\t\x12\r\n\x05query\x18\x02 \x01(\t\x12\x0c\n\x04path\x18\x03 \x01(\t\x12\x13\n\x0b\x66ile_format\x18\x04 \x01(\t\x12$\n\x1c\x64\x61te_partition_column_format\x18\x05 \x01(\t\x12-\n\x0ctable_format\x18\x06 \x01(\x0b\x32\x17.feast.core.TableFormat\x1a,\n\x13\x43ustomSourceOptions\x12\x15\n\rconfiguration\x18\x01 \x01(\x0c\x1a\xf7\x01\n\x12RequestDataOptions\x12Z\n\x11\x64\x65precated_schema\x18\x02 \x03(\x0b\x32?.feast.core.DataSource.RequestDataOptions.DeprecatedSchemaEntry\x12)\n\x06schema\x18\x03 \x03(\x0b\x32\x19.feast.core.FeatureSpecV2\x1aT\n\x15\x44\x65precatedSchemaEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12*\n\x05value\x18\x02 \x01(\x0e\x32\x1b.feast.types.ValueType.Enum:\x02\x38\x01J\x04\x08\x01\x10\x02\x1a\x13\n\x0bPushOptionsJ\x04\x08\x01\x10\x02\"\xf8\x01\n\nSourceType\x12\x0b\n\x07INVALID\x10\x00\x12\x0e\n\nBATCH_FILE\x10\x01\x12\x13\n\x0f\x42\x41TCH_SNOWFLAKE\x10\x08\x12\x12\n\x0e\x42\x41TCH_BIGQUERY\x10\x02\x12\x12\n\x0e\x42\x41TCH_REDSHIFT\x10\x05\x12\x10\n\x0cSTREAM_KAFKA\x10\x03\x12\x12\n\x0eSTREAM_KINESIS\x10\x04\x12\x11\n\rCUSTOM_SOURCE\x10\x06\x12\x12\n\x0eREQUEST_SOURCE\x10\x07\x12\x0f\n\x0bPUSH_SOURCE\x10\t\x12\x0f\n\x0b\x42\x41TCH_TRINO\x10\n\x12\x0f\n\x0b\x42\x41TCH_SPARK\x10\x0b\x12\x10\n\x0c\x42\x41TCH_ATHENA\x10\x0c\x42\t\n\x07optionsJ\x04\x08\x06\x10\x0b\"=\n\x0e\x44\x61taSourceList\x12+\n\x0b\x64\x61tasources\x18\x01 \x03(\x0b\x32\x16.feast.core.DataSourceBT\n\x10\x66\x65\x61st.proto.coreB\x0f\x44\x61taSourceProtoZ/github.com/feast-dev/feast/go/protos/feast/coreb\x06proto3') +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x1b\x66\x65\x61st/core/DataSource.proto\x12\nfeast.core\x1a\x1egoogle/protobuf/duration.proto\x1a\x1fgoogle/protobuf/timestamp.proto\x1a\x1b\x66\x65\x61st/core/DataFormat.proto\x1a\x17\x66\x65\x61st/types/Value.proto\x1a\x18\x66\x65\x61st/core/Feature.proto\"\xa7\x18\n\nDataSource\x12\x0c\n\x04name\x18\x14 \x01(\t\x12\x0f\n\x07project\x18\x15 \x01(\t\x12\x13\n\x0b\x64\x65scription\x18\x17 \x01(\t\x12.\n\x04tags\x18\x18 \x03(\x0b\x32 .feast.core.DataSource.TagsEntry\x12\r\n\x05owner\x18\x19 \x01(\t\x12/\n\x04type\x18\x01 \x01(\x0e\x32!.feast.core.DataSource.SourceType\x12?\n\rfield_mapping\x18\x02 \x03(\x0b\x32(.feast.core.DataSource.FieldMappingEntry\x12\x17\n\x0ftimestamp_field\x18\x03 \x01(\t\x12\x1d\n\x15\x64\x61te_partition_column\x18\x04 \x01(\t\x12 \n\x18\x63reated_timestamp_column\x18\x05 \x01(\t\x12\x1c\n\x14timestamp_field_type\x18\x1c \x01(\t\x12\x1e\n\x16\x64\x61ta_source_class_type\x18\x11 \x01(\t\x12,\n\x0c\x62\x61tch_source\x18\x1a \x01(\x0b\x32\x16.feast.core.DataSource\x12/\n\x04meta\x18\x32 \x01(\x0b\x32!.feast.core.DataSource.SourceMeta\x12:\n\x0c\x66ile_options\x18\x0b \x01(\x0b\x32\".feast.core.DataSource.FileOptionsH\x00\x12\x42\n\x10\x62igquery_options\x18\x0c \x01(\x0b\x32&.feast.core.DataSource.BigQueryOptionsH\x00\x12<\n\rkafka_options\x18\r \x01(\x0b\x32#.feast.core.DataSource.KafkaOptionsH\x00\x12@\n\x0fkinesis_options\x18\x0e \x01(\x0b\x32%.feast.core.DataSource.KinesisOptionsH\x00\x12\x42\n\x10redshift_options\x18\x0f \x01(\x0b\x32&.feast.core.DataSource.RedshiftOptionsH\x00\x12I\n\x14request_data_options\x18\x12 \x01(\x0b\x32).feast.core.DataSource.RequestDataOptionsH\x00\x12\x44\n\x0e\x63ustom_options\x18\x10 \x01(\x0b\x32*.feast.core.DataSource.CustomSourceOptionsH\x00\x12\x44\n\x11snowflake_options\x18\x13 \x01(\x0b\x32\'.feast.core.DataSource.SnowflakeOptionsH\x00\x12:\n\x0cpush_options\x18\x16 \x01(\x0b\x32\".feast.core.DataSource.PushOptionsH\x00\x12<\n\rspark_options\x18\x1b \x01(\x0b\x32#.feast.core.DataSource.SparkOptionsH\x00\x12<\n\rtrino_options\x18\x1e \x01(\x0b\x32#.feast.core.DataSource.TrinoOptionsH\x00\x12>\n\x0e\x61thena_options\x18# \x01(\x0b\x32$.feast.core.DataSource.AthenaOptionsH\x00\x1a+\n\tTagsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\x1a\x33\n\x11\x46ieldMappingEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\x1a\xf5\x01\n\nSourceMeta\x12:\n\x16\x65\x61rliestEventTimestamp\x18\x01 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12\x38\n\x14latestEventTimestamp\x18\x02 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12\x35\n\x11\x63reated_timestamp\x18\x03 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12:\n\x16last_updated_timestamp\x18\x04 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x1a\x65\n\x0b\x46ileOptions\x12+\n\x0b\x66ile_format\x18\x01 \x01(\x0b\x32\x16.feast.core.FileFormat\x12\x0b\n\x03uri\x18\x02 \x01(\t\x12\x1c\n\x14s3_endpoint_override\x18\x03 \x01(\t\x1a/\n\x0f\x42igQueryOptions\x12\r\n\x05table\x18\x01 \x01(\t\x12\r\n\x05query\x18\x02 \x01(\t\x1a,\n\x0cTrinoOptions\x12\r\n\x05table\x18\x01 \x01(\t\x12\r\n\x05query\x18\x02 \x01(\t\x1a\xae\x01\n\x0cKafkaOptions\x12\x1f\n\x17kafka_bootstrap_servers\x18\x01 \x01(\t\x12\r\n\x05topic\x18\x02 \x01(\t\x12\x30\n\x0emessage_format\x18\x03 \x01(\x0b\x32\x18.feast.core.StreamFormat\x12<\n\x19watermark_delay_threshold\x18\x04 \x01(\x0b\x32\x19.google.protobuf.Duration\x1a\x66\n\x0eKinesisOptions\x12\x0e\n\x06region\x18\x01 \x01(\t\x12\x13\n\x0bstream_name\x18\x02 \x01(\t\x12/\n\rrecord_format\x18\x03 \x01(\x0b\x32\x18.feast.core.StreamFormat\x1aQ\n\x0fRedshiftOptions\x12\r\n\x05table\x18\x01 \x01(\t\x12\r\n\x05query\x18\x02 \x01(\t\x12\x0e\n\x06schema\x18\x03 \x01(\t\x12\x10\n\x08\x64\x61tabase\x18\x04 \x01(\t\x1aT\n\rAthenaOptions\x12\r\n\x05table\x18\x01 \x01(\t\x12\r\n\x05query\x18\x02 \x01(\t\x12\x10\n\x08\x64\x61tabase\x18\x03 \x01(\t\x12\x13\n\x0b\x64\x61ta_source\x18\x04 \x01(\t\x1aX\n\x10SnowflakeOptions\x12\r\n\x05table\x18\x01 \x01(\t\x12\r\n\x05query\x18\x02 \x01(\t\x12\x0e\n\x06schema\x18\x03 \x01(\t\x12\x10\n\x08\x64\x61tabase\x18\x04 \x01(\tJ\x04\x08\x05\x10\x06\x1a\xa4\x01\n\x0cSparkOptions\x12\r\n\x05table\x18\x01 \x01(\t\x12\r\n\x05query\x18\x02 \x01(\t\x12\x0c\n\x04path\x18\x03 \x01(\t\x12\x13\n\x0b\x66ile_format\x18\x04 \x01(\t\x12$\n\x1c\x64\x61te_partition_column_format\x18\x05 \x01(\t\x12-\n\x0ctable_format\x18\x06 \x01(\x0b\x32\x17.feast.core.TableFormat\x1a,\n\x13\x43ustomSourceOptions\x12\x15\n\rconfiguration\x18\x01 \x01(\x0c\x1a\xf7\x01\n\x12RequestDataOptions\x12Z\n\x11\x64\x65precated_schema\x18\x02 \x03(\x0b\x32?.feast.core.DataSource.RequestDataOptions.DeprecatedSchemaEntry\x12)\n\x06schema\x18\x03 \x03(\x0b\x32\x19.feast.core.FeatureSpecV2\x1aT\n\x15\x44\x65precatedSchemaEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12*\n\x05value\x18\x02 \x01(\x0e\x32\x1b.feast.types.ValueType.Enum:\x02\x38\x01J\x04\x08\x01\x10\x02\x1a\x13\n\x0bPushOptionsJ\x04\x08\x01\x10\x02\"\xf8\x01\n\nSourceType\x12\x0b\n\x07INVALID\x10\x00\x12\x0e\n\nBATCH_FILE\x10\x01\x12\x13\n\x0f\x42\x41TCH_SNOWFLAKE\x10\x08\x12\x12\n\x0e\x42\x41TCH_BIGQUERY\x10\x02\x12\x12\n\x0e\x42\x41TCH_REDSHIFT\x10\x05\x12\x10\n\x0cSTREAM_KAFKA\x10\x03\x12\x12\n\x0eSTREAM_KINESIS\x10\x04\x12\x11\n\rCUSTOM_SOURCE\x10\x06\x12\x12\n\x0eREQUEST_SOURCE\x10\x07\x12\x0f\n\x0bPUSH_SOURCE\x10\t\x12\x0f\n\x0b\x42\x41TCH_TRINO\x10\n\x12\x0f\n\x0b\x42\x41TCH_SPARK\x10\x0b\x12\x10\n\x0c\x42\x41TCH_ATHENA\x10\x0c\x42\t\n\x07optionsJ\x04\x08\x06\x10\x0b\"=\n\x0e\x44\x61taSourceList\x12+\n\x0b\x64\x61tasources\x18\x01 \x03(\x0b\x32\x16.feast.core.DataSourceBT\n\x10\x66\x65\x61st.proto.coreB\x0f\x44\x61taSourceProtoZ/github.com/feast-dev/feast/go/protos/feast/coreb\x06proto3') _globals = globals() _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) @@ -34,41 +34,41 @@ _globals['_DATASOURCE_REQUESTDATAOPTIONS_DEPRECATEDSCHEMAENTRY']._options = None _globals['_DATASOURCE_REQUESTDATAOPTIONS_DEPRECATEDSCHEMAENTRY']._serialized_options = b'8\001' _globals['_DATASOURCE']._serialized_start=189 - _globals['_DATASOURCE']._serialized_end=3270 - _globals['_DATASOURCE_TAGSENTRY']._serialized_start=1436 - _globals['_DATASOURCE_TAGSENTRY']._serialized_end=1479 - _globals['_DATASOURCE_FIELDMAPPINGENTRY']._serialized_start=1481 - _globals['_DATASOURCE_FIELDMAPPINGENTRY']._serialized_end=1532 - _globals['_DATASOURCE_SOURCEMETA']._serialized_start=1535 - _globals['_DATASOURCE_SOURCEMETA']._serialized_end=1780 - _globals['_DATASOURCE_FILEOPTIONS']._serialized_start=1782 - _globals['_DATASOURCE_FILEOPTIONS']._serialized_end=1883 - _globals['_DATASOURCE_BIGQUERYOPTIONS']._serialized_start=1885 - _globals['_DATASOURCE_BIGQUERYOPTIONS']._serialized_end=1932 - _globals['_DATASOURCE_TRINOOPTIONS']._serialized_start=1934 - _globals['_DATASOURCE_TRINOOPTIONS']._serialized_end=1978 - _globals['_DATASOURCE_KAFKAOPTIONS']._serialized_start=1981 - _globals['_DATASOURCE_KAFKAOPTIONS']._serialized_end=2155 - _globals['_DATASOURCE_KINESISOPTIONS']._serialized_start=2157 - _globals['_DATASOURCE_KINESISOPTIONS']._serialized_end=2259 - _globals['_DATASOURCE_REDSHIFTOPTIONS']._serialized_start=2261 - _globals['_DATASOURCE_REDSHIFTOPTIONS']._serialized_end=2342 - _globals['_DATASOURCE_ATHENAOPTIONS']._serialized_start=2344 - _globals['_DATASOURCE_ATHENAOPTIONS']._serialized_end=2428 - _globals['_DATASOURCE_SNOWFLAKEOPTIONS']._serialized_start=2430 - _globals['_DATASOURCE_SNOWFLAKEOPTIONS']._serialized_end=2518 - _globals['_DATASOURCE_SPARKOPTIONS']._serialized_start=2521 - _globals['_DATASOURCE_SPARKOPTIONS']._serialized_end=2685 - _globals['_DATASOURCE_CUSTOMSOURCEOPTIONS']._serialized_start=2687 - _globals['_DATASOURCE_CUSTOMSOURCEOPTIONS']._serialized_end=2731 - _globals['_DATASOURCE_REQUESTDATAOPTIONS']._serialized_start=2734 - _globals['_DATASOURCE_REQUESTDATAOPTIONS']._serialized_end=2981 - _globals['_DATASOURCE_REQUESTDATAOPTIONS_DEPRECATEDSCHEMAENTRY']._serialized_start=2891 - _globals['_DATASOURCE_REQUESTDATAOPTIONS_DEPRECATEDSCHEMAENTRY']._serialized_end=2975 - _globals['_DATASOURCE_PUSHOPTIONS']._serialized_start=2983 - _globals['_DATASOURCE_PUSHOPTIONS']._serialized_end=3002 - _globals['_DATASOURCE_SOURCETYPE']._serialized_start=3005 - _globals['_DATASOURCE_SOURCETYPE']._serialized_end=3253 - _globals['_DATASOURCELIST']._serialized_start=3272 - _globals['_DATASOURCELIST']._serialized_end=3333 + _globals['_DATASOURCE']._serialized_end=3300 + _globals['_DATASOURCE_TAGSENTRY']._serialized_start=1466 + _globals['_DATASOURCE_TAGSENTRY']._serialized_end=1509 + _globals['_DATASOURCE_FIELDMAPPINGENTRY']._serialized_start=1511 + _globals['_DATASOURCE_FIELDMAPPINGENTRY']._serialized_end=1562 + _globals['_DATASOURCE_SOURCEMETA']._serialized_start=1565 + _globals['_DATASOURCE_SOURCEMETA']._serialized_end=1810 + _globals['_DATASOURCE_FILEOPTIONS']._serialized_start=1812 + _globals['_DATASOURCE_FILEOPTIONS']._serialized_end=1913 + _globals['_DATASOURCE_BIGQUERYOPTIONS']._serialized_start=1915 + _globals['_DATASOURCE_BIGQUERYOPTIONS']._serialized_end=1962 + _globals['_DATASOURCE_TRINOOPTIONS']._serialized_start=1964 + _globals['_DATASOURCE_TRINOOPTIONS']._serialized_end=2008 + _globals['_DATASOURCE_KAFKAOPTIONS']._serialized_start=2011 + _globals['_DATASOURCE_KAFKAOPTIONS']._serialized_end=2185 + _globals['_DATASOURCE_KINESISOPTIONS']._serialized_start=2187 + _globals['_DATASOURCE_KINESISOPTIONS']._serialized_end=2289 + _globals['_DATASOURCE_REDSHIFTOPTIONS']._serialized_start=2291 + _globals['_DATASOURCE_REDSHIFTOPTIONS']._serialized_end=2372 + _globals['_DATASOURCE_ATHENAOPTIONS']._serialized_start=2374 + _globals['_DATASOURCE_ATHENAOPTIONS']._serialized_end=2458 + _globals['_DATASOURCE_SNOWFLAKEOPTIONS']._serialized_start=2460 + _globals['_DATASOURCE_SNOWFLAKEOPTIONS']._serialized_end=2548 + _globals['_DATASOURCE_SPARKOPTIONS']._serialized_start=2551 + _globals['_DATASOURCE_SPARKOPTIONS']._serialized_end=2715 + _globals['_DATASOURCE_CUSTOMSOURCEOPTIONS']._serialized_start=2717 + _globals['_DATASOURCE_CUSTOMSOURCEOPTIONS']._serialized_end=2761 + _globals['_DATASOURCE_REQUESTDATAOPTIONS']._serialized_start=2764 + _globals['_DATASOURCE_REQUESTDATAOPTIONS']._serialized_end=3011 + _globals['_DATASOURCE_REQUESTDATAOPTIONS_DEPRECATEDSCHEMAENTRY']._serialized_start=2921 + _globals['_DATASOURCE_REQUESTDATAOPTIONS_DEPRECATEDSCHEMAENTRY']._serialized_end=3005 + _globals['_DATASOURCE_PUSHOPTIONS']._serialized_start=3013 + _globals['_DATASOURCE_PUSHOPTIONS']._serialized_end=3032 + _globals['_DATASOURCE_SOURCETYPE']._serialized_start=3035 + _globals['_DATASOURCE_SOURCETYPE']._serialized_end=3283 + _globals['_DATASOURCELIST']._serialized_start=3302 + _globals['_DATASOURCELIST']._serialized_end=3363 # @@protoc_insertion_point(module_scope) diff --git a/sdk/python/feast/protos/feast/core/DataSource_pb2.pyi b/sdk/python/feast/protos/feast/core/DataSource_pb2.pyi index 7876e1adc98..e38b71359fb 100644 --- a/sdk/python/feast/protos/feast/core/DataSource_pb2.pyi +++ b/sdk/python/feast/protos/feast/core/DataSource_pb2.pyi @@ -39,7 +39,7 @@ DESCRIPTOR: google.protobuf.descriptor.FileDescriptor class DataSource(google.protobuf.message.Message): """Defines a Data Source that can be used source Feature data - Next available id: 28 + Next available id: 29 """ DESCRIPTOR: google.protobuf.descriptor.Descriptor @@ -484,6 +484,7 @@ class DataSource(google.protobuf.message.Message): SPARK_OPTIONS_FIELD_NUMBER: builtins.int TRINO_OPTIONS_FIELD_NUMBER: builtins.int ATHENA_OPTIONS_FIELD_NUMBER: builtins.int + TIMESTAMP_FIELD_TYPE_FIELD_NUMBER: builtins.int name: builtins.str """Unique name of data source within the project""" project: builtins.str @@ -506,6 +507,8 @@ class DataSource(google.protobuf.message.Message): """ created_timestamp_column: builtins.str """Must specify creation timestamp column name""" + timestamp_field_type: builtins.str + """Optional type of the timestamp field (e.g. DATE)""" data_source_class_type: builtins.str """This is an internal field that is represents the python class for the data source object a proto object represents. This should be set by feast, and not by users. @@ -554,6 +557,7 @@ class DataSource(google.protobuf.message.Message): timestamp_field: builtins.str = ..., date_partition_column: builtins.str = ..., created_timestamp_column: builtins.str = ..., + timestamp_field_type: builtins.str = ..., data_source_class_type: builtins.str = ..., batch_source: global___DataSource | None = ..., meta: global___DataSource.SourceMeta | None = ..., @@ -571,7 +575,7 @@ class DataSource(google.protobuf.message.Message): athena_options: global___DataSource.AthenaOptions | None = ..., ) -> None: ... def HasField(self, field_name: typing_extensions.Literal["athena_options", b"athena_options", "batch_source", b"batch_source", "bigquery_options", b"bigquery_options", "custom_options", b"custom_options", "file_options", b"file_options", "kafka_options", b"kafka_options", "kinesis_options", b"kinesis_options", "meta", b"meta", "options", b"options", "push_options", b"push_options", "redshift_options", b"redshift_options", "request_data_options", b"request_data_options", "snowflake_options", b"snowflake_options", "spark_options", b"spark_options", "trino_options", b"trino_options"]) -> builtins.bool: ... - def ClearField(self, field_name: typing_extensions.Literal["athena_options", b"athena_options", "batch_source", b"batch_source", "bigquery_options", b"bigquery_options", "created_timestamp_column", b"created_timestamp_column", "custom_options", b"custom_options", "data_source_class_type", b"data_source_class_type", "date_partition_column", b"date_partition_column", "description", b"description", "field_mapping", b"field_mapping", "file_options", b"file_options", "kafka_options", b"kafka_options", "kinesis_options", b"kinesis_options", "meta", b"meta", "name", b"name", "options", b"options", "owner", b"owner", "project", b"project", "push_options", b"push_options", "redshift_options", b"redshift_options", "request_data_options", b"request_data_options", "snowflake_options", b"snowflake_options", "spark_options", b"spark_options", "tags", b"tags", "timestamp_field", b"timestamp_field", "trino_options", b"trino_options", "type", b"type"]) -> None: ... + def ClearField(self, field_name: typing_extensions.Literal["athena_options", b"athena_options", "batch_source", b"batch_source", "bigquery_options", b"bigquery_options", "created_timestamp_column", b"created_timestamp_column", "custom_options", b"custom_options", "data_source_class_type", b"data_source_class_type", "date_partition_column", b"date_partition_column", "description", b"description", "field_mapping", b"field_mapping", "file_options", b"file_options", "kafka_options", b"kafka_options", "kinesis_options", b"kinesis_options", "meta", b"meta", "name", b"name", "options", b"options", "owner", b"owner", "project", b"project", "push_options", b"push_options", "redshift_options", b"redshift_options", "request_data_options", b"request_data_options", "snowflake_options", b"snowflake_options", "spark_options", b"spark_options", "tags", b"tags", "timestamp_field", b"timestamp_field", "timestamp_field_type", b"timestamp_field_type", "trino_options", b"trino_options", "type", b"type"]) -> None: ... def WhichOneof(self, oneof_group: typing_extensions.Literal["options", b"options"]) -> typing_extensions.Literal["file_options", "bigquery_options", "kafka_options", "kinesis_options", "redshift_options", "request_data_options", "custom_options", "snowflake_options", "push_options", "spark_options", "trino_options", "athena_options"] | None: ... global___DataSource = DataSource diff --git a/sdk/python/tests/unit/infra/offline_stores/contrib/spark_offline_store/test_spark_bfv_compute_on_read.py b/sdk/python/tests/unit/infra/offline_stores/contrib/spark_offline_store/test_spark_bfv_compute_on_read.py index 0bcc282ae83..130583a13b1 100644 --- a/sdk/python/tests/unit/infra/offline_stores/contrib/spark_offline_store/test_spark_bfv_compute_on_read.py +++ b/sdk/python/tests/unit/infra/offline_stores/contrib/spark_offline_store/test_spark_bfv_compute_on_read.py @@ -52,6 +52,7 @@ def base_query_context(): min_event_timestamp="2023-01-01T00:00:00", max_event_timestamp="2024-01-01T00:00:00", date_partition_column=None, + timestamp_field_type=None, ) diff --git a/sdk/python/tests/unit/infra/offline_stores/test_bigquery.py b/sdk/python/tests/unit/infra/offline_stores/test_bigquery.py index 969a9679971..5a050f60c4f 100644 --- a/sdk/python/tests/unit/infra/offline_stores/test_bigquery.py +++ b/sdk/python/tests/unit/infra/offline_stores/test_bigquery.py @@ -154,6 +154,117 @@ def test_pull_all_from_table_or_query_partition_pruning(mock_get_bigquery_client assert "partition_date <= '2021-01-02'" in actual_query +@patch("feast.infra.offline_stores.bigquery._get_bigquery_client") +def test_pull_latest_date_type_timestamp_field(mock_get_bigquery_client): + mock_get_bigquery_client.return_value = Mock() + test_repo_config = RepoConfig( + registry="gs://ml-test/repo/registry.db", + project="test", + provider="gcp", + online_store=SqliteOnlineStoreConfig(type="sqlite"), + offline_store=BigQueryOfflineStoreConfig(type="bigquery", dataset="feast"), + ) + test_data_source = BigQuerySource( + table="project:dataset.table", + timestamp_field="event_date", + timestamp_field_type="DATE", + ) + retrieval_job = BigQueryOfflineStore.pull_latest_from_table_or_query( + config=test_repo_config, + data_source=test_data_source, + join_key_columns=["driver_id"], + feature_name_columns=["feature1"], + timestamp_field="event_date", + created_timestamp_column=None, + start_date=datetime(2021, 1, 1, tzinfo=timezone.utc), + end_date=datetime(2021, 1, 2, tzinfo=timezone.utc), + ) + actual_query = retrieval_job.to_sql() + assert ( + "event_date BETWEEN DATE('2021-01-01') AND DATE('2021-01-02')" in actual_query + ) + assert "TIMESTAMP(" not in actual_query + + +@patch("feast.infra.offline_stores.bigquery._get_bigquery_client") +def test_pull_all_date_type_timestamp_field(mock_get_bigquery_client): + mock_get_bigquery_client.return_value = Mock() + test_repo_config = RepoConfig( + registry="gs://ml-test/repo/registry.db", + project="test", + provider="gcp", + online_store=SqliteOnlineStoreConfig(type="sqlite"), + offline_store=BigQueryOfflineStoreConfig(type="bigquery", dataset="feast"), + ) + test_data_source = BigQuerySource( + table="project:dataset.table", + timestamp_field="event_date", + timestamp_field_type="DATE", + ) + retrieval_job = BigQueryOfflineStore.pull_all_from_table_or_query( + config=test_repo_config, + data_source=test_data_source, + join_key_columns=["driver_id"], + feature_name_columns=["feature1"], + timestamp_field="event_date", + start_date=datetime(2021, 1, 1, tzinfo=timezone.utc), + end_date=datetime(2021, 1, 2, tzinfo=timezone.utc), + ) + actual_query = retrieval_job.to_sql() + assert ( + "event_date BETWEEN DATE('2021-01-01') AND DATE('2021-01-02')" in actual_query + ) + assert "TIMESTAMP(" not in actual_query + + +@patch("feast.infra.offline_stores.bigquery._get_bigquery_client") +def test_pull_latest_date_type_with_partition_column(mock_get_bigquery_client): + mock_get_bigquery_client.return_value = Mock() + test_repo_config = RepoConfig( + registry="gs://ml-test/repo/registry.db", + project="test", + provider="gcp", + online_store=SqliteOnlineStoreConfig(type="sqlite"), + offline_store=BigQueryOfflineStoreConfig(type="bigquery", dataset="feast"), + ) + test_data_source = BigQuerySource( + table="project:dataset.table", + timestamp_field="event_date", + timestamp_field_type="DATE", + date_partition_column="_PARTITIONDATE", + ) + retrieval_job = BigQueryOfflineStore.pull_latest_from_table_or_query( + config=test_repo_config, + data_source=test_data_source, + join_key_columns=["driver_id"], + feature_name_columns=["feature1"], + timestamp_field="event_date", + created_timestamp_column=None, + start_date=datetime(2021, 1, 1, tzinfo=timezone.utc), + end_date=datetime(2021, 1, 2, tzinfo=timezone.utc), + ) + actual_query = retrieval_job.to_sql() + assert "DATE('2021-01-01')" in actual_query + assert "DATE('2021-01-02')" in actual_query + assert "_PARTITIONDATE >= '2021-01-01'" in actual_query + assert "_PARTITIONDATE <= '2021-01-02'" in actual_query + + +def test_bigquery_source_date_type_proto_roundtrip(): + source = BigQuerySource( + table="project:dataset.table", + timestamp_field="event_date", + timestamp_field_type="DATE", + date_partition_column="_PARTITIONDATE", + ) + proto = source.to_proto() + restored = BigQuerySource.from_proto(proto) + assert restored.timestamp_field_type == "DATE" + assert restored.date_partition_column == "_PARTITIONDATE" + assert restored.timestamp_field == "event_date" + assert source == restored + + class TestBigQuerySourceGetTableQueryString: def test_table_only(self): source = BigQuerySource(