Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions balance_strategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,29 @@ type BalanceStrategy interface {
AssignmentData(memberID string, topics map[string][]int32, generationID int32) ([]byte, error)
}

// SubscriptionUserDataProvider is an optional interface that a BalanceStrategy
// may implement to inject per-cycle metadata into the ConsumerGroupMemberMetadata
// UserData field on each JoinGroup. When a strategy implements this interface,
// Sarama invokes SubscriptionUserData immediately before each JoinGroup, passing
// the member's currently subscribed topics. The returned bytes replace the
// statically configured Consumer.Group.Member.UserData for the metadata of that
// strategy only.
//
// Returning a nil byte slice or a non-nil error causes Sarama to fall back to
// the statically configured UserData; an error is logged but does not fail the
// JoinGroup.
Comment thread
puellanivis marked this conversation as resolved.
Outdated
//
// This mirrors Java's ConsumerPartitionAssignor.subscriptionUserData and enables
// load-aware assignors that report fresh observations (e.g., CPU, lag, latency)
// to the group leader on every rebalance cycle.
//
// The interface is treated as V1; future revisions should be introduced as
// separate interfaces (e.g., SubscriptionUserDataProviderV2) to preserve
// backward compatibility with existing implementations.
Comment thread
puellanivis marked this conversation as resolved.
Outdated
type SubscriptionUserDataProvider interface {
Comment thread
puellanivis marked this conversation as resolved.
Outdated
SubscriptionUserData(topics []string) ([]byte, error)
Comment thread
lizthegrey marked this conversation as resolved.
}

// --------------------------------------------------------------------

// NewBalanceStrategyRange returns a range balance strategy,
Expand Down
40 changes: 32 additions & 8 deletions consumer_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -477,18 +477,17 @@ func (c *consumerGroup) joinGroupRequest(coordinator *Broker, topics []string) (
}
}

