Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[checkout] fix kafka restart #1590

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ the release.
([#1592](https://github.com/open-telemetry/opentelemetry-demo/pull/1592))
* chore: Add service version to OTEL_RESOURCE_ATTRIBUTES
([#1594](https://github.com/open-telemetry/opentelemetry-demo/pull/1594))
* [checkout] increase Kafka resiliency and observability
([#1590](https://github.com/open-telemetry/opentelemetry-demo/pull/1590))

## 1.9.0

Expand Down
10 changes: 10 additions & 0 deletions src/checkoutservice/kafka/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,18 @@ var (
)

func CreateKafkaProducer(brokers []string, log *logrus.Logger) (sarama.AsyncProducer, error) {
sarama.Logger = log

saramaConfig := sarama.NewConfig()
saramaConfig.Producer.Return.Successes = true
saramaConfig.Producer.Return.Errors = true

// Sarama has an issue in a single broker kafka if the kafka broker is restarted.
// This setting is to prevent that issue from manifesting itself, but may swallow failed messages.
saramaConfig.Producer.RequiredAcks = sarama.NoResponse

saramaConfig.Version = ProtocolVersion

// So we can know the partition and offset of messages.
saramaConfig.Producer.Return.Successes = true

Expand Down
107 changes: 71 additions & 36 deletions src/checkoutservice/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,15 @@ import (

"github.com/IBM/sarama"
"github.com/google/uuid"
otelhooks "github.com/open-feature/go-sdk-contrib/hooks/open-telemetry/pkg"
flagd "github.com/open-feature/go-sdk-contrib/providers/flagd/pkg"
"github.com/open-feature/go-sdk/openfeature"
"github.com/sirupsen/logrus"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
"go.opentelemetry.io/contrib/instrumentation/runtime"
"go.opentelemetry.io/otel"
otelcodes "go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
"go.opentelemetry.io/otel/propagation"
Expand All @@ -34,9 +38,6 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials/insecure"
flagd "github.com/open-feature/go-sdk-contrib/providers/flagd/pkg"
"github.com/open-feature/go-sdk/openfeature"
otelhooks "github.com/open-feature/go-sdk-contrib/hooks/open-telemetry/pkg"
healthpb "google.golang.org/grpc/health/grpc_health_v1"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"
Expand Down Expand Up @@ -440,13 +441,13 @@ func (cs *checkoutService) convertCurrency(ctx context.Context, from *pb.Money,
}

func (cs *checkoutService) chargeCard(ctx context.Context, amount *pb.Money, paymentInfo *pb.CreditCardInfo) (string, error) {
paymentService := cs.paymentSvcClient
paymentService := cs.paymentSvcClient
if cs.isFeatureFlagEnabled(ctx, "paymentServiceUnreachable") {
badAddress := "badAddress:50051"
c := mustCreateClient(context.Background(), badAddress)
badAddress := "badAddress:50051"
c := mustCreateClient(context.Background(), badAddress)
paymentService = pb.NewPaymentServiceClient(c)
}
}

paymentResp, err := paymentService.Charge(ctx, &pb.ChargeRequest{
Amount: amount,
CreditCard: paymentInfo})
Expand Down Expand Up @@ -504,18 +505,52 @@ func (cs *checkoutService) sendToPostProcessor(ctx context.Context, result *pb.O
span := createProducerSpan(ctx, &msg)
defer span.End()

cs.KafkaProducerClient.Input() <- &msg
successMsg := <-cs.KafkaProducerClient.Successes()
log.Infof("Successful to write message. offset: %v", successMsg.Offset)
// Send message and handle response
startTime := time.Now()
select {
case cs.KafkaProducerClient.Input() <- &msg:
log.Infof("Message sent to Kafka: %v", msg)
select {
case successMsg := <-cs.KafkaProducerClient.Successes():
span.SetAttributes(
attribute.Bool("messaging.kafka.producer.success", true),
attribute.Int("messaging.kafka.producer.duration_ms", int(time.Since(startTime).Milliseconds())),
attribute.KeyValue(semconv.MessagingKafkaMessageOffset(int(successMsg.Offset))),
)
log.Infof("Successful to write message. offset: %v, duration: %v", successMsg.Offset, time.Since(startTime))
case errMsg := <-cs.KafkaProducerClient.Errors():
span.SetAttributes(
attribute.Bool("messaging.kafka.producer.success", false),
attribute.Int("messaging.kafka.producer.duration_ms", int(time.Since(startTime).Milliseconds())),
)
span.SetStatus(otelcodes.Error, errMsg.Err.Error())
log.Errorf("Failed to write message: %v", errMsg.Err)
case <-ctx.Done():
span.SetAttributes(
attribute.Bool("messaging.kafka.producer.success", false),
attribute.Int("messaging.kafka.producer.duration_ms", int(time.Since(startTime).Milliseconds())),
)
span.SetStatus(otelcodes.Error, "Context cancelled: "+ctx.Err().Error())
log.Warnf("Context canceled before success message received: %v", ctx.Err())
}
case <-ctx.Done():
span.SetAttributes(
attribute.Bool("messaging.kafka.producer.success", false),
attribute.Int("messaging.kafka.producer.duration_ms", int(time.Since(startTime).Milliseconds())),
)
span.SetStatus(otelcodes.Error, "Failed to send: "+ctx.Err().Error())
log.Errorf("Failed to send message to Kafka within context deadline: %v", ctx.Err())
return
}

ffValue := cs.getIntFeatureFlag(ctx, "kafkaQueueProblems")
if ffValue > 0 {
log.Infof("Warning: FeatureFlag 'kafkaQueueProblems' is activated, overloading queue now.")
for i := 0; i < ffValue; i++ {
go func(i int) {
cs.KafkaProducerClient.Input() <- &msg
go func(i int) {
cs.KafkaProducerClient.Input() <- &msg
_ = <-cs.KafkaProducerClient.Successes()
}(i)
}(i)
}
log.Infof("Done with #%d messages for overload simulation.", ffValue)
}
Expand Down Expand Up @@ -548,29 +583,29 @@ func createProducerSpan(ctx context.Context, msg *sarama.ProducerMessage) trace.
}

func (cs *checkoutService) isFeatureFlagEnabled(ctx context.Context, featureFlagName string) bool {
client := openfeature.NewClient("checkout")
// Default value is set to false, but you could also make this a parameter.
featureEnabled, _ := client.BooleanValue(
ctx,
featureFlagName,
false,
openfeature.EvaluationContext{},
)
return featureEnabled
client := openfeature.NewClient("checkout")

// Default value is set to false, but you could also make this a parameter.
featureEnabled, _ := client.BooleanValue(
ctx,
featureFlagName,
false,
openfeature.EvaluationContext{},
)

return featureEnabled
}

func (cs *checkoutService) getIntFeatureFlag(ctx context.Context, featureFlagName string) int {
client := openfeature.NewClient("checkout")
// Default value is set to 0, but you could also make this a parameter.
featureFlagValue, _ := client.IntValue(
ctx,
featureFlagName,
0,
openfeature.EvaluationContext{},
)
return int(featureFlagValue)
client := openfeature.NewClient("checkout")

// Default value is set to 0, but you could also make this a parameter.
featureFlagValue, _ := client.IntValue(
ctx,
featureFlagName,
0,
openfeature.EvaluationContext{},
)

return int(featureFlagValue)
}
Loading