-
Notifications
You must be signed in to change notification settings - Fork 544
Feat/kinesis binding vmware go kcl v2 latest #4082
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 18 commits
2eddb15
cabcec5
112fd8d
de8646f
90e071c
648a318
b686d94
9a70082
9dafc6d
8b93022
2ee207c
9b003aa
6ebe1a4
c064187
716b8a1
bf33f52
d8f5103
bd2639a
d75c04c
2a205cb
b1c1489
9ca3669
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -22,37 +22,40 @@ import ( | |
| "sync/atomic" | ||
| "time" | ||
|
|
||
| "github.com/aws/aws-sdk-go/aws" | ||
| "github.com/aws/aws-sdk-go/aws/request" | ||
| "github.com/aws/aws-sdk-go/service/kinesis" | ||
| "github.com/aws/aws-sdk-go-v2/aws" | ||
| "github.com/aws/aws-sdk-go-v2/service/kinesis" | ||
| "github.com/aws/aws-sdk-go-v2/service/kinesis/types" | ||
| "github.com/cenkalti/backoff/v4" | ||
| "github.com/google/uuid" | ||
| "github.com/vmware/vmware-go-kcl/clientlibrary/interfaces" | ||
| "github.com/vmware/vmware-go-kcl/clientlibrary/worker" | ||
| "github.com/vmware/vmware-go-kcl-v2/clientlibrary/config" | ||
| "github.com/vmware/vmware-go-kcl-v2/clientlibrary/interfaces" | ||
| "github.com/vmware/vmware-go-kcl-v2/clientlibrary/worker" | ||
|
|
||
| "github.com/dapr/components-contrib/bindings" | ||
| awsAuth "github.com/dapr/components-contrib/common/authentication/aws" | ||
| awscommon "github.com/dapr/components-contrib/common/aws" | ||
| awsAuth "github.com/dapr/components-contrib/common/aws/auth" | ||
| "github.com/dapr/components-contrib/metadata" | ||
| "github.com/dapr/kit/logger" | ||
| kitmd "github.com/dapr/kit/metadata" | ||
| ) | ||
|
|
||
| // AWSKinesis allows receiving and sending data to/from AWS Kinesis stream. | ||
| type AWSKinesis struct { | ||
| authProvider awsAuth.Provider | ||
| metadata *kinesisMetadata | ||
|
|
||
| worker *worker.Worker | ||
|
|
||
| streamName string | ||
| consumerName string | ||
| consumerARN *string | ||
| logger logger.Logger | ||
| consumerMode string | ||
|
|
||
| closed atomic.Bool | ||
| closeCh chan struct{} | ||
| wg sync.WaitGroup | ||
| metadata *kinesisMetadata | ||
|
|
||
| worker *worker.Worker | ||
| kinesisClient *kinesis.Client | ||
| v2Credentials aws.CredentialsProvider | ||
|
|
||
| streamName string | ||
| consumerName string | ||
| consumerARN *string | ||
| logger logger.Logger | ||
| consumerMode string | ||
| applicationName string | ||
| closed atomic.Bool | ||
| closeCh chan struct{} | ||
| wg sync.WaitGroup | ||
| } | ||
|
|
||
| // TODO: we need to clean up the metadata fields here and update this binding to use the builtin aws auth provider and reflect in metadata.yaml | ||
|
|
@@ -65,6 +68,7 @@ type kinesisMetadata struct { | |
| SecretKey string `json:"secretKey" mapstructure:"secretKey"` | ||
| SessionToken string `json:"sessionToken" mapstructure:"sessionToken"` | ||
| KinesisConsumerMode string `json:"mode" mapstructure:"mode"` | ||
| ApplicationName string `json:"applicationName" mapstructure:"applicationName"` | ||
| } | ||
|
|
||
| const ( | ||
|
|
@@ -116,6 +120,7 @@ func (a *AWSKinesis) Init(ctx context.Context, metadata bindings.Metadata) error | |
| a.consumerMode = m.KinesisConsumerMode | ||
| a.streamName = m.StreamName | ||
| a.consumerName = m.ConsumerName | ||
| a.applicationName = m.ApplicationName | ||
| a.metadata = m | ||
|
|
||
| opts := awsAuth.Options{ | ||
|
|
@@ -126,12 +131,13 @@ func (a *AWSKinesis) Init(ctx context.Context, metadata bindings.Metadata) error | |
| SecretKey: m.SecretKey, | ||
| SessionToken: "", | ||
| } | ||
| // extra configs needed per component type | ||
| provider, err := awsAuth.NewProvider(ctx, opts, awsAuth.GetConfig(opts)) | ||
|
|
||
| kinesisClient, err := a.createKinesisClient(ctx, opts) | ||
| if err != nil { | ||
| return err | ||
| } | ||
| a.authProvider = provider | ||
| a.kinesisClient = kinesisClient | ||
|
|
||
| return nil | ||
| } | ||
|
|
||
|
|
@@ -144,7 +150,7 @@ func (a *AWSKinesis) Invoke(ctx context.Context, req *bindings.InvokeRequest) (* | |
| if partitionKey == "" { | ||
| partitionKey = uuid.New().String() | ||
| } | ||
| _, err := a.authProvider.Kinesis().Kinesis.PutRecordWithContext(ctx, &kinesis.PutRecordInput{ | ||
| _, err := a.kinesisClient.PutRecord(ctx, &kinesis.PutRecordInput{ | ||
| StreamName: &a.metadata.StreamName, | ||
| Data: req.Data, | ||
| PartitionKey: &partitionKey, | ||
|
|
@@ -158,21 +164,23 @@ func (a *AWSKinesis) Read(ctx context.Context, handler bindings.Handler) (err er | |
| return errors.New("binding is closed") | ||
| } | ||
|
|
||
| if a.metadata.KinesisConsumerMode == SharedThroughput { | ||
| switch a.metadata.KinesisConsumerMode { | ||
| case SharedThroughput: | ||
| // initialize worker configuration | ||
| config := a.workerCfg(ctx, a.streamName, a.metadata.Region, a.consumerMode, a.applicationName) | ||
| // Configure the KCL worker with custom endpoints for LocalStack | ||
| config := a.authProvider.Kinesis().WorkerCfg(ctx, a.streamName, a.consumerName, a.consumerMode) | ||
| if a.metadata.Endpoint != "" { | ||
| config.KinesisEndpoint = a.metadata.Endpoint | ||
| config.DynamoDBEndpoint = a.metadata.Endpoint | ||
| config = config.WithKinesisEndpoint(a.metadata.Endpoint) | ||
| config = config.WithDynamoDBEndpoint(a.metadata.Endpoint) | ||
| } | ||
| a.worker = worker.NewWorker(a.recordProcessorFactory(ctx, handler), config) | ||
| err = a.worker.Start() | ||
| if err != nil { | ||
| return err | ||
| } | ||
| } else if a.metadata.KinesisConsumerMode == ExtendedFanout { | ||
| case ExtendedFanout: | ||
| var stream *kinesis.DescribeStreamOutput | ||
| stream, err = a.authProvider.Kinesis().Kinesis.DescribeStream(&kinesis.DescribeStreamInput{StreamName: &a.metadata.StreamName}) | ||
| stream, err = a.kinesisClient.DescribeStream(ctx, &kinesis.DescribeStreamInput{StreamName: &a.metadata.StreamName}) | ||
| if err != nil { | ||
| return err | ||
| } | ||
|
|
@@ -182,7 +190,7 @@ func (a *AWSKinesis) Read(ctx context.Context, handler bindings.Handler) (err er | |
| } | ||
| } | ||
|
|
||
| stream, err := a.authProvider.Kinesis().Stream(ctx, a.streamName) | ||
| stream, err := a.getStreamARN(ctx, a.streamName) | ||
| if err != nil { | ||
| return fmt.Errorf("failed to get kinesis stream arn: %v", err) | ||
| } | ||
|
|
@@ -194,9 +202,10 @@ func (a *AWSKinesis) Read(ctx context.Context, handler bindings.Handler) (err er | |
| case <-ctx.Done(): | ||
| case <-a.closeCh: | ||
| } | ||
| if a.metadata.KinesisConsumerMode == SharedThroughput { | ||
| switch a.metadata.KinesisConsumerMode { | ||
| case SharedThroughput: | ||
| a.worker.Shutdown() | ||
| } else if a.metadata.KinesisConsumerMode == ExtendedFanout { | ||
| case ExtendedFanout: | ||
| a.deregisterConsumer(ctx, stream, a.consumerARN) | ||
| } | ||
| }() | ||
|
|
@@ -205,7 +214,7 @@ func (a *AWSKinesis) Read(ctx context.Context, handler bindings.Handler) (err er | |
| } | ||
|
|
||
| // Subscribe to all shards. | ||
| func (a *AWSKinesis) Subscribe(ctx context.Context, streamDesc kinesis.StreamDescription, handler bindings.Handler) error { | ||
| func (a *AWSKinesis) Subscribe(ctx context.Context, streamDesc types.StreamDescription, handler bindings.Handler) error { | ||
| consumerARN, err := a.ensureConsumer(ctx, streamDesc.StreamARN) | ||
| if err != nil { | ||
| a.logger.Error(err) | ||
|
|
@@ -216,7 +225,7 @@ func (a *AWSKinesis) Subscribe(ctx context.Context, streamDesc kinesis.StreamDes | |
|
|
||
| a.wg.Add(len(streamDesc.Shards)) | ||
| for i, shard := range streamDesc.Shards { | ||
| go func(idx int, s *kinesis.Shard) { | ||
| go func(idx int, s types.Shard) { | ||
| defer a.wg.Done() | ||
|
|
||
| // Reconnection backoff | ||
|
|
@@ -232,14 +241,14 @@ func (a *AWSKinesis) Subscribe(ctx context.Context, streamDesc kinesis.StreamDes | |
| return | ||
| default: | ||
| } | ||
| sub, err := a.authProvider.Kinesis().Kinesis.SubscribeToShardWithContext(ctx, &kinesis.SubscribeToShardInput{ | ||
| sub, err := a.kinesisClient.SubscribeToShard(ctx, &kinesis.SubscribeToShardInput{ | ||
| ConsumerARN: consumerARN, | ||
| ShardId: s.ShardId, | ||
| StartingPosition: &kinesis.StartingPosition{Type: aws.String(kinesis.ShardIteratorTypeLatest)}, | ||
| StartingPosition: &types.StartingPosition{Type: types.ShardIteratorTypeLatest}, | ||
| }) | ||
| if err != nil { | ||
| wait := bo.NextBackOff() | ||
| a.logger.Errorf("Error while reading from shard %v: %v. Attempting to reconnect in %s...", s.ShardId, err, wait) | ||
| a.logger.Errorf("Error while reading from shard %v: %v. Attempting to reconnect in %s...", *s.ShardId, err, wait) | ||
| select { | ||
| case <-ctx.Done(): | ||
| return | ||
|
|
@@ -252,10 +261,10 @@ func (a *AWSKinesis) Subscribe(ctx context.Context, streamDesc kinesis.StreamDes | |
| bo.Reset() | ||
|
|
||
| // Process events | ||
| for event := range sub.EventStream.Events() { | ||
| for event := range sub.GetStream().Events() { | ||
| switch e := event.(type) { | ||
| case *kinesis.SubscribeToShardEvent: | ||
| for _, rec := range e.Records { | ||
| case *types.SubscribeToShardEventStreamMemberSubscribeToShardEvent: | ||
| for _, rec := range e.Value.Records { | ||
| handler(ctx, &bindings.ReadResponse{ | ||
| Data: rec.Data, | ||
| }) | ||
|
|
@@ -274,17 +283,14 @@ func (a *AWSKinesis) Close() error { | |
| close(a.closeCh) | ||
| } | ||
| a.wg.Wait() | ||
| if a.authProvider != nil { | ||
| return a.authProvider.Close() | ||
| } | ||
| return nil | ||
| } | ||
|
|
||
| func (a *AWSKinesis) ensureConsumer(ctx context.Context, streamARN *string) (*string, error) { | ||
| // Only set timeout on consumer call. | ||
| conCtx, cancel := context.WithTimeout(ctx, 30*time.Second) | ||
| defer cancel() | ||
| consumer, err := a.authProvider.Kinesis().Kinesis.DescribeStreamConsumerWithContext(conCtx, &kinesis.DescribeStreamConsumerInput{ | ||
| consumer, err := a.kinesisClient.DescribeStreamConsumer(conCtx, &kinesis.DescribeStreamConsumerInput{ | ||
| ConsumerName: &a.metadata.ConsumerName, | ||
| StreamARN: streamARN, | ||
| }) | ||
|
|
@@ -296,7 +302,7 @@ func (a *AWSKinesis) ensureConsumer(ctx context.Context, streamARN *string) (*st | |
| } | ||
|
|
||
| func (a *AWSKinesis) registerConsumer(ctx context.Context, streamARN *string) (*string, error) { | ||
| consumer, err := a.authProvider.Kinesis().Kinesis.RegisterStreamConsumerWithContext(ctx, &kinesis.RegisterStreamConsumerInput{ | ||
| consumer, err := a.kinesisClient.RegisterStreamConsumer(ctx, &kinesis.RegisterStreamConsumerInput{ | ||
| ConsumerName: &a.metadata.ConsumerName, | ||
| StreamARN: streamARN, | ||
| }) | ||
|
|
@@ -315,11 +321,11 @@ func (a *AWSKinesis) registerConsumer(ctx context.Context, streamARN *string) (* | |
| return consumer.Consumer.ConsumerARN, nil | ||
| } | ||
|
|
||
| func (a *AWSKinesis) deregisterConsumer(ctx context.Context, streamARN *string, consumerARN *string) error { | ||
| func (a *AWSKinesis) deregisterConsumer(_ context.Context, streamARN *string, consumerARN *string) error { | ||
| if a.consumerARN != nil { | ||
| // Use a background context because the running context may have been canceled already | ||
| ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) | ||
| _, err := a.authProvider.Kinesis().Kinesis.DeregisterStreamConsumerWithContext(ctx, &kinesis.DeregisterStreamConsumerInput{ | ||
| _, err := a.kinesisClient.DeregisterStreamConsumer(ctx, &kinesis.DeregisterStreamConsumerInput{ | ||
| ConsumerARN: consumerARN, | ||
| StreamARN: streamARN, | ||
| ConsumerName: &a.metadata.ConsumerName, | ||
|
|
@@ -332,34 +338,19 @@ func (a *AWSKinesis) deregisterConsumer(ctx context.Context, streamARN *string, | |
| return nil | ||
| } | ||
|
|
||
| func (a *AWSKinesis) waitUntilConsumerExists(ctx aws.Context, input *kinesis.DescribeStreamConsumerInput, opts ...request.WaiterOption) error { | ||
| w := request.Waiter{ | ||
| Name: "WaitUntilConsumerExists", | ||
| MaxAttempts: 18, | ||
| Delay: request.ConstantWaiterDelay(10 * time.Second), | ||
| Acceptors: []request.WaiterAcceptor{ | ||
| { | ||
| State: request.SuccessWaiterState, | ||
| Matcher: request.PathWaiterMatch, Argument: "ConsumerDescription.ConsumerStatus", | ||
| Expected: "ACTIVE", | ||
| }, | ||
| }, | ||
| NewRequest: func(opts []request.Option) (*request.Request, error) { | ||
| var inCpy *kinesis.DescribeStreamConsumerInput | ||
| if input != nil { | ||
| tmp := *input | ||
| inCpy = &tmp | ||
| } | ||
| req, _ := a.authProvider.Kinesis().Kinesis.DescribeStreamConsumerRequest(inCpy) | ||
| req.SetContext(ctx) | ||
| req.ApplyOptions(opts...) | ||
|
|
||
| return req, nil | ||
| }, | ||
| func (a *AWSKinesis) waitUntilConsumerExists(ctx context.Context, input *kinesis.DescribeStreamConsumerInput) error { | ||
| // Iterate 18 times | ||
| for range 18 { | ||
| consumer, err := a.kinesisClient.DescribeStreamConsumer(ctx, input) | ||
| if err != nil { | ||
| return err | ||
| } | ||
| if consumer.ConsumerDescription.ConsumerStatus == types.ConsumerStatusActive { | ||
| return nil | ||
| } | ||
| time.Sleep(10 * time.Second) | ||
| } | ||
| w.ApplyOptions(opts...) | ||
|
|
||
| return w.WaitWithContext(ctx) | ||
| return errors.New("consumer did not become active within timeout") | ||
|
||
| } | ||
|
|
||
| func (a *AWSKinesis) parseMetadata(meta bindings.Metadata) (*kinesisMetadata, error) { | ||
|
|
@@ -388,7 +379,7 @@ func (r *recordProcessorFactory) CreateProcessor() interfaces.IRecordProcessor { | |
| } | ||
|
|
||
| func (p *recordProcessor) Initialize(input *interfaces.InitializationInput) { | ||
| p.logger.Infof("Processing ShardId: %v at checkpoint: %v", input.ShardId, aws.StringValue(input.ExtendedSequenceNumber.SequenceNumber)) | ||
| p.logger.Infof("Processing ShardId: %v at checkpoint: %v", input.ShardId, *input.ExtendedSequenceNumber.SequenceNumber) | ||
| } | ||
|
|
||
| func (p *recordProcessor) ProcessRecords(input *interfaces.ProcessRecordsInput) { | ||
|
|
@@ -414,6 +405,34 @@ func (p *recordProcessor) Shutdown(input *interfaces.ShutdownInput) { | |
| } | ||
| } | ||
|
|
||
| func (a *AWSKinesis) createKinesisClient(ctx context.Context, opts awsAuth.Options) (*kinesis.Client, error) { | ||
| awsConfig, configErr := awscommon.NewConfig(ctx, opts) | ||
| if configErr != nil { | ||
| return nil, configErr | ||
| } | ||
|
|
||
| kinesisClient := kinesis.NewFromConfig(awsConfig) | ||
| return kinesisClient, nil | ||
| } | ||
|
|
||
| func (a *AWSKinesis) getStreamARN(ctx context.Context, streamName string) (*string, error) { | ||
| stream, err := a.kinesisClient.DescribeStream(ctx, &kinesis.DescribeStreamInput{ | ||
| StreamName: &streamName, | ||
| }) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
| return stream.StreamDescription.StreamARN, nil | ||
| } | ||
|
|
||
| func (a *AWSKinesis) workerCfg(_ context.Context, stream, region, mode, applicationName string) *config.KinesisClientLibConfiguration { | ||
| const sharedMode = "shared" | ||
| if mode == sharedMode { | ||
| return config.NewKinesisClientLibConfigWithCredential(applicationName, stream, region, "", a.v2Credentials) | ||
| } | ||
| return nil | ||
| } | ||
|
|
||
| // GetComponentMetadata returns the metadata of the component. | ||
| func (a *AWSKinesis) GetComponentMetadata() (metadataInfo metadata.MetadataMap) { | ||
| metadataStruct := &kinesisMetadata{} | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.