meta := &ConsumerGroupMemberMetadata{
Topics: topics,
UserData: c.userData,
addProtocol := func(strategy BalanceStrategy) error {
return req.AddGroupProtocolMetadata(strategy.Name(), c.subscriptionMetadata(strategy, topics))
}
Comment thread
puellanivis marked this conversation as resolved.
Outdated
var strategy BalanceStrategy
if strategy = c.config.Consumer.Group.Rebalance.Strategy; strategy != nil {
if err := req.AddGroupProtocolMetadata(strategy.Name(), meta); err != nil {

if strategy := c.config.Consumer.Group.Rebalance.Strategy; strategy != nil {
if err := addProtocol(strategy); err != nil {
return nil, err
}
} else {
for _, strategy = range c.config.Consumer.Group.Rebalance.GroupStrategies {
if err := req.AddGroupProtocolMetadata(strategy.Name(), meta); err != nil {
for _, strategy := range c.config.Consumer.Group.Rebalance.GroupStrategies {
if err := addProtocol(strategy); err != nil {
return nil, err
}
}
Expand All @@ -497,6 +496,31 @@ func (c *consumerGroup) joinGroupRequest(coordinator *Broker, topics []string) (
return coordinator.JoinGroup(req)
}

// subscriptionMetadata builds the ConsumerGroupMemberMetadata for a single
// strategy in a JoinGroup request. If the strategy implements
// SubscriptionUserDataProvider, its SubscriptionUserData hook is invoked to
// obtain per-cycle UserData; otherwise (or on error/nil result) the statically
// configured Consumer.Group.Member.UserData is used.
func (c *consumerGroup) subscriptionMetadata(strategy BalanceStrategy, topics []string) *ConsumerGroupMemberMetadata {
userData := c.userData
if p, ok := strategy.(SubscriptionUserDataProvider); ok {
data, err := p.SubscriptionUserData(topics)
switch {
case err != nil:
Logger.Printf(
"consumergroup/%s strategy %q SubscriptionUserData failed: %v; falling back to static UserData\n",
c.groupID, strategy.Name(), err,
)
case data != nil:
userData = data
Comment thread
puellanivis marked this conversation as resolved.
Outdated
}
}
return &ConsumerGroupMemberMetadata{
Topics: topics,
UserData: userData,
}
}

// findStrategy returns the BalanceStrategy with the specified protocolName
// from the slice provided.
func (c *consumerGroup) findStrategy(name string, groupStrategies []BalanceStrategy) (BalanceStrategy, bool) {
Expand Down
75 changes: 75 additions & 0 deletions consumer_group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package sarama
import (
"context"
"errors"
"fmt"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -252,3 +253,77 @@ func TestConsumerShouldNotRetrySessionIfContextCancelled(t *testing.T) {
_, err = c.retryNewSession(ctx, nil, nil, 1024, true)
assert.Equal(t, context.Canceled, err)
}

// strategyWithSubscriptionUserData wraps a BalanceStrategy and adds a
// SubscriptionUserDataProvider implementation that returns the configured
// data/error. It also records the topics it was invoked with so tests can
// assert per-cycle invocation.
type strategyWithSubscriptionUserData struct {
BalanceStrategy
data []byte
err error
called [][]string
}

func (s *strategyWithSubscriptionUserData) SubscriptionUserData(topics []string) ([]byte, error) {
s.called = append(s.called, append([]string(nil), topics...))
Comment thread
puellanivis marked this conversation as resolved.
Outdated
return s.data, s.err
}

func TestSubscriptionMetadata(t *testing.T) {
staticUserData := []byte("static")
topics := []string{"my-topic"}

t.Run("strategy without provider uses static user data", func(t *testing.T) {
c := &consumerGroup{userData: staticUserData}
meta := c.subscriptionMetadata(NewBalanceStrategyRange(), topics)
assert.Equal(t, topics, meta.Topics)
assert.Equal(t, staticUserData, meta.UserData)
})

t.Run("provider returning bytes overrides static user data", func(t *testing.T) {
c := &consumerGroup{userData: staticUserData}
strategy := &strategyWithSubscriptionUserData{
BalanceStrategy: NewBalanceStrategyRange(),
data: []byte("per-cycle"),
}
meta := c.subscriptionMetadata(strategy, topics)
assert.Equal(t, []byte("per-cycle"), meta.UserData)
assert.Equal(t, [][]string{topics}, strategy.called)
})

t.Run("provider returning nil falls back to static user data", func(t *testing.T) {
c := &consumerGroup{userData: staticUserData}
strategy := &strategyWithSubscriptionUserData{
BalanceStrategy: NewBalanceStrategyRange(),
data: nil,
}
meta := c.subscriptionMetadata(strategy, topics)
assert.Equal(t, staticUserData, meta.UserData)
})
Comment thread
puellanivis marked this conversation as resolved.
Outdated

t.Run("provider returning error falls back to static user data", func(t *testing.T) {
c := &consumerGroup{userData: staticUserData}
strategy := &strategyWithSubscriptionUserData{
BalanceStrategy: NewBalanceStrategyRange(),
data: []byte("ignored"),
err: fmt.Errorf("boom"),
Comment thread
puellanivis marked this conversation as resolved.
Outdated
}
meta := c.subscriptionMetadata(strategy, topics)
assert.Equal(t, staticUserData, meta.UserData)
})

t.Run("each strategy in GroupStrategies receives its own user data", func(t *testing.T) {
c := &consumerGroup{userData: staticUserData}
s1 := &strategyWithSubscriptionUserData{
BalanceStrategy: NewBalanceStrategyRange(),
data: []byte("from-s1"),
}
s2 := &strategyWithSubscriptionUserData{
BalanceStrategy: NewBalanceStrategyRoundRobin(),
data: []byte("from-s2"),
}
assert.Equal(t, []byte("from-s1"), c.subscriptionMetadata(s1, topics).UserData)
assert.Equal(t, []byte("from-s2"), c.subscriptionMetadata(s2, topics).UserData)
})
}
4 changes: 4 additions & 0 deletions examples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,7 @@ Basic example to use a producer interceptor that produces [OpenTelemetry](https:
#### Exacly-once transactional paradigm

[exactly_once](./exactly_once) Basic example to use a transactional producer that produce consumed message from some topics within a Kafka transaction. To ensure transactional-id uniqueness it implement some **_ProducerProvider_** that build a producer using current message topic-partition.

#### Load-aware sticky consumer

[consumer_load_aware](./consumer_load_aware) demonstrates the `SubscriptionUserDataProvider` interface: a `LoadAwareSticky` strategy wraps the built-in sticky assignor and injects a fresh load sample (CPU%, in-flight count) into each JoinGroup's subscription metadata.
26 changes: 26 additions & 0 deletions examples/consumer_load_aware/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# Load-aware sticky consumer example

This example demonstrates the `sarama.SubscriptionUserDataProvider` interface
(see issue [#3505](https://github.com/IBM/sarama/issues/3505)). A
`LoadAwareSticky` strategy wraps the built-in sticky assignor and injects a
fresh `LoadSample` (CPU percent, in-flight count, lag) into the member's
JoinGroup subscription metadata on every rebalance cycle.

`Plan` and `AssignmentData` are delegated unchanged to the sticky strategy;
this example deliberately keeps the assignor logic simple to focus on the
per-cycle metadata hook. A production load-aware assignor would also implement
its own `Plan` and decode each member's `UserData` on the leader to weight the
assignment.

## Run

```bash
$ go run . -brokers="127.0.0.1:9092" -topics="sarama" -group="example"
```

## Files

- `load_aware_sticky.go` — the wrapper strategy that implements
`SubscriptionUserDataProvider`.
- `main.go` — minimal consumer group runner that wires the strategy in via
`Config.Consumer.Group.Rebalance.GroupStrategies`.
25 changes: 25 additions & 0 deletions examples/consumer_load_aware/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
module github.com/IBM/sarama/examples/consumer_load_aware

go 1.25.0

require github.com/IBM/sarama v1.46.3

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/eapache/go-resiliency v1.7.0 // indirect
github.com/eapache/queue v1.1.0 // indirect
github.com/hashicorp/go-uuid v1.0.3 // indirect
github.com/jcmturner/aescts/v2 v2.0.0 // indirect
github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect
github.com/jcmturner/gofork v1.7.6 // indirect
github.com/jcmturner/gokrb5/v8 v8.4.4 // indirect
github.com/jcmturner/rpc/v2 v2.0.3 // indirect
github.com/klauspost/compress v1.18.5 // indirect
github.com/pierrec/lz4/v4 v4.1.26 // indirect
github.com/rcrowley/go-metrics v0.0.0-20250401214520-65e299d6c5c9 // indirect
golang.org/x/crypto v0.50.0 // indirect
golang.org/x/net v0.53.0 // indirect
golang.org/x/sys v0.43.0 // indirect
)

replace github.com/IBM/sarama => ../../
86 changes: 86 additions & 0 deletions examples/consumer_load_aware/go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/eapache/go-resiliency v1.7.0 h1:n3NRTnBn5N0Cbi/IeOHuQn9s2UwVUH7Ga0ZWcP+9JTA=
github.com/eapache/go-resiliency v1.7.0/go.mod h1:5yPzW0MIvSe0JDsv0v+DvcjEv2FyD6iZYSs1ZI+iQho=
github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc=
github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I=
github.com/gorilla/securecookie v1.1.1/go.mod h1:ra0sb63/xPlUeL+yeDciTfxMRAA+MP+HVt/4epWDjd4=
github.com/gorilla/sessions v1.2.1/go.mod h1:dk2InVEVJ0sfLlnXv9EAgkf6ecYs/i80K/zI+bUmuGM=
github.com/hashicorp/go-uuid v1.0.2/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro=
github.com/hashicorp/go-uuid v1.0.3 h1:2gKiV6YVmrJ1i2CKKa9obLvRieoRGviZFL26PcT/Co8=
github.com/hashicorp/go-uuid v1.0.3/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro=
github.com/jcmturner/aescts/v2 v2.0.0 h1:9YKLH6ey7H4eDBXW8khjYslgyqG2xZikXP0EQFKrle8=
github.com/jcmturner/aescts/v2 v2.0.0/go.mod h1:AiaICIRyfYg35RUkr8yESTqvSy7csK90qZ5xfvvsoNs=
github.com/jcmturner/dnsutils/v2 v2.0.0 h1:lltnkeZGL0wILNvrNiVCR6Ro5PGU/SeBvVO/8c/iPbo=
github.com/jcmturner/dnsutils/v2 v2.0.0/go.mod h1:b0TnjGOvI/n42bZa+hmXL+kFJZsFT7G4t3HTlQ184QM=
github.com/jcmturner/gofork v1.7.6 h1:QH0l3hzAU1tfT3rZCnW5zXl+orbkNMMRGJfdJjHVETg=
github.com/jcmturner/gofork v1.7.6/go.mod h1:1622LH6i/EZqLloHfE7IeZ0uEJwMSUyQ/nDd82IeqRo=
github.com/jcmturner/goidentity/v6 v6.0.1 h1:VKnZd2oEIMorCTsFBnJWbExfNN7yZr3EhJAxwOkZg6o=
github.com/jcmturner/goidentity/v6 v6.0.1/go.mod h1:X1YW3bgtvwAXju7V3LCIMpY0Gbxyjn/mY9zx4tFonSg=
github.com/jcmturner/gokrb5/v8 v8.4.4 h1:x1Sv4HaTpepFkXbt2IkL29DXRf8sOfZXo8eRKh687T8=
github.com/jcmturner/gokrb5/v8 v8.4.4/go.mod h1:1btQEpgT6k+unzCwX1KdWMEwPPkkgBtP+F6aCACiMrs=
github.com/jcmturner/rpc/v2 v2.0.3 h1:7FXXj8Ti1IaVFpSAziCZWNzbNuZmnvw/i6CqLNdWfZY=
github.com/jcmturner/rpc/v2 v2.0.3/go.mod h1:VUJYCIDm3PVOEHw8sgt091/20OJjskO/YJki3ELg/Hc=
github.com/klauspost/compress v1.18.5 h1:/h1gH5Ce+VWNLSWqPzOVn6XBO+vJbCNGvjoaGBFW2IE=
github.com/klauspost/compress v1.18.5/go.mod h1:cwPg85FWrGar70rWktvGQj8/hthj3wpl0PGDogxkrSQ=
github.com/pierrec/lz4/v4 v4.1.26 h1:GrpZw1gZttORinvzBdXPUXATeqlJjqUG/D87TKMnhjY=
github.com/pierrec/lz4/v4 v4.1.26/go.mod h1:EoQMVJgeeEOMsCqCzqFm2O0cJvljX2nGZjcRIPL34O4=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rcrowley/go-metrics v0.0.0-20250401214520-65e299d6c5c9 h1:bsUq1dX0N8AOIL7EB/X911+m4EHsnWEHeJ0c+3TTBrg=
github.com/rcrowley/go-metrics v0.0.0-20250401214520-65e299d6c5c9/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U=
github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.6.0/go.mod h1:OFC/31mSvZgRz0V1QTNCzfAI1aIRzbiufJtkMIlEp58=
golang.org/x/crypto v0.50.0 h1:zO47/JPrL6vsNkINmLoo/PH1gcxpls50DNogFvB5ZGI=
golang.org/x/crypto v0.50.0/go.mod h1:3muZ7vA7PBCE6xgPX7nkzzjiUq87kRItoJQM1Yo8S+Q=
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
golang.org/x/net v0.53.0 h1:d+qAbo5L0orcWAr0a9JweQpjXF19LMXJE8Ey7hwOdUA=
golang.org/x/net v0.53.0/go.mod h1:JvMuJH7rrdiCfbeHoo3fCQU24Lf5JJwT9W3sJFulfgs=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.20.0 h1:e0PTpb7pjO8GAtTs2dQ6jYa5BWYlMuX047Dco/pItO4=
golang.org/x/sync v0.20.0/go.mod h1:9xrNwdLfx4jkKbNva9FpL6vEN7evnE43NNNJQ2LF3+0=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.43.0 h1:Rlag2XtaFTxp19wS8MXlJwTvoh8ArU6ezoyFsMyCTNI=
golang.org/x/sys v0.43.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
56 changes: 56 additions & 0 deletions examples/consumer_load_aware/load_aware_sticky.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package main

import (
"encoding/json"

"github.com/IBM/sarama"
)

// LoadSample is the per-cycle load observation that a member reports to the
// group leader as part of its JoinGroup subscription metadata. The schema is
// versioned so the leader can reject samples it does not understand.
type LoadSample struct {
Version int `json:"v"`
CPUPercent float64 `json:"cpu"`
InFlight int `json:"in_flight"`
LagMillis int64 `json:"lag_ms"`
}

// LoadObserver returns a fresh sample of the local member's current load.
// Implementations should be cheap to call: it runs on every JoinGroup cycle.
type LoadObserver func() LoadSample

// LoadAwareSticky wraps the built-in sticky balance strategy and reports a
// fresh LoadSample to the group leader on every JoinGroup. It implements
// sarama.SubscriptionUserDataProvider; assignment logic is delegated unchanged
// to NewBalanceStrategySticky, which keeps this example focused on the
// per-cycle metadata hook rather than on a custom assignor.
//
// A real load-aware assignor would also implement sarama.BalanceStrategy.Plan
// itself, decode each member's UserData on the leader, and weight the
// assignment by the reported load.
type LoadAwareSticky struct {
sarama.BalanceStrategy
observe LoadObserver
}

// NewLoadAwareSticky returns a load-aware wrapper around the sticky strategy.
// The observe callback is invoked once per JoinGroup; its return value is
// JSON-serialized into the member's subscription UserData.
func NewLoadAwareSticky(observe LoadObserver) *LoadAwareSticky {
return &LoadAwareSticky{
BalanceStrategy: sarama.NewBalanceStrategySticky(),
observe: observe,
}
}

// SubscriptionUserData satisfies sarama.SubscriptionUserDataProvider. The
// topics argument is the member's currently subscribed topic set, supplied by
// sarama immediately before the JoinGroup is sent.
func (s *LoadAwareSticky) SubscriptionUserData(_ []string) ([]byte, error) {
sample := s.observe()
if sample.Version == 0 {
sample.Version = 1
}
return json.Marshal(sample)
}
Loading
Loading