|
1 | | -# go-slim-event-bus |
| 1 | +# go-slim-event-bus |
| 2 | + |
| 3 | +A strongly opinionated Redis streams abstraction mainly designed for simple inter-service communication. The purpose of this module is for it to run alongside a service's HTTP server within the same binary. |
| 4 | + |
| 5 | +## Features |
| 6 | +- Concurrent Message Processing. |
| 7 | +- Pending message housekeeping. |
| 8 | + - In the instance listen, the program will process a certain number of pending messages before taking new incoming messages. |
| 9 | +- Graceful Shutdown. |
| 10 | +- Timeout managed handlers to prevent processes from running indefinitely . |
| 11 | + |
| 12 | +# Installation |
| 13 | +In terminal , with your Go project as the current directory paste the following : |
| 14 | +``` bash |
| 15 | +go get github.com/muki119/go-slim-event-bus |
| 16 | +``` |
| 17 | + |
| 18 | +# Usage |
| 19 | +## Create Event Bus instance from config. |
| 20 | +``` Go |
| 21 | +ebConfig := &seb.EventBusConfig{ |
| 22 | + Connection: conn, |
| 23 | + ConsumerName: "ConsumerFizz", |
| 24 | + ConsumerGroup: "FizzGroup", |
| 25 | + MaxCount: 100, |
| 26 | + Timeout: 3 * time.Second, |
| 27 | + MaxConcurrent: int64(runtime.NumCPU() / 10), |
| 28 | +} |
| 29 | +eventBus := ebConfig.NewFromConfig() |
| 30 | +``` |
| 31 | + |
| 32 | +## Create Event Bus instance from the constructor. |
| 33 | +```Go |
| 34 | +eventBus := seb.NewStreamsEventBus("ConsumerFizz", "FizzGroup", conn, 100, 3*time.Second, int64(runtime.NumCPU()/10)) |
| 35 | +``` |
| 36 | + |
| 37 | +## Register a Stream to listen to and a handler for its incoming data. |
| 38 | +Registration of a stream and handler function should be made before. |
| 39 | +```Go |
| 40 | +eventBus.StreamHandler("user.created", HandleUserCreation) |
| 41 | +``` |
| 42 | + |
| 43 | +## To Listen |
| 44 | +The listen method returns a channel that will only return errors and no other value. |
| 45 | +This is to ensure the program is non-blocking |
| 46 | +``` Go |
| 47 | +err := <-eventBus.Listen() |
| 48 | +``` |
| 49 | + |
| 50 | +## To close the instance. |
| 51 | +Waits for all messages acquired before closure to be processed or timeout, then closes the listener and connection. |
| 52 | +``` Go |
| 53 | +err := eventBus.Close() |
| 54 | +``` |
| 55 | + |
| 56 | + |
| 57 | +## Configuration options |
| 58 | +|Field|Description| |
| 59 | +|-|-| |
| 60 | +|Connection|Pointer to the Redis connection instance.| |
| 61 | +|ConsumerName|Name of the consumer instance. Used by the client to identify itself within the consumer groups.| |
| 62 | +|ConsumerGroup|Name of consumer group to be attached to for each stream.| |
| 63 | +|MaxCount|Maximum number of messages in each stream every read.| |
| 64 | +|Timeout|Max amount of time for handlers to process a message.| |
| 65 | +|MaxConcurrent|Maximum amount of messages that can be concurrently handled.| |
| 66 | + |
| 67 | +## Examples |
| 68 | +### From Config |
| 69 | +``` Go |
| 70 | +ebConfig := &seb.EventBusConfig{ |
| 71 | + Connection: conn, |
| 72 | + ConsumerName: "ConsumerFizz", |
| 73 | + ConsumerGroup: "FizzGroup", |
| 74 | + MaxCount: 100, |
| 75 | + Timeout: 3 * time.Second, |
| 76 | + MaxConcurrent: int64(runtime.NumCPU() / 10), |
| 77 | +} |
| 78 | +eventBus := ebConfig.NewFromConfig() |
| 79 | + |
| 80 | +shutdownChan := make(chan struct{}, 1) |
| 81 | +go func() { |
| 82 | + exitSignal := make(chan os.Signal) |
| 83 | + signal.Notify(exitSignal, syscall.SIGINT, syscall.SIGTERM) |
| 84 | + <-exitSignal |
| 85 | + if err := eventBus.Close(); err != nil { |
| 86 | + fmt.Printf("Error occurred while closing event bus: %v", err) |
| 87 | + } |
| 88 | + close(shutdownChan) |
| 89 | +}() |
| 90 | + |
| 91 | +eventBus.StreamHandler("user.created", HandleUserCreation) |
| 92 | +eventBus.StreamHandler("user.deleted", HandleUserDeletion) |
| 93 | + |
| 94 | +if err := <-eventBus.Listen(); err != nil { |
| 95 | + fmt.Printf("Error occurred: %v", err) |
| 96 | + if err := eventBus.Close(); err != nil { |
| 97 | + fmt.Printf("Error occurred while closing event bus: %v", err) |
| 98 | + } |
| 99 | + close(shutdownChan) |
| 100 | +} |
| 101 | +<-shutdownChan |
| 102 | +``` |
| 103 | + |
| 104 | +### From Constructor |
| 105 | +```Go |
| 106 | +eventBus := seb.NewStreamsEventBus("ConsumerFizz", "FizzGroup", conn, 100, 3*time.Second, int64(runtime.NumCPU()/10)) |
| 107 | + |
| 108 | +shutdownChan := make(chan struct{}, 1) |
| 109 | +go func() { |
| 110 | + exitSignal := make(chan os.Signal) |
| 111 | + signal.Notify(exitSignal, syscall.SIGINT, syscall.SIGTERM) |
| 112 | + <-exitSignal |
| 113 | + if err := eventBus.Close(); err != nil { |
| 114 | + fmt.Printf("Error occurred while closing event bus: %v", err) |
| 115 | + } |
| 116 | + close(shutdownChan) |
| 117 | +}() |
| 118 | + |
| 119 | +eventBus.StreamHandler("user.created", HandleUserCreation) |
| 120 | +eventBus.StreamHandler("user.deleted", HandleUserDeletion) |
| 121 | + |
| 122 | +if err := <-eventBus.Listen(); err != nil { |
| 123 | + fmt.Printf("Error occurred: %v", err) |
| 124 | + if err := eventBus.Close(); err != nil { |
| 125 | + fmt.Printf("Error occurred while closing event bus: %v", err) |
| 126 | + } |
| 127 | + close(shutdownChan) |
| 128 | +} |
| 129 | +<-shutdownChan |
| 130 | +``` |
| 131 | + |
| 132 | +## Recommendations |
| 133 | +- For handler functions , keep them aware of timeout context passed in each handler. |
| 134 | + |
| 135 | + |
| 136 | +## Future Additions/Improvments |
| 137 | + - Dead letter queueing |
| 138 | + |
0 commit comments