-
Notifications
You must be signed in to change notification settings - Fork 0
Refactor: Switch from singular utils.py with all functions to class based approach #69
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from 3 commits
Commits
Show all changes
20 commits
Select commit
Hold shift + click to select a range
62be394
refactor: swithc from singular utils.py to app_processing, aws_3 and …
bdebek-splunk caff4c7
docs: update README to include new modules directory and its purpose
bdebek-splunk 51fe80b
fix: update validate_data method signature to return a dictionary and…
bdebek-splunk 93b87d2
refactor: update deployment workflow, deleted AwsS3Connector, added …
bdebek-splunk bdd94cc
fix: update deployment report to use add_data method for successful a…
bdebek-splunk 05dbeb9
fix: added report_generator.py to the architectire tree
bdebek-splunk e69d682
style: clean up code formatting and improve type hinting in deploymen…
bdebek-splunk e88b721
refactor: removed hardcoded endpoint from instance url defined in dep…
bdebek-splunk 08c3f0e
refactor: reworked splunkcloud.py and deploy.py modules
bdebek-splunk c8d4813
fix: remove redundant status print statement in AppInspectService
bdebek-splunk ec1c621
docs:updated README
edro15 b2faa37
chore:updated deployment config files to support experience key
edro15 2b21c85
ci:added deps to workflow
edro15 89c53cf
feat+refactor:added yml validation against schema
edro15 d864a9f
docs:edited README
edro15 a57470e
style:removed whitespace
edro15 efecedb
docs:updated README
edro15 060e145
fix: add break condition for successful app validation in AppInspectS…
bdebek-splunk 4c0f6d7
chore: remove unused AWS credentials from deployment workflows and sc…
bdebek-splunk 401e7d7
fix: revert deletion of AWS credentials in github workflow files
bdebek-splunk File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,209 @@ | ||
| import sys | ||
| import os | ||
| import yaml | ||
| import shutil | ||
| import configparser | ||
| import tarfile | ||
| from io import StringIO | ||
|
|
||
|
|
||
class AppFilesProcessor:
    """Handle local Splunk app files and configurations.

    Responsibilities:
      * read and validate the deployment YAML (deploy.yml),
      * merge environment .conf / .meta files into an unpacked app,
      * repack the app archive with the merged configuration.
    """

    def __init__(self, yml_path):
        # Path to the deploy.yml describing apps and the deployment target.
        self.yml_path = yml_path

    def _read_yaml(self) -> dict:
        """Read and return the parsed contents of the YAML file."""
        with open(self.yml_path, "r") as file:
            return yaml.safe_load(file)

    def validate_data(self) -> tuple:
        """
        Validate the data in the YAML file.

        Returns:
            tuple: (data, private_apps, splunkbase_apps) -- the parsed YAML
            dict plus booleans flagging non-empty 'apps' and
            'splunkbase-apps' sections.

        Exits with status 1 (after printing an error) on a missing file,
        a YAML parse error, or a missing required key.
        """
        try:
            data = self._read_yaml()
        except FileNotFoundError:
            print(f"Error: The file '{self.yml_path}' was not found.")
            # Bug fix: this branch previously did not exit, so 'data' was
            # left unbound and the key checks below raised NameError.
            sys.exit(1)
        except yaml.YAMLError as e:
            print(f"Error parsing YAML file: {e}")
            sys.exit(1)

        # Each required key gets its own explicit error message.
        if "apps" not in data:
            # Typo fix: message previously read "fime" instead of "file".
            print("Error: The 'apps' key is missing in deploy.yml file.")
            sys.exit(1)
        if "target" not in data:
            print("Error: The 'target' key is missing in deploy.yml file.")
            sys.exit(1)
        if "url" not in data["target"]:
            print("Error: The 'url' key is missing in the 'target' section.")
            sys.exit(1)
        if "splunkbase-apps" not in data:
            print("Error: The 'splunkbase-apps' key is missing.")
            sys.exit(1)

        # bool() replaces the redundant "True if x else False" pattern.
        private_apps = bool(data.get("apps", {}))
        splunkbase_apps = bool(data.get("splunkbase-apps", {}))

        return data, private_apps, splunkbase_apps

    def _preprocess_empty_headers(self, file_path: str) -> list:
        """
        Return the file's lines with empty section headers ('[]') replaced
        by '[DEFAULT]' so configparser can parse Splunk .meta files.
        """
        valid_lines = []
        with open(file_path, "r") as file:
            for line in file:
                if line.strip() == "[]":
                    valid_lines.append("[DEFAULT]\n")
                else:
                    valid_lines.append(line)
        return valid_lines

    def _replace_default_with_empty_header(self, file_path: str) -> None:
        """
        Undo the preprocessing: rewrite '[DEFAULT]' headers back to '[]'
        (Splunk's global-scope stanza syntax) in the given file.
        """
        with open(file_path, "r") as file:
            lines = file.readlines()

        with open(file_path, "w") as file:
            for line in lines:
                if line.strip() == "[DEFAULT]":
                    file.write("[]\n")
                else:
                    file.write(line)

    def merge_or_copy_conf(self, source_path: str, dest_path: str) -> None:
        """
        Copy a .conf file into dest_path, or merge it into the existing
        file of the same name (source values win on conflict).
        """
        filename = os.path.basename(source_path)
        dest_file = os.path.join(dest_path, filename)

        if not os.path.exists(dest_file):
            # No existing file in the destination: a plain copy is enough.
            shutil.copy(source_path, dest_path)
            # Fix: log the actual filename instead of a placeholder.
            print(f"Copied {filename} to {dest_path}")
            return

        print(f"Merging {filename} with existing file in {dest_path}")

        source_config = configparser.ConfigParser()
        source_config.read(source_path)

        dest_config = configparser.ConfigParser()
        dest_config.read(dest_file)

        # Merge source into destination; source options overwrite.
        for section in source_config.sections():
            if not dest_config.has_section(section):
                dest_config.add_section(section)
            for option, value in source_config.items(section):
                dest_config.set(section, option, value)

        with open(dest_file, "w") as file:
            dest_config.write(file)
        print(f"Merged configuration saved to {dest_file}")

    def merge_or_copy_meta(self, local_meta_file: str, default_dir: str) -> None:
        """
        Merge a local .meta file into default_dir/default.meta, copying it
        verbatim when no default.meta exists yet. Local values win on
        conflict; each conflict is printed before being overwritten.
        """
        filename = os.path.basename(local_meta_file)
        dest_file = os.path.join(default_dir, "default.meta")

        if not os.path.exists(dest_file):
            shutil.copy(local_meta_file, dest_file)
            # Fix: log the actual filename instead of a placeholder;
            # 'filename' was previously computed but never used.
            print(f"Copied {filename} to {dest_file}")
            return

        print(f"Merging {filename} with existing file in {dest_file}")

        # Preprocess both files so '[]' headers parse as '[DEFAULT]'.
        default_meta = configparser.ConfigParser()
        default_meta.read_file(
            StringIO("".join(self._preprocess_empty_headers(dest_file)))
        )

        local_meta = configparser.ConfigParser()
        local_meta.read_file(
            StringIO("".join(self._preprocess_empty_headers(local_meta_file)))
        )

        # Merge local.meta into default.meta; local values overwrite.
        # NOTE(review): options under the global '[]' stanza land in
        # configparser's DEFAULT section, which sections() does not return,
        # so global-stanza options are never merged -- confirm intended.
        for section in local_meta.sections():
            if not default_meta.has_section(section):
                default_meta.add_section(section)
            for option, value in local_meta.items(section):
                if default_meta.has_option(section, option):
                    default_value = default_meta.get(section, option)
                    if value != default_value:
                        print(
                            f"Conflict detected: {section} {option} - {default_value} -> {value}"
                        )
                # Bug fix: the value was previously set twice (once inside
                # the conflict branch and once unconditionally); a single
                # unconditional set has the same effect.
                default_meta.set(section, option, value)

        with open(dest_file, "w") as file:
            default_meta.write(file)

        # Restore Splunk's '[]' global-stanza syntax in the output.
        self._replace_default_with_empty_header(dest_file)

        print(f"Merged metadata saved to {dest_file}")

    def unpack_merge_conf_and_meta_repack(self, app: str, path: str) -> None:
        """Unpack the app, merge environment config files, and repack.

        Args:
            app: App name; '{app}.tgz' is read from and written back to
                the current working directory.
            path: Directory holding the environment's .conf/.meta files.
        """
        temp_dir = "temp_unpack"
        os.makedirs(temp_dir, exist_ok=True)

        try:
            # NOTE(review): extractall on an untrusted archive is vulnerable
            # to path traversal; consider tarfile's 'data' filter (3.12+).
            with tarfile.open(f"{app}.tgz", "r:gz") as tar:
                tar.extractall(path=temp_dir)

            # Root of the unpacked app inside the temp directory.
            base_default_dir = f"{temp_dir}/{app}"

            # Merge/copy every environment .conf into the app's default dir.
            for file in os.listdir(path):
                if file.endswith(".conf"):
                    default_dir = base_default_dir + "/default"
                    os.makedirs(default_dir, exist_ok=True)
                    self.merge_or_copy_conf(os.path.join(path, file), default_dir)

            # Merge/copy every environment .meta into the app's metadata dir.
            for file in os.listdir(path):
                if file.endswith(".meta"):
                    metadata_dir = base_default_dir + "/metadata"
                    os.makedirs(metadata_dir, exist_ok=True)
                    self.merge_or_copy_meta(os.path.join(path, file), metadata_dir)

            # Repack the app and place it in the current directory.
            with tarfile.open(f"{app}.tgz", "w:gz") as tar:
                for root, _, files in os.walk(base_default_dir):
                    for file in files:
                        full_path = os.path.join(root, file)
                        arcname = os.path.relpath(full_path, temp_dir)
                        tar.add(full_path, arcname=arcname)
        finally:
            # Bug fix: the temp directory was never removed, so leftovers
            # from one app would be merged into the next app processed.
            shutil.rmtree(temp_dir, ignore_errors=True)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,24 @@ | ||
| import boto3 | ||
|
|
||
|
|
||
class AwsS3Connector:
    """Connect to AWS S3 with static credentials and download objects."""

    def __init__(self, aws_access_key_id, aws_secret_access_key):
        # Credentials are stored and handed to boto3 on each download call.
        self.aws_access_key_id = aws_access_key_id
        self.aws_secret_access_key = aws_secret_access_key

    def download_file_from_s3(
        self, bucket_name: str, object_name: str, file_name: str
    ) -> None:
        """Download a single object from an S3 bucket to a local file.

        Errors are reported on stdout rather than raised.
        """
        client = boto3.client(
            "s3",
            aws_access_key_id=self.aws_access_key_id,
            aws_secret_access_key=self.aws_secret_access_key,
        )
        try:
            client.download_file(bucket_name, object_name, file_name)
        except Exception as e:
            print(f"Error downloading {object_name} from {bucket_name}: {e}")
        else:
            print(f"Downloaded {object_name} from {bucket_name} to {file_name}")
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.