Skip to content
Draft
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
774aed4
feat: router proto update for cache event pipeline
SkArchon May 5, 2026
0d08e26
feat: export cache metrics from the router pipeline
SkArchon May 5, 2026
bb40f9a
feat: add pipeline in to the graphqlmetrics
SkArchon May 5, 2026
6190de3
fix(entity-caching-1): align with pinned graphql-go-tools
SkArchon May 6, 2026
51a54f6
ci(graphqlmetrics): include cacheevents proto in inlined buf generate
SkArchon May 6, 2026
c7bdb57
style(graphqlmetrics): gofmt server.go
SkArchon May 6, 2026
25c20ba
style(router): gofmt cacheevents/exporter/graphqlmetrics test files
SkArchon May 6, 2026
9d8e029
test(entity-caching-1): cover aggregate, exporter, builder helpers, a…
SkArchon May 6, 2026
d1cfe52
fix: review comments
SkArchon May 6, 2026
e9ea140
fix: review comments
SkArchon May 6, 2026
031785e
fix: review comments
SkArchon May 6, 2026
f47c3c3
fix(controlplane): split gql_cache_events migration to single statement
SkArchon May 6, 2026
e7b8178
fix(graphqlmetrics): align cacheevents Append order with gql_cache_ev…
SkArchon May 6, 2026
77b4abd
merge milinda/entity-intelligence-0 into entity-caching-1
SkArchon May 6, 2026
8455b62
merge milinda/entity-intelligence-0 into entity-caching-1
SkArchon May 6, 2026
c4d5882
fix(router): use buffered ArenaResolveGraphQLResponse to avoid trunca…
SkArchon May 6, 2026
0dfdaf9
Merge branch 'milinda/entity-caching-0' into milinda/entity-caching-1…
SkArchon May 6, 2026
0e742a9
feat(router): force hash analytics keys when cache events export is e…
SkArchon May 11, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .github/workflows/graphqlmetrics-ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ on:
pull_request:
paths:
- "graphqlmetrics/**/*"
- "proto/wg/cosmo/cacheevents/**/*"
- ".github/workflows/graphqlmetrics-ci.yaml"

concurrency:
Expand Down Expand Up @@ -44,7 +45,7 @@ jobs:
run: make setup-build-tools

- name: Generate code
run: rm -rf graphqlmetrics/gen && buf generate --path proto/wg/cosmo/graphqlmetrics --path proto/wg/cosmo/common --template buf.graphqlmetrics.go.gen.yaml
run: rm -rf graphqlmetrics/gen && buf generate --path proto/wg/cosmo/graphqlmetrics --path proto/wg/cosmo/common --path proto/wg/cosmo/cacheevents --template buf.graphqlmetrics.go.gen.yaml
Comment thread
coderabbitai[bot] marked this conversation as resolved.

- uses: ./.github/actions/git-dirty-check
with:
Expand Down
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,8 @@ generate:
make generate-go

generate-go:
rm -rf router/gen && buf generate --path proto/wg/cosmo/node --path proto/wg/cosmo/common --path proto/wg/cosmo/graphqlmetrics --template buf.router.go.gen.yaml
rm -rf graphqlmetrics/gen && buf generate --path proto/wg/cosmo/graphqlmetrics --path proto/wg/cosmo/common --template buf.graphqlmetrics.go.gen.yaml
rm -rf router/gen && buf generate --path proto/wg/cosmo/node --path proto/wg/cosmo/common --path proto/wg/cosmo/graphqlmetrics --path proto/wg/cosmo/cacheevents --template buf.router.go.gen.yaml
rm -rf graphqlmetrics/gen && buf generate --path proto/wg/cosmo/graphqlmetrics --path proto/wg/cosmo/common --path proto/wg/cosmo/cacheevents --template buf.graphqlmetrics.go.gen.yaml
rm -rf connect-go/wg && buf generate --path proto/wg/cosmo/platform --path proto/wg/cosmo/notifications --path proto/wg/cosmo/common --path proto/wg/cosmo/node --template buf.connect-go.go.gen.yaml

start-cp:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,220 @@
-- migrate:up

