Skip to content

Commit 2aaa4f5

Browse files
committed
Optional manual approval step
1 parent b7c637f commit 2aaa4f5

File tree

5 files changed

+117
-15
lines changed

5 files changed

+117
-15
lines changed
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
import logging
2+
import os
3+
import time
4+
5+
import boto3
6+
from botocore.exceptions import ClientError
7+
8+
log = logging.getLogger(__name__)
9+
log.setLevel(logging.INFO)
10+
11+
BUCKET_NAME = os.environ['BUCKET_NAME']
12+
FILE_NAME = os.environ['FILE_NAME']
13+
14+
s3 = boto3.client('s3')
15+
16+
17+
def on_create(event):
18+
log.info("Got Create for %s", event)
19+
20+
while True:
21+
try:
22+
log.info(f"Try to fetch object '{FILE_NAME}' from bucket '{BUCKET_NAME}'")
23+
response = s3.head_object(Bucket=BUCKET_NAME, Key=FILE_NAME)
24+
25+
if response['ResponseMetadata']['HTTPStatusCode'] == 200:
26+
log.info(f"Found the object with response: {response}")
27+
return {'PhysicalResourceId': f"Flink-Application-Created-In-{BUCKET_NAME}/{FILE_NAME}"}
28+
29+
else:
30+
log.error(f"Invalid response from S3: {response}")
31+
raise ValueError("Invalid response from S3: ", response)
32+
33+
except ClientError as e:
34+
if e.response['ResponseMetadata']['HTTPStatusCode'] == 404:
35+
log.info(f"Not found the object yet with response: {e}")
36+
time.sleep(30)
37+
38+
else:
39+
log.error(f"Invalid state from S3: {e}")
40+
raise ValueError("Invalid state from S3: ", e)
41+
42+
43+
def on_update(event):
44+
log.info("Got Update for %s", event["PhysicalResourceId"])
45+
# If the update resulted in a new resource being created, return an id for the new resource.
46+
# CloudFormation will send a delete event with the old id when stack update completes
47+
48+
49+
def on_delete(event):
50+
log.info("Got Delete for %s", event["PhysicalResourceId"])
51+
# Delete never returns anything. Should not fail if the underlying resources are already deleted.
52+
# Desired state.
53+
54+
55+
def on_event(event, context):
56+
log.info("Received event: %s", event)
57+
58+
request_type = event['RequestType']
59+
if request_type == 'Create':
60+
return on_create(event)
61+
if request_type == 'Update':
62+
return on_update(event)
63+
if request_type == 'Delete':
64+
return on_delete(event)
65+
66+
raise Exception("Invalid request type: %s" % request_type)

infrastructure-cdk/lib/application-stack.ts

Lines changed: 30 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
1-
import {Aws, Fn, RemovalPolicy, Stack, StackProps} from 'aws-cdk-lib';
1+
import {Aws, CustomResource, Duration, Fn, RemovalPolicy, Stack, StackProps} from 'aws-cdk-lib';
22
import {Construct} from 'constructs';
33
import {Bucket} from "aws-cdk-lib/aws-s3";
4-
import {APPLICATION_NAME, ASSET_BUCKET_EXPORT_NAME} from "./shared-vars";
4+
import {APPLICATION_NAME, ASSET_BUCKET_EXPORT_NAME, MANUAL_APPROVAL_REQUIRED} from "./shared-vars";
55
import {Stream, StreamEncryption, StreamMode} from "aws-cdk-lib/aws-kinesis";
6-
import {Code, Function, Runtime} from "aws-cdk-lib/aws-lambda";
6+
import {Architecture, Code, Function, Runtime} from "aws-cdk-lib/aws-lambda";
77
import * as kda from "@aws-cdk/aws-kinesisanalytics-flink-alpha";
88
import {ApplicationRuntime} from "./constructs/application-runtime";
9+
import {Provider} from "aws-cdk-lib/custom-resources";
910

