-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathmain.py
More file actions
508 lines (433 loc) · 18.7 KB
/
main.py
File metadata and controls
508 lines (433 loc) · 18.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
"""FastAPI application entry point with lifespan management."""
from __future__ import annotations
import asyncio
import contextlib
import logging
import os
from collections.abc import AsyncGenerator
from contextlib import asynccontextmanager
from datetime import UTC, datetime, timedelta
from pathlib import Path
from typing import Any
from urllib.parse import urlparse
import sqlalchemy as sa
from fastapi import FastAPI, Request
from fastapi.exceptions import HTTPException as StarletteHTTPException, RequestValidationError
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse, HTMLResponse, JSONResponse, RedirectResponse
from fastapi.staticfiles import StaticFiles
from sqlalchemy import func, select
from edictum_server.config import Settings, get_settings
from edictum_server.db.engine import async_session_factory, get_engine, init_engine
from edictum_server.notifications.base import NotificationManager
from edictum_server.notifications.loader import load_db_channels
from edictum_server.push.manager import PushManager
from edictum_server.redis.client import create_redis_client
from edictum_server.routes import (
agent_registrations,
agents,
ai,
ai_usage,
approvals,
assignment_rules,
auth,
bundles,
compositions,
contracts,
deployments,
discord,
evaluate,
events,
health,
keys,
notifications,
sessions,
settings,
setup,
slack,
stats,
stream,
telegram,
)
from edictum_server.services.approval_service import expire_approvals
logger = logging.getLogger(__name__)
_PARTITION_INTERVAL = 24 * 60 * 60 # 24 hours
async def _partition_worker() -> None:
    """Ensure event partitions exist for the next 3 months, once per day.

    Calls the DB-side ``ensure_event_partitions(3)`` function on a daily
    cadence. On non-PostgreSQL backends (SQLite in tests) partitioning
    does not exist, so the worker exits immediately.
    """
    while True:
        try:
            if get_engine().dialect.name != "postgresql":
                # Partitioning is a PostgreSQL feature — nothing to do on SQLite.
                return
            session_factory = async_session_factory()
            async with session_factory() as db:
                await db.execute(sa.text("SELECT ensure_event_partitions(3)"))
                await db.commit()
                logger.info("Ensured event partitions for next 3 months")
        except Exception:
            # Keep the worker alive across transient DB errors.
            logger.exception("Partition worker error")
        await asyncio.sleep(_PARTITION_INTERVAL)
async def _approval_timeout_worker(app: FastAPI) -> None:
    """Periodically expire pending approvals past their deadline.

    Every 10 seconds: expire overdue approvals, push a timeout event to
    the affected agent environment and the dashboard, then let each
    tenant's notification channels update their posted messages.
    """
    while True:
        try:
            async with async_session_factory()() as db:
                expired = await expire_approvals(db)
                await db.commit()
            if expired:
                logger.info("Expired %d approval(s)", len(expired))
                push: PushManager = app.state.push_manager
                for entry in expired:
                    payload = {
                        "type": "approval_timeout",
                        "approval_id": entry["id"],
                        "agent_id": entry["agent_id"],
                        "tool_name": entry["tool_name"],
                    }
                    push.push_to_env(entry["env"], payload, tenant_id=entry["tenant_id"])
                    push.push_to_dashboard(entry["tenant_id"], payload)
                # Group expired items by tenant for tenant-scoped notification
                mgr: NotificationManager = app.state.notification_manager
                grouped: dict[str, list[dict[str, Any]]] = {}
                for entry in expired:
                    grouped.setdefault(str(entry["tenant_id"]), []).append(entry)
                for tenant_id, tenant_items in grouped.items():
                    for channel in mgr.channels_for_tenant(tenant_id):
                        # Only some channel types know how to edit posted messages.
                        if hasattr(channel, "update_expired"):
                            try:
                                await channel.update_expired(tenant_items)
                            except Exception:
                                logger.exception("Failed to update expired notifications")
        except Exception:
            # Never let one bad cycle kill the worker.
            logger.exception("Approval timeout worker error")
        await asyncio.sleep(10)
