Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 7 additions & 9 deletions kolibri/core/tasks/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -392,15 +392,13 @@ def execute(self):
def task(self):
"""
In theory we could read this from the task registry instead
but as this is running inside an ephemeral task runner thread
or process, we can potentially save ourselves some initialization
time and memory by just importing just this function - whereas initializing
the registry would import all of the registered tasks for this Kolibri.
This is less of an issue when the task runner is using threads and has
shared memory, but when it is using multiprocessing or is running in another
context, this will save some time.

We don't bother caching this property, as we rely on the Python module import cache instead.
but as this is running inside an ephemeral task runner thread,
we can potentially save ourselves some initialization time and memory
by importing just this function - whereas initializing the registry
would import all of the registered tasks for this Kolibri.

We don't bother caching this property, as we rely on the Python module
import cache instead.
"""
return import_path_to_callable(self.func)

Expand Down
3 changes: 1 addition & 2 deletions kolibri/core/tasks/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,9 @@ def __job_storage():
""" :type: Storage """


def initialize_workers(log_queue=None):
def initialize_workers():
logger.info("Starting async task workers.")
return Worker(
regular_workers=conf.OPTIONS["Tasks"]["REGULAR_PRIORITY_WORKERS"],
high_workers=conf.OPTIONS["Tasks"]["HIGH_PRIORITY_WORKERS"],
log_queue=log_queue,
)
30 changes: 0 additions & 30 deletions kolibri/core/tasks/test/taskrunner/conftest.py

This file was deleted.

2 changes: 1 addition & 1 deletion kolibri/core/tasks/test/taskrunner/test_job_running.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import time
import uuid
from threading import Event

import pytest

Expand All @@ -10,7 +11,6 @@
from kolibri.core.tasks.utils import get_current_job
from kolibri.core.tasks.utils import import_path_to_callable
from kolibri.core.tasks.worker import Worker
from kolibri.utils.multiprocessing_compat import Event


@pytest.fixture
Expand Down
64 changes: 26 additions & 38 deletions kolibri/core/tasks/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import sqlite3
import time
import uuid
from threading import Event
from threading import local
from threading import Thread

import click
Expand All @@ -18,7 +20,6 @@
from kolibri.core.sqlite.utils import repair_sqlite_db
from kolibri.core.tasks.exceptions import UserCancelledError
from kolibri.utils import conf
from kolibri.utils import multiprocessing_compat
from kolibri.utils.options import FD_PER_THREAD
from kolibri.utils.system import get_fd_limit

Expand All @@ -28,7 +29,7 @@
# An object on which to store data about the current job
# So far the only use is to track the job, but other metadata
# could be added.
current_state_tracker = SimpleLazyObject(multiprocessing_compat.local)
current_state_tracker = SimpleLazyObject(local)


def get_current_job():
Expand Down Expand Up @@ -77,7 +78,7 @@ def __init__(self, func, thread_name, wait_between_runs=1, *args, **kwargs):
:param thread_name: the name of the thread to use during logging and debugging
:param wait_between_runs: how many seconds to wait in between func calls.
"""
self.shutdown_event = multiprocessing_compat.Event()
self.shutdown_event = Event()
self.thread_name = thread_name
self.thread_id = uuid.uuid4().hex
self.logger = logging.getLogger(
Expand Down Expand Up @@ -343,41 +344,28 @@ def fd_safe_executor(fds_per_task=2):
Context manager to give an executor that should be safe for not overloading
file descriptors.
"""
# We should be deferring to conf.OPTIONS["Tasks"]["USE_WORKER_MULTIPROCESSING"]
# for this value, but unfortunately, the current way that the import logic
# is setup relies on shared memory that can only be used with threads.
use_multiprocessing = False

executor = (
concurrent.futures.ProcessPoolExecutor
if use_multiprocessing
else concurrent.futures.ThreadPoolExecutor
)

max_workers = 10

