From 6bd92a1b06496e5d9d1b15f6e378da623f2b549a Mon Sep 17 00:00:00 2001 From: Leonardo D'Ippolito Date: Mon, 1 Jul 2024 08:35:24 +0100 Subject: [PATCH] Update GetAzureQueueLength in azure storage queue scaler to consider queueLengthStrategy Signed-off-by: Leonardo D'Ippolito --- CHANGELOG.md | 1 + pkg/scalers/azure/azure_queue.go | 34 ++++++++++++++++++++++++++- pkg/scalers/azure/azure_queue_test.go | 4 ++-- pkg/scalers/azure_queue_scaler.go | 14 +++++++++++ 4 files changed, 50 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c3c8ed1b4d7..42514d7540f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -72,6 +72,7 @@ Here is an overview of all new **experimental** features: ### Improvements +- **Azure queue scaler**: Added new configuration option 'queueLengthStrategy' ([#4478](https://github.com/kedacore/keda/issues/4478)) - **Cassandra Scaler**: Add TLS support for cassandra scaler ([#5802](https://github.com/kedacore/keda/issues/5802)) - **GCP Scalers**: Added custom time horizon in GCP scalers ([#5778](https://github.com/kedacore/keda/issues/5778)) - **GitHub Scaler**: Fixed pagination, fetching repository list ([#5738](https://github.com/kedacore/keda/issues/5738)) diff --git a/pkg/scalers/azure/azure_queue.go b/pkg/scalers/azure/azure_queue.go index 39ddaa7eacc..80453888121 100644 --- a/pkg/scalers/azure/azure_queue.go +++ b/pkg/scalers/azure/azure_queue.go @@ -18,14 +18,21 @@ package azure import ( "context" + "strings" "github.com/Azure/azure-storage-queue-go/azqueue" kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1" ) +const ( + maxPeekMessages int32 = 32 + QueueLengthStrategyAll string = "all" + QueueLengthStrategyVisibleOnly string = "visibleonly" +) + // GetAzureQueueLength returns the length of a queue in int, see https://learn.microsoft.com/en-us/azure/storage/queues/storage-dotnet-how-to-use-queues?tabs=dotnet#get-the-queue-length -func GetAzureQueueLength(ctx context.Context, podIdentity kedav1alpha1.AuthPodIdentity, connectionString, queueName, accountName, endpointSuffix string) (int64, error) { +func GetAzureQueueLength(ctx context.Context, podIdentity kedav1alpha1.AuthPodIdentity, connectionString, queueName, accountName, endpointSuffix, queueLengthStrategy string) (int64, error) { credential, endpoint, err := ParseAzureStorageQueueConnection(ctx, podIdentity, connectionString, accountName, endpointSuffix) if err != nil { return -1, err @@ -35,6 +42,21 @@ func GetAzureQueueLength(ctx context.Context, podIdentity kedav1alpha1.AuthPodId serviceURL := azqueue.NewServiceURL(*endpoint, p) queueURL := serviceURL.NewQueueURL(queueName) + strategy := strings.ToLower(queueLengthStrategy) + if strategy == QueueLengthStrategyVisibleOnly { + visibleMessageCount, err := getVisibleCount(ctx, &queueURL, maxPeekMessages) + if err != nil { + return -1, err + } + + // Queue has less messages than we allowed to peek for, + // so no need to fall back to the 'all' strategy + if visibleMessageCount < int64(maxPeekMessages) { + return visibleMessageCount, nil + } + } + + // Use the 'all' strategy (visible + invisible messages) props, err := queueURL.GetProperties(ctx) if err != nil { return -1, err @@ -42,3 +64,13 @@ func GetAzureQueueLength(ctx context.Context, podIdentity kedav1alpha1.AuthPodId return int64(props.ApproximateMessagesCount()), nil } + +func getVisibleCount(ctx context.Context, queueURL *azqueue.QueueURL, maxCount int32) (int64, error) { + messagesURL := queueURL.NewMessagesURL() + queue, err := messagesURL.Peek(ctx, maxCount) + if err != nil { + return 0, err + } + num := queue.NumMessages() + return int64(num), nil +} diff --git a/pkg/scalers/azure/azure_queue_test.go b/pkg/scalers/azure/azure_queue_test.go index eb47cfb0c59..d6730438290 100644 --- a/pkg/scalers/azure/azure_queue_test.go +++ b/pkg/scalers/azure/azure_queue_test.go @@ -10,7 +10,7 @@ import ( ) func TestGetQueueLength(t *testing.T) { - length, err := GetAzureQueueLength(context.TODO(), kedav1alpha1.AuthPodIdentity{}, "", "queueName", "", "") + length, err := GetAzureQueueLength(context.TODO(), kedav1alpha1.AuthPodIdentity{}, "", "queueName", "", "", "") if length != -1 { t.Error("Expected length to be -1, but got", length) } @@ -23,7 +23,7 @@ func TestGetQueueLength(t *testing.T) { t.Error("Expected error to contain parsing error message, but got", err.Error()) } - length, err = GetAzureQueueLength(context.TODO(), kedav1alpha1.AuthPodIdentity{}, "DefaultEndpointsProtocol=https;AccountName=name;AccountKey=key==;EndpointSuffix=core.windows.net", "queueName", "", "") + length, err = GetAzureQueueLength(context.TODO(), kedav1alpha1.AuthPodIdentity{}, "DefaultEndpointsProtocol=https;AccountName=name;AccountKey=key==;EndpointSuffix=core.windows.net", "queueName", "", "", "") if length != -1 { t.Error("Expected length to be -1, but got", length) diff --git a/pkg/scalers/azure_queue_scaler.go b/pkg/scalers/azure_queue_scaler.go index 8609d84a88a..6d997fe6efe 100644 --- a/pkg/scalers/azure_queue_scaler.go +++ b/pkg/scalers/azure_queue_scaler.go @@ -21,6 +21,7 @@ import ( "fmt" "net/http" "strconv" + "strings" "github.com/go-logr/logr" v2 "k8s.io/api/autoscaling/v2" @@ -54,6 +55,7 @@ type azureQueueMetadata struct { connection string accountName string endpointSuffix string + queueLengthStrategy string triggerIndex int } @@ -120,6 +122,17 @@ func parseAzureQueueMetadata(config *scalersconfig.ScalerConfig, logger logr.Log return nil, kedav1alpha1.AuthPodIdentity{}, fmt.Errorf("no queueName given") } + if val, ok := config.TriggerMetadata["queueLengthStrategy"]; ok && val != "" { + strategy := strings.ToLower(val) + if strategy == azure.QueueLengthStrategyAll || strategy == azure.QueueLengthStrategyVisibleOnly { + meta.queueLengthStrategy = strategy + } else { + return nil, kedav1alpha1.AuthPodIdentity{}, fmt.Errorf("invalid queueLengthStrategy %s given", val) + } + } else { + meta.queueLengthStrategy = azure.QueueLengthStrategyAll + } + // If the Use AAD Pod Identity is not present, or set to "none" // then check for connection string switch config.PodIdentity.Provider { @@ -179,6 +192,7 @@ func (s *azureQueueScaler) GetMetricsAndActivity(ctx context.Context, metricName s.metadata.queueName, s.metadata.accountName, s.metadata.endpointSuffix, + s.metadata.queueLengthStrategy, ) if err != nil {