diff --git a/HeterogeneousCore/MPICore/plugins/MPIController.cc b/HeterogeneousCore/MPICore/plugins/MPIController.cc index 12c32714844fb..c0c74c3d78ab9 100644 --- a/HeterogeneousCore/MPICore/plugins/MPIController.cc +++ b/HeterogeneousCore/MPICore/plugins/MPIController.cc @@ -18,6 +18,7 @@ #include "FWCore/Framework/interface/MakerMacros.h" #include "FWCore/Framework/interface/Run.h" #include "FWCore/Framework/interface/one/EDProducer.h" +#include "FWCore/Framework/interface/TriggerNamesService.h" #include "FWCore/MessageLogger/interface/MessageLogger.h" #include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h" #include "FWCore/ParameterSet/interface/EmptyGroupDescription.h" @@ -99,34 +100,34 @@ MPIController::MPIController(edm::ParameterSet const& config) int rank; MPI_Comm_rank(MPI_COMM_WORLD, &rank); - // Determine the rank of the other process. - auto followers = config.getUntrackedParameter>("followers"); - if (followers.empty()) { - // When there are only two proccesses, we can assume the ranks to be 0 and 1, - // and we can infer the other process rank from our own. - if (size == 2) { - followers = {1 - rank}; - } else { - throw edm::Exception(edm::errors::Configuration) - << "An empty list of remote processes is valid only where there are exactly two processes."; - } + edm::LogInfo("MPI") << "MPIController sees world size " << size; + + // Determine the ranks of the follower processes. + auto follower_name = config.getParameter("followerProcessName"); + if (follower_name.empty()) { + throw edm::Exception(edm::errors::Configuration) + << "ERROR: Follower process name cannot be empty. Aborting MPIController..."; } - if (followers.size() >= static_cast(size)) { + + edm::Service tns; + std::string const& this_process_name = tns->getProcessName(); + if (follower_name == this_process_name) { throw edm::Exception(edm::errors::Configuration) - << "The number of remote processes is invalid. 
Please specify at most " << size - 1 << "remote processes."; + << "ERROR: controller and follower processes cannot have the same name. Aborting MPIController..."; } - std::vector invalid; - for (int follower : followers) { - if (follower < 0 or follower >= size) { - invalid.push_back(follower); - } + + edm::Service mpiservice; + auto followers = mpiservice->getRanksByProcessName(follower_name); + if (followers.empty()) { + throw edm::Exception(edm::errors::Configuration) + << "ERROR: No follower process with name " << follower_name << " found. Aborting..."; } - if (invalid.size() == 1) { + + if (followers.size() == static_cast(size)) { throw edm::Exception(edm::errors::Configuration) - << fmt::format("The remote process {} is invalid. Valid ranks are 0 to {}.", invalid.front(), size - 1); - } else if (invalid.size() > 1) { - throw edm::Exception(edm::errors::Configuration) << fmt::format( - "The remote processes {} are invalid. Valid ranks are 0 to {}.", fmt::join(invalid, ", "), size - 1); + << "The number of found followers equals to the world size. " + << "Possible reason could be process names' hash collision. " + << "Please check process names in follower and controller. Aborting..."; } for (int follower : followers) { @@ -185,7 +186,9 @@ MPIController::~MPIController() { // Disconnect the per-stream communicators. for (auto& stream : streams_) { // TODO move this to end stream - stream->reset(); + if (stream) { + stream->reset(); + } } // Close the intercommunicator. 
@@ -352,15 +355,12 @@ void MPIController::fillDescriptions(edm::ConfigurationDescriptions& description desc.ifValue( edm::ParameterDescription("mode", "CommWorld", false), ModeDescription[kCommWorld] >> - edm::ParameterDescription>( - "followers", - {}, - false, - edm::Comment("Ranks of the remote \"follower\" processes.\n" - "When there are two or more follower processes, framework streams are associated to " - "each follower in a round-robin fashion.\n" - "When there is only one remote process, pass an empty list to autodetect its rank " - "based on the rank of the current process.")) or + edm::ParameterDescription( + "followerProcessName", + "", + true, + edm::Comment("All processes with this process name should act as followers, " + "and should be configured with an MPISource that follows this controller.")) or ModeDescription[kIntercommunicator] >> edm::ParameterDescription("name", "server", false)) ->setComment( "Valid modes are CommWorld (use MPI_COMM_WORLD) and Intercommunicator (use an MPI name server to setup an " diff --git a/HeterogeneousCore/MPICore/plugins/MPIReceiver.cc b/HeterogeneousCore/MPICore/plugins/MPIReceiver.cc index 556cc7aed52c6..f9c8bbe07edb3 100644 --- a/HeterogeneousCore/MPICore/plugins/MPIReceiver.cc +++ b/HeterogeneousCore/MPICore/plugins/MPIReceiver.cc @@ -64,6 +64,8 @@ class MPIReceiver : public edm::stream::EDProducer { products_.emplace_back(std::move(entry)); } + + received_wrappers_.resize(products_.size()); } void acquire(edm::Event const& event, edm::EventSetup const&, edm::WaitingTaskWithArenaHolder holder) final { @@ -75,17 +77,87 @@ class MPIReceiver : public edm::stream::EDProducer { edm::Service as; as->runAsync( std::move(holder), - [this, token]() { token.channel()->receiveMetadata(instance_, received_meta_); }, + [this, token]() { + token.channel()->receiveMetadata(instance_, received_meta_); +#ifdef EDM_ML_DEBUG + // dump the summary of metadata + received_meta_->debugPrintMetadataSummary(); +#endif + + // if filter 
was false before the sender, receive nothing + if (received_meta_->productCount() == -1) { + return; + } + + std::unique_ptr serialized_buffer; + if (received_meta_->hasSerialized()) { + serialized_buffer = + token.channel()->receiveSerializedBuffer(instance_, received_meta_->serializedBufferSize()); +#ifdef EDM_ML_DEBUG + { + edm::LogSystem msg("MPIReceiver"); + msg << "Received serialised product:\n"; + for (int i = 0; i < received_meta_->serializedBufferSize(); ++i) { + msg << "0x" << std::hex << std::setw(2) << std::setfill('0') + << (unsigned int)(unsigned char)serialized_buffer->Buffer()[i] << (i % 16 == 15 ? '\n' : ' '); + } + } +#endif + } + + for (size_t i = 0; i < products_.size(); ++i) { + auto product_meta = received_meta_->getNext(); + if (product_meta.kind == ProductMetadata::Kind::Missing) { + continue; + } + + auto const& entry = products_[i]; + + if (product_meta.kind == ProductMetadata::Kind::Serialized) { + std::unique_ptr wrapper( + reinterpret_cast(entry.wrappedType.getClass()->New())); + assert(static_cast(serialized_buffer->Length() + product_meta.sizeMeta) <= + received_meta_->serializedBufferSize() && + "serialized data buffer is shorter than expected"); + entry.wrappedType.getClass()->Streamer(wrapper.get(), *serialized_buffer); + received_wrappers_[i] = std::move(wrapper); + } + + else if (product_meta.kind == ProductMetadata::Kind::TrivialCopy) { + if (not enableTrivialSerialisation_) { + edm::LogError("MPIReceiver") + << "Received a trivially-serialised product, but enableTrivialSerialisation is set to false in " + "this MPIReceiver. 
Please check that the MPISender and MPIReceiver have consistent settings."; + MPI_Abort(MPI_COMM_WORLD, EXIT_FAILURE); + } + std::unique_ptr serialiser = + ngt::SerialiserFactory::get()->tryToCreate(entry.type.typeInfo().name()); + if (not serialiser) { + throw cms::Exception("SerializerError") + << "Receiver could not retrieve its serializer when it was expected"; + } + auto writer = serialiser->writer(); + ngt::AnyBuffer buffer = writer->uninitialized_parameters(); // constructs buffer with typeid + assert(buffer.size_bytes() == product_meta.sizeMeta); + std::memcpy(buffer.data(), product_meta.trivialCopyOffset, product_meta.sizeMeta); + writer->initialize(buffer); + token.channel()->receiveInitializedTrivialCopy(instance_, *writer); + writer->finalize(); + received_wrappers_[i] = writer->get(); + } + } + + if (received_meta_->hasSerialized()) { + assert(serialized_buffer->Length() == received_meta_->serializedBufferSize() && + "serialized data buffer is not equal to the expected length"); + } + }, []() { return "Calling MPIReceiver::acquire()"; }); } void produce(edm::Event& event, edm::EventSetup const&) final { // read the MPIToken used to establish the communication channel MPIToken token = event.get(upstream_); -#ifdef EDM_ML_DEBUG - // dump the summary of metadata - received_meta_->debugPrintMetadataSummary(); -#endif // if filter was false before the sender, receive nothing if (received_meta_->productCount() == -1) { @@ -96,68 +168,14 @@ class MPIReceiver : public edm::stream::EDProducer { event.emplace(pathStateToken_); } - std::unique_ptr serialized_buffer; - if (received_meta_->hasSerialized()) { - serialized_buffer = token.channel()->receiveSerializedBuffer(instance_, received_meta_->serializedBufferSize()); -#ifdef EDM_ML_DEBUG - { - edm::LogSystem msg("MPISender"); - msg << "Received serialised product:\n"; - for (int i = 0; i < received_meta_->serializedBufferSize(); ++i) { - msg << "0x" << std::hex << std::setw(2) << std::setfill('0') - << 
(unsigned int)(unsigned char)serialized_buffer->Buffer()[i] << (i % 16 == 15 ? '\n' : ' '); - } - } -#endif - } - - for (auto const& entry : products_) { - auto product_meta = received_meta_->getNext(); - if (product_meta.kind == ProductMetadata::Kind::Missing) { - edm::LogWarning("MPIReceiver") << "Product " << entry.type.name() << " was not received."; - continue; // Skip products that weren't sent - } - - else if (product_meta.kind == ProductMetadata::Kind::Serialized) { - std::unique_ptr wrapper( - reinterpret_cast(entry.wrappedType.getClass()->New())); - assert(static_cast(serialized_buffer->Length() + product_meta.sizeMeta) <= - received_meta_->serializedBufferSize() && - "serialized data buffer is shorter than expected"); - entry.wrappedType.getClass()->Streamer(wrapper.get(), *serialized_buffer); - // put the data into the Event - event.put(entry.token, std::move(wrapper)); - } - - else if (product_meta.kind == ProductMetadata::Kind::TrivialCopy) { - if (not enableTrivialSerialisation_) { - edm::LogError("MPIReceiver") - << "Received a trivially-serialised product, but enableTrivialSerialisation is set to false in this " - "MPIReceiver. 
Please check that the MPISender and MPIReceiver have consistent settings."; - MPI_Abort(MPI_COMM_WORLD, EXIT_FAILURE); - } - std::unique_ptr serialiser = - ngt::SerialiserFactory::get()->tryToCreate(entry.type.typeInfo().name()); - if (not serialiser) { - throw cms::Exception("SerializerError") << "Receiver could not retrieve its serializer when it was expected"; - } - auto writer = serialiser->writer(); - ngt::AnyBuffer buffer = writer->uninitialized_parameters(); // constructs buffer with typeid - assert(buffer.size_bytes() == product_meta.sizeMeta); - std::memcpy(buffer.data(), product_meta.trivialCopyOffset, product_meta.sizeMeta); - writer->initialize(buffer); - token.channel()->receiveInitializedTrivialCopy(instance_, *writer); - writer->finalize(); - // put the data into the Event - event.put(entry.token, writer->get()); + for (size_t i = 0; i < products_.size(); ++i) { + if (received_wrappers_[i]) { + event.put(products_[i].token, std::move(received_wrappers_[i])); + } else { + edm::LogWarning("MPIReceiver") << "Product " << products_[i].type.name() << " was not received."; } } - if (received_meta_->hasSerialized()) { - assert(serialized_buffer->Length() == received_meta_->serializedBufferSize() && - "serialized data buffer is not equal to the expected length"); - } - // write a shallow copy of the channel to the output, so other modules can consume it // to indicate that they should run after this event.emplace(token_, token); @@ -207,6 +225,7 @@ class MPIReceiver : public edm::stream::EDProducer { bool activity_; // indicator whether the PathStateToken will be received by the module edm::EDPutTokenT pathStateToken_; std::shared_ptr received_meta_; + std::vector> received_wrappers_; bool enableTrivialSerialisation_ = true; }; diff --git a/HeterogeneousCore/MPICore/plugins/MPISender.cc b/HeterogeneousCore/MPICore/plugins/MPISender.cc index a09154dd4dcfe..794bab55735a4 100644 --- a/HeterogeneousCore/MPICore/plugins/MPISender.cc +++ 
b/HeterogeneousCore/MPICore/plugins/MPISender.cc @@ -112,6 +112,13 @@ class MPISender : public edm::stream::EDProducer { const MPIToken& token = event.get(upstream_); // pass the number of products to estimate the right size for the metadata buffer auto meta = std::make_shared(products_.size()); + + // We use std::shared_ptr, instead of std::unique_ptr, so that readers can + // be captured by move by runAsync's lambda. This is ultimately because this + // lambda is used to construct an std::function, which requires its callable + // to be copy-constructible. + std::vector> readers; + readers.reserve(products_.size()); size_t index = 0; buffer_->Reset(); has_serialized_ = false; @@ -145,6 +152,7 @@ class MPISender : public edm::stream::EDProducer { auto reader = serialiser->reader(*wrapper); ngt::AnyBuffer buffer = reader->parameters(); meta->addTrivialCopy(buffer.data(), buffer.size_bytes()); + readers.push_back(std::move(reader)); } else { LogDebug("MPISender") << "No serializer for type \"" << entry.type.name() << "\" (" << entry.type.typeInfo().name() << "), using ROOT serialization"; @@ -169,50 +177,32 @@ class MPISender : public edm::stream::EDProducer { edm::Service as; as->runAsync( std::move(holder), - [this, token, meta = std::move(meta)]() { token.channel()->sendMetadata(instance_, meta); }, + [this, token, meta = std::move(meta), readers = std::move(readers)]() { + token.channel()->sendMetadata(instance_, meta); + if (has_serialized_) { +#ifdef EDM_ML_DEBUG + { + edm::LogSystem msg("MPISender"); + msg << "Sending serialised product:\n"; + for (int i = 0; i < buffer_->Length(); ++i) { + msg << "0x" << std::hex << std::setw(2) << std::setfill('0') + << (unsigned int)(unsigned char)buffer_->Buffer()[i] << (i % 16 == 15 ? 
'\n' : ' '); + } + } +#endif + token.channel()->sendBuffer(buffer_->Buffer(), buffer_->Length(), instance_, EDM_MPI_SendSerializedProduct); + } + for (auto const& reader : readers) { + token.channel()->sendTrivialCopyProduct(instance_, *reader); + } + }, []() { return "Calling MPISender::acquire()"; }); } void produce(edm::Event& event, edm::EventSetup const&) final { - MPIToken token = event.get(upstream_); - - if (!is_active_) { - event.emplace(token_, token); - return; - } - - if (has_serialized_) { -#ifdef EDM_ML_DEBUG - { - edm::LogSystem msg("MPISender"); - msg << "Sending serialised product:\n"; - for (int i = 0; i < buffer_->Length(); ++i) { - msg << "0x" << std::hex << std::setw(2) << std::setfill('0') - << (unsigned int)(unsigned char)buffer_->Buffer()[i] << (i % 16 == 15 ? '\n' : ' '); - } - } -#endif - token.channel()->sendBuffer(buffer_->Buffer(), buffer_->Length(), instance_, EDM_MPI_SendSerializedProduct); - } - - for (auto const& entry : products_) { - edm::Handle handle(entry.type.typeInfo()); - event.getByToken(entry.token, handle); - edm::WrapperBase const* wrapper = handle.product(); - // we don't send missing products - if (handle.isValid()) { - std::unique_ptr serialiser; - if (enableTrivialSerialisation_) { - serialiser = ngt::SerialiserFactory::get()->tryToCreate(entry.type.typeInfo().name()); - } - if (serialiser) { - auto reader = serialiser->reader(*wrapper); - token.channel()->sendTrivialCopyProduct(instance_, *reader); - } - } - } // write a shallow copy of the channel to the output, so other modules can consume it // to indicate that they should run after this + MPIToken token = event.get(upstream_); event.emplace(token_, token); } diff --git a/HeterogeneousCore/MPICore/plugins/MPISource.cc b/HeterogeneousCore/MPICore/plugins/MPISource.cc index 0e07768f17c75..8bfb656589305 100644 --- a/HeterogeneousCore/MPICore/plugins/MPISource.cc +++ b/HeterogeneousCore/MPICore/plugins/MPISource.cc @@ -22,6 +22,7 @@ #include 
"FWCore/Framework/interface/InputSourceDescription.h" #include "FWCore/Framework/interface/InputSourceMacros.h" #include "FWCore/Framework/interface/ProductProvenanceRetriever.h" +#include "FWCore/Framework/interface/TriggerNamesService.h" #include "FWCore/MessageLogger/interface/ErrorObj.h" #include "FWCore/MessageLogger/interface/MessageLogger.h" #include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h" @@ -29,6 +30,7 @@ #include "FWCore/ParameterSet/interface/ParameterSet.h" #include "FWCore/ParameterSet/interface/ParameterSetDescription.h" #include "FWCore/ParameterSet/interface/ParameterSetDescriptionFiller.h" +#include "FWCore/ServiceRegistry/interface/Service.h" #include "FWCore/Sources/interface/ProducerSourceBase.h" #include "FWCore/Utilities/interface/EDMException.h" #include "FWCore/Utilities/interface/StreamID.h" @@ -101,21 +103,36 @@ MPISource::MPISource(edm::ParameterSet const& config, edm::InputSourceDescriptio int rank; MPI_Comm_rank(MPI_COMM_WORLD, &rank); - // Determine the rank of the other process. - int remote = config.getUntrackedParameter("controller"); - if (remote == -1) { - // When there are only two proccesses, we can assume the ranks to be 0 and 1, - // and we can infer the other process rank from our own. - if (size == 2) { - remote = 1 - rank; - } else { - throw edm::Exception(edm::errors::Configuration) - << "Setting the remote rank to -1 is valid only where there are exactly two processes."; - } + edm::LogInfo("MPI") << "MPIController Comm World size: " << size; + + // All processes exchange the hashes of their names. + // One follower process has to make one communication channel with the controller process + // If controller process is not unique, error is thrown + auto controller_name = config.getParameter("controllerProcessName"); + if (controller_name.empty()) { + throw edm::Exception(edm::errors::Configuration) + << "ERROR: Controller process name cannot be empty. 
Aborting MPISource..."; + } + + edm::Service tns; + std::string const& this_process_name = tns->getProcessName(); + if (controller_name == this_process_name) { + throw edm::Exception(edm::errors::Configuration) + << "ERROR: controller and follower processes cannot have the same name. Aborting MPISource..."; } - if (remote < 0 or remote >= size) { + + edm::Service mpiservice; + std::vector controller_indices = mpiservice->getRanksByProcessName(controller_name); + int remote = -1; + if (controller_indices.empty()) { + throw edm::Exception(edm::errors::Configuration) + << "ERROR: No controller process with name " << controller_name << " found. Aborting..."; + } else if (controller_indices.size() == 1) { + remote = controller_indices[0]; + } else { throw edm::Exception(edm::errors::Configuration) - << "The rank of the remote process (" << remote << ") is invalid. Valid ranks are 0 to " << size - 1 << "."; + << "ERROR: Multiple controller processes with name " << controller_name + << " were found. Currently, only one controller process is supported. Aborting..."; } // Create a new communicator that spans only this process and the one with the given remote rank. @@ -126,7 +143,7 @@ MPISource::MPISource(edm::ParameterSet const& config, edm::InputSourceDescriptio MPI_Comm_create_group(MPI_COMM_WORLD, comm_group, 0, &comm_); MPI_Group_free(&world_group); MPI_Group_free(&comm_group); - edm::LogAbsolute("MPI") << "The remote process and MPISource have ranks " << remote << ", " << rank + edm::LogAbsolute("MPI") << "The MPIController process and MPISource have ranks " << remote << ", " << rank << " in MPI_COMM_WORLD, mapped to ranks 0, 1 in their private communicator."; // The remote process always has rank 0 in the new communicator. 
remote = 0; @@ -364,13 +381,12 @@ void MPISource::fillDescriptions(edm::ConfigurationDescriptions& descriptions) { desc.ifValue( edm::ParameterDescription("mode", "CommWorld", false), ModeDescription[kCommWorld] >> - edm::ParameterDescription( - "controller", - -1, - false, - edm::Comment("Rank of the remote \"controller\" process.\n" - "When there are only two processes, pass -1 to autodetect the rank of the remote " - "process based on the rank of the current process.")) or + edm::ParameterDescription( + "controllerProcessName", + "", + true, + edm::Comment("Process name of the controller process corresponding to this MPISource.\n" + "Only one process with this name is expected.\n")) or ModeDescription[kIntercommunicator] >> edm::ParameterDescription("name", "server", false)) ->setComment( "Valid modes are CommWorld (use MPI_COMM_WORLD) and Intercommunicator (use an MPI name server to setup an " diff --git a/HeterogeneousCore/MPICore/python/configuration_splitter/editor_functions.py b/HeterogeneousCore/MPICore/python/configuration_splitter/editor_functions.py index 62d79bf65e79f..adb4b5afafc1e 100644 --- a/HeterogeneousCore/MPICore/python/configuration_splitter/editor_functions.py +++ b/HeterogeneousCore/MPICore/python/configuration_splitter/editor_functions.py @@ -5,12 +5,17 @@ from HLTrigger.Configuration.common import * from HeterogeneousCore.MPICore.modules import * -def add_controller_to_local(process): + +def add_controller_to_local(process, remote_name): process.load("HeterogeneousCore.MPIServices.MPIService_cfi") process.MPIService.pmix_server_uri = "file:server.uri" - process.mpiController = MPIController() + controller_name = f"mpiController{remote_name.title()}" + controller = cms.EDProducer("MPIController", + followerProcessName = cms.string(remote_name)) + setattr(process, controller_name, controller) # Multiple luminocity blocks are currently unsupported process.options.numberOfConcurrentLuminosityBlocks = 1 + return controller_name def 
clone_module_from_process(dst, src, name): @@ -23,8 +28,8 @@ def clone_module_from_process(dst, src, name): setattr(dst, name, getattr(src, name).clone()) -def create_remote_process(local_process, modules_to_run): - remote_process = cms.Process("REMOTE") +def create_remote_process(local_process, modules_to_run, remote_process_name, local_process_name): + remote_process = cms.Process(remote_process_name) remote_process.load("Configuration.StandardSequences.Accelerators_cff") # load the global psets and event setup modules @@ -34,14 +39,19 @@ def create_remote_process(local_process, modules_to_run): setattr(remote_process, module, getattr(local_process, module).clone()) for module in local_process.es_producers.keys(): setattr(remote_process, module, getattr(local_process, module).clone()) + + if hasattr(local_process, "MessageLogger"): + remote_process.MessageLogger = local_process.MessageLogger.clone() remote_process.load("HeterogeneousCore.MPIServices.MPIService_cfi") remote_process.MPIService.pmix_server_uri = "file:server.uri" # where do i get this firstRun parameter from? 
- remote_process.source = cms.Source("MPISource") + remote_process.source = MPISource( # firstRun = cms.untracked.uint32(process.Source) - # ) + mode = 'CommWorld', + controllerProcessName = local_process_name + ) remote_process.maxEvents.input = -1 for module in modules_to_run: diff --git a/HeterogeneousCore/MPICore/python/configuration_splitter/module_dependency_analyzer.py b/HeterogeneousCore/MPICore/python/configuration_splitter/module_dependency_analyzer.py index f4ee1ac7c6afc..490d4bcc9c788 100644 --- a/HeterogeneousCore/MPICore/python/configuration_splitter/module_dependency_analyzer.py +++ b/HeterogeneousCore/MPICore/python/configuration_splitter/module_dependency_analyzer.py @@ -5,24 +5,32 @@ import FWCore.ParameterSet.Config as cms -def flatten_all_to_module_set(process, user_args): +def flatten_all_to_module_list(process, user_args): """ - This function ensures that if one of the input arguments was path or sequence, - it will be flattened to a module set for input consistency + Flatten input arguments into an ordered list of module names. + Preserves user-provided order and avoids duplicates. 
""" - module_list = set() + module_list = [] + seen = set() + for name in user_args: if not hasattr(process, name): print(f"[WARN] process has no attribute named '{name}'") continue + obj_ = getattr(process, name) + if hasattr(obj_, "moduleNames"): - print("is sequence") - module_list.extend(obj_.moduleNames()) + for mod in obj_.moduleNames(): + if mod not in seen: + module_list.append(mod) + seen.add(mod) else: - module_list.add(name) - return module_list + if name not in seen: + module_list.append(name) + seen.add(name) + return module_list class ModuleDependencyAnalyzer: def __init__(self, process): @@ -74,7 +82,7 @@ def _extract_inputtags(self, value): return tags - def direct_dependencies(self, modules: Set[str]) -> Set[str]: + def direct_dependencies(self, modules: List[str]) -> Set[str]: """ Get the modules whose products are needed """ @@ -89,7 +97,7 @@ def _consumers_of(self, producer: str) -> Set[str]: """ return self.producer_to_consumers.get(producer, set()) - def _restricted_graph(self, modules: Set[str]) -> Dict[str, Set[str]]: + def _restricted_graph(self, modules: List[str]) -> Dict[str, Set[str]]: """ Restricted dependency graph, reflecting the relationships between the modules to offload """ @@ -104,7 +112,11 @@ def _restricted_graph(self, modules: Set[str]) -> Dict[str, Set[str]]: return graph - def _connected_groups(self, graph: Dict[str, Set[str]]) -> List[List[str]]: + def _connected_groups( + self, + graph: Dict[str, Set[str]], + module_order: List[str], + ) -> List[List[str]]: """ Return list of weakly connected components. 
Each component is returned as a list of modules ordered @@ -114,34 +126,35 @@ def _connected_groups(self, graph: Dict[str, Set[str]]) -> List[List[str]]: # ---- build undirected graph ---- undirected = defaultdict(set) - for src, dsts in graph.items(): - undirected[src] # ensure node exists - for dst in dsts: + for src in graph: + undirected[src] + for dst in graph[src]: undirected[src].add(dst) undirected[dst].add(src) seen = set() components = [] - # ---- find weakly connected components ---- - for node in undirected: - if node in seen: + # ---- deterministic traversal using module_order ---- + for node in module_order: + if node not in undirected or node in seen: continue stack = [node] - comp = set() + comp = [] while stack: n = stack.pop() if n in seen: continue seen.add(n) - comp.add(n) + comp.append(n) + stack.extend(undirected[n] - seen) components.append(comp) - # ---- order each component by dependencies ---- + # ---- order each component by dependencies ---- ordered_components = [] for comp in components: @@ -175,13 +188,13 @@ def _connected_groups(self, graph: Dict[str, Set[str]]) -> List[List[str]]: ordered_components.append(ordered) return ordered_components - - def dependency_groups(self, modules: Set[str]) -> List[List[str]]: + + def dependency_groups(self, modules: List[str]) -> List[List[str]]: """ Get dependency groups (ordered) """ graph = self._restricted_graph(modules) - return self._connected_groups(graph) + return self._connected_groups(graph, modules) def grouped_external_dependencies( self, @@ -222,7 +235,7 @@ def producer_to_groups( def modules_to_send_back_by_group( self, groups: List[List[str]], - modules_to_run_on_both: Set[str], + modules_to_run_on_both: List[str], ): """ Which offloaded modules must send products back, and which are not needed on local diff --git a/HeterogeneousCore/MPICore/python/configuration_splitter/multiple_remotes_option_parser.py 
b/HeterogeneousCore/MPICore/python/configuration_splitter/multiple_remotes_option_parser.py new file mode 100644 index 0000000000000..9f3b2f99e6c03 --- /dev/null +++ b/HeterogeneousCore/MPICore/python/configuration_splitter/multiple_remotes_option_parser.py @@ -0,0 +1,205 @@ +import argparse +import pathlib +import sys + + +def build_global_parser(): + parser = argparse.ArgumentParser(add_help=False) + + parser.add_argument( + "config", + metavar="config.py", + type=pathlib.Path, + help="python configuration file to be split" + ) + parser.add_argument("-c", "--reuse-cpp-names", action="store_true") + parser.add_argument("-v", "--verbose", action="store_true") + parser.add_argument( + "-l", + "--output-local", + type=pathlib.Path, + default=pathlib.Path("local.py"), + ) + + return parser + + +def build_process_parser(): + parser = argparse.ArgumentParser(add_help=False) + + parser.add_argument("-m", "--remote-modules", nargs="+", default=None) + parser.add_argument("-r", "--output-remote", type=pathlib.Path, default=None) + parser.add_argument("-d", "--duplicate-modules", nargs="+", default=None) + parser.add_argument("-n", "--remote-process-name", default="") + + return parser + + +def split_groups(argv): + groups = [] + current = [] + + for arg in argv: + if arg == ":": + groups.append(current) + current = [] + else: + current.append(arg) + + groups.append(current) + return groups + + +def print_help(): + print( +""" +edmMpiSplitConfig script splits a CMSSW configuration \ +into local and one or multiple remote processes for MPI execution. +Selected modules (or sequences) are offloaded to a remote process while \ +the remaining modules stay in the local process. +Data dependencies are \ +automatically analyzed and the required products are forwarded between processes. + + +USAGE: + edmMpiSplitConfig config.py [GLOBAL OPTIONS] [PROCESS OPTIONS] [: PROCESS OPTIONS] ... + +DESCRIPTION: + Arguments before ':' apply to one remote process. 
+ You can define multiple remote processes by separating groups with ':'. + +GLOBAL OPTIONS: +Positional arguments (required): + config + Path to the input HLT Python configuration file. + +Optional arguments: + -l, --output-local + Path to the output configuration for the local process. + (default: local.py) + + -c, --reuse-cpp-names + False by default. If this script was run before, pass this + argument to reuse the generated file with C++ product names + + -v, --verbose + Print debug outputs + +PROCESS OPTIONS (can be passed for different remote processes): + -m, --remote-modules + Modules to be offloaded to the remote process. + Sequences can also be passed as a parameter, in which case + all modules of that sequence will be offloaded + + -r, --output-remote + Path to the output configuration for the remote process. + (default: remote.py) + + -d, --duplicate-modules + List of module labels that must run on both local and remote + processes. Products from these modules are not transferred + between processes. 
+ + -n, --remote-process-name + Name of the remote process (default: REMOTE) + + +SINGLE REMOTE EXAMPLES: + +Example 1 (offloading GPU part of ECAL and HCAL): + + edmMpiSplitConfig hlt.py --remote-modules hltEcalDigisSoA hltEcalUncalibRecHitSoA \\ + hltHcalDigisSoA hltHbheRecoSoA hltParticleFlowRecHitHBHESoA hltParticleFlowClusterHBHESoA \\ + --duplicate-modules hltHcalDigis \\ + --output-local local.py \\ + --output-remote remote.py + +Example 2 (offloading GPU part of ECAL, HCAL and pixels): + + edmMpiSplitConfig hlt.py --remote-modules hltEcalDigisSoA hltEcalUncalibRecHitSoA \\ + hltHcalDigisSoA hltHbheRecoSoA hltParticleFlowRecHitHBHESoA hltParticleFlowClusterHBHESoA \\ + hltSiPixelClustersSoA hltSiPixelRecHitsSoA hltPixelTracksSoA hltPixelVerticesSoA \\ + --duplicate-modules hltHcalDigis hltOnlineBeamSpot hltOnlineBeamSpotDevice \\ + --output-local local_pixels.py \\ + --output-remote remote_pixels.py + +MULTI-REMOTE EXAMPLES: + + Separate remote groups using ':'. + + Example (offloading GPU part of ECAL to one process and HCAL to another): + + edmMpiSplitConfig hlt.py -l local.py \\ + -m hltEcalDigisSoA hltEcalUncalibRecHitSoA -r remote_ecal.py -n ECAL : \\ + -m hltHcalDigisSoA hltHbheRecoSoA hltParticleFlowRecHitHBHESoA hltParticleFlowClusterHBHESoA \\ + -d hltHcalDigis -r remote_hcal.py -n HCAL + +Notes: + +• To split a configuration it must be processed with 0 events. + This will cause creating output files and directories (by default in '.cppnamedir' directory). +• If the splitter was run before, --reuse-cpp-names avoids rerunning cmsRun for products' characteristics. + Passing this option will make the script run much faster, given that needed information already exists. +• For some modules it might be better to run on both processes + instead of sending their products. Use --duplicate-modules option to specify them. +• Only dependencies expressed via InputTag are analyzed. +• Execution order inside dependency groups is preserved. 
+""" + ) + + +def parse_mpi_style_args(argv): + if not argv or any(arg in ("-h", "--help") for arg in argv): + print_help() + sys.exit(0) + + global_parser = build_global_parser() + process_parser = build_process_parser() + + # parse global args from full argv + global_args, _ = global_parser.parse_known_args(argv) + + # split independently + groups = split_groups(argv) + configs = [] + + # if no ":" - single process + if len(groups) == 1: + proc_args, _ = process_parser.parse_known_args(argv) + + cfg = argparse.Namespace() + + # merge + cfg.config = global_args.config + cfg.output_local = global_args.output_local + cfg.reuse_cpp_names = global_args.reuse_cpp_names + cfg.verbose = global_args.verbose + + cfg.remote_modules = proc_args.remote_modules or [] + cfg.output_remote = proc_args.output_remote + cfg.duplicate_modules = proc_args.duplicate_modules or [] + cfg.remote_process_name = proc_args.remote_process_name or "" + + configs.append(cfg) + return configs + + # multi-process + for i, g in enumerate(groups): + proc_args, _ = process_parser.parse_known_args(g) + cfg = argparse.Namespace() + + # globals + cfg.config = global_args.config + cfg.output_local = global_args.output_local + cfg.reuse_cpp_names = global_args.reuse_cpp_names + cfg.verbose = global_args.verbose + + # per-process overrides + cfg.remote_modules = proc_args.remote_modules or [] + cfg.output_remote = proc_args.output_remote or pathlib.Path(f"remote{i}.py") + cfg.duplicate_modules = proc_args.duplicate_modules or [] + cfg.remote_process_name = proc_args.remote_process_name or "" + + configs.append(cfg) + + return configs diff --git a/HeterogeneousCore/MPICore/python/configuration_splitter/split_remote.py b/HeterogeneousCore/MPICore/python/configuration_splitter/split_remote.py new file mode 100644 index 0000000000000..7cecc525b12a1 --- /dev/null +++ b/HeterogeneousCore/MPICore/python/configuration_splitter/split_remote.py @@ -0,0 +1,210 @@ +""" +This module implements the logic to separate 
one remote process from the local one +""" + +import pathlib +import os +import sys + +from HeterogeneousCore.MPICore.configuration_splitter.cpp_name_getter import CPPNameGetter +from HeterogeneousCore.MPICore.configuration_splitter.module_dependency_analyzer import ModuleDependencyAnalyzer, flatten_all_to_module_list +from HeterogeneousCore.MPICore.configuration_splitter.editor_functions import * +from HeterogeneousCore.MPICore.configuration_splitter.path_state_helpers import * +from HeterogeneousCore.MPICore.configuration_splitter.multiple_remotes_option_parser import * + +def split_remote(local_process, args, cpp_names_of_the_products): + modules_to_offload = flatten_all_to_module_list(local_process, args.remote_modules) + modules_to_run_on_both = flatten_all_to_module_list(local_process, args.duplicate_modules) + + # list of all modules to run on remote + modules_to_offload.extend(m for m in modules_to_run_on_both if m not in modules_to_offload) + + analyzer = ModuleDependencyAnalyzer(local_process) + + groups = analyzer.dependency_groups(modules_to_offload) + grouped_deps = analyzer.grouped_external_dependencies(groups) + producer_to_groups = analyzer.producer_to_groups(grouped_deps) + + if args.verbose: + print("Dependency groups: ", groups) + print("Grouped depencencies:", grouped_deps ) + print("Local producers - dependant groups correspondance: ", producer_to_groups) + + # get products whose data needs to be sent, excluding modules without local dependencies and modules which should run on both processes + modules_to_send, modules_without_local_deps = analyzer.modules_to_send_back_by_group(groups, modules_to_run_on_both) + + if args.verbose: + print("Offloaded modues whose products need to be sent: ", modules_to_send) + print("Offloaded modules without local dependencies: ", modules_without_local_deps) + + # --- start editing --- + + controller_name = add_controller_to_local(local_process, args.remote_process_name) + remote_process = 
create_remote_process(local_process, modules_to_offload, args.remote_process_name, local_process.name_()) + + mpi_path_modules_local = [[controller_name] for _ in range(len(groups))] + mpi_path_modules_remote = [[] for _ in range(len(groups))] + + instance = 1 + + # how to add state captures to the splitter? + + # 1) For the local sender - create path state capture per sender. + # Insert this path state capture before the first module of each needed group. + # If multiple groups need these products, create individual path state captures + # and add separate senders on top + # + # 2) For the remote receiver - add path state product to the list. + # Add the general activity filter at the beginning of the group path + # and, if needed, separate activity filters for each group + # + # 3) For the remote sender - add the state capture at the end of the groups path. + # Each sender module for this group depends on this state capture + # + # 4) For the local receiver - before deleting the offloaded module, insert filter for the activity it receives + + # send the data needed by offloaded modules from local to remote + remote_filters_by_group = [[] for _ in range(len(groups))] + for local_dependency, group_indices in producer_to_groups.items(): + first_dependency_in_a_group = [groups[i][0] for i in group_indices] + capture_name = f"activityCaptureBefore{local_dependency.title()}" + insert_path_state_capture_before(local_process, first_modules_in_a_group=first_dependency_in_a_group, capture_name=capture_name) + sender = create_sender( + module_name=local_dependency, + products=cpp_names_of_the_products[local_dependency], + instance=instance, + sender_upstream=controller_name, + path_state_capture = capture_name + ) + sender_name = f"mpiSender{args.remote_process_name.title()}{local_dependency.title()}" + setattr(local_process, sender_name, sender) + + receiver = create_receiver( + products=cpp_names_of_the_products[local_dependency], + instance=instance, + 
receiver_upstream="source", + path_state_capture=True, + ) + # create filter for the path state + filter_name = f"activityFilterAfter{local_dependency.title()}" + add_activity_filter(remote_process, local_dependency, filter_name) + setattr(remote_process, local_dependency, receiver) + for group_idx in group_indices: + remote_filters_by_group[group_idx].append(filter_name) + mpi_path_modules_local[group_idx].append(sender_name) + mpi_path_modules_remote[group_idx].append(local_dependency) + + instance += 1 + + # Handle the case when one local product is needed by multiple remote groups + # For sending from local process - if data is needed by multiple paths, insert additional path state capture and additional sender per group + # On remote insert one more receiver for this capture and one more filter in the beginning of the deps group + # In this approach if there are 2 senders with intersecting groups, it will result in only one group activation sender-receiver pair per group + if len(group_indices) >= 2: + for group_idx in group_indices: + capture_name=f"activityCaptureBefore{args.remote_process_name.title()}Group{group_idx}" + insert_path_state_capture_before(local_process, first_modules_in_a_group=[groups[group_idx][0]], capture_name=capture_name) + sender = create_sender( + module_name=local_dependency, + products=[], + instance=instance, + sender_upstream=controller_name, + path_state_capture = capture_name + ) + sender_name = f"mpiSender{args.remote_process_name.title()}Group{group_idx}Activity" + setattr(local_process, sender_name, sender) + + receiver = create_receiver( + products=[], + instance=instance, + receiver_upstream="source", + path_state_capture=True, + ) + receiver_name = f"mpiReceiver{args.remote_process_name.title()}Group{group_idx}Activity" + setattr(remote_process, receiver_name, receiver) + + # create filter for the path state + filter_name = f"activityFilterBefore{args.remote_process_name.title()}Group{group_idx}" + 
add_activity_filter(remote_process, receiver_name, filter_name) + remote_filters_by_group[group_idx].append(filter_name) + + instance += 1 + mpi_path_modules_local[group_idx].append(sender_name) + mpi_path_modules_remote[group_idx].append(receiver_name) + + + per_group_remote_captures = [[] for _ in range(len(groups))] + + # send the results from remote to local + for group_idx, group in enumerate(modules_to_send): + if len(group)==0: + continue + + remote_capture_name = f"activityCaptureAfter{args.remote_process_name.title()}Group{group_idx}" + setattr(remote_process, remote_capture_name, cms.EDProducer("PathStateCapture")) + per_group_remote_captures[group_idx].append(remote_capture_name) + + if len(mpi_path_modules_remote[group_idx]) != 0: + sender_upstream = mpi_path_modules_remote[group_idx][-1] + else: + sender_upstream = "source" + + sender = create_group_sender( + group=group, + all_products=cpp_names_of_the_products, + instance=instance, + upstream_module=sender_upstream, + path_state_capture=remote_capture_name, + ) + sender_name = f"mpiSender{args.remote_process_name.title()}Group{group_idx}" + setattr(remote_process, sender_name, sender) + + if len(mpi_path_modules_local[group_idx]) != 0: + receiver_upstream = mpi_path_modules_local[group_idx][-1] + else: + receiver_upstream = controller_name + + receiver = create_group_receiver( + group=group, + all_products=cpp_names_of_the_products, + instance=instance, + receiver_upstream=receiver_upstream, + path_state_capture=True, + ) + receiver_name = f"mpiReceiver{args.remote_process_name.title()}Group{group_idx}" + setattr(local_process, receiver_name, receiver) + + instance += 1 + + # insert filter on local before the first module which was supposed to run (is it correct?) 
+ filter_name = f"activityFilterAfter{args.remote_process_name.title()}Group{group_idx}" + add_activity_filter(local_process, receiver_name, filter_name) + insert_modules_before(local_process, getattr(local_process, group[0]), getattr(local_process, filter_name)) + + mpi_path_modules_remote[group_idx].append(sender_name) + mpi_path_modules_local[group_idx].append(receiver_name) + + + for i, offloaded_module in enumerate(group): + delattr(local_process, offloaded_module) + module_alias = create_receiver_alias(receiver_name=receiver_name, + products=cpp_names_of_the_products[offloaded_module], + module_name=offloaded_module + ) + setattr(local_process, offloaded_module, module_alias) + + + # delete offloaded modules whose products are not needed on local from the local process: + for product in modules_without_local_deps: + delattr(local_process, product) + + # add all needed paths to the process and schedule them + for i, group in enumerate(groups): + make_new_path(local_process, f"Offload{args.remote_process_name.title()}Group{i}", mpi_path_modules_local[i]) + make_new_path(remote_process, f"MPIPathGroup{i}", mpi_path_modules_remote[i]) + make_new_path(remote_process, args.remote_process_name.title()+"RemoteOffloadedSequence"+str(i), remote_filters_by_group[i]+group+per_group_remote_captures[i]) + + if args.verbose: + print(f"Successfully split out remote config with name {args.remote_process_name}!") + + return remote_process diff --git a/HeterogeneousCore/MPICore/scripts/edmMpiSplitConfig b/HeterogeneousCore/MPICore/scripts/edmMpiSplitConfig index cc56b27b2ef9d..dcf5f65087cba 100755 --- a/HeterogeneousCore/MPICore/scripts/edmMpiSplitConfig +++ b/HeterogeneousCore/MPICore/scripts/edmMpiSplitConfig @@ -5,62 +5,17 @@ This script splits an HLT configuration into local and remote processes by offloading selected modules or sequences. 
This tool analyzes data dependencies between CMSSW modules and generates -two derived configuration files: - - a *local* process config - - a *remote* process config +the derived configuration files: + - one *local* process config + - one or multiple *remote* process configs -Modules specified for offloading will be moved to the remote process. +Modules specified for offloading will be moved to the remote processes. +If multiple remote processes are needed, individual parameters should +be separated by ":". Any required data products are automatically identified and forwarded between processes unless the producing module is explicitly marked as shared. -Positional arguments: - config - Path to the input HLT Python configuration file. - -Required arguments: - --remote-modules - Modules to be offloaded to the remote process. - Sequences can also be passed as a parameter, in which case - all modules of that sequence will be offloaded - -Optional arguments: - -l, --output-local - Path to the output configuration for the local process. - (default: local.py) - - -r, --output-remote - Path to the output configuration for the remote process. - (default: remote.py) - - -d, --duplicate-modules - List of module labels that must run on both local and remote - processes. Products from these modules are not transferred - between processes. - - -c, --reuse-cpp-names - False by default. 
If this script was run before, pass this - argument to reuse the generated file with C++ product names - - -v, --verbose - Print debug outputs - -Example 1 (offloading GPU part of ECAL and HCAL): - python3 edmMpiSplitConfig hlt.py --remote-modules hltEcalDigisSoA hltEcalUncalibRecHitSoA \ - hltHcalDigisSoA hltHbheRecoSoA hltParticleFlowRecHitHBHESoA hltParticleFlowClusterHBHESoA \ - --duplicate-modules hltHcalDigis \ - --output-local local.py \ - --output-remote remote.py - -Example 2 (offloading GPU part of ECAL, HCAL and pixels): - python3 edmMpiSplitConfig hlt.py --remote-modules hltEcalDigisSoA hltEcalUncalibRecHitSoA \ - hltHcalDigisSoA hltHbheRecoSoA hltParticleFlowRecHitHBHESoA hltParticleFlowClusterHBHESoA \ - hltSiPixelClustersSoA hltSiPixelRecHitsSoA hltPixelTracksSoA hltPixelVerticesSoA \ - --duplicate-modules hltHcalDigis hltOnlineBeamSpot hltOnlineBeamSpotDevice \ - --output-local local_pixels.py \ - --output-remote remote_pixels.py - - Notes: - Only data dependencies expressed via InputTag are considered. - Module execution order inside dependency groups is preserved. 
@@ -79,321 +34,58 @@ import os import sys from HeterogeneousCore.MPICore.configuration_splitter.cpp_name_getter import CPPNameGetter -from HeterogeneousCore.MPICore.configuration_splitter.module_dependency_analyzer import ModuleDependencyAnalyzer, flatten_all_to_module_set +from HeterogeneousCore.MPICore.configuration_splitter.module_dependency_analyzer import ModuleDependencyAnalyzer, flatten_all_to_module_list from HeterogeneousCore.MPICore.configuration_splitter.editor_functions import * from HeterogeneousCore.MPICore.configuration_splitter.path_state_helpers import * +from HeterogeneousCore.MPICore.configuration_splitter.multiple_remotes_option_parser import * +from HeterogeneousCore.MPICore.configuration_splitter.split_remote import * from FWCore.ParameterSet.processFromFile import processFromFile def main(): - parser = argparse.ArgumentParser( - description=( - "Split a CMSSW configuration into local and remote processes for MPI execution.\n" - "Selected modules (or sequences) are offloaded to a remote process while " - "the remaining modules stay in the local process. Data dependencies are " - "automatically analyzed and the required products are forwarded between processes." 
- ), - epilog=( - "Examples:\n" - "\n" - "Offload ECAL and HCAL GPU reconstruction modules:\n" - " python3 edmMpiSplitConfig.py hlt.py --remote-modules \\\n" - " hltEcalDigisSoA hltEcalUncalibRecHitSoA \\\n" - " hltHcalDigisSoA hltHbheRecoSoA \\\n" - " hltParticleFlowRecHitHBHESoA hltParticleFlowClusterHBHESoA \\\n" - " --duplicate-modules hltHcalDigis \\\n" - " --output-local local.py --output-remote remote.py\n" - "\n" - "Offload ECAL, HCAL and Pixel GPU reconstruction:\n" - " python3 edmMpiSplitConfig.py hlt.py --remote-modules \\\n" - " hltEcalDigisSoA hltEcalUncalibRecHitSoA \\\n" - " hltHcalDigisSoA hltHbheRecoSoA \\\n" - " hltParticleFlowRecHitHBHESoA hltParticleFlowClusterHBHESoA \\\n" - " hltSiPixelClustersSoA hltSiPixelRecHitsSoA \\\n" - " hltPixelTracksSoA hltPixelVerticesSoA \\\n" - " --duplicate-modules hltHcalDigis hltOnlineBeamSpot hltOnlineBeamSpotDevice\n" - "\n" - "Notes:\n" - " • To split a configurationm it must be processed with 0 events." - " This will cause creating output files and directories (by default in '.cppnamedir' directory).\n" - " • If the splitter was run before, --reuse-cpp-names avoids rerunning cmsRun for products' characteristics." - " Passing this option will make the script run much faster, given that needed information already exists.\n" - " • For some modules it might be better to run on both processes" - " instead of sending their products. 
Use --duplicate-modules option to specify them.\n" - " • Only dependencies expressed via InputTag are analyzed.\n" - " • Execution order inside dependency groups is preserved.\n" - ), - formatter_class=argparse.RawTextHelpFormatter, - ) - parser.add_argument( - "config", - metavar="[--] config.py", - type=pathlib.Path, - help="python configuration file to be split; use '--' to separate the positional argument from multi-value options" - ) - parser.add_argument( - "-m", - "--remote-modules", - nargs="+", - default=[], - help="Modules and/or sequences to offload", - ) - parser.add_argument( - "-l", - "--output-local", - type=pathlib.Path, - default=pathlib.Path("local.py"), - help="Output config path for the local process", - ) - parser.add_argument( - "-r", - "--output-remote", - type=pathlib.Path, - default=pathlib.Path("remote.py"), - help="Output config path for the remote process", - ) - parser.add_argument( - "-d", - "--duplicate-modules", - nargs="+", - default=[], - help="Modules that must run on both local and remote processes", - ) - parser.add_argument( - "-c", - "--reuse-cpp-names", - action="store_true", - help="Assume the file with C++ product names already exists; " - "do not run cmsRun to regenerate it", - ) - parser.add_argument( - "-v", - "--verbose", - action="store_true", - help="Print execution logs of the splitter", - ) - - args = parser.parse_args() - - local_process = processFromFile(str(args.config)) - - modules_to_offload = flatten_all_to_module_set(local_process, args.remote_modules) - modules_to_run_on_both = flatten_all_to_module_set(local_process, args.duplicate_modules) + configs = parse_mpi_style_args(sys.argv[1:]) - # list of all modules to run on remote - modules_to_offload |= modules_to_run_on_both - - analyzer = ModuleDependencyAnalyzer(local_process) - - groups = analyzer.dependency_groups(modules_to_offload) - grouped_deps = analyzer.grouped_external_dependencies(groups) - producer_to_groups = 
analyzer.producer_to_groups(grouped_deps) - - if args.verbose: - print("Dependency groups: ", groups) - print("Grouped depencencies:", grouped_deps ) - print("Local producers - dependant groups correspondance: ", producer_to_groups) - - - # get products whose data needs to be sent, excluding modules without local dependencies and modules which should run on both processes - modules_to_send, modules_without_local_deps = analyzer.modules_to_send_back_by_group(groups, modules_to_run_on_both) - - if args.verbose: - print("Offloaded modues whose products need to be sent: ", modules_to_send) - print("Offloaded modules without local dependencies: ", modules_without_local_deps) + if configs[0].verbose: + for i, cfg in enumerate(configs): + print(f"\n=== Process {i} ===") + print(cfg) + local_process = processFromFile(str(configs[0].config)) # -- get c++ names -- - if args.verbose: - if not args.reuse_cpp_names: + if configs[0].verbose: + if not configs[0].reuse_cpp_names: print("Launching cmsRun to get C++ names of all products in the process...") else: - print("Truing to get C++ product names from existing file...") + print("Trying to get C++ product names from existing file...") - cpp_names_getter = CPPNameGetter(local_process, reuse=args.reuse_cpp_names) + cpp_names_getter = CPPNameGetter(local_process, reuse=configs[0].reuse_cpp_names) cpp_names_of_the_products = cpp_names_getter.get_cpp_types_of_module_products() - if args.verbose: + if configs[0].verbose: print("Got C++ product names") - - # --- start editing --- - - add_controller_to_local(local_process) - remote_process = create_remote_process(local_process, modules_to_offload) - - mpi_path_modules_local = ["mpiController"] - mpi_path_modules_remote = [] - - instance = 1 - - # how to add state captures to the splitter? - - # 1) For the local sender - create path state capture per sender. - # Insert this path state capture before the first module of each needed group. 
- # If multiple groups need these products, create individual path state captures - # and add separate senders on top - # - # 2) For the remote receiver - add path state product to the list. - # Add the general activity filter at the beginning of the group path - # and, if needed, separate activity filters for each group - # - # 3) For the remote sender - add the state capture at the end of the groups path. - # Each sender module for this group depens on this state capture - # - # 4) For the local receiver - before deleting the offloaded module, insert filter for the activity it receives - - # send the data needed by offloaded modules from local to remote - remote_filters_by_group = [[] for _ in range(len(groups))] - local_sender_by_group = [[] for _ in range(len(groups))] - for local_dependency, group_indices in producer_to_groups.items(): - first_dependency_in_a_group = [groups[i][0] for i in group_indices] - capture_name = f"activityCaptureBefore{local_dependency.title()}" - insert_path_state_capture_before(local_process, first_modules_in_a_group=first_dependency_in_a_group, capture_name=capture_name) - sender = create_sender( - module_name=local_dependency, - products=cpp_names_of_the_products[local_dependency], - instance=instance, - sender_upstream="mpiController", - path_state_capture = capture_name - ) - sender_name = f"mpiSender{local_dependency.title()}" - setattr(local_process, sender_name, sender) - - receiver = create_receiver( - products=cpp_names_of_the_products[local_dependency], - instance=instance, - receiver_upstream="source", - path_state_capture=True, - ) - # create filter for the path state - filter_name = f"activityFilterAfter{local_dependency.title()}" - add_activity_filter(remote_process, local_dependency, filter_name) - for group_idx in group_indices: - remote_filters_by_group[group_idx].append(filter_name) - local_sender_by_group[group_idx].append(sender_name) - - setattr(remote_process, local_dependency, receiver) - - instance += 1 - 
mpi_path_modules_local.append(sender_name) - mpi_path_modules_remote.append(local_dependency) - - # Handle the case when onle local product is needed by multiple remote groups - # For sending from local process - if data is needed by multiple paths, insert additional path state capture and additional sender per group - # On remote insert one more receiver for this capture and one more filter in the beginning of the deps group - # In this approach if there are 2 senders with intersecting groups, it will result in only one group activation sender-receiver pair per group - if len(group_indices) >= 2: - for group_idx in group_indices: - capture_name=f"activityCaptureBeforeGroup{group_idx}" - insert_path_state_capture_before(local_process, first_modules_in_a_group=[groups[group_idx][0]], capture_name=capture_name) - sender = create_sender( - module_name=local_dependency, - products=[], - instance=instance, - sender_upstream="mpiController", - path_state_capture = capture_name - ) - sender_name = f"mpiSenderGroup{group_idx}Activity" - setattr(local_process, sender_name, sender) - - receiver = create_receiver( - products=[], - instance=instance, - receiver_upstream="source", - path_state_capture=True, - ) - receiver_name = f"mpiReceiverGroup{group_idx}Activity" - setattr(remote_process, receiver_name, receiver) - - # create filter for the path state - filter_name = f"activityFilterBeforeGroup{group_idx}" - add_activity_filter(remote_process, receiver_name, filter_name) - remote_filters_by_group[group_idx].append(filter_name) - local_sender_by_group[group_idx].append(sender_name) - - instance += 1 - mpi_path_modules_local.append(sender_name) - mpi_path_modules_remote.append(receiver_name) + num_remotes = len(configs) - - per_group_remote_captures = [[] for _ in range(len(groups))] - - # send the results from remote to local - for group_idx, group in enumerate(modules_to_send): - if len(group)==0: - continue - - remote_capture_name = f"activityCaptureAfterGroup{group_idx}" 
- setattr(remote_process, remote_capture_name, cms.EDProducer("PathStateCapture")) - per_group_remote_captures[group_idx].append(remote_capture_name) - - sender = create_group_sender( - group=group, - all_products=cpp_names_of_the_products, - instance=instance, - upstream_module=mpi_path_modules_remote[0], - path_state_capture=remote_capture_name, - ) - sender_name = f"mpiSenderGroup{group_idx}" - setattr(remote_process, sender_name, sender) - - if len(local_sender_by_group[group_idx]) != 0: - receiver_upstream = local_sender_by_group[group_idx][-1] - else: - receiver_upstream = "mpiController" + for i, process_args in enumerate(configs): + if process_args.remote_process_name is None or process_args.remote_process_name == "": + process_args.remote_process_name = f"REMOTE{i}" + if process_args.output_remote is None: + if num_remotes == 1: + process_args.output_remote = pathlib.Path("remote.py") + else: + process_args.output_remote = pathlib.Path( + f"remote_{process_args.remote_process_name}.py" + ) - receiver = create_group_receiver( - group=group, - all_products=cpp_names_of_the_products, - instance=instance, - receiver_upstream=receiver_upstream, - path_state_capture=True, - ) - receiver_name = f"mpiReceiverGroup{group_idx}" - setattr(local_process, receiver_name, receiver) - - instance += 1 - - # insert filter on local before the first module which was supposed to run (is it correct?) 
- filter_name = f"activityFilterAfterGroup{group_idx}" - add_activity_filter(local_process, receiver_name, filter_name) - insert_modules_before(local_process, getattr(local_process, group[0]), getattr(local_process, filter_name)) - - mpi_path_modules_remote.append(sender_name) - mpi_path_modules_local.append(receiver_name) - - - for i, offloaded_module in enumerate(group): - delattr(local_process, offloaded_module) - module_alias = create_receiver_alias(receiver_name=receiver_name, - products=cpp_names_of_the_products[offloaded_module], - module_name=offloaded_module - ) - setattr(local_process, offloaded_module, module_alias) - + remote_process = split_remote(local_process, process_args, cpp_names_of_the_products) + process_args.output_remote.parent.mkdir(parents=True, exist_ok=True) + process_args.output_remote.write_text(remote_process.dumpPython()) + + configs[0].output_local.parent.mkdir(parents=True, exist_ok=True) + configs[0].output_local.write_text(local_process.dumpPython()) - # delete offloaded modules whose products are not needed on local from the local process: - for product in modules_without_local_deps: - delattr(local_process, product) - - # add all needed paths to the processed and schedule them - - make_new_path(local_process, "Offload", mpi_path_modules_local) - make_new_path(remote_process, "MPIPath", mpi_path_modules_remote) - for i, group in enumerate(groups): - make_new_path(remote_process, "RemoteOffloadedSequence"+str(i), remote_filters_by_group[i]+group+per_group_remote_captures[i]) - - # dump the 2 processes into files - - args.output_local.parent.mkdir(parents=True, exist_ok=True) - args.output_remote.parent.mkdir(parents=True, exist_ok=True) - - args.output_local.write_text(local_process.dumpPython()) - args.output_remote.write_text(remote_process.dumpPython()) - print("Success!") - if __name__ == "__main__": main() diff --git a/HeterogeneousCore/MPICore/test/BuildFile.xml b/HeterogeneousCore/MPICore/test/BuildFile.xml index 
d86194afcdfb0..dd74613cdb5f4 100644 --- a/HeterogeneousCore/MPICore/test/BuildFile.xml +++ b/HeterogeneousCore/MPICore/test/BuildFile.xml @@ -44,5 +44,15 @@ + + + + diff --git a/HeterogeneousCore/MPICore/test/controller_cfg.py b/HeterogeneousCore/MPICore/test/controller_cfg.py index 69b174d4e8764..921d08398e74b 100644 --- a/HeterogeneousCore/MPICore/test/controller_cfg.py +++ b/HeterogeneousCore/MPICore/test/controller_cfg.py @@ -21,7 +21,8 @@ from HeterogeneousCore.MPICore.modules import * process.mpiController = MPIController( - mode = 'CommWorld' + mode = 'CommWorld', + followerProcessName = 'Follower' ) process.ids = cms.EDProducer("edmtest::EventIDProducer") diff --git a/HeterogeneousCore/MPICore/test/controller_complex_cfg.py b/HeterogeneousCore/MPICore/test/controller_complex_cfg.py index 3a5aee2eac9bd..2e5db3dbadeda 100644 --- a/HeterogeneousCore/MPICore/test/controller_complex_cfg.py +++ b/HeterogeneousCore/MPICore/test/controller_complex_cfg.py @@ -20,7 +20,8 @@ from HeterogeneousCore.MPICore.modules import * process.mpiController = MPIController( - mode = 'CommWorld' + mode = 'CommWorld', + followerProcessName = 'MPIFollower' ) # Phase-1 FED RAW data collection pseudo object diff --git a/HeterogeneousCore/MPICore/test/controller_filters_cfg.py b/HeterogeneousCore/MPICore/test/controller_filters_cfg.py index 4b814de8431e6..3b93689c937a5 100644 --- a/HeterogeneousCore/MPICore/test/controller_filters_cfg.py +++ b/HeterogeneousCore/MPICore/test/controller_filters_cfg.py @@ -24,7 +24,8 @@ from HeterogeneousCore.MPICore.modules import MPIController, MPISender, MPIReceiver process.mpiController = MPIController( - mode = 'CommWorld' + mode = 'CommWorld', + followerProcessName = 'MPIFollower' ) # Produce EventID, that will be sent to the Follower diff --git a/HeterogeneousCore/MPICore/test/controller_multi_cfg.py b/HeterogeneousCore/MPICore/test/controller_multi_cfg.py index a72bd4350e1d6..2ecc8c11d6a46 100644 --- 
a/HeterogeneousCore/MPICore/test/controller_multi_cfg.py +++ b/HeterogeneousCore/MPICore/test/controller_multi_cfg.py @@ -30,7 +30,7 @@ process.mpiController1 = MPIController( mode = 'CommWorld', - followers = [ 1 ] + followerProcessName = 'Follower1' ) process.sender1 = MPISender( @@ -72,7 +72,7 @@ process.mpiController2 = MPIController( mode = 'CommWorld', - followers = [ 2 ] + followerProcessName = 'Follower2' ) process.sender2 = MPISender( diff --git a/HeterogeneousCore/MPICore/test/controller_multi_rr_cfg.py b/HeterogeneousCore/MPICore/test/controller_multi_rr_cfg.py index 02a44122fb64a..2ecc8c11d6a46 100644 --- a/HeterogeneousCore/MPICore/test/controller_multi_rr_cfg.py +++ b/HeterogeneousCore/MPICore/test/controller_multi_rr_cfg.py @@ -30,7 +30,7 @@ process.mpiController1 = MPIController( mode = 'CommWorld', - followers = [ 1, 2 ] + followerProcessName = 'Follower1' ) process.sender1 = MPISender( @@ -72,7 +72,7 @@ process.mpiController2 = MPIController( mode = 'CommWorld', - followers = [ 3, 4 ] + followerProcessName = 'Follower2' ) process.sender2 = MPISender( diff --git a/HeterogeneousCore/MPICore/test/controller_rr_cfg.py b/HeterogeneousCore/MPICore/test/controller_rr_cfg.py index 5c546ea53c8ec..921d08398e74b 100644 --- a/HeterogeneousCore/MPICore/test/controller_rr_cfg.py +++ b/HeterogeneousCore/MPICore/test/controller_rr_cfg.py @@ -22,7 +22,7 @@ process.mpiController = MPIController( mode = 'CommWorld', - followers = [ 1, 2 ] + followerProcessName = 'Follower' ) process.ids = cms.EDProducer("edmtest::EventIDProducer") diff --git a/HeterogeneousCore/MPICore/test/controller_soa_cfg.py b/HeterogeneousCore/MPICore/test/controller_soa_cfg.py index 06767f653440f..6a7f6dc65c430 100644 --- a/HeterogeneousCore/MPICore/test/controller_soa_cfg.py +++ b/HeterogeneousCore/MPICore/test/controller_soa_cfg.py @@ -24,7 +24,8 @@ from HeterogeneousCore.MPICore.modules import * process.mpiController = MPIController( - mode = 'CommWorld' + mode = 'CommWorld', + 
followerProcessName = 'MPIFollower' ) process.producePortableObjects = cms.EDProducer("TestAlpakaProducer@alpaka", diff --git a/HeterogeneousCore/MPICore/test/controller_too_many_streams_cfg.py b/HeterogeneousCore/MPICore/test/controller_too_many_streams_cfg.py new file mode 100644 index 0000000000000..64599eff74193 --- /dev/null +++ b/HeterogeneousCore/MPICore/test/controller_too_many_streams_cfg.py @@ -0,0 +1,72 @@ +import FWCore.ParameterSet.Config as cms + +process = cms.Process("MPIController") + + +# This test tests two different scenarios that can lead to errors. + +# 1. It tests that streams that remain uninitialized are handled correctly by +# the MPI modules. This situation is reproduced by setting numberOfStreams +# higher than the number of events. + +# 2. Due to the non-deterministic task scheduling in CMSSW, non-matching MPI +# modules might be scheduled in the local and remote process. This is ok, unless +# these modules contain blocking MPI calls. In this case, if they utilize all +# available working threads, the execution will deadlock. 
The probability of +# this happening is close to 100% when numberOfStreams >> numberOfThreads + +process.options.numberOfThreads = 4 +process.options.numberOfStreams = 40 +# MPIController supports a single concurrent LuminosityBlock +process.options.numberOfConcurrentLuminosityBlocks = 1 +process.options.numberOfConcurrentRuns = 1 +process.options.wantSummary = False + +process.load("HeterogeneousCore.MPIServices.MPIService_cfi") + +from eventlist_cff import eventlist +process.source = cms.Source("EmptySourceFromEventIDs", + events = cms.untracked(eventlist) +) + +process.maxEvents.input = 10 + +from HeterogeneousCore.MPICore.modules import * + +process.mpiController = MPIController( + mode = 'CommWorld', + followerProcessName = 'MPIFollower' +) + +process.ids = cms.EDProducer("edmtest::EventIDProducer") + +process.initialcheck = cms.EDAnalyzer("edmtest::EventIDValidator", + source = cms.untracked.InputTag('ids') +) + +process.sender = MPISender( + upstream = "mpiController", + instance = 42, + products = [ "edmEventID_ids__*" ] +) + +process.othersender = MPISender( + upstream = "mpiController", + instance = 19, + products = [ "edmEventID_ids__*" ] +) + +process.receiver = MPIReceiver( + upstream = "othersender", # guarantees that this module will only run after "othersender" has run + instance = 99, + products = [ dict( + type = "edm::EventID", + label = "" + )] +) + +process.finalcheck = cms.EDAnalyzer("edmtest::EventIDValidator", + source = cms.untracked.InputTag('receiver') +) + +process.path = cms.Path(process.mpiController + process.ids + process.initialcheck + process.sender + process.othersender + process.receiver + process.finalcheck) diff --git a/HeterogeneousCore/MPICore/test/follower_cfg.py b/HeterogeneousCore/MPICore/test/follower_cfg.py index 7759fec54c71e..ffb8edb053e94 100644 --- a/HeterogeneousCore/MPICore/test/follower_cfg.py +++ b/HeterogeneousCore/MPICore/test/follower_cfg.py @@ -1,6 +1,6 @@ import FWCore.ParameterSet.Config as cms -process =
cms.Process("MPIFollower") +process = cms.Process("Follower") process.options.numberOfThreads = 8 process.options.numberOfStreams = 8 @@ -14,7 +14,7 @@ process.source = MPISource( mode = 'CommWorld', - controller = 0 + controllerProcessName = 'MPIController' ) process.maxEvents.input = -1 diff --git a/HeterogeneousCore/MPICore/test/follower_complex_cfg.py b/HeterogeneousCore/MPICore/test/follower_complex_cfg.py index 48cca01f17d85..cd3f14f1086d4 100644 --- a/HeterogeneousCore/MPICore/test/follower_complex_cfg.py +++ b/HeterogeneousCore/MPICore/test/follower_complex_cfg.py @@ -10,7 +10,10 @@ from HeterogeneousCore.MPICore.modules import * -process.source = MPISource() +process.source = MPISource( + mode = 'CommWorld', + controllerProcessName = 'MPIController' +) process.maxEvents.input = -1 diff --git a/HeterogeneousCore/MPICore/test/follower_filters_cfg.py b/HeterogeneousCore/MPICore/test/follower_filters_cfg.py index aba6901f5ee85..8809b30357d68 100644 --- a/HeterogeneousCore/MPICore/test/follower_filters_cfg.py +++ b/HeterogeneousCore/MPICore/test/follower_filters_cfg.py @@ -12,7 +12,10 @@ from HeterogeneousCore.MPICore.modules import MPISource, MPIReceiver, MPISender -process.source = MPISource() +process.source = MPISource( + mode = 'CommWorld', + controllerProcessName = 'MPIController' +) process.maxEvents.input = -1 diff --git a/HeterogeneousCore/MPICore/test/follower_multi1_cfg.py b/HeterogeneousCore/MPICore/test/follower_multi1_cfg.py index 6ac4682bb2497..2a96cf507fe3e 100644 --- a/HeterogeneousCore/MPICore/test/follower_multi1_cfg.py +++ b/HeterogeneousCore/MPICore/test/follower_multi1_cfg.py @@ -1,6 +1,6 @@ import FWCore.ParameterSet.Config as cms -process = cms.Process("MPIFollower") +process = cms.Process("Follower1") process.options.numberOfThreads = 8 process.options.numberOfStreams = 8 @@ -14,7 +14,7 @@ process.source = MPISource( mode = 'CommWorld', - controller = 0 + controllerProcessName = 'MPIController' ) process.maxEvents.input = -1 diff --git 
a/HeterogeneousCore/MPICore/test/follower_multi2_cfg.py b/HeterogeneousCore/MPICore/test/follower_multi2_cfg.py index 8658bc9e387aa..252992ff85314 100644 --- a/HeterogeneousCore/MPICore/test/follower_multi2_cfg.py +++ b/HeterogeneousCore/MPICore/test/follower_multi2_cfg.py @@ -1,6 +1,6 @@ import FWCore.ParameterSet.Config as cms -process = cms.Process("MPIFollower") +process = cms.Process("Follower2") process.options.numberOfThreads = 8 process.options.numberOfStreams = 8 @@ -14,7 +14,7 @@ process.source = MPISource( mode = 'CommWorld', - controller = 0 + controllerProcessName = 'MPIController' ) process.maxEvents.input = -1 diff --git a/HeterogeneousCore/MPICore/test/follower_soa_cfg.py b/HeterogeneousCore/MPICore/test/follower_soa_cfg.py index 721c197c5827a..becb80736bc7e 100644 --- a/HeterogeneousCore/MPICore/test/follower_soa_cfg.py +++ b/HeterogeneousCore/MPICore/test/follower_soa_cfg.py @@ -10,7 +10,9 @@ from HeterogeneousCore.MPICore.modules import * -process.source = MPISource() +process.source = MPISource(mode = 'CommWorld', + controllerProcessName = 'MPIController' +) process.maxEvents.input = -1 diff --git a/HeterogeneousCore/MPICore/test/follower_too_many_streams_cfg.py b/HeterogeneousCore/MPICore/test/follower_too_many_streams_cfg.py new file mode 100644 index 0000000000000..f749553a33172 --- /dev/null +++ b/HeterogeneousCore/MPICore/test/follower_too_many_streams_cfg.py @@ -0,0 +1,54 @@ +import FWCore.ParameterSet.Config as cms + +process = cms.Process("MPIFollower") + +process.options.numberOfThreads = 4 +process.options.numberOfStreams = 40 +process.options.numberOfConcurrentLuminosityBlocks = 2 +process.options.numberOfConcurrentRuns = 2 +process.options.wantSummary = False + +process.load("HeterogeneousCore.MPIServices.MPIService_cfi") + +from HeterogeneousCore.MPICore.modules import * + +process.source = MPISource( + mode = 'CommWorld', + controllerProcessName = 'MPIController' +) + +process.maxEvents.input = -1 + +# very verbose +#from 
HeterogeneousCore.MPICore.mpiReporter_cfi import mpiReporter as mpiReporter_ +#process.reporter = mpiReporter_.clone() + +process.receiver = MPIReceiver( + upstream = "source", + instance = 42, + products = [ dict( + type = "edm::EventID", + label = "" + )] +) + +process.otherreceiver = MPIReceiver( + upstream = "source", + instance = 19, + products = [ dict( + type = "edm::EventID", + label = "" + )] +) + +process.sender = MPISender( + upstream = "otherreceiver", # guarantees that this module will only run after otherreceiver has run + instance = 99, + products = [ "edmEventID_otherreceiver__*" ] +) + +process.analyzer = cms.EDAnalyzer("edmtest::EventIDValidator", + source = cms.untracked.InputTag("receiver") +) + +process.path = cms.Path(process.receiver + process.analyzer + process.otherreceiver + process.sender) diff --git a/HeterogeneousCore/MPICore/test/testAutomaticSplitter.sh b/HeterogeneousCore/MPICore/test/testAutomaticSplitter.sh index a19333c6a606f..e7e83521deb79 100755 --- a/HeterogeneousCore/MPICore/test/testAutomaticSplitter.sh +++ b/HeterogeneousCore/MPICore/test/testAutomaticSplitter.sh @@ -2,13 +2,12 @@ # Shell script for testing the automattic config splitter for MPI in CMSSW -SPLITTER=$(realpath "$1") -WHOLE_CONFIG=$(realpath "$2") +WHOLE_CONFIG=$(realpath "$1") LOCAL_PATH="./autosplit_result/local_test_config.py" REMOTE_PATH="./autosplit_result/remote_test_config.py" -python3 "$SPLITTER" "$WHOLE_CONFIG" \ +edmMpiSplitConfig "$WHOLE_CONFIG" \ --remote-modules triggerEventProducer testReadTriggerResults rawDataBufferProducer testReadFEDRawDataCollection \ -l "$LOCAL_PATH" -r "$REMOTE_PATH" diff --git a/HeterogeneousCore/MPICore/test/testAutomaticSplitterMultiProcess.sh b/HeterogeneousCore/MPICore/test/testAutomaticSplitterMultiProcess.sh new file mode 100755 index 0000000000000..5a739579bd42d --- /dev/null +++ b/HeterogeneousCore/MPICore/test/testAutomaticSplitterMultiProcess.sh @@ -0,0 +1,16 @@ +#! 
/bin/bash + +# Shell script for testing the automatic config splitter for MPI in CMSSW + +WHOLE_CONFIG=$(realpath "$1") + +LOCAL_PATH="./autosplit_result/local_test_config.py" +REMOTE_PATH_1="./autosplit_result/remote_test_config_1.py" +REMOTE_PATH_2="./autosplit_result/remote_test_config_2.py" + +edmMpiSplitConfig "$WHOLE_CONFIG" -l "$LOCAL_PATH"\ + --remote-modules triggerEventProducer testReadTriggerResults -r "$REMOTE_PATH_1" :\ + --remote-modules rawDataBufferProducer testReadFEDRawDataCollection -r "$REMOTE_PATH_2" + + +"$CMSSW_BASE"/src/HeterogeneousCore/MPICore/test/testMPICommWorld.sh "$LOCAL_PATH" "$REMOTE_PATH_1" "$REMOTE_PATH_2" diff --git a/HeterogeneousCore/MPIServices/interface/MPIService.h b/HeterogeneousCore/MPIServices/interface/MPIService.h index eb2832c71410e..40e67aa4e3f4b 100644 --- a/HeterogeneousCore/MPIServices/interface/MPIService.h +++ b/HeterogeneousCore/MPIServices/interface/MPIService.h @@ -1,6 +1,10 @@ #ifndef HeterogeneousCore_MPIServices_interface_MPIService_h #define HeterogeneousCore_MPIServices_interface_MPIService_h +#include +#include +#include + #include "FWCore/ParameterSet/interface/ParameterSetfwd.h" class MPIService { @@ -10,6 +14,13 @@ class MPIService { static void fillDescriptions(edm::ConfigurationDescriptions& descriptions); static void required(); + std::vector getRanksByProcessName(std::string const& processName); + +private: + std::once_flag init_flag_; + std::vector all_process_hashes_; + + void exchangeProcessHashes_(); }; #endif // HeterogeneousCore_MPIServices_interface_MPIService_h diff --git a/HeterogeneousCore/MPIServices/src/MPIService.cc b/HeterogeneousCore/MPIServices/src/MPIService.cc index 5234d76e2d15e..fbc7b3a524956 100644 --- a/HeterogeneousCore/MPIServices/src/MPIService.cc +++ b/HeterogeneousCore/MPIServices/src/MPIService.cc @@ -1,9 +1,12 @@ // -*- C++ -*- #include +#include +#include #include #include +#include "FWCore/Framework/interface/TriggerNamesService.h" #include
"FWCore/MessageLogger/interface/MessageLogger.h" #include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h" #include "FWCore/ParameterSet/interface/ParameterSet.h" @@ -85,8 +88,11 @@ MPIService::MPIService(edm::ParameterSet const& config) { } MPIService::~MPIService() { - // terminate the MPI execution environment - MPI_Finalize(); + // Finalize MPI execution environment if the program finished correctly + // Otherwise let it proceed naturally (this way original error will be printed) + if (std::uncaught_exceptions() == 0) { + MPI_Finalize(); + } } void MPIService::fillDescriptions(edm::ConfigurationDescriptions& descriptions) { @@ -107,3 +113,28 @@ process.load("HeterogeneousCore.MPIServices.MPIService_cfi") )"; } } + +// Ensure that processes exchange their hashes only once +void MPIService::exchangeProcessHashes_() { + std::call_once(init_flag_, [&]() { + int world_size; + MPI_Comm_size(MPI_COMM_WORLD, &world_size); + all_process_hashes_.resize(world_size); + edm::Service tns; + std::string const& processName = tns->getProcessName(); + uint64_t this_process_hash = std::hash{}(processName); + MPI_Allgather(&this_process_hash, 1, MPI_UINT64_T, all_process_hashes_.data(), 1, MPI_UINT64_T, MPI_COMM_WORLD); + }); +} + +std::vector MPIService::getRanksByProcessName(std::string const& processName) { + this->exchangeProcessHashes_(); + std::vector process_indices; + uint64_t other_process_hash = std::hash{}(processName); + for (size_t i = 0; i < all_process_hashes_.size(); i++) { + if (all_process_hashes_[i] == other_process_hash) { + process_indices.push_back(i); + } + } + return process_indices; +}