if not use_multiprocessing:
# If we're not using multiprocessing for workers, we may need
# to limit the number of workers depending on the number of allowed
# file descriptors.
# This is a heuristic method, where we know there can be issues if
# the max number of file descriptors for a process is 256, and we use 10
# workers, with potentially 4 concurrent tasks downloading files.
# The number of concurrent tasks that might be downloading files is determined
# by the number of regular workers running in the task runner
# (although the high priority task queue could also be running a channel database download).
server_reserved_fd_count = (
FD_PER_THREAD * conf.OPTIONS["Server"]["CHERRYPY_THREAD_POOL"]
)
max_descriptors_per_task = (
get_fd_limit() - server_reserved_fd_count
) / conf.OPTIONS["Tasks"]["REGULAR_PRIORITY_WORKERS"]
# Each task only needs to have a maximum of `fds_per_task` open file descriptors at once.
# To add tolerance, we divide the number of file descriptors that could be allocated to
# this task by double this number which should give us leeway in case of unforeseen
# descriptor use during the process.
max_workers = min(
max_workers, max(1, max_descriptors_per_task // (fds_per_task * 2))
)
# We may need to limit the number of workers depending on the number of
# allowed file descriptors.
# This is a heuristic method, where we know there can be issues if
# the max number of file descriptors for a process is 256, and we use 10
# workers, with potentially 4 concurrent tasks downloading files.
# The number of concurrent tasks that might be downloading files is determined
# by the number of regular workers running in the task runner
# (although the high priority task queue could also be running a channel database download).
server_reserved_fd_count = (
FD_PER_THREAD * conf.OPTIONS["Server"]["CHERRYPY_THREAD_POOL"]
)
max_descriptors_per_task = (
get_fd_limit() - server_reserved_fd_count
) / conf.OPTIONS["Tasks"]["REGULAR_PRIORITY_WORKERS"]
# Each task only needs to have a maximum of `fds_per_task` open file descriptors at once.
# To add tolerance, we divide the number of file descriptors that could be allocated to
# this task by double this number which should give us leeway in case of unforeseen
# descriptor use during the process.
max_workers = min(
max_workers, max(1, max_descriptors_per_task // (fds_per_task * 2))
)

return executor(max_workers=max_workers)
return concurrent.futures.ThreadPoolExecutor(max_workers=max_workers)
13 changes: 4 additions & 9 deletions kolibri/core/tasks/worker.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import logging
from concurrent.futures import CancelledError
from concurrent.futures import ThreadPoolExecutor

from django.db import connection as django_connection

from kolibri.core.tasks.constants import Priority
from kolibri.core.tasks.utils import InfiniteLoopThread
from kolibri.utils.multiprocessing_compat import PoolExecutor

logger = logging.getLogger(__name__)

Expand All @@ -16,7 +16,6 @@ def execute_job(
worker_process=None,
worker_thread=None,
worker_extra=None,
log_queue=None,
):
"""
Call the function stored in the job.func.
Expand All @@ -34,7 +33,7 @@ def execute_job(
django_connection.close()


def execute_job_with_python_worker(job_id, log_queue=None):
def execute_job_with_python_worker(job_id):
"""
Call execute_job but additionally with the current host, process and thread information taken
directly from python internals.
Expand All @@ -48,12 +47,11 @@ def execute_job_with_python_worker(job_id, log_queue=None):
worker_host=socket.gethostname(),
worker_process=str(os.getpid()),
worker_thread=str(threading.get_ident()),
log_queue=log_queue,
)


class Worker:
def __init__(self, regular_workers=2, high_workers=1, log_queue=None):
def __init__(self, regular_workers=2, high_workers=1):
# Internally, we use concurrent.futures.Future to run and track
# job executions. We need to keep track of which future maps to which
# job they were made from, and we use the job_future_mapping dict to do
Expand All @@ -75,8 +73,6 @@ def __init__(self, regular_workers=2, high_workers=1, log_queue=None):
# High workers run only 'high' priority jobs.
self.regular_workers = regular_workers
self.max_workers = regular_workers + high_workers
# Track any log queue that is passed in
self.log_queue = log_queue

self.workers = self.start_workers()
self.job_checker = self.start_job_checker()
Expand All @@ -100,7 +96,7 @@ def shutdown_workers(self, wait=True):
self.workers.shutdown(wait=wait)

def start_workers(self):
pool = PoolExecutor(max_workers=self.max_workers)
pool = ThreadPoolExecutor(max_workers=self.max_workers)
return pool

def handle_finished_future(self, future):
Expand Down Expand Up @@ -194,7 +190,6 @@ def start_next_job(self, job):
future = self.workers.submit(
execute_job_with_python_worker,
job_id=job.job_id,
log_queue=self.log_queue,
)

# Check if the job ID already exists in the future_job_mapping dictionary
Expand Down
44 changes: 3 additions & 41 deletions kolibri/utils/logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from logging.handlers import QueueHandler
from logging.handlers import QueueListener
from logging.handlers import TimedRotatingFileHandler
from queue import Queue
from typing import Dict
from typing import List
from typing import Optional
Expand Down Expand Up @@ -38,22 +39,7 @@ def __init__(self, queue, logger_name: str):
self.logger_name = logger_name

def prepare(self, record: logging.LogRecord) -> logging.LogRecord:
"""Prepare a record for queuing, ensuring it can be pickled if needed"""
# Get Queue class at runtime to check if we need pickle safety
from kolibri.utils.multiprocessing_compat import use_multiprocessing

# Only do pickle-safety preparation for logging if we're using multiprocessing
if use_multiprocessing():
if hasattr(record, "exc_info") and record.exc_info:
record.exc_text = (
logging.getLogger()
.handlers[0]
.formatter.formatException(record.exc_info)
)
record.exc_info = None
if hasattr(record, "args"):
record.args = tuple(str(arg) for arg in record.args)

"""Prepare a record for queuing."""
record = super().prepare(record)
record._logger_name = self.logger_name
return record
Expand Down Expand Up @@ -422,10 +408,6 @@ def setup_queue_logging() -> LoggerAwareQueueListener:
Sets up queue-based logging for the main process.
Returns the queue listener which can be used to stop logging and clean up.
"""
# Import Queue at function scope to avoid import order issues
from kolibri.utils.multiprocessing_compat import Queue

# Create queue using Kolibri's compatibility Queue
log_queue = Queue()

# Replace handlers and get original configurations
Expand All @@ -438,30 +420,10 @@ def setup_queue_logging() -> LoggerAwareQueueListener:
return listener


def setup_worker_logging(queue) -> None:
"""Sets up logging in a worker to use the queue if not already configured."""
try:
_replace_handlers_with_queue(queue)
except QueueLoggingInitializedError:
pass


def cleanup_queue_logging(listener: Optional[LoggerAwareQueueListener]) -> None:
"""
Stops the queue listener and cleans up multiprocessing resources if needed.
"""
"""Stops the queue listener."""
if not listener:
return

# Stop the listener to ensure pending logs are processed
listener.stop()

# Clean up queue if it's a multiprocessing queue
from kolibri.utils.multiprocessing_compat import use_multiprocessing

if use_multiprocessing():
try:
listener.queue.close()
listener.queue.join_thread()
except (ValueError, AttributeError):
pass
59 changes: 0 additions & 59 deletions kolibri/utils/multiprocessing_compat.py

This file was deleted.

Loading
Loading