Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
226 changes: 176 additions & 50 deletions sidemantic/adapters/atscale_sml.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from __future__ import annotations

import re
import warnings
from dataclasses import dataclass
from pathlib import Path
Expand Down Expand Up @@ -80,6 +81,7 @@ class MetricInfo:
label: str | None
description: str | None
format_str: str | None
metadata: dict[str, Any] | None = None


def _normalize_calc_method(method: str | None) -> str | None:
Expand Down Expand Up @@ -135,6 +137,53 @@ def _parse_named_quantile(value: str | None) -> float | None:
return None


def _parse_custom_quantiles(value: Any) -> list[float]:
"""Parse a custom_quantiles array (SML v1.5) of numbers between 0 and 1."""
if not isinstance(value, (list, tuple)):
return []
quantiles: list[float] = []
for item in value:
try:
numeric = float(item)
except (TypeError, ValueError):
continue
if 0 < numeric < 1:
quantiles.append(numeric)
return quantiles


_PERCENTILE_CONT_RE = re.compile(
r"PERCENTILE_CONT\s*\(\s*(?P<quantile>[0-9.]+)\s*\)\s*WITHIN\s+GROUP\s*\(\s*ORDER\s+BY\s+(?P<column>.+?)\s*\)\s*$",
Comment thread
nicosuave marked this conversation as resolved.
Outdated
re.IGNORECASE | re.DOTALL,
)


def _percentile_column_from_sql(sql: str | None) -> str | None:
"""Extract the ordered column from a PERCENTILE_CONT(...) WITHIN GROUP (ORDER BY col) expression."""
if not sql:
return None
match = _PERCENTILE_CONT_RE.search(sql.strip())
if not match:
return None
return match.group("column").strip() or None


def _percentile_quantile_from_sql(sql: str | None) -> float | None:
"""Extract the quantile from a PERCENTILE_CONT(q) WITHIN GROUP (ORDER BY col) expression."""
if not sql:
return None
match = _PERCENTILE_CONT_RE.search(sql.strip())
if not match:
return None
try:
quantile = float(match.group("quantile"))
except (TypeError, ValueError):
return None
if 0 < quantile < 1:
return quantile
return None


def _data_type_to_granularity(data_type: str | None) -> str | None:
if not data_type:
return None
Expand Down Expand Up @@ -168,6 +217,7 @@ class AtScaleSMLAdapter(BaseAdapter):
"metric_calc",
"model",
"composite_model",
"package",
}

_object_type_by_dir = {
Expand All @@ -177,6 +227,7 @@ class AtScaleSMLAdapter(BaseAdapter):
"dimensions": "dimension",
"metrics": "metric",
"models": "model",
"packages": "package",
}

