Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion apps/api/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@de-otio/trellis",
"version": "0.10.7",
"version": "0.10.8",
"license": "AGPL-3.0-or-later",
"type": "module",
"main": "./dist/index.js",
Expand Down
42 changes: 25 additions & 17 deletions apps/api/src/lambda/post-confirmation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import type {
PostConfirmationTriggerHandler,
} from "aws-lambda";
import { Logger } from "@aws-lambda-powertools/logger";
import { getLambdaPrisma as getPrisma } from "../lib/lambda-prisma.js";
import { getLambdaPrisma as getPrisma, withLambdaDbBreaker } from "../lib/lambda-prisma.js";
import {
PrismaClient,
Prisma,
Expand Down Expand Up @@ -182,22 +182,30 @@ export const handler: PostConfirmationTriggerHandler = async (event) => {

const db = await getPrisma();

const result = await withHandleConflictRetry(() =>
db.$transaction(
async (tx) => provisionUserAndTenancy(tx, {
cognitoSub,
email,
emailVerified: attrs.email_verified,
federated,
idpGroups,
dateOfBirth,
ageTier,
providedHandle: attrs["custom:handle"],
invitationCode,
requestedMethod,
}),
{ timeout: 8000 },
),
// The provisioning transaction is the longest-held connection in a signup
// burst (multi-statement, up to the 8s timeout). Run it under the Lambda
// circuit breaker so a saturated DB trips fast-fail instead of being retried
// into an already-exhausted instance.
const result = await withLambdaDbBreaker(
() =>
withHandleConflictRetry(() =>
db.$transaction(
async (tx) => provisionUserAndTenancy(tx, {
cognitoSub,
email,
emailVerified: attrs.email_verified,
federated,
idpGroups,
dateOfBirth,
ageTier,
providedHandle: attrs["custom:handle"],
invitationCode,
requestedMethod,
}),
{ timeout: 8000 },
),
),
"post_confirmation.provision",
);

if (ageTier === "CHILD") {
Expand Down
12 changes: 10 additions & 2 deletions apps/api/src/lambda/pre-token-generation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import type {
} from "aws-lambda";
import { Logger } from "@aws-lambda-powertools/logger";
import { PrismaClient, type TenantRole } from "@prisma/client";
import { getLambdaPrisma as getPrisma } from "../lib/lambda-prisma.js";
import { getLambdaPrisma as getPrisma, withLambdaDbBreaker } from "../lib/lambda-prisma.js";
import {
ClaimsCache,
createClaimsCacheFromEnv,
Expand Down Expand Up @@ -194,6 +194,11 @@ export const handler: PreTokenGenerationV2TriggerHandler = async (event) => {
let cacheHit = !!claims;

if (!claims) {
// RDS is consulted only on a genuine cache miss. Emit a filterable event so
// a miss-rate metric can be derived (a miss storm — post-deploy, correlated
// TTL expiry, or the first-login wave after a signup burst — is the path
// that can exhaust DB connections; the warm cache is the primary defence).
logger.info("pretoken.cache_miss", { cognitoSub, federated });
const db = await getPrisma();
// Read the user's last explicit tenant preference, even from an expired
// cache row, so an admin-side switch-tenant call survives cache TTL.
Expand All @@ -206,7 +211,10 @@ export const handler: PreTokenGenerationV2TriggerHandler = async (event) => {
error: (err as { code?: string })?.code ?? "unknown",
});
}
const loaded = await loadFromRds(db, cognitoSub, federated, preferredTenantId);
const loaded = await withLambdaDbBreaker(
() => loadFromRds(db, cognitoSub, federated, preferredTenantId),
"pretoken.load_from_rds",
);

if (!loaded.user) {
logger.warn("pretoken.drift", { cognitoSub });
Expand Down
96 changes: 92 additions & 4 deletions apps/api/src/lib/lambda-prisma.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import { Logger } from "@aws-lambda-powertools/logger";
import { getSecret } from "@aws-lambda-powertools/parameters/secrets";
import { PrismaPg } from "@prisma/adapter-pg";
import { PrismaClient } from "@prisma/client";
import { Pool } from "pg";
import { DatabaseCircuitBreaker } from "./database-circuit-breaker.js";

interface DbSecret {
username: string;
Expand All @@ -10,7 +13,73 @@ interface DbSecret {
dbname: string;
}

const logger = new Logger({ serviceName: "lambda-prisma" });

/**
* Operational knobs — env-configurable with safe defaults (threshold-secrecy
* rule: never compile operational limits into the public tarball).
*
* Why the pool MUST be capped here: each warm Lambda execution environment holds
* its OWN pg pool, and every concurrent environment shares the RDS
* `max_connections` budget (~107 usable on a t4g.micro). Under a signup burst —
* or a `pre-token` cache-miss storm — an unbounded pool lets a single function
* exhaust the instance, and Postgres then rejects connections from EVERY client
* (a global outage, not just failed signups). `?connection_limit=1` in the
* connection URL is a **no-op** under `@prisma/adapter-pg` (it is a Prisma-engine
* parameter the `pg` driver ignores), so the cap is set on the pool object — the
* only place it takes effect. See
* trellis-internal `analysis/db-connection-management/signup-burst-connection-exhaustion.md`.
*/
const DEFAULT_POOL_MAX = 1;
const DEFAULT_CONNECT_TIMEOUT_MS = 2000;
const DEFAULT_IDLE_TIMEOUT_MS = 10_000;
const DEFAULT_BREAKER_THRESHOLD = 5;
const DEFAULT_BREAKER_COOLDOWN_MS = 30_000;

let prisma: PrismaClient | null = null;
let pool: Pool | null = null;

/**
* Per-execution-environment circuit breaker for the Lambda DB path.
*
* The request path (`DatabaseConnectionManager`) has a breaker; Lambda handlers
* did not — so under DB saturation they would keep retrying into a saturated
* instance and amplify the incident. Opening the breaker makes a saturated
* environment fail fast (no connect-timeout wait, no slot held) until a cooldown
* probe succeeds. Module scope = one warm Lambda environment, the right blast
* radius. Wrap every RDS access in a handler with `withLambdaDbBreaker`.
*/
export const lambdaDbBreaker = new DatabaseCircuitBreaker({
failureThreshold: Number(
process.env.LAMBDA_DATABASE_BREAKER_THRESHOLD ?? DEFAULT_BREAKER_THRESHOLD,
),
cooldownMs: Number(
process.env.LAMBDA_DATABASE_BREAKER_COOLDOWN_MS ?? DEFAULT_BREAKER_COOLDOWN_MS,
),
});

/**
* Run a unit of DB work under the Lambda circuit breaker. Use around every RDS
* access in a Lambda handler so connection-exhaustion failures trip the breaker
* instead of being retried into a saturated instance. When the breaker is OPEN
* the call throws immediately (message begins "Circuit breaker is OPEN").
*/
export async function withLambdaDbBreaker<T>(
fn: () => Promise<T>,
operation?: string,
): Promise<T> {
try {
return await lambdaDbBreaker.execute(fn, { operation });
} catch (err) {
if (
err instanceof Error &&
err.message.startsWith("Circuit breaker is OPEN")
) {
logger.warn("lambda_db.breaker_open", { operation });
}
throw err;
}
}

/**
* Build (and cache) a PrismaClient for standalone Lambda handlers.
Expand All @@ -21,20 +90,39 @@ let prisma: PrismaClient | null = null;
* `DatabaseConnectionManager` (`ssl: { rejectUnauthorized: false }`); Lambda
* handlers must do the same. Prisma 7 supplies the connection through a pg
* driver adapter (the old `datasources` constructor option is gone), so the
* `ssl` option goes on the adapter's pool config.
* `ssl` option and the size cap go on the pool config.
*
* The pool is passed to `PrismaPg` as an explicit `pg.Pool` (NOT a
* connection-string config) because the pool is the only place `max` takes
* effect — see the knobs comment above.
*
* Cached at module scope so warm invocations reuse the client.
* Cached at module scope so warm invocations reuse the client (and its single
* connection).
*/
export async function getLambdaPrisma(): Promise<PrismaClient> {
if (prisma) return prisma;
const { username, password, host, port, dbname } = (await getSecret(
process.env.DB_SECRET_ARN!,
{ transform: "json" },
)) as unknown as DbSecret;
const adapter = new PrismaPg({
connectionString: `postgresql://${username}:${encodeURIComponent(password)}@${host}:${port}/${dbname}?connection_limit=1`,

// When an RDS Proxy is provisioned, the infra injects its endpoint here so the
// Lambda connects through the proxy (which multiplexes and caps the
// connections the DB ever sees); fall back to the direct instance endpoint
// when unset, so the default deployment is unchanged.
const dbHost = process.env.LAMBDA_DATABASE_PROXY_HOST || host;

pool = new Pool({
connectionString: `postgresql://${username}:${encodeURIComponent(password)}@${dbHost}:${port}/${dbname}`,
ssl: { rejectUnauthorized: false },
max: Number(process.env.LAMBDA_DATABASE_POOL_MAX ?? DEFAULT_POOL_MAX),
connectionTimeoutMillis: Number(
process.env.LAMBDA_DATABASE_CONNECT_TIMEOUT_MS ?? DEFAULT_CONNECT_TIMEOUT_MS,
),
idleTimeoutMillis: DEFAULT_IDLE_TIMEOUT_MS,
allowExitOnIdle: false,
});
const adapter = new PrismaPg(pool);
prisma = new PrismaClient({ adapter });
return prisma;
}
137 changes: 137 additions & 0 deletions apps/api/test/unit/lambda/lambda-prisma.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
/**
* Unit Tests: Lambda Prisma helper (connection-exhaustion hardening)
*
* The Lambda DB path must cap each warm execution environment to a single
* connection (so burst demand = concurrency, not concurrency × pg-default-10),
* fail fast on connect, optionally route through an RDS Proxy, and trip a
* circuit breaker under sustained DB failure. See trellis-internal
* analysis/db-connection-management/signup-burst-connection-exhaustion.md.
*/

import { beforeEach, describe, expect, it, vi } from "vitest";

const { mockGetSecret, PoolMock, PrismaPgMock, PrismaClientMock } = vi.hoisted(
() => ({
mockGetSecret: vi.fn(),
PoolMock: vi.fn(),
PrismaPgMock: vi.fn(),
PrismaClientMock: vi.fn(),
}),
);

vi.mock("pg", () => ({ Pool: PoolMock }));
vi.mock("@prisma/adapter-pg", () => ({ PrismaPg: PrismaPgMock }));
vi.mock("@prisma/client", () => ({ PrismaClient: PrismaClientMock }));
vi.mock("@aws-lambda-powertools/parameters/secrets", () => ({
getSecret: mockGetSecret,
}));
vi.mock("@aws-lambda-powertools/logger", () => ({
Logger: class {
info = vi.fn();
warn = vi.fn();
error = vi.fn();
},
}));

const SECRET = {
username: "u",
password: "p@ss/word",
host: "db.internal",
port: 5432,
dbname: "app",
};

const IMPORT = "../../../src/lib/lambda-prisma.js";

describe("lambda-prisma", () => {
beforeEach(() => {
vi.resetModules();
vi.clearAllMocks();
mockGetSecret.mockResolvedValue(SECRET);
process.env.DB_SECRET_ARN = "arn:secret";
delete process.env.LAMBDA_DATABASE_POOL_MAX;
delete process.env.LAMBDA_DATABASE_CONNECT_TIMEOUT_MS;
delete process.env.LAMBDA_DATABASE_PROXY_HOST;
delete process.env.LAMBDA_DATABASE_BREAKER_THRESHOLD;
delete process.env.LAMBDA_DATABASE_BREAKER_COOLDOWN_MS;
});

it("caps the pool at max:1 by default with a fail-fast connect timeout", async () => {
const { getLambdaPrisma } = await import(IMPORT);
await getLambdaPrisma();

expect(PoolMock).toHaveBeenCalledTimes(1);
const opts = PoolMock.mock.calls[0][0];
expect(opts.max).toBe(1);
expect(opts.connectionTimeoutMillis).toBe(2000);
expect(opts.ssl).toEqual({ rejectUnauthorized: false });

// PrismaPg must receive the explicit Pool instance, NOT a connection-string
// config (the only place `max` actually takes effect under @prisma/adapter-pg).
expect(PrismaPgMock).toHaveBeenCalledTimes(1);
expect(PrismaPgMock.mock.calls[0][0]).toBe(PoolMock.mock.instances[0]);
});

it("honours pool-max and connect-timeout env overrides", async () => {
process.env.LAMBDA_DATABASE_POOL_MAX = "3";
process.env.LAMBDA_DATABASE_CONNECT_TIMEOUT_MS = "1500";
const { getLambdaPrisma } = await import(IMPORT);
await getLambdaPrisma();

const opts = PoolMock.mock.calls[0][0];
expect(opts.max).toBe(3);
expect(opts.connectionTimeoutMillis).toBe(1500);
});

it("connects to the direct instance host when no proxy host is set", async () => {
const { getLambdaPrisma } = await import(IMPORT);
await getLambdaPrisma();
expect(PoolMock.mock.calls[0][0].connectionString).toContain(
"@db.internal:5432/app",
);
});

it("routes through LAMBDA_DATABASE_PROXY_HOST when set", async () => {
process.env.LAMBDA_DATABASE_PROXY_HOST = "proxy.internal";
const { getLambdaPrisma } = await import(IMPORT);
await getLambdaPrisma();

const cs = PoolMock.mock.calls[0][0].connectionString;
expect(cs).toContain("@proxy.internal:5432/app");
expect(cs).not.toContain("@db.internal");
});

it("caches the client across calls (one pool per warm environment)", async () => {
const { getLambdaPrisma } = await import(IMPORT);
await getLambdaPrisma();
await getLambdaPrisma();
expect(PoolMock).toHaveBeenCalledTimes(1);
expect(mockGetSecret).toHaveBeenCalledTimes(1);
});

describe("withLambdaDbBreaker", () => {
it("passes results through on success", async () => {
const { withLambdaDbBreaker } = await import(IMPORT);
await expect(withLambdaDbBreaker(async () => "ok")).resolves.toBe("ok");
});

it("opens after the threshold, then fails fast without invoking fn", async () => {
process.env.LAMBDA_DATABASE_BREAKER_THRESHOLD = "3";
const { withLambdaDbBreaker } = await import(IMPORT);

const boom = vi.fn(async () => {
throw new Error("connect ETIMEDOUT");
});
for (let i = 0; i < 3; i++) {
await expect(withLambdaDbBreaker(boom, "op")).rejects.toThrow();
}
expect(boom).toHaveBeenCalledTimes(3);

// Breaker is OPEN — the next call short-circuits (fn NOT invoked again).
await expect(withLambdaDbBreaker(boom, "op")).rejects.toThrow(
/Circuit breaker is OPEN/,
);
expect(boom).toHaveBeenCalledTimes(3);
});
});
});
2 changes: 1 addition & 1 deletion package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.