Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion etc/WMAgentConfig.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,9 @@
config.WorkQueueManager.queueParams["RowsPerSlice"] = 2500
# maximum number of available elements rows to be evaluated when acquiring GQ to LQ work
config.WorkQueueManager.queueParams["MaxRowsPerCycle"] = 50000
config.WorkQueueManager.queueParams["rucioAccount"] = "wmcore_transferor" # account for data locks
# Rucio accounts for input data locks and secondary data locks
config.WorkQueueManager.queueParams["rucioAccount"] = "wmcore_transferor"
config.WorkQueueManager.queueParams["rucioAccountPU"] = "wmcore_pileup"


config.component_("DBS3Upload")
Expand Down
6 changes: 3 additions & 3 deletions src/python/WMCore/WMRuntime/Monitors/PerformanceMonitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,8 @@ def __init__(self):

self.pid = None
self.uid = os.getuid()
self.monitorBase = "ps -p %i -o pid,ppid,rss,pcpu,pmem,cmd -ww | grep %i"
self.pssMemoryCommand = "awk '/^Pss/ {pss += $2} END {print pss}' /proc/%i/smaps"
self.monitorBase = "pid=%i; ps -p $pid -o pid,ppid,rss,pcpu,pmem,cmd -ww | grep $pid"
self.pssMemoryCommand = "pid=%i; awk '/^Pss:/ {print $2}' /proc/$pid/smaps_rollup 2>/dev/null || awk '/^Pss:/ {pss += $2} END {print pss}' /proc/$pid/smaps"
self.monitorCommand = None
self.currentStepSpace = None
self.currentStepName = None
Expand Down Expand Up @@ -210,7 +210,7 @@ def periodicUpdate(self):

# Now we run the ps monitor command and collate the data
# Gathers RSS, %CPU and %MEM statistics from ps
ps_cmd = self.monitorBase % (stepPID, stepPID)
ps_cmd = self.monitorBase % (stepPID)
stdout, _stderr, _retcode = subprocessAlgos.runCommand(ps_cmd)

ps_output = stdout.split()
Expand Down
2 changes: 1 addition & 1 deletion src/python/WMCore/WMSpec/Steps/Fetchers/PileupFetcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ def __init__(self):
"""
super(PileupFetcher, self).__init__()
# FIXME: find a way to pass the Rucio account name to this fetcher module
self.rucioAcct = "wmcore_transferor"
self.rucioAcct = "wmcore_pileup"
self.rucio = None

def _queryDbsAndGetPileupConfig(self, stepHelper, dbsReader):
Expand Down
15 changes: 9 additions & 6 deletions src/python/WMCore/WorkQueue/DataLocationMapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ def __init__(self, logger=None, **kwargs):
self.params.setdefault('incompleteBlocks', False)
self.params.setdefault('requireBlocksSubscribed', True)
self.params.setdefault('rucioAccount', "wmcore_transferor")
self.params.setdefault('rucioAccountPU', "wmcore_pileup")

validLocationFrom = ('subscription', 'location')
if self.params['locationFrom'] not in validLocationFrom:
Expand All @@ -67,34 +68,36 @@ def __init__(self, logger=None, **kwargs):
# the same object is not shared amongst multiple threads
self.dbses = {}

def __call__(self, dataItems):
def __call__(self, dataItems, rucioAcct=None):
rucioAcct = rucioAcct or self.params['rucioAccount']
result = {}

dataByDbs = self.organiseByDbs(dataItems)

for dbs, dataItems in viewitems(dataByDbs):
# if global use Rucio, else use dbs
if isGlobalDBS(dbs):
output = self.locationsFromRucio(dataItems)
output = self.locationsFromRucio(dataItems, rucioAcct)
else:
output = self.locationsFromDBS(dbs, dataItems)
result[dbs] = output

return result

def locationsFromRucio(self, dataItems):
def locationsFromRucio(self, dataItems, rucioAcct):
"""
Get data location from Rucio. Location is mapped to the actual
sites associated with them, so PSNs are actually returned
:param dataItems: list of datasets/blocks names
:param rucioAcct: string with the Rucio account name to check the rules against
:return: dictionary key'ed by the dataset/block, with a list of PSNs as value
"""
result = defaultdict(set)
self.logger.info("Fetching location from Rucio...")
self.logger.info("Fetching location from Rucio for account: %s", rucioAcct)
for dataItem in dataItems:
try:
dataLocations = self.rucio.getDataLockedAndAvailable(name=dataItem,
account=self.params['rucioAccount'])
account=rucioAcct)
# resolve the PNNs into PSNs
result[dataItem] = self.cric.PNNstoPSNs(dataLocations)
except Exception as ex:
Expand Down Expand Up @@ -207,7 +210,7 @@ def updatePileupLocation(self):
dataItems = self.backend.getActivePileupData()

# fullResync incorrect with multiple dbs's - fix!!!
dataLocations = DataLocationMapper.__call__(self, dataItems)
dataLocations = DataLocationMapper.__call__(self, dataItems, self.params['rucioAccountPU'])
self.logger.info("Found %d unique pileup data to update location", len(dataItems))

# Given that there might be multiple data items to be updated
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ def __init__(self, **args):
self.cric = CRIC()
# FIXME: for the moment, it will always use the default value
self.rucioAcct = self.args.get("rucioAcct", "wmcore_transferor")
self.rucioAcctPU = self.args.get("rucioAcctPU", "wmcore_pileup")
if not self.rucio:
self.rucio = Rucio(self.rucioAcct, configDict={'logger': self.logger})

Expand Down Expand Up @@ -256,7 +257,7 @@ def getDatasetLocations(self, datasets):
for dbsUrl in datasets:
for datasetPath in datasets[dbsUrl]:
locations = self.rucio.getDataLockedAndAvailable(name=datasetPath,
account=self.rucioAcct)
account=self.rucioAcctPU)
result[datasetPath] = self.cric.PNNstoPSNs(locations)
return result

Expand Down