async def _worker_monitor(app: FastAPI) -> None:
    """Restart crashed background workers every 60 seconds.

    A worker counts as crashed only if its task finished with an
    exception. ``done()`` alone is not sufficient: the partition worker
    returns cleanly on SQLite (a deliberate no-op), and tasks are
    cancelled during shutdown — neither case should be restarted.
    Previously any ``done()`` task was restarted, which caused endless
    warning spam and restart churn on SQLite deployments.
    """

    def _crashed(task: asyncio.Task) -> bool:
        # exception() is only legal on a finished, non-cancelled task.
        return task.done() and not task.cancelled() and task.exception() is not None

    while True:
        await asyncio.sleep(60)
        try:
            workers = app.state.background_workers
            if _crashed(workers["approval_timeout"]):
                logger.warning("Restarting crashed approval_timeout worker")
                workers["approval_timeout"] = asyncio.create_task(
                    _approval_timeout_worker(app)
                )
            if _crashed(workers["partition"]):
                logger.warning("Restarting crashed partition worker")
                workers["partition"] = asyncio.create_task(_partition_worker())
        except Exception:
            logger.exception("Worker monitor error")
async def _bootstrap_admin(_app: FastAPI) -> None:
    """Create default tenant + admin user on first run if no users exist.

    Bootstrap runs only when the user table is empty AND both
    EDICTUM_ADMIN_EMAIL and EDICTUM_ADMIN_PASSWORD are set; otherwise
    the operator is pointed at /dashboard/setup. Also creates the
    tenant's initial signing key when a signing secret is configured.
    """
    settings = get_settings()

    from edictum_server.auth.local import LocalAuthProvider
    from edictum_server.db.models import SigningKey as SigningKeyModel
    from edictum_server.db.models import Tenant, User
    from edictum_server.services.signing_service import generate_signing_keypair

    async with async_session_factory()() as db:
        # Advisory lock prevents concurrent bootstrap across instances (S7).
        # Lock 42 is shared with the /api/v1/setup endpoint so the two
        # bootstrap paths are mutually exclusive. Advisory locks are a
        # PostgreSQL-only feature, so skip on SQLite (tests) — mirroring
        # the dialect guard in _partition_worker. SQLite runs are
        # single-instance, so the lock is not needed there.
        if get_engine().dialect.name == "postgresql":
            await db.execute(sa.text("SELECT pg_advisory_xact_lock(42)"))

        result = await db.execute(select(func.count()).select_from(User))
        user_count = result.scalar() or 0
        if user_count > 0:
            return

        # No users yet — check if env-var bootstrap is configured
        if not settings.admin_email or not settings.admin_password:
            logger.warning(
                "No admin account exists. "
                "Visit /dashboard/setup to create one, or set "
                "EDICTUM_ADMIN_EMAIL and EDICTUM_ADMIN_PASSWORD and restart."
            )
            return
        if len(settings.admin_password) < 12:
            logger.error(
                "EDICTUM_ADMIN_PASSWORD must be at least 12 characters. "
                "Bootstrap aborted — visit /dashboard/setup instead."
            )
            return

        # Create default tenant
        tenant = Tenant(name="default")
        db.add(tenant)
        await db.flush()

        # Create admin user
        password_hash = LocalAuthProvider.hash_password(settings.admin_password)
        admin = User(
            tenant_id=tenant.id,
            email=settings.admin_email,
            password_hash=password_hash,
            is_admin=True,
        )
        db.add(admin)
        await db.flush()

        # Create initial signing key for bundle deployment
        if settings.signing_key_secret:
            secret = bytes.fromhex(settings.signing_key_secret)
            public_key_bytes, encrypted_private_key = generate_signing_keypair(secret)
            signing_key = SigningKeyModel(
                tenant_id=tenant.id,
                public_key=public_key_bytes,
                private_key_encrypted=encrypted_private_key,
                active=True,
            )
            db.add(signing_key)
            logger.info("Created initial signing key for tenant")
        await db.commit()
        logger.info("Bootstrapped admin user: %s", settings.admin_email)
