From e7ff90a7269373b0735716d6a6540a6cabbbbca0 Mon Sep 17 00:00:00 2001 From: Walker Crouse Date: Sat, 6 Apr 2024 20:28:44 -0400 Subject: [PATCH] Event Hub Scaler: Remove or replace usages of Event Hub offsets (#5600) * Event Hub Scaler: Remove or replace usages of Event Hub offsets Signed-off-by: Walker Crouse * fix sequence number field name Signed-off-by: Walker Crouse * add CHANGELOG entry Signed-off-by: Walker Crouse * add issue link to changelog entry Signed-off-by: Walker Crouse * fix test that is based on an erroneous assumption Signed-off-by: Walker Crouse * whitespace Signed-off-by: Walker Crouse * remove baseCheckpoint Signed-off-by: Walker Crouse --------- Signed-off-by: Walker Crouse Co-authored-by: Walker Crouse --- CHANGELOG.md | 1 + .../azure/azure_eventhub_checkpoint.go | 30 ++-------- pkg/scalers/azure/azure_eventhub_test.go | 57 ++++--------------- pkg/scalers/azure_eventhub_scaler.go | 12 +--- pkg/scalers/azure_eventhub_scaler_test.go | 38 ++++++------- 5 files changed, 36 insertions(+), 102 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a56b3e4c63f..aad16f3f023 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -68,6 +68,7 @@ Here is an overview of all new **experimental** features: - **General**: Add GRPC Healthchecks ([#5590](https://github.com/kedacore/keda/issues/5590)) - **General**: Add OPENTELEMETRY flag in e2e test YAML ([#5375](https://github.com/kedacore/keda/issues/5375)) - **General**: Add support for cross tenant/cloud authentication when using Azure Workload Identity for TriggerAuthentication ([#5441](https://github.com/kedacore/keda/issues/5441)) +- **Azure Event Hub Scaler**: Remove usage of checkpoint offsets to account for SDK checkpointing implementation changes ([#5574](https://github.com/kedacore/keda/issues/5574)) - **GCP Stackdriver Scaler**: Add missing parameters 'rate' and 'count' for GCP Stackdriver Scaler alignment ([#5633](https://github.com/kedacore/keda/issues/5633)) - **Metrics API Scaler**: Add support for various formats: json, xml, yaml, prometheus ([#2633](https://github.com/kedacore/keda/issues/2633)) - **MongoDB Scaler**: Add scheme field support srv record ([#5544](https://github.com/kedacore/keda/issues/5544)) diff --git a/pkg/scalers/azure/azure_eventhub_checkpoint.go b/pkg/scalers/azure/azure_eventhub_checkpoint.go index 28f3048aa10..d5ba6c67827 100644 --- a/pkg/scalers/azure/azure_eventhub_checkpoint.go +++ b/pkg/scalers/azure/azure_eventhub_checkpoint.go @@ -35,29 +35,19 @@ import ( // goCheckpoint struct to adapt goSdk Checkpoint type goCheckpoint struct { Checkpoint struct { - SequenceNumber int64 `json:"sequenceNumber"` - Offset string `json:"offset"` + SequenceNumber int64 `json:"sequenceNumber"` } `json:"checkpoint"` PartitionID string `json:"partitionId"` } -type baseCheckpoint struct { - Epoch int64 `json:"Epoch"` - Offset string `json:"Offset"` - Owner string `json:"Owner"` - Token string `json:"Token"` -} - // Checkpoint in a common format type Checkpoint struct { - baseCheckpoint PartitionID string `json:"PartitionId"` SequenceNumber int64 `json:"SequenceNumber"` } // Older python sdk stores the checkpoint differently type pythonCheckpoint struct { - baseCheckpoint PartitionID string `json:"partition_id"` SequenceNumber int64 `json:"sequence_number"` } @@ -92,8 +82,8 @@ type defaultCheckpointer struct { containerName string } -func NewCheckpoint(offset string, sequenceNumber int64) Checkpoint { - return Checkpoint{baseCheckpoint: baseCheckpoint{Offset: offset}, SequenceNumber: sequenceNumber} +func NewCheckpoint(sequenceNumber int64) Checkpoint { + return Checkpoint{SequenceNumber: sequenceNumber} } // GetCheckpointFromBlobStorage reads depending of the CheckpointStrategy the checkpoint from a azure storage @@ -222,10 +212,7 @@ func newGoSdkCheckpoint(get *azblob.DownloadResponse) (Checkpoint, error) { return Checkpoint{ SequenceNumber: checkpoint.Checkpoint.SequenceNumber, - baseCheckpoint: baseCheckpoint{ - Offset: checkpoint.Checkpoint.Offset, - }, - PartitionID: checkpoint.PartitionID, + PartitionID: checkpoint.PartitionID, }, nil } @@ -318,15 +305,6 @@ func getCheckpointFromStorageMetadata(get *azblob.DownloadResponse, partitionID } } - if offset, ok := metadata["offset"]; ok { - if !ok { - if offset, ok = metadata["Offset"]; !ok { - return Checkpoint{}, fmt.Errorf("offset on blob not found") - } - } - checkpoint.Offset = offset - } - return checkpoint, nil } diff --git a/pkg/scalers/azure/azure_eventhub_test.go b/pkg/scalers/azure/azure_eventhub_test.go index ca95c77e660..a9179be25ea 100644 --- a/pkg/scalers/azure/azure_eventhub_test.go +++ b/pkg/scalers/azure/azure_eventhub_test.go @@ -25,23 +25,19 @@ func TestCheckpointFromBlobStorageAzureFunction(t *testing.T) { } partitionID := "0" - offset := "1001" consumerGroup := "$Default1" sequencenumber := int64(1) containerName := "azure-webjobs-eventhub" - checkpointFormat := "{\"Offset\":\"%s\",\"SequenceNumber\":%d,\"PartitionId\":\"%s\",\"Owner\":\"\",\"Token\":\"\",\"Epoch\":0}" - checkpoint := fmt.Sprintf(checkpointFormat, offset, sequencenumber, partitionID) + checkpointFormat := "{\"SequenceNumber\":%d,\"PartitionId\":\"%s\",\"Owner\":\"\",\"Token\":\"\",\"Epoch\":0}" + checkpoint := fmt.Sprintf(checkpointFormat, sequencenumber, partitionID) urlPath := fmt.Sprintf("eventhubnamespace.servicebus.windows.net/hub/%s/", consumerGroup) ctx, err := createNewCheckpointInStorage(urlPath, containerName, partitionID, checkpoint, nil) assert.Equal(t, err, nil) expectedCheckpoint := Checkpoint{ - baseCheckpoint: baseCheckpoint{ - Offset: offset, - }, PartitionID: partitionID, SequenceNumber: sequencenumber, } @@ -54,8 +50,6 @@ func TestCheckpointFromBlobStorageAzureFunction(t *testing.T) { } check, _ := GetCheckpointFromBlobStorage(ctx, http.DefaultClient, eventHubInfo, "0") - _ = check.Offset - _ = expectedCheckpoint.Offset assert.Equal(t, check, expectedCheckpoint) } @@ -65,23 +59,19 @@ func TestCheckpointFromBlobStorageDefault(t *testing.T) { } partitionID := "1" - offset := "1005" consumerGroup := "$Default2" sequencenumber := int64(1) containerName := "defaultcontainer" - checkpointFormat := "{\"Offset\":\"%s\",\"SequenceNumber\":%d,\"PartitionId\":\"%s\",\"Owner\":\"\",\"Token\":\"\",\"Epoch\":0}" - checkpoint := fmt.Sprintf(checkpointFormat, offset, sequencenumber, partitionID) + checkpointFormat := "{\"SequenceNumber\":%d,\"PartitionId\":\"%s\",\"Owner\":\"\",\"Token\":\"\",\"Epoch\":0}" + checkpoint := fmt.Sprintf(checkpointFormat, sequencenumber, partitionID) urlPath := fmt.Sprintf("%s/", consumerGroup) ctx, err := createNewCheckpointInStorage(urlPath, containerName, partitionID, checkpoint, nil) assert.Equal(t, err, nil) expectedCheckpoint := Checkpoint{ - baseCheckpoint: baseCheckpoint{ - Offset: offset, - }, PartitionID: partitionID, SequenceNumber: sequencenumber, } @@ -95,8 +85,6 @@ func TestCheckpointFromBlobStorageDefault(t *testing.T) { } check, _ := GetCheckpointFromBlobStorage(ctx, http.DefaultClient, eventHubInfo, partitionID) - _ = check.Offset - _ = expectedCheckpoint.Offset assert.Equal(t, check, expectedCheckpoint) } @@ -106,23 +94,19 @@ func TestCheckpointFromBlobStorageDefaultDeprecatedPythonCheckpoint(t *testing.T } partitionID := "2" - offset := "1006" consumerGroup := "$Default3" sequencenumber := int64(1) containerName := "defaultcontainerpython" - checkpointFormat := "{\"Offset\":\"%s\",\"sequence_number\":%d,\"partition_id\":\"%s\",\"Owner\":\"\",\"Token\":\"\",\"Epoch\":0}" - checkpoint := fmt.Sprintf(checkpointFormat, offset, sequencenumber, partitionID) + checkpointFormat := "{\"sequence_number\":%d,\"partition_id\":\"%s\",\"Owner\":\"\",\"Token\":\"\",\"Epoch\":0}" + checkpoint := fmt.Sprintf(checkpointFormat, sequencenumber, partitionID) urlPath := fmt.Sprintf("%s/", consumerGroup) ctx, err := createNewCheckpointInStorage(urlPath, containerName, partitionID, checkpoint, nil) assert.Equal(t, err, nil) expectedCheckpoint := Checkpoint{ - baseCheckpoint: baseCheckpoint{ - Offset: offset, - }, PartitionID: partitionID, SequenceNumber: sequencenumber, } @@ -136,8 +120,6 @@ func TestCheckpointFromBlobStorageDefaultDeprecatedPythonCheckpoint(t *testing.T } check, _ := GetCheckpointFromBlobStorage(ctx, http.DefaultClient, eventHubInfo, partitionID) - _ = check.Offset - _ = expectedCheckpoint.Offset assert.Equal(t, check, expectedCheckpoint) } @@ -147,13 +129,11 @@ func TestCheckpointFromBlobStorageWithBlobMetadata(t *testing.T) { } partitionID := "4" - offset := "1002" consumerGroup := "$default" sequencenumber := int64(1) metadata := map[string]string{ - "offset": offset, "sequencenumber": strconv.FormatInt(sequencenumber, 10), } @@ -164,9 +144,6 @@ func TestCheckpointFromBlobStorageWithBlobMetadata(t *testing.T) { assert.Equal(t, err, nil) expectedCheckpoint := Checkpoint{ - baseCheckpoint: baseCheckpoint{ - Offset: offset, - }, PartitionID: partitionID, SequenceNumber: sequencenumber, } @@ -181,8 +158,6 @@ func TestCheckpointFromBlobStorageWithBlobMetadata(t *testing.T) { } check, _ := GetCheckpointFromBlobStorage(ctx, http.DefaultClient, eventHubInfo, partitionID) - _ = check.Offset - _ = expectedCheckpoint.Offset assert.Equal(t, check, expectedCheckpoint) } @@ -192,13 +167,12 @@ func TestCheckpointFromBlobStorageGoSdk(t *testing.T) { } partitionID := "0" - offset := "1003" sequencenumber := int64(1) containerName := "gosdkcontainer" - checkpointFormat := "{\"partitionID\":\"%s\",\"epoch\":0,\"owner\":\"\",\"checkpoint\":{\"offset\":\"%s\",\"sequenceNumber\":%d,\"enqueueTime\":\"\"},\"state\":\"\",\"token\":\"\"}" - checkpoint := fmt.Sprintf(checkpointFormat, partitionID, offset, sequencenumber) + checkpointFormat := "{\"partitionID\":\"%s\",\"epoch\":0,\"owner\":\"\",\"checkpoint\":{\"sequenceNumber\":%d,\"enqueueTime\":\"\"},\"state\":\"\",\"token\":\"\"}" + checkpoint := fmt.Sprintf(checkpointFormat, partitionID, sequencenumber) urlPath := "" @@ -206,9 +180,6 @@ func TestCheckpointFromBlobStorageGoSdk(t *testing.T) { assert.Equal(t, err, nil) expectedCheckpoint := Checkpoint{ - baseCheckpoint: baseCheckpoint{ - Offset: offset, - }, PartitionID: partitionID, SequenceNumber: sequencenumber, } @@ -222,8 +193,6 @@ func TestCheckpointFromBlobStorageGoSdk(t *testing.T) { } check, _ := GetCheckpointFromBlobStorage(ctx, http.DefaultClient, eventHubInfo, partitionID) - _ = check.Offset - _ = expectedCheckpoint.Offset assert.Equal(t, check, expectedCheckpoint) } @@ -233,15 +202,14 @@ func TestCheckpointFromBlobStorageDapr(t *testing.T) { } partitionID := "0" - offset := "1004" consumerGroup := "$default" eventhubName := "hub" sequencenumber := int64(1) containerName := fmt.Sprintf("dapr-%s-%s-%s", eventhubName, consumerGroup, partitionID) - checkpointFormat := "{\"partitionID\":\"%s\",\"epoch\":0,\"owner\":\"\",\"checkpoint\":{\"offset\":\"%s\",\"sequenceNumber\":%d,\"enqueueTime\":\"\"},\"state\":\"\",\"token\":\"\"}" - checkpoint := fmt.Sprintf(checkpointFormat, partitionID, offset, sequencenumber) + checkpointFormat := "{\"partitionID\":\"%s\",\"epoch\":0,\"owner\":\"\",\"checkpoint\":{\"sequenceNumber\":%d,\"enqueueTime\":\"\"},\"state\":\"\",\"token\":\"\"}" + checkpoint := fmt.Sprintf(checkpointFormat, partitionID, sequencenumber) urlPath := "" @@ -249,9 +217,6 @@ func TestCheckpointFromBlobStorageDapr(t *testing.T) { assert.Equal(t, err, nil) expectedCheckpoint := Checkpoint{ - baseCheckpoint: baseCheckpoint{ - Offset: offset, - }, PartitionID: partitionID, SequenceNumber: sequencenumber, } @@ -265,8 +230,6 @@ func TestCheckpointFromBlobStorageDapr(t *testing.T) { } check, _ := GetCheckpointFromBlobStorage(ctx, http.DefaultClient, eventHubInfo, partitionID) - _ = check.Offset - _ = expectedCheckpoint.Offset assert.Equal(t, check, expectedCheckpoint) } diff --git a/pkg/scalers/azure_eventhub_scaler.go b/pkg/scalers/azure_eventhub_scaler.go index 1e92c7ea491..2f48b4ed31a 100644 --- a/pkg/scalers/azure_eventhub_scaler.go +++ b/pkg/scalers/azure_eventhub_scaler.go @@ -280,8 +280,8 @@ func parseAzureEventHubAuthenticationMetadata(logger logr.Logger, config *scaler // GetUnprocessedEventCountInPartition gets number of unprocessed events in a given partition func (s *azureEventHubScaler) GetUnprocessedEventCountInPartition(ctx context.Context, partitionInfo *eventhub.HubPartitionRuntimeInformation) (newEventCount int64, checkpoint azure.Checkpoint, err error) { - // if partitionInfo.LastEnqueuedOffset = -1, that means event hub partition is empty - if partitionInfo == nil || partitionInfo.LastEnqueuedOffset == "-1" { + // if partitionInfo.LastSequenceNumber = -1, that means event hub partition is empty + if partitionInfo == nil || partitionInfo.LastSequenceNumber == -1 { return 0, azure.Checkpoint{}, nil } @@ -306,14 +306,6 @@ func (s *azureEventHubScaler) GetUnprocessedEventCountInPartition(ctx context.Co func calculateUnprocessedEvents(partitionInfo *eventhub.HubPartitionRuntimeInformation, checkpoint azure.Checkpoint, stalePartitionInfoThreshold int64) int64 { unprocessedEventCount := int64(0) - // If checkpoint.Offset is empty that means no messages has been processed from an event hub partition - // And since partitionInfo.LastSequenceNumber = 0 for the very first message hence - // total unprocessed message will be partitionInfo.LastSequenceNumber + 1 - if checkpoint.Offset == "" { - unprocessedEventCount = partitionInfo.LastSequenceNumber + 1 - return unprocessedEventCount - } - if partitionInfo.LastSequenceNumber >= checkpoint.SequenceNumber { unprocessedEventCount = partitionInfo.LastSequenceNumber - checkpoint.SequenceNumber } else { diff --git a/pkg/scalers/azure_eventhub_scaler_test.go b/pkg/scalers/azure_eventhub_scaler_test.go index 2717ebaf84a..dd2d12a1a34 100644 --- a/pkg/scalers/azure_eventhub_scaler_test.go +++ b/pkg/scalers/azure_eventhub_scaler_test.go @@ -211,51 +211,51 @@ var parseEventHubMetadataDatasetWithPodIdentity = []parseEventHubMetadataTestDat var calculateUnprocessedEventsDataset = []calculateUnprocessedEventsTestData{ { - checkpoint: azure.NewCheckpoint("1", 5), - partitionInfo: &eventhub.HubPartitionRuntimeInformation{LastSequenceNumber: 10, LastEnqueuedOffset: "2"}, + checkpoint: azure.NewCheckpoint(5), + partitionInfo: &eventhub.HubPartitionRuntimeInformation{LastSequenceNumber: 10}, unprocessedEvents: 5, }, { - checkpoint: azure.NewCheckpoint("1002", 4611686018427387903), - partitionInfo: &eventhub.HubPartitionRuntimeInformation{LastSequenceNumber: 4611686018427387905, LastEnqueuedOffset: "1000"}, + checkpoint: azure.NewCheckpoint(4611686018427387903), + partitionInfo: &eventhub.HubPartitionRuntimeInformation{LastSequenceNumber: 4611686018427387905}, unprocessedEvents: 2, }, { - checkpoint: azure.NewCheckpoint("900", 4611686018427387900), - partitionInfo: &eventhub.HubPartitionRuntimeInformation{LastSequenceNumber: 4611686018427387905, LastEnqueuedOffset: "1000"}, + checkpoint: azure.NewCheckpoint(4611686018427387900), + partitionInfo: &eventhub.HubPartitionRuntimeInformation{LastSequenceNumber: 4611686018427387905}, unprocessedEvents: 5, }, { - checkpoint: azure.NewCheckpoint("800", 4000000000000200000), - partitionInfo: &eventhub.HubPartitionRuntimeInformation{LastSequenceNumber: 4000000000000000000, LastEnqueuedOffset: "750"}, + checkpoint: azure.NewCheckpoint(4000000000000200000), + partitionInfo: &eventhub.HubPartitionRuntimeInformation{LastSequenceNumber: 4000000000000000000}, unprocessedEvents: 9223372036854575807, }, // Empty checkpoint { - checkpoint: azure.NewCheckpoint("", 0), - partitionInfo: &eventhub.HubPartitionRuntimeInformation{LastSequenceNumber: 1, LastEnqueuedOffset: "1"}, - unprocessedEvents: 2, + checkpoint: azure.NewCheckpoint(0), + partitionInfo: &eventhub.HubPartitionRuntimeInformation{LastSequenceNumber: 1}, + unprocessedEvents: 1, }, // Stale PartitionInfo { - checkpoint: azure.NewCheckpoint("5", 15), - partitionInfo: &eventhub.HubPartitionRuntimeInformation{LastSequenceNumber: 10, LastEnqueuedOffset: "2"}, + checkpoint: azure.NewCheckpoint(15), + partitionInfo: &eventhub.HubPartitionRuntimeInformation{LastSequenceNumber: 10}, unprocessedEvents: 0, }, { - checkpoint: azure.NewCheckpoint("1000", 4611686018427387910), - partitionInfo: &eventhub.HubPartitionRuntimeInformation{LastSequenceNumber: 4611686018427387905, LastEnqueuedOffset: "900"}, + checkpoint: azure.NewCheckpoint(4611686018427387910), + partitionInfo: &eventhub.HubPartitionRuntimeInformation{LastSequenceNumber: 4611686018427387905}, unprocessedEvents: 0, }, { - checkpoint: azure.NewCheckpoint("1", 5), - partitionInfo: &eventhub.HubPartitionRuntimeInformation{LastSequenceNumber: 9223372036854775797, LastEnqueuedOffset: "10000"}, + checkpoint: azure.NewCheckpoint(5), + partitionInfo: &eventhub.HubPartitionRuntimeInformation{LastSequenceNumber: 9223372036854775797}, unprocessedEvents: 0, }, // Circular buffer reset { - checkpoint: azure.NewCheckpoint("100000", 9223372036854775797), - partitionInfo: &eventhub.HubPartitionRuntimeInformation{LastSequenceNumber: 5, LastEnqueuedOffset: "1"}, + checkpoint: azure.NewCheckpoint(9223372036854775797), + partitionInfo: &eventhub.HubPartitionRuntimeInformation{LastSequenceNumber: 5}, unprocessedEvents: 15, }, }