Skip to content
Merged
Show file tree
Hide file tree
Changes from 24 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
d6ea161
Ignore .env files in any directory
sylwia-budzynska Nov 27, 2025
bad9ab4
Add CodeQL-Python MCP tool
sylwia-budzynska Nov 27, 2025
fe60ea1
Add auditer
sylwia-budzynska Nov 27, 2025
72a154c
Add CodeQL-Python taskflow
sylwia-budzynska Nov 27, 2025
4af4a65
Apply suggestions from code review
sylwia-budzynska Nov 27, 2025
ab3ec17
Apply suggestions from code review
sylwia-budzynska Nov 28, 2025
a864ffa
Use local resources in CodeQL for Python MCP
sylwia-budzynska Dec 1, 2025
b13e365
Merge branch 'codeql-python' of https://github.com/GitHubSecurityLab/…
sylwia-budzynska Dec 1, 2025
8c2f42f
Refactor process_repo into utils
sylwia-budzynska Dec 1, 2025
341fadd
Fix import
sylwia-budzynska Dec 1, 2025
5be4848
Rename Source type field to source_type
sylwia-budzynska Dec 1, 2025
6705d60
Apply suggestions from code review
sylwia-budzynska Dec 1, 2025
51b91b9
Update src/seclab_taskflows/taskflows/audit/remote_sources_local.yaml
sylwia-budzynska Dec 1, 2025
3d1fd19
Update src/seclab_taskflows/mcp_servers/codeql_python/mcp_server.py
sylwia-budzynska Dec 1, 2025
d274b99
Update src/seclab_taskflows/mcp_servers/codeql_python/codeql_sqlite_m…
sylwia-budzynska Dec 1, 2025
a3261aa
Update src/seclab_taskflows/taskflows/audit/remote_sources_local.yaml
sylwia-budzynska Dec 1, 2025
2b50b82
Automatically install Python QL pack
sylwia-budzynska Dec 3, 2025
009c3a2
Merge branch 'codeql-python' of https://github.com/GitHubSecurityLab/…
sylwia-budzynska Dec 3, 2025
29eb221
Use generated data dirs
sylwia-budzynska Dec 3, 2025
eb5a0ff
Use DATA_DIR path for storing sqlite db
sylwia-budzynska Dec 3, 2025
9385063
Add Fields to MCP tool params
sylwia-budzynska Dec 3, 2025
3ad757d
Fix env paths handling
sylwia-budzynska Dec 3, 2025
22ba2d2
Add license
sylwia-budzynska Dec 3, 2025
621deb8
Add process_repo docstring
sylwia-budzynska Dec 3, 2025
d72359b
Update src/seclab_taskflows/mcp_servers/codeql_python/mcp_server.py
sylwia-budzynska Dec 3, 2025
b30cbab
Update src/seclab_taskflows/taskflows/audit/remote_sources_local.yaml
sylwia-budzynska Dec 3, 2025
7b37b41
Update src/seclab_taskflows/mcp_servers/codeql_python/README.md
sylwia-budzynska Dec 3, 2025
44941a5
Improve error handling
sylwia-budzynska Dec 3, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions src/seclab_taskflows/mcp_servers/codeql_python/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
Queries in support of the CodeQL MCP Server are maintained as query packs.

If you add your own queries, please follow established conventions for normal CodeQL query pack development.