def parse(self, source: str | Path) -> SemanticGraph:
Expand Down Expand Up @@ -291,6 +342,7 @@ def _load_objects(self, files: list[Path]) -> dict[str, dict[str, dict[str, Any]
"metric_calc": {},
"model": {},
"composite_model": {},
"package": {},
}

for file_path in files:
Expand Down Expand Up @@ -330,9 +382,21 @@ def _register_object(
if obj_type not in self._supported_object_types:
return

unique_name = data.get("unique_name")
if unique_name:
objects[obj_type][unique_name] = data
# Package objects (external Git-repo references, SML packages/) key on
# `name` since they do not carry a `unique_name`.
key = data.get("unique_name") or (data.get("name") if obj_type == "package" else None)
Comment thread
nicosuave marked this conversation as resolved.
if key:
objects[obj_type][key] = data

# A catalog may declare external Git-repo packages inline. Register each
# so it is tracked alongside standalone package files.
if obj_type == "catalog":
for package in data.get("packages") or []:
if not isinstance(package, dict):
continue
pkg_key = package.get("name") or package.get("url")
if pkg_key:
objects["package"].setdefault(pkg_key, package)

def _resolve_datasets(
self, datasets: dict[str, dict[str, Any]], connections: dict[str, dict[str, Any]]
Expand Down Expand Up @@ -610,6 +674,7 @@ def _metric_from_definition(self, metric_def: dict[str, Any]) -> MetricInfo | No
agg = None
metric_type = None
sql = None
metadata: dict[str, Any] | None = None

if method in _CALC_METHOD_AGG_MAP:
agg = _CALC_METHOD_AGG_MAP[method]
Expand All @@ -619,8 +684,19 @@ def _metric_from_definition(self, metric_def: dict[str, Any]) -> MetricInfo | No
if column:
sql = f"SUM(DISTINCT {column})"
elif method == "percentile":
quantile = _parse_named_quantile(metric_def.get("named_quantiles"))
if quantile == 0.5 and (metric_def.get("named_quantiles") or "").lower() == "median":
metadata = self._percentile_metadata(metric_def)
# custom_quantiles (SML v1.5) takes precedence over named_quantiles.
custom_quantiles = _parse_custom_quantiles(metric_def.get("custom_quantiles"))
if custom_quantiles:
quantile = custom_quantiles[0]
Comment thread
nicosuave marked this conversation as resolved.
else:
quantile = _parse_named_quantile(metric_def.get("named_quantiles"))
is_median = (
not custom_quantiles
and quantile == 0.5
and (metric_def.get("named_quantiles") or "").lower() == "median"
)
if is_median:
agg = "median"
sql = column
else:
Expand Down Expand Up @@ -648,8 +724,21 @@ def _metric_from_definition(self, metric_def: dict[str, Any]) -> MetricInfo | No
label=metric_def.get("label"),
description=metric_def.get("description"),
format_str=metric_def.get("format"),
metadata=metadata,
)

@staticmethod
def _percentile_metadata(metric_def: dict[str, Any]) -> dict[str, Any] | None:
"""Preserve SML v1.5 percentile fields that have no direct SQL mapping."""
metadata: dict[str, Any] = {}
compression = metric_def.get("compression")
if compression is not None:
metadata["compression"] = compression
custom_quantiles = _parse_custom_quantiles(metric_def.get("custom_quantiles"))
if custom_quantiles:
metadata["custom_quantiles"] = custom_quantiles
return metadata or None

def _metric_from_metrical_attribute(self, metrical: dict[str, Any]) -> MetricInfo | None:
if not metrical:
return None
Expand All @@ -668,6 +757,8 @@ def _metric_from_metrical_attribute(self, metrical: dict[str, Any]) -> MetricInf
"description": metrical.get("description"),
"format": metrical.get("format"),
"named_quantiles": metrical.get("named_quantiles"),
"custom_quantiles": metrical.get("custom_quantiles"),
"compression": metrical.get("compression"),
}

return self._metric_from_definition(metric_def)
Expand All @@ -689,6 +780,19 @@ def _metric_from_calc(self, calc_def: dict[str, Any]) -> MetricInfo | None:
format_str=calc_def.get("format"),
)

@staticmethod
def _metric_from_info(info: MetricInfo) -> Metric:
return Metric(
name=info.name,
agg=info.agg,
sql=info.sql,
type=info.metric_type,
label=info.label,
description=info.description,
format=info.format_str,
metadata=info.metadata,
)