async def _ensure_signing_keys(settings: Settings) -> None:
    """Backfill: create signing keys for tenants that don't have one.

    This handles existing deployments that were bootstrapped before
    signing key auto-creation was added. No-op without a signing secret.
    """
    if not settings.signing_key_secret:
        return

    from edictum_server.db.models import SigningKey as SigningKeyModel
    from edictum_server.db.models import Tenant
    from edictum_server.services.signing_service import generate_signing_keypair

    async with async_session_factory()() as db:
        # Find tenants without an active signing key
        active_keys = (
            select(SigningKeyModel.tenant_id)
            .where(SigningKeyModel.active.is_(True))
            .subquery()
        )
        rows = await db.execute(
            select(Tenant).where(Tenant.id.not_in(select(active_keys.c.tenant_id)))
        )
        missing = rows.scalars().all()
        if not missing:
            return

        secret = bytes.fromhex(settings.signing_key_secret)
        for tenant in missing:
            public_key, encrypted_private = generate_signing_keypair(secret)
            db.add(
                SigningKeyModel(
                    tenant_id=tenant.id,
                    public_key=public_key,
                    private_key_encrypted=encrypted_private,
                    active=True,
                )
            )
            logger.info("Created signing key for tenant %s", tenant.id)
        await db.commit()
async def _cleanup_ai_usage() -> None:
    """Delete AI usage log rows older than 90 days.

    NOTE: Intentionally cross-tenant — this is an internal maintenance
    operation that only deletes expired rows and never returns data.
    Do not copy this pattern for data-access queries.
    """
    from edictum_server.db.models import AiUsageLog

    try:
        cutoff = datetime.now(UTC) - timedelta(days=90)
        async with async_session_factory()() as db:
            outcome = await db.execute(
                sa.delete(AiUsageLog).where(AiUsageLog.created_at < cutoff)
            )
            deleted = outcome.rowcount  # type: ignore[attr-defined]
            # Commit (and log) only when rows were actually removed.
            if deleted:
                await db.commit()
                logger.info("Cleaned up %d old AI usage log(s)", deleted)
    except Exception:
        # Best-effort maintenance: never block startup on cleanup failure.
        logger.exception("AI usage cleanup error")