1011
interface ApplicationStackProps {
1112
runtime: ApplicationRuntime,
@@ -69,5 +70,31 @@ export class ApplicationStack extends Stack {
6970

7071
stream.grantRead(application);
7172

73+
if (!MANUAL_APPROVAL_REQUIRED) {
74+
// Wait for the build artifact to be initialized
75+
const flinkArtifactCreatedHook = new Function(this, 'FlinkArtifactCreatedHook', {
76+
runtime: Runtime.PYTHON_3_9,
77+
code: Code.fromAsset('./flink-artifact-created-hook'),
78+
handler: 'app.on_event',
79+
architecture: Architecture.ARM_64,
80+
timeout: Duration.minutes(15),
81+
environment: {
82+
BUCKET_NAME: assetBucket.bucketName,
83+
FILE_NAME: binaryPath
84+
}
85+
});
86+
assetBucket.grantRead(flinkArtifactCreatedHook);
87+
88+
89+
const flinkArtifactCreatedHookProvider = new Provider(this, 'FlinkArtifactCreatedHookProvider', {
90+
onEventHandler: flinkArtifactCreatedHook
91+
});
92+
const flinkArtifactCreatedHookCustomResource = new CustomResource(this, 'FlinkArtifactCreatedHookCustomResource', {
93+
serviceToken: flinkArtifactCreatedHookProvider.serviceToken
94+
});
95+
96+
application.node.addDependency(flinkArtifactCreatedHookCustomResource);
97+
}
98+
7299
}
73100
}

infrastructure-cdk/lib/constructs/java-build-pipeline.ts

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ import {
1212
import {Code, Function, Runtime} from "aws-cdk-lib/aws-lambda";
1313
import {Aws, Duration} from "aws-cdk-lib";
1414
import {PolicyStatement} from "aws-cdk-lib/aws-iam";
15-
import {SOURCE_CODE_ZIP} from "../shared-vars";
15+
import {MANUAL_APPROVAL_REQUIRED, SOURCE_CODE_ZIP} from "../shared-vars";
1616

1717

1818
interface JavaBuildPipelineProps {
@@ -106,14 +106,18 @@ export class JavaBuildPipeline extends Construct {
106106
input: buildOutput,
107107
extract: true
108108
})]
109-
}, {
110-
stageName: "approval",
111-
actions: [new ManualApprovalAction({
112-
actionName: "Manual"
113-
})]
114109
}]
115110
});
116111

112+
if (MANUAL_APPROVAL_REQUIRED) {
113+
this.pipeline.addStage({
114+
stageName: "approval",
115+
actions: [new ManualApprovalAction({
116+
actionName: "Manual"
117+
})]
118+
})
119+
}
120+
117121
const versionUpdateFn = new Function(this, 'version-update-fn', {
118122
code: Code.fromAsset('flink-app-redeploy-hook'),
119123
handler: "app.lambda_handler",

infrastructure-cdk/lib/constructs/python-build-pipeline.ts

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ import {
1212
import {Code, Function, Runtime} from "aws-cdk-lib/aws-lambda";
1313
import {Aws, Duration} from "aws-cdk-lib";
1414
import {PolicyStatement} from "aws-cdk-lib/aws-iam";
15-
import {SOURCE_CODE_ZIP} from "../shared-vars";
15+
import {MANUAL_APPROVAL_REQUIRED, SOURCE_CODE_ZIP} from "../shared-vars";
1616

1717

1818
interface PythonBuildPipelineProps {
@@ -107,14 +107,18 @@ export class PythonBuildPipeline extends Construct {
107107
input: buildOutput,
108108
extract: true
109109
})]
110-
}, {
111-
stageName: "approval",
112-
actions: [new ManualApprovalAction({
113-
actionName: "Manual"
114-
})]
115110
}]
116111
});
117112

113+
if (MANUAL_APPROVAL_REQUIRED) {
114+
this.pipeline.addStage({
115+
stageName: "approval",
116+
actions: [new ManualApprovalAction({
117+
actionName: "Manual"
118+
})]
119+
})
120+
}
121+
118122
const versionUpdateFn = new Function(this, 'version-update-fn', {
119123
code: Code.fromAsset('flink-app-redeploy-hook'),
120124
handler: "app.lambda_handler",

infrastructure-cdk/lib/shared-vars.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,3 +4,4 @@ export const APPLICATION_NAME = "kinesis-analytics-application"
44
export const BUILD_FOR_RUNTIME = ApplicationRuntime.JAVA
55
export const SOURCE_CODE_ZIP = "automate-deployment-and-version-update-of-kda-application-main.zip"
66
export const ASSET_BUCKET_EXPORT_NAME = "Blog::Artifact::BucketName"
7+
export const MANUAL_APPROVAL_REQUIRED = true

0 commit comments

Comments
 (0)