fix(spark): Replace mapInArrow with foreachPartition for materialization#6370
Draft
abhijeet-dhumal wants to merge 7 commits intofeast-dev:masterfrom
Draft
fix(spark): Replace mapInArrow with foreachPartition for materialization#6370abhijeet-dhumal wants to merge 7 commits intofeast-dev:masterfrom
abhijeet-dhumal wants to merge 7 commits intofeast-dev:masterfrom
Conversation
…config forwarding for vector store materialization Signed-off-by: abhijeet-dhumal <abhijeetdhumal652@gmail.com>
60fe5cf to
2cee78e
Compare
Signed-off-by: abhijeet-dhumal <abhijeetdhumal652@gmail.com>
SparkSource previously required exactly one of table/query/path. This relaxes the constraint to allow query + path together: - query: used for reading raw data during materialization - path: used for offline write-back (offline=True) and as pre-computed read source in get_historical_features Co-authored-by: Cursor <cursoragent@cursor.com>
Co-authored-by: Cursor <cursoragent@cursor.com>
Co-authored-by: Cursor <cursoragent@cursor.com>
…MemoryError/OOMKill on large feature views Signed-off-by: abhijeet-dhumal <abhijeetdhumal652@gmail.com>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
What this PR does / why we need it
Fixes materialization failures when using
BatchFeatureView+SparkComputeEnginewith vector stores and Spark 3.5+.1. Replace
mapInArrowwithforeachPartition(main fix)Spark 3.5 inserts
WindowGroupLimitExecupstream ofMapInArrowExecwhen UDFs use Window operations, routing the Python worker through the wrong serialiser:foreachPartitionuses pickle — no Arrow UDF bridge — so the mismatch cannot occur.2. Re-apply session configs on reuse
SparkSession.getOrCreate()silently drops newspark.sql.*/spark.hadoop.*overrides when a session already exists, causing S3 access failures. Now explicitly re-applied aftergetOrCreate().3. Fix
map_in_pandasdummy yieldChanged
pd.DataFrame([pd.Series(range(1, 2))])→pd.DataFrame({"status": [0]})to match the declared return schema.4. Remove redundant
_apply_bfv_transformationsSuperseded by
SparkFeatureBuilderDAG pipeline (#6357) which handles BFV transformations for both materialization and historical retrieval.5. Add
spark_embedutilityHelper for
@batch_feature_viewUDFs that generate embeddings viaSentenceTransformer. UseslocalCheckpoint(eager=True)to sever Python lineage before downstream writes.Which issue(s) this PR fixes
Fixes
BatchFeatureViewmaterialization with vector stores (Milvus/Redis), Spark 3.5+ Window operations, and K8s-mode session config drift.Testing Strategy
map_in_pandasschema, empty partition handling, S3 event log dirBatchFeatureViewwith PySpark + RAPIDS GPU on K8s (4 executors), 26M+ keys to RedisChecks