Skip to content

Commit

Permalink
Merge pull request #369 from maxekman/fix-missing-event-in-errors
Browse files Browse the repository at this point in the history
Fix / Add missing event in Redis event bus error
  • Loading branch information
maxekman committed Dec 3, 2021
2 parents f84d75b + 973b5ec commit fb59d5d
Showing 1 changed file with 5 additions and 6 deletions.
11 changes: 5 additions & 6 deletions eventbus/redis/eventbus.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,11 +269,10 @@ func (b *EventBus) handler(m eh.EventMatcher, h eh.EventHandler, groupName strin

// Ignore non-matching events.
if !m.Match(event) {
_, err := b.client.XAck(ctx, b.streamName, groupName, msg.ID).Result()
if err != nil {
err = fmt.Errorf("could not ack event: %w", err)
if _, err := b.client.XAck(ctx, b.streamName, groupName, msg.ID).Result(); err != nil {
err = fmt.Errorf("could not ack non-matching event: %w", err)
select {
case b.errCh <- &eh.EventBusError{Err: err, Ctx: ctx}:
case b.errCh <- &eh.EventBusError{Err: err, Ctx: ctx, Event: event}:
default:
log.Printf("eventhorizon: missed error in Redis event bus: %s", err)
}
Expand All @@ -297,9 +296,9 @@ func (b *EventBus) handler(m eh.EventMatcher, h eh.EventHandler, groupName strin

_, err = b.client.XAck(ctx, b.streamName, groupName, msg.ID).Result()
if err != nil {
err = fmt.Errorf("could not ack event: %w", err)
err = fmt.Errorf("could not ack handled event: %w", err)
select {
case b.errCh <- &eh.EventBusError{Err: err, Ctx: ctx}:
case b.errCh <- &eh.EventBusError{Err: err, Ctx: ctx, Event: event}:
default:
log.Printf("eventhorizon: missed error in Redis event bus: %s", err)
}
Expand Down

0 comments on commit fb59d5d

Please sign in to comment.