Skip to content

Commit 327c4c3

Browse files
authored
fix: Replace raw SQL string interpolation with proper SQLAlchemy parameterized APIs in PostgresKVStore (#20104)
* fix: Eliminate SQL injection vulnerabilities in PostgresKVStore This commit addresses multiple SQL injection vulnerabilities in the PostgresKVStore integration by replacing unsafe string interpolation with proper SQLAlchemy parameterized APIs. ## Vulnerabilities Fixed 1. **_create_schema_if_not_exists()** (lines 223-231) - Replaced f-string interpolation in SELECT query - Replaced f-string interpolation in CREATE SCHEMA statement - Now uses sqlalchemy.schema.CreateSchema with if_not_exists parameter 2. **put_all()** (lines 305-310) - Replaced raw SQL text() with f-string table/schema names - Now uses sqlalchemy.dialects.postgresql.insert() with proper parameterization for both identifiers and values 3. **aput_all()** (lines 347-352) - Same vulnerabilities and fixes as put_all() for async version ## Changes - Import CreateSchema at module level for cleaner code - Replace text(f"SELECT ... WHERE schema_name = '{self.schema_name}'") with CreateSchema(self.schema_name, if_not_exists=True) - Replace text(f"INSERT INTO {self.schema_name}.{tablename} ...") with insert(self._table_class).values().on_conflict_do_update() - All user data continues to be properly parameterized ## Security Impact Before: Attackers could inject arbitrary SQL via schema_name or indirectly through table_name, potentially leading to: - Data exfiltration - Data manipulation - Privilege escalation - Database schema manipulation After: All SQL identifiers and values are properly handled through SQLAlchemy's parameterization APIs, eliminating SQL injection vectors. ## Testing Added 4 new security-focused tests: - test_schema_creation_uses_safe_api: Verifies CreateSchema usage - test_put_all_uses_safe_insert: Verifies parameterized insert - test_aput_all_uses_safe_insert: Verifies async parameterized insert - test_schema_name_with_special_characters: Tests injection attempts All existing and new tests pass (6/6 tests passing). 
* fix: Remove double JSON serialization in insert operations SQLAlchemy automatically handles JSON serialization for JSON/JSONB column types. Manually calling json.dumps() was causing double serialization, returning JSON strings instead of dicts. This fixes the failing integration tests that expected dict values. * bump pyproject.toml version to 0.4.2
1 parent 0f101b1 commit 327c4c3

File tree

3 files changed

+233
-70
lines changed

3 files changed

+233
-70
lines changed

llama-index-integrations/storage/kvstore/llama-index-storage-kvstore-postgres/llama_index/storage/kvstore/postgres/base.py

Lines changed: 33 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
1-
import json
21
from typing import Any, Dict, List, Optional, Tuple, Type
32
from urllib.parse import urlparse
43
from llama_index.core.bridge.pydantic import PrivateAttr
54

65
try:
76
import sqlalchemy
87
import sqlalchemy.ext.asyncio # noqa
8+
from sqlalchemy import Column, Index, Integer, UniqueConstraint
9+
from sqlalchemy.schema import CreateSchema
910
except ImportError:
1011
raise ImportError("`sqlalchemy[asyncio]` package should be pre installed")
1112

@@ -27,7 +28,6 @@ def get_data_model(
2728
"""
2829
This part create a dynamic sqlalchemy model with a new table.
2930
"""
30-
from sqlalchemy import Column, Index, Integer, UniqueConstraint
3131
from sqlalchemy.dialects.postgresql import JSON, JSONB, VARCHAR
3232

3333
tablename = "data_%s" % index_name # dynamic table name
@@ -216,21 +216,7 @@ def _connect(self) -> Any:
216216

217217
def _create_schema_if_not_exists(self) -> None:
218218
with self._session() as session, session.begin():
219-
from sqlalchemy import text
220-
221-
# Check if the specified schema exists with "CREATE" statement
222-
check_schema_statement = text(
223-
f"SELECT schema_name FROM information_schema.schemata WHERE schema_name = '{self.schema_name}'"
224-
)
225-
result = session.execute(check_schema_statement).fetchone()
226-
227-
# If the schema does not exist, then create it
228-
if not result:
229-
create_schema_statement = text(
230-
f"CREATE SCHEMA IF NOT EXISTS {self.schema_name}"
231-
)
232-
session.execute(create_schema_statement)
233-
219+
session.execute(CreateSchema(self.schema_name, if_not_exists=True))
234220
session.commit()
235221

236222
def _create_tables_if_not_exists(self) -> None:
@@ -285,40 +271,29 @@ def put_all(
285271
collection: str = DEFAULT_COLLECTION,
286272
batch_size: int = DEFAULT_BATCH_SIZE,
287273
) -> None:
288-
from sqlalchemy import text
274+
from sqlalchemy.dialects.postgresql import insert
289275

290276
self._initialize()
291277
with self._session() as session:
292278
for i in range(0, len(kv_pairs), batch_size):
293279
batch = kv_pairs[i : i + batch_size]
294280

295-
# Prepare the VALUES part of the SQL statement
296-
values_clause = ", ".join(
297-
f"(:key_{i}, :namespace_{i}, :value_{i})"
298-
for i, _ in enumerate(batch)
281+
values_to_insert = [
282+
{
283+
"key": key,
284+
"namespace": collection,
285+
"value": value,
286+
}
287+
for key, value in batch
288+
]
289+
290+
stmt = insert(self._table_class).values(values_to_insert)
291+
stmt = stmt.on_conflict_do_update(
292+
index_elements=["key", "namespace"],
293+
set_={"value": stmt.excluded.value},
299294
)
300295

301-
# Prepare the raw SQL for bulk upsert
302-
# Note: This SQL is PostgreSQL-specific. Adjust for other databases.
303-
stmt = text(
304-
f"""
305-
INSERT INTO {self.schema_name}.{self._table_class.__tablename__} (key, namespace, value)
306-
VALUES {values_clause}
307-
ON CONFLICT (key, namespace)
308-
DO UPDATE SET
309-
value = EXCLUDED.value;
310-
"""
311-
)
312-
313-
# Flatten the list of tuples for execute parameters
314-
params = {}
315-
for i, (key, value) in enumerate(batch):
316-
params[f"key_{i}"] = key
317-
params[f"namespace_{i}"] = collection
318-
params[f"value_{i}"] = json.dumps(value)
319-
320-
# Execute the bulk upsert
321-
session.execute(stmt, params)
296+
session.execute(stmt)
322297
session.commit()
323298

324299
async def aput_all(
@@ -327,40 +302,29 @@ async def aput_all(
327302
collection: str = DEFAULT_COLLECTION,
328303
batch_size: int = DEFAULT_BATCH_SIZE,
329304
) -> None:
330-
from sqlalchemy import text
305+
from sqlalchemy.dialects.postgresql import insert
331306

332307
self._initialize()
333308
async with self._async_session() as session:
334309
for i in range(0, len(kv_pairs), batch_size):
335310
batch = kv_pairs[i : i + batch_size]
336311

337-
# Prepare the VALUES part of the SQL statement
338-
values_clause = ", ".join(
339-
f"(:key_{i}, :namespace_{i}, :value_{i})"
340-
for i, _ in enumerate(batch)
312+
values_to_insert = [
313+
{
314+
"key": key,
315+
"namespace": collection,
316+
"value": value,
317+
}
318+
for key, value in batch
319+
]
320+
321+
stmt = insert(self._table_class).values(values_to_insert)
322+
stmt = stmt.on_conflict_do_update(
323+
index_elements=["key", "namespace"],
324+
set_={"value": stmt.excluded.value},
341325
)
342326

343-
# Prepare the raw SQL for bulk upsert
344-
# Note: This SQL is PostgreSQL-specific. Adjust for other databases.
345-
stmt = text(
346-
f"""
347-
INSERT INTO {self.schema_name}.{self._table_class.__tablename__} (key, namespace, value)
348-
VALUES {values_clause}
349-
ON CONFLICT (key, namespace)
350-
DO UPDATE SET
351-
value = EXCLUDED.value;
352-
"""
353-
)
354-
355-
# Flatten the list of tuples for execute parameters
356-
params = {}
357-
for i, (key, value) in enumerate(batch):
358-
params[f"key_{i}"] = key
359-
params[f"namespace_{i}"] = collection
360-
params[f"value_{i}"] = json.dumps(value)
361-
362-
# Execute the bulk upsert
363-
await session.execute(stmt, params)
327+
await session.execute(stmt)
364328
await session.commit()
365329

366330
def get(self, key: str, collection: str = DEFAULT_COLLECTION) -> Optional[dict]:

llama-index-integrations/storage/kvstore/llama-index-storage-kvstore-postgres/pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ dev = [
2828

2929
[project]
3030
name = "llama-index-storage-kvstore-postgres"
31-
version = "0.4.1"
31+
version = "0.4.2"
3232
description = "llama-index kvstore postgres integration"
3333
authors = [{name = "Your Name", email = "you@example.com"}]
3434
requires-python = ">=3.9,<4.0"

llama-index-integrations/storage/kvstore/llama-index-storage-kvstore-postgres/tests/test_storage_kvstore_postgres.py

Lines changed: 199 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import pytest
22
from importlib.util import find_spec
3+
from unittest.mock import MagicMock, patch
34
from llama_index.core.storage.kvstore.types import BaseKVStore
45
from llama_index.storage.kvstore.postgres import PostgresKVStore
56

@@ -51,3 +52,201 @@ def test_initialization():
5152
assert sum(errors) == 3
5253
assert pgstore4._engine is None
5354
assert pgstore4._async_engine is None
55+
56+
57+
@pytest.mark.skipif(
58+
no_packages, reason="asyncpg, psycopg2-binary and sqlalchemy not installed"
59+
)
60+
def test_schema_creation_uses_safe_api():
61+
import sqlalchemy
62+
63+
mock_engine = MagicMock()
64+
mock_async_engine = MagicMock()
65+
66+
mock_session_instance = MagicMock()
67+
mock_session_ctx = MagicMock()
68+
mock_session_ctx.__enter__.return_value = mock_session_instance
69+
mock_session_ctx.__exit__.return_value = None
70+
71+
mock_begin_ctx = MagicMock()
72+
mock_begin_ctx.__enter__.return_value = MagicMock()
73+
mock_begin_ctx.__exit__.return_value = None
74+
mock_session_instance.begin.return_value = mock_begin_ctx
75+
76+
mock_session_factory = MagicMock(return_value=mock_session_ctx)
77+
78+
with (
79+
patch.object(sqlalchemy, "create_engine", return_value=mock_engine),
80+
patch.object(
81+
sqlalchemy.ext.asyncio,
82+
"create_async_engine",
83+
return_value=mock_async_engine,
84+
),
85+
patch("sqlalchemy.orm.sessionmaker", return_value=mock_session_factory),
86+
):
87+
pgstore = PostgresKVStore(
88+
table_name="test_table",
89+
connection_string="postgresql://user:pass@localhost/db",
90+
async_connection_string="postgresql+asyncpg://user:pass@localhost/db",
91+
schema_name="test_schema",
92+
perform_setup=False,
93+
)
94+
95+
pgstore._connect()
96+
pgstore._create_schema_if_not_exists()
97+
98+
execute_calls = mock_session_instance.execute.call_args_list
99+
assert len(execute_calls) == 1
100+
101+
from sqlalchemy.schema import CreateSchema
102+
103+
executed_statement = execute_calls[0][0][0]
104+
assert isinstance(executed_statement, CreateSchema)
105+
assert executed_statement.element == "test_schema"
106+
assert executed_statement.if_not_exists is True
107+
108+
109+
@pytest.mark.skipif(
110+
no_packages, reason="asyncpg, psycopg2-binary and sqlalchemy not installed"
111+
)
112+
def test_put_all_uses_safe_insert():
113+
import sqlalchemy
114+
115+
mock_engine = MagicMock()
116+
mock_async_engine = MagicMock()
117+
118+
mock_session_instance = MagicMock()
119+
mock_session_ctx = MagicMock()
120+
mock_session_ctx.__enter__.return_value = mock_session_instance
121+
mock_session_ctx.__exit__.return_value = None
122+
123+
mock_session_factory = MagicMock(return_value=mock_session_ctx)
124+
125+
with (
126+
patch.object(sqlalchemy, "create_engine", return_value=mock_engine),
127+
patch.object(
128+
sqlalchemy.ext.asyncio,
129+
"create_async_engine",
130+
return_value=mock_async_engine,
131+
),
132+
patch("sqlalchemy.orm.sessionmaker", return_value=mock_session_factory),
133+
):
134+
pgstore = PostgresKVStore(
135+
table_name="test_table",
136+
connection_string="postgresql://user:pass@localhost/db",
137+
async_connection_string="postgresql+asyncpg://user:pass@localhost/db",
138+
schema_name="test_schema",
139+
perform_setup=False,
140+
)
141+
pgstore._connect()
142+
pgstore._is_initialized = True
143+
144+
test_data = [("key1", {"value": "data1"}), ("key2", {"value": "data2"})]
145+
pgstore.put_all(test_data)
146+
147+
execute_calls = mock_session_instance.execute.call_args_list
148+
assert len(execute_calls) >= 1
149+
150+
executed_statement = execute_calls[-1][0][0]
151+
assert hasattr(executed_statement, "compile")
152+
153+
154+
@pytest.mark.skipif(
155+
no_packages, reason="asyncpg, psycopg2-binary and sqlalchemy not installed"
156+
)
157+
@pytest.mark.asyncio
158+
async def test_aput_all_uses_safe_insert():
159+
import sqlalchemy
160+
from unittest.mock import AsyncMock
161+
162+
mock_engine = MagicMock()
163+
mock_async_engine = MagicMock()
164+
165+
mock_session_instance = MagicMock()
166+
mock_session_ctx_manager = MagicMock()
167+
mock_session_ctx_manager.__aenter__ = AsyncMock(return_value=mock_session_instance)
168+
mock_session_ctx_manager.__aexit__ = AsyncMock(return_value=None)
169+
170+
mock_async_session_factory = MagicMock(return_value=mock_session_ctx_manager)
171+
172+
with (
173+
patch.object(sqlalchemy, "create_engine", return_value=mock_engine),
174+
patch.object(
175+
sqlalchemy.ext.asyncio,
176+
"create_async_engine",
177+
return_value=mock_async_engine,
178+
),
179+
patch("sqlalchemy.orm.sessionmaker", return_value=mock_async_session_factory),
180+
):
181+
pgstore = PostgresKVStore(
182+
table_name="test_table",
183+
connection_string="postgresql://user:pass@localhost/db",
184+
async_connection_string="postgresql+asyncpg://user:pass@localhost/db",
185+
schema_name="test_schema",
186+
perform_setup=False,
187+
)
188+
pgstore._connect()
189+
pgstore._is_initialized = True
190+
191+
mock_session_instance.execute = AsyncMock()
192+
mock_session_instance.commit = AsyncMock()
193+
194+
test_data = [("key1", {"value": "data1"}), ("key2", {"value": "data2"})]
195+
await pgstore.aput_all(test_data)
196+
197+
execute_calls = mock_session_instance.execute.call_args_list
198+
assert len(execute_calls) >= 1
199+
200+
executed_statement = execute_calls[-1][0][0]
201+
assert hasattr(executed_statement, "compile")
202+
203+
204+
@pytest.mark.skipif(
205+
no_packages, reason="asyncpg, psycopg2-binary and sqlalchemy not installed"
206+
)
207+
def test_schema_name_with_special_characters():
208+
import sqlalchemy
209+
210+
mock_engine = MagicMock()
211+
mock_async_engine = MagicMock()
212+
213+
mock_session_instance = MagicMock()
214+
mock_session_ctx = MagicMock()
215+
mock_session_ctx.__enter__.return_value = mock_session_instance
216+
mock_session_ctx.__exit__.return_value = None
217+
218+
mock_begin_ctx = MagicMock()
219+
mock_begin_ctx.__enter__.return_value = MagicMock()
220+
mock_begin_ctx.__exit__.return_value = None
221+
mock_session_instance.begin.return_value = mock_begin_ctx
222+
223+
mock_session_factory = MagicMock(return_value=mock_session_ctx)
224+
225+
with (
226+
patch.object(sqlalchemy, "create_engine", return_value=mock_engine),
227+
patch.object(
228+
sqlalchemy.ext.asyncio,
229+
"create_async_engine",
230+
return_value=mock_async_engine,
231+
),
232+
patch("sqlalchemy.orm.sessionmaker", return_value=mock_session_factory),
233+
):
234+
special_schema = "test'schema"
235+
pgstore = PostgresKVStore(
236+
table_name="test_table",
237+
connection_string="postgresql://user:pass@localhost/db",
238+
async_connection_string="postgresql+asyncpg://user:pass@localhost/db",
239+
schema_name=special_schema,
240+
perform_setup=False,
241+
)
242+
243+
pgstore._connect()
244+
pgstore._create_schema_if_not_exists()
245+
246+
execute_calls = mock_session_instance.execute.call_args_list
247+
assert len(execute_calls) == 1
248+
249+
from sqlalchemy.schema import CreateSchema
250+
251+
executed_statement = execute_calls[0][0][0]
252+
assert isinstance(executed_statement, CreateSchema)

0 commit comments

Comments
 (0)