diff --git a/bin/meshroom_compute b/bin/meshroom_compute index 872f0bbd8c..1db64f6ad2 100755 --- a/bin/meshroom_compute +++ b/bin/meshroom_compute @@ -18,6 +18,7 @@ meshroom.setupEnvironment() import meshroom.core import meshroom.core.graph from meshroom.core.node import Status +from meshroom.core.node import ChunkIndex parser = argparse.ArgumentParser(description='Execute a Graph of processes.') @@ -49,11 +50,17 @@ parser.add_argument('-v', '--verbose', default=os.environ.get('MESHROOM_VERBOSE', 'info'), choices=['fatal', 'error', 'warning', 'info', 'debug', 'trace']) -parser.add_argument('-i', '--iteration', type=int, - default=-1, help='') +parser.add_argument('-i', '--iteration', type=int, default=ChunkIndex.NONE, help='') +parser.add_argument('--preprocess', help='Execute preprocess chunk', action='store_true') +parser.add_argument('--postprocess', help='Execute postprocess chunk', action='store_true') args = parser.parse_args() +if args.preprocess: + args.iteration = ChunkIndex.PREPROCESS +elif args.postprocess: + args.iteration = ChunkIndex.POSTPROCESS + # Setup the verbose level if args.extern: # For extern computation, we want to focus on the node computation log. @@ -102,7 +109,7 @@ if args.node: # If not running as "extern", the SUBMITTED status should generate a warning. submittedStatuses.append(Status.SUBMITTED) - if not node._chunksCreated: + if not node._chunksCreated and args.iteration >= 0: print(f"Error: Node {node} has been submitted before chunks have been created." 
\ f"See file: \"{node.nodeStatusFile}\".") sys.exit(-1) @@ -111,15 +118,21 @@ if args.node: print(f"InputNode: No computation to do.") sys.exit(0) + if args.preprocess: + chunks = [node._preprocessChunk] + elif args.postprocess: + chunks = [node._postprocessChunk] + elif args.iteration == ChunkIndex.NONE: # Warning : default value + chunks = node.chunks + else: + chunks = [node.chunks[args.iteration]] + if not args.forceStatus and not args.forceCompute: - if args.iteration != -1: - chunks = [node.chunks[args.iteration]] - else: - chunks = node.chunks for chunk in chunks: if chunk.status.status in submittedStatuses: # Particular case for the local isolated, the node status is set to RUNNING by the submitter directly. - # We ensure that no other instance has started to compute, by checking that the computeSessionUid is empty. + # We ensure that no other instance has start + # ed to compute, by checking that the computeSessionUid is empty. if chunk.node.getMrNodeType() == meshroom.core.MrNodeType.NODE and \ not chunk.status.computeSessionUid and node._nodeStatus.submitterSessionUid: continue @@ -129,24 +142,28 @@ if args.node: if args.extern: # Restore the log level logging.getLogger().setLevel(meshroom.logStringToPython[args.verbose]) - - node.prepareLogger(args.iteration) - node.preprocess() - if args.iteration != -1: - chunk = node.chunks[args.iteration] - if chunk._status.status == Status.STOPPED: - print(f"Chunk {chunk}: status is STOPPED") - killRunningJob(node) - chunk.process(args.forceCompute, args.inCurrentEnv) - else: + + if args.iteration == ChunkIndex.NONE: + # Process the whole node if node.nodeStatus.status == Status.STOPPED: print(f"Node {node}: status is STOPPED") killRunningJob(node) + node.preprocess(args.forceCompute, args.inCurrentEnv) + node.prepareLogger(args.iteration) node.process(args.forceCompute, args.inCurrentEnv) - node.postprocess() - node.restoreLogger() + node.restoreLogger() + node.postprocess(args.forceCompute, args.inCurrentEnv) + 
else: + chunk = chunks[0] + if chunk._status.status == Status.STOPPED: + print(f"Chunk {chunk}: status is STOPPED") + killRunningJob(node) + node.prepareLogger(args.iteration) + chunk.process(args.forceCompute, args.inCurrentEnv) + node.restoreLogger() + else: - if args.iteration != -1: + if args.iteration != ChunkIndex.NONE: print('Error: "--iteration" only makes sense when used with "--node".') sys.exit(-1) toNodes = None diff --git a/bin/meshroom_createChunks b/bin/meshroom_createChunks index e55ade807d..e573d34658 100755 --- a/bin/meshroom_createChunks +++ b/bin/meshroom_createChunks @@ -129,13 +129,13 @@ if not args.forceStatus and not args.forceCompute: f"submitterSessionUid: {node._nodeStatus.submitterSessionUid}") if chunksToProcess: - node.prepareLogger() node.preprocess() + node.prepareLogger() for chunk in chunksToProcess: logging.info(f"[MeshroomCreateChunks] process chunk {chunk}") chunk.process(args.forceCompute, args.inCurrentEnv) - node.postprocess() node.restoreLogger() + node.postprocess() else: logging.info(f"[MeshroomCreateChunks] -> create job to process chunks {[c for c in node.chunks]}") submitter.createChunkTask(node, graphFile=args.graphFile, cache=args.cache, diff --git a/localfarm/README.md b/localfarm/README.md new file mode 100644 index 0000000000..817ca8372e --- /dev/null +++ b/localfarm/README.md @@ -0,0 +1,174 @@ +# Meshroom Local Farm + +This folder contains a local farm tool for meshroom. It can be used in various ways : +- For testing we setup and launch the farm backend process and use it to test the submitting process +- We also added a submitter to be able to use it inside meshroom + +> [!NOTE] +> Note that the local farm only works in Unix for now because we use `fork` for daemonization. +> We could implement the [`DETACHED_PROCESS`](https://stackoverflow.com/a/12854376) flag with `subprocess.Popen` +> to handle the farm in Windows. 
+ +## How to use + +### Launch + +First launch the farm process +```sh +python localfarm/localFarmLauncher.py start --root +``` + +The `FARM_ROOT` folder will contain the logs for each process and for the main process. + +### Commands + +- _start_ : Launch the farm +- _clean_ : Clean the files +- _stop_ : Stop the farm process +- _restart_ : Restart the farm process +- _status_ : Check the status +- _fullInfo_ : Display additional info + +### Add jobs + +The `test.py` script can be used to find examples on how to use it. +Basically here's how to create jobs and tasks : + +```py +import os +import datetime +from time import sleep +from collections import defaultdict +from localfarm.localFarm import Task, Job, LocalFarmEngine + +def now(): + now = datetime.datetime.now() + return now.strftime("%H:%M:%S ") + +def createTask(job, command, dependencies=[], _tasks=[]): + i = len(_tasks) + task = Task(f"Task {i}", f"echo '> Task {i}' && {command}") + job.addTask(task) + for parentTask in dependencies: + job.addTaskDependency(task, parentTask) + return task + +def getTasksByStatus(jid): + jobInfo = engine.get_job_status(jid) + if not jobInfo: + return {} + taskByStatus = defaultdict(set) + for task in jobInfo.get("tasks", []): + status = task.get("status", "UNKNOWN") + taskByStatus[status].add(task.get("tid")) + return dict(taskByStatus) + +# Get engine +engine = LocalFarmEngine(FARM_ROOT) +# Create job +job = Job("Example Job") +job.setEngine(engine) +# Add tasks +task1 = createTask(job, command="sleep 2", dependencies=[]) +task2 = createTask(job, command="sleep 2", dependencies=[task1]) +task3 = createTask(job, command="sleep 2", dependencies=[task1]) +task4 = createTask(job, command="sleep 2", dependencies=[task2, task3]) +task5 = createTask(job, command="sleep 2", dependencies=[task4]) +# Submit job +res = job.submit() +jid = res['jid'] +print(now() + f"-> job: {res}") + +# Monitor job +currentRunningTids = set() +while True: + sleep(1) + tasks = getTasksByStatus(jid) 
+ if not tasks: + print("No tasks found for job") + break + runningTids = tasks.get("RUNNING") + activeTasks = tasks.get("SUBMITTED", set()).union(tasks.get("RUNNING", set())) + if not activeTasks: + print(now() + "All tasks completed") + break + if runningTids: + runningTids = [int(t) for t in runningTids] + newRunningTasks = set(runningTids) + if currentRunningTids != newRunningTasks: + print(now() + f"Now running tasks: {runningTids} (active tasks: {activeTasks})") + currentRunningTids = newRunningTasks +``` + +And this gives : + +``` +10:54:36 -> job: {'jid': 1} +10:54:37 Now running tasks: [1] (active tasks: {1, 2, 3, 4, 5}) +10:54:39 Now running tasks: [2, 3] (active tasks: {2, 3, 4, 5}) +10:54:41 Now running tasks: [4] (active tasks: {4, 5}) +10:54:44 Now running tasks: [5] (active tasks: {5}) +10:54:47 All tasks completed +``` + +### Launch the backend from a python process + +Instead of using the command line you can also use the launcher as an API : + +```py +from localfarm.localFarmLauncher import FarmLauncher + +# Launch +launcher = FarmLauncher(root=FARM_ROOT) +launcher.start() +# Add jobs & tasks & submit +... + +# Check status +launcher.status() + +# Stop the farm +launcher.stop() +``` + +And here are the logs : +``` + +Clean farm files... +Done. +Starting farm backend... +Farm root is: /homes/$USER/.local_farm +Farm backend started (PID: 6776) +Logs: /homes/$USER/.local_farm/backend.log + + +Farm backend is running (PID: 6776) +[LocalFarm][INFO] Connect to farm located at FARM_ROOT +Active jobs: 1 + - 1: RUNNING (5 tasks) -> {'SUCCESS': {1}, 'RUNNING': {2, 3}, 'SUBMITTED': {4, 5}} + + +Stopping farm backend (PID: 6776)... +Farm backend stopped +``` + +## Logs + +Here are the files we can find on the farm root : +``` +. +├── backend.log +├── backend.port +├── farm.pid +└── jobs + └── jid + └── tasks + ├── tid_min.log + ├── ... 
+ └── tid_max.log +``` + +- _backend.log_ contains the logs for the backend process +- _farm.pid_ contains the PID for the backend process +- _backend.port_ contains the port used for the TCP connection +- In the jobs folder you can find all logs for the tasks of each job. The structure is : `jobs/{jid}/tasks/{tid}.log` diff --git a/localfarm/localFarm.py b/localfarm/localFarm.py index 13eb18cdf4..2245c6221f 100644 --- a/localfarm/localFarm.py +++ b/localfarm/localFarm.py @@ -16,7 +16,8 @@ logging.basicConfig( level=logging.INFO, - format='%(asctime)s [%(name)s][%(levelname)s] %(message)s' + format='%(asctime)s [%(name)s][%(levelname)s] %(message)s', + datefmt='%Y-%m-%d %H:%M:%S', ) logger = logging.getLogger("LocalFarm") logger.setLevel(logging.INFO) @@ -31,7 +32,7 @@ def __init__(self, root): def connect(self): """ Connect to the backend. """ - print("Connect to farm located at", self.root) + logger.info(f"Connect to farm located at {self.root}") if self.tcpPortFile.exists(): try: port = int(self.tcpPortFile.read_text()) diff --git a/localfarm/localFarmBackend.py b/localfarm/localFarmBackend.py index 16770030f0..39de7227f1 100644 --- a/localfarm/localFarmBackend.py +++ b/localfarm/localFarmBackend.py @@ -358,6 +358,7 @@ def startTask(self, task: Task): with open(task.logFile, "w") as log: log.write(f"# ========== Starting task {task.tid} at {task.started_at.isoformat()}" f" (command=\"{task.command}\") ==========\n") + log.write(f"# metadata: {task.metadata}\n") log.write(f"# process_env:\n") log.write(f"# Additional env variables:\n") for _k, _v in additional_env.items(): diff --git a/meshroom/core/desc/node.py b/meshroom/core/desc/node.py index 79e137e067..cec3452e23 100644 --- a/meshroom/core/desc/node.py +++ b/meshroom/core/desc/node.py @@ -1,3 +1,5 @@ +# desc/node.py + import enum from inspect import getfile, getattr_static from pathlib import Path @@ -260,6 +262,11 @@ def preprocess(self, node): """ pass + @property + def _hasPreprocess(self): + """ Returns 
True if the class has a preprocess """ + return type(self).preprocess is not BaseNode.preprocess + def postprocess(self, node): """ Gets invoked after the processChunk method for the node. @@ -268,6 +275,11 @@ def postprocess(self, node): """ pass + @property + def _hasPostprocess(self): + """ Returns True if the class has a postprocess """ + return type(self).postprocess is not BaseNode.postprocess + def process(self, node): raise NotImplementedError(f'No process implementation on node: "{node.name}"') @@ -423,8 +435,11 @@ def processChunkInEnvironment(self, chunk): meshroomComputeCmd = f"{chunk.node.nodeDesc.pythonExecutable} {_MESHROOM_COMPUTE}" + \ f" \"{chunk.node.graph.filepath}\" --node {chunk.node.name}" + \ " --extern --inCurrentEnv" - - if len(chunk.node.getChunks()) > 1: + if chunk.isPreprocess: + meshroomComputeCmd += f" --preprocess" + elif chunk.isPostprocess: + meshroomComputeCmd += f" --postprocess" + elif len(chunk.node.getChunks()) >= 1: meshroomComputeCmd += f" --iteration {chunk.range.iteration}" runtimeEnv = chunk.node.nodeDesc.plugin.runtimeEnv diff --git a/meshroom/core/graph.py b/meshroom/core/graph.py index f014c66752..20ef461d3c 100644 --- a/meshroom/core/graph.py +++ b/meshroom/core/graph.py @@ -1781,7 +1781,7 @@ def executeGraph(graph, toNodes=None, forceCompute=False, forceStatus=False): logging.warning(f"{node.name} is in Compatibility Mode and cannot be computed: {node.issueDetails}.") continue - node.preprocess() + node.preprocess(forceCompute) if not node._chunksCreated: node.createChunks() multiChunks = len(node.chunks) > 1 @@ -1793,7 +1793,7 @@ def executeGraph(graph, toNodes=None, forceCompute=False, forceStatus=False): else: print(f'\n[{n + 1}/{len(nodes)}] {node.nodeType}') chunk.process(forceCompute) - node.postprocess() + node.postprocess(forceCompute) except Exception as exc: logging.error(f"Error on node computation: {exc}") graph.clearSubmittedNodes() diff --git a/meshroom/core/node.py b/meshroom/core/node.py index 
baf974dea9..4b53794bef 100644 --- a/meshroom/core/node.py +++ b/meshroom/core/node.py @@ -1,4 +1,7 @@ #!/usr/bin/env python + +# core/node.py + import sys import atexit import copy @@ -12,8 +15,8 @@ import time import uuid from collections import namedtuple, OrderedDict -from enum import Enum, auto -from typing import Callable, Optional, List +from enum import Enum, IntEnum, auto +from typing import Callable, Optional, List, Dict, Union import meshroom from meshroom.common import Signal, Variant, Property, BaseObject, Slot, ListModel, DictModel @@ -61,6 +64,13 @@ class ExecMode(Enum): EXTERN = auto() +class ChunkIndex(IntEnum): + NONE=-3 + PREPROCESS=-2 + POSTPROCESS=-1 + # Standard chunks are indexed from 0 + + # Simple structure for storing chunk information NodeChunkSetup = namedtuple("NodeChunks", ["blockSize", "fullSize", "nbBlocks"]) @@ -224,14 +234,10 @@ class ChunkStatusData(BaseObject): def __init__(self, nodeName='', mrNodeType: MrNodeType = MrNodeType.NONE, parent: BaseObject = None): super().__init__(parent) - self.nodeName: str = nodeName self.mrNodeType = mrNodeType - self.computeSessionUid: Optional[str] = None # Session where computation is done - self.execMode: ExecMode = ExecMode.NONE - self.resetDynamicValues() def resetDynamicValues(self): @@ -243,6 +249,9 @@ def resetDynamicValues(self): self.elapsedTime: float = 0.0 self.hostname: str = "" + def checkStatus(self, statusName): + return self.status == Status[statusName] + def setNode(self, node): """ Set the node information from one node instance. 
""" self.nodeName = node.name @@ -399,25 +408,22 @@ def makeProgressBar(self, end, message=''): f.close() - with open(self.logFile) as f: + with open(self.logFile, "r") as f: content = f.read() self.progressBarPosition = content.rfind('\n') - f.close() - def updateProgressBar(self, value): assert self.progressBar assert value <= self.progressEnd tics = round((value/self.progressEnd)*51) - with open(self.logFile, 'r+') as f: + with open(self.logFile, "r+") as f: text = f.read() for i in range(tics-self.currentProgressTics): text = text[:self.progressBarPosition]+'*'+text[self.progressBarPosition:] f.seek(0) f.write(text) - f.close() self.currentProgressTics = tics @@ -457,8 +463,8 @@ def clearProcessesStatus(): class NodeChunk(BaseObject): def __init__(self, node, range, parent=None): super().__init__(parent) - self.node = node - self.range = range + self.node: Node = node + self.range: desc.Range = range self._logManager = None self._status: ChunkStatusData = ChunkStatusData(nodeName=node.name, mrNodeType=node.getMrNodeType()) self.statistics: stats.Statistics = stats.Statistics() @@ -468,18 +474,29 @@ def __init__(self, node, range, parent=None): self.node.internalFolderChanged.connect(self.nodeFolderChanged) def __repr__(self): - return f"" + return f"" @property def index(self): return self.range.iteration + @property + def isPreprocess(self): + return self.index == ChunkIndex.PREPROCESS + + @property + def isPostprocess(self): + return self.index == ChunkIndex.POSTPROCESS + @property def name(self): + if self.isPreprocess: + return f"{self.node.name}(preprocess)" + if self.isPostprocess: + return f"{self.node.name}(postprocess)" if self.range.blockSize: return f"{self.node.name}({self.index})" - else: - return self.node.name + return self.node.name @property def logManager(self): @@ -544,13 +561,19 @@ def _getFile(self, fileType: str): Return the path for the requested type of file. 
It is expected to be prefixed by the chunk number, but for compatibility purposes, it may not be. """ - chunkIndex = self.index if self.range.blockSize else 0 + if self.isPreprocess: + chunkName = "preprocess" + elif self.isPostprocess: + chunkName = "postprocess" + else: + chunkIndex = self.index if self.range.blockSize else 0 + chunkName = str(chunkIndex) # Retro-compatibility: ensure we do not lose files computed when single chunks were not prefixed # If both the prefixed and not prefixed files exist, the prefixed one should be returned if os.path.exists(os.path.join(self.node.internalFolder, fileType)): - if not os.path.exists(os.path.join(self.node.internalFolder, str(chunkIndex) + "." + fileType)): + if not os.path.exists(os.path.join(self.node.internalFolder, chunkName + "." + fileType)): return os.path.join(self.node.internalFolder, fileType) - return os.path.join(self.node.internalFolder, str(chunkIndex) + "." + fileType) + return os.path.join(self.node.internalFolder, chunkName + "." 
+ fileType) def getStatusFile(self): return self._getFile("status") @@ -656,8 +679,12 @@ def process(self, forceCompute=False, inCurrentEnv=False): self.statThread.start() try: - logging.info(f"[Process chunk] Start processing...") - self.node.nodeDesc.processChunk(self) + if self.isPreprocess: + self.node.nodeDesc.preprocess(self.node) + elif self.isPostprocess: + self.node.nodeDesc.postprocess(self.node) + else: + self.node.nodeDesc.processChunk(self) # NOTE: this assumes saving the output attributes for each chunk self.node.saveOutputAttr() executionStatus = Status.SUCCESS @@ -683,7 +710,6 @@ def process(self, forceCompute=False, inCurrentEnv=False): self.statistics = stats.Statistics() del runningProcesses[self.name] - def _processInIsolatedEnvironment(self): """ Process this node chunk in the isolated environment defined in the environment @@ -747,7 +773,7 @@ def isExtern(self): return meshroom.core.sessionUid not in (self.node._nodeStatus.submitterSessionUid, self._status.computeSessionUid) return False return False - + statusChanged = Signal() status = Property(Variant, lambda self: self._status, notify=statusChanged) statusName = Property(str, getStatusName, notify=statusChanged) @@ -811,6 +837,10 @@ def __init__(self, nodeType: str, position: Position = None, parent: BaseObject self.graph = None self.dirty: bool = True # whether this node's outputs must be re-evaluated on next Graph update self._chunks: list[NodeChunk] = ListModel(parent=self) + self._preprocessChunk = NodeChunk(self, desc.Range(ChunkIndex.PREPROCESS)) if \ + self.nodeDesc and self.nodeDesc._hasPreprocess else None + self._postprocessChunk = NodeChunk(self, desc.Range(ChunkIndex.POSTPROCESS)) if \ + self.nodeDesc and self.nodeDesc._hasPostprocess else None self._chunksCreated = False # Only initialize chunks on compute self._chunkPlaceholder: list[NodeChunk] = ListModel(parent=self) # Placeholder chunk for nodes with dynamic ones self._uid: str = uid @@ -1286,11 +1316,11 @@ def ram(self): 
return self.nodeDesc.resolvedRam(self) def hasStatus(self, status: Status): + if self.isInputNode: + return status == Status.INPUT if not self._chunks or not self._chunksCreated: - if self.isInputNode: - return status == Status.INPUT return status == Status.NONE - for chunk in self._chunks: + for chunk in self._allChunks: if chunk.status.status != status: return False return True @@ -1331,19 +1361,19 @@ def clearData(self): @Slot(result=str) def getStartDateTime(self): """ Return the date (str) of the first running chunk """ - dateTime = [chunk._status.startDateTime for chunk in self._chunks if chunk._status.status + dateTime = [chunk._status.startDateTime for chunk in self._allChunks if chunk._status.status not in (Status.NONE, Status.SUBMITTED) and chunk._status.startDateTime != ""] return min(dateTime) if len(dateTime) != 0 else "" def isAlreadySubmitted(self): if self._chunksCreated: - return any(c.isAlreadySubmitted() for c in self._chunks) + return any(c.isAlreadySubmitted() for c in self._allChunks) else: return self._nodeStatus.status in (Status.SUBMITTED, Status.RUNNING) def isAlreadySubmittedOrFinished(self): if self._chunksCreated: - return all(c.isAlreadySubmittedOrFinished() for c in self._chunks) + return all(c.isAlreadySubmittedOrFinished() for c in self._allChunks) else: return self._nodeStatus.status in (Status.SUBMITTED, Status.RUNNING, Status.SUCCESS) @@ -1357,7 +1387,7 @@ def isSubmittedOrRunning(self): return False if not self.isAlreadySubmittedOrFinished(): return False - for chunk in self._chunks: + for chunk in self._allChunks: if chunk.isRunning(): return True return False @@ -1365,7 +1395,7 @@ def isSubmittedOrRunning(self): @Slot(result=bool) def isRunning(self): """ Return True if at least one chunk of this Node is running, False otherwise. 
""" - return any(chunk.isRunning() for chunk in self._chunks) + return any(chunk.isRunning() for chunk in self._allChunks) @Slot(result=bool) def isFinishedOrRunning(self): @@ -1373,14 +1403,14 @@ def isFinishedOrRunning(self): Return True if all chunks of this Node is either finished or running, False otherwise. """ - if not self._chunks: + if not self._allChunks: return False - return all(chunk.isFinishedOrRunning() for chunk in self._chunks) + return all(chunk.isFinishedOrRunning() for chunk in self._allChunks) @Slot(result=bool) def isPartiallyFinished(self): """ Return True is at least one chunk of this Node is finished, False otherwise. """ - return any(chunk.isFinished() for chunk in self._chunks) + return any(chunk.isFinished() for chunk in self._allChunks) def isExtern(self): """ @@ -1399,7 +1429,7 @@ def isExtern(self): elif self._nodeStatus.execMode == ExecMode.LOCAL and self._nodeStatus.status in (Status.SUBMITTED, Status.RUNNING): return meshroom.core.sessionUid != self._nodeStatus.submitterSessionUid return False - return any(chunk.isExtern() for chunk in self._chunks) + return any(chunk.isExtern() for chunk in self._allChunks) @Slot() def clearSubmittedChunks(self): @@ -1411,28 +1441,30 @@ def clearSubmittedChunks(self): This must be used with caution. This could lead to inconsistent node status if the graph is still being computed. """ - if self._chunksCreated: - for chunk in self._chunks: - if chunk.isAlreadySubmitted(): - chunk.upgradeStatusTo(Status.NONE, ExecMode.NONE) - else: - if self.isAlreadySubmitted(): - self.upgradeStatusTo(Status.NONE, ExecMode.NONE) + chunks: List[Union[BaseNode, NodeChunk]] = self._allChunks + if not self._chunksCreated: + chunks.append(self) + for chunk in chunks: + if chunk.isAlreadySubmitted(): + chunk.upgradeStatusTo(Status.NONE, ExecMode.NONE) self.globalStatusChanged.emit() def clearLocallySubmittedChunks(self): """ Reset all locally submitted chunks to Status.NONE. 
""" - if self._chunksCreated: - for chunk in self._chunks: - if chunk.isAlreadySubmitted() and not chunk.isExtern(): - chunk.upgradeStatusTo(Status.NONE, ExecMode.NONE) - else: - if self.isAlreadySubmitted() and not self.isExtern(): - self.upgradeStatusTo(Status.NONE, ExecMode.NONE) + chunks: List[Union[BaseNode, NodeChunk]] = self._allChunks + if not self._chunksCreated: + chunks.append(self) + for chunk in chunks: + if chunk.isAlreadySubmitted() and not chunk.isExtern(): + chunk.upgradeStatusTo(Status.NONE, ExecMode.NONE) self.globalStatusChanged.emit() def upgradeStatusTo(self, newStatus, execMode=None): """ Upgrade node to the given status and save it on disk. """ + if self.nodeDesc._hasPreprocess: + self._preprocessChunk.upgradeStatusTo(newStatus) + if self.nodeDesc._hasPostprocess: + self._postprocessChunk.upgradeStatusTo(newStatus) if self._chunksCreated: for chunk in self._chunks: chunk.upgradeStatusTo(newStatus) @@ -1449,7 +1481,7 @@ def upgradeStatusTo(self, newStatus, execMode=None): self.globalStatusChanged.emit() def updateStatisticsFromCache(self): - for chunk in self._chunks: + for chunk in self._allChunks: chunk.updateStatisticsFromCache() def _resetChunks(self): @@ -1647,6 +1679,10 @@ def updateStatusFromCache(self): logging.warning(f"Could not create chunks from cache: {e}") return s = self.globalStatus + if self.nodeDesc._hasPreprocess: + self._preprocessChunk.updateStatusFromCache() + if self.nodeDesc._hasPostprocess: + self._postprocessChunk.updateStatusFromCache() if self._chunksCreated: for chunk in self._chunks: chunk.updateStatusFromCache() @@ -1681,7 +1717,7 @@ def initStatusOnSubmit(self, forceCompute=False): hasChunkToLaunch = False if not self._chunksCreated: hasChunkToLaunch = True - for chunk in self._chunks: + for chunk in self._allChunks: if forceCompute or chunk._status.status != Status.SUCCESS: hasChunkToLaunch = True chunk._status.setNode(self) @@ -1703,7 +1739,7 @@ def initStatusOnCompute(self, forceCompute=False): 
hasChunkToLaunch = False if not self._chunksCreated: hasChunkToLaunch = True - for chunk in self._chunks: + for chunk in self._allChunks: if forceCompute or (chunk._status.status not in (Status.RUNNING, Status.SUCCESS)): hasChunkToLaunch = True chunk._status.setNode(self) @@ -1720,30 +1756,47 @@ def initStatusOnCompute(self, forceCompute=False): chunkPlaceholder._status.status = self._nodeStatus.status self._chunkPlaceholder.setObjectList([chunkPlaceholder]) self.chunksChanged.emit() + + def getChunkName(self, iteration: int): + if iteration >= 0: + return str(self.chunks[iteration].index) + elif iteration == ChunkIndex.PREPROCESS: + return "preprocess" + elif iteration == ChunkIndex.POSTPROCESS: + return "postprocess" + return "0" def processIteration(self, iteration): self._chunks[iteration].process() - def preprocess(self): - # Invoke the Node Description's pre-process for the Client Node to prepare its processing - self.nodeDesc.preprocess(self) + def preprocess(self, forceCompute=False, inCurrentEnv=False): + """ Prepare the node processing """ + if self.nodeDesc._hasPreprocess: + self.prepareLogger(ChunkIndex.PREPROCESS) + self._preprocessChunk.process(forceCompute, inCurrentEnv) + self.restoreLogger() def process(self, forceCompute=False, inCurrentEnv=False): for chunk in self._chunks: chunk.process(forceCompute, inCurrentEnv) - def postprocess(self): - # Invoke the post process on Client Node to execute after the processing on the - # node is completed - self.nodeDesc.postprocess(self) + def postprocess(self, forceCompute=False, inCurrentEnv=False): + """ + Invoke the post process on Client Node to execute after the processing on the + node is completed + """ + if self.nodeDesc._hasPostprocess: + self.prepareLogger(ChunkIndex.POSTPROCESS) + self._postprocessChunk.process(forceCompute, inCurrentEnv) + self.restoreLogger() def getLogHandlers(self): return self._handlers - def prepareLogger(self, iteration=-1): + def prepareLogger(self, 
iteration=ChunkIndex.NONE): # Get file handler path - chunkIndex = self.chunks[iteration].index if iteration != -1 else 0 - logFileName = f"{chunkIndex}.log" + chunkName = self.getChunkName(iteration) + logFileName = f"{chunkName}.log" logFile = os.path.join(self.internalFolder, logFileName) # Setup logger rootLogger = logging.getLogger() @@ -1828,6 +1881,10 @@ def endSequence(self): def stopComputation(self): """ Stop the computation of this node. """ + if self.nodeDesc._hasPreprocess: + self._preprocessChunk.stopProcess() + if self.nodeDesc._hasPostprocess: + self._postprocessChunk.stopProcess() if self._chunks: for chunk in self._chunks.values(): chunk.stopProcess() @@ -1850,12 +1907,12 @@ def getGlobalStatus(self): if not self._chunksCreated: # Get status from nodeStatus return self._nodeStatus.status - if not self._chunks: + if not self._allChunks: return Status.NONE - if len(self._chunks) == 1: - return self._chunks[0]._status.status - - chunksStatus = [chunk._status.status for chunk in self._chunks] + if len(self._allChunks) == 1: + return self._allChunks[0]._status.status + + chunksStatus = [chunk._status.status for chunk in self._allChunks] anyOf = (Status.ERROR, Status.STOPPED, Status.KILLED, Status.RUNNING, Status.SUBMITTED) @@ -1872,11 +1929,11 @@ def getGlobalStatus(self): @Slot(result=ChunkStatusData) def getFusedStatus(self): - if not self._chunks: + if not self._allChunks: return ChunkStatusData() fusedStatus = ChunkStatusData() - fusedStatus.fromDict(self._chunks[0]._status.toDict()) - for chunk in self._chunks[1:]: + fusedStatus.fromDict(self._allChunks[0]._status.toDict()) + for chunk in self._allChunks[1:]: fusedStatus.merge(chunk._status) fusedStatus.status = self.getGlobalStatus() return fusedStatus @@ -1902,8 +1959,8 @@ def _isBackdropNode(self) -> bool: def globalExecMode(self): if not self._chunksCreated: return self._nodeStatus.execMode.name - if len(self._chunks): - return self._chunks.at(0).getExecModeName() + if len(self._allChunks): + 
return self._allChunks[0].getExecModeName() else: return ExecMode.NONE @@ -1919,6 +1976,25 @@ def _getJobName(self): def getChunks(self) -> list[NodeChunk]: return self._chunks + @property + def _allChunks(self) -> list[NodeChunk]: + chunks = [] + if self.nodeDesc._hasPreprocess: + chunks.append(self._preprocessChunk) + chunks.extend([c for c in self._chunks]) + if self.nodeDesc._hasPostprocess: + chunks.append(self._postprocessChunk) + return chunks + + def getAllChunks(self): + allChunks = [] + if self.nodeDesc._hasPreprocess: + allChunks.append({"chunkIndex": ChunkIndex.PREPROCESS, "chunk": self._preprocessChunk, "name": "Preprocess"}) + allChunks.extend([{"chunkIndex": i, "chunk": c, "name": str(i)} for i, c in enumerate(self._chunks)]) + if self.nodeDesc._hasPostprocess: + allChunks.append({"chunkIndex": ChunkIndex.POSTPROCESS, "chunk": self._postprocessChunk, "name": "Postprocess"}) + return allChunks + def getSize(self): return self._size @@ -2032,9 +2108,9 @@ def updateDuplicates(self, nodesPerUid): def initFromThisSession(self) -> bool: """ Check if the node was submitted from the current session """ - if not self._chunksCreated or not self._chunks: + if not self._chunksCreated or not self._allChunks: return meshroom.core.sessionUid == self._nodeStatus.submitterSessionUid - for chunk in self._chunks: + for chunk in self._allChunks: # Technically the check on chunk._status.computeSessionUid is useless if meshroom.core.sessionUid not in (chunk._status.computeSessionUid, self._nodeStatus.submitterSessionUid): return False @@ -2042,9 +2118,9 @@ def initFromThisSession(self) -> bool: def isMainNode(self) -> bool: """ In case of a node with duplicates, we check that the node is the one driving the computation. 
""" - if len(self._chunks) == 0: + if len(self._allChunks) == 0: return True - firstChunk = self._chunks.at(0) + firstChunk = self._allChunks[0] if not firstChunk.statusNodeName: # If nothing is declared, anyone could become the main (if there are duplicates). return True @@ -2171,6 +2247,7 @@ def _hasDisplayableShape(self): chunksCreated = Property(bool, lambda self: self._chunksCreated, notify=chunksCreatedChanged) chunksChanged = Signal() chunks = Property(Variant, getChunks, notify=chunksChanged) + allChunks = Property(Variant, getAllChunks, notify=chunksChanged) chunkPlaceholder = Property(Variant, lambda self: self._chunkPlaceholder, notify=chunksChanged) nbParallelizationBlocks = Property(int, lambda self: len(self._chunks) if self._chunksCreated else 0, notify=chunksChanged) sizeChanged = Signal() @@ -2313,10 +2390,14 @@ def _resetChunks(self): """ Set chunks on the node. # TODO : Maybe don't delete chunks if we will recreate them as before ? """ - if self.isInputNode: + if not self.isComputableType: self._chunksCreated = True return # Disconnect signals + if self._preprocessChunk: + self._preprocessChunk.statusChanged.disconnect(self.globalStatusChanged) + if self._postprocessChunk: + self._postprocessChunk.statusChanged.disconnect(self.globalStatusChanged) for chunk in self._chunks: chunk.statusChanged.disconnect(self.globalStatusChanged) # Empty list @@ -2351,7 +2432,18 @@ def _resetChunks(self): self._chunksCreated = False self.setSize(0) self._chunkPlaceholder.setObjectList([NodeChunk(self, desc.computation.Range())]) - + # Pre/post process + if self.nodeDesc._hasPreprocess: + self._preprocessChunk = NodeChunk(self, desc.Range(ChunkIndex.PREPROCESS)) + self._preprocessChunk.statusChanged.connect(self.globalStatusChanged) + else: + self._preprocessChunk = None + if self.nodeDesc._hasPostprocess: + self._postprocessChunk = NodeChunk(self, desc.Range(ChunkIndex.POSTPROCESS)) + self._postprocessChunk.statusChanged.connect(self.globalStatusChanged) + else: 
+ self._postprocessChunk = None + # Create chunks when possible self.chunksCreatedChanged.emit() self.chunksChanged.emit() diff --git a/meshroom/core/submitter.py b/meshroom/core/submitter.py index 2b47ecb622..7181e6098b 100644 --- a/meshroom/core/submitter.py +++ b/meshroom/core/submitter.py @@ -1,11 +1,13 @@ #!/usr/bin/env python +from __future__ import annotations + import sys import logging import operator from enum import IntFlag, auto -from typing import Optional +from typing import Optional, Dict from itertools import accumulate import meshroom @@ -65,6 +67,246 @@ def __repr__(self): return f"" +class OrderedTaskType(IntFlag): + PLACEHOLDER = auto() + """No command : just here to have dependencies""" + PREPROCESS = auto() + """Task that executes a node preprocess method""" + EXPANDING = auto() + """Task that executes a node preprocess method""" + CHUNK = auto() + """Task that will expand during the processing""" + POSTPROCESS = auto() + """Task that executes a node postprocess method""" + + +class OrderedTask: + def __init__(self, taskType, node = None, iteration : int = -1): + self.taskType: OrderedTaskType = taskType + self.node = node # BaseNode + self.iteration = iteration + self.dependencies = [] + + def addDependency(self, otherTask: OrderedTask): + self.dependencies.append(otherTask) + + def __repr__(self): + if self.taskType == OrderedTaskType.PLACEHOLDER: + string = f"" + string = f"= 0: + string += f"iteration={self.iteration}, " + string += f"{len(self.dependencies)} deps)>" + return string + + +class OrderedNode: + """ Intermediate structure used to order tasks """ + + def __init__(self, node, dependencies=None): + # node can be None for placeholder tasks (task that don't do anything else than regrouping dependencies) + self.node = node # BaseNode + self.dependencies: list[OrderedNode] = dependencies or [] # Tasks that needs to run before the current one + + @property + def isPlaceholder(self) -> bool: + """ If the node is None then it's just a 
void item to be used as a task placeholder """ + return self.node is None + + @property + def isExpanding(self) -> bool: + """ Expanding nodes are nodes whose number of chunks is not determined yet. + It will be resolved when the node processing starts. Therefore a first process is launched that + will create chunks and then chunk tasks are created later (from the submitted process). + """ + return not self.node._chunksCreated + + @property + def chunksIterations(self) -> list[int]: + """ Get all iterations to process. + Used in the case where the node is parallelized and when we know how many chunks are executed. + It should not be called if `self.isExpanding`, therefore we return None in that case + """ + if self.isExpanding: + return None + if not self.isExpanding and self.node.isParallelized: + _, _, nbBlocks = self.node.nodeDesc.parallelization.getSizes(self.node) + iterationsToIgnore = [] + for c in self.node._chunks: + if c._status.checkStatus("SUCCESS"): + iterationsToIgnore.append(c.range.iteration) + if nbBlocks > 0: + return [k for k in range(nbBlocks) if k not in iterationsToIgnore] + return [-1] + + @property + def hasPreprocess(self) -> bool: + return self.node.nodeDesc._hasPreprocess + + @property + def hasPostprocess(self) -> bool: + return self.node.nodeDesc._hasPostprocess + + def __repr__(self): + depsNames = "|".join([t.node.name for t in self.dependencies]) + if self.isPlaceholder: + return f"<OrderedNode(placeholder, deps={depsNames})>" + else: + return f"<OrderedNode({self.node.name}, deps={depsNames})>" + + +class OrderedTasks: + """ Build and provide access to tasks that are ordered + + Note: + We change a bit the logic from the meshroom graph because here the last node to be processed + is the "root" and its dependencies are the "children". This is necessary because this is usually + the order where the tasks will be created on the farm (we create one task, then add other tasks as + dependencies, and not we create a task, then we add a task to execute next as we do it here).
+ + TODO: Keep the meshroom order and just provide an `inverse` method. + """ + + def __init__(self, nodes, edges): + # First correctly order the nodes + self.nodesByLevel: list[list[OrderedNode]] = [] + self._orderNodes(nodes, edges) + # Now create all the OrderedChunkTask objects + self.rootTask = OrderedTask(taskType=OrderedTaskType.PLACEHOLDER) + self._nodeUidToLastTask: Dict[str, OrderedTask] = {} # { _uid: lastTaskToProcess } + self._orderTasks() + + def _orderNodes(self, nodes, edges): + """ + Take all the nodes and connections and orger them by processing step + 0 is the root nodes (can be executed last) + Then 1 is the level with the direct dependencies for the root nodes, and etc... + """ + # uid -> orderedNode + nodeToOrderedNode = {n._uid: OrderedNode(n) for n in nodes} + # Build dependency relationships from edges + for u, v in edges: + # Change a bit the ordering logic of Meshroom : + # parent task is the last one to be executed, child are their dependencies + parentTask = nodeToOrderedNode[u._uid] + childTask = nodeToOrderedNode[v._uid] + parentTask.dependencies.append(childTask) + + # Create a task + rootTask = OrderedNode(None, dependencies=nodeToOrderedNode.values()) + # Find each node depth (= what level the node is) + depthByTask = {} + def updateDepth(tasks, depthByTask, currentDepth=0): + for task in tasks: + if currentDepth > depthByTask.get(task, -1): + depthByTask[task] = currentDepth + if task.dependencies: + updateDepth(task.dependencies, depthByTask, currentDepth+1) + updateDepth([rootTask], depthByTask, currentDepth=-1) + # Regroup nodes by level + levels = list(set(l for l in list(depthByTask.values()))) + self.nodesByLevel = [[t for t, l in depthByTask.items() if l == lev] for lev in levels] + + def createNodeTasks(self, orderedNode: OrderedNode, parentTask: OrderedTask): + """ Create tasks corresponding to a node and link them correctly. + Also link them to the parent task, and recursively create children tasks. 
+ """ + logger.debug(f"* (createNodeTasks) node {orderedNode.node._name}, parent {parentTask.node}") + # Check if task has already been created + visited = (nodeUid:=orderedNode.node._uid) in self._nodeUidToLastTask + if visited: + logger.debug(" -> is visited") + # If task is already created simply create the connection + lastTask = self._nodeUidToLastTask[nodeUid] + parentTask.addDependency(lastTask) + return + # Create node tasks + if orderedNode.isPlaceholder: + logger.debug(" -> is placeholder") + task = OrderedTask(OrderedTaskType.PLACEHOLDER, orderedNode.node) + firstTask = lastTask = task + else: + lastTask = firstTask = None + # Create pre/post tasks if needed + if orderedNode.hasPostprocess: + logger.debug(" -> postprocess") + lastTask = OrderedTask(OrderedTaskType.POSTPROCESS, orderedNode.node) + if orderedNode.hasPreprocess: + logger.debug(" -> preprocess") + firstTask = OrderedTask(OrderedTaskType.PREPROCESS, orderedNode.node) + # Process + if orderedNode.isExpanding: + logger.debug(" -> is expanding") + expandingTask = OrderedTask(OrderedTaskType.EXPANDING, orderedNode.node) + if lastTask: + lastTask.addDependency(expandingTask) + else: + lastTask = expandingTask + if firstTask: + expandingTask.addDependency(firstTask) + else: + firstTask = expandingTask + else: + logger.debug(f" -> has chunks : {orderedNode.chunksIterations}") + # Handle 0 chunks case + if len(orderedNode.chunksIterations) == 0: + if firstTask and lastTask: + lastTask.addDependency(firstTask) + elif firstTask: + lastTask = firstTask + elif lastTask: + firstTask = lastTask + else: + firstTask = lastTask = OrderedTask(OrderedTaskType.PLACEHOLDER, orderedNode.node) + # Create and link chunks + else: + # Create placeholders for pre/post + lastTask = lastTask if lastTask else OrderedTask(OrderedTaskType.PLACEHOLDER, orderedNode.node) + firstTask = firstTask if firstTask else OrderedTask(OrderedTaskType.PLACEHOLDER, orderedNode.node) + for iteration in orderedNode.chunksIterations: + 
logger.debug(f" - chunk {iteration}") + chunkTask = OrderedTask(OrderedTaskType.CHUNK, orderedNode.node, iteration=iteration) + lastTask.addDependency(chunkTask) + chunkTask.addDependency(firstTask) + # Add parent dependency + parentTask.addDependency(lastTask) + # Create children + for n in orderedNode.dependencies: + logger.debug(f" -> create deps {n}") + self.createNodeTasks(n, firstTask) + # Add the last task to execute for this node to _nodeUidToLastTask + self._nodeUidToLastTask[nodeUid] = lastTask + logger.debug(f" -> done {orderedNode.node._name}") + + def _orderTasks(self): + """ Use the nodesByLevel info to create all tasks to send to the submitter """ + firstLevelOrderedNodes = self.nodesByLevel[0] + # Start from a root task + self._nodeUidToLastTask = {} + for n in firstLevelOrderedNodes: + self.createNodeTasks(n, self.rootTask) + + def display(self, task:OrderedTask=None, level=0): + if task is None: + task = self.rootTask + logger.debug(f"{' '*4*level}[{level:02d}] {task}") + for child in task.dependencies: + self.display(child, level+1) + + def iterOnTasks(self, current:OrderedTask=None): + if current is None: + current = self.rootTask + yield current + for task in current.dependencies: + yield from self.iterOnTasks(task) + + def __iter__(self): + yield from self.iterOnTasks() + + class BaseSubmittedJob: """ Interface to manipulate the job via Meshroom @@ -200,7 +442,7 @@ def __init__(self, parent=None): def name(self): return self._name - def createJob(self, nodes, edges, filepath, submitLabel="{projectName}"): + def createJob(self, orderedTasks: OrderedTasks, filepath: str, submitLabel: str = "{projectName}"): """ Submit the given graph Returns: bool: whether the submission succeeded @@ -221,7 +463,8 @@ def submit(self, nodes, edges, filepath, submitLabel="{projectName}") -> BaseSub Returns: bool: whether the submission succeeded """ - job = self.createJob(nodes, edges, filepath, submitLabel) + orderedTasks = OrderedTasks(nodes, edges) + job = 
self.createJob(orderedTasks, filepath, submitLabel) if not job: # Failed to create the job return None diff --git a/meshroom/core/taskManager.py b/meshroom/core/taskManager.py index 4f1a111540..11ac31e5c3 100644 --- a/meshroom/core/taskManager.py +++ b/meshroom/core/taskManager.py @@ -6,7 +6,7 @@ import meshroom from meshroom.common import BaseObject, DictModel, Property, Signal, Slot -from meshroom.core.node import Node, Status +from meshroom.core.node import Node, Status, ExecMode from meshroom.core.graph import Graph from meshroom.core.submitter import jobManager, BaseSubmittedJob import meshroom.core.graph @@ -67,6 +67,38 @@ def onChunksCreated(createdNode): finally: self._manager.chunksCreated.disconnect(onChunksCreated) timer.stop() + + def execChunk(self, chunk, node, function) -> bool: + """ Handle the chunk process & fail + + Args: + chunk (chunk to process): _description_ + node (_type_): _description_ + function (_type_): _description_ + + Returns: + bool: _description_ + """ + try: + function(self.forceCompute) + except Exception as exc: + if chunk.isStopped(): + return True + else: + logging.error(f"Error on node computation: {exc}") + self.clearNodes(node) + return False + + def clearNodes(self, node): + nodesToRemove, _ = self._manager._graph.dfsOnDiscover(startNodes=[node], reverse=True) + # remove following nodes from the task queue + for n in nodesToRemove[1:]: # exclude current node + try: + self._manager._nodesToProcess.remove(n) + except ValueError: + # Node already removed (for instance a global clear of _nodesToProcess) + pass + n.clearSubmittedChunks() def run(self): """ Consume compute tasks. 
""" @@ -98,7 +130,22 @@ def run(self): except TypeError: continue - node.preprocess() + # Preprocess + try: + node.preprocess(self.forceCompute) + except Exception as exc: + if node._preprocessChunk.isStopped(): + stopAndRestart = True + else: + logging.error(f"Error on node preprocess: {exc}") + self.clearNodes(node) + for chunk in node._chunks: + if chunk.isAlreadySubmitted(): + chunk.upgradeStatusTo(Status.NONE, ExecMode.NONE) + if node.nodeDesc._hasPostprocess: + node._postprocessChunk.upgradeStatusTo(Status.NONE, ExecMode.NONE) + break + # Process for cId, chunk in enumerate(node.chunks): if chunk.isFinishedOrRunning() or not self.isRunning(): continue @@ -121,18 +168,16 @@ def run(self): break else: logging.error(f"Error on node computation: {exc}") - nodesToRemove, _ = self._manager._graph.dfsOnDiscover(startNodes=[node], reverse=True) - # remove following nodes from the task queue - for n in nodesToRemove[1:]: # exclude current node - try: - self._manager._nodesToProcess.remove(n) - except ValueError: - # Node already removed (for instance a global clear of _nodesToProcess) - pass - n.clearSubmittedChunks() - node.postprocess() - - if stopAndRestart: + self.clearNodes(node) + # Postprocess + try: + node.postprocess(self.forceCompute) + except Exception as exc: + if node._postprocessChunk.isStopped(): + stopAndRestart = True + else: + logging.error(f"Error on node postprocess: {exc}") + self.clearNodes(node) break if stopAndRestart: diff --git a/meshroom/submitters/localFarmSubmitter.py b/meshroom/submitters/localFarmSubmitter.py index ed35211786..cc87cf3206 100644 --- a/meshroom/submitters/localFarmSubmitter.py +++ b/meshroom/submitters/localFarmSubmitter.py @@ -7,6 +7,7 @@ from pathlib import Path from typing import List, Dict from meshroom.core.submitter import BaseSubmitter, SubmitterOptions, BaseSubmittedJob, SubmitterOptionsEnum +from meshroom.core.submitter import OrderedTask, OrderedTasks, OrderedTaskType from meshroom.core.node import Status from 
collections import namedtuple, defaultdict @@ -77,7 +78,7 @@ def getRequestPackages(packagesDelimiter="=="): return list(reqPackages) -def rezWrapCommand(cmd, useCurrentContext=False, useRequestedContext=True, otherRezPkg: list[str] = None): +def rezWrapCommand(cmd, useCurrentContext=False, useRequestedContext=True, otherRezPkg: list[str] = None, additionalEnv: dict=None): """ Wrap command to be runned using rez. :param cmd: command to run :type cmd: bool @@ -107,7 +108,11 @@ def rezWrapCommand(cmd, useCurrentContext=False, useRequestedContext=True, other rezBin = os.path.join(os.environ["REZ_PACKAGES_ROOT"], "bin/rez") elif shutil.which("rez"): rezBin = shutil.which("rez") - return f"{rezBin} env {packagesStr} -- {cmd}" + addEnvCmd = "" + if additionalEnv: + envVars = " ".join([f'{k}="{v}"' for k, v in additionalEnv.items()]) + addEnvCmd = f"env {envVars} " + return f"{rezBin} env {packagesStr} -- {addEnvCmd}{cmd}" return cmd @@ -235,111 +240,72 @@ def getChunks(chunkParams) -> list[Chunk]: it = [Chunk(i, item[0], item[-1]) for i, item in enumerate(slices) if i not in ignoreIterations] return it - @staticmethod - def getExpandWrappedCmd(cmdArgs, rezPackages): + def getExpandWrappedCmd(self, cmdArgs, rezPackages): # Wrap with create_chunks cmdBin = wrapMeshroomBin("meshroom_createChunks") cmd = f"{cmdBin} --submitter LocalFarm {cmdArgs}" # Wrap with rez - cmd = rezWrapCommand(cmd, otherRezPkg=rezPackages) + cmd = rezWrapCommand(cmd, otherRezPkg=rezPackages, additionalEnv=self.jobEnv) return cmd - def __createChunkTasks(self, job: Job, parentTask: Task, children: List[Task], chunkParams: dict) -> Task: - cmdArgs = chunkParams.get("chunkCmdArgs") - chunks = self.getChunks(chunkParams) - for c in chunks: - name = f"{parentTask.name}_{c.start}_{c.end}" - meta = parentTask.metadata.copy() - meta["iteration"] = c.iteration - cmdBin = wrapMeshroomBin("meshroom_compute") - cmd = f"{cmdBin} {cmdArgs} --iteration {c.iteration}" - cmd = rezWrapCommand(cmd, 
otherRezPkg=self.reqPackages) - chunkTask = Task(name=name, command=cmd, metadata=meta, env=self.jobEnv) - job.addTask(chunkTask) - for child in children: - job.addTaskDependency(child, chunkTask) - job.addTaskDependency(chunkTask, parentTask) - - def createTask(self, meshroomFile: str, node) -> CreatedTask: - cmdArgs = f"--node {node.name} \"{meshroomFile}\" --extern" - metadata = {"nodeUid": node._uid} - - if not node._chunksCreated: + def createFarmTask(self, meshroomFile: str, orderedTask: OrderedTask, createdTasks: Dict[OrderedTask, Task]) -> Task: + metadata = dict() + if orderedTask.node: + metadata = {"nodeUid": orderedTask.node._uid} + + if orderedTask.iteration >= 0: + metadata["iteration"] = orderedTask.iteration + elif orderedTask.taskType == OrderedTaskType.PREPROCESS: + metadata["iteration"] = "preprocess" + elif orderedTask.taskType == OrderedTaskType.POSTPROCESS: + metadata["iteration"] = "postprocess" + + if orderedTask.taskType == OrderedTaskType.PLACEHOLDER: + return Task(name=orderedTask.node.name if orderedTask.node else "", command="", metadata=metadata) + + cmdArgs = f"--node {orderedTask.node.name} \"{meshroomFile}\" --extern" + + if orderedTask.taskType == OrderedTaskType.EXPANDING: cmd = self.getExpandWrappedCmd(cmdArgs, self.reqPackages) - task = Task(name=node.name, command=cmd, metadata=metadata, env=self.jobEnv) - task = CreatedTask(task, None) - - elif node.isParallelized: - _, _, nbBlocks = node.nodeDesc.parallelization.getSizes(node) - iterationsToIgnore = [] - for c in node._chunks: - if c._status.status == Status.SUCCESS: - iterationsToIgnore.append(c.range.iteration) - chunkParams = { - "start": 0, "end": nbBlocks - 1, "step": 1, - "ignoreIterations": iterationsToIgnore, - "chunkCmdArgs": cmdArgs - } - task = Task(name=node.name, command="", metadata=metadata, env=self.jobEnv) - task = CreatedTask(task, chunkParams) - + task = Task(name=orderedTask.node.name, command=cmd, metadata=metadata, env=self.jobEnv) else: cmdBin = 
wrapMeshroomBin("meshroom_compute") - cmd = f"{cmdBin} {cmdArgs} --iteration 0" - cmd = rezWrapCommand(cmd, otherRezPkg=self.reqPackages) - task = Task(name=node.name, command=cmd, metadata=metadata, env=self.jobEnv) - task = CreatedTask(task, None) - - print("Created task: ", task) + cmd = f"{cmdBin} {cmdArgs}" + if orderedTask.taskType == OrderedTaskType.PREPROCESS: + cmd += f" --preprocess" + elif orderedTask.taskType == OrderedTaskType.POSTPROCESS: + cmd += f" --postprocess" + elif orderedTask.taskType == OrderedTaskType.CHUNK: + cmd += f" --iteration {orderedTask.iteration}" + cmd = rezWrapCommand(cmd, otherRezPkg=self.reqPackages, additionalEnv=self.jobEnv) + task = Task(name=orderedTask.node.name, command=cmd, metadata=metadata, env=self.jobEnv) return task - def buildDependencies(self, job: Job, nodeUidToTask: Dict[str, CreatedTask], edges): - """ Gather and create dependencies. - First we get all parents and all children for each task. - Then for each task: - - we add the dependency to their parent and children - - if the task is a chunked task (which means multi iteration tasks) the we create the - chunk tasks and add dependencies from chunk tasks to children tasks - - # TODO: there's a lot of confusion between nodes and tasks here - """ - # Gather dependencies - tasksParentsUids = defaultdict(set) - tasksChildrenUids = defaultdict(set) - for u, v in edges: - # tasksParentsUids[v._uid].add(u._uid) - # tasksChildrenUids[u._uid].add(v._uid) - tasksParentsUids[u._uid].add(v._uid) - tasksChildrenUids[v._uid].add(u._uid) - # Create dependencies - for taskUid, createdTask in nodeUidToTask.items(): - parentsTasks = [nodeUidToTask[tuid].task for tuid in tasksParentsUids.get(taskUid, set())] - childrenTasks = [nodeUidToTask[tuid].task for tuid in tasksChildrenUids.get(taskUid, set())] - # Create regular dependencies - for parentTask in parentsTasks: - job.addTaskDependency(createdTask.task, parentTask) - for childTask in childrenTasks: - 
job.addTaskDependency(childTask, createdTask.task) - # Create chunk tasks if necessary - if createdTask.chunkParams: - self.__createChunkTasks(job, createdTask.task, childrenTasks, createdTask.chunkParams) - - def createJob(self, nodes, edges, filepath, submitLabel="{projectName}") -> LocalFarmJob: + def createJob(self, orderedTasks, filepath, submitLabel="{projectName}") -> LocalFarmJob: projectName = os.path.splitext(os.path.basename(filepath))[0] name = submitLabel.format(projectName=projectName) # Create job job = Job(name) + # Create tasks - nodeUidToTask: Dict[str, CreatedTask] = {} - for node in nodes: - if node._uid in nodeUidToTask: - continue # HACK: Should not be necessary - createdTask: CreatedTask = self.createTask(filepath, node) - job.addTask(createdTask.task) - nodeUidToTask[node._uid] = createdTask - # Build dependencies - self.buildDependencies(job, nodeUidToTask, edges) + orderedTasks.display() + createdTasks: Dict[OrderedTask, Task] = dict() + for taskToCreate in orderedTasks.iterOnTasks(): + if taskToCreate in createdTasks.keys(): + continue + createdTask = self.createFarmTask(filepath, taskToCreate, createdTasks) + job.addTask(createdTask) + createdTasks[taskToCreate] = createdTask + + for orderedTask, task in createdTasks.items(): + print(orderedTask, "->", task) + + for orderedTask, task in createdTasks.items(): + deps = [createdTasks.get(t) for t in orderedTask.dependencies] + for dependency in deps: + job.addTaskDependency(task, dependency) + # Submit job engine = LocalFarmEngine(self.farmPath) res = job.submit(engine) @@ -378,7 +344,7 @@ def createChunkTask(self, node, graphFile, **kwargs): metadata = {"nodeUid": node._uid, "iteration": chunk.iteration} cmdBin = wrapMeshroomBin("meshroom_compute") cmd = f"{cmdBin} {cmdArgs} --iteration {chunk.iteration}" - cmd = rezWrapCommand(cmd, otherRezPkg=self.reqPackages) + cmd = rezWrapCommand(cmd, otherRezPkg=self.reqPackages, additionalEnv=self.jobEnv) print("Additional chunk task command: ", 
cmd) task = Task(name=name, command=cmd, metadata=metadata, env=taskEnv) engine.create_additional_task(currentJid, currentTid, task) diff --git a/meshroom/ui/qml/GraphEditor/ChunksListView.qml b/meshroom/ui/qml/GraphEditor/ChunksListView.qml index 5b9d319a6e..f1b008bd14 100644 --- a/meshroom/ui/qml/GraphEditor/ChunksListView.qml +++ b/meshroom/ui/qml/GraphEditor/ChunksListView.qml @@ -11,21 +11,39 @@ import Utils 1.0 ColumnLayout { id: root + enum IndexItems { + NULL=-3, + PREPROCESS=-2, + POSTPROCESS=-1 + } + property var uigraph: null property variant chunks property int currentIndex: 0 - property variant currentChunk: (chunks && currentIndex >= 0) ? chunks.at(currentIndex) : undefined + + function getCurrentChunkIndex() { + if ( currentIndex == undefined || !chunks || currentIndex == ChunksListView.IndexItems.NULL ) { return -1 } + let hasPreprocess = chunks.some(function(chk) { return chk.chunkIndex == ChunksListView.IndexItems.PREPROCESS }) + let hasPostprocess = chunks.some(function(chk) { return chk.chunkIndex == ChunksListView.IndexItems.POSTPROCESS }) + if ( currentIndex == ChunksListView.IndexItems.PREPROCESS ) return hasPreprocess ? 0 : -1 // Preprocess chunk + if ( currentIndex == ChunksListView.IndexItems.POSTPROCESS ) return hasPostprocess ? chunks.length - 1 : -1 // Postprocess chunk + return currentIndex + (hasPreprocess ? 1 : 0) // Process Chunk + } + + property int currentItemIndex: getCurrentChunkIndex() + + property variant currentChunk: (currentItemIndex >= 0 && chunks && chunks.length > currentItemIndex) ? 
chunks[currentItemIndex].chunk : undefined onChunksChanged: { // When the list changes, ensure the current index is in the new range if (!chunks) - currentIndex = -1 - else if (currentIndex >= chunks.count) - currentIndex = chunks.count-1 + currentIndex = ChunksListView.IndexItems.NULL + else if (currentIndex >= chunks.length) + currentIndex = chunks.length-1 } // chunksSummary is in sync with allChunks button (but not directly accessible as it is in a Component) - property bool chunksSummary: (currentIndex === -1) + property bool chunksSummary: (currentItemIndex === -1) width: 60 @@ -61,31 +79,23 @@ ColumnLayout { checked = summaryEnabled } onClicked: { - root.currentIndex = -1 + root.currentIndex = ChunksListView.IndexItems.NULL checked = true } } } - highlight: Component { - Rectangle { - visible: true // !root.chunksSummary - color: activePalette.highlight - opacity: 0.3 - z: 2 - } - } - highlightMoveDuration: 0 - highlightResizeDuration: 0 delegate: ItemDelegate { id: chunkDelegate - property var chunk: object - text: index + property var chunk: modelData.chunk + text: modelData.name + highlighted: (currentItemIndex >= 0 && index == currentItemIndex) + property int chunkIndex: modelData.chunkIndex width: ListView.view.width leftPadding: 8 onClicked: { chunksLV.forceActiveFocus() - root.currentIndex = index + root.currentIndex = chunkIndex } Rectangle { width: 4 @@ -98,8 +108,8 @@ ColumnLayout { Connections { target: _currentScene function onSelectedChunkChanged() { - for (var i = 0; i < root.chunks.count; i++) { - if (_currentScene.selectedChunk === root.chunks.at(i)) { + for (var i = 0; i < root.chunks.length; i++) { + if (_currentScene.selectedChunk === root.chunks[i].chunk) { root.currentIndex = i break; } diff --git a/meshroom/ui/qml/GraphEditor/NodeEditor.qml b/meshroom/ui/qml/GraphEditor/NodeEditor.qml index 7158edef24..a94141473f 100644 --- a/meshroom/ui/qml/GraphEditor/NodeEditor.qml +++ b/meshroom/ui/qml/GraphEditor/NodeEditor.qml @@ -441,9 +441,9 @@ 
Panel { ChunksListView { id: chunksLV enabled: root.node ? root.node.chunksCreated : false - chunks: root.node ? root.node.chunks : null + chunks: root.node ? root.node.allChunks : null visible: enabled && (tabBar.currentIndex >= 1 && tabBar.currentIndex <= 3) - SplitView.preferredWidth: 55 + SplitView.preferredWidth: 88 // Just fit to display "postprocess" SplitView.minimumWidth: 20 } diff --git a/meshroom/ui/qml/GraphEditor/NodeLog.qml b/meshroom/ui/qml/GraphEditor/NodeLog.qml index 163ffd4213..fdc7426e09 100644 --- a/meshroom/ui/qml/GraphEditor/NodeLog.qml +++ b/meshroom/ui/qml/GraphEditor/NodeLog.qml @@ -27,7 +27,7 @@ FocusScope { clip: true anchors.fill: parent - property string currentFile: (root.currentChunkIndex >= 0 && root.currentChunk) ? root.currentChunk["logFile"] : "" + property string currentFile: root.currentChunk ? root.currentChunk.logFile: "" property url sourceFile: Filepath.stringToUrl(currentFile) sourceComponent: textFileViewerComponent diff --git a/meshroom/ui/qml/Utils/Colors.qml b/meshroom/ui/qml/Utils/Colors.qml index 0d5b2c8936..5a36dec4b4 100644 --- a/meshroom/ui/qml/Utils/Colors.qml +++ b/meshroom/ui/qml/Utils/Colors.qml @@ -52,7 +52,7 @@ QtObject { ] function getChunkColor(chunk, overrides) { - if (chunk === undefined) + if (chunk === undefined || chunk == null) return "transparent" if (overrides && chunk.statusName in overrides) { return overrides[chunk.statusName] diff --git a/tests/plugins/meshroom/pluginSubmitter/PluginSubmitter.py b/tests/plugins/meshroom/pluginSubmitter/PluginSubmitter.py index c438bc487a..30444d75ed 100644 --- a/tests/plugins/meshroom/pluginSubmitter/PluginSubmitter.py +++ b/tests/plugins/meshroom/pluginSubmitter/PluginSubmitter.py @@ -8,7 +8,7 @@ LOGGER = logging.getLogger("TestSubmit") -class PluginSubmitterA(desc.BaseNode): +class PluginSubmitterA(desc.Node): """ Test process no parallelization """ @@ -16,18 +16,31 @@ class PluginSubmitterA(desc.BaseNode): inputs = [ desc.IntParam( - name="input", - 
label="Input", - description="input", + name="nbChunks", + label="nbChunks", + description="Nb Chunks", value=1, + exposed=True + ), + desc.ListAttribute( + elementDesc=desc.File( + name="inputfile", + label="Input file", + description="", + value="", + ), + name="inputs", + label="inputs", + description="inputs", + exposed=True, ), ] outputs = [ - desc.IntParam( + desc.File( name="output", label="Output", description="Output", - value=None, + value="", ), ] @@ -45,10 +58,19 @@ class PluginSubmitterB(PluginSubmitterA): size = desc.StaticNodeSize(2) parallelization = desc.Parallelization(blockSize=1) + def postprocess(self, node): + LOGGER.info(f"> PluginSubmitterB postprocess Done") + class PluginSubmitterC(PluginSubmitterA): """ Test process with parallelization and dynamic node size """ - size = desc.DynamicNodeSize("input") + size = desc.DynamicNodeSize("nbChunks") parallelization = desc.Parallelization(blockSize=1) + + def preprocess(self, node): + LOGGER.info(f"> PluginSubmitterC preprocess Done") + + def postprocess(self, node): + LOGGER.info(f"> PluginSubmitterC postprocess Done") diff --git a/tests/test_compute.py b/tests/test_compute.py index f28e436c25..569e1ebf20 100644 --- a/tests/test_compute.py +++ b/tests/test_compute.py @@ -13,7 +13,7 @@ from meshroom.core.graph import Graph, loadGraph from meshroom.core import desc, pluginManager, loadClassesNodes -from meshroom.core.node import Status +from meshroom.core.node import Status, ChunkIndex from meshroom.core.plugins import Plugin from .utils import registerNodeDesc, unregisterNodeDesc @@ -23,21 +23,21 @@ def executeChunks(node, size): os.makedirs(node.internalFolder) logFiles = {} + node.preprocess() for chunkIndex in range(size): - iteration = chunkIndex if size > 1 else -1 + iteration = chunkIndex if size > 0 else ChunkIndex.NONE logFileName = f"{chunkIndex}.log" logFile = Path(node.internalFolder) / logFileName logFiles[chunkIndex] = logFile logFile.touch() node.prepareLogger(iteration) - 
node.preprocess() if size > 1: chunk = node.chunks[chunkIndex] chunk.process(True, True) else: node.process(True, True) - node.postprocess() node.restoreLogger() + node.postprocess() return logFiles @@ -102,6 +102,26 @@ def process(self, node): LOGGER.info(f"> {node.name}") +class TestNodeD(TestNodeC): + """ + Implement preprocess / postprocess methods + """ + def preprocess(self, node): + LOGGER.info(f"> {node.name} (preprocess)") + + def postprocess(self, node): + LOGGER.info(f"> {node.name} (postprocess)") + + +class TestNodeE(TestNodeC): + """ + Implement preprocess / postprocess methods + """ + def preprocess(self, node): + raise RuntimeError() + + + class TestNodeLogger: """ Test that the logger is correctly set up during the different stages of the compute and that logs are correctly @@ -115,12 +135,14 @@ def setup_class(cls): registerNodeDesc(TestNodeA) registerNodeDesc(TestNodeB) registerNodeDesc(TestNodeC) + registerNodeDesc(TestNodeD) @classmethod def teardown_class(cls): unregisterNodeDesc(TestNodeA) unregisterNodeDesc(TestNodeB) unregisterNodeDesc(TestNodeC) + unregisterNodeDesc(TestNodeD) def test_processChunks(self, tmp_path): graph = Graph("") @@ -159,6 +181,26 @@ def test_process(self, tmp_path): reg = re.compile(self.logPrefix + "TestNodeC_1") assert len(reg.findall(content)) == 1 + def test_prepostprocess(self, tmp_path): + graph = Graph("") + graph._cacheDir = tmp_path + node = graph.addNewNode(TestNodeD.__name__) + # Compute + logFiles = executeChunks(node, 1) + chunkLog = logFiles[0] + root = chunkLog.parent + preprocessLog = root / "preprocess.log" + postprocessLog = root / "postprocess.log" + def check_file(path, suffix = ""): + with open(path, "r") as f: + content = f.read() + suffix = "" + reg = re.compile(self.logPrefix + "TestNodeD_1" + suffix) + assert len(reg.findall(content)) == 1 + check_file(preprocessLog, " (preprocess)") + check_file(chunkLog, "") + check_file(postprocessLog, " postprocess") + class TestLockUpdates: """ @@ -502,3 
+544,73 @@ def test_correctSizeUpdate(self, graphSavedOnDisk):
         nodeC.createChunks()
         nodeC.process(inCurrentEnv=True)
         self.checkNodeSizeAndStatus(nodeC, 2, 2, Status.SUCCESS)
+
+
+class TestPrePostProcess:
+    """
+    Test that preprocess and postprocess are correctly executed
+    """
+    @classmethod
+    def setup_class(cls):
+        registerNodeDesc(TestNodeD)
+        registerNodeDesc(TestNodeE)
+
+    @classmethod
+    def teardown_class(cls):
+        unregisterNodeDesc(TestNodeD)
+        unregisterNodeDesc(TestNodeE)
+
+    def test_status(self, graphSavedOnDisk):
+        graph: Graph = graphSavedOnDisk
+        node = graph.addNewNode(TestNodeD.__name__)
+        graph.save()
+        os.makedirs(node.internalFolder)
+
+        # Check node
+        assert len(node.chunks) == 1
+        assert node.nodeDesc._hasPreprocess
+        assert node.nodeDesc._hasPostprocess
+
+        # Check status before
+        assert node.globalStatus == Status.NONE.name
+        assert node.chunks[0]._status.status == Status.NONE
+        assert node._preprocessChunk._status.status == Status.NONE
+        assert node._postprocessChunk._status.status == Status.NONE
+
+        # Process
+        node.preprocess(inCurrentEnv=True)
+        node.process(inCurrentEnv=True)
+        node.postprocess(inCurrentEnv=True)
+
+        # Check status after
+        assert node.globalStatus == Status.SUCCESS.name
+        assert node.chunks[0]._status.status == Status.SUCCESS
+        assert node._preprocessChunk._status.status == Status.SUCCESS
+        assert node._postprocessChunk._status.status == Status.SUCCESS
+
+    def test_failingpreprocess(self, graphSavedOnDisk):
+        graph: Graph = graphSavedOnDisk
+        node = graph.addNewNode(TestNodeE.__name__)
+        graph.save()
+        os.makedirs(node.internalFolder)
+
+        # Check status before
+        assert node.globalStatus == Status.NONE.name
+        assert node._preprocessChunk._status.status == Status.NONE
+        assert node.chunks[0]._status.status == Status.NONE
+
+        # Process
+        try:
+            node.preprocess(inCurrentEnv=True)
+        except Exception:
+            pass
+        else:
+            raise RuntimeError("preprocess was expected to fail")
+        # We execute the process because we know it will succeed and we want
+        # to test that the failed preprocess drives the global status to ERROR
+        node.process(inCurrentEnv=True)
+
+        # Check status after
+        assert node.globalStatus == Status.ERROR.name
+        assert node.chunks[0]._status.status == Status.SUCCESS
+        assert node._preprocessChunk._status.status == Status.ERROR
diff --git a/tests/test_submit.py b/tests/test_submit.py
index d0ce9a77f8..a8fa415004 100644
--- a/tests/test_submit.py
+++ b/tests/test_submit.py
@@ -15,11 +15,17 @@
 from meshroom.core.graph import Graph
 from meshroom.core.plugins import Plugin
 from meshroom.core.node import Node, Status
+from meshroom.core.submitter import BaseSubmitter
 from meshroom.core.submitter import jobManager
+from meshroom.core.submitter import OrderedTask, OrderedTasks, OrderedTaskType
 from meshroom.submitters.localFarmSubmitter import LocalFarmSubmitter, LocalFarmJob
 from localfarm.localFarmLauncher import FarmLauncher
 
+import logging
+from meshroom.core.submitter import logger
+logger.setLevel(logging.DEBUG)
+
 IS_LINUX = (platform == "linux" or platform == "linux2")
@@ -46,18 +52,25 @@ def waitForNodeCompletion(job: LocalFarmJob, node: Node, timeout=25):
     print(f"Waiting for node {node.name} to complete...")
     startTime = time.time()
     while True:
+        time.sleep(1)
+        if time.time() - startTime > timeout:
+            raise TimeoutError(f"Node {node.name} did not complete within {timeout} seconds")
+        # Check for job error
+        err = job.getJobErrors()
+        if err:
+            raise RuntimeError(f"Job encountered an error: {err}")
+        # Wait for the next poll while any task is still unfinished
+        terminalStates = (Status.NONE.name, Status.SUCCESS.name,
+                          Status.STOPPED.name, Status.ERROR.name)
+        if any(task.get("status") not in terminalStates for task in job.localfarmTasks.values()):
+            continue
+        # Stop if the node switched to done
         node.updateStatusFromCache()
         nodeStatus = node.getGlobalStatus()
         if nodeStatus not in (Status.SUBMITTED, Status.RUNNING):
             print(f"Node status switched to {nodeStatus}")
             return
-        # Check for job error
-        err = job.getJobErrors()
-        if err:
-            raise RuntimeError(f"Job encountered an error: {err}")
-        if time.time() - startTime > timeout:
-            raise TimeoutError(f"Node {node.name} did not complete within {timeout} seconds")
-        time.sleep(1)
+
 
 def processSubmit(node: Node, graph, tmp_path):
     """
@@ -93,6 +106,7 @@ def processSubmit(node: Node, graph, tmp_path):
     except Exception as e:
         error = e
     finally:
+        farmLauncher.status(allInfo=True)
         farmLauncher.stop()
     if error:
         raise error
@@ -124,32 +138,95 @@ def teardown_class(cls):
             pluginManager.unregisterNode(node)
         pluginManager.removePlugin(cls.plugin)
         cls.plugin = None
-
-    def setupNode(self, graph, name):
+
+    def registerNode(self, name):
         plugin = pluginManager.getPlugin("pluginSubmitter")
         node = plugin.nodes[name]
         nodeType = node.nodeDescriptor
         registerNodeDesc(nodeType)
-        node = graph.addNewNode(nodeType.__name__)
+        return nodeType.__name__
+
+    def addNewNode(self, graph, name, nodeParams=None):
+        nodeTypeName = self.registerNode(name)
+        nodeParams = nodeParams or {}
+        node = graph.addNewNode(nodeTypeName, **nodeParams)
         return node
 
+    def test_orderTasks(self):
+        """ Here is the example we use for testing:
+              *" [B chk_0] "*
+            [phd start_A] - [A chk] - [phd end_A] - [phd start_B] [B post] - [C pre] - [C exp] - [C post] - [phd root]
+              *_ [B chk_1] _*
+            phd=placeholder (no command/process executed)
+            chk=chunk
+            exp=expand
+        """
+        graph = Graph("")
+        # Add nodes
+        nodeA = self.addNewNode(graph, "PluginSubmitter"+"A", nodeParams={})
+        nodeB = self.addNewNode(graph, "PluginSubmitter"+"B", nodeParams={"inputs": [nodeA.output]})
+        nodeC = self.addNewNode(graph, "PluginSubmitter"+"C", nodeParams={"inputs": [nodeB.output]})
+        # Order tasks
+        submitter = get_submitter()
+        nodes, edges = graph.dfsOnFinish(startNodes=[nodeC])
+        orderedTasks = OrderedTasks(nodes, edges)
+        # === Test result ===
+        def checkTask(task, taskType, nbDependencies):
+            assert task.taskType == taskType
+            assert len(task.dependencies) == nbDependencies
+        # root
+        rootTask = orderedTasks.rootTask
+        checkTask(rootTask, OrderedTaskType.PLACEHOLDER, 1)
+        # C (post)
+        task: OrderedTask = rootTask.dependencies[0]
+        checkTask(task, OrderedTaskType.POSTPROCESS, 1)
+        # C (expand)
+        task: OrderedTask = task.dependencies[0]
+        checkTask(task, OrderedTaskType.EXPANDING, 1)
+        # C (pre)
+        task: OrderedTask = task.dependencies[0]
+        checkTask(task, OrderedTaskType.PREPROCESS, 1)
+        # B (post)
+        task: OrderedTask = task.dependencies[0]
+        checkTask(task, OrderedTaskType.POSTPROCESS, 2)
+        # B (chunks)
+        task_0: OrderedTask = task.dependencies[0]
+        task_1: OrderedTask = task.dependencies[1]
+        checkTask(task_0, OrderedTaskType.CHUNK, 1)
+        checkTask(task_1, OrderedTaskType.CHUNK, 1)
+        assert (task_0.iteration, task_1.iteration) == (0, 1)
+        assert task_0.dependencies[0] == task_1.dependencies[0]
+        # B (pre)
+        task: OrderedTask = task_0.dependencies[0]
+        checkTask(task, OrderedTaskType.PLACEHOLDER, 1)
+        # A (post)
+        task: OrderedTask = task.dependencies[0]
+        checkTask(task, OrderedTaskType.PLACEHOLDER, 1)
+        # A (chunks)
+        task: OrderedTask = task.dependencies[0]
+        checkTask(task, OrderedTaskType.CHUNK, 1)
+        assert task.iteration == -1
+        # A (pre)
+        task: OrderedTask = task.dependencies[0]
+        checkTask(task, OrderedTaskType.PLACEHOLDER, 0)
+
     def test_submitNoParallel(self, tmp_path):
         graph = Graph("")
         graph._cacheDir = os.path.join(tmp_path, "cache")
-        node = self.setupNode(graph, "PluginSubmitterA")
+        node = self.addNewNode(graph, "PluginSubmitterA")
         # Submit
         processSubmit(node, graph, tmp_path)
 
     def test_submitStaticSize(self, tmp_path):
         graph = Graph("")
         graph._cacheDir = os.path.join(tmp_path, "cache")
-        node = self.setupNode(graph, "PluginSubmitterB")
+        node = self.addNewNode(graph, "PluginSubmitterB")
         # Submit
         processSubmit(node, graph, tmp_path)
 
     def test_submitDynamicSize(self, tmp_path):
         graph = Graph("")
         graph._cacheDir = os.path.join(tmp_path, "cache")
-        node = self.setupNode(graph, "PluginSubmitterC")
+        node = self.addNewNode(graph, "PluginSubmitterC")
         # Submit
         processSubmit(node, graph, tmp_path)