diff --git a/celerybeatmongo/models.py b/celerybeatmongo/models.py index 197f1f0..83a9e95 100644 --- a/celerybeatmongo/models.py +++ b/celerybeatmongo/models.py @@ -6,10 +6,13 @@ from datetime import datetime, timedelta -from mongoengine import * from celery import current_app import celery.schedules +from mongoengine.document import DynamicDocument, EmbeddedDocument +from mongoengine.errors import ValidationError +from mongoengine.fields import BooleanField, DateTimeField, DictField, EmbeddedDocumentField, IntField, ListField, StringField + def get_periodic_task_collection(): if hasattr(current_app.conf, "mongodb_scheduler_collection"): @@ -118,7 +121,7 @@ def save(self, force_insert=False, validate=True, clean=True, if not self.date_creation: self.date_creation = datetime.now() self.date_changed = datetime.now() - super(PeriodicTask, self).save(force_insert, validate, clean, + return super(PeriodicTask, self).save(force_insert, validate, clean, write_concern, cascade, cascade_kwargs, _refs, save_condition, signal_kwargs, **kwargs) diff --git a/celerybeatmongo/schedulers.py b/celerybeatmongo/schedulers.py index 14bcd5b..6518930 100644 --- a/celerybeatmongo/schedulers.py +++ b/celerybeatmongo/schedulers.py @@ -18,12 +18,47 @@ logger = get_logger(__name__) +def connect_mongo(app): + alias = get_alias(app) + host = get_host(app) + db = get_db(app) + + return mongoengine.connect(db, host=host, alias=alias) + +def get_host(app): + if hasattr(app.conf, "mongodb_scheduler_url"): + host = app.conf.get('mongodb_scheduler_url') + elif hasattr(app.conf, "CELERY_MONGODB_SCHEDULER_URL"): + host = app.conf.CELERY_MONGODB_SCHEDULER_URL + else: + host = None + return host + +def get_db(app): + if hasattr(app.conf, "mongodb_scheduler_db"): + db = app.conf.get("mongodb_scheduler_db") + elif hasattr(app.conf, "CELERY_MONGODB_SCHEDULER_DB"): + db = app.conf.CELERY_MONGODB_SCHEDULER_DB + else: + db = "celery" + return db + +def get_alias(app): + if hasattr(app.conf, "mongodb_scheduler_connection_alias"): + alias = app.conf.get('mongodb_scheduler_connection_alias') + elif hasattr(app.conf, "CELERY_MONGODB_SCHEDULER_CONNECTION_ALIAS"): + alias = app.conf.CELERY_MONGODB_SCHEDULER_CONNECTION_ALIAS + else: + alias = mongoengine.DEFAULT_CONNECTION_NAME + return alias + + class MongoScheduleEntry(ScheduleEntry): - def __init__(self, task): + def __init__(self, task, app=None): self._task = task - self.app = current_app._get_current_object() + self.app = current_app._get_current_object() if app is None else app self.name = self._task.name self.task = self._task.task @@ -46,6 +81,7 @@ def __init__(self, task): if not self._task.last_run_at: self._task.last_run_at = self._default_now() self.last_run_at = self._task.last_run_at + self._mongo = connect_mongo(self.app) def _default_now(self): return self.app.now() @@ -94,11 +130,10 @@ def save(self): self._task.last_run_at = self.last_run_at self._task.run_immediately = False try: - self._task.save(save_condition={}) - except Exception: + return self._task.save(save_condition={}) + except Exception as ex: logger.error(traceback.format_exc()) - class MongoScheduler(Scheduler): #: how often should we sync in schedule information @@ -110,27 +145,10 @@ class MongoScheduler(Scheduler): Model = PeriodicTask def __init__(self, app, *args, **kwargs): - if hasattr(app.conf, "mongodb_scheduler_db"): - db = app.conf.get("mongodb_scheduler_db") - elif hasattr(app.conf, "CELERY_MONGODB_SCHEDULER_DB"): - db = app.conf.CELERY_MONGODB_SCHEDULER_DB - else: - db = "celery" - if hasattr(app.conf, "mongodb_scheduler_connection_alias"): - alias = app.conf.get('mongodb_scheduler_connection_alias') - elif hasattr(app.conf, "CELERY_MONGODB_SCHEDULER_CONNECTION_ALIAS"): - alias = app.conf.CELERY_MONGODB_SCHEDULER_CONNECTION_ALIAS - else: - alias = "default" - - if hasattr(app.conf, "mongodb_scheduler_url"): - host = app.conf.get('mongodb_scheduler_url') - elif hasattr(app.conf, "CELERY_MONGODB_SCHEDULER_URL"): - host = app.conf.CELERY_MONGODB_SCHEDULER_URL - else: - host = None + host = get_host(app) + db = get_db(app) - self._mongo = mongoengine.connect(db, host=host, alias=alias) + self._mongo = connect_mongo(app) if host: logger.info("backend scheduler using %s/%s:%s", @@ -138,6 +156,7 @@ def __init__(self, app, *args, **kwargs): else: logger.info("backend scheduler using %s/%s:%s", "mongodb://localhost", db, self.Model._get_collection().name) + self._schedule = {} self._last_updated = None Scheduler.__init__(self, app, *args, **kwargs)