To run the CodeQL for Python server:
- create a codespace, preferably with more cores
- install CodeQL extension for VS Code
- press `Ctrl/Cmd + Shift + P` and type "CodeQL: Install Pack Dependencies". Choose "sylwia-budzynska/mcp-python" and press "OK".
- find the path to the codeql binary, which comes preinstalled with the VS Code CodeQL extension, with the command:
```bash
find ~ -type f -name codeql -executable 2>/dev/null
```
It will most likely look similar to this:
```
/home/codespace/.vscode-remote/data/User/globalStorage/github.vscode-codeql/distribution1/codeql/codeql
```
- create a folder named 'data'
- create or update your `.env` file in the root of this project with values for:
```
COPILOT_TOKEN= # a fine-grained GitHub personal access token with permission for "copilot chat"
CODEQL_DBS_BASE_PATH="/workspaces/seclab-taskflows/data/codeql_databases" #path to folder with your CodeQL databases

# Example values for a local setup, run with `python -m seclab_taskflow_agent -t seclab_taskflows.taskflows.audit.remote_sources_local`
MEMCACHE_STATE_DIR="/workspaces/seclab-taskflows/data" # path to folder for storing the memcache database
DATA_DIR="/workspaces/seclab-taskflows/data" # path to folder for storing the codeql_sqlite databases and all other data
GITHUB_PERSONAL_ACCESS_TOKEN= # can be the same token as COPILOT_TOKEN. Or another one, with access e.g. to private repositories
CODEQL_CLI= # output of command `find ~ -type f -name codeql -executable 2>/dev/null`

# Example docker env run with ./run_seclab_agent.sh [...]
# CODEQL_CLI="codeql"
# CODEQL_DBS_BASE_PATH="/app/data/codeql_databases"
# MEMCACHE_STATE_DIR="/app/data"
# DATA_DIR="/app/data"
```
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# SPDX-FileCopyrightText: 2025 GitHub
# SPDX-License-Identifier: MIT

from sqlalchemy import Text
from sqlalchemy.orm import DeclarativeBase, mapped_column, Mapped
from typing import Optional

class Base(DeclarativeBase):
pass


class Source(Base):
__tablename__ = 'source'

id: Mapped[int] = mapped_column(primary_key=True)
repo: Mapped[str]
source_location: Mapped[str]
line: Mapped[int]
source_type: Mapped[str]
notes: Mapped[Optional[str]] = mapped_column(Text, nullable=True)

def __repr__(self):
return (f"<Source(id={self.id}, repo={self.repo}, "
f"location={self.source_location}, line={self.line}, source_type={self.source_type}, "
f"notes={self.notes})>")
227 changes: 227 additions & 0 deletions src/seclab_taskflows/mcp_servers/codeql_python/mcp_server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,227 @@
# SPDX-FileCopyrightText: 2025 GitHub
# SPDX-License-Identifier: MIT


import logging
logging.basicConfig(
level=logging.DEBUG,
format='%(asctime)s - %(levelname)s - %(message)s',
filename='logs/mcp_codeql_python.log',
filemode='a'
)
from seclab_taskflow_agent.mcp_servers.codeql.client import run_query, _debug_log
# from seclab_taskflow_agent.path_utils import mcp_data_dir

from pydantic import Field
#from mcp.server.fastmcp import FastMCP, Context
from fastmcp import FastMCP # use FastMCP 2.0
from pathlib import Path
import os
import csv
import json
from sqlalchemy import create_engine
from sqlalchemy.orm import Session
import subprocess
import importlib.resources

from .codeql_sqlite_models import Base, Source
from ..utils import process_repo

MEMORY = Path(os.getenv('DATA_DIR', default='/app/data'))
CODEQL_DBS_BASE_PATH = Path(os.getenv('CODEQL_DBS_BASE_PATH', default='/app/data'))
# MEMORY = mcp_data_dir('seclab-taskflows', 'codeql', 'DATA_DIR')
# CODEQL_DBS_BASE_PATH = mcp_data_dir('seclab-taskflows', 'codeql', 'CODEQL_DBS_BASE_PATH')

mcp = FastMCP("CodeQL-Python")

# tool name -> templated query lookup for supported languages
TEMPLATED_QUERY_PATHS = {
# to add a language, port the templated query pack and add its definition here
'python': {
'remote_sources': 'queries/mcp-python/remote_sources.ql'
}
}


def source_to_dict(result):
return {
"source_id": result.id,
"repo": result.repo,
"source_location": result.source_location,
"line": result.line,
"source_type": result.source_type,
"notes": result.notes
}

def _resolve_query_path(language: str, query: str) -> Path:
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably should refactor _resolve_query_path and resolve_db_path in seclab-taskflow-agent to take variables TEMPLATE_QUERY_PATHS and CODEQL_DBS_BASE_PATH instead of using globals and then we can reuse those here via import, but that'll be for another PR and also need to change seclab-taskflow-agent

