diff --git a/.github/workflows/deploy.yml b/.github/workflows/deploy.yml
index 4997929..5ae4e82 100644
--- a/.github/workflows/deploy.yml
+++ b/.github/workflows/deploy.yml
@@ -32,8 +32,7 @@ jobs:
         run: |
           python -m pip install --upgrade pip
           pip install --upgrade pyyaml
-          pip install boto3
-          pip install requests
+          pip install boto3 requests schema
       - name: Deploy to ${{ matrix.environment.name }}
         continue-on-error: true
         env:
@@ -42,9 +41,10 @@ jobs:
           SPLUNK_USERNAME: ${{ secrets.SPLUNK_USERNAME }}
           SPLUNK_PASSWORD: ${{ secrets.SPLUNK_PASSWORD }}
           SPLUNK_TOKEN: ${{ secrets[matrix.environment.token_key] }}
+          DEPLOYMENT_CONFIG_PATH: "environments/${{ matrix.environment.name }}"
         run: |
           echo "Deploying to ${{ matrix.environment.name }} environment"
-          python -u deploy.py ${{ matrix.environment.name }}
+          python -u deploy.py
       - name: Upload deployment report as artifact
         uses: actions/upload-artifact@v3
         with:
diff --git a/README.md b/README.md
index 3110457..9249faf 100644
--- a/README.md
+++ b/README.md
@@ -1,5 +1,5 @@
 # Splunk Apps Deployment Architecture
-This is just an idea developed for Philips Electronics Nederland within the context of [JIRA ticket](https://splunk.atlassian.net/browse/FDSE-2571). To be extended and used at own risk.
+This is just an idea developed within the context of [JIRA ticket](https://splunk.atlassian.net/browse/FDSE-2571). To be extended and used at own risk.
 
 Assumptions:
 * All apps are stored in a single GitHub repository
@@ -17,6 +17,10 @@ Assumptions:
 │   ├── collections.conf
 │   └── logging.conf
 ├── deploy.py
+├── modules
+│   ├── apps_processing.py
+│   ├── report_generator.py
+│   └── splunkcloud.py
 └── environments
     ├── prod
     │   ├── es
@@ -38,6 +42,7 @@ Assumptions:
 * deployment instructions per each environment (`deployment.yml`)
 * specific apps configurations (e.g. `uat/es/app1`)
 * `deploy.py` Used by the automation to perform the deployment
+* `modules/` Contains the helper classes used by the deployment automation
 
 This repository follows the same structure. Please navigate it to verify its content.
 
@@ -45,9 +50,11 @@ This repository follows the same structure. Please navigate it to verify its con
 As mentioned, these deployment files specify the apps and configurations needed on each specific environment.
 Example:
 ```yml
 target:
-  url:
+  url: https://admin.splunk.com/{stack}
+  experience:
 apps: # Private apps
+  # - Leave empty if target does not need private apps
   app1:
     s3-bucket: bucket-1
     source: apps/app1.tgz
@@ -57,7 +64,8 @@ apps:
     config:
       - ./app1/*.conf
 splunkbase-apps: # Splunkbase apps
-  cb-protection-app-for-splunk:
+  # - Leave empty if target does not need Splunkbase apps
+  Cb Protection App for Splunk:
     version: 1.0.0
 ```
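
For reference, a minimal sketch of how such a file is loaded and checked, assuming the `schema`-based `deployment_schema` introduced in `modules/apps_processing.py` below (the file path is illustrative):

```python
# Minimal sketch: load a deployment.yml and validate it against the
# deployment_schema defined in modules/apps_processing.py (see below).
import yaml
from schema import SchemaError

from modules.apps_processing import deployment_schema

with open("environments/uat/es/deployment.yml") as f:  # illustrative path
    data = yaml.safe_load(f)

try:
    deployment_schema.validate(data)
    print(data["target"]["url"], data["target"]["experience"])
except SchemaError as e:
    print(f"deployment.yml is invalid: {e}")
```
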
diff --git a/deploy.py b/deploy.py
index 3cf0329..c886b38 100644
--- a/deploy.py
+++ b/deploy.py
@@ -1,115 +1,92 @@
-import sys
-import json
 import os
+import boto3
-import yaml
+from modules.splunkcloud import SplunkCloudConnector
+from modules.apps_processing import AppFilesProcessor, DeploymentParser
+from modules.report_generator import DeploymentReportGenerator
-from utils import *
+
+DEPLOYMENT_CONFIG_PATH = os.getenv("DEPLOYMENT_CONFIG_PATH")
-# FOR LOCAL TESTING
-# from dotenv import load_dotenv
-# load_dotenv(dotenv_path="local.env")
 
 
 def main():
-    if len(sys.argv) != 2:
-        print("Usage: python script.py ")
-        sys.exit(1)
+    # Initialize deployment report
+    deployment_report = DeploymentReportGenerator()
+    # Initialize the boto3 S3 client
+    s3_connector = boto3.client("s3")
+    # Initialize DeploymentParser object
+    config = DeploymentParser()
+    # Initialize AppFilesProcessor object
+    app_processor = AppFilesProcessor(config)
+    # Initialize SplunkCloudConnector object
+    cloud_connector = SplunkCloudConnector(config.url, config.cloud_experience)
-    yaml_file_path = "environments/" + sys.argv[1] + "/deployment.yml"
-
-    deployment_report = {}
-
-    try:
-        data = read_yaml(yaml_file_path)
-    except FileNotFoundError:
-        print(f"Error: The file '{yaml_file_path}' was not found.")
-    except yaml.YAMLError as e:
-        print(f"Error parsing YAML file: {e}")
-        sys.exit(1)
-    ### 1. Validate data and retrieve all apps listed in deployment.yml from S3 ###
-    private_apps, splunkbase_apps = validate_data(data)
-    # List all apps in yaml file and then their S3 bucket
-    if private_apps:
-        apps = data.get("apps", {}).keys()
-        s3_buckets = [data["apps"][app]["s3-bucket"] for app in apps]
-        app_directories = [data["apps"][app]["source"] for app in apps]
-        target_url = data["target"]["url"]
-    # Download all apps from S3
-    if private_apps:
-        print("Found private apps in deployment.yml, starting deployment...")
-        for app, bucket, directory in zip(apps, s3_buckets, app_directories):
-            object_name = directory
+    # Check for private apps
+    if config.has_private_apps():
+        print("Found private apps, starting deployment...")
+        # Loop through all apps
+        for app in config.private_apps.keys():
+            bucket = config.get_bucket(app)
+            app_path = config.get_app_path(app)
             file_name = f"{app}.tgz"
             # Download app from S3
-            download_file_from_s3(bucket, object_name, file_name)
+            try:
+                s3_connector.download_file(bucket, app_path, file_name)
+            except Exception as e:
+                raise Exception(f"Error downloading {app_path} from {bucket}: {e}")
 
-            ### 2. Upload_local_configuration ###
-            # Check if the configuration exists for the app
-            path = os.path.join("environments", sys.argv[1], app)
-            print(path)
-            if path:
-                unpack_merge_conf_and_meta_repack(app, path)
+            ### Upload_local_configuration ###
+            # Check whether the app needs specific configs for this env
+            path = os.path.join(DEPLOYMENT_CONFIG_PATH, app)
+            if len(config.get_app_configs(app)) > 0:
+                app_processor.unpack_merge_conf_and_meta_repack(app, path)
             else:
-                print(f"No configuration found for app {app}. Skipping.")
+                print(f"No configurations needed for app {app}. Skipping.")
 
-            ### 3. Validate app for Splunk Cloud ###
-            report, token = cloud_validate_app(app)
-            if report is None:
-                print(f"App {app} failed validation.")
-                deployment_report[app] = {"validation": "failed"}
+            ### Validate app for Splunk Cloud ###
+            appinspect_handler = cloud_connector.get_appinspect_handler()
+            is_valid = appinspect_handler.validate(app)
+            if not is_valid:
+                print(f"App {app} failed validation. Skipping distribution.\n")
+                deployment_report.add_data(app, ("report", appinspect_handler.report))
+                deployment_report.add_data(app, ("validation", "failed"))
+                deployment_report.add_data(
+                    app, ("distribution", "failed due to app validation error")
+                )
                 continue
-            result = report["summary"]
-            deployment_report[app] = report
-            ### 4. If app is valid, distribute it ###
-            if (
-                result["error"] == 0
-                and result["failure"] == 0
-                and result["manual_check"] == 0
-            ):
-                distribution_status = distribute_app(app, target_url, token)
-                if distribution_status == 200:
-                    print(f"App {app} successfully distributed.\n")
-                    deployment_report[app]["distribution"] = "success"
-                else:
-                    print(f"App {app} failed distribution.")
-                    deployment_report[app][
-                        "distribution"
-                    ] = f"failed with status code: {distribution_status}"
+            ### App is valid: distribute it ###
+            deployment_report.add_data(app, ("report", appinspect_handler.report))
+            dist_succeeded, dist_status = cloud_connector.distribute(app)
+            if dist_succeeded:
+                print(f"App {app} successfully distributed.\n")
+                deployment_report.add_data(app, ("distribution", "success"))
             else:
-                print(f"App {app} failed validation. Skipping distribution.\n")
-                deployment_report[app][
-                    "distribution"
-                ] = "failed due to app validation error"
+                print(f"App {app} failed distribution.")
+                deployment_report.add_data(
+                    app,
+                    (
+                        "distribution",
+                        f"failed with status code: {dist_status}",
+                    ),
+                )
     else:
-        print("No private apps found in deployment.yml, skipping...")
+        print("No private apps found, skipping...")
 
-    ### 5. Handle Splunkbase apps ###
-    if splunkbase_apps:
-        print("Found Splunkbase apps in deployment.yml, starting deployment...")
-        splunkbase_apps_dict = data.get("splunkbase-apps", {})
-        for splunkbase_app in splunkbase_apps_dict:
-            app = splunkbase_apps_dict[splunkbase_app]
-            app_name = splunkbase_app
-            version = app['version']
-            app_id = get_app_id(app_name)
-            token = os.getenv("SPLUNK_TOKEN")
-            license = get_license_url(app_name)
-            install_status = install_splunkbase_app(app_name, app_id, version, target_url, token, license)
-            print(f"App {app_name} installation status: {install_status}")
-            deployment_report[app_name] = {
-                "splunkbase_installation": install_status,
-                "version": version,
-                "app_id": app_id,
-            }
+    ### Handle Splunkbase apps ###
+    if config.has_splunkbase_apps():
+        print("Found Splunkbase apps, starting deployment...")
+        for splunkbase_app in config.splunkbase_apps.keys():
+            version = config.get_version(splunkbase_app)
+            install_status = cloud_connector.install(splunkbase_app, version)
+            print(f"App {splunkbase_app} installation status: {install_status}")
+            deployment_report.add_data(
+                splunkbase_app,
+                {"splunkbase_installation": install_status, "version": version},
+            )
     else:
-        print("No Splunkbase apps found in deployment.yml, skipping...")
+        print("No Splunkbase apps found, skipping...")
 
-    ### 6. Save deployment report to json file ###
-    report_prefix = f"{sys.argv[1].split('/')[-2]}_{sys.argv[1].split('/')[-1]}"
-    output_dir = "artifacts"
-    os.makedirs(output_dir, exist_ok=True)
-    with open(f"{output_dir}/{report_prefix}_deployment_report.json", "w") as file:
-        json.dump(deployment_report, file)
+    ### Save deployment report to json file ###
+    deployment_report.generate_report()
 
 
 if __name__ == "__main__":
diff --git a/environments/prod/es/deployment.yml b/environments/prod/es/deployment.yml
index 96aca8a..8d7522d 100644
--- a/environments/prod/es/deployment.yml
+++ b/environments/prod/es/deployment.yml
@@ -1,5 +1,6 @@
 target:
   url: https://splunk-es-prod.example.com
+  experience: classic
 apps:
   app1:
     s3-bucket: splunk-apps-deployment
diff --git a/environments/prod/ses/deployment.yml b/environments/prod/ses/deployment.yml
index 3c57cb4..70159ea 100644
--- a/environments/prod/ses/deployment.yml
+++ b/environments/prod/ses/deployment.yml
@@ -1,6 +1,8 @@
 target:
   url:
+  experience:
 apps:
   app1:
     s3-bucket: splunk-apps-deployment
-    source: apps/Splunk_TA_app1.tgz
\ No newline at end of file
+    source: apps/Splunk_TA_app1.tgz
+splunkbase-apps:
diff --git a/environments/uat/es/deployment.yml b/environments/uat/es/deployment.yml
index 522eaf8..8d5e980 100644
--- a/environments/uat/es/deployment.yml
+++ b/environments/uat/es/deployment.yml
@@ -1,5 +1,6 @@
 target:
-  url: https://staging.admin.splunk.com/scv-shw-217bd09bcbf264/adminconfig/v2/apps/victoria
+  url: https://staging.admin.splunk.com/scv-shw-a7f6020a334e01
+  experience: victoria
 apps:
   Splunk_TA_app1:
     s3-bucket: splunk-apps-deployment
@@ -12,7 +13,6 @@ apps:
     config:
       - ./buttercup_app_for_splunk/*.conf
 splunkbase-apps:
-  # Splunkbase apps
   Splunk Add-on for Amazon Web Services:
     version: 7.9.0
   Cisco Networks Add-on for Splunk Enterprise:
diff --git a/environments/uat/ses/deployment.yml b/environments/uat/ses/deployment.yml
index 75ba15b..2b9d76e 100644
--- a/environments/uat/ses/deployment.yml
+++ b/environments/uat/ses/deployment.yml
@@ -1,5 +1,6 @@
 target:
-  url: https://staging.admin.splunk.com/scv-shw-d037e758abafa2/adminconfig/v2/apps/victoria
+  url: https://staging.admin.splunk.com/scv-shw-d037e758abafa2
+  experience: victoria
 apps:
   Splunk_TA_app1:
     s3-bucket: splunk-apps-deployment
@@ -7,7 +8,7 @@ apps:
     config:
       - ./Splunk_TA_app1/*.conf
 splunkbase-apps:
-  TA for Tebable:
+  Tenable Add-On for Splunk:
     version: 7.0.0
-  TA for MS Azure:
+  Splunk Add-on for Microsoft Azure:
     version: 4.2.0
+ """ + with open(file_path, "r") as file: + lines = file.readlines() + + with open(file_path, "w") as file: + for line in lines: + # Replace '[DEFAULT]' with '[]' + if line.strip() == "[DEFAULT]": + file.write("[]\n") + else: + file.write(line) + + def merge_or_copy_conf(self, source_path: str, dest_path: str) -> None: + # Get the filename from the source path + filename = os.path.basename(source_path) + dest_file = os.path.join(dest_path, filename) + + # Check if the file exists in the destination directory + if not os.path.exists(dest_file): + # If the file doesn't exist, copy it + shutil.copy(source_path, dest_path) + print(f"Copied {filename} to {dest_path}") + else: + # If the file exists, merge the configurations + print(f"Merging {filename} with existing file in {dest_path}") + + # Read the source file + source_config = configparser.ConfigParser() + source_config.read(source_path) + + # Read the destination file + dest_config = configparser.ConfigParser() + dest_config.read(dest_file) + + # Merge source into destination + for section in source_config.sections(): + if not dest_config.has_section(section): + dest_config.add_section(section) + for option, value in source_config.items(section): + dest_config.set(section, option, value) + + # Write the merged configuration back to the destination file + with open(dest_file, "w") as file: + dest_config.write(file) + print(f"Merged configuration saved to {dest_file}") + + def merge_or_copy_meta(self, local_meta_file: str, default_dir: str) -> None: + """Merge local.meta with default.meta""" + filename = os.path.basename(local_meta_file) + dest_file = os.path.join(default_dir, "default.meta") + + # Check if the file exists in the destination directory + if not os.path.exists(dest_file): + # If the file doesn't exist, copy it + shutil.copy(local_meta_file, dest_file) + print(f"Copied {filename} to {dest_file}") + else: + # If the file exists, merge the configurations + print(f"Merging {filename} with existing file in {dest_file}") + + # Preprocess the default file + default_preprocessed_lines = self._preprocess_empty_headers(dest_file) + default_preprocessed_content = StringIO("".join(default_preprocessed_lines)) + + # Read the default.meta file + default_meta = configparser.ConfigParser() + default_meta.read_file(default_preprocessed_content) + + # Preprocess the local file + local_preprocessed_lines = self._preprocess_empty_headers(local_meta_file) + local_preprocessed_content = StringIO("".join(local_preprocessed_lines)) + + # Read the local.meta file + local_meta = configparser.ConfigParser() + local_meta.read_file(local_preprocessed_content) + + # Merge local.meta into default.meta + for section in local_meta.sections(): + if not default_meta.has_section(section): + default_meta.add_section(section) + for option, value in local_meta.items(section): + if default_meta.has_option(section, option): + # Merge logic: Option exists in both, decide whether to overwrite + default_value = default_meta.get(section, option) + if value != default_value: + print( + f"Conflict detected: {section} {option} - {default_value} -> {value}" + ) + # Overwrite the option in default.meta + default_meta.set(section, option, value) + default_meta.set(section, option, value) + + # Write the merged configuration back to the output file + with open(dest_file, "w") as file: + default_meta.write(file) + + # Replace '[DEFAULT]' with '[]' in the output file + self._replace_default_with_empty_header(dest_file) + + print(f"Merged metadata saved to {dest_file}") + + def 
diff --git a/modules/report_generator.py b/modules/report_generator.py
new file mode 100644
index 0000000..e7d86ed
--- /dev/null
+++ b/modules/report_generator.py
@@ -0,0 +1,37 @@
+import os
+import json
+from typing import Union
+
+
+class DeploymentReportGenerator:
+    """Class for generating the deployment report."""
+
+    def __init__(self):
+        self.deployment_report = {}
+
+    def __str__(self) -> str:
+        return str(self.deployment_report)
+
+    def add_data(self, key: str, value: Union[tuple, dict]) -> None:
+        """Add data to the deployment report."""
+        deployment_report = self.deployment_report
+        if key not in deployment_report:
+            deployment_report[key] = {}
+        # Handle situation if passed value is a dictionary
+        if isinstance(value, dict):
+            deployment_report[key].update(value)
+        # Handle situation if passed value is a tuple
+        elif isinstance(value, tuple):
+            deployment_report[key][value[0]] = value[1]
+        else:
+            raise ValueError("Value must be a tuple or a dictionary.")
+
+    def generate_report(self) -> None:
+        """Write the deployment report to a JSON file."""
+        DEPLOYMENT_CONFIG_PATH = os.getenv("DEPLOYMENT_CONFIG_PATH")
+        report_prefix = f"{DEPLOYMENT_CONFIG_PATH.split('/')[-2]}_{DEPLOYMENT_CONFIG_PATH.split('/')[-1]}"
+        output_dir = "artifacts"
+
+        os.makedirs(output_dir, exist_ok=True)
+        with open(f"{output_dir}/{report_prefix}_deployment_report.json", "w") as file:
+            json.dump(self.deployment_report, file)
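
A short usage sketch for the report generator; the keys mirror those written by deploy.py above, and the environment value is illustrative:

```python
# Usage sketch for DeploymentReportGenerator: tuples add one key/value,
# dicts are merged wholesale into the app's entry.
import os

os.environ["DEPLOYMENT_CONFIG_PATH"] = "environments/uat/es"  # illustrative

from modules.report_generator import DeploymentReportGenerator

report = DeploymentReportGenerator()
report.add_data("app1", ("validation", "failed"))
report.add_data("app2", {"splunkbase_installation": "success", "version": "7.9.0"})
report.generate_report()  # writes artifacts/uat_es_deployment_report.json
```
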
diff --git a/modules/splunkcloud.py b/modules/splunkcloud.py
new file mode 100644
index 0000000..0d6ec42
--- /dev/null
+++ b/modules/splunkcloud.py
@@ -0,0 +1,253 @@
+import os
+import requests
+import time
+
+import xml.etree.ElementTree as ET
+
+# TODO remove following. Used for local testing only.
+# from dotenv import load_dotenv
+# load_dotenv(dotenv_path="local.env")
+
+
+class SplunkCloudAccountConfig:
+    username: str = os.getenv("SPLUNK_USERNAME")
+    password: str = os.getenv("SPLUNK_PASSWORD")
+    token: str = os.getenv("SPLUNK_TOKEN")
+
+    @classmethod
+    def to_dict(cls) -> dict:
+        return {"username": cls.username, "password": cls.password, "token": cls.token}
+
+
+class AppInspectService:
+    base_url: str = "https://appinspect.splunk.com/v1"
+    auth_url: str = "https://api.splunk.com/2.0/rest/login/splunk"
+    tags: str = "private_victoria"
+
+    def __init__(self, cloud_type: str = "victoria"):
+        self.account = SplunkCloudAccountConfig.to_dict()
+        self.tags = f"private_{cloud_type}"
+        # Instance-level report, so results are not shared between instances
+        self.report = {}
+
+    def get_token(self) -> str:
+        """
+        Authenticate to the Splunk Cloud.
+
+        get_token() -> token : str
+        """
+        try:
+            response = requests.get(
+                self.auth_url, auth=(self.account["username"], self.account["password"])
+            )
+            token = response.json()["data"]["token"]
+            print("AppInspectService: get_token() - success")
+            return token
+        except requests.exceptions.RequestException as e:
+            print(f"Error getting token: {e}")
+            return None
+
+    def _get_request_id(self, headers: dict, files: dict) -> str:
+        """
+        Helper function to make a validation request and return the request ID.
+
+        _get_request_id(headers, files) -> request_id : str
+        """
+        url = f"{self.base_url}/app/validate"
+
+        try:
+            response = requests.post(url, headers=headers, files=files, timeout=120)
+            response_json = response.json()
+            request_id = response_json["request_id"]
+        except requests.exceptions.RequestException as e:
+            print(f"Error making app validation request: {e}")
+            return None
+        return request_id
+
+    def validate(self, app: str) -> bool:
+        """
+        Validate the app for the Splunk Cloud.
+
+        validate(app) -> is_valid : bool
+        """
+        token = self.get_token()
+        headers = {"Authorization": f"Bearer {token}"}
+
+        print(f"Validating app {app}...")
+        with open(f"{app}.tgz", "rb") as file:
+            request_id = self._get_request_id(headers, {"app_package": file})
+        status_url = f"{self.base_url}/app/validate/status/{request_id}?included_tags={self.tags}"
+
+        try:
+            response = requests.get(status_url, headers=headers)
+        except requests.exceptions.RequestException as e:
+            print(f"Error: {e}")
+            return False
+
+        max_retries = 60  # Maximum number of retries
+        retries = 0
+        response_json = response.json()
+
+        while response_json["status"] != "SUCCESS" and retries < max_retries:
+            response = requests.get(status_url, headers=headers)
+            response_json = response.json()
+            retries += 1
+            if response_json["status"] == "FAILURE":
+                print(f"App {app} failed validation: {response_json['errors']}")
+                break
+            else:
+                print(f"App {app} awaiting validation...")
+                print(f"Current status: {response_json['status']}")
+                if response_json["status"] == "SUCCESS":
+                    break
+                time.sleep(10)
+        if retries == max_retries:
+            print(f"App {app} validation timed out.")
+            return False
+
+        if response_json["status"] == "SUCCESS":
+            print("App validation successful.")
+
+            response = requests.get(
+                f"{self.base_url}/app/report/{request_id}?included_tags={self.tags}",
+                headers=headers,
+            )
+            self.report = response.json()
+            summary = self.report["summary"]
+
+            return (
+                summary["error"] == 0
+                and summary["failure"] == 0
+                and summary["manual_check"] == 0
+            )
+
+        return False
+
+
+class SplunkbaseService:
+    base_url: str = "https://splunkbase.splunk.com/api/v1/app"
+    auth_url: str = "https://splunkbase.splunk.com/api/account:login"
+    token: str = None
+
+    def __init__(self):
+        self.account = SplunkCloudAccountConfig.to_dict()
+
+    def get_app_info(self, app_name: str) -> dict:
+        """Return the first Splunkbase search result for the given app name."""
+        params = {"query": app_name, "limit": 1}
+        response = requests.get(self.base_url, params=params)
+        results = response.json().get("results", [])
+        if len(results) > 0:
+            return results[0]
+        print(f"App {app_name} not found on Splunkbase.")
+        return None
+
+    def _authenticate(self) -> None:
+        """
+        Authenticate to Splunkbase and store the session token on self.token.
+        """
+        data = {
+            "username": self.account["username"],
+            "password": self.account["password"],
+        }
+
+        response = requests.post(self.auth_url, data=data)
+        if response.ok:
+            # Parse the XML response
+            xml_root = ET.fromstring(response.text)
+            # Extract the token from the <id> tag
+            namespace = {"atom": "http://www.w3.org/2005/Atom"}  # Define the namespace
+            splunkbase_token = xml_root.find(
+                "atom:id", namespace
+            ).text  # Find the tag with the namespace
+            self.token = splunkbase_token
+        else:
+            print("Splunkbase login failed!")
+            print(f"Status code: {response.status_code}")
+            print(response.text)
+
+    def get_token(self):
+        if not self.token:
+            self._authenticate()
+        return self.token
+
+
+class SplunkCloudConnector:
+    """Class for connecting to Splunk Cloud and Splunkbase."""
+
+    def __init__(self, splunk_host: str = None, cloud_type: str = "victoria"):
+        self.config = SplunkCloudAccountConfig.to_dict()
+        self.appinspect = AppInspectService(cloud_type)
+        self.splunkbase = SplunkbaseService()
+        self.host = splunk_host
+        # Classic stacks use /adminconfig/v2/apps, Victoria appends /victoria
+        self.cloud_type = "" if cloud_type == "classic" else f"/{cloud_type}"
+
+    def get_appinspect_handler(self):
+        return self.appinspect
+
+    def distribute(self, app: str) -> tuple:
+        """
+        Distribute a private app to the target URL.
+
+        distribute(app) -> was_successful : bool, status_code : int
+        """
+        url = f"{self.host}/adminconfig/v2/apps{self.cloud_type}"
+        print(f"Distributing {app} to {url}")
+        headers = {
+            "X-Splunk-Authorization": self.appinspect.get_token(),
+            "Authorization": f"Bearer {self.config.get('token')}",
+            "ACS-Legal-Ack": "Y",
+        }
+        try:
+            with open(f"{app}.tgz", "rb") as file:
+                response = requests.post(url, headers=headers, data=file)
+            print(f"Distributed {app} to {url} with response: {response.status_code}")
+        except Exception as e:
+            print(f"Error distributing {app} to {url}: {e}")
+            return False, 500
+
+        return response.status_code == 200, response.status_code
+
+    def install(self, app: str, version: str) -> str:
+        """
+        Install a Splunkbase app.
+
+        install(app, version) -> status : str
+        """
+        token = self.splunkbase.get_token()
+        url = f"{self.host}/adminconfig/v2/apps{self.cloud_type}?splunkbase=true"
+        app_info = self.splunkbase.get_app_info(app)
+        headers = {
+            "X-Splunkbase-Authorization": token,
+            "ACS-Licensing-Ack": app_info.get("license_url"),
+            "Authorization": f"Bearer {self.config.get('token')}",
+            "Content-Type": "application/x-www-form-urlencoded",
+        }
+        data = {"splunkbaseID": app_info.get("uid"), "version": version}
+
+        response = requests.post(url, headers=headers, data=data)
+        # Handle the case where the app is already installed
+        if response.status_code == 409:
+            print(f"App {app} is already installed.")
+            app_name = app_info.get("appid")
+            print(f"Updating app {app} ({app_name}) to version {version}...")
+            # Update the app via its ACS apps endpoint
+            url = f"{self.host}/adminconfig/v2/apps{self.cloud_type}/{app_name}"
+            data = {"version": version}
+            response = requests.patch(url, headers=headers, data=data)
+            return "success - existing app updated"
+
+        if response.ok:
+            request_status = response.json()["status"]
+            print(f"Request status: {request_status}")
+            if request_status in ("installed", "processing"):
+                print(f"App {app} version {version} installation successful.")
+                return "success"
+            else:
+                print(f"App {app} version {version} installation failed.")
+                return f"failed with status: {request_status} - {response.text}"
+
+        print("Request failed!")
+        print(f"Status code: {response.status_code}")
+        print(response.text)
+        return f"failed with status code: {response.status_code} - {response.text}"
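
How the `experience` value maps onto the ACS endpoint (constructor behaviour of `SplunkCloudConnector` above; the stack URL is illustrative, and no request is made until a method is called):

```python
# Endpoint sketch: the cloud experience selects the ACS apps path suffix.
from modules.splunkcloud import SplunkCloudConnector

victoria = SplunkCloudConnector("https://admin.splunk.com/my-stack", "victoria")
classic = SplunkCloudConnector("https://admin.splunk.com/my-stack", "classic")

# distribute() posts the app package to:
#   victoria -> https://admin.splunk.com/my-stack/adminconfig/v2/apps/victoria
#   classic  -> https://admin.splunk.com/my-stack/adminconfig/v2/apps
```
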
diff --git a/utils.py b/utils.py
deleted file mode 100644
index f32d2a9..0000000
--- a/utils.py
+++ /dev/null
@@ -1,471 +0,0 @@
-import sys
-import time
-import json
-import os
-import tarfile
-import shutil
-
-import yaml
-import boto3
-import requests
-import itertools
-import configparser
-import xml.etree.ElementTree as ET
-
-from io import StringIO
-
-
-SPLUNK_APPINSPECT_BASE_URL = "https://appinspect.splunk.com/v1"
-SPLUNKBASE_BASE_URL = "https://splunkbase.splunk.com/api/account:login"
-SPLUNK_AUTH_BASE_URL = "https://api.splunk.com/2.0/rest/login/splunk"
-
-def read_yaml(file_path: str) -> dict:
-    """Read and return the contents of a YAML file."""
-    with open(file_path, "r") as file:
-        return yaml.safe_load(file)
-
-def check_all_letter_cases(base_path: str, app_name: str) -> str:
-    """Check all letter cases for the app configuration."""
-    # Generate all case combinations of "app"
-    case_variations = map("".join, itertools.product(*([char.lower(), char.upper()] for char in app_name)))
-
-    # Check each variation in the path
-    for variation in case_variations:
-        path = os.path.join("environments", base_path, variation)
-        if os.path.exists(path):
-            print(f"Found: {path}")
-            return path
-    return None
-
-def validate_data(data: dict) -> tuple:
-    """
-    Validate the data in the YAML file.
-
-    Return boolean values for private_apps and splunkbase_apps presence in the environment configuration
-
-    validate_data(data) -> (bool, bool)
-    """
-    if "apps" not in data:
-        print("Error: The 'apps' key is missing in deploy.yml fime.")
-        sys.exit(1)
-    if "target" not in data:
-        print("Error: The 'target' key is missing in deploy.yml file.")
-        sys.exit(1)
-    if "url" not in data["target"]:
-        print("Error: The 'url' key is missing in the 'target' section.")
-        sys.exit(1)
-    if "splunkbase-apps" not in data:
-        print("Error: The 'splunkbase-apps' key is missing.")
-        sys.exit(1)
-
-    app_dict = data.get("apps", {})
-    splunkbase_dict = data.get("splunkbase-apps", {})
-
-    private_apps = True if app_dict else False
-    splunkbase_apps = True if splunkbase_dict else False
-
-    return private_apps, splunkbase_apps
-
-def download_file_from_s3(bucket_name: str, object_name: str, file_name: str) -> None:
-    """Download a file from an S3 bucket."""
-    s3 = boto3.client(
-        "s3",
-        aws_access_key_id=os.getenv("AWS_ACCESS_KEY_ID"),
-        aws_secret_access_key=os.getenv("AWS_SECRET_ACCESS_KEY"),
-    )
-    try:
-        s3.download_file(bucket_name, object_name, file_name)
-        print(f"Downloaded {object_name} from {bucket_name} to {file_name}")
-    except Exception as e:
-        print(f"Error downloading {object_name} from {bucket_name}: {e}")
-
-def preprocess_empty_headers(file_path: str) -> list:
-    """
-    Preprocess the file to handle empty section headers by replacing `[]` with a valid section name.
-    """
-    valid_lines = []
-    with open(file_path, 'r') as file:
-        for line in file:
-            # Replace empty section headers with a placeholder
-            if line.strip() == "[]":
-                valid_lines.append("[DEFAULT]\n")  # Or any placeholder section name
-            else:
-                valid_lines.append(line)
-    return valid_lines
-
-def replace_default_with_empty_header(file_path: str) -> None:
-    """
-    Replace '[DEFAULT]' header with '[]' in the specified file.
-    """
-    with open(file_path, 'r') as file:
-        lines = file.readlines()
-
-    with open(file_path, 'w') as file:
-        for line in lines:
-            # Replace '[DEFAULT]' with '[]'
-            if line.strip() == "[DEFAULT]":
-                file.write("[]\n")
-            else:
-                file.write(line)
-
-def merge_or_copy_conf(source_path: str, dest_path: str) -> None:
-    # Get the filename from the source path
-    filename = os.path.basename(source_path)
-    dest_file = os.path.join(dest_path, filename)
-
-    # Check if the file exists in the destination directory
-    if not os.path.exists(dest_file):
-        # If the file doesn't exist, copy it
-        shutil.copy(source_path, dest_path)
-        print(f"Copied {filename} to {dest_path}")
-    else:
-        # If the file exists, merge the configurations
-        print(f"Merging {filename} with existing file in {dest_path}")
-
-        # Read the source file
-        source_config = configparser.ConfigParser()
-        source_config.read(source_path)
-
-        # Read the destination file
-        dest_config = configparser.ConfigParser()
-        dest_config.read(dest_file)
-
-        # Merge source into destination
-        for section in source_config.sections():
-            if not dest_config.has_section(section):
-                dest_config.add_section(section)
-            for option, value in source_config.items(section):
-                dest_config.set(section, option, value)
-
-        # Write the merged configuration back to the destination file
-        with open(dest_file, 'w') as file:
-            dest_config.write(file)
-        print(f"Merged configuration saved to {dest_file}")
-
-def merge_or_copy_meta(local_meta_file: str, default_dir: str) -> None:
-    """Merge local.meta with default.meta"""
-    filename = os.path.basename(local_meta_file)
-    dest_file = os.path.join(default_dir, "default.meta")
-
-    # Check if the file exists in the destination directory
-    if not os.path.exists(dest_file):
-        # If the file doesn't exist, copy it
-        shutil.copy(local_meta_file, dest_file)
-        print(f"Copied {filename} to {dest_file}")
-    else:
-        # If the file exists, merge the configurations
-        print(f"Merging {filename} with existing file in {dest_file}")
-
-        # Preprocess the default file
-        default_preprocessed_lines = preprocess_empty_headers(dest_file)
-        default_preprocessed_content = StringIO(''.join(default_preprocessed_lines))
-
-        # Read the default.meta file
-        default_meta = configparser.ConfigParser()
-        default_meta.read_file(default_preprocessed_content)
-
-        # Preprocess the local file
-        local_preprocessed_lines = preprocess_empty_headers(local_meta_file)
-        local_preprocessed_content = StringIO(''.join(local_preprocessed_lines))
-
-        # Read the local.meta file
-        local_meta = configparser.ConfigParser()
-        local_meta.read_file(local_preprocessed_content)
-
-        # Merge local.meta into default.meta
-        for section in local_meta.sections():
-            if not default_meta.has_section(section):
-                default_meta.add_section(section)
-            for option, value in local_meta.items(section):
-                if default_meta.has_option(section, option):
-                    # Merge logic: Option exists in both, decide whether to overwrite
-                    default_value = default_meta.get(section, option)
-                    if value != default_value:
-                        print(f"Conflict detected: {section} {option} - {default_value} -> {value}")
-                        # Overwrite the option in default.meta
-                        default_meta.set(section, option, value)
-                default_meta.set(section, option, value)
-
-        # Write the merged configuration back to the output file
-        with open(dest_file, 'w') as file:
-            default_meta.write(file)
-
-        # Replace '[DEFAULT]' with '[]' in the output file
-        replace_default_with_empty_header(dest_file)
-
-        print(f"Merged metadata saved to {dest_file}")
-
-
-def unpack_merge_conf_and_meta_repack(app: str, path: str) -> None:
-    """Unpack the app, load environment configuration files and repack the app."""
-    temp_dir = "temp_unpack"
-    os.makedirs(temp_dir, exist_ok=True)
-
-    # Unpack the tar.gz file
-    with tarfile.open(f"{app}.tgz", "r:gz") as tar:
-        tar.extractall(path=temp_dir)
-    # Create default directory for unpacked app
-    base_default_dir = f"{temp_dir}/{app}"
-    # Load the environment configuration files
-    app_dir = path
-    # Copy all .conf files in app_dir to temp_dir of unpacked app
-    for file in os.listdir(app_dir):
-        if file.endswith(".conf"):
-            default_dir = base_default_dir + "/default"
-            os.makedirs(default_dir, exist_ok=True)
-            source_path = os.path.join(app_dir, file)
-            merge_or_copy_conf(source_path, default_dir)
-    # Copy all metadata files in app_dir to temp_dir of unpacked app
-    for file in os.listdir(app_dir):
-        if file.endswith(".meta"):
-            default_dir = base_default_dir + "/metadata"
-            os.makedirs(default_dir, exist_ok=True)
-            source_path = os.path.join(app_dir, file)
-            merge_or_copy_meta(source_path, default_dir)
-    # Repack the app and place it in the root directory
-    with tarfile.open(f"{app}.tgz", "w:gz") as tar:
-        for root, _, files in os.walk(f"{temp_dir}/{app}"):
-            for file in files:
-                full_path = os.path.join(root, file)
-                arcname = os.path.relpath(full_path, temp_dir)
-                tar.add(full_path, arcname=arcname)
-
-def get_appinspect_token() -> str:
-    """
-    Authenticate to the Splunk Cloud.
-
-    get_appinspect_token() -> token : str
-    """
-    url = SPLUNK_AUTH_BASE_URL
-    username = os.getenv("SPLUNK_USERNAME")
-    password = os.getenv("SPLUNK_PASSWORD")
-
-    response = requests.get(url, auth=(username, password))
-    token = response.json()["data"]["token"]
-    return token
-
-
-def validation_request_helper(url: str, headers: dict, files: dict) -> str:
-    """
-    Helper function to make a validation request and return the request ID.
-
-    validation_request_helper(url, headers, files) -> request_id : str
-    """
-    try:
-        response = requests.post(url, headers=headers, files=files, timeout=120)
-        response_json = response.json()
-        request_id = response_json["request_id"]
-    except requests.exceptions.RequestException as e:
-        print(f"Error: {e}")
-        return None
-    return request_id
-
-
-def cloud_validate_app(app: str) -> tuple:
-    """
-    Validate the app for the Splunk Cloud.
-
-    cloud_validate_app(app) -> report : dict, token : str
-    """
-    token = get_appinspect_token()
-    base_url = SPLUNK_APPINSPECT_BASE_URL
-    url = f"{base_url}/app/validate"
-
-    headers = {"Authorization": f"Bearer {token}"}
-    app_file_path = f"{app}.tgz"
-
-    print(f"Validating app {app}...")
-    with open(app_file_path, "rb") as file:
-        files = {"app_package": file}
-        request_id = validation_request_helper(url, headers, files)
-    headers = {"Authorization": f"Bearer {token}"}
-    status_url = f"{base_url}/app/validate/status/{request_id}?included_tags=private_victoria"
-    try:
-        response_status = requests.get(status_url, headers=headers)
-    except requests.exceptions.RequestException as e:
-        print(f"Error: {e}")
-        return None, None
-
-    max_retries = 60  # Maximum number of retries
-    retries = 0
-    response_status_json = response_status.json()
-
-    while response_status_json["status"] != "SUCCESS" and retries < max_retries:
-        response_status = requests.get(status_url, headers=headers)
-        response_status_json = response_status.json()
-        retries += 1
-        if response_status_json["status"] == "FAILURE":
-            print(f"App {app} failed validation: {response_status_json['errors']}")
-            break
-        else:
-            print(f"App {app} awaiting validation...")
-            print(f"Current status: {response_status_json['status']}")
-            time.sleep(10)
-            response_status = requests.get(status_url, headers=headers)
-            response_status_json = response_status.json()
-            continue
-    if retries == max_retries:
-        print(f"App {app} validation timed out.")
-        return
-
-    print(f"Current status: {response_status_json['status']}")
-    if response_status_json["status"] == "SUCCESS":
-        print("App validation successful.")
-        print("Installing app...")
-
-        response_report = requests.get(
-            f"{base_url}/app/report/{request_id}?included_tags=private_victoria",
-            headers=headers,
-        )
-        report = response_report.json()
-        result = report["summary"]
-        print(result)
-
-    return report, token
-
-
-def distribute_app(app: str, target_url: str, token: str) -> int:
-    """
-    Distribute the app to the target URL.
-
-    distribute_app(app, target_url, token) -> status_code : int
-    """
-    print(f"Distributing {app} to {target_url}")
-    url = target_url
-    admin_token = os.getenv("SPLUNK_TOKEN")
-    headers = {
-        "X-Splunk-Authorization": token,
-        "Authorization": f"Bearer {admin_token}",
-        "ACS-Legal-Ack": "Y",
-    }
-    file_path = f"{app}.tgz"
-    try:
-        with open(file_path, "rb") as file:
-            response = requests.post(url, headers=headers, data=file)
-        print(
-            f"Distributed {app} to {target_url} with response: {response.status_code} {response.text}"
-        )
-    except Exception as e:
-        print(f"Error distributing {app} to {target_url}: {e}")
-        return 500
-
-    return response.status_code
-
-def authenticate_splunkbase() -> str:
-    """
-    Authenticate to Splunkbase.
-
-    authenticate_splunkbase() -> token : str
-    """
-    url = SPLUNKBASE_BASE_URL
-    data = {
-        'username': os.getenv("SPLUNK_USERNAME"),
-        'password': os.getenv("SPLUNK_PASSWORD")
-    }
-    response = requests.post(url, data=data)
-
-    if response.ok:
-        # Parse the XML response
-        xml_root = ET.fromstring(response.text)
-        # Extract the token from the <id> tag
-        namespace = {'atom': 'http://www.w3.org/2005/Atom'}  # Define the namespace
-        splunkbase_token = xml_root.find('atom:id', namespace).text  # Find the tag with the namespace
-        return splunkbase_token
-    else:
-        print("Splunkbase login failed!")
-        print(f"Status code: {response.status_code}")
-        print(response.text)
-        return None
-
-def install_splunkbase_app(app: str, app_id: str, version: str, target_url: str, token: str, licence: str) -> str:
-    """
-    Install a Splunkbase app.
-
-    install_splunkbase_app(app, app_id, version, target_url, token, licence) -> status : str
-    """
-    # Authenticate to Splunkbase
-    splunkbase_token = authenticate_splunkbase()
-    # Install the app
-    url = f"{target_url}?splunkbase=true"
-
-    headers = {
-        'X-Splunkbase-Authorization': splunkbase_token,
-        'ACS-Licensing-Ack': licence,
-        'Authorization': f'Bearer {token}',
-        'Content-Type': 'application/x-www-form-urlencoded',
-    }
-
-    data = {
-        'splunkbaseID': app_id,
-        'version': version
-    }
-
-    response = requests.post(url, headers=headers, data=data)
-    # Handle the case where the app is already installed
-    if response.status_code == 409:
-        print(f"App {app} is already installed.")
-        print(f"Updating app {app} to version {version}...")
-        # Get app name
-        url = f"https://splunkbase.splunk.com/api/v1/app/{app_id}"
-        response = requests.get(url)
-        app_name = response.json().get('appid')
-        print(f"App name: {app_name}")
-        # Update the app
-        url = f"{target_url}/{app_name}"
-        data = {
-            'version': version
-        }
-        response = requests.patch(url, headers=headers, data=data)
-        return "success - existing app updated"
-    elif response.ok:
-        request_status = response.json()['status']
-        print(f"Request status: {request_status}")
-        if request_status in ("installed", "processing"):
-            print(f"App {app} version {version} installation successful.")
-            return "success"
-        else:
-            print(f"App {app} version {version} installation failed.")
-            return f"failed with status: {request_status} - {response.text}"
-    else:
-        print("Request failed!")
-        print(f"Status code: {response.status_code}")
-        print(response.text)
-        return f"failed with status code: {response.status_code} - {response.text}"
-
-def get_app_id(app_name: str) -> str:
-    """
-    Get the Splunkbase app ID.
-
-    get_app_id(app_name) -> app_id : str
-    """
-    url = f"https://splunkbase.splunk.com/api/v1/app"
-    params = {
-        "query": app_name,
-        "limit": 1
-    }
-    response = requests.get(url, params=params)
-    if len(response.json().get('results')) > 0:
-        app_id = response.json().get('results')[0].get('uid')
-        return app_id
-    else:
-        print(f"App {app_name} not found on Splunkbase.")
-        return None
-
-def get_license_url(app_name: str) -> str:
-    """
-    Get the licence URL for a Splunkbase app.
-
-    get_licence_url(app_name) -> licence_url : str
-    """
-    url = f"https://splunkbase.splunk.com/api/v1/app"
-    params = {
-        "query": app_name,
-        "limit": 1
-    }
-    response = requests.get(url, params=params)
-    if len(response.json().get('results')) > 0:
-        license_url = response.json().get('results')[0].get('license_url')
-        return license_url
-    else:
-        print(f"App {app_name} not found on Splunkbase.")
-        return None