diff --git a/nexios_contrib/scheduler/__init__.py b/nexios_contrib/scheduler/__init__.py new file mode 100644 index 0000000..45d47e6 --- /dev/null +++ b/nexios_contrib/scheduler/__init__.py @@ -0,0 +1,98 @@ +""" +Nexios Scheduler - Job Scheduling for Nexios + +Provides interval-based, cron-based, and one-time job scheduling +integrated with the Nexios application lifecycle and dependency injection. +""" +from __future__ import annotations + +from typing import Optional + +from nexios import NexiosApp + +from .config import ( + CronTrigger, + DateTimeTrigger, + IntervalTrigger, + JobStatus, + SchedulerConfig, +) +from .dependency import SchedulerDepend, SchedulerDepends +from .manager import SchedulerManager +from .models import JobCallback, ScheduledJob + +__all__ = [ + # Main classes + "SchedulerManager", + "ScheduledJob", + "SchedulerConfig", + "JobStatus", + # Triggers + "IntervalTrigger", + "CronTrigger", + "DateTimeTrigger", + # Dependency injection + "SchedulerDepend", + "SchedulerDepends", + # Utility functions + "setup_scheduler", + "get_scheduler", +] + + +def setup_scheduler( + app: NexiosApp, config: Optional[SchedulerConfig] = None +) -> SchedulerManager: + """Set up the scheduler for a Nexios application. + + Initialises the ``SchedulerManager``, attaches it as ``app.scheduler``, + and registers the startup hook. + + Args: + app: The Nexios application instance. + config: Optional scheduler configuration. + + Returns: + The initialised ``SchedulerManager`` instance. + + Example:: + + from nexios import NexiosApp + from nexios_contrib.scheduler import ( + setup_scheduler, + IntervalTrigger, + ) + + app = NexiosApp() + scheduler = setup_scheduler(app) + + async def my_task(): + print("tick") + + scheduler.add_job(my_task, IntervalTrigger(seconds=30)) + """ + if not hasattr(app, "scheduler"): + scheduler = SchedulerManager(app, config=config) + app.scheduler = scheduler + app.on_startup(scheduler.start) + return app.scheduler + + +def get_scheduler(app: NexiosApp) -> SchedulerManager: + """Retrieve the scheduler instance from a Nexios app. + + Args: + app: The Nexios application instance. + + Returns: + The ``SchedulerManager`` instance. + + Raises: + AttributeError: If the scheduler has not been initialised. + """ + scheduler = getattr(app, "scheduler", None) + if scheduler is None: + raise AttributeError( + "Scheduler not initialised. Call setup_scheduler(app) during app setup." + ) + return scheduler diff --git a/nexios_contrib/scheduler/config.py b/nexios_contrib/scheduler/config.py new file mode 100644 index 0000000..c11ee3b --- /dev/null +++ b/nexios_contrib/scheduler/config.py @@ -0,0 +1,282 @@ +""" +Scheduler configuration for Nexios. + +This module provides configuration options and enums for the scheduler system. +""" +from __future__ import annotations + +import logging +from dataclasses import dataclass, field +from enum import Enum +from typing import Any, Dict, Optional + + +class JobStatus(str, Enum): + """Status of a scheduled job.""" + + ACTIVE = "ACTIVE" + PAUSED = "PAUSED" + COMPLETED = "COMPLETED" + FAILED = "FAILED" + CANCELLED = "CANCELLED" + + +class TriggerType(str, Enum): + """Type of trigger for a scheduled job.""" + + INTERVAL = "INTERVAL" + CRON = "CRON" + DATETIME = "DATETIME" + + +@dataclass +class IntervalTrigger: + """Trigger that fires at fixed intervals. + + Attributes: + seconds: Number of seconds between runs. + minutes: Number of minutes between runs. + hours: Number of hours between runs. + days: Number of days between runs. + start_now: If True, the job runs immediately upon scheduling. + Otherwise, it waits for the first interval to elapse. + """ + + seconds: int = 0 + minutes: int = 0 + hours: int = 0 + days: int = 0 + start_now: bool = True + + def __post_init__(self) -> None: + if self.seconds < 0 or self.minutes < 0 or self.hours < 0 or self.days < 0: + raise ValueError("Interval values must be non-negative") + total = self.as_seconds() + if total <= 0: + raise ValueError("Total interval must be greater than 0 seconds") + + def as_seconds(self) -> float: + """Return the total interval in seconds.""" + return ( + self.days * 86400 + + self.hours * 3600 + + self.minutes * 60 + + self.seconds + ) + + +@dataclass +class CronTrigger: + """Trigger that fires based on a cron expression. + + Supports standard 5-field cron expressions: + minute hour day_of_month month day_of_week + + Each field supports: + - Exact values: ``5`` + - Wildcards: ``*`` + - Ranges: ``1-5`` + - Step values: ``*/5`` + - Lists: ``1,3,5`` + - Combinations: ``1-5,10`` + + Special strings: + ``"@hourly"``, ``"@daily"``, ``"@weekly"``, ``"@monthly"``, + ``"@yearly"``, ``"@every_minute"`` + + Args: + expr: Cron expression string (5-field or special alias). + """ + + expr: str + + def __post_init__(self) -> None: + resolved = self._resolve_alias(self.expr) + self._fields = self._parse_expression(resolved) + + @staticmethod + def _resolve_alias(expr: str) -> str: + aliases = { + "@every_minute": "* * * * *", + "@hourly": "0 * * * *", + "@daily": "0 0 * * *", + "@weekly": "0 0 * * 0", + "@monthly": "0 0 1 * *", + "@yearly": "0 0 1 1 *", + } + return aliases.get(expr, expr) + + @staticmethod + def _parse_expression(expr: str) -> list[list[str]]: + parts = expr.strip().split() + if len(parts) != 5: + raise ValueError( + f"Invalid cron expression: {expr!r}. " + f"Expected 5 fields (minute hour day month weekday), got {len(parts)}." + ) + field_names = ["minute", "hour", "day_of_month", "month", "day_of_week"] + fields: list[list[str]] = [] + for name, part in zip(field_names, parts): + parsed = CronTrigger._parse_field(part, name) + fields.append(parsed) + return fields + + @staticmethod + def _parse_field(part: str, name: str) -> list[str]: + """Parse a single cron field into a list of valid values.""" + ranges = { + "minute": (0, 59), + "hour": (0, 23), + "day_of_month": (1, 31), + "month": (1, 12), + "day_of_week": (0, 6), + } + if name not in ranges: + raise ValueError(f"Unknown cron field: {name}") + lo, hi = ranges[name] + + values: set[int] = set() + for segment in part.split(","): + segment = segment.strip() + if not segment: + continue + + step = 1 + if "/" in segment: + segment, step_str = segment.split("/", 1) + step = int(step_str) + + if segment == "*": + values.update(range(lo, hi + 1, step)) + elif "-" in segment: + start_str, end_str = segment.split("-", 1) + start = int(start_str) + end = int(end_str) + values.update(range(start, end + 1, step)) + else: + values.add(int(segment)) + + return [str(v) for v in sorted(values)] + + def get_next_run(self, from_timestamp: float) -> float: + """Calculate the next datetime this cron expression fires at. + + Uses a simple minute-resolution iteration starting from ``from_timestamp``. + """ + import calendar + import time as time_module + from datetime import datetime, timedelta, timezone + + dt = datetime.fromtimestamp(from_timestamp, tz=timezone.utc) + + # Start from the next full minute + dt = dt.replace(second=0, microsecond=0) + timedelta(minutes=1) + + for _ in range(525600): # search up to 1 year ahead + minute_vals = self._fields[0] + hour_vals = self._fields[1] + day_vals = self._fields[2] + month_vals = self._fields[3] + weekday_vals = self._fields[4] + + month_match = str(dt.month) in month_vals + day_match = str(dt.day) in day_vals + weekday_match = str(dt.weekday()) in weekday_vals + hour_match = str(dt.hour) in hour_vals + minute_match = str(dt.minute) in minute_vals + + # day_of_week OR day_of_month match (standard cron behavior) + day_valid = day_match or weekday_match + + if month_match and day_valid and hour_match and minute_match: + return dt.timestamp() + + dt += timedelta(minutes=1) + + raise RuntimeError( + f"Could not find next run time for cron expression: {self.expr}" + ) + + +@dataclass +class DateTimeTrigger: + """Trigger that fires once at a specific datetime. + + Args: + run_date: ISO-8601 datetime string (e.g. ``"2026-12-25T10:30:00"``). + If no timezone is specified, UTC is assumed. + """ + + run_date: str + + def __post_init__(self) -> None: + # Validate on construction + self.get_run_timestamp() + + def get_run_timestamp(self) -> float: + """Get the target timestamp for this trigger.""" + from datetime import datetime, timezone + + # Try parsing with various formats + for fmt in [ + "%Y-%m-%dT%H:%M:%S", + "%Y-%m-%dT%H:%M:%S%z", + "%Y-%m-%d %H:%M:%S", + "%Y-%m-%d", + ]: + try: + dt = datetime.strptime(self.run_date, fmt) + if dt.tzinfo is None: + dt = dt.replace(tzinfo=timezone.utc) + return dt.timestamp() + except ValueError: + continue + + raise ValueError( + f"Could not parse datetime: {self.run_date!r}. " + f"Expected ISO-8601 format (e.g. '2026-12-25T10:30:00')." + ) + + +@dataclass +class SchedulerConfig: + """Configuration for the scheduler. + + Attributes: + timezone: Timezone string (e.g. ``"UTC"``, ``"America/New_York"``). + If None, UTC is used. + max_concurrent_jobs: Maximum number of jobs that can run simultaneously. + log_level: Logging level for scheduler-related logs. + job_defaults: Default settings applied to every job. + Supported keys: + - ``max_instances`` (int): Max concurrent instances of the + same job. Default: 3. + - ``misfire_grace_time`` (int): Seconds after the scheduled + fire time that the job will still be accepted. Default: 30. + - ``coalesce`` (bool): If True, missed firings are merged + into one. Default: True. + """ + + timezone: Optional[str] = None + max_concurrent_jobs: int = 10 + log_level: int = logging.INFO + job_defaults: Dict[str, Any] = field( + default_factory=lambda: { + "max_instances": 3, + "misfire_grace_time": 30, + "coalesce": True, + } + ) + + def to_dict(self) -> Dict[str, Any]: + """Convert the configuration to a dictionary.""" + return { + "timezone": self.timezone, + "max_concurrent_jobs": self.max_concurrent_jobs, + "log_level": self.log_level, + "job_defaults": self.job_defaults.copy(), + } + + +# Default configuration singleton +DEFAULT_CONFIG = SchedulerConfig() diff --git a/nexios_contrib/scheduler/dependency.py b/nexios_contrib/scheduler/dependency.py new file mode 100644 index 0000000..97454c8 --- /dev/null +++ b/nexios_contrib/scheduler/dependency.py @@ -0,0 +1,83 @@ +""" +Dependency injection for the Nexios scheduler. + +Provides a ``SchedulerDepend`` class and a ``SchedulerDepends`` callable +that can be used as a route dependency in Nexios handlers. +""" + +from __future__ import annotations + +from typing import Any, Dict, List, Optional + +from nexios.dependencies import Context, Depend +from nexios.http import Request + +from .config import CronTrigger, DateTimeTrigger, IntervalTrigger, JobStatus +from .manager import SchedulerManager +from .models import JobCallback, ScheduledJob + + +class SchedulerDepend: + """Injectable dependency that exposes scheduler operations. + + Use via ``SchedulerDepends()`` in your route handler signature. + """ + + def __init__(self, request: Request) -> None: + self.request = request + self.scheduler: SchedulerManager = request.base_app.scheduler + + def add_job( + self, + func: JobCallback, + trigger: IntervalTrigger | CronTrigger | DateTimeTrigger, + *, + name: Optional[str] = None, + args: Optional[tuple] = None, + kwargs: Optional[Dict[str, Any]] = None, + max_instances: Optional[int] = None, + misfire_grace_time: Optional[int] = None, + coalesce: Optional[bool] = None, + id: Optional[str] = None, + ) -> ScheduledJob: + """Register a new scheduled job from within a route handler.""" + return self.scheduler.add_job( + func=func, + trigger=trigger, + name=name, + args=args, + kwargs=kwargs, + max_instances=max_instances, + misfire_grace_time=misfire_grace_time, + coalesce=coalesce, + id=id, + ) + + def remove_job(self, job_id: str) -> bool: + """Remove a scheduled job.""" + return self.scheduler.remove_job(job_id) + + def get_job(self, job_id: str) -> Optional[ScheduledJob]: + """Get a scheduled job by id.""" + return self.scheduler.get_job(job_id) + + def get_jobs(self, status: Optional[JobStatus] = None) -> List[ScheduledJob]: + """List all scheduled jobs, optionally filtered by status.""" + return self.scheduler.get_jobs(status=status) + + def pause_job(self, job_id: str) -> bool: + """Pause a scheduled job.""" + return self.scheduler.pause_job(job_id) + + def resume_job(self, job_id: str) -> bool: + """Resume a paused job.""" + return self.scheduler.resume_job(job_id) + + +def _get_scheduler_depend(ctx: Context = Context()) -> SchedulerDepend: + """Factory used by the ``SchedulerDepends`` callable.""" + return SchedulerDepend(ctx.request) + + +def SchedulerDepends() -> SchedulerDepend: + return Depend(_get_scheduler_depend) diff --git a/nexios_contrib/scheduler/manager.py b/nexios_contrib/scheduler/manager.py new file mode 100644 index 0000000..15e672b --- /dev/null +++ b/nexios_contrib/scheduler/manager.py @@ -0,0 +1,282 @@ +""" +Scheduler manager for Nexios. + +This module provides the SchedulerManager class which is responsible for +managing scheduled jobs and their execution lifecycle in a Nexios application. +""" +from __future__ import annotations + +import asyncio +import logging +import time +from typing import Any, Dict, List, Optional + +from nexios import NexiosApp + +from .config import ( + CronTrigger, + DateTimeTrigger, + IntervalTrigger, + JobStatus, + SchedulerConfig, +) +from .models import JobCallback, ScheduledJob + + +class SchedulerManager: + """Manages scheduled jobs for Nexios applications. + + Handles job registration, scheduling, lifecycle management, and + coordination with the Nexios app lifecycle (startup/shutdown). + """ + + def __init__( + self, app: Optional[NexiosApp] = None, config: Optional[SchedulerConfig] = None + ) -> None: + self.app = app + self.config = config or SchedulerConfig() + self._jobs: Dict[str, ScheduledJob] = {} + self._running = False + self._ticker_task: Optional[asyncio.Task] = None + self._logger = logging.getLogger("nexios.scheduler") + + logging.basicConfig(level=self.config.log_level) + + # --- Lifecycle --- + + async def start(self) -> None: + """Start the scheduler. + + Registers shutdown hook if an app is attached and begins the + background ticker loop. + """ + if self._running: + self._logger.warning("Scheduler is already running") + return + + self._running = True + + if self.app is not None: + self.app.on_shutdown(self.shutdown) + + # Compute initial next run times for all active jobs + now = time.time() + for job in self._jobs.values(): + if job.status == JobStatus.ACTIVE: + job.compute_next_run(now) + + self._ticker_task = asyncio.create_task(self._ticker_loop()) + self._logger.info("Scheduler started with %d active job(s)", self._active_count) + + async def shutdown(self) -> None: + """Gracefully shut down the scheduler. + + Cancels the ticker loop and cancels all active jobs. + """ + if not self._running: + return + + self._running = False + self._logger.info("Shutting down scheduler...") + + if self._ticker_task and not self._ticker_task.done(): + self._ticker_task.cancel() + try: + await self._ticker_task + except asyncio.CancelledError: + pass + + # Cancel all active/paused jobs + for job in self._jobs.values(): + if job.status in (JobStatus.ACTIVE, JobStatus.PAUSED): + job.cancel() + + self._logger.info("Scheduler shutdown complete") + + # --- Job Management --- + + def add_job( + self, + func: JobCallback, + trigger: IntervalTrigger | CronTrigger | DateTimeTrigger, + *, + name: Optional[str] = None, + args: Optional[tuple] = None, + kwargs: Optional[Dict[str, Any]] = None, + max_instances: Optional[int] = None, + misfire_grace_time: Optional[int] = None, + coalesce: Optional[bool] = None, + id: Optional[str] = None, + ) -> ScheduledJob: + """Register a new scheduled job. + + Args: + func: Async callable to execute. + trigger: One of ``IntervalTrigger``, ``CronTrigger``, or + ``DateTimeTrigger``. + name: Human-readable name (defaults to ``func.__name__``). + args: Positional arguments passed to ``func``. + kwargs: Keyword arguments passed to ``func``. + max_instances: Override the default from config. + misfire_grace_time: Override the default from config. + coalesce: Override the default from config. + id: Explicit job ID (auto-generated if omitted). + + Returns: + The newly created ``ScheduledJob`` instance. + """ + job = ScheduledJob( + func=func, + trigger=trigger, + name=name, + args=args or (), + kwargs=kwargs or {}, + max_instances=max_instances or self.config.job_defaults.get("max_instances", 3), + misfire_grace_time=misfire_grace_time + or self.config.job_defaults.get("misfire_grace_time", 30), + coalesce=coalesce or self.config.job_defaults.get("coalesce", True), + id=id, + ) + + self._jobs[job.id] = job + + if self._running: + job.compute_next_run() + + self._logger.info( + "Added job %s (%s) with trigger %s", + job.id, + job.name, + type(trigger).__name__, + ) + return job + + def remove_job(self, job_id: str) -> bool: + """Remove a job from the scheduler. + + Returns: + True if the job was found and removed, False otherwise. + """ + job = self._jobs.pop(job_id, None) + if job is None: + return False + job.cancel() + self._logger.debug("Removed job %s (%s)", job_id, job.name) + return True + + def get_job(self, job_id: str) -> Optional[ScheduledJob]: + """Look up a job by its id.""" + return self._jobs.get(job_id) + + def get_jobs(self, status: Optional[JobStatus] = None) -> List[ScheduledJob]: + """List registered jobs, optionally filtered by status.""" + if status is None: + return list(self._jobs.values()) + return [j for j in self._jobs.values() if j.status == status] + + def pause_job(self, job_id: str) -> bool: + """Pause a job so it won't fire. + + Returns: + True if the job was found and paused. + """ + job = self._jobs.get(job_id) + if job is None: + return False + job.pause() + self._logger.debug("Paused job %s (%s)", job_id, job.name) + return True + + def resume_job(self, job_id: str) -> bool: + """Resume a paused job. + + Returns: + True if the job was found and resumed. + """ + job = self._jobs.get(job_id) + if job is None: + return False + job.resume() + job.compute_next_run() + self._logger.debug("Resumed job %s (%s)", job_id, job.name) + return True + + # --- Internal --- + + @property + def _active_count(self) -> int: + return sum( + 1 for j in self._jobs.values() if j.status == JobStatus.ACTIVE + ) + + async def _ticker_loop(self) -> None: + """Background loop that checks every second for due jobs.""" + self._logger.debug("Ticker loop started (tick=1s)") + + try: + while self._running: + now = time.time() + due = self._get_due_jobs(now) + + for job in due: + asyncio.ensure_future(self._execute_job(job)) + + await asyncio.sleep(1) + except asyncio.CancelledError: + self._logger.debug("Ticker loop cancelled") + except Exception: + self._logger.exception("Ticker loop crashed") + raise + + def _get_due_jobs(self, now: float) -> List[ScheduledJob]: + """Return all jobs that are due to run at ``now``.""" + due: List[ScheduledJob] = [] + + for job in self._jobs.values(): + if job.status != JobStatus.ACTIVE: + continue + if job.next_run_time is None: + continue + + if job.next_run_time <= now: + # Check misfire grace window + misfire = job.misfire_grace_time + if misfire >= 0 and now - job.next_run_time > misfire: + self._logger.warning( + "Job %s (%s) misfired (scheduled: %s, now: %s)", + job.id, + job.name, + job.next_run_time, + now, + ) + job.compute_next_run(now) + continue + + # Check max concurrent instances + if job.current_instances >= job.max_instances: + self._logger.debug( + "Job %s (%s) at max instances (%s), skipping", + job.id, + job.name, + job.max_instances, + ) + continue + + due.append(job) + job.compute_next_run(now) + + return due + + async def _execute_job(self, job: ScheduledJob) -> None: + """Execute a single job and handle errors.""" + try: + await job.run() + except asyncio.CancelledError: + raise + except Exception: + self._logger.exception("Job %s (%s) raised an exception", job.id, job.name) + finally: + # For one-shot DateTimeTrigger jobs, mark as completed + if isinstance(job.trigger, DateTimeTrigger): + job._status = JobStatus.COMPLETED # type: ignore[attr-defined] + self._logger.debug("One-shot job %s completed", job.id) diff --git a/nexios_contrib/scheduler/models.py b/nexios_contrib/scheduler/models.py new file mode 100644 index 0000000..7bec6ad --- /dev/null +++ b/nexios_contrib/scheduler/models.py @@ -0,0 +1,179 @@ +""" +Data models for the scheduler system. + +This module defines the Job class that represents a scheduled task +along with its trigger configuration and execution state. +""" +from __future__ import annotations + +import logging +import time +from dataclasses import dataclass, field +from typing import Any, Awaitable, Callable, Dict, Optional, TypeVar +from uuid import uuid4 + +from .config import ( + CronTrigger, + DateTimeTrigger, + IntervalTrigger, + JobStatus, + TriggerType, +) + +T = TypeVar("T") +JobCallback = Callable[..., Awaitable[Any]] + + +class ScheduledJob: + """Represents a scheduled job. + + Encapsulates the callable, trigger, and runtime state of a job + managed by the scheduler. + """ + + def __init__( + self, + func: JobCallback, + trigger: IntervalTrigger | CronTrigger | DateTimeTrigger, + *, + name: Optional[str] = None, + args: Optional[tuple[Any, ...]] = None, + kwargs: Optional[Dict[str, Any]] = None, + max_instances: int = 3, + misfire_grace_time: int = 30, + coalesce: bool = True, + id: Optional[str] = None, + ) -> None: + self.id = id or str(uuid4()) + self.name = name or func.__name__ + self.func = func + self.trigger = trigger + self.args = args or () + self.kwargs = kwargs or {} + self.max_instances = max_instances + self.misfire_grace_time = misfire_grace_time + self.coalesce = coalesce + + self._status: JobStatus = JobStatus.ACTIVE + self._created_at = time.time() + self._last_run_time: Optional[float] = None + self._next_run_time: Optional[float] = None + self._current_instances: int = 0 + self._total_run_count: int = 0 + self._last_error: Optional[str] = None + self._logger = logging.getLogger("nexios.scheduler.job") + + # --- Properties --- + + @property + def status(self) -> JobStatus: + return self._status + + @property + def created_at(self) -> float: + return self._created_at + + @property + def last_run_time(self) -> Optional[float]: + return self._last_run_time + + @property + def next_run_time(self) -> Optional[float]: + return self._next_run_time + + @property + def total_run_count(self) -> int: + return self._total_run_count + + @property + def last_error(self) -> Optional[str]: + return self._last_error + + @property + def current_instances(self) -> int: + return self._current_instances + + # --- Public API --- + + def compute_next_run(self, from_timestamp: Optional[float] = None) -> None: + """Calculate and store the next scheduled run time.""" + from_ts = from_timestamp if from_timestamp is not None else time.time() + + if isinstance(self.trigger, IntervalTrigger): + if self._last_run_time is None and self.trigger.start_now: + self._next_run_time = from_ts + else: + last = self._last_run_time or from_ts + self._next_run_time = last + self.trigger.as_seconds() + + elif isinstance(self.trigger, CronTrigger): + self._next_run_time = self.trigger.get_next_run(from_ts) + + elif isinstance(self.trigger, DateTimeTrigger): + self._next_run_time = self.trigger.get_run_timestamp() + + async def run(self) -> Any: + """Execute the job function and return its result.""" + if self._status == JobStatus.CANCELLED: + raise RuntimeError(f"Job {self.id} has been cancelled") + + self._current_instances += 1 + self._total_run_count += 1 + self._last_run_time = time.time() + self._status = JobStatus.ACTIVE + + try: + result = await self.func(*self.args, **self.kwargs) + return result + except Exception as exc: + self._last_error = str(exc) + self._status = JobStatus.FAILED + self._logger.exception( + "Job %s (%s) failed: %s", self.id, self.name, exc + ) + raise + finally: + self._current_instances -= 1 + + def pause(self) -> None: + """Pause this job. It will not fire until resumed.""" + self._status = JobStatus.PAUSED + + def resume(self) -> None: + """Resume a paused job.""" + self._status = JobStatus.ACTIVE + + def cancel(self) -> None: + """Cancel this job permanently.""" + self._status = JobStatus.CANCELLED + self._next_run_time = None + + def to_dict(self) -> Dict[str, Any]: + """Serialize the job state to a dictionary.""" + trigger_type = TriggerType.INTERVAL + trigger_repr = "" + if isinstance(self.trigger, IntervalTrigger): + trigger_type = TriggerType.INTERVAL + trigger_repr = f"{self.trigger.as_seconds()}s" + elif isinstance(self.trigger, CronTrigger): + trigger_type = TriggerType.CRON + trigger_repr = self.trigger.expr + elif isinstance(self.trigger, DateTimeTrigger): + trigger_type = TriggerType.DATETIME + trigger_repr = self.trigger.run_date + + return { + "id": self.id, + "name": self.name, + "status": self._status.value, + "trigger_type": trigger_type.value, + "trigger": trigger_repr, + "total_run_count": self._total_run_count, + "current_instances": self._current_instances, + "created_at": self._created_at, + "last_run_time": self._last_run_time, + "next_run_time": self._next_run_time, + "last_error": self._last_error, + "max_instances": self.max_instances, + "coalesce": self.coalesce, + } diff --git a/tests/test_scheduler/__init__.py b/tests/test_scheduler/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/test_scheduler/test_config.py b/tests/test_scheduler/test_config.py new file mode 100644 index 0000000..f9d347b --- /dev/null +++ b/tests/test_scheduler/test_config.py @@ -0,0 +1,161 @@ +""" +Tests for the scheduler configuration module. +""" +import time +from datetime import datetime, timezone + +import pytest + +from nexios_contrib.scheduler.config import ( + CronTrigger, + DateTimeTrigger, + IntervalTrigger, + JobStatus, + SchedulerConfig, + TriggerType, +) + + +class TestIntervalTrigger: + def test_as_seconds(self): + t = IntervalTrigger(seconds=30) + assert t.as_seconds() == 30 + + t = IntervalTrigger(minutes=2, seconds=15) + assert t.as_seconds() == 135 + + t = IntervalTrigger(hours=1, minutes=30) + assert t.as_seconds() == 5400 + + t = IntervalTrigger(days=1, hours=2) + assert t.as_seconds() == 93600 + + def test_zero_total_raises(self): + with pytest.raises(ValueError, match="greater than 0"): + IntervalTrigger(seconds=0, minutes=0) + + def test_negative_raises(self): + with pytest.raises(ValueError, match="non-negative"): + IntervalTrigger(seconds=-5) + + +class TestCronTrigger: + def test_every_minute(self): + t = CronTrigger("* * * * *") + now = time.time() + next_run = t.get_next_run(now) + assert next_run > now + # Should be at most 60 seconds from now + assert next_run - now <= 70 + + def test_hourly(self): + t = CronTrigger("0 * * * *") + now = time.time() + next_run = t.get_next_run(now) + assert next_run > now + + dt = datetime.fromtimestamp(next_run, tz=timezone.utc) + assert dt.minute == 0 + + def test_daily_at_midnight(self): + t = CronTrigger("0 0 * * *") + now = time.time() + next_run = t.get_next_run(now) + dt = datetime.fromtimestamp(next_run, tz=timezone.utc) + assert dt.hour == 0 + assert dt.minute == 0 + + def test_invalid_expression(self): + with pytest.raises(ValueError, match="5 fields"): + CronTrigger("* * *") + + with pytest.raises(ValueError, match="5 fields"): + CronTrigger("* * * * * *") + + def test_alias_every_minute(self): + t = CronTrigger("@every_minute") + assert t._fields is not None + + def test_hourly_alias(self): + t = CronTrigger("@hourly") + now = time.time() + next_run = t.get_next_run(now) + dt = datetime.fromtimestamp(next_run, tz=timezone.utc) + assert dt.minute == 0 + + def test_daily_alias(self): + t = CronTrigger("@daily") + now = time.time() + next_run = t.get_next_run(now) + dt = datetime.fromtimestamp(next_run, tz=timezone.utc) + assert dt.hour == 0 + assert dt.minute == 0 + + def test_range_expression(self): + t = CronTrigger("0 9-17 * * *") + now = time.time() + next_run = t.get_next_run(now) + dt = datetime.fromtimestamp(next_run, tz=timezone.utc) + assert 9 <= dt.hour <= 17 + assert dt.minute == 0 + + def test_step_values(self): + t = CronTrigger("*/15 * * * *") + now = time.time() + next_run = t.get_next_run(now) + dt = datetime.fromtimestamp(next_run, tz=timezone.utc) + assert dt.minute % 15 == 0 + + def test_list_values(self): + t = CronTrigger("30 9,12,15 * * *") + now = time.time() + next_run = t.get_next_run(now) + dt = datetime.fromtimestamp(next_run, tz=timezone.utc) + assert dt.minute == 30 + assert dt.hour in (9, 12, 15) + + +class TestDateTimeTrigger: + def test_future_datetime(self): + t = DateTimeTrigger("2030-01-01T00:00:00") + ts = t.get_run_timestamp() + assert ts > time.time() + + def test_past_datetime(self): + t = DateTimeTrigger("2020-01-01T00:00:00") + ts = t.get_run_timestamp() + assert ts < time.time() + + def test_parses_date_only(self): + t = DateTimeTrigger("2030-06-15") + ts = t.get_run_timestamp() + dt = datetime.fromtimestamp(ts, tz=timezone.utc) + assert dt.month == 6 + assert dt.day == 15 + + def test_invalid_format(self): + with pytest.raises(ValueError, match="parse"): + DateTimeTrigger("not-a-date") + + +class TestJobStatus: + def test_values(self): + assert JobStatus.ACTIVE.value == "ACTIVE" + assert JobStatus.PAUSED.value == "PAUSED" + assert JobStatus.COMPLETED.value == "COMPLETED" + assert JobStatus.FAILED.value == "FAILED" + assert JobStatus.CANCELLED.value == "CANCELLED" + + +class TestSchedulerConfig: + def test_defaults(self): + config = SchedulerConfig() + assert config.max_concurrent_jobs == 10 + assert config.log_level is not None + assert config.job_defaults["max_instances"] == 3 + + def test_to_dict(self): + config = SchedulerConfig(timezone="UTC") + d = config.to_dict() + assert d["timezone"] == "UTC" + assert d["max_concurrent_jobs"] == 10 diff --git a/tests/test_scheduler/test_manager.py b/tests/test_scheduler/test_manager.py new file mode 100644 index 0000000..a6cea4b --- /dev/null +++ b/tests/test_scheduler/test_manager.py @@ -0,0 +1,294 @@ +""" +Tests for the SchedulerManager class. +""" +import asyncio + +import pytest + +from nexios import NexiosApp + +from nexios_contrib.scheduler.config import ( + CronTrigger, + DateTimeTrigger, + IntervalTrigger, + JobStatus, + SchedulerConfig, +) +from nexios_contrib.scheduler.manager import SchedulerManager + + +@pytest.fixture +def app(): + return NexiosApp() + + +@pytest.fixture +def scheduler(app): + return SchedulerManager(app) + + +@pytest.fixture +async def running_scheduler(scheduler): + await scheduler.start() + yield scheduler + await scheduler.shutdown() + + +class TestSchedulerManager: + def test_initial_state(self, scheduler): + assert scheduler._running is False + assert len(scheduler._jobs) == 0 + + def test_add_interval_job(self, scheduler): + async def my_task(): + pass + + job = scheduler.add_job(my_task, IntervalTrigger(seconds=60)) + assert job is not None + assert job.id in scheduler._jobs + assert len(scheduler._jobs) == 1 + + def test_add_cron_job(self, scheduler): + async def my_task(): + pass + + job = scheduler.add_job(my_task, CronTrigger("* * * * *")) + assert job is not None + assert job.id in scheduler._jobs + + def test_add_datetime_job(self, scheduler): + async def my_task(): + pass + + job = scheduler.add_job(my_task, DateTimeTrigger("2030-01-01T00:00:00")) + assert job is not None + assert job.id in scheduler._jobs + + def test_remove_job(self, scheduler): + async def my_task(): + pass + + job = scheduler.add_job(my_task, IntervalTrigger(seconds=60)) + assert len(scheduler._jobs) == 1 + + result = scheduler.remove_job(job.id) + assert result is True + assert len(scheduler._jobs) == 0 + + def test_remove_nonexistent_job(self, scheduler): + result = scheduler.remove_job("nonexistent") + assert result is False + + def test_get_job(self, scheduler): + async def my_task(): + pass + + job = scheduler.add_job(my_task, IntervalTrigger(seconds=60)) + retrieved = scheduler.get_job(job.id) + assert retrieved is job + + def test_get_nonexistent_job(self, scheduler): + assert scheduler.get_job("nonexistent") is None + + def test_get_jobs(self, scheduler): + async def task_a(): + pass + + async def task_b(): + pass + + scheduler.add_job(task_a, IntervalTrigger(seconds=60)) + scheduler.add_job(task_b, CronTrigger("* * * * *")) + + all_jobs = scheduler.get_jobs() + assert len(all_jobs) == 2 + + def test_get_jobs_filtered(self, scheduler): + async def task_a(): + pass + + async def task_b(): + pass + + job_a = scheduler.add_job(task_a, IntervalTrigger(seconds=60)) + scheduler.add_job(task_b, CronTrigger("* * * * *")) + + job_a.pause() + active = scheduler.get_jobs(status=JobStatus.ACTIVE) + paused = scheduler.get_jobs(status=JobStatus.PAUSED) + + assert len(active) == 1 + assert len(paused) == 1 + + def test_pause_and_resume_job(self, scheduler): + async def my_task(): + pass + + job = scheduler.add_job(my_task, IntervalTrigger(seconds=60)) + + paused = scheduler.pause_job(job.id) + assert paused is True + assert job.status == JobStatus.PAUSED + + resumed = scheduler.resume_job(job.id) + assert resumed is True + assert job.status == JobStatus.ACTIVE + + def test_pause_nonexistent(self, scheduler): + assert scheduler.pause_job("nonexistent") is False + + def test_resume_nonexistent(self, scheduler): + assert scheduler.resume_job("nonexistent") is False + + @pytest.mark.asyncio + async def test_start_and_shutdown(self, scheduler): + async def my_task(): + await asyncio.sleep(0.1) + return "done" + + scheduler.add_job(my_task, IntervalTrigger(seconds=60)) + assert scheduler._running is False + + await scheduler.start() + assert scheduler._running is True + + await scheduler.shutdown() + assert scheduler._running is False + + @pytest.mark.asyncio + async def test_double_start(self, running_scheduler): + await running_scheduler.start() + assert running_scheduler._running is True + + @pytest.mark.asyncio + async def test_job_execution_via_ticker(self): + app = NexiosApp() + scheduler = SchedulerManager(app, SchedulerConfig()) + + executed = False + + async def my_task(): + nonlocal executed + executed = True + + scheduler.add_job(my_task, IntervalTrigger(seconds=1, start_now=True)) + + await scheduler.start() + await asyncio.sleep(1.5) + await scheduler.shutdown() + + assert executed, "Job should have been executed by the ticker" + + @pytest.mark.asyncio + async def test_cron_job_schedules_next_run(self): + app = NexiosApp() + scheduler = SchedulerManager(app) + + async def my_task(): + pass + + job = scheduler.add_job(my_task, CronTrigger("* * * * *")) + + await scheduler.start() + job.compute_next_run() + assert job.next_run_time is not None + assert job.next_run_time > 0 + + await scheduler.shutdown() + + @pytest.mark.asyncio + async def test_datetime_job_completes(self): + app = NexiosApp() + scheduler = SchedulerManager(app) + + executed = False + + async def my_task(): + nonlocal executed + executed = True + + scheduler.add_job(my_task, DateTimeTrigger("2030-01-01T00:00:00")) + + await scheduler.start() + await asyncio.sleep(0.5) + await scheduler.shutdown() + + # DateTimeTrigger is in the future, should NOT have executed + assert executed is False + + @pytest.mark.asyncio + async def test_job_failure_logged(self): + app = NexiosApp() + scheduler = SchedulerManager(app) + + async def failing_task(): + raise RuntimeError("job failed") + + job = scheduler.add_job(failing_task, IntervalTrigger(seconds=1, start_now=True)) + + await scheduler.start() + await asyncio.sleep(1.5) + await scheduler.shutdown() + + assert job.last_error == "job failed" + assert job.status == JobStatus.FAILED + + @pytest.mark.asyncio + async def test_max_instances_respected(self): + app = NexiosApp() + scheduler = SchedulerManager(app) + + running = asyncio.Event() + proceed = asyncio.Event() + call_count = 0 + + async def slow_task(): + nonlocal call_count + call_count += 1 + running.set() + await proceed.wait() + + job = scheduler.add_job( + slow_task, + IntervalTrigger(seconds=1, start_now=True), + max_instances=1, + ) + + await scheduler.start() + await running.wait() + await asyncio.sleep(0.1) + + # Force a tick: next_run_time should be set to now + old_next = job.next_run_time + job._next_run_time = 0 # type: ignore[attr-defined] + await asyncio.sleep(1.5) + + proceed.set() + await asyncio.sleep(0.2) + await scheduler.shutdown() + + # Only 1 instance should have run (max_instances=1 prevented the second) + assert call_count == 1 + + @pytest.mark.asyncio + async def test_multiple_jobs(self): + app = NexiosApp() + scheduler = SchedulerManager(app) + + results = [] + + async def task_a(): + results.append("A") + + async def task_b(): + results.append("B") + + scheduler.add_job(task_a, IntervalTrigger(seconds=1, start_now=True)) + scheduler.add_job(task_b, IntervalTrigger(seconds=1, start_now=True)) + + await scheduler.start() + await asyncio.sleep(1.5) + await scheduler.shutdown() + + assert "A" in results + assert "B" in results diff --git a/tests/test_scheduler/test_models.py b/tests/test_scheduler/test_models.py new file mode 100644 index 0000000..d54e4ae --- /dev/null +++ b/tests/test_scheduler/test_models.py @@ -0,0 +1,109 @@ +""" +Tests for the scheduler models module. +""" +import time + +import pytest + +from nexios_contrib.scheduler.config import ( + CronTrigger, + DateTimeTrigger, + IntervalTrigger, + JobStatus, +) +from nexios_contrib.scheduler.models import ScheduledJob + + +@pytest.fixture +def sample_job(): + async def my_task(): + return 42 + + return ScheduledJob( + func=my_task, + trigger=IntervalTrigger(seconds=60), + name="test-job", + ) + + +class TestScheduledJob: + def test_initial_state(self, sample_job): + assert sample_job.status == JobStatus.ACTIVE + assert sample_job.total_run_count == 0 + assert sample_job.current_instances == 0 + assert sample_job.last_error is None + assert sample_job.next_run_time is None + assert sample_job.last_run_time is None + + def test_compute_next_run_interval(self, sample_job): + sample_job.compute_next_run() + assert sample_job.next_run_time is not None + assert sample_job.next_run_time > 0 + + def test_compute_next_run_cron(self): + async def task(): + pass + + job = ScheduledJob( + func=task, + trigger=CronTrigger("*/5 * * * *"), + ) + job.compute_next_run() + assert job.next_run_time is not None + assert job.next_run_time > time.time() + + def test_compute_next_run_datetime(self): + async def task(): + pass + + job = ScheduledJob( + func=task, + trigger=DateTimeTrigger("2030-01-01T00:00:00"), + ) + job.compute_next_run() + assert job.next_run_time is not None + assert job.next_run_time > time.time() + + def test_pause_and_resume(self, sample_job): + sample_job.pause() + assert sample_job.status == JobStatus.PAUSED + + sample_job.resume() + assert sample_job.status == JobStatus.ACTIVE + + def test_cancel(self, sample_job): + sample_job.cancel() + assert sample_job.status == JobStatus.CANCELLED + assert sample_job.next_run_time is None + + @pytest.mark.asyncio + async def test_run(self, sample_job): + result = await sample_job.run() + assert result == 42 + assert sample_job.total_run_count == 1 + assert sample_job.last_run_time is not None + + @pytest.mark.asyncio + async def test_run_failure(self): + async def failing_task(): + raise ValueError("boom") + + job = ScheduledJob( + func=failing_task, + trigger=IntervalTrigger(seconds=60), + ) + + with pytest.raises(ValueError, match="boom"): + await job.run() + + assert job.last_error == "boom" + assert job.status == JobStatus.FAILED + + def test_to_dict(self, sample_job): + d = sample_job.to_dict() + assert d["id"] == sample_job.id + assert d["name"] == "test-job" + assert d["status"] == JobStatus.ACTIVE.value + assert d["trigger_type"] == "INTERVAL" + assert "trigger" in d + assert d["total_run_count"] == 0