def _apply_dimension_attrs_to_models(
self,
dimension_attrs: dict[str, list[DimensionAttr]],
Expand Down Expand Up @@ -763,34 +867,14 @@ def _apply_metric_infos_to_models(
for info in metrics:
if model.get_metric(info.name):
continue
model.metrics.append(
Metric(
name=info.name,
agg=info.agg,
sql=info.sql,
type=info.metric_type,
label=info.label,
description=info.description,
format=info.format_str,
)
)
model.metrics.append(self._metric_from_info(info))

if not dataset_to_model:
for info in metric_infos.get("__global__", []):
for model in graph.models.values():
if model.get_metric(info.name):
continue
model.metrics.append(
Metric(
name=info.name,
agg=info.agg,
sql=info.sql,
type=info.metric_type,
label=info.label,
description=info.description,
format=info.format_str,
)
)
model.metrics.append(self._metric_from_info(info))

def _build_models_from_model_defs(
self,
Expand Down Expand Up @@ -893,34 +977,14 @@ def _attach_metrics_for_model(
continue
if model.get_metric(info.name):
continue
model.metrics.append(
Metric(
name=info.name,
agg=info.agg,
sql=info.sql,
type=info.metric_type,
label=info.label,
description=info.description,
format=info.format_str,
)
)
model.metrics.append(self._metric_from_info(info))

for info in metric_infos.get("__global__", []):
if not include_all and info.name not in metric_names:
continue
if model.get_metric(info.name):
continue
model.metrics.append(
Metric(
name=info.name,
agg=info.agg,
sql=info.sql,
type=info.metric_type,
label=info.label,
description=info.description,
format=info.format_str,
)
)
model.metrics.append(self._metric_from_info(info))

def _apply_model_relationships(
self,
Expand Down Expand Up @@ -1216,6 +1280,13 @@ def _export_metric(self, metric: Metric, model: Model) -> dict[str, Any]:
if metric.type and metric.type != "derived":
return self._export_metric_calc(metric)

# Imported custom-quantile percentiles (SML v1.5) arrive as derived metrics with
# agg=None and a PERCENTILE_CONT(...) expression. Export them as percentile metrics
# so custom_quantiles/compression round-trip instead of degrading to a metric_calc.
percentile_def = self._export_percentile_metric(metric, model)
if percentile_def is not None:
return percentile_def

if metric.agg is None:
return self._export_metric_calc(metric)

Expand All @@ -1232,15 +1303,70 @@ def _export_metric(self, metric: Metric, model: Model) -> dict[str, Any]:
if not metric.sql:
return self._export_metric_calc(metric)

calculation_method = method_mapping.get(metric.agg, "sum")
metric_def = {
"unique_name": metric.name,
"object_type": "metric",
"label": metric.label or metric.name,
"calculation_method": method_mapping.get(metric.agg, "sum"),
"calculation_method": calculation_method,
"dataset": model.name,
"column": metric.sql,
}

if calculation_method == "percentile":
metadata = metric.metadata or {}
custom_quantiles = metadata.get("custom_quantiles")
if custom_quantiles:
metric_def["custom_quantiles"] = list(custom_quantiles)
Comment thread
nicosuave marked this conversation as resolved.
else:
metric_def["named_quantiles"] = "median"
if metadata.get("compression") is not None:
metric_def["compression"] = metadata["compression"]

if metric.description:
metric_def["description"] = metric.description
if metric.format:
metric_def["format"] = metric.format

return metric_def

def _export_percentile_metric(self, metric: Metric, model: Model) -> dict[str, Any] | None:
"""Export an imported percentile metric as a percentile metric definition.

Imported SML v1.5 percentiles parse to derived metrics with ``agg=None`` and a
``PERCENTILE_CONT(q) WITHIN GROUP (ORDER BY col)`` expression. Both custom-quantile
percentiles (``custom_quantiles: [0.75]``) and named percentiles (``named_quantiles: p90``,
which carry ``compression`` but no ``custom_quantiles``) are handled here. Without this,
``_export_metric`` would emit a ``metric_calc`` and drop the percentile shape plus
``custom_quantiles``/``compression``. Named percentiles are re-emitted as their numeric
``custom_quantiles`` equivalent since the original name is not preserved on import.
"""
if metric.agg is not None:
return None

column = _percentile_column_from_sql(metric.sql)
if column is None:
return None

metadata = metric.metadata or {}
custom_quantiles = _parse_custom_quantiles(metadata.get("custom_quantiles"))
if not custom_quantiles:
quantile = _percentile_quantile_from_sql(metric.sql)
if quantile is None:
return None
custom_quantiles = [quantile]

metric_def: dict[str, Any] = {
"unique_name": metric.name,
"object_type": "metric",
"label": metric.label or metric.name,
"calculation_method": "percentile",
"dataset": model.name,
"column": column,
"custom_quantiles": list(custom_quantiles),
}
if metadata.get("compression") is not None:
metric_def["compression"] = metadata["compression"]
if metric.description:
metric_def["description"] = metric.description
if metric.format:
Expand Down
Loading
Loading