-- gql_cache_events_raw: wide table that stores cache events of every EventType.
-- Columns are grouped below by the kind of event they describe.
-- NOTE(review): presumably columns that do not apply to a row's EventType are
-- left at their default value; confirm against the writer.
-- Rows are partitioned by day and expire 7 days after Timestamp (see TTL).
CREATE TABLE IF NOT EXISTS gql_cache_events_raw
(
-- See https://github.com/PostHog/posthog/issues/10616 for why ZSTD(3) is used
Timestamp DateTime64(9, 'UTC') CODEC(Delta, ZSTD(3)),

-- Tenant
OrganizationID LowCardinality(String) CODEC(ZSTD(3)),
FederatedGraphID LowCardinality(String) CODEC(ZSTD(3)),
RouterConfigVersion LowCardinality(String) CODEC(ZSTD(3)),

-- Event discriminator. Canonical lowercase string. Values:
-- 'l1_read','l2_read','l1_write','l2_write','fetch_timing',
-- 'subgraph_error','shadow_comparison','mutation','header_impact',
-- 'cache_op_error'
EventType LowCardinality(String) CODEC(ZSTD(3)),

-- Operation context
OperationHash LowCardinality(String) CODEC(ZSTD(3)),
OperationName LowCardinality(String) CODEC(ZSTD(3)),
OperationType LowCardinality(String) CODEC(ZSTD(3)),
ClientName LowCardinality(String) CODEC(ZSTD(3)),
ClientVersion LowCardinality(String) CODEC(ZSTD(3)),
TraceID String CODEC(ZSTD(3)),
IsShadow Bool CODEC(ZSTD(3)),

-- Cache identity
EntityType LowCardinality(String) CODEC(ZSTD(3)),
SubgraphID LowCardinality(String) CODEC(ZSTD(3)),
KeyHash UInt64 CODEC(ZSTD(3)),

-- Field-level identity (root field for entity fetches; nested fields for value-type traversal)
FieldName LowCardinality(String) CODEC(ZSTD(3)),
FieldHash UInt64 CODEC(ZSTD(3)),
FieldPath Array(LowCardinality(String)) CODEC(ZSTD(3)),
EntityCount UInt32 CODEC(ZSTD(3)),
EntityUniqueKeys UInt32 CODEC(ZSTD(3)),

-- Read events (l1_read, l2_read)
Verdict LowCardinality(String) CODEC(ZSTD(3)),
ByteSize UInt32 CODEC(ZSTD(3)),
CacheAgeMs UInt32 CODEC(ZSTD(3)),

-- Write events (l1_write, l2_write)
TTLMs UInt32 CODEC(ZSTD(3)),
WriteReason LowCardinality(String) CODEC(ZSTD(3)),
Source LowCardinality(String) CODEC(ZSTD(3)),

-- Fetch timing
FetchSource LowCardinality(String) CODEC(ZSTD(3)),
DurationMs Float64 CODEC(ZSTD(3)),
TTFBMs Float64 CODEC(ZSTD(3)),
ItemCount UInt32 CODEC(ZSTD(3)),
IsEntityFetch Bool CODEC(ZSTD(3)),
HttpStatusCode UInt16 CODEC(ZSTD(3)),
ResponseBytes UInt32 CODEC(ZSTD(3)),

-- Errors (subgraph_error, cache_op_error)
ErrorMessage String CODEC(ZSTD(3)),
ErrorCode LowCardinality(String) CODEC(ZSTD(3)),
CacheOp LowCardinality(String) CODEC(ZSTD(3)),
CacheName LowCardinality(String) CODEC(ZSTD(3)),

-- Shadow + mutation share these columns
ShadowIsFresh Bool CODEC(ZSTD(3)),
CachedHash UInt64 CODEC(ZSTD(3)),
FreshHash UInt64 CODEC(ZSTD(3)),
CachedBytes UInt32 CODEC(ZSTD(3)),
FreshBytes UInt32 CODEC(ZSTD(3)),
ConfiguredTTLMs UInt32 CODEC(ZSTD(3)),

-- Mutation
MutationRootField LowCardinality(String) CODEC(ZSTD(3)),
HadCachedValue Bool CODEC(ZSTD(3)),
IsStale Bool CODEC(ZSTD(3)),

-- Header impact
BaseKeyHash UInt64 CODEC(ZSTD(3)),
HeaderHash UInt64 CODEC(ZSTD(3)),
ResponseHash UInt64 CODEC(ZSTD(3)),

-- Bloom-filter data-skipping indexes on the lookup columns; the two hash
-- columns use a lower false-positive rate (0.001) than the other two (0.01).
INDEX idx_op_hash OperationHash TYPE bloom_filter(0.001) GRANULARITY 1,
INDEX idx_entity EntityType TYPE bloom_filter(0.01) GRANULARITY 1,
INDEX idx_subgraph SubgraphID TYPE bloom_filter(0.01) GRANULARITY 1,
INDEX idx_key_hash KeyHash TYPE bloom_filter(0.001) GRANULARITY 1
)
-- One partition per day; the sorting key leads with the tenant columns.
engine = MergeTree PARTITION BY toDate(Timestamp)
ORDER BY (OrganizationID, FederatedGraphID, EventType, OperationHash, EntityType, SubgraphID, toUnixTimestamp(Timestamp))
-- Raw events are kept for 7 days.
TTL toDateTime(Timestamp) + toIntervalDay(7)
SETTINGS index_granularity = 8192, ttl_only_drop_parts = 1, non_replicated_deduplication_window = 1000;