@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
    """Startup / shutdown lifecycle hook.

    Startup order: validate settings and warn on risky URL/proxy config,
    initialize DB engine and Redis, wire auth/push/notification managers,
    run one-shot maintenance (admin bootstrap, signing-key backfill, AI
    usage cleanup), then launch background workers and their monitor.
    Shutdown reverses it: cancel monitor first, then workers, close
    notification channels, then Redis and the DB engine.
    """
    settings = get_settings()
    settings.validate_required()

    # Warn if base_url is not HTTPS in non-local environments — cookies
    # issued without the Secure flag can leak over plain HTTP.
    parsed = urlparse(settings.base_url)
    if not settings.base_url.startswith("https://") and parsed.hostname not in (
        "localhost",
        "127.0.0.1",
    ):
        logger.warning(
            "EDICTUM_BASE_URL is not HTTPS (%s) — session cookies will not have "
            "the Secure flag. Set EDICTUM_BASE_URL to your public HTTPS URL in production.",
            settings.base_url,
        )

    # Warn when serving behind a proxy without trusted proxy config (H1/L1).
    # Without ProxyHeadersMiddleware, Starlette uses http:// in redirects
    # and rate-limiting keys on the proxy IP instead of the real client.
    if settings.base_url.startswith("https://") and not settings.trusted_proxies:
        logger.warning(
            "EDICTUM_BASE_URL is HTTPS but EDICTUM_TRUSTED_PROXIES is not set. "
            "Trailing-slash redirects will use http:// (downgrade) and rate "
            "limiting will key on the proxy IP, not the real client. "
            "Set EDICTUM_TRUSTED_PROXIES to your reverse proxy addresses "
            "(e.g. '*' for Railway/Render, or specific CIDRs).",
        )

    # Validate signing key secret early — log clearly if misconfigured.
    # A missing secret is non-fatal: signing/encryption features degrade.
    try:
        settings.get_signing_secret()
    except ValueError as exc:
        logger.warning(
            "EDICTUM_SIGNING_KEY_SECRET not set — bundle signing and "
            "notification encryption disabled: %s",
            exc,
        )

    # Database
    engine = init_engine(settings.database_url)

    # Redis
    app.state.redis = create_redis_client(settings.redis_url)

    # Auth provider (function-scope import, matching the other local imports)
    from edictum_server.auth.local import LocalAuthProvider

    app.state.auth_provider = LocalAuthProvider(
        redis=app.state.redis,
        session_ttl_hours=settings.session_ttl_hours,
        # Secure cookies only when the public URL is actually HTTPS.
        secure_cookies=settings.base_url.startswith("https://"),
        secret_key=settings.secret_key,
    )

    # Push manager (SSE)
    app.state.push_manager = PushManager()

    # Notification manager (tenant-keyed, all channels from DB)
    notification_mgr = NotificationManager()
    app.state.notification_manager = notification_mgr

    # Load DB-configured notification channels and register Telegram webhooks.
    # Failure here is deliberately non-fatal: the app starts without channels.
    try:
        signing_secret: bytes | None = None
        with contextlib.suppress(ValueError):
            signing_secret = settings.get_signing_secret()
        async with async_session_factory()() as db:
            channels_by_tenant = await load_db_channels(
                db,
                app.state.redis,
                settings.base_url,
                secret=signing_secret,
            )
            await notification_mgr.reload(channels_by_tenant)
            total = sum(len(chs) for chs in channels_by_tenant.values())
            logger.info("Loaded %d notification channel(s) from DB", total)
    except Exception:
        logger.exception("Failed to load notification channels from DB")

    # Bootstrap admin on first run
    await _bootstrap_admin(app)
    # Ensure every tenant has an active signing key
    await _ensure_signing_keys(settings)
    # Clean up old AI usage logs
    await _cleanup_ai_usage()

    # Background workers
    timeout_task = asyncio.create_task(_approval_timeout_worker(app))
    partition_task = asyncio.create_task(_partition_worker())
    app.state.push_manager.start_cleanup_task()

    # Expose workers for health monitoring
    app.state.background_workers = {
        "approval_timeout": timeout_task,
        "partition": partition_task,
    }

    # Auto-restart monitor for crashed workers
    monitor_task = asyncio.create_task(_worker_monitor(app))

    yield

    # Shutdown — cancel the monitor before the workers so it cannot
    # resurrect tasks that are being torn down below.
    monitor_task.cancel()
    with contextlib.suppress(asyncio.CancelledError):
        await monitor_task
    app.state.push_manager.stop_cleanup_task()
    workers = app.state.background_workers
    workers["approval_timeout"].cancel()
    workers["partition"].cancel()
    with contextlib.suppress(asyncio.CancelledError):
        await workers["approval_timeout"]
    with contextlib.suppress(asyncio.CancelledError):
        await workers["partition"]
    # Close channels best-effort; one failing channel must not block shutdown.
    for ch in notification_mgr.channels:
        try:
            await ch.close()
        except Exception:
            logger.exception("Error closing notification channel %s", ch.name)
    await app.state.redis.aclose()
    await engine.dispose()
# Module-level application setup. In production the interactive docs and
# the OpenAPI schema are disabled entirely.
_settings = get_settings()
_is_production = _settings.env_name == "production"

app = FastAPI(
    title="Edictum Console",
    description="Self-hostable agent operations console -- runtime governance for AI agents",
    version="0.1.0",
    lifespan=lifespan,
    docs_url=None if _is_production else "/docs",
    redoc_url=None if _is_production else "/redoc",
    openapi_url=None if _is_production else "/openapi.json",
)

# CORS: allowed origins come from a comma-separated setting; blank entries
# are dropped. Credentials (cookies) are allowed across these origins.
app.add_middleware(
    CORSMiddleware,
    allow_origins=[o.strip() for o in _settings.cors_origins.split(",") if o.strip()],
    allow_credentials=True,
    allow_methods=["GET", "POST", "PUT", "DELETE", "OPTIONS"],
    allow_headers=["Content-Type", "X-Requested-With", "Authorization", "X-Edictum-Agent-Id"],
)

