-
Notifications
You must be signed in to change notification settings - Fork 6
[unstable] Add dry run option #457
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from 4 commits
568da40
2dc960b
385fdb8
196ad84
4edb09c
d7dccbb
1471200
b7bc176
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -52,11 +52,12 @@ def my_task_function(self, task_context: TaskContext) -> None: | |
| from multiprocessing import Queue | ||
| from threading import RLock, Thread | ||
| from types import TracebackType | ||
| from typing import Generic, Literal, TypeVar | ||
| from typing import Any, Generic, Literal, TypeVar | ||
|
|
||
| from humps import pascalize | ||
| from typing_extensions import Self, assert_never | ||
|
|
||
| from cognite.client import CogniteClient | ||
| from cognite.extractorutils._inner_util import _resolve_log_level | ||
| from cognite.extractorutils.threading import CancellationToken | ||
| from cognite.extractorutils.unstable.configuration.models import ( | ||
|
|
@@ -84,6 +85,37 @@ def my_task_function(self, task_context: TaskContext) -> None: | |
| _T = TypeVar("_T", bound=ExtractorConfig) | ||
|
|
||
|
|
||
| class _NoOpCogniteClient: | ||
| """A mock CogniteClient that performs no actions, for use in dry-run mode.""" | ||
|
|
||
| class _MockResponse: | ||
| def __init__(self, url: str) -> None: | ||
| self._url = url | ||
|
|
||
| def json(self) -> dict: | ||
| if "integrations/checkin" in self._url: | ||
| return {"lastConfigRevision": None} | ||
| return {} | ||
|
|
||
| def __init__(self, config: ConnectionConfig | None, client_name: str) -> None: | ||
| class MockSDKConfig: | ||
| def __init__(self, project: str) -> None: | ||
| self.project = project | ||
|
|
||
| project_name = config.project if config else "dry-run-no-config" | ||
| self.config = MockSDKConfig(project_name) | ||
| self._logger = logging.getLogger(__name__) | ||
| self._logger.info(f"CogniteClient is in no-op mode (dry-run). Client name: {client_name}") | ||
|
|
||
| def post(self, url: str, json: dict, **kwargs: dict[str, Any]) -> _MockResponse: | ||
| self._logger.info(f"[DRY-RUN] SKIPPED POST to {url} with payload: {json}") | ||
| return self._MockResponse(url) | ||
|
|
||
| def get(self, url: str, **kwargs: dict[str, Any]) -> _MockResponse: | ||
| self._logger.info(f"[DRY-RUN] SKIPPED GET from {url}") | ||
| return self._MockResponse(url) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is all good, but I'd also like for us to be able to report what we're sending to the cdf. I believe the idea of a dry run especially with a combination of both dry run and debug, is for the user to know what the extractor is doing at every step.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Changed the mock response accordingly to the application config being fetched from api or not. |
||
|
|
||
|
|
||
| class FullConfig(Generic[_T]): | ||
| """ | ||
| A class that holds the full configuration for an extractor. | ||
|
|
@@ -94,15 +126,17 @@ class FullConfig(Generic[_T]): | |
|
|
||
| def __init__( | ||
| self, | ||
| connection_config: ConnectionConfig, | ||
| connection_config: ConnectionConfig | None, | ||
| application_config: _T, | ||
| current_config_revision: ConfigRevision, | ||
| log_level_override: str | None = None, | ||
| is_dry_run: bool = False, | ||
| ) -> None: | ||
| self.connection_config = connection_config | ||
| self.application_config = application_config | ||
| self.current_config_revision = current_config_revision | ||
| self.log_level_override = log_level_override | ||
| self.is_dry_run = is_dry_run | ||
|
|
||
|
|
||
| class Extractor(Generic[ConfigType], CogniteLogger): | ||
|
|
@@ -124,8 +158,14 @@ class Extractor(Generic[ConfigType], CogniteLogger): | |
| CONFIG_TYPE: type[ConfigType] | ||
|
|
||
| RESTART_POLICY: RestartPolicy = WHEN_CONTINUOUS_TASKS_CRASHES | ||
| SUPPORTS_DRY_RUN: bool = False | ||
| cognite_client: _NoOpCogniteClient | CogniteClient | ||
|
|
||
| def __init__(self, config: FullConfig[ConfigType]) -> None: | ||
| self.is_dry_run = config.is_dry_run | ||
| if self.is_dry_run and not self.SUPPORTS_DRY_RUN: | ||
| raise NotImplementedError(f"Extractor '{self.NAME}' does not support dry-run mode.") | ||
|
|
||
| self._logger = logging.getLogger(f"{self.EXTERNAL_ID}.main") | ||
|
|
||
| self.cancellation_token = CancellationToken() | ||
|
|
@@ -136,7 +176,12 @@ def __init__(self, config: FullConfig[ConfigType]) -> None: | |
| self.current_config_revision = config.current_config_revision | ||
| self.log_level_override = config.log_level_override | ||
|
|
||
| self.cognite_client = self.connection_config.get_cognite_client(f"{self.EXTERNAL_ID}-{self.VERSION}") | ||
| if self.is_dry_run: | ||
| self.cognite_client = _NoOpCogniteClient(self.connection_config, f"{self.EXTERNAL_ID}-{self.VERSION}") | ||
| elif self.connection_config: | ||
| self.cognite_client = self.connection_config.get_cognite_client(f"{self.EXTERNAL_ID}-{self.VERSION}") | ||
| else: | ||
| raise ValueError("Connection config is missing and not in dry-run mode.") | ||
|
|
||
| self._checkin_lock = RLock() | ||
| self._runtime_messages: Queue[RuntimeMessage] | None = None | ||
|
|
@@ -219,6 +264,9 @@ def _set_runtime_message_queue(self, queue: Queue) -> None: | |
| self._runtime_messages = queue | ||
|
|
||
| def _checkin(self) -> None: | ||
| if not self.connection_config: | ||
| return | ||
|
|
||
| with self._checkin_lock: | ||
| task_updates = [t.model_dump() for t in self._task_updates] | ||
| self._task_updates.clear() | ||
|
|
@@ -358,6 +406,9 @@ def run_task(task_context: TaskContext) -> None: | |
| ) | ||
|
|
||
| def _report_extractor_info(self) -> None: | ||
| if not self.connection_config: | ||
| return | ||
|
|
||
| self.cognite_client.post( | ||
| f"/api/v1/projects/{self.cognite_client.config.project}/integrations/extractorinfo", | ||
| json={ | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -101,7 +101,7 @@ def _create_argparser(self) -> ArgumentParser: | |
| "--connection-config", | ||
| nargs=1, | ||
| type=Path, | ||
| required=True, | ||
|
devendra-lohar marked this conversation as resolved.
|
||
| required=False, | ||
| help="Connection parameters", | ||
| ) | ||
| argparser.add_argument( | ||
|
|
@@ -134,6 +134,11 @@ def _create_argparser(self) -> ArgumentParser: | |
| required=False, | ||
| help="Set the current working directory for the extractor.", | ||
| ) | ||
| argparser.add_argument( | ||
| "--dry-run", | ||
| action="store_true", | ||
| help="Run without writing to CDF. The extractor must support this feature for this to work.", | ||
| ) | ||
|
|
||
| return argparser | ||
|
|
||
|
|
@@ -167,6 +172,9 @@ def _inner_run( | |
| with extractor: | ||
| extractor.run() | ||
|
|
||
| except NotImplementedError as e: | ||
| logging.getLogger(__name__).critical(f"Configuration error: {e}") | ||
|
|
||
| except Exception: | ||
| self.logger.exception("Extractor crashed, will attempt restart") | ||
| message_queue.put(RuntimeMessage.RESTART) | ||
|
|
@@ -188,7 +196,7 @@ def _spawn_extractor( | |
| def _try_get_application_config( | ||
| self, | ||
| args: Namespace, | ||
| connection_config: ConnectionConfig, | ||
| connection_config: ConnectionConfig | None, | ||
| ) -> tuple[ExtractorConfig, ConfigRevision]: | ||
| current_config_revision: ConfigRevision | ||
|
|
||
|
|
@@ -208,11 +216,12 @@ def _try_get_application_config( | |
| else: | ||
| self.logger.info("Loading application config from CDF") | ||
|
|
||
| application_config, current_config_revision = load_from_cdf( | ||
| self._cognite_client, | ||
| connection_config.integration.external_id, | ||
| self._extractor_class.CONFIG_TYPE, | ||
| ) | ||
| if connection_config: | ||
| application_config, current_config_revision = load_from_cdf( | ||
| self._cognite_client, | ||
| connection_config.integration.external_id, | ||
| self._extractor_class.CONFIG_TYPE, | ||
| ) | ||
|
|
||
| return application_config, current_config_revision | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You should probably try to use pyright as your lsp because this is very unsafe. You could result in a situation where you get
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Changed this to fix UnboundLocalError |
||
|
|
||
|
|
@@ -230,8 +239,15 @@ def _try_change_cwd(self, cwd: Path | None) -> None: | |
| def _safe_get_application_config( | ||
| self, | ||
| args: Namespace, | ||
| connection_config: ConnectionConfig, | ||
| connection_config: ConnectionConfig | None, | ||
| ) -> tuple[ExtractorConfig, ConfigRevision] | None: | ||
| if args.dry_run and not args.force_local_config: | ||
| self.logger.warning( | ||
| "Running in dry-run mode without a local application config file (-f). " | ||
| "The extractor will not perform any actions." | ||
| ) | ||
| return None | ||
|
|
||
| prev_error: str | None = None | ||
|
|
||
| while not self._cancellation_token.is_cancelled: | ||
|
|
@@ -257,23 +273,28 @@ def _safe_get_application_config( | |
| task=None, | ||
| ) | ||
|
|
||
| self._cognite_client.post( | ||
| f"/api/v1/projects/{self._cognite_client.config.project}/odin/checkin", | ||
| json={ | ||
| "externalId": connection_config.integration.external_id, | ||
| "errors": [error.model_dump()], | ||
| }, | ||
| headers={"cdf-version": "alpha"}, | ||
| ) | ||
| if connection_config: | ||
| self._cognite_client.post( | ||
| f"/api/v1/projects/{self._cognite_client.config.project}/odin/checkin", | ||
| json={ | ||
| "externalId": connection_config.integration.external_id, | ||
| "errors": [error.model_dump()], | ||
| }, | ||
| headers={"cdf-version": "alpha"}, | ||
| ) | ||
|
|
||
| self._cancellation_token.wait(randint(1, self.RETRY_CONFIG_INTERVAL)) | ||
|
|
||
| return None | ||
|
|
||
| def _verify_connection_config(self, connection_config: ConnectionConfig) -> bool: | ||
| def _verify_connection_config(self, connection_config: ConnectionConfig | None) -> bool: | ||
| if connection_config is None: | ||
| return False | ||
|
|
||
| self._cognite_client = connection_config.get_cognite_client( | ||
| f"{self._extractor_class.EXTERNAL_ID}-{self._extractor_class.VERSION}" | ||
| ) | ||
|
|
||
| try: | ||
| self._cognite_client.post( | ||
| f"/api/v1/projects/{self._cognite_client.config.project}/odin/checkin", | ||
|
|
@@ -333,13 +354,26 @@ def run(self) -> None: | |
|
|
||
| try: | ||
| self._try_change_cwd(args.cwd[0]) | ||
| connection_config = load_file(args.connection_config[0], ConnectionConfig) | ||
|
|
||
| if args.dry_run: | ||
| self.logger.info("Running in dry-run mode. No data will be written to CDF.") | ||
|
|
||
| connection_config = ( | ||
| load_file(args.connection_config[0], ConnectionConfig) if args.connection_config else None | ||
| ) | ||
| else: | ||
| if not args.connection_config: | ||
| self.logger.critical("Connection config file is required when not in dry-run mode.") | ||
| sys.exit(1) | ||
|
|
||
| connection_config = load_file(args.connection_config[0], ConnectionConfig) | ||
|
|
||
| except InvalidConfigError as e: | ||
| self.logger.error(str(e)) | ||
| self.logger.critical("Could not load connection config") | ||
| sys.exit(1) | ||
|
|
||
| if not args.skip_init_checks and not self._verify_connection_config(connection_config): | ||
| if not args.dry_run and not args.skip_init_checks and not self._verify_connection_config(connection_config): | ||
| sys.exit(1) | ||
|
|
||
| # This has to be Any. We don't know the type of the extractors' config at type checking since the self doesn't | ||
|
|
@@ -363,6 +397,7 @@ def run(self) -> None: | |
| application_config=application_config, | ||
| current_config_revision=current_config_revision, | ||
| log_level_override=args.log_level, | ||
| is_dry_run=args.dry_run, | ||
| ) | ||
| ) | ||
| process.join() | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.