Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 25 additions & 5 deletions src/python/RucioUtils.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,14 @@
from rucio.client import Client as NativeClient
from rucio.common.exception import RSENotFound, RuleNotFound, RucioException

def withExponentialBackOffRetry(retryAttempts=5, fatalExceptions=(), retryExceptions=(Exception,)):
# HTTP status codes that mark a RucioException as transient, i.e. worth retrying.
RETRIABLE_RUCIO_HTTP_STATUSES = [503]


def _is_rucio_retriable_http_error(exc):
    """
    Return True if the exception text reports a retriable HTTP status code.

    The check is done on the lower-cased ``str(exc)``, looking for
    ``http status code: <code>`` where <code> is one of
    RETRIABLE_RUCIO_HTTP_STATUSES.
    NOTE(review): this presumes the Rucio client embeds the status code in the
    exception message in exactly this wording -- confirm against the Rucio version in use.

    - exc: the exception (normally a RucioException) to inspect.
    Returns a bool; False when RETRIABLE_RUCIO_HTTP_STATUSES is empty.
    """
    # Local import: the top-of-file import block is not visible from this chunk.
    import re

    # An empty alternation would match any text, so bail out explicitly.
    if not RETRIABLE_RUCIO_HTTP_STATUSES:
        return False

    codes = "|".join(re.escape(str(code)) for code in RETRIABLE_RUCIO_HTTP_STATUSES)
    # (?!\d) keeps a code from matching as the prefix of a longer number
    # (e.g. 503 inside 5030); \s* tolerates any spacing after the colon.
    pattern = rf"http status code:\s*(?:{codes})(?!\d)"
    return re.search(pattern, str(exc).lower()) is not None

def withExponentialBackOffRetry(retryAttempts=5, fatalExceptions=(), retryExceptions=(Exception,), retryPredicate=None):
"""
Generic Exponential Back-off Retry
- retryAttempts: The number of retry attempts to perform before giving up.
Expand All @@ -17,6 +24,8 @@ def withExponentialBackOffRetry(retryAttempts=5, fatalExceptions=(), retryExcept
9/10 attempts -- Long-running or critical operations such as job submission, task management, and tape recall (≈ 17/34 minutes tolerance).
- fatalExceptions: A tuple of exception types that should not be retried, raise immediately.
- retryExceptions: A tuple of exception types that are eligible for retry. Any other exception is raised right away as well. (Our default is the built-in Exception, so every exception is caught and retried anyway.)
- retryPredicate: Optional callable(exception) -> bool. When one of the fatalExceptions is caught
and retryPredicate returns True for it, the call is retried instead of raised, waiting 20 + 2**attempt seconds before each such retry.
"""
fatalExceptions = tuple(fatalExceptions)
retryExceptions = tuple(retryExceptions)
Expand All @@ -38,7 +47,18 @@ def wrapper(*args, **kwargs):
try:
return func(*args, **kwargs)
except fatalExceptions as e:
# Fatal Exception will not be retry, raise immediately.
if retryPredicate and retryPredicate(e):
if attempt > retryAttempts:
logger.error(f"Operation '{name}' failed after {attempt} retries (retryPredicate matched): {e}")
raise
sleepTime = 20 + (2 ** attempt)
logger.warning(
f"Retriable exception in '{name}' (attempt {attempt+1}/{retryAttempts}): "
f"{e}, waiting for {sleepTime} seconds..."
)
time.sleep(sleepTime)
attempt += 1
continue
logger.exception(f"Fatal exception in '{name}' : {e}")
logger.exception(f"Type of Exception: {type(e)}")
logger.exception(f"repr(): {repr(e)}")
Expand Down Expand Up @@ -67,7 +87,7 @@ def __getattr__(self, name):
if not callable(attr):
return attr

@withExponentialBackOffRetry(retryAttempts=8, fatalExceptions=(RucioException,))
@withExponentialBackOffRetry(retryAttempts=10, fatalExceptions=(RucioException,), retryPredicate=_is_rucio_retriable_http_error)
@wraps(attr)
def call(*args, **kwargs):
return attr(*args, **kwargs)
Expand Down Expand Up @@ -173,7 +193,7 @@ def getWritePFN(rucioClient=None, siteName='', lfn='', # pylint: disable=danger

return pfn

@withExponentialBackOffRetry(retryAttempts=8, fatalExceptions=(RucioException,))
@withExponentialBackOffRetry(retryAttempts=10, fatalExceptions=(RucioException,))
def getRuleQuota(rucioClient=None, ruleId=None):
""" return quota needed by this rule in Bytes """
size = 0
Expand All @@ -185,7 +205,7 @@ def getRuleQuota(rucioClient=None, ruleId=None):
size = sum(file['bytes'] for file in files)
return size

@withExponentialBackOffRetry(retryAttempts=8, fatalExceptions=(RucioException,))
@withExponentialBackOffRetry(retryAttempts=10, fatalExceptions=(RucioException,))
def getRucioUsage(rucioClient=None, account=None, activity =None):
""" size of Rucio usage for this account (if provided) or by activity """
if activity is None:
Expand Down