Skip to content

Commit

Permalink
Swap out stream for KV
Browse files Browse the repository at this point in the history
Signed-off-by: Byron Ruth <b@devel.io>
  • Loading branch information
bruth committed Aug 4, 2022
1 parent 029f93d commit 3a4d3a7
Show file tree
Hide file tree
Showing 6 changed files with 47 additions and 50 deletions.
17 changes: 12 additions & 5 deletions examples/jetstream/republish/docker-compose.yaml
Original file line number Diff line number Diff line change
@@ -1,29 +1,36 @@
version: '3.9'
services:
leaf:
hub:
image: docker.io/synadia/nats-server:nightly
command:
- "--debug"
- "--http_port=8222"
- "--config=hub.conf"
- "--config=/nats.conf"
ports:
- "14222:4222"
- "18222:8222"
volumes:
- "./hub.conf:/nats.conf"
- "./accounts.conf:/accounts.conf"

leaf:
image: docker.io/synadia/nats-server:nightly
command:
- "--debug"
- "--http_port=8222"
- "--config=leaf.conf"
- "--config=/nats.conf"
ports:
- "14223:4222"
- "18223:8222"
volumes:
- "./leaf.conf:/nats.conf"
- "./accounts.conf:/accounts.conf"

app:
image: ${IMAGE_TAG}
environment:
- NATS_URL=nats://app@app:leaf:4222
- NATS_URL=nats://app:app@leaf:4222
depends_on:
- nats
- leaf
- hub

2 changes: 1 addition & 1 deletion examples/jetstream/republish/go/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ module github.com/bruth/nats-by-example/examples/jetstream/republish/go

go 1.18

require github.com/nats-io/nats.go v1.16.1-0.20220803180602-ec49000ee1e2
require github.com/nats-io/nats.go v1.16.1-0.20220803221958-cc189da40f83

