changroup
is a Go library to create a group of channels (publish/subscribe pattern). A value is sent to each channel in the group. Channels can be acquired/released dynamically.
changroup.Group
lets you acquire/release channels and send a value to all acquired channels.
changroup.AckableGroup
does the same, but sends a changroup.Ackable
value. It calls the original ack function only after all subscribers have acked their copy of the value. This is useful if you need to know when a message has been processed.
The minimum supported Go version is 1.18 because the library uses generics.
go get github.com/maratori/changroup
Create all channels before sending values (before the publisher starts). In this case, all channels are guaranteed to receive all values.
package main
import (
"fmt"
"sync"
"time"
"github.com/maratori/changroup"
)
// main shows static subscription: both channels are acquired before the
// publisher goroutine is started.
func main() {
	group := changroup.NewGroup[int]()

	// The per-channel release functions are discarded here because the
	// publisher calls ReleaseAll when it finishes.
	first, _ := group.Acquire()
	second, _ := group.Acquire()

	var wg sync.WaitGroup

	// Publisher: sends 0..99, one value per second, then releases everything.
	wg.Add(1)
	go func() {
		defer wg.Done()
		defer group.ReleaseAll() // close all channels and remove from group
		for n := 0; n < 100; n++ {
			group.Send(n)
			time.Sleep(time.Second)
		}
	}()

	// Subscriber 1: reads until its channel is closed.
	wg.Add(1)
	go func() {
		defer wg.Done()
		for v := range first {
			fmt.Println("subscriber 1 received", v)
		}
		fmt.Println("ch1 is closed because group.ReleaseAll() is called")
	}()

	// Subscriber 2: reads until its channel is closed.
	wg.Add(1)
	go func() {
		defer wg.Done()
		for v := range second {
			fmt.Println("subscriber 2 received", v)
		}
		fmt.Println("ch2 is closed because group.ReleaseAll() is called")
	}()

	wg.Wait()
}
Create channels dynamically (after the publisher has started). In this case, some values may be dropped because there are no subscribers at the moment they are sent.
package main
import (
"fmt"
"sync"
"time"
"github.com/maratori/changroup"
)
// main shows dynamic subscription: each subscriber acquires its channel from
// inside its own goroutine, after the publisher goroutine has been launched.
func main() {
	group := changroup.NewGroup[int]()

	var wg sync.WaitGroup

	// Publisher: sends 0..99, one value per second, then releases everything.
	wg.Add(1)
	go func() {
		defer wg.Done()
		defer group.ReleaseAll() // close all channels and remove from group
		for n := 0; n < 100; n++ {
			group.Send(n)
			time.Sleep(time.Second)
		}
	}()

	// Subscriber 1: subscribe, wait for a specific value, then unsubscribe.
	wg.Add(1)
	go func() {
		defer wg.Done()
		values, release := group.Acquire()
		defer release() // close channel and remove from group
		for v := range values {
			fmt.Println("subscriber 1 received", v)
			if v == 20 {
				// do something
				return
			}
		}
	}()

	// Subscriber 2: read every value until the channel is closed.
	wg.Add(1)
	go func() {
		defer wg.Done()
		values, _ := group.Acquire()
		for v := range values {
			fmt.Println("subscriber 2 received", v)
		}
		fmt.Println("ch is closed because group.ReleaseAll() is called")
	}()

	wg.Wait()
}
Do something in the publisher after receiving acks from all subscribers.
package main
import (
"fmt"
"sync"
"time"
"github.com/maratori/changroup"
)
// main shows the ackable group: the publisher attaches a callback to every
// value, and each subscriber acks its copy once it has processed it.
func main() {
	group := changroup.NewAckableGroup[int]()
	// Subscribe before the publisher starts. The per-channel release functions
	// are discarded because the publisher calls ReleaseAll when it finishes.
	ch1, _ := group.Acquire()
	ch2, _ := group.Acquire()
	wg := sync.WaitGroup{}
	wg.Add(3)
	// Publisher: sends 0..99, one value per second, then releases everything.
	go func() {
		defer wg.Done()
		defer group.ReleaseAll() // close all channels and remove from group
		for i := 0; i < 100; i++ {
			// Per-iteration copy: the ack callback runs later, and before
			// Go 1.22 the loop variable is shared by all iterations, so the
			// closure would otherwise race with i++ and could print the
			// wrong value.
			i := i
			group.Send(changroup.NewAckable(i, func() {
				fmt.Println("publisher received acks from all subscribers for i =", i)
			}))
			time.Sleep(1 * time.Second)
		}
	}()
	// Subscriber 1: acks every value until its channel is closed.
	go func() {
		defer wg.Done()
		for a := range ch1 {
			fmt.Println("subscriber 1 received", a.Value)
			a.Ack() // value is processed
		}
		fmt.Println("ch1 is closed because group.ReleaseAll() is called")
	}()
	// Subscriber 2: acks every value until its channel is closed.
	go func() {
		defer wg.Done()
		for a := range ch2 {
			fmt.Println("subscriber 2 received", a.Value)
			a.Ack() // value is processed
		}
		fmt.Println("ch2 is closed because group.ReleaseAll() is called")
	}()
	wg.Wait()
}
You are welcome to create an issue or pull request with improvements and fixes. See guide.