diff --git a/src/data_input/__init__.py b/src/data_input/__init__.py index 403683d9..c6b12d7b 100644 --- a/src/data_input/__init__.py +++ b/src/data_input/__init__.py @@ -367,7 +367,6 @@ def load_INCA_baseline_from_netcdf( non-hourly timestamps. fill_missing_files: If True, missing files are filled with NaN arrays instead of raising. Defaults to True. - Returns: xr.Dataset with dimensions (time, y, x) and coordinates: x, y – Swiss CH1903 (EPSG:21781) easting/northing [m] @@ -416,6 +415,84 @@ def _nan_array(units: str) -> xr.DataArray: attrs={"units": units}, ) + def _open_convert(rt: datetime, pfx: str) -> tuple[Path, xr.DataArray | None]: + """Open an INCA file and apply unit conversion. + + Returns (path, DataArray) on success, (path, None) when the file is missing. + """ + fp = ( + root + / f"{rt.year:04d}" + / f"{rt.month:02d}" + / f"{pfx}_INCA_{rt:%Y%m%d%H%M}.nc" + ) + try: + d = xr.open_dataset(fp, drop_variables=["grid_mapping"]).rename( + {"chx": "x", "chy": "y"} + ) + except FileNotFoundError: + return fp, None + LOG.info("Reading %s", fp) + da = d[pfx] + u = da.attrs.get("units", "") + if u == "degrees C": + da = (da - ZERO_KELVIN).assign_attrs({**da.attrs, "units": "K"}) + elif u == "mm/h": + da = da.assign_attrs({**da.attrs, "units": "kg m-2"}) + return fp, da + + def _load_shifted(param: str, prefix: str) -> xr.DataArray: + """Load T_2M / TD_2M working around the INCA full-hour bug. + + For reftimes at full hours (HH:00), steps 1-N in the current run file are + affected by a known INCA bug. Step 0 is taken from the current reftime; + steps 1+ are taken from the run 10 min earlier (HH-1:50), which is unaffected. + """ + prev_rf = reftime - timedelta(minutes=10) + LOG.info( + "Applying INCA shifted-run workaround for %s: step 0 from %s, steps 1+ from %s", + param, + reftime, + prev_rf, + ) + parts: list[xr.DataArray] = [] + + # Step 0 from current reftime file + zero_idx = [i for i, s in enumerate(steps) if s == 0] + if zero_idx: + fp, da_raw = _open_convert(reftime, prefix) + if da_raw is None: + if not fill_missing_files: + raise FileNotFoundError( + f"INCA file not found for parameter {param!r}: {fp}" + ) + LOG.warning("INCA file not found, filling %s with NaN: %s", param, fp) + parts.append(_nan_array(PARAM_UNITS[param]).isel(time=zero_idx)) + else: + parts.append( + da_raw.isel(time=zero_idx).assign_coords(time=valid_times[zero_idx]) + ) + + # Steps 1+ from previous reftime file (positional index = step value) + nz_idx = [i for i, s in enumerate(steps) if s != 0] + if nz_idx: + nz_steps = [steps[i] for i in nz_idx] + fp, da_raw = _open_convert(prev_rf, prefix) + if da_raw is None: + if not fill_missing_files: + raise FileNotFoundError( + f"INCA file not found for parameter {param!r}: {fp}" + ) + LOG.warning("INCA file not found, filling %s with NaN: %s", param, fp) + parts.append(_nan_array(PARAM_UNITS[param]).isel(time=nz_idx)) + else: + parts.append( + da_raw.isel(time=nz_steps).assign_coords(time=valid_times[nz_idx]) + ) + + da = xr.concat(parts, dim="time") if len(parts) > 1 else parts[0] + return da.rename(param) + # Maps output variable name -> INCA file prefix, per freq. # File prefix == variable name inside the NetCDF file. PARAM_TO_PREFIX: dict[str, dict[str, str]] = { @@ -494,9 +571,17 @@ def _nan_array(units: str) -> xr.DataArray: datasets[param] = _nan_array("unknown").rename(param) continue - # load parameter by parameter + # T_2M and TD_2M are affected by a known INCA bug at full-hour reftimes (steps 1-6). + # Use the run from 10 min earlier for those steps; see _load_shifted(). + _SHIFTED_PARAMS = {"T_2M", "TD_2M"} + for param in to_load: prefix = prefix_map[param] + + if param in _SHIFTED_PARAMS and freq == "1h": + datasets[param] = _load_shifted(param, prefix) + continue + filepath = ( root / f"{reftime.year:04d}" @@ -514,6 +599,7 @@ def _nan_array(units: str) -> xr.DataArray: LOG.warning("INCA file not found, filling %s with NaN: %s", param, filepath) datasets[param] = _nan_array(PARAM_UNITS[param]).rename(param) continue + LOG.info("Reading %s", filepath) # Convert units if necessary da = ds_var[prefix] units = da.attrs.get("units", "") @@ -525,7 +611,7 @@ def _nan_array(units: str) -> xr.DataArray: # at timestamps absent from their native resolution datasets[param] = da.rename(param).reindex(time=valid_times) - merged = xr.merge(list(datasets.values()), join="override") + merged = xr.merge(list(datasets.values()), join="override", compat="override") # Add lat/lon derived from the x/y coordinates in the loaded NetCDF files merged = merged.assign_coords(**_chxy_to_latlon(merged.x.values, merged.y.values))