Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
247 changes: 236 additions & 11 deletions sidemantic/adapters/holistics.py
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,18 @@ def parse(self, source: str | Path) -> SemanticGraph:
for model in resolved_models.values():
graph.add_model(model)

# Dataset-level dimensions/metrics, standalone Metric blocks, and
# PartialDataset blocks composed via .extend(). Datasets surface as
# models so their cross-model dimensions/metrics are not dropped, and
# standalone metrics register as graph-level metrics.
dataset_models, standalone_metrics = _resolve_dataset_artifacts(documents, constants, resolved_models)
for model in dataset_models.values():
if model.name not in graph.models:
graph.add_model(model)
for metric in standalone_metrics.values():
if metric.name not in graph.metrics:
graph.add_metric(metric)
Comment thread
nicosuave marked this conversation as resolved.

pending_relationships: list[_AmlRelationship] = []
pending_relationship_refs: list[_RelationshipRef] = []
relationships_by_name: dict[str, _AmlRelationship] = {}
Expand Down Expand Up @@ -1136,6 +1148,168 @@ def _parse_measure_block(block: AmlBlock, constants: dict[str, AmlValue], contex
)


def _collect_dataset_definitions(
documents: Iterable[_AmlDocument],
) -> tuple[
dict[str, tuple[AmlBlock, _FileContext]],
dict[str, tuple[AmlBlock, _FileContext]],
dict[str, tuple[AmlBlock, _FileContext]],
dict[str, tuple[ObjectAssignment, _FileContext]],
]:
"""Collect Dataset / PartialDataset / standalone Metric blocks and Dataset assignments.

Returns (dataset_blocks, partial_dataset_blocks, metric_blocks, dataset_assignments).
"""
dataset_blocks: dict[str, tuple[AmlBlock, _FileContext]] = {}
partial_dataset_blocks: dict[str, tuple[AmlBlock, _FileContext]] = {}
metric_blocks: dict[str, tuple[AmlBlock, _FileContext]] = {}
dataset_assignments: dict[str, tuple[ObjectAssignment, _FileContext]] = {}

for document in documents:
context = document.context
for item in document.items:
if isinstance(item, AmlBlock) and item.name:
if item.kind == "Dataset":
name = _qualify_declared_name(item.name, context.module_prefix)
dataset_blocks[name] = (AmlBlock(kind="Dataset", name=name, items=item.items), context)
elif item.kind == "PartialDataset":
name = _qualify_declared_name(item.name, context.module_prefix)
partial_dataset_blocks[name] = (
AmlBlock(kind="PartialDataset", name=name, items=item.items),
context,
)
elif item.kind == "Metric":
name = _qualify_declared_name(item.name, context.module_prefix)
metric_blocks[name] = (AmlBlock(kind="Metric", name=name, items=item.items), context)
continue
if isinstance(item, ObjectAssignment) and item.kind == "Dataset":
name = _qualify_declared_name(item.name, context.module_prefix)
dataset_assignments[name] = (item, context)
Comment thread
nicosuave marked this conversation as resolved.
Outdated

return dataset_blocks, partial_dataset_blocks, metric_blocks, dataset_assignments


def _resolve_dataset_artifacts(
documents: Iterable[_AmlDocument],
constants: dict[str, AmlValue],
existing_models: dict[str, Model],
) -> tuple[dict[str, Model], dict[str, Metric]]:
"""Resolve dataset-level fields and standalone metrics into models / graph metrics."""
documents = list(documents)
dataset_blocks, partial_dataset_blocks, metric_blocks, dataset_assignments = _collect_dataset_definitions(documents)

# PartialDataset blocks resolve like partial models for .extend() composition.
resolved_blocks: dict[str, AmlBlock] = {}
resolving: set[str] = set()

def resolve_named_block(name: str) -> AmlBlock | None:
if name in resolved_blocks:
return resolved_blocks[name]
if name in resolving:
return None
if name in dataset_blocks:
block, _ = dataset_blocks[name]
resolved_blocks[name] = block
return block
if name in partial_dataset_blocks:
block, _ = partial_dataset_blocks[name]
return block
assignment_entry = dataset_assignments.get(name)
if assignment_entry:
assignment, ctx = assignment_entry
resolving.add(name)
resolved_value = _resolve_block_from_value(assignment.value, ctx, resolve_named_block)
Comment thread
nicosuave marked this conversation as resolved.
resolving.remove(name)
if resolved_value:
resolved_blocks[name] = AmlBlock(kind="Dataset", name=name, items=resolved_value.items)
return resolved_blocks[name]
return None

dataset_models: dict[str, Model] = {}

for name, (block, context) in dataset_blocks.items():
model = _parse_dataset_block(block, constants, context)
if model and model.name not in existing_models:
dataset_models[model.name] = model

for name, (assignment, context) in dataset_assignments.items():
block = resolve_named_block(name)
if not block:
continue
block_with_name = AmlBlock(kind="Dataset", name=name, items=block.items)
model = _parse_dataset_block(block_with_name, constants, context)
Comment thread
nicosuave marked this conversation as resolved.
Comment thread
nicosuave marked this conversation as resolved.
if model and model.name not in existing_models:
dataset_models[model.name] = model

