diff --git a/kolibri/core/tasks/job.py b/kolibri/core/tasks/job.py index 6133a8b3774..1c2797f1002 100644 --- a/kolibri/core/tasks/job.py +++ b/kolibri/core/tasks/job.py @@ -392,15 +392,13 @@ def execute(self): def task(self): """ In theory we could read this from the task registry instead - but as this is running inside an ephemeral task runner thread - or process, we can potentially save ourselves some initialization - time and memory by just importing just this function - whereas initializing - the registry would import all of the registered tasks for this Kolibri. - This is less of an issue when the task runner is using threads and has - shared memory, but when it is using multiprocessing or is running in another - context, this will save some time. - - We don't bother caching this property, as we rely on the Python module import cache instead. + but as this is running inside an ephemeral task runner thread, + we can potentially save ourselves some initialization time and memory + by importing just this function - whereas initializing the registry + would import all of the registered tasks for this Kolibri. + + We don't bother caching this property, as we rely on the Python module + import cache instead. """ return import_path_to_callable(self.func) diff --git a/kolibri/core/tasks/main.py b/kolibri/core/tasks/main.py index 98c500fca15..ac9f92328c6 100644 --- a/kolibri/core/tasks/main.py +++ b/kolibri/core/tasks/main.py @@ -19,10 +19,9 @@ def __job_storage(): """ :type: Storage """ -def initialize_workers(log_queue=None): +def initialize_workers(): logger.info("Starting async task workers.") return Worker( regular_workers=conf.OPTIONS["Tasks"]["REGULAR_PRIORITY_WORKERS"], high_workers=conf.OPTIONS["Tasks"]["HIGH_PRIORITY_WORKERS"], - log_queue=log_queue, ) diff --git a/kolibri/core/tasks/test/taskrunner/conftest.py b/kolibri/core/tasks/test/taskrunner/conftest.py deleted file mode 100644 index 129e3c4dbd3..00000000000 --- a/kolibri/core/tasks/test/taskrunner/conftest.py +++ /dev/null @@ -1,30 +0,0 @@ -import pytest - -from kolibri.utils import multiprocessing_compat - - -@pytest.fixture(params=[False, True], autouse=True) -def mock_compat(request, monkeypatch): - - if request.param: - from multiprocessing import Process as Thread # noqa - from multiprocessing import Event # noqa - from concurrent.futures import ProcessPoolExecutor as PoolExecutor # noqa - - class local: - """ - Dummy class to use for a local object for multiprocessing - """ - - pass - - else: - from threading import Thread # noqa - from threading import Event # noqa - from threading import local # noqa - from concurrent.futures import ThreadPoolExecutor as PoolExecutor # noqa - - monkeypatch.setattr(multiprocessing_compat, "Thread", Thread) - monkeypatch.setattr(multiprocessing_compat, "Event", Event) - monkeypatch.setattr(multiprocessing_compat, "local", local) - monkeypatch.setattr(multiprocessing_compat, "PoolExecutor", PoolExecutor) diff --git a/kolibri/core/tasks/test/taskrunner/test_job_running.py b/kolibri/core/tasks/test/taskrunner/test_job_running.py index 23716cf3b02..355219ab60a 100644 --- a/kolibri/core/tasks/test/taskrunner/test_job_running.py +++ b/kolibri/core/tasks/test/taskrunner/test_job_running.py @@ -1,5 +1,6 @@ import time import uuid +from threading import Event import pytest @@ -10,7 +11,6 @@ from kolibri.core.tasks.utils import get_current_job from kolibri.core.tasks.utils import import_path_to_callable from kolibri.core.tasks.worker import Worker -from kolibri.utils.multiprocessing_compat import Event @pytest.fixture diff --git a/kolibri/core/tasks/utils.py b/kolibri/core/tasks/utils.py index 7bdb2daaa85..21fd2230fbb 100644 --- a/kolibri/core/tasks/utils.py +++ b/kolibri/core/tasks/utils.py @@ -4,6 +4,8 @@ import sqlite3 import time import uuid +from threading import Event +from threading import local from threading import Thread import click @@ -18,7 +20,6 @@ from kolibri.core.sqlite.utils import repair_sqlite_db from kolibri.core.tasks.exceptions import UserCancelledError from kolibri.utils import conf -from kolibri.utils import multiprocessing_compat from kolibri.utils.options import FD_PER_THREAD from kolibri.utils.system import get_fd_limit @@ -28,7 +29,7 @@ # An object on which to store data about the current job # So far the only use is to track the job, but other metadata # could be added. -current_state_tracker = SimpleLazyObject(multiprocessing_compat.local) +current_state_tracker = SimpleLazyObject(local) def get_current_job(): @@ -77,7 +78,7 @@ def __init__(self, func, thread_name, wait_between_runs=1, *args, **kwargs): :param thread_name: the name of the thread to use during logging and debugging :param wait_between_runs: how many seconds to wait in between func calls. """ - self.shutdown_event = multiprocessing_compat.Event() + self.shutdown_event = Event() self.thread_name = thread_name self.thread_id = uuid.uuid4().hex self.logger = logging.getLogger( @@ -343,41 +344,28 @@ def fd_safe_executor(fds_per_task=2): Context manager to give an executor that should be safe for not overloading file descriptors. """ - # We should be deferring to conf.OPTIONS["Tasks"]["USE_WORKER_MULTIPROCESSING"] - # for this value, but unfortunately, the current way that the import logic - # is setup relies on shared memory that can only be used with threads. - use_multiprocessing = False - - executor = ( - concurrent.futures.ProcessPoolExecutor - if use_multiprocessing - else concurrent.futures.ThreadPoolExecutor - ) - max_workers = 10 - if not use_multiprocessing: - # If we're not using multiprocessing for workers, we may need - # to limit the number of workers depending on the number of allowed - # file descriptors. - # This is a heuristic method, where we know there can be issues if - # the max number of file descriptors for a process is 256, and we use 10 - # workers, with potentially 4 concurrent tasks downloading files. - # The number of concurrent tasks that might be downloading files is determined - # by the number of regular workers running in the task runner - # (although the high priority task queue could also be running a channel database download). - server_reserved_fd_count = ( - FD_PER_THREAD * conf.OPTIONS["Server"]["CHERRYPY_THREAD_POOL"] - ) - max_descriptors_per_task = ( - get_fd_limit() - server_reserved_fd_count - ) / conf.OPTIONS["Tasks"]["REGULAR_PRIORITY_WORKERS"] - # Each task only needs to have a maximum of `fds_per_task` open file descriptors at once. - # To add tolerance, we divide the number of file descriptors that could be allocated to - # this task by double this number which should give us leeway in case of unforeseen - # descriptor use during the process. - max_workers = min( - max_workers, max(1, max_descriptors_per_task // (fds_per_task * 2)) - ) + # We may need to limit the number of workers depending on the number of + # allowed file descriptors. + # This is a heuristic method, where we know there can be issues if + # the max number of file descriptors for a process is 256, and we use 10 + # workers, with potentially 4 concurrent tasks downloading files. + # The number of concurrent tasks that might be downloading files is determined + # by the number of regular workers running in the task runner + # (although the high priority task queue could also be running a channel database download). + server_reserved_fd_count = ( + FD_PER_THREAD * conf.OPTIONS["Server"]["CHERRYPY_THREAD_POOL"] + ) + max_descriptors_per_task = ( + get_fd_limit() - server_reserved_fd_count + ) / conf.OPTIONS["Tasks"]["REGULAR_PRIORITY_WORKERS"] + # Each task only needs to have a maximum of `fds_per_task` open file descriptors at once. + # To add tolerance, we divide the number of file descriptors that could be allocated to + # this task by double this number which should give us leeway in case of unforeseen + # descriptor use during the process. + max_workers = min( + max_workers, max(1, max_descriptors_per_task // (fds_per_task * 2)) + ) - return executor(max_workers=max_workers) + return concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) diff --git a/kolibri/core/tasks/worker.py b/kolibri/core/tasks/worker.py index 0b98da1df20..bafca403b5b 100644 --- a/kolibri/core/tasks/worker.py +++ b/kolibri/core/tasks/worker.py @@ -1,11 +1,11 @@ import logging from concurrent.futures import CancelledError +from concurrent.futures import ThreadPoolExecutor from django.db import connection as django_connection from kolibri.core.tasks.constants import Priority from kolibri.core.tasks.utils import InfiniteLoopThread -from kolibri.utils.multiprocessing_compat import PoolExecutor logger = logging.getLogger(__name__) @@ -16,7 +16,6 @@ def execute_job( worker_process=None, worker_thread=None, worker_extra=None, - log_queue=None, ): """ Call the function stored in the job.func. @@ -34,7 +33,7 @@ def execute_job( django_connection.close() -def execute_job_with_python_worker(job_id, log_queue=None): +def execute_job_with_python_worker(job_id): """ Call execute_job but additionally with the current host, process and thread information taken directly from python internals. @@ -48,12 +47,11 @@ def execute_job_with_python_worker(job_id, log_queue=None): worker_host=socket.gethostname(), worker_process=str(os.getpid()), worker_thread=str(threading.get_ident()), - log_queue=log_queue, ) class Worker: - def __init__(self, regular_workers=2, high_workers=1, log_queue=None): + def __init__(self, regular_workers=2, high_workers=1): # Internally, we use concurrent.future.Future to run and track # job executions. We need to keep track of which future maps to which # job they were made from, and we use the job_future_mapping dict to do @@ -75,8 +73,6 @@ def __init__(self, regular_workers=2, high_workers=1, log_queue=None): # High workers run only 'high' priority jobs. self.regular_workers = regular_workers self.max_workers = regular_workers + high_workers - # Track any log queue that is passed in - self.log_queue = log_queue self.workers = self.start_workers() self.job_checker = self.start_job_checker() @@ -100,7 +96,7 @@ def shutdown_workers(self, wait=True): self.workers.shutdown(wait=wait) def start_workers(self): - pool = PoolExecutor(max_workers=self.max_workers) + pool = ThreadPoolExecutor(max_workers=self.max_workers) return pool def handle_finished_future(self, future): @@ -194,7 +190,6 @@ def start_next_job(self, job): future = self.workers.submit( execute_job_with_python_worker, job_id=job.job_id, - log_queue=self.log_queue, ) # Check if the job ID already exists in the future_job_mapping dictionary diff --git a/kolibri/utils/logger.py b/kolibri/utils/logger.py index 68975761662..35793aa05ed 100644 --- a/kolibri/utils/logger.py +++ b/kolibri/utils/logger.py @@ -3,6 +3,7 @@ from logging.handlers import QueueHandler from logging.handlers import QueueListener from logging.handlers import TimedRotatingFileHandler +from queue import Queue from typing import Dict from typing import List from typing import Optional @@ -38,22 +39,7 @@ def __init__(self, queue, logger_name: str): self.logger_name = logger_name def prepare(self, record: logging.LogRecord) -> logging.LogRecord: - """Prepare a record for queuing, ensuring it can be pickled if needed""" - # Get Queue class at runtime to check if we need pickle safety - from kolibri.utils.multiprocessing_compat import use_multiprocessing - - # Only do pickle-safety preparation for logging if we're using multiprocessing - if use_multiprocessing(): - if hasattr(record, "exc_info") and record.exc_info: - record.exc_text = ( - logging.getLogger() - .handlers[0] - .formatter.formatException(record.exc_info) - ) - record.exc_info = None - if hasattr(record, "args"): - record.args = tuple(str(arg) for arg in record.args) - + """Prepare a record for queuing.""" record = super().prepare(record) record._logger_name = self.logger_name return record @@ -422,10 +408,6 @@ def setup_queue_logging() -> LoggerAwareQueueListener: Sets up queue-based logging for the main process. Returns the queue listener which can be used to stop logging and clean up. """ - # Import Queue at function scope to avoid import order issues - from kolibri.utils.multiprocessing_compat import Queue - - # Create queue using Kolibri's compatibility Queue log_queue = Queue() # Replace handlers and get original configurations @@ -438,30 +420,10 @@ def setup_queue_logging() -> LoggerAwareQueueListener: return listener -def setup_worker_logging(queue) -> None: - """Sets up logging in a worker to use the queue if not already configured.""" - try: - _replace_handlers_with_queue(queue) - except QueueLoggingInitializedError: - pass - - def cleanup_queue_logging(listener: Optional[LoggerAwareQueueListener]) -> None: - """ - Stops the queue listener and cleans up multiprocessing resources if needed. - """ + """Stops the queue listener.""" if not listener: return # Stop the listener to ensure pending logs are processed listener.stop() - - # Clean up queue if it's a multiprocessing queue - from kolibri.utils.multiprocessing_compat import use_multiprocessing - - if use_multiprocessing(): - try: - listener.queue.close() - listener.queue.join_thread() - except (ValueError, AttributeError): - pass diff --git a/kolibri/utils/multiprocessing_compat.py b/kolibri/utils/multiprocessing_compat.py deleted file mode 100644 index bfc0aa41687..00000000000 --- a/kolibri/utils/multiprocessing_compat.py +++ /dev/null @@ -1,59 +0,0 @@ -import multiprocessing -import threading -from concurrent import futures -from queue import Queue as ThreadingQueue - -from kolibri.utils.conf import OPTIONS - - -def use_multiprocessing(): - try: - if not OPTIONS["Tasks"]["USE_WORKER_MULTIPROCESSING"]: - raise ImportError() - # Import in order to check if multiprocessing is supported on this platform - from multiprocessing import synchronize # noqa - - return True - except ImportError: - return False - - -def Thread(*args, **kwargs): - if use_multiprocessing(): - return multiprocessing.Process(*args, **kwargs) - return threading.Thread(*args, **kwargs) - - -def Event(*args, **kwargs): - if use_multiprocessing(): - return multiprocessing.Event(*args, **kwargs) - return threading.Event(*args, **kwargs) - - -def Queue(*args, **kwargs): - if use_multiprocessing(): - return multiprocessing.Queue(*args, **kwargs) - return ThreadingQueue(*args, **kwargs) - - -class _Local: - """ - Dummy class to use for a local object for multiprocessing - """ - - pass - - -def local(*args, **kwargs): - if use_multiprocessing(): - # any variable is local to a process, so this is - # just a dummy - - return _Local(*args, **kwargs) - return threading.local(*args, **kwargs) - - -def PoolExecutor(*args, **kwargs): - if use_multiprocessing(): - return futures.ProcessPoolExecutor(*args, **kwargs) - return futures.ThreadPoolExecutor(*args, **kwargs) diff --git a/kolibri/utils/options.py b/kolibri/utils/options.py index 19846d88f37..01d46dbebcb 100644 --- a/kolibri/utils/options.py +++ b/kolibri/utils/options.py @@ -17,7 +17,6 @@ from configobj import get_extra_values from django.utils.functional import SimpleLazyObject from django.utils.module_loading import import_string -from validate import is_boolean from validate import is_option from validate import Validator from validate import VdtTypeError @@ -253,24 +252,6 @@ def url_prefix(value): return value.lstrip("/").rstrip("/") + "/" -def multiprocess_bool(value): - """ - Validate the boolean value of a multiprocessing option. - Do this by checking it's a boolean, and also that multiprocessing - can be imported properly on this platform. - """ - value = is_boolean(value) - try: - if not value: - raise ImportError() - # Import in order to check if multiprocessing is supported on this platform - from multiprocessing import synchronize # noqa - - return True - except ImportError: - return False - - def storage_option(value, *opts): """ Validate the storage options. @@ -800,14 +781,6 @@ def csp_source_list(value): }, }, "Tasks": { - "USE_WORKER_MULTIPROCESSING": { - "type": "multiprocess_bool", - "default": False, - "description": """ - Whether to use Python multiprocessing for worker pools. If False, then it will use threading. This may be useful, - if running on a dedicated device with multiple cores, and a lot of asynchronous tasks get run. - """, - }, "REGULAR_PRIORITY_WORKERS": { "type": "integer", "default": 4, @@ -843,7 +816,6 @@ def _get_validator(): "port": port, "url_prefix": url_prefix, "bytes": validate_bytes, - "multiprocess_bool": multiprocess_bool, "storage_option": storage_option, "cache_option": cache_option, "lazy_import_callback_list": lazy_import_callback_list, diff --git a/kolibri/utils/server/__init__.py b/kolibri/utils/server/__init__.py index a11b0e9d077..541146450f6 100644 --- a/kolibri/utils/server/__init__.py +++ b/kolibri/utils/server/__init__.py @@ -286,9 +286,7 @@ def START(self): from kolibri.core.tasks.main import initialize_workers # Initialize the iceqube engine to handle queued tasks - # Add a loose coupling between our LogPlugin and the ServicesPlugin - # by getting any log_queue that might be present on the bus - self.worker = initialize_workers(log_queue=getattr(self.bus, "log_queue", None)) + self.worker = initialize_workers() def STOP(self): if self.worker is not None: @@ -552,7 +550,6 @@ def ENTER(self): # which will reinitialize logging, and override # what we are doing here. self.queue_listener = setup_queue_logging() - self.bus.log_queue = self.queue_listener.queue def log(self, msg, level): logger.log(level, msg)