Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
195 changes: 20 additions & 175 deletions CLAUDE.md

Large diffs are not rendered by default.

7 changes: 6 additions & 1 deletion Justfile
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,16 @@ lint-ci:
uv run ruff format --check
uv run ruff check --no-fix
uv run ty check
uv run python planning/index.py --check

# Print the planning change index (grouped by status) to stdout.
# Print the planning change index (flat, newest-first) to stdout.
index:
uv run python planning/index.py

# Validate planning bundles + decisions (frontmatter, lanes, spec links); CI runs this.
check-planning:
uv run python planning/index.py --check

publish:
rm -rf dist
uv version $GITHUB_REF_NAME
Expand Down
34 changes: 34 additions & 0 deletions architecture/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
# Architecture

The living truth about what `faststream-outbox` does **now** — one file per
capability, updated by hand whenever a change ships. The *why* and *how it got
here* live in [`../planning/changes/`](../planning/changes/), and decisions
deliberately taken (including options rejected) in
[`../planning/decisions/`](../planning/decisions/); this directory is the present.

Each capability file is an **implementation-detail** page. Its terse
**invariant summary** ("what Claude must not break") lives in
[`../CLAUDE.md`](../CLAUDE.md) § Architecture; the **user-facing** account lives
under `../docs/`.

These files carry **no frontmatter** — they are prose, dated by git.

## Capabilities

- [producer.md](producer.md) — the publish path and transactional contract.
- [relay.md](relay.md) — foreign-broker relay chain and guardrails.
- [timers.md](timers.md) — delayed delivery, `timer_id` dedup, `cancel_timer`.
- [schema.md](schema.md) — the user-owned table, partial indexes, `validate_schema()`.
- [dlq.md](dlq.md) — opt-in dead-letter on terminal failure.
- [subscriber.md](subscriber.md) — the two-loop subscriber and lease-token invariant.
- [drain.md](drain.md) — graceful drain on stop.
- [test-broker.md](test-broker.md) — `TestOutboxBroker`, the fake client, the client contract.
- [integration.md](integration.md) — annotations, FastAPI router, engine ownership.
- [metrics.md](metrics.md) — the recorder and native-middleware seams.
- [retry.md](retry.md) — retry strategies.

## Promotion rule

Shipping a change hand-edits the affected capability file(s) here to match the
new reality, in the same PR as the code. The change bundle stays in place under
[`../planning/changes/`](../planning/changes/) — no folder move.
30 changes: 30 additions & 0 deletions architecture/integration.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# Integration: annotations, FastAPI router, engine ownership — implementation detail

User-facing: `docs/` (FastAPI integration). Invariant summary: `CLAUDE.md` § Integration.

## Annotations module (`annotations.py`)

This module is the canonical home for the `Annotated[..., Context(...)]` shortcuts — `OutboxMessage`, `OutboxBroker`, `OutboxProducer`, and `OutboxClient`. Each shortcut shadows its underlying class, which is imported via `from … import X as _X` so the public name can be re-bound to the `Annotated` form while the plain class stays available under its `_`-prefixed alias.

Two of the shortcuts resolve through non-obvious attribute paths:

- **Producer path**: `Context("broker._producer")`. This resolves via the `BrokerUsecase._producer` property, which returns `self.config.producer`.
- **Client path**: `Context("broker.config.broker_config.client")`. The client lives only on the outbox-specific config layer, so the shortcut points at it directly rather than through a broker property.

`faststream_outbox.fastapi` re-exports these shortcuts with a FastAPI-aware `Context` (sourced from `faststream._internal.fastapi.context`).

## FastAPI router (`fastapi/router.py`)

`OutboxRouter` subclasses FastStream's `StreamRouter`, which itself subclasses `APIRouter`. Calling `app.include_router(router)` auto-starts the inner `OutboxBroker` via the FastAPI lifespan.

This bridge is critical for the transactional contract. `wrap_callable_to_fastapi_compatible` (a FastStream internal) bridges FastAPI's dependency resolver into the consume pipeline, so a `Depends(get_session)` inside a handler resolves the same `AsyncSession` it would in an HTTP route — and `OutboxResponse(session=...)` commits the follow-on row together with the handler's domain writes.

`subscriber()` and `publisher()` are overridden to pin defaults for FastAPI-specific kwargs (such as `response_model=Default(None)`) that the base declares keyword-only without defaults. The outbox kwargs flow through unchanged.

`apply_types` and the broker `dependencies` are intentionally not exposed: `StreamRouter` forces `apply_types=False` (FastDepends takes over), and the broker's `Dependant` list isn't useful in this flow.

`fastapi` is an optional dependency (`faststream-outbox[fastapi]`).

## Engine ownership

