diff --git a/ctapipe/calib/camera/calibrator.py b/ctapipe/calib/camera/calibrator.py index 0e29f7f228c..d79473f6282 100644 --- a/ctapipe/calib/camera/calibrator.py +++ b/ctapipe/calib/camera/calibrator.py @@ -123,9 +123,6 @@ def __init__( super().__init__(subarray=subarray, config=config, parent=parent, **kwargs) self.subarray = subarray - self._r1_empty_warn = False - self._dl0_empty_warn = False - self.image_extractors = {} if image_extractor is None: @@ -155,24 +152,12 @@ def __init__( def _check_r1_empty(self, waveforms): if waveforms is None: - if not self._r1_empty_warn: - warnings.warn( - "Encountered an event with no R1 data. " - "DL0 is unchanged in this circumstance." - ) - self._r1_empty_warn = True return True else: return False def _check_dl0_empty(self, waveforms): if waveforms is None: - if not self._dl0_empty_warn: - warnings.warn( - "Encountered an event with no DL0 data. " - "DL1 is unchanged in this circumstance." - ) - self._dl0_empty_warn = True return True else: return False diff --git a/ctapipe/core/telescope_component.py b/ctapipe/core/telescope_component.py index 6b49dae9227..33208323d9f 100644 --- a/ctapipe/core/telescope_component.py +++ b/ctapipe/core/telescope_component.py @@ -181,13 +181,6 @@ def attach_subarray(self, subarray): ] logger.debug(f"argument '{arg}' matched: {matched_tel_types}") - if len(matched_tel_types) == 0: - logger.warning( - "TelescopeParameter type argument '%s' did not match " - "any known telescope types", - arg, - ) - for tel_type in matched_tel_types: self._value_for_type[tel_type] = value for tel_id in subarray.get_tel_ids_for_type(tel_type): diff --git a/ctapipe/core/tool.py b/ctapipe/core/tool.py index 5d28c1585a3..cb45188621e 100644 --- a/ctapipe/core/tool.py +++ b/ctapipe/core/tool.py @@ -176,6 +176,11 @@ def main(): quiet = Bool(default_value=False).tag(config=True) overwrite = Bool(default_value=False).tag(config=True) + debug = Bool( + default_value=False, + help="If true, exceptions are not caught and converted" + " to exit codes to enable using a debugger", + ).tag(config=True) _log_formatter_cls = ColoredFormatter @@ -209,6 +214,11 @@ def __init__(self, **kwargs): {"Tool": {"overwrite": True}}, "Overwrite existing output files without asking", ), + "debug": ( + {"Tool": {"debug": True}}, + "Disable the transformation of exceptions" + " to exit codes to enable debugger usage", + ), } self.flags = {**flags, **self.flags} @@ -383,7 +393,7 @@ def finish(self): """ self.log.info("Goodbye") - def run(self, argv=None, raises=False): + def run(self, argv=None, raises=None): """Run the tool. This automatically calls `Tool.initialize`, `Tool.start` and `Tool.finish` @@ -410,6 +420,8 @@ def run(self, argv=None, raises=False): Provenance().start_activity(self.name) self.initialize(argv) + if raises is None: + raises = self.debug self.setup() self.is_setup = True diff --git a/ctapipe/io/hdf5merger.py b/ctapipe/io/hdf5merger.py index 8e97d43982b..c18263c6a62 100644 --- a/ctapipe/io/hdf5merger.py +++ b/ctapipe/io/hdf5merger.py @@ -158,6 +158,15 @@ class HDF5Merger(Component): True, help="Whether to include processing statistics in merged output" ).tag(config=True) + single_ob = traits.Bool( + False, + help=( + "If true, input files are assumed to be multiple chunks from the same" + " observation block and the ob / sb blocks will only be copied from " + " the first input file" + ), + ).tag(config=True) + def __init__(self, output_path=None, **kwargs): # enable using output_path as posarg if output_path not in {None, traits.Undefined}: @@ -202,6 +211,7 @@ def __init__(self, output_path=None, **kwargs): focal_length_choice=FocalLengthKind.EQUIVALENT, ) self.required_nodes = _get_required_nodes(self.h5file) + self._n_merged = 0 def __call__(self, other: Union[str, Path, tables.File]): """ @@ -213,7 +223,7 @@ def __call__(self, other: Union[str, Path, tables.File]): with exit_stack: # first file to be merged - if self.meta is None: + if self._n_merged == 0: self.meta = self._read_meta(other) self.data_model_version = self.meta.product.data_model_version metadata.write_to_hdf5(self.meta.to_dict(), self.h5file) @@ -229,6 +239,7 @@ def __call__(self, other: Union[str, Path, tables.File]): self.log.info( "Updated required nodes to %s", sorted(self.required_nodes) ) + self._n_merged += 1 finally: self._update_meta() @@ -272,13 +283,15 @@ def _append(self, other): # Configuration self._append_subarray(other) - config_keys = [ - "/configuration/observation/scheduling_block", - "/configuration/observation/observation_block", - ] - for key in config_keys: - if key in other.root: - self._append_table(other, other.root[key]) + # in case of "single_ob", we only copy sb/ob blocks for the first file + if not self.single_ob or self._n_merged == 0: + config_keys = [ + "/configuration/observation/scheduling_block", + "/configuration/observation/observation_block", + ] + for key in config_keys: + if key in other.root: + self._append_table(other, other.root[key]) # Simulation simulation_table_keys = [ diff --git a/ctapipe/io/pointing.py b/ctapipe/io/pointing.py new file mode 100644 index 00000000000..81f1683504f --- /dev/null +++ b/ctapipe/io/pointing.py @@ -0,0 +1,81 @@ +from typing import Any + +import astropy.units as u +import numpy as np +import tables +from scipy.interpolate import interp1d + +from ctapipe.core import Component, traits + +from .astropy_helpers import read_table + + +class PointingInterpolator(Component): + bounds_error = traits.Bool(default_value=True).tag(config=True) + extrapolate = traits.Bool(default_value=False).tag(config=True) + + def __init__(self, h5file, **kwargs): + super().__init__(**kwargs) + + if not isinstance(h5file, tables.File): + raise TypeError("h5file must be a tables.File") + self.h5file = h5file + + self.interp_options: dict[str, Any] = dict(assume_sorted=True, copy=False) + if self.bounds_error: + self.interp_options["bounds_error"] = True + elif self.extrapolate: + self.interp_options["bounds_error"] = False + self.interp_options["fill_value"] = "extrapolate" + else: + self.interp_options["bounds_error"] = False + self.interp_options["fill_value"] = np.nan + + self._alt_interpolators = {} + self._az_interpolators = {} + + def _read_pointing_table(self, tel_id): + pointing_table = read_table( + self.h5file, + f"/dl0/monitoring/telescope/pointing/tel_{tel_id:03d}", + ) + + # sort first, so it's not done twice for each interpolator + pointing_table.sort("time") + mjd = pointing_table["time"].tai.mjd + + az = pointing_table["azimuth"].quantity.to_value(u.rad) + # prepare azimuth for interpolation "unwrapping", i.e. turning 359, 1 into 359, 361 + az = np.unwrap(az) + alt = pointing_table["altitude"].quantity.to_value(u.rad) + + self._az_interpolators[tel_id] = interp1d(mjd, az, **self.interp_options) + self._alt_interpolators[tel_id] = interp1d(mjd, alt, **self.interp_options) + + def __call__(self, tel_id, time): + """ + Interpolate alt/az for given time and tel_id. + + Parameters + ---------- + tel_id : int + telescope id + time : astropy.time.Time + time for which to interpolate the pointing + + Returns + ------- + altitude : astropy.units.Quantity[deg] + interpolated altitude angle + azimuth : astropy.units.Quantity[deg] + interpolated azimuth angle + + + """ + if tel_id not in self._az_interpolators: + self._read_pointing_table(tel_id) + + mjd = time.tai.mjd + az = u.Quantity(self._az_interpolators[tel_id](mjd), u.rad, copy=False) + alt = u.Quantity(self._alt_interpolators[tel_id](mjd), u.rad, copy=False) + return alt, az diff --git a/ctapipe/io/tableloader.py b/ctapipe/io/tableloader.py index f7077969e8e..9d9834dd8fc 100644 --- a/ctapipe/io/tableloader.py +++ b/ctapipe/io/tableloader.py @@ -13,6 +13,7 @@ from astropy.utils.decorators import lazyproperty from ctapipe.instrument.optics import FocalLengthKind +from ctapipe.io.pointing import PointingInterpolator from ..core import Component, Provenance, traits from ..instrument import SubarrayDescription @@ -31,6 +32,7 @@ SIMULATION_CONFIG_TABLE = "/configuration/simulation/run" SHOWER_DISTRIBUTION_TABLE = "/simulation/service/shower_distribution" OBSERVATION_TABLE = "/configuration/observation/observation_block" +POINTING_GROUP = "/dl0/monitoring/telescope/pointing" DL2_SUBARRAY_GROUP = "/dl2/event/subarray" DL2_TELESCOPE_GROUP = "/dl2/event/telescope" @@ -177,9 +179,11 @@ class TableLoader(Component): ).tag(config=True) load_dl1_images = traits.Bool(False, help="load extracted images").tag(config=True) + load_dl1_parameters = traits.Bool( True, help="load reconstructed image parameters" ).tag(config=True) + load_dl1_muons = traits.Bool(False, help="load muon ring parameters").tag( config=True ) @@ -206,6 +210,11 @@ class TableLoader(Component): False, help="join observation information to each event" ).tag(config=True) + interpolate_pointing = traits.Bool( + False, + help="If True, load monitoring pointing information and interpolate for each telescope event", + ).tag(config=True) + focal_length_choice = traits.UseEnum( FocalLengthKind, default_value=FocalLengthKind.EFFECTIVE, @@ -250,6 +259,13 @@ def __init__(self, input_url=None, h5file=None, **kwargs): self.instrument_table = None if self.load_instrument: self.instrument_table = self.subarray.to_table("joined") + # to prevent merging meta data onto event tables + self.instrument_table.meta.clear() + + self._pointing_interpolator = PointingInterpolator( + h5file=self.h5file, + parent=self, + ) groups = { "load_dl1_parameters": PARAMETERS_GROUP, @@ -529,6 +545,11 @@ def _read_telescope_events_for_id(self, tel_id, start=None, stop=None): ) table = _join_telescope_events(table, impacts) + if self.interpolate_pointing and POINTING_GROUP in self.h5file.root: + alt, az = self._pointing_interpolator(tel_id, table["time"]) + table["telescope_pointing_altitude"] = alt + table["telescope_pointing_azimuth"] = az + return table def _read_telescope_events_for_ids(self, tel_ids, start=None, stop=None): diff --git a/ctapipe/io/tests/test_table_loader.py b/ctapipe/io/tests/test_table_loader.py index 864a53ba9f7..8f066c6d02c 100644 --- a/ctapipe/io/tests/test_table_loader.py +++ b/ctapipe/io/tests/test_table_loader.py @@ -1,3 +1,5 @@ +import shutil + import astropy.units as u import numpy as np import pytest @@ -436,3 +438,37 @@ def test_order_merged(): for tel, table in tables.items(): mask = np.isin(tel_trigger["tel_id"], loader.subarray.get_tel_ids(tel)) check_equal_array_event_order(table, tel_trigger[mask]) + + +def test_interpolate_pointing(dl1_file, tmp_path): + from ctapipe.io import TableLoader, write_table + + path = tmp_path / dl1_file.name + shutil.copy(dl1_file, path) + + with TableLoader(dl1_file) as loader: + events = loader.read_telescope_events() + subarray = loader.subarray + + # create some dummy monitoring data + time = events["time"] + start, stop = time[[0, -1]] + duration = (stop - start).to_value(u.s) + + # start a bit before, go a bit longer + dt = np.arange(-1, duration + 2, 1) * u.s + time_mon = start + dt + + alt = (69 + 2 * dt / dt[-1]) * u.deg + az = (180 + 5 * dt / dt[-1]) * u.deg + + table = Table({"time": time_mon, "azimuth": az, "altitude": alt}) + + for tel_id in subarray.tel: + write_table(table, path, f"/dl0/monitoring/telescope/pointing/tel_{tel_id:03d}") + + with TableLoader(path, interpolate_pointing=True) as loader: + events = loader.read_telescope_events([4]) + assert len(events) > 0 + assert "telescope_pointing_azimuth" in events.colnames + assert "telescope_pointing_altitude" in events.colnames diff --git a/ctapipe/reco/sklearn.py b/ctapipe/reco/sklearn.py index 1ae0bc78158..7b1d305aae3 100644 --- a/ctapipe/reco/sklearn.py +++ b/ctapipe/reco/sklearn.py @@ -103,6 +103,10 @@ class SKLearnReconstructor(Reconstructor): help="If given, load serialized model from this path", ).tag(config=True) + def __setstate__(self, state): + state["_models"] = {str(k): v for k, v in state["_models"].items()} + return super().__setstate__(state) + def __init__(self, subarray=None, models=None, **kwargs): # Run the Component __init__ first to handle the configuration # and make `self.load_path` available @@ -149,6 +153,7 @@ def __init__(self, subarray=None, models=None, **kwargs): ) self.__dict__.update(loaded.__dict__) self.subarray = subarray + self._models = {str(key): model for key, model in self._models.items()} if self.prefix is None: self.prefix = self.model_cls @@ -241,6 +246,7 @@ class SKLearnRegressionReconstructor(SKLearnReconstructor): ).tag(config=True) def _predict(self, key, table): + key = str(key) if key not in self._models: raise KeyError( f"No model available for key {key}," @@ -305,6 +311,7 @@ class SKLearnClassificationReconstructor(SKLearnReconstructor): ).tag(config=True) def _predict(self, key, table): + key = str(key) if key not in self._models: raise KeyError( f"No model available for key {key}," @@ -326,6 +333,7 @@ def _predict(self, key, table): return prediction, valid def _predict_score(self, key, table): + key = str(key) if key not in self._models: raise KeyError( f"No model available for key {key}," @@ -352,6 +360,7 @@ def _predict_score(self, key, table): return scores, valid def _get_positive_index(self, key): + key = str(key) return np.nonzero(self._models[key].classes_ == self.positive_class)[0][0] @@ -397,6 +406,7 @@ def __call__(self, event: ArrayEventContainer) -> None: def predict_table(self, key, table: Table) -> Dict[ReconstructionProperty, Table]: """Predict on a table of events""" + key = str(key) table = self.feature_generator(table, subarray=self.subarray) n_rows = len(table) @@ -464,6 +474,7 @@ def __call__(self, event: ArrayEventContainer) -> None: def predict_table(self, key, table: Table) -> Dict[ReconstructionProperty, Table]: """Predict on a table of events""" + key = str(key) table = self.feature_generator(table, subarray=self.subarray) n_rows = len(table) @@ -583,6 +594,10 @@ def __init__(self, subarray=None, models=None, **kwargs): self.__dict__.update(loaded.__dict__) self.subarray = subarray + def __setstate__(self, state): + state["_models"] = {str(k): v for k, v in state["_models"].items()} + return super().__setstate__(state) + def _new_models(self): norm_regressor = SUPPORTED_REGRESSORS[self.norm_cls](**self.norm_config) sign_classifier = SUPPORTED_CLASSIFIERS[self.sign_cls](**self.sign_config) @@ -754,6 +769,7 @@ def predict_table(self, key, table: Table) -> Dict[ReconstructionProperty, Table altaz_table : `~astropy.table.Table` Table with resulting predictions of horizontal coordinates """ + key = str(key) table = self.feature_generator(table, subarray=self.subarray) n_rows = len(table) @@ -781,13 +797,20 @@ def predict_table(self, key, table: Table) -> Dict[ReconstructionProperty, Table fov_lon = table["hillas_fov_lon"].quantity + disp * np.cos(psi) fov_lat = table["hillas_fov_lat"].quantity + disp * np.sin(psi) - # FIXME: Assume constant and parallel pointing for each run - self.log.warning("Assuming constant and parallel pointing for each run") + # prefer to use pointing interpolated to event + if "telescope_pointing_altitude" in table.colnames: + pointing_alt = table["telescope_pointing_altitude"] + pointing_az = table["telescope_pointing_azimuth"] + else: + # fallback to fixed pointing of ob + pointing_alt = table["subarray_pointing_lat"] + pointing_az = table["subarray_pointing_lon"] + alt, az = telescope_to_horizontal( lon=fov_lon, lat=fov_lat, - pointing_alt=table["subarray_pointing_lat"], - pointing_az=table["subarray_pointing_lon"], + pointing_alt=pointing_alt, + pointing_az=pointing_az, ) altaz_result = Table( diff --git a/ctapipe/tools/apply_models.py b/ctapipe/tools/apply_models.py index 2df691d8bf4..bc6aa5d9398 100644 --- a/ctapipe/tools/apply_models.py +++ b/ctapipe/tools/apply_models.py @@ -143,6 +143,7 @@ def setup(self): load_dl1_images=False, load_simulated=False, load_observation_info=True, + interpolate_pointing=True, ) ) @@ -175,7 +176,7 @@ def _apply(self, reconstructor, tel_tables, start, stop): prefix = reconstructor.prefix for tel_id, table in tel_tables.items(): - tel = self.loader.subarray.tel[tel_id] + tel = str(self.loader.subarray.tel[tel_id]) if len(table) == 0: self.log.info("No events for telescope %d", tel_id) diff --git a/ctapipe/tools/merge.py b/ctapipe/tools/merge.py index 5dada6d8883..387cee2ead5 100644 --- a/ctapipe/tools/merge.py +++ b/ctapipe/tools/merge.py @@ -77,6 +77,10 @@ class MergeTool(Tool): } flags = { + "single-ob": ( + {"HDF5Merger": {"single_ob": True}}, + "Only copy observation config of first file to be merged.", + ), "progress": ( {"MergeTool": {"progress_bar": True}}, "Show a progress bar for all given input files",