Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
185 changes: 183 additions & 2 deletions src/nemosis/data_fetch_methods.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from nemosis import processing_info_maps as _processing_info_maps
from nemosis import date_generators as _date_generators
from nemosis import defaults as _defaults
from nemosis import query_wrappers as _query_wrappers
from nemosis.value_parser import _infer_column_data_types
from nemosis.date_generators import parse_datetime_py as _parse_datetime_py
from nemosis.custom_errors import UserInputError, NoDataToReturn, DataMismatchError
Expand Down Expand Up @@ -99,6 +100,7 @@ def dynamic_data_compiler(
)

_validate_user_select_columns(select_columns, table_name)
_validate_user_select_columns_includes_pk(select_columns, table_name)
_validate_filter_args(filter_cols, filter_values)
_validate_time_window(start_time, end_time)

Expand Down Expand Up @@ -159,7 +161,19 @@ def dynamic_data_compiler(
missing_columns = [
col for col in filter_cols if col not in all_data.columns
]
UserInputError(f"Filter columns {missing_columns} not in data.")
# Was previously `UserInputError(...)` (no `raise`) — a
# silent no-op that returned unfiltered data. Now
# properly raised; mostly unreachable thanks to
# `_check_loaded_select_columns` below, but kept as
# defence in depth (e.g. filter_cols came from defaults
# because user passed select_columns=None, and the
# default filter column isn't in this AEMO file
# vintage).
raise UserInputError(
f"Filter columns {missing_columns} not in data for "
f"table {table_name}. Available columns: "
f"{sorted(all_data.columns)}."
)
else:
all_data = _filter_on_column_value(
all_data, filter_cols, filter_values
Expand Down Expand Up @@ -265,6 +279,7 @@ def cache_compiler(
)

_validate_user_select_columns(select_columns, table_name)
_validate_user_select_columns_includes_pk(select_columns, table_name)
_validate_time_window(start_time, end_time)

user_select_columns = select_columns
Expand Down Expand Up @@ -359,8 +374,18 @@ def static_table(
raise UserInputError("Table name provided is not a static table.")

_validate_user_select_columns(select_columns, table_name)
_validate_user_select_columns_includes_pk(select_columns, table_name)
_validate_filter_args(filter_cols, filter_values)

# Remember whether the user explicitly asked for columns, so we can
# enforce strict membership after the file is loaded. Defaults-
# inherited columns are allowed to silently be missing from the
# static file (e.g. AEMO drops a column from the registration
# workbook); user-typed columns should not be — that's almost always
# a typo, and silently returning a stub DataFrame is worse than
# raising. Mirrors the dynamic_data_compiler contract.
user_select_columns = select_columns

if filter_cols and not set(filter_cols).issubset(set(select_columns)):
raise UserInputError(
(
Expand Down Expand Up @@ -412,10 +437,28 @@ def static_table(
for column in table.select_dtypes(["object"]).columns:
table[column] = table[column].map(lambda x: _strip_if_string(x))

# Reject before filtering: catches the silent-stub bug where a typo
# in user-typed select_columns (e.g. 'duid' instead of 'DUID') used
# to ship a DataFrame missing the requested column with only a
# WARNING. Mirrors `_check_loaded_select_columns` in
# dynamic_data_compiler.
_check_loaded_select_columns(table, user_select_columns, table_name)

if filter_cols is not None:
if not set(filter_cols).issubset(set(table.columns)):
missing_columns = [col for col in filter_cols if col not in table.columns]
UserInputError(f"Filter columns {missing_columns} not in data.")
# Was previously `UserInputError(...)` (no `raise`) — a
# silent no-op that returned unfiltered data. Now properly
# raised; this path is mostly unreachable thanks to the
# post-load select_columns check above, but kept as a
# defence in depth (e.g. filter_cols came from defaults
# because user passed select_columns=None, and the default
# filter column isn't in this AEMO file vintage).
raise UserInputError(
f"Filter columns {missing_columns} not in data for "
f"table {table_name}. Available columns: "
f"{sorted(table.columns)}."
)
else:
table = _filter_on_column_value(table, filter_cols, filter_values)

Expand Down Expand Up @@ -617,6 +660,49 @@ def _set_up_dynamic_compilers(table_name, start_time, end_time, select_columns):
return start_time, end_time, select_columns, date_filter, start_search


def _iteration_overlaps_window(year, month, day, index, user_start, user_end):
"""Does this date_gen iteration's data period overlap the user's
[user_start, user_end) window?

Used to distinguish iterations the user actually asked for from
book-keeping iterations the fetch loop makes:
- the 1-day buffer-back month before `start_time`,
- the ~120 historical months a `search_type='all'` table scans
to compute "snapshot as of start_time".

A user asking for 16 months of an effective-date table should not
be warned that we didn't get data for 1997-11; they didn't ask for
1997-11. A user asking for 16 months of a current/scraped table
against AEMO's 6-week retention SHOULD be warned that ~450 days of
their request had no data.

Period extents (right-open, matching AEMO's end-of-interval
convention):
- (year, month, day=None, index=None): one calendar month
- (year, month, day, index=None): one calendar day
- (year, month, day, index="HHMM"): one 5-minute FCAS slot
"""
year_i = int(year)
month_i = int(month)
if day is None:
period_start = _datetime(year_i, month_i, 1)
if month_i == 12:
period_end = _datetime(year_i + 1, 1, 1)
else:
period_end = _datetime(year_i, month_i + 1, 1)
elif index is None:
day_i = int(day)
period_start = _datetime(year_i, month_i, day_i)
period_end = period_start + _timedelta(days=1)
else:
day_i = int(day)
hour_i = int(index[:2])
minute_i = int(index[2:])
period_start = _datetime(year_i, month_i, day_i, hour_i, minute_i)
period_end = period_start + _timedelta(minutes=5)
return period_start < user_end and period_end > user_start


def _dynamic_data_fetch_loop(
start_search,
start_time,
Expand Down Expand Up @@ -656,7 +742,29 @@ def _dynamic_data_fetch_loop(
start_search = start_search - _timedelta(days=1)
date_gen = date_gen_func(start_search, end_time)

# Track per-period success across the user's requested window so we
# can emit a single coverage-gap summary at the end. Without it, a
# multi-day query against e.g. INTERMITTENT_GEN_SCADA whose range
# mostly falls outside AEMO's Reports/Current/ retention window
# silently returns just the in-window days — the only signal is N
# separate per-file warnings that an aggregating user is unlikely
# to read.
#
# We count only iterations whose period overlaps the user's
# [start_time, end_time) window — so the buffer-back month (always
# one month before start_time) and the historical scan months for
# `search_type='all'` tables are excluded from the denominator.
# Any gap that remains is a gap in the user's actual request.
requested_periods = 0
successful_requested_periods = 0

for year, month, day, index in date_gen:
in_user_window = _iteration_overlaps_window(
year, month, day, index, start_time, end_time
)
if in_user_window:
requested_periods += 1
period_has_data = False
check_for_next_data_chunk = True
chunk = 0
while check_for_next_data_chunk:
Expand Down Expand Up @@ -741,6 +849,7 @@ def _dynamic_data_fetch_loop(
)

data_tables.append(data)
period_has_data = True
elif not caching_mode and chunk == 1:
# Demoted from WARNING to DEBUG: when we reach here, the
# cache file doesn't exist on disk — which means the
Expand All @@ -754,6 +863,30 @@ def _dynamic_data_fetch_loop(

if data is None or '#' not in filename_stub:
check_for_next_data_chunk = False
if period_has_data and in_user_window:
successful_requested_periods += 1

# Warn iff the user's requested window has a gap, AND we did get at
# least some data (zero-data is already covered by NoDataToReturn
# in the caller — a duplicate WARNING here would be noise).
if (
not caching_mode
and 0 < successful_requested_periods < requested_periods
):
missing = requested_periods - successful_requested_periods
coverage = successful_requested_periods / requested_periods
logger.warning(
f"Partial coverage for {table_name}: only "
f"{successful_requested_periods}/{requested_periods} "
f"periods in the requested window returned data "
f"({coverage:.0%}). {missing} period(s) missing — see "
f"per-period WARNINGs above for the specific files. "
f"Common causes: requested range extends beyond AEMO's "
f"Reports/Current/ retention window (typically ~6-7 weeks "
f"for the current/scraped tables); requested month not "
f"yet published in the MMSDM archive; requested range "
f"pre-dates AEMO data."
)

return data_tables

Expand Down Expand Up @@ -851,6 +984,54 @@ def _validate_user_select_columns(select_columns, table_name):
)


def _validate_user_select_columns_includes_pk(select_columns, table_name):
"""Ensure user-supplied select_columns contains every primary-key
column for tables whose processing pipeline dedupes on PK.

Without this, the dedup step inside finalise (`drop_duplicates_by_primary_key`
for dynamic tables, or `_finalise_excel_data` for the Generators
static table) raises a bare pandas `KeyError: Index([...], dtype='str')`
that points at pandas internals rather than the user's actual
mistake — they omitted a column NEMOSIS needs internally for
correct dedup semantics.

Defaults always include the PK columns (by construction of
`defaults.table_columns`), so this check only fires when the user
explicitly types a select_columns subset that's missing one. Pass
`select_columns=None` to fall back to defaults.
"""
if select_columns is None or select_columns == "all":
return
pk = _defaults.table_primary_keys.get(table_name)
if not pk:
return
# Dynamic-table dedup runs `drop_duplicates_by_primary_key` from the
# finalise pipeline.
finalise = _processing_info_maps.finalise.get(table_name)
is_dynamic_pk_dedup = bool(
finalise and any(
fn is _query_wrappers.drop_duplicates_by_primary_key for fn in finalise
)
)
# Static-table dedup runs `_finalise_excel_data`, which does
# `data.drop_duplicates(primary_keys)` if the table is in
# `defaults.table_primary_keys`.
static_finalisers = static_data_finaliser_map.get(table_name, [])
is_static_pk_dedup = _finalise_excel_data in static_finalisers
if not (is_dynamic_pk_dedup or is_static_pk_dedup):
return
missing = [c for c in pk if c not in select_columns]
if missing:
raise UserInputError(
f"select_columns for {table_name} must include the table's "
f"primary-key columns so NEMOSIS can dedupe correctly. "
f"Missing: {missing}. Full primary key: {pk}. Either add the "
f"missing column(s) to select_columns, or pass "
f"select_columns=None to use the table defaults (which "
f"already include the primary key)."
)


def _check_loaded_select_columns(all_data, user_select_columns, table_name):
"""Post-load check: every column the user explicitly asked for must
actually have made it into the returned DataFrame.
Expand Down
14 changes: 13 additions & 1 deletion src/nemosis/processing_info_maps.py
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,19 @@
query_wrappers.most_recent_records_before_start_time,
query_wrappers.drop_duplicates_by_primary_key,
],
"MARKET_PRICE_THRESHOLDS": None,
"MARKET_PRICE_THRESHOLDS": [
# `effective_date_group_col` for this table is `[]`, which sends
# `most_recent_records_before_start_time` down the `.tail(1)`
# branch — exactly one "as-of" row from before start_time. The
# subsequent dedup-by-PK then collapses the many-monthly-archives
# duplicates of every effective date `>= start_time` down to one
# row per (EFFECTIVEDATE, VERSIONNO). Previously `None` here,
# which left every monthly archive's copy of every effective
# date in the returned DataFrame — e.g. a 5-month query returned
# ~75× duplicates per row.
query_wrappers.most_recent_records_before_start_time,
query_wrappers.drop_duplicates_by_primary_key,
],
"DAILY_REGION_SUMMARY": None,
"ROOFTOP_PV_ACTUAL": [
query_wrappers.drop_duplicates_by_primary_key
Expand Down
14 changes: 14 additions & 0 deletions tests/end_to_end_table_tests/test_market_price_thresholds.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,17 @@ def test_market_price_thresholds_returns_fixtured_rows(nemosis_fixture, monkeypa
)

assert not data.empty

# PK invariant: every (EFFECTIVEDATE, VERSIONNO) pair appears at
# most once. Pre-fix, MARKET_PRICE_THRESHOLDS had `finalise=None` in
# processing_info_maps, so every monthly archive's copy of every
# effective date passed through — a 5-month query returned ~75×
# duplicates per row. Even on this 2-month fixture, the bug yields
# 2 copies of every row.
pk = ["EFFECTIVEDATE", "VERSIONNO"]
pair_counts = data.groupby(pk).size()
assert pair_counts.max() == 1, (
f"MARKET_PRICE_THRESHOLDS PK invariant broken: max rows per "
f"(EFFECTIVEDATE, VERSIONNO) = {int(pair_counts.max())}, "
f"expected 1. Likely regression in processing_info_maps."
)
Loading
Loading