Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 59 additions & 45 deletions PerfTools/AllocMonitor/plugins/ModuleEventAllocMonitor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
// system include files
#include <atomic>
#include <numeric>
#include <ranges>
#include <unordered_map>
#include <unordered_set>

// user include files
Expand All @@ -29,7 +31,6 @@
#include "FWCore/Utilities/interface/thread_safety_macros.h"

#include "monitor_file_utilities.h"
#include "mea_AllocMap.h"
#include "ThreadTracker.h"

#define DEBUGGER_BREAK
Expand All @@ -40,9 +41,14 @@ void break_on_unmatched_dealloc() {}
}
#endif
namespace {
using namespace edm::service::moduleEventAlloc;
using namespace edm::moduleAlloc::monitor_file_utilities;

using AllocMap = std::unordered_map<void const*, std::size_t>;
auto accumulateValues(AllocMap const& map) {
auto const values = map | std::views::transform([](auto const& element) { return element.second; });
return std::accumulate(values.begin(), values.end(), 0ULL);
}

struct ThreadAllocInfo {
AllocMap allocMap_;
std::vector<void const*> unmatched_;
Expand All @@ -55,11 +61,16 @@ namespace {
std::size_t numUnmatchedDeallocs_ = 0;

bool active_ = false;
void alloc(void const* iAddress, std::size_t iSize) { allocMap_.insert(iAddress, iSize); }
void alloc(void const* iAddress, std::size_t iSize) {
// Note that the iAddress might already be in the map if the
// allocation occurred within and the deallocation outside of
// the measurement region.
allocMap_[iAddress] = iSize;
}

void dealloc(void const* iAddress, std::size_t iSize) {
auto size = allocMap_.erase(iAddress);
if (size == 0) {
auto nErased = allocMap_.erase(iAddress);
if (nErased == 0) {
#if defined(DEBUGGER_BREAK)
break_on_unmatched_dealloc();
#endif
Expand All @@ -77,7 +88,7 @@ namespace {
totalUnmatchedDealloc_ = 0;
numMatchedDeallocs_ = 0;
numUnmatchedDeallocs_ = 0;
allocMap_.clear();
AllocMap{}.swap(allocMap_); // release memory
unmatched_.clear();
active_ = true;
}
Expand Down Expand Up @@ -293,7 +304,8 @@ class ModuleEventAllocMonitor {

iAR.watchPreModuleEvent([this](auto const& iStream, auto const& iMod) {
auto mod_id = module_id(iMod);
auto acquireInfo = [this, iStream, mod_id]() {
// Explicit return type to avoid copies of return value for sure
auto acquireInfo = [this, iStream, mod_id]() -> AllocMap const& {
//acquire might have started stuff
streamSync_[iStream.streamID().value()].load();
auto index = moduleIndex(mod_id);
Expand All @@ -308,16 +320,15 @@ class ModuleEventAllocMonitor {
auto mod_id = module_id(iMod);
auto info = filter_.stopOnThread(mod_id);
if (info) {
auto v = std::accumulate(info->allocMap_.allocationSizes().begin(), info->allocMap_.allocationSizes().end(), 0);
auto v = accumulateValues(info->allocMap_);
std::stringstream s;
s << "M " << mod_id << " " << iStream.streamID().value() << " " << info->totalMatchedDeallocSize_ << " "
<< info->numMatchedDeallocs_ << " " << info->totalUnmatchedDealloc_ << " " << info->numUnmatchedDeallocs_
<< " " << v << " " << info->allocMap_.allocationSizes().size() << "\n";
<< " " << v << " " << info->allocMap_.size() << "\n";
file->write(s.str());
auto index = moduleIndex(mod_id);
auto finishedOrder = streamNFinishedModules_[iStream.streamID().value()]++;
streamModuleFinishOrder_[finishedOrder + nModules_ * iStream.streamID().value()] =
nModules_ * iStream.streamID().value() + index;
streamModuleFinishOrder_[finishedOrder + nModules_ * iStream.streamID().value()] = index;
streamModuleAllocs_[nModules_ * iStream.streamID().value() + index] = info->allocMap_;
++streamSync_[iStream.streamID().value()];
}
Expand All @@ -335,27 +346,18 @@ class ModuleEventAllocMonitor {
auto mod_id = module_id(iMod);
auto info = filter_.stopOnThread(mod_id);
if (info) {
assert(info->allocMap_.allocationSizes().size() == info->allocMap_.size());
auto v = std::accumulate(info->allocMap_.allocationSizes().begin(), info->allocMap_.allocationSizes().end(), 0);
auto v = accumulateValues(info->allocMap_);
std::stringstream s;
s << "A " << mod_id << " " << iStream.streamID().value() << " " << info->totalMatchedDeallocSize_ << " "
<< info->numMatchedDeallocs_ << " " << info->totalUnmatchedDealloc_ << " " << info->numUnmatchedDeallocs_
<< " " << v << " " << info->allocMap_.allocationSizes().size() << "\n";
<< " " << v << " " << info->allocMap_.size() << "\n";
file->write(s.str());
auto index = mod_id;
if (not moduleIDs_.empty()) {
auto it = std::lower_bound(moduleIDs_.begin(), moduleIDs_.end(), mod_id);
index = it - moduleIDs_.begin();
}
{
auto const& alloc = streamModuleAllocs_[nModules_ * iStream.streamID().value() + index];
assert(alloc.size() == alloc.allocationSizes().size());
}
streamModuleAllocs_[nModules_ * iStream.streamID().value() + index] = info->allocMap_;
{
auto const& alloc = streamModuleAllocs_[nModules_ * iStream.streamID().value() + index];
assert(alloc.size() == alloc.allocationSizes().size());
}
++streamSync_[iStream.streamID().value()];
streamModuleInAcquire_[nModules_ * iStream.streamID().value() + index].store(false);
}
Expand All @@ -377,45 +379,57 @@ class ModuleEventAllocMonitor {
auto info = filter_.stopOnThread();
if (info) {
streamSync_[iStream.streamID().value()].load();
//search for associated allocs to deallocs in reverse order that modules finished
auto nRan = streamNFinishedModules_[iStream.streamID().value()].load();
auto itBegin =
std::reverse_iterator(streamModuleFinishOrder_.begin() + iStream.streamID().value() * nModules_ + nRan);
auto const itEnd = itBegin + nRan;

// value is size, index
std::unordered_map<void const*, std::pair<std::size_t, std::size_t>> combinedAllocMap;
{
auto const nRan = streamNFinishedModules_[iStream.streamID().value()].load();
assert(nRan <= nModules_);
auto const streamModuleOffset = iStream.streamID().value() * nModules_;
auto const itBegin = streamModuleFinishOrder_.begin() + streamModuleOffset;
auto const itEnd = itBegin + nRan;

std::size_t const total =
std::accumulate(itBegin, itEnd, 0U, [this, streamModuleOffset](unsigned int a, auto const index) {
return a + streamModuleAllocs_[index + streamModuleOffset].size();
});
combinedAllocMap.reserve(total);

for (auto it = itBegin; it != itEnd; ++it) {
auto& allocs = streamModuleAllocs_[*it + streamModuleOffset];
for (auto const [addr, size] : allocs) {
// need to keep the address -> (size, module)
// association of the last finished module as that is
// the most likely module (that we can figure out) that
// allocated the memory of a data product that was
// destructed here
combinedAllocMap[addr] = std::pair(size, *it);
}
AllocMap{}.swap(allocs);
}
}
streamNFinishedModules_[iStream.streamID().value()].store(0);
{
std::vector<std::size_t> moduleDeallocSize(nModules_);
std::vector<unsigned int> moduleDeallocCount(nModules_);
for (auto& address : info->unmatched_) {
decltype(streamModuleAllocs_[0].findOffset(address)) offset = 0;
auto found = std::find_if(itBegin, itEnd, [&address, &offset, this](auto const& index) {
auto const& elem = streamModuleAllocs_[index];
return elem.size() != 0 and (offset = elem.findOffset(address)) != elem.size();
});
if (found != itEnd) {
auto index = *found - nModules_ * iStream.streamID().value();
moduleDeallocSize[index] += streamModuleAllocs_[*found].allocationSizes()[offset];
for (auto const& address : info->unmatched_) {
auto const found = combinedAllocMap.find(address);
if (found != combinedAllocMap.end()) {
auto const index = found->second.second;
moduleDeallocSize[index] += found->second.first;
moduleDeallocCount[index] += 1;
}
}
for (unsigned int index = 0; index < nModules_; ++index) {
if (moduleDeallocCount[index] != 0) {
auto id = moduleIDs_.empty() ? index : moduleIDs_[index];
auto const id = moduleIDs_.empty() ? index : moduleIDs_[index];
std::stringstream s;
s << "D " << id << " " << iStream.streamID().value() << " " << moduleDeallocSize[index] << " "
<< moduleDeallocCount[index] << "\n";
file->write(s.str());
}
}
}

{
auto itBegin = streamModuleAllocs_.begin() + nModules_ * iStream.streamID().value();
auto itEnd = itBegin + nModules_;
for (auto it = itBegin; it != itEnd; ++it) {
it->clear();
}
}
}
});
}
Expand Down
98 changes: 0 additions & 98 deletions PerfTools/AllocMonitor/plugins/mea_AllocMap.h

This file was deleted.

1 change: 1 addition & 0 deletions PerfTools/AllocMonitor/test/BuildFile.xml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
<flags CXXFLAGS="-O0"/>
</bin>
<test name="TestPerfToolsModuleAllocMonitor" command="runModuleAlloc.sh"/>
<test name="TestPerfToolsModuleEventAllocMonitor" command="runModuleEventAlloc.sh"/>

<!-- can't be part of the catch2 tests above because of ASAN exclusion -->
<bin file="test_intrusiveAllocMonitor.cc" name="testIntrusiveAllocMonitor">
Expand Down
70 changes: 70 additions & 0 deletions PerfTools/AllocMonitor/test/moduleEventAlloc_cfg.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
import FWCore.ParameterSet.Config as cms

import argparse
import sys

parser = argparse.ArgumentParser(prog=sys.argv[0], description='Test ModuleEventAllocMonitor service.')
parser.add_argument("--output", default="moduleEventAlloc.log", help="Output log file")
parser.add_argument("--skipEvents", action="store_true", help="Test skipping events")
parser.add_argument("--edmodule", action="store_true", help="Show only specific ed module")
parser.add_argument("--maxEvents", type=int, default=3, help="Specify maxEvents")
parser.add_argument("--threads", type=int, default=1, help="Set number of threads and streams")
args = parser.parse_args()


process = cms.Process("TEST")

process.source = cms.Source("EmptySource")

process.maxEvents.input = args.maxEvents
if args.threads > 1:
process.options.numberOfThreads = args.threads

process.thing = cms.EDProducer("ThingProducer",
offsetDelta = cms.int32(1)
)

process.OtherThing = cms.EDProducer("OtherThingProducer", thingTag = cms.InputTag("thing"))

process.thingProducer = cms.EDProducer("ThingProducer",
offsetDelta = cms.int32(100),
nThings = cms.int32(50)
)
process.get = cms.EDAnalyzer("edmtest::ThingAnalyzer")

process.Int = cms.EDProducer(
"IntProducer",
ivalue = cms.int32(67)
)
process.add_(cms.Service("WaitingService"))
process.acquireInt = cms.EDProducer(
"AcquireIntStreamProducer",
tags = cms.VInputTag(),
produceTag = cms.InputTag("Int")
)
process.getInt = cms.EDAnalyzer(
"MultipleIntsAnalyzer",
getFromModules = cms.untracked.VInputTag("acquireInt")
)

process.out = cms.OutputModule("AsciiOutputModule")


process.ep = cms.EndPath(
process.out +
process.get +
process.getInt,
cms.Task(process.thing,
process.OtherThing,
process.thingProducer,
process.Int,
process.acquireInt)
)

#process.add_(cms.Service("Tracer"))
process.add_(cms.Service("ModuleEventAllocMonitor", fileName = cms.untracked.string(args.output)))
if args.skipEvents:
process.ModuleEventAllocMonitor.nEventsToSkip = cms.untracked.uint32(2)

if args.edmodule:
process.ModuleEventAllocMonitor.moduleNames = cms.untracked.vstring(["thingProducer"])
Loading