1 change: 1 addition & 0 deletions social_media_analyzer/heuristics.py
@@ -239,6 +239,7 @@ def generate_suspicious_url_patterns(legitimate_domains):
"SWIFT_BIC_ADDRESS": 3.0,
"PHONE_NUMBER_UNSOLICITED": 1.0,
"SUSPICIOUS_URL_PATTERN": 3.0, # High weight for matching a suspicious URL pattern
"GOOGLE_SAFE_BROWSING_HIT": 10.0, # Very high weight for a positive Google Safe Browsing match
}

if __name__ == '__main__':
22 changes: 16 additions & 6 deletions social_media_analyzer/main.py
@@ -1,7 +1,12 @@
import os
from . import fake_profile_detector
from . import scam_detector
from . import fake_news_detector

def get_api_key():
"""Gets the Google API key from environment variables."""
return os.environ.get("GOOGLE_API_KEY")

def analyze_news_url():
"""Analyzes a news URL for potential fake news."""
url_to_check = input("Please enter the full URL of the news article you want to analyze: ").strip()
@@ -22,7 +27,7 @@ def analyze_news_url():
for indicator in result['indicators_found']:
print(f"- {indicator}")

def analyze_website_url():
def analyze_website_url(api_key):
"""Analyzes a website URL for potential scams."""
url_to_check = input("Please enter the full URL you want to analyze: ").strip()
if not url_to_check:
@@ -34,7 +39,7 @@ def analyze_website_url():
url_to_check = 'http://' + url_to_check

print("\n--- Analyzing URL ---")
is_susp, reason = scam_detector.is_url_suspicious(url_to_check, platform="general_web")
is_susp, reason = scam_detector.is_url_suspicious(url_to_check, platform="general_web", api_key=api_key)
if is_susp:
print(f"\n[!] The URL '{url_to_check}' is flagged as IMMEDIATELY SUSPICIOUS.")
print(f"Reason: {reason}")
@@ -58,7 +63,7 @@ def analyze_website_url():
for indicator in content_result['indicators_found']:
print(f"- {indicator}")

def analyze_social_media():
def analyze_social_media(api_key):
issue (code-quality): We've found these issues:

Explanation

The quality score for this function is below the quality threshold of 25%.
This score is a combination of the method length, cognitive complexity and working memory.

How can you solve this?

It might be worth refactoring this function to make it shorter and more readable; a sketch of one possible extraction follows this list.

  • Reduce the function length by extracting pieces of functionality into
    their own functions. This is the most important thing you can do - ideally a
    function should be less than 10 lines.
  • Reduce nesting, perhaps by introducing guard clauses to return early.
  • Ensure that variables are tightly scoped, so that code using related concepts
    sits together within the function rather than being scattered.
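A minimal sketch of the kind of extraction the reviewer suggests, assuming it lives in main.py next to analyze_social_media; the helper names _print_scam_report and _analyze_message are illustrative, not part of the PR:

def _print_scam_report(result):
    """Print the score and indicators from one scam-analysis result."""
    print("\n--- Scam Analysis Results ---")
    print(f"Score: {result['score']} (Higher is more suspicious)")
    print("Indicators Found:")
    for indicator in result['indicators_found']:
        print(f"- {indicator}")

def _analyze_message(platform, api_key):
    """Prompt for a message and run the scam analyzer on it."""
    message = input("Paste the message you want to analyze: ").strip()
    if not message:
        return
    result = scam_detector.analyze_text_for_scams(message, platform, api_key=api_key)
    _print_scam_report(result)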

"""Handles the analysis of social media platforms."""
platforms = sorted([
"facebook", "instagram", "whatsapp", "tiktok", "tinder", "snapchat",
@@ -108,7 +113,7 @@ def analyze_social_media():
elif analysis_choice == 3:
message = input("Paste the message you want to analyze: ").strip()
if message:
result = scam_detector.analyze_text_for_scams(message, platform)
result = scam_detector.analyze_text_for_scams(message, platform, api_key=api_key)
print("\n--- Scam Analysis Results ---")
print(f"Score: {result['score']} (Higher is more suspicious)")
print("Indicators Found:")
@@ -127,8 +132,13 @@ def analyze_social_media():

def main():
"""Main function to run the security analyzer."""
api_key = get_api_key()
print("--- Universal Security Analyzer ---")
print("This tool helps you analyze social media, messages, and websites for potential scams and fake news.")
if not api_key:
print("\n[!] Google Safe Browsing API key not found.")
print(" To enable real-time URL checking against Google's threat database,")
print(" please set the GOOGLE_API_KEY environment variable.")

while True:
print("\n--- Main Menu ---")
@@ -140,9 +150,9 @@ def main():
try:
choice = int(input("Enter your choice (1-4): "))
if choice == 1:
analyze_social_media()
analyze_social_media(api_key)
elif choice == 2:
analyze_website_url()
analyze_website_url(api_key)
elif choice == 3:
analyze_news_url()
elif choice == 4:
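As a usage note, a minimal sketch of launching the analyzer with the Safe Browsing check enabled; the key value is a placeholder and the package is assumed to be importable from the working directory:

import os

# Placeholder key: main() reads GOOGLE_API_KEY from the environment at startup.
os.environ["GOOGLE_API_KEY"] = "your-api-key-here"

from social_media_analyzer import main as analyzer_main
analyzer_main.main()  # interactive menu driven by input() prompts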
1 change: 1 addition & 0 deletions social_media_analyzer/requirements.txt
@@ -0,0 +1 @@
requests
67 changes: 58 additions & 9 deletions social_media_analyzer/scam_detector.py
@@ -1,5 +1,7 @@
import re
import urllib.request
import requests
import os
from urllib.parse import urlparse
from .heuristics import (
URGENCY_KEYWORDS,
@@ -17,6 +19,41 @@
SUSPICIOUS_URL_PATTERNS
)

def check_google_safe_browsing(url, api_key):
"""
Checks a URL against the Google Safe Browsing API.
Returns a tuple: (is_suspicious, reason)
"""
if not api_key:
return False, "Google Safe Browsing API key not configured."

api_url = f"https://safebrowsing.googleapis.com/v4/threatMatches:find?key={api_key}"
payload = {
"client": {
"clientId": "social-media-analyzer",
"clientVersion": "1.0.0"
},
"threatInfo": {
"threatTypes": ["MALWARE", "SOCIAL_ENGINEERING", "UNWANTED_SOFTWARE", "POTENTIALLY_HARMFUL_APPLICATION"],
"platformTypes": ["ANY_PLATFORM"],
"threatEntryTypes": ["URL"],
"threatEntries": [{"url": url}]
}
}
try:
response = requests.post(api_url, json=payload, timeout=10)
if response.status_code == 200:
data = response.json()
if "matches" in data:
threat_type = data["matches"][0]["threatType"]
return True, f"Flagged by Google Safe Browsing as {threat_type}."
else:
return False, "Clean according to Google Safe Browsing."
else:
return False, f"Google Safe Browsing API error: {response.status_code}"
except requests.RequestException as e:
return False, f"Could not connect to Google Safe Browsing: {e}"

def get_legitimate_domains(platform=None):
"""
Returns a list of legitimate domains for a given platform,
@@ -35,16 +72,24 @@ def get_domain_from_url(url):
domain = url.split("/")[0].split("?")[0]
return domain.lower()

def is_url_suspicious(url, platform=None):
def is_url_suspicious(url, platform=None, api_key=None):
"""
Checks if a URL is suspicious based on various patterns and lists.
Checks if a URL is suspicious based on various patterns and lists,
including Google Safe Browsing.
Returns a tuple: (bool_is_suspicious, reason_string)
"""
# 1. Google Safe Browsing Check
if api_key:
is_susp, reason = check_google_safe_browsing(url, api_key)
if is_susp:
return True, reason

# 2. Local Heuristics
normalized_url = url.lower()
domain = get_domain_from_url(url)
legitimate_domains = get_legitimate_domains(platform)

# 1. Check if the domain is in the legitimate list for the platform
# Check if the domain is in the legitimate list for the platform
if domain in legitimate_domains:
# Still check for impersonation patterns that might include the legit domain
for pattern in SUSPICIOUS_URL_PATTERNS:
@@ -53,24 +98,24 @@ def is_url_suspicious(url, platform=None):
return True, f"URL impersonates a legitimate domain: {pattern}"
return False, "URL domain is on the legitimate list."

# 2. Check against known suspicious patterns
# Check against known suspicious patterns
for pattern in SUSPICIOUS_URL_PATTERNS:
if re.search(pattern, normalized_url, re.IGNORECASE):
return True, f"URL matches suspicious pattern: {pattern}"

# 3. Check for suspicious TLDs
# Check for suspicious TLDs
suspicious_tld_regex = re.compile(r"\.(" + "|".join(tld.lstrip('.') for tld in SUSPICIOUS_TLDS) + r")$", re.IGNORECASE)
if suspicious_tld_regex.search(domain):
return True, f"URL uses a potentially suspicious TLD."

# 4. Check if a known legitimate service name is part of the domain, but it's not official
# Check if a known legitimate service name is part of the domain, but it's not official
for service in LEGITIMATE_DOMAINS.keys():
if service != "general" and service in domain:
return True, f"URL contains the name of a legitimate service ('{service}') but is not an official domain."

return False, "URL does not match common suspicious patterns."

def analyze_text_for_scams(text_content, platform=None):
def analyze_text_for_scams(text_content, platform=None, api_key=None):
issue (code-quality): Low code quality found in analyze_text_for_scams - 25% (low-code-quality)

Explanation

The quality score for this function is below the quality threshold of 25%.
This score is a combination of the method length, cognitive complexity and working memory.

How can you solve this?

It might be worth refactoring this function to make it shorter and more readable; a sketch of one possible extraction follows this list.

  • Reduce the function length by extracting pieces of functionality into
    their own functions. This is the most important thing you can do - ideally a
    function should be less than 10 lines.
  • Reduce nesting, perhaps by introducing guard clauses to return early.
  • Ensure that variables are tightly scoped, so that code using related concepts
    sits together within the function rather than being scattered.
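A minimal sketch of one such extraction, assuming it stays inside scam_detector.py so URL_PATTERN, HEURISTIC_WEIGHTS and is_url_suspicious are in scope; the helper name _score_urls and its return shape are assumptions, not part of the PR:

def _score_urls(text_content, platform, api_key):
    """Find URLs in the text and score each one against the heuristics."""
    score = 0.0
    indicators = []
    details = []
    for url_str in URL_PATTERN.findall(text_content):
        is_susp, reason = is_url_suspicious(url_str, platform, api_key)
        details.append({"url": url_str, "is_suspicious": is_susp, "reason": reason})
        if is_susp:
            # A Google Safe Browsing hit carries a much higher weight than a local pattern match.
            if "Google Safe Browsing" in reason:
                score += HEURISTIC_WEIGHTS.get("GOOGLE_SAFE_BROWSING_HIT", 10.0)
            else:
                score += HEURISTIC_WEIGHTS.get("SUSPICIOUS_URL_PATTERN", 3.0)
            indicators.append(f"Suspicious URL found: {url_str} (Reason: {reason})")
    return score, indicators, details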

"""
Analyzes a block of text content for various scam indicators.
"""
@@ -103,10 +148,14 @@ def analyze_text_for_scams(text_content, platform=None):
# 2. Regex-based checks
found_urls = URL_PATTERN.findall(text_content)
for url_str in found_urls:
is_susp, reason = is_url_suspicious(url_str, platform)
is_susp, reason = is_url_suspicious(url_str, platform, api_key)
url_analysis = {"url": url_str, "is_suspicious": is_susp, "reason": reason}
if is_susp:
score += HEURISTIC_WEIGHTS.get("SUSPICIOUS_URL_PATTERN", 3.0)
# Increase score significantly if flagged by Google
if "Google Safe Browsing" in reason:
score += HEURISTIC_WEIGHTS.get("GOOGLE_SAFE_BROWSING_HIT", 10.0)
else:
score += HEURISTIC_WEIGHTS.get("SUSPICIOUS_URL_PATTERN", 3.0)
indicators_found.append(f"Suspicious URL found: {url_str} (Reason: {reason})")
urls_analyzed_details.append(url_analysis)

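For quick manual verification of the new check outside the interactive menu, a minimal sketch; the API key is a placeholder you must supply, and the URL is Google's Safe Browsing test page also used in the unit tests below:

from social_media_analyzer.scam_detector import check_google_safe_browsing

api_key = "your-api-key-here"  # placeholder: supply a real Google Safe Browsing API key
is_susp, reason = check_google_safe_browsing(
    "http://malware.testing.google.test/testing/malware/", api_key
)
# With a valid key this URL is typically flagged, e.g. (True, "Flagged by Google Safe Browsing as MALWARE.")
print(is_susp, reason)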
54 changes: 53 additions & 1 deletion social_media_analyzer/test_runner.py
@@ -1,6 +1,8 @@
import unittest
from unittest.mock import patch, Mock
from social_media_analyzer.scam_detector import analyze_text_for_scams

if __name__ == '__main__':
def run_manual_tests():
# Example Usage
test_cases = {
"Instagram Phishing": {
@@ -48,3 +50,53 @@
print("URLs Analyzed:")
for url_info in analysis_result['urls_analyzed']:
print(f" - URL: {url_info['url']}, Suspicious: {url_info['is_suspicious']}, Reason: {url_info['reason']}")

class TestScamDetector(unittest.TestCase):
@patch('social_media_analyzer.scam_detector.requests.post')
def test_google_safe_browsing_malicious(self, mock_post):
# Mock the API response for a malicious URL
mock_response = Mock()
mock_response.status_code = 200
mock_response.json.return_value = {
"matches": [
{
"threatType": "MALWARE",
"platformType": "ANY_PLATFORM",
"threat": {"url": "http://malware.testing.google.test/testing/malware/"},
}
]
}
mock_post.return_value = mock_response

message = "check this out http://malware.testing.google.test/testing/malware/"
result = analyze_text_for_scams(message, api_key="fake_key")

self.assertTrue(any("Google Safe Browsing" in reason for reason in result["indicators_found"]))
self.assertEqual(result['urls_analyzed'][0]['is_suspicious'], True)

@patch('social_media_analyzer.scam_detector.requests.post')
def test_google_safe_browsing_clean(self, mock_post):
# Mock the API response for a clean URL
mock_response = Mock()
mock_response.status_code = 200
mock_response.json.return_value = {}
mock_post.return_value = mock_response

message = "this is a clean site http://www.google.com"
result = analyze_text_for_scams(message, api_key="fake_key")

self.assertFalse(any("Google Safe Browsing" in reason for reason in result["indicators_found"]))
self.assertEqual(result['urls_analyzed'][0]['is_suspicious'], False)

if __name__ == '__main__':
run_manual_tests()
# Run unit tests
suite = unittest.TestSuite()
suite.addTest(unittest.makeSuite(TestScamDetector))
runner = unittest.TextTestRunner()
print("\n--- Running Unit Tests for Google Safe Browsing Integration ---")
result = runner.run(suite)
if result.wasSuccessful():
print("All tests passed!")
else:
print("Some tests failed.")