-- gql_cache_events_5m_90d: 5-minute rollup of gql_cache_events_raw, kept for
-- 90 days and filled by the gql_cache_events_5m_90d_mv materialized view.
--
-- SummingMergeTree collapses rows that share the full ORDER BY key and sums
-- the numeric columns (Events and the Sum* counters). Every dimension column
-- must therefore be part of ORDER BY: a dimension that is left out is reduced
-- to an arbitrary value when parts merge, and the counters of otherwise
-- distinct groups are added together. FieldName is a dimension (the view
-- groups by it), so it is included in the sorting key.
CREATE TABLE IF NOT EXISTS gql_cache_events_5m_90d
(
    -- Start of the 5-minute bucket.
    Timestamp DateTime('UTC') CODEC(Delta, ZSTD(3)),

    -- Tenant
    OrganizationID LowCardinality(String) CODEC(ZSTD(3)),
    FederatedGraphID LowCardinality(String) CODEC(ZSTD(3)),
    RouterConfigVersion LowCardinality(String) CODEC(ZSTD(3)),

    -- Dimensions (all of them appear in ORDER BY below)
    EventType LowCardinality(String) CODEC(ZSTD(3)),
    OperationHash LowCardinality(String) CODEC(ZSTD(3)),
    OperationName LowCardinality(String) CODEC(ZSTD(3)),
    OperationType LowCardinality(String) CODEC(ZSTD(3)),
    ClientName LowCardinality(String) CODEC(ZSTD(3)),
    ClientVersion LowCardinality(String) CODEC(ZSTD(3)),
    EntityType LowCardinality(String) CODEC(ZSTD(3)),
    SubgraphID LowCardinality(String) CODEC(ZSTD(3)),
    Verdict LowCardinality(String) CODEC(ZSTD(3)),
    FieldName LowCardinality(String) CODEC(ZSTD(3)),
    FetchSource LowCardinality(String) CODEC(ZSTD(3)),
    IsShadow Bool CODEC(ZSTD(3)),

    -- Summed counters
    Events UInt64 CODEC(ZSTD(3)),
    SumByteSize UInt64 CODEC(ZSTD(3)),
    SumDurationMs Float64 CODEC(ZSTD(3)),
    SumCacheAgeMs UInt64 CODEC(ZSTD(3)),
    SumStale UInt64 CODEC(ZSTD(3)),
    SumEntityCount UInt64 CODEC(ZSTD(3))
)
engine = SummingMergeTree PARTITION BY toDate(Timestamp)
ORDER BY (OrganizationID, FederatedGraphID, EventType, OperationHash, EntityType, SubgraphID, ClientName, ClientVersion, RouterConfigVersion, OperationName, OperationType, Verdict, FetchSource, IsShadow, FieldName, toUnixTimestamp(Timestamp))
TTL toDateTime(Timestamp) + toIntervalDay(90)
SETTINGS index_granularity = 8192, ttl_only_drop_parts = 1;