# Security response headers (HSTS, CSP, X-Frame-Options, etc.)
from edictum_server.security.headers import SecurityHeadersMiddleware  # noqa: E402

app.add_middleware(SecurityHeadersMiddleware)

# CSRF protection — must be added after CORS so it runs on the inner request.
# Requires X-Requested-With header on cookie-auth mutating requests.
from edictum_server.auth.csrf import CSRFMiddleware  # noqa: E402

app.add_middleware(CSRFMiddleware)

# Trusted proxy support — properly resolve client IPs behind reverse proxies.
# Only enabled when EDICTUM_TRUSTED_PROXIES is configured (see lifespan warning).
if _settings.trusted_proxies:
    from uvicorn.middleware.proxy_headers import ProxyHeadersMiddleware  # noqa: E402

    _trusted_hosts = [h.strip() for h in _settings.trusted_proxies.split(",") if h.strip()]
    app.add_middleware(ProxyHeadersMiddleware, trusted_hosts=_trusted_hosts)
# Routers
app.include_router(health.router)
app.include_router(setup.router)
app.include_router(auth.router)
app.include_router(keys.router)
app.include_router(bundles.router)
app.include_router(compositions.router)
app.include_router(contracts.router)
app.include_router(evaluate.router)
app.include_router(deployments.router)
app.include_router(stream.router)
app.include_router(events.router)
app.include_router(sessions.router)
app.include_router(approvals.router)
app.include_router(stats.router)
app.include_router(telegram.router)
app.include_router(discord.router)
app.include_router(slack.router)
app.include_router(agents.router)
app.include_router(agent_registrations.router)
app.include_router(assignment_rules.router)
app.include_router(notifications.router)
app.include_router(settings.router)
app.include_router(ai.router)
app.include_router(ai_usage.router)
# --- Validation error handler: strip Pydantic internals ----------------------
@app.exception_handler(RequestValidationError)
async def validation_error_handler(
    _request: Request, exc: RequestValidationError
) -> JSONResponse:
    """Return 422 with sanitized error details.

    Strips ``ctx`` and ``type`` fields from Pydantic errors to avoid
    leaking framework internals (L4 finding).
    """
    details: list[dict[str, Any]] = []
    for err in exc.errors():
        details.append(
            {"loc": err.get("loc", []), "msg": err.get("msg", "Validation error")}
        )
    return JSONResponse(status_code=422, content={"detail": details})
# --- 404 handler: redirect non-API paths to dashboard -------------------------
@app.exception_handler(404)
async def not_found_handler(
    request: Request, exc: StarletteHTTPException
) -> JSONResponse | RedirectResponse:
    """API routes get JSON 404; everything else redirects to the dashboard."""
    if not request.url.path.startswith("/api/"):
        # Unknown non-API path — hand it to the SPA.
        return RedirectResponse(url="/dashboard", status_code=302)
    return JSONResponse({"detail": exc.detail or "Not Found"}, status_code=404)
# --- SPA serving (dashboard) ---------------------------------------------------
_STATIC_DIR = Path(os.environ.get("EDICTUM_STATIC_DIR", "/app/static/dashboard"))


@app.get("/dashboard/{full_path:path}", response_model=None)
async def serve_spa(request: Request, full_path: str) -> FileResponse | HTMLResponse:  # noqa: ARG001
    """Serve the React SPA — return index.html for all routes (client-side routing)."""
    # The requested path is intentionally ignored: the SPA router handles it.
    index_file = _STATIC_DIR.resolve() / "index.html"
    if not index_file.is_file():
        return HTMLResponse(
            "<h1>Dashboard not built</h1>"
            "<p>Run <code>cd dashboard && pnpm build</code> or use the Vite dev server.</p>",
            status_code=404,
        )
    return FileResponse(index_file)
# Mount the built frontend asset directory only when it exists on disk;
# otherwise the mount is skipped (e.g. when the dashboard has not been built).
_ASSETS_DIR = _STATIC_DIR / "assets"
if _ASSETS_DIR.is_dir():
    app.mount(
        "/dashboard/assets",
        StaticFiles(directory=str(_ASSETS_DIR)),
        name="dashboard-assets",
    )