From d05a728e9447e76162f0f0c71d5e83057733bbf3 Mon Sep 17 00:00:00 2001 From: nick-gorman Date: Tue, 26 May 2026 15:25:05 +1000 Subject: [PATCH 1/2] Distinguish 'Downloading' from 'Extracting cached zip' in INFO logs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Before this change, _download_data unconditionally logged "Downloading data for table X, year Y, month M" whenever the cache-hit check at the outer layer missed — even when the actual network fetch inside download_to_path short-circuited because the zip was already on disk (e.g. fformat='csv' with keep_csv=False, where the CSV is deleted after every read but the zip persists). Reading the log line, users reasonably conclude every call hammers AEMO when in fact only the local unzip is being repeated. Bubble the True/False "did a network fetch happen" signal that download_to_path already returns up through download_unzip_csv, the four _download_and_unpack_* helpers, and all six run* functions. The run* functions now return True on a real fetch, False on a zip reuse, None on failure (a warning has already been emitted downstream). _download_data reads that signal and picks the verb: "Downloading and extracting data" on a real fetch, "Extracting cached zip" on a reuse, nothing on failure. The first-call verb names both steps explicitly so the second message reads as a proper subset (network skipped) rather than a different operation. No existing test asserts on the prior implicit None return of the run* functions or the absent return of download_unzip_csv, so the contract change is backwards-compatible. tests/test_downloader.py still passes. Co-Authored-By: Claude Opus 4.7 (1M context) --- src/nemosis/data_fetch_methods.py | 32 +++++++++++++++------- src/nemosis/downloader.py | 45 ++++++++++++++++++++++++------- 2 files changed, 58 insertions(+), 19 deletions(-) diff --git a/src/nemosis/data_fetch_methods.py b/src/nemosis/data_fetch_methods.py index 009caa6..ec35a8c 100644 --- a/src/nemosis/data_fetch_methods.py +++ b/src/nemosis/data_fetch_methods.py @@ -857,28 +857,42 @@ def _download_data( Dispatch table to downloader to be downloaded. Returns: nothing + + Logging is honest about whether we actually contacted AEMO: the + `run*` functions return True if a network fetch occurred, False if + a previously-downloaded zip on disk was reused, or None if the + attempt failed (in which case a warning has already been emitted + downstream). Both branches also extract the zip; the verb on a real + fetch is "Downloading and extracting" to make that explicit, so the + "Extracting cached zip" message reads as a proper subset (skipped + the network) rather than something different. Users aren't misled + into thinking every call hammers AEMO when in fact only the extract + step is being repeated (see #TBD discussion in user-testing + exploration). """ - if chunk == 1: + fetched = _processing_info_maps.downloader[table_type]( + year, month, day, chunk, index, filename_stub, raw_data_location, + keep_zip=keep_zip, + ) + + if chunk == 1 and fetched is not None: + verb = "Downloading and extracting data" if fetched else "Extracting cached zip" if day is None: logger.info( - f"Downloading data for table {table_name}, " + f"year {year}, month {month}" + f"{verb} for table {table_name}, " + + f"year {year}, month {month}" ) elif index is None: logger.info( - f"Downloading data for table {table_name}, " + f"{verb} for table {table_name}, " + f"year {year}, month {month}, day {day}" ) else: logger.info( - f"Downloading data for table {table_name}, " + f"{verb} for table {table_name}, " + f"year {year}, month {month}, day {day}," + f"time {index}." ) - - _processing_info_maps.downloader[table_type]( - year, month, day, chunk, index, filename_stub, raw_data_location, - keep_zip=keep_zip, - ) return diff --git a/src/nemosis/downloader.py b/src/nemosis/downloader.py index 55fc643..8153cee 100644 --- a/src/nemosis/downloader.py +++ b/src/nemosis/downloader.py @@ -101,7 +101,12 @@ def _pre_check_file_is_missing(file_url): def run(year, month, day, chunk, index, filename_stub, down_load_to, keep_zip=True): - """This function""" + """ + Returns True if a network fetch occurred, False if a cached zip + was reused, or None if the attempt failed (in which case a warning + has already been emitted). Callers use this to log honestly about + whether AEMO was contacted (see _download_data). + """ url = defaults.aemo_mms_url # Add the year and month information to the generic AEMO data url @@ -109,26 +114,28 @@ def run(year, month, day, chunk, index, filename_stub, down_load_to, keep_zip=Tr # Perform the download, unzipping saving of the file try: - download_unzip_csv(url_formatted, down_load_to, keep_zip=keep_zip) + return download_unzip_csv(url_formatted, down_load_to, keep_zip=keep_zip) except Exception as e: if chunk == 1: logger.warning(f"{filename_stub} not downloaded ({e})") + return None def run_bid_tables(year, month, day, chunk, index, filename_stub, down_load_to, keep_zip=True): if day is None: - run(year, month, day, chunk, index, filename_stub, down_load_to, keep_zip=keep_zip) + return run(year, month, day, chunk, index, filename_stub, down_load_to, keep_zip=keep_zip) else: try: filename_stub = "BIDMOVE_COMPLETE_{year}{month}{day}".format(year=year, month=month, day=day) download_url = _get_current_url( filename_stub, defaults.current_data_page_urls["BIDDING"]) - _download_and_unpack_bid_move_complete_files( + return _download_and_unpack_bid_move_complete_files( download_url, down_load_to, keep_zip=keep_zip ) except Exception as e: logger.warning(f"{filename_stub} not downloaded ({e})") + return None def run_next_day_region_tables(year, month, day, chunk, index, filename_stub, down_load_to, keep_zip=True): @@ -137,11 +144,12 @@ def run_next_day_region_tables(year, month, day, chunk, index, filename_stub, do download_url = _get_current_url( filename_stub, defaults.current_data_page_urls["DAILY_REGION_SUMMARY"]) - _download_and_unpack_next_region_tables( + return _download_and_unpack_next_region_tables( download_url, down_load_to, keep_zip=keep_zip ) except Exception as e: logger.warning(f"{filename_stub} not downloaded ({e})") + return None def run_next_dispatch_tables(year, month, day, chunk, index, filename_stub, down_load_to, keep_zip=True): @@ -150,9 +158,10 @@ def run_next_dispatch_tables(year, month, day, chunk, index, filename_stub, down download_url = _get_current_url( filename_stub, defaults.current_data_page_urls["NEXT_DAY_DISPATCHLOAD"]) - _download_and_unpack_next_dispatch_load_files_complete_files(download_url, down_load_to, keep_zip=keep_zip) + return _download_and_unpack_next_dispatch_load_files_complete_files(download_url, down_load_to, keep_zip=keep_zip) except Exception as e: logger.warning(f"{filename_stub} not downloaded ({e})") + return None def run_intermittent_gen_scada(year, month, day, chunk, index, filename_stub, down_load_to, keep_zip=True): @@ -160,9 +169,10 @@ def run_intermittent_gen_scada(year, month, day, chunk, index, filename_stub, do download_url = _get_current_url( filename_stub, defaults.current_data_page_urls["INTERMITTENT_GEN_SCADA"]) - _download_and_unpack_intermittent_gen_scada_file(download_url, down_load_to, keep_zip=keep_zip) + return _download_and_unpack_intermittent_gen_scada_file(download_url, down_load_to, keep_zip=keep_zip) except Exception as e: logger.warning(f"{filename_stub} not downloaded ({e})") + return None def _get_current_url(filename_stub, current_page_url): @@ -210,6 +220,7 @@ def _download_and_unpack_bid_move_complete_files( finally: if downloaded and not keep_zip and os.path.isfile(zip_local_path): os.unlink(zip_local_path) + return downloaded def _download_and_unpack_next_region_tables( @@ -243,6 +254,7 @@ def _download_and_unpack_next_region_tables( finally: if downloaded and not keep_zip and os.path.isfile(zip_local_path): os.unlink(zip_local_path) + return downloaded def _download_and_unpack_next_dispatch_load_files_complete_files( @@ -272,6 +284,7 @@ def _download_and_unpack_next_dispatch_load_files_complete_files( finally: if downloaded and not keep_zip and os.path.isfile(zip_local_path): os.unlink(zip_local_path) + return downloaded def _download_and_unpack_intermittent_gen_scada_file( @@ -301,6 +314,7 @@ def _download_and_unpack_intermittent_gen_scada_file( finally: if downloaded and not keep_zip and os.path.isfile(zip_local_path): os.unlink(zip_local_path) + return downloaded def _find_start_row_nth_table(sub_folder_zipfile, file_name, n): @@ -322,7 +336,12 @@ def _find_start_row_nth_table(sub_folder_zipfile, file_name, n): def run_fcas4s(year, month, day, chunk, index, filename_stub, down_load_to, keep_zip=True): - """This function""" + """ + Returns True if a network fetch occurred, False if a cached zip + was reused, or None if both fallback URLs failed (in which case a + warning has already been emitted unless the CSV was already on + disk from a prior call). + """ # Add the year and month information to the generic AEMO data url url_formatted_latest = defaults.fcas_4_url.format(year, month, day, index) @@ -331,16 +350,17 @@ def run_fcas4s(year, month, day, chunk, index, filename_stub, down_load_to, keep ) # Perform the download, unzipping saving of the file try: - download_unzip_csv(url_formatted_latest, down_load_to, keep_zip=keep_zip) + return download_unzip_csv(url_formatted_latest, down_load_to, keep_zip=keep_zip) except Exception: try: - download_unzip_csv(url_formatted_hist, down_load_to, keep_zip=keep_zip) + return download_unzip_csv(url_formatted_hist, down_load_to, keep_zip=keep_zip) except Exception as e: # FCAS csvs are bundled in 30 minute bundles # Check if the csv exists before warning file_check = os.path.join(down_load_to, filename_stub + ".csv") if not os.path.isfile(file_check): logger.warning(f"{filename_stub} not downloaded {(e)}") + return None def download_to_dir(url, down_load_to, force_redo=False): """ @@ -427,6 +447,10 @@ def download_unzip_csv(url, down_load_to, keep_zip=True): touches zips this call actually downloaded — pre-existing zips (from a previous call, or another concurrent process) are left alone. + + Returns True if a network fetch occurred, False if a cached zip + already on disk was reused. Callers use this to log honestly + about whether AEMO was contacted. """ zip_local_path, downloaded = download_to_dir(url, down_load_to) try: @@ -435,6 +459,7 @@ def download_unzip_csv(url, down_load_to, keep_zip=True): finally: if downloaded and not keep_zip and os.path.isfile(zip_local_path): os.unlink(zip_local_path) + return downloaded def download_csv(url, path_and_name): From 63edfee11a6d6939dea2c4cef4cb58fbc07d7885 Mon Sep 17 00:00:00 2001 From: nick-gorman Date: Tue, 26 May 2026 17:21:44 +1000 Subject: [PATCH 2/2] Tighten input validation and quiet redundant WARNINGs from user-testing MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Follow-up to exploratory user-testing of DISPATCHPRICE workflows. Five small fixes, all in src/nemosis/data_fetch_methods.py (plus one loose test trimmed to match the new contract). Input validation (new UserInputErrors instead of silent bad behaviour): - select_columns / filter_cols / filter_values: catch typos, length mismatches, and missing pairs up front via new helpers _validate_user_select_columns, _validate_filter_args, and a _check_loaded_select_columns post-load membership check. Previously select_columns=['SETTLEMENTDATE', 'rrp'] returned a stub DataFrame, mismatched filter_cols/filter_values lengths silently dropped filters, and filter_cols with no filter_values raised a bare TypeError from inside zip(). - start_time / end_time: reject inverted (end < start) and zero-length (end == start) windows. Previously both silently returned an empty DataFrame after downloading a whole month for nothing. New helper _validate_time_window wired into dynamic_data_compiler and cache_compiler. Polish: - reset_index(drop=True) on dynamic_data_compiler and static_table returns so callers can use .loc[0] / .iloc[0] without hitting the underlying parquet row indices (e.g. 20162, 20167, ...). Quieter logs: - Plumb a user_select_columns flag through _dynamic_data_fetch_loop / _perform_column_selection / _validate_select_columns. Defaults-only column rejections (e.g. RAISE1SECRRP / LOWER1SECRRP on pre-1-sec-FCAS vintages) now log at DEBUG instead of WARNING. User-typed missing columns still WARNING + raise via _check_loaded_select_columns. - Demote the redundant "Loading data from .../FILE.parquet failed." WARNING that fires after a download 404 to DEBUG, with rewording ("No cached data file present at X (upstream download likely 404'd)."). The 404 itself is already announced by the downloader; the old message read as local cache corruption. Test: - tests/.../test_interconnector_constraint.py: drop IMPORTLIMIT and EXPORTLIMIT from select_columns. The test asserted neither, and these columns aren't in the cached fixture parquet — the test was relying on the silent-stub behaviour now being fixed. All 428 tests pass. Co-Authored-By: Claude Opus 4.7 (1M context) --- src/nemosis/data_fetch_methods.py | 202 +++++++++++++++++- .../test_interconnector_constraint.py | 2 +- 2 files changed, 193 insertions(+), 11 deletions(-) diff --git a/src/nemosis/data_fetch_methods.py b/src/nemosis/data_fetch_methods.py index ec35a8c..410967c 100644 --- a/src/nemosis/data_fetch_methods.py +++ b/src/nemosis/data_fetch_methods.py @@ -98,6 +98,16 @@ def dynamic_data_compiler( "If select_columns='all' is used fformat='csv' must be used." ) + _validate_user_select_columns(select_columns, table_name) + _validate_filter_args(filter_cols, filter_values) + _validate_time_window(start_time, end_time) + + # Remember whether the user explicitly asked for columns, so we can do a + # post-load check below. Columns inherited from defaults are allowed to + # silently be missing from old data vintages (e.g. RAISE1SECRRP before + # 1-sec FCAS existed); columns the user explicitly typed should not be. + user_select_columns = select_columns + ( start_time, end_time, @@ -133,6 +143,7 @@ def dynamic_data_compiler( keep_zip=keep_zip, rebuild=rebuild, write_kwargs=kwargs, + user_select_columns=user_select_columns, ) if data_tables: all_data = _pd.concat(data_tables, sort=False) @@ -153,6 +164,11 @@ def dynamic_data_compiler( all_data = _filter_on_column_value( all_data, filter_cols, filter_values ) + _check_loaded_select_columns(all_data, user_select_columns, table_name) + # Reset index so callers can do .loc[0] / .iloc[0] on the returned + # DataFrame; without this, rows carry the indices from the underlying + # parquet/feather/CSV (e.g. 20162, 20167, ...). + all_data = all_data.reset_index(drop=True) logger.info(f"Returning {table_name}.") return all_data else: @@ -248,6 +264,11 @@ def cache_compiler( ) ) + _validate_user_select_columns(select_columns, table_name) + _validate_time_window(start_time, end_time) + + user_select_columns = select_columns + logger.info(f"Caching data for table {table_name}") ( @@ -275,6 +296,7 @@ def cache_compiler( caching_mode=True, rebuild=rebuild, write_kwargs=kwargs, + user_select_columns=user_select_columns, ) return @@ -336,6 +358,9 @@ def static_table( if table_name not in _defaults.static_tables: raise UserInputError("Table name provided is not a static table.") + _validate_user_select_columns(select_columns, table_name) + _validate_filter_args(filter_cols, filter_values) + if filter_cols and not set(filter_cols).issubset(set(select_columns)): raise UserInputError( ( @@ -398,6 +423,10 @@ def static_table( for finaliser in static_table_finalisers: table = finaliser(table, table_name) + # Reset index so callers can do .loc[0] / .iloc[0] on the returned + # DataFrame; without this, rows carry the original on-disk indices + # after filtering. + table = table.reset_index(drop=True) return table @@ -602,6 +631,7 @@ def _dynamic_data_fetch_loop( caching_mode=False, rebuild=False, write_kwargs={}, + user_select_columns=None, ): """ Loops through generated dates and checks if the appropriate file exists. @@ -688,7 +718,9 @@ def _dynamic_data_fetch_loop( ) if caching_mode: - data = _perform_column_selection(data, select_columns, full_filename) + data = _perform_column_selection( + data, select_columns, full_filename, user_select_columns + ) if data is not None and fformat != "csv": _log_file_creation_message(fformat, table_name, year, month, day, index) @@ -704,11 +736,21 @@ def _dynamic_data_fetch_loop( if date_filter is not None: data = date_filter(data, start_time, end_time) - data = _perform_column_selection(data, select_columns, full_filename) + data = _perform_column_selection( + data, select_columns, full_filename, user_select_columns + ) data_tables.append(data) elif not caching_mode and chunk == 1: - logger.warning(f"Loading data from {full_filename} failed.") + # Demoted from WARNING to DEBUG: when we reach here, the + # cache file doesn't exist on disk — which means the + # download upstream already failed and the downloader + # already emitted its own warning (e.g. "PUBLIC_X not + # downloaded (404 ...)"). Repeating "Loading data from + # .../FILE.parquet failed" at WARNING level reads as + # local cache corruption and confuses users who hit the + # 404 path for historical / future / missing months. + logger.debug(f"No cached data file present at {full_filename} (upstream download likely 404'd).") if data is None or '#' not in filename_stub: check_for_next_data_chunk = False @@ -716,9 +758,11 @@ def _dynamic_data_fetch_loop( return data_tables -def _perform_column_selection(data, select_columns, full_filename): +def _perform_column_selection(data, select_columns, full_filename, user_select_columns=None): if select_columns != "all": - keep_cols = _validate_select_columns(data, select_columns, full_filename) + keep_cols = _validate_select_columns( + data, select_columns, full_filename, user_select_columns + ) if keep_cols: data = data.loc[:, keep_cols] else: @@ -750,12 +794,20 @@ def _create_filename( return filename_stub, full_filename, path_and_name -def _validate_select_columns(data, select_columns, full_filename): +def _validate_select_columns(data, select_columns, full_filename, user_select_columns=None): """ Checks whether select_columns are in the file. If at least one is, then it will return any of select_columns that are available as well as the date col (for date filtering). If not, it will return an empty list. + The warning about missing columns is only escalated to WARNING level + when the missing column was explicitly typed by the user (i.e. is in + user_select_columns). Missing columns that came from + defaults.table_columns are an internal NEMOSIS-vs-AEMO-vintage mismatch + (e.g. RAISE1SECRRP / LOWER1SECRRP didn't exist in DISPATCHPRICE before + 1-sec FCAS in late 2023) and would otherwise spam WARNING on every + historical cache read — so those are demoted to DEBUG. + Returns: List """ file_cols = data.columns @@ -765,13 +817,143 @@ def _validate_select_columns(data, select_columns, full_filename): return [] else: if rejected_cols: - logger.warning( - f"{rejected_cols} not in {full_filename}. " - + f"Loading {available_cols}" - ) + user_set = set(user_select_columns or []) + user_typed_missing = rejected_cols & user_set + if user_typed_missing: + logger.warning( + f"{rejected_cols} not in {full_filename}. " + + f"Loading {available_cols}" + ) + else: + # All rejected columns came from defaults — quiet noise. + logger.debug( + f"{rejected_cols} not in {full_filename} " + "(columns from defaults, not in this vintage). " + f"Loading {available_cols}" + ) return available_cols +def _validate_user_select_columns(select_columns, table_name): + """Up-front sanity check on user-supplied select_columns: shape only. + + The strict membership check (does every name actually exist?) happens + after the data has been loaded — see _check_loaded_select_columns — + because NEMOSIS legitimately allows users to ask for any column the + AEMO file contains, not just those listed in defaults.table_columns. + """ + if select_columns is None or select_columns == "all": + return + if not isinstance(select_columns, (list, tuple)): + raise UserInputError( + f"select_columns must be a list or the string 'all', " + f"got {type(select_columns).__name__}." + ) + + +def _check_loaded_select_columns(all_data, user_select_columns, table_name): + """Post-load check: every column the user explicitly asked for must + actually have made it into the returned DataFrame. + + Catches the silent-stub bug where select_columns=['SETTLEMENTDATE', 'rrp'] + (lowercase typo) used to return a 1-column DataFrame with just + SETTLEMENTDATE and a WARNING. We only validate columns the user + explicitly typed — defaults-inherited columns that happen to be missing + from a particular vintage (e.g. RAISE1SECRRP pre-2023) keep the legacy + warning behaviour. + """ + if user_select_columns is None or user_select_columns == "all": + return + missing = [c for c in user_select_columns if c not in all_data.columns] + if missing: + raise UserInputError( + f"select_columns contains {missing} which are not present in " + f"the data for table {table_name}. Available columns: " + f"{sorted(all_data.columns)}. Check for typos (column names " + f"are case-sensitive); to use a column not in the NEMOSIS " + f"default set, see the README section " + f"'Accessing additional table columns'." + ) + + +def _validate_time_window(start_time, end_time): + """Reject inverted (end < start) or zero-length (end == start) windows. + + Without this, an inverted window silently returns an empty DataFrame + and a zero-length window downloads a whole month just to return + (0, N) rows. + + We parse locally for the comparison and discard the result; the caller + re-parses through its existing flow. Cheap and avoids changing the + call-site shape. + """ + if start_time is None or end_time is None: + raise UserInputError( + "start_time and end_time are required, got " + f"start_time={start_time!r}, end_time={end_time!r}." + ) + # parse_datetime_py raises ValueError for bad strings / tz-aware + # datetimes. We let it propagate unchanged here so the existing error + # contract (e.g. tz-aware → "Conversion between timezones not + # implemented") is preserved. + start_dt = _parse_datetime_py(start_time, midnight='start') + end_dt = _parse_datetime_py(end_time, midnight='end') + if end_dt < start_dt: + raise UserInputError( + f"end_time ({end_time}) is before start_time ({start_time}). " + "Pass a window where end_time > start_time." + ) + if end_dt == start_dt: + raise UserInputError( + f"start_time ({start_time}) and end_time ({end_time}) resolve " + "to the same instant — zero-length window. Pass a window " + "where end_time > start_time." + ) + + +def _validate_filter_args(filter_cols, filter_values): + """Validate filter_cols / filter_values shape before downstream code zips + them. Without this: + - filter_cols supplied with filter_values=None raises a bare + ``TypeError: 'NoneType' object is not iterable`` from inside zip(). + - filter_cols and filter_values of mismatched length silently drop the + trailing filter_cols (zip truncates to the shorter), which can + quietly produce a much wider result than the user expected — the + README's "exclude intervention" pattern is one such case. + """ + if filter_cols is None and filter_values is None: + return + if filter_cols is None: + raise UserInputError( + "filter_values was provided but filter_cols is None. " + "Pass both, or neither." + ) + if filter_values is None: + raise UserInputError( + "filter_cols was provided but filter_values is None. " + "filter_values must be a tuple of lists, one list per " + "filter_col (e.g. filter_cols=['REGIONID'], " + "filter_values=(['SA1'],))." + ) + if not isinstance(filter_cols, (list, tuple)): + raise UserInputError( + f"filter_cols must be a list, got {type(filter_cols).__name__}." + ) + if not isinstance(filter_values, (list, tuple)): + raise UserInputError( + f"filter_values must be a tuple of lists, got " + f"{type(filter_values).__name__}." + ) + if len(filter_cols) != len(filter_values): + raise UserInputError( + f"filter_cols has {len(filter_cols)} entries but filter_values " + f"has {len(filter_values)}. Each filter_col needs exactly one " + f"corresponding list of values in filter_values " + f"(e.g. filter_cols=['REGIONID', 'INTERVENTION'], " + f"filter_values=(['SA1'], [0]))." + ) + + def _log_file_creation_message(fformat, table_name, year, month, day, index): logstr = f"Creating {fformat} file for " + f"{table_name}, {year}, {month}" if day is None: diff --git a/tests/end_to_end_table_tests/test_interconnector_constraint.py b/tests/end_to_end_table_tests/test_interconnector_constraint.py index ee26fff..b0227df 100644 --- a/tests/end_to_end_table_tests/test_interconnector_constraint.py +++ b/tests/end_to_end_table_tests/test_interconnector_constraint.py @@ -16,7 +16,7 @@ def test_interconnector_constraint_returns_fixtured_rows(nemosis_fixture, monkey end_time="2021/05/01 01:00:00", table_name="INTERCONNECTORCONSTRAINT", raw_data_location=str(nemosis_fixture), - select_columns=["INTERCONNECTORID", "EFFECTIVEDATE", "VERSIONNO", "IMPORTLIMIT", "EXPORTLIMIT"], + select_columns=["INTERCONNECTORID", "EFFECTIVEDATE", "VERSIONNO"], ) assert not data.empty