-- Feeds gql_cache_events_5m_90d from gql_cache_events_raw: truncates each
-- event's Timestamp to the start of its 5-minute window and pre-aggregates the
-- counters per combination of the dimension columns listed in GROUP BY.
CREATE MATERIALIZED VIEW IF NOT EXISTS gql_cache_events_5m_90d_mv TO gql_cache_events_5m_90d AS
SELECT
-- The alias reuses the source column name.
-- NOTE(review): GROUP BY / ORDER BY Timestamp below presumably resolve to this
-- bucketed alias rather than the raw column; confirm against the server's
-- alias-resolution settings.
toStartOfFiveMinute(Timestamp) as Timestamp,
toLowCardinality(OrganizationID) as OrganizationID,
toLowCardinality(FederatedGraphID) as FederatedGraphID,
toLowCardinality(RouterConfigVersion) as RouterConfigVersion,
toLowCardinality(EventType) as EventType,
toLowCardinality(OperationHash) as OperationHash,
toLowCardinality(OperationName) as OperationName,
toLowCardinality(OperationType) as OperationType,
toLowCardinality(ClientName) as ClientName,
toLowCardinality(ClientVersion) as ClientVersion,
toLowCardinality(EntityType) as EntityType,
toLowCardinality(SubgraphID) as SubgraphID,
toLowCardinality(Verdict) as Verdict,
toLowCardinality(FieldName) as FieldName,
toLowCardinality(FetchSource) as FetchSource,
IsShadow as IsShadow,
-- Number of raw events in the group.
count() as Events,
sum(ByteSize) as SumByteSize,
sum(DurationMs) as SumDurationMs,
sum(CacheAgeMs) as SumCacheAgeMs,
-- Number of rows in the group whose IsStale flag is set.
sumIf(1, IsStale) as SumStale,
sum(EntityCount) as SumEntityCount
FROM gql_cache_events_raw
GROUP BY
Timestamp,
OrganizationID,
FederatedGraphID,
RouterConfigVersion,
EventType,
OperationHash,
OperationName,
OperationType,
ClientName,
ClientVersion,
EntityType,
SubgraphID,
Verdict,
FieldName,
FetchSource,
IsShadow
ORDER BY Timestamp;

-- gql_cache_events_1d_90d: daily rollup of cache events, kept for 90 days.
-- It carries a smaller set of dimensions than the 5-minute rollup. Every
-- non-counter column is part of ORDER BY, so SummingMergeTree only sums
-- Events, SumByteSize and SumDurationMs when it collapses rows.
CREATE TABLE IF NOT EXISTS gql_cache_events_1d_90d
(
-- Start of the day bucket.
Timestamp DateTime('UTC') CODEC(Delta, ZSTD(3)),

-- Tenant
OrganizationID LowCardinality(String) CODEC(ZSTD(3)),
FederatedGraphID LowCardinality(String) CODEC(ZSTD(3)),

-- Dimensions
EventType LowCardinality(String) CODEC(ZSTD(3)),
EntityType LowCardinality(String) CODEC(ZSTD(3)),
SubgraphID LowCardinality(String) CODEC(ZSTD(3)),
Verdict LowCardinality(String) CODEC(ZSTD(3)),

-- Summed counters
Events UInt64 CODEC(ZSTD(3)),
SumByteSize UInt64 CODEC(ZSTD(3)),
SumDurationMs Float64 CODEC(ZSTD(3))
)
engine = SummingMergeTree PARTITION BY toDate(Timestamp)
ORDER BY (OrganizationID, FederatedGraphID, EventType, EntityType, SubgraphID, Verdict, toUnixTimestamp(Timestamp))
TTL toDateTime(Timestamp) + toIntervalDay(90)
SETTINGS index_granularity = 8192, ttl_only_drop_parts = 1;

-- Feeds gql_cache_events_1d_90d. It reads directly from gql_cache_events_raw
-- (not from the 5-minute rollup), truncates Timestamp to the start of the day
-- and aggregates the counters per combination of the dimension columns listed
-- in GROUP BY.
CREATE MATERIALIZED VIEW IF NOT EXISTS gql_cache_events_1d_90d_mv TO gql_cache_events_1d_90d AS
SELECT
-- The alias reuses the source column name.
-- NOTE(review): GROUP BY / ORDER BY Timestamp below presumably resolve to this
-- bucketed alias rather than the raw column; confirm.
toStartOfDay(Timestamp) as Timestamp,
toLowCardinality(OrganizationID) as OrganizationID,
toLowCardinality(FederatedGraphID) as FederatedGraphID,
toLowCardinality(EventType) as EventType,
toLowCardinality(EntityType) as EntityType,
toLowCardinality(SubgraphID) as SubgraphID,
toLowCardinality(Verdict) as Verdict,
-- Number of raw events in the group.
count() as Events,
sum(ByteSize) as SumByteSize,
sum(DurationMs) as SumDurationMs
FROM gql_cache_events_raw
GROUP BY
Timestamp,
OrganizationID,
FederatedGraphID,
EventType,
EntityType,
SubgraphID,
Verdict
ORDER BY Timestamp;

