diff --git a/social_media_analyzer/heuristics.py b/social_media_analyzer/heuristics.py
index 36cb54d..6ff31f3 100644
--- a/social_media_analyzer/heuristics.py
+++ b/social_media_analyzer/heuristics.py
@@ -239,6 +239,7 @@ def generate_suspicious_url_patterns(legitimate_domains):
     "SWIFT_BIC_ADDRESS": 3.0,
     "PHONE_NUMBER_UNSOLICITED": 1.0,
     "SUSPICIOUS_URL_PATTERN": 3.0, # High weight for matching a suspicious URL pattern
+    "GOOGLE_SAFE_BROWSING_HIT": 10.0, # Very high weight for a positive Google Safe Browsing match
 }
 
 if __name__ == '__main__':
diff --git a/social_media_analyzer/main.py b/social_media_analyzer/main.py
index e12e7f3..ab32a70 100644
--- a/social_media_analyzer/main.py
+++ b/social_media_analyzer/main.py
@@ -1,7 +1,12 @@
+import os
 from . import fake_profile_detector
 from . import scam_detector
 from . import fake_news_detector
 
+def get_api_key():
+    """Returns the GOOGLE_API_KEY environment variable, or None if unset or blank."""
+    return os.environ.get("GOOGLE_API_KEY", "").strip() or None
+
 def analyze_news_url():
     """Analyzes a news URL for potential fake news."""
     url_to_check = input("Please enter the full URL of the news article you want to analyze: ").strip()
@@ -22,7 +27,7 @@ def analyze_news_url():
         for indicator in result['indicators_found']:
             print(f"- {indicator}")
 
-def analyze_website_url():
+def analyze_website_url(api_key=None):
     """Analyzes a website URL for potential scams."""
     url_to_check = input("Please enter the full URL you want to analyze: ").strip()
     if not url_to_check:
@@ -34,7 +39,7 @@ def analyze_website_url():
         url_to_check = 'http://' + url_to_check
 
     print("\n--- Analyzing URL ---")
-    is_susp, reason = scam_detector.is_url_suspicious(url_to_check, platform="general_web")
+    is_susp, reason = scam_detector.is_url_suspicious(url_to_check, platform="general_web", api_key=api_key)
     if is_susp:
         print(f"\n[!] The URL '{url_to_check}' is flagged as IMMEDIATELY SUSPICIOUS.")
         print(f"Reason: {reason}")
@@ -58,7 +63,7 @@ def analyze_website_url():
         for indicator in content_result['indicators_found']:
             print(f"- {indicator}")
 
