Skip to content

Commit 4f52f64

Browse files
feat!: enabling group event lambda for oxbow module (#25)
1 parent 5beb00b commit 4f52f64

File tree

3 files changed

+131
-33
lines changed

3 files changed

+131
-33
lines changed

main.tf

Lines changed: 129 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -96,13 +96,13 @@ resource "aws_lambda_function" "this_lambda" {
9696
AWS_S3_LOCKING_PROVIDER = var.aws_s3_locking_provider
9797
RUST_LOG = "deltalake=${var.rust_log_deltalake_debug_level},oxbow=${var.rust_log_oxbow_debug_level}"
9898
DYNAMO_LOCK_TABLE_NAME = var.dynamodb_table_name
99-
UNWRAP_SNS_ENVELOPE = var.sns_topic_arn == "" ? false : true
99+
UNWRAP_SNS_ENVELOPE = var.enable_group_events == true ? false : var.sns_topic_arn == "" ? false : true
100100
}
101101
}
102102
tags = var.tags
103103
}
104-
105-
resource "aws_lambda_function" "this_group_events_lambda" {
104+
#### This lambda is optional and used only when grouping of events is required
105+
resource "aws_lambda_function" "group_events_lambda" {
106106
count = local.enable_group_events ? 1 : 0
107107
description = "Group events for oxbow based on the table prefix"
108108
s3_key = var.events_lambda_s3_key
@@ -114,56 +114,153 @@ resource "aws_lambda_function" "this_group_events_lambda" {
114114

115115
environment {
116116
variables = {
117-
RUST_LOG = "group-events=${var.rust_log_oxbow_debug_level}"
118-
QUEUE_URL = aws_sqs_queue.this_sqs_fifo[0].url
117+
RUST_LOG = var.rust_log_oxbow_debug_level
118+
QUEUE_URL = aws_sqs_queue.oxbow_lambda_fifo_sqs[0].url
119+
UNWRAP_SNS_ENVELOPE = var.sns_topic_arn == "" ? false : true
119120
}
120121
}
121122
}
122123

123-
resource "aws_sqs_queue" "this_sqs_fifo" {
124-
count = local.enable_group_events ? 1 : 0
125-
name = "${var.sqs_fifo_queue_name}.fifo"
126-
policy = data.aws_iam_policy_document.this_sqs_queue_policy_data.json
127124

125+
data "aws_iam_policy_document" "oxbow_lambda_fifo_sqs" {
126+
count = local.enable_group_events ? 1 : 0
127+
statement {
128+
effect = "Allow"
129+
principals {
130+
type = "*"
131+
identifiers = ["*"]
132+
}
133+
actions = ["sqs:SendMessage", "sqs:ReceiveMessage"]
134+
# Hard-coding an ARN like syntax here because of the dependency cycle
135+
resources = ["arn:aws:sqs:*:*:${var.sqs_fifo_queue_name}.fifo"]
136+
137+
condition {
138+
test = "ArnEquals"
139+
variable = "aws:SourceArn"
140+
values = [var.warehouse_bucket_arn]
141+
}
142+
}
143+
}
144+
145+
data "aws_iam_policy_document" "oxbow_lambda_fifo_sqs_dlq" {
146+
count = local.enable_group_events ? 1 : 0
147+
statement {
148+
sid = "DLQSendMessages"
149+
effect = "Allow"
150+
principals {
151+
type = "AWS"
152+
identifiers = ["*"]
153+
}
154+
actions = ["sqs:SendMessage", "sqs:ReceiveMessage"]
155+
resources = ["arn:aws:sqs:*:*:${var.sqs_fifo_DL_queue_name}"]
156+
condition {
157+
test = "ForAllValues:StringEquals"
158+
variable = "aws:SourceArn"
159+
values = [
160+
"arn:aws:sqs:*:*:${var.sqs_fifo_DL_queue_name}.fifo"
161+
]
162+
}
163+
}
164+
}
165+
166+
resource "aws_sqs_queue" "oxbow_lambda_fifo_sqs" {
167+
count = local.enable_group_events ? 1 : 0
168+
name = "${var.sqs_fifo_queue_name}.fifo"
169+
policy = data.aws_iam_policy_document.oxbow_lambda_fifo_sqs[0].json
170+
visibility_timeout_seconds = var.sqs_visibility_timeout_seconds
171+
delay_seconds = var.sqs_delay_seconds
128172
content_based_deduplication = true
129173
fifo_queue = true
130-
174+
tags = var.tags
131175
redrive_policy = jsonencode({
132-
deadLetterTargetArn = aws_sqs_queue.this_sqs_fifo_dlq[0].arn
176+
deadLetterTargetArn = aws_sqs_queue.oxbow_lambda_fifo_sqs_dlq[0].arn
133177
maxReceiveCount = 8
134178
})
135179
}
136180

137-
resource "aws_sqs_queue" "this_sqs_fifo_dlq" {
181+
resource "aws_sqs_queue" "oxbow_lambda_fifo_sqs_dlq" {
138182
count = local.enable_group_events ? 1 : 0
139183
name = "${var.sqs_fifo_DL_queue_name}.fifo"
184+
policy = data.aws_iam_policy_document.oxbow_lambda_fifo_sqs_dlq[0].json
140185
fifo_queue = true
186+
tags = var.tags
141187
}
142188

143-
resource "aws_lambda_event_source_mapping" "this_group_events_trigger" {
189+
resource "aws_lambda_event_source_mapping" "group_events_lambda_sqs_trigger" {
144190
count = local.enable_group_events ? 1 : 0
145-
event_source_arn = aws_sqs_queue.this_group_events[0].arn
146-
function_name = aws_lambda_function.this_group_events_lambda[0].arn
191+
event_source_arn = aws_sqs_queue.group_events_lambda_sqs[0].arn
192+
function_name = aws_lambda_function.group_events_lambda[0].arn
193+
147194
}
148195

149-
resource "aws_sqs_queue" "this_group_events" {
150-
count = local.enable_group_events ? 1 : 0
151-
name = var.sqs_group_queue_name
152-
policy = data.aws_iam_policy_document.this_sqs_queue_policy_data.json
153196

197+
data "aws_iam_policy_document" "group_event_lambda_sqs" {
198+
count = local.enable_group_events ? 1 : 0
199+
statement {
200+
effect = "Allow"
201+
principals {
202+
type = "*"
203+
identifiers = ["*"]
204+
}
205+
actions = ["sqs:SendMessage", "sqs:ReceiveMessage"]
206+
# Hard-coding an ARN like syntax here because of the dependency cycle
207+
resources = ["arn:aws:sqs:*:*:${var.sqs_group_queue_name}"]
208+
condition {
209+
test = "ArnEquals"
210+
variable = "aws:SourceArn"
211+
values = [var.warehouse_bucket_arn]
212+
}
213+
}
214+
}
215+
216+
data "aws_iam_policy_document" "group_event_lambda_sqs_dlq" {
217+
count = local.enable_group_events ? 1 : 0
218+
statement {
219+
sid = "DLQSendMessages"
220+
effect = "Allow"
221+
principals {
222+
type = "AWS"
223+
identifiers = ["*"]
224+
}
225+
actions = ["sqs:SendMessage", "sqs:ReceiveMessage"]
226+
resources = ["arn:aws:sqs:*:*:${var.sqs_group_DL_queue_name}"]
227+
condition {
228+
test = "ForAllValues:StringEquals"
229+
variable = "aws:SourceArn"
230+
values = [
231+
"arn:aws:sqs:*:*:${var.sqs_group_queue_name}"
232+
]
233+
}
234+
}
235+
}
236+
237+
238+
resource "aws_sqs_queue" "group_events_lambda_sqs" {
239+
count = local.enable_group_events ? 1 : 0
240+
name = var.sqs_group_queue_name
241+
policy = var.sns_topic_arn == "" ? data.aws_iam_policy_document.group_event_lambda_sqs[0].json : data.aws_iam_policy_document.this_sns_to_sqs[0].json
242+
visibility_timeout_seconds = var.sqs_visibility_timeout_seconds
243+
delay_seconds = var.sqs_delay_seconds
154244
redrive_policy = jsonencode({
155-
deadLetterTargetArn = aws_sqs_queue.this_group_events_dlq[0].arn
245+
deadLetterTargetArn = aws_sqs_queue.group_events_lambda_sqs_dlq[0].arn
156246
maxReceiveCount = 8
157247
})
248+
tags = var.tags
158249
}
159250

160-
resource "aws_sqs_queue" "this_group_events_dlq" {
161-
count = local.enable_group_events ? 1 : 0
162-
name = var.sqs_group_DL_queue_name
251+
resource "aws_sqs_queue" "group_events_lambda_sqs_dlq" {
252+
count = local.enable_group_events ? 1 : 0
253+
policy = data.aws_iam_policy_document.group_event_lambda_sqs_dlq[0].json
254+
name = var.sqs_group_DL_queue_name
255+
tags = var.tags
163256
}
164257

258+
259+
260+
### This is to ensure we are triggering oxbow lambda properly whether grou event is enable or not
261+
### if group event is enabled we are using the fifo queue populated by group events as a source for oxbow
165262
resource "aws_lambda_event_source_mapping" "this_lambda_events" {
166-
event_source_arn = local.enable_group_events ? aws_sqs_queue.this_sqs_fifo[0].arn : aws_sqs_queue.this_sqs[0].arn
263+
event_source_arn = local.enable_group_events ? aws_sqs_queue.oxbow_lambda_fifo_sqs[0].arn : aws_sqs_queue.this_sqs[0].arn
167264
function_name = aws_lambda_function.this_lambda.arn
168265
}
169266

@@ -192,7 +289,7 @@ resource "aws_sns_topic_subscription" "this_sns_sub" {
192289

193290
topic_arn = var.sns_topic_arn
194291
protocol = "sqs"
195-
endpoint = aws_sqs_queue.this_sqs[0].arn
292+
endpoint = local.enable_group_events ? aws_sqs_queue.group_events_lambda_sqs[0].arn : aws_sqs_queue.this_sqs[0].arn
196293
}
197294

198295
resource "aws_lambda_permission" "this_lambda_allow_bucket_permissions" {
@@ -209,7 +306,7 @@ resource "aws_s3_bucket_notification" "this_bucket_notification" {
209306
count = local.enable_bucket_notification ? 1 : 0
210307
bucket = var.warehouse_bucket_name
211308
queue {
212-
queue_arn = local.enable_group_events ? aws_sqs_queue.this_group_events[0].arn : aws_sqs_queue.this_sqs[0].arn
309+
queue_arn = local.enable_group_events ? aws_sqs_queue.group_events_lambda_sqs[0].arn : aws_sqs_queue.this_sqs[0].arn
213310
events = ["s3:ObjectCreated:*"]
214311
filter_suffix = ".parquet"
215312
filter_prefix = "${var.s3_path}/"
@@ -277,7 +374,7 @@ resource "aws_iam_policy" "this_lambda_permissions" {
277374
},
278375
{
279376
Action = ["sqs:*"]
280-
Resource = local.enable_group_events ? aws_sqs_queue.this_group_events[0].arn : aws_sqs_queue.this_sqs[0].arn
377+
Resource = local.enable_group_events ? [aws_sqs_queue.group_events_lambda_sqs[0].arn, aws_sqs_queue.oxbow_lambda_fifo_sqs[0].arn] : [aws_sqs_queue.this_sqs[0].arn]
281378
Effect = "Allow"
282379
},
283380
{
@@ -302,7 +399,7 @@ data "aws_iam_policy_document" "this_sqs_queue_policy_data" {
302399
}
303400
actions = ["sqs:SendMessage"]
304401
# Hard-coding an ARN like syntax here because of the dependency cycle
305-
resources = local.enable_group_events ? ["arn:aws:sqs:*:*:${var.sqs_group_queue_name}", "arn:aws:sqs:*:*:${var.sqs_fifo_queue_name}.fifo"] : ["arn:aws:sqs:*:*:${var.sqs_queue_name}"]
402+
resources = ["arn:aws:sqs:*:*:${var.sqs_queue_name}"]
306403

307404
condition {
308405
test = "ArnEquals"
@@ -312,6 +409,7 @@ data "aws_iam_policy_document" "this_sqs_queue_policy_data" {
312409
}
313410
}
314411

412+
315413
data "aws_iam_policy_document" "this_sns_to_sqs" {
316414
count = var.sns_topic_arn == "" ? 0 : 1
317415

@@ -322,7 +420,7 @@ data "aws_iam_policy_document" "this_sns_to_sqs" {
322420
identifiers = ["*"]
323421
}
324422
actions = ["sqs:SendMessage"]
325-
resources = local.enable_group_events ? ["arn:aws:sqs:*:*:${var.sqs_group_queue_name}", "arn:aws:sqs:*:*:${var.sqs_fifo_queue_name}.fifo"] : ["arn:aws:sqs:*:*:${var.sqs_queue_name}"]
423+
resources = local.enable_group_events ? ["arn:aws:sqs:*:*:${var.sqs_group_queue_name}"] : ["arn:aws:sqs:*:*:${var.sqs_queue_name}"]
326424
condition {
327425
test = "ArnEquals"
328426
variable = "aws:SourceArn"
@@ -332,7 +430,6 @@ data "aws_iam_policy_document" "this_sns_to_sqs" {
332430

333431
}
334432

335-
336433
data "aws_iam_policy_document" "this_dead_letter_queue_policy" {
337434
statement {
338435
sid = "DLQSendMessages"
@@ -344,7 +441,7 @@ data "aws_iam_policy_document" "this_dead_letter_queue_policy" {
344441
actions = [
345442
"sqs:SendMessage"
346443
]
347-
resources = local.enable_group_events ? ["arn:aws:sqs:*:*:${var.sqs_group_DL_queue_name}", "arn:aws:sqs:*:*:${var.sqs_fifo_DL_queue_name}.fifo"] : ["arn:aws:sqs:*:*:${var.sqs_queue_name_dl}"]
444+
resources = ["arn:aws:sqs:*:*:${var.sqs_queue_name_dl}"]
348445
condition {
349446
test = "ForAllValues:StringEquals"
350447
variable = "aws:SourceArn"
@@ -355,6 +452,7 @@ data "aws_iam_policy_document" "this_dead_letter_queue_policy" {
355452
}
356453
}
357454

455+
358456
data "aws_iam_policy_document" "this_kinesis_policy_data" {
359457
count = local.enable_kinesis_firehose_delivery_stream ? 1 : 0
360458
statement {

monitoring.tf

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ resource "datadog_monitor" "dead_letters_monitor" {
1111
type = "metric alert"
1212
name = "${var.sqs_queue_name_dl}-monitor"
1313
message = templatefile("${path.module}/templates/dl_monitor.tmpl", {
14-
dead_letters_queue_name = var.sqs_queue_name_dl
14+
dead_letters_queue_name = local.enable_group_events ? var.sqs_fifo_DL_queue_name : var.sqs_queue_name_dl
1515
notify = join(", ", var.dl_alert_recipients)
1616
})
1717
query = "avg(last_1h):avg:aws.sqs.approximate_number_of_messages_visible{queuename:${var.sqs_queue_name_dl}} > ${var.dl_critical}"

outputs.tf

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ output "lambda_arn" {
1010

1111
output "sqs_queue_arn" {
1212
description = "SQSqueue ARN"
13-
value = length(aws_sqs_queue.this_sqs) > 0 ? aws_sqs_queue.this_sqs[0].arn : aws_sqs_queue.this_sqs_fifo[0].arn
13+
value = length(aws_sqs_queue.this_sqs) > 0 ? aws_sqs_queue.this_sqs[0].arn : aws_sqs_queue.oxbow_lambda_fifo_sqs[0].arn
1414
}
1515

1616
output "autotag_sqs_arn" {

0 commit comments

Comments
 (0)