-
Notifications
You must be signed in to change notification settings - Fork 0
MRB-665 Support for global models and remove meteodata-lab #90
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 14 commits
cd511b9
ebc5dec
acc794f
695cfcb
6f23152
e2553a1
7445145
091f75c
7197a75
80f062c
e1e0a7b
a5ee466
f3e9439
df329cc
a474436
36fd358
561130c
9c3f7dc
d0bc642
9eac3f4
ad76047
1caa848
ee3a5df
f25d0ac
79c34f1
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,53 @@ | ||
| # yaml-language-server: $schema=../workflow/tools/config.schema.json | ||
| description: | | ||
| Evaluate skill of a Stage B (global) checkpoint. | ||
|
|
||
| dates: | ||
| start: 2024-01-01T06:00 | ||
| end: 2024-02-01T00:00 | ||
| frequency: 30h | ||
|
|
||
| runs: | ||
| - forecaster: | ||
| checkpoint: https://service.meteoswiss.ch/mlstore#/experiments/602/runs/721576a388114c8788405cc6936ca13c | ||
| label: Stage B | ||
| steps: 0/120/6 | ||
| config: resources/inference/configs/global-forecaster.yaml | ||
| extra_requirements: | ||
| - git+https://github.com/ecmwf/anemoi-inference.git@b9aaee5df86614cad9d8d08b76876a4be4e980db | ||
| disable_local_eccodes_definitions: true | ||
| inference_resources: | ||
| slurm_partition: preemptible | ||
| - forecaster: | ||
| checkpoint: https://service.meteoswiss.ch/mlstore#/experiments/602/runs/878f19c35f6644d7bd91bd9374e47b91 | ||
| label: Stage B w/ subgrid | ||
| steps: 0/120/6 | ||
| config: resources/inference/configs/global-forecaster.yaml | ||
| extra_requirements: | ||
| - git+https://github.com/ecmwf/anemoi-inference.git@b9aaee5df86614cad9d8d08b76876a4be4e980db | ||
| disable_local_eccodes_definitions: true | ||
| inference_resources: | ||
| slurm_partition: preemptible | ||
|
|
||
| truth: | ||
| label: ERA5 | ||
| root: /store_new/mch/msopr/ml/datasets/aifs-ea-an-oper-0001-mars-n320-1979-2024-6h-v1-for-single-v2.zarr | ||
|
|
||
| stratification: | ||
| regions: [] | ||
| root: /scratch/mch/bhendj/regions/Prognoseregionen_LV95_20220517 | ||
|
|
||
| locations: | ||
| output_root: output/ | ||
|
|
||
| profile: | ||
| executor: slurm | ||
| global_resources: | ||
| gpus: 16 | ||
| default_resources: | ||
| slurm_partition: "postproc" | ||
| cpus_per_task: 1 | ||
| mem_mb_per_cpu: 1800 | ||
| runtime: "1h" | ||
| gpus: 0 | ||
| jobs: 50 |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,14 @@ | ||
| input: | ||
| test: | ||
| use_original_paths: true | ||
|
|
||
| allow_nans: true | ||
|
|
||
| output: | ||
| grib: | ||
| path: grib/{date}{time:04}_{step:03}.grib | ||
| negative_step_mode: skip | ||
| # templates: | ||
| # samples: resources/templates_index_global-ea.yaml | ||
|
|
||
| write_initial_state: true |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,19 +1,38 @@ | ||
| import yaml | ||
| import logging | ||
| import os | ||
| import sys | ||
| from datetime import datetime, timedelta | ||
| from pathlib import Path | ||
|
|
||
| eccodes_definition_path = Path(sys.prefix) / "share/eccodes-cosmo-resources/definitions" | ||
| os.environ["ECCODES_DEFINITION_PATH"] = str(eccodes_definition_path) | ||
|
|
||
| from meteodatalab import data_source, grib_decoder # noqa: E402 | ||
| from functools import lru_cache | ||
|
|
||
| import numpy as np # noqa: E402 | ||
| import xarray as xr # noqa: E402 | ||
| import earthkit.data as ekd # noqa: E402 | ||
|
|
||
| LOG = logging.getLogger(__name__) | ||
|
|
||
| # IFS shortNames that COSMO eccodes definitions don't remap to COSMO names. | ||
| # These need explicit aliasing so callers can use COSMO names consistently. | ||
| _IFS_TO_COSMO = { | ||
| "tp": "TOT_PREC", | ||
| "msl": "PMSL", | ||
| "10u": "U_10M", | ||
| "10v": "V_10M", | ||
| "2t": "T_2M", | ||
| "2d": "TD_2M", | ||
| "sp": "PS", | ||
| "lsm": "FR_LAND", | ||
| "z": "FIS", | ||
| } | ||
| _COSMO_TO_IFS = {v: k for k, v in _IFS_TO_COSMO.items()} | ||
|
|
||
|
|
||
| @lru_cache(maxsize=1) | ||
| def earthkit_xarray_engine_profile() -> dict: | ||
| fn = Path(__file__).parent / "profile.yaml" | ||
| with open(fn) as f: | ||
| profile = yaml.safe_load(f) | ||
| return profile | ||
|
|
||
|
|
||
| def _select_valid_times(ds, times: np.datetime64): | ||
| # (handle special case where some valid times are not in the dataset, e.g. at the end) | ||
|
|
@@ -64,7 +83,12 @@ def load_analysis_data_from_zarr( | |
| PARAMS_MAP_COSMO1 = { | ||
| v: v.replace("TOT_PREC", "TOT_PREC_6H") for v in PARAMS_MAP_COSMO2.keys() | ||
| } | ||
| PARAMS_MAP = PARAMS_MAP_COSMO2 if "co2" in root.name else PARAMS_MAP_COSMO1 | ||
| USE_IFS_NAMES = {"-co2-", "-ea-"} | ||
| PARAMS_MAP = ( | ||
| PARAMS_MAP_COSMO2 | ||
| if any(tag in root.name for tag in USE_IFS_NAMES) | ||
| else PARAMS_MAP_COSMO1 | ||
| ) | ||
|
|
||
| ds = xr.open_zarr(root, consolidated=False) | ||
|
|
||
|
|
@@ -78,7 +102,7 @@ def load_analysis_data_from_zarr( | |
| ds = ds.sel(variable=[PARAMS_MAP[p] for p in params]).squeeze("ensemble", drop=True) | ||
|
|
||
| # recover original 2D shape | ||
| if len(ds.attrs["field_shape"]) == 2: | ||
| if "field_shape" in ds.attrs and len(ds.attrs["field_shape"]) == 2: | ||
| ny, nx = ds.attrs["field_shape"] | ||
| y_idx, x_idx = np.unravel_index(np.arange(ny * nx), shape=(ny, nx)) | ||
| ds = ds.assign_coords({"y": ("cell", y_idx), "x": ("cell", x_idx)}) | ||
|
|
@@ -88,7 +112,10 @@ def load_analysis_data_from_zarr( | |
| # set lat lon as coords (optional) | ||
| if "latitudes" in ds and "longitudes" in ds: | ||
| ds = ds.rename({"latitudes": "lat", "longitudes": "lon"}) | ||
| ds = ds.set_coords(["lat", "lon"]) | ||
| if "latitude" in ds and "longitude" in ds: | ||
| ds = ds.rename({"latitude": "lat", "longitude": "lon"}) | ||
| if "lat" in ds and "lon" in ds: | ||
| ds = ds.set_coords(["lat", "lon"]) | ||
| ds = ( | ||
| ds["data"] | ||
| .to_dataset("variable") | ||
|
|
@@ -108,9 +135,33 @@ def load_fct_data_from_grib( | |
| ) -> xr.Dataset: | ||
| """Load forecast data from GRIB files for a specific valid time.""" | ||
| files = sorted(root.glob(f"{reftime:%Y%m%d%H%M}*.grib")) | ||
| fds = data_source.FileDataSource(datafiles=files) | ||
| ds = grib_decoder.load(fds, {"param": params, "step": steps}) | ||
| for var, da in ds.items(): | ||
|
|
||
| profile = earthkit_xarray_engine_profile() | ||
| LOG.debug(f"loading GRIB for params {params} and steps {steps} from {root}") | ||
| # Extend param selection to include IFS aliases (e.g. "tp" for "TOT_PREC") so | ||
| # that both COSMO-named and IFS-named GRIB files (global models) are handled. | ||
| params_sel = list({p for p in params} | {_COSMO_TO_IFS[p] for p in params if p in _COSMO_TO_IFS}) | ||
| # Precipitation params don't have a step=0 field (accumulation is zero at | ||
| # analysis time and is often not written), so loading them together with | ||
| # other variables causes an inconsistent-step error in earthkit-data. | ||
| # Load them separately with step>0, then merge. | ||
| _PREC_PARAMS = {"tp", "TOT_PREC"} | ||
| prec_params = [p for p in params_sel if p in _PREC_PARAMS] | ||
| other_params = [p for p in params_sel if p not in _PREC_PARAMS] | ||
| fieldlist = ekd.from_source("file", files) | ||
| datasets = [] | ||
| if other_params: | ||
| datasets.append(fieldlist.sel(param=other_params, step=steps).to_xarray(profile=profile)) | ||
| if prec_params: | ||
| prec_steps = [s for s in steps if s > 0] | ||
| datasets.append(fieldlist.sel(param=prec_params, step=prec_steps).to_xarray(profile=profile)) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. in That one looks slow to me, but I haven't profiled.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The above approach has the advantage that you do not hard-code expectations in, in the sense that if there was a precipitation field at time 0 (with zero or NA in it), then you could still read it. |
||
| ds: xr.Dataset = xr.merge(datasets, join="outer") if len(datasets) > 1 else datasets[0] | ||
| # Rename any IFS names back to COSMO names | ||
| ifs_rename = {ifs: cosmo for ifs, cosmo in _IFS_TO_COSMO.items() if ifs in ds.data_vars} | ||
| if ifs_rename: | ||
| ds = ds.rename(ifs_rename) | ||
|
|
||
| for var, da in ds.data_vars.items(): | ||
| if "z" in da.dims and da.sizes["z"] == 1: | ||
| ds[var] = da.squeeze("z", drop=True) | ||
| elif "z" in da.dims and da.sizes["z"] > 1: | ||
|
|
@@ -126,6 +177,14 @@ def load_fct_data_from_grib( | |
| .clip(min=0.0) | ||
| ) | ||
| ) | ||
| # set lat lon as coords (optional) | ||
| if "latitudes" in ds and "longitudes" in ds: | ||
| ds = ds.rename({"latitudes": "lat", "longitudes": "lon"}) | ||
| if "latitude" in ds and "longitude" in ds: | ||
| ds = ds.rename({"latitude": "lat", "longitude": "lon"}) | ||
| if "lat" in ds and "lon" in ds: | ||
| ds = ds.set_coords(["lat", "lon"]) | ||
|
|
||
| # make sure time coordinate is available, and valid_time is not | ||
| if "valid_time" in ds.coords: | ||
| ds = ds.rename({"valid_time": "time"}) | ||
|
|
@@ -234,7 +293,7 @@ def load_truth_data( | |
| ) -> xr.Dataset: | ||
| """Load truth data from analysis Zarr or PeakWeather observations.""" | ||
| if root.suffix == ".zarr": | ||
| LOG.info("Loading ground truth from an analysis zarr dataset...") | ||
| LOG.info(f"Loading ground truth from {root}") | ||
| truth = load_analysis_data_from_zarr( | ||
| root=root, | ||
| reftime=reftime, | ||
|
|
@@ -247,7 +306,7 @@ def load_truth_data( | |
| else {"values": -1} | ||
| ) | ||
| elif "peakweather" in str(root): | ||
| LOG.info("Loading ground truth from PeakWeather observations...") | ||
| LOG.info(f"Loading ground truth from {root}") | ||
| truth = load_obs_data_from_peakweather( | ||
| root=root, | ||
| reftime=reftime, | ||
|
|
@@ -265,15 +324,15 @@ def load_forecast_data( | |
| """Load forecast data from GRIB files or a baseline Zarr dataset.""" | ||
|
|
||
| if any(root.glob("*.grib")): | ||
| LOG.info("Loading forecasts from GRIB files...") | ||
| LOG.info(f"Loading forecasts from GRIB files from {root}") | ||
| fcst = load_fct_data_from_grib( | ||
| root=root, | ||
| reftime=reftime, | ||
| steps=steps, | ||
| params=params, | ||
| ) | ||
| else: | ||
| LOG.info("Loading baseline forecasts from zarr dataset...") | ||
| LOG.info(f"Loading baseline forecasts from zarr dataset from {root}") | ||
| fcst = load_baseline_from_zarr( | ||
| root=root, | ||
| reftime=reftime, | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,15 @@ | ||
| ensure_dims: [z, number, step, forecast_reference_time] | ||
| rename_dims: | ||
| { level: z, number: eps, step: lead_time, forecast_reference_time: ref_time } | ||
| variable_attrs: | ||
| - cfName | ||
| - name | ||
| - units | ||
| - typeOfLevel | ||
| - levtype | ||
| - paramId | ||
|
|
||
| global_attrs: | ||
| - Conventions: CF-1.8 | ||
| - institution: MeteoSwiss | ||
| add_valid_time_coord: true |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I suppose this was previously handled by meteodata-lab?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess this didn't occur previously because we haven't read from the global files at all. All our lam cutouts use COSMO names.