Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sort events before applying them to aggregate #411

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions eventstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ type EventStore interface {
Load(context.Context, uuid.UUID) ([]Event, error)

// LoadFrom loads all events from version for the aggregate id from the store.
// Event store should provide events in version order
LoadFrom(ctx context.Context, id uuid.UUID, version int) ([]Event, error)

// Close closes the EventStore.
Expand Down
61 changes: 61 additions & 0 deletions eventstore/eventsorter/event_sorter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package eventsorter

import (
"context"
eh "github.com/looplab/eventhorizon"
"github.com/looplab/eventhorizon/uuid"
"sort"
)

// EventSorter is an event store wrapper that warrants events are provided in version order.
// Version order is required for event sourcing to work correctly.
// Use it with an event store that does not warrant version order.
type EventSorter struct {
inner eh.EventStore
}

var _ eh.EventStore = (*EventSorter)(nil)

// NewEventSorter creates a new EventSorter wrapping the provided event store
func NewEventSorter(inner eh.EventStore) *EventSorter {
return &EventSorter{inner: inner}
}

func (e EventSorter) Save(ctx context.Context, events []eh.Event, originalVersion int) error {
return e.inner.Save(ctx, events, originalVersion)
}

func (e EventSorter) Load(ctx context.Context, uuid uuid.UUID) ([]eh.Event, error) {
events, err := e.inner.Load(ctx, uuid)

if err != nil {
return nil, err
}

return e.SortEvents(events), nil
}

func (e EventSorter) LoadFrom(ctx context.Context, id uuid.UUID, version int) ([]eh.Event, error) {
events, err := e.inner.LoadFrom(ctx, id, version)

if err != nil {
return nil, err
}

return e.SortEvents(events), nil
}

func (e EventSorter) Close() error {
return e.inner.Close()
}

func (e EventSorter) SortEvents(events []eh.Event) []eh.Event {
byVersion := func(i, j int) bool {
return events[i].Version() < events[j].Version()
}

// It is ok to sort in place, events slice is already the inner store response
sort.Slice(events, byVersion)

return events
}
102 changes: 102 additions & 0 deletions eventstore/eventsorter/event_sorter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package eventsorter

import (
"context"
eh "github.com/looplab/eventhorizon"
"github.com/looplab/eventhorizon/uuid"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"
"testing"
"time"
)

type EventSorterTestSuite struct {
suite.Suite

innerStore *EventStoreMock
eventSorter *EventSorter

unsortedEventList []eh.Event
}

// In order for 'go test' to run this suite, we need to create
// a normal test function and pass our suite to suite.Run
func TestEventSorterTestSuite(t *testing.T) {
suite.Run(t, &EventSorterTestSuite{})
}

// before each test
func (s *EventSorterTestSuite) SetupTest() {
s.innerStore = &EventStoreMock{}

s.eventSorter = NewEventSorter(s.innerStore)

s.unsortedEventList = []eh.Event{
eh.NewEvent("test", nil, time.Now(), eh.ForAggregate("test", uuid.New(), 3)),
eh.NewEvent("test", nil, time.Now(), eh.ForAggregate("test", uuid.New(), 2)),
eh.NewEvent("test", nil, time.Now(), eh.ForAggregate("test", uuid.New(), 1)),
}
}

func (s *EventSorterTestSuite) Test_can_sort_empty_event_list_on_Load() {
// Given a event store with no events
s.innerStore.On("Load", mock.Anything, mock.Anything).Return([]eh.Event{}, nil)

// When we load the events
events, err := s.eventSorter.Load(context.TODO(), uuid.New())

// Then no error is returned
s.NoError(err)

// And empty event list is returned
s.Len(events, 0)
}

func (s *EventSorterTestSuite) Test_can_sort_empty_event_list_on_LoafFrom() {
// Given a event store with no events
s.innerStore.On("LoadFrom", mock.Anything, mock.Anything, mock.Anything).Return([]eh.Event{}, nil)

// When we load the events
events, err := s.eventSorter.LoadFrom(context.TODO(), uuid.New(), 8)

// Then no error is returned
s.NoError(err)

// And empty event list is returned
s.Len(events, 0)
}

func (s *EventSorterTestSuite) Test_can_sort_event_list_on_Load() {
// Given a event store with no events
s.innerStore.On("Load", mock.Anything, mock.Anything).Return(s.unsortedEventList, nil)

// When we load the events
events, err := s.eventSorter.Load(context.TODO(), uuid.New())

// Then no error is returned
s.NoError(err)

// And the events are returned in version order
s.Len(events, 3)

s.Equal(1, events[0].Version())
s.Equal(2, events[1].Version())
s.Equal(3, events[2].Version())
}

func (s *EventSorterTestSuite) Test_can_sort_event_list_on_LoadFrom() {
// Given a event store with no events
s.innerStore.On("LoadFrom", mock.Anything, mock.Anything, 2).Return(s.unsortedEventList[0:2], nil)

// When we load the events
events, err := s.eventSorter.LoadFrom(context.TODO(), uuid.New(), 2)

// Then no error is returned
s.NoError(err)

// And the events are returned in version order
s.Len(events, 2)

s.Equal(2, events[0].Version())
s.Equal(3, events[1].Version())
}
40 changes: 40 additions & 0 deletions eventstore/eventsorter/eventstore_stub.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package eventsorter

import (
"context"
eh "github.com/looplab/eventhorizon"
"github.com/looplab/eventhorizon/uuid"
"github.com/stretchr/testify/mock"
)

type EventStoreMock struct {
mock.Mock
}

var _ eh.EventStore = (*EventStoreMock)(nil)

func (e *EventStoreMock) LoadFrom(ctx context.Context, id uuid.UUID, version int) ([]eh.Event, error) {
args := e.Called(ctx, id, version)
if args.Get(0) == nil {
return nil, args.Error(1)
}
return args.Get(0).([]eh.Event), args.Error(1)
}

func (e *EventStoreMock) Save(ctx context.Context, events []eh.Event, originalVersion int) error {
args := e.Called(ctx, events, originalVersion)
return args.Error(0)
}

func (e *EventStoreMock) Load(ctx context.Context, u uuid.UUID) ([]eh.Event, error) {
args := e.Called(ctx, u)
if args.Get(0) == nil {
return nil, args.Error(1)
}
return args.Get(0).([]eh.Event), args.Error(1)
}

func (e *EventStoreMock) Close() error {
args := e.Called()
return args.Error(0)
}
3 changes: 2 additions & 1 deletion eventstore/mongodb/eventstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package mongodb

import (
"context"
"errors"
"fmt"
"time"

Expand Down Expand Up @@ -332,7 +333,7 @@ func (s *EventStore) LoadFrom(ctx context.Context, id uuid.UUID, version int) ([
var aggregate aggregateRecord
if err := s.aggregates.FindOne(ctx, bson.M{"_id": id}).Decode(&aggregate); err != nil {
// Translate to our own not found error.
if err == mongo.ErrNoDocuments {
if errors.Is(err, mongo.ErrNoDocuments) {
err = eh.ErrAggregateNotFound
}

Expand Down
25 changes: 23 additions & 2 deletions eventstore/mongodb_v2/eventstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,12 @@ import (
"github.com/looplab/eventhorizon/uuid"
)

const Ascending = 1

// EventStore is an eventhorizon.EventStore for MongoDB, using one collection
// for all events and another to keep track of all aggregates/streams. It also
// keeps track of the global position of events, stored as metadata.
// This implementation warrants event order by Version on Load and LoadFrom methods (configurable, see WithSortEventsOnDB).
type EventStore struct {
client *mongo.Client
clientOwnership clientOwnership
Expand All @@ -52,6 +55,7 @@ type EventStore struct {
eventHandlerAfterSave eh.EventHandler
eventHandlerInTX eh.EventHandler
skipNonRegisteredEvents bool
sortEventsOnDb bool // if true, events will be sorted on DB side. Default is false for backward compatibility.
}

type clientOwnership int
Expand Down Expand Up @@ -223,6 +227,16 @@ func WithSnapshotCollectionName(snapshotColl string) Option {
}
}

// WithSortEventsOnDB enables sorting events on DB.
// Without this option, events order should be warranted by DB default ordering. This is not the case for MongoDB.
func WithSortEventsOnDB() Option {
return func(s *EventStore) error {
s.sortEventsOnDb = true

return nil
}
}

// Save implements the Save method of the eventhorizon.EventStore interface.
func (s *EventStore) Save(ctx context.Context, events []eh.Event, originalVersion int) error {
if len(events) == 0 {
Expand Down Expand Up @@ -430,7 +444,7 @@ func (s *EventStore) Save(ctx context.Context, events []eh.Event, originalVersio

// Load implements the Load method of the eventhorizon.EventStore interface.
func (s *EventStore) Load(ctx context.Context, id uuid.UUID) ([]eh.Event, error) {
cursor, err := s.events.Find(ctx, bson.M{"aggregate_id": id})
cursor, err := s.events.Find(ctx, bson.M{"aggregate_id": id}, s.makeFindOptions())
if err != nil {
return nil, &eh.EventStoreError{
Err: fmt.Errorf("could not find event: %w", err),
Expand All @@ -444,7 +458,7 @@ func (s *EventStore) Load(ctx context.Context, id uuid.UUID) ([]eh.Event, error)

// LoadFrom implements LoadFrom method of the eventhorizon.SnapshotStore interface.
func (s *EventStore) LoadFrom(ctx context.Context, id uuid.UUID, version int) ([]eh.Event, error) {
cursor, err := s.events.Find(ctx, bson.M{"aggregate_id": id, "version": bson.M{"$gte": version}})
cursor, err := s.events.Find(ctx, bson.M{"aggregate_id": id, "version": bson.M{"$gte": version}}, s.makeFindOptions())
if err != nil {
return nil, &eh.EventStoreError{
Err: fmt.Errorf("could not find event: %w", err),
Expand Down Expand Up @@ -574,6 +588,13 @@ func (s *EventStore) LoadSnapshot(ctx context.Context, id uuid.UUID) (*eh.Snapsh
return snapshot, nil
}

func (s *EventStore) makeFindOptions() *mongoOptions.FindOptions {
if s.sortEventsOnDb {
return options.Find().SetSort(bson.M{"version": Ascending})
}
return options.Find()
}

func (s *EventStore) SaveSnapshot(ctx context.Context, id uuid.UUID, snapshot eh.Snapshot) (err error) {
if snapshot.AggregateType == "" {
return &eh.EventStoreError{
Expand Down
Loading