# Standalone Metric blocks become graph-level metrics.
standalone_metrics: dict[str, Metric] = {}
for name, (block, context) in metric_blocks.items():
metric_block = AmlBlock(kind="metric", name=block.name, items=block.items)
metric = _parse_measure_block(metric_block, constants, context)
if metric:
metric.name = name
standalone_metrics[name] = metric

return dataset_models, standalone_metrics


def _parse_dataset_block(block: AmlBlock, constants: dict[str, AmlValue], context: _FileContext) -> Model | None:
"""Parse a Dataset block's dataset-level dimension/metric blocks into a Model.

Dataset dimensions and metrics use cross-model AQL definitions (the only
style Holistics allows in datasets). They are surfaced on a synthetic model
named after the dataset so they are not silently dropped.
"""
if not block.name:
return None

properties = _properties_from_items(block.items)
label = _value_as_string(properties.get("label"), constants, context)
description = _value_as_string(properties.get("description"), constants, context)

dimensions: list[Dimension] = []
metrics: list[Metric] = []

for item in block.items:
if not isinstance(item, AmlBlock):
continue
Comment thread
nicosuave marked this conversation as resolved.
if item.kind == "dimension":
dimension, _ = _parse_dimension_block(item, constants, context)
if not dimension:
continue
# Dataset dimensions usually combine fields across models, but some
# are authored with an aggregate AQL (e.g. count(...)). An aggregate
# cannot be a groupable dimension, so surface it as a derived metric.
if _sql_is_aggregate(dimension.sql):
metrics.append(
Metric(
name=dimension.name,
type="derived",
sql=dimension.sql,
label=dimension.label,
description=dimension.description,
format=dimension.format,
)
)
else:
dimensions.append(dimension)
Comment thread
nicosuave marked this conversation as resolved.
elif item.kind in {"measure", "metric"}:
metric = _parse_measure_block(item, constants, context)
if metric:
metrics.append(metric)

if not dimensions and not metrics:
return None

return Model(
name=block.name,
description=description or label,
primary_key="id",
dimensions=dimensions,
metrics=metrics,
Comment thread
nicosuave marked this conversation as resolved.
)


def _parse_relationship_definition(block: AmlBlock, context: _FileContext) -> _AmlRelationship | None:
rel = _parse_relationship_block(block, context)
if not rel:
Expand Down Expand Up @@ -1482,7 +1656,9 @@ def _translate_aql_to_sql(expr: str) -> str:
if len(segments) == 1:
return _translate_aql_inline(base)

current = _replace_aql_macros(base)
# Translate the base segment too: it may itself be a function call
# (e.g. count(orders.id) | of_all(...)), not just a bare field reference.
current = _translate_aql_inline(base)
for segment in segments[1:]:
current = _apply_aql_pipe(current, segment.strip())
return current
Expand Down Expand Up @@ -1610,29 +1786,65 @@ def _find_matching_paren(expr: str, start: int) -> int | None:
return None


# Aggregation functions: map directly to SQL aggregates.
_AQL_AGG_SQL = {
"count": "COUNT",
"count_all": "COUNT",
"sum": "SUM",
"avg": "AVG",
"average": "AVG",
"min": "MIN",
"max": "MAX",
"median": "MEDIAN",
}

# Table-shaping functions (filter/group/select rows of a table). At the
# SQL-fragment level the surrounding aggregation produces the value, so these
# are best-effort passed through (the base flows to the next pipe stage).
_AQL_TABLE_FUNCS = {"filter", "group", "select", "where"}

# Metric-modifier functions that change a metric's grouping/period scope.
# We cannot express their windowing at the fragment level, so we preserve the
# inner metric expression rather than dropping it.
_AQL_METRIC_MODIFIERS = {"of_all", "exclude", "keep_grains", "relative_period", "period_to_date", "running_total"}


def _apply_aql_function(name: str, args: list[str], base: str | None) -> str:
normalized = name.strip().lower()
cleaned_args = [_replace_aql_macros(arg.strip()) for arg in args if arg.strip()]
target = cleaned_args[0] if cleaned_args else base or "*"

if normalized in {"count", "count_all"}:
return f"COUNT({target})"
if normalized in _AQL_AGG_SQL:
sql_func = _AQL_AGG_SQL[normalized]
# Two-arg aggregation form: sum(table, expr) aggregates expr over table.
if base is None and len(cleaned_args) >= 2:
return f"{sql_func}({cleaned_args[1]})"
return f"{sql_func}({target})"
if normalized in {"count_distinct", "countdistinct"}:
if base is None and len(cleaned_args) >= 2:
return f"COUNT(DISTINCT {cleaned_args[1]})"
return f"COUNT(DISTINCT {target})"
if normalized in {"sum"}:
return f"SUM({target})"
if normalized in {"avg", "average"}:
return f"AVG({target})"
if normalized in {"min"}:
return f"MIN({target})"
if normalized in {"max"}:
return f"MAX({target})"
if normalized in {"count_if", "countif"}:
condition = cleaned_args[0] if cleaned_args else (base or "")
if not condition:
return "COUNT(*)"
return f"SUM(CASE WHEN {condition} THEN 1 ELSE 0 END)"

