Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 83 additions & 1 deletion vulnerabilities/importer.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@
from vulnerabilities.severity_systems import ScoringSystem
from vulnerabilities.utils import classproperty
from vulnerabilities.utils import get_reference_id
from vulnerabilities.utils import is_commit
from vulnerabilities.utils import is_cve
from vulnerabilities.utils import nearest_patched_package
from vulnerabilities.utils import purl_to_dict
from vulnerabilities.utils import update_purl_version

Expand Down Expand Up @@ -194,6 +194,64 @@ def from_url(cls, url):
return cls(url=url)


@dataclasses.dataclass(eq=True)
@functools.total_ordering
class CodeCommitData:
    """
    A single VCS commit referenced by an advisory, identified by its
    ``commit_hash`` within the repository at ``vcs_url``.

    Instances are equality-comparable and totally ordered (via ``_cmp_key``)
    and can be serialized/deserialized with ``to_dict``/``from_dict``.
    """

    # Required: validated by is_commit() in __post_init__.
    commit_hash: str
    # Required: URL of the repository that contains the commit.
    vcs_url: str

    commit_author: Optional[str] = None
    commit_message: Optional[str] = None
    commit_date: Optional[datetime.datetime] = None

    def __post_init__(self):
        """Validate required fields; raise ValueError on bad input."""
        if not self.commit_hash:
            raise ValueError("Commit must have a non-empty commit_hash.")

        if not is_commit(self.commit_hash):
            raise ValueError("Commit must have a valid commit_hash.")

        if not self.vcs_url:
            raise ValueError("Commit must have a non-empty vcs_url.")

    def __lt__(self, other):
        if not isinstance(other, CodeCommitData):
            return NotImplemented
        return self._cmp_key() < other._cmp_key()

    # TODO: Add cache
    def _cmp_key(self):
        """
        Return a tuple used for ordering.

        Optional fields are wrapped as ``(is_none, value)`` pairs so that an
        instance with a None field can be compared with one that has a value
        (None sorts after any value) instead of raising TypeError, which is
        what comparing None with a str or datetime would otherwise do.
        """
        return (
            self.commit_hash,
            self.vcs_url,
            (self.commit_author is None, self.commit_author or ""),
            (self.commit_message is None, self.commit_message or ""),
            (self.commit_date is None, self.commit_date or datetime.datetime.min),
        )

    def to_dict(self) -> dict:
        """
        Return a normalized, JSON-serializable dictionary representation of
        the commit. ``commit_date`` is rendered as an ISO-8601 string (or
        None), matching how the other ``to_dict`` methods in this module
        serialize dates and making ``from_dict(to_dict(x))`` round-trip.
        """
        return {
            "commit_hash": self.commit_hash,
            "vcs_url": self.vcs_url,
            "commit_author": self.commit_author,
            "commit_message": self.commit_message,
            "commit_date": self.commit_date.isoformat() if self.commit_date else None,
        }

    @classmethod
    def from_dict(cls, data: dict):
        """
        Create a CodeCommitData instance from a dictionary.

        ``commit_date`` may be an ISO-8601 string, a datetime, or None.
        Raises ValueError (from ``__post_init__``) on invalid commit data.
        """
        commit_date = data.get("commit_date")
        if isinstance(commit_date, str):
            commit_date = datetime.datetime.fromisoformat(commit_date)
        return cls(
            commit_hash=str(data.get("commit_hash", "")),
            vcs_url=data.get("vcs_url", ""),
            commit_author=data.get("commit_author"),
            commit_message=data.get("commit_message"),
            commit_date=commit_date,
        )