-- migrate:down

-- Tear down in reverse order of creation: each materialized view is dropped
-- before the rollup table it writes to, and the raw source table goes last.
DROP VIEW IF EXISTS gql_cache_events_1d_90d_mv;
DROP TABLE IF EXISTS gql_cache_events_1d_90d;
DROP VIEW IF EXISTS gql_cache_events_5m_90d_mv;
DROP TABLE IF EXISTS gql_cache_events_5m_90d;
DROP TABLE IF EXISTS gql_cache_events_raw;
50 changes: 50 additions & 0 deletions graphqlmetrics/cacheevents/processor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package cacheevents

import (
"time"

cacheeventsv1 "github.com/wundergraph/cosmo/graphqlmetrics/gen/proto/wg/cosmo/cacheevents/v1"
utils "github.com/wundergraph/cosmo/graphqlmetrics/pkg/utils"
)

// BatchItem is the unit of work pushed onto the cache-events batch processor.
// One BatchItem corresponds to one PublishEntityCacheEvents RPC call from
// a router; it carries the events and the JWT claims that authenticated
// the request.
type BatchItem struct {
// Events holds the cache events carried by this item.
Events []*cacheeventsv1.CacheEvent
// Claims holds the JWT claims that authenticated the request.
Claims *utils.GraphAPITokenClaims
}

// ProcessorConfig groups the tunables of the cache-events batch processor.
// Its defaults are higher than those of the schema-usage processor because
// cache events arrive at 10-100x request volume.
//
// The exact meaning of each field is defined by the batch processor that
// consumes this configuration.
type ProcessorConfig struct {
	MaxBatchSize int
	MaxQueueSize int
	MaxWorkers   int
	Interval     time.Duration
}

// DefaultProcessorConfig returns the resource-isolated defaults that apply
// when no env-overrides are provided. They are deliberately kept separate
// from the schema-usage processor's defaults so that a spike in cache events
// does not degrade schema-usage SLAs.
func DefaultProcessorConfig() ProcessorConfig {
	const (
		defaultMaxBatchSize = 8192
		defaultMaxQueueSize = 131072
		defaultMaxWorkers   = 4
		defaultInterval     = 5 * time.Second
	)

	var cfg ProcessorConfig
	cfg.MaxBatchSize = defaultMaxBatchSize
	cfg.MaxQueueSize = defaultMaxQueueSize
	cfg.MaxWorkers = defaultMaxWorkers
	cfg.Interval = defaultInterval
	return cfg
}

// batchCost reports the cost of a batch as the total number of cache events
// across all of its items. It serves as the cost function handed to the
// generic batchprocessor.
func batchCost(items []BatchItem) int {
	total := 0
	for i := range items {
		total += len(items[i].Events)
	}
	return total
}
46 changes: 46 additions & 0 deletions graphqlmetrics/cacheevents/processor_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package cacheevents

import (
"testing"
"time"

"github.com/stretchr/testify/require"
cacheeventsv1 "github.com/wundergraph/cosmo/graphqlmetrics/gen/proto/wg/cosmo/cacheevents/v1"
)

func TestDefaultProcessorConfig(t *testing.T) {
t.Parallel()

cfg := DefaultProcessorConfig()
require.Equal(t, 8192, cfg.MaxBatchSize)
require.Equal(t, 131072, cfg.MaxQueueSize)
require.Equal(t, 4, cfg.MaxWorkers)
require.Equal(t, 5*time.Second, cfg.Interval)
}

func TestBatchCost(t *testing.T) {
t.Parallel()

t.Run("nil slice has zero cost", func(t *testing.T) {
require.Equal(t, 0, batchCost(nil))
})

t.Run("empty slice has zero cost", func(t *testing.T) {
require.Equal(t, 0, batchCost([]BatchItem{}))
})

t.Run("sums event counts across items", func(t *testing.T) {
items := []BatchItem{
{Events: []*cacheeventsv1.CacheEvent{{}, {}, {}}},
{Events: []*cacheeventsv1.CacheEvent{{}}},
{Events: nil},
{Events: []*cacheeventsv1.CacheEvent{{}, {}}},
}
require.Equal(t, 6, batchCost(items))
})

t.Run("item with nil events contributes zero", func(t *testing.T) {
items := []BatchItem{{Events: nil}, {Events: nil}}
require.Equal(t, 0, batchCost(items))
})
}
Loading
Loading