Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 89 additions & 3 deletions src/data_input/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,6 @@ def load_INCA_baseline_from_netcdf(
non-hourly timestamps.
fill_missing_files: If True, missing files are filled with NaN arrays instead
of raising. Defaults to True.

Returns:
xr.Dataset with dimensions (time, y, x) and coordinates:
x, y – Swiss CH1903 (EPSG:21781) easting/northing [m]
Expand Down Expand Up @@ -416,6 +415,84 @@ def _nan_array(units: str) -> xr.DataArray:
attrs={"units": units},
)

def _open_convert(rt: datetime, pfx: str) -> tuple[Path, xr.DataArray | None]:
"""Open an INCA file and apply unit conversion.

Returns (path, DataArray) on success, (path, None) when the file is missing.
"""
fp = (
root
/ f"{rt.year:04d}"
/ f"{rt.month:02d}"
/ f"{pfx}_INCA_{rt:%Y%m%d%H%M}.nc"
)
try:
d = xr.open_dataset(fp, drop_variables=["grid_mapping"]).rename(
{"chx": "x", "chy": "y"}
)
except FileNotFoundError:
return fp, None
LOG.info("Reading %s", fp)
da = d[pfx]
u = da.attrs.get("units", "")
if u == "degrees C":
da = (da - ZERO_KELVIN).assign_attrs({**da.attrs, "units": "K"})
elif u == "mm/h":
da = da.assign_attrs({**da.attrs, "units": "kg m-2"})
Comment on lines +440 to +441
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this correct? We are converting from a rate to an accumulation. This particular conversion only works if values are for hourly accumulations. Is that always the case?

return fp, da

def _load_shifted(param: str, prefix: str) -> xr.DataArray:
"""Load T_2M / TD_2M working around the INCA full-hour bug.

For reftimes at full hours (HH:00), steps 1-N in the current run file are
affected by a known INCA bug. Step 0 is taken from the current reftime;
steps 1+ are taken from the run 10 min earlier (HH-1:50), which is unaffected.
"""
prev_rf = reftime - timedelta(minutes=10)
LOG.info(
"Applying INCA shifted-run workaround for %s: step 0 from %s, steps 1+ from %s",
param,
reftime,
prev_rf,
)
parts: list[xr.DataArray] = []

# Step 0 from current reftime file
zero_idx = [i for i, s in enumerate(steps) if s == 0]
if zero_idx:
fp, da_raw = _open_convert(reftime, prefix)
if da_raw is None:
if not fill_missing_files:
raise FileNotFoundError(
f"INCA file not found for parameter {param!r}: {fp}"
)
LOG.warning("INCA file not found, filling %s with NaN: %s", param, fp)
parts.append(_nan_array(PARAM_UNITS[param]).isel(time=zero_idx))
else:
parts.append(
da_raw.isel(time=zero_idx).assign_coords(time=valid_times[zero_idx])
)

# Steps 1+ from previous reftime file (positional index = step value)
nz_idx = [i for i, s in enumerate(steps) if s != 0]
if nz_idx:
nz_steps = [steps[i] for i in nz_idx]
fp, da_raw = _open_convert(prev_rf, prefix)
if da_raw is None:
if not fill_missing_files:
raise FileNotFoundError(
f"INCA file not found for parameter {param!r}: {fp}"
)
LOG.warning("INCA file not found, filling %s with NaN: %s", param, fp)
parts.append(_nan_array(PARAM_UNITS[param]).isel(time=nz_idx))
else:
parts.append(
da_raw.isel(time=nz_steps).assign_coords(time=valid_times[nz_idx])
)

da = xr.concat(parts, dim="time") if len(parts) > 1 else parts[0]
return da.rename(param)

# Maps output variable name -> INCA file prefix, per freq.
# File prefix == variable name inside the NetCDF file.
PARAM_TO_PREFIX: dict[str, dict[str, str]] = {
Expand Down Expand Up @@ -494,9 +571,17 @@ def _nan_array(units: str) -> xr.DataArray:
datasets[param] = _nan_array("unknown").rename(param)
continue

# load parameter by parameter
# T_2M and TD_2M are affected by a known INCA bug at full-hour reftimes (steps 1-6).
# Use the run from 10 min earlier for those steps; see _load_shifted().
_SHIFTED_PARAMS = {"T_2M", "TD_2M"}

for param in to_load:
prefix = prefix_map[param]

if param in _SHIFTED_PARAMS and freq == "1h":
datasets[param] = _load_shifted(param, prefix)
continue

filepath = (
root
/ f"{reftime.year:04d}"
Expand All @@ -514,6 +599,7 @@ def _nan_array(units: str) -> xr.DataArray:
LOG.warning("INCA file not found, filling %s with NaN: %s", param, filepath)
datasets[param] = _nan_array(PARAM_UNITS[param]).rename(param)
continue
LOG.info("Reading %s", filepath)
# Convert units if necessary
da = ds_var[prefix]
units = da.attrs.get("units", "")
Expand All @@ -525,7 +611,7 @@ def _nan_array(units: str) -> xr.DataArray:
# at timestamps absent from their native resolution
datasets[param] = da.rename(param).reindex(time=valid_times)

merged = xr.merge(list(datasets.values()), join="override")
merged = xr.merge(list(datasets.values()), join="override", compat="override")

# Add lat/lon derived from the x/y coordinates in the loaded NetCDF files
merged = merged.assign_coords(**_chxy_to_latlon(merged.x.values, merged.y.values))
Expand Down
Loading