global TEMPLATED_QUERY_PATHS
if language not in TEMPLATED_QUERY_PATHS:
raise RuntimeError(f"Error: Language `{language}` not supported!")
query_path = TEMPLATED_QUERY_PATHS[language].get(query)
if not query_path:
raise RuntimeError(f"Error: query `{query}` not supported for `{language}`!")
return Path(query_path)


def _resolve_db_path(relative_db_path: str | Path):
global CODEQL_DBS_BASE_PATH
# path joins will return "/B" if "/A" / "////B" etc. as well
# not windows compatible and probably needs additional hardening
relative_db_path = str(relative_db_path).strip().lstrip('/')
relative_db_path = Path(relative_db_path)
absolute_path = (CODEQL_DBS_BASE_PATH / relative_db_path).resolve()
if not str(absolute_path).startswith(str(CODEQL_DBS_BASE_PATH.resolve())):
raise RuntimeError(f"Error: Database path {absolute_path} is outside the base path {CODEQL_DBS_BASE_PATH}")
if not absolute_path.is_dir():
_debug_log(f"Database path not found: {absolute_path}")
raise RuntimeError(f"Error: Database not found at {absolute_path}!")
return str(absolute_path)

# This sqlite database is specifically made for CodeQL for Python MCP.
class CodeqlSqliteBackend:
def __init__(self, memcache_state_dir: str):
self.memcache_state_dir = memcache_state_dir
if not Path(self.memcache_state_dir).exists():
db_dir = 'sqlite://'
else:
db_dir = f'sqlite:///{self.memcache_state_dir}/codeql_sqlite.db'
self.engine = create_engine(db_dir, echo=False)
Base.metadata.create_all(self.engine, tables=[Source.__table__])


def store_new_source(self, repo, source_location, line, source_type, notes, update = False):
with Session(self.engine) as session:
existing = session.query(Source).filter_by(repo = repo, source_location = source_location, line = line).first()
if existing:
existing.notes = (existing.notes or "") + notes
session.commit()
return f"Updated notes for source at {source_location}, line {line} in {repo}."
else:
if update:
return f"No source exists at repo {repo}, location {source_location}, line {line} to update."
new_source = Source(repo = repo, source_location = source_location, line = line, source_type = source_type, notes = notes)
session.add(new_source)
session.commit()
return f"Added new source for {source_location} in {repo}."

def get_sources(self, repo):
with Session(self.engine) as session:
results = session.query(Source).filter_by(repo = repo).all()
sources = [source_to_dict(source) for source in results]
return sources


# our query result format is: "human readable template {val0} {val1},'key0,key1',val0,val1"
def _csv_parse(raw):
results = []
reader = csv.reader(raw.strip().splitlines())
try:
for i, row in enumerate(reader):
if i == 0:
continue
# col1 has what we care about, but offer flexibility
keys = row[1].split(',')
this_obj = {'description': row[0].format(*row[2:])}
for j, k in enumerate(keys):
this_obj[k.strip()] = row[j + 2]
results.append(this_obj)
except (csv.Error, IndexError, ValueError) as e:
return ["Error: CSV parsing error: " + str(e)]
return results


def _run_query(query_name: str, database_path: str, language: str, template_values: dict):
"""Run a CodeQL query and return the results"""

try:
database_path = _resolve_db_path(database_path)
except RuntimeError:
return f"The database path for {database_path} could not be resolved"
try:
query_path = _resolve_query_path(language, query_name)
except RuntimeError:
return f"The query {query_name} is not supported for language: {language}"
try:
csv = run_query(Path(__file__).parent.resolve() /
query_path,
database_path,
fmt='csv',
template_values=template_values,
log_stderr=True)
return _csv_parse(csv)
except Exception as e:
return f"The query {query_name} encountered an error: {e}"

backend = CodeqlSqliteBackend(MEMORY)

@mcp.tool()
def remote_sources(owner: str = Field(description="The owner of the GitHub repository"),
repo: str = Field(description="The name of the GitHub repository"),
database_path: str = Field(description="The CodeQL database path."),
language: str = Field(description="The language used for the CodeQL database.")):
"""List all remote sources and their locations in a CodeQL database, then store the results in a database."""