if normalized in _AQL_TABLE_FUNCS:
# Best-effort: keep the upstream table expression flowing through the
# pipeline; the aggregation that follows still produces the value.
if base is not None:
return base
Comment thread
nicosuave marked this conversation as resolved.
if cleaned_args:
return cleaned_args[0]
return "*"

if normalized in _AQL_METRIC_MODIFIERS:
# Preserve the metric being modified so it is not lost. The scope change
# (ignore grouping / shift period) cannot be expressed in a fragment.
inner = base if base is not None else (cleaned_args[0] if cleaned_args else "")
return inner
Comment thread
nicosuave marked this conversation as resolved.

if base:
combined_args = [base] + cleaned_args if cleaned_args else [base]
else:
Expand Down Expand Up @@ -1749,6 +1961,19 @@ def _detect_ratio(expr: str) -> tuple[str, str] | None:
return None


_AGGREGATE_FUNC_RE = re.compile(
r"\b(COUNT|SUM|AVG|MIN|MAX|MEDIAN|STDDEV|STDDEV_SAMP|STDDEV_POP|VAR_SAMP|VAR_POP|VARIANCE)\s*\(",
re.IGNORECASE,
)


def _sql_is_aggregate(sql: str | None) -> bool:
"""Return True if the SQL fragment contains a SQL aggregate function call."""
if not sql:
return False
return bool(_AGGREGATE_FUNC_RE.search(sql))


def _map_dimension_type(dim_type: str | None) -> tuple[str, str | None]:
if not dim_type:
return "categorical", None
Expand Down
67 changes: 67 additions & 0 deletions tests/adapters/holistics/test_kitchen_sink.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,73 @@ def test_relationships(self, kitchen_sink_layer):

assert summary is not None

def test_dataset_level_metrics_and_dimensions(self, kitchen_sink_layer):
"""Dataset-level metric/dimension blocks (AQL, cross-model) are surfaced."""
graph = kitchen_sink_layer.graph

# Datasets surface as models named after the dataset.
assert "kitchen_transactions" in graph.models
assert "kitchen_sink" in graph.models

transactions = graph.models["kitchen_transactions"]

# Dataset dimension authored with an aggregate AQL (count(...)) is
# surfaced as a derived metric, since an aggregate cannot be a
# groupable dimension.
total_buyer_orders = transactions.get_metric("total_buyer_orders")
assert total_buyer_orders is not None
assert total_buyer_orders.sql == "COUNT(kitchen_orders.order_id)"

# Dataset metric with no aggregation_type, defined purely via @aql.
avg_order_amount = transactions.get_metric("avg_order_amount")
assert avg_order_amount is not None
assert "SUM(kitchen_orders.amount)" in avg_order_amount.sql
assert "COUNT(kitchen_orders.order_id)" in avg_order_amount.sql

buyer_event_ratio = transactions.get_metric("buyer_event_ratio")
assert buyer_event_ratio is not None
assert "COUNT(kitchen_events.event_id)" in buyer_event_ratio.sql
assert "COUNT(DISTINCT kitchen_buyers.person_id)" in buyer_event_ratio.sql

sink = graph.models["kitchen_sink"]
revenue_per_customer = sink.get_metric("revenue_per_customer")
assert revenue_per_customer is not None
assert "SUM(kitchen_orders.amount)" in revenue_per_customer.sql

def test_standalone_metric_and_partial_dataset(self, kitchen_sink_layer):
"""Standalone Metric blocks register as graph metrics; PartialDataset
metrics compose into a Dataset via .extend()."""
graph = kitchen_sink_layer.graph

# Standalone top-level Metric -> graph-level metric.
assert "kitchen_global_revenue" in graph.metrics
global_revenue = graph.metrics["kitchen_global_revenue"]
assert global_revenue.label == "Global Revenue"
assert global_revenue.sql == "SUM(kitchen_orders.amount)"

# Dataset = base.extend(partial_dataset) surfaces the partial's metrics.
assert "kitchen_metric_store" in graph.models
store = graph.models["kitchen_metric_store"]

order_count = store.get_metric("reusable_order_count")
assert order_count is not None
assert order_count.sql == "COUNT(kitchen_orders.order_id)"

# where() table function preserved through the pipe; count still applies.
high_value = store.get_metric("high_value_orders")
assert high_value is not None
assert high_value.sql == "COUNT(kitchen_orders.order_id)"

# of_all() metric modifier preserves the inner aggregation.
revenue_share = store.get_metric("revenue_share")
assert revenue_share is not None
assert revenue_share.sql == "SUM(kitchen_orders.amount)"

# relative_period() period-over-period preserves the inner aggregation.
last_month = store.get_metric("revenue_last_month")
assert last_month is not None
assert last_month.sql == "SUM(kitchen_orders.amount)"

def test_extends_merge(self, kitchen_sink_layer):
extended = kitchen_sink_layer.graph.models["kitchen_orders_extended"]
inline = kitchen_sink_layer.graph.models["kitchen_orders_inline"]
Expand Down
Loading
Loading