The caller owns the `AsyncEngine` — the broker never disposes it. The engine lives on `OutboxBrokerConfig` (set by the broker constructor) and may be `None` until wired, so the broker can be constructed before the engine exists (used by the test broker).
35 changes: 35 additions & 0 deletions architecture/producer.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# Producer / publish path — implementation detail

User-facing: `docs/usage/` (publishing). Invariant summary: `CLAUDE.md` § Producer.

## The transactional contract

`broker.publish(body, *, queue, session, headers=None, correlation_id=None, activate_in=None, activate_at=None, timer_id=None)` and `broker.publish_batch(*bodies, queue, session, headers=None, activate_in=None, activate_at=None)` insert outbox rows through the caller's `AsyncSession`. They do not flush, commit, or open their own transaction — the row must commit with the caller's domain writes.

Both reject a non-`AsyncSession` with `TypeError`. `publish` returns the row id (or `None` on a `timer_id` conflict); `publish_batch` returns nothing and rejects `timer_id` (per-row dedup is meaningless in a batch). `broker.request` raises `NotImplementedError` (the outbox is fire-and-forget).

## OutboxProducer + the single insert path

`OutboxProducer` (`publisher/producer.py`) implements `ProducerProto[OutboxPublishCommand]` and is the canonical insert path. `broker.publish`, `publish_batch`, and `OutboxPublisher.publish` all build an `OutboxPublishCommand` (`response.py`) and route through `_basic_publish(cmd, producer=self.config.producer)` — encode + insert + NOTIFY semantics live in one place.

Session-type / queue / activate-args-mutex / tz validation lives in one shared `_validate_publish_args` (`response.py`), called by the `OutboxPublishCommand` constructor, `OutboxResponse.__init__`, and `broker.publish_batch`'s empty-batch branch — so every real publish entry point (including an empty batch) rejects the same misconfigurations identically and eagerly. The checks run in a fixed order: activate-args → session → queue.

`from_cmd` raises (relay chaining is unsupported here).

## Publisher wrapper

`broker.publisher(queue, *, headers=None, title=None, description=None, schema=None, include_in_schema=True)` returns an `OutboxPublisher` — a typed wrapper around `broker.publish` with the same transactional contract. Static decorator headers merge with per-call headers (per-call wins).

The publisher exists for AsyncAPI / per-queue config — not decorator-relay chaining: `OutboxPublisher.__call__` raises `NotImplementedError` at decoration time. A relay decorator can't reach an `AsyncSession` without breaking the transactional contract.

## Chained publishing via OutboxResponse

For chained publishing, handlers can `return OutboxResponse(body=..., queue=..., session=session)`.

`OutboxResponse.__init__` validates eagerly via the shared `_validate_publish_args` (so a misconfigured response raises at the `return` site, not at dispatch where it would masquerade as a handler failure); `as_publish_command()` re-runs the same validator, keeping `OutboxPublishCommand` the authoritative source.

FastStream gates `_make_response_publisher` on a truthy `message.reply_to`; `OutboxParser.parse_message` sets `reply_to=msg.queue` to trip it. The actual publisher is `OutboxFakePublisher` (`publisher/fake.py`), which gates on `isinstance(cmd, OutboxPublishCommand)` so plain returns (`None`, `dict`, …) become silent no-ops. `correlation_id` propagates via FastStream's `process_message` inheritance.

## Payload encoding

`_encode_payload` (`envelope.py`) is the internal helper that turns `body` into `(payload_bytes, headers_dict)`. It is used by both producers and is not exported.
57 changes: 57 additions & 0 deletions architecture/retry.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
# Retry strategies — implementation detail

User-facing: `docs/usage/` (retries). Invariant summary: `CLAUDE.md` § Retry.

## get_next_attempt_delay

Retry strategies live in `retry.py`. The core method,
`get_next_attempt_delay(*, first_attempt_at, last_attempt_at, attempts_count, exception=None)`,
returns the delay in seconds before the next attempt, or `None` to signal
terminal failure.

The returned value is a delay, not an absolute timestamp: the DB computes
`next_attempt_at` from it server-side, so the timing is immune to clock skew
between the worker and the DB host.

The method receives the raised `exception` so subclasses can retry only on
transient errors.

## Template enforcement

`_RetryStrategyTemplate` enforces the two cross-cutting limits shared by the
concrete strategies: `max_attempts` and `max_total_delay_seconds`. A concrete
strategy that derives from the template gets both of these caps applied on top
of its own per-attempt delay computation.

## ExponentialRetry

`ExponentialRetry` adds two optional knobs on top of the template: jitter and
`max_delay_seconds`.

## max_total_delay_seconds is a lower bound

