Commit 7f37543

Added pipelines based on s3 source and functions

1 parent 962dd84 commit 7f37543

File tree: 8 files changed, +205 −79 lines changed

data-source-function/app.py

Lines changed: 7 additions & 0 deletions
@@ -0,0 +1,7 @@
+import boto3
+import os
+
+kinesis = boto3.client("kinesis")
+
+def lambda_handler(event, context):
+    kinesis.put_record(StreamName=os.environ["STREAM_NAME"], Data=b'This is test message from the Lambda function', PartitionKey="default")
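
A quick way to smoke-test this producer outside Lambda is to set the environment variable and call the handler directly. A minimal sketch, assuming the file is saved as app.py (the CDK handler string "app.lambda_handler" below suggests this), valid AWS credentials/region, and an existing stream; the stream name here is a placeholder, not part of the commit:

import os
os.environ["STREAM_NAME"] = "raw-stream-placeholder"  # hypothetical stream name

import app  # the module above; the boto3 Kinesis client is created at import time
app.lambda_handler({}, None)  # writes one record; the handler ignores both arguments
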
flink-app-redeploy-hook/app.py

Lines changed: 76 additions & 0 deletions
@@ -0,0 +1,76 @@
+import json
+import os
+import traceback
+
+import boto3
+
+asset_bucket_arn = os.environ["ASSET_BUCKET_ARN"]
+kinesis_apps = boto3.client("kinesisanalyticsv2")
+code_pipeline = boto3.client('codepipeline')
+
+
+def put_job_success(job, message):
+    """Notify CodePipeline of a successful job
+
+    Args:
+        job: The CodePipeline job ID
+        message: A message to be logged relating to the job status
+
+    Raises:
+        Exception: Any exception thrown by .put_job_success_result()
+
+    """
+    print('Putting job success')
+    print(message)
+    code_pipeline.put_job_success_result(jobId=job)
+
+
+def put_job_failure(job, message):
+    """Notify CodePipeline of a failed job
+
+    Args:
+        job: The CodePipeline job ID
+        message: A message to be logged relating to the job status
+
+    Raises:
+        Exception: Any exception thrown by .put_job_failure_result()
+
+    """
+    print('Putting job failure')
+    print(message)
+    code_pipeline.put_job_failure_result(jobId=job, failureDetails={'message': message, 'type': 'JobFailed'})
+
+
+def lambda_handler(event, context):
+    print(json.dumps(event))
+    # Extract the Job ID
+    job_id = event['CodePipeline.job']['id']
+
+    if "APP_NAME" in event:
+        app_name = event["APP_NAME"]
+    else:
+        app_name = os.environ["APP_NAME"]
+
+    if "FILE_KEY" in event:
+        file_key = event["FILE_KEY"]
+    else:
+        file_key = os.environ["FILE_KEY"]
+
+    try:
+        token = kinesis_apps.describe_application(ApplicationName=app_name)["ApplicationDetail"]["ConditionalToken"]
+        kinesis_apps.update_application(ApplicationName=app_name,
+                                        ApplicationConfigurationUpdate={
+                                            "ApplicationCodeConfigurationUpdate": {
+                                                "CodeContentUpdate": {
+                                                    "S3ContentLocationUpdate": {"BucketARNUpdate": asset_bucket_arn,
+                                                                                "FileKeyUpdate": file_key}}}
+
+                                        }, ConditionalToken=token)
+        put_job_success(job_id, "Success")
+    except Exception as e:
+        traceback.print_exc()
+        put_job_failure(job_id, 'Function exception: ' + str(e))
+
+
+if __name__ == '__main__':
+    lambda_handler({}, {})
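
Note that the __main__ guard above calls the handler with an empty event, which raises a KeyError on 'CodePipeline.job'; when invoked by CodePipeline the event always carries a job ID. For local experiments, a minimally shaped stand-in event looks like the sketch below (the job ID is a placeholder; reporting the result back requires a real in-flight pipeline job):

test_event = {
    "CodePipeline.job": {"id": "00000000-0000-0000-0000-000000000000"},  # placeholder job ID
    "APP_NAME": "kinesis-analytics-application",
    "FILE_KEY": "jars/kinesis-analytics-application-latest.jar",
}
# lambda_handler(test_event, None)  # needs AWS credentials plus a live pipeline job
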

infrastructure-cdk/lib/application-pipeline-stack.ts

Lines changed: 16 additions & 11 deletions
@@ -1,32 +1,37 @@
-import {Aws, CfnOutput, Stack, StackProps} from 'aws-cdk-lib';
+import {Aws, CfnOutput, RemovalPolicy, Stack, StackProps} from 'aws-cdk-lib';
 import {Construct} from 'constructs';
 import {JavaBuildPipeline} from "./constructs/java-build-pipeline";
 import {BlockPublicAccess, Bucket, BucketEncryption} from "aws-cdk-lib/aws-s3";
-import {ASSET_BUCKET_EXPORT_NAME, REPOSITORY_FILE_PATH} from "./shared-vars";
+import {APPLICATION_NAME, ASSET_BUCKET_EXPORT_NAME} from "./shared-vars";

 export class ApplicationPipelineStack extends Stack {
     constructor(scope: Construct, id: string, props?: StackProps) {
         super(scope, id, props);

-        const artefactBucket = new Bucket(this, 'ArtefactBucket', {
+        const artifactBucket = new Bucket(this, 'ArtefactBucket', {
             blockPublicAccess: BlockPublicAccess.BLOCK_ALL,
             enforceSSL: true,
-            encryption: BucketEncryption.S3_MANAGED
+            encryption: BucketEncryption.S3_MANAGED,
+            versioned: true,
+            autoDeleteObjects: true,
+            removalPolicy: RemovalPolicy.DESTROY
         });

         new JavaBuildPipeline(this, 'java-app', {
-            deployBucket: artefactBucket, repositoryName: REPOSITORY_FILE_PATH
+            appName: APPLICATION_NAME,
+            deployBucket: artifactBucket,
+            repositoryName: APPLICATION_NAME
         });

-        new CfnOutput(this, 'AssetBucketName', {
-            value: artefactBucket.bucketName,
-            description: "Artefact Bucket name storing application binaries",
+        new CfnOutput(this, 'ArtifactBucketName', {
+            value: artifactBucket.bucketName,
+            description: "Artifact Bucket name storing application binaries",
             exportName: ASSET_BUCKET_EXPORT_NAME
         });

-        new CfnOutput(this, 'AssetBucketLink', {
-            value: "https://s3.console.aws.amazon.com/s3/buckets/" + artefactBucket.bucketName + "?region=" + Aws.REGION + "&tab=objects",
-            description: "Artefact Bucket Link"
+        new CfnOutput(this, 'ArtifactBucketLink', {
+            value: "https://s3.console.aws.amazon.com/s3/buckets/" + artifactBucket.bucketName + "?region=" + Aws.REGION + "&tab=objects",
+            description: "Artifact Bucket Link"
         });

     }
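
CodePipeline's S3 source action only works against a versioned bucket, which is what the new versioned: true flag provides. A quick post-deployment check might look like this sketch (the bucket name is a placeholder; read the real one from the ArtifactBucketName output):

import boto3

s3 = boto3.client("s3")
# Expect "Enabled" once the stack is up; the bucket name below is illustrative.
print(s3.get_bucket_versioning(Bucket="artefact-bucket-placeholder").get("Status"))
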
infrastructure-cdk/lib/application-stack.ts

Lines changed: 49 additions & 0 deletions
@@ -0,0 +1,49 @@
+import {Aws, Fn, Stack, StackProps} from 'aws-cdk-lib';
+import {Construct} from 'constructs';
+import {Bucket} from "aws-cdk-lib/aws-s3";
+import {APPLICATION_NAME, ASSET_BUCKET_EXPORT_NAME} from "./shared-vars";
+import {Stream, StreamEncryption, StreamMode} from "aws-cdk-lib/aws-kinesis";
+import {Code, Function, Runtime} from "aws-cdk-lib/aws-lambda";
+import * as kda from "@aws-cdk/aws-kinesisanalytics-flink-alpha";
+
+export class ApplicationStack extends Stack {
+    constructor(scope: Construct, id: string, props?: StackProps) {
+        super(scope, id, props);
+        const assetBucket = Bucket.fromBucketName(this, 'imported-asset-bucket', Fn.importValue(ASSET_BUCKET_EXPORT_NAME));
+
+        const stream = new Stream(this, 'raw', {
+            streamMode: StreamMode.PROVISIONED,
+            shardCount: 1,
+            encryption: StreamEncryption.MANAGED
+        });
+
+        const dataSourceFn = new Function(this, 'data-source', {
+            code: Code.fromAsset("data-source-function"),
+            handler: "app.lambda_handler",
+            runtime: Runtime.PYTHON_3_9,
+            environment: {
+                STREAM_NAME: stream.streamName
+            }
+        });
+
+        stream.grantWrite(dataSourceFn);
+
+        const application = new kda.Application(this, 'app', {
+                code: kda.ApplicationCode.fromBucket(assetBucket, "jars/" + APPLICATION_NAME + "-latest.jar"),
+                runtime: kda.Runtime.FLINK_1_13,
+                propertyGroups: {
+                    "KinesisReader": {
+                        "input.stream.name": stream.streamName,
+                        "aws.region": Aws.REGION,
+                        "flink.stream.initpos": "LATEST"
+                    },
+                },
+                snapshotsEnabled: false,
+                parallelismPerKpu: 1
+            }
+        );
+
+        stream.grantRead(application);
+
+    }
+}
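Once this stack is deployed and the jar exists in the asset bucket, the Flink application's state can be inspected with boto3. A minimal sketch, assuming default credentials and the application name from shared-vars:

import boto3

kda = boto3.client("kinesisanalyticsv2")
detail = kda.describe_application(ApplicationName="kinesis-analytics-application")["ApplicationDetail"]
# ApplicationVersionId increments each time the deploy hook calls update_application.
print(detail["ApplicationStatus"], detail["ApplicationVersionId"])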

infrastructure-cdk/lib/constructs/java-build-pipeline.ts

Lines changed: 45 additions & 14 deletions
@@ -6,17 +6,22 @@ import {
     CodeBuildAction,
     LambdaInvokeAction,
     ManualApprovalAction,
-    S3DeployAction
+    S3DeployAction,
+    S3SourceAction
 } from "aws-cdk-lib/aws-codepipeline-actions";
-import {IFunction} from "aws-cdk-lib/aws-lambda/lib/function-base";
+import * as path from "path";
+import {Code, Function, Runtime} from "aws-cdk-lib/aws-lambda";
+import {Aws, Duration} from "aws-cdk-lib";
+import {PolicyStatement} from "aws-cdk-lib/aws-iam";
+import {SOURCE_CODE_ZIP} from "../shared-vars";


 interface JavaBuildPipelineProps {
+    appName: string
     repositoryName: string
     deployBucket: IBucket
     projectRoot?: string
     deployBucketBasePath?: string
-    postActionLambda?: IFunction
 }

 export class JavaBuildPipeline extends Construct {
@@ -43,8 +48,6 @@
             },
             build: {
                 commands: [
-                    `curl ${props.repositoryName} --output app.zip`, // Download zip directly from Github
-                    'unzip app.zip',
                     `cd ${directory}`,
                     'mvn clean package -B',
                     `mkdir -p ${s3BasePath}`,
@@ -83,9 +86,17 @@
            //     repo: props.repositoryName
            // })]
            // },
+            {
+                stageName: "source", actions: [new S3SourceAction({
+                    output: sourceAsset,
+                    actionName: "Checkout",
+                    bucket: props.deployBucket,
+                    bucketKey: SOURCE_CODE_ZIP
+                })]
+            },
             {
                 stageName: "build", actions: [new CodeBuildAction({
-                    actionName: "CodeBuild", input: sourceAsset, project: project, outputs: [buildOutput]
+                    input: sourceAsset, actionName: "CodeBuild", project: project, outputs: [buildOutput]
                 })]
             }, {
                 stageName: "saveArtefact", actions: [new S3DeployAction({
@@ -102,13 +113,33 @@
             }]
         });

-        if (props.postActionLambda) {
-            pipeline.addStage({
-                stageName: "deploy", actions: [new LambdaInvokeAction({
-                    actionName: "Deploy",
-                    lambda: props.postActionLambda
-                })]
-            });
-        }
+        const versionUpdateFn = new Function(this, 'version-update-fn', {
+            code: Code.fromAsset(path.join(__dirname, '../../flink-app-redeploy-hook')),
+            handler: "app.lambda_handler",
+            runtime: Runtime.PYTHON_3_9,
+            environment: {
+                ASSET_BUCKET_ARN: props.deployBucket.bucketArn,
+                FILE_KEY: s3BasePath + "/" + props.appName + "-latest.jar",
+                APP_NAME: props.appName
+            },
+            timeout: Duration.minutes(1)
+        });
+
+        versionUpdateFn.addToRolePolicy(new PolicyStatement({
+            actions: ["kinesisanalytics:DescribeApplication", "kinesisanalytics:UpdateApplication"],
+            resources: ["arn:aws:kinesisanalytics:" + Aws.REGION + ":" + Aws.ACCOUNT_ID + ":application/*"]
+        }));
+        versionUpdateFn.addToRolePolicy(new PolicyStatement({
+            resources: ["*"],
+            actions: ["codepipeline:PutJobSuccessResult", "codepipeline:PutJobFailureResult"]
+        }));
+
+        pipeline.addStage({
+            stageName: "deploy", actions: [new LambdaInvokeAction({
+                actionName: "Deploy",
+                lambda: versionUpdateFn
+            })]
+        });
+
     }
 }
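With the new source stage reading from S3 instead of curl-ing GitHub, a pipeline run is now triggered by dropping the source bundle into the deploy bucket under the SOURCE_CODE_ZIP key. A hedged upload sketch (the bucket name is a placeholder for the ArtifactBucketName output):

import boto3

s3 = boto3.client("s3")
s3.upload_file(
    "automate-deployment-and-version-update-of-kda-application.zip",  # local bundle
    "artefact-bucket-placeholder",                                    # illustrative bucket name
    "automate-deployment-and-version-update-of-kda-application.zip",  # must match SOURCE_CODE_ZIP
)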

infrastructure-cdk/lib/infra-pipeline-stack.ts

Lines changed: 7 additions & 5 deletions
@@ -1,20 +1,22 @@
-import {Stack, StackProps} from 'aws-cdk-lib';
+import {Fn, Stack, StackProps} from 'aws-cdk-lib';
 import {Construct} from 'constructs';
-import {CodePipeline, ShellStep} from "aws-cdk-lib/pipelines";
+import {CodePipeline, CodePipelineSource, ShellStep} from "aws-cdk-lib/pipelines";
 import {RealtimeApplication} from "./real-time-application";
-import {REPOSITORY_FILE_PATH} from "./shared-vars";
+import {Bucket} from "aws-cdk-lib/aws-s3";
+import {BUCKET_NAME_OUTPUT} from "aws-cdk/lib";
+import {SOURCE_CODE_ZIP} from "./shared-vars";

 export class InfraPipelineStack extends Stack {
     constructor(scope: Construct, id: string, props?: StackProps) {
         super(scope, id, props);

+        const artifactBucket = Bucket.fromBucketName(this, 'artifactBucket-import', Fn.importValue(BUCKET_NAME_OUTPUT))

         const pipeline = new CodePipeline(this, 'Pipeline', {
             selfMutation: false,
             synth: new ShellStep('Synth', {
+                input: CodePipelineSource.s3(artifactBucket, SOURCE_CODE_ZIP),
                 commands: [
-                    "curl " + REPOSITORY_FILE_PATH + " --output app.zip",
-                    "unzip app.zip",
                     "cd infrastructure-cdk",
                     "npm ci",
                     "npm run build",

infrastructure-cdk/lib/real-time-application.ts

Lines changed: 3 additions & 47 deletions
@@ -1,56 +1,12 @@
-import {Aws, Fn, Stage, StageProps} from "aws-cdk-lib";
+import {Stage, StageProps} from "aws-cdk-lib";
 import {Construct} from "constructs";
-import {Code, Function, Runtime} from "aws-cdk-lib/aws-lambda";
-import {Stream, StreamEncryption, StreamMode} from "aws-cdk-lib/aws-kinesis";
-import * as kda from "@aws-cdk/aws-kinesisanalytics-flink-alpha";
-import {APPLICATION_NAME, ASSET_BUCKET_EXPORT_NAME} from "./shared-vars";
-import {Bucket} from "aws-cdk-lib/aws-s3";
+import {ApplicationStack} from "./application-stack";


 export class RealtimeApplication extends Stage {
     constructor(scope: Construct, id: string, props?: StageProps) {
         super(scope, id, props);

-        const assetBucket = Bucket.fromBucketName(this, 'imported-asset-bucket', Fn.importValue(ASSET_BUCKET_EXPORT_NAME));
-
-        const stream = new Stream(this, 'raw', {
-            streamMode: StreamMode.PROVISIONED,
-            shardCount: 1,
-            encryption: StreamEncryption.MANAGED
-        });
-
-        const dataSourceFn = new Function(this, 'data-source', {
-            code: Code.fromInline(
-                "import boto3\n" +
-                "import os\n\n" +
-                "kinesis = boto3.client('kinesis')\n\n" +
-                "def lambda_handler(event, context):\n" +
-                "    kinesis.put_record(StreamName=os.environ['STREAM_NAME']), Data=b'{\"message\":\"This is the test message from the lambda\"}', PartitionKey='default')\n"
-            ),
-            handler: "index.lambda_handler",
-            runtime: Runtime.PYTHON_3_9,
-            environment: {
-                STREAM_NAME: stream.streamName
-            }
-        });
-
-        stream.grantWrite(dataSourceFn);
-
-        const application = new kda.Application(this, 'app', {
-                code: kda.ApplicationCode.fromBucket(assetBucket, "jars/" + APPLICATION_NAME + "-1.0.0.jar"),
-                runtime: kda.Runtime.FLINK_1_13,
-                propertyGroups: {
-                    "KinesisReader": {
-                        "input.stream.name": stream.streamName,
-                        "aws.region": Aws.REGION,
-                        "flink.stream.initpos": "LATEST"
-                    },
-                },
-                snapshotsEnabled: false,
-                parallelismPerKpu: 1
-            }
-        );
-
-        stream.grantRead(application);
+        new ApplicationStack(this, 'ApplicationStack');
     }
 }

infrastructure-cdk/lib/shared-vars.ts

Lines changed: 2 additions & 2 deletions
@@ -1,3 +1,3 @@
-export const REPOSITORY_FILE_PATH = "aws-samples/automate-deployment-and-version-update-of-kda-application"
 export const APPLICATION_NAME = "kinesis-analytics-application"
-export const ASSET_BUCKET_EXPORT_NAME = "Blog::Artefact::BucketName"
+export const SOURCE_CODE_ZIP = "automate-deployment-and-version-update-of-kda-application.zip"
+export const ASSET_BUCKET_EXPORT_NAME = "Blog::Artifact::BucketName"
