-
Notifications
You must be signed in to change notification settings - Fork 0
feat(warehouses): Add support to backfill manually exported missing electricity data #381
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 2 commits
e05460b
8f81dff
5fa1a9f
bfad88b
4af72a2
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -17,21 +17,20 @@ | |||||||||||||||||||
|
|
||||||||||||||||||||
| import dlt | ||||||||||||||||||||
| import dlt.common.logger as logger | ||||||||||||||||||||
| from dlt.sources import TDataItems | ||||||||||||||||||||
| import pendulum | ||||||||||||||||||||
| import pandas as pd | ||||||||||||||||||||
| import pendulum | ||||||||||||||||||||
| import pyarrow.compute as pc | ||||||||||||||||||||
| import pytz | ||||||||||||||||||||
|
|
||||||||||||||||||||
| from dlt.sources import TDataItems | ||||||||||||||||||||
| from elt_common.cli_utils import cli_main | ||||||||||||||||||||
| from elt_common.dlt_sources.m365 import ( | ||||||||||||||||||||
| sharepoint, | ||||||||||||||||||||
| M365DriveItem, | ||||||||||||||||||||
| ) | ||||||||||||||||||||
| from elt_common.dlt_destinations.pyiceberg.helpers import load_iceberg_table | ||||||||||||||||||||
| from elt_common.dlt_destinations.pyiceberg.pyiceberg_adapter import ( | ||||||||||||||||||||
| pyiceberg_adapter, | ||||||||||||||||||||
| PartitionTrBuilder, | ||||||||||||||||||||
| pyiceberg_adapter, | ||||||||||||||||||||
| ) | ||||||||||||||||||||
| from elt_common.dlt_sources.m365 import ( | ||||||||||||||||||||
| M365DriveItem, | ||||||||||||||||||||
| sharepoint, | ||||||||||||||||||||
| ) | ||||||||||||||||||||
|
|
||||||||||||||||||||
| EXCEL_ENGINE = "calamine" | ||||||||||||||||||||
|
|
@@ -41,19 +40,45 @@ | |||||||||||||||||||
| PIPELINE_NAME = "electricity_sharepoint" | ||||||||||||||||||||
| RDM_TIMEZONE = "Europe/London" | ||||||||||||||||||||
|
|
||||||||||||||||||||
| # File information | ||||||||||||||||||||
| # The file can contain a record from a single export or multiple records combined. Each record starts with | ||||||||||||||||||||
| # a header: | ||||||||||||||||||||
|
|
||||||||||||||||||||
| # Source details | ||||||||||||||||||||
| # | ||||||||||||||||||||
| # There are 3 known formats for the data. Each contains a preamble of the form: | ||||||||||||||||||||
| # | ||||||||||||||||||||
| # Site Information: | ||||||||||||||||||||
| # RAL ISIS RDM | ||||||||||||||||||||
| # RAL ISIS RDM | ||||||||||||||||||||
| # Controller: ISIS | ||||||||||||||||||||
| # Controller description: ISIS Energy Totals | ||||||||||||||||||||
| # Status: Online | ||||||||||||||||||||
| # | ||||||||||||||||||||
| # This preamble is discarded from all formats. | ||||||||||||||||||||
| # | ||||||||||||||||||||
| # 1. Excel (old, manual export format) | ||||||||||||||||||||
| # ------------------------------------ | ||||||||||||||||||||
| # | ||||||||||||||||||||
| # These were produced by manual export from the original system. | ||||||||||||||||||||
| # | ||||||||||||||||||||
| # There are two columns: "Time,Total Power (MW)". Time is in the format "YYYY-mm-DDTHH:MM:SS". | ||||||||||||||||||||
| # | ||||||||||||||||||||
| # 2. Automated CSV export | ||||||||||||||||||||
| # ----------------------- | ||||||||||||||||||||
| # | ||||||||||||||||||||
| # An automation deposits .csv files at regular intervals. | ||||||||||||||||||||
| # | ||||||||||||||||||||
| # There are three columns: "Time,Date,ISIS Elec Total Power", where Time is in format HH:MM:SS and Date is in format DD/mm/YY. | ||||||||||||||||||||
| # Several of these files can be concatenated together to form larger timespan records - they are concatenated as is and repeated | ||||||||||||||||||||
| # preamble sections are not discarded. | ||||||||||||||||||||
| # | ||||||||||||||||||||
| # 3. Manual CSV export | ||||||||||||||||||||
| # -------------------- | ||||||||||||||||||||
| # | ||||||||||||||||||||
| # These were produced more recently by manual export from the original system. | ||||||||||||||||||||
| # There are three columns: "Time,ISIS Elec Total Energy,ISIS Elec Total Power", where Time is in format "DD/mm/YY HH:MM:SS". | ||||||||||||||||||||
| # The second column is discarded. | ||||||||||||||||||||
|
|
||||||||||||||||||||
|
|
||||||||||||||||||||
| # Time Date ISIS Elec Total Power {MW} | ||||||||||||||||||||
| CSV_HEADER_ANCHOR = "time" | ||||||||||||||||||||
| CSV_PREAMBLE_ANCHOR = "time" | ||||||||||||||||||||
|
|
||||||||||||||||||||
|
|
||||||||||||||||||||
| def to_utc(ts: pd.Series) -> pd.Series: | ||||||||||||||||||||
|
|
@@ -66,26 +91,38 @@ def csv_section_to_df(file_name: str, lines: Sequence[str]) -> pd.DataFrame | No | |||||||||||||||||||
| df = pd.read_csv(io.StringIO("\n".join(lines))) | ||||||||||||||||||||
| # clean up column name (strip any whitespace) | ||||||||||||||||||||
| df.columns = df.columns.str.strip() | ||||||||||||||||||||
| cols = [c for c in df.columns] | ||||||||||||||||||||
| assert len(cols) == 3 | ||||||||||||||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🩺 Stability & Availability | 🟠 Major | ⚡ Quick win Replace the assertion with recoverable CSV-shape validation.
Proposed fix cols = [c for c in df.columns]
- assert len(cols) == 3
+ if len(cols) != 3:
+ logger.warning(
+ f"Error loading section of {file_name!r}: expected 3 columns, got {len(cols)}. "
+ "There will be gaps in the data."
+ )
+ return None📝 Committable suggestion
Suggested change
🤖 Prompt for AI Agents |
||||||||||||||||||||
| try: | ||||||||||||||||||||
| df["DateTime"] = to_utc( | ||||||||||||||||||||
| pd.to_datetime(df["Date"] + " " + df["Time"], format="%d/%m/%y %H:%M:%S") # type: ignore | ||||||||||||||||||||
| ) | ||||||||||||||||||||
| return df.drop(["Date", "Time"], axis=1) | ||||||||||||||||||||
| if "Date" in cols: | ||||||||||||||||||||
| # Automated CSV format | ||||||||||||||||||||
| df["DateTime"] = to_utc( | ||||||||||||||||||||
| pd.to_datetime( | ||||||||||||||||||||
| df["Date"] + " " + df["Time"], format="%d/%m/%y %H:%M:%S" | ||||||||||||||||||||
| ) # type: ignore | ||||||||||||||||||||
| ) | ||||||||||||||||||||
| df = df.drop(["Date", "Time"], axis=1) | ||||||||||||||||||||
| else: | ||||||||||||||||||||
| # Manual CSV format | ||||||||||||||||||||
| df["DateTime"] = to_utc( | ||||||||||||||||||||
| pd.to_datetime(df["Time"], format="%d/%m/%y %H:%M:%S") | ||||||||||||||||||||
| ) | ||||||||||||||||||||
| # Drop time and energy columns | ||||||||||||||||||||
| df = df.drop(["Time", cols[1]], axis=1) | ||||||||||||||||||||
| except (pytz.exceptions.AmbiguousTimeError, ValueError) as exc: | ||||||||||||||||||||
| logger.warning( | ||||||||||||||||||||
| f"'Error loading section of {file_name}'. There will be gaps in the data.\nDetails: {str(exc)}" | ||||||||||||||||||||
| ) | ||||||||||||||||||||
| return None | ||||||||||||||||||||
|
|
||||||||||||||||||||
| return df | ||||||||||||||||||||
|
|
||||||||||||||||||||
|
|
||||||||||||||||||||
| def read_power_consumption_csv( | ||||||||||||||||||||
| file_content: io.BytesIO, | ||||||||||||||||||||
| file_name: str, | ||||||||||||||||||||
| ) -> pd.DataFrame | None: | ||||||||||||||||||||
| """Read csv-formatted power consumption records. This can return None if there is not data in the file. | ||||||||||||||||||||
|
|
||||||||||||||||||||
| The Date & Time columns together to create a single DateTime column. | ||||||||||||||||||||
| """ | ||||||||||||||||||||
| # See comment at the top of this describing the format | ||||||||||||||||||||
|
|
||||||||||||||||||||
| def _append_if_not_none(seq, df): | ||||||||||||||||||||
| if df is not None: | ||||||||||||||||||||
|
|
@@ -97,7 +134,7 @@ def _append_if_not_none(seq, df): | |||||||||||||||||||
| line = line.strip() | ||||||||||||||||||||
| line_lower = line.lower() | ||||||||||||||||||||
|
|
||||||||||||||||||||
| if line_lower.startswith(CSV_HEADER_ANCHOR): | ||||||||||||||||||||
| if line_lower.startswith(CSV_PREAMBLE_ANCHOR): | ||||||||||||||||||||
| # Save any previous section | ||||||||||||||||||||
| if current_lines: | ||||||||||||||||||||
| _append_if_not_none( | ||||||||||||||||||||
|
|
@@ -127,11 +164,8 @@ def _append_if_not_none(seq, df): | |||||||||||||||||||
|
|
||||||||||||||||||||
|
|
||||||||||||||||||||
| def read_power_consumption_excel(file_content: io.BytesIO) -> pd.DataFrame: | ||||||||||||||||||||
| """Read an excel-formatted power consumption record. | ||||||||||||||||||||
| # See comment at the top of this describing the format | ||||||||||||||||||||
|
|
||||||||||||||||||||
| Expected columns: Time, Total Power (MW) | ||||||||||||||||||||
| The Time column is renamed DateTime for consistency. | ||||||||||||||||||||
| """ | ||||||||||||||||||||
| df = pd.read_excel(file_content, engine=EXCEL_ENGINE, skiprows=EXCEL_SKIP_ROWS) | ||||||||||||||||||||
| df = df.rename(columns={"Time": "DateTime"}) | ||||||||||||||||||||
| df["DateTime"] = to_utc(df["DateTime"]) | ||||||||||||||||||||
|
|
@@ -179,42 +213,39 @@ def read_as_dataframe(file_obj: M365DriveItem) -> pd.DataFrame | None: | |||||||||||||||||||
| yield df_batch | ||||||||||||||||||||
|
|
||||||||||||||||||||
|
|
||||||||||||||||||||
| @dlt.resource(merge_key="DateTime", columns={"DateTime": {"nullable": False}}) | ||||||||||||||||||||
| @dlt.resource( | ||||||||||||||||||||
| primary_key="DateTime", | ||||||||||||||||||||
| write_disposition={"disposition": "merge", "strategy": "upsert"}, | ||||||||||||||||||||
| ) | ||||||||||||||||||||
| def rdm_data( | ||||||||||||||||||||
| site_url: str = dlt.config.value, | ||||||||||||||||||||
| root_dir: str = dlt.config.value, | ||||||||||||||||||||
| backfill: bool = False, | ||||||||||||||||||||
| backfill_glob: str | None = None, | ||||||||||||||||||||
| ) -> Iterator[TDataItems]: | ||||||||||||||||||||
| if backfill: | ||||||||||||||||||||
| historic_xl = sharepoint( | ||||||||||||||||||||
| if backfill_glob is not None: | ||||||||||||||||||||
| file_globs = [backfill_glob] | ||||||||||||||||||||
| else: | ||||||||||||||||||||
| file_globs = [ | ||||||||||||||||||||
| f"{root_dir}/**/*.xlsx", | ||||||||||||||||||||
| f"{root_dir}/**/*-daily.csv", | ||||||||||||||||||||
| f"{root_dir}/**/*-manual-export.csv", | ||||||||||||||||||||
| ] | ||||||||||||||||||||
| modified_after = None | ||||||||||||||||||||
| else: | ||||||||||||||||||||
| file_globs = [f"{root_dir}/*-ISIS.csv"] | ||||||||||||||||||||
| modified_after = get_latest_timestamp(dlt.current.pipeline()) | ||||||||||||||||||||
|
|
||||||||||||||||||||
| for file_glob in file_globs: | ||||||||||||||||||||
| file_listing = sharepoint( | ||||||||||||||||||||
| site_url=site_url, | ||||||||||||||||||||
| file_glob=f"{root_dir}/**/*.xlsx", | ||||||||||||||||||||
| file_glob=file_glob, | ||||||||||||||||||||
| extract_content=False, | ||||||||||||||||||||
| modified_after=modified_after, | ||||||||||||||||||||
| ) | ||||||||||||||||||||
| reader = historic_xl | extract_content_and_read() | ||||||||||||||||||||
| yield from reader.apply_hints( | ||||||||||||||||||||
| write_disposition="merge", | ||||||||||||||||||||
| ) | ||||||||||||||||||||
| historic_csv = sharepoint( | ||||||||||||||||||||
| site_url=site_url, | ||||||||||||||||||||
| file_glob=f"{root_dir}/**/*-daily.csv", | ||||||||||||||||||||
| extract_content=False, | ||||||||||||||||||||
| ) | ||||||||||||||||||||
| reader = historic_csv | extract_content_and_read() | ||||||||||||||||||||
| yield from reader.apply_hints( | ||||||||||||||||||||
| write_disposition="merge", | ||||||||||||||||||||
| ) | ||||||||||||||||||||
|
|
||||||||||||||||||||
| latest_files = sharepoint( | ||||||||||||||||||||
| site_url=site_url, | ||||||||||||||||||||
| file_glob=f"{root_dir}/*-ISIS.csv", | ||||||||||||||||||||
| extract_content=False, | ||||||||||||||||||||
| modified_after=get_latest_timestamp(dlt.current.pipeline()), | ||||||||||||||||||||
| ) | ||||||||||||||||||||
| reader = latest_files | extract_content_and_read() | ||||||||||||||||||||
| yield from reader.apply_hints( | ||||||||||||||||||||
| write_disposition="merge", | ||||||||||||||||||||
| ) | ||||||||||||||||||||
| reader = file_listing | extract_content_and_read() | ||||||||||||||||||||
| yield from reader | ||||||||||||||||||||
|
|
||||||||||||||||||||
|
|
||||||||||||||||||||
| def get_latest_timestamp(pipeline: dlt.Pipeline) -> pendulum.DateTime | None: | ||||||||||||||||||||
|
|
||||||||||||||||||||
Uh oh!
There was an error while loading. Please reload this page.