`max_total_delay_seconds` is a lower bound on the horizon, not an exact ceiling.
`elapsed` is measured as `last_attempt_at − first_attempt_at`, and both
timestamps are set equal on the first attempt. Because of that, the budget
always permits roughly one more interval beyond the nominal cap (F2-01).

Size `max_total_delay_seconds` as "at least this long", not as an exact ceiling.

## Default strategy

A subscriber with no explicit `retry_strategy` resolves to:

```python
ExponentialRetry(
initial_delay_seconds=1.0,
multiplier=2.0,
max_delay_seconds=300.0,
max_attempts=10,
jitter_factor=0.2,
)
```

This comes from `_default_retry_strategy()` in `registrator.py`.

"Delete on first error" is the wrong default for an outbox, so it is not the
default; opt in to that behavior explicitly with `NoRetry()`.
76 changes: 76 additions & 0 deletions architecture/schema.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
# User-owned schema — implementation detail

User-facing: `docs/operations/alembic.md`. Invariant summary: `CLAUDE.md` § Schema.

## make_outbox_table + partial indexes

`make_outbox_table(metadata, table_name="outbox")` returns a `sqlalchemy.Table` on
the user's `MetaData`. The package never creates or migrates the table — that's
Alembic's job — but it declares three partial indexes on the table so that
autogenerate brings them up:

- `(queue, next_attempt_at) WHERE acquired_token IS NULL` — fetch CTE Branch A
(unleased rows).
- `(queue, acquired_at) WHERE acquired_token IS NOT NULL` — fetch CTE Branch B
(expired-lease reclaim).
- unique `(queue, timer_id) WHERE timer_id IS NOT NULL` — `timer_id` dedup.

## The lease CHECK constraint

In addition to the indexes, `make_outbox_table` declares a
`CHECK ((acquired_token IS NULL) = (acquired_at IS NULL))` — the `<table>_lease_ck`
constraint. It makes a half-set lease unrepresentable: the two lease columns must
either both be set or both be unset.

## Why the fetch CTE carries partial-index predicates

The fetch CTE's `OR` is written so that each disjunct explicitly carries its
partial-index predicate as a conjunct. Postgres only uses a partial index when the
query implies the index's `WHERE` clause; the naive form of the query (without the
predicate spelled out per disjunct) falls back to a seq-scan. Both fetch indexes
pay write amplification on every claim.

## ORDER BY and sort nodes

The fetch index also satisfies the `ORDER BY next_attempt_at, id`, but only for a
single-queue subscriber. A subscriber serving multiple queues
(`queue = ANY(:queues)`), or the expired-lease branch (which is ordered by
`next_attempt_at` while `_lease_idx` is keyed on `acquired_at`), adds a
`LIMIT`-bounded sort node. Prefer one subscriber per queue when fetch ordering cost
matters — the same segregation pattern as lease TTLs.

The `ORDER BY` lives on the inner CTE that selects and `LIMIT`s the rows; the outer
`UPDATE … RETURNING *` is unordered, so the order in which rows dispatch within a
single fetch batch is unspecified (F2-09). The ordering governs which rows are
claimed under contention (FIFO selection), not the per-row dispatch sequence —
which is irrelevant with `max_workers > 1` anyway. Don't rely on within-batch FIFO
delivery.

## No state column

There is no `state` column. A row is "available" iff `acquired_token IS NULL` or
`acquired_at < now() - lease_ttl_seconds`. Terminal failures `DELETE` by default;
opt in to audit via `dlq_table=make_dlq_table(metadata)`.

## validate_schema() — opt-in drift detection

`validate_schema()` is opt-in — call it from `/health` or a startup hook, not from
`broker.start()` — so that migrations can run against the same DB without a loop.

Beyond the alembic column/index diff it also probes the live partial-index
predicates (alembic ignores `postgresql_where`), catching a drifted or non-partial
`timer_id_uq` that would otherwise break `ON CONFLICT` at publish time (S2). It also
probes `pg_constraint` for the `<table>_lease_ck` CHECK (alembic has no
check-constraint comparator), catching a missing or drifted lease pairing.

Because these two probes (predicates + CHECK) catch drift that
`alembic revision --autogenerate` cannot remediate, the raised `RuntimeError`
appends a pointer to
`docs/operations/alembic.md#fixing-drift-autogenerate-cant-see` (the
hand-written-migration recipe) — but only when one of those two probes fired.
Autogenerate-fixable drift (columns, plain indexes, DLQ) gets no pointer. Message
composition lives in `_compose_schema_mismatch_message` (`client.py`), gated on
`has_blind_drift`.

Alembic is optional (`faststream-outbox[validate]`); without it `validate_schema()`
raises `ImportError`, but every other path works.
Loading