-def analyze_social_media():
+def analyze_social_media(api_key=None):
     """Handles the analysis of social media platforms."""
     platforms = sorted([
         "facebook", "instagram", "whatsapp", "tiktok", "tinder", "snapchat",
@@ -108,7 +113,7 @@ def analyze_social_media():
             elif analysis_choice == 3:
                 message = input("Paste the message you want to analyze: ").strip()
                 if message:
-                    result = scam_detector.analyze_text_for_scams(message, platform)
+                    result = scam_detector.analyze_text_for_scams(message, platform, api_key=api_key)
                     print("\n--- Scam Analysis Results ---")
                     print(f"Score: {result['score']} (Higher is more suspicious)")
                     print("Indicators Found:")
@@ -127,8 +132,13 @@ def analyze_social_media():
 
 def main():
     """Main function to run the security analyzer."""
+    api_key = get_api_key()
     print("--- Universal Security Analyzer ---")
     print("This tool helps you analyze social media, messages, and websites for potential scams and fake news.")
+    if not api_key:
+        print("\n[!] Google Safe Browsing API key not found.")
+        print(" To enable real-time URL checking against Google's threat database,")
+        print(" please set the GOOGLE_API_KEY environment variable.")
 
     while True:
         print("\n--- Main Menu ---")
@@ -140,9 +150,9 @@ def main():
         try:
             choice = int(input("Enter your choice (1-4): "))
             if choice == 1:
-                analyze_social_media()
+                analyze_social_media(api_key)
             elif choice == 2:
-                analyze_website_url()
+                analyze_website_url(api_key)
             elif choice == 3:
                 analyze_news_url()
             elif choice == 4:
diff --git a/social_media_analyzer/requirements.txt b/social_media_analyzer/requirements.txt
new file mode 100644
index 0000000..f229360
--- /dev/null
+++ b/social_media_analyzer/requirements.txt
@@ -0,0 +1 @@
+requests
diff --git a/social_media_analyzer/scam_detector.py b/social_media_analyzer/scam_detector.py
index fc3ac06..a73562a 100644
--- a/social_media_analyzer/scam_detector.py
+++ b/social_media_analyzer/scam_detector.py
@@ -1,5 +1,7 @@
 import re
 import urllib.request
+import requests
+import os
 from urllib.parse import urlparse
 from .heuristics import (
     URGENCY_KEYWORDS,
@@ -17,6 +19,52 @@
     SUSPICIOUS_URL_PATTERNS
 )
 
+def check_google_safe_browsing(url, api_key):
+    """
+    Checks a URL against the Google Safe Browsing v4 Lookup API.
+
+    Returns a tuple: (is_suspicious, reason). Every failure mode (missing
+    key, network error, non-200 status, unreadable body) yields
+    (False, reason) so callers can fall back to local heuristics.
+    """
+    if not api_key:
+        return False, "Google Safe Browsing API key not configured."
+
+    api_url = "https://safebrowsing.googleapis.com/v4/threatMatches:find"
+    payload = {
+        "client": {
+            "clientId": "social-media-analyzer",
+            "clientVersion": "1.0.0"
+        },
+        "threatInfo": {
+            "threatTypes": ["MALWARE", "SOCIAL_ENGINEERING", "UNWANTED_SOFTWARE", "POTENTIALLY_HARMFUL_APPLICATION"],
+            "platformTypes": ["ANY_PLATFORM"],
+            "threatEntryTypes": ["URL"],
+            "threatEntries": [{"url": url}]
+        }
+    }
+    try:
+        # Send the key via `params` so requests URL-encodes it.
+        response = requests.post(api_url, params={"key": api_key}, json=payload, timeout=10)
+    except requests.RequestException as e:
+        # Report only the exception type: requests' error messages embed the
+        # request URL, whose query string carries the API key.
+        return False, f"Could not connect to Google Safe Browsing: {type(e).__name__}"
+
+    if response.status_code != 200:
+        return False, f"Google Safe Browsing API error: {response.status_code}"
+
+    try:
+        matches = response.json().get("matches")
+    except (ValueError, AttributeError):
+        # Body was not JSON, or was JSON but not an object.
+        return False, "Google Safe Browsing returned an unreadable response."
+    if not matches:
+        # A missing or empty "matches" list means no threat matched.
+        return False, "Clean according to Google Safe Browsing."
+    threat_type = matches[0].get("threatType", "UNKNOWN")
+    return True, f"Flagged by Google Safe Browsing as {threat_type}."
+
 def get_legitimate_domains(platform=None):
     """
     Returns a list of legitimate domains for a given platform,
@@ -35,16 +83,24 @@ def get_domain_from_url(url):
         domain = url.split("/")[0].split("?")[0]
     return domain.lower()
 
-def is_url_suspicious(url, platform=None):
+def is_url_suspicious(url, platform=None, api_key=None):
     """
-    Checks if a URL is suspicious based on various patterns and lists.
+    Checks if a URL is suspicious based on various patterns and lists,
+    including Google Safe Browsing.
     Returns a tuple: (bool_is_suspicious, reason_string)
     """
+    # 1. Google Safe Browsing Check
+    if api_key:
+        is_susp, reason = check_google_safe_browsing(url, api_key)
+        if is_susp:
+            return True, reason
+
+    # 2. Local Heuristics
     normalized_url = url.lower()
     domain = get_domain_from_url(url)
     legitimate_domains = get_legitimate_domains(platform)
 
-    # 1. Check if the domain is in the legitimate list for the platform
+    # Check if the domain is in the legitimate list for the platform
     if domain in legitimate_domains:
         # Still check for impersonation patterns that might include the legit domain
         for pattern in SUSPICIOUS_URL_PATTERNS:
@@ -53,24 +109,24 @@ def is_url_suspicious(url, platform=None):
                 return True, f"URL impersonates a legitimate domain: {pattern}"
         return False, "URL domain is on the legitimate list."
 
-    # 2. Check against known suspicious patterns
+    # Check against known suspicious patterns
     for pattern in SUSPICIOUS_URL_PATTERNS:
         if re.search(pattern, normalized_url, re.IGNORECASE):
             return True, f"URL matches suspicious pattern: {pattern}"
 
-    # 3. Check for suspicious TLDs
+    # Check for suspicious TLDs
     suspicious_tld_regex = re.compile(r"\.(" + "|".join(tld.lstrip('.') for tld in SUSPICIOUS_TLDS) + r")$", re.IGNORECASE)
     if suspicious_tld_regex.search(domain):
         return True, f"URL uses a potentially suspicious TLD."
 
-    # 4. Check if a known legitimate service name is part of the domain, but it's not official
+    # Check if a known legitimate service name is part of the domain, but it's not official
     for service in LEGITIMATE_DOMAINS.keys():
         if service != "general" and service in domain:
             return True, f"URL contains the name of a legitimate service ('{service}') but is not an official domain."
 
     return False, "URL does not match common suspicious patterns."
 
-def analyze_text_for_scams(text_content, platform=None):
+def analyze_text_for_scams(text_content, platform=None, api_key=None):
     """
     Analyzes a block of text content for various scam indicators.
     """
@@ -103,10 +159,14 @@ def analyze_text_for_scams(text_content, platform=None):
     # 2. Regex-based checks
     found_urls = URL_PATTERN.findall(text_content)
     for url_str in found_urls:
-        is_susp, reason = is_url_suspicious(url_str, platform)
+        is_susp, reason = is_url_suspicious(url_str, platform, api_key)
         url_analysis = {"url": url_str, "is_suspicious": is_susp, "reason": reason}
         if is_susp:
-            score += HEURISTIC_WEIGHTS.get("SUSPICIOUS_URL_PATTERN", 3.0)
+            # Increase score significantly if flagged by Google
+            if "Google Safe Browsing" in reason:
+                score += HEURISTIC_WEIGHTS.get("GOOGLE_SAFE_BROWSING_HIT", 10.0)
+            else:
+                score += HEURISTIC_WEIGHTS.get("SUSPICIOUS_URL_PATTERN", 3.0)
             indicators_found.append(f"Suspicious URL found: {url_str} (Reason: {reason})")
         urls_analyzed_details.append(url_analysis)
 
diff --git a/social_media_analyzer/test_runner.py b/social_media_analyzer/test_runner.py
index 25a65d8..c9417d9 100644
--- a/social_media_analyzer/test_runner.py
+++ b/social_media_analyzer/test_runner.py
@@ -1,6 +1,8 @@
+import unittest
+from unittest.mock import patch, Mock
 from social_media_analyzer.scam_detector import analyze_text_for_scams
 
-if __name__ == '__main__':
+def run_manual_tests():
     # Example Usage
     test_cases = {
         "Instagram Phishing": {
@@ -48,3 +50,55 @@
         print("URLs Analyzed:")
         for url_info in analysis_result['urls_analyzed']:
             print(f" - URL: {url_info['url']}, Suspicious: {url_info['is_suspicious']}, Reason: {url_info['reason']}")
+
+class TestScamDetector(unittest.TestCase):
+    """Checks the Safe Browsing integration with requests.post mocked out."""
+
+    @patch('social_media_analyzer.scam_detector.requests.post')
+    def test_google_safe_browsing_malicious(self, mock_post):
+        # Mock the API response for a malicious URL
+        mock_response = Mock()
+        mock_response.status_code = 200
+        mock_response.json.return_value = {
+            "matches": [
+                {
+                    "threatType": "MALWARE",
+                    "platformType": "ANY_PLATFORM",
+                    "threat": {"url": "http://malware.testing.google.test/testing/malware/"},
+                }
+            ]
+        }
+        mock_post.return_value = mock_response
+
+        message = "check this out http://malware.testing.google.test/testing/malware/"
+        result = analyze_text_for_scams(message, api_key="fake_key")
+
+        self.assertTrue(any("Google Safe Browsing" in reason for reason in result["indicators_found"]))
+        self.assertEqual(result['urls_analyzed'][0]['is_suspicious'], True)
+
+    @patch('social_media_analyzer.scam_detector.requests.post')
+    def test_google_safe_browsing_clean(self, mock_post):
+        # Mock the API response for a clean URL
+        mock_response = Mock()
+        mock_response.status_code = 200
+        mock_response.json.return_value = {}
+        mock_post.return_value = mock_response
+
+        message = "this is a clean site http://www.google.com"
+        result = analyze_text_for_scams(message, api_key="fake_key")
+
+        self.assertFalse(any("Google Safe Browsing" in reason for reason in result["indicators_found"]))
+        self.assertEqual(result['urls_analyzed'][0]['is_suspicious'], False)
+
+if __name__ == '__main__':
+    run_manual_tests()
+    # Run unit tests. unittest.makeSuite() was removed in Python 3.13, so
+    # build the suite with the loader API instead.
+    suite = unittest.defaultTestLoader.loadTestsFromTestCase(TestScamDetector)
+    runner = unittest.TextTestRunner()
+    print("\n--- Running Unit Tests for Google Safe Browsing Integration ---")
+    result = runner.run(suite)
+    if result.wasSuccessful():
+        print("All tests passed!")
+    else:
+        print("Some tests failed.")