Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 33 additions & 33 deletions HeterogeneousCore/MPICore/plugins/MPIController.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "FWCore/Framework/interface/MakerMacros.h"
#include "FWCore/Framework/interface/Run.h"
#include "FWCore/Framework/interface/one/EDProducer.h"
#include "FWCore/Framework/interface/TriggerNamesService.h"
#include "FWCore/MessageLogger/interface/MessageLogger.h"
#include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h"
#include "FWCore/ParameterSet/interface/EmptyGroupDescription.h"
Expand Down Expand Up @@ -99,34 +100,34 @@ MPIController::MPIController(edm::ParameterSet const& config)
int rank;
MPI_Comm_rank(MPI_COMM_WORLD, &rank);

// Determine the rank of the other process.
auto followers = config.getUntrackedParameter<std::vector<int32_t>>("followers");
if (followers.empty()) {
// When there are only two proccesses, we can assume the ranks to be 0 and 1,
// and we can infer the other process rank from our own.
if (size == 2) {
followers = {1 - rank};
} else {
throw edm::Exception(edm::errors::Configuration)
<< "An empty list of remote processes is valid only where there are exactly two processes.";
}
edm::LogInfo("MPI") << "MPIController sees world size " << size;

// Determine the ranks of the follower processes.
auto follower_name = config.getParameter<std::string>("followerProcessName");
if (follower_name.empty()) {
throw edm::Exception(edm::errors::Configuration)
<< "ERROR: Follower process name cannot be empty. Aborting MPIController...";
}
if (followers.size() >= static_cast<size_t>(size)) {

edm::Service<edm::service::TriggerNamesService> tns;
std::string const& this_process_name = tns->getProcessName();
if (follower_name == this_process_name) {
throw edm::Exception(edm::errors::Configuration)
<< "The number of remote processes is invalid. Please specify at most " << size - 1 << "remote processes.";
<< "ERROR: controller and follower processes cannot have the same name. Aborting MPIController...";
}
std::vector<int32_t> invalid;
for (int follower : followers) {
if (follower < 0 or follower >= size) {
invalid.push_back(follower);
}

edm::Service<MPIService> mpiservice;
auto followers = mpiservice->getRanksByProcessName(follower_name);
if (followers.empty()) {
throw edm::Exception(edm::errors::Configuration)
<< "ERROR: No follower process with name " << follower_name << " found. Aborting...";
}
if (invalid.size() == 1) {

if (followers.size() == static_cast<size_t>(size)) {
throw edm::Exception(edm::errors::Configuration)
<< fmt::format("The remote process {} is invalid. Valid ranks are 0 to {}.", invalid.front(), size - 1);
} else if (invalid.size() > 1) {
throw edm::Exception(edm::errors::Configuration) << fmt::format(
"The remote processes {} are invalid. Valid ranks are 0 to {}.", fmt::join(invalid, ", "), size - 1);
<< "The number of found followers equals to the world size. "
<< "Possible reason could be process names' hash collision. "
<< "Please check process names in follower and controller. Aborting...";
}

for (int follower : followers) {
Expand Down Expand Up @@ -185,7 +186,9 @@ MPIController::~MPIController() {
// Disconnect the per-stream communicators.
for (auto& stream : streams_) {
// TODO move this to end stream
stream->reset();
if (stream) {
stream->reset();
}
}

// Close the intercommunicator.
Expand Down Expand Up @@ -352,15 +355,12 @@ void MPIController::fillDescriptions(edm::ConfigurationDescriptions& description
desc.ifValue(
edm::ParameterDescription<std::string>("mode", "CommWorld", false),
ModeDescription[kCommWorld] >>
edm::ParameterDescription<std::vector<int32_t>>(
"followers",
{},
false,
edm::Comment("Ranks of the remote \"follower\" processes.\n"
"When there are two or more follower processes, framework streams are associated to "
"each follower in a round-robin fashion.\n"
"When there is only one remote process, pass an empty list to autodetect its rank "
"based on the rank of the current process.")) or
edm::ParameterDescription<std::string>(
"followerProcessName",
"",
true,
edm::Comment("All processes with this process name should act as followers, "
"and should be configured with an MPISource that follows this controller.")) or
ModeDescription[kIntercommunicator] >> edm::ParameterDescription<std::string>("name", "server", false))
->setComment(
"Valid modes are CommWorld (use MPI_COMM_WORLD) and Intercommunicator (use an MPI name server to setup an "
Expand Down
147 changes: 83 additions & 64 deletions HeterogeneousCore/MPICore/plugins/MPIReceiver.cc
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ class MPIReceiver : public edm::stream::EDProducer<edm::ExternalWork> {

products_.emplace_back(std::move(entry));
}

received_wrappers_.resize(products_.size());
}

void acquire(edm::Event const& event, edm::EventSetup const&, edm::WaitingTaskWithArenaHolder holder) final {
Expand All @@ -75,17 +77,87 @@ class MPIReceiver : public edm::stream::EDProducer<edm::ExternalWork> {
edm::Service<edm::Async> as;
as->runAsync(
std::move(holder),
[this, token]() { token.channel()->receiveMetadata(instance_, received_meta_); },
[this, token]() {
token.channel()->receiveMetadata(instance_, received_meta_);
#ifdef EDM_ML_DEBUG
// dump the summary of metadata
received_meta_->debugPrintMetadataSummary();
#endif

// if filter was false before the sender, receive nothing
if (received_meta_->productCount() == -1) {
return;
}

std::unique_ptr<TBufferFile> serialized_buffer;
if (received_meta_->hasSerialized()) {
serialized_buffer =
token.channel()->receiveSerializedBuffer(instance_, received_meta_->serializedBufferSize());
#ifdef EDM_ML_DEBUG
{
edm::LogSystem msg("MPIReceiver");
msg << "Received serialised product:\n";
for (int i = 0; i < received_meta_->serializedBufferSize(); ++i) {
msg << "0x" << std::hex << std::setw(2) << std::setfill('0')
<< (unsigned int)(unsigned char)serialized_buffer->Buffer()[i] << (i % 16 == 15 ? '\n' : ' ');
}
}
#endif
}

for (size_t i = 0; i < products_.size(); ++i) {
auto product_meta = received_meta_->getNext();
if (product_meta.kind == ProductMetadata::Kind::Missing) {
continue;
}

auto const& entry = products_[i];

if (product_meta.kind == ProductMetadata::Kind::Serialized) {
std::unique_ptr<edm::WrapperBase> wrapper(
reinterpret_cast<edm::WrapperBase*>(entry.wrappedType.getClass()->New()));
assert(static_cast<int32_t>(serialized_buffer->Length() + product_meta.sizeMeta) <=
received_meta_->serializedBufferSize() &&
"serialized data buffer is shorter than expected");
entry.wrappedType.getClass()->Streamer(wrapper.get(), *serialized_buffer);
received_wrappers_[i] = std::move(wrapper);
}

else if (product_meta.kind == ProductMetadata::Kind::TrivialCopy) {
if (not enableTrivialSerialisation_) {
edm::LogError("MPIReceiver")
<< "Received a trivially-serialised product, but enableTrivialSerialisation is set to false in "
"this MPIReceiver. Please check that the MPISender and MPIReceiver have consistent settings.";
MPI_Abort(MPI_COMM_WORLD, EXIT_FAILURE);
}
std::unique_ptr<ngt::SerialiserBase> serialiser =
ngt::SerialiserFactory::get()->tryToCreate(entry.type.typeInfo().name());
if (not serialiser) {
throw cms::Exception("SerializerError")
<< "Receiver could not retrieve its serializer when it was expected";
}
auto writer = serialiser->writer();
ngt::AnyBuffer buffer = writer->uninitialized_parameters(); // constructs buffer with typeid
assert(buffer.size_bytes() == product_meta.sizeMeta);
std::memcpy(buffer.data(), product_meta.trivialCopyOffset, product_meta.sizeMeta);
writer->initialize(buffer);
token.channel()->receiveInitializedTrivialCopy(instance_, *writer);
writer->finalize();
received_wrappers_[i] = writer->get();
}
}

if (received_meta_->hasSerialized()) {
assert(serialized_buffer->Length() == received_meta_->serializedBufferSize() &&
"serialized data buffer is not equal to the expected length");
}
},
[]() { return "Calling MPIReceiver::acquire()"; });
}

void produce(edm::Event& event, edm::EventSetup const&) final {
// read the MPIToken used to establish the communication channel
MPIToken token = event.get(upstream_);
#ifdef EDM_ML_DEBUG
// dump the summary of metadata
received_meta_->debugPrintMetadataSummary();
#endif

// if filter was false before the sender, receive nothing
if (received_meta_->productCount() == -1) {
Expand All @@ -96,68 +168,14 @@ class MPIReceiver : public edm::stream::EDProducer<edm::ExternalWork> {
event.emplace(pathStateToken_);
}

std::unique_ptr<TBufferFile> serialized_buffer;
if (received_meta_->hasSerialized()) {
serialized_buffer = token.channel()->receiveSerializedBuffer(instance_, received_meta_->serializedBufferSize());
#ifdef EDM_ML_DEBUG
{
edm::LogSystem msg("MPISender");
msg << "Received serialised product:\n";
for (int i = 0; i < received_meta_->serializedBufferSize(); ++i) {
msg << "0x" << std::hex << std::setw(2) << std::setfill('0')
<< (unsigned int)(unsigned char)serialized_buffer->Buffer()[i] << (i % 16 == 15 ? '\n' : ' ');
}
}
#endif
}

for (auto const& entry : products_) {
auto product_meta = received_meta_->getNext();
if (product_meta.kind == ProductMetadata::Kind::Missing) {
edm::LogWarning("MPIReceiver") << "Product " << entry.type.name() << " was not received.";
continue; // Skip products that weren't sent
}

else if (product_meta.kind == ProductMetadata::Kind::Serialized) {
std::unique_ptr<edm::WrapperBase> wrapper(
reinterpret_cast<edm::WrapperBase*>(entry.wrappedType.getClass()->New()));
assert(static_cast<int32_t>(serialized_buffer->Length() + product_meta.sizeMeta) <=
received_meta_->serializedBufferSize() &&
"serialized data buffer is shorter than expected");
entry.wrappedType.getClass()->Streamer(wrapper.get(), *serialized_buffer);
// put the data into the Event
event.put(entry.token, std::move(wrapper));
}

else if (product_meta.kind == ProductMetadata::Kind::TrivialCopy) {
if (not enableTrivialSerialisation_) {
edm::LogError("MPIReceiver")
<< "Received a trivially-serialised product, but enableTrivialSerialisation is set to false in this "
"MPIReceiver. Please check that the MPISender and MPIReceiver have consistent settings.";
MPI_Abort(MPI_COMM_WORLD, EXIT_FAILURE);
}
std::unique_ptr<ngt::SerialiserBase> serialiser =
ngt::SerialiserFactory::get()->tryToCreate(entry.type.typeInfo().name());
if (not serialiser) {
throw cms::Exception("SerializerError") << "Receiver could not retrieve its serializer when it was expected";
}
auto writer = serialiser->writer();
ngt::AnyBuffer buffer = writer->uninitialized_parameters(); // constructs buffer with typeid
assert(buffer.size_bytes() == product_meta.sizeMeta);
std::memcpy(buffer.data(), product_meta.trivialCopyOffset, product_meta.sizeMeta);
writer->initialize(buffer);
token.channel()->receiveInitializedTrivialCopy(instance_, *writer);
writer->finalize();
// put the data into the Event
event.put(entry.token, writer->get());
for (size_t i = 0; i < products_.size(); ++i) {
if (received_wrappers_[i]) {
event.put(products_[i].token, std::move(received_wrappers_[i]));
} else {
edm::LogWarning("MPIReceiver") << "Product " << products_[i].type.name() << " was not received.";
}
}

if (received_meta_->hasSerialized()) {
assert(serialized_buffer->Length() == received_meta_->serializedBufferSize() &&
"serialized data buffer is not equal to the expected length");
}

// write a shallow copy of the channel to the output, so other modules can consume it
// to indicate that they should run after this
event.emplace(token_, token);
Expand Down Expand Up @@ -207,6 +225,7 @@ class MPIReceiver : public edm::stream::EDProducer<edm::ExternalWork> {
bool activity_; // indicator whether the PathStateToken will be received by the module
edm::EDPutTokenT<edm::PathStateToken> pathStateToken_;
std::shared_ptr<ProductMetadataBuilder> received_meta_;
std::vector<std::unique_ptr<edm::WrapperBase>> received_wrappers_;
bool enableTrivialSerialisation_ = true;
};

Expand Down
66 changes: 28 additions & 38 deletions HeterogeneousCore/MPICore/plugins/MPISender.cc
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,13 @@ class MPISender : public edm::stream::EDProducer<edm::ExternalWork> {
const MPIToken& token = event.get(upstream_);
// pass the number of products to estimate the right size for the metadata buffer
auto meta = std::make_shared<ProductMetadataBuilder>(products_.size());

// We use std::shared_ptr, instead of std::unique_ptr, so that readers can
// be captured by move by runAsync's lamnda. This is ultimately because this
// lambda is used to construct an std::function, which requires its callable
// to be copy-constructible.
std::vector<std::shared_ptr<const ngt::ReaderBase>> readers;
readers.reserve(products_.size());
size_t index = 0;
buffer_->Reset();
has_serialized_ = false;
Expand Down Expand Up @@ -145,6 +152,7 @@ class MPISender : public edm::stream::EDProducer<edm::ExternalWork> {
auto reader = serialiser->reader(*wrapper);
ngt::AnyBuffer buffer = reader->parameters();
meta->addTrivialCopy(buffer.data(), buffer.size_bytes());
readers.push_back(std::move(reader));
} else {
LogDebug("MPISender") << "No serializer for type \"" << entry.type.name() << "\" ("
<< entry.type.typeInfo().name() << "), using ROOT serialization";
Expand All @@ -169,50 +177,32 @@ class MPISender : public edm::stream::EDProducer<edm::ExternalWork> {
edm::Service<edm::Async> as;
as->runAsync(
std::move(holder),
[this, token, meta = std::move(meta)]() { token.channel()->sendMetadata(instance_, meta); },
[this, token, meta = std::move(meta), readers = std::move(readers)]() {
token.channel()->sendMetadata(instance_, meta);
if (has_serialized_) {
#ifdef EDM_ML_DEBUG
{
edm::LogSystem msg("MPISender");
msg << "Sending serialised product:\n";
for (int i = 0; i < buffer_->Length(); ++i) {
msg << "0x" << std::hex << std::setw(2) << std::setfill('0')
<< (unsigned int)(unsigned char)buffer_->Buffer()[i] << (i % 16 == 15 ? '\n' : ' ');
}
}
#endif
token.channel()->sendBuffer(buffer_->Buffer(), buffer_->Length(), instance_, EDM_MPI_SendSerializedProduct);
}
for (auto const& reader : readers) {
token.channel()->sendTrivialCopyProduct(instance_, *reader);
}
},
[]() { return "Calling MPISender::acquire()"; });
}

void produce(edm::Event& event, edm::EventSetup const&) final {
MPIToken token = event.get(upstream_);

if (!is_active_) {
event.emplace(token_, token);
return;
}

if (has_serialized_) {
#ifdef EDM_ML_DEBUG
{
edm::LogSystem msg("MPISender");
msg << "Sending serialised product:\n";
for (int i = 0; i < buffer_->Length(); ++i) {
msg << "0x" << std::hex << std::setw(2) << std::setfill('0')
<< (unsigned int)(unsigned char)buffer_->Buffer()[i] << (i % 16 == 15 ? '\n' : ' ');
}
}
#endif
token.channel()->sendBuffer(buffer_->Buffer(), buffer_->Length(), instance_, EDM_MPI_SendSerializedProduct);
}

for (auto const& entry : products_) {
edm::Handle<edm::WrapperBase> handle(entry.type.typeInfo());
event.getByToken(entry.token, handle);
edm::WrapperBase const* wrapper = handle.product();
// we don't send missing products
if (handle.isValid()) {
std::unique_ptr<ngt::SerialiserBase> serialiser;
if (enableTrivialSerialisation_) {
serialiser = ngt::SerialiserFactory::get()->tryToCreate(entry.type.typeInfo().name());
}
if (serialiser) {
auto reader = serialiser->reader(*wrapper);
token.channel()->sendTrivialCopyProduct(instance_, *reader);
}
}
}
// write a shallow copy of the channel to the output, so other modules can consume it
// to indicate that they should run after this
MPIToken token = event.get(upstream_);
event.emplace(token_, token);
}

Expand Down
Loading