Skip to content
Merged
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 16 additions & 2 deletions cve_bin_tool/cvedb.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
DBNAME = "cve.db"
OLD_CACHE_DIR = Path("~") / ".cache" / "cvedb"

UNKNOWN_METRIC_ID = 0
EPSS_METRIC_ID = 1
CVSS_2_METRIC_ID = 2
CVSS_3_METRIC_ID = 3
Expand Down Expand Up @@ -416,6 +417,8 @@ def init_database(self) -> None:
for table in self.TABLE_SCHEMAS:
cursor.execute(self.TABLE_SCHEMAS[table])

# Ensure the UNKNOWN metric exists
self.ensure_unknown_metric(cursor)
# add indexes
for index in self.INDEXES:
cursor.execute(self.INDEXES[index])
Expand Down Expand Up @@ -619,6 +622,7 @@ def populate_metrics(self):
# Insert a row without specifying cve_metrics_id
insert_metrics = self.INSERT_QUERIES["insert_metrics"]
data = [
(UNKNOWN_METRIC_ID, "UNKNOWN"),
(EPSS_METRIC_ID, "EPSS"),
(CVSS_2_METRIC_ID, "CVSS-2"),
(CVSS_3_METRIC_ID, "CVSS-3"),
Expand All @@ -632,15 +636,15 @@ def populate_metrics(self):
def metric_finder(self, cursor, cve):
"""
SQL query to retrieve the metrics_name based on the metrics_id
currently cve["CVSS_version"] return 2,3 based on there version and they are mapped accordingly to there metrics name in metrics table.
currently cve["CVSS_version"] return 2,3 based on their version and they are mapped accordingly to their metrics name in metrics table.
"""
query = """
SELECT metrics_id FROM metrics
WHERE metrics_id=?
"""
metric = None
if cve["CVSS_version"] == "unknown":
metric = "unknown"
metric = 0
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would use UNKNOWN_METRIC_ID just to avoid magic numbers it increases the readability.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

else:
cursor.execute(query, [cve.get("CVSS_version")])
# Fetch all the results of the query and use 'map' to extract only the 'metrics_name' from the result
Expand Down Expand Up @@ -1173,8 +1177,18 @@ def fetch_from_mirror(self, mirror, pubkey, ignore_signature, log_signature_erro

@contextlib.contextmanager
def with_cursor(self):
"""Context manager for database cursor."""
cursor = self.db_open_and_get_cursor()
try:
yield cursor
finally:
self.db_close()

def ensure_unknown_metric(self, cursor):
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not moving this to populate_metrics and ensure that the table metrics gets also monitored for changes like in example cve_range in get_cvelist_if_stale.

"""Ensure that the UNKNOWN metric exists in the metrics table."""
insert_metrics = self.INSERT_QUERIES["insert_metrics"]
try:
cursor.execute(insert_metrics, (UNKNOWN_METRIC_ID, "UNKNOWN"))
except sqlite3.IntegrityError:
# The metric already exists, no action needed
pass
Loading