diff --git a/infra/ansible/group_vars/all/all.yml b/infra/ansible/group_vars/all/all.yml index 75b12030..4193f1f1 100644 --- a/infra/ansible/group_vars/all/all.yml +++ b/infra/ansible/group_vars/all/all.yml @@ -33,7 +33,7 @@ keycloak_realm: display_name: "ISIS Analytics Data Platform" keycloak_realm_url: "{{ keycloak_url }}/realms/{{ keycloak_realm.name }}" keycloak_bootstrap: - admin_user: "{{ secrets_keycloak['bootstrap_admin_user'] }}" + admin_user: "temp-admin" admin_password: "{{ secrets_keycloak['bootstrap_admin_password'] }}" lakekeeper_base_path: /iceberg diff --git a/infra/ansible/group_vars/keycloak.yml b/infra/ansible/group_vars/keycloak.yml index 1650d3cc..15c02401 100644 --- a/infra/ansible/group_vars/keycloak.yml +++ b/infra/ansible/group_vars/keycloak.yml @@ -5,6 +5,10 @@ keycloak_db_name: "{{ secrets_keycloak['db_name'] }}" keycloak_db_user: "{{ secrets_keycloak['db_user'] }}" keycloak_db_password: "{{ secrets_keycloak['db_password'] }}" +keycloak_local_admin: + user: "{{ secrets_keycloak['master_local_admin_user'] }}" + password: "{{ secrets_keycloak['master_local_admin_password'] }}" + # Eveything is assigned to the realm defined by keycloak_realm.name in all/all.yml keycloak_client_scopes: - name: lakekeeper diff --git a/infra/ansible/roles/keycloak/tasks/main.yml b/infra/ansible/roles/keycloak/tasks/main.yml index 21c50ca6..e80fdd59 100644 --- a/infra/ansible/roles/keycloak/tasks/main.yml +++ b/infra/ansible/roles/keycloak/tasks/main.yml @@ -44,6 +44,24 @@ path: "{{ keycloak_working_dir }}" rebuild: always +- name: Check if we need to bootstrap an admin + no_log: "{{ not (keycloak_bootstrap_logging | default(false)) }}" + ansible.builtin.uri: + url: "{{ keycloak_url }}/realms/master/protocol/openid-connect/token" + method: POST + body_format: form-urlencoded + body: + client_id: "admin-cli" + username: "{{ keycloak_local_admin.user }}" + password: "{{ keycloak_local_admin.password }}" + grant_type: "password" + ignore_errors: true + register: keycloak_token_response + +- name: Set admin required fact + ansible.builtin.set_fact: + local_admin_user_exists: "{{ keycloak_token_response['status'] == 200 }}" + - name: Bootstrap Keycloak admin no_log: "{{ not (keycloak_bootstrap_logging | default(false)) }}" community.docker.docker_container: @@ -53,7 +71,6 @@ [ "bootstrap-admin", "user", - "--username={{ keycloak_bootstrap.admin_user }}", "--password:env=KC_BOOTSTRAP_ADMIN_PASSWORD", "--optimized", "--no-prompt", @@ -69,6 +86,7 @@ KC_DB_URL: "jdbc:postgresql://{{ keycloak_db_host }}:{{ keycloak_db_port }}/{{ keycloak_db_name }}" KC_DB_USERNAME: "{{ keycloak_db_user }}" KC_DB_PASSWORD: "{{ keycloak_db_password }}" + when: not local_admin_user_exists - name: Run Keycloak community.docker.docker_container: @@ -105,6 +123,54 @@ timeout: 2s retries: 15 -- ansible.builtin.import_tasks: setup-realm.yml +# Configure master realm admin +- name: Create permanent admin user + no_log: true + community.general.keycloak_user: + auth_keycloak_url: "{{ keycloak_url }}" + auth_realm: master + auth_username: "{{ keycloak_bootstrap.admin_user }}" + auth_password: "{{ keycloak_bootstrap.admin_password }}" + realm: master + username: "{{ keycloak_local_admin.user }}" + enabled: true + emailVerified: true + credentials: + - type: password + value: "{{ keycloak_local_admin.password }}" + temporary: false + state: present + register: kc_new_admin + when: not local_admin_user_exists + +- name: Assign admin realm role to permanent user + no_log: "{{ not (keycloak_bootstrap_logging | default(false)) }}" + community.general.keycloak_user_rolemapping: + auth_keycloak_url: "{{ keycloak_url }}" + auth_realm: master + auth_username: "{{ keycloak_bootstrap.admin_user }}" + auth_password: "{{ keycloak_bootstrap.admin_password }}" + realm: master + uid: "{{ kc_new_admin.end_state.id }}" + roles: + - name: "admin" + state: present + when: not local_admin_user_exists + +- name: Disable temp-admin bootstrap account + no_log: "{{ not (keycloak_bootstrap_logging | default(false)) }}" + community.general.keycloak_user: + auth_keycloak_url: "{{ keycloak_url }}" + auth_realm: "master" + auth_username: "{{ keycloak_local_admin.user }}" + auth_password: "{{ keycloak_local_admin.password }}" + realm: master + username: "{{ keycloak_bootstrap.admin_user }}" + enabled: false + state: present + when: not local_admin_user_exists + +# Configure custom realm +- ansible.builtin.import_tasks: setup-target-realm.yml vars: target_realm: "{{ keycloak_realm.name }}" diff --git a/infra/ansible/roles/keycloak/tasks/setup-ldap.yml b/infra/ansible/roles/keycloak/tasks/setup-ldap.yml index 03307f14..77e2f9d4 100644 --- a/infra/ansible/roles/keycloak/tasks/setup-ldap.yml +++ b/infra/ansible/roles/keycloak/tasks/setup-ldap.yml @@ -4,8 +4,8 @@ auth_client_id: admin-cli auth_keycloak_url: "{{ keycloak_url }}" auth_realm: master - auth_username: "{{ keycloak_bootstrap.admin_user }}" - auth_password: "{{ keycloak_bootstrap.admin_password }}" + auth_username: "{{ keycloak_local_admin.user }}" + auth_password: "{{ keycloak_local_admin.password }}" realm: "{{ target_realm }}" name: "STFC LDAP" state: present diff --git a/infra/ansible/roles/keycloak/tasks/setup-realm.yml b/infra/ansible/roles/keycloak/tasks/setup-target-realm.yml similarity index 95% rename from infra/ansible/roles/keycloak/tasks/setup-realm.yml rename to infra/ansible/roles/keycloak/tasks/setup-target-realm.yml index 4f9bcbdb..b29a7ce1 100644 --- a/infra/ansible/roles/keycloak/tasks/setup-realm.yml +++ b/infra/ansible/roles/keycloak/tasks/setup-target-realm.yml @@ -6,8 +6,8 @@ auth_client_id: admin-cli auth_keycloak_url: "{{ keycloak_url }}" auth_realm: master - auth_username: "{{ keycloak_bootstrap.admin_user }}" - auth_password: "{{ keycloak_bootstrap.admin_password }}" + auth_username: "{{ keycloak_local_admin.user }}" + auth_password: "{{ keycloak_local_admin.password }}" when: false - name: Create Keycloak realm diff --git a/warehouses/facility_ops/transform/models/staging/estates/stg_electricity_sharepoint_rdm_data.sql b/warehouses/facility_ops/transform/models/staging/estates/stg_electricity_sharepoint_rdm_data.sql index 38234ad4..0bab0a59 100644 --- a/warehouses/facility_ops/transform/models/staging/estates/stg_electricity_sharepoint_rdm_data.sql +++ b/warehouses/facility_ops/transform/models/staging/estates/stg_electricity_sharepoint_rdm_data.sql @@ -9,7 +9,7 @@ renamed as ( select date_time as power_measured_at, - isis_elec_total_power_mwx as total_isis_power_mw + isis_elec_total_power_mw as total_isis_power_mw from source diff --git a/warehouses/facility_ops_landing/ingest/estates/electricity_sharepoint/electricity_sharepoint.py b/warehouses/facility_ops_landing/ingest/estates/electricity_sharepoint/electricity_sharepoint.py index 3e761806..f9369d8a 100644 --- a/warehouses/facility_ops_landing/ingest/estates/electricity_sharepoint/electricity_sharepoint.py +++ b/warehouses/facility_ops_landing/ingest/estates/electricity_sharepoint/electricity_sharepoint.py @@ -1,7 +1,7 @@ # /// script # requires-python = ">=3.13" # dependencies = [ -# "pandas", +# "pandas>=3", # "elt-common[m365]", # "python-calamine", # ] @@ -17,23 +17,24 @@ import dlt import dlt.common.logger as logger -from dlt.sources import TDataItems -import pendulum import pandas as pd +import pendulum import pyarrow.compute as pc -import pytz - +from dlt.sources import TDataItems from elt_common.cli_utils import cli_main -from elt_common.dlt_sources.m365 import ( - sharepoint, - M365DriveItem, -) from elt_common.dlt_destinations.pyiceberg.helpers import load_iceberg_table from elt_common.dlt_destinations.pyiceberg.pyiceberg_adapter import ( - pyiceberg_adapter, PartitionTrBuilder, + pyiceberg_adapter, +) +from elt_common.dlt_sources.m365 import ( + M365DriveItem, + sharepoint, ) +CSV_PREAMBLE_ANCHOR = "time" +COL_DATE_TIME = "date_time" +COL_TOTAL_POWER = "isis_elec_total_power_mw" EXCEL_ENGINE = "calamine" EXCEL_SKIP_ROWS = 7 MAX_WORKERS = min(8, (os.cpu_count() or 1) + 4) @@ -41,19 +42,41 @@ PIPELINE_NAME = "electricity_sharepoint" RDM_TIMEZONE = "Europe/London" -# File information -# The file can contain a record from a single export or multiple records combined. Each record starts with -# a header: - +# Source details +# +# There are 3 known formats for the data. Each contains a preamble of the form: +# # Site Information: # RAL ISIS RDM # RAL ISIS RDM # Controller: ISIS # Controller description: ISIS Energy Totals # Status: Online - -# Time Date ISIS Elec Total Power {MW} -CSV_HEADER_ANCHOR = "time" +# +# This preamble is discarded from all formats. +# +# 1. Excel (old, manual export format) +# ------------------------------------ +# +# These were produced by manual export from the original system. +# +# There are two columns: "Time,Total Power (MW)". Time is in the format "YYYY-mm-DDTHH:MM:SS". +# +# 2. Automated CSV export +# ----------------------- +# +# An automation deposits .csv files at regular intervals. +# +# There are three columns: "Time,Date,ISIS Elec Total Power", where Time is in format HH:MM:SS and Date is in format DD/mm/YY. +# Several of these files can be concatenated together to form larger timespan records - they are concatenated as is and repeated +# preamble sections are not discarded. The variable CSV_PREAMBLE_ANCHOR defines the line at which a new section occurs. +# +# 3. Manual CSV export +# -------------------- +# +# These were produced more recently by manual export from the original system. +# There are three columns: "Time,ISIS Elec Total Energy,ISIS Elec Total Power", where Time is in format "DD/mm/YY HH:MM:SS". +# The second column is discarded. def to_utc(ts: pd.Series) -> pd.Series: @@ -63,29 +86,45 @@ def to_utc(ts: pd.Series) -> pd.Series: def csv_section_to_df(file_name: str, lines: Sequence[str]) -> pd.DataFrame | None: """Parse csv and return a DataFrame if the times are valid. None if not""" - df = pd.read_csv(io.StringIO("\n".join(lines))) + df_raw = pd.read_csv(io.StringIO("\n".join(lines))) # clean up column name (strip any whitespace) - df.columns = df.columns.str.strip() + df_raw.columns = df_raw.columns.str.strip() + cols = [c for c in df_raw.columns] + assert len(cols) == 3 try: - df["DateTime"] = to_utc( - pd.to_datetime(df["Date"] + " " + df["Time"], format="%d/%m/%y %H:%M:%S") # type: ignore - ) - return df.drop(["Date", "Time"], axis=1) - except (pytz.exceptions.AmbiguousTimeError, ValueError) as exc: - logger.warning( - f"'Error loading section of {file_name}'. There will be gaps in the data.\nDetails: {str(exc)}" - ) - return None + if cols[1].strip() == "Date": + # Automated CSV format + df = to_utc( + pd.to_datetime( + df_raw["Date"] + " " + df_raw["Time"], format="%d/%m/%y %H:%M:%S" + ) # type: ignore + ).to_frame(name=COL_DATE_TIME) + else: + # Manual CSV format + df = to_utc( + pd.to_datetime(df_raw["Time"], format="%d/%m/%y %H:%M:%S") + ).to_frame(name=COL_DATE_TIME) + except ValueError as exc: + # Pandas 3 uses ValueError for conversions that produce ambiguous/non-existent times + msg = str(exc) + if "ambiguous" in msg or "nonexistent" in msg: + logger.warning( + f"'Error loading section of {file_name}'. DST issues detected: {str(exc)}" + ) + return None + else: + raise + + assert "power" in cols[2].lower() + df[COL_TOTAL_POWER] = df_raw[cols[2]] + return df def read_power_consumption_csv( file_content: io.BytesIO, file_name: str, ) -> pd.DataFrame | None: - """Read csv-formatted power consumption records. This can return None if there is not data in the file. - - The Date & Time columns together to create a single DateTime column. - """ + # See comment at the top of this describing the format def _append_if_not_none(seq, df): if df is not None: @@ -97,7 +136,7 @@ def _append_if_not_none(seq, df): line = line.strip() line_lower = line.lower() - if line_lower.startswith(CSV_HEADER_ANCHOR): + if line_lower.startswith(CSV_PREAMBLE_ANCHOR): # Save any previous section if current_lines: _append_if_not_none( @@ -127,15 +166,11 @@ def _append_if_not_none(seq, df): def read_power_consumption_excel(file_content: io.BytesIO) -> pd.DataFrame: - """Read an excel-formatted power consumption record. - - Expected columns: Time, Total Power (MW) - The Time column is renamed DateTime for consistency. - """ - df = pd.read_excel(file_content, engine=EXCEL_ENGINE, skiprows=EXCEL_SKIP_ROWS) - df = df.rename(columns={"Time": "DateTime"}) - df["DateTime"] = to_utc(df["DateTime"]) - return df + # See comment at the top of this describing the format + df_raw = pd.read_excel(file_content, engine=EXCEL_ENGINE, skiprows=EXCEL_SKIP_ROWS) + df_raw = df_raw.rename(columns={"Time": COL_DATE_TIME}) + df_raw[COL_DATE_TIME] = to_utc(df_raw[COL_DATE_TIME]) # type: ignore + return df_raw @dlt.transformer(section="m365") @@ -164,6 +199,8 @@ def read_as_dataframe(file_obj: M365DriveItem) -> pd.DataFrame | None: case _: raise RuntimeError(f"Unsupported file extension in '{file_name}'") + if df is not None: + df["file_name"] = file_name return df df_batch = None @@ -179,42 +216,39 @@ def read_as_dataframe(file_obj: M365DriveItem) -> pd.DataFrame | None: yield df_batch -@dlt.resource(merge_key="DateTime", columns={"DateTime": {"nullable": False}}) +@dlt.resource( + primary_key="DateTime", + write_disposition={"disposition": "merge", "strategy": "upsert"}, +) def rdm_data( site_url: str = dlt.config.value, root_dir: str = dlt.config.value, backfill: bool = False, + backfill_glob: str | None = None, ) -> Iterator[TDataItems]: if backfill: - historic_xl = sharepoint( - site_url=site_url, - file_glob=f"{root_dir}/**/*.xlsx", - extract_content=False, - ) - reader = historic_xl | extract_content_and_read() - yield from reader.apply_hints( - write_disposition="merge", - ) - historic_csv = sharepoint( + if backfill_glob is not None: + file_globs = [backfill_glob] + else: + file_globs = [ + f"{root_dir}/**/*.xlsx", + f"{root_dir}/**/*-daily.csv", + f"{root_dir}/**/*-manual-export.csv", + ] + modified_after = None + else: + file_globs = [f"{root_dir}/*-ISIS.csv"] + modified_after = get_latest_timestamp(dlt.current.pipeline()) + + for file_glob in file_globs: + file_listing = sharepoint( site_url=site_url, - file_glob=f"{root_dir}/**/*-daily.csv", + file_glob=file_glob, extract_content=False, + modified_after=modified_after, ) - reader = historic_csv | extract_content_and_read() - yield from reader.apply_hints( - write_disposition="merge", - ) - - latest_files = sharepoint( - site_url=site_url, - file_glob=f"{root_dir}/*-ISIS.csv", - extract_content=False, - modified_after=get_latest_timestamp(dlt.current.pipeline()), - ) - reader = latest_files | extract_content_and_read() - yield from reader.apply_hints( - write_disposition="merge", - ) + reader = file_listing | extract_content_and_read() + yield from reader def get_latest_timestamp(pipeline: dlt.Pipeline) -> pendulum.DateTime | None: