Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
220 changes: 200 additions & 20 deletions sidemantic/adapters/gooddata.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ def _parse_cloud_dataset(self, dataset_def: dict[str, Any]) -> Model | None:
"table",
)
)
sql = dataset_def.get("sql")
sql, sql_data_source_id = self._coerce_sql(dataset_def.get("sql"))
if sql:
table = None

Expand All @@ -164,6 +164,7 @@ def _parse_cloud_dataset(self, dataset_def: dict[str, Any]) -> Model | None:

attributes = self._as_list(dataset_def.get("attributes"))
facts = self._as_list(dataset_def.get("facts"))
aggregated_facts = self._as_list(dataset_def.get("aggregatedFacts") or dataset_def.get("aggregated_facts"))

fields = dataset_def.get("fields")
if fields:
Expand All @@ -186,6 +187,11 @@ def _parse_cloud_dataset(self, dataset_def: dict[str, Any]) -> Model | None:
if metric:
metrics.append(metric)

for agg_fact_def in aggregated_facts:
metric = self._parse_cloud_aggregated_fact(agg_fact_def)
if metric:
metrics.append(metric)

if primary_key and not any(dim.name == primary_key for dim in dimensions):
dimensions.append(
Dimension(
Expand All @@ -202,16 +208,27 @@ def _parse_cloud_dataset(self, dataset_def: dict[str, Any]) -> Model | None:
if rel:
relationships.append(rel)

data_source_table_id = dataset_def.get("dataSourceTableId") or dataset_def.get("data_source_table_id")
data_source_id = dataset_def.get("dataSourceId") or dataset_def.get("data_source_id")
# SQL-backed datasets carry the data source on the sql object; object-form
# dataSourceTableId carries it on the table descriptor.
if not data_source_id:
data_source_id = sql_data_source_id
if not data_source_id and isinstance(data_source_table_id, dict):
data_source_id = data_source_table_id.get("dataSourceId") or data_source_table_id.get("data_source_id")

metadata = {
GOODDATA_METADATA_KEY: {
"id": dataset_id,
"title": dataset_def.get("title"),
"description": dataset_def.get("description"),
"tags": dataset_def.get("tags"),
"data_source_id": dataset_def.get("dataSourceId") or dataset_def.get("data_source_id"),
"data_source_table_id": dataset_def.get("dataSourceTableId") or dataset_def.get("data_source_table_id"),
"data_source_id": data_source_id,
"data_source_table_id": data_source_table_id,
"table_path": dataset_def.get("tablePath") or dataset_def.get("table_path"),
"grain": grain_ids,
"workspace_data_filter_columns": dataset_def.get("workspaceDataFilterColumns")
or dataset_def.get("workspace_data_filter_columns"),
"extra": self._extract_extra(dataset_def, self._cloud_dataset_keys()),
}
}
Expand Down Expand Up @@ -244,9 +261,10 @@ def _parse_cloud_attribute(self, attr_def: dict[str, Any]) -> Dimension | None:
source_column = self._get_first(label_def or {}, "sourceColumn", "source_column") or self._get_first(
attr_def, "sourceColumn", "source_column"
)
data_type = self._get_first(label_def or {}, "dataType", "data_type") or self._get_first(
attr_def, "dataType", "data_type"
)
# sourceColumnDataType is the newer name for dataType; read both.
data_type = self._get_first(
label_def or {}, "sourceColumnDataType", "source_column_data_type", "dataType", "data_type"
) or self._get_first(attr_def, "sourceColumnDataType", "source_column_data_type", "dataType", "data_type")

dim_type, granularity = self._map_dimension_type(data_type)

Expand Down Expand Up @@ -283,7 +301,10 @@ def _parse_cloud_fact(self, fact_def: dict[str, Any]) -> Metric | None:
raise GoodDataParseError("Fact is missing an id/identifier")

source_column = self._get_first(fact_def, "sourceColumn", "source_column")
data_type = self._get_first(fact_def, "dataType", "data_type")
# sourceColumnDataType is the newer name for dataType; read both.
data_type = self._get_first(
fact_def, "sourceColumnDataType", "source_column_data_type", "dataType", "data_type"
)
agg = self._map_fact_aggregation(fact_def, data_type)

metadata = {
Expand All @@ -308,6 +329,55 @@ def _parse_cloud_fact(self, fact_def: dict[str, Any]) -> Metric | None:
metadata=metadata,
)

def _parse_cloud_aggregated_fact(self, agg_def: dict[str, Any]) -> Metric | None:
if "aggregatedFact" in agg_def and isinstance(agg_def["aggregatedFact"], dict):
agg_def = agg_def["aggregatedFact"]

agg_id = self._extract_identifier(agg_def, keys=("id", "identifier", "name"))
if not agg_id:
raise GoodDataParseError("Aggregated fact is missing an id/identifier")

source_column = self._get_first(agg_def, "sourceColumn", "source_column")
data_type = self._get_first(agg_def, "sourceColumnDataType", "source_column_data_type", "dataType", "data_type")

# Aggregate awareness: source fact + SUM/MIN/MAX operation.
source_fact_ref = self._get_first(agg_def, "sourceFactReference", "source_fact_reference") or {}
operation = None
source_fact_id = None
if isinstance(source_fact_ref, dict):
operation = source_fact_ref.get("operation")
reference = source_fact_ref.get("reference")
if isinstance(reference, dict):
source_fact_id = self._extract_identifier(reference, keys=("id", "identifier", "name"))
elif isinstance(reference, str):
source_fact_id = reference

agg = self._map_fact_aggregation({"aggregation": operation} if operation else {}, data_type)

metadata = {
GOODDATA_METADATA_KEY: {
"id": agg_id,
"title": agg_def.get("title"),
"description": agg_def.get("description"),
"tags": agg_def.get("tags"),
"source_column": source_column,
"data_type": data_type,
"aggregated_fact": True,
"operation": operation,
"source_fact": source_fact_id,
"extra": self._extract_extra(agg_def, self._cloud_aggregated_fact_keys()),
}
}

return Metric(
name=agg_id,
agg=agg,
sql=source_column or agg_id,
label=agg_def.get("title"),
description=agg_def.get("description"),
metadata=metadata,
)

def _parse_cloud_reference(self, ref_def: Any) -> Relationship | None:
if isinstance(ref_def, str):
target_id = ref_def
Expand All @@ -325,7 +395,18 @@ def _parse_cloud_reference(self, ref_def: Any) -> Relationship | None:
multivalue = self._get_first(ref_def, "multivalue", "multiValue") is True
rel_type = "many_to_many" if multivalue else "many_to_one"

source_columns = self._get_first(ref_def, "sourceColumns", "source_columns", "sourceColumn", "source_column")
# Newer exports replace flat sourceColumns with a sources array of
# {column, target: {id, type}, dataType}. Keep flat support for older exports.
sources = self._get_first(ref_def, "sources")
source_objects = None
if isinstance(sources, list) and sources:
source_objects = sources
source_columns = [s.get("column") for s in sources if isinstance(s, dict) and s.get("column")]
Comment thread
nicosuave marked this conversation as resolved.
Comment thread
nicosuave marked this conversation as resolved.
else:
source_columns = self._get_first(
ref_def, "sourceColumns", "source_columns", "sourceColumn", "source_column"
)

foreign_key = None
if isinstance(source_columns, list) and len(source_columns) == 1:
foreign_key = source_columns[0]
Expand All @@ -336,6 +417,7 @@ def _parse_cloud_reference(self, ref_def: Any) -> Relationship | None:
GOODDATA_METADATA_KEY: {
"identifier": target_id,
"source_columns": source_columns,
"sources": source_objects,
"multivalue": multivalue,
"extra": self._extract_extra(ref_def, self._cloud_reference_keys()),
}
Expand Down Expand Up @@ -507,9 +589,9 @@ def _parse_legacy_attribute(self, attr_def: dict[str, Any] | None, label_map: di
source_column = self._get_first(label_def or {}, "sourceColumn", "source_column") or self._get_first(
attr_def, "sourceColumn", "source_column"
)
data_type = self._get_first(label_def or {}, "dataType", "data_type") or self._get_first(
attr_def, "dataType", "data_type"
)
data_type = self._get_first(
label_def or {}, "sourceColumnDataType", "source_column_data_type", "dataType", "data_type"
) or self._get_first(attr_def, "sourceColumnDataType", "source_column_data_type", "dataType", "data_type")

dim_type, granularity = self._map_dimension_type(data_type)

Expand Down Expand Up @@ -545,7 +627,9 @@ def _parse_legacy_fact(self, fact_def: dict[str, Any] | None) -> Metric | None:
raise GoodDataParseError("Legacy fact is missing an identifier")

source_column = self._get_first(fact_def, "sourceColumn", "source_column")
data_type = self._get_first(fact_def, "dataType", "data_type")
data_type = self._get_first(
fact_def, "sourceColumnDataType", "source_column_data_type", "dataType", "data_type"
)
agg = self._map_fact_aggregation(fact_def, data_type)

metadata = {
Expand Down Expand Up @@ -617,10 +701,14 @@ def _export_dataset(self, model: Model) -> dict[str, Any]:
if gd_meta.get("tags"):
dataset["tags"] = gd_meta["tags"]

if model.table:
original_table_id = gd_meta.get("data_source_table_id")
if isinstance(original_table_id, dict):
# Preserve the newer object form (dataSourceId/id/type/path) verbatim.
dataset["dataSourceTableId"] = original_table_id
elif model.table:
dataset["dataSourceTableId"] = model.table
elif gd_meta.get("data_source_table_id"):
dataset["dataSourceTableId"] = gd_meta["data_source_table_id"]
elif original_table_id:
dataset["dataSourceTableId"] = original_table_id
elif gd_meta.get("table_path"):
dataset["tablePath"] = gd_meta["table_path"]

Expand All @@ -630,6 +718,10 @@ def _export_dataset(self, model: Model) -> dict[str, Any]:
if model.sql:
dataset["sql"] = model.sql

wdf_columns = gd_meta.get("workspace_data_filter_columns")
if wdf_columns:
dataset["workspaceDataFilterColumns"] = wdf_columns

grain_ids = gd_meta.get("grain") or [model.primary_key]
dataset["grain"] = [{"id": grain_id, "type": "attribute"} for grain_id in grain_ids if grain_id]

Expand Down Expand Up @@ -673,8 +765,31 @@ def _export_dataset(self, model: Model) -> dict[str, Any]:
dataset["attributes"] = attributes

facts = []
aggregated_facts = []
for metric in model.metrics:
fact_meta = (metric.metadata or {}).get(GOODDATA_METADATA_KEY, {})

if fact_meta.get("aggregated_fact"):
agg_def: dict[str, Any] = {
"id": fact_meta.get("id") or metric.name,
"title": metric.label or metric.name,
"sourceColumn": metric.sql or metric.name,
"sourceColumnDataType": fact_meta.get("data_type") or "NUMERIC",
}
if metric.description:
agg_def["description"] = metric.description
if fact_meta.get("tags"):
agg_def["tags"] = fact_meta["tags"]
operation = fact_meta.get("operation") or (metric.agg.upper() if metric.agg else None)
source_fact = fact_meta.get("source_fact")
if operation and source_fact:
agg_def["sourceFactReference"] = {
"operation": operation,
"reference": {"id": source_fact, "type": "fact"},
}
aggregated_facts.append(agg_def)
continue

fact_def: dict[str, Any] = {
"id": fact_meta.get("id") or metric.name,
"title": metric.label or metric.name,
Expand All @@ -696,6 +811,9 @@ def _export_dataset(self, model: Model) -> dict[str, Any]:
if facts:
dataset["facts"] = facts

if aggregated_facts:
dataset["aggregatedFacts"] = aggregated_facts

references = []
for rel in model.relationships:
if rel.type not in ("many_to_one", "many_to_many", "one_to_one"):
Expand All @@ -707,11 +825,16 @@ def _export_dataset(self, model: Model) -> dict[str, Any]:
"multivalue": rel.type == "many_to_many" or rel_meta.get("multivalue") is True,
}

source_columns = rel_meta.get("source_columns")
if not source_columns and rel.foreign_key:
source_columns = [rel.foreign_key]
if source_columns:
ref["sourceColumns"] = source_columns
sources = rel_meta.get("sources")
if isinstance(sources, list) and sources:
# Preserve the newer sources array form verbatim.
ref["sources"] = sources
else:
source_columns = rel_meta.get("source_columns")
if not source_columns and rel.foreign_key:
source_columns = [rel.foreign_key]
if source_columns:
ref["sourceColumns"] = source_columns

references.append(ref)

Expand Down Expand Up @@ -827,15 +950,41 @@ def _select_label(self, labels: list[dict[str, Any]], default_label_id: str | No
return label
return labels[0] if labels else None

def _coerce_sql(self, value: Any) -> tuple[str | None, str | None]:
"""Return (sql_statement, data_source_id) for legacy string or object SQL.

SQL-backed datasets use ``{"dataSourceId": ..., "statement": ...}``;
older exports use a plain string. Both forms are supported.
"""
if isinstance(value, dict):
statement = value.get("statement") or value.get("sql")
data_source_id = value.get("dataSourceId") or value.get("data_source_id")
return (statement if isinstance(statement, str) else None), (
data_source_id if isinstance(data_source_id, str) else None
)
Comment thread
nicosuave marked this conversation as resolved.
if isinstance(value, str):
return value, None
return None, None

def _coerce_table_path(self, value: Any) -> str | None:
if isinstance(value, list):
parts = [str(part) for part in value if part]
return ".".join(parts) if parts else None
if isinstance(value, dict):
# Newer object form: {dataSourceId, id, type, path: [schema, table]}
path = value.get("path")
if isinstance(path, list):
parts = [str(part) for part in path if part]
if parts:
return ".".join(parts)
schema = value.get("schema")
table = value.get("table")
if schema and table:
return f"{schema}.{table}"
# Fall back to a bare id when no path/schema is available.
table_id = value.get("id")
if isinstance(table_id, str):
return table_id
if isinstance(value, str):
return value
return None
Expand Down Expand Up @@ -950,8 +1099,12 @@ def _cloud_dataset_keys(self) -> set[str]:
"primaryKey",
"attributes",
"facts",
"aggregatedFacts",
"aggregated_facts",
"references",
"fields",
"workspaceDataFilterColumns",
"workspace_data_filter_columns",
}

def _cloud_attribute_keys(self) -> set[str]:
Expand All @@ -966,6 +1119,8 @@ def _cloud_attribute_keys(self) -> set[str]:
"source_column",
"dataType",
"data_type",
"sourceColumnDataType",
"source_column_data_type",
"labels",
"defaultView",
"default_view",
Expand All @@ -984,6 +1139,28 @@ def _cloud_fact_keys(self) -> set[str]:
"source_column",
"dataType",
"data_type",
"sourceColumnDataType",
"source_column_data_type",
"aggregation",
"type",
}

def _cloud_aggregated_fact_keys(self) -> set[str]:
return {
"id",
"identifier",
"name",
"title",
"description",
"tags",
"sourceColumn",
"source_column",
"dataType",
"data_type",
"sourceColumnDataType",
"source_column_data_type",
"sourceFactReference",
"source_fact_reference",
"aggregation",
"type",
}
Expand All @@ -993,10 +1170,13 @@ def _cloud_reference_keys(self) -> set[str]:
"identifier",
"dataset",
"reference",
"sources",
"sourceColumns",
"source_columns",
"sourceColumn",
"source_column",
"sourceColumnDataTypes",
"source_column_data_types",
"multivalue",
"multiValue",
}
Expand Down
Loading
Loading