-
Notifications
You must be signed in to change notification settings - Fork 11
/
store.go
127 lines (113 loc) · 3.15 KB
/
store.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
package eventstore
import (
"context"
"errors"
"fmt"
"sync"
"github.com/google/uuid"
"github.com/modernice/goes/event"
"github.com/modernice/goes/event/query"
)
// New returns a thread-safe in-memory event store. The provided events are
// immediately inserted into the store.
//
// This event store is not production ready. It is intended to be used for
// testing and prototyping. In production, use the MongoDB event store instead.
// TODO(bounoable): List other event store implementations when they are ready.
func New(events ...event.Event) event.Store {
store := &memstore{
idMap: make(map[uuid.UUID]event.Event),
events: events,
}
for _, evt := range events {
store.idMap[evt.ID()] = evt
}
return store
}
var (
errEventNotFound = errors.New("event not found")
errDuplicateEvent = errors.New("duplicate event")
)
type memstore struct {
mux sync.RWMutex
events []event.Event
idMap map[uuid.UUID]event.Event
}
// Insert inserts the provided events into the in-memory event store. If an
// event with the same ID already exists, an error is returned.
func (s *memstore) Insert(ctx context.Context, events ...event.Event) error {
for _, evt := range events {
if err := s.insert(ctx, evt); err != nil {
return fmt.Errorf("%s:%s %w", evt.Name(), evt.ID(), err)
}
}
return nil
}
func (s *memstore) insert(ctx context.Context, evt event.Event) error {
if _, err := s.Find(ctx, evt.ID()); err == nil {
return errDuplicateEvent
}
defer s.reslice()
s.mux.Lock()
defer s.mux.Unlock()
s.idMap[evt.ID()] = evt
return nil
}
// Find returns the event with the given UUID or errEventNotFound if no such
// event exists in the store.
func (s *memstore) Find(ctx context.Context, id uuid.UUID) (event.Event, error) {
s.mux.RLock()
defer s.mux.RUnlock()
if evt := s.idMap[id]; evt != nil {
return evt, nil
}
return nil, errEventNotFound
}
// Query returns a channel of events and a channel of errors. The events channel
// will emit all events in the store that match the given query. The order of
// the emitted events is determined by the query's sorting options. If the
// context is cancelled, the function will return immediately.
func (s *memstore) Query(ctx context.Context, q event.Query) (<-chan event.Event, <-chan error, error) {
s.mux.RLock()
defer s.mux.RUnlock()
var events []event.Event
for _, evt := range s.events {
if query.Test(q, evt) {
events = append(events, evt)
}
}
events = event.SortMulti(events, q.Sortings()...)
out := make(chan event.Event)
errs := make(chan error)
go func() {
defer close(errs)
defer close(out)
for _, evt := range events {
select {
case <-ctx.Done():
return
case out <- evt:
}
}
}()
return out, errs, nil
}
// Delete removes the specified events from the store. Events are provided as a
// slice of event.Event.
func (s *memstore) Delete(ctx context.Context, events ...event.Event) error {
defer s.reslice()
s.mux.Lock()
defer s.mux.Unlock()
for _, evt := range events {
delete(s.idMap, evt.ID())
}
return nil
}
func (s *memstore) reslice() {
s.mux.Lock()
defer s.mux.Unlock()
s.events = s.events[:0]
for _, evt := range s.idMap {
s.events = append(s.events, evt)
}
}