Skip to content

Commit 6485271

Browse files
committed
refactor(eventbus): improve channel and pipe management logic
- Replace atomic counter with simple increment for CowMap length calculation - Change channel stopCh type from chan any to chan struct{} for proper signaling - Store reflect.Value of topic in channel struct to avoid repeated conversions - Simplify transfer method by removing redundant topic parameter - Use LoadOrStore to prevent concurrent channel creation and properly close duplicates - Modify Pipe handlers map to use handler pointer as key via reflect.ValueOf().Pointer() - Replace sending struct{} on stopCh with channel close for graceful shutdown - Correct PublishSync to call synchronous publish method in singleton wrapper
1 parent 2aa8fbd commit 6485271

File tree

5 files changed

+45
-25
lines changed

5 files changed

+45
-25
lines changed

.claude/settings.local.json

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
{
2+
"permissions": {
3+
"allow": [
4+
"Bash(go test ./... -count=1)"
5+
]
6+
}
7+
}

cowmap.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package eventbus
22

33
import (
44
"sync"
5-
"sync/atomic"
65
)
76

87
// CowMap is a wrapper of Copy-On-Write map
@@ -26,7 +25,7 @@ func NewCowMap() *CowMap {
2625
func (c *CowMap) Len() uint32 {
2726
var size uint32
2827
c.Range(func(k, v any) bool {
29-
atomic.AddUint32(&size, 1)
28+
size++
3029
return true
3130
})
3231
return size

eventbus.go

Lines changed: 27 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,11 @@ type channel struct {
1010
sync.RWMutex
1111
bufferSize int
1212
topic string
13+
topicValue reflect.Value
1314
channel chan any
1415
handlers *CowMap
1516
closed bool
16-
stopCh chan any
17+
stopCh chan struct{}
1718
}
1819

1920
// newChannel creates a new channel with a specified topic and buffer size.
@@ -28,20 +29,20 @@ func newChannel(topic string, bufferSize int) *channel {
2829
}
2930
c := &channel{
3031
topic: topic,
32+
topicValue: reflect.ValueOf(topic),
3133
bufferSize: bufferSize,
3234
channel: ch,
3335
handlers: NewCowMap(),
34-
stopCh: make(chan any),
36+
stopCh: make(chan struct{}),
3537
}
3638
go c.loop()
3739
return c
3840
}
3941

4042
// transfer calls all the handlers in the channel with the given payload.
4143
// It iterates over the handlers in the handlers map to call them with the payload.
42-
func (c *channel) transfer(topic string, payload any) {
44+
func (c *channel) transfer(payload any) {
4345
var payloadValue reflect.Value
44-
topicValue := reflect.ValueOf(c.topic)
4546

4647
c.handlers.Range(func(key any, fn any) bool {
4748
handler := fn.(*reflect.Value)
@@ -55,7 +56,7 @@ func (c *channel) transfer(topic string, payload any) {
5556
} else {
5657
payloadValue = reflect.ValueOf(payload)
5758
}
58-
(*handler).Call([]reflect.Value{topicValue, payloadValue})
59+
(*handler).Call([]reflect.Value{c.topicValue, payloadValue})
5960
return true
6061
})
6162
}
@@ -67,7 +68,7 @@ func (c *channel) loop() {
6768
for {
6869
select {
6970
case payload := <-c.channel:
70-
c.transfer(c.topic, payload)
71+
c.transfer(payload)
7172
case <-c.stopCh:
7273
return
7374
}
@@ -95,7 +96,7 @@ func (c *channel) publishSync(payload any) error {
9596
if c.closed {
9697
return ErrChannelClosed
9798
}
98-
c.transfer(c.topic, payload)
99+
c.transfer(payload)
99100
return nil
100101
}
101102

@@ -132,7 +133,7 @@ func (c *channel) close() {
132133
return
133134
}
134135
c.closed = true
135-
c.stopCh <- struct{}{}
136+
close(c.stopCh)
136137
c.handlers.Clear()
137138
close(c.channel)
138139
}
@@ -193,8 +194,12 @@ func (e *EventBus) Subscribe(topic string, handler any) error {
193194

194195
ch, ok := e.channels.Load(topic)
195196
if !ok {
196-
ch = newChannel(topic, e.bufferSize)
197-
e.channels.Store(topic, ch)
197+
newCh := newChannel(topic, e.bufferSize)
198+
ch, loaded := e.channels.LoadOrStore(topic, newCh)
199+
if loaded {
200+
newCh.close()
201+
}
202+
return ch.(*channel).subscribe(handler)
198203
}
199204
return ch.(*channel).subscribe(handler)
200205
}
@@ -205,10 +210,13 @@ func (e *EventBus) Subscribe(topic string, handler any) error {
205210
// The type of the payload must correspond to the second parameter of the handler in `Subscribe()`.
206211
func (e *EventBus) Publish(topic string, payload any) error {
207212
ch, ok := e.channels.Load(topic)
208-
209213
if !ok {
210-
ch = newChannel(topic, e.bufferSize)
211-
e.channels.Store(topic, ch)
214+
newCh := newChannel(topic, e.bufferSize)
215+
var loaded bool
216+
ch, loaded = e.channels.LoadOrStore(topic, newCh)
217+
if loaded {
218+
newCh.close()
219+
}
212220
}
213221

214222
return ch.(*channel).publish(payload)
@@ -219,10 +227,13 @@ func (e *EventBus) Publish(topic string, payload any) error {
219227
// It does not use channels and instead directly calls the handler function.
220228
func (e *EventBus) PublishSync(topic string, payload any) error {
221229
ch, ok := e.channels.Load(topic)
222-
223230
if !ok {
224-
ch = newChannel(topic, e.bufferSize)
225-
e.channels.Store(topic, ch)
231+
newCh := newChannel(topic, e.bufferSize)
232+
var loaded bool
233+
ch, loaded = e.channels.LoadOrStore(topic, newCh)
234+
if loaded {
235+
newCh.close()
236+
}
226237
}
227238

228239
return ch.(*channel).publishSync(payload)

pipe.go

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package eventbus
22

33
import (
4+
"reflect"
45
"sync"
56
)
67

@@ -16,15 +17,15 @@ type Pipe[T any] struct {
1617
channel chan T
1718
handlers *CowMap
1819
closed bool
19-
stopCh chan any
20+
stopCh chan struct{}
2021
}
2122

2223
// NewPipe create a unbuffered pipe
2324
func NewPipe[T any]() *Pipe[T] {
2425
p := &Pipe[T]{
2526
bufferSize: -1,
2627
channel: make(chan T),
27-
stopCh: make(chan any),
28+
stopCh: make(chan struct{}),
2829
handlers: NewCowMap(),
2930
}
3031

@@ -42,7 +43,7 @@ func NewBufferedPipe[T any](bufferSize int) *Pipe[T] {
4243
p := &Pipe[T]{
4344
bufferSize: bufferSize,
4445
channel: make(chan T, bufferSize),
45-
stopCh: make(chan any),
46+
stopCh: make(chan struct{}),
4647
handlers: NewCowMap(),
4748
}
4849

@@ -72,7 +73,8 @@ func (p *Pipe[T]) Subscribe(handler Handler[T]) error {
7273
if p.closed {
7374
return ErrChannelClosed
7475
}
75-
p.handlers.Store(&handler, handler)
76+
key := reflect.ValueOf(handler).Pointer()
77+
p.handlers.Store(key, handler)
7678
return nil
7779
}
7880

@@ -83,7 +85,8 @@ func (p *Pipe[T]) Unsubscribe(handler Handler[T]) error {
8385
if p.closed {
8486
return ErrChannelClosed
8587
}
86-
p.handlers.Delete(&handler)
88+
key := reflect.ValueOf(handler).Pointer()
89+
p.handlers.Delete(key)
8790
return nil
8891
}
8992

@@ -122,6 +125,6 @@ func (p *Pipe[T]) Close() {
122125
return
123126
}
124127
p.closed = true
125-
p.stopCh <- struct{}{}
128+
close(p.stopCh)
126129
close(p.channel)
127130
}

singleton.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ func Publish(topic string, payload any) error {
4040
// PublishSync is a synchronous version of Publish that triggers the handlers defined for a topic with the given payload.
4141
// The type of the payload must correspond to the second parameter of the handler in `Subscribe()`.
4242
func PublishSync(topic string, payload any) error {
43-
return singleton.Publish(topic, payload)
43+
return singleton.PublishSync(topic, payload)
4444
}
4545

4646
// Close closes the singleton instance of EventBus.

0 commit comments

Comments
 (0)