Skip to content

Commit

Permalink
feature: Postgres support
Browse files Browse the repository at this point in the history
  • Loading branch information
maxekman committed Nov 9, 2021
1 parent e82d246 commit f1c58ba
Show file tree
Hide file tree
Showing 12 changed files with 1,736 additions and 3 deletions.
6 changes: 5 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,16 @@ upload_coverage:

.PHONY: run
run:
docker-compose up -d mongodb gpubsub kafka redis nats
docker-compose up -d mongodb postgres gpubsub kafka redis nats

.PHONY: run_mongodb
run_mongodb:
docker-compose up -d mongodb

.PHONY: run_postgres
run_postgres:
docker-compose up -d postgres

.PHONY: run_gpubsub
run_gpubsub:
docker-compose up -d gpubsub
Expand Down
10 changes: 10 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,16 @@ services:
volumes:
- mongodb:/data/db

postgres:
image: postgres:14
ports:
- "5432:5432"
environment:
POSTGRES_PASSWORD: "password"
# PGDATA: "/data/postgres"
# volumes:
# - postgres:/data/postgres

gpubsub:
image: gcr.io/google.com/cloudsdktool/cloud-sdk:355.0.0-emulators
ports:
Expand Down
133 changes: 133 additions & 0 deletions eventstore/postgres/eventmaintenance.go.off
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
// Copyright (c) 2021 - The Event Horizon authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package postgres

import (
"context"
"fmt"

"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"

// Register uuid.UUID as BSON type.
_ "github.com/looplab/eventhorizon/codec/bson"

eh "github.com/looplab/eventhorizon"
)

// Replace implements the Replace method of the eventhorizon.EventStore interface.
func (s *EventStore) Replace(ctx context.Context, event eh.Event) error {
sess, err := s.client.StartSession(nil)
if err != nil {
return eh.EventStoreError{
Err: eh.ErrCouldNotSaveEvents,
BaseErr: err,
}
}
defer sess.EndSession(ctx)

if _, err := sess.WithTransaction(ctx, func(txCtx mongo.SessionContext) (interface{}, error) {
// First check if the aggregate exists, the not found error in the update
// query can mean both that the aggregate or the event is not found.
if n, err := s.events.CountDocuments(ctx,
bson.M{"aggregate_id": event.AggregateID()}); n == 0 {
return nil, eh.ErrAggregateNotFound
} else if err != nil {
return nil, err
}

// Create the event record for the Database.
e, err := newEvt(ctx, event)
if err != nil {
return nil, err
}

// Copy the event position from the old event (and set in metadata).
res := s.events.FindOne(ctx, bson.M{
"aggregate_id": event.AggregateID(),
"version": event.Version(),
})
if res.Err() != nil {
if res.Err() == mongo.ErrNoDocuments {
return nil, eh.ErrInvalidEvent
}
return nil, fmt.Errorf("could not find event to replace: %w", res.Err())
}
var eventToReplace evt
if err := res.Decode(&eventToReplace); err != nil {
return nil, fmt.Errorf("could not decode event to replace: %w", err)
}
e.Position = eventToReplace.Position
e.Metadata["position"] = eventToReplace.Position

// Find and replace the event.
if r, err := s.events.ReplaceOne(ctx, bson.M{
"aggregate_id": event.AggregateID(),
"version": event.Version(),
}, e); err != nil {
return nil, err
} else if r.MatchedCount == 0 {
return nil, eh.ErrInvalidEvent
}
return nil, nil
}); err != nil {
// Return some errors intact.
if err == eh.ErrAggregateNotFound || err == eh.ErrInvalidEvent {
return err
}
return eh.EventStoreError{
Err: eh.ErrCouldNotSaveEvents,
BaseErr: err,
}
}

return nil
}

// RenameEvent implements the RenameEvent method of the eventhorizon.EventStore interface.
func (s *EventStore) RenameEvent(ctx context.Context, from, to eh.EventType) error {
// Find and rename all events.
// TODO: Maybe use change info.
if _, err := s.events.UpdateMany(ctx,
bson.M{
"event_type": from.String(),
},
bson.M{
"$set": bson.M{"event_type": to.String()},
},
); err != nil {
return eh.EventStoreError{
Err: eh.ErrCouldNotSaveEvents,
BaseErr: err,
}
}

return nil
}

// Clear clears the event storage.
func (s *EventStore) Clear(ctx context.Context) error {
if err := s.events.Drop(ctx); err != nil {
return eh.EventStoreError{
Err: fmt.Errorf("could not clear events collection: %w", err),
}
}
if err := s.streams.Drop(ctx); err != nil {
return eh.EventStoreError{
Err: fmt.Errorf("could not clear streams collection: %w", err),
}
}
return nil
}
57 changes: 57 additions & 0 deletions eventstore/postgres/eventmaintenance_test.go.off
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
// Copyright (c) 2021 - The Event Horizon authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package postgres

import (
"context"
"crypto/rand"
"encoding/hex"
"os"
"testing"

"github.com/looplab/eventhorizon/eventstore"
)

func TestEventStoreMaintenanceIntegration(t *testing.T) {
if testing.Short() {
t.Skip("skipping integration test")
}

// Use MongoDB in Docker with fallback to localhost.
addr := os.Getenv("MONGODB_ADDR")
if addr == "" {
addr = "localhost:27017"
}
url := "mongodb://" + addr

// Get a random DB name.
b := make([]byte, 4)
if _, err := rand.Read(b); err != nil {
t.Fatal(err)
}
db := "test-" + hex.EncodeToString(b)
t.Log("using DB:", db)

store, err := NewEventStore(url, db)
if err != nil {
t.Fatal("there should be no error:", err)
}
if store == nil {
t.Fatal("there should be a store")
}
defer store.Close(context.Background())

eventstore.MaintenanceAcceptanceTest(t, store, store, context.Background())
}
Loading

0 comments on commit f1c58ba

Please sign in to comment.