Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
234 changes: 215 additions & 19 deletions src/nemosis/data_fetch_methods.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,16 @@ def dynamic_data_compiler(
"If select_columns='all' is used fformat='csv' must be used."
)

_validate_user_select_columns(select_columns, table_name)
_validate_filter_args(filter_cols, filter_values)
_validate_time_window(start_time, end_time)

# Remember whether the user explicitly asked for columns, so we can do a
# post-load check below. Columns inherited from defaults are allowed to
# silently be missing from old data vintages (e.g. RAISE1SECRRP before
# 1-sec FCAS existed); columns the user explicitly typed should not be.
user_select_columns = select_columns

(
start_time,
end_time,
Expand Down Expand Up @@ -133,6 +143,7 @@ def dynamic_data_compiler(
keep_zip=keep_zip,
rebuild=rebuild,
write_kwargs=kwargs,
user_select_columns=user_select_columns,
)
if data_tables:
all_data = _pd.concat(data_tables, sort=False)
Expand All @@ -153,6 +164,11 @@ def dynamic_data_compiler(
all_data = _filter_on_column_value(
all_data, filter_cols, filter_values
)
_check_loaded_select_columns(all_data, user_select_columns, table_name)
# Reset index so callers can do .loc[0] / .iloc[0] on the returned
# DataFrame; without this, rows carry the indices from the underlying
# parquet/feather/CSV (e.g. 20162, 20167, ...).
all_data = all_data.reset_index(drop=True)
logger.info(f"Returning {table_name}.")
return all_data
else:
Expand Down Expand Up @@ -248,6 +264,11 @@ def cache_compiler(
)
)

_validate_user_select_columns(select_columns, table_name)
_validate_time_window(start_time, end_time)

user_select_columns = select_columns

logger.info(f"Caching data for table {table_name}")

(
Expand Down Expand Up @@ -275,6 +296,7 @@ def cache_compiler(
caching_mode=True,
rebuild=rebuild,
write_kwargs=kwargs,
user_select_columns=user_select_columns,
)
return

Expand Down Expand Up @@ -336,6 +358,9 @@ def static_table(
if table_name not in _defaults.static_tables:
raise UserInputError("Table name provided is not a static table.")

_validate_user_select_columns(select_columns, table_name)
_validate_filter_args(filter_cols, filter_values)

if filter_cols and not set(filter_cols).issubset(set(select_columns)):
raise UserInputError(
(
Expand Down Expand Up @@ -398,6 +423,10 @@ def static_table(
for finaliser in static_table_finalisers:
table = finaliser(table, table_name)

# Reset index so callers can do .loc[0] / .iloc[0] on the returned
# DataFrame; without this, rows carry the original on-disk indices
# after filtering.
table = table.reset_index(drop=True)
return table


Expand Down Expand Up @@ -602,6 +631,7 @@ def _dynamic_data_fetch_loop(
caching_mode=False,
rebuild=False,
write_kwargs={},
user_select_columns=None,
):
"""
Loops through generated dates and checks if the appropriate file exists.
Expand Down Expand Up @@ -688,7 +718,9 @@ def _dynamic_data_fetch_loop(
)

if caching_mode:
data = _perform_column_selection(data, select_columns, full_filename)
data = _perform_column_selection(
data, select_columns, full_filename, user_select_columns
)

if data is not None and fformat != "csv":
_log_file_creation_message(fformat, table_name, year, month, day, index)
Expand All @@ -704,21 +736,33 @@ def _dynamic_data_fetch_loop(
if date_filter is not None:
data = date_filter(data, start_time, end_time)

data = _perform_column_selection(data, select_columns, full_filename)
data = _perform_column_selection(
data, select_columns, full_filename, user_select_columns
)

data_tables.append(data)
elif not caching_mode and chunk == 1:
logger.warning(f"Loading data from {full_filename} failed.")
# Demoted from WARNING to DEBUG: when we reach here, the
# cache file doesn't exist on disk — which means the
# download upstream already failed and the downloader
# already emitted its own warning (e.g. "PUBLIC_X not
# downloaded (404 ...)"). Repeating "Loading data from
# .../FILE.parquet failed" at WARNING level reads as
# local cache corruption and confuses users who hit the
# 404 path for historical / future / missing months.
logger.debug(f"No cached data file present at {full_filename} (upstream download likely 404'd).")

if data is None or '#' not in filename_stub:
check_for_next_data_chunk = False

return data_tables


def _perform_column_selection(data, select_columns, full_filename):
def _perform_column_selection(data, select_columns, full_filename, user_select_columns=None):
if select_columns != "all":
keep_cols = _validate_select_columns(data, select_columns, full_filename)
keep_cols = _validate_select_columns(
data, select_columns, full_filename, user_select_columns
)
if keep_cols:
data = data.loc[:, keep_cols]
else:
Expand Down Expand Up @@ -750,12 +794,20 @@ def _create_filename(
return filename_stub, full_filename, path_and_name


def _validate_select_columns(data, select_columns, full_filename):
def _validate_select_columns(data, select_columns, full_filename, user_select_columns=None):
"""
Checks whether select_columns are in the file. If at least one is,
then it will return any of select_columns that are available as well as
the date col (for date filtering). If not, it will return an empty list.

The warning about missing columns is only escalated to WARNING level
when the missing column was explicitly typed by the user (i.e. is in
user_select_columns). Missing columns that came from
defaults.table_columns are an internal NEMOSIS-vs-AEMO-vintage mismatch
(e.g. RAISE1SECRRP / LOWER1SECRRP didn't exist in DISPATCHPRICE before
1-sec FCAS in late 2023) and would otherwise spam WARNING on every
historical cache read — so those are demoted to DEBUG.

Returns: List
"""
file_cols = data.columns
Expand All @@ -765,13 +817,143 @@ def _validate_select_columns(data, select_columns, full_filename):
return []
else:
if rejected_cols:
logger.warning(
f"{rejected_cols} not in {full_filename}. "
+ f"Loading {available_cols}"
)
user_set = set(user_select_columns or [])
user_typed_missing = rejected_cols & user_set
if user_typed_missing:
logger.warning(
f"{rejected_cols} not in {full_filename}. "
+ f"Loading {available_cols}"
)
else:
# All rejected columns came from defaults — quiet noise.
logger.debug(
f"{rejected_cols} not in {full_filename} "
"(columns from defaults, not in this vintage). "
f"Loading {available_cols}"
)
return available_cols


def _validate_user_select_columns(select_columns, table_name):
"""Up-front sanity check on user-supplied select_columns: shape only.

The strict membership check (does every name actually exist?) happens
after the data has been loaded — see _check_loaded_select_columns —
because NEMOSIS legitimately allows users to ask for any column the
AEMO file contains, not just those listed in defaults.table_columns.
"""
if select_columns is None or select_columns == "all":
return
if not isinstance(select_columns, (list, tuple)):
raise UserInputError(
f"select_columns must be a list or the string 'all', "
f"got {type(select_columns).__name__}."
)


def _check_loaded_select_columns(all_data, user_select_columns, table_name):
"""Post-load check: every column the user explicitly asked for must
actually have made it into the returned DataFrame.

Catches the silent-stub bug where select_columns=['SETTLEMENTDATE', 'rrp']
(lowercase typo) used to return a 1-column DataFrame with just
SETTLEMENTDATE and a WARNING. We only validate columns the user
explicitly typed — defaults-inherited columns that happen to be missing
from a particular vintage (e.g. RAISE1SECRRP pre-2023) keep the legacy
warning behaviour.
"""
if user_select_columns is None or user_select_columns == "all":
return
missing = [c for c in user_select_columns if c not in all_data.columns]
if missing:
raise UserInputError(
f"select_columns contains {missing} which are not present in "
f"the data for table {table_name}. Available columns: "
f"{sorted(all_data.columns)}. Check for typos (column names "
f"are case-sensitive); to use a column not in the NEMOSIS "
f"default set, see the README section "
f"'Accessing additional table columns'."
)


def _validate_time_window(start_time, end_time):
"""Reject inverted (end < start) or zero-length (end == start) windows.

Without this, an inverted window silently returns an empty DataFrame
and a zero-length window downloads a whole month just to return
(0, N) rows.

We parse locally for the comparison and discard the result; the caller
re-parses through its existing flow. Cheap and avoids changing the
call-site shape.
"""
if start_time is None or end_time is None:
raise UserInputError(
"start_time and end_time are required, got "
f"start_time={start_time!r}, end_time={end_time!r}."
)
# parse_datetime_py raises ValueError for bad strings / tz-aware
# datetimes. We let it propagate unchanged here so the existing error
# contract (e.g. tz-aware → "Conversion between timezones not
# implemented") is preserved.
start_dt = _parse_datetime_py(start_time, midnight='start')
end_dt = _parse_datetime_py(end_time, midnight='end')
if end_dt < start_dt:
raise UserInputError(
f"end_time ({end_time}) is before start_time ({start_time}). "
"Pass a window where end_time > start_time."
)
if end_dt == start_dt:
raise UserInputError(
f"start_time ({start_time}) and end_time ({end_time}) resolve "
"to the same instant — zero-length window. Pass a window "
"where end_time > start_time."
)


def _validate_filter_args(filter_cols, filter_values):
"""Validate filter_cols / filter_values shape before downstream code zips
them. Without this:
- filter_cols supplied with filter_values=None raises a bare
``TypeError: 'NoneType' object is not iterable`` from inside zip().
- filter_cols and filter_values of mismatched length silently drop the
trailing filter_cols (zip truncates to the shorter), which can
quietly produce a much wider result than the user expected — the
README's "exclude intervention" pattern is one such case.
"""
if filter_cols is None and filter_values is None:
return
if filter_cols is None:
raise UserInputError(
"filter_values was provided but filter_cols is None. "
"Pass both, or neither."
)
if filter_values is None:
raise UserInputError(
"filter_cols was provided but filter_values is None. "
"filter_values must be a tuple of lists, one list per "
"filter_col (e.g. filter_cols=['REGIONID'], "
"filter_values=(['SA1'],))."
)
if not isinstance(filter_cols, (list, tuple)):
raise UserInputError(
f"filter_cols must be a list, got {type(filter_cols).__name__}."
)
if not isinstance(filter_values, (list, tuple)):
raise UserInputError(
f"filter_values must be a tuple of lists, got "
f"{type(filter_values).__name__}."
)
if len(filter_cols) != len(filter_values):
raise UserInputError(
f"filter_cols has {len(filter_cols)} entries but filter_values "
f"has {len(filter_values)}. Each filter_col needs exactly one "
f"corresponding list of values in filter_values "
f"(e.g. filter_cols=['REGIONID', 'INTERVENTION'], "
f"filter_values=(['SA1'], [0]))."
)


def _log_file_creation_message(fformat, table_name, year, month, day, index):
logstr = f"Creating {fformat} file for " + f"{table_name}, {year}, {month}"
if day is None:
Expand Down Expand Up @@ -857,28 +1039,42 @@ def _download_data(
Dispatch table to downloader to be downloaded.

Returns: nothing

Logging is honest about whether we actually contacted AEMO: the
`run*` functions return True if a network fetch occurred, False if
a previously-downloaded zip on disk was reused, or None if the
attempt failed (in which case a warning has already been emitted
downstream). Both branches also extract the zip; the verb on a real
fetch is "Downloading and extracting" to make that explicit, so the
"Extracting cached zip" message reads as a proper subset (skipped
the network) rather than something different. Users aren't misled
into thinking every call hammers AEMO when in fact only the extract
step is being repeated (see #TBD discussion in user-testing
exploration).
"""
if chunk == 1:
fetched = _processing_info_maps.downloader[table_type](
year, month, day, chunk, index, filename_stub, raw_data_location,
keep_zip=keep_zip,
)

if chunk == 1 and fetched is not None:
verb = "Downloading and extracting data" if fetched else "Extracting cached zip"
if day is None:
logger.info(
f"Downloading data for table {table_name}, " + f"year {year}, month {month}"
f"{verb} for table {table_name}, "
+ f"year {year}, month {month}"
)
elif index is None:
logger.info(
f"Downloading data for table {table_name}, "
f"{verb} for table {table_name}, "
+ f"year {year}, month {month}, day {day}"
)
else:
logger.info(
f"Downloading data for table {table_name}, "
f"{verb} for table {table_name}, "
+ f"year {year}, month {month}, day {day},"
+ f"time {index}."
)

_processing_info_maps.downloader[table_type](
year, month, day, chunk, index, filename_stub, raw_data_location,
keep_zip=keep_zip,
)
return


Expand Down
Loading
Loading