|
| 1 | +#!/usr/bin/env python |
| 2 | + |
| 3 | +""" |
| 4 | +Simple script that allows enabling/disabling access from the sample app's Lambda |
| 5 | +functions to the DynamoDB tables, by removing/adding permissions from/to the role |
| 6 | +policy that manages access. |
| 7 | +This can be handy to showcase IAM enforcement in LocalStack (IAM soft mode and hard mode). |
| 8 | +""" |
| 9 | + |
| 10 | +import json |
| 11 | +import os |
| 12 | +import sys |
| 13 | + |
| 14 | +import boto3 |
| 15 | + |
| 16 | +os.environ.setdefault("AWS_DEFAULT_REGION", "us-east-1") |
| 17 | +os.environ.setdefault("AWS_ACCESS_KEY_ID", "test") |
| 18 | +os.environ.setdefault("AWS_SECRET_ACCESS_KEY", "test") |
| 19 | +iam_client = boto3.client("iam", endpoint_url="http://localhost:4566") |
| 20 | + |
| 21 | + |
| 22 | +def update_role_policy(allow: bool): |
| 23 | + roles = iam_client.list_roles() |
| 24 | + for role in roles["Roles"]: |
| 25 | + in_scope = ( |
| 26 | + "QuizAppStack-ScoringFunctionLambdaFun" in role["RoleName"] |
| 27 | + or "QuizAppStack-ListPublicQuizzes" in role["RoleName"] |
| 28 | + or "QuizAppStack-" in role["RoleName"] |
| 29 | + ) |
| 30 | + if not in_scope: |
| 31 | + continue |
| 32 | + |
| 33 | + # list policies for this role |
| 34 | + response = iam_client.list_role_policies(RoleName=role["RoleName"]) |
| 35 | + for policy_name in response["PolicyNames"]: |
| 36 | + response = iam_client.get_role_policy( |
| 37 | + RoleName=role["RoleName"], PolicyName=policy_name |
| 38 | + ) |
| 39 | + # get the policy document |
| 40 | + policy_doc = response["PolicyDocument"] |
| 41 | + # remove all policies that contain a statement with "dynamodb:GetItem" |
| 42 | + policy_doc["Statement"] = [ |
| 43 | + stmt |
| 44 | + for stmt in policy_doc["Statement"] |
| 45 | + if "dynamodb:GetItem" not in stmt["Action"] |
| 46 | + ] |
| 47 | + if allow: |
| 48 | + # if we're in `allow` mode, add a statement with the required actions back to the policy |
| 49 | + policy_doc["Statement"].append( |
| 50 | + { |
| 51 | + "Action": [ |
| 52 | + "dynamodb:BatchGetItem", |
| 53 | + "dynamodb:BatchWriteItem", |
| 54 | + "dynamodb:ConditionCheckItem", |
| 55 | + "dynamodb:DeleteItem", |
| 56 | + "dynamodb:DescribeTable", |
| 57 | + "dynamodb:GetItem", |
| 58 | + "dynamodb:GetRecords", |
| 59 | + "dynamodb:GetShardIterator", |
| 60 | + "dynamodb:PutItem", |
| 61 | + "dynamodb:Query", |
| 62 | + "dynamodb:Scan", |
| 63 | + "dynamodb:UpdateItem", |
| 64 | + ], |
| 65 | + "Effect": "Allow", |
| 66 | + "Resource": [ |
| 67 | + "arn:aws:dynamodb:us-east-1:000000000000:table/Quizzes", |
| 68 | + "arn:aws:dynamodb:us-east-1:000000000000:table/UserSubmissions", |
| 69 | + "arn:aws:dynamodb:us-east-1:000000000000:table/UserSubmissions/index/*", |
| 70 | + ], |
| 71 | + } |
| 72 | + ) |
| 73 | + |
| 74 | + if not policy_doc["Statement"]: |
| 75 | + # hack/workaround: statement cannot be fully empty, so we're adding a single dummy entry here |
| 76 | + policy_doc["Statement"].append( |
| 77 | + { |
| 78 | + "Action": ["dynamodb:ConditionCheckItem"], |
| 79 | + "Effect": "Allow", |
| 80 | + "Resource": [ |
| 81 | + "arn:aws:dynamodb:us-east-1:000000000000:table/Quizzes" |
| 82 | + ], |
| 83 | + } |
| 84 | + ) |
| 85 | + |
| 86 | + # update the role policy |
| 87 | + iam_client.put_role_policy( |
| 88 | + RoleName=role["RoleName"], |
| 89 | + PolicyName=policy_name, |
| 90 | + PolicyDocument=json.dumps(policy_doc), |
| 91 | + ) |
| 92 | + |
| 93 | + |
| 94 | +def main(): |
| 95 | + if len(sys.argv) <= 1 or sys.argv[1] not in ("enable", "disable"): |
| 96 | + raise Exception("Usage: update_policy.py [enable | disable]") |
| 97 | + update_role_policy(allow=sys.argv[1] == "enable") |
| 98 | + |
| 99 | + |
| 100 | +if __name__ == "__main__": |
| 101 | + main() |
0 commit comments