require (
github.com/golang/protobuf v1.5.2 // indirect
Expand Down
2 changes: 2 additions & 0 deletions examples/jetstream/republish/go/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ github.com/nats-io/nats.go v1.16.1-0.20220803074641-59d7d9ccc73e h1:E9cb15fk4R0K
github.com/nats-io/nats.go v1.16.1-0.20220803074641-59d7d9ccc73e/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
github.com/nats-io/nats.go v1.16.1-0.20220803180602-ec49000ee1e2 h1:SfnWrSDnTRFeLtpPfGpctfb6Ke/Npgmi9n/nLzeYfKo=
github.com/nats-io/nats.go v1.16.1-0.20220803180602-ec49000ee1e2/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
github.com/nats-io/nats.go v1.16.1-0.20220803221958-cc189da40f83 h1:EAK6rdyLzKzw3Cklsth//yfG2rZn0jdvCVuUdpFbpzQ=
github.com/nats-io/nats.go v1.16.1-0.20220803221958-cc189da40f83/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8=
github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
Expand Down
1 change: 0 additions & 1 deletion examples/jetstream/republish/go/hub.conf
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ server_name: hub

jetstream {
domain: hub
store_dir: ./hub
}

leafnodes {
Expand Down
1 change: 0 additions & 1 deletion examples/jetstream/republish/go/leaf.conf
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ server_name: leaf

jetstream {
domain: leaf
store_dir: ./leaf
}

leafnodes {
Expand Down
74 changes: 32 additions & 42 deletions examples/jetstream/republish/go/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,16 +42,14 @@ func main() {
// *Note: the Go client prior to v1.16.0 used an experimental API for
// this feature and changed the name of the type for `RePublish` from
// `SubjectMapping`.*
streamName := "CONFIG"
bucketName := "CONFIG"

js.AddStream(&nats.StreamConfig{
Name: streamName,
Subjects: []string{
"config.*.*",
},
kv, _ := js.CreateKeyValue(&nats.KeyValueConfig{
Bucket: bucketName,
History: 1,
RePublish: &nats.RePublish{
Source: "config.*.*",
Destination: "$1.config.$2",
Source: ">",
Destination: "repub.>",
},
})

Expand All @@ -72,24 +70,23 @@ func main() {
// a set of permissions authorized to only subscribe to its subset
// of messages. This provides a lightweight and secure fan-out option
// in lieu of consumers.
sub, _ := nc.SubscribeSync("device-1.config.*")
sub, _ := nc.SubscribeSync("repub.$KV.CONFIG.device-1.*")

// Let's publish a message to the stream and then receive it
// on the subscription.
js.Publish("config.device-1.driving", []byte(`true`))
kv.Put("device-1.driving", []byte(`true`))

msg, _ := sub.NextMsg(time.Second)
fmt.Printf("received message on %q\n", msg.Subject)

// Every republished message includes an augmented set of headers
// which allows for tracking and correlating this message with respect
// to the original stream.
fmt.Printf(`
- Nats-Subject: %s
- Nats-Stream: %s
- Nats-Sequence: %s
- Nats-Last-Sequence: %s
fmt.Printf(`republish headers:
- Nats-Subject: %s
- Nats-Stream: %s
- Nats-Sequence: %s
- Nats-Last-Sequence: %s
`,
msg.Header.Get("Nats-Subject"),
msg.Header.Get("Nats-Stream"),
Expand All @@ -107,17 +104,17 @@ func main() {
// from the message and set it into the config and print it.
prop, seq, val := parseConfigEntry(msg)
config.Set(prop, seq, val)
fmt.Println(config)
fmt.Printf("config: %s\n", config)

// And another...
js.Publish("config.device-1.temperature", []byte(`80`))
kv.Put("device-1.temperature", []byte(`80`))

msg, _ = sub.NextMsg(time.Second)
fmt.Printf("received message on %q\n", msg.Subject)

prop, seq, val = parseConfigEntry(msg)
config.Set(prop, seq, val)
fmt.Println(config)
fmt.Printf("config: %s\n", config)

// What happens if the device temporarily gets disconnected? How can
// we catch up? Since the messages are being republished to a subject
Expand All @@ -127,57 +124,50 @@ func main() {
// magnitude more devices.
// Let's unsubscribe and observe what happens.
sub.Unsubscribe()
fmt.Println("subscription closed")

js.Publish("config.device-1.temperature", []byte(`72`))
js.Publish("config.device-1.temperature", []byte(`76`))
kv.Put("device-1.temperature", []byte(`72`))
kv.Put("device-1.temperature", []byte(`76`))

// To recover from this situation, we can introduce an initialization
// step after we setup the subscription. Let's re-subscribe to start
// buffering new messages.
sub, _ = nc.SubscribeSync("device-1.config.*")

// In the meantime, we can leverage a new JetStream API which supports
// getting the latest message on a stream for a given subject.
// Given that we know the set of properties we want to fetch, we can
// iterate over them and _catch-up_ with the latest known property.
// This feature is opt-in, so we must enable it on a stream. We can do this by updating the stream.
js.UpdateStream(&nats.StreamConfig{
AllowDirect: true,
})
sub, _ = nc.SubscribeSync("repub.$KV.CONFIG.device-1.*")
fmt.Println("re-subscribed for new messages")

// This API is slightly lower-level and returns a `RawStreamMsg`.
// We can still extract the same information we need to update the
// config struct.
fmt.Println("direct latest props")
for _, prop := range props {
// Original subject?
subject := fmt.Sprintf("config.device-1.%s", prop)
rmsg, _ := js.GetLastMsg(streamName, subject)
subject := fmt.Sprintf("device-1.%s", prop)
entry, _ := kv.Get(subject)

// Just ignore if no message is available.
if rmsg == nil {
if entry == nil {
continue
}

seq := rmsg.Sequence
val := string(rmsg.Data)
seq := entry.Revision()
val := string(entry.Value())

fmt.Printf("catching up prop: %q\n", prop)
config.Set(prop, seq, val)
fmt.Printf("config: %s\n", config)
}

// Given that the two temperature values were appended to the stream
// we would expect the latest temp to be present (76).
fmt.Println(config)

// If we publish a new message to the stream, we can receive it on
// the subscription and continue on.
js.Publish("config.device-1.radio", []byte(`102.9 FM`))
kv.Put("device-1.radio", []byte(`102.9 FM`))

// Receive the message and set in the config.
msg, _ = sub.NextMsg(time.Second)
fmt.Printf("received message on %q\n", msg.Subject)

prop, seq, val = parseConfigEntry(msg)
config.Set(prop, seq, val)
fmt.Println(config)
fmt.Printf("config: %s\n", config)

sub.Unsubscribe()
}
Expand Down

1 comment on commit 3a4d3a7

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please sign in to comment.