33// See the LICENSE file in the project root for more information
44
55using System . Collections . Concurrent ;
6- using System . Text . Json ;
76using Amazon . Lambda . Core ;
87using Amazon . Lambda . RuntimeSupport ;
98using Amazon . Lambda . Serialization . SystemTextJson ;
109using Amazon . Lambda . SQSEvents ;
1110using Amazon . S3 ;
12- using Amazon . S3 . Model ;
1311using Amazon . S3 . Util ;
1412using Elastic . Documentation . Lambda . LinkIndexUploader ;
1513using Elastic . Markdown . IO . State ;
// S3 location of the link index object that this function reads and rewrites.
// NOTE(review): the key name contains "test" - confirm this is the intended object for this deployment.
const string bucketName = "elastic-docs-link-index";
const string indexFile = "link-index-test.json";

// Build the Lambda bootstrap around Handler, using the source-generated JSON serializer
// context for the SQS event and batch response types, then run it.
var lambdaSerializer = new SourceGeneratorLambdaJsonSerializer<SerializerContext>();
var lambdaBootstrap = LambdaBootstrapBuilder
    .Create<SQSEvent, SQSBatchResponse>(Handler, lambdaSerializer)
    .Build();
await lambdaBootstrap.RunAsync();
2422
2725static async Task < SQSBatchResponse > Handler ( SQSEvent ev , ILambdaContext context )
2826{
2927 var s3Client = new AmazonS3Client ( ) ;
28+ var linkIndexUpdater = new LinkIndexUpdater ( s3Client , context . Logger , bucketName , indexFile ) ;
3029 var batchItemFailures = new List < SQSBatchResponse . BatchItemFailure > ( ) ;
31-
32- var getObjectRequest = new GetObjectRequest
33- {
34- BucketName = bucketName ,
35- Key = indexFile
36- } ;
37-
38- var getObjectResponse = await s3Client . GetObjectAsync ( getObjectRequest ) ;
39- await using var stream = getObjectResponse . ResponseStream ;
40- var linkIndex = LinkIndex . Deserialize ( stream ) ;
41- var currentETag = getObjectResponse . ETag ;
42- var processedMessageCount = 0 ;
43-
30+ var linkIndex = await linkIndexUpdater . GetLinkIndex ( ) ;
4431 foreach ( var message in ev . Records )
4532 {
4633 try
4734 {
48- var linkReferences = await GetLinkReferences ( s3Client , message , context ) ;
49- foreach ( var ( record , linkReference ) in linkReferences )
50- UpdateLinkIndex ( linkIndex , linkReference , record , context ) ;
51- processedMessageCount ++ ;
35+ var s3RecordLinkReferenceTuples = await GetS3RecordLinkReferenceTuples ( s3Client , message , context ) ;
36+ foreach ( var ( s3Record , linkReference ) in s3RecordLinkReferenceTuples )
37+ {
38+ var newEntry = ConvertToLinkIndexEntry ( s3Record , linkReference ) ;
39+ linkIndexUpdater . UpdateLinkIndexEntry ( linkIndex , newEntry ) ;
40+ }
5241 }
5342 catch ( Exception e )
5443 {
@@ -60,115 +49,54 @@ static async Task<SQSBatchResponse> Handler(SQSEvent ev, ILambdaContext context)
6049 } ) ;
6150 }
6251 }
63-
64- var json = LinkIndex . Serialize ( linkIndex ) ;
65-
66- var putObjectRequest = new PutObjectRequest
67- {
68- BucketName = bucketName ,
69- Key = indexFile ,
70- ContentBody = json ,
71- ContentType = "application/json" ,
72- IfMatch = currentETag
73- } ;
74-
7552 try
7653 {
77- _ = await s3Client . PutObjectAsync ( putObjectRequest ) ;
78- context . Logger . LogInformation ( "Successfully updated {bucketName}/{indexFile}." , bucketName , indexFile ) ;
79- context . Logger . LogInformation ( "Processed {processedMessageCount} messages." , processedMessageCount ) ;
54+ await linkIndexUpdater . SaveLinkIndex ( linkIndex ) ;
8055 var response = new SQSBatchResponse ( batchItemFailures ) ;
8156 if ( batchItemFailures . Count > 0 )
82- {
83- var jsonString = JsonSerializer . Serialize ( response , LinkIndexUpdaterSerializerContext . Default . SQSBatchResponse ) ;
84- context . Logger . LogInformation ( "Failed to process {batchItemFailuresCount} messages. Returning them to the queue." , batchItemFailures . Count ) ;
85- context . Logger . LogDebug ( jsonString ) ;
86- }
87-
57+ context . Logger . LogInformation ( "Failed to process {batchItemFailuresCount} of {allMessagesCount} messages. Returning them to the queue." , batchItemFailures . Count , ev . Records . Count ) ;
8858 return response ;
8959 }
9060 catch ( Exception ex )
9161 {
92- // if we fail to update the object, we need to return all the messages
9362 context . Logger . LogError ( "Failed to update {bucketName}/{indexFile}. Returning all {recordCount} messages to the queue." , bucketName , indexFile , ev . Records . Count ) ;
9463 context . Logger . LogError ( ex , ex . Message ) ;
95- ev . Records . ForEach ( m =>
96- {
97- context . Logger . LogInformation ( "Returning message {messageId} to the queue." , m . MessageId ) ;
98- } ) ;
99-
64+ // If we fail to update the link index, we need to return all messages to the queue
10065 var response = new SQSBatchResponse ( ev . Records . Select ( r => new SQSBatchResponse . BatchItemFailure
10166 {
10267 ItemIdentifier = r . MessageId
10368 } ) . ToList ( ) ) ;
104- var jsonString = JsonSerializer . Serialize ( response , LinkIndexUpdaterSerializerContext . Default . SQSBatchResponse ) ;
105- context . Logger . LogInformation ( jsonString ) ;
10669 return response ;
10770 }
10871}
10972
/// <summary>
/// Builds a <see cref="LinkIndexEntry"/> for the S3 object described by an event record.
/// </summary>
/// <remarks>
/// Repository and branch are read from the second and third '/'-separated segments of the
/// object key, not from <c>linkReference.Origin</c>; only the git ref is taken from the
/// link reference itself.
/// </remarks>
/// <param name="record">S3 event record supplying the object key, ETag and event time.</param>
/// <param name="linkReference">Link reference deserialized from that object; supplies <c>Origin.Ref</c>.</param>
/// <returns>A new entry describing the object.</returns>
/// <exception cref="InvalidOperationException">
/// The object key has fewer than three '/'-separated segments.
/// </exception>
static LinkIndexEntry ConvertToLinkIndexEntry(S3EventNotification.S3EventNotificationRecord record, LinkReference linkReference)
{
    var s3Object = record.S3.Object;

    // NOTE(review): S3 event notifications deliver object keys URL-encoded; confirm whether
    // keys containing special characters need decoding before they are split and stored.
    var keyTokens = s3Object.Key.Split('/');

    // Fail with a message that names the offending key instead of an opaque
    // IndexOutOfRangeException when the key does not have the expected shape.
    if (keyTokens.Length < 3)
        throw new InvalidOperationException(
            $"Unexpected S3 object key '{s3Object.Key}': expected at least three '/'-separated segments.");

    return new LinkIndexEntry
    {
        Repository = keyTokens[1],
        Branch = keyTokens[2],
        ETag = s3Object.ETag,
        Path = s3Object.Key,
        UpdatedAt = record.EventTime,
        GitReference = linkReference.Origin.Ref
    };
}
89+
/// <summary>
/// Parses the S3 event notification carried in an SQS message body and fetches the link
/// reference for every S3 record it contains.
/// </summary>
/// <param name="s3Client">S3 client handed to each <see cref="LinkReferenceProvider"/>.</param>
/// <param name="message">SQS message whose body is the S3 event notification JSON.</param>
/// <param name="context">Lambda context; its logger is passed to the provider.</param>
/// <returns>
/// One (S3 record, link reference) pair per record in the notification. Records are processed
/// in parallel and collected in a bag, so the result has no defined order.
/// </returns>
/// <exception cref="InvalidOperationException">
/// The message has no body, or the parsed notification contains no records.
/// </exception>
static async Task<IReadOnlyCollection<(S3EventNotification.S3EventNotificationRecord, LinkReference)>> GetS3RecordLinkReferenceTuples(IAmazonS3 s3Client,
    SQSEvent.SQSMessage message, ILambdaContext context)
{
    if (string.IsNullOrEmpty(message.Body))
        throw new InvalidOperationException("No Body in SQS Message.");

    var s3Event = S3EventNotification.ParseJson(message.Body);

    // A notification without records would otherwise hand a null source to
    // Parallel.ForEachAsync and fail with an unrelated ArgumentNullException.
    if (s3Event?.Records is null || s3Event.Records.Count == 0)
        throw new InvalidOperationException("Invalid S3 event message format");

    // ConcurrentBag because the loop bodies below add results concurrently.
    var linkReferences = new ConcurrentBag<(S3EventNotification.S3EventNotificationRecord, LinkReference)>();
    await Parallel.ForEachAsync(s3Event.Records, async (record, ctx) =>
    {
        // The provider is bound to a bucket, and each record names its own bucket.
        var linkReferenceProvider = new LinkReferenceProvider(s3Client, context.Logger, record.S3.Bucket.Name);
        var linkReference = await linkReferenceProvider.GetLinkReference(record.S3.Object.Key, ctx);
        linkReferences.Add((record, linkReference));
    });
    return linkReferences;
}
0 commit comments