From 2eddb15cc49f1860af7d7c55b5eeb6a62b4fecb3 Mon Sep 17 00:00:00 2001 From: swatimodi-scout Date: Mon, 3 Nov 2025 14:53:39 +0530 Subject: [PATCH 01/18] make the v2 creds provider be the default Signed-off-by: swatimodi-scout --- bindings/aws/kinesis/kinesis.go | 11 +++++++---- common/authentication/aws/client.go | 24 ++++++++++++++++++++++-- 2 files changed, 29 insertions(+), 6 deletions(-) diff --git a/bindings/aws/kinesis/kinesis.go b/bindings/aws/kinesis/kinesis.go index cce67bd3da..1cf481c141 100644 --- a/bindings/aws/kinesis/kinesis.go +++ b/bindings/aws/kinesis/kinesis.go @@ -49,10 +49,13 @@ type AWSKinesis struct { consumerARN *string logger logger.Logger consumerMode string - - closed atomic.Bool - closeCh chan struct{} - wg sync.WaitGroup + closed atomic.Bool + closeCh chan struct{} + wg sync.WaitGroup + // applicationName is required for KCL (Kinesis Client Library) worker configuration + // in shared throughput mode. It identifies the consumer application and is used + // for DynamoDB table naming and checkpointing. + applicationName string } // TODO: we need to clean up the metadata fields here and update this binding to use the builtin aws auth provider and reflect in metadata.yaml diff --git a/common/authentication/aws/client.go b/common/authentication/aws/client.go index b210e32944..b2634347c3 100644 --- a/common/authentication/aws/client.go +++ b/common/authentication/aws/client.go @@ -17,8 +17,9 @@ import ( "context" "errors" "sync" - - "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go-v2/aws" + awsv2config "github.com/aws/aws-sdk-go-v2/config" + v2creds "github.com/aws/aws-sdk-go-v2/credentials" "github.com/aws/aws-sdk-go/aws/credentials" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/dynamodb" @@ -193,12 +194,31 @@ func (c *KinesisClients) WorkerCfg(ctx context.Context, stream, consumer, mode s const sharedMode = "shared" if c.Kinesis != nil { if mode == sharedMode { +<<<<<<< HEAD if c.Credentials != nil { kclConfig := config.NewKinesisClientLibConfigWithCredential(consumer, stream, c.Region, consumer, c.Credentials) return kclConfig } +======= + // Try v2 default config first (standard approach for v2 components) + v2Config, err := awsv2config.LoadDefaultConfig(ctx, awsv2config.WithRegion(region)) + if err == nil { + kclConfig := config.NewKinesisClientLibConfigWithCredential(applicationName, stream, region, "", v2Config.Credentials) + return kclConfig + } + // Fallback to v1 credentials if v2 fails + v1Creds, v1Err := c.Credentials.Get() + if v1Err != nil { + // Both v2 and v1 failed, return nil + return nil + } + // Convert v1 credentials to v2 format + v2Creds := v2creds.NewStaticCredentialsProvider(v1Creds.AccessKeyID, v1Creds.SecretAccessKey, v1Creds.SessionToken) + kclConfig := config.NewKinesisClientLibConfigWithCredential(applicationName, stream, region, "", v2Creds) + return kclConfig +>>>>>>> 9b9f2b98 (make the v2 creds provider be the default) } } From cabcec5a41072227893bf0fcaffb82a1fe7eb345 Mon Sep 17 00:00:00 2001 From: devendrapohekar-scout Date: Fri, 31 Oct 2025 12:20:19 +0530 Subject: [PATCH 02/18] feat: Upgrade kinesis github.com/vmware/vmware-go-kcl-v1 to v2. Signed-off-by: devendrapohekar-scout --- bindings/aws/kinesis/kinesis.go | 23 ++++++++++++++--------- bindings/aws/kinesis/kinesis_test.go | 18 ++++++++++-------- common/authentication/aws/client.go | 24 +++++++++++++++++++++--- common/authentication/aws/client_test.go | 2 +- go.mod | 7 ++++++- go.sum | 11 ++++++++--- 6 files changed, 60 insertions(+), 25 deletions(-) diff --git a/bindings/aws/kinesis/kinesis.go b/bindings/aws/kinesis/kinesis.go index 1cf481c141..992eb587d7 100644 --- a/bindings/aws/kinesis/kinesis.go +++ b/bindings/aws/kinesis/kinesis.go @@ -27,8 +27,8 @@ import ( "github.com/aws/aws-sdk-go/service/kinesis" "github.com/cenkalti/backoff/v4" "github.com/google/uuid" - "github.com/vmware/vmware-go-kcl/clientlibrary/interfaces" - "github.com/vmware/vmware-go-kcl/clientlibrary/worker" + "github.com/vmware/vmware-go-kcl-v2/clientlibrary/interfaces" + "github.com/vmware/vmware-go-kcl-v2/clientlibrary/worker" "github.com/dapr/components-contrib/bindings" awsAuth "github.com/dapr/components-contrib/common/authentication/aws" @@ -68,6 +68,7 @@ type kinesisMetadata struct { SecretKey string `json:"secretKey" mapstructure:"secretKey"` SessionToken string `json:"sessionToken" mapstructure:"sessionToken"` KinesisConsumerMode string `json:"mode" mapstructure:"mode"` + ApplicationName string `json:"applicationName" mapstructure:"applicationName"` } const ( @@ -119,6 +120,7 @@ func (a *AWSKinesis) Init(ctx context.Context, metadata bindings.Metadata) error a.consumerMode = m.KinesisConsumerMode a.streamName = m.StreamName a.consumerName = m.ConsumerName + a.applicationName = m.ApplicationName a.metadata = m opts := awsAuth.Options{ @@ -161,19 +163,21 @@ func (a *AWSKinesis) Read(ctx context.Context, handler bindings.Handler) (err er return errors.New("binding is closed") } - if a.metadata.KinesisConsumerMode == SharedThroughput { + switch a.metadata.KinesisConsumerMode { + case SharedThroughput: + // initalize worker configuration + config := a.authProvider.Kinesis().WorkerCfg(ctx, a.streamName, a.metadata.Region, a.consumerMode, a.applicationName) // Configure the KCL worker with custom endpoints for LocalStack - config := a.authProvider.Kinesis().WorkerCfg(ctx, a.streamName, a.consumerName, a.consumerMode) if a.metadata.Endpoint != "" { - config.KinesisEndpoint = a.metadata.Endpoint - config.DynamoDBEndpoint = a.metadata.Endpoint + config.WithKinesisEndpoint(a.metadata.Endpoint) + config.WithDynamoDBEndpoint(a.metadata.Endpoint) } a.worker = worker.NewWorker(a.recordProcessorFactory(ctx, handler), config) err = a.worker.Start() if err != nil { return err } - } else if a.metadata.KinesisConsumerMode == ExtendedFanout { + case ExtendedFanout: var stream *kinesis.DescribeStreamOutput stream, err = a.authProvider.Kinesis().Kinesis.DescribeStream(&kinesis.DescribeStreamInput{StreamName: &a.metadata.StreamName}) if err != nil { @@ -197,9 +201,10 @@ func (a *AWSKinesis) Read(ctx context.Context, handler bindings.Handler) (err er case <-ctx.Done(): case <-a.closeCh: } - if a.metadata.KinesisConsumerMode == SharedThroughput { + switch a.metadata.KinesisConsumerMode { + case SharedThroughput: a.worker.Shutdown() - } else if a.metadata.KinesisConsumerMode == ExtendedFanout { + case ExtendedFanout: a.deregisterConsumer(ctx, stream, a.consumerARN) } }() diff --git a/bindings/aws/kinesis/kinesis_test.go b/bindings/aws/kinesis/kinesis_test.go index aaca0c3c0a..154e9baa57 100644 --- a/bindings/aws/kinesis/kinesis_test.go +++ b/bindings/aws/kinesis/kinesis_test.go @@ -25,14 +25,15 @@ import ( func TestParseMetadata(t *testing.T) { m := bindings.Metadata{} m.Properties = map[string]string{ - "accessKey": "key", - "region": "region", - "secretKey": "secret", - "consumerName": "test", - "streamName": "stream", - "mode": "extended", - "endpoint": "endpoint", - "sessionToken": "token", + "accessKey": "key", + "region": "region", + "secretKey": "secret", + "consumerName": "test", + "streamName": "stream", + "mode": "extended", + "endpoint": "endpoint", + "sessionToken": "token", + "applicationName": "applicationName", } kinesis := AWSKinesis{} meta, err := kinesis.parseMetadata(m) @@ -45,4 +46,5 @@ func TestParseMetadata(t *testing.T) { assert.Equal(t, "endpoint", meta.Endpoint) assert.Equal(t, "token", meta.SessionToken) assert.Equal(t, "extended", meta.KinesisConsumerMode) + assert.Equal(t, "applicationName", meta.ApplicationName) } diff --git a/common/authentication/aws/client.go b/common/authentication/aws/client.go index b2634347c3..557a154261 100644 --- a/common/authentication/aws/client.go +++ b/common/authentication/aws/client.go @@ -37,7 +37,7 @@ import ( "github.com/aws/aws-sdk-go/service/ssm" "github.com/aws/aws-sdk-go/service/ssm/ssmiface" "github.com/aws/aws-sdk-go/service/sts" - "github.com/vmware/vmware-go-kcl/clientlibrary/config" + "github.com/vmware/vmware-go-kcl-v2/clientlibrary/config" ) type Clients struct { @@ -182,18 +182,28 @@ func (c *KinesisClients) Stream(ctx context.Context, streamName string) (*string stream, err := c.Kinesis.DescribeStreamWithContext(ctx, &kinesis.DescribeStreamInput{ StreamName: aws.String(streamName), }) +<<<<<<< HEAD if stream != nil { return stream.StreamDescription.StreamARN, err +======= + /** + * If the error is not nil, do not proceed to the next step + * as it may cause a nil pointer error on stream.StreamDescription.StreamARN. + */ + if err != nil { + return nil, err +>>>>>>> 623adfcb (feat: Upgrade kinesis github.com/vmware/vmware-go-kcl-v1 to v2.) } } return nil, errors.New("unable to get stream arn due to empty client") } -func (c *KinesisClients) WorkerCfg(ctx context.Context, stream, consumer, mode string) *config.KinesisClientLibConfiguration { +func (c *KinesisClients) WorkerCfg(ctx context.Context, stream, region, mode, applicationName string) *config.KinesisClientLibConfiguration { const sharedMode = "shared" if c.Kinesis != nil { if mode == sharedMode { +<<<<<<< HEAD <<<<<<< HEAD if c.Credentials != nil { kclConfig := config.NewKinesisClientLibConfigWithCredential(consumer, @@ -219,9 +229,17 @@ func (c *KinesisClients) WorkerCfg(ctx context.Context, stream, consumer, mode s kclConfig := config.NewKinesisClientLibConfigWithCredential(applicationName, stream, region, "", v2Creds) return kclConfig >>>>>>> 9b9f2b98 (make the v2 creds provider be the default) +======= + v1Creds, err := c.Credentials.Get() + if err != nil { + return nil + } + v2Creds := v2creds.NewStaticCredentialsProvider(v1Creds.AccessKeyID, v1Creds.SecretAccessKey, v1Creds.SessionToken) + kclConfig := config.NewKinesisClientLibConfigWithCredential(applicationName, stream, region, "", v2Creds) + return kclConfig +>>>>>>> 623adfcb (feat: Upgrade kinesis github.com/vmware/vmware-go-kcl-v1 to v2.) } } - return nil } diff --git a/common/authentication/aws/client_test.go b/common/authentication/aws/client_test.go index 85e0392aae..70dccf9b25 100644 --- a/common/authentication/aws/client_test.go +++ b/common/authentication/aws/client_test.go @@ -251,7 +251,7 @@ func TestKinesisClients_WorkerCfg(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - cfg := tt.kinesisClient.WorkerCfg(t.Context(), tt.streamName, tt.consumer, tt.mode) + cfg := tt.kinesisClient.WorkerCfg(t.Context(), tt.streamName, tt.kinesisClient.Region, tt.mode, tt.consumer) if tt.expectedConfig == nil { assert.Equal(t, tt.expectedConfig, cfg) return diff --git a/go.mod b/go.mod index 0d24d1cb13..bfef7e0dec 100644 --- a/go.mod +++ b/go.mod @@ -150,6 +150,11 @@ require ( sigs.k8s.io/yaml v1.4.0 ) +require ( + github.com/aws/aws-sdk-go-v2/service/kinesis v1.27.1 // indirect + github.com/awslabs/kinesis-aggregation/go/v2 v2.0.0-20211222152315-953b66f67407 // indirect +) + require ( cel.dev/expr v0.23.0 // indirect cloud.google.com/go v0.120.0 // indirect @@ -203,7 +208,6 @@ require ( github.com/aws/aws-sdk-go-v2/service/sso v1.25.5 // indirect github.com/aws/aws-sdk-go-v2/service/ssooidc v1.30.3 // indirect github.com/aws/smithy-go v1.22.5 // indirect - github.com/awslabs/kinesis-aggregation/go v0.0.0-20210630091500-54e17340d32f // indirect github.com/benbjohnson/clock v1.3.5 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/bits-and-blooms/bitset v1.4.0 // indirect @@ -392,6 +396,7 @@ require ( github.com/tklauser/go-sysconf v0.3.12 // indirect github.com/tklauser/numcpus v0.6.1 // indirect github.com/valyala/bytebufferpool v1.0.0 // indirect + github.com/vmware/vmware-go-kcl-v2 v1.0.0 github.com/x448/float16 v0.8.4 // indirect github.com/xdg-go/pbkdf2 v1.0.0 // indirect github.com/xdg-go/stringprep v1.0.4 // indirect diff --git a/go.sum b/go.sum index 5a8bc06ef4..05ffe45f9f 100644 --- a/go.sum +++ b/go.sum @@ -268,12 +268,12 @@ github.com/asaskevich/govalidator v0.0.0-20230301143203-a9d515a09cc2/go.mod h1:W github.com/aws/aws-lambda-go v1.13.3/go.mod h1:4UKl9IzQMoD+QF79YdCuzCwp8VbmG4VAQwij/eHl5CU= github.com/aws/aws-msk-iam-sasl-signer-go v1.0.1-0.20241125194140-078c08b8574a h1:QFemvMGPnajaeRBkFc1HoEA7qzVjUv+rkYb1/ps1/UE= github.com/aws/aws-msk-iam-sasl-signer-go v1.0.1-0.20241125194140-078c08b8574a/go.mod h1:MVYeeOhILFFemC/XlYTClvBjYZrg/EPd3ts885KrNTI= -github.com/aws/aws-sdk-go v1.19.48/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo= github.com/aws/aws-sdk-go v1.27.0/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo= github.com/aws/aws-sdk-go v1.32.6/go.mod h1:5zCpMtNQVjRREroY7sYe8lOMRSxkhG6MZveU8YkpAk0= github.com/aws/aws-sdk-go v1.55.6 h1:cSg4pvZ3m8dgYcgqB97MrcdjUmZ1BeMYKUxMMB89IPk= github.com/aws/aws-sdk-go v1.55.6/go.mod h1:eRwEWoyTWFMVYVQzKMNHWP5/RV4xIUGMQfXQHfHkpNU= github.com/aws/aws-sdk-go-v2 v0.18.0/go.mod h1:JWVYvqSMppoMJC0x5wdwiImzgXTI9FuZwxzkQq9wy+g= +github.com/aws/aws-sdk-go-v2 v1.9.0/go.mod h1:cK/D0BBs0b/oWPIcX/Z/obahJK1TT7IPVjy53i/mX/4= github.com/aws/aws-sdk-go-v2 v1.9.2/go.mod h1:cK/D0BBs0b/oWPIcX/Z/obahJK1TT7IPVjy53i/mX/4= github.com/aws/aws-sdk-go-v2 v1.36.5 h1:0OF9RiEMEdDdZEMqF9MRjevyxAQcf6gY+E7vwBILFj0= github.com/aws/aws-sdk-go-v2 v1.36.5/go.mod h1:EYrzvCCN9CMUTa5+6lf6MM4tq3Zjp8UhSGR/cBsjai0= @@ -313,6 +313,9 @@ github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.10.17/go.mod github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.3.2/go.mod h1:72HRZDLMtmVQiLG2tLfQcaWLCssELvGl+Zf2WVxMmR8= github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.17 h1:t0E6FzREdtCsiLIoLCWsYliNsRBgyGD/MCK571qk4MI= github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.17/go.mod h1:ygpklyoaypuyDvOM5ujWGrYWpAK3h7ugnmKCU/76Ys4= +github.com/aws/aws-sdk-go-v2/service/kinesis v1.6.0/go.mod h1:9O7UG2pELnP0hq35+Gd7XDjOLBkg7tmgRQ0y14ZjoJI= +github.com/aws/aws-sdk-go-v2/service/kinesis v1.27.1 h1:p8dOJ/UKXOwttc1Cxw1Ek52klVmMuiaCUkhsUGxce1I= +github.com/aws/aws-sdk-go-v2/service/kinesis v1.27.1/go.mod h1:VpH1IBG1YYZHPu5qShNt7EGaqUQbHAJZrbDtEpqDvvY= github.com/aws/aws-sdk-go-v2/service/servicediscovery v1.32.4 h1:BN6+zko+qO9Tl9S0ywUPNvY0gvlFK4Zmj2Y0a8paFkk= github.com/aws/aws-sdk-go-v2/service/servicediscovery v1.32.4/go.mod h1:hbMVfSdZneCht4UmPOsejDt93QnetQPFuLOOqbuybqs= github.com/aws/aws-sdk-go-v2/service/sns v1.34.7 h1:OBuZE9Wt8h2imuRktu+WfjiTGrnYdCIJg8IX92aalHE= @@ -332,8 +335,8 @@ github.com/aws/rolesanywhere-credential-helper v1.0.4/go.mod h1:QVGNxlDlYhjR0/ZU github.com/aws/smithy-go v1.8.0/go.mod h1:SObp3lf9smib00L/v3U2eAKG8FyQ7iLrJnQiAmR5n+E= github.com/aws/smithy-go v1.22.5 h1:P9ATCXPMb2mPjYBgueqJNCA5S9UfktsW0tTxi+a7eqw= github.com/aws/smithy-go v1.22.5/go.mod h1:t1ufH5HMublsJYulve2RKmHDC15xu1f26kHCp/HgceI= -github.com/awslabs/kinesis-aggregation/go v0.0.0-20210630091500-54e17340d32f h1:Pf0BjJDga7C98f0vhw+Ip5EaiE07S3lTKpIYPNS0nMo= -github.com/awslabs/kinesis-aggregation/go v0.0.0-20210630091500-54e17340d32f/go.mod h1:SghidfnxvX7ribW6nHI7T+IBbc9puZ9kk5Tx/88h8P4= +github.com/awslabs/kinesis-aggregation/go/v2 v2.0.0-20211222152315-953b66f67407 h1:p8Ubi4GEgfRc1xFn/WtGNkVG8RXxGHOsKiwGptufIo8= +github.com/awslabs/kinesis-aggregation/go/v2 v2.0.0-20211222152315-953b66f67407/go.mod h1:0Qr1uMHFmHsIYMcG4T7BJ9yrJtWadhOmpABCX69dwuc= github.com/aymerick/douceur v0.2.0 h1:Mv+mAeH1Q+n9Fr+oyamOlAkUNPWPlA8PPGR0QAaYuPk= github.com/aymerick/douceur v0.2.0/go.mod h1:wlT5vV2O3h55X9m7iVYN0TBM0NH/MmbLnd30/FjWUq4= github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= @@ -1714,6 +1717,8 @@ github.com/valyala/fasttemplate v1.2.1/go.mod h1:KHLXt3tVN2HBp8eijSv/kGJopbvo7S+ github.com/valyala/tcplisten v0.0.0-20161114210144-ceec8f93295a/go.mod h1:v3UYOV9WzVtRmSR+PDvWpU/qWl4Wa5LApYYX4ZtKbio= github.com/vmware/vmware-go-kcl v1.5.1 h1:1rJLfAX4sDnCyatNoD/WJzVafkwST6u/cgY/Uf2VgHk= github.com/vmware/vmware-go-kcl v1.5.1/go.mod h1:kXJmQ6h0dRMRrp1uWU9XbIXvwelDpTxSPquvQUBdpbo= +github.com/vmware/vmware-go-kcl-v2 v1.0.0 h1:HPT5vu+khRmGspBSc/+AilEWbRGoTZhjlYqdrBbRMZs= +github.com/vmware/vmware-go-kcl-v2 v1.0.0/go.mod h1:GBDu+P4Neo0vwZAk0ZUCEC8GYsUOWvi3XhFwAZR3SjA= github.com/x448/float16 v0.8.4 h1:qLwI1I70+NjRFUR3zs1JPUCgaCXSh3SW62uAKT1mSBM= github.com/x448/float16 v0.8.4/go.mod h1:14CWIYCyZA/cWjXOioeEpHeN/83MdbZDRQHoFcYsOfg= github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c= From 112fd8de87780a7eccc8cf7c1e3c3b860a151087 Mon Sep 17 00:00:00 2001 From: swatimodi-scout Date: Tue, 11 Nov 2025 16:04:51 +0530 Subject: [PATCH 03/18] Resolved conflicts Signed-off-by: swatimodi-scout --- common/authentication/aws/client.go | 26 ++------------------------ 1 file changed, 2 insertions(+), 24 deletions(-) diff --git a/common/authentication/aws/client.go b/common/authentication/aws/client.go index 557a154261..2dcff7dd27 100644 --- a/common/authentication/aws/client.go +++ b/common/authentication/aws/client.go @@ -17,6 +17,7 @@ import ( "context" "errors" "sync" + "github.com/aws/aws-sdk-go-v2/aws" awsv2config "github.com/aws/aws-sdk-go-v2/config" v2creds "github.com/aws/aws-sdk-go-v2/credentials" @@ -182,18 +183,14 @@ func (c *KinesisClients) Stream(ctx context.Context, streamName string) (*string stream, err := c.Kinesis.DescribeStreamWithContext(ctx, &kinesis.DescribeStreamInput{ StreamName: aws.String(streamName), }) -<<<<<<< HEAD - if stream != nil { - return stream.StreamDescription.StreamARN, err -======= /** * If the error is not nil, do not proceed to the next step * as it may cause a nil pointer error on stream.StreamDescription.StreamARN. */ if err != nil { return nil, err ->>>>>>> 623adfcb (feat: Upgrade kinesis github.com/vmware/vmware-go-kcl-v1 to v2.) } + return stream.StreamDescription.StreamARN, err } return nil, errors.New("unable to get stream arn due to empty client") @@ -203,15 +200,6 @@ func (c *KinesisClients) WorkerCfg(ctx context.Context, stream, region, mode, ap const sharedMode = "shared" if c.Kinesis != nil { if mode == sharedMode { -<<<<<<< HEAD -<<<<<<< HEAD - if c.Credentials != nil { - kclConfig := config.NewKinesisClientLibConfigWithCredential(consumer, - stream, c.Region, consumer, - c.Credentials) - return kclConfig - } -======= // Try v2 default config first (standard approach for v2 components) v2Config, err := awsv2config.LoadDefaultConfig(ctx, awsv2config.WithRegion(region)) if err == nil { @@ -228,16 +216,6 @@ func (c *KinesisClients) WorkerCfg(ctx context.Context, stream, region, mode, ap v2Creds := v2creds.NewStaticCredentialsProvider(v1Creds.AccessKeyID, v1Creds.SecretAccessKey, v1Creds.SessionToken) kclConfig := config.NewKinesisClientLibConfigWithCredential(applicationName, stream, region, "", v2Creds) return kclConfig ->>>>>>> 9b9f2b98 (make the v2 creds provider be the default) -======= - v1Creds, err := c.Credentials.Get() - if err != nil { - return nil - } - v2Creds := v2creds.NewStaticCredentialsProvider(v1Creds.AccessKeyID, v1Creds.SecretAccessKey, v1Creds.SessionToken) - kclConfig := config.NewKinesisClientLibConfigWithCredential(applicationName, stream, region, "", v2Creds) - return kclConfig ->>>>>>> 623adfcb (feat: Upgrade kinesis github.com/vmware/vmware-go-kcl-v1 to v2.) } } return nil From de8646f5bb4743986b054e575cb97c646f06310e Mon Sep 17 00:00:00 2001 From: swatimodi-scout <159426020+swatimodi-scout@users.noreply.github.com> Date: Tue, 11 Nov 2025 17:42:43 +0530 Subject: [PATCH 04/18] Update common/authentication/aws/client.go Co-authored-by: Albert Callarisa Signed-off-by: swatimodi-scout <159426020+swatimodi-scout@users.noreply.github.com> --- common/authentication/aws/client.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/common/authentication/aws/client.go b/common/authentication/aws/client.go index 2dcff7dd27..cf27b98b7c 100644 --- a/common/authentication/aws/client.go +++ b/common/authentication/aws/client.go @@ -190,7 +190,7 @@ func (c *KinesisClients) Stream(ctx context.Context, streamName string) (*string if err != nil { return nil, err } - return stream.StreamDescription.StreamARN, err + return stream.StreamDescription.StreamARN, nil } return nil, errors.New("unable to get stream arn due to empty client") From 90e071c249c044549a05b6690749c1a712532f91 Mon Sep 17 00:00:00 2001 From: swatimodi-scout <159426020+swatimodi-scout@users.noreply.github.com> Date: Tue, 11 Nov 2025 18:33:22 +0530 Subject: [PATCH 05/18] Update bindings/aws/kinesis/kinesis.go Co-authored-by: Albert Callarisa Signed-off-by: swatimodi-scout <159426020+swatimodi-scout@users.noreply.github.com> --- bindings/aws/kinesis/kinesis.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/bindings/aws/kinesis/kinesis.go b/bindings/aws/kinesis/kinesis.go index 992eb587d7..e4d4251fdc 100644 --- a/bindings/aws/kinesis/kinesis.go +++ b/bindings/aws/kinesis/kinesis.go @@ -169,8 +169,8 @@ func (a *AWSKinesis) Read(ctx context.Context, handler bindings.Handler) (err er config := a.authProvider.Kinesis().WorkerCfg(ctx, a.streamName, a.metadata.Region, a.consumerMode, a.applicationName) // Configure the KCL worker with custom endpoints for LocalStack if a.metadata.Endpoint != "" { - config.WithKinesisEndpoint(a.metadata.Endpoint) - config.WithDynamoDBEndpoint(a.metadata.Endpoint) + config = config.WithKinesisEndpoint(a.metadata.Endpoint) + config = config.WithDynamoDBEndpoint(a.metadata.Endpoint) } a.worker = worker.NewWorker(a.recordProcessorFactory(ctx, handler), config) err = a.worker.Start() From 648a318d2e4ed58d4063a98287becc590793c5f7 Mon Sep 17 00:00:00 2001 From: swatimodi-scout Date: Tue, 11 Nov 2025 18:44:56 +0530 Subject: [PATCH 06/18] Resolved comments given by acroca Signed-off-by: swatimodi-scout --- bindings/aws/kinesis/kinesis.go | 5 +---- common/authentication/aws/client.go | 28 ++++++---------------------- 2 files changed, 7 insertions(+), 26 deletions(-) diff --git a/bindings/aws/kinesis/kinesis.go b/bindings/aws/kinesis/kinesis.go index e4d4251fdc..ddf96fb1f2 100644 --- a/bindings/aws/kinesis/kinesis.go +++ b/bindings/aws/kinesis/kinesis.go @@ -49,13 +49,10 @@ type AWSKinesis struct { consumerARN *string logger logger.Logger consumerMode string + applicationName string closed atomic.Bool closeCh chan struct{} wg sync.WaitGroup - // applicationName is required for KCL (Kinesis Client Library) worker configuration - // in shared throughput mode. It identifies the consumer application and is used - // for DynamoDB table naming and checkpointing. - applicationName string } // TODO: we need to clean up the metadata fields here and update this binding to use the builtin aws auth provider and reflect in metadata.yaml diff --git a/common/authentication/aws/client.go b/common/authentication/aws/client.go index cf27b98b7c..61d74056f0 100644 --- a/common/authentication/aws/client.go +++ b/common/authentication/aws/client.go @@ -183,10 +183,6 @@ func (c *KinesisClients) Stream(ctx context.Context, streamName string) (*string stream, err := c.Kinesis.DescribeStreamWithContext(ctx, &kinesis.DescribeStreamInput{ StreamName: aws.String(streamName), }) - /** - * If the error is not nil, do not proceed to the next step - * as it may cause a nil pointer error on stream.StreamDescription.StreamARN. - */ if err != nil { return nil, err } @@ -198,25 +194,13 @@ func (c *KinesisClients) Stream(ctx context.Context, streamName string) (*string func (c *KinesisClients) WorkerCfg(ctx context.Context, stream, region, mode, applicationName string) *config.KinesisClientLibConfiguration { const sharedMode = "shared" - if c.Kinesis != nil { - if mode == sharedMode { - // Try v2 default config first (standard approach for v2 components) - v2Config, err := awsv2config.LoadDefaultConfig(ctx, awsv2config.WithRegion(region)) - if err == nil { - kclConfig := config.NewKinesisClientLibConfigWithCredential(applicationName, stream, region, "", v2Config.Credentials) - return kclConfig - } - // Fallback to v1 credentials if v2 fails - v1Creds, v1Err := c.Credentials.Get() - if v1Err != nil { - // Both v2 and v1 failed, return nil - return nil - } - // Convert v1 credentials to v2 format - v2Creds := v2creds.NewStaticCredentialsProvider(v1Creds.AccessKeyID, v1Creds.SecretAccessKey, v1Creds.SessionToken) - kclConfig := config.NewKinesisClientLibConfigWithCredential(applicationName, stream, region, "", v2Creds) - return kclConfig + if c.Kinesis != nil && mode == sharedMode { + v1Creds, err := c.Credentials.Get() + if err != nil { + return nil } + v2Creds := v2creds.NewStaticCredentialsProvider(v1Creds.AccessKeyID, v1Creds.SecretAccessKey, v1Creds.SessionToken) + return config.NewKinesisClientLibConfigWithCredential(applicationName, stream, region, "", v2Creds) } return nil } From b686d944f72865401fc1f77e6b7ab90941dddce7 Mon Sep 17 00:00:00 2001 From: swatimodi-scout Date: Thu, 13 Nov 2025 15:21:50 +0530 Subject: [PATCH 07/18] Removed v1 credentials for kinesis Signed-off-by: swatimodi-scout --- common/authentication/aws/client.go | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/common/authentication/aws/client.go b/common/authentication/aws/client.go index 61d74056f0..631b6a042e 100644 --- a/common/authentication/aws/client.go +++ b/common/authentication/aws/client.go @@ -20,7 +20,6 @@ import ( "github.com/aws/aws-sdk-go-v2/aws" awsv2config "github.com/aws/aws-sdk-go-v2/config" - v2creds "github.com/aws/aws-sdk-go-v2/credentials" "github.com/aws/aws-sdk-go/aws/credentials" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/dynamodb" @@ -194,13 +193,12 @@ func (c *KinesisClients) Stream(ctx context.Context, streamName string) (*string func (c *KinesisClients) WorkerCfg(ctx context.Context, stream, region, mode, applicationName string) *config.KinesisClientLibConfiguration { const sharedMode = "shared" - if c.Kinesis != nil && mode == sharedMode { - v1Creds, err := c.Credentials.Get() + if mode == sharedMode { + v2Config, err := awsv2config.LoadDefaultConfig(ctx, awsv2config.WithRegion(region)) if err != nil { return nil } - v2Creds := v2creds.NewStaticCredentialsProvider(v1Creds.AccessKeyID, v1Creds.SecretAccessKey, v1Creds.SessionToken) - return config.NewKinesisClientLibConfigWithCredential(applicationName, stream, region, "", v2Creds) + return config.NewKinesisClientLibConfigWithCredential(applicationName, stream, region, "", v2Config.Credentials) } return nil } From 9a7008245e9e79f235a654e4093703d4910b4242 Mon Sep 17 00:00:00 2001 From: swatimodi-scout Date: Thu, 13 Nov 2025 16:46:41 +0530 Subject: [PATCH 08/18] updated client.go Signed-off-by: swatimodi-scout Signed-off-by: rideshnath-scout --- common/authentication/aws/client.go | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/common/authentication/aws/client.go b/common/authentication/aws/client.go index 631b6a042e..0f8f991e87 100644 --- a/common/authentication/aws/client.go +++ b/common/authentication/aws/client.go @@ -20,6 +20,7 @@ import ( "github.com/aws/aws-sdk-go-v2/aws" awsv2config "github.com/aws/aws-sdk-go-v2/config" + v2creds "github.com/aws/aws-sdk-go-v2/credentials" "github.com/aws/aws-sdk-go/aws/credentials" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/dynamodb" @@ -117,9 +118,10 @@ type ParameterStoreClients struct { } type KinesisClients struct { - Kinesis kinesisiface.KinesisAPI - Region string - Credentials *credentials.Credentials + Kinesis kinesisiface.KinesisAPI + Region string + Credentials *credentials.Credentials + V2Credentials aws.CredentialsProvider } type SesClients struct { @@ -175,6 +177,10 @@ func (c *KinesisClients) New(session *session.Session) { c.Kinesis = kinesis.New(session, session.Config) c.Region = *session.Config.Region c.Credentials = session.Config.Credentials + // Convert v1 credentials to v2 for KCL usage + if v1Creds, err := session.Config.Credentials.Get(); err == nil { + c.V2Credentials = v2creds.NewStaticCredentialsProvider(v1Creds.AccessKeyID, v1Creds.SecretAccessKey, v1Creds.SessionToken) + } } func (c *KinesisClients) Stream(ctx context.Context, streamName string) (*string, error) { @@ -194,6 +200,11 @@ func (c *KinesisClients) Stream(ctx context.Context, streamName string) (*string func (c *KinesisClients) WorkerCfg(ctx context.Context, stream, region, mode, applicationName string) *config.KinesisClientLibConfiguration { const sharedMode = "shared" if mode == sharedMode { + // Use converted v2 credentials if available + if c.V2Credentials != nil { + return config.NewKinesisClientLibConfigWithCredential(applicationName, stream, region, "", c.V2Credentials) + } + // Fallback to default v2 config if conversion failed v2Config, err := awsv2config.LoadDefaultConfig(ctx, awsv2config.WithRegion(region)) if err != nil { return nil From 9dafc6dcf633b0bcd9e83dd3e1ee416085217822 Mon Sep 17 00:00:00 2001 From: rideshnath-scout Date: Thu, 13 Nov 2025 19:25:54 +0530 Subject: [PATCH 09/18] Update AWS SDK to v2 and refactor Kinesis integration Signed-off-by: rideshnath-scout --- bindings/aws/kinesis/kinesis.go | 82 +++---- common/authentication/aws/client.go | 18 +- common/authentication/aws/client_test.go | 283 +++++++++++++++-------- go.mod | 15 +- go.sum | 18 +- 5 files changed, 254 insertions(+), 162 deletions(-) diff --git a/bindings/aws/kinesis/kinesis.go b/bindings/aws/kinesis/kinesis.go index ddf96fb1f2..6ae5a74db2 100644 --- a/bindings/aws/kinesis/kinesis.go +++ b/bindings/aws/kinesis/kinesis.go @@ -22,9 +22,8 @@ import ( "sync/atomic" "time" - "github.com/aws/aws-sdk-go/aws" - "github.com/aws/aws-sdk-go/aws/request" - "github.com/aws/aws-sdk-go/service/kinesis" + "github.com/aws/aws-sdk-go-v2/service/kinesis" + "github.com/aws/aws-sdk-go-v2/service/kinesis/types" "github.com/cenkalti/backoff/v4" "github.com/google/uuid" "github.com/vmware/vmware-go-kcl-v2/clientlibrary/interfaces" @@ -44,11 +43,11 @@ type AWSKinesis struct { worker *worker.Worker - streamName string - consumerName string - consumerARN *string - logger logger.Logger - consumerMode string + streamName string + consumerName string + consumerARN *string + logger logger.Logger + consumerMode string applicationName string closed atomic.Bool closeCh chan struct{} @@ -146,7 +145,7 @@ func (a *AWSKinesis) Invoke(ctx context.Context, req *bindings.InvokeRequest) (* if partitionKey == "" { partitionKey = uuid.New().String() } - _, err := a.authProvider.Kinesis().Kinesis.PutRecordWithContext(ctx, &kinesis.PutRecordInput{ + _, err := a.authProvider.Kinesis().Kinesis.PutRecord(ctx, &kinesis.PutRecordInput{ StreamName: &a.metadata.StreamName, Data: req.Data, PartitionKey: &partitionKey, @@ -176,7 +175,7 @@ func (a *AWSKinesis) Read(ctx context.Context, handler bindings.Handler) (err er } case ExtendedFanout: var stream *kinesis.DescribeStreamOutput - stream, err = a.authProvider.Kinesis().Kinesis.DescribeStream(&kinesis.DescribeStreamInput{StreamName: &a.metadata.StreamName}) + stream, err = a.authProvider.Kinesis().Kinesis.DescribeStream(ctx, &kinesis.DescribeStreamInput{StreamName: &a.metadata.StreamName}) if err != nil { return err } @@ -210,7 +209,7 @@ func (a *AWSKinesis) Read(ctx context.Context, handler bindings.Handler) (err er } // Subscribe to all shards. -func (a *AWSKinesis) Subscribe(ctx context.Context, streamDesc kinesis.StreamDescription, handler bindings.Handler) error { +func (a *AWSKinesis) Subscribe(ctx context.Context, streamDesc types.StreamDescription, handler bindings.Handler) error { consumerARN, err := a.ensureConsumer(ctx, streamDesc.StreamARN) if err != nil { a.logger.Error(err) @@ -221,7 +220,7 @@ func (a *AWSKinesis) Subscribe(ctx context.Context, streamDesc kinesis.StreamDes a.wg.Add(len(streamDesc.Shards)) for i, shard := range streamDesc.Shards { - go func(idx int, s *kinesis.Shard) { + go func(idx int, s types.Shard) { defer a.wg.Done() // Reconnection backoff @@ -237,14 +236,14 @@ func (a *AWSKinesis) Subscribe(ctx context.Context, streamDesc kinesis.StreamDes return default: } - sub, err := a.authProvider.Kinesis().Kinesis.SubscribeToShardWithContext(ctx, &kinesis.SubscribeToShardInput{ + sub, err := a.authProvider.Kinesis().Kinesis.SubscribeToShard(ctx, &kinesis.SubscribeToShardInput{ ConsumerARN: consumerARN, ShardId: s.ShardId, - StartingPosition: &kinesis.StartingPosition{Type: aws.String(kinesis.ShardIteratorTypeLatest)}, + StartingPosition: &types.StartingPosition{Type: types.ShardIteratorTypeLatest}, }) if err != nil { wait := bo.NextBackOff() - a.logger.Errorf("Error while reading from shard %v: %v. Attempting to reconnect in %s...", s.ShardId, err, wait) + a.logger.Errorf("Error while reading from shard %v: %v. Attempting to reconnect in %s...", *s.ShardId, err, wait) select { case <-ctx.Done(): return @@ -257,10 +256,10 @@ func (a *AWSKinesis) Subscribe(ctx context.Context, streamDesc kinesis.StreamDes bo.Reset() // Process events - for event := range sub.EventStream.Events() { + for event := range sub.GetStream().Events() { switch e := event.(type) { - case *kinesis.SubscribeToShardEvent: - for _, rec := range e.Records { + case *types.SubscribeToShardEventStreamMemberSubscribeToShardEvent: + for _, rec := range e.Value.Records { handler(ctx, &bindings.ReadResponse{ Data: rec.Data, }) @@ -289,7 +288,7 @@ func (a *AWSKinesis) ensureConsumer(ctx context.Context, streamARN *string) (*st // Only set timeout on consumer call. conCtx, cancel := context.WithTimeout(ctx, 30*time.Second) defer cancel() - consumer, err := a.authProvider.Kinesis().Kinesis.DescribeStreamConsumerWithContext(conCtx, &kinesis.DescribeStreamConsumerInput{ + consumer, err := a.authProvider.Kinesis().Kinesis.DescribeStreamConsumer(conCtx, &kinesis.DescribeStreamConsumerInput{ ConsumerName: &a.metadata.ConsumerName, StreamARN: streamARN, }) @@ -301,7 +300,7 @@ func (a *AWSKinesis) ensureConsumer(ctx context.Context, streamARN *string) (*st } func (a *AWSKinesis) registerConsumer(ctx context.Context, streamARN *string) (*string, error) { - consumer, err := a.authProvider.Kinesis().Kinesis.RegisterStreamConsumerWithContext(ctx, &kinesis.RegisterStreamConsumerInput{ + consumer, err := a.authProvider.Kinesis().Kinesis.RegisterStreamConsumer(ctx, &kinesis.RegisterStreamConsumerInput{ ConsumerName: &a.metadata.ConsumerName, StreamARN: streamARN, }) @@ -324,7 +323,7 @@ func (a *AWSKinesis) deregisterConsumer(ctx context.Context, streamARN *string, if a.consumerARN != nil { // Use a background context because the running context may have been canceled already ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) - _, err := a.authProvider.Kinesis().Kinesis.DeregisterStreamConsumerWithContext(ctx, &kinesis.DeregisterStreamConsumerInput{ + _, err := a.authProvider.Kinesis().Kinesis.DeregisterStreamConsumer(ctx, &kinesis.DeregisterStreamConsumerInput{ ConsumerARN: consumerARN, StreamARN: streamARN, ConsumerName: &a.metadata.ConsumerName, @@ -337,34 +336,19 @@ func (a *AWSKinesis) deregisterConsumer(ctx context.Context, streamARN *string, return nil } -func (a *AWSKinesis) waitUntilConsumerExists(ctx aws.Context, input *kinesis.DescribeStreamConsumerInput, opts ...request.WaiterOption) error { - w := request.Waiter{ - Name: "WaitUntilConsumerExists", - MaxAttempts: 18, - Delay: request.ConstantWaiterDelay(10 * time.Second), - Acceptors: []request.WaiterAcceptor{ - { - State: request.SuccessWaiterState, - Matcher: request.PathWaiterMatch, Argument: "ConsumerDescription.ConsumerStatus", - Expected: "ACTIVE", - }, - }, - NewRequest: func(opts []request.Option) (*request.Request, error) { - var inCpy *kinesis.DescribeStreamConsumerInput - if input != nil { - tmp := *input - inCpy = &tmp - } - req, _ := a.authProvider.Kinesis().Kinesis.DescribeStreamConsumerRequest(inCpy) - req.SetContext(ctx) - req.ApplyOptions(opts...) - - return req, nil - }, +func (a *AWSKinesis) waitUntilConsumerExists(ctx context.Context, input *kinesis.DescribeStreamConsumerInput) error { + // Poll until consumer is active + for i := 0; i < 18; i++ { + consumer, err := a.authProvider.Kinesis().Kinesis.DescribeStreamConsumer(ctx, input) + if err != nil { + return err + } + if consumer.ConsumerDescription.ConsumerStatus == types.ConsumerStatusActive { + return nil + } + time.Sleep(10 * time.Second) } - w.ApplyOptions(opts...) - - return w.WaitWithContext(ctx) + return fmt.Errorf("consumer did not become active within timeout") } func (a *AWSKinesis) parseMetadata(meta bindings.Metadata) (*kinesisMetadata, error) { @@ -393,7 +377,7 @@ func (r *recordProcessorFactory) CreateProcessor() interfaces.IRecordProcessor { } func (p *recordProcessor) Initialize(input *interfaces.InitializationInput) { - p.logger.Infof("Processing ShardId: %v at checkpoint: %v", input.ShardId, aws.StringValue(input.ExtendedSequenceNumber.SequenceNumber)) + p.logger.Infof("Processing ShardId: %v at checkpoint: %v", input.ShardId, *input.ExtendedSequenceNumber.SequenceNumber) } func (p *recordProcessor) ProcessRecords(input *interfaces.ProcessRecordsInput) { diff --git a/common/authentication/aws/client.go b/common/authentication/aws/client.go index 0f8f991e87..b8307409da 100644 --- a/common/authentication/aws/client.go +++ b/common/authentication/aws/client.go @@ -21,12 +21,11 @@ import ( "github.com/aws/aws-sdk-go-v2/aws" awsv2config "github.com/aws/aws-sdk-go-v2/config" v2creds "github.com/aws/aws-sdk-go-v2/credentials" + kinesisv2 "github.com/aws/aws-sdk-go-v2/service/kinesis" "github.com/aws/aws-sdk-go/aws/credentials" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/dynamodb" "github.com/aws/aws-sdk-go/service/dynamodb/dynamodbiface" - "github.com/aws/aws-sdk-go/service/kinesis" - "github.com/aws/aws-sdk-go/service/kinesis/kinesisiface" "github.com/aws/aws-sdk-go/service/s3" "github.com/aws/aws-sdk-go/service/s3/s3manager" "github.com/aws/aws-sdk-go/service/secretsmanager" @@ -118,7 +117,7 @@ type ParameterStoreClients struct { } type KinesisClients struct { - Kinesis kinesisiface.KinesisAPI + Kinesis *kinesisv2.Client Region string Credentials *credentials.Credentials V2Credentials aws.CredentialsProvider @@ -174,19 +173,24 @@ func (c *ParameterStoreClients) New(session *session.Session) { } func (c *KinesisClients) New(session *session.Session) { - c.Kinesis = kinesis.New(session, session.Config) c.Region = *session.Config.Region c.Credentials = session.Config.Credentials - // Convert v1 credentials to v2 for KCL usage + // Convert v1 credentials to v2 for both Kinesis client and KCL usage if v1Creds, err := session.Config.Credentials.Get(); err == nil { c.V2Credentials = v2creds.NewStaticCredentialsProvider(v1Creds.AccessKeyID, v1Creds.SecretAccessKey, v1Creds.SessionToken) + // Create v2 config and Kinesis client + v2Config := aws.Config{ + Region: c.Region, + Credentials: c.V2Credentials, + } + c.Kinesis = kinesisv2.NewFromConfig(v2Config) } } func (c *KinesisClients) Stream(ctx context.Context, streamName string) (*string, error) { if c.Kinesis != nil { - stream, err := c.Kinesis.DescribeStreamWithContext(ctx, &kinesis.DescribeStreamInput{ - StreamName: aws.String(streamName), + stream, err := c.Kinesis.DescribeStream(ctx, &kinesisv2.DescribeStreamInput{ + StreamName: &streamName, }) if err != nil { return nil, err diff --git a/common/authentication/aws/client_test.go b/common/authentication/aws/client_test.go index 70dccf9b25..a45fa566b1 100644 --- a/common/authentication/aws/client_test.go +++ b/common/authentication/aws/client_test.go @@ -18,17 +18,16 @@ import ( "errors" "testing" - "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/service/kinesis" + "github.com/aws/aws-sdk-go-v2/service/kinesis/types" "github.com/aws/aws-sdk-go/aws/credentials" "github.com/aws/aws-sdk-go/aws/request" "github.com/aws/aws-sdk-go/aws/session" - "github.com/aws/aws-sdk-go/service/kinesis" - "github.com/aws/aws-sdk-go/service/kinesis/kinesisiface" "github.com/aws/aws-sdk-go/service/sqs" "github.com/aws/aws-sdk-go/service/sqs/sqsiface" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "github.com/vmware/vmware-go-kcl/clientlibrary/config" ) type mockedSQS struct { @@ -40,13 +39,31 @@ func (m *mockedSQS) GetQueueUrlWithContext(ctx context.Context, input *sqs.GetQu return m.GetQueueURLFn(ctx, input) } -type mockedKinesis struct { - kinesisiface.KinesisAPI - DescribeStreamFn func(ctx context.Context, input *kinesis.DescribeStreamInput) (*kinesis.DescribeStreamOutput, error) +type mockedKinesisV2 struct { + DescribeStreamFn func(ctx context.Context, input *kinesis.DescribeStreamInput, opts ...func(*kinesis.Options)) (*kinesis.DescribeStreamOutput, error) } -func (m *mockedKinesis) DescribeStreamWithContext(ctx context.Context, input *kinesis.DescribeStreamInput, opts ...request.Option) (*kinesis.DescribeStreamOutput, error) { - return m.DescribeStreamFn(ctx, input) +func (m *mockedKinesisV2) DescribeStream(ctx context.Context, input *kinesis.DescribeStreamInput, opts ...func(*kinesis.Options)) (*kinesis.DescribeStreamOutput, error) { + return m.DescribeStreamFn(ctx, input, opts...) +} + +// testKinesisClients wraps KinesisClients for testing with mock +type testKinesisClients struct { + *KinesisClients + mockKinesis *mockedKinesisV2 +} + +func (t *testKinesisClients) Stream(ctx context.Context, streamName string) (*string, error) { + if t.mockKinesis != nil { + stream, err := t.mockKinesis.DescribeStream(ctx, &kinesis.DescribeStreamInput{ + StreamName: &streamName, + }) + if err != nil { + return nil, err + } + return stream.StreamDescription.StreamARN, nil + } + return nil, errors.New("unable to get stream arn due to empty client") } func TestS3Clients_New(t *testing.T) { @@ -130,51 +147,144 @@ func TestSqsClients_QueueURL(t *testing.T) { func TestKinesisClients_Stream(t *testing.T) { tests := []struct { name string - kinesisClient *KinesisClients + kinesisClient *testKinesisClients streamName string - mockStreamARN *string - mockError error expectedStream *string - expectedErr error + expectedErr string }{ { name: "successfully retrieves stream ARN", - kinesisClient: &KinesisClients{ - Kinesis: &mockedKinesis{DescribeStreamFn: func(ctx context.Context, input *kinesis.DescribeStreamInput) (*kinesis.DescribeStreamOutput, error) { - return &kinesis.DescribeStreamOutput{ - StreamDescription: &kinesis.StreamDescription{ - StreamARN: aws.String("arn:aws:kinesis:some-region:123456789012:stream/some-stream"), - }, - }, nil - }}, - Region: "us-west-1", - Credentials: credentials.NewStaticCredentials("accessKey", "secretKey", ""), + kinesisClient: &testKinesisClients{ + KinesisClients: &KinesisClients{ + Region: "us-west-2", + Credentials: credentials.NewStaticCredentials("accessKey", "secretKey", ""), + }, + mockKinesis: &mockedKinesisV2{ + DescribeStreamFn: func(ctx context.Context, input *kinesis.DescribeStreamInput, opts ...func(*kinesis.Options)) (*kinesis.DescribeStreamOutput, error) { + streamARN := "arn:aws:kinesis:us-west-2:123456789012:stream/test-stream" + streamName := "test-stream" + return &kinesis.DescribeStreamOutput{ + StreamDescription: &types.StreamDescription{ + StreamARN: &streamARN, + StreamName: &streamName, + StreamStatus: types.StreamStatusActive, + }, + }, nil + }, + }, }, - streamName: "some-stream", - expectedStream: aws.String("arn:aws:kinesis:some-region:123456789012:stream/some-stream"), - expectedErr: nil, + streamName: "test-stream", + expectedStream: aws.String("arn:aws:kinesis:us-west-2:123456789012:stream/test-stream"), + expectedErr: "", }, { name: "returns error when stream not found", - kinesisClient: &KinesisClients{ - Kinesis: &mockedKinesis{DescribeStreamFn: func(ctx context.Context, input *kinesis.DescribeStreamInput) (*kinesis.DescribeStreamOutput, error) { - return nil, errors.New("stream not found") - }}, - Region: "us-west-1", - Credentials: credentials.NewStaticCredentials("accessKey", "secretKey", ""), + kinesisClient: &testKinesisClients{ + KinesisClients: &KinesisClients{ + Region: "us-west-2", + Credentials: credentials.NewStaticCredentials("accessKey", "secretKey", ""), + }, + mockKinesis: &mockedKinesisV2{ + DescribeStreamFn: func(ctx context.Context, input *kinesis.DescribeStreamInput, opts ...func(*kinesis.Options)) (*kinesis.DescribeStreamOutput, error) { + return nil, errors.New("ResourceNotFoundException: Stream nonexistent-stream under account 123456789012 not found") + }, + }, }, streamName: "nonexistent-stream", expectedStream: nil, - expectedErr: errors.New("unable to get stream arn due to empty client"), + expectedErr: "ResourceNotFoundException: Stream nonexistent-stream under account 123456789012 not found", + }, + { + name: "returns error when client is nil", + kinesisClient: &testKinesisClients{ + KinesisClients: &KinesisClients{ + Region: "us-west-2", + Credentials: credentials.NewStaticCredentials("accessKey", "secretKey", ""), + }, + mockKinesis: nil, + }, + streamName: "test-stream", + expectedStream: nil, + expectedErr: "unable to get stream arn due to empty client", + }, + { + name: "handles stream with special characters in name", + kinesisClient: &testKinesisClients{ + KinesisClients: &KinesisClients{ + Region: "eu-central-1", + Credentials: credentials.NewStaticCredentials("accessKey", "secretKey", ""), + }, + mockKinesis: &mockedKinesisV2{ + DescribeStreamFn: func(ctx context.Context, input *kinesis.DescribeStreamInput, opts ...func(*kinesis.Options)) (*kinesis.DescribeStreamOutput, error) { + streamARN := "arn:aws:kinesis:eu-central-1:123456789012:stream/my-test_stream.123" + streamName := "my-test_stream.123" + return &kinesis.DescribeStreamOutput{ + StreamDescription: &types.StreamDescription{ + StreamARN: &streamARN, + StreamName: &streamName, + StreamStatus: types.StreamStatusActive, + }, + }, nil + }, + }, + }, + streamName: "my-test_stream.123", + expectedStream: aws.String("arn:aws:kinesis:eu-central-1:123456789012:stream/my-test_stream.123"), + expectedErr: "", + }, + { + name: "handles empty stream name", + kinesisClient: &testKinesisClients{ + KinesisClients: &KinesisClients{ + Region: "us-east-1", + Credentials: credentials.NewStaticCredentials("accessKey", "secretKey", ""), + }, + mockKinesis: &mockedKinesisV2{ + DescribeStreamFn: func(ctx context.Context, input *kinesis.DescribeStreamInput, opts ...func(*kinesis.Options)) (*kinesis.DescribeStreamOutput, error) { + return nil, errors.New("ValidationException: Stream name cannot be empty") + }, + }, + }, + streamName: "", + expectedStream: nil, + expectedErr: "ValidationException: Stream name cannot be empty", + }, + { + name: "handles stream in creating state", + kinesisClient: &testKinesisClients{ + KinesisClients: &KinesisClients{ + Region: "ap-southeast-1", + Credentials: credentials.NewStaticCredentials("accessKey", "secretKey", ""), + }, + mockKinesis: &mockedKinesisV2{ + DescribeStreamFn: func(ctx context.Context, input *kinesis.DescribeStreamInput, opts ...func(*kinesis.Options)) (*kinesis.DescribeStreamOutput, error) { + streamARN := "arn:aws:kinesis:ap-southeast-1:123456789012:stream/creating-stream" + streamName := "creating-stream" + return &kinesis.DescribeStreamOutput{ + StreamDescription: &types.StreamDescription{ + StreamARN: &streamARN, + StreamName: &streamName, + StreamStatus: types.StreamStatusCreating, + }, + }, nil + }, + }, + }, + streamName: "creating-stream", + expectedStream: aws.String("arn:aws:kinesis:ap-southeast-1:123456789012:stream/creating-stream"), + expectedErr: "", }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - got, err := tt.kinesisClient.Stream(t.Context(), tt.streamName) - if tt.expectedErr != nil { + ctx := context.Background() + got, err := tt.kinesisClient.Stream(ctx, tt.streamName) + + if tt.expectedErr != "" { require.Error(t, err) - assert.Equal(t, tt.expectedErr.Error(), err.Error()) + assert.Equal(t, tt.expectedErr, err.Error()) + assert.Nil(t, got) } else { require.NoError(t, err) assert.Equal(t, tt.expectedStream, got) @@ -184,82 +294,71 @@ func TestKinesisClients_Stream(t *testing.T) { } func TestKinesisClients_WorkerCfg(t *testing.T) { - testCreds := credentials.NewStaticCredentials("accessKey", "secretKey", "") tests := []struct { - name string - kinesisClient *KinesisClients - streamName string - consumer string - mode string - expectedConfig *config.KinesisClientLibConfiguration + name string + kinesisClient *KinesisClients + streamName string + applicationName string + mode string + expectNil bool }{ { - name: "successfully creates shared mode worker config", + name: "successfully creates shared mode worker config with v2 credentials", kinesisClient: &KinesisClients{ - Kinesis: &mockedKinesis{ - DescribeStreamFn: func(ctx context.Context, input *kinesis.DescribeStreamInput) (*kinesis.DescribeStreamOutput, error) { - return &kinesis.DescribeStreamOutput{ - StreamDescription: &kinesis.StreamDescription{ - StreamARN: aws.String("arn:aws:kinesis:us-east-1:123456789012:stream/existing-stream"), - }, - }, nil - }, - }, - Region: "us-west-1", - Credentials: testCreds, + Region: "us-west-2", + Credentials: credentials.NewStaticCredentials("accessKey", "secretKey", ""), + V2Credentials: aws.NewCredentialsCache(aws.CredentialsProviderFunc(func(ctx context.Context) (aws.Credentials, error) { + return aws.Credentials{ + AccessKeyID: "accessKey", + SecretAccessKey: "secretKey", + }, nil + })), }, - streamName: "existing-stream", - consumer: "consumer1", - mode: "shared", - expectedConfig: config.NewKinesisClientLibConfigWithCredential( - "consumer1", "existing-stream", "us-west-1", "consumer1", testCreds, - ), + streamName: "test-stream", + applicationName: "test-app", + mode: "shared", + expectNil: false, }, { name: "returns nil when mode is not shared", kinesisClient: &KinesisClients{ - Kinesis: &mockedKinesis{ - DescribeStreamFn: func(ctx context.Context, input *kinesis.DescribeStreamInput) (*kinesis.DescribeStreamOutput, error) { - return &kinesis.DescribeStreamOutput{ - StreamDescription: &kinesis.StreamDescription{ - StreamARN: aws.String("arn:aws:kinesis:us-east-1:123456789012:stream/existing-stream"), - }, - }, nil - }, - }, - Region: "us-west-1", - Credentials: testCreds, + Region: "us-west-2", + Credentials: credentials.NewStaticCredentials("accessKey", "secretKey", ""), }, - streamName: "existing-stream", - consumer: "consumer1", - mode: "exclusive", - expectedConfig: nil, + streamName: "test-stream", + applicationName: "test-app", + mode: "extended", + expectNil: true, }, { - name: "returns nil when client is nil", + name: "falls back to default config when v2 credentials are nil", kinesisClient: &KinesisClients{ - Kinesis: nil, - Region: "us-west-1", - Credentials: credentials.NewStaticCredentials("accessKey", "secretKey", ""), + Region: "us-east-1", + Credentials: credentials.NewStaticCredentials("accessKey", "secretKey", ""), + V2Credentials: nil, }, - streamName: "existing-stream", - consumer: "consumer1", - mode: "shared", - expectedConfig: nil, + streamName: "fallback-stream", + applicationName: "fallback-app", + mode: "shared", + expectNil: false, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - cfg := tt.kinesisClient.WorkerCfg(t.Context(), tt.streamName, tt.kinesisClient.Region, tt.mode, tt.consumer) - if tt.expectedConfig == nil { - assert.Equal(t, tt.expectedConfig, cfg) - return + ctx := context.Background() + cfg := tt.kinesisClient.WorkerCfg(ctx, tt.streamName, tt.kinesisClient.Region, tt.mode, tt.applicationName) + + if tt.expectNil { + assert.Nil(t, cfg) + } else { + assert.NotNil(t, cfg) + if cfg != nil { + assert.Equal(t, tt.streamName, cfg.StreamName) + assert.Equal(t, tt.applicationName, cfg.ApplicationName) + assert.Equal(t, tt.kinesisClient.Region, cfg.RegionName) + } } - assert.Equal(t, tt.expectedConfig.StreamName, cfg.StreamName) - assert.Equal(t, tt.expectedConfig.EnhancedFanOutConsumerName, cfg.EnhancedFanOutConsumerName) - assert.Equal(t, tt.expectedConfig.EnableEnhancedFanOutConsumer, cfg.EnableEnhancedFanOutConsumer) - assert.Equal(t, tt.expectedConfig.RegionName, cfg.RegionName) }) } } diff --git a/go.mod b/go.mod index bfef7e0dec..655ee5406e 100644 --- a/go.mod +++ b/go.mod @@ -41,7 +41,7 @@ require ( github.com/apache/thrift v0.13.0 github.com/aws/aws-msk-iam-sasl-signer-go v1.0.1-0.20241125194140-078c08b8574a github.com/aws/aws-sdk-go v1.55.6 - github.com/aws/aws-sdk-go-v2 v1.36.5 + github.com/aws/aws-sdk-go-v2 v1.39.6 github.com/aws/aws-sdk-go-v2/config v1.29.17 github.com/aws/aws-sdk-go-v2/credentials v1.17.70 github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue v1.19.3 @@ -124,7 +124,7 @@ require ( github.com/tetratelabs/wazero v1.7.0 github.com/tmc/langchaingo v0.1.13 github.com/valyala/fasthttp v1.53.0 - github.com/vmware/vmware-go-kcl v1.5.1 + github.com/vmware/vmware-go-kcl-v2 v1.0.0 github.com/xdg-go/scram v1.1.2 go.etcd.io/etcd/client/v3 v3.5.21 go.mongodb.org/mongo-driver v1.14.0 @@ -151,7 +151,7 @@ require ( ) require ( - github.com/aws/aws-sdk-go-v2/service/kinesis v1.27.1 // indirect + github.com/aws/aws-sdk-go-v2/service/kinesis v1.42.3 github.com/awslabs/kinesis-aggregation/go/v2 v2.0.0-20211222152315-953b66f67407 // indirect ) @@ -196,10 +196,10 @@ require ( github.com/ardielle/ardielle-go v1.5.2 // indirect github.com/armon/go-metrics v0.4.1 // indirect github.com/asaskevich/govalidator v0.0.0-20230301143203-a9d515a09cc2 // indirect - github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.11 // indirect + github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.3 // indirect github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.32 // indirect - github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.36 // indirect - github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.36 // indirect + github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.13 // indirect + github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.13 // indirect github.com/aws/aws-sdk-go-v2/internal/ini v1.8.3 // indirect github.com/aws/aws-sdk-go-v2/service/dynamodbstreams v1.25.6 // indirect github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.4 // indirect @@ -207,7 +207,7 @@ require ( github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.17 // indirect github.com/aws/aws-sdk-go-v2/service/sso v1.25.5 // indirect github.com/aws/aws-sdk-go-v2/service/ssooidc v1.30.3 // indirect - github.com/aws/smithy-go v1.22.5 // indirect + github.com/aws/smithy-go v1.23.2 // indirect github.com/benbjohnson/clock v1.3.5 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/bits-and-blooms/bitset v1.4.0 // indirect @@ -396,7 +396,6 @@ require ( github.com/tklauser/go-sysconf v0.3.12 // indirect github.com/tklauser/numcpus v0.6.1 // indirect github.com/valyala/bytebufferpool v1.0.0 // indirect - github.com/vmware/vmware-go-kcl-v2 v1.0.0 github.com/x448/float16 v0.8.4 // indirect github.com/xdg-go/pbkdf2 v1.0.0 // indirect github.com/xdg-go/stringprep v1.0.4 // indirect diff --git a/go.sum b/go.sum index 05ffe45f9f..dd34b46f48 100644 --- a/go.sum +++ b/go.sum @@ -275,10 +275,12 @@ github.com/aws/aws-sdk-go v1.55.6/go.mod h1:eRwEWoyTWFMVYVQzKMNHWP5/RV4xIUGMQfXQ github.com/aws/aws-sdk-go-v2 v0.18.0/go.mod h1:JWVYvqSMppoMJC0x5wdwiImzgXTI9FuZwxzkQq9wy+g= github.com/aws/aws-sdk-go-v2 v1.9.0/go.mod h1:cK/D0BBs0b/oWPIcX/Z/obahJK1TT7IPVjy53i/mX/4= github.com/aws/aws-sdk-go-v2 v1.9.2/go.mod h1:cK/D0BBs0b/oWPIcX/Z/obahJK1TT7IPVjy53i/mX/4= -github.com/aws/aws-sdk-go-v2 v1.36.5 h1:0OF9RiEMEdDdZEMqF9MRjevyxAQcf6gY+E7vwBILFj0= -github.com/aws/aws-sdk-go-v2 v1.36.5/go.mod h1:EYrzvCCN9CMUTa5+6lf6MM4tq3Zjp8UhSGR/cBsjai0= +github.com/aws/aws-sdk-go-v2 v1.39.6 h1:2JrPCVgWJm7bm83BDwY5z8ietmeJUbh3O2ACnn+Xsqk= +github.com/aws/aws-sdk-go-v2 v1.39.6/go.mod h1:c9pm7VwuW0UPxAEYGyTmyurVcNrbF6Rt/wixFqDhcjE= github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.11 h1:12SpdwU8Djs+YGklkinSSlcrPyj3H4VifVsKf78KbwA= github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.11/go.mod h1:dd+Lkp6YmMryke+qxW/VnKyhMBDTYP41Q2Bb+6gNZgY= +github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.3 h1:DHctwEM8P8iTXFxC/QK0MRjwEpWQeM9yzidCRjldUz0= +github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.3/go.mod h1:xdCzcZEtnSTKVDOmUZs4l/j3pSV6rpo1WXl5ugNsL8Y= github.com/aws/aws-sdk-go-v2/config v1.8.3/go.mod h1:4AEiLtAb8kLs7vgw2ZV3p2VZ1+hBavOc84hqxVNpCyw= github.com/aws/aws-sdk-go-v2/config v1.29.17 h1:jSuiQ5jEe4SAMH6lLRMY9OVC+TqJLP5655pBGjmnjr0= github.com/aws/aws-sdk-go-v2/config v1.29.17/go.mod h1:9P4wwACpbeXs9Pm9w1QTh6BwWwJjwYvJ1iCt5QbCXh8= @@ -294,8 +296,12 @@ github.com/aws/aws-sdk-go-v2/feature/rds/auth v1.3.10 h1:z6fAXB4HSuYjrE/P8RU3NdC github.com/aws/aws-sdk-go-v2/feature/rds/auth v1.3.10/go.mod h1:PoPjOi7j+/DtKIGC58HRfcdWKBPYYXwdKnRG+po+hzo= github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.36 h1:SsytQyTMHMDPspp+spo7XwXTP44aJZZAC7fBV2C5+5s= github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.36/go.mod h1:Q1lnJArKRXkenyog6+Y+zr7WDpk4e6XlR6gs20bbeNo= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.13 h1:a+8/MLcWlIxo1lF9xaGt3J/u3yOZx+CdSveSNwjhD40= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.13/go.mod h1:oGnKwIYZ4XttyU2JWxFrwvhF6YKiK/9/wmE3v3Iu9K8= github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.36 h1:i2vNHQiXUvKhs3quBR6aqlgJaiaexz/aNvdCktW/kAM= github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.36/go.mod h1:UdyGa7Q91id/sdyHPwth+043HhmP6yP9MBHgbZM0xo8= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.13 h1:HBSI2kDkMdWz4ZM7FjwE7e/pWDEZ+nR95x8Ztet1ooY= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.13/go.mod h1:YE94ZoDArI7awZqJzBAZ3PDD2zSfuP7w6P2knOzIn8M= github.com/aws/aws-sdk-go-v2/internal/ini v1.2.4/go.mod h1:ZcBrrI3zBKlhGFNYWvju0I3TR93I7YIgAfy82Fh4lcQ= github.com/aws/aws-sdk-go-v2/internal/ini v1.8.3 h1:bIqFDwgGXXN1Kpp99pDOdKMTTb5d2KyU5X/BZxjOkRo= github.com/aws/aws-sdk-go-v2/internal/ini v1.8.3/go.mod h1:H5O/EsxDWyU+LP/V8i5sm8cxoZgc2fdNR9bxlOFrQTo= @@ -316,6 +322,8 @@ github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.17/go.mod h1:y github.com/aws/aws-sdk-go-v2/service/kinesis v1.6.0/go.mod h1:9O7UG2pELnP0hq35+Gd7XDjOLBkg7tmgRQ0y14ZjoJI= github.com/aws/aws-sdk-go-v2/service/kinesis v1.27.1 h1:p8dOJ/UKXOwttc1Cxw1Ek52klVmMuiaCUkhsUGxce1I= github.com/aws/aws-sdk-go-v2/service/kinesis v1.27.1/go.mod h1:VpH1IBG1YYZHPu5qShNt7EGaqUQbHAJZrbDtEpqDvvY= +github.com/aws/aws-sdk-go-v2/service/kinesis v1.42.3 h1:A2HNxrABEFha5831yAU05G0mYNxaxYH4WG85FV6ZWIQ= +github.com/aws/aws-sdk-go-v2/service/kinesis v1.42.3/go.mod h1:jTDNZao/9uv/6JeaeDWEqA4s+l6c8+cqaDeYFpM+818= github.com/aws/aws-sdk-go-v2/service/servicediscovery v1.32.4 h1:BN6+zko+qO9Tl9S0ywUPNvY0gvlFK4Zmj2Y0a8paFkk= github.com/aws/aws-sdk-go-v2/service/servicediscovery v1.32.4/go.mod h1:hbMVfSdZneCht4UmPOsejDt93QnetQPFuLOOqbuybqs= github.com/aws/aws-sdk-go-v2/service/sns v1.34.7 h1:OBuZE9Wt8h2imuRktu+WfjiTGrnYdCIJg8IX92aalHE= @@ -333,8 +341,8 @@ github.com/aws/aws-sdk-go-v2/service/sts v1.34.0/go.mod h1:7ph2tGpfQvwzgistp2+zg github.com/aws/rolesanywhere-credential-helper v1.0.4 h1:kHIVVdyQQiFZoKBP+zywBdFilGCS8It+UvW5LolKbW8= github.com/aws/rolesanywhere-credential-helper v1.0.4/go.mod h1:QVGNxlDlYhjR0/ZUee7uGl0hNChWidNpe2+GD87Buqk= github.com/aws/smithy-go v1.8.0/go.mod h1:SObp3lf9smib00L/v3U2eAKG8FyQ7iLrJnQiAmR5n+E= -github.com/aws/smithy-go v1.22.5 h1:P9ATCXPMb2mPjYBgueqJNCA5S9UfktsW0tTxi+a7eqw= -github.com/aws/smithy-go v1.22.5/go.mod h1:t1ufH5HMublsJYulve2RKmHDC15xu1f26kHCp/HgceI= +github.com/aws/smithy-go v1.23.2 h1:Crv0eatJUQhaManss33hS5r40CG3ZFH+21XSkqMrIUM= +github.com/aws/smithy-go v1.23.2/go.mod h1:LEj2LM3rBRQJxPZTB4KuzZkaZYnZPnvgIhb4pu07mx0= github.com/awslabs/kinesis-aggregation/go/v2 v2.0.0-20211222152315-953b66f67407 h1:p8Ubi4GEgfRc1xFn/WtGNkVG8RXxGHOsKiwGptufIo8= github.com/awslabs/kinesis-aggregation/go/v2 v2.0.0-20211222152315-953b66f67407/go.mod h1:0Qr1uMHFmHsIYMcG4T7BJ9yrJtWadhOmpABCX69dwuc= github.com/aymerick/douceur v0.2.0 h1:Mv+mAeH1Q+n9Fr+oyamOlAkUNPWPlA8PPGR0QAaYuPk= @@ -1715,8 +1723,6 @@ github.com/valyala/fasthttp v1.53.0 h1:lW/+SUkOxCx2vlIu0iaImv4JLrVRnbbkpCoaawvA4 github.com/valyala/fasthttp v1.53.0/go.mod h1:6dt4/8olwq9QARP/TDuPmWyWcl4byhpvTJ4AAtcz+QM= github.com/valyala/fasttemplate v1.2.1/go.mod h1:KHLXt3tVN2HBp8eijSv/kGJopbvo7S+qRAEEKiv+SiQ= github.com/valyala/tcplisten v0.0.0-20161114210144-ceec8f93295a/go.mod h1:v3UYOV9WzVtRmSR+PDvWpU/qWl4Wa5LApYYX4ZtKbio= -github.com/vmware/vmware-go-kcl v1.5.1 h1:1rJLfAX4sDnCyatNoD/WJzVafkwST6u/cgY/Uf2VgHk= -github.com/vmware/vmware-go-kcl v1.5.1/go.mod h1:kXJmQ6h0dRMRrp1uWU9XbIXvwelDpTxSPquvQUBdpbo= github.com/vmware/vmware-go-kcl-v2 v1.0.0 h1:HPT5vu+khRmGspBSc/+AilEWbRGoTZhjlYqdrBbRMZs= github.com/vmware/vmware-go-kcl-v2 v1.0.0/go.mod h1:GBDu+P4Neo0vwZAk0ZUCEC8GYsUOWvi3XhFwAZR3SjA= github.com/x448/float16 v0.8.4 h1:qLwI1I70+NjRFUR3zs1JPUCgaCXSh3SW62uAKT1mSBM= From 9b003aa134a629daf0a6c6c66900557ddc8b8ca9 Mon Sep 17 00:00:00 2001 From: rideshnath-scout Date: Tue, 18 Nov 2025 15:22:46 +0530 Subject: [PATCH 10/18] refactor: migrate Kinesis integration to AWS SDK v2 and update related tests Signed-off-by: rideshnath-scout --- bindings/aws/kinesis/kinesis.go | 72 ++++++- bindings/aws/kinesis/kinesis_test.go | 100 +++++++++ common/authentication/aws/client.go | 53 +---- common/authentication/aws/client_test.go | 247 +---------------------- go.sum | 8 - 5 files changed, 166 insertions(+), 314 deletions(-) diff --git a/bindings/aws/kinesis/kinesis.go b/bindings/aws/kinesis/kinesis.go index 6ae5a74db2..0a3b38eeaf 100644 --- a/bindings/aws/kinesis/kinesis.go +++ b/bindings/aws/kinesis/kinesis.go @@ -22,10 +22,14 @@ import ( "sync/atomic" "time" + "github.com/aws/aws-sdk-go-v2/aws" + awsv2config "github.com/aws/aws-sdk-go-v2/config" + v2creds "github.com/aws/aws-sdk-go-v2/credentials" "github.com/aws/aws-sdk-go-v2/service/kinesis" "github.com/aws/aws-sdk-go-v2/service/kinesis/types" "github.com/cenkalti/backoff/v4" "github.com/google/uuid" + "github.com/vmware/vmware-go-kcl-v2/clientlibrary/config" "github.com/vmware/vmware-go-kcl-v2/clientlibrary/interfaces" "github.com/vmware/vmware-go-kcl-v2/clientlibrary/worker" @@ -41,7 +45,9 @@ type AWSKinesis struct { authProvider awsAuth.Provider metadata *kinesisMetadata - worker *worker.Worker + worker *worker.Worker + kinesisClient *kinesis.Client + v2Credentials aws.CredentialsProvider streamName string consumerName string @@ -133,6 +139,12 @@ func (a *AWSKinesis) Init(ctx context.Context, metadata bindings.Metadata) error return err } a.authProvider = provider + + // Create AWS SDK v2 client + if err := a.createKinesisClient(ctx); err != nil { + return err + } + return nil } @@ -145,7 +157,7 @@ func (a *AWSKinesis) Invoke(ctx context.Context, req *bindings.InvokeRequest) (* if partitionKey == "" { partitionKey = uuid.New().String() } - _, err := a.authProvider.Kinesis().Kinesis.PutRecord(ctx, &kinesis.PutRecordInput{ + _, err := a.kinesisClient.PutRecord(ctx, &kinesis.PutRecordInput{ StreamName: &a.metadata.StreamName, Data: req.Data, PartitionKey: &partitionKey, @@ -162,7 +174,7 @@ func (a *AWSKinesis) Read(ctx context.Context, handler bindings.Handler) (err er switch a.metadata.KinesisConsumerMode { case SharedThroughput: // initalize worker configuration - config := a.authProvider.Kinesis().WorkerCfg(ctx, a.streamName, a.metadata.Region, a.consumerMode, a.applicationName) + config := a.workerCfg(ctx, a.streamName, a.metadata.Region, a.consumerMode, a.applicationName) // Configure the KCL worker with custom endpoints for LocalStack if a.metadata.Endpoint != "" { config = config.WithKinesisEndpoint(a.metadata.Endpoint) @@ -175,7 +187,7 @@ func (a *AWSKinesis) Read(ctx context.Context, handler bindings.Handler) (err er } case ExtendedFanout: var stream *kinesis.DescribeStreamOutput - stream, err = a.authProvider.Kinesis().Kinesis.DescribeStream(ctx, &kinesis.DescribeStreamInput{StreamName: &a.metadata.StreamName}) + stream, err = a.kinesisClient.DescribeStream(ctx, &kinesis.DescribeStreamInput{StreamName: &a.metadata.StreamName}) if err != nil { return err } @@ -185,7 +197,7 @@ func (a *AWSKinesis) Read(ctx context.Context, handler bindings.Handler) (err er } } - stream, err := a.authProvider.Kinesis().Stream(ctx, a.streamName) + stream, err := a.getStreamARN(ctx, a.streamName) if err != nil { return fmt.Errorf("failed to get kinesis stream arn: %v", err) } @@ -236,7 +248,7 @@ func (a *AWSKinesis) Subscribe(ctx context.Context, streamDesc types.StreamDescr return default: } - sub, err := a.authProvider.Kinesis().Kinesis.SubscribeToShard(ctx, &kinesis.SubscribeToShardInput{ + sub, err := a.kinesisClient.SubscribeToShard(ctx, &kinesis.SubscribeToShardInput{ ConsumerARN: consumerARN, ShardId: s.ShardId, StartingPosition: &types.StartingPosition{Type: types.ShardIteratorTypeLatest}, @@ -288,7 +300,7 @@ func (a *AWSKinesis) ensureConsumer(ctx context.Context, streamARN *string) (*st // Only set timeout on consumer call. conCtx, cancel := context.WithTimeout(ctx, 30*time.Second) defer cancel() - consumer, err := a.authProvider.Kinesis().Kinesis.DescribeStreamConsumer(conCtx, &kinesis.DescribeStreamConsumerInput{ + consumer, err := a.kinesisClient.DescribeStreamConsumer(conCtx, &kinesis.DescribeStreamConsumerInput{ ConsumerName: &a.metadata.ConsumerName, StreamARN: streamARN, }) @@ -300,7 +312,7 @@ func (a *AWSKinesis) ensureConsumer(ctx context.Context, streamARN *string) (*st } func (a *AWSKinesis) registerConsumer(ctx context.Context, streamARN *string) (*string, error) { - consumer, err := a.authProvider.Kinesis().Kinesis.RegisterStreamConsumer(ctx, &kinesis.RegisterStreamConsumerInput{ + consumer, err := a.kinesisClient.RegisterStreamConsumer(ctx, &kinesis.RegisterStreamConsumerInput{ ConsumerName: &a.metadata.ConsumerName, StreamARN: streamARN, }) @@ -323,7 +335,7 @@ func (a *AWSKinesis) deregisterConsumer(ctx context.Context, streamARN *string, if a.consumerARN != nil { // Use a background context because the running context may have been canceled already ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) - _, err := a.authProvider.Kinesis().Kinesis.DeregisterStreamConsumer(ctx, &kinesis.DeregisterStreamConsumerInput{ + _, err := a.kinesisClient.DeregisterStreamConsumer(ctx, &kinesis.DeregisterStreamConsumerInput{ ConsumerARN: consumerARN, StreamARN: streamARN, ConsumerName: &a.metadata.ConsumerName, @@ -339,7 +351,7 @@ func (a *AWSKinesis) deregisterConsumer(ctx context.Context, streamARN *string, func (a *AWSKinesis) waitUntilConsumerExists(ctx context.Context, input *kinesis.DescribeStreamConsumerInput) error { // Poll until consumer is active for i := 0; i < 18; i++ { - consumer, err := a.authProvider.Kinesis().Kinesis.DescribeStreamConsumer(ctx, input) + consumer, err := a.kinesisClient.DescribeStreamConsumer(ctx, input) if err != nil { return err } @@ -403,6 +415,46 @@ func (p *recordProcessor) Shutdown(input *interfaces.ShutdownInput) { } } +func (a *AWSKinesis) createKinesisClient(ctx context.Context) error { + // Convert v1 credentials to v2 + if v1Creds, err := a.authProvider.Kinesis().Credentials.Get(); err == nil { + a.v2Credentials = v2creds.NewStaticCredentialsProvider(v1Creds.AccessKeyID, v1Creds.SecretAccessKey, v1Creds.SessionToken) + } else { + // Fallback to default v2 config if conversion failed + v2Config, err := awsv2config.LoadDefaultConfig(ctx, awsv2config.WithRegion(a.authProvider.Kinesis().Region)) + if err != nil { + return err + } + a.v2Credentials = v2Config.Credentials + } + + // Create v2 config and Kinesis client + v2Config := aws.Config{ + Region: a.authProvider.Kinesis().Region, + Credentials: a.v2Credentials, + } + a.kinesisClient = kinesis.NewFromConfig(v2Config) + return nil +} + +func (a *AWSKinesis) getStreamARN(ctx context.Context, streamName string) (*string, error) { + stream, err := a.kinesisClient.DescribeStream(ctx, &kinesis.DescribeStreamInput{ + StreamName: &streamName, + }) + if err != nil { + return nil, err + } + return stream.StreamDescription.StreamARN, nil +} + +func (a *AWSKinesis) workerCfg(_ context.Context, stream, region, mode, applicationName string) *config.KinesisClientLibConfiguration { + const sharedMode = "shared" + if mode == sharedMode { + return config.NewKinesisClientLibConfigWithCredential(applicationName, stream, region, "", a.v2Credentials) + } + return nil +} + // GetComponentMetadata returns the metadata of the component. func (a *AWSKinesis) GetComponentMetadata() (metadataInfo metadata.MetadataMap) { metadataStruct := &kinesisMetadata{} diff --git a/bindings/aws/kinesis/kinesis_test.go b/bindings/aws/kinesis/kinesis_test.go index 154e9baa57..9739d97c20 100644 --- a/bindings/aws/kinesis/kinesis_test.go +++ b/bindings/aws/kinesis/kinesis_test.go @@ -14,8 +14,12 @@ limitations under the License. package kinesis import ( + "context" + "errors" "testing" + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/service/kinesis" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -48,3 +52,99 @@ func TestParseMetadata(t *testing.T) { assert.Equal(t, "extended", meta.KinesisConsumerMode) assert.Equal(t, "applicationName", meta.ApplicationName) } + +func getStreamARN(ctx context.Context, client *kinesis.Client, streamName string) (*string, error) { + if client == nil { + return nil, errors.New("unable to get stream arn due to empty client") + } + stream, err := client.DescribeStream(ctx, &kinesis.DescribeStreamInput{ + StreamName: &streamName, + }) + if err != nil { + return nil, err + } + return stream.StreamDescription.StreamARN, nil +} + +func TestKinesisClient_Stream(t *testing.T) { + tests := []struct { + name string + kinesisClient *kinesis.Client + streamName string + expectedErr string + }{ + { + name: "returns error when client is nil", + kinesisClient: nil, + streamName: "test-stream", + expectedErr: "unable to get stream arn due to empty client", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctx := context.Background() + got, err := getStreamARN(ctx, tt.kinesisClient, tt.streamName) + + if tt.expectedErr != "" { + require.Error(t, err) + assert.Equal(t, tt.expectedErr, err.Error()) + assert.Nil(t, got) + } else { + require.NoError(t, err) + assert.NotNil(t, got) + } + }) + } +} + +func TestAWSKinesis_WorkerCfg(t *testing.T) { + tests := []struct { + name string + streamName string + applicationName string + mode string + expectNil bool + }{ + { + name: "returns config for shared mode", + streamName: "test-stream", + applicationName: "test-app", + mode: "shared", + expectNil: false, + }, + { + name: "returns nil for extended mode", + streamName: "test-stream", + applicationName: "test-app", + mode: "extended", + expectNil: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctx := context.Background() + awsKinesis := &AWSKinesis{ + v2Credentials: aws.NewCredentialsCache(aws.CredentialsProviderFunc(func(ctx context.Context) (aws.Credentials, error) { + return aws.Credentials{ + AccessKeyID: "test", + SecretAccessKey: "test", + }, nil + })), + } + cfg := awsKinesis.workerCfg(ctx, tt.streamName, "us-west-2", tt.mode, tt.applicationName) + + if tt.expectNil { + assert.Nil(t, cfg) + } else { + assert.NotNil(t, cfg) + if cfg != nil { + assert.Equal(t, tt.streamName, cfg.StreamName) + assert.Equal(t, tt.applicationName, cfg.ApplicationName) + assert.Equal(t, "us-west-2", cfg.RegionName) + } + } + }) + } +} diff --git a/common/authentication/aws/client.go b/common/authentication/aws/client.go index b8307409da..88af9b09b2 100644 --- a/common/authentication/aws/client.go +++ b/common/authentication/aws/client.go @@ -18,10 +18,7 @@ import ( "errors" "sync" - "github.com/aws/aws-sdk-go-v2/aws" - awsv2config "github.com/aws/aws-sdk-go-v2/config" - v2creds "github.com/aws/aws-sdk-go-v2/credentials" - kinesisv2 "github.com/aws/aws-sdk-go-v2/service/kinesis" + "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/credentials" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/dynamodb" @@ -37,7 +34,6 @@ import ( "github.com/aws/aws-sdk-go/service/ssm" "github.com/aws/aws-sdk-go/service/ssm/ssmiface" "github.com/aws/aws-sdk-go/service/sts" - "github.com/vmware/vmware-go-kcl-v2/clientlibrary/config" ) type Clients struct { @@ -117,10 +113,8 @@ type ParameterStoreClients struct { } type KinesisClients struct { - Kinesis *kinesisv2.Client - Region string - Credentials *credentials.Credentials - V2Credentials aws.CredentialsProvider + Region string + Credentials *credentials.Credentials } type SesClients struct { @@ -175,47 +169,6 @@ func (c *ParameterStoreClients) New(session *session.Session) { func (c *KinesisClients) New(session *session.Session) { c.Region = *session.Config.Region c.Credentials = session.Config.Credentials - // Convert v1 credentials to v2 for both Kinesis client and KCL usage - if v1Creds, err := session.Config.Credentials.Get(); err == nil { - c.V2Credentials = v2creds.NewStaticCredentialsProvider(v1Creds.AccessKeyID, v1Creds.SecretAccessKey, v1Creds.SessionToken) - // Create v2 config and Kinesis client - v2Config := aws.Config{ - Region: c.Region, - Credentials: c.V2Credentials, - } - c.Kinesis = kinesisv2.NewFromConfig(v2Config) - } -} - -func (c *KinesisClients) Stream(ctx context.Context, streamName string) (*string, error) { - if c.Kinesis != nil { - stream, err := c.Kinesis.DescribeStream(ctx, &kinesisv2.DescribeStreamInput{ - StreamName: &streamName, - }) - if err != nil { - return nil, err - } - return stream.StreamDescription.StreamARN, nil - } - - return nil, errors.New("unable to get stream arn due to empty client") -} - -func (c *KinesisClients) WorkerCfg(ctx context.Context, stream, region, mode, applicationName string) *config.KinesisClientLibConfiguration { - const sharedMode = "shared" - if mode == sharedMode { - // Use converted v2 credentials if available - if c.V2Credentials != nil { - return config.NewKinesisClientLibConfigWithCredential(applicationName, stream, region, "", c.V2Credentials) - } - // Fallback to default v2 config if conversion failed - v2Config, err := awsv2config.LoadDefaultConfig(ctx, awsv2config.WithRegion(region)) - if err != nil { - return nil - } - return config.NewKinesisClientLibConfigWithCredential(applicationName, stream, region, "", v2Config.Credentials) - } - return nil } func (c *SesClients) New(session *session.Session) { diff --git a/common/authentication/aws/client_test.go b/common/authentication/aws/client_test.go index a45fa566b1..19a833f2c6 100644 --- a/common/authentication/aws/client_test.go +++ b/common/authentication/aws/client_test.go @@ -18,10 +18,7 @@ import ( "errors" "testing" - "github.com/aws/aws-sdk-go-v2/aws" - "github.com/aws/aws-sdk-go-v2/service/kinesis" - "github.com/aws/aws-sdk-go-v2/service/kinesis/types" - "github.com/aws/aws-sdk-go/aws/credentials" + "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/request" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/sqs" @@ -39,32 +36,7 @@ func (m *mockedSQS) GetQueueUrlWithContext(ctx context.Context, input *sqs.GetQu return m.GetQueueURLFn(ctx, input) } -type mockedKinesisV2 struct { - DescribeStreamFn func(ctx context.Context, input *kinesis.DescribeStreamInput, opts ...func(*kinesis.Options)) (*kinesis.DescribeStreamOutput, error) -} -func (m *mockedKinesisV2) DescribeStream(ctx context.Context, input *kinesis.DescribeStreamInput, opts ...func(*kinesis.Options)) (*kinesis.DescribeStreamOutput, error) { - return m.DescribeStreamFn(ctx, input, opts...) -} - -// testKinesisClients wraps KinesisClients for testing with mock -type testKinesisClients struct { - *KinesisClients - mockKinesis *mockedKinesisV2 -} - -func (t *testKinesisClients) Stream(ctx context.Context, streamName string) (*string, error) { - if t.mockKinesis != nil { - stream, err := t.mockKinesis.DescribeStream(ctx, &kinesis.DescribeStreamInput{ - StreamName: &streamName, - }) - if err != nil { - return nil, err - } - return stream.StreamDescription.StreamARN, nil - } - return nil, errors.New("unable to get stream arn due to empty client") -} func TestS3Clients_New(t *testing.T) { tests := []struct { @@ -144,221 +116,4 @@ func TestSqsClients_QueueURL(t *testing.T) { } } -func TestKinesisClients_Stream(t *testing.T) { - tests := []struct { - name string - kinesisClient *testKinesisClients - streamName string - expectedStream *string - expectedErr string - }{ - { - name: "successfully retrieves stream ARN", - kinesisClient: &testKinesisClients{ - KinesisClients: &KinesisClients{ - Region: "us-west-2", - Credentials: credentials.NewStaticCredentials("accessKey", "secretKey", ""), - }, - mockKinesis: &mockedKinesisV2{ - DescribeStreamFn: func(ctx context.Context, input *kinesis.DescribeStreamInput, opts ...func(*kinesis.Options)) (*kinesis.DescribeStreamOutput, error) { - streamARN := "arn:aws:kinesis:us-west-2:123456789012:stream/test-stream" - streamName := "test-stream" - return &kinesis.DescribeStreamOutput{ - StreamDescription: &types.StreamDescription{ - StreamARN: &streamARN, - StreamName: &streamName, - StreamStatus: types.StreamStatusActive, - }, - }, nil - }, - }, - }, - streamName: "test-stream", - expectedStream: aws.String("arn:aws:kinesis:us-west-2:123456789012:stream/test-stream"), - expectedErr: "", - }, - { - name: "returns error when stream not found", - kinesisClient: &testKinesisClients{ - KinesisClients: &KinesisClients{ - Region: "us-west-2", - Credentials: credentials.NewStaticCredentials("accessKey", "secretKey", ""), - }, - mockKinesis: &mockedKinesisV2{ - DescribeStreamFn: func(ctx context.Context, input *kinesis.DescribeStreamInput, opts ...func(*kinesis.Options)) (*kinesis.DescribeStreamOutput, error) { - return nil, errors.New("ResourceNotFoundException: Stream nonexistent-stream under account 123456789012 not found") - }, - }, - }, - streamName: "nonexistent-stream", - expectedStream: nil, - expectedErr: "ResourceNotFoundException: Stream nonexistent-stream under account 123456789012 not found", - }, - { - name: "returns error when client is nil", - kinesisClient: &testKinesisClients{ - KinesisClients: &KinesisClients{ - Region: "us-west-2", - Credentials: credentials.NewStaticCredentials("accessKey", "secretKey", ""), - }, - mockKinesis: nil, - }, - streamName: "test-stream", - expectedStream: nil, - expectedErr: "unable to get stream arn due to empty client", - }, - { - name: "handles stream with special characters in name", - kinesisClient: &testKinesisClients{ - KinesisClients: &KinesisClients{ - Region: "eu-central-1", - Credentials: credentials.NewStaticCredentials("accessKey", "secretKey", ""), - }, - mockKinesis: &mockedKinesisV2{ - DescribeStreamFn: func(ctx context.Context, input *kinesis.DescribeStreamInput, opts ...func(*kinesis.Options)) (*kinesis.DescribeStreamOutput, error) { - streamARN := "arn:aws:kinesis:eu-central-1:123456789012:stream/my-test_stream.123" - streamName := "my-test_stream.123" - return &kinesis.DescribeStreamOutput{ - StreamDescription: &types.StreamDescription{ - StreamARN: &streamARN, - StreamName: &streamName, - StreamStatus: types.StreamStatusActive, - }, - }, nil - }, - }, - }, - streamName: "my-test_stream.123", - expectedStream: aws.String("arn:aws:kinesis:eu-central-1:123456789012:stream/my-test_stream.123"), - expectedErr: "", - }, - { - name: "handles empty stream name", - kinesisClient: &testKinesisClients{ - KinesisClients: &KinesisClients{ - Region: "us-east-1", - Credentials: credentials.NewStaticCredentials("accessKey", "secretKey", ""), - }, - mockKinesis: &mockedKinesisV2{ - DescribeStreamFn: func(ctx context.Context, input *kinesis.DescribeStreamInput, opts ...func(*kinesis.Options)) (*kinesis.DescribeStreamOutput, error) { - return nil, errors.New("ValidationException: Stream name cannot be empty") - }, - }, - }, - streamName: "", - expectedStream: nil, - expectedErr: "ValidationException: Stream name cannot be empty", - }, - { - name: "handles stream in creating state", - kinesisClient: &testKinesisClients{ - KinesisClients: &KinesisClients{ - Region: "ap-southeast-1", - Credentials: credentials.NewStaticCredentials("accessKey", "secretKey", ""), - }, - mockKinesis: &mockedKinesisV2{ - DescribeStreamFn: func(ctx context.Context, input *kinesis.DescribeStreamInput, opts ...func(*kinesis.Options)) (*kinesis.DescribeStreamOutput, error) { - streamARN := "arn:aws:kinesis:ap-southeast-1:123456789012:stream/creating-stream" - streamName := "creating-stream" - return &kinesis.DescribeStreamOutput{ - StreamDescription: &types.StreamDescription{ - StreamARN: &streamARN, - StreamName: &streamName, - StreamStatus: types.StreamStatusCreating, - }, - }, nil - }, - }, - }, - streamName: "creating-stream", - expectedStream: aws.String("arn:aws:kinesis:ap-southeast-1:123456789012:stream/creating-stream"), - expectedErr: "", - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - ctx := context.Background() - got, err := tt.kinesisClient.Stream(ctx, tt.streamName) - - if tt.expectedErr != "" { - require.Error(t, err) - assert.Equal(t, tt.expectedErr, err.Error()) - assert.Nil(t, got) - } else { - require.NoError(t, err) - assert.Equal(t, tt.expectedStream, got) - } - }) - } -} - -func TestKinesisClients_WorkerCfg(t *testing.T) { - tests := []struct { - name string - kinesisClient *KinesisClients - streamName string - applicationName string - mode string - expectNil bool - }{ - { - name: "successfully creates shared mode worker config with v2 credentials", - kinesisClient: &KinesisClients{ - Region: "us-west-2", - Credentials: credentials.NewStaticCredentials("accessKey", "secretKey", ""), - V2Credentials: aws.NewCredentialsCache(aws.CredentialsProviderFunc(func(ctx context.Context) (aws.Credentials, error) { - return aws.Credentials{ - AccessKeyID: "accessKey", - SecretAccessKey: "secretKey", - }, nil - })), - }, - streamName: "test-stream", - applicationName: "test-app", - mode: "shared", - expectNil: false, - }, - { - name: "returns nil when mode is not shared", - kinesisClient: &KinesisClients{ - Region: "us-west-2", - Credentials: credentials.NewStaticCredentials("accessKey", "secretKey", ""), - }, - streamName: "test-stream", - applicationName: "test-app", - mode: "extended", - expectNil: true, - }, - { - name: "falls back to default config when v2 credentials are nil", - kinesisClient: &KinesisClients{ - Region: "us-east-1", - Credentials: credentials.NewStaticCredentials("accessKey", "secretKey", ""), - V2Credentials: nil, - }, - streamName: "fallback-stream", - applicationName: "fallback-app", - mode: "shared", - expectNil: false, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - ctx := context.Background() - cfg := tt.kinesisClient.WorkerCfg(ctx, tt.streamName, tt.kinesisClient.Region, tt.mode, tt.applicationName) - - if tt.expectNil { - assert.Nil(t, cfg) - } else { - assert.NotNil(t, cfg) - if cfg != nil { - assert.Equal(t, tt.streamName, cfg.StreamName) - assert.Equal(t, tt.applicationName, cfg.ApplicationName) - assert.Equal(t, tt.kinesisClient.Region, cfg.RegionName) - } - } - }) - } -} diff --git a/go.sum b/go.sum index dd34b46f48..df4c9a431d 100644 --- a/go.sum +++ b/go.sum @@ -277,8 +277,6 @@ github.com/aws/aws-sdk-go-v2 v1.9.0/go.mod h1:cK/D0BBs0b/oWPIcX/Z/obahJK1TT7IPVj github.com/aws/aws-sdk-go-v2 v1.9.2/go.mod h1:cK/D0BBs0b/oWPIcX/Z/obahJK1TT7IPVjy53i/mX/4= github.com/aws/aws-sdk-go-v2 v1.39.6 h1:2JrPCVgWJm7bm83BDwY5z8ietmeJUbh3O2ACnn+Xsqk= github.com/aws/aws-sdk-go-v2 v1.39.6/go.mod h1:c9pm7VwuW0UPxAEYGyTmyurVcNrbF6Rt/wixFqDhcjE= -github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.11 h1:12SpdwU8Djs+YGklkinSSlcrPyj3H4VifVsKf78KbwA= -github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.11/go.mod h1:dd+Lkp6YmMryke+qxW/VnKyhMBDTYP41Q2Bb+6gNZgY= github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.3 h1:DHctwEM8P8iTXFxC/QK0MRjwEpWQeM9yzidCRjldUz0= github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.3/go.mod h1:xdCzcZEtnSTKVDOmUZs4l/j3pSV6rpo1WXl5ugNsL8Y= github.com/aws/aws-sdk-go-v2/config v1.8.3/go.mod h1:4AEiLtAb8kLs7vgw2ZV3p2VZ1+hBavOc84hqxVNpCyw= @@ -294,12 +292,8 @@ github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.32 h1:KAXP9JSHO1vKGCr5f4O6Wm github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.32/go.mod h1:h4Sg6FQdexC1yYG9RDnOvLbW1a/P986++/Y/a+GyEM8= github.com/aws/aws-sdk-go-v2/feature/rds/auth v1.3.10 h1:z6fAXB4HSuYjrE/P8RU3NdCaN+EPaeq/+80aisCjuF8= github.com/aws/aws-sdk-go-v2/feature/rds/auth v1.3.10/go.mod h1:PoPjOi7j+/DtKIGC58HRfcdWKBPYYXwdKnRG+po+hzo= -github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.36 h1:SsytQyTMHMDPspp+spo7XwXTP44aJZZAC7fBV2C5+5s= -github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.36/go.mod h1:Q1lnJArKRXkenyog6+Y+zr7WDpk4e6XlR6gs20bbeNo= github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.13 h1:a+8/MLcWlIxo1lF9xaGt3J/u3yOZx+CdSveSNwjhD40= github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.13/go.mod h1:oGnKwIYZ4XttyU2JWxFrwvhF6YKiK/9/wmE3v3Iu9K8= -github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.36 h1:i2vNHQiXUvKhs3quBR6aqlgJaiaexz/aNvdCktW/kAM= -github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.36/go.mod h1:UdyGa7Q91id/sdyHPwth+043HhmP6yP9MBHgbZM0xo8= github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.13 h1:HBSI2kDkMdWz4ZM7FjwE7e/pWDEZ+nR95x8Ztet1ooY= github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.13/go.mod h1:YE94ZoDArI7awZqJzBAZ3PDD2zSfuP7w6P2knOzIn8M= github.com/aws/aws-sdk-go-v2/internal/ini v1.2.4/go.mod h1:ZcBrrI3zBKlhGFNYWvju0I3TR93I7YIgAfy82Fh4lcQ= @@ -320,8 +314,6 @@ github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.3.2/go.mod h1:72H github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.17 h1:t0E6FzREdtCsiLIoLCWsYliNsRBgyGD/MCK571qk4MI= github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.17/go.mod h1:ygpklyoaypuyDvOM5ujWGrYWpAK3h7ugnmKCU/76Ys4= github.com/aws/aws-sdk-go-v2/service/kinesis v1.6.0/go.mod h1:9O7UG2pELnP0hq35+Gd7XDjOLBkg7tmgRQ0y14ZjoJI= -github.com/aws/aws-sdk-go-v2/service/kinesis v1.27.1 h1:p8dOJ/UKXOwttc1Cxw1Ek52klVmMuiaCUkhsUGxce1I= -github.com/aws/aws-sdk-go-v2/service/kinesis v1.27.1/go.mod h1:VpH1IBG1YYZHPu5qShNt7EGaqUQbHAJZrbDtEpqDvvY= github.com/aws/aws-sdk-go-v2/service/kinesis v1.42.3 h1:A2HNxrABEFha5831yAU05G0mYNxaxYH4WG85FV6ZWIQ= github.com/aws/aws-sdk-go-v2/service/kinesis v1.42.3/go.mod h1:jTDNZao/9uv/6JeaeDWEqA4s+l6c8+cqaDeYFpM+818= github.com/aws/aws-sdk-go-v2/service/servicediscovery v1.32.4 h1:BN6+zko+qO9Tl9S0ywUPNvY0gvlFK4Zmj2Y0a8paFkk= From 6ebe1a469170d5cdc8224e8077c09cd6bdc95d78 Mon Sep 17 00:00:00 2001 From: rideshnath-scout Date: Wed, 19 Nov 2025 13:14:34 +0530 Subject: [PATCH 11/18] refactor: streamline AWS Kinesis client creation and remove unused auth provider Signed-off-by: rideshnath-scout --- bindings/aws/kinesis/kinesis.go | 52 ++++++++++----------------------- 1 file changed, 16 insertions(+), 36 deletions(-) diff --git a/bindings/aws/kinesis/kinesis.go b/bindings/aws/kinesis/kinesis.go index 0a3b38eeaf..31d8a75ad0 100644 --- a/bindings/aws/kinesis/kinesis.go +++ b/bindings/aws/kinesis/kinesis.go @@ -22,19 +22,18 @@ import ( "sync/atomic" "time" - "github.com/aws/aws-sdk-go-v2/aws" - awsv2config "github.com/aws/aws-sdk-go-v2/config" - v2creds "github.com/aws/aws-sdk-go-v2/credentials" + awsv2 "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/service/kinesis" "github.com/aws/aws-sdk-go-v2/service/kinesis/types" "github.com/cenkalti/backoff/v4" + aws "github.com/dapr/components-contrib/common/aws" "github.com/google/uuid" "github.com/vmware/vmware-go-kcl-v2/clientlibrary/config" "github.com/vmware/vmware-go-kcl-v2/clientlibrary/interfaces" "github.com/vmware/vmware-go-kcl-v2/clientlibrary/worker" "github.com/dapr/components-contrib/bindings" - awsAuth "github.com/dapr/components-contrib/common/authentication/aws" + awsAuth "github.com/dapr/components-contrib/common/aws/auth" "github.com/dapr/components-contrib/metadata" "github.com/dapr/kit/logger" kitmd "github.com/dapr/kit/metadata" @@ -42,12 +41,12 @@ import ( // AWSKinesis allows receiving and sending data to/from AWS Kinesis stream. type AWSKinesis struct { - authProvider awsAuth.Provider - metadata *kinesisMetadata + // authProvider awsAuth.Provider + metadata *kinesisMetadata worker *worker.Worker kinesisClient *kinesis.Client - v2Credentials aws.CredentialsProvider + v2Credentials awsv2.CredentialsProvider streamName string consumerName string @@ -133,17 +132,12 @@ func (a *AWSKinesis) Init(ctx context.Context, metadata bindings.Metadata) error SecretKey: m.SecretKey, SessionToken: "", } - // extra configs needed per component type - provider, err := awsAuth.NewProvider(ctx, opts, awsAuth.GetConfig(opts)) - if err != nil { - return err - } - a.authProvider = provider - // Create AWS SDK v2 client - if err := a.createKinesisClient(ctx); err != nil { + kinesisClient, err := a.createKinesisClient(ctx, opts) + if err != nil { return err } + a.kinesisClient = kinesisClient return nil } @@ -290,9 +284,6 @@ func (a *AWSKinesis) Close() error { close(a.closeCh) } a.wg.Wait() - if a.authProvider != nil { - return a.authProvider.Close() - } return nil } @@ -415,26 +406,15 @@ func (p *recordProcessor) Shutdown(input *interfaces.ShutdownInput) { } } -func (a *AWSKinesis) createKinesisClient(ctx context.Context) error { - // Convert v1 credentials to v2 - if v1Creds, err := a.authProvider.Kinesis().Credentials.Get(); err == nil { - a.v2Credentials = v2creds.NewStaticCredentialsProvider(v1Creds.AccessKeyID, v1Creds.SecretAccessKey, v1Creds.SessionToken) - } else { - // Fallback to default v2 config if conversion failed - v2Config, err := awsv2config.LoadDefaultConfig(ctx, awsv2config.WithRegion(a.authProvider.Kinesis().Region)) - if err != nil { - return err - } - a.v2Credentials = v2Config.Credentials - } +func (a *AWSKinesis) createKinesisClient(ctx context.Context, opts awsAuth.Options) (*kinesis.Client, error) { - // Create v2 config and Kinesis client - v2Config := aws.Config{ - Region: a.authProvider.Kinesis().Region, - Credentials: a.v2Credentials, + awsConfig, configErr := aws.NewConfig(ctx, opts) + if configErr != nil { + return nil, configErr } - a.kinesisClient = kinesis.NewFromConfig(v2Config) - return nil + + kinesisClient := kinesis.NewFromConfig(awsConfig) + return kinesisClient, nil } func (a *AWSKinesis) getStreamARN(ctx context.Context, streamName string) (*string, error) { From c0641873eec896e865a8282b1bbfd9afd4cd853a Mon Sep 17 00:00:00 2001 From: rideshnath-scout Date: Wed, 19 Nov 2025 17:57:34 +0530 Subject: [PATCH 12/18] feat: add applicationName metadata field to Kinesis binding Signed-off-by: rideshnath-scout --- bindings/aws/kinesis/metadata.yaml | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/bindings/aws/kinesis/metadata.yaml b/bindings/aws/kinesis/metadata.yaml index b9563973cf..6468160bde 100644 --- a/bindings/aws/kinesis/metadata.yaml +++ b/bindings/aws/kinesis/metadata.yaml @@ -64,3 +64,7 @@ metadata: required: false description: "The Kinesis endpoint URL" example: "http://localhost:4566" + - name: applicationName + required: false + description: "The Kinesis application name" + example: "my-application" From 716b8a15d9346097e088627482b7e24cc8e76b0a Mon Sep 17 00:00:00 2001 From: rideshnath-scout Date: Thu, 20 Nov 2025 10:54:25 +0530 Subject: [PATCH 13/18] refactor: update AWS SDK v2 dependencies and improve import organization Signed-off-by: rideshnath-scout --- bindings/aws/kinesis/kinesis.go | 11 +++++------ tests/certification/go.mod | 10 ++++------ tests/certification/go.sum | 18 ++++++++---------- 3 files changed, 17 insertions(+), 22 deletions(-) diff --git a/bindings/aws/kinesis/kinesis.go b/bindings/aws/kinesis/kinesis.go index 31d8a75ad0..af49f45a4a 100644 --- a/bindings/aws/kinesis/kinesis.go +++ b/bindings/aws/kinesis/kinesis.go @@ -22,11 +22,11 @@ import ( "sync/atomic" "time" - awsv2 "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/service/kinesis" "github.com/aws/aws-sdk-go-v2/service/kinesis/types" "github.com/cenkalti/backoff/v4" - aws "github.com/dapr/components-contrib/common/aws" + awsCommon "github.com/dapr/components-contrib/common/aws" "github.com/google/uuid" "github.com/vmware/vmware-go-kcl-v2/clientlibrary/config" "github.com/vmware/vmware-go-kcl-v2/clientlibrary/interfaces" @@ -41,12 +41,11 @@ import ( // AWSKinesis allows receiving and sending data to/from AWS Kinesis stream. type AWSKinesis struct { - // authProvider awsAuth.Provider metadata *kinesisMetadata worker *worker.Worker kinesisClient *kinesis.Client - v2Credentials awsv2.CredentialsProvider + v2Credentials aws.CredentialsProvider streamName string consumerName string @@ -322,7 +321,7 @@ func (a *AWSKinesis) registerConsumer(ctx context.Context, streamARN *string) (* return consumer.Consumer.ConsumerARN, nil } -func (a *AWSKinesis) deregisterConsumer(ctx context.Context, streamARN *string, consumerARN *string) error { +func (a *AWSKinesis) deregisterConsumer(_ context.Context, streamARN *string, consumerARN *string) error { if a.consumerARN != nil { // Use a background context because the running context may have been canceled already ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) @@ -408,7 +407,7 @@ func (p *recordProcessor) Shutdown(input *interfaces.ShutdownInput) { func (a *AWSKinesis) createKinesisClient(ctx context.Context, opts awsAuth.Options) (*kinesis.Client, error) { - awsConfig, configErr := aws.NewConfig(ctx, opts) + awsConfig, configErr := awsCommon.NewConfig(ctx, opts) if configErr != nil { return nil, configErr } diff --git a/tests/certification/go.mod b/tests/certification/go.mod index fce100b945..09fcde4eea 100644 --- a/tests/certification/go.mod +++ b/tests/certification/go.mod @@ -86,14 +86,14 @@ require ( github.com/armon/go-metrics v0.4.1 // indirect github.com/asaskevich/govalidator v0.0.0-20230301143203-a9d515a09cc2 // indirect github.com/aws/aws-msk-iam-sasl-signer-go v1.0.1-0.20241125194140-078c08b8574a // indirect - github.com/aws/aws-sdk-go-v2 v1.36.5 // indirect + github.com/aws/aws-sdk-go-v2 v1.39.6 // indirect github.com/aws/aws-sdk-go-v2/config v1.29.17 // indirect github.com/aws/aws-sdk-go-v2/credentials v1.17.70 // indirect github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue v1.19.3 // indirect github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.32 // indirect github.com/aws/aws-sdk-go-v2/feature/rds/auth v1.3.10 // indirect - github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.36 // indirect - github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.36 // indirect + github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.13 // indirect + github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.13 // indirect github.com/aws/aws-sdk-go-v2/internal/ini v1.8.3 // indirect github.com/aws/aws-sdk-go-v2/service/dynamodb v1.43.4 // indirect github.com/aws/aws-sdk-go-v2/service/dynamodbstreams v1.25.6 // indirect @@ -106,7 +106,7 @@ require ( github.com/aws/aws-sdk-go-v2/service/ssooidc v1.30.3 // indirect github.com/aws/aws-sdk-go-v2/service/sts v1.34.0 // indirect github.com/aws/rolesanywhere-credential-helper v1.0.4 // indirect - github.com/aws/smithy-go v1.22.5 // indirect + github.com/aws/smithy-go v1.23.2 // indirect github.com/benbjohnson/clock v1.3.5 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/bits-and-blooms/bitset v1.4.0 // indirect @@ -294,7 +294,6 @@ require ( github.com/tklauser/go-sysconf v0.3.12 // indirect github.com/tklauser/numcpus v0.6.1 // indirect github.com/tmc/langchaingo v0.1.13 // indirect - github.com/vmware/vmware-go-kcl v1.5.1 // indirect github.com/x448/float16 v0.8.4 // indirect github.com/xdg-go/pbkdf2 v1.0.0 // indirect github.com/xdg-go/scram v1.1.2 // indirect @@ -340,7 +339,6 @@ require ( google.golang.org/grpc v1.73.0 // indirect google.golang.org/protobuf v1.36.6 // indirect gopkg.in/inf.v0 v0.9.1 // indirect - gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect k8s.io/api v0.31.0 // indirect diff --git a/tests/certification/go.sum b/tests/certification/go.sum index 2d37fa82ee..f88b0efb22 100644 --- a/tests/certification/go.sum +++ b/tests/certification/go.sum @@ -207,8 +207,8 @@ github.com/aws/aws-sdk-go v1.55.6 h1:cSg4pvZ3m8dgYcgqB97MrcdjUmZ1BeMYKUxMMB89IPk github.com/aws/aws-sdk-go v1.55.6/go.mod h1:eRwEWoyTWFMVYVQzKMNHWP5/RV4xIUGMQfXQHfHkpNU= github.com/aws/aws-sdk-go-v2 v0.18.0/go.mod h1:JWVYvqSMppoMJC0x5wdwiImzgXTI9FuZwxzkQq9wy+g= github.com/aws/aws-sdk-go-v2 v1.9.2/go.mod h1:cK/D0BBs0b/oWPIcX/Z/obahJK1TT7IPVjy53i/mX/4= -github.com/aws/aws-sdk-go-v2 v1.36.5 h1:0OF9RiEMEdDdZEMqF9MRjevyxAQcf6gY+E7vwBILFj0= -github.com/aws/aws-sdk-go-v2 v1.36.5/go.mod h1:EYrzvCCN9CMUTa5+6lf6MM4tq3Zjp8UhSGR/cBsjai0= +github.com/aws/aws-sdk-go-v2 v1.39.6 h1:2JrPCVgWJm7bm83BDwY5z8ietmeJUbh3O2ACnn+Xsqk= +github.com/aws/aws-sdk-go-v2 v1.39.6/go.mod h1:c9pm7VwuW0UPxAEYGyTmyurVcNrbF6Rt/wixFqDhcjE= github.com/aws/aws-sdk-go-v2/config v1.8.3/go.mod h1:4AEiLtAb8kLs7vgw2ZV3p2VZ1+hBavOc84hqxVNpCyw= github.com/aws/aws-sdk-go-v2/config v1.29.17 h1:jSuiQ5jEe4SAMH6lLRMY9OVC+TqJLP5655pBGjmnjr0= github.com/aws/aws-sdk-go-v2/config v1.29.17/go.mod h1:9P4wwACpbeXs9Pm9w1QTh6BwWwJjwYvJ1iCt5QbCXh8= @@ -222,10 +222,10 @@ github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.32 h1:KAXP9JSHO1vKGCr5f4O6Wm github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.32/go.mod h1:h4Sg6FQdexC1yYG9RDnOvLbW1a/P986++/Y/a+GyEM8= github.com/aws/aws-sdk-go-v2/feature/rds/auth v1.3.10 h1:z6fAXB4HSuYjrE/P8RU3NdCaN+EPaeq/+80aisCjuF8= github.com/aws/aws-sdk-go-v2/feature/rds/auth v1.3.10/go.mod h1:PoPjOi7j+/DtKIGC58HRfcdWKBPYYXwdKnRG+po+hzo= -github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.36 h1:SsytQyTMHMDPspp+spo7XwXTP44aJZZAC7fBV2C5+5s= -github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.36/go.mod h1:Q1lnJArKRXkenyog6+Y+zr7WDpk4e6XlR6gs20bbeNo= -github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.36 h1:i2vNHQiXUvKhs3quBR6aqlgJaiaexz/aNvdCktW/kAM= -github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.36/go.mod h1:UdyGa7Q91id/sdyHPwth+043HhmP6yP9MBHgbZM0xo8= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.13 h1:a+8/MLcWlIxo1lF9xaGt3J/u3yOZx+CdSveSNwjhD40= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.13/go.mod h1:oGnKwIYZ4XttyU2JWxFrwvhF6YKiK/9/wmE3v3Iu9K8= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.13 h1:HBSI2kDkMdWz4ZM7FjwE7e/pWDEZ+nR95x8Ztet1ooY= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.13/go.mod h1:YE94ZoDArI7awZqJzBAZ3PDD2zSfuP7w6P2knOzIn8M= github.com/aws/aws-sdk-go-v2/internal/ini v1.2.4/go.mod h1:ZcBrrI3zBKlhGFNYWvju0I3TR93I7YIgAfy82Fh4lcQ= github.com/aws/aws-sdk-go-v2/internal/ini v1.8.3 h1:bIqFDwgGXXN1Kpp99pDOdKMTTb5d2KyU5X/BZxjOkRo= github.com/aws/aws-sdk-go-v2/internal/ini v1.8.3/go.mod h1:H5O/EsxDWyU+LP/V8i5sm8cxoZgc2fdNR9bxlOFrQTo= @@ -256,8 +256,8 @@ github.com/aws/aws-sdk-go-v2/service/sts v1.34.0/go.mod h1:7ph2tGpfQvwzgistp2+zg github.com/aws/rolesanywhere-credential-helper v1.0.4 h1:kHIVVdyQQiFZoKBP+zywBdFilGCS8It+UvW5LolKbW8= github.com/aws/rolesanywhere-credential-helper v1.0.4/go.mod h1:QVGNxlDlYhjR0/ZUee7uGl0hNChWidNpe2+GD87Buqk= github.com/aws/smithy-go v1.8.0/go.mod h1:SObp3lf9smib00L/v3U2eAKG8FyQ7iLrJnQiAmR5n+E= -github.com/aws/smithy-go v1.22.5 h1:P9ATCXPMb2mPjYBgueqJNCA5S9UfktsW0tTxi+a7eqw= -github.com/aws/smithy-go v1.22.5/go.mod h1:t1ufH5HMublsJYulve2RKmHDC15xu1f26kHCp/HgceI= +github.com/aws/smithy-go v1.23.2 h1:Crv0eatJUQhaManss33hS5r40CG3ZFH+21XSkqMrIUM= +github.com/aws/smithy-go v1.23.2/go.mod h1:LEj2LM3rBRQJxPZTB4KuzZkaZYnZPnvgIhb4pu07mx0= github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= github.com/benbjohnson/clock v1.3.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= github.com/benbjohnson/clock v1.3.5 h1:VvXlSJBzZpA/zum6Sj74hxwYI2DIxRWuNIoXAzHZz5o= @@ -1432,8 +1432,6 @@ github.com/ugorji/go v1.2.6/go.mod h1:anCg0y61KIhDlPZmnH+so+RQbysYVyDko0IMgJv0Nn github.com/ugorji/go/codec v1.2.6/go.mod h1:V6TCNZ4PHqoHGFZuSG1W8nrCzzdgA2DozYxWFFpvxTw= github.com/urfave/cli v1.20.0/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijbERA= github.com/urfave/cli v1.22.1/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0= -github.com/vmware/vmware-go-kcl v1.5.1 h1:1rJLfAX4sDnCyatNoD/WJzVafkwST6u/cgY/Uf2VgHk= -github.com/vmware/vmware-go-kcl v1.5.1/go.mod h1:kXJmQ6h0dRMRrp1uWU9XbIXvwelDpTxSPquvQUBdpbo= github.com/x448/float16 v0.8.4 h1:qLwI1I70+NjRFUR3zs1JPUCgaCXSh3SW62uAKT1mSBM= github.com/x448/float16 v0.8.4/go.mod h1:14CWIYCyZA/cWjXOioeEpHeN/83MdbZDRQHoFcYsOfg= github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c= From bf33f5232d0fa224d572d6fa538c934ef9765d24 Mon Sep 17 00:00:00 2001 From: rideshnath-scout Date: Thu, 20 Nov 2025 14:21:08 +0530 Subject: [PATCH 14/18] refactor: correct variable naming and improve context handling in tests (resolve linting issues) Signed-off-by: rideshnath-scout --- bindings/aws/kinesis/kinesis.go | 13 ++++++------- bindings/aws/kinesis/kinesis_test.go | 4 ++-- 2 files changed, 8 insertions(+), 9 deletions(-) diff --git a/bindings/aws/kinesis/kinesis.go b/bindings/aws/kinesis/kinesis.go index af49f45a4a..439565e915 100644 --- a/bindings/aws/kinesis/kinesis.go +++ b/bindings/aws/kinesis/kinesis.go @@ -26,7 +26,7 @@ import ( "github.com/aws/aws-sdk-go-v2/service/kinesis" "github.com/aws/aws-sdk-go-v2/service/kinesis/types" "github.com/cenkalti/backoff/v4" - awsCommon "github.com/dapr/components-contrib/common/aws" + awscommon "github.com/dapr/components-contrib/common/aws" "github.com/google/uuid" "github.com/vmware/vmware-go-kcl-v2/clientlibrary/config" "github.com/vmware/vmware-go-kcl-v2/clientlibrary/interfaces" @@ -166,7 +166,7 @@ func (a *AWSKinesis) Read(ctx context.Context, handler bindings.Handler) (err er switch a.metadata.KinesisConsumerMode { case SharedThroughput: - // initalize worker configuration + // initialize worker configuration config := a.workerCfg(ctx, a.streamName, a.metadata.Region, a.consumerMode, a.applicationName) // Configure the KCL worker with custom endpoints for LocalStack if a.metadata.Endpoint != "" { @@ -339,8 +339,8 @@ func (a *AWSKinesis) deregisterConsumer(_ context.Context, streamARN *string, co } func (a *AWSKinesis) waitUntilConsumerExists(ctx context.Context, input *kinesis.DescribeStreamConsumerInput) error { - // Poll until consumer is active - for i := 0; i < 18; i++ { + // Iterate 18 times + for range 18 { consumer, err := a.kinesisClient.DescribeStreamConsumer(ctx, input) if err != nil { return err @@ -350,7 +350,7 @@ func (a *AWSKinesis) waitUntilConsumerExists(ctx context.Context, input *kinesis } time.Sleep(10 * time.Second) } - return fmt.Errorf("consumer did not become active within timeout") + return errors.New("consumer did not become active within timeout") } func (a *AWSKinesis) parseMetadata(meta bindings.Metadata) (*kinesisMetadata, error) { @@ -406,8 +406,7 @@ func (p *recordProcessor) Shutdown(input *interfaces.ShutdownInput) { } func (a *AWSKinesis) createKinesisClient(ctx context.Context, opts awsAuth.Options) (*kinesis.Client, error) { - - awsConfig, configErr := awsCommon.NewConfig(ctx, opts) + awsConfig, configErr := awscommon.NewConfig(ctx, opts) if configErr != nil { return nil, configErr } diff --git a/bindings/aws/kinesis/kinesis_test.go b/bindings/aws/kinesis/kinesis_test.go index 9739d97c20..532c75d02b 100644 --- a/bindings/aws/kinesis/kinesis_test.go +++ b/bindings/aws/kinesis/kinesis_test.go @@ -83,7 +83,7 @@ func TestKinesisClient_Stream(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - ctx := context.Background() + ctx := t.Context() got, err := getStreamARN(ctx, tt.kinesisClient, tt.streamName) if tt.expectedErr != "" { @@ -124,7 +124,7 @@ func TestAWSKinesis_WorkerCfg(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - ctx := context.Background() + ctx := t.Context() awsKinesis := &AWSKinesis{ v2Credentials: aws.NewCredentialsCache(aws.CredentialsProviderFunc(func(ctx context.Context) (aws.Credentials, error) { return aws.Credentials{ From d8f510305dd07f147b3486f5630a088aa91ec4b4 Mon Sep 17 00:00:00 2001 From: rideshnath-scout Date: Thu, 20 Nov 2025 16:06:57 +0530 Subject: [PATCH 15/18] refactor: reorder AWS SDK imports (resolve lint issues) Signed-off-by: rideshnath-scout --- bindings/aws/kinesis/kinesis.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bindings/aws/kinesis/kinesis.go b/bindings/aws/kinesis/kinesis.go index 439565e915..5bfe497b41 100644 --- a/bindings/aws/kinesis/kinesis.go +++ b/bindings/aws/kinesis/kinesis.go @@ -26,13 +26,13 @@ import ( "github.com/aws/aws-sdk-go-v2/service/kinesis" "github.com/aws/aws-sdk-go-v2/service/kinesis/types" "github.com/cenkalti/backoff/v4" - awscommon "github.com/dapr/components-contrib/common/aws" "github.com/google/uuid" "github.com/vmware/vmware-go-kcl-v2/clientlibrary/config" "github.com/vmware/vmware-go-kcl-v2/clientlibrary/interfaces" "github.com/vmware/vmware-go-kcl-v2/clientlibrary/worker" "github.com/dapr/components-contrib/bindings" + awscommon "github.com/dapr/components-contrib/common/aws" awsAuth "github.com/dapr/components-contrib/common/aws/auth" "github.com/dapr/components-contrib/metadata" "github.com/dapr/kit/logger" From d75c04c5210ff9f8c613abfe94110e1bb50be846 Mon Sep 17 00:00:00 2001 From: rideshnath-scout Date: Wed, 26 Nov 2025 12:18:33 +0530 Subject: [PATCH 16/18] Revert changes from '/common/authentication/aws' Signed-off-by: rideshnath-scout --- common/authentication/aws/client.go | 34 ++++++ common/authentication/aws/client_test.go | 146 +++++++++++++++++++++++ go.mod | 1 + go.sum | 2 + tests/certification/go.mod | 2 + tests/certification/go.sum | 2 + 6 files changed, 187 insertions(+) diff --git a/common/authentication/aws/client.go b/common/authentication/aws/client.go index 88af9b09b2..b210e32944 100644 --- a/common/authentication/aws/client.go +++ b/common/authentication/aws/client.go @@ -23,6 +23,8 @@ import ( "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/dynamodb" "github.com/aws/aws-sdk-go/service/dynamodb/dynamodbiface" + "github.com/aws/aws-sdk-go/service/kinesis" + "github.com/aws/aws-sdk-go/service/kinesis/kinesisiface" "github.com/aws/aws-sdk-go/service/s3" "github.com/aws/aws-sdk-go/service/s3/s3manager" "github.com/aws/aws-sdk-go/service/secretsmanager" @@ -34,6 +36,7 @@ import ( "github.com/aws/aws-sdk-go/service/ssm" "github.com/aws/aws-sdk-go/service/ssm/ssmiface" "github.com/aws/aws-sdk-go/service/sts" + "github.com/vmware/vmware-go-kcl/clientlibrary/config" ) type Clients struct { @@ -113,6 +116,7 @@ type ParameterStoreClients struct { } type KinesisClients struct { + Kinesis kinesisiface.KinesisAPI Region string Credentials *credentials.Credentials } @@ -167,10 +171,40 @@ func (c *ParameterStoreClients) New(session *session.Session) { } func (c *KinesisClients) New(session *session.Session) { + c.Kinesis = kinesis.New(session, session.Config) c.Region = *session.Config.Region c.Credentials = session.Config.Credentials } +func (c *KinesisClients) Stream(ctx context.Context, streamName string) (*string, error) { + if c.Kinesis != nil { + stream, err := c.Kinesis.DescribeStreamWithContext(ctx, &kinesis.DescribeStreamInput{ + StreamName: aws.String(streamName), + }) + if stream != nil { + return stream.StreamDescription.StreamARN, err + } + } + + return nil, errors.New("unable to get stream arn due to empty client") +} + +func (c *KinesisClients) WorkerCfg(ctx context.Context, stream, consumer, mode string) *config.KinesisClientLibConfiguration { + const sharedMode = "shared" + if c.Kinesis != nil { + if mode == sharedMode { + if c.Credentials != nil { + kclConfig := config.NewKinesisClientLibConfigWithCredential(consumer, + stream, c.Region, consumer, + c.Credentials) + return kclConfig + } + } + } + + return nil +} + func (c *SesClients) New(session *session.Session) { c.Ses = ses.New(session, session.Config) } diff --git a/common/authentication/aws/client_test.go b/common/authentication/aws/client_test.go index 19a833f2c6..85e0392aae 100644 --- a/common/authentication/aws/client_test.go +++ b/common/authentication/aws/client_test.go @@ -19,12 +19,16 @@ import ( "testing" "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/credentials" "github.com/aws/aws-sdk-go/aws/request" "github.com/aws/aws-sdk-go/aws/session" + "github.com/aws/aws-sdk-go/service/kinesis" + "github.com/aws/aws-sdk-go/service/kinesis/kinesisiface" "github.com/aws/aws-sdk-go/service/sqs" "github.com/aws/aws-sdk-go/service/sqs/sqsiface" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/vmware/vmware-go-kcl/clientlibrary/config" ) type mockedSQS struct { @@ -36,7 +40,14 @@ func (m *mockedSQS) GetQueueUrlWithContext(ctx context.Context, input *sqs.GetQu return m.GetQueueURLFn(ctx, input) } +type mockedKinesis struct { + kinesisiface.KinesisAPI + DescribeStreamFn func(ctx context.Context, input *kinesis.DescribeStreamInput) (*kinesis.DescribeStreamOutput, error) +} +func (m *mockedKinesis) DescribeStreamWithContext(ctx context.Context, input *kinesis.DescribeStreamInput, opts ...request.Option) (*kinesis.DescribeStreamOutput, error) { + return m.DescribeStreamFn(ctx, input) +} func TestS3Clients_New(t *testing.T) { tests := []struct { @@ -116,4 +127,139 @@ func TestSqsClients_QueueURL(t *testing.T) { } } +func TestKinesisClients_Stream(t *testing.T) { + tests := []struct { + name string + kinesisClient *KinesisClients + streamName string + mockStreamARN *string + mockError error + expectedStream *string + expectedErr error + }{ + { + name: "successfully retrieves stream ARN", + kinesisClient: &KinesisClients{ + Kinesis: &mockedKinesis{DescribeStreamFn: func(ctx context.Context, input *kinesis.DescribeStreamInput) (*kinesis.DescribeStreamOutput, error) { + return &kinesis.DescribeStreamOutput{ + StreamDescription: &kinesis.StreamDescription{ + StreamARN: aws.String("arn:aws:kinesis:some-region:123456789012:stream/some-stream"), + }, + }, nil + }}, + Region: "us-west-1", + Credentials: credentials.NewStaticCredentials("accessKey", "secretKey", ""), + }, + streamName: "some-stream", + expectedStream: aws.String("arn:aws:kinesis:some-region:123456789012:stream/some-stream"), + expectedErr: nil, + }, + { + name: "returns error when stream not found", + kinesisClient: &KinesisClients{ + Kinesis: &mockedKinesis{DescribeStreamFn: func(ctx context.Context, input *kinesis.DescribeStreamInput) (*kinesis.DescribeStreamOutput, error) { + return nil, errors.New("stream not found") + }}, + Region: "us-west-1", + Credentials: credentials.NewStaticCredentials("accessKey", "secretKey", ""), + }, + streamName: "nonexistent-stream", + expectedStream: nil, + expectedErr: errors.New("unable to get stream arn due to empty client"), + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := tt.kinesisClient.Stream(t.Context(), tt.streamName) + if tt.expectedErr != nil { + require.Error(t, err) + assert.Equal(t, tt.expectedErr.Error(), err.Error()) + } else { + require.NoError(t, err) + assert.Equal(t, tt.expectedStream, got) + } + }) + } +} + +func TestKinesisClients_WorkerCfg(t *testing.T) { + testCreds := credentials.NewStaticCredentials("accessKey", "secretKey", "") + tests := []struct { + name string + kinesisClient *KinesisClients + streamName string + consumer string + mode string + expectedConfig *config.KinesisClientLibConfiguration + }{ + { + name: "successfully creates shared mode worker config", + kinesisClient: &KinesisClients{ + Kinesis: &mockedKinesis{ + DescribeStreamFn: func(ctx context.Context, input *kinesis.DescribeStreamInput) (*kinesis.DescribeStreamOutput, error) { + return &kinesis.DescribeStreamOutput{ + StreamDescription: &kinesis.StreamDescription{ + StreamARN: aws.String("arn:aws:kinesis:us-east-1:123456789012:stream/existing-stream"), + }, + }, nil + }, + }, + Region: "us-west-1", + Credentials: testCreds, + }, + streamName: "existing-stream", + consumer: "consumer1", + mode: "shared", + expectedConfig: config.NewKinesisClientLibConfigWithCredential( + "consumer1", "existing-stream", "us-west-1", "consumer1", testCreds, + ), + }, + { + name: "returns nil when mode is not shared", + kinesisClient: &KinesisClients{ + Kinesis: &mockedKinesis{ + DescribeStreamFn: func(ctx context.Context, input *kinesis.DescribeStreamInput) (*kinesis.DescribeStreamOutput, error) { + return &kinesis.DescribeStreamOutput{ + StreamDescription: &kinesis.StreamDescription{ + StreamARN: aws.String("arn:aws:kinesis:us-east-1:123456789012:stream/existing-stream"), + }, + }, nil + }, + }, + Region: "us-west-1", + Credentials: testCreds, + }, + streamName: "existing-stream", + consumer: "consumer1", + mode: "exclusive", + expectedConfig: nil, + }, + { + name: "returns nil when client is nil", + kinesisClient: &KinesisClients{ + Kinesis: nil, + Region: "us-west-1", + Credentials: credentials.NewStaticCredentials("accessKey", "secretKey", ""), + }, + streamName: "existing-stream", + consumer: "consumer1", + mode: "shared", + expectedConfig: nil, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + cfg := tt.kinesisClient.WorkerCfg(t.Context(), tt.streamName, tt.consumer, tt.mode) + if tt.expectedConfig == nil { + assert.Equal(t, tt.expectedConfig, cfg) + return + } + assert.Equal(t, tt.expectedConfig.StreamName, cfg.StreamName) + assert.Equal(t, tt.expectedConfig.EnhancedFanOutConsumerName, cfg.EnhancedFanOutConsumerName) + assert.Equal(t, tt.expectedConfig.EnableEnhancedFanOutConsumer, cfg.EnableEnhancedFanOutConsumer) + assert.Equal(t, tt.expectedConfig.RegionName, cfg.RegionName) + }) + } +} diff --git a/go.mod b/go.mod index 2c1e5fed33..dab56365c4 100644 --- a/go.mod +++ b/go.mod @@ -127,6 +127,7 @@ require ( github.com/tetratelabs/wazero v1.7.0 github.com/tmc/langchaingo v0.1.13 github.com/valyala/fasthttp v1.53.0 + github.com/vmware/vmware-go-kcl v1.5.1 github.com/vmware/vmware-go-kcl-v2 v1.0.0 github.com/xdg-go/scram v1.1.2 go.etcd.io/etcd/client/v3 v3.5.21 diff --git a/go.sum b/go.sum index d64cc16d79..55356add5a 100644 --- a/go.sum +++ b/go.sum @@ -1717,6 +1717,8 @@ github.com/valyala/fasthttp v1.53.0 h1:lW/+SUkOxCx2vlIu0iaImv4JLrVRnbbkpCoaawvA4 github.com/valyala/fasthttp v1.53.0/go.mod h1:6dt4/8olwq9QARP/TDuPmWyWcl4byhpvTJ4AAtcz+QM= github.com/valyala/fasttemplate v1.2.1/go.mod h1:KHLXt3tVN2HBp8eijSv/kGJopbvo7S+qRAEEKiv+SiQ= github.com/valyala/tcplisten v0.0.0-20161114210144-ceec8f93295a/go.mod h1:v3UYOV9WzVtRmSR+PDvWpU/qWl4Wa5LApYYX4ZtKbio= +github.com/vmware/vmware-go-kcl v1.5.1 h1:1rJLfAX4sDnCyatNoD/WJzVafkwST6u/cgY/Uf2VgHk= +github.com/vmware/vmware-go-kcl v1.5.1/go.mod h1:kXJmQ6h0dRMRrp1uWU9XbIXvwelDpTxSPquvQUBdpbo= github.com/vmware/vmware-go-kcl-v2 v1.0.0 h1:HPT5vu+khRmGspBSc/+AilEWbRGoTZhjlYqdrBbRMZs= github.com/vmware/vmware-go-kcl-v2 v1.0.0/go.mod h1:GBDu+P4Neo0vwZAk0ZUCEC8GYsUOWvi3XhFwAZR3SjA= github.com/x448/float16 v0.8.4 h1:qLwI1I70+NjRFUR3zs1JPUCgaCXSh3SW62uAKT1mSBM= diff --git a/tests/certification/go.mod b/tests/certification/go.mod index 09fcde4eea..62bb69c81c 100644 --- a/tests/certification/go.mod +++ b/tests/certification/go.mod @@ -294,6 +294,7 @@ require ( github.com/tklauser/go-sysconf v0.3.12 // indirect github.com/tklauser/numcpus v0.6.1 // indirect github.com/tmc/langchaingo v0.1.13 // indirect + github.com/vmware/vmware-go-kcl v1.5.1 // indirect github.com/x448/float16 v0.8.4 // indirect github.com/xdg-go/pbkdf2 v1.0.0 // indirect github.com/xdg-go/scram v1.1.2 // indirect @@ -339,6 +340,7 @@ require ( google.golang.org/grpc v1.73.0 // indirect google.golang.org/protobuf v1.36.6 // indirect gopkg.in/inf.v0 v0.9.1 // indirect + gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect k8s.io/api v0.31.0 // indirect diff --git a/tests/certification/go.sum b/tests/certification/go.sum index f88b0efb22..df721eb463 100644 --- a/tests/certification/go.sum +++ b/tests/certification/go.sum @@ -1432,6 +1432,8 @@ github.com/ugorji/go v1.2.6/go.mod h1:anCg0y61KIhDlPZmnH+so+RQbysYVyDko0IMgJv0Nn github.com/ugorji/go/codec v1.2.6/go.mod h1:V6TCNZ4PHqoHGFZuSG1W8nrCzzdgA2DozYxWFFpvxTw= github.com/urfave/cli v1.20.0/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijbERA= github.com/urfave/cli v1.22.1/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0= +github.com/vmware/vmware-go-kcl v1.5.1 h1:1rJLfAX4sDnCyatNoD/WJzVafkwST6u/cgY/Uf2VgHk= +github.com/vmware/vmware-go-kcl v1.5.1/go.mod h1:kXJmQ6h0dRMRrp1uWU9XbIXvwelDpTxSPquvQUBdpbo= github.com/x448/float16 v0.8.4 h1:qLwI1I70+NjRFUR3zs1JPUCgaCXSh3SW62uAKT1mSBM= github.com/x448/float16 v0.8.4/go.mod h1:14CWIYCyZA/cWjXOioeEpHeN/83MdbZDRQHoFcYsOfg= github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c= From b1c1489bb6128550c8e8a33bafa537b00df8b902 Mon Sep 17 00:00:00 2001 From: rideshnath-scout Date: Wed, 26 Nov 2025 18:56:33 +0530 Subject: [PATCH 17/18] fix: improve consumer existence check with exponential backoff Signed-off-by: rideshnath-scout --- bindings/aws/kinesis/kinesis.go | 21 +++++++++++++-------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/bindings/aws/kinesis/kinesis.go b/bindings/aws/kinesis/kinesis.go index 5bfe497b41..6acc0fecdd 100644 --- a/bindings/aws/kinesis/kinesis.go +++ b/bindings/aws/kinesis/kinesis.go @@ -339,18 +339,23 @@ func (a *AWSKinesis) deregisterConsumer(_ context.Context, streamARN *string, co } func (a *AWSKinesis) waitUntilConsumerExists(ctx context.Context, input *kinesis.DescribeStreamConsumerInput) error { - // Iterate 18 times - for range 18 { + ctx, cancel := context.WithTimeout(ctx, 3*time.Minute) + defer cancel() + + bo := backoff.NewExponentialBackOff() + bo.InitialInterval = 2 * time.Second + bo.MaxInterval = 30 * time.Second + + return backoff.Retry(func() error { consumer, err := a.kinesisClient.DescribeStreamConsumer(ctx, input) if err != nil { - return err + return backoff.Permanent(err) } - if consumer.ConsumerDescription.ConsumerStatus == types.ConsumerStatusActive { - return nil + if consumer.ConsumerDescription.ConsumerStatus != types.ConsumerStatusActive { + return errors.New("consumer not active yet") } - time.Sleep(10 * time.Second) - } - return errors.New("consumer did not become active within timeout") + return nil + }, backoff.WithContext(bo, ctx)) } func (a *AWSKinesis) parseMetadata(meta bindings.Metadata) (*kinesisMetadata, error) { From 9ca3669fbf4dc3d09e49e93f7a1b8306ba5c6046 Mon Sep 17 00:00:00 2001 From: rideshnath-scout Date: Thu, 27 Nov 2025 17:36:58 +0530 Subject: [PATCH 18/18] fix: clean up whitespace and improve consumer configuration check (resolve conflicts) Signed-off-by: rideshnath-scout --- bindings/aws/kinesis/kinesis.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/bindings/aws/kinesis/kinesis.go b/bindings/aws/kinesis/kinesis.go index 6acc0fecdd..575119e061 100644 --- a/bindings/aws/kinesis/kinesis.go +++ b/bindings/aws/kinesis/kinesis.go @@ -341,11 +341,11 @@ func (a *AWSKinesis) deregisterConsumer(_ context.Context, streamARN *string, co func (a *AWSKinesis) waitUntilConsumerExists(ctx context.Context, input *kinesis.DescribeStreamConsumerInput) error { ctx, cancel := context.WithTimeout(ctx, 3*time.Minute) defer cancel() - + bo := backoff.NewExponentialBackOff() bo.InitialInterval = 2 * time.Second bo.MaxInterval = 30 * time.Second - + return backoff.Retry(func() error { consumer, err := a.kinesisClient.DescribeStreamConsumer(ctx, input) if err != nil { @@ -431,8 +431,7 @@ func (a *AWSKinesis) getStreamARN(ctx context.Context, streamName string) (*stri } func (a *AWSKinesis) workerCfg(_ context.Context, stream, region, mode, applicationName string) *config.KinesisClientLibConfiguration { - const sharedMode = "shared" - if mode == sharedMode { + if mode == SharedThroughput { return config.NewKinesisClientLibConfigWithCredential(applicationName, stream, region, "", a.v2Credentials) } return nil