From 1bb80150e7a05d027deb1f572918d1ebddd26f42 Mon Sep 17 00:00:00 2001 From: Mario Gonzalez Date: Thu, 7 May 2026 03:00:11 +0200 Subject: [PATCH 1/2] Enable the registration at runtime of DtoH and HtoD product transformations --- .../plugins/alpaka/TrivialSerialisation.cc | 3 +- .../plugins/alpaka/TrivialSerialisation.cc | 6 +- .../plugins/alpaka/TrivialSerialisation.cc | 6 +- .../plugins/alpaka/TrivialSerialisation.cc | 9 +- .../plugins/alpaka/TrivialSerialisation.cc | 9 +- .../plugins/alpaka/TrivialSerialisation.cc | 5 +- .../plugins/alpaka/TrivialSerialisation.cc | 3 +- .../plugins/alpaka/TrivialSerialisation.cc | 12 +- .../plugins/alpaka/TrivialSerialisation.cc | 30 +- .../plugins/alpaka/TrivialSerialisation.cc | 3 +- .../plugins/alpaka/TrivialSerialisation.cc | 6 +- .../SiStripClusterSoA/plugins/BuildFile.xml | 12 + .../plugins/TrivialSerialisation.cc | 4 + .../plugins/alpaka/TrivialSerialisation.cc | 5 + .../SiStripDigiSoA/plugins/BuildFile.xml | 12 + .../plugins/TrivialSerialisation.cc | 4 + .../plugins/alpaka/TrivialSerialisation.cc | 5 + .../plugins/alpaka/TrivialSerialisation.cc | 3 +- .../plugins/alpaka/TrivialSerialisation.cc | 3 +- .../plugins/alpaka/TrivialSerialisation.cc | 3 +- .../Framework/interface/stream/implementors.h | 13 + .../Framework/test/stream_producer_catch2.cc | 34 ++ .../interface/alpaka/ProducerBase.h | 100 ++++++ .../TrivialSerialisation/interface/Reader.h | 11 +- .../interface/alpaka/Serialiser.h | 147 ++++++++- .../interface/alpaka/SerialiserBase.h | 25 +- .../alpaka/SerialiserFactoryDevice.h | 34 +- .../plugins/BuildFile.xml | 15 + .../plugins/alpaka/GenericClonerPortable.cc | 299 ++++++++++++++++++ .../TrivialSerialisation/src/Serialiser.cc | 14 + .../TrivialSerialisation/test/BuildFile.xml | 1 + ...eCollectionsSerialiserPluginFactory.dev.cc | 14 +- .../test/testGenericClonerPortable_cfg.py | 95 ++++++ 33 files changed, 883 insertions(+), 62 deletions(-) create mode 100644 DataFormats/SiStripClusterSoA/plugins/BuildFile.xml create mode 100644 DataFormats/SiStripClusterSoA/plugins/TrivialSerialisation.cc create mode 100644 DataFormats/SiStripClusterSoA/plugins/alpaka/TrivialSerialisation.cc create mode 100644 DataFormats/SiStripDigiSoA/plugins/BuildFile.xml create mode 100644 DataFormats/SiStripDigiSoA/plugins/TrivialSerialisation.cc create mode 100644 DataFormats/SiStripDigiSoA/plugins/alpaka/TrivialSerialisation.cc create mode 100644 HeterogeneousCore/TrivialSerialisation/plugins/alpaka/GenericClonerPortable.cc create mode 100644 HeterogeneousCore/TrivialSerialisation/src/Serialiser.cc create mode 100644 HeterogeneousCore/TrivialSerialisation/test/testGenericClonerPortable_cfg.py diff --git a/DataFormats/BeamSpot/plugins/alpaka/TrivialSerialisation.cc b/DataFormats/BeamSpot/plugins/alpaka/TrivialSerialisation.cc index c72d53adc73ed..eea5bb1e5e307 100644 --- a/DataFormats/BeamSpot/plugins/alpaka/TrivialSerialisation.cc +++ b/DataFormats/BeamSpot/plugins/alpaka/TrivialSerialisation.cc @@ -1,4 +1,5 @@ +#include "DataFormats/BeamSpot/interface/BeamSpotHost.h" #include "DataFormats/BeamSpot/interface/alpaka/BeamSpotDevice.h" #include "HeterogeneousCore/TrivialSerialisation/interface/alpaka/SerialiserFactoryDevice.h" -DEFINE_TRIVIAL_SERIALISER_PLUGIN_DEVICE(BeamSpotDevice); +DEFINE_TRIVIAL_SERIALISER_PORTABLE_PLUGIN(BeamSpotHost, BeamSpotDevice); diff --git a/DataFormats/EcalDigi/plugins/alpaka/TrivialSerialisation.cc b/DataFormats/EcalDigi/plugins/alpaka/TrivialSerialisation.cc index 198b397cb647a..cfb02b3b1477e 100644 --- a/DataFormats/EcalDigi/plugins/alpaka/TrivialSerialisation.cc +++ b/DataFormats/EcalDigi/plugins/alpaka/TrivialSerialisation.cc @@ -1,6 +1,8 @@ +#include "DataFormats/EcalDigi/interface/EcalDigiHostCollection.h" +#include "DataFormats/EcalDigi/interface/EcalDigiPhase2HostCollection.h" #include "DataFormats/EcalDigi/interface/alpaka/EcalDigiDeviceCollection.h" #include "DataFormats/EcalDigi/interface/alpaka/EcalDigiPhase2DeviceCollection.h" #include "HeterogeneousCore/TrivialSerialisation/interface/alpaka/SerialiserFactoryDevice.h" -DEFINE_TRIVIAL_SERIALISER_PLUGIN_DEVICE(EcalDigiDeviceCollection); -DEFINE_TRIVIAL_SERIALISER_PLUGIN_DEVICE(EcalDigiPhase2DeviceCollection); +DEFINE_TRIVIAL_SERIALISER_PORTABLE_PLUGIN(EcalDigiHostCollection, EcalDigiDeviceCollection); +DEFINE_TRIVIAL_SERIALISER_PORTABLE_PLUGIN(EcalDigiPhase2HostCollection, EcalDigiPhase2DeviceCollection); diff --git a/DataFormats/EcalRecHit/plugins/alpaka/TrivialSerialisation.cc b/DataFormats/EcalRecHit/plugins/alpaka/TrivialSerialisation.cc index b716a3b99a2ea..0e5757deba5ad 100644 --- a/DataFormats/EcalRecHit/plugins/alpaka/TrivialSerialisation.cc +++ b/DataFormats/EcalRecHit/plugins/alpaka/TrivialSerialisation.cc @@ -1,6 +1,8 @@ +#include "DataFormats/EcalRecHit/interface/EcalRecHitHostCollection.h" +#include "DataFormats/EcalRecHit/interface/EcalUncalibratedRecHitHostCollection.h" #include "DataFormats/EcalRecHit/interface/alpaka/EcalRecHitDeviceCollection.h" #include "DataFormats/EcalRecHit/interface/alpaka/EcalUncalibratedRecHitDeviceCollection.h" #include "HeterogeneousCore/TrivialSerialisation/interface/alpaka/SerialiserFactoryDevice.h" -DEFINE_TRIVIAL_SERIALISER_PLUGIN_DEVICE(EcalRecHitDeviceCollection); -DEFINE_TRIVIAL_SERIALISER_PLUGIN_DEVICE(EcalUncalibratedRecHitDeviceCollection); +DEFINE_TRIVIAL_SERIALISER_PORTABLE_PLUGIN(EcalRecHitHostCollection, EcalRecHitDeviceCollection); +DEFINE_TRIVIAL_SERIALISER_PORTABLE_PLUGIN(EcalUncalibratedRecHitHostCollection, EcalUncalibratedRecHitDeviceCollection); diff --git a/DataFormats/HGCalDigi/plugins/alpaka/TrivialSerialisation.cc b/DataFormats/HGCalDigi/plugins/alpaka/TrivialSerialisation.cc index fbc2e7d8d8c1c..444e4e3265e0b 100644 --- a/DataFormats/HGCalDigi/plugins/alpaka/TrivialSerialisation.cc +++ b/DataFormats/HGCalDigi/plugins/alpaka/TrivialSerialisation.cc @@ -1,8 +1,11 @@ +#include "DataFormats/HGCalDigi/interface/HGCalDigiHost.h" +#include "DataFormats/HGCalDigi/interface/HGCalECONDPacketInfoHost.h" +#include "DataFormats/HGCalDigi/interface/HGCalFEDPacketInfoHost.h" #include "DataFormats/HGCalDigi/interface/alpaka/HGCalDigiDevice.h" #include "DataFormats/HGCalDigi/interface/alpaka/HGCalECONDPacketInfoDevice.h" #include "DataFormats/HGCalDigi/interface/alpaka/HGCalFEDPacketInfoDevice.h" #include "HeterogeneousCore/TrivialSerialisation/interface/alpaka/SerialiserFactoryDevice.h" -DEFINE_TRIVIAL_SERIALISER_PLUGIN_DEVICE(hgcaldigi::HGCalDigiDevice); -DEFINE_TRIVIAL_SERIALISER_PLUGIN_DEVICE(hgcaldigi::HGCalECONDPacketInfoDevice); -DEFINE_TRIVIAL_SERIALISER_PLUGIN_DEVICE(hgcaldigi::HGCalFEDPacketInfoDevice); +DEFINE_TRIVIAL_SERIALISER_PORTABLE_PLUGIN(hgcaldigi::HGCalDigiHost, hgcaldigi::HGCalDigiDevice); +DEFINE_TRIVIAL_SERIALISER_PORTABLE_PLUGIN(hgcaldigi::HGCalECONDPacketInfoHost, hgcaldigi::HGCalECONDPacketInfoDevice); +DEFINE_TRIVIAL_SERIALISER_PORTABLE_PLUGIN(hgcaldigi::HGCalFEDPacketInfoHost, hgcaldigi::HGCalFEDPacketInfoDevice); diff --git a/DataFormats/HGCalReco/plugins/alpaka/TrivialSerialisation.cc b/DataFormats/HGCalReco/plugins/alpaka/TrivialSerialisation.cc index bb0b22d0498d2..acc500fd6ff95 100644 --- a/DataFormats/HGCalReco/plugins/alpaka/TrivialSerialisation.cc +++ b/DataFormats/HGCalReco/plugins/alpaka/TrivialSerialisation.cc @@ -1,8 +1,11 @@ +#include "DataFormats/HGCalReco/interface/HGCalSoAClustersHostCollection.h" +#include "DataFormats/HGCalReco/interface/HGCalSoARecHitsExtraHostCollection.h" +#include "DataFormats/HGCalReco/interface/HGCalSoARecHitsHostCollection.h" #include "DataFormats/HGCalReco/interface/alpaka/HGCalSoAClustersDeviceCollection.h" #include "DataFormats/HGCalReco/interface/alpaka/HGCalSoARecHitsExtraDeviceCollection.h" #include "DataFormats/HGCalReco/interface/alpaka/HGCalSoARecHitsDeviceCollection.h" #include "HeterogeneousCore/TrivialSerialisation/interface/alpaka/SerialiserFactoryDevice.h" -DEFINE_TRIVIAL_SERIALISER_PLUGIN_DEVICE(HGCalSoAClustersDeviceCollection); -DEFINE_TRIVIAL_SERIALISER_PLUGIN_DEVICE(HGCalSoARecHitsExtraDeviceCollection); -DEFINE_TRIVIAL_SERIALISER_PLUGIN_DEVICE(HGCalSoARecHitsDeviceCollection); +DEFINE_TRIVIAL_SERIALISER_PORTABLE_PLUGIN(HGCalSoAClustersHostCollection, HGCalSoAClustersDeviceCollection); +DEFINE_TRIVIAL_SERIALISER_PORTABLE_PLUGIN(HGCalSoARecHitsExtraHostCollection, HGCalSoARecHitsExtraDeviceCollection); +DEFINE_TRIVIAL_SERIALISER_PORTABLE_PLUGIN(HGCalSoARecHitsHostCollection, HGCalSoARecHitsDeviceCollection); diff --git a/DataFormats/HcalDigi/plugins/alpaka/TrivialSerialisation.cc b/DataFormats/HcalDigi/plugins/alpaka/TrivialSerialisation.cc index cea62bac1fe9d..d16c35583f5bb 100644 --- a/DataFormats/HcalDigi/plugins/alpaka/TrivialSerialisation.cc +++ b/DataFormats/HcalDigi/plugins/alpaka/TrivialSerialisation.cc @@ -1,5 +1,6 @@ +#include "DataFormats/HcalDigi/interface/HcalDigiHostCollection.h" #include "DataFormats/HcalDigi/interface/alpaka/HcalDigiDeviceCollection.h" #include "HeterogeneousCore/TrivialSerialisation/interface/alpaka/SerialiserFactoryDevice.h" -DEFINE_TRIVIAL_SERIALISER_PLUGIN_DEVICE(hcal::Phase0DigiDeviceCollection); -DEFINE_TRIVIAL_SERIALISER_PLUGIN_DEVICE(hcal::Phase1DigiDeviceCollection); +DEFINE_TRIVIAL_SERIALISER_PORTABLE_PLUGIN(hcal::Phase0DigiHostCollection, hcal::Phase0DigiDeviceCollection); +DEFINE_TRIVIAL_SERIALISER_PORTABLE_PLUGIN(hcal::Phase1DigiHostCollection, hcal::Phase1DigiDeviceCollection); diff --git a/DataFormats/HcalRecHit/plugins/alpaka/TrivialSerialisation.cc b/DataFormats/HcalRecHit/plugins/alpaka/TrivialSerialisation.cc index 76d4f0a4b69ed..1e16e57140bc6 100644 --- a/DataFormats/HcalRecHit/plugins/alpaka/TrivialSerialisation.cc +++ b/DataFormats/HcalRecHit/plugins/alpaka/TrivialSerialisation.cc @@ -1,4 +1,5 @@ +#include "DataFormats/HcalRecHit/interface/HcalRecHitHostCollection.h" #include "DataFormats/HcalRecHit/interface/alpaka/HcalRecHitDeviceCollection.h" #include "HeterogeneousCore/TrivialSerialisation/interface/alpaka/SerialiserFactoryDevice.h" -DEFINE_TRIVIAL_SERIALISER_PLUGIN_DEVICE(hcal::RecHitDeviceCollection); +DEFINE_TRIVIAL_SERIALISER_PORTABLE_PLUGIN(hcal::RecHitHostCollection, hcal::RecHitDeviceCollection); diff --git a/DataFormats/ParticleFlowReco/plugins/alpaka/TrivialSerialisation.cc b/DataFormats/ParticleFlowReco/plugins/alpaka/TrivialSerialisation.cc index 13b2aa9d7aaa8..dc2d75e2b0b95 100644 --- a/DataFormats/ParticleFlowReco/plugins/alpaka/TrivialSerialisation.cc +++ b/DataFormats/ParticleFlowReco/plugins/alpaka/TrivialSerialisation.cc @@ -1,13 +1,17 @@ // Include the Eigen core library before including the SoA definitions #include +#include "DataFormats/ParticleFlowReco/interface/CaloRecHitHostCollection.h" +#include "DataFormats/ParticleFlowReco/interface/PFClusterHostCollection.h" +#include "DataFormats/ParticleFlowReco/interface/PFRecHitFractionHostCollection.h" +#include "DataFormats/ParticleFlowReco/interface/PFRecHitHostCollection.h" #include "DataFormats/ParticleFlowReco/interface/alpaka/CaloRecHitDeviceCollection.h" #include "DataFormats/ParticleFlowReco/interface/alpaka/PFClusterDeviceCollection.h" #include "DataFormats/ParticleFlowReco/interface/alpaka/PFRecHitFractionDeviceCollection.h" #include "DataFormats/ParticleFlowReco/interface/alpaka/PFRecHitDeviceCollection.h" #include "HeterogeneousCore/TrivialSerialisation/interface/alpaka/SerialiserFactoryDevice.h" -DEFINE_TRIVIAL_SERIALISER_PLUGIN_DEVICE(reco::CaloRecHitDeviceCollection); -DEFINE_TRIVIAL_SERIALISER_PLUGIN_DEVICE(reco::PFClusterDeviceCollection); -DEFINE_TRIVIAL_SERIALISER_PLUGIN_DEVICE(reco::PFRecHitFractionDeviceCollection); -DEFINE_TRIVIAL_SERIALISER_PLUGIN_DEVICE(reco::PFRecHitDeviceCollection); +DEFINE_TRIVIAL_SERIALISER_PORTABLE_PLUGIN(reco::CaloRecHitHostCollection, reco::CaloRecHitDeviceCollection); +DEFINE_TRIVIAL_SERIALISER_PORTABLE_PLUGIN(reco::PFClusterHostCollection, reco::PFClusterDeviceCollection); +DEFINE_TRIVIAL_SERIALISER_PORTABLE_PLUGIN(reco::PFRecHitFractionHostCollection, reco::PFRecHitFractionDeviceCollection); +DEFINE_TRIVIAL_SERIALISER_PORTABLE_PLUGIN(reco::PFRecHitHostCollection, reco::PFRecHitDeviceCollection); diff --git a/DataFormats/PortableTestObjects/plugins/alpaka/TrivialSerialisation.cc b/DataFormats/PortableTestObjects/plugins/alpaka/TrivialSerialisation.cc index 9ebb83cb1c748..9be2703d1e035 100644 --- a/DataFormats/PortableTestObjects/plugins/alpaka/TrivialSerialisation.cc +++ b/DataFormats/PortableTestObjects/plugins/alpaka/TrivialSerialisation.cc @@ -1,3 +1,11 @@ +#include "DataFormats/PortableTestObjects/interface/ImageHostCollection.h" +#include "DataFormats/PortableTestObjects/interface/LogitsHostCollection.h" +#include "DataFormats/PortableTestObjects/interface/MaskHostCollection.h" +#include "DataFormats/PortableTestObjects/interface/MultiHeadNetHostCollection.h" +#include "DataFormats/PortableTestObjects/interface/ParticleHostCollection.h" +#include "DataFormats/PortableTestObjects/interface/SimpleNetHostCollection.h" +#include "DataFormats/PortableTestObjects/interface/TestHostCollection.h" +#include "DataFormats/PortableTestObjects/interface/TestHostObject.h" #include "DataFormats/PortableTestObjects/interface/alpaka/ImageDeviceCollection.h" #include "DataFormats/PortableTestObjects/interface/alpaka/LogitsDeviceCollection.h" #include "DataFormats/PortableTestObjects/interface/alpaka/MaskDeviceCollection.h" @@ -8,13 +16,15 @@ #include "DataFormats/PortableTestObjects/interface/alpaka/TestDeviceObject.h" #include "HeterogeneousCore/TrivialSerialisation/interface/alpaka/SerialiserFactoryDevice.h" -DEFINE_TRIVIAL_SERIALISER_PLUGIN_DEVICE(portabletest::ImageDeviceCollection); -DEFINE_TRIVIAL_SERIALISER_PLUGIN_DEVICE(portabletest::LogitsDeviceCollection); -DEFINE_TRIVIAL_SERIALISER_PLUGIN_DEVICE(portabletest::MaskDeviceCollection); -DEFINE_TRIVIAL_SERIALISER_PLUGIN_DEVICE(portabletest::MultiHeadNetDeviceCollection); -DEFINE_TRIVIAL_SERIALISER_PLUGIN_DEVICE(portabletest::ParticleDeviceCollection); -DEFINE_TRIVIAL_SERIALISER_PLUGIN_DEVICE(portabletest::SimpleNetDeviceCollection); -DEFINE_TRIVIAL_SERIALISER_PLUGIN_DEVICE(portabletest::TestDeviceCollection); -DEFINE_TRIVIAL_SERIALISER_PLUGIN_DEVICE(portabletest::TestDeviceCollection2); -DEFINE_TRIVIAL_SERIALISER_PLUGIN_DEVICE(portabletest::TestDeviceCollection3); -DEFINE_TRIVIAL_SERIALISER_PLUGIN_DEVICE(portabletest::TestDeviceObject); +DEFINE_TRIVIAL_SERIALISER_PORTABLE_PLUGIN(portabletest::ImageHostCollection, portabletest::ImageDeviceCollection); +DEFINE_TRIVIAL_SERIALISER_PORTABLE_PLUGIN(portabletest::LogitsHostCollection, portabletest::LogitsDeviceCollection); +DEFINE_TRIVIAL_SERIALISER_PORTABLE_PLUGIN(portabletest::MaskHostCollection, portabletest::MaskDeviceCollection); +DEFINE_TRIVIAL_SERIALISER_PORTABLE_PLUGIN(portabletest::MultiHeadNetHostCollection, + portabletest::MultiHeadNetDeviceCollection); +DEFINE_TRIVIAL_SERIALISER_PORTABLE_PLUGIN(portabletest::ParticleHostCollection, portabletest::ParticleDeviceCollection); +DEFINE_TRIVIAL_SERIALISER_PORTABLE_PLUGIN(portabletest::SimpleNetHostCollection, + portabletest::SimpleNetDeviceCollection); +DEFINE_TRIVIAL_SERIALISER_PORTABLE_PLUGIN(portabletest::TestHostCollection, portabletest::TestDeviceCollection); +DEFINE_TRIVIAL_SERIALISER_PORTABLE_PLUGIN(portabletest::TestHostCollection2, portabletest::TestDeviceCollection2); +DEFINE_TRIVIAL_SERIALISER_PORTABLE_PLUGIN(portabletest::TestHostCollection3, portabletest::TestDeviceCollection3); +DEFINE_TRIVIAL_SERIALISER_PORTABLE_PLUGIN(portabletest::TestHostObject, portabletest::TestDeviceObject); diff --git a/DataFormats/SiPixelClusterSoA/plugins/alpaka/TrivialSerialisation.cc b/DataFormats/SiPixelClusterSoA/plugins/alpaka/TrivialSerialisation.cc index c0493fa10a264..88f5be8e431bc 100644 --- a/DataFormats/SiPixelClusterSoA/plugins/alpaka/TrivialSerialisation.cc +++ b/DataFormats/SiPixelClusterSoA/plugins/alpaka/TrivialSerialisation.cc @@ -1,4 +1,5 @@ +#include "DataFormats/SiPixelClusterSoA/interface/SiPixelClustersHost.h" #include "DataFormats/SiPixelClusterSoA/interface/alpaka/SiPixelClustersSoACollection.h" #include "HeterogeneousCore/TrivialSerialisation/interface/alpaka/SerialiserFactoryDevice.h" -DEFINE_TRIVIAL_SERIALISER_PLUGIN_DEVICE(SiPixelClustersSoACollection); +DEFINE_TRIVIAL_SERIALISER_PORTABLE_PLUGIN(SiPixelClustersHost, SiPixelClustersSoACollection); diff --git a/DataFormats/SiPixelDigiSoA/plugins/alpaka/TrivialSerialisation.cc b/DataFormats/SiPixelDigiSoA/plugins/alpaka/TrivialSerialisation.cc index 32fa6373548f4..243e272559e2d 100644 --- a/DataFormats/SiPixelDigiSoA/plugins/alpaka/TrivialSerialisation.cc +++ b/DataFormats/SiPixelDigiSoA/plugins/alpaka/TrivialSerialisation.cc @@ -1,6 +1,8 @@ +#include "DataFormats/SiPixelDigiSoA/interface/SiPixelDigiErrorsHost.h" +#include "DataFormats/SiPixelDigiSoA/interface/SiPixelDigisHost.h" #include "DataFormats/SiPixelDigiSoA/interface/alpaka/SiPixelDigiErrorsSoACollection.h" #include "DataFormats/SiPixelDigiSoA/interface/alpaka/SiPixelDigisSoACollection.h" #include "HeterogeneousCore/TrivialSerialisation/interface/alpaka/SerialiserFactoryDevice.h" -DEFINE_TRIVIAL_SERIALISER_PLUGIN_DEVICE(SiPixelDigiErrorsSoACollection); -DEFINE_TRIVIAL_SERIALISER_PLUGIN_DEVICE(SiPixelDigisSoACollection); +DEFINE_TRIVIAL_SERIALISER_PORTABLE_PLUGIN(SiPixelDigiErrorsHost, SiPixelDigiErrorsSoACollection); +DEFINE_TRIVIAL_SERIALISER_PORTABLE_PLUGIN(SiPixelDigisHost, SiPixelDigisSoACollection); diff --git a/DataFormats/SiStripClusterSoA/plugins/BuildFile.xml b/DataFormats/SiStripClusterSoA/plugins/BuildFile.xml new file mode 100644 index 0000000000000..3423521ad1fcf --- /dev/null +++ b/DataFormats/SiStripClusterSoA/plugins/BuildFile.xml @@ -0,0 +1,12 @@ + + + + + + + + + + + + diff --git a/DataFormats/SiStripClusterSoA/plugins/TrivialSerialisation.cc b/DataFormats/SiStripClusterSoA/plugins/TrivialSerialisation.cc new file mode 100644 index 0000000000000..96673a22848bf --- /dev/null +++ b/DataFormats/SiStripClusterSoA/plugins/TrivialSerialisation.cc @@ -0,0 +1,4 @@ +#include "DataFormats/SiStripClusterSoA/interface/SiStripClusterHost.h" +#include "HeterogeneousCore/TrivialSerialisation/interface/SerialiserFactory.h" + +DEFINE_TRIVIAL_SERIALISER_PLUGIN(sistrip::SiStripClusterHost); diff --git a/DataFormats/SiStripClusterSoA/plugins/alpaka/TrivialSerialisation.cc b/DataFormats/SiStripClusterSoA/plugins/alpaka/TrivialSerialisation.cc new file mode 100644 index 0000000000000..97725b88c83e6 --- /dev/null +++ b/DataFormats/SiStripClusterSoA/plugins/alpaka/TrivialSerialisation.cc @@ -0,0 +1,5 @@ +#include "DataFormats/SiStripClusterSoA/interface/SiStripClusterHost.h" +#include "DataFormats/SiStripClusterSoA/interface/alpaka/SiStripClusterDevice.h" +#include "HeterogeneousCore/TrivialSerialisation/interface/alpaka/SerialiserFactoryDevice.h" + +DEFINE_TRIVIAL_SERIALISER_PORTABLE_PLUGIN(sistrip::SiStripClusterHost, sistrip::SiStripClusterDevice); diff --git a/DataFormats/SiStripDigiSoA/plugins/BuildFile.xml b/DataFormats/SiStripDigiSoA/plugins/BuildFile.xml new file mode 100644 index 0000000000000..2b10aec36c105 --- /dev/null +++ b/DataFormats/SiStripDigiSoA/plugins/BuildFile.xml @@ -0,0 +1,12 @@ + + + + + + + + + + + + \ No newline at end of file diff --git a/DataFormats/SiStripDigiSoA/plugins/TrivialSerialisation.cc b/DataFormats/SiStripDigiSoA/plugins/TrivialSerialisation.cc new file mode 100644 index 0000000000000..4a0abf14505ad --- /dev/null +++ b/DataFormats/SiStripDigiSoA/plugins/TrivialSerialisation.cc @@ -0,0 +1,4 @@ +#include "DataFormats/SiStripDigiSoA/interface/SiStripDigiHost.h" +#include "HeterogeneousCore/TrivialSerialisation/interface/SerialiserFactory.h" + +DEFINE_TRIVIAL_SERIALISER_PLUGIN(sistrip::SiStripDigiHost); diff --git a/DataFormats/SiStripDigiSoA/plugins/alpaka/TrivialSerialisation.cc b/DataFormats/SiStripDigiSoA/plugins/alpaka/TrivialSerialisation.cc new file mode 100644 index 0000000000000..baa9c5b79c5eb --- /dev/null +++ b/DataFormats/SiStripDigiSoA/plugins/alpaka/TrivialSerialisation.cc @@ -0,0 +1,5 @@ +#include "DataFormats/SiStripDigiSoA/interface/SiStripDigiHost.h" +#include "DataFormats/SiStripDigiSoA/interface/alpaka/SiStripDigiDevice.h" +#include "HeterogeneousCore/TrivialSerialisation/interface/alpaka/SerialiserFactoryDevice.h" + +DEFINE_TRIVIAL_SERIALISER_PORTABLE_PLUGIN(sistrip::SiStripDigiHost, sistrip::SiStripDigiDevice); diff --git a/DataFormats/TrackSoA/plugins/alpaka/TrivialSerialisation.cc b/DataFormats/TrackSoA/plugins/alpaka/TrivialSerialisation.cc index 35ee64a006aaf..3b6da5ddfffb8 100644 --- a/DataFormats/TrackSoA/plugins/alpaka/TrivialSerialisation.cc +++ b/DataFormats/TrackSoA/plugins/alpaka/TrivialSerialisation.cc @@ -1,4 +1,5 @@ +#include "DataFormats/TrackSoA/interface/TracksHost.h" #include "DataFormats/TrackSoA/interface/alpaka/TracksSoACollection.h" #include "HeterogeneousCore/TrivialSerialisation/interface/alpaka/SerialiserFactoryDevice.h" -DEFINE_TRIVIAL_SERIALISER_PLUGIN_DEVICE(reco::TracksSoACollection); +DEFINE_TRIVIAL_SERIALISER_PORTABLE_PLUGIN(reco::TracksHost, reco::TracksSoACollection); diff --git a/DataFormats/TrackingRecHitSoA/plugins/alpaka/TrivialSerialisation.cc b/DataFormats/TrackingRecHitSoA/plugins/alpaka/TrivialSerialisation.cc index ff080ccab5de4..99931707500c6 100644 --- a/DataFormats/TrackingRecHitSoA/plugins/alpaka/TrivialSerialisation.cc +++ b/DataFormats/TrackingRecHitSoA/plugins/alpaka/TrivialSerialisation.cc @@ -1,4 +1,5 @@ +#include "DataFormats/TrackingRecHitSoA/interface/TrackingRecHitsHost.h" #include "DataFormats/TrackingRecHitSoA/interface/alpaka/TrackingRecHitsSoACollection.h" #include "HeterogeneousCore/TrivialSerialisation/interface/alpaka/SerialiserFactoryDevice.h" -DEFINE_TRIVIAL_SERIALISER_PLUGIN_DEVICE(reco::TrackingRecHitsSoACollection); +DEFINE_TRIVIAL_SERIALISER_PORTABLE_PLUGIN(reco::TrackingRecHitHost, reco::TrackingRecHitsSoACollection); diff --git a/DataFormats/VertexSoA/plugins/alpaka/TrivialSerialisation.cc b/DataFormats/VertexSoA/plugins/alpaka/TrivialSerialisation.cc index d81108e1f7410..98bf2917f3485 100644 --- a/DataFormats/VertexSoA/plugins/alpaka/TrivialSerialisation.cc +++ b/DataFormats/VertexSoA/plugins/alpaka/TrivialSerialisation.cc @@ -1,4 +1,5 @@ +#include "DataFormats/VertexSoA/interface/ZVertexHost.h" #include "DataFormats/VertexSoA/interface/alpaka/ZVertexSoACollection.h" #include "HeterogeneousCore/TrivialSerialisation/interface/alpaka/SerialiserFactoryDevice.h" -DEFINE_TRIVIAL_SERIALISER_PLUGIN_DEVICE(reco::ZVertexSoACollection); +DEFINE_TRIVIAL_SERIALISER_PORTABLE_PLUGIN(reco::ZVertexHost, reco::ZVertexSoACollection); diff --git a/FWCore/Framework/interface/stream/implementors.h b/FWCore/Framework/interface/stream/implementors.h index 0b4dbd6a6485c..ab95cd142727f 100644 --- a/FWCore/Framework/interface/stream/implementors.h +++ b/FWCore/Framework/interface/stream/implementors.h @@ -22,6 +22,7 @@ #include #include #include +#include #include #include #include @@ -369,6 +370,18 @@ namespace edm { }); } + // Non-templated overload for the registration at runtime of products + // whose type is not known at compile-time + void registerTransformAsync( + edm::EDPutToken iToken, + std::function iPre, + std::function(edm::StreamID, std::any)> iF, + edm::TypeID returnType, + std::string productInstance) { + TransformerBase::registerTransformAsyncImp( + *this, iToken, returnType, std::move(productInstance), std::move(iPre), std::move(iF)); + } + private: size_t transformIndex_(edm::ProductDescription const& iBranch) const noexcept final { return TransformerBase::findMatchingIndex(*this, iBranch); diff --git a/FWCore/Framework/test/stream_producer_catch2.cc b/FWCore/Framework/test/stream_producer_catch2.cc index 85605f9bb8c22..2270cf4ad3d4e 100644 --- a/FWCore/Framework/test/stream_producer_catch2.cc +++ b/FWCore/Framework/test/stream_producer_catch2.cc @@ -19,6 +19,7 @@ #include "FWCore/Framework/interface/stream/EDProducerAdaptor.h" #include "FWCore/Framework/interface/OccurrenceTraits.h" #include "FWCore/Framework/interface/ProductResolversFactory.h" +#include "DataFormats/Common/interface/Wrapper.h" #include "DataFormats/Provenance/interface/ProductRegistry.h" #include "DataFormats/Provenance/interface/BranchIDListHelper.h" #include "FWCore/Framework/interface/HistoryAppender.h" @@ -27,6 +28,7 @@ #include "FWCore/ServiceRegistry/interface/StreamContext.h" #include "FWCore/Concurrency/interface/FinalWaitingTask.h" #include "FWCore/Utilities/interface/GlobalIdentifier.h" +#include "FWCore/Utilities/interface/TypeID.h" #include "FWCore/Utilities/interface/Exception.h" @@ -349,6 +351,33 @@ namespace { edm::EDPutTokenT token_; }; + class TransformAsyncUntypedProd : public edm::stream::EDProducer { + public: + struct IntHolder { + IntHolder() : value_(0) {} + IntHolder(int iV) : value_(iV) {} + int value_; + }; + TransformAsyncUntypedProd(edm::ParameterSet const&) { + edm::EDPutToken token = produces(); + registerTransformAsync( + token, + [](edm::StreamID, edm::WrapperBase const& iGotProduct, edm::WaitingTaskWithArenaHolder) { + return std::any(IntHolder(*static_cast const&>(iGotProduct).product())); + }, + [](edm::StreamID, std::any iCache) -> std::unique_ptr { + return std::make_unique>(edm::WrapperBase::Emplace{}, + std::any_cast(iCache).value_); + }, + edm::TypeID(typeid(int)), + std::string{}); + } + + void produce(edm::Event& iEvent, edm::EventSetup const&) final { + //iEvent.emplace(token_, 3.625); + } + }; + unsigned int BasicProd::m_count = 0; unsigned int GlobalProd::m_count = 0; unsigned int GlobalProdWithBeginJob::m_count = 0; @@ -634,5 +663,10 @@ namespace { Trans::kStreamEndLuminosityBlock, Trans::kGlobalEndLuminosityBlock}); } + + SECTION("transformAsyncUntypedProdTest") { + auto mod = createModule(); + REQUIRE(mod.get() != nullptr); + } } } // namespace diff --git a/HeterogeneousCore/AlpakaCore/interface/alpaka/ProducerBase.h b/HeterogeneousCore/AlpakaCore/interface/alpaka/ProducerBase.h index 93605443c1433..c57522e339f7d 100644 --- a/HeterogeneousCore/AlpakaCore/interface/alpaka/ProducerBase.h +++ b/HeterogeneousCore/AlpakaCore/interface/alpaka/ProducerBase.h @@ -9,14 +9,19 @@ #include "FWCore/ParameterSet/interface/ParameterSet.h" #include "FWCore/Utilities/interface/EDPutToken.h" #include "FWCore/Utilities/interface/Transition.h" +#include "FWCore/Utilities/interface/TypeID.h" #include "HeterogeneousCore/AlpakaCore/interface/alpaka/EDMetadataAcquireSentry.h" #include "HeterogeneousCore/AlpakaCore/interface/modulePrevalidate.h" #include "HeterogeneousCore/AlpakaInterface/interface/Backend.h" #include "HeterogeneousCore/AlpakaInterface/interface/CopyToDevice.h" #include "HeterogeneousCore/AlpakaInterface/interface/CopyToHost.h" +#include +#include #include +#include #include +#include namespace ALPAKA_ACCELERATOR_NAMESPACE { template @@ -120,6 +125,38 @@ namespace ALPAKA_ACCELERATOR_NAMESPACE { } } + template + edm::EDPutToken produces(edm::TypeID deviceProductType, + edm::TypeID hostProductType, + std::string instanceName, + TCopyAsync&& copyAsync, + TTransform&& transform) { + edm::EDPutToken token = this->producesCollector().template produces(hostProductType, instanceName); + + if constexpr (not detail::useProductDirectly) { + using TplType = std::tuple>; + + this->registerTransformAsync( + token, + [copyAsync = std::forward(copyAsync), synchronize = this->synchronize()]( + edm::StreamID streamID, edm::WrapperBase const& wb, edm::WaitingTaskWithArenaHolder holder) -> std::any { + detail::EDMetadataAcquireSentry sentry(streamID, std::move(holder), synchronize); + auto productOnDevice = copyAsync(sentry.metadata()->queue(), wb); + return std::make_shared(std::move(productOnDevice), sentry.finish()); + }, + [transform = std::forward(transform)](edm::StreamID, + std::any cache) -> std::unique_ptr { + auto tplPtr = std::any_cast>(cache); + auto& productOnDevice = std::get<0>(*tplPtr); + return transform(productOnDevice, std::move(std::get<1>(*tplPtr))); + }, + deviceProductType, + std::move(instanceName)); + } + + return token; + } + // Device products // // intentionally not returning BranchAliasSetter @@ -159,6 +196,41 @@ namespace ALPAKA_ACCELERATOR_NAMESPACE { return token; } } + + template + edm::EDPutToken deviceProduces(edm::TypeID deviceProductType, + edm::TypeID hostProductType, + std::string instanceName, + TGetQueue&& getQueue, + TCopyAsync&& copyAsync, + TTransform&& transform) { + edm::EDPutToken token = this->producesCollector().template produces(deviceProductType, instanceName); + + if constexpr (not detail::useProductDirectly) { + using TplType = std::tuple>; + + this->registerTransformAsync( + token, + [getQueue = std::forward(getQueue), copyAsync = std::forward(copyAsync)]( + edm::StreamID, edm::WrapperBase const& wb, edm::WaitingTaskWithArenaHolder holder) -> std::any { + auto queue = getQueue(wb); + detail::EDMetadataAcquireSentry sentry(queue, std::move(holder)); + auto metadataPtr = sentry.metadata(); + auto productOnHost = copyAsync(*queue, *metadataPtr, wb); + return std::make_shared(std::move(productOnHost), sentry.finish()); + }, + [transform = std::forward(transform)](edm::StreamID, + std::any cache) -> std::unique_ptr { + auto tplPtr = std::any_cast>(cache); + auto productOnHost = std::get<0>(*tplPtr); + return transform(productOnHost); + }, + hostProductType, + std::move(instanceName)); + } + + return token; + } }; // Adaptor class to make the type-deducing produces() calls to work @@ -171,12 +243,40 @@ namespace ALPAKA_ACCELERATOR_NAMESPACE { return producer_.template produces(label_); } + // for runtime-typed host-only products + template + edm::EDPutToken produces(edm::TypeID deviceProductType, + edm::TypeID hostProductType, + TCopyAsync&& copyAsync, + TTransform&& transform) { + return producer_.template produces(deviceProductType, + hostProductType, + label_, + std::forward(copyAsync), + std::forward(transform)); + } + // for device products template edm::EDPutTokenT deviceProduces() { return producer_.template deviceProduces(label_); } + // for runtime-typed device products + template + edm::EDPutToken deviceProduces(edm::TypeID deviceProductType, + edm::TypeID hostProductType, + TGetQueue&& getQueue, + TCopyAsync&& copyAsync, + TTransform&& transform) { + return producer_.template deviceProduces(deviceProductType, + hostProductType, + label_, + std::forward(getQueue), + std::forward(copyAsync), + std::forward(transform)); + } + private: // only ProducerBase is allowed to make an instance of this class friend TProducer; diff --git a/HeterogeneousCore/TrivialSerialisation/interface/Reader.h b/HeterogeneousCore/TrivialSerialisation/interface/Reader.h index c8d26e49ab333..3af8f756cf961 100644 --- a/HeterogeneousCore/TrivialSerialisation/interface/Reader.h +++ b/HeterogeneousCore/TrivialSerialisation/interface/Reader.h @@ -1,17 +1,20 @@ #ifndef HeterogeneousCore_TrivialSerialisation_interface_Reader_h #define HeterogeneousCore_TrivialSerialisation_interface_Reader_h -#include -#include #include +#include +#include #include "DataFormats/Common/interface/Wrapper.h" #include "DataFormats/TrivialSerialisation/interface/MemoryCopyTraits.h" -#include "FWCore/Utilities/interface/EDMException.h" #include "HeterogeneousCore/TrivialSerialisation/interface/AnyBuffer.h" #include "HeterogeneousCore/TrivialSerialisation/interface/Common.h" #include "HeterogeneousCore/TrivialSerialisation/interface/ReaderBase.h" +namespace ngt::detail { + [[noreturn]] void throwEmptyWrapperError(); +} + namespace ngt { // Reader for host products: extracts properties and memory regions from a Wrapper. @@ -32,7 +35,7 @@ namespace ngt { const T& object() const { const WrapperType& w = static_cast(*ptr_); if (not w.isPresent()) { - throw edm::Exception(edm::errors::LogicError) << "Attempt to access an empty Wrapper"; + detail::throwEmptyWrapperError(); } return w.bareProduct(); } diff --git a/HeterogeneousCore/TrivialSerialisation/interface/alpaka/Serialiser.h b/HeterogeneousCore/TrivialSerialisation/interface/alpaka/Serialiser.h index 7e9e0b497a9c4..5804e881d5c61 100644 --- a/HeterogeneousCore/TrivialSerialisation/interface/alpaka/Serialiser.h +++ b/HeterogeneousCore/TrivialSerialisation/interface/alpaka/Serialiser.h @@ -1,19 +1,43 @@ #ifndef HeterogeneousCore_TrivialSerialisation_interface_alpaka_Serialiser_h #define HeterogeneousCore_TrivialSerialisation_interface_alpaka_Serialiser_h +#include #include +#include -#include "DataFormats/Common/interface/Wrapper.h" -#include "DataFormats/Common/interface/WrapperBase.h" #include "DataFormats/AlpakaCommon/interface/alpaka/DeviceProductType.h" #include "DataFormats/AlpakaCommon/interface/alpaka/EDMetadata.h" +#include "DataFormats/Common/interface/Wrapper.h" +#include "DataFormats/Common/interface/WrapperBase.h" +#include "HeterogeneousCore/AlpakaInterface/interface/CopyToDevice.h" +#include "HeterogeneousCore/AlpakaInterface/interface/CopyToHost.h" #include "HeterogeneousCore/AlpakaInterface/interface/config.h" #include "HeterogeneousCore/TrivialSerialisation/interface/alpaka/Reader.h" #include "HeterogeneousCore/TrivialSerialisation/interface/alpaka/SerialiserBase.h" #include "HeterogeneousCore/TrivialSerialisation/interface/alpaka/Writer.h" +namespace ngt::detail { + [[noreturn]] void throwHostProductTypeIDError(); +} + namespace ALPAKA_ACCELERATOR_NAMESPACE::ngt { + template + concept HasCopyToHost = requires(TQueue& q, T const& t) { + { cms::alpakatools::CopyToHost::copyAsync(q, t) }; + }; + + template + concept HasCopyToDevice = requires(TQueue& q, T const& t) { + { cms::alpakatools::CopyToDevice::copyAsync(q, t) }; + }; + + // Get through CopyToHost the host-equivalent of T + template + requires HasCopyToHost + using HostTypeOf = std::remove_cvref_t::copyAsync( + std::declval(), std::declval()))>; + // Concrete Serialiser for device products. // T is the inner product type (e.g. PortableDeviceCollection<...>). template @@ -31,6 +55,125 @@ namespace ALPAKA_ACCELERATOR_NAMESPACE::ngt { return std::make_unique>(w.bareProduct().template getSynchronized(metadata)); } } + + // The methods below are not really related to serialisation. They might be + // moved elsewhere when a better place for them is found. + std::type_info const& productTypeID() const override { return typeid(detail::DeviceProductType); } + + std::type_info const& hostProductTypeID() const override { + if constexpr (HasCopyToHost) { + return typeid(HostTypeOf); + } else { + ::ngt::detail::throwHostProductTypeIDError(); + } + } + + bool hasCopyToHost() const override { return HasCopyToHost; } + + bool hasCopyToDevice() const override { + if constexpr (HasCopyToHost) { + return HasCopyToDevice, Queue>; + } else { + return false; + } + } + + std::function(edm::WrapperBase const&)> getQueue() const override { + if constexpr (not detail::useProductDirectly) { + return [](edm::WrapperBase const& wb) -> std::shared_ptr { + auto const& deviceProduct = dynamic_cast(wb).bareProduct(); + return deviceProduct.template metadata().shared_queue(); + }; + } else { + return nullptr; + } + } + + std::function preTransformDtoH() const override { + if constexpr (HasCopyToHost) { + using CopyT = cms::alpakatools::CopyToHost; + using HostProductType = HostTypeOf; + + return [](Queue& queue, EDMetadata& metadata, edm::WrapperBase const& wb) -> std::any { + auto const& deviceProduct = dynamic_cast(wb).bareProduct(); + T const& productOnDevice = deviceProduct.template getSynchronized(metadata); + auto productOnHost = CopyT::copyAsync(queue, productOnDevice); + return std::make_shared(std::move(productOnHost)); + }; + } else { + return nullptr; + } + } + + std::function(std::any const&)> transformDtoH() const override { + if constexpr (HasCopyToHost) { + using CopyT = cms::alpakatools::CopyToHost; + using HostProductType = HostTypeOf; + + return [](std::any const& cache) -> std::unique_ptr { + auto& productOnHost = *std::any_cast>(cache); + if constexpr (requires { CopyT::postCopy(productOnHost); }) { + CopyT::postCopy(productOnHost); + } + return std::make_unique>(edm::WrapperBase::Emplace{}, std::move(productOnHost)); + }; + } else { + return nullptr; + } + } + + std::function preTransformHtoD() const override { + if constexpr (HasCopyToHost) { + // HasCopyToHost is required to get HostProductType, which is needed to + // evaluate HasCopyToDevice. + using HostProductType = HostTypeOf; + if constexpr (HasCopyToDevice) { + using CopyT = cms::alpakatools::CopyToDevice; + using DeviceProductType = std::remove_cvref_t(), std::declval()))>; + static_assert(std::is_same_v, + "CopyToDevice>::copyAsync() must return a device product of type T!"); + + return [](Queue& queue, edm::WrapperBase const& wb) -> std::any { + auto const& hostProduct = dynamic_cast const&>(wb).bareProduct(); + auto productOnDevice = CopyT::copyAsync(queue, hostProduct); + return std::make_shared(std::move(productOnDevice)); + }; + } else { + return nullptr; + } + } else { + return nullptr; + } + } + + std::function(std::any const&, std::shared_ptr)> transformHtoD() + const override { + if constexpr (HasCopyToHost) { + using HostProductType = HostTypeOf; + if constexpr (HasCopyToDevice) { + using CopyT = cms::alpakatools::CopyToDevice; + using DeviceProductType = std::remove_cvref_t(), std::declval()))>; + static_assert(std::is_same_v, + "CopyToDevice>::copyAsync() must return a device product of type T!"); + + return [](std::any const& cache, std::shared_ptr metadata) -> std::unique_ptr { + auto& productOnDevice = *std::any_cast>(cache); + if constexpr (detail::useProductDirectly) { + return std::make_unique(edm::WrapperBase::Emplace{}, std::move(productOnDevice)); + } else { + return std::make_unique( + edm::WrapperBase::Emplace{}, std::move(metadata), std::move(productOnDevice)); + } + }; + } else { + return nullptr; + } + } else { + return nullptr; + } + } }; } // namespace ALPAKA_ACCELERATOR_NAMESPACE::ngt diff --git a/HeterogeneousCore/TrivialSerialisation/interface/alpaka/SerialiserBase.h b/HeterogeneousCore/TrivialSerialisation/interface/alpaka/SerialiserBase.h index 11dd14136ffeb..1b80438ae0302 100644 --- a/HeterogeneousCore/TrivialSerialisation/interface/alpaka/SerialiserBase.h +++ b/HeterogeneousCore/TrivialSerialisation/interface/alpaka/SerialiserBase.h @@ -1,10 +1,13 @@ #ifndef HeterogeneousCore_TrivialSerialisation_interface_alpaka_SerialiserBase_h #define HeterogeneousCore_TrivialSerialisation_interface_alpaka_SerialiserBase_h +#include +#include #include +#include -#include "DataFormats/Common/interface/WrapperBase.h" #include "DataFormats/AlpakaCommon/interface/alpaka/EDMetadata.h" +#include "DataFormats/Common/interface/WrapperBase.h" #include "HeterogeneousCore/AlpakaInterface/interface/config.h" #include "HeterogeneousCore/TrivialSerialisation/interface/alpaka/ReaderBase.h" #include "HeterogeneousCore/TrivialSerialisation/interface/alpaka/WriterBase.h" @@ -17,6 +20,26 @@ namespace ALPAKA_ACCELERATOR_NAMESPACE::ngt { virtual std::unique_ptr writer() = 0; virtual std::unique_ptr reader(const edm::WrapperBase& wrapper, EDMetadata& metadata) = 0; + virtual bool hasCopyToHost() const = 0; + virtual bool hasCopyToDevice() const = 0; + + // Methods needed to register a DtoH transform + virtual std::function preTransformDtoH() const = 0; + virtual std::function(std::any const&)> transformDtoH() const = 0; + virtual std::function(edm::WrapperBase const&)> getQueue() const = 0; + + // Methods needed to register a HtoD transform + virtual std::function preTransformHtoD() const = 0; + virtual std::function(std::any const&, std::shared_ptr)> + transformHtoD() const = 0; + + // Return the type_info of the product type (DeviceProduct for async + // backends, T for serial_sync) + virtual std::type_info const& productTypeID() const = 0; + + // Return the type_info of the host-equivalent of T + virtual std::type_info const& hostProductTypeID() const = 0; + virtual ~SerialiserBase() = default; }; } // namespace ALPAKA_ACCELERATOR_NAMESPACE::ngt diff --git a/HeterogeneousCore/TrivialSerialisation/interface/alpaka/SerialiserFactoryDevice.h b/HeterogeneousCore/TrivialSerialisation/interface/alpaka/SerialiserFactoryDevice.h index 7bb5c21d91bfd..aa290b339ed97 100644 --- a/HeterogeneousCore/TrivialSerialisation/interface/alpaka/SerialiserFactoryDevice.h +++ b/HeterogeneousCore/TrivialSerialisation/interface/alpaka/SerialiserFactoryDevice.h @@ -1,7 +1,6 @@ #ifndef HeterogeneousCore_TrivialSerialisation_interface_alpaka_SerialiserFactoryDevice_h #define HeterogeneousCore_TrivialSerialisation_interface_alpaka_SerialiserFactoryDevice_h -#include "DataFormats/Common/interface/DeviceProduct.h" #include "FWCore/PluginManager/interface/PluginFactory.h" #include "FWCore/Utilities/interface/stringize.h" #include "HeterogeneousCore/AlpakaInterface/interface/config.h" @@ -14,17 +13,24 @@ namespace ALPAKA_ACCELERATOR_NAMESPACE::ngt { // Helper macro to define Serialiser plugins. // -// TYPE is the inner product type (e.g. PortableDeviceCollection<...>), not -// wrapped in DeviceProduct, and without ALPAKA_ACCELERATOR_NAMESPACE:: attached -// to it (it is attached here). The plugin is registered under both the mangled -// typeid name and EDM_STRINGIZE(TYPE). EDM_STRINGIZE(TYPE) is more -// human-readable, and thus more suitable for Python configuration files. -#define DEFINE_TRIVIAL_SERIALISER_PLUGIN_DEVICE(TYPE) \ - DEFINE_EDM_PLUGIN(ALPAKA_ACCELERATOR_NAMESPACE::ngt::SerialiserFactoryDevice, \ - ALPAKA_ACCELERATOR_NAMESPACE::ngt::Serialiser, \ - typeid(edm::DeviceProduct).name()); \ - DEFINE_EDM_PLUGIN2(ALPAKA_ACCELERATOR_NAMESPACE::ngt::SerialiserFactoryDevice, \ - ALPAKA_ACCELERATOR_NAMESPACE::ngt::Serialiser, \ - EDM_STRINGIZE(TYPE)) - +// TYPE_DEVICE is the device type alias (e.g. sistrip::SiStripClusterDevice), +// without ALPAKA_ACCELERATOR_NAMESPACE:: attached to it (it is attached here). +// +// TYPE_HOST is the type that was passed to DEFINE_TRIVIAL_SERIALISER_PLUGIN +// when registering this type in the non-alpaka TrivialSerialisation factory. It +// is required to match a host type with a device serialiser. The H to D product +// transformation can then be registered through this device serialiser. +// +// The plugin is registered under two keys: +// +// 1. mangled typeid name of TYPE_HOST: used to look up the device serialiser +// for a host type. +// 2. EDM_STRINGIZE(TYPE_DEVICE): used to look up the device serialiser +#define DEFINE_TRIVIAL_SERIALISER_PORTABLE_PLUGIN(TYPE_HOST, TYPE_DEVICE) \ + DEFINE_EDM_PLUGIN(ALPAKA_ACCELERATOR_NAMESPACE::ngt::SerialiserFactoryDevice, \ + ALPAKA_ACCELERATOR_NAMESPACE::ngt::Serialiser, \ + typeid(TYPE_HOST).name()); \ + DEFINE_EDM_PLUGIN2(ALPAKA_ACCELERATOR_NAMESPACE::ngt::SerialiserFactoryDevice, \ + ALPAKA_ACCELERATOR_NAMESPACE::ngt::Serialiser, \ + EDM_STRINGIZE(TYPE_DEVICE)) #endif // HeterogeneousCore_TrivialSerialisation_interface_alpaka_SerialiserFactoryDevice_h diff --git a/HeterogeneousCore/TrivialSerialisation/plugins/BuildFile.xml b/HeterogeneousCore/TrivialSerialisation/plugins/BuildFile.xml index 014e295e3082f..f6a61f680b335 100644 --- a/HeterogeneousCore/TrivialSerialisation/plugins/BuildFile.xml +++ b/HeterogeneousCore/TrivialSerialisation/plugins/BuildFile.xml @@ -8,3 +8,18 @@ + + + + + + + + + + + + + + + diff --git a/HeterogeneousCore/TrivialSerialisation/plugins/alpaka/GenericClonerPortable.cc b/HeterogeneousCore/TrivialSerialisation/plugins/alpaka/GenericClonerPortable.cc new file mode 100644 index 0000000000000..bf10959957e31 --- /dev/null +++ b/HeterogeneousCore/TrivialSerialisation/plugins/alpaka/GenericClonerPortable.cc @@ -0,0 +1,299 @@ +/* + * This Alpaka EDProducer clones host or device event products declared in + * its configuration, using the plugin-based NGT trivial serialisation. + * + * - Host type aliases (e.g. "portabletest::TestHostCollection") are cloned + * using the host TrivialSerialisation mechanism with std::memcpy. If a + * matching device serialiser is registered, the H->D transformation is + * also registered at construction time. + * + * - Device type aliases (e.g. "sistrip::SiStripClusterDevice") are cloned on + * device using alpaka::memcpy. The D->H transformation is registered if + * available. + * + * Products are configured as a VPSet with type and InputTag. + */ + +// C++ include files +#include +#include +#include +#include +#include + +#include +#include + +// CMSSW include files +#include "FWCore/Framework/interface/Event.h" +#include "FWCore/Framework/interface/WrapperBaseHandle.h" +#include "FWCore/Framework/interface/WrapperBaseOrphanHandle.h" +#include "FWCore/Framework/interface/stream/EDProducer.h" +#include "FWCore/MessageLogger/interface/MessageLogger.h" +#include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h" +#include "FWCore/ParameterSet/interface/ParameterSet.h" +#include "FWCore/ParameterSet/interface/ParameterSetDescription.h" +#include "FWCore/Reflection/interface/TypeWithDict.h" +#include "FWCore/Utilities/interface/EDMException.h" +#include "FWCore/Utilities/interface/InputTag.h" +#include "FWCore/Utilities/interface/TypeID.h" +#include "HeterogeneousCore/AlpakaCore/interface/alpaka/EDMetadataSentry.h" +#include "HeterogeneousCore/AlpakaCore/interface/alpaka/MakerMacros.h" +#include "HeterogeneousCore/AlpakaCore/interface/alpaka/ProducerBase.h" +#include "HeterogeneousCore/AlpakaInterface/interface/config.h" +#include "HeterogeneousCore/AlpakaInterface/interface/memory.h" +#include "HeterogeneousCore/TrivialSerialisation/interface/ReaderBase.h" +#include "HeterogeneousCore/TrivialSerialisation/interface/SerialiserBase.h" +#include "HeterogeneousCore/TrivialSerialisation/interface/SerialiserFactory.h" +#include "HeterogeneousCore/TrivialSerialisation/interface/WriterBase.h" +#include "HeterogeneousCore/TrivialSerialisation/interface/alpaka/ReaderBase.h" +#include "HeterogeneousCore/TrivialSerialisation/interface/alpaka/SerialiserBase.h" +#include "HeterogeneousCore/TrivialSerialisation/interface/alpaka/SerialiserFactoryDevice.h" +#include "HeterogeneousCore/TrivialSerialisation/interface/alpaka/WriterBase.h" + +namespace ALPAKA_ACCELERATOR_NAMESPACE::ngt { + + class GenericClonerPortable : public ProducerBase { + public: + explicit GenericClonerPortable(edm::ParameterSet const& config); + ~GenericClonerPortable() override = default; + + void produce(edm::Event& event, edm::EventSetup const&) final; + + static void fillDescriptions(edm::ConfigurationDescriptions& descriptions); + + private: + struct Entry { + std::string typeName; // human-readable type name from config + edm::TypeID typeID; + edm::EDGetToken getToken; + edm::EDPutToken putToken; + std::unique_ptr deviceSerialiser; + std::unique_ptr<::ngt::SerialiserBase> hostSerialiser; + edm::TypeWithDict wrappedType; + }; + + std::vector eventProducts_; + bool hasDeviceProducts_ = false; + bool verbose_; + }; + + GenericClonerPortable::GenericClonerPortable(edm::ParameterSet const& config) + : ProducerBase(config), verbose_(config.getUntrackedParameter("verbose")) { + auto const& products = config.getParameter>("products"); + eventProducts_.reserve(products.size()); + + for (auto const& product : products) { + auto const& type = product.getParameter("type"); + auto const& src = product.getParameter("src"); + + Entry entry; + entry.typeName = type; + + // Lookup the right serialiser. In order of preference: + // SerialiserFactoryDevice, SerialiserFactory, ROOT Serialisation. + // + // Check 1: Look up by the device type alias as written in the config + // (e.g. "sistrip::SiStripClusterDevice"). + std::unique_ptr deviceSerialiser = ngt::SerialiserFactoryDevice::get()->tryToCreate(type); + + if (deviceSerialiser) { + entry.typeID = edm::TypeID{deviceSerialiser->productTypeID()}; + entry.getToken = this->consumes(edm::TypeToGet{entry.typeID, edm::PRODUCT_TYPE}, src); + hasDeviceProducts_ = true; + + if (deviceSerialiser->hasCopyToHost()) { + entry.putToken = this->produces(src.instance()) + .deviceProduces(edm::TypeID{deviceSerialiser->productTypeID()}, + edm::TypeID{deviceSerialiser->hostProductTypeID()}, + deviceSerialiser->getQueue(), + deviceSerialiser->preTransformDtoH(), + deviceSerialiser->transformDtoH()); + } else { + entry.putToken = + this->producesCollector().template produces(entry.typeID, src.instance()); + } + + entry.deviceSerialiser = std::move(deviceSerialiser); + + if (verbose_) { + edm::LogInfo("GenericClonerPortable") << "will clone device product of type '" << type << "', " << src; + } + + eventProducts_.emplace_back(std::move(entry)); + continue; + } + + // Check 2: "type" could be a host type alias "T" for which a host + // serialiser (and perhaps a portable serialiser for the H->D transform) + // exists. + edm::TypeWithDict twd = edm::TypeWithDict::byName(type); + std::unique_ptr portableSerialiser; + std::unique_ptr<::ngt::SerialiserBase> hostSerialiser; + if (twd.typeInfo() != typeid(void)) { + portableSerialiser = ngt::SerialiserFactoryDevice::get()->tryToCreate(twd.typeInfo().name()); + hostSerialiser = ::ngt::SerialiserFactory::get()->tryToCreate(twd.typeInfo().name()); + } + + if (hostSerialiser && twd.typeInfo() != typeid(void)) { + entry.typeID = edm::TypeID{twd.typeInfo()}; + entry.getToken = this->consumes(edm::TypeToGet{entry.typeID, edm::PRODUCT_TYPE}, src); + + if (portableSerialiser && portableSerialiser->hasCopyToDevice()) { + entry.putToken = this->produces(src.instance()) + .produces(edm::TypeID{portableSerialiser->productTypeID()}, + edm::TypeID{portableSerialiser->hostProductTypeID()}, + portableSerialiser->preTransformHtoD(), + portableSerialiser->transformHtoD()); + } else { + entry.putToken = + this->producesCollector().template produces(entry.typeID, src.instance()); + } + + entry.hostSerialiser = std::move(hostSerialiser); + + if (verbose_) { + edm::LogInfo("GenericClonerPortable") << "will clone host product of type '" << type << "', label '" + << src.label() << "', instance '" << src.instance() << "'"; + } + + eventProducts_.emplace_back(std::move(entry)); + continue; + } + + // Check 3: Fall back to ROOT serialisation, if a ROOT dictionary is + // found for this type. + edm::TypeWithDict wrappedTwd = edm::TypeWithDict::byName("edm::Wrapper<" + type + ">"); + if (twd.typeInfo() == typeid(void) || !wrappedTwd.getClass()) { + throw cms::Exception("GenericClonerPortable") + << "No serialisation mechanism (device or host TrivialSerialisation, or ROOT dictionaries) found for " + "type '" + << type + << "'. Please register a serialiser via DEFINE_TRIVIAL_SERIALISER_PLUGIN or " + "DEFINE_TRIVIAL_SERIALISER_PORTABLE_PLUGIN, or ensure a ROOT dictionary exists for this type."; + } + + entry.typeID = edm::TypeID{twd.typeInfo()}; + entry.getToken = this->consumes(edm::TypeToGet{entry.typeID, edm::PRODUCT_TYPE}, src); + entry.putToken = + this->producesCollector().template produces(entry.typeID, src.instance()); + entry.wrappedType = wrappedTwd; + + if (verbose_) { + edm::LogInfo("GenericClonerPortable") << "will clone ROOT-serialised product of type '" << type << "', label '" + << src.label() << "', instance '" << src.instance() << "'"; + } + + eventProducts_.emplace_back(std::move(entry)); + continue; + } + } + + void GenericClonerPortable::produce(edm::Event& event, edm::EventSetup const& /*unused*/) { + std::unique_ptr<::ALPAKA_ACCELERATOR_NAMESPACE::detail::EDMetadataSentry> sentry; + if (hasDeviceProducts_) { + sentry = std::make_unique<::ALPAKA_ACCELERATOR_NAMESPACE::detail::EDMetadataSentry>(event.streamID(), + this->synchronize()); + } + + for (auto& entry : eventProducts_) { + edm::Handle handle(entry.typeID.typeInfo()); + event.getByToken(entry.getToken, handle); + edm::WrapperBase const* wrapper = handle.product(); + if (wrapper == nullptr) { + throw edm::Exception(edm::errors::ProductNotFound) + << "Product of type '" << entry.typeName << "' not found in event."; + } + + if (entry.hostSerialiser) { + auto reader = entry.hostSerialiser->reader(*wrapper); + auto writer = entry.hostSerialiser->writer(); + + writer->initialize(reader->parameters()); + + auto targets = writer->regions(); + auto sources = reader->regions(); + + assert(sources.size() == targets.size()); + for (size_t j = 0; j < sources.size(); ++j) { + assert(sources[j].data() != nullptr); + assert(targets[j].data() != nullptr); + assert(targets[j].size_bytes() == sources[j].size_bytes()); + std::memcpy(targets[j].data(), sources[j].data(), sources[j].size_bytes()); + } + + writer->finalize(); + event.put(entry.putToken, writer->get()); + } else if (entry.deviceSerialiser) { + auto reader = entry.deviceSerialiser->reader(*wrapper, *sentry->metadata()); + auto writer = entry.deviceSerialiser->writer(); + + writer->initialize(sentry->metadata()->queue(), reader->parameters()); + + auto targets = writer->regions(); + auto sources = reader->regions(); + + assert(sources.size() == targets.size()); + for (size_t j = 0; j < sources.size(); ++j) { + assert(sources[j].data() != nullptr); + assert(targets[j].data() != nullptr); + assert(targets[j].size_bytes() == sources[j].size_bytes()); + alpaka::memcpy(sentry->metadata()->queue(), + cms::alpakatools::make_device_view(sentry->metadata()->queue(), targets[j]), + cms::alpakatools::make_device_view(sentry->metadata()->queue(), sources[j])); + } + + writer->finalize(); + event.put(entry.putToken, writer->get(sentry->metadata())); + } else { + TClass* cls = entry.wrappedType.getClass(); + if (!cls) { + throw edm::Exception(edm::errors::LogicError) + << "Failed to get ROOT dictionary class for type '" << entry.typeName << "'."; + } + + TBufferFile serializedBuffer(TBuffer::kWrite); + serializedBuffer.WriteObjectAny(wrapper, cls, false); + + serializedBuffer.SetReadMode(); + serializedBuffer.Reset(); + + auto clone = + std::unique_ptr(reinterpret_cast(serializedBuffer.ReadObjectAny(cls))); + if (!clone) { + throw edm::Exception(edm::errors::LogicError) + << "Failed to deserialize ROOT product for type '" << entry.typeName << "'."; + } + event.put(entry.putToken, std::move(clone)); + } + } + + this->putBackend(event); + if (sentry) { + sentry->finish(true); + } + } + + void GenericClonerPortable::fillDescriptions(edm::ConfigurationDescriptions& descriptions) { + descriptions.setComment( + "This Alpaka EDProducer will clone all the host or device event products declared by its configuration, " + "using the Host or Device TrivialSerialisation mechanism. "); + + edm::ParameterSetDescription product; + product.add("type")->setComment( + "Type alias of the product to clone. Use the host type alias " + "(e.g. \"portabletest::TestHostCollection\") to clone a host product, or the device type alias " + "without the ALPAKA_ACCELERATOR_NAMESPACE prefix " + "(e.g. \"sistrip::SiStripClusterDevice\") to clone a device product."); + product.add("src")->setComment("InputTag (label and instance) of the product to clone."); + + edm::ParameterSetDescription desc; + desc.addVPSet("products", product, {})->setComment("Host or device products to be cloned."); + desc.addUntracked("verbose", false)->setComment("Print the type names of the products that will be cloned."); + + descriptions.addWithDefaultLabel(desc); + } + +} // namespace ALPAKA_ACCELERATOR_NAMESPACE::ngt + +DEFINE_FWK_ALPAKA_MODULE(ngt::GenericClonerPortable); diff --git a/HeterogeneousCore/TrivialSerialisation/src/Serialiser.cc b/HeterogeneousCore/TrivialSerialisation/src/Serialiser.cc new file mode 100644 index 0000000000000..be83329567e77 --- /dev/null +++ b/HeterogeneousCore/TrivialSerialisation/src/Serialiser.cc @@ -0,0 +1,14 @@ +#include "FWCore/Utilities/interface/EDMException.h" + +namespace ngt::detail { + + void throwEmptyWrapperError() { + throw edm::Exception(edm::errors::LogicError) << "Attempt to access an empty Wrapper"; + } + + void throwHostProductTypeIDError() { + throw edm::Exception(edm::errors::LogicError) + << "hostProductTypeID() called on a type without a CopyToHost specialisation"; + } + +} // namespace ngt::detail diff --git a/HeterogeneousCore/TrivialSerialisation/test/BuildFile.xml b/HeterogeneousCore/TrivialSerialisation/test/BuildFile.xml index ca193d8113369..562e403e285e9 100644 --- a/HeterogeneousCore/TrivialSerialisation/test/BuildFile.xml +++ b/HeterogeneousCore/TrivialSerialisation/test/BuildFile.xml @@ -8,6 +8,7 @@ + diff --git a/HeterogeneousCore/TrivialSerialisation/test/alpaka/test_catch2_portableCollectionsSerialiserPluginFactory.dev.cc b/HeterogeneousCore/TrivialSerialisation/test/alpaka/test_catch2_portableCollectionsSerialiserPluginFactory.dev.cc index 4fdd8576e604c..3af25c5e24f71 100644 --- a/HeterogeneousCore/TrivialSerialisation/test/alpaka/test_catch2_portableCollectionsSerialiserPluginFactory.dev.cc +++ b/HeterogeneousCore/TrivialSerialisation/test/alpaka/test_catch2_portableCollectionsSerialiserPluginFactory.dev.cc @@ -3,9 +3,9 @@ #include #include +#include #include "DataFormats/AlpakaCommon/interface/alpaka/EDMetadata.h" -#include "DataFormats/Common/interface/DeviceProduct.h" #include "DataFormats/Common/interface/Wrapper.h" #include "DataFormats/Portable/interface/PortableCollection.h" #include "DataFormats/Portable/interface/PortableHostCollection.h" @@ -87,6 +87,7 @@ TEST_CASE("Test MemoryCopyTraits", "[MemoryCopyTraits]") { using PortableCollectionType = ::PortableCollection>; using DeviceProductType = detail::DeviceProductType; using PortableHostCollectionType = PortableHostCollection>; + const std::string deviceProductTypeAlias = EDM_STRINGIZE(portabletest::TestDeviceCollection); const int size = 10; for (auto const& device : devices) { @@ -148,9 +149,8 @@ TEST_CASE("Test MemoryCopyTraits", "[MemoryCopyTraits]") { static_assert(::ngt::HasMemoryCopyTraits); // Get the Serialiser plugin for this type - std::string typeName = typeid(edm::DeviceProduct).name(); std::unique_ptr serialiserSource{ - alpaka_ngt::SerialiserFactoryDevice::get()->create(typeName)}; + alpaka_ngt::SerialiserFactoryDevice::get()->create(deviceProductTypeAlias)}; // Create a "reader" and a "writer", then clone via memory regions auto reader = serialiserSource->reader(*wb_original, *metadata); @@ -188,6 +188,7 @@ TEST_CASE("Test MemoryCopyTraits", "[MemoryCopyTraits]") { SECTION("DeviceProduct") { using DeviceObjectType = alpaka_portabletest::TestDeviceObject; using DeviceProductType = detail::DeviceProductType; + const std::string deviceProductTypeAlias = EDM_STRINGIZE(portabletest::TestDeviceObject); const alpaka_portabletest::TestStruct testData{5.0, 12.0, 13.0, 42}; for (auto const& device : devices) { @@ -211,9 +212,8 @@ TEST_CASE("Test MemoryCopyTraits", "[MemoryCopyTraits]") { edm::WrapperBase const* wb = static_cast(&wrapper); // Get the serialiser plugin - std::string typeName = typeid(edm::DeviceProduct).name(); std::unique_ptr serialiser{ - alpaka_ngt::SerialiserFactoryDevice::get()->create(typeName)}; + alpaka_ngt::SerialiserFactoryDevice::get()->create(deviceProductTypeAlias)}; REQUIRE(serialiser); // Read and write @@ -254,6 +254,7 @@ TEST_CASE("Test MemoryCopyTraits", "[MemoryCopyTraits]") { using DeviceCollection2Type = alpaka_portabletest::TestDeviceCollection2; using DeviceProductType = detail::DeviceProductType; using HostCollection2Type = PortableHostCollection; + const std::string deviceProductTypeAlias = EDM_STRINGIZE(portabletest::TestDeviceCollection2); const int size1 = 7; const int size2 = 11; @@ -291,9 +292,8 @@ TEST_CASE("Test MemoryCopyTraits", "[MemoryCopyTraits]") { edm::WrapperBase const* wb = static_cast(&wrapper); // Get the serialiser plugin - std::string typeName = typeid(edm::DeviceProduct).name(); std::unique_ptr serialiser{ - alpaka_ngt::SerialiserFactoryDevice::get()->create(typeName)}; + alpaka_ngt::SerialiserFactoryDevice::get()->create(deviceProductTypeAlias)}; REQUIRE(serialiser); // Read and write diff --git a/HeterogeneousCore/TrivialSerialisation/test/testGenericClonerPortable_cfg.py b/HeterogeneousCore/TrivialSerialisation/test/testGenericClonerPortable_cfg.py new file mode 100644 index 0000000000000..adcb91fcb385e --- /dev/null +++ b/HeterogeneousCore/TrivialSerialisation/test/testGenericClonerPortable_cfg.py @@ -0,0 +1,95 @@ +import FWCore.ParameterSet.Config as cms + +process = cms.Process("TEST") + +process.load("FWCore.MessageService.MessageLogger_cfi") +process.MessageLogger.cerr.INFO.limit = 10000000 + +process.options.numberOfThreads = 1 +process.options.numberOfStreams = 1 + +process.source = cms.Source("EmptySource") +process.maxEvents.input = 10 + +# Produce, clone and validate a portable object, a portable collection, and some portable multicollections +process.load("Configuration.StandardSequences.Accelerators_cff") +process.load("HeterogeneousCore.AlpakaCore.ProcessAcceleratorAlpaka_cfi") + +# Produce portable objects on host (serial CPU backend) +process.producePortableObjects = cms.EDProducer("TestAlpakaProducer@alpaka", + size = cms.int32(42), + size2 = cms.int32(33), + size3 = cms.int32(61), + alpaka = cms.untracked.PSet( + backend = cms.untracked.string("serial_sync") + ) +) + +# Clone from host to host, registering the H->D transformation +process.clonePortableObjectsHtoH = cms.EDProducer("ngt::GenericClonerPortable@alpaka", + products = cms.VPSet( + cms.PSet( + src = cms.InputTag("producePortableObjects"), + type = cms.string("portabletest::TestHostObject") + ), + cms.PSet( + src = cms.InputTag("producePortableObjects"), + type = cms.string("portabletest::TestHostCollection") + ), + cms.PSet( + src = cms.InputTag("producePortableObjects"), + type = cms.string("portabletest::TestHostCollection2") + ), + cms.PSet( + src = cms.InputTag("producePortableObjects"), + type = cms.string("portabletest::TestHostCollection3") + ), + ), + verbose = cms.untracked.bool(True), + alpaka = cms.untracked.PSet( + backend = cms.untracked.string("") + ) +) + +# Clone from device to device, registering the D->H transformation +process.clonePortableObjectsDtoD = cms.EDProducer("ngt::GenericClonerPortable@alpaka", + products = cms.VPSet( + cms.PSet( + src = cms.InputTag("clonePortableObjectsHtoH"), + type = cms.string("portabletest::TestDeviceObject") + ), + cms.PSet( + src = cms.InputTag("clonePortableObjectsHtoH"), + type = cms.string("portabletest::TestDeviceCollection") + ), + cms.PSet( + src = cms.InputTag("clonePortableObjectsHtoH"), + type = cms.string("portabletest::TestDeviceCollection2") + ), + cms.PSet( + src = cms.InputTag("clonePortableObjectsHtoH"), + type = cms.string("portabletest::TestDeviceCollection3") + ), + ), + verbose = cms.untracked.bool(True), + alpaka = cms.untracked.PSet( + backend = cms.untracked.string("") + ) +) + +# Consume the products on host (via the D->H transformation registered above) +process.validatePortableCollections = cms.EDAnalyzer("TestAlpakaAnalyzer", + source = cms.InputTag("clonePortableObjectsDtoD") +) + +process.validatePortableObject = cms.EDAnalyzer("TestAlpakaObjectAnalyzer", + source = cms.InputTag("clonePortableObjectsDtoD") +) + +process.pathSoA = cms.Path( + process.producePortableObjects + + process.clonePortableObjectsHtoH + + process.clonePortableObjectsDtoD + + process.validatePortableCollections + + process.validatePortableObject +) From 495dd60178b70688a0d893abf9e471ebca734145 Mon Sep 17 00:00:00 2001 From: Mario Gonzalez Date: Thu, 7 May 2026 03:00:54 +0200 Subject: [PATCH 2/2] Add MPI Sender and Receiver modules that support arbitrary device collections --- .../SiStripDigiSoA/plugins/BuildFile.xml | 3 +- .../MPICore/interface/MPIChannel.h | 53 ++- .../MPICore/interface/metadata.h | 3 + .../MPICore/plugins/BuildFile.xml | 20 + .../plugins/alpaka/MPIReceiverPortable.cc | 417 ++++++++++++++++++ .../plugins/alpaka/MPISenderPortable.cc | 325 ++++++++++++++ HeterogeneousCore/MPICore/src/MPIChannel.cc | 44 +- HeterogeneousCore/MPICore/src/metadata.cc | 14 + HeterogeneousCore/MPICore/test/BuildFile.xml | 5 + .../MPICore/test/controller_soa_device_cfg.py | 68 +++ .../MPICore/test/follower_soa_device_cfg.py | 60 +++ 11 files changed, 1004 insertions(+), 8 deletions(-) create mode 100644 HeterogeneousCore/MPICore/plugins/alpaka/MPIReceiverPortable.cc create mode 100644 HeterogeneousCore/MPICore/plugins/alpaka/MPISenderPortable.cc create mode 100644 HeterogeneousCore/MPICore/test/controller_soa_device_cfg.py create mode 100644 HeterogeneousCore/MPICore/test/follower_soa_device_cfg.py diff --git a/DataFormats/SiStripDigiSoA/plugins/BuildFile.xml b/DataFormats/SiStripDigiSoA/plugins/BuildFile.xml index 2b10aec36c105..727538e9bcda4 100644 --- a/DataFormats/SiStripDigiSoA/plugins/BuildFile.xml +++ b/DataFormats/SiStripDigiSoA/plugins/BuildFile.xml @@ -9,4 +9,5 @@ - \ No newline at end of file + + diff --git a/HeterogeneousCore/MPICore/interface/MPIChannel.h b/HeterogeneousCore/MPICore/interface/MPIChannel.h index c5351841f45b0..3a57d83727dc7 100644 --- a/HeterogeneousCore/MPICore/interface/MPIChannel.h +++ b/HeterogeneousCore/MPICore/interface/MPIChannel.h @@ -4,9 +4,10 @@ // C++ standard library headers #include #include +#include #include #include -#include +#include // MPI headers #include @@ -15,9 +16,7 @@ #include // CMSSW headers -#include "DataFormats/Common/interface/WrapperBase.h" #include "DataFormats/Provenance/interface/ProvenanceFwd.h" -#include "FWCore/Reflection/interface/ObjectWithDict.h" #include "HeterogeneousCore/MPICore/interface/messages.h" #include "HeterogeneousCore/MPICore/interface/metadata.h" #include "HeterogeneousCore/TrivialSerialisation/interface/ReaderBase.h" @@ -113,7 +112,10 @@ class alignas(64) MPIChannel { } void sendMetadata(int instance, std::shared_ptr meta); + MPI_Request sendMetadataAsync(int instance, std::shared_ptr meta); + static void waitMetadata(MPI_Request& request); void receiveMetadata(int instance, std::shared_ptr meta); + void receiveMetadataAsync(int instance, std::shared_ptr meta); // send buffer of serialized products void sendBuffer(const void* buf, size_t size, int instance, EDM_MPI_MessageTag tag); @@ -155,12 +157,57 @@ class alignas(64) MPIChannel { // receive product buffer of known size std::unique_ptr receiveSerializedBuffer(int instance, int bufSize); + // transfer a pre-formed list of memory regions + void sendTrivialCopyProduct(int instance, std::vector> const& regions); + // transfer a wrapped object using its MemoryCopyTraits void sendTrivialCopyProduct(int instance, const ngt::ReaderBase& reader); + // send from a device reader (any type with regions()) + template + void sendTrivialCopyProduct(int instance, const Reader& reader) { + auto const& regions = reader.regions(); + sendTrivialCopyProduct(instance, regions); + } + + // wait for a batch of MPI requests to complete + static void waitAll(std::vector& requests); + // receive into wrapped object void receiveInitializedTrivialCopy(int instance, ngt::WriterBase& writer); + // receive into a device writer (any type with regions()) + template + void receiveInitializedTrivialCopy(int instance, Writer& writer) { + int tag = EDM_MPI_SendTrivialCopyProduct | instance * EDM_MPI_MessageTagWidth_; + MPI_Status status; + auto const& regions = writer.regions(); + for (size_t i = 0; i < regions.size(); ++i) { + assert(regions[i].data() != nullptr); + MPI_Recv(regions[i].data(), regions[i].size_bytes(), MPI_BYTE, dest_, tag, comm_, &status); + } + } + + // non-blocking receive into a wrapped object using its MemoryCopyTraits; + // appends one MPI_Request per memory region to the requests vector. + // The caller is responsible for keeping the underlying memory regions + // (and the writer) alive until the requests complete. + void receiveInitializedTrivialCopyAsync(int instance, ngt::WriterBase& writer, std::vector& requests); + + // non-blocking receive into a device writer (any type with regions()); + // appends one MPI_Request per region to the requests vector. + template + void receiveInitializedTrivialCopyAsync(int instance, Writer& writer, std::vector& requests) { + int tag = EDM_MPI_SendTrivialCopyProduct | instance * EDM_MPI_MessageTagWidth_; + auto const& regions = writer.regions(); + size_t base = requests.size(); + requests.resize(base + regions.size()); + for (size_t i = 0; i < regions.size(); ++i) { + assert(regions[i].data() != nullptr); + MPI_Irecv(regions[i].data(), regions[i].size_bytes(), MPI_BYTE, dest_, tag, comm_, &requests[base + i]); + } + } + private: // serialize an EDM object to a simplified representation that can be transmitted as an MPI message void edmToBuffer_(EDM_MPI_RunAuxiliary_t& buffer, edm::RunAuxiliary const& aux); diff --git a/HeterogeneousCore/MPICore/interface/metadata.h b/HeterogeneousCore/MPICore/interface/metadata.h index 49d98b7b9f625..09c2fd8d00f95 100644 --- a/HeterogeneousCore/MPICore/interface/metadata.h +++ b/HeterogeneousCore/MPICore/interface/metadata.h @@ -72,6 +72,8 @@ class ProductMetadataBuilder { // Receiver-side void receiveMetadata(int src, int tag, MPI_Comm comm); + void receiveMetadataAsync(int src, int tag, MPI_Comm comm); + void waitReceiveMetadata(); // Not memory safe for trivial copy products. // Please make sure that ProductMetadataBuilder lives longer than returned ProductMetadata @@ -113,6 +115,7 @@ class ProductMetadataBuilder { size_t capacity_; size_t size_; size_t readOffset_; + MPI_Request receiveRequest_ = MPI_REQUEST_NULL; }; #endif // HeterogeneousCore_MPICore_interface_metadata_h diff --git a/HeterogeneousCore/MPICore/plugins/BuildFile.xml b/HeterogeneousCore/MPICore/plugins/BuildFile.xml index 5034a4ea8c51c..7288cadad4d82 100644 --- a/HeterogeneousCore/MPICore/plugins/BuildFile.xml +++ b/HeterogeneousCore/MPICore/plugins/BuildFile.xml @@ -14,3 +14,23 @@ + + + + + + + + + + + + + + + + + + + + diff --git a/HeterogeneousCore/MPICore/plugins/alpaka/MPIReceiverPortable.cc b/HeterogeneousCore/MPICore/plugins/alpaka/MPIReceiverPortable.cc new file mode 100644 index 0000000000000..1ebb8b2037cd7 --- /dev/null +++ b/HeterogeneousCore/MPICore/plugins/alpaka/MPIReceiverPortable.cc @@ -0,0 +1,417 @@ +// C++ include files +#include +#include +#include +#include + +#include +#include + +// CMSSW include files +#include "DataFormats/AlpakaCommon/interface/alpaka/EDMetadata.h" +#include "DataFormats/Common/interface/PathStateToken.h" +#include "FWCore/Concurrency/interface/Async.h" +#include "FWCore/Concurrency/interface/WaitingTaskWithArenaHolder.h" +#include "FWCore/Framework/interface/Event.h" +#include "FWCore/Framework/interface/WrapperBaseOrphanHandle.h" +#include "FWCore/Framework/interface/stream/EDProducer.h" +#include "FWCore/MessageLogger/interface/MessageLogger.h" +#include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h" +#include "FWCore/ParameterSet/interface/ParameterSet.h" +#include "FWCore/ParameterSet/interface/ParameterSetDescription.h" +#include "FWCore/Reflection/interface/TypeWithDict.h" +#include "FWCore/ServiceRegistry/interface/Service.h" +#include "FWCore/ServiceRegistry/interface/ServiceMaker.h" +#include "FWCore/Utilities/interface/Exception.h" +#include "FWCore/Utilities/interface/InputTag.h" +#include "FWCore/Utilities/interface/TypeID.h" +#include "HeterogeneousCore/AlpakaCore/interface/alpaka/EDMetadataSentry.h" +#include "HeterogeneousCore/AlpakaCore/interface/alpaka/MakerMacros.h" +#include "HeterogeneousCore/AlpakaCore/interface/alpaka/ProducerBase.h" +#include "HeterogeneousCore/AlpakaCore/interface/alpaka/chooseDevice.h" +#include "HeterogeneousCore/AlpakaInterface/interface/config.h" +#include "HeterogeneousCore/MPICore/interface/MPIChannel.h" +#include "HeterogeneousCore/MPICore/interface/MPIToken.h" +#include "HeterogeneousCore/TrivialSerialisation/interface/AnyBuffer.h" +#include "HeterogeneousCore/TrivialSerialisation/interface/SerialiserBase.h" +#include "HeterogeneousCore/TrivialSerialisation/interface/SerialiserFactory.h" +#include "HeterogeneousCore/TrivialSerialisation/interface/WriterBase.h" +#include "HeterogeneousCore/TrivialSerialisation/interface/alpaka/SerialiserBase.h" +#include "HeterogeneousCore/TrivialSerialisation/interface/alpaka/SerialiserFactoryDevice.h" +#include "HeterogeneousCore/TrivialSerialisation/interface/alpaka/WriterBase.h" + +namespace ALPAKA_ACCELERATOR_NAMESPACE { + + // Inherit from ProducerBase. This is so we have access to the EDMetadata, + // which we need for synchronization + class MPIReceiverPortable : public ProducerBase { + public: + MPIReceiverPortable(edm::ParameterSet const& config) + : ProducerBase(config), + upstream_(consumes(config.getParameter("upstream"))), + token_(this->producesCollector().template produces()), + instance_(config.getParameter("instance")) { + // instance 0 is reserved for the MPIController / MPISource pair instance + // values greater than 255 may not fit in the MPI tag + if (instance_ < 1 or instance_ > 255) { + throw cms::Exception("InvalidValue") + << "Invalid MPIReceiverPortable instance value, please use a value between 1 and 255"; + } + + auto const& products = config.getParameter>("products"); + products_.reserve(products.size()); + for (auto const& product : products) { + auto const& type = product.getParameter("type"); + auto const& src = product.getParameter("src"); + + // Construct the instance that will be put into the event together with + // this product, and that will be used by downstream modules to consume + // this product. + // + // edmMpiSplitConfig convention = "src.label@src.instance" if both are + // set, "label" if only label is set and "instance" if only instance is + // set + std::string produceInstance; + if (src.label().empty()) { + produceInstance = src.instance(); + } else if (src.instance().empty()) { + produceInstance = src.label(); + } else { + produceInstance = src.label() + "@" + src.instance(); + } + + Entry entry; + entry.typeName = type; + + // Produce PathStateToken but do not transfer it over MPI; the path + // status is propagated through productCount (set to -1 if the path is + // inactive). + if (type == "edm::PathStateToken") { + entry.token = this->producesCollector().template produces(); + products_.emplace_back(std::move(entry)); + continue; + } + + // Lookup the right serialiser. In order of preference: + // SerialiserFactoryDevice, SerialiserFactory, ROOT Serialisation. + // + // Check 1: Look up by the device type alias as written in the config + // (e.g. "sistrip::SiStripClusterDevice"). + LogDebug("MPIReceiverPortable") << "looking for device serialiser for type \"" << type << "\""; + std::unique_ptr deviceSerialiser = ngt::SerialiserFactoryDevice::get()->tryToCreate(type); + + if (deviceSerialiser) { + edm::TypeID typeID{deviceSerialiser->productTypeID()}; + hasDeviceProducts_ = true; + + LogDebug("MPIReceiverPortable") << "found device serialiser for type \"" << type << "\""; + + if (deviceSerialiser->hasCopyToHost()) { + LogDebug("MPIReceiverPortable") << "Registering D to H transform for type \"" << type << "\""; + // Register the D to H transform + entry.token = this->produces(produceInstance) + .deviceProduces(edm::TypeID{deviceSerialiser->productTypeID()}, + edm::TypeID{deviceSerialiser->hostProductTypeID()}, + deviceSerialiser->getQueue(), + deviceSerialiser->preTransformDtoH(), + deviceSerialiser->transformDtoH()); + } else { + LogDebug("MPIReceiverPortable") << "No D to H transform found for type \"" << type << "\""; + entry.token = this->producesCollector().template produces(typeID, produceInstance); + } + entry.deviceSerialiser = std::move(deviceSerialiser); + + LogDebug("MPIReceiverPortable") << "receive device type \"" << typeID << "\" (" << type << ") for instance \"" + << produceInstance << "\" over MPI channel instance " << instance_; + + products_.emplace_back(std::move(entry)); + continue; + } + + // Check 2: "type" could be a host type alias "T" for which a host + // serialiser (and perhaps a portable serialiser for the H->D transform) + // exists. + edm::TypeWithDict twd = edm::TypeWithDict::byName(type); + std::unique_ptr portableSerialiser; + std::unique_ptr<::ngt::SerialiserBase> hostSerialiser; + LogDebug("MPIReceiverPortable") << "looking for host serialiser for type \"" << type << "\""; + if (twd.typeInfo() != typeid(void)) { + portableSerialiser = ngt::SerialiserFactoryDevice::get()->tryToCreate(twd.typeInfo().name()); + hostSerialiser = ::ngt::SerialiserFactory::get()->tryToCreate(twd.typeInfo().name()); + } + + if (hostSerialiser && twd.typeInfo() != typeid(void)) { + edm::TypeID typeID{twd.typeInfo()}; + LogDebug("MPIReceiverPortable") << "found host serialiser for type \"" << type << "\""; + + if (portableSerialiser && portableSerialiser->hasCopyToDevice()) { + LogDebug("MPIReceiverPortable") << "Registering H to D transform for type \"" << type << "\""; + // Register the H to D transform + entry.token = this->produces(produceInstance) + .produces(edm::TypeID{portableSerialiser->productTypeID()}, + edm::TypeID{portableSerialiser->hostProductTypeID()}, + portableSerialiser->preTransformHtoD(), + portableSerialiser->transformHtoD()); + } else { + LogDebug("MPIReceiverPortable") << "No H to D transform found for type \"" << type << "\""; + entry.token = this->producesCollector().template produces(typeID, produceInstance); + } + entry.hostSerialiser = std::move(hostSerialiser); + + LogDebug("MPIReceiverPortable") << "receive host type \"" << typeID << "\" (" << type << ") for instance \"" + << produceInstance << "\" over MPI channel instance " << instance_; + + products_.emplace_back(std::move(entry)); + continue; + } + + // Check 3: Fall back to ROOT serialisation, if a ROOT dictionary is + // found for this type + edm::TypeWithDict wrappedTwd = edm::TypeWithDict::byName("edm::Wrapper<" + type + ">"); + LogDebug("MPIReceiverPortable") << "looking for ROOT serialisation of type \"" << type + << "\" (wrapper resolved: " << wrappedTwd.typeInfo().name() << ")"; + if (twd.typeInfo() == typeid(void) || !wrappedTwd.getClass()) { + throw cms::Exception("MPIReceiverPortable") + << "No serialisation mechanism (device or host TrivialSerialisation, or ROOT dictionaries) found for " + "type '" + << type + << "'. Either register a serialiser via DEFINE_TRIVIAL_SERIALISER_PLUGIN or " + "DEFINE_TRIVIAL_SERIALISER_PORTABLE_PLUGIN, or make sure a ROOT dictionary exists for this type."; + } + edm::TypeID typeID{twd.typeInfo()}; + entry.token = this->producesCollector().template produces(typeID, produceInstance); + entry.wrappedType = wrappedTwd; + + LogDebug("MPIReceiverPortable") << "found ROOT dictionary for type \"" << type << "\""; + LogDebug("MPIReceiverPortable") << "receive ROOT type \"" << typeID << "\" (" << type << ") for instance \"" + << produceInstance << "\" over MPI channel instance " << instance_; + + products_.emplace_back(std::move(entry)); + } + } + + void acquire(edm::Event const& event, edm::EventSetup const&, edm::WaitingTaskWithArenaHolder holder) final { + // reset the metadata that could have been left behind by a previous event + metadata_.reset(); + if (hasDeviceProducts_) { + metadata_ = std::make_shared(detail::chooseDevice(event.streamID())); + } + + const MPIToken& token = event.get(upstream_); + + receivedProductMetadata_ = std::make_shared(); + receivedWrappers_.resize(products_.size()); + asyncWorkLaunched_ = false; + + edm::Service as; + as->runAsync( + std::move(holder), + [this, token]() { + token.channel()->receiveMetadata(instance_, receivedProductMetadata_); +#ifdef EDM_ML_DEBUG + receivedProductMetadata_->debugPrintMetadataSummary(); +#endif + + if (receivedProductMetadata_->productCount() == -1) { + return; + } + + std::unique_ptr serialized_buffer; + if (receivedProductMetadata_->hasSerialized()) { + serialized_buffer = + token.channel()->receiveSerializedBuffer(instance_, receivedProductMetadata_->serializedBufferSize()); + } + + struct PendingDeviceWriter { + size_t index; + std::unique_ptr writer; + }; + struct PendingHostWriter { + size_t index; + std::unique_ptr<::ngt::WriterBase> writer; + }; + + std::vector requests; + std::vector pendingDeviceWriters; + std::vector pendingHostWriters; + + for (size_t i = 0; i < products_.size(); ++i) { + auto const& entry = products_[i]; + + // PathStateToken is not transferred; it is handled in produce(). + if (entry.typeName == "edm::PathStateToken") { + continue; + } + + auto product_meta = receivedProductMetadata_->getNext(); + + if (product_meta.kind == ProductMetadata::Kind::Missing) { + continue; + } + + if (product_meta.kind == ProductMetadata::Kind::Serialized) { + if (!serialized_buffer) { + throw cms::Exception("MPIReceiverPortable") + << "Received a Serialized product kind for '" << entry.typeName + << "' but no serialized buffer was received."; + } + TClass* cls = entry.wrappedType.getClass(); + if (!cls) { + throw cms::Exception("MPIReceiverPortable") + << "Failed to get TClass for ROOT product '" << entry.typeName << "'."; + } + auto wrapper = std::unique_ptr(reinterpret_cast(cls->New())); + cls->Streamer(wrapper.get(), *serialized_buffer); + receivedWrappers_[i] = std::move(wrapper); + continue; + } + + if (product_meta.kind != ProductMetadata::Kind::TrivialCopy) { + throw cms::Exception("MPIReceiverPortable") + << "Unexpected product metadata kind for product '" << entry.typeName << "'."; + } + + // At this point, all remaining products should be of type + // ProductMetadata::Kind::TrivialCopy, and thus a serialiser (host + // or device) should exist for them. + + if (entry.deviceSerialiser) { + auto writer = entry.deviceSerialiser->writer(); + ::ngt::AnyBuffer buffer = writer->uninitialized_parameters(); + if (buffer.size_bytes() != product_meta.sizeMeta) { + throw cms::Exception("MPIReceiverPortable") + << "Buffer size mismatch for device product '" << entry.typeName << "': deviceSerialiser expects " + << buffer.size_bytes() << " bytes of metadata, but sender sent " << product_meta.sizeMeta + << " bytes."; + } + std::memcpy(buffer.data(), product_meta.trivialCopyOffset, product_meta.sizeMeta); + + writer->initialize(metadata_->queue(), buffer); + asyncWorkLaunched_ = true; + token.channel()->receiveInitializedTrivialCopyAsync(instance_, *writer, requests); + pendingDeviceWriters.push_back({i, std::move(writer)}); + } else { + // Host path: allocate host buffer, then post a non-blocking receive. + auto writer = entry.hostSerialiser->writer(); + ::ngt::AnyBuffer buffer = writer->uninitialized_parameters(); + if (buffer.size_bytes() != product_meta.sizeMeta) { + throw cms::Exception("MPIReceiverPortable") + << "Buffer size mismatch for host product '" << entry.typeName << "': Serialiser expects " + << buffer.size_bytes() << " bytes of metadata, but sender sent " << product_meta.sizeMeta + << " bytes."; + } + std::memcpy(buffer.data(), product_meta.trivialCopyOffset, product_meta.sizeMeta); + + writer->initialize(buffer); + token.channel()->receiveInitializedTrivialCopyAsync(instance_, *writer, requests); + pendingHostWriters.push_back({i, std::move(writer)}); + } + } + + // Wait for all non-blocking receives to complete. + MPIChannel::waitAll(requests); + + for (auto& pending : pendingDeviceWriters) { + pending.writer->finalize(); + receivedWrappers_[pending.index] = pending.writer->get(metadata_); + } + for (auto& pending : pendingHostWriters) { + pending.writer->finalize(); + receivedWrappers_[pending.index] = pending.writer->get(); + } + }, + []() { return "Calling MPIReceiverPortable::acquire()"; }); + } + + void produce(edm::Event& event, edm::EventSetup const&) final { + std::unique_ptr sentry; + if (metadata_) { + sentry = std::make_unique(std::move(metadata_), this->synchronize()); + } + + MPIToken token = event.get(upstream_); + + if (receivedProductMetadata_->productCount() == -1) { + event.emplace(token_, token); + this->putBackend(event); + if (sentry) { + sentry->finish(false); + } + return; + } + + for (size_t i = 0; i < products_.size(); ++i) { + auto const& entry = products_[i]; + + if (entry.typeName == "edm::PathStateToken") { + // Put a fresh PathStateToken into the event, since the one created + // remotely was not transferred. + event.put(entry.token, std::make_unique()); + continue; + } + + if (!receivedWrappers_[i]) { + edm::LogWarning("MPIReceiverPortable") << "Product " << entry.typeName << " was not received."; + continue; + } + + event.put(entry.token, std::move(receivedWrappers_[i])); + } + + event.emplace(token_, token); + this->putBackend(event); + if (sentry) { + sentry->finish(asyncWorkLaunched_); + } + } + + static void fillDescriptions(edm::ConfigurationDescriptions& descriptions) { + descriptions.setComment( + "This module can receive arbitrary device or host event products from an " + "\"MPISenderPortable\" module in a separate CMSSW job, and produce them into the event."); + + edm::ParameterSetDescription product; + product.add("type")->setComment( + "Type alias of the device product without the ALPAKA_ACCELERATOR_NAMESPACE prefix " + "(e.g. \"sistrip::SiStripClusterDevice\"). For host and ROOT products, the plain C++ type name."); + product.add("src", edm::InputTag{})->setComment("InputTag identifying the product to produce. "); + + edm::ParameterSetDescription desc; + desc.add("upstream", {"source"}) + ->setComment( + "MPI communication channel. Can be an \"MPIController\", \"MPISource\", or " + "\"MPISenderPortable\"/\"MPIReceiverPortable\"."); + desc.addVPSet("products", product, {}) + ->setComment("Host or device products to be received from a separate CMSSW job."); + desc.add("instance", 0) + ->setComment( + "A value between 1 and 255 used to identify a matching pair of " + "\"MPISenderPortable\"/\"MPIReceiverPortable\"."); + + descriptions.addWithDefaultLabel(desc); + } + + private: + struct Entry { + std::string typeName; // type name from config (for PathStateToken check and logging) + edm::EDPutToken token; + std::unique_ptr deviceSerialiser; + std::unique_ptr<::ngt::SerialiserBase> hostSerialiser; + edm::TypeWithDict wrappedType; + }; + + edm::EDGetTokenT const upstream_; + edm::EDPutTokenT const token_; + std::vector products_; + int32_t const instance_; + bool hasDeviceProducts_ = false; + + std::shared_ptr receivedProductMetadata_; + std::vector> receivedWrappers_; + bool asyncWorkLaunched_ = false; + std::shared_ptr metadata_; + }; + +} // namespace ALPAKA_ACCELERATOR_NAMESPACE + +DEFINE_FWK_ALPAKA_MODULE(MPIReceiverPortable); diff --git a/HeterogeneousCore/MPICore/plugins/alpaka/MPISenderPortable.cc b/HeterogeneousCore/MPICore/plugins/alpaka/MPISenderPortable.cc new file mode 100644 index 0000000000000..c801569a0cf2d --- /dev/null +++ b/HeterogeneousCore/MPICore/plugins/alpaka/MPISenderPortable.cc @@ -0,0 +1,325 @@ +// C++ include files +#include +#include +#include +#include +#include + +#include +#include +#include +#include + +// CMSSW include files +#include "DataFormats/AlpakaCommon/interface/alpaka/EDMetadata.h" +#include "DataFormats/Common/interface/PathStateToken.h" +#include "FWCore/Concurrency/interface/Async.h" +#include "FWCore/Concurrency/interface/WaitingTaskWithArenaHolder.h" +#include "FWCore/Framework/interface/Event.h" +#include "FWCore/Framework/interface/WrapperBaseHandle.h" +#include "FWCore/Framework/interface/stream/EDProducer.h" +#include "FWCore/MessageLogger/interface/MessageLogger.h" +#include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h" +#include "FWCore/ParameterSet/interface/ParameterSet.h" +#include "FWCore/ParameterSet/interface/ParameterSetDescription.h" +#include "FWCore/Reflection/interface/TypeWithDict.h" +#include "FWCore/ServiceRegistry/interface/Service.h" +#include "FWCore/ServiceRegistry/interface/ServiceMaker.h" +#include "FWCore/Utilities/interface/Exception.h" +#include "FWCore/Utilities/interface/InputTag.h" +#include "FWCore/Utilities/interface/TypeID.h" +#include "HeterogeneousCore/AlpakaCore/interface/alpaka/MakerMacros.h" +#include "HeterogeneousCore/AlpakaCore/interface/alpaka/ProducerBase.h" +#include "HeterogeneousCore/AlpakaCore/interface/alpaka/chooseDevice.h" +#include "HeterogeneousCore/AlpakaInterface/interface/config.h" +#include "HeterogeneousCore/MPICore/interface/MPIChannel.h" +#include "HeterogeneousCore/MPICore/interface/MPIToken.h" +#include "HeterogeneousCore/TrivialSerialisation/interface/AnyBuffer.h" +#include "HeterogeneousCore/TrivialSerialisation/interface/ReaderBase.h" +#include "HeterogeneousCore/TrivialSerialisation/interface/SerialiserBase.h" +#include "HeterogeneousCore/TrivialSerialisation/interface/SerialiserFactory.h" +#include "HeterogeneousCore/TrivialSerialisation/interface/alpaka/ReaderBase.h" +#include "HeterogeneousCore/TrivialSerialisation/interface/alpaka/SerialiserBase.h" +#include "HeterogeneousCore/TrivialSerialisation/interface/alpaka/SerialiserFactoryDevice.h" + +namespace ALPAKA_ACCELERATOR_NAMESPACE { + + // Inherit from ProducerBase. This is so we have access to the EDMetadata, + // which we need for synchronization + class MPISenderPortable : public ProducerBase { + public: + MPISenderPortable(edm::ParameterSet const& config) + : ProducerBase(config), + upstream_(consumes(config.getParameter("upstream"))), + token_(this->producesCollector().template produces()), + instance_(config.getParameter("instance")) { + // instance 0 is reserved for the MPIController / MPISource pair instance + // values greater than 255 may not fit in the MPI tag + if (instance_ < 1 or instance_ > 255) { + throw cms::Exception("InvalidValue") + << "Invalid MPISenderPortable instance value, please use a value between 1 and 255"; + } + + auto const& products = config.getParameter>("products"); + products_.reserve(products.size()); + for (auto const& product : products) { + auto const& type = product.getParameter("type"); + auto const& src = product.getParameter("src"); + + Entry entry; + entry.typeName = type; + + // PathStateToken is not transferred over MPI; the path status is + // propagated through productCount, which will be set to -1 if the path + // is inactive. + if (type == "edm::PathStateToken") { + entry.typeID = edm::TypeID(typeid(edm::PathStateToken)); + entry.token = this->consumes(edm::TypeToGet{entry.typeID, edm::PRODUCT_TYPE}, src); + products_.emplace_back(std::move(entry)); + continue; + } + + // Lookup the right serialiser. In order of preference: + // SerialiserFactoryDevice, SerialiserFactory, ROOT Serialisation. + // + // Check 1: Look up by the device type alias as written in the config + // (e.g. "sistrip::SiStripClusterDevice"). + LogDebug("MPISenderPortable") << "looking for device serialiser for type \"" << type << "\""; + std::unique_ptr deviceSerialiser = ngt::SerialiserFactoryDevice::get()->tryToCreate(type); + + if (deviceSerialiser) { + LogDebug("MPISenderPortable") << "found device serialiser for type \"" << type << "\""; + // Get the edm::TypeID type from the serialiser, which we will later + // need to construct the edm::Handle where we will put the product we get + // from the event. This typeID is the product type wrapped in + // edm::DeviceProduct, and includes the alpaka device. + edm::TypeID typeID{deviceSerialiser->productTypeID()}; + hasDeviceProducts_ = true; + entry.typeID = typeID; + entry.token = this->consumes(edm::TypeToGet{typeID, edm::PRODUCT_TYPE}, src); + entry.deviceSerialiser = std::move(deviceSerialiser); + + LogDebug("MPISenderPortable") << "send device type \"" << typeID << "\" (" << type << "), label \"" + << src.label() << "\" instance \"" << src.instance() + << "\" over MPI channel instance " << instance_; + + products_.emplace_back(std::move(entry)); + continue; + } + + // Check 2: Lookup a host serialiser registered in the host + // SerialiserFactory. + edm::TypeWithDict twd = edm::TypeWithDict::byName(type); + std::unique_ptr<::ngt::SerialiserBase> hostSerialiser; + + LogDebug("MPISenderPortable") << "looking for host serialiser for type \"" << type << "\""; + if (twd.typeInfo() != typeid(void)) { + hostSerialiser = ::ngt::SerialiserFactory::get()->tryToCreate(twd.typeInfo().name()); + } + if (hostSerialiser) { + LogDebug("MPISenderPortable") << "found host serialiser for type \"" << type << "\""; + entry.typeID = edm::TypeID{twd.typeInfo()}; + entry.token = this->consumes(edm::TypeToGet{entry.typeID, edm::PRODUCT_TYPE}, src); + entry.hostSerialiser = std::move(hostSerialiser); + + LogDebug("MPISenderPortable") << "send host type \"" << entry.typeID << "\" (" << type << "), label \"" + << src.label() << "\" instance \"" << src.instance() + << "\" over MPI channel instance " << instance_; + + products_.emplace_back(std::move(entry)); + continue; + } + + // Check 3: Fall back to ROOT serialisation, if a ROOT dictionary is + // found for this type + edm::TypeWithDict wrappedTwd = edm::TypeWithDict::byName("edm::Wrapper<" + type + ">"); + LogDebug("MPISenderPortable") << "looking for ROOT serialisation of type \"" << type << "\""; + if (twd.typeInfo() == typeid(void) || !wrappedTwd.getClass()) { + throw cms::Exception("MPISenderPortable") + << "No serialisation mechanism (device or host TrivialSerialisation, or ROOT dictionaries) found for " + "type '" + << type + << "'. Either register a serialiser via DEFINE_TRIVIAL_SERIALISER_PLUGIN or " + "DEFINE_TRIVIAL_SERIALISER_PORTABLE_PLUGIN, or make sure a ROOT dictionary exists for this type."; + } + LogDebug("MPISenderPortable") << "found ROOT dictionary for type \"" << type << "\""; + + entry.typeID = edm::TypeID{twd.typeInfo()}; + entry.token = this->consumes(edm::TypeToGet{entry.typeID, edm::PRODUCT_TYPE}, src); + entry.wrappedType = wrappedTwd; + + LogDebug("MPISenderPortable") << "send ROOT type \"" << entry.typeID << "\" (" << type << "), label \"" + << src.label() << "\" instance \"" << src.instance() + << "\" over MPI channel instance " << instance_; + + products_.emplace_back(std::move(entry)); + } + + LogDebug("MPISenderPortable") << "configured to send " << products_.size() + << " products over MPI channel instance " << instance_; + } + + void acquire(edm::Event const& event, edm::EventSetup const&, edm::WaitingTaskWithArenaHolder holder) final { + MPIToken const& token = event.get(upstream_); + + size_t productCount = 0; + for (auto const& entry : products_) + if (entry.typeName != "edm::PathStateToken") + ++productCount; + + auto productMetadata = std::make_shared(productCount); + bool isActive = true; + + struct DataToBeSent { + using Regions = std::vector>; + std::vector pendingRegions; + // Anything we pass to runAsync needs to be copyable, so we wrap + // rootBuffer in this struct and pass to runAsync the + // std::make_shared() below. + std::unique_ptr rootBuffer; + }; + auto toBeSent = std::make_shared(); + toBeSent->pendingRegions.reserve(productCount); + + // The EDMetadata the device serialisers need to access a device product T + // from its wrapped form. + std::shared_ptr deviceMetadata; + if (hasDeviceProducts_) { + deviceMetadata = std::make_shared(detail::chooseDevice(event.streamID())); + } + + for (auto const& entry : products_) { + edm::Handle handle(entry.typeID.typeInfo()); + event.getByToken(entry.token, handle); + + if (not handle.isValid() and entry.typeName == "edm::PathStateToken") { + productMetadata->setProductCount(-1); + isActive = false; + break; + } + if (entry.typeName == "edm::PathStateToken") + continue; + + if (handle.isValid()) { + edm::WrapperBase const* wrapper = handle.product(); + // extract memory regions + if (entry.deviceSerialiser) { + // If the product is on device + auto reader = entry.deviceSerialiser->reader(*wrapper, *deviceMetadata); + ::ngt::AnyBuffer buffer = reader->parameters(); + productMetadata->addTrivialCopy(buffer.data(), buffer.size_bytes()); + toBeSent->pendingRegions.push_back(reader->regions()); + } else if (entry.hostSerialiser) { + // If the product is on host and we have a serialiser for it + auto reader = entry.hostSerialiser->reader(*wrapper); + ::ngt::AnyBuffer buffer = reader->parameters(); + productMetadata->addTrivialCopy(buffer.data(), buffer.size_bytes()); + toBeSent->pendingRegions.push_back(reader->regions()); + } else { + // If the product is serialised via ROOT + TClass* cls = entry.wrappedType.getClass(); + if (!cls) + throw cms::Exception("MPISenderPortable") << "Failed to get TClass for type: " << entry.typeName; + if (!toBeSent->rootBuffer) + toBeSent->rootBuffer = std::make_unique(TBuffer::kWrite); + size_t prevLen = toBeSent->rootBuffer->Length(); + cls->Streamer(const_cast(static_cast(wrapper)), *toBeSent->rootBuffer); + productMetadata->addSerialized(toBeSent->rootBuffer->Length() - prevLen); + } + } else { + productMetadata->addMissing(); + } + } + + // Send metadata immediately; wait for its completion in runAsync before + // sending product data. productMetadata will be passed to runAsync so it + // is kept alive until sendMetadataAsync completes. + auto productMetadataRequest = + std::make_shared(token.channel()->sendMetadataAsync(instance_, productMetadata)); + + // Lambda that waits for device work and the metadata send, then sends + // all data products, to be passed to runAsync. + auto sendData = + [token, instance = instance_, productMetadata, productMetadataRequest, toBeSent, isActive, deviceMetadata]() { + if (deviceMetadata) { + alpaka::wait(deviceMetadata->queue()); + } + MPIChannel::waitMetadata(*productMetadataRequest); + if (isActive) { + if (toBeSent->rootBuffer) + token.channel()->sendBuffer(toBeSent->rootBuffer->Buffer(), + toBeSent->rootBuffer->Length(), + instance, + EDM_MPI_SendSerializedProduct); + for (auto const& regions : toBeSent->pendingRegions) + token.channel()->sendTrivialCopyProduct(instance, regions); + } + }; + + edm::Service asyncService; + asyncService->runAsync( + std::move(holder), std::move(sendData), []() { return "Calling MPISenderPortable::acquire()"; }); + } + + void produce(edm::Event& event, edm::EventSetup const&) final { + // write a shallow copy of the channel to the output, so other modules can + // consume it to indicate that they should run after this + MPIToken token = event.get(upstream_); + event.emplace(token_, token); + + this->putBackend(event); + } + + static void fillDescriptions(edm::ConfigurationDescriptions& descriptions) { + descriptions.setComment( + "This module can consume arbitrary host or device event products and copy them " + "to an \"MPIReceiverPortable\" module in a separate CMSSW job. " + "Products are serialised using the device serialiser (if available), the host " + "serialiser (if available), or ROOT as a fallback."); + + edm::ParameterSetDescription product; + product.add("type")->setComment( + "Type alias of the device product without the ALPAKA_ACCELERATOR_NAMESPACE prefix " + "(e.g. \"sistrip::SiStripClusterDevice\"). For host and ROOT products, the plain C++ type name."); + product.add("src")->setComment( + "InputTag identifying the product to consume: label is the producer module label, " + "instance is the product instance name."); + + edm::ParameterSetDescription desc; + desc.add("upstream", {"source"}) + ->setComment( + "MPI communication channel. Can be an \"MPIController\", \"MPISource\", or " + "\"MPISenderPortable\"/\"MPIReceiverPortable\". Passing an \"MPIController\" or \"MPISource\" " + "only identifies the pair of local and remote applications. Passing a sender or receiver " + "in addition imposes a scheduling dependency."); + desc.addVPSet("products", product, {}) + ->setComment("Host or device products to be consumed and copied over to a separate CMSSW job."); + desc.add("instance", 0) + ->setComment( + "A value between 1 and 255 used to identify a matching pair of " + "\"MPISenderPortable\"/\"MPIReceiverPortable\"."); + + descriptions.addWithDefaultLabel(desc); + } + + private: + struct Entry { + std::string typeName; // type name as written in the config + edm::TypeID typeID; + edm::EDGetToken token; + + std::unique_ptr<::ngt::SerialiserBase> hostSerialiser; + std::unique_ptr deviceSerialiser; + edm::TypeWithDict wrappedType; + }; + + edm::EDGetTokenT const upstream_; + edm::EDPutTokenT const token_; + std::vector products_; + int32_t const instance_; + bool hasDeviceProducts_ = false; + }; + +} // namespace ALPAKA_ACCELERATOR_NAMESPACE + +DEFINE_FWK_ALPAKA_MODULE(MPISenderPortable); diff --git a/HeterogeneousCore/MPICore/src/MPIChannel.cc b/HeterogeneousCore/MPICore/src/MPIChannel.cc index 93d4ad73f277f..c2124aee3eff7 100644 --- a/HeterogeneousCore/MPICore/src/MPIChannel.cc +++ b/HeterogeneousCore/MPICore/src/MPIChannel.cc @@ -14,7 +14,6 @@ #include "DataFormats/Provenance/interface/RunAuxiliary.h" #include "FWCore/MessageLogger/interface/MessageLogger.h" #include "HeterogeneousCore/MPICore/interface/MPIChannel.h" -#include "HeterogeneousCore/MPICore/interface/conversion.h" #include "HeterogeneousCore/MPICore/interface/messages.h" #include "HeterogeneousCore/TrivialSerialisation/interface/ReaderBase.h" #include "HeterogeneousCore/TrivialSerialisation/interface/WriterBase.h" @@ -252,11 +251,25 @@ void MPIChannel::sendMetadata(int instance, std::shared_ptrdata(), meta->size(), MPI_BYTE, dest_, tag, comm_); } +MPI_Request MPIChannel::sendMetadataAsync(int instance, std::shared_ptr meta) { + int tag = EDM_MPI_SendMetadata | instance * EDM_MPI_MessageTagWidth_; + MPI_Request request; + MPI_Isend(meta->data(), meta->size(), MPI_BYTE, dest_, tag, comm_, &request); + return request; +} + +void MPIChannel::waitMetadata(MPI_Request& request) { MPI_Wait(&request, MPI_STATUS_IGNORE); } + void MPIChannel::receiveMetadata(int instance, std::shared_ptr meta) { int tag = EDM_MPI_SendMetadata | instance * EDM_MPI_MessageTagWidth_; meta->receiveMetadata(dest_, tag, comm_); } +void MPIChannel::receiveMetadataAsync(int instance, std::shared_ptr meta) { + int tag = EDM_MPI_SendMetadata | instance * EDM_MPI_MessageTagWidth_; + meta->receiveMetadataAsync(dest_, tag, comm_); +} + void MPIChannel::sendBuffer(const void* buf, size_t size, int instance, EDM_MPI_MessageTag tag) { int commtag = tag | instance * EDM_MPI_MessageTagWidth_; MPI_Send(buf, size, MPI_BYTE, dest_, commtag, comm_); @@ -295,10 +308,8 @@ void MPIChannel::receiveSerializedProduct_(int instance, TClass const* type, voi type->Streamer(product, buffer); } -void MPIChannel::sendTrivialCopyProduct(int instance, const ngt::ReaderBase& reader) { +void MPIChannel::sendTrivialCopyProduct(int instance, std::vector> const& regions) { int tag = EDM_MPI_SendTrivialCopyProduct | instance * EDM_MPI_MessageTagWidth_; - // transfer the memory regions - auto regions = reader.regions(); // TODO send the number of regions ? for (size_t i = 0; i < regions.size(); ++i) { assert(regions[i].data() != nullptr); @@ -306,6 +317,18 @@ void MPIChannel::sendTrivialCopyProduct(int instance, const ngt::ReaderBase& rea } } +void MPIChannel::sendTrivialCopyProduct(int instance, const ngt::ReaderBase& reader) { + auto regions = reader.regions(); + sendTrivialCopyProduct(instance, regions); +} + +void MPIChannel::waitAll(std::vector& requests) { + if (requests.empty()) { + return; + } + MPI_Waitall(requests.size(), requests.data(), MPI_STATUSES_IGNORE); +} + void MPIChannel::receiveInitializedTrivialCopy(int instance, ngt::WriterBase& writer) { int tag = EDM_MPI_SendTrivialCopyProduct | instance * EDM_MPI_MessageTagWidth_; MPI_Status status; @@ -317,3 +340,16 @@ void MPIChannel::receiveInitializedTrivialCopy(int instance, ngt::WriterBase& wr MPI_Recv(regions[i].data(), regions[i].size_bytes(), MPI_BYTE, dest_, tag, comm_, &status); } } + +void MPIChannel::receiveInitializedTrivialCopyAsync(int instance, + ngt::WriterBase& writer, + std::vector& requests) { + int tag = EDM_MPI_SendTrivialCopyProduct | instance * EDM_MPI_MessageTagWidth_; + auto regions = writer.regions(); + size_t base = requests.size(); + requests.resize(base + regions.size()); + for (size_t i = 0; i < regions.size(); ++i) { + assert(regions[i].data() != nullptr); + MPI_Irecv(regions[i].data(), regions[i].size_bytes(), MPI_BYTE, dest_, tag, comm_, &requests[base + i]); + } +} diff --git a/HeterogeneousCore/MPICore/src/metadata.cc b/HeterogeneousCore/MPICore/src/metadata.cc index 0820d8bfda119..23316861115f7 100644 --- a/HeterogeneousCore/MPICore/src/metadata.cc +++ b/HeterogeneousCore/MPICore/src/metadata.cc @@ -113,6 +113,20 @@ void ProductMetadataBuilder::receiveMetadata(int src, int tag, MPI_Comm comm) { readOffset_ = sizeof(MetadataHeader); } +void ProductMetadataBuilder::receiveMetadataAsync(int src, int tag, MPI_Comm comm) { + MPI_Irecv(buffer_, maxMetadataSize_, MPI_BYTE, src, tag, comm, &receiveRequest_); +} + +void ProductMetadataBuilder::waitReceiveMetadata() { + MPI_Status status; + MPI_Wait(&receiveRequest_, &status); + int receivedBytes = 0; + MPI_Get_count(&status, MPI_BYTE, &receivedBytes); + assert(static_cast(receivedBytes) >= sizeof(MetadataHeader) && "received metadata was less than header size"); + size_ = receivedBytes; + readOffset_ = sizeof(MetadataHeader); +} + ProductMetadata ProductMetadataBuilder::getNext() { if (readOffset_ >= size_) throw std::out_of_range("No more metadata entries"); diff --git a/HeterogeneousCore/MPICore/test/BuildFile.xml b/HeterogeneousCore/MPICore/test/BuildFile.xml index dd74613cdb5f4..c6720461f0f93 100644 --- a/HeterogeneousCore/MPICore/test/BuildFile.xml +++ b/HeterogeneousCore/MPICore/test/BuildFile.xml @@ -32,6 +32,11 @@ command="testMPICommWorld.sh ${CMSSW_BASE}/src/HeterogeneousCore/MPICore/test/controller_soa_cfg.py ${CMSSW_BASE}/src/HeterogeneousCore/MPICore/test/follower_soa_cfg.py" /> + +