class UnMergeablePackageError(Exception):
"""
Raised when a package cannot be merged with another one.
Expand Down Expand Up @@ -444,6 +502,8 @@ class AdvisoryData:
date_published: Optional[datetime.datetime] = None
weaknesses: List[int] = dataclasses.field(default_factory=list)
severities: List[VulnerabilitySeverity] = dataclasses.field(default_factory=list)
fixed_by_commits: List[CodeCommitData] = dataclasses.field(default_factory=list)
affected_by_commits: List[CodeCommitData] = dataclasses.field(default_factory=list)
url: Optional[str] = None
original_advisory_text: Optional[str] = None

Expand Down Expand Up @@ -476,6 +536,12 @@ def to_dict(self):
"severities": [sev.to_dict() for sev in self.severities],
"date_published": self.date_published.isoformat() if self.date_published else None,
"weaknesses": self.weaknesses,
"affected_by_commits": [
affected_by_commit.to_dict() for affected_by_commit in self.affected_by_commits
],
"fixed_by_commits": [
fixed_by_commit.to_dict() for fixed_by_commit in self.fixed_by_commits
],
"url": self.url if self.url else "",
}
return {
Expand Down Expand Up @@ -536,6 +602,8 @@ class AdvisoryDataV2:
date_published: Optional[datetime.datetime] = None
weaknesses: List[int] = dataclasses.field(default_factory=list)
url: Optional[str] = None
fixed_by_commits: List[CodeCommitData] = dataclasses.field(default_factory=list)
affected_by_commits: List[CodeCommitData] = dataclasses.field(default_factory=list)

def __post_init__(self):
if self.date_published and not self.date_published.tzinfo:
Expand All @@ -559,6 +627,12 @@ def to_dict(self):
"references": [ref.to_dict() for ref in self.references],
"date_published": self.date_published.isoformat() if self.date_published else None,
"weaknesses": self.weaknesses,
"affected_by_commits": [
affected_by_commit.to_dict() for affected_by_commit in self.affected_by_commits
],
"fixed_by_commits": [
fixed_by_commit.to_dict() for fixed_by_commit in self.fixed_by_commits
],
"url": self.url if self.url else "",
}

Expand All @@ -578,6 +652,14 @@ def from_dict(cls, advisory_data):
if date_published
else None,
"weaknesses": advisory_data["weaknesses"],
"affected_by_commits": [
CodeCommitData.from_dict(affected_by_commit)
for affected_by_commit in advisory_data["affected_by_commits"]
],
"fixed_by_commits": [
CodeCommitData.from_dict(fixed_by_commit)
for fixed_by_commit in advisory_data["fixed_by_commits"]
],
"url": advisory_data.get("url") or None,
}
return cls(**transformed)
Expand Down
2 changes: 1 addition & 1 deletion vulnerabilities/importers/curl.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ def parse_advisory_data(raw_data) -> AdvisoryData:
... ]
... }
>>> parse_advisory_data(raw_data)
AdvisoryData(advisory_id='', aliases=['CVE-2024-2379'], summary='QUIC certificate check bypass with wolfSSL', affected_packages=[AffectedPackage(package=PackageURL(type='generic', namespace='curl.se', name='curl', version=None, qualifiers={}, subpath=None), affected_version_range=GenericVersionRange(constraints=(VersionConstraint(comparator='=', version=SemverVersion(string='8.6.0')),)), fixed_version=SemverVersion(string='8.7.0'))], references=[Reference(reference_id='', reference_type='', url='https://curl.se/docs/CVE-2024-2379.html', severities=[VulnerabilitySeverity(system=Cvssv3ScoringSystem(identifier='cvssv3.1', name='CVSSv3.1 Base Score', url='https://www.first.org/cvss/v3-1/', notes='CVSSv3.1 base score and vector'), value='Low', scoring_elements='', published_at=None, url=None)]), Reference(reference_id='', reference_type='', url='https://hackerone.com/reports/2410774', severities=[])], references_v2=[], date_published=datetime.datetime(2024, 3, 27, 8, 0, tzinfo=datetime.timezone.utc), weaknesses=[297], severities=[], url='https://curl.se/docs/CVE-2024-2379.json', original_advisory_text=None)
AdvisoryData(advisory_id='', aliases=['CVE-2024-2379'], summary='QUIC certificate check bypass with wolfSSL', affected_packages=[AffectedPackage(package=PackageURL(type='generic', namespace='curl.se', name='curl', version=None, qualifiers={}, subpath=None), affected_version_range=GenericVersionRange(constraints=(VersionConstraint(comparator='=', version=SemverVersion(string='8.6.0')),)), fixed_version=SemverVersion(string='8.7.0'))], references=[Reference(reference_id='', reference_type='', url='https://curl.se/docs/CVE-2024-2379.html', severities=[VulnerabilitySeverity(system=Cvssv3ScoringSystem(identifier='cvssv3.1', name='CVSSv3.1 Base Score', url='https://www.first.org/cvss/v3-1/', notes='CVSSv3.1 base score and vector'), value='Low', scoring_elements='', published_at=None, url=None)]), Reference(reference_id='', reference_type='', url='https://hackerone.com/reports/2410774', severities=[])], references_v2=[], date_published=datetime.datetime(2024, 3, 27, 8, 0, tzinfo=datetime.timezone.utc), weaknesses=[297], severities=[], fixed_by_commits=[], affected_by_commits=[], url='https://curl.se/docs/CVE-2024-2379.json', original_advisory_text=None)
"""

