From 8a242265b05597450d0ae594d6180810d503c175 Mon Sep 17 00:00:00 2001 From: Liz Fong-Jones Date: Fri, 1 May 2026 09:57:12 +0000 Subject: [PATCH 1/3] feat: add SubscriptionUserDataProvider hook for BalanceStrategy Add an optional SubscriptionUserDataProvider interface that a BalanceStrategy may implement. When present, sarama invokes SubscriptionUserData(topics) immediately before each JoinGroup, and the returned bytes replace the statically configured Consumer.Group.Member.UserData for that strategy's subscription metadata only. A nil result or non-nil error falls back to the static UserData (errors are logged, not fatal). This mirrors the Java ConsumerPartitionAssignor.subscriptionUserData hook and enables load-aware assignors that report fresh per-cycle observations (CPU, lag, latency, etc.) to the group leader on every rebalance cycle. Existing BalanceStrategy implementations are unaffected: the interface is optional and the behavior for strategies that do not implement it is unchanged. The interface is treated as V1; future revisions should be introduced as separate interfaces (e.g., SubscriptionUserDataProviderV2) to preserve backward compatibility. Includes a unit test of the per-strategy metadata builder and a worked example under examples/consumer_load_aware/ that wraps the built-in sticky strategy with a load-reporting SubscriptionUserDataProvider. Fixes #3505 Signed-off-by: Liz Fong-Jones --- balance_strategy.go | 23 +++ consumer_group.go | 40 ++++- consumer_group_test.go | 75 +++++++++ examples/README.md | 4 + examples/consumer_load_aware/README.md | 26 ++++ examples/consumer_load_aware/go.mod | 25 +++ examples/consumer_load_aware/go.sum | 86 +++++++++++ .../consumer_load_aware/load_aware_sticky.go | 56 +++++++ examples/consumer_load_aware/main.go | 145 ++++++++++++++++++ 9 files changed, 472 insertions(+), 8 deletions(-) create mode 100644 examples/consumer_load_aware/README.md create mode 100644 examples/consumer_load_aware/go.mod create mode 100644 examples/consumer_load_aware/go.sum create mode 100644 examples/consumer_load_aware/load_aware_sticky.go create mode 100644 examples/consumer_load_aware/main.go diff --git a/balance_strategy.go b/balance_strategy.go index 59f894867..88081756b 100644 --- a/balance_strategy.go +++ b/balance_strategy.go @@ -57,6 +57,29 @@ type BalanceStrategy interface { AssignmentData(memberID string, topics map[string][]int32, generationID int32) ([]byte, error) } +// SubscriptionUserDataProvider is an optional interface that a BalanceStrategy +// may implement to inject per-cycle metadata into the ConsumerGroupMemberMetadata +// UserData field on each JoinGroup. When a strategy implements this interface, +// Sarama invokes SubscriptionUserData immediately before each JoinGroup, passing +// the member's currently subscribed topics. The returned bytes replace the +// statically configured Consumer.Group.Member.UserData for the metadata of that +// strategy only. +// +// Returning a nil byte slice or a non-nil error causes Sarama to fall back to +// the statically configured UserData; an error is logged but does not fail the +// JoinGroup. +// +// This mirrors Java's ConsumerPartitionAssignor.subscriptionUserData and enables +// load-aware assignors that report fresh observations (e.g., CPU, lag, latency) +// to the group leader on every rebalance cycle. +// +// The interface is treated as V1; future revisions should be introduced as +// separate interfaces (e.g., SubscriptionUserDataProviderV2) to preserve +// backward compatibility with existing implementations. +type SubscriptionUserDataProvider interface { + SubscriptionUserData(topics []string) ([]byte, error) +} + // -------------------------------------------------------------------- // NewBalanceStrategyRange returns a range balance strategy, diff --git a/consumer_group.go b/consumer_group.go index 3599b689d..307c25d2b 100644 --- a/consumer_group.go +++ b/consumer_group.go @@ -477,18 +477,17 @@ func (c *consumerGroup) joinGroupRequest(coordinator *Broker, topics []string) ( } } - meta := &ConsumerGroupMemberMetadata{ - Topics: topics, - UserData: c.userData, + addProtocol := func(strategy BalanceStrategy) error { + return req.AddGroupProtocolMetadata(strategy.Name(), c.subscriptionMetadata(strategy, topics)) } - var strategy BalanceStrategy - if strategy = c.config.Consumer.Group.Rebalance.Strategy; strategy != nil { - if err := req.AddGroupProtocolMetadata(strategy.Name(), meta); err != nil { + + if strategy := c.config.Consumer.Group.Rebalance.Strategy; strategy != nil { + if err := addProtocol(strategy); err != nil { return nil, err } } else { - for _, strategy = range c.config.Consumer.Group.Rebalance.GroupStrategies { - if err := req.AddGroupProtocolMetadata(strategy.Name(), meta); err != nil { + for _, strategy := range c.config.Consumer.Group.Rebalance.GroupStrategies { + if err := addProtocol(strategy); err != nil { return nil, err } } @@ -497,6 +496,31 @@ func (c *consumerGroup) joinGroupRequest(coordinator *Broker, topics []string) ( return coordinator.JoinGroup(req) } +// subscriptionMetadata builds the ConsumerGroupMemberMetadata for a single +// strategy in a JoinGroup request. If the strategy implements +// SubscriptionUserDataProvider, its SubscriptionUserData hook is invoked to +// obtain per-cycle UserData; otherwise (or on error/nil result) the statically +// configured Consumer.Group.Member.UserData is used. +func (c *consumerGroup) subscriptionMetadata(strategy BalanceStrategy, topics []string) *ConsumerGroupMemberMetadata { + userData := c.userData + if p, ok := strategy.(SubscriptionUserDataProvider); ok { + data, err := p.SubscriptionUserData(topics) + switch { + case err != nil: + Logger.Printf( + "consumergroup/%s strategy %q SubscriptionUserData failed: %v; falling back to static UserData\n", + c.groupID, strategy.Name(), err, + ) + case data != nil: + userData = data + } + } + return &ConsumerGroupMemberMetadata{ + Topics: topics, + UserData: userData, + } +} + // findStrategy returns the BalanceStrategy with the specified protocolName // from the slice provided. func (c *consumerGroup) findStrategy(name string, groupStrategies []BalanceStrategy) (BalanceStrategy, bool) { diff --git a/consumer_group_test.go b/consumer_group_test.go index 2e6f219fc..1fdf66bf0 100644 --- a/consumer_group_test.go +++ b/consumer_group_test.go @@ -5,6 +5,7 @@ package sarama import ( "context" "errors" + "fmt" "sync" "testing" "time" @@ -252,3 +253,77 @@ func TestConsumerShouldNotRetrySessionIfContextCancelled(t *testing.T) { _, err = c.retryNewSession(ctx, nil, nil, 1024, true) assert.Equal(t, context.Canceled, err) } + +// strategyWithSubscriptionUserData wraps a BalanceStrategy and adds a +// SubscriptionUserDataProvider implementation that returns the configured +// data/error. It also records the topics it was invoked with so tests can +// assert per-cycle invocation. +type strategyWithSubscriptionUserData struct { + BalanceStrategy + data []byte + err error + called [][]string +} + +func (s *strategyWithSubscriptionUserData) SubscriptionUserData(topics []string) ([]byte, error) { + s.called = append(s.called, append([]string(nil), topics...)) + return s.data, s.err +} + +func TestSubscriptionMetadata(t *testing.T) { + staticUserData := []byte("static") + topics := []string{"my-topic"} + + t.Run("strategy without provider uses static user data", func(t *testing.T) { + c := &consumerGroup{userData: staticUserData} + meta := c.subscriptionMetadata(NewBalanceStrategyRange(), topics) + assert.Equal(t, topics, meta.Topics) + assert.Equal(t, staticUserData, meta.UserData) + }) + + t.Run("provider returning bytes overrides static user data", func(t *testing.T) { + c := &consumerGroup{userData: staticUserData} + strategy := &strategyWithSubscriptionUserData{ + BalanceStrategy: NewBalanceStrategyRange(), + data: []byte("per-cycle"), + } + meta := c.subscriptionMetadata(strategy, topics) + assert.Equal(t, []byte("per-cycle"), meta.UserData) + assert.Equal(t, [][]string{topics}, strategy.called) + }) + + t.Run("provider returning nil falls back to static user data", func(t *testing.T) { + c := &consumerGroup{userData: staticUserData} + strategy := &strategyWithSubscriptionUserData{ + BalanceStrategy: NewBalanceStrategyRange(), + data: nil, + } + meta := c.subscriptionMetadata(strategy, topics) + assert.Equal(t, staticUserData, meta.UserData) + }) + + t.Run("provider returning error falls back to static user data", func(t *testing.T) { + c := &consumerGroup{userData: staticUserData} + strategy := &strategyWithSubscriptionUserData{ + BalanceStrategy: NewBalanceStrategyRange(), + data: []byte("ignored"), + err: fmt.Errorf("boom"), + } + meta := c.subscriptionMetadata(strategy, topics) + assert.Equal(t, staticUserData, meta.UserData) + }) + + t.Run("each strategy in GroupStrategies receives its own user data", func(t *testing.T) { + c := &consumerGroup{userData: staticUserData} + s1 := &strategyWithSubscriptionUserData{ + BalanceStrategy: NewBalanceStrategyRange(), + data: []byte("from-s1"), + } + s2 := &strategyWithSubscriptionUserData{ + BalanceStrategy: NewBalanceStrategyRoundRobin(), + data: []byte("from-s2"), + } + assert.Equal(t, []byte("from-s1"), c.subscriptionMetadata(s1, topics).UserData) + assert.Equal(t, []byte("from-s2"), c.subscriptionMetadata(s2, topics).UserData) + }) +} diff --git a/examples/README.md b/examples/README.md index baded0116..2e743f1bc 100644 --- a/examples/README.md +++ b/examples/README.md @@ -17,3 +17,7 @@ Basic example to use a producer interceptor that produces [OpenTelemetry](https: #### Exacly-once transactional paradigm [exactly_once](./exactly_once) Basic example to use a transactional producer that produce consumed message from some topics within a Kafka transaction. To ensure transactional-id uniqueness it implement some **_ProducerProvider_** that build a producer using current message topic-partition. + +#### Load-aware sticky consumer + +[consumer_load_aware](./consumer_load_aware) demonstrates the `SubscriptionUserDataProvider` interface: a `LoadAwareSticky` strategy wraps the built-in sticky assignor and injects a fresh load sample (CPU%, in-flight count) into each JoinGroup's subscription metadata. diff --git a/examples/consumer_load_aware/README.md b/examples/consumer_load_aware/README.md new file mode 100644 index 000000000..eafc80c2d --- /dev/null +++ b/examples/consumer_load_aware/README.md @@ -0,0 +1,26 @@ +# Load-aware sticky consumer example + +This example demonstrates the `sarama.SubscriptionUserDataProvider` interface +(see issue [#3505](https://github.com/IBM/sarama/issues/3505)). A +`LoadAwareSticky` strategy wraps the built-in sticky assignor and injects a +fresh `LoadSample` (CPU percent, in-flight count, lag) into the member's +JoinGroup subscription metadata on every rebalance cycle. + +`Plan` and `AssignmentData` are delegated unchanged to the sticky strategy; +this example deliberately keeps the assignor logic simple to focus on the +per-cycle metadata hook. A production load-aware assignor would also implement +its own `Plan` and decode each member's `UserData` on the leader to weight the +assignment. + +## Run + +```bash +$ go run . -brokers="127.0.0.1:9092" -topics="sarama" -group="example" +``` + +## Files + +- `load_aware_sticky.go` — the wrapper strategy that implements + `SubscriptionUserDataProvider`. +- `main.go` — minimal consumer group runner that wires the strategy in via + `Config.Consumer.Group.Rebalance.GroupStrategies`. diff --git a/examples/consumer_load_aware/go.mod b/examples/consumer_load_aware/go.mod new file mode 100644 index 000000000..f19a19de5 --- /dev/null +++ b/examples/consumer_load_aware/go.mod @@ -0,0 +1,25 @@ +module github.com/IBM/sarama/examples/consumer_load_aware + +go 1.25.0 + +require github.com/IBM/sarama v1.46.3 + +require ( + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/eapache/go-resiliency v1.7.0 // indirect + github.com/eapache/queue v1.1.0 // indirect + github.com/hashicorp/go-uuid v1.0.3 // indirect + github.com/jcmturner/aescts/v2 v2.0.0 // indirect + github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect + github.com/jcmturner/gofork v1.7.6 // indirect + github.com/jcmturner/gokrb5/v8 v8.4.4 // indirect + github.com/jcmturner/rpc/v2 v2.0.3 // indirect + github.com/klauspost/compress v1.18.5 // indirect + github.com/pierrec/lz4/v4 v4.1.26 // indirect + github.com/rcrowley/go-metrics v0.0.0-20250401214520-65e299d6c5c9 // indirect + golang.org/x/crypto v0.50.0 // indirect + golang.org/x/net v0.53.0 // indirect + golang.org/x/sys v0.43.0 // indirect +) + +replace github.com/IBM/sarama => ../../ diff --git a/examples/consumer_load_aware/go.sum b/examples/consumer_load_aware/go.sum new file mode 100644 index 000000000..44d750fee --- /dev/null +++ b/examples/consumer_load_aware/go.sum @@ -0,0 +1,86 @@ +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/eapache/go-resiliency v1.7.0 h1:n3NRTnBn5N0Cbi/IeOHuQn9s2UwVUH7Ga0ZWcP+9JTA= +github.com/eapache/go-resiliency v1.7.0/go.mod h1:5yPzW0MIvSe0JDsv0v+DvcjEv2FyD6iZYSs1ZI+iQho= +github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc= +github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I= +github.com/gorilla/securecookie v1.1.1/go.mod h1:ra0sb63/xPlUeL+yeDciTfxMRAA+MP+HVt/4epWDjd4= +github.com/gorilla/sessions v1.2.1/go.mod h1:dk2InVEVJ0sfLlnXv9EAgkf6ecYs/i80K/zI+bUmuGM= +github.com/hashicorp/go-uuid v1.0.2/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= +github.com/hashicorp/go-uuid v1.0.3 h1:2gKiV6YVmrJ1i2CKKa9obLvRieoRGviZFL26PcT/Co8= +github.com/hashicorp/go-uuid v1.0.3/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= +github.com/jcmturner/aescts/v2 v2.0.0 h1:9YKLH6ey7H4eDBXW8khjYslgyqG2xZikXP0EQFKrle8= +github.com/jcmturner/aescts/v2 v2.0.0/go.mod h1:AiaICIRyfYg35RUkr8yESTqvSy7csK90qZ5xfvvsoNs= +github.com/jcmturner/dnsutils/v2 v2.0.0 h1:lltnkeZGL0wILNvrNiVCR6Ro5PGU/SeBvVO/8c/iPbo= +github.com/jcmturner/dnsutils/v2 v2.0.0/go.mod h1:b0TnjGOvI/n42bZa+hmXL+kFJZsFT7G4t3HTlQ184QM= +github.com/jcmturner/gofork v1.7.6 h1:QH0l3hzAU1tfT3rZCnW5zXl+orbkNMMRGJfdJjHVETg= +github.com/jcmturner/gofork v1.7.6/go.mod h1:1622LH6i/EZqLloHfE7IeZ0uEJwMSUyQ/nDd82IeqRo= +github.com/jcmturner/goidentity/v6 v6.0.1 h1:VKnZd2oEIMorCTsFBnJWbExfNN7yZr3EhJAxwOkZg6o= +github.com/jcmturner/goidentity/v6 v6.0.1/go.mod h1:X1YW3bgtvwAXju7V3LCIMpY0Gbxyjn/mY9zx4tFonSg= +github.com/jcmturner/gokrb5/v8 v8.4.4 h1:x1Sv4HaTpepFkXbt2IkL29DXRf8sOfZXo8eRKh687T8= +github.com/jcmturner/gokrb5/v8 v8.4.4/go.mod h1:1btQEpgT6k+unzCwX1KdWMEwPPkkgBtP+F6aCACiMrs= +github.com/jcmturner/rpc/v2 v2.0.3 h1:7FXXj8Ti1IaVFpSAziCZWNzbNuZmnvw/i6CqLNdWfZY= +github.com/jcmturner/rpc/v2 v2.0.3/go.mod h1:VUJYCIDm3PVOEHw8sgt091/20OJjskO/YJki3ELg/Hc= +github.com/klauspost/compress v1.18.5 h1:/h1gH5Ce+VWNLSWqPzOVn6XBO+vJbCNGvjoaGBFW2IE= +github.com/klauspost/compress v1.18.5/go.mod h1:cwPg85FWrGar70rWktvGQj8/hthj3wpl0PGDogxkrSQ= +github.com/pierrec/lz4/v4 v4.1.26 h1:GrpZw1gZttORinvzBdXPUXATeqlJjqUG/D87TKMnhjY= +github.com/pierrec/lz4/v4 v4.1.26/go.mod h1:EoQMVJgeeEOMsCqCzqFm2O0cJvljX2nGZjcRIPL34O4= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rcrowley/go-metrics v0.0.0-20250401214520-65e299d6c5c9 h1:bsUq1dX0N8AOIL7EB/X911+m4EHsnWEHeJ0c+3TTBrg= +github.com/rcrowley/go-metrics v0.0.0-20250401214520-65e299d6c5c9/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= +github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= +github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= +github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= +golang.org/x/crypto v0.6.0/go.mod h1:OFC/31mSvZgRz0V1QTNCzfAI1aIRzbiufJtkMIlEp58= +golang.org/x/crypto v0.50.0 h1:zO47/JPrL6vsNkINmLoo/PH1gcxpls50DNogFvB5ZGI= +golang.org/x/crypto v0.50.0/go.mod h1:3muZ7vA7PBCE6xgPX7nkzzjiUq87kRItoJQM1Yo8S+Q= +golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= +golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= +golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= +golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= +golang.org/x/net v0.53.0 h1:d+qAbo5L0orcWAr0a9JweQpjXF19LMXJE8Ey7hwOdUA= +golang.org/x/net v0.53.0/go.mod h1:JvMuJH7rrdiCfbeHoo3fCQU24Lf5JJwT9W3sJFulfgs= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.20.0 h1:e0PTpb7pjO8GAtTs2dQ6jYa5BWYlMuX047Dco/pItO4= +golang.org/x/sync v0.20.0/go.mod h1:9xrNwdLfx4jkKbNva9FpL6vEN7evnE43NNNJQ2LF3+0= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.43.0 h1:Rlag2XtaFTxp19wS8MXlJwTvoh8ArU6ezoyFsMyCTNI= +golang.org/x/sys v0.43.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= +golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= +golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/examples/consumer_load_aware/load_aware_sticky.go b/examples/consumer_load_aware/load_aware_sticky.go new file mode 100644 index 000000000..902eca20f --- /dev/null +++ b/examples/consumer_load_aware/load_aware_sticky.go @@ -0,0 +1,56 @@ +package main + +import ( + "encoding/json" + + "github.com/IBM/sarama" +) + +// LoadSample is the per-cycle load observation that a member reports to the +// group leader as part of its JoinGroup subscription metadata. The schema is +// versioned so the leader can reject samples it does not understand. +type LoadSample struct { + Version int `json:"v"` + CPUPercent float64 `json:"cpu"` + InFlight int `json:"in_flight"` + LagMillis int64 `json:"lag_ms"` +} + +// LoadObserver returns a fresh sample of the local member's current load. +// Implementations should be cheap to call: it runs on every JoinGroup cycle. +type LoadObserver func() LoadSample + +// LoadAwareSticky wraps the built-in sticky balance strategy and reports a +// fresh LoadSample to the group leader on every JoinGroup. It implements +// sarama.SubscriptionUserDataProvider; assignment logic is delegated unchanged +// to NewBalanceStrategySticky, which keeps this example focused on the +// per-cycle metadata hook rather than on a custom assignor. +// +// A real load-aware assignor would also implement sarama.BalanceStrategy.Plan +// itself, decode each member's UserData on the leader, and weight the +// assignment by the reported load. +type LoadAwareSticky struct { + sarama.BalanceStrategy + observe LoadObserver +} + +// NewLoadAwareSticky returns a load-aware wrapper around the sticky strategy. +// The observe callback is invoked once per JoinGroup; its return value is +// JSON-serialized into the member's subscription UserData. +func NewLoadAwareSticky(observe LoadObserver) *LoadAwareSticky { + return &LoadAwareSticky{ + BalanceStrategy: sarama.NewBalanceStrategySticky(), + observe: observe, + } +} + +// SubscriptionUserData satisfies sarama.SubscriptionUserDataProvider. The +// topics argument is the member's currently subscribed topic set, supplied by +// sarama immediately before the JoinGroup is sent. +func (s *LoadAwareSticky) SubscriptionUserData(_ []string) ([]byte, error) { + sample := s.observe() + if sample.Version == 0 { + sample.Version = 1 + } + return json.Marshal(sample) +} diff --git a/examples/consumer_load_aware/main.go b/examples/consumer_load_aware/main.go new file mode 100644 index 000000000..c2a382a35 --- /dev/null +++ b/examples/consumer_load_aware/main.go @@ -0,0 +1,145 @@ +package main + +// Worked example: a load-aware sticky consumer that reports a fresh load +// sample (CPU%, in-flight count, lag) to the group leader on every JoinGroup +// cycle via sarama.SubscriptionUserDataProvider. See load_aware_sticky.go for +// the strategy wrapper. + +import ( + "context" + "errors" + "flag" + "log" + "math/rand" + "os" + "os/signal" + "strings" + "sync" + "sync/atomic" + "syscall" + + "github.com/IBM/sarama" +) + +var ( + brokers = "" + version = "" + group = "" + topics = "" + verbose = false +) + +func init() { + flag.StringVar(&brokers, "brokers", "", "Kafka bootstrap brokers, comma separated") + flag.StringVar(&group, "group", "", "Kafka consumer group id") + flag.StringVar(&version, "version", sarama.DefaultVersion.String(), "Kafka cluster version") + flag.StringVar(&topics, "topics", "", "Kafka topics to consume, comma separated") + flag.BoolVar(&verbose, "verbose", false, "Sarama logging") + flag.Parse() + + if brokers == "" || topics == "" || group == "" { + log.Panic("-brokers, -topics, and -group are all required") + } +} + +func main() { + if verbose { + sarama.Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags) + } + + v, err := sarama.ParseKafkaVersion(version) + if err != nil { + log.Panicf("Error parsing Kafka version: %v", err) + } + + config := sarama.NewConfig() + config.Version = v + config.Consumer.Offsets.Initial = sarama.OffsetOldest + + consumer := &Consumer{ready: make(chan bool)} + + // Plug in the load-aware sticky strategy. The observer is called once per + // JoinGroup cycle to capture a fresh sample of local load. + config.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{ + NewLoadAwareSticky(consumer.loadSample), + } + + client, err := sarama.NewConsumerGroup(strings.Split(brokers, ","), group, config) + if err != nil { + log.Panicf("Error creating consumer group client: %v", err) + } + + ctx, cancel := context.WithCancel(context.Background()) + wg := &sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + for { + if err := client.Consume(ctx, strings.Split(topics, ","), consumer); err != nil { + if errors.Is(err, sarama.ErrClosedConsumerGroup) { + return + } + log.Panicf("Error from consumer: %v", err) + } + if ctx.Err() != nil { + return + } + consumer.ready = make(chan bool) + } + }() + + <-consumer.ready + log.Println("load-aware sticky consumer running") + + sigterm := make(chan os.Signal, 1) + signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM) + <-sigterm + + cancel() + wg.Wait() + if err = client.Close(); err != nil { + log.Panicf("Error closing client: %v", err) + } +} + +// Consumer is a minimal ConsumerGroupHandler that also tracks an in-flight +// counter so it can report a meaningful load sample. +type Consumer struct { + ready chan bool + inFlight atomic.Int64 +} + +// loadSample is invoked by LoadAwareSticky once per JoinGroup. In a real +// deployment this would read /proc/stat, runtime.NumGoroutine, consumer lag, +// or whatever metric the leader uses to weight assignment. +func (c *Consumer) loadSample() LoadSample { + return LoadSample{ + Version: 1, + CPUPercent: rand.Float64() * 100, + InFlight: int(c.inFlight.Load()), + } +} + +func (c *Consumer) Setup(sarama.ConsumerGroupSession) error { + close(c.ready) + return nil +} + +func (c *Consumer) Cleanup(sarama.ConsumerGroupSession) error { return nil } + +func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { + for { + select { + case message, ok := <-claim.Messages(): + if !ok { + return nil + } + c.inFlight.Add(1) + log.Printf("Message claimed: topic=%s partition=%d offset=%d", message.Topic, message.Partition, message.Offset) + session.MarkMessage(message, "") + c.inFlight.Add(-1) + case <-session.Context().Done(): + return nil + } + } +} From f7cc882d2b085856e09310623c8c780e32c3323b Mon Sep 17 00:00:00 2001 From: Liz Fong-Jones Date: Wed, 6 May 2026 15:08:09 +1000 Subject: [PATCH 2/3] review: address SubscriptionUserDataBalanceStrategy feedback - Rename SubscriptionUserDataProvider to SubscriptionUserDataBalanceStrategy and embed BalanceStrategy so implementations must also be a BalanceStrategy. - Drop the YAGNI paragraph about hypothetical V2. - Treat nil and empty returned slices identically: a nil error means use whatever was returned (including nil/empty) verbatim. Only a non-nil error falls back to the static UserData. Update doc comment accordingly. - Inline the single-line addProtocol helper in joinGroupRequest. - Restructure subscriptionMetadata around an early return rather than a mutable userData local; tighten the fallback log line. - Use slices.Clone in the test recorder and errors.New for the boom error. - Add a test case for the empty-slice-clears-static behaviour now that nil and empty are handled the same way. Signed-off-by: Liz Fong-Jones --- balance_strategy.go | 35 +++++++++-------- consumer_group.go | 38 +++++++++---------- consumer_group_test.go | 26 +++++++++---- examples/README.md | 2 +- examples/consumer_load_aware/README.md | 4 +- .../consumer_load_aware/load_aware_sticky.go | 4 +- examples/consumer_load_aware/main.go | 2 +- 7 files changed, 58 insertions(+), 53 deletions(-) diff --git a/balance_strategy.go b/balance_strategy.go index 88081756b..f66ce820a 100644 --- a/balance_strategy.go +++ b/balance_strategy.go @@ -57,26 +57,25 @@ type BalanceStrategy interface { AssignmentData(memberID string, topics map[string][]int32, generationID int32) ([]byte, error) } -// SubscriptionUserDataProvider is an optional interface that a BalanceStrategy -// may implement to inject per-cycle metadata into the ConsumerGroupMemberMetadata -// UserData field on each JoinGroup. When a strategy implements this interface, -// Sarama invokes SubscriptionUserData immediately before each JoinGroup, passing -// the member's currently subscribed topics. The returned bytes replace the -// statically configured Consumer.Group.Member.UserData for the metadata of that -// strategy only. +// SubscriptionUserDataBalanceStrategy is an optional extension of +// BalanceStrategy that lets a strategy inject per-cycle metadata into the +// ConsumerGroupMemberMetadata UserData field on each JoinGroup. When a +// strategy implements this interface, Sarama invokes SubscriptionUserData +// immediately before each JoinGroup, passing the member's currently subscribed +// topics, and uses the returned bytes as the UserData for that strategy's +// subscription metadata in place of the statically configured +// Consumer.Group.Member.UserData. // -// Returning a nil byte slice or a non-nil error causes Sarama to fall back to -// the statically configured UserData; an error is logged but does not fail the -// JoinGroup. +// On a non-nil error Sarama logs the error and falls back to the statically +// configured UserData; the JoinGroup is not failed. On a nil error the +// returned slice is used verbatim, including empty and nil slices. // -// This mirrors Java's ConsumerPartitionAssignor.subscriptionUserData and enables -// load-aware assignors that report fresh observations (e.g., CPU, lag, latency) -// to the group leader on every rebalance cycle. -// -// The interface is treated as V1; future revisions should be introduced as -// separate interfaces (e.g., SubscriptionUserDataProviderV2) to preserve -// backward compatibility with existing implementations. -type SubscriptionUserDataProvider interface { +// This mirrors Java's ConsumerPartitionAssignor.subscriptionUserData and +// enables load-aware assignors that report fresh observations (e.g., CPU, +// lag, latency) to the group leader on every rebalance cycle. +type SubscriptionUserDataBalanceStrategy interface { + BalanceStrategy + SubscriptionUserData(topics []string) ([]byte, error) } diff --git a/consumer_group.go b/consumer_group.go index 307c25d2b..2c7c160b9 100644 --- a/consumer_group.go +++ b/consumer_group.go @@ -477,17 +477,13 @@ func (c *consumerGroup) joinGroupRequest(coordinator *Broker, topics []string) ( } } - addProtocol := func(strategy BalanceStrategy) error { - return req.AddGroupProtocolMetadata(strategy.Name(), c.subscriptionMetadata(strategy, topics)) - } - if strategy := c.config.Consumer.Group.Rebalance.Strategy; strategy != nil { - if err := addProtocol(strategy); err != nil { + if err := req.AddGroupProtocolMetadata(strategy.Name(), c.subscriptionMetadata(strategy, topics)); err != nil { return nil, err } } else { for _, strategy := range c.config.Consumer.Group.Rebalance.GroupStrategies { - if err := addProtocol(strategy); err != nil { + if err := req.AddGroupProtocolMetadata(strategy.Name(), c.subscriptionMetadata(strategy, topics)); err != nil { return nil, err } } @@ -498,26 +494,26 @@ func (c *consumerGroup) joinGroupRequest(coordinator *Broker, topics []string) ( // subscriptionMetadata builds the ConsumerGroupMemberMetadata for a single // strategy in a JoinGroup request. If the strategy implements -// SubscriptionUserDataProvider, its SubscriptionUserData hook is invoked to -// obtain per-cycle UserData; otherwise (or on error/nil result) the statically -// configured Consumer.Group.Member.UserData is used. +// SubscriptionUserDataBalanceStrategy, its SubscriptionUserData hook is invoked +// to obtain per-cycle UserData; on error the statically configured +// Consumer.Group.Member.UserData is used and the error is logged. func (c *consumerGroup) subscriptionMetadata(strategy BalanceStrategy, topics []string) *ConsumerGroupMemberMetadata { - userData := c.userData - if p, ok := strategy.(SubscriptionUserDataProvider); ok { - data, err := p.SubscriptionUserData(topics) - switch { - case err != nil: - Logger.Printf( - "consumergroup/%s strategy %q SubscriptionUserData failed: %v; falling back to static UserData\n", - c.groupID, strategy.Name(), err, - ) - case data != nil: - userData = data + if p, ok := strategy.(SubscriptionUserDataBalanceStrategy); ok { + userData, err := p.SubscriptionUserData(topics) + if err == nil { + return &ConsumerGroupMemberMetadata{ + Topics: topics, + UserData: userData, + } } + Logger.Printf( + "falling back to static user data for strategy %q consumergroup/%s: %v\n", + strategy.Name(), c.groupID, err, + ) } return &ConsumerGroupMemberMetadata{ Topics: topics, - UserData: userData, + UserData: c.userData, } } diff --git a/consumer_group_test.go b/consumer_group_test.go index 1fdf66bf0..863933cd3 100644 --- a/consumer_group_test.go +++ b/consumer_group_test.go @@ -5,7 +5,7 @@ package sarama import ( "context" "errors" - "fmt" + "slices" "sync" "testing" "time" @@ -255,9 +255,9 @@ func TestConsumerShouldNotRetrySessionIfContextCancelled(t *testing.T) { } // strategyWithSubscriptionUserData wraps a BalanceStrategy and adds a -// SubscriptionUserDataProvider implementation that returns the configured -// data/error. It also records the topics it was invoked with so tests can -// assert per-cycle invocation. +// SubscriptionUserDataBalanceStrategy implementation that returns the +// configured data/error. It also records the topics it was invoked with so +// tests can assert per-cycle invocation. type strategyWithSubscriptionUserData struct { BalanceStrategy data []byte @@ -266,7 +266,7 @@ type strategyWithSubscriptionUserData struct { } func (s *strategyWithSubscriptionUserData) SubscriptionUserData(topics []string) ([]byte, error) { - s.called = append(s.called, append([]string(nil), topics...)) + s.called = append(s.called, slices.Clone(topics)) return s.data, s.err } @@ -292,14 +292,24 @@ func TestSubscriptionMetadata(t *testing.T) { assert.Equal(t, [][]string{topics}, strategy.called) }) - t.Run("provider returning nil falls back to static user data", func(t *testing.T) { + t.Run("provider returning nil clears static user data", func(t *testing.T) { c := &consumerGroup{userData: staticUserData} strategy := &strategyWithSubscriptionUserData{ BalanceStrategy: NewBalanceStrategyRange(), data: nil, } meta := c.subscriptionMetadata(strategy, topics) - assert.Equal(t, staticUserData, meta.UserData) + assert.Nil(t, meta.UserData) + }) + + t.Run("provider returning empty slice clears static user data", func(t *testing.T) { + c := &consumerGroup{userData: staticUserData} + strategy := &strategyWithSubscriptionUserData{ + BalanceStrategy: NewBalanceStrategyRange(), + data: []byte{}, + } + meta := c.subscriptionMetadata(strategy, topics) + assert.Equal(t, []byte{}, meta.UserData) }) t.Run("provider returning error falls back to static user data", func(t *testing.T) { @@ -307,7 +317,7 @@ func TestSubscriptionMetadata(t *testing.T) { strategy := &strategyWithSubscriptionUserData{ BalanceStrategy: NewBalanceStrategyRange(), data: []byte("ignored"), - err: fmt.Errorf("boom"), + err: errors.New("boom"), } meta := c.subscriptionMetadata(strategy, topics) assert.Equal(t, staticUserData, meta.UserData) diff --git a/examples/README.md b/examples/README.md index 2e743f1bc..2240cc634 100644 --- a/examples/README.md +++ b/examples/README.md @@ -20,4 +20,4 @@ Basic example to use a producer interceptor that produces [OpenTelemetry](https: #### Load-aware sticky consumer -[consumer_load_aware](./consumer_load_aware) demonstrates the `SubscriptionUserDataProvider` interface: a `LoadAwareSticky` strategy wraps the built-in sticky assignor and injects a fresh load sample (CPU%, in-flight count) into each JoinGroup's subscription metadata. +[consumer_load_aware](./consumer_load_aware) demonstrates the `SubscriptionUserDataBalanceStrategy` interface: a `LoadAwareSticky` strategy wraps the built-in sticky assignor and injects a fresh load sample (CPU%, in-flight count) into each JoinGroup's subscription metadata. diff --git a/examples/consumer_load_aware/README.md b/examples/consumer_load_aware/README.md index eafc80c2d..958246348 100644 --- a/examples/consumer_load_aware/README.md +++ b/examples/consumer_load_aware/README.md @@ -1,6 +1,6 @@ # Load-aware sticky consumer example -This example demonstrates the `sarama.SubscriptionUserDataProvider` interface +This example demonstrates the `sarama.SubscriptionUserDataBalanceStrategy` interface (see issue [#3505](https://github.com/IBM/sarama/issues/3505)). A `LoadAwareSticky` strategy wraps the built-in sticky assignor and injects a fresh `LoadSample` (CPU percent, in-flight count, lag) into the member's @@ -21,6 +21,6 @@ $ go run . -brokers="127.0.0.1:9092" -topics="sarama" -group="example" ## Files - `load_aware_sticky.go` — the wrapper strategy that implements - `SubscriptionUserDataProvider`. + `SubscriptionUserDataBalanceStrategy`. - `main.go` — minimal consumer group runner that wires the strategy in via `Config.Consumer.Group.Rebalance.GroupStrategies`. diff --git a/examples/consumer_load_aware/load_aware_sticky.go b/examples/consumer_load_aware/load_aware_sticky.go index 902eca20f..f452f5373 100644 --- a/examples/consumer_load_aware/load_aware_sticky.go +++ b/examples/consumer_load_aware/load_aware_sticky.go @@ -22,7 +22,7 @@ type LoadObserver func() LoadSample // LoadAwareSticky wraps the built-in sticky balance strategy and reports a // fresh LoadSample to the group leader on every JoinGroup. It implements -// sarama.SubscriptionUserDataProvider; assignment logic is delegated unchanged +// sarama.SubscriptionUserDataBalanceStrategy; assignment logic is delegated unchanged // to NewBalanceStrategySticky, which keeps this example focused on the // per-cycle metadata hook rather than on a custom assignor. // @@ -44,7 +44,7 @@ func NewLoadAwareSticky(observe LoadObserver) *LoadAwareSticky { } } -// SubscriptionUserData satisfies sarama.SubscriptionUserDataProvider. The +// SubscriptionUserData satisfies sarama.SubscriptionUserDataBalanceStrategy. The // topics argument is the member's currently subscribed topic set, supplied by // sarama immediately before the JoinGroup is sent. func (s *LoadAwareSticky) SubscriptionUserData(_ []string) ([]byte, error) { diff --git a/examples/consumer_load_aware/main.go b/examples/consumer_load_aware/main.go index c2a382a35..ea02d5100 100644 --- a/examples/consumer_load_aware/main.go +++ b/examples/consumer_load_aware/main.go @@ -2,7 +2,7 @@ package main // Worked example: a load-aware sticky consumer that reports a fresh load // sample (CPU%, in-flight count, lag) to the group leader on every JoinGroup -// cycle via sarama.SubscriptionUserDataProvider. See load_aware_sticky.go for +// cycle via sarama.SubscriptionUserDataBalanceStrategy. See load_aware_sticky.go for // the strategy wrapper. import ( From b80770ab2df53a0cdf534ef60c27acaeab22eca1 Mon Sep 17 00:00:00 2001 From: Liz Fong-Jones Date: Tue, 12 May 2026 09:35:18 +1000 Subject: [PATCH 3/3] review: clone topics defensively and align fallback log line Per dnwe's review on #3506: - Pass slices.Clone(topics) to the SubscriptionUserData hook so a user-provided provider cannot mutate the slice we later attach to the JoinGroup request. - Rewrite the fallback log line to follow the codebase's "consumergroup/%s: ..." prefix convention. Signed-off-by: Liz Fong-Jones --- consumer_group.go | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/consumer_group.go b/consumer_group.go index 2c7c160b9..ed1c0d352 100644 --- a/consumer_group.go +++ b/consumer_group.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "slices" "sort" "sync" "time" @@ -499,7 +500,9 @@ func (c *consumerGroup) joinGroupRequest(coordinator *Broker, topics []string) ( // Consumer.Group.Member.UserData is used and the error is logged. func (c *consumerGroup) subscriptionMetadata(strategy BalanceStrategy, topics []string) *ConsumerGroupMemberMetadata { if p, ok := strategy.(SubscriptionUserDataBalanceStrategy); ok { - userData, err := p.SubscriptionUserData(topics) + // Hand the provider a throwaway copy so it cannot mutate the slice + // we later attach to the JoinGroup request. + userData, err := p.SubscriptionUserData(slices.Clone(topics)) if err == nil { return &ConsumerGroupMemberMetadata{ Topics: topics, @@ -507,8 +510,8 @@ func (c *consumerGroup) subscriptionMetadata(strategy BalanceStrategy, topics [] } } Logger.Printf( - "falling back to static user data for strategy %q consumergroup/%s: %v\n", - strategy.Name(), c.groupID, err, + "consumergroup/%s: falling back to static user data for strategy %q due to %v\n", + c.groupID, strategy.Name(), err, ) } return &ConsumerGroupMemberMetadata{