repo = process_repo(owner, repo)
results = _run_query('remote_sources', database_path, language, {})

# Check if results is an error (list of strings) or valid data (list of dicts)
if isinstance(results, str):
return f"Error: {results}"
if results and isinstance(results[0], str):
return f"Error: {results[0]}"

# Store each result as a source
stored_count = 0
for result in results:
backend.store_new_source(
repo=repo,
source_location=result.get('location', ''),
source_type=result.get('source', ''),
line=int(result.get('line', '0')),
notes=None, #result.get('description', ''),
update=False
)
stored_count += 1

return f"Stored {stored_count} remote sources in {repo}."

@mcp.tool()
def fetch_sources(owner: str = Field(description="The owner of the GitHub repository"),
repo: str = Field(description="The name of the GitHub repository")):
"""
Fetch all sources from the repo
"""
repo = process_repo(owner, repo)
return json.dumps(backend.get_sources(repo))

@mcp.tool()
def add_source_notes(owner: str = Field(description="The owner of the GitHub repository"),
repo: str = Field(description="The name of the GitHub repository"),
source_location: str = Field(description="The path to the file"),
line: int = Field(description="The line number of the source"),
notes: str = Field(description="The notes to append to this source")):
"""
Add new notes to an existing source. The notes will be appended to any existing notes.
"""
repo = process_repo(owner, repo)
return backend.store_new_source(repo = repo, source_location = source_location, line = line, source_type = "", notes = notes, update=True)

@mcp.tool()
def clear_codeql_repo(owner: str = Field(description="The owner of the GitHub repository"),
repo: str = Field(description="The name of the GitHub repository")):
"""
Clear all data for a given repo from the database
"""
repo = process_repo(owner, repo)
with Session(backend.engine) as session:
deleted_sources = session.query(Source).filter_by(repo = repo).delete()
session.commit()
return f"Cleared {deleted_sources} sources from repo {repo}."

if __name__ == "__main__":
# Check if codeql/python-all pack is installed, if not install it
if not os.path.isdir('/.codeql/packages/codeql/python-all'):
pack_path = importlib.resources.files('seclab_taskflows.mcp_servers.codeql_python.queries').joinpath('mcp-python')
print(f"Installing CodeQL pack from {pack_path}")
subprocess.run(["codeql", "pack", "install", pack_path])
mcp.run(show_banner=False, transport="http", host="127.0.0.1", port=9998)
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
---
lockVersion: 1.0.0
dependencies:
codeql/concepts:
version: 0.0.8
codeql/controlflow:
version: 2.0.18
codeql/dataflow:
version: 2.0.18
codeql/mad:
version: 1.0.34
codeql/python-all:
version: 4.1.0
codeql/regex:
version: 1.0.34
codeql/ssa:
version: 2.0.10
codeql/threat-models:
version: 1.0.34
codeql/tutorial:
version: 1.0.34
codeql/typetracking:
version: 2.0.18
codeql/util:
version: 2.0.21
codeql/xml:
version: 1.0.34
codeql/yaml:
version: 1.0.34
compiled: false
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
library: false
warnOnImplicitThis: false
name: sylwia-budzynska/mcp-python
version: 0.0.1
dependencies:
codeql/python-all: ^4.1.0
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/**
* This is an automatically generated file
* @name Hello world
* @kind problem
* @problem.severity warning
* @id python/example/hello-world
*/

import python

from File f
select f, "Hello, world!"
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/**
* @id mcp-python/remote-sources
* @name Python Remote Sources
* @description Identifies nodes that act as remote sources in Python code, along with their locations.
* @tags source, location
*/
import python
import semmle.python.dataflow.new.RemoteFlowSources

// string normalizeLocation(Location l) {
// result = l.getFile().getRelativePath() + ":" + l.getStartLine().toString() + ":" + l.getStartColumn().toString()
// + ":" + l.getEndLine().toString() + ":" + l.getEndColumn().toString()
// }

from RemoteFlowSource source
select
"Remote source {0} is defined at {1} line {2}",
"source,location,line",
source.getSourceType(),
source.getLocation().getFile().getRelativePath(),
source.getLocation().getStartLine().toString()
Loading