Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions balance_strategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,28 @@ type BalanceStrategy interface {
AssignmentData(memberID string, topics map[string][]int32, generationID int32) ([]byte, error)
}

// SubscriptionUserDataBalanceStrategy is an optional extension of
// BalanceStrategy that lets a strategy inject per-cycle metadata into the
// ConsumerGroupMemberMetadata UserData field on each JoinGroup. When a
// strategy implements this interface, Sarama invokes SubscriptionUserData
// immediately before each JoinGroup, passing the member's currently subscribed
// topics, and uses the returned bytes as the UserData for that strategy's
// subscription metadata in place of the statically configured
// Consumer.Group.Member.UserData.
//
// On a non-nil error Sarama logs the error and falls back to the statically
// configured UserData; the JoinGroup is not failed. On a nil error the
// returned slice is used verbatim, including empty and nil slices.
//
// This mirrors Java's ConsumerPartitionAssignor.subscriptionUserData and
// enables load-aware assignors that report fresh observations (e.g., CPU,
// lag, latency) to the group leader on every rebalance cycle.
type SubscriptionUserDataBalanceStrategy interface {
BalanceStrategy

SubscriptionUserData(topics []string) ([]byte, error)
Comment thread
lizthegrey marked this conversation as resolved.
}

// --------------------------------------------------------------------

// NewBalanceStrategyRange returns a range balance strategy,
Expand Down
38 changes: 29 additions & 9 deletions consumer_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -477,18 +477,13 @@ func (c *consumerGroup) joinGroupRequest(coordinator *Broker, topics []string) (
}
}

