diff --git a/app.py b/app.py index ffec821..fe28465 100644 --- a/app.py +++ b/app.py @@ -42,7 +42,8 @@ env=environment, ) -# Use existing system-defined inference profile for Claude 4 Sonnet (200k context) +# Use existing system-defined inference profile for Claude 4 Sonnet (200k +# context) bedrock_inference_profile_arn = f"arn:aws:bedrock:{config.env_region}:{config.env_account}:inference-profile/us.anthropic.claude-sonnet-4-20250514-v1:0" # Step Functions Stack for orchestration diff --git a/poetry.lock b/poetry.lock index 3809fd3..4e11a78 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,4 +1,4 @@ -# This file is automatically @generated by Poetry 2.1.3 and should not be changed by hand. +# This file is automatically @generated by Poetry 2.1.1 and should not be changed by hand. [[package]] name = "annotated-types" @@ -83,39 +83,39 @@ typeguard = ">=2.13.3,<5.0.0" [[package]] name = "aws-cdk-cloud-assembly-schema" -version = "45.2.0" +version = "48.6.0" description = "Schema for the protocol between CDK framework and CDK CLI" optional = false python-versions = "~=3.9" groups = ["main"] files = [ - {file = "aws_cdk_cloud_assembly_schema-45.2.0-py3-none-any.whl", hash = "sha256:aeb88c5e3c36eecae341ddca568907b5d70b9b67de51572ef51564dd486dbb48"}, - {file = "aws_cdk_cloud_assembly_schema-45.2.0.tar.gz", hash = "sha256:7affdd151be58028d31fcbbbd084c07ea2e9d433979d0653ed43e15a307015cd"}, + {file = "aws_cdk_cloud_assembly_schema-48.6.0-py3-none-any.whl", hash = "sha256:ee4a3014446bdd68de200dfb4086737f671ccaa7b3a1ec39653720d3e521f835"}, + {file = "aws_cdk_cloud_assembly_schema-48.6.0.tar.gz", hash = "sha256:716d27b6fdb05afd9981567804d91af5780e2f7f95fd9113e697380a1b69a623"}, ] [package.dependencies] -jsii = ">=1.112.0,<2.0.0" +jsii = ">=1.113.0,<2.0.0" publication = ">=0.0.3" typeguard = ">=2.13.3,<4.3.0" [[package]] name = "aws-cdk-lib" -version = "2.206.0" +version = "2.213.0" description = "Version 2 of the AWS Cloud Development Kit library" optional = false python-versions = "~=3.9" groups = ["main"] files = [ - {file = "aws_cdk_lib-2.206.0-py3-none-any.whl", hash = "sha256:930b0268186e5d2234622fe9953bf79f79a6bc1d06575a6f0ef85d197b853ed2"}, - {file = "aws_cdk_lib-2.206.0.tar.gz", hash = "sha256:73a5470c879252abb2e5fd1a8ea31707851400431e9e8e2e3c7eb2c9f8e88db7"}, + {file = "aws_cdk_lib-2.213.0-py3-none-any.whl", hash = "sha256:f78dfad4b852f5954d38f69924f097284bce4375b70fff0812431665c617b702"}, + {file = "aws_cdk_lib-2.213.0.tar.gz", hash = "sha256:7c0a4bb09526f72d8b1e12db9cbd0734d03b424fe3edb34811805e608aa18de3"}, ] [package.dependencies] "aws-cdk.asset-awscli-v1" = "2.2.242" "aws-cdk.asset-node-proxy-agent-v6" = ">=2.1.0,<3.0.0" -"aws-cdk.cloud-assembly-schema" = ">=45.0.0,<46.0.0" +"aws-cdk.cloud-assembly-schema" = ">=48.3.0,<49.0.0" constructs = ">=10.0.0,<11.0.0" -jsii = ">=1.112.0,<2.0.0" +jsii = ">=1.113.0,<2.0.0" publication = ">=0.0.3" typeguard = ">=2.13.3,<4.3.0" @@ -547,19 +547,19 @@ files = [ [[package]] name = "jsii" -version = "1.112.0" +version = "1.113.0" description = "Python client for jsii runtime" optional = false python-versions = "~=3.9" groups = ["main"] files = [ - {file = "jsii-1.112.0-py3-none-any.whl", hash = "sha256:6510c223074d9b206fd0570849a791e4d9ecfff7ffe68428de73870cea9f55a1"}, - {file = "jsii-1.112.0.tar.gz", hash = "sha256:6b7d19f361c2565b76828ecbe8cbed8b8d6028a82aa98a46b206a4ee5083157e"}, + {file = "jsii-1.113.0-py3-none-any.whl", hash = "sha256:62377c651554234ea945693f7c03cb96a969ba425a686950c88d43b0d4d76b07"}, + {file = "jsii-1.113.0.tar.gz", hash = "sha256:2dedea9d6006af53467a7a67f1d35a56ab3f75a3d6ed4b4536fffc3e1d1fe476"}, ] [package.dependencies] attrs = ">=21.2,<26.0" -cattrs = ">=1.8,<24.2" +cattrs = ">=1.8,<25.2" importlib_resources = ">=5.2.0" publication = ">=0.0.3" python-dateutil = "*" @@ -1115,4 +1115,4 @@ zstd = ["zstandard (>=0.18.0)"] [metadata] lock-version = "2.1" python-versions = "^3.12" -content-hash = "f9a392c1e33ff23cf95afa8ac210f46578146c7c64e072dfbcee01674a630393" +content-hash = "359da89b7789c23d49f7495dd02d40b2aaa3e6042935415e327d25f1f111a337" diff --git a/pyproject.toml b/pyproject.toml index 129b55f..471eb0e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -11,7 +11,7 @@ python = "^3.12" boto3 = "^1.39.12" constructs = "^10.4.2" pytest = "^8.4.1" -aws-cdk-lib = "^2.206.0" +aws-cdk-lib = "^2.213.0" pydantic = "^2.11.7" pyjwt = "^2.10.1" cryptography = "^45.0.5" diff --git a/src/functions/answer_scorer/main.py b/src/functions/answer_scorer/main.py index 508a229..429b9b1 100644 --- a/src/functions/answer_scorer/main.py +++ b/src/functions/answer_scorer/main.py @@ -11,53 +11,57 @@ def handler(event, context): """ Process DynamoDB stream events from interview_qa table. Score answers against vacancy requirements using Claude 4 Sonnet. - + Triggered when new Q&A pairs are inserted into the interview_qa table. """ - + dynamodb = boto3.resource('dynamodb') bedrock = boto3.client('bedrock-runtime') - + processed_records = 0 - + for record in event['Records']: try: # Only process INSERT events (new Q&A pairs) if record['eventName'] != 'INSERT': continue - + # Extract Q&A data from DynamoDB stream record qa_data = record['dynamodb']['NewImage'] - + qa_id = qa_data['id']['S'] interview_id = qa_data['interview_id']['S'] question = qa_data['question']['S'] answer = qa_data['answer']['S'] - + print(f"Processing Q&A scoring for: {qa_id}") - + # Skip if already scored (has answer_score) if 'answer_score' in qa_data: print(f"Q&A {qa_id} already scored, skipping") continue - + # Get vacancy requirements from interview_transcriptions table - position_description = get_position_description(dynamodb, interview_id) - + position_description = get_position_description( + dynamodb, interview_id) + # Score the answer using Claude 4 Sonnet - score_result = score_answer(bedrock, question, answer, position_description) - + score_result = score_answer( + bedrock, question, answer, position_description) + # Update the Q&A record with score and summary update_qa_with_score(dynamodb, qa_id, score_result) - + processed_records += 1 - print(f"Successfully scored Q&A {qa_id}: {score_result['score']}/10") - + print( + f"Successfully scored Q&A {qa_id}: {score_result['score']}/10") + except Exception as e: - print(f"Error processing record {record.get('dynamodb', {}).get('Keys', {})}: {str(e)}") + print( + f"Error processing record {record.get('dynamodb', {}).get('Keys', {})}: {str(e)}") # Continue processing other records even if one fails continue - + return { 'statusCode': 200, 'processed_records': processed_records @@ -71,13 +75,15 @@ def get_position_description(dynamodb, interview_id): try: table = dynamodb.Table('interview_transcriptions') response = table.get_item(Key={'id': interview_id}) - + if 'Item' not in response: print(f"Interview {interview_id} not found, using default") return "No specific position requirements available" - - return response['Item'].get('position_description', 'No specific position requirements available') - + + return response['Item'].get( + 'position_description', + 'No specific position requirements available') + except Exception as e: print(f"Error getting position description: {str(e)}") return "No specific position requirements available" @@ -88,13 +94,14 @@ def score_answer(bedrock_client, question, answer, position_description): Use Claude 4 Sonnet to score an interview answer against vacancy requirements. Includes retry logic for throttling. """ - + # Get the inference profile ARN from environment variables inference_profile_arn = os.environ.get('BEDROCK_INFERENCE_PROFILE_ARN') if not inference_profile_arn: - raise Exception("BEDROCK_INFERENCE_PROFILE_ARN environment variable not set") + raise Exception( + "BEDROCK_INFERENCE_PROFILE_ARN environment variable not set") - system_prompt = """You are an expert technical interviewer and hiring manager. + system_prompt = """You are an expert technical interviewer and hiring manager. You must evaluate interview answers against job requirements and return ONLY valid JSON.""" user_prompt = f"""Evaluate this interview answer against the job requirements. @@ -109,7 +116,7 @@ def score_answer(bedrock_client, question, answer, position_description): Evaluate the answer on these criteria: - Technical accuracy and depth - Relevance to the question asked -- Alignment with position requirements +- Alignment with position requirements - Communication clarity - Specific examples or evidence provided @@ -142,16 +149,17 @@ def score_answer(bedrock_client, question, answer, position_description): # Retry logic with exponential backoff for throttling max_retries = 3 base_delay = 1.0 # Start with 1 second - + for attempt in range(max_retries + 1): try: # Add jitter to prevent thundering herd if attempt > 0: jitter = random.uniform(0.5, 1.5) delay = (base_delay * (2 ** (attempt - 1))) * jitter - print(f"Attempt {attempt + 1}, waiting {delay:.2f} seconds before retry") + print( + f"Attempt {attempt + 1}, waiting {delay:.2f} seconds before retry") time.sleep(delay) - + response = bedrock_client.invoke_model( modelId=inference_profile_arn, body=json.dumps(request_body), @@ -162,11 +170,12 @@ def score_answer(bedrock_client, question, answer, position_description): response_body = json.loads(response['body'].read()) content_text = response_body['content'][0]['text'] break # Success, exit retry loop - + except ClientError as e: error_code = e.response['Error']['Code'] if error_code == 'ThrottlingException' and attempt < max_retries: - print(f"Throttling detected, attempt {attempt + 1}/{max_retries + 1}") + print( + f"Throttling detected, attempt {attempt + 1}/{max_retries + 1}") continue else: print(f"Bedrock error after {attempt + 1} attempts: {str(e)}") @@ -180,41 +189,42 @@ def score_answer(bedrock_client, question, answer, position_description): # Parse the JSON response try: score_data = json.loads(content_text.strip()) - + # Validate response format - if not isinstance(score_data, dict) or 'score' not in score_data or 'summary' not in score_data: + if not isinstance( + score_data, + dict) or 'score' not in score_data or 'summary' not in score_data: raise ValueError("Invalid response format") - + # Validate score range score = int(score_data['score']) if score < 0 or score > 10: raise ValueError(f"Score out of range: {score}") - + return { 'score': score, 'summary': score_data['summary'].strip() } - + except (json.JSONDecodeError, ValueError, KeyError) as e: print(f"Failed to parse scoring response: {content_text}") print(f"Parse error: {str(e)}") - + # Fallback: try to extract JSON from wrapped text content_clean = content_text.strip() if content_clean.startswith('```json'): content_clean = content_clean[7:-3].strip() elif content_clean.startswith('```'): content_clean = content_clean[3:-3].strip() - + try: score_data = json.loads(content_clean) - score = int(score_data.get('score', 5)) # Default to 5 if missing + # Default to 5 if missing + score = int(score_data.get('score', 5)) score = max(0, min(10, score)) # Clamp to 0-10 range - - return { - 'score': score, - 'summary': score_data.get('summary', 'Unable to generate summary').strip() - } + + return {'score': score, 'summary': score_data.get( + 'summary', 'Unable to generate summary').strip()} except (json.JSONDecodeError, ValueError): print(f"Failed to parse after cleanup: {content_clean}") # Ultimate fallback @@ -224,11 +234,11 @@ def score_answer(bedrock_client, question, answer, position_description): } except Exception as e: - print(f"Error calling Bedrock for answer scoring after all retries: {str(e)}") + print( + f"Error calling Bedrock for answer scoring after all retries: {str(e)}") return { 'score': 5, - 'summary': 'Unable to score answer due to system error - please retry later' - } + 'summary': 'Unable to score answer due to system error - please retry later'} def update_qa_with_score(dynamodb, qa_id, score_result): @@ -237,7 +247,7 @@ def update_qa_with_score(dynamodb, qa_id, score_result): """ try: table = dynamodb.Table('interview_qa') - + table.update_item( Key={'id': qa_id}, UpdateExpression='SET answer_score = :score, answer_summary = :summary, processing_status = :status', @@ -247,7 +257,7 @@ def update_qa_with_score(dynamodb, qa_id, score_result): ':status': 'scored' } ) - + except Exception as e: print(f"Error updating Q&A {qa_id} with score: {str(e)}") - raise \ No newline at end of file + raise diff --git a/src/functions/qa_extractor/main.py b/src/functions/qa_extractor/main.py index 1626d4f..5795e0d 100644 --- a/src/functions/qa_extractor/main.py +++ b/src/functions/qa_extractor/main.py @@ -32,7 +32,9 @@ def handler(event, context): transcript = interview_data['interview_transcript'] # Step 2: Extract Q&A pairs using Bedrock Claude 4 Sonnet - qa_pairs = extract_qa_pairs(bedrock, transcript, interview_data.get('position_description', '')) + qa_pairs = extract_qa_pairs( + bedrock, transcript, interview_data.get( + 'position_description', '')) # Step 3: Save Q&A pairs to DynamoDB qa_table = dynamodb.Table('interview_qa') @@ -128,7 +130,8 @@ def extract_qa_pairs(bedrock_client, transcript, position_description): # Get the inference profile ARN from environment variables inference_profile_arn = os.environ.get('BEDROCK_INFERENCE_PROFILE_ARN') if not inference_profile_arn: - raise Exception("BEDROCK_INFERENCE_PROFILE_ARN environment variable not set") + raise Exception( + "BEDROCK_INFERENCE_PROFILE_ARN environment variable not set") response = bedrock_client.invoke_model( modelId=inference_profile_arn, @@ -147,14 +150,14 @@ def extract_qa_pairs(bedrock_client, transcript, position_description): except json.JSONDecodeError as e: print(f"Failed to parse JSON response: {content}") print(f"JSON Error: {str(e)}") - + # Try to extract JSON from text if wrapped in other content content_clean = content.strip() if content_clean.startswith('```json'): content_clean = content_clean[7:-3].strip() elif content_clean.startswith('```'): content_clean = content_clean[3:-3].strip() - + try: qa_data = json.loads(content_clean) qa_pairs = qa_data.get('qa_pairs', []) @@ -171,22 +174,24 @@ def extract_qa_pairs(bedrock_client, transcript, position_description): for pair in qa_pairs: if (isinstance(pair, dict) and 'question' in pair and 'answer' in pair and - len(pair['question'].strip()) > 10 and # Minimum question length + # Minimum question length + len(pair['question'].strip()) > 10 and len(pair['answer'].strip()) > 20): # Minimum answer length - + # Clean and validate the question question = pair['question'].strip() answer = pair['answer'].strip() - - # Skip if question doesn't end with proper punctuation or seem complete - if not (question.endswith('?') or question.endswith('.') or - 'what' in question.lower() or 'how' in question.lower() or - 'why' in question.lower() or 'when' in question.lower() or - 'where' in question.lower() or 'can you' in question.lower() or - 'tell me' in question.lower() or 'describe' in question.lower()): + + # Skip if question doesn't end with proper punctuation or seem + # complete + if not (question.endswith('?') or question.endswith('.') or + 'what' in question.lower() or 'how' in question.lower() or + 'why' in question.lower() or 'when' in question.lower() or + 'where' in question.lower() or 'can you' in question.lower() or + 'tell me' in question.lower() or 'describe' in question.lower()): print(f"Skipping incomplete question: {question[:50]}...") continue - + validated_pairs.append({ 'question': question, 'answer': answer, diff --git a/src/functions/s3_ingest_handler/main.py b/src/functions/s3_ingest_handler/main.py index 1000ea2..2f5d468 100644 --- a/src/functions/s3_ingest_handler/main.py +++ b/src/functions/s3_ingest_handler/main.py @@ -19,7 +19,8 @@ def handler(event, context): bucket = record['s3']['bucket']['name'] key = unquote_plus(record['s3']['object']['key']) - # Extract position_name from path (e.g., "python_senior/1.m4a" -> "python_senior") + # Extract position_name from path (e.g., "python_senior/1.m4a" -> + # "python_senior") path_parts = key.split('/') if len(path_parts) < 2: print(f"Invalid path structure: {key}") diff --git a/src/functions/transcribe_processor/main.py b/src/functions/transcribe_processor/main.py index cea2cd0..91d138a 100644 --- a/src/functions/transcribe_processor/main.py +++ b/src/functions/transcribe_processor/main.py @@ -39,7 +39,6 @@ def handler(event, context): print(f"Vacancy file not found: {vacancy_key}") position_description = f"Position: {position_name}" - # Step 2: Start Transcribe job job_name = f"interview-{interview_id}" audio_uri = f"s3://{bucket}/{key}" @@ -111,13 +110,16 @@ def check_transcription_status(event, context): # or https://bucket.s3.region.amazonaws.com/key if '.s3.' in transcript_uri: # Format: https://bucket.s3.region.amazonaws.com/key - uri_without_protocol = transcript_uri.replace('https://', '') + uri_without_protocol = transcript_uri.replace( + 'https://', '') parts = uri_without_protocol.split('/') - result_bucket = parts[0].split('.')[0] # Extract bucket from hostname + # Extract bucket from hostname + result_bucket = parts[0].split('.')[0] result_key = '/'.join(parts[1:]) # Everything after bucket else: # Format: https://s3.region.amazonaws.com/bucket/key - uri_without_protocol = transcript_uri.replace('https://', '') + uri_without_protocol = transcript_uri.replace( + 'https://', '') parts = uri_without_protocol.split('/') result_bucket = parts[1] # Second part is bucket result_key = '/'.join(parts[2:]) # Everything after bucket @@ -144,7 +146,8 @@ def check_transcription_status(event, context): utterances = build_utterances_with_timestamps(segments, items) # Save to DynamoDB without large raw data (due to 400KB item size limit) - # For large transcripts, we'll store utterances for chunking and skip raw data + # For large transcripts, we'll store utterances for chunking and + # skip raw data table = dynamodb.Table('interview_transcriptions') # Calculate sizes to determine what we can store @@ -173,13 +176,15 @@ def check_transcription_status(event, context): # For very large transcripts, store summary info only item[ 'interview_transcript'] = f"[Large transcript - {len(utterances)} utterances, {transcript_size} bytes]" - print(f"Transcript too large ({transcript_size} bytes), storing summary only") + print( + f"Transcript too large ({transcript_size} bytes), storing summary only") # Store utterances separately due to size # We'll chunk and process them in the next step # For now, just store a sample for debugging if len(utterances) > 10: - item['utterances_sample'] = utterances[:5] + utterances[-5:] # First and last 5 + item['utterances_sample'] = utterances[:5] + \ + utterances[-5:] # First and last 5 else: item['utterances_sample'] = utterances @@ -330,8 +335,10 @@ def build_utterances_with_timestamps(segments, items): # Calculate average confidence as Decimal avg_confidence = Decimal('0') if current_words: - confidence_sum = sum(w['confidence'] for w in current_words) - avg_confidence = confidence_sum / Decimal(str(len(current_words))) + confidence_sum = sum(w['confidence'] + for w in current_words) + avg_confidence = confidence_sum / \ + Decimal(str(len(current_words))) utterances.append({ 'utterance_id': utterance_id, diff --git a/stacks/dynamodb_stack.py b/stacks/dynamodb_stack.py index 4094a3d..efea85e 100644 --- a/stacks/dynamodb_stack.py +++ b/stacks/dynamodb_stack.py @@ -18,22 +18,21 @@ def __init__( # Interview Transcriptions Table # PK: id (uuid) - # Attributes: id, position_name, position_description, interview_transcript, created_at + # Attributes: id, position_name, position_description, + # interview_transcript, created_at self.interview_transcriptions_table = dynamodb.Table( self, "InterviewTranscriptionsTable", table_name="interview_transcriptions", partition_key=dynamodb.Attribute( name="id", - type=dynamodb.AttributeType.STRING - ), + type=dynamodb.AttributeType.STRING), billing_mode=dynamodb.BillingMode.PAY_PER_REQUEST, removal_policy=RemovalPolicy.DESTROY, encryption=dynamodb.TableEncryption.CUSTOMER_MANAGED, encryption_key=kms_key, point_in_time_recovery_specification=dynamodb.PointInTimeRecoverySpecification( - point_in_time_recovery_enabled=True - ), + point_in_time_recovery_enabled=True), ) # Add GSI for querying by position_name @@ -48,7 +47,8 @@ def __init__( # Interview Q&A Table # PK: id (uuid) - # Attributes: id, interview_id, index, question, answer, answer_score, answer_summary, created_at + # Attributes: id, interview_id, index, question, answer, answer_score, + # answer_summary, created_at self.interview_qa_table = dynamodb.Table( self, "InterviewQATable", @@ -77,4 +77,3 @@ def __init__( ), projection_type=dynamodb.ProjectionType.ALL ) - diff --git a/stacks/kms_stack.py b/stacks/kms_stack.py index 39c5360..cc78fdf 100644 --- a/stacks/kms_stack.py +++ b/stacks/kms_stack.py @@ -26,12 +26,11 @@ def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None: "kms:GenerateDataKey", ], resources=["*"], - principals=[iam.ServicePrincipal("lambda.amazonaws.com")], + principals=[ + iam.ServicePrincipal("lambda.amazonaws.com")], conditions={ "StringEquals": { "kms:CallerAccount": self.account, "kms:ViaService": f"lambda.{self.region}.amazonaws.com", - } - }, - ) - ) + }}, + )) diff --git a/stacks/step_functions_stack.py b/stacks/step_functions_stack.py index b7b705f..a57d8a3 100644 --- a/stacks/step_functions_stack.py +++ b/stacks/step_functions_stack.py @@ -59,14 +59,16 @@ def __init__( } ) - # Lambda function for Q&A extraction using Bedrock Claude Sonnet (single-pass) + # Lambda function for Q&A extraction using Bedrock Claude Sonnet + # (single-pass) self.qa_extractor_function = _lambda.Function( self, "QAExtractorFunction", runtime=_lambda.Runtime.PYTHON_3_12, code=_lambda.Code.from_asset("src/functions/qa_extractor"), handler="main.handler", - timeout=Duration.minutes(15), # Increased timeout for large transcripts + timeout=Duration.minutes(15), + # Increased timeout for large transcripts environment={ "KMS_KEY_ID": kms_key.key_id, "BEDROCK_INFERENCE_PROFILE_ARN": bedrock_inference_profile_arn, @@ -87,7 +89,6 @@ def __init__( } ) - # Grant permissions to Lambda functions kms_key.grant_encrypt_decrypt(self.transcribe_processor_function) kms_key.grant_encrypt_decrypt(self.transcribe_status_checker_function) @@ -124,7 +125,8 @@ def __init__( "transcribe:StartTranscriptionJob", "transcribe:GetTranscriptionJob", "transcribe:ListTranscriptionJobs", - # Custom vocabulary permissions for Russian language optimization + # Custom vocabulary permissions for Russian language + # optimization "transcribe:CreateVocabulary", "transcribe:GetVocabulary", "transcribe:ListVocabularies", @@ -167,7 +169,8 @@ def __init__( # Grant Bedrock permissions for Q&A extraction # Need permissions for: # 1. The inference profile (can be cross-region) - # 2. The underlying foundation model in any region (inference profiles route across regions) + # 2. The underlying foundation model in any region (inference profiles + # route across regions) bedrock_policy = iam.PolicyStatement( effect=iam.Effect.ALLOW, actions=[ @@ -176,12 +179,12 @@ def __init__( resources=[ # Inference profile bedrock_inference_profile_arn, - # Foundation model in any region (needed for cross-region inference) + # Foundation model in any region (needed for cross-region + # inference) "arn:aws:bedrock:*::foundation-model/anthropic.claude-sonnet-4-20250514-v1:0" ] ) - self.transcribe_processor_function.add_to_role_policy( dynamodb_transcriptions_policy) self.transcribe_status_checker_function.add_to_role_policy( @@ -200,14 +203,15 @@ def __init__( dynamodb_stack.interview_qa_table, starting_position=_lambda.StartingPosition.LATEST, batch_size=3, # Reduced batch size to limit concurrent Bedrock calls - max_batching_window=Duration.seconds(10), # Increased window for better batching + # Increased window for better batching + max_batching_window=Duration.seconds(10), retry_attempts=2, # Reduced retries since we handle retries in the function parallelization_factor=1, # Process records sequentially to avoid throttling - max_record_age=Duration.hours(1), # Skip records older than 1 hour + # Skip records older than 1 hour + max_record_age=Duration.hours(1), ) ) - # Step Functions tasks start_transcription_task = tasks.LambdaInvoke( self, @@ -264,7 +268,8 @@ def __init__( .when( sfn.Condition.string_equals( "$.status_result.Payload.transcribe_status", "COMPLETED"), - # Direct Q&A extraction with Claude Sonnet (single-pass) + # Direct Q&A extraction with Claude Sonnet + # (single-pass) qa_extraction_task.next(success_state) ) .when(