From 534c99ca1a490da5f11080b24f09d83ab81fa46b Mon Sep 17 00:00:00 2001 From: Duong Date: Thu, 15 May 2025 15:49:27 -0400 Subject: [PATCH 1/5] default stageout commands for protocols --- src/python/WMCore/Storage/RucioFileCatalog.py | 56 ++++++++++++++++++- src/python/WMCore/Storage/SiteLocalConfig.py | 11 ++-- .../WMCore_t/Storage_t/SiteLocalConfig_t.py | 2 +- 3 files changed, 62 insertions(+), 7 deletions(-) diff --git a/src/python/WMCore/Storage/RucioFileCatalog.py b/src/python/WMCore/Storage/RucioFileCatalog.py index c1d0cc7bb2..81fc236def 100644 --- a/src/python/WMCore/Storage/RucioFileCatalog.py +++ b/src/python/WMCore/Storage/RucioFileCatalog.py @@ -16,7 +16,7 @@ import re from builtins import str, range - +from urllib.parse import urlparse class RucioFileCatalog(dict): """ @@ -245,3 +245,57 @@ def rseName(currentSite, currentSubsite, storageSite, volume): rse = jsElement['rse'] break return rse + +def get_default_cmd(currentSite, currentSubsite, storageSite, volume, protocolName): + """ + Return default command for a protocol, for example: + https://gitlab.cern.ch/SITECONF/T1_DE_KIT/-/blob/master/storage.json?ref_type=heads#L17 + :currentSite is the site where jobs are executing + :currentSubsite is the sub site if jobs are running here + :storageSite is the site for storage + :volume is the volume name, for example: + https://gitlab.cern.ch/SITECONF/T1_DE_KIT/-/blob/master/storage.json?ref_type=heads#L3 + :protocolName is the 'protocol' in site-local-config.xml under stageOut + """ + + storageJsonName = storageJsonPath(currentSite, currentSubsite, storageSite) + try: + with open(storageJsonName, encoding="utf-8") as jsonFile: + jsElements = json.load(jsonFile) + except Exception as ex: + msg = "RucioFileCatalog.py:getDefaultCmd() Error reading storage.json: %s\n" % storageJsonName + msg += str(ex) + raise RuntimeError(msg) + + url_scheme = '' + + for entry in jsElements: + if entry.get('site') != storageSite or entry.get('volume') != volume: + continue + + matchProto = '' + for proto in entry.get("protocols", []): + if proto.get("protocol") != protocolName: + continue + + matchProto = proto + # First try rules + rules = proto.get("rules", []) + if rules: + pfn = rules[0].get("pfn", "") + url_scheme = urlparse(pfn).scheme + + # If no rules try 'prefix' + if not url_scheme and "prefix" in proto: + url_scheme = urlparse(proto["prefix"]).scheme + + # Map scheme to command + return { + 'root': 'xrdcp', + 'davs': 'gfla2', + 'file': 'cp' + }.get(url_scheme, 'gfal2') + + break # matching site+volume found and processed + + return None #no matched protocol so command is None \ No newline at end of file diff --git a/src/python/WMCore/Storage/SiteLocalConfig.py b/src/python/WMCore/Storage/SiteLocalConfig.py index db89781823..d7eb033302 100644 --- a/src/python/WMCore/Storage/SiteLocalConfig.py +++ b/src/python/WMCore/Storage/SiteLocalConfig.py @@ -13,7 +13,7 @@ from builtins import next, str, object from WMCore.Algorithms.ParseXMLFile import xmlFileToNode -from WMCore.Storage.RucioFileCatalog import rseName +from WMCore.Storage.RucioFileCatalog import rseName,get_default_cmd def loadSiteLocalConfig(): @@ -300,14 +300,15 @@ def processStageOut(): localReport = {} localReport['storageSite'] = aStorageSite - localReport['command'] = subnode.attrs.get('command', None) - # use default command='gfal2' when 'command' is not specified - if localReport['command'] is None: - localReport['command'] = 'gfal2' + #Do not support 'command' from site-local-config.xml anymore + #get command based on rule PFN or prefix of the protocol for example root://, davs:// file:// + if aProtocol is None: localReport['command'] = None + else: localReport['command'] = get_default_cmd(report["siteName"], subSiteName, aStorageSite, aVolume, aProtocol) localReport['option'] = subnode.attrs.get('option', None) localReport['volume'] = aVolume localReport['protocol'] = aProtocol localReport['phedex-node'] = rseName(report["siteName"], subSiteName, aStorageSite, aVolume) + report['stageOuts'].append(localReport) diff --git a/test/python/WMCore_t/Storage_t/SiteLocalConfig_t.py b/test/python/WMCore_t/Storage_t/SiteLocalConfig_t.py index 7294def26b..eab299408a 100644 --- a/test/python/WMCore_t/Storage_t/SiteLocalConfig_t.py +++ b/test/python/WMCore_t/Storage_t/SiteLocalConfig_t.py @@ -73,7 +73,7 @@ def testFNALSiteLocalConfig(self): "Error: Protocol is not correct." assert mySiteConfig.stageOuts[0]["option"] == "-p", \ "Error: option is not correct." - # assert False + #assert False return def testLoadingConfigFromOverridenEnvVarriable(self): From e820cf48e6d40026b5834414967a592240a8f80f Mon Sep 17 00:00:00 2001 From: Duong Date: Tue, 20 May 2025 12:08:27 -0400 Subject: [PATCH 2/5] fix pylint --- src/python/WMCore/Storage/RucioFileCatalog.py | 33 ++++++++++--------- src/python/WMCore/Storage/SiteLocalConfig.py | 7 ++-- 2 files changed, 20 insertions(+), 20 deletions(-) diff --git a/src/python/WMCore/Storage/RucioFileCatalog.py b/src/python/WMCore/Storage/RucioFileCatalog.py index 81fc236def..bea8292083 100644 --- a/src/python/WMCore/Storage/RucioFileCatalog.py +++ b/src/python/WMCore/Storage/RucioFileCatalog.py @@ -37,7 +37,7 @@ def addMapping(self, protocol, match, result, """ Add an lfn to pfn mapping to this instance :param protocol: name of protocol, for example XRootD - :param match: regular expression string to perform path matching + :param match: regular expression string to perform path matching :param result: result of the path matching :param chain: name of chained protocol :param mapping_type: type of path matching @@ -52,11 +52,14 @@ def addMapping(self, protocol, match, result, def _doMatch(self, protocol, path, style, caller): """ - Generalised way of building up the mappings. + Generalised way of building up the mappings. :param protocol: the name of a protocol, for example XRootD :path: a LFN path, for example /store/abc/xyz.root :style: type of conversion. lfn-to-pfn is to convert LFN to PFN and pfn-to-pfn is for PFN to LFN - :caller is the method from there this method was called. It's used for resolving chained rules. When a rule is chained, the path translation of protocol defined in "chain" attribute should be applied first before the one specified in this rule. Here is an example. In this storage description, https://gitlab.cern.ch/SITECONF/T1_DE_KIT/-/blob/master/storage.json, the rule of protocol WebDAV of volume KIT_MSS is chained to the protocol pnfs of the same volume. The path translation of WebDAV rule must be done by applying the path translation of pnfs rule first before its own path translation is applied. + :caller: is the method from there this method was called. It's used for resolving chained rules. + When a rule is chained, the path translation of protocol defined in "chain" attribute should be applied first before the one specified in this rule. Here is an example. + In this storage description, https://gitlab.cern.ch/SITECONF/T1_DE_KIT/-/blob/master/storage.json, the rule of protocol WebDAV of volume KIT_MSS is chained to the protocol pnfs of the same volume. + The path translation of WebDAV rule must be done by applying the path translation of pnfs rule first before its own path translation is applied. """ for mapping in self[style]: if mapping['protocol'] != protocol: @@ -136,7 +139,7 @@ def storageJsonPath(currentSite, currentSubsite, storageSite): # return path override if it is defined and exists siteConfigPathOverride = os.getenv('WMAGENT_RUCIO_CATALOG_OVERRIDE', None) if siteConfigPathOverride and os.path.exists(siteConfigPathOverride): - return siteConfigPathOverride + return siteConfigPathOverride # get site config siteConfigPath = os.getenv('SITECONFIG_PATH', None) @@ -182,7 +185,7 @@ def readRFC(filename, storageSite, volume, protocol): except Exception as ex: msg = "Error reading storage description file: %s\n" % filename msg += str(ex) - raise RuntimeError(msg) + raise RuntimeError(msg) from ex # now loop over elements, select the one matched with inputs (storageSite, volume, protocol) and fill lfn-to-pfn for jsElement in jsElements: # check to see if the storageSite and volume matchs with "site" and "volume" in storage.json @@ -239,7 +242,7 @@ def rseName(currentSite, currentSubsite, storageSite, volume): except Exception as ex: msg = "RucioFileCatalog.py:rseName() Error reading storage.json: %s\n" % storageJsonName msg += str(ex) - raise RuntimeError(msg) + raise RuntimeError(msg) from ex for jsElement in jsElements: if jsElement['site'] == storageSite and jsElement['volume'] == volume: rse = jsElement['rse'] @@ -265,20 +268,18 @@ def get_default_cmd(currentSite, currentSubsite, storageSite, volume, protocolNa except Exception as ex: msg = "RucioFileCatalog.py:getDefaultCmd() Error reading storage.json: %s\n" % storageJsonName msg += str(ex) - raise RuntimeError(msg) - + raise RuntimeError(msg) from ex + url_scheme = '' - + for entry in jsElements: if entry.get('site') != storageSite or entry.get('volume') != volume: continue - - matchProto = '' + for proto in entry.get("protocols", []): if proto.get("protocol") != protocolName: continue - matchProto = proto # First try rules rules = proto.get("rules", []) if rules: @@ -288,14 +289,14 @@ def get_default_cmd(currentSite, currentSubsite, storageSite, volume, protocolNa # If no rules try 'prefix' if not url_scheme and "prefix" in proto: url_scheme = urlparse(proto["prefix"]).scheme - + # Map scheme to command return { 'root': 'xrdcp', 'davs': 'gfla2', 'file': 'cp' }.get(url_scheme, 'gfal2') - + break # matching site+volume found and processed - - return None #no matched protocol so command is None \ No newline at end of file + + return None #no matched protocol so command is None diff --git a/src/python/WMCore/Storage/SiteLocalConfig.py b/src/python/WMCore/Storage/SiteLocalConfig.py index d7eb033302..ae7f9a3095 100644 --- a/src/python/WMCore/Storage/SiteLocalConfig.py +++ b/src/python/WMCore/Storage/SiteLocalConfig.py @@ -13,7 +13,7 @@ from builtins import next, str, object from WMCore.Algorithms.ParseXMLFile import xmlFileToNode -from WMCore.Storage.RucioFileCatalog import rseName,get_default_cmd +from WMCore.Storage.RucioFileCatalog import rseName, get_default_cmd def loadSiteLocalConfig(): @@ -73,7 +73,6 @@ class SiteConfigError(Exception): Exception class placeholder """ - pass class SiteLocalConfig(object): @@ -138,7 +137,7 @@ def read(self): except Exception as ex: msg = "Unable to read SiteConfigFile: %s\n" % self.siteConfigFile msg += str(ex) - raise SiteConfigError(msg) + raise SiteConfigError(msg) from ex nodeResult = nodeReader(node) @@ -308,7 +307,7 @@ def processStageOut(): localReport['volume'] = aVolume localReport['protocol'] = aProtocol localReport['phedex-node'] = rseName(report["siteName"], subSiteName, aStorageSite, aVolume) - + report['stageOuts'].append(localReport) From 68bd083ef7b67948856a5ef82979df881129eaa5 Mon Sep 17 00:00:00 2001 From: Duong Date: Tue, 20 May 2025 14:47:34 -0400 Subject: [PATCH 3/5] fix typo, gfla2 (should be gfal2) --- src/python/WMCore/Storage/RucioFileCatalog.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/python/WMCore/Storage/RucioFileCatalog.py b/src/python/WMCore/Storage/RucioFileCatalog.py index bea8292083..f79df065fe 100644 --- a/src/python/WMCore/Storage/RucioFileCatalog.py +++ b/src/python/WMCore/Storage/RucioFileCatalog.py @@ -293,7 +293,7 @@ def get_default_cmd(currentSite, currentSubsite, storageSite, volume, protocolNa # Map scheme to command return { 'root': 'xrdcp', - 'davs': 'gfla2', + 'davs': 'gfal2', 'file': 'cp' }.get(url_scheme, 'gfal2') From 13377655b61cf2fe61f8109e98cc31a0dd0dc27d Mon Sep 17 00:00:00 2001 From: Duong Date: Mon, 22 Sep 2025 18:11:08 -0400 Subject: [PATCH 4/5] Addressing Alan comments --- src/python/WMCore/Storage/RucioFileCatalog.py | 38 +++++++++++++------ 1 file changed, 26 insertions(+), 12 deletions(-) diff --git a/src/python/WMCore/Storage/RucioFileCatalog.py b/src/python/WMCore/Storage/RucioFileCatalog.py index f79df065fe..53a1be7e64 100644 --- a/src/python/WMCore/Storage/RucioFileCatalog.py +++ b/src/python/WMCore/Storage/RucioFileCatalog.py @@ -18,6 +18,14 @@ from builtins import str, range from urllib.parse import urlparse +import logging + +STAGEOUT_PROTOCOL_MAP = { + 'root': 'xrdcp', + 'davs': 'gfal2', + 'file': 'cp' +} + class RucioFileCatalog(dict): """ _RucioFileCatalog_ @@ -56,7 +64,7 @@ def _doMatch(self, protocol, path, style, caller): :param protocol: the name of a protocol, for example XRootD :path: a LFN path, for example /store/abc/xyz.root :style: type of conversion. lfn-to-pfn is to convert LFN to PFN and pfn-to-pfn is for PFN to LFN - :caller: is the method from there this method was called. It's used for resolving chained rules. + :caller: is the method where this method was called. It's used for resolving chained rules. When a rule is chained, the path translation of protocol defined in "chain" attribute should be applied first before the one specified in this rule. Here is an example. In this storage description, https://gitlab.cern.ch/SITECONF/T1_DE_KIT/-/blob/master/storage.json, the rule of protocol WebDAV of volume KIT_MSS is chained to the protocol pnfs of the same volume. The path translation of WebDAV rule must be done by applying the path translation of pnfs rule first before its own path translation is applied. @@ -266,8 +274,7 @@ def get_default_cmd(currentSite, currentSubsite, storageSite, volume, protocolNa with open(storageJsonName, encoding="utf-8") as jsonFile: jsElements = json.load(jsonFile) except Exception as ex: - msg = "RucioFileCatalog.py:getDefaultCmd() Error reading storage.json: %s\n" % storageJsonName - msg += str(ex) + msg = f"Failed to open storage.json: {storageJsonName}\n. Error: {str(ex)}" raise RuntimeError(msg) from ex url_scheme = '' @@ -283,6 +290,7 @@ def get_default_cmd(currentSite, currentSubsite, storageSite, volume, protocolNa # First try rules rules = proto.get("rules", []) if rules: + #rules are just different matching patterns of the same protocol, use the first rule to get command from its pfn is enough pfn = rules[0].get("pfn", "") url_scheme = urlparse(pfn).scheme @@ -291,12 +299,18 @@ def get_default_cmd(currentSite, currentSubsite, storageSite, volume, protocolNa url_scheme = urlparse(proto["prefix"]).scheme # Map scheme to command - return { - 'root': 'xrdcp', - 'davs': 'gfal2', - 'file': 'cp' - }.get(url_scheme, 'gfal2') - - break # matching site+volume found and processed - - return None #no matched protocol so command is None + cmd = STAGEOUT_PROTOCOL_MAP.get(url_scheme, 'gfal2') + if cmd == 'gfal2' and url_scheme not in STAGEOUT_PROTOCOL_MAP: + if rules: + logging.log(logging.WARNING, "RucioFileCatalog.get_default_cmd: Can not get the command from rules of protocol %s of %s site and %s volume. Default command gfal2 is used. Rule: %s", protocolName, storageSite, volume, rules[0]) + else: + logging.log(logging.WARNING, "RucioFileCatalog.get_default_cmd: Can not get the command from prefix of protocol %s of %s site and %s volume. Default command gfal2 is used. Prefix: %s", protocolName, storageSite, volume, proto.get("prefix", None)) + return cmd + + # no matched protocol + logging.log(logging.ERROR, "RucioFileCatalog.get_default_cmd: No matched %s protocol for %s volume of %s storage site in the storage json %s found", protocolName, volume, storageSite, storageJsonName) + return None + + # no matched storage site or volume + logging.log(logging.ERROR, "RucioFileCatalog.get_default_cmd: No matched %s storage site or %s volume in the storage json %s found", storageSite, volume, storageJsonName) + return None \ No newline at end of file From aa80f07af7afd25105d6b084bbb113ac00d42e78 Mon Sep 17 00:00:00 2001 From: Duong Date: Mon, 22 Sep 2025 18:48:31 -0400 Subject: [PATCH 5/5] add unittest --- .../WMCore_t/Storage_t/SiteLocalConfig_t.py | 55 ++++++++++++++++++- ...te-local-config-testStageOut-T1_DE_KIT.xml | 6 +- .../Storage_t/T1_IT_CNAF_SiteLocalConfig.xml | 50 +++++++++++++++++ .../Storage_t/T1_US_FNAL_SiteLocalConfig.xml | 3 +- 4 files changed, 111 insertions(+), 3 deletions(-) create mode 100644 test/python/WMCore_t/Storage_t/T1_IT_CNAF_SiteLocalConfig.xml diff --git a/test/python/WMCore_t/Storage_t/SiteLocalConfig_t.py b/test/python/WMCore_t/Storage_t/SiteLocalConfig_t.py index eab299408a..8b7bee4cf4 100644 --- a/test/python/WMCore_t/Storage_t/SiteLocalConfig_t.py +++ b/test/python/WMCore_t/Storage_t/SiteLocalConfig_t.py @@ -33,6 +33,7 @@ def testFNALSiteLocalConfig(self): fnalConfigFileName = os.path.join(getTestBase(), "WMCore_t/Storage_t", "T1_US_FNAL_SiteLocalConfig.xml") + mySiteConfig = SiteLocalConfig(fnalConfigFileName) assert mySiteConfig.siteName == "T1_US_FNAL", "Error: Wrong site name." @@ -66,13 +67,65 @@ def testFNALSiteLocalConfig(self): assert len(goldenProxies) == 0, \ "Error: Missing proxy servers." - + + #test second stage out method with command extraction from rules assert mySiteConfig.stageOuts[0]["command"] == "xrdcp", \ "Error: Wrong stage out command." assert mySiteConfig.stageOuts[0]["protocol"] == "XRootD", \ "Error: Protocol is not correct." assert mySiteConfig.stageOuts[0]["option"] == "-p", \ "Error: option is not correct." + + #assert False + return + + def testGetDefaultStageOutCmd(self): + """ + _testGetDefaultStageOutCmd_ + + Verify that the default stage out command is returned correctly for given protocol. + """ + os.environ['SITECONFIG_PATH'] = os.path.join(getTestBase(), + 'WMCore_t/Storage_t', + 'T1_DE_KIT') + configFileName = os.path.join(getTestBase(), + "WMCore_t/Storage_t", + "T1_DE_KIT/JobConfig/site-local-config-testStageOut-T1_DE_KIT.xml") + + mySiteConfig = SiteLocalConfig(configFileName) + #test the first stage out method with command extraction from prefix + assert mySiteConfig.stageOuts[0]["command"] == "gfal2", \ + "Error: Wrong stage out command." + assert mySiteConfig.stageOuts[0]["protocol"] == "WebDAV", \ + "Error: Protocol is not correct." + #test the second stage out method with command extraction from rules + assert mySiteConfig.stageOuts[1]["command"] == "gfal2", \ + "Error: Wrong stage out command." + assert mySiteConfig.stageOuts[1]["protocol"] == "WebDAV", \ + "Error: Protocol is not correct." + #test the third stage out method with no command found, fall to default gfal2 + assert mySiteConfig.stageOuts[2]["command"] == "gfal2", \ + "Error: Wrong stage out command." + assert mySiteConfig.stageOuts[2]["protocol"] == "xrootd-module", \ + "Error: Protocol is not correct." + #test the fourth stage out method with "prefix": "root://172.26.19.197:1094//root://cmsxrootd-test.gridka.de:1094/" + assert mySiteConfig.stageOuts[3]["command"] == "xrdcp", \ + "Error: Wrong stage out command." + assert mySiteConfig.stageOuts[3]["protocol"] == "XRootDHoreKaGridKa", \ + "Error: Protocol is not correct." + + os.environ['SITECONFIG_PATH'] = '/cvmfs/cms.cern.ch/SITECONF/T1_IT_CNAF' + configFileName = os.path.join(getTestBase(), + "WMCore_t/Storage_t", + "T1_IT_CNAF_SiteLocalConfig.xml") + + mySiteConfig = SiteLocalConfig(configFileName) + #test the first stage out method for command 'cp' with protocol 'file' + assert mySiteConfig.stageOuts[0]["command"] == "cp", \ + "Error: Wrong stage out command." + assert mySiteConfig.stageOuts[0]["protocol"] == "file", \ + "Error: Protocol is not correct." + #assert False return diff --git a/test/python/WMCore_t/Storage_t/T1_DE_KIT/JobConfig/site-local-config-testStageOut-T1_DE_KIT.xml b/test/python/WMCore_t/Storage_t/T1_DE_KIT/JobConfig/site-local-config-testStageOut-T1_DE_KIT.xml index 1524df3ca5..30f42d2c50 100644 --- a/test/python/WMCore_t/Storage_t/T1_DE_KIT/JobConfig/site-local-config-testStageOut-T1_DE_KIT.xml +++ b/test/python/WMCore_t/Storage_t/T1_DE_KIT/JobConfig/site-local-config-testStageOut-T1_DE_KIT.xml @@ -33,7 +33,11 @@ - + + + + + diff --git a/test/python/WMCore_t/Storage_t/T1_IT_CNAF_SiteLocalConfig.xml b/test/python/WMCore_t/Storage_t/T1_IT_CNAF_SiteLocalConfig.xml new file mode 100644 index 0000000000..3c7d81b477 --- /dev/null +++ b/test/python/WMCore_t/Storage_t/T1_IT_CNAF_SiteLocalConfig.xml @@ -0,0 +1,50 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/test/python/WMCore_t/Storage_t/T1_US_FNAL_SiteLocalConfig.xml b/test/python/WMCore_t/Storage_t/T1_US_FNAL_SiteLocalConfig.xml index 79758b0f7f..9a382949e0 100644 --- a/test/python/WMCore_t/Storage_t/T1_US_FNAL_SiteLocalConfig.xml +++ b/test/python/WMCore_t/Storage_t/T1_US_FNAL_SiteLocalConfig.xml @@ -14,7 +14,8 @@ - + +