meta := &ConsumerGroupMemberMetadata{
Topics: topics,
UserData: c.userData,
}
var strategy BalanceStrategy
if strategy = c.config.Consumer.Group.Rebalance.Strategy; strategy != nil {
if err := req.AddGroupProtocolMetadata(strategy.Name(), meta); err != nil {
if strategy := c.config.Consumer.Group.Rebalance.Strategy; strategy != nil {
if err := req.AddGroupProtocolMetadata(strategy.Name(), c.subscriptionMetadata(strategy, topics)); err != nil {
return nil, err
}
} else {
for _, strategy = range c.config.Consumer.Group.Rebalance.GroupStrategies {
if err := req.AddGroupProtocolMetadata(strategy.Name(), meta); err != nil {
for _, strategy := range c.config.Consumer.Group.Rebalance.GroupStrategies {
if err := req.AddGroupProtocolMetadata(strategy.Name(), c.subscriptionMetadata(strategy, topics)); err != nil {
return nil, err
}
}
Expand All @@ -497,6 +492,31 @@ func (c *consumerGroup) joinGroupRequest(coordinator *Broker, topics []string) (
return coordinator.JoinGroup(req)
}

// subscriptionMetadata builds the ConsumerGroupMemberMetadata for a single
// strategy in a JoinGroup request. If the strategy implements
// SubscriptionUserDataBalanceStrategy, its SubscriptionUserData hook is invoked
// to obtain per-cycle UserData; on error the statically configured
// Consumer.Group.Member.UserData is used and the error is logged.
func (c *consumerGroup) subscriptionMetadata(strategy BalanceStrategy, topics []string) *ConsumerGroupMemberMetadata {
if p, ok := strategy.(SubscriptionUserDataBalanceStrategy); ok {
userData, err := p.SubscriptionUserData(topics)
Comment thread
lizthegrey marked this conversation as resolved.
Outdated
if err == nil {
return &ConsumerGroupMemberMetadata{
Topics: topics,
UserData: userData,
}
}
Logger.Printf(
"falling back to static user data for strategy %q consumergroup/%s: %v\n",
strategy.Name(), c.groupID, err,
)
Comment thread
lizthegrey marked this conversation as resolved.
}
return &ConsumerGroupMemberMetadata{
Topics: topics,
UserData: c.userData,
}
}

// findStrategy returns the BalanceStrategy with the specified protocolName
// from the slice provided.
func (c *consumerGroup) findStrategy(name string, groupStrategies []BalanceStrategy) (BalanceStrategy, bool) {
Expand Down
85 changes: 85 additions & 0 deletions consumer_group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package sarama
import (
"context"
"errors"
"slices"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -252,3 +253,87 @@ func TestConsumerShouldNotRetrySessionIfContextCancelled(t *testing.T) {
_, err = c.retryNewSession(ctx, nil, nil, 1024, true)
assert.Equal(t, context.Canceled, err)
}

// strategyWithSubscriptionUserData wraps a BalanceStrategy and adds a
// SubscriptionUserDataBalanceStrategy implementation that returns the
// configured data/error. It also records the topics it was invoked with so
// tests can assert per-cycle invocation.
type strategyWithSubscriptionUserData struct {
BalanceStrategy
data []byte
err error
called [][]string
}

func (s *strategyWithSubscriptionUserData) SubscriptionUserData(topics []string) ([]byte, error) {
s.called = append(s.called, slices.Clone(topics))
return s.data, s.err
}

func TestSubscriptionMetadata(t *testing.T) {
staticUserData := []byte("static")
topics := []string{"my-topic"}

t.Run("strategy without provider uses static user data", func(t *testing.T) {
c := &consumerGroup{userData: staticUserData}
meta := c.subscriptionMetadata(NewBalanceStrategyRange(), topics)
assert.Equal(t, topics, meta.Topics)
assert.Equal(t, staticUserData, meta.UserData)
})

t.Run("provider returning bytes overrides static user data", func(t *testing.T) {
c := &consumerGroup{userData: staticUserData}
strategy := &strategyWithSubscriptionUserData{
BalanceStrategy: NewBalanceStrategyRange(),
data: []byte("per-cycle"),
}
meta := c.subscriptionMetadata(strategy, topics)
assert.Equal(t, []byte("per-cycle"), meta.UserData)
assert.Equal(t, [][]string{topics}, strategy.called)
})

t.Run("provider returning nil clears static user data", func(t *testing.T) {
c := &consumerGroup{userData: staticUserData}
strategy := &strategyWithSubscriptionUserData{
BalanceStrategy: NewBalanceStrategyRange(),
data: nil,
}
meta := c.subscriptionMetadata(strategy, topics)
assert.Nil(t, meta.UserData)
})

t.Run("provider returning empty slice clears static user data", func(t *testing.T) {
c := &consumerGroup{userData: staticUserData}
strategy := &strategyWithSubscriptionUserData{
BalanceStrategy: NewBalanceStrategyRange(),
data: []byte{},
}
meta := c.subscriptionMetadata(strategy, topics)
assert.Equal(t, []byte{}, meta.UserData)
})

t.Run("provider returning error falls back to static user data", func(t *testing.T) {
c := &consumerGroup{userData: staticUserData}
strategy := &strategyWithSubscriptionUserData{
BalanceStrategy: NewBalanceStrategyRange(),
data: []byte("ignored"),
err: errors.New("boom"),
}
meta := c.subscriptionMetadata(strategy, topics)
assert.Equal(t, staticUserData, meta.UserData)
})

t.Run("each strategy in GroupStrategies receives its own user data", func(t *testing.T) {
c := &consumerGroup{userData: staticUserData}
s1 := &strategyWithSubscriptionUserData{
BalanceStrategy: NewBalanceStrategyRange(),
data: []byte("from-s1"),
}
s2 := &strategyWithSubscriptionUserData{
BalanceStrategy: NewBalanceStrategyRoundRobin(),
data: []byte("from-s2"),
}
assert.Equal(t, []byte("from-s1"), c.subscriptionMetadata(s1, topics).UserData)
assert.Equal(t, []byte("from-s2"), c.subscriptionMetadata(s2, topics).UserData)
})
}
4 changes: 4 additions & 0 deletions examples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,7 @@ Basic example to use a producer interceptor that produces [OpenTelemetry](https:
#### Exacly-once transactional paradigm

[exactly_once](./exactly_once) Basic example to use a transactional producer that produce consumed message from some topics within a Kafka transaction. To ensure transactional-id uniqueness it implement some **_ProducerProvider_** that build a producer using current message topic-partition.

#### Load-aware sticky consumer

[consumer_load_aware](./consumer_load_aware) demonstrates the `SubscriptionUserDataBalanceStrategy` interface: a `LoadAwareSticky` strategy wraps the built-in sticky assignor and injects a fresh load sample (CPU%, in-flight count) into each JoinGroup's subscription metadata.
26 changes: 26 additions & 0 deletions examples/consumer_load_aware/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# Load-aware sticky consumer example

This example demonstrates the `sarama.SubscriptionUserDataBalanceStrategy` interface
(see issue [#3505](https://github.com/IBM/sarama/issues/3505)). A
`LoadAwareSticky` strategy wraps the built-in sticky assignor and injects a
fresh `LoadSample` (CPU percent, in-flight count, lag) into the member's
JoinGroup subscription metadata on every rebalance cycle.

`Plan` and `AssignmentData` are delegated unchanged to the sticky strategy;
this example deliberately keeps the assignor logic simple to focus on the
per-cycle metadata hook. A production load-aware assignor would also implement
its own `Plan` and decode each member's `UserData` on the leader to weight the
assignment.

## Run

```bash
$ go run . -brokers="127.0.0.1:9092" -topics="sarama" -group="example"
```

## Files

- `load_aware_sticky.go` — the wrapper strategy that implements
`SubscriptionUserDataBalanceStrategy`.
- `main.go` — minimal consumer group runner that wires the strategy in via
`Config.Consumer.Group.Rebalance.GroupStrategies`.
25 changes: 25 additions & 0 deletions examples/consumer_load_aware/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
module github.com/IBM/sarama/examples/consumer_load_aware

go 1.25.0

require github.com/IBM/sarama v1.46.3

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/eapache/go-resiliency v1.7.0 // indirect
github.com/eapache/queue v1.1.0 // indirect
github.com/hashicorp/go-uuid v1.0.3 // indirect
github.com/jcmturner/aescts/v2 v2.0.0 // indirect
github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect
github.com/jcmturner/gofork v1.7.6 // indirect
github.com/jcmturner/gokrb5/v8 v8.4.4 // indirect
github.com/jcmturner/rpc/v2 v2.0.3 // indirect
github.com/klauspost/compress v1.18.5 // indirect
github.com/pierrec/lz4/v4 v4.1.26 // indirect
github.com/rcrowley/go-metrics v0.0.0-20250401214520-65e299d6c5c9 // indirect
golang.org/x/crypto v0.50.0 // indirect
golang.org/x/net v0.53.0 // indirect
golang.org/x/sys v0.43.0 // indirect
)

replace github.com/IBM/sarama => ../../
86 changes: 86 additions & 0 deletions examples/consumer_load_aware/go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/eapache/go-resiliency v1.7.0 h1:n3NRTnBn5N0Cbi/IeOHuQn9s2UwVUH7Ga0ZWcP+9JTA=
github.com/eapache/go-resiliency v1.7.0/go.mod h1:5yPzW0MIvSe0JDsv0v+DvcjEv2FyD6iZYSs1ZI+iQho=
github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc=
github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I=
github.com/gorilla/securecookie v1.1.1/go.mod h1:ra0sb63/xPlUeL+yeDciTfxMRAA+MP+HVt/4epWDjd4=
github.com/gorilla/sessions v1.2.1/go.mod h1:dk2InVEVJ0sfLlnXv9EAgkf6ecYs/i80K/zI+bUmuGM=
github.com/hashicorp/go-uuid v1.0.2/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro=
github.com/hashicorp/go-uuid v1.0.3 h1:2gKiV6YVmrJ1i2CKKa9obLvRieoRGviZFL26PcT/Co8=
github.com/hashicorp/go-uuid v1.0.3/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro=
github.com/jcmturner/aescts/v2 v2.0.0 h1:9YKLH6ey7H4eDBXW8khjYslgyqG2xZikXP0EQFKrle8=
github.com/jcmturner/aescts/v2 v2.0.0/go.mod h1:AiaICIRyfYg35RUkr8yESTqvSy7csK90qZ5xfvvsoNs=
github.com/jcmturner/dnsutils/v2 v2.0.0 h1:lltnkeZGL0wILNvrNiVCR6Ro5PGU/SeBvVO/8c/iPbo=
github.com/jcmturner/dnsutils/v2 v2.0.0/go.mod h1:b0TnjGOvI/n42bZa+hmXL+kFJZsFT7G4t3HTlQ184QM=
github.com/jcmturner/gofork v1.7.6 h1:QH0l3hzAU1tfT3rZCnW5zXl+orbkNMMRGJfdJjHVETg=
github.com/jcmturner/gofork v1.7.6/go.mod h1:1622LH6i/EZqLloHfE7IeZ0uEJwMSUyQ/nDd82IeqRo=
github.com/jcmturner/goidentity/v6 v6.0.1 h1:VKnZd2oEIMorCTsFBnJWbExfNN7yZr3EhJAxwOkZg6o=
github.com/jcmturner/goidentity/v6 v6.0.1/go.mod h1:X1YW3bgtvwAXju7V3LCIMpY0Gbxyjn/mY9zx4tFonSg=
github.com/jcmturner/gokrb5/v8 v8.4.4 h1:x1Sv4HaTpepFkXbt2IkL29DXRf8sOfZXo8eRKh687T8=
github.com/jcmturner/gokrb5/v8 v8.4.4/go.mod h1:1btQEpgT6k+unzCwX1KdWMEwPPkkgBtP+F6aCACiMrs=
github.com/jcmturner/rpc/v2 v2.0.3 h1:7FXXj8Ti1IaVFpSAziCZWNzbNuZmnvw/i6CqLNdWfZY=
github.com/jcmturner/rpc/v2 v2.0.3/go.mod h1:VUJYCIDm3PVOEHw8sgt091/20OJjskO/YJki3ELg/Hc=
github.com/klauspost/compress v1.18.5 h1:/h1gH5Ce+VWNLSWqPzOVn6XBO+vJbCNGvjoaGBFW2IE=
github.com/klauspost/compress v1.18.5/go.mod h1:cwPg85FWrGar70rWktvGQj8/hthj3wpl0PGDogxkrSQ=
github.com/pierrec/lz4/v4 v4.1.26 h1:GrpZw1gZttORinvzBdXPUXATeqlJjqUG/D87TKMnhjY=
github.com/pierrec/lz4/v4 v4.1.26/go.mod h1:EoQMVJgeeEOMsCqCzqFm2O0cJvljX2nGZjcRIPL34O4=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rcrowley/go-metrics v0.0.0-20250401214520-65e299d6c5c9 h1:bsUq1dX0N8AOIL7EB/X911+m4EHsnWEHeJ0c+3TTBrg=
github.com/rcrowley/go-metrics v0.0.0-20250401214520-65e299d6c5c9/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U=
github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.6.0/go.mod h1:OFC/31mSvZgRz0V1QTNCzfAI1aIRzbiufJtkMIlEp58=
golang.org/x/crypto v0.50.0 h1:zO47/JPrL6vsNkINmLoo/PH1gcxpls50DNogFvB5ZGI=
golang.org/x/crypto v0.50.0/go.mod h1:3muZ7vA7PBCE6xgPX7nkzzjiUq87kRItoJQM1Yo8S+Q=
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
golang.org/x/net v0.53.0 h1:d+qAbo5L0orcWAr0a9JweQpjXF19LMXJE8Ey7hwOdUA=
golang.org/x/net v0.53.0/go.mod h1:JvMuJH7rrdiCfbeHoo3fCQU24Lf5JJwT9W3sJFulfgs=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.20.0 h1:e0PTpb7pjO8GAtTs2dQ6jYa5BWYlMuX047Dco/pItO4=
golang.org/x/sync v0.20.0/go.mod h1:9xrNwdLfx4jkKbNva9FpL6vEN7evnE43NNNJQ2LF3+0=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.43.0 h1:Rlag2XtaFTxp19wS8MXlJwTvoh8ArU6ezoyFsMyCTNI=
golang.org/x/sys v0.43.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
56 changes: 56 additions & 0 deletions examples/consumer_load_aware/load_aware_sticky.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package main

import (
"encoding/json"

"github.com/IBM/sarama"
)

// LoadSample is the per-cycle load observation that a member reports to the
// group leader as part of its JoinGroup subscription metadata. The schema is
// versioned so the leader can reject samples it does not understand.
type LoadSample struct {
Version int `json:"v"`
CPUPercent float64 `json:"cpu"`
InFlight int `json:"in_flight"`
LagMillis int64 `json:"lag_ms"`
}

// LoadObserver returns a fresh sample of the local member's current load.
// Implementations should be cheap to call: it runs on every JoinGroup cycle.
type LoadObserver func() LoadSample

// LoadAwareSticky wraps the built-in sticky balance strategy and reports a
// fresh LoadSample to the group leader on every JoinGroup. It implements
// sarama.SubscriptionUserDataBalanceStrategy; assignment logic is delegated unchanged
// to NewBalanceStrategySticky, which keeps this example focused on the
// per-cycle metadata hook rather than on a custom assignor.
//
// A real load-aware assignor would also implement sarama.BalanceStrategy.Plan
// itself, decode each member's UserData on the leader, and weight the
// assignment by the reported load.
type LoadAwareSticky struct {
sarama.BalanceStrategy
observe LoadObserver
}

// NewLoadAwareSticky returns a load-aware wrapper around the sticky strategy.
// The observe callback is invoked once per JoinGroup; its return value is
// JSON-serialized into the member's subscription UserData.
func NewLoadAwareSticky(observe LoadObserver) *LoadAwareSticky {
return &LoadAwareSticky{
BalanceStrategy: sarama.NewBalanceStrategySticky(),
observe: observe,
}
}

// SubscriptionUserData satisfies sarama.SubscriptionUserDataBalanceStrategy. The
// topics argument is the member's currently subscribed topic set, supplied by
// sarama immediately before the JoinGroup is sent.
func (s *LoadAwareSticky) SubscriptionUserData(_ []string) ([]byte, error) {
sample := s.observe()
if sample.Version == 0 {
sample.Version = 1
}
return json.Marshal(sample)
}
Loading
Loading