From 0af4de3648123f107b54a7ad44125c3ccf019e9f Mon Sep 17 00:00:00 2001 From: peroxile <143724653+peroxile@users.noreply.github.com> Date: Tue, 28 Apr 2026 13:48:55 +0200 Subject: [PATCH] feat: add entry point for external callers --- core/engine.py | 197 ++++++++++++++++++++++++++++++++++++++++--------- 1 file changed, 164 insertions(+), 33 deletions(-) diff --git a/core/engine.py b/core/engine.py index c2ff66c..d61ed00 100644 --- a/core/engine.py +++ b/core/engine.py @@ -1,9 +1,9 @@ """ -core/engine.py - Coordinates the full SwiftBox loop: input -> normalize -> decide -> act -> state -> verify +Entry point for external callers (e.g. a Telegram bot, CLI, or scheduler). +Does not contain policy — reads everything from config. """ from __future__ import annotations @@ -39,10 +39,33 @@ logger = logging.getLogger(__name__) -# Config loader +# Config loaders + +def _load_defaults(root: Path) -> dict: + """Load config/default.yml if it exists. Returns empty dict if not found.""" + default_path = root / "config/default.yml" + if default_path.exists(): + return yaml.safe_load(default_path.read_text()) or {} + return {} + + +def _merge(base: dict, override: dict) -> dict: + """Shallow merge: override wins on conflict at top level and one level deep.""" + result = dict(base) + for k, v in override.items(): + if isinstance(v, dict) and isinstance(result.get(k), dict): + result[k] = {**result[k], **v} + else: + result[k] = v + return result + def load_host_config(path: str | Path) -> HostConfig: - data = yaml.safe_load(Path(path).read_text()) + path = Path(path) + root = path.parent.parent.parent # config/hosts/.yml -> repo root + defaults = _load_defaults(root) + host_raw = yaml.safe_load(path.read_text()) + data = _merge(defaults, host_raw) h = data["host"] exe = data.get("execution", {}) return HostConfig( @@ -61,15 +84,19 @@ def load_host_config(path: str | Path) -> HostConfig: repo_source=data.get("repo", {}).get("source"), repo_branch=data.get("repo", {}).get("branch", "main"), verify_checksums=data.get("repo", {}).get("verify_checksums", True), + ssh_config=data.get("ssh"), # None for local hosts ) -def _notify( - records: list[StateRecord], - targets: list[str], - log_path: str, -) -> None: - """Dispatch completed records to all configured notification targets.""" +def _notify_targets_for(host_config_path: Path) -> list[str]: + raw = yaml.safe_load(host_config_path.read_text()) + return raw.get("notifications", {}).get("targets", ["stdout"]) + + +# Notify dispatcher + + +def _notify(records: list[StateRecord], targets: list[str], log_path: str) -> None: for target in targets: if target == "stdout": notify_stdout.emit_all(records) @@ -82,10 +109,6 @@ def _notify( # Script executor def _execute_script(req: ActionRequest) -> ActionResult: - """ - Run the script for an ActionRequest. - Dry-run returns immediately without executing. - """ timestamp = datetime.now(timezone.utc).isoformat() if req.dry_run: @@ -119,7 +142,7 @@ def _execute_script(req: ActionRequest) -> ActionResult: ) if not req.script_path: - # notify_only workflow — nothing to execute + # notify_only workflow — no script to run return ActionResult( workflow_id=req.workflow_id, script_path="", @@ -154,12 +177,7 @@ def _execute_script(req: ActionRequest) -> ActionResult: start = time.monotonic() try: - proc = subprocess.run( - cmd, - capture_output=True, - text=True, - timeout=60, - ) + proc = subprocess.run(cmd, capture_output=True, text=True, timeout=60) duration = time.monotonic() - start status = ActionStatus.SUCCESS if proc.returncode == 0 else ActionStatus.FAILED return ActionResult( @@ -201,7 +219,7 @@ def _serial(obj: Any) -> Any: return {k: _serial(getattr(obj, k)) for k in obj.__dataclass_fields__} if isinstance(obj, (list, tuple)): return [_serial(i) for i in obj] - if hasattr(obj, "value"): # Enum + if hasattr(obj, "value"): return obj.value return obj @@ -228,6 +246,7 @@ def run( Execute one full SwiftBox loop. Returns all StateRecords produced in this run. """ + host_config_path = Path(host_config_path) state_dir = Path(state_dir) # Load config @@ -235,11 +254,7 @@ def run( checks = load_healthcheck_config(healthcheck_config_path) workflows = load_workflows(workflows_path) perms = load_permissions(permissions_path) - - # Read notification targets from host config yaml directly - _raw = yaml.safe_load(Path(host_config_path).read_text()) - notify_targets: list[str] = _raw.get("notifications", {}).get("targets", ["stdout"]) - + notify_targets = _notify_targets_for(host_config_path) check_def_map = {c.id: c for c in checks} logger.info( @@ -247,8 +262,13 @@ def run( host_config.name, host_config.execution_mode, host_config.dry_run, ) - # Detect - issues = run_all_checks(checks, host_config.name) + # Detect + if host_config.ssh_config: + logger.info("SSH mode: running checks remotely on %s", host_config.ssh_config.get("host")) + from adapters.ssh.checks import run_ssh_checks + issues = run_ssh_checks(host_config, checks) + else: + issues = run_all_checks(checks, host_config.name) actionable = [i for i in issues if i.status != CheckStatus.OK] logger.info("%d checks run, %d require attention", len(issues), len(actionable)) @@ -256,7 +276,7 @@ def run( logger.info("All checks passed. No action required.") return [] - # Plan + # Plan requests = plan(actionable, check_def_map, workflows, perms, host_config, role) logger.info("%d action requests planned", len(requests)) @@ -286,11 +306,122 @@ def run( _write_state(record, state_dir) records.append(record) - verified = all_passed(verifications) logger.info( "Workflow %s: status=%s verified=%s", - req.workflow_id, action_result.status, verified, + req.workflow_id, action_result.status, all_passed(verifications), ) _notify(records, notify_targets, host_config.log_path) - return records \ No newline at end of file + return records + + +# Single workflow dispatch +# Used by products that already know what workflow to run. + +def run_workflow( + workflow_id: str, + host_config_path: str | Path = "config/hosts/vps.yml", + workflows_path: str | Path = "config/workflows.yaml", + permissions_path: str | Path = "config/permissions.yml", + healthcheck_config_path: str | Path = "config/healthchecks/server.yml", + state_dir: str | Path = "state", + role: str = "operator", + issue: IssueReport | None = None, +) -> StateRecord: + """ + Execute one specific workflow by id and return a single StateRecord. + + The caller decides which workflow applies. SwiftBox enforces permissions, + dry-run, and verification. + + `issue` is optional context from the caller (e.g. the parsed CLI error). + If not provided, a minimal synthetic IssueReport is created. + """ + from core.plan import _is_allowed, _requires_approval + + host_config_path = Path(host_config_path) + state_dir = Path(state_dir) + + host_config = load_host_config(host_config_path) + workflows = load_workflows(workflows_path) + perms = load_permissions(permissions_path) + checks = load_healthcheck_config(healthcheck_config_path) + check_def_map = {c.id: c for c in checks} + notify_targets = _notify_targets_for(host_config_path) + + workflow = workflows.get(workflow_id) + if not workflow: + raise ValueError(f"Workflow '{workflow_id}' not found in {workflows_path}") + + if issue is None: + issue = IssueReport( + check_id=f"external.{workflow_id}", + host=host_config.name, + adapter="external", + method="dispatch", + status=CheckStatus.FAIL, + value=None, + message=f"Dispatched directly by caller: {workflow_id}", + ) + + allowed, reason = _is_allowed(workflow_id, role, host_config.name, perms) + if not allowed: + logger.warning("run_workflow blocked: %s", reason) + action_result = ActionResult( + workflow_id=workflow_id, + script_path="", + host=host_config.name, + status=ActionStatus.BLOCKED, + exit_code=None, + stdout="", + stderr=reason, + duration_seconds=0.0, + dry_run=host_config.dry_run, + timestamp=datetime.now(timezone.utc).isoformat(), + ) + record = StateRecord( + event="workflow.blocked", + host=host_config.name, + workflow_id=workflow_id, + issue=issue, + action=action_result, + verification=None, + timestamp=datetime.now(timezone.utc).isoformat(), + ) + _write_state(record, state_dir) + _notify([record], notify_targets, host_config.log_path) + return record + + effective_dry_run = perms.global_dry_run or host_config.dry_run + approved = not _requires_approval(workflow, workflow_id, perms, host_config) + + steps = workflow.steps + script_path = steps[0].get("script", "") if steps else "" + script_args: dict[str, Any] = dict(steps[0].get("args", {})) if steps else {} + + req = ActionRequest( + workflow_id=workflow_id, + host=host_config.name, + role=role, + trigger_issue=issue, + script_path=script_path, + script_args=script_args, + dry_run=effective_dry_run, + approved=approved, + ) + + action_result = _execute_script(req) + verifications = verify_workflow(workflow, action_result, issue, check_def_map, host_config.name) + + record = StateRecord( + event="workflow.executed", + host=host_config.name, + workflow_id=workflow_id, + issue=issue, + action=action_result, + verification=verifications[0] if verifications else None, + timestamp=datetime.now(timezone.utc).isoformat(), + ) + _write_state(record, state_dir) + _notify([record], notify_targets, host_config.log_path) + return record