fix the memory bug

This commit is contained in:
Kujtim Hoxha 2025-04-21 13:33:51 +02:00
parent 1da298e755
commit e7bb99baab
6 changed files with 128 additions and 56 deletions

View file

@ -5,47 +5,53 @@ import (
"sync"
)
const bufferSize = 1024
const bufferSize = 64
// Broker allows clients to publish events and subscribe to events
type Broker[T any] struct {
subs map[chan Event[T]]struct{} // subscriptions
mu sync.Mutex // sync access to map
done chan struct{} // close when broker is shutting down
subs map[chan Event[T]]struct{}
mu sync.RWMutex
done chan struct{}
subCount int
maxEvents int
}
// NewBroker constructs a pub/sub broker.
func NewBroker[T any]() *Broker[T] {
return NewBrokerWithOptions[T](bufferSize, 1000)
}
func NewBrokerWithOptions[T any](channelBufferSize, maxEvents int) *Broker[T] {
b := &Broker[T]{
subs: make(map[chan Event[T]]struct{}),
done: make(chan struct{}),
subs: make(map[chan Event[T]]struct{}),
done: make(chan struct{}),
subCount: 0,
maxEvents: maxEvents,
}
return b
}
// Shutdown the broker, terminating any subscriptions.
func (b *Broker[T]) Shutdown() {
close(b.done)
select {
case <-b.done: // Already closed
return
default:
close(b.done)
}
b.mu.Lock()
defer b.mu.Unlock()
// Remove each subscriber entry, so Publish() cannot send any further
// messages, and close each subscriber's channel, so the subscriber cannot
// consume any more messages.
for ch := range b.subs {
delete(b.subs, ch)
close(ch)
}
b.subCount = 0
}
// Subscribe subscribes the caller to a stream of events. The returned channel
// is closed when the broker is shutdown.
func (b *Broker[T]) Subscribe(ctx context.Context) <-chan Event[T] {
b.mu.Lock()
defer b.mu.Unlock()
// Check if broker has shutdown and if so return closed channel
select {
case <-b.done:
ch := make(chan Event[T])
@ -54,18 +60,16 @@ func (b *Broker[T]) Subscribe(ctx context.Context) <-chan Event[T] {
default:
}
// Subscribe
sub := make(chan Event[T], bufferSize)
b.subs[sub] = struct{}{}
b.subCount++
// Unsubscribe when context is done.
go func() {
<-ctx.Done()
b.mu.Lock()
defer b.mu.Unlock()
// Check if broker has shutdown and if so do nothing
select {
case <-b.done:
return
@ -74,21 +78,39 @@ func (b *Broker[T]) Subscribe(ctx context.Context) <-chan Event[T] {
delete(b.subs, sub)
close(sub)
b.subCount--
}()
return sub
}
// Publish an event to subscribers.
func (b *Broker[T]) Publish(t EventType, payload T) {
b.mu.Lock()
defer b.mu.Unlock()
func (b *Broker[T]) GetSubscriberCount() int {
b.mu.RLock()
defer b.mu.RUnlock()
return b.subCount
}
func (b *Broker[T]) Publish(t EventType, payload T) {
b.mu.RLock()
select {
case <-b.done:
b.mu.RUnlock()
return
default:
}
subscribers := make([]chan Event[T], 0, len(b.subs))
for sub := range b.subs {
subscribers = append(subscribers, sub)
}
b.mu.RUnlock()
event := Event[T]{Type: t, Payload: payload}
for _, sub := range subscribers {
select {
case sub <- Event[T]{Type: t, Payload: payload}:
case <-b.done:
return
case sub <- event:
default:
}
}
}