11# This module creates Kinesis Firehose service (optionally), SQS, lambda function OXBOW
22# to receive data and convert it into parquet then Delta log is added by Oxbow lambda
33data "aws_caller_identity" "current" {}
4+ data "aws_region" "current" {}
45
56locals {
67 enable_aws_glue_catalog_table = var. enable_aws_glue_catalog_table
78 enable_kinesis_firehose_delivery_stream = var. enable_kinesis_firehose_delivery_stream
89 enable_bucket_notification = var. enable_bucket_notification
910 enable_group_events = var. enable_group_events
11+ enable_glue_create = var. enable_glue_create
1012}
1113
1214
@@ -235,7 +237,6 @@ data "aws_iam_policy_document" "group_event_lambda_sqs_dlq" {
235237 }
236238}
237239
238-
239240resource "aws_sqs_queue" "group_events_lambda_sqs" {
240241 count = local. enable_group_events ? 1 : 0
241242 name = var. sqs_group_queue_name
@@ -258,7 +259,7 @@ resource "aws_sqs_queue" "group_events_lambda_sqs_dlq" {
258259
259260
260261
261- # ## This is to ensure we are triggering oxbow lambda properly whether grou event is enable or not
262+ # ## This is to ensure we are triggering oxbow lambda properly whether group event is enable or not
262263# ## if group event is enabled we are using the fifo queue populated by group events as a source for oxbow
263264resource "aws_lambda_event_source_mapping" "this_lambda_events" {
264265 event_source_arn = local. enable_group_events ? aws_sqs_queue. oxbow_lambda_fifo_sqs [0 ]. arn : aws_sqs_queue. this_sqs [0 ]. arn
@@ -536,3 +537,288 @@ resource "aws_dynamodb_table" "this_oxbow_locking" {
536537 }
537538 tags = var. tags
538539}
540+
541+ # glue-create lambda resource
542+ module "glue_create_athena_workgroup_bucket" {
543+ count = local. enable_glue_create ? 1 : 0
544+
545+ source = " terraform-aws-modules/s3-bucket/aws"
546+ version = " 4.1.2"
547+ bucket = var. glue_create_config . athena_bucket_name
548+ block_public_acls = true
549+ block_public_policy = true
550+ ignore_public_acls = true
551+ restrict_public_buckets = true
552+ control_object_ownership = true
553+ object_ownership = " BucketOwnerEnforced"
554+ tags = var. tags
555+ versioning = {
556+ enabled = false
557+ }
558+ }
559+
560+ resource "aws_athena_workgroup" "glue_create" {
561+ count = local. enable_glue_create ? 1 : 0
562+
563+ name = var. glue_create_config . athena_workgroup_name
564+ tags = var. tags
565+ configuration {
566+ enforce_workgroup_configuration = true
567+ publish_cloudwatch_metrics_enabled = false
568+
569+ result_configuration {
570+ output_location = " s3://${ module . glue_create_athena_workgroup_bucket [0 ]. s3_bucket_id } /"
571+ }
572+ }
573+ depends_on = [module . glue_create_athena_workgroup_bucket ]
574+ }
575+
576+ data "aws_iam_policy_document" "glue_create_sqs" {
577+ count = local. enable_glue_create ? 1 : 0
578+
579+ statement {
580+ effect = " Allow"
581+ principals {
582+ type = " *"
583+ identifiers = [" *" ]
584+ }
585+ actions = [" sqs:SendMessage" ]
586+ resources = [" arn:aws:sqs:*:*:${ var . glue_create_config . sqs_queue_name } " ]
587+ condition {
588+ test = " ArnEquals"
589+ variable = " aws:SourceArn"
590+ values = [var . glue_create_config . sns_topic_arn ]
591+ }
592+ }
593+ }
594+
595+ data "aws_iam_policy_document" "glue_create_sqs_dl" {
596+ count = local. enable_glue_create ? 1 : 0
597+
598+ statement {
599+ effect = " Allow"
600+ principals {
601+ type = " AWS"
602+ identifiers = [" *" ]
603+ }
604+ actions = [" sqs:SendMessage" ]
605+ resources = [" arn:aws:sqs:*:*:${ var . glue_create_config . sqs_queue_name_dl } " ]
606+ condition {
607+ test = " ForAllValues:StringEquals"
608+ variable = " aws:SourceArn"
609+ values = [" arn:aws:sqs:*:*:${ var . glue_create_config . sqs_queue_name } " ]
610+ }
611+ }
612+ }
613+
614+ resource "aws_sqs_queue" "glue_create" {
615+ count = local. enable_glue_create ? 1 : 0
616+
617+ name = var. glue_create_config . sqs_queue_name
618+ policy = data. aws_iam_policy_document . glue_create_sqs [0 ]. json
619+ visibility_timeout_seconds = var. sqs_visibility_timeout_seconds
620+ delay_seconds = var. sqs_delay_seconds
621+ redrive_policy = jsonencode ({
622+ deadLetterTargetArn = aws_sqs_queue.glue_create_dl[0 ].arn
623+ maxReceiveCount = var.sqs_redrive_policy_maxReceiveCount
624+ })
625+ tags = var. tags
626+ }
627+
628+ resource "aws_sqs_queue" "glue_create_dl" {
629+ count = local. enable_glue_create ? 1 : 0
630+
631+ name = var. glue_create_config . sqs_queue_name_dl
632+ policy = data. aws_iam_policy_document . glue_create_sqs_dl [0 ]. json
633+ tags = var. tags
634+ }
635+
636+ resource "aws_sqs_queue_redrive_allow_policy" "terraform_queue_redrive_allow_policy" {
637+ count = local. enable_glue_create ? 1 : 0
638+
639+ queue_url = aws_sqs_queue. glue_create_dl [0 ]. id
640+ redrive_allow_policy = jsonencode ({
641+ redrivePermission = " byQueue" ,
642+ sourceQueueArns = [aws_sqs_queue.glue_create[0 ].arn]
643+ })
644+ }
645+
646+ resource "aws_sns_topic_subscription" "glue_create_sns_sub" {
647+ count = local. enable_glue_create ? 1 : 0
648+
649+ topic_arn = var. glue_create_config . sns_topic_arn
650+ protocol = " sqs"
651+ endpoint = aws_sqs_queue. glue_create [0 ]. arn
652+ }
653+
654+ data "aws_iam_policy_document" "glue_create_assume" {
655+ count = local. enable_glue_create ? 1 : 0
656+
657+ statement {
658+ effect = " Allow"
659+ principals {
660+ type = " Service"
661+ identifiers = [" lambda.amazonaws.com" ]
662+ }
663+ actions = [
664+ " sts:AssumeRole" ,
665+ ]
666+ }
667+ }
668+
669+ data "aws_iam_policy_document" "glue_create" {
670+ count = local. enable_glue_create ? 1 : 0
671+
672+ statement {
673+ sid = " AthenaWorkgroupAthenaRW"
674+ actions = [
675+ " athena:StartQueryExecution" ,
676+ " athena:GetQueryResults" ,
677+ " athena:GetWorkGroup" ,
678+ " athena:StopQueryExecution" ,
679+ " athena:GetQueryExecution" ,
680+ ]
681+ resources = [
682+ aws_athena_workgroup . glue_create [0 ]. arn
683+ ]
684+ effect = " Allow"
685+ }
686+ statement {
687+ sid = " AthenaWorkgroupS3RW"
688+ effect = " Allow"
689+ actions = [
690+ " s3:PutObject" ,
691+ " s3:GetObject" ,
692+ " s3:AbortMultipartUpload" ,
693+ " s3:GetBucketLocation"
694+ ]
695+ resources = [
696+ " ${ module . glue_create_athena_workgroup_bucket [0 ]. s3_bucket_arn } /*" ,
697+ module . glue_create_athena_workgroup_bucket [0 ]. s3_bucket_arn
698+ ]
699+ }
700+ statement {
701+ sid = " AthenaWorkgroupList1"
702+ effect = " Allow"
703+ actions = [" athena:ListWorkGroups" ]
704+ resources = [" *" ]
705+ }
706+ statement {
707+ sid = " GlueAllowTables"
708+ effect = " Allow"
709+ actions = [
710+ " glue:GetTable" ,
711+ " glue:GetTables" ,
712+ " glue:GetPartitions" ,
713+ " glue:CreateTable"
714+ ]
715+ resources = [
716+ " arn:aws:glue:${ data . aws_region . current . name } :${ data . aws_caller_identity . current . account_id } :catalog" ,
717+ " arn:aws:glue:${ data . aws_region . current . name } :${ data . aws_caller_identity . current . account_id } :database/*" ,
718+ " arn:aws:glue:${ data . aws_region . current . name } :${ data . aws_caller_identity . current . account_id } :table/*"
719+ ]
720+ }
721+ statement {
722+ sid = " GlueCatalogAllowDatabases"
723+ effect = " Allow"
724+ actions = [
725+ " glue:GetDatabase" ,
726+ " glue:GetDatabases" ,
727+ ]
728+ resources = [
729+ " *"
730+ ]
731+ }
732+ statement {
733+ sid = " TableExtLocS3RO"
734+ effect = " Allow"
735+ actions = [
736+ " s3:GetObject" ,
737+ " s3:GetObjectTagging" ,
738+ " s3:GetObjectVersion" ,
739+ " s3:GetBucketLocation" ,
740+ " s3:ListBucket" ,
741+ " s3:ListBucketVersions"
742+ ]
743+ resources = [
744+ var . warehouse_bucket_arn ,
745+ " ${ var . warehouse_bucket_arn } /${ var . s3_path } /*"
746+ ]
747+ }
748+ statement {
749+ effect = " Allow"
750+ principals {
751+ type = " *"
752+ identifiers = [" *" ]
753+ }
754+ actions = [" sqs:ReceiveMessage" ]
755+ resources = [aws_sqs_queue . glue_create [0 ]. arn ]
756+
757+ condition {
758+ test = " ArnEquals"
759+ variable = " aws:SourceArn"
760+ values = [var . warehouse_bucket_arn ]
761+ }
762+ }
763+ statement {
764+ effect = " Allow"
765+ principals {
766+ type = " AWS"
767+ identifiers = [" *" ]
768+ }
769+ actions = [
770+ " sqs:SendMessage"
771+ ]
772+ resources = [aws_sqs_queue . glue_create [0 ]. arn ]
773+ condition {
774+ test = " ForAllValues:StringEquals"
775+ variable = " aws:SourceArn"
776+ values = [aws_sqs_queue . glue_create_dl [0 ]. arn ]
777+ }
778+ }
779+ }
780+
781+ resource "aws_iam_policy" "glue_create_managed" {
782+ count = local. enable_glue_create ? 1 : 0
783+
784+ name = var. glue_create_config . iam_police_name
785+ description = " Glue create policy allows access to Athena and S3"
786+ policy = data. aws_iam_policy_document . glue_create [0 ]. json
787+ tags = var. tags
788+ }
789+
790+ resource "aws_iam_role" "glue_create" {
791+ name = var. glue_create_config . iam_role_name
792+ assume_role_policy = data. aws_iam_policy_document . glue_create_assume [0 ]. json
793+ managed_policy_arns = [aws_iam_policy . glue_create_managed [0 ]. arn ]
794+ tags = var. tags
795+ }
796+
797+ resource "aws_lambda_function" "glue_create_lambda" {
798+ count = local. enable_glue_create ? 1 : 0
799+
800+ description = " Greate tables in AWS Glue catalog based on the table prefix"
801+ s3_key = var. glue_create_config . lambda_s3_key
802+ s3_bucket = var. glue_create_config . lambda_s3_bucket
803+ function_name = var. glue_create_config . lambda_function_name
804+ role = aws_iam_role. glue_create . arn
805+ handler = " provided"
806+ runtime = " provided.al2"
807+
808+ environment {
809+ variables = {
810+ RUST_LOG = var.rust_log_oxbow_debug_level
811+ ATHENA_WORKGROUP = var.glue_create_config.athena_workgroup_name
812+ ATHENA_DATA_SOURCE = var.glue_create_config.athena_data_source
813+ GLUE_PATH_REGEX = var.glue_create_config.path_regex
814+ UNWRAP_SNS_ENVELOPE = true
815+ }
816+ }
817+ }
818+
819+ resource "aws_lambda_event_source_mapping" "glue_create" {
820+ count = local. enable_glue_create ? 1 : 0
821+
822+ event_source_arn = aws_sqs_queue. glue_create [0 ]. arn
823+ function_name = aws_lambda_function. glue_create_lambda [0 ]. arn
824+ }
0 commit comments