affected = get_item(raw_data, "affected")[0] if len(get_item(raw_data, "affected")) > 0 else []
Expand Down
124 changes: 81 additions & 43 deletions vulnerabilities/importers/osv.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from typing import Iterable
from typing import List
from typing import Optional
from typing import Tuple

import dateparser
from cvss.exceptions import CVSS3MalformedError
Expand All @@ -24,6 +25,7 @@
from vulnerabilities.importer import AdvisoryData
from vulnerabilities.importer import AffectedPackage
from vulnerabilities.importer import AffectedPackageV2
from vulnerabilities.importer import CodeCommitData
from vulnerabilities.importer import Reference
from vulnerabilities.importer import ReferenceV2
from vulnerabilities.importer import VulnerabilitySeverity
Expand Down Expand Up @@ -83,8 +85,8 @@ def parse_advisory_data(
)

for fixed_range in affected_pkg.get("ranges") or []:
fixed_version = get_fixed_versions(
fixed_range=fixed_range, raw_id=raw_id, supported_ecosystem=purl.type
fixed_version, _ = get_fixed_versions_and_commits(
ranges=fixed_range, raw_id=raw_id, supported_ecosystem=purl.type
)

for version in fixed_version:
Expand Down Expand Up @@ -131,7 +133,8 @@ def parse_advisory_data_v2(
references = get_references_v2(raw_data=raw_data)

affected_packages = []

fixed_by_commits = []
affected_by_commits = []
for affected_pkg in raw_data.get("affected") or []:
purl = get_affected_purl(affected_pkg=affected_pkg, raw_id=advisory_id)

Expand All @@ -148,11 +151,14 @@ def parse_advisory_data_v2(
fixed_versions = []
fixed_version_range = None
for fixed_range in affected_pkg.get("ranges") or []:
fixed_version = get_fixed_versions(
fixed_range=fixed_range, raw_id=advisory_id, supported_ecosystem=purl.type
fixed_version, (introduced_commits, fixed_commits) = get_fixed_versions_and_commits(
ranges=fixed_range, raw_id=advisory_id, supported_ecosystem=purl.type
)
fixed_versions.extend([v.string for v in fixed_version])

fixed_by_commits.extend(fixed_commits)
affected_by_commits.extend(introduced_commits)

fixed_version_range = (
get_fixed_version_range(fixed_versions, purl.type) if fixed_versions else None
)
Expand Down Expand Up @@ -182,29 +188,33 @@ def parse_advisory_data_v2(
affected_packages=affected_packages,
date_published=date_published,
weaknesses=weaknesses,
fixed_by_commits=fixed_by_commits,
affected_by_commits=affected_by_commits,
url=advisory_url,
original_advisory_text=advisory_text or json.dumps(raw_data, indent=2, ensure_ascii=False),
)


def extract_fixed_versions(fixed_range) -> Iterable[str]:
def extract_introduced_and_fixed(ranges) -> Tuple[List[str], List[str]]:
"""
Return a list of fixed version strings given a ``fixed_range`` mapping of
OSV data.
Return pairs of introduced and fixed versions or commit hashes given a ``ranges``
mapping of OSV data.

>>> list(extract_fixed_versions(
... {"type": "SEMVER", "events": [{"introduced": "0"},{"fixed": "1.6.0"}]}))
['1.6.0']
Both introduced and fixed fields may represent semantic versions or commit hashes.

>>> list(extract_fixed_versions(
... {"type": "ECOSYSTEM","events":[{"introduced": "0"},
... {"fixed": "1.0.0"},{"fixed": "9.0.0"}]}))
['1.0.0', '9.0.0']
>>> list(extract_introduced_and_fixed(
... {"type": "SEMVER", "events": [{"introduced": "0"}, {"fixed": "1.6.0"}]}))
[('0', None), (None, '1.6.0')]

>>> list(extract_introduced_and_fixed(
... {"type": "GIT", "events": [{"introduced": "abc123"},
... {"fixed": "def456"}]}))
[('abc123', None), (None, 'def456')]
"""
for event in fixed_range.get("events") or []:
for event in ranges.get("events") or []:
introduced = event.get("introduced")
fixed = event.get("fixed")
if fixed:
yield fixed
yield introduced, fixed


def get_published_date(raw_data):
Expand Down Expand Up @@ -350,53 +360,81 @@ def get_fixed_version_range(versions, ecosystem):
logger.error(f"Failed to create VersionRange from: {versions}: error:{e!r}")


def get_fixed_versions(fixed_range, raw_id, supported_ecosystem) -> List[Version]:
def get_fixed_versions_and_commits(
ranges, raw_id, supported_ecosystem=None
) -> Tuple[List[Version], Tuple]:
"""
Return a list of unique fixed univers Versions given a ``fixed_range``
univers VersionRange and a ``raw_id``.
Extract and return all unique fixed versions and related commit data
from a given OSV vulnerability range.

For example::
>>> get_fixed_versions(fixed_range={}, raw_id="GHSA-j3f7-7rmc-6wqj", supported_ecosystem="pypi",)
[]
>>> get_fixed_versions(
... fixed_range={"type": "ECOSYSTEM", "events": [{"fixed": "1.7.0"}], },
>>> get_fixed_versions_and_commits(ranges={}, raw_id="GHSA-j3f7-7rmc-6wqj", supported_ecosystem="pypi",)
([], ([], []))
>>> get_fixed_versions_and_commits(
... ranges={"type": "ECOSYSTEM", "events": [{"fixed": "1.7.0"}], },
... raw_id="GHSA-j3f7-7rmc-6wqj",
... supported_ecosystem="pypi",
... )
[PypiVersion(string='1.7.0')]
([PypiVersion(string='1.7.0')], ([], []))
"""
fixed_versions = []
if "type" not in fixed_range:
logger.error(f"Invalid fixed_range type for: {fixed_range} for OSV id: {raw_id!r}")
return []
introduced_commits = []
fixed_commits = []

fixed_range_type = fixed_range["type"]
if "type" not in ranges:
logger.error(f"Invalid range type for: {ranges} for OSV id: {raw_id!r}")
return [], ([], [])

fixed_range_type = ranges["type"]

version_range_class = RANGE_CLASS_BY_SCHEMES.get(supported_ecosystem)
version_class = version_range_class.version_class if version_range_class else None

for version in extract_fixed_versions(fixed_range):
if fixed_range_type == "ECOSYSTEM":
for introduced, fixed in extract_introduced_and_fixed(ranges):
if fixed_range_type == "ECOSYSTEM" and fixed:
try:
if not version_class:
raise InvalidVersion(
f"Unsupported version for ecosystem: {supported_ecosystem}"
)
fixed_versions.append(version_class(version))
fixed_versions.append(version_class(fixed))
except InvalidVersion:
logger.error(
f"Invalid version class: {version_class} - {version!r} for OSV id: {raw_id!r}"
f"Invalid version class: {version_class} - {fixed!r} for OSV id: {raw_id!r}"
)

elif fixed_range_type == "SEMVER":
elif fixed_range_type == "SEMVER" and fixed:
try:
fixed_versions.append(SemverVersion(version))
fixed_versions.append(SemverVersion(fixed))
except InvalidVersion:
logger.error(f"Invalid SemverVersion: {version!r} for OSV id: {raw_id!r}")
else:
logger.error(f"Unsupported fixed version type: {version!r} for OSV id: {raw_id!r}")
logger.error(f"Invalid SemverVersion: {fixed!r} for OSV id: {raw_id!r}")

elif fixed_range_type == "GIT" and (fixed or introduced):
repo = ranges.get("repo")
if not repo:
logger.error(f"Missing 'repo' field in ranges: {ranges} (OSV id: {raw_id!r})")
continue

# Git uses this magic hash for the empty tree
if introduced == "0":
introduced = "4b825dc642cb6eb9a060e54bf8d69288fbee4904"

if introduced:
try:
introduced_commit = CodeCommitData(commit_hash=introduced, vcs_url=repo)
introduced_commits.append(introduced_commit)
except ValueError as e:
logger.error(f"Failed to extract introduced commits: {e!r}")

if fixed:
try:
fixed_commit = CodeCommitData(commit_hash=fixed, vcs_url=repo)
fixed_commits.append(fixed_commit)
except ValueError as e:
logger.error(f"Failed to extract fixed commits: {e!r}")

# if fixed_range_type == "GIT":
# TODO add GitHubVersion univers fix_version
# logger.error(f"NotImplementedError GIT Version - {raw_id !r} - {i !r}")
else:
if fixed:
logger.error(f"Unsupported fixed version type: {ranges!r} for OSV id: {raw_id!r}")

return dedupe(fixed_versions)
return dedupe(fixed_versions), (introduced_commits, fixed_commits)
Loading