From 8120a4fccc1dc1a0160c69ab5a89a917640bc326 Mon Sep 17 00:00:00 2001 From: Brian Sam-Bodden Date: Wed, 1 Oct 2025 15:36:06 -0700 Subject: [PATCH 1/5] dependencies: upgrade to Redis VL 0.9.0 in poetry.lock --- poetry.lock | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/poetry.lock b/poetry.lock index 40ff552..4b9a1d4 100644 --- a/poetry.lock +++ b/poetry.lock @@ -963,8 +963,8 @@ httpx = ">=0.23.0,<1" orjson = {version = ">=3.9.14,<4.0.0", markers = "platform_python_implementation != \"PyPy\""} packaging = ">=23.2" pydantic = [ - {version = ">=1,<3", markers = "python_full_version < \"3.12.4\""}, {version = ">=2.7.4,<3.0.0", markers = "python_full_version >= \"3.12.4\""}, + {version = ">=1,<3", markers = "python_full_version < \"3.12.4\""}, ] requests = ">=2,<3" requests-toolbelt = ">=1.0.0,<2.0.0" @@ -1856,14 +1856,14 @@ ocsp = ["cryptography (>=36.0.1)", "pyopenssl (>=20.0.1)", "requests (>=2.31.0)" [[package]] name = "redisvl" -version = "0.8.0" +version = "0.9.0" description = "Python client library and CLI for using Redis as a vector database" optional = false python-versions = "<3.14,>=3.9" groups = ["main"] files = [ - {file = "redisvl-0.8.0-py3-none-any.whl", hash = "sha256:365c31819224b3e4e9acca1ed2ac9eed347d4ee4ca8d822010dbd51a8b725705"}, - {file = "redisvl-0.8.0.tar.gz", hash = "sha256:00645cf126039ee4d734a1ff273cc4e8fea59118f7790625eeff510fce08b0d4"}, + {file = "redisvl-0.9.0-py3-none-any.whl", hash = "sha256:1c2f69a5f6eed49b591a24295437733929ecace345ea60488f0dd12edaecef06"}, + {file = "redisvl-0.9.0.tar.gz", hash = "sha256:aba3ef653c2830c859e7fab6ed78ebafa53d02921f87e43901b6d7e7d58cf9e1"}, ] [package.dependencies] @@ -2165,6 +2165,7 @@ description = "A lil' TOML parser" optional = false python-versions = ">=3.8" groups = ["main", "dev"] +markers = "python_version < \"3.11\"" files = [ {file = "tomli-2.2.1-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:678e4fa69e4575eb77d103de3df8a895e1591b48e740211bd1067378c69e8249"}, {file = "tomli-2.2.1-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:023aa114dd824ade0100497eb2318602af309e5a55595f76b626d6d9f3b7b0a6"}, @@ -2199,7 +2200,6 @@ files = [ {file = "tomli-2.2.1-py3-none-any.whl", hash = "sha256:cb55c73c5f4408779d0cf3eef9f762b9c9f147a77de7b258bef0a5628adc85cc"}, {file = "tomli-2.2.1.tar.gz", hash = "sha256:cd45e1dc79c835ce60f7404ec8119f2eb06d38b1deba146f07ced3bbc44505ff"}, ] -markers = {main = "python_version < \"3.11\"", dev = "python_full_version <= \"3.11.0a6\""} [[package]] name = "tqdm" From e96d9acea759b35a928e18ece03fde01965fe2c9 Mon Sep 17 00:00:00 2001 From: Brian Sam-Bodden Date: Wed, 1 Oct 2025 15:45:09 -0700 Subject: [PATCH 2/5] fix: remove incorrect skip decorator from alist deserialization test The test_async_alist_with_deserialization test was incorrectly marked as skipped with a misleading comment claiming that alist() deserialization was not addressed in PR #87. However, the fix was already implemented in the alist() method via _recursive_deserialize() call at line 742 of aio.py. This commit: - Removes the @pytest.mark.skip decorator - Updates the docstring to accurately reflect that the fix is implemented - Ensures the test runs and validates proper message deserialization The test passes successfully, confirming that alist() properly deserializes LangChain messages when listing checkpoints. 
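For illustration, a minimal sketch of what the re-enabled test asserts (redis_url and thread_id are placeholders, and the "messages" channel key is an assumption matching the rest of this test module):

    from langchain_core.messages import BaseMessage

    from langgraph.checkpoint.redis.aio import AsyncRedisSaver

    async with AsyncRedisSaver.from_conn_string(redis_url) as saver:
        config = {"configurable": {"thread_id": thread_id}}
        async for ckpt_tuple in saver.alist(config):
            channel_values = ckpt_tuple.checkpoint["channel_values"]
            # With the fix, alist() yields rehydrated BaseMessage
            # instances rather than raw serialized dicts.
            for message in channel_values.get("messages", []):
                assert isinstance(message, BaseMessage)
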
--- tests/test_issue_87_async_deserialization.py | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/tests/test_issue_87_async_deserialization.py b/tests/test_issue_87_async_deserialization.py index 7077acc..6759f92 100644 --- a/tests/test_issue_87_async_deserialization.py +++ b/tests/test_issue_87_async_deserialization.py @@ -478,16 +478,12 @@ async def test_async_mixed_content_types(redis_url: str): @pytest.mark.asyncio -@pytest.mark.skip( - reason="alist() deserialization not addressed in PR #87 - needs separate fix" -) async def test_async_alist_with_deserialization(redis_url: str): """Test that alist() also properly deserializes messages. - NOTE: This test is skipped because PR #87 only addresses aget_tuple() - deserialization. The alist() method also needs the same fix applied - to properly deserialize LangChain messages. This should be addressed - in a follow-up PR. + This test verifies that the alist() method properly deserializes + LangChain messages when listing checkpoints, matching the behavior + of aget_tuple(). """ async with AsyncRedisSaver.from_conn_string(redis_url) as saver: thread_id = str(uuid4()) From c9712519d62949c929e8e0b2f6f8b33ff0207018 Mon Sep 17 00:00:00 2001 From: Brian Sam-Bodden Date: Wed, 1 Oct 2025 16:46:10 -0700 Subject: [PATCH 3/5] dependencies: upgrade to Redis VL 0.9.1 in poetry.lock --- poetry.lock | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/poetry.lock b/poetry.lock index 4b9a1d4..ba403d4 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1856,14 +1856,14 @@ ocsp = ["cryptography (>=36.0.1)", "pyopenssl (>=20.0.1)", "requests (>=2.31.0)" [[package]] name = "redisvl" -version = "0.9.0" +version = "0.9.1" description = "Python client library and CLI for using Redis as a vector database" optional = false python-versions = "<3.14,>=3.9" groups = ["main"] files = [ - {file = "redisvl-0.9.0-py3-none-any.whl", hash = "sha256:1c2f69a5f6eed49b591a24295437733929ecace345ea60488f0dd12edaecef06"}, - {file = "redisvl-0.9.0.tar.gz", hash = "sha256:aba3ef653c2830c859e7fab6ed78ebafa53d02921f87e43901b6d7e7d58cf9e1"}, + {file = "redisvl-0.9.1-py3-none-any.whl", hash = "sha256:aaec441cfcb37ce7cced028dcf9a748337a27422dcaf1b494a4c6198f577dcf4"}, + {file = "redisvl-0.9.1.tar.gz", hash = "sha256:a735ecf3238e804800b54a513b85a8cf4300fe6d111fb055bd75528f77dd5419"}, ] [package.dependencies] From 39ee6a9b51abedb0bc17669d388e2c995d3114fc Mon Sep 17 00:00:00 2001 From: Brian Sam-Bodden Date: Wed, 1 Oct 2025 16:48:23 -0700 Subject: [PATCH 4/5] test: add regression tests for issue #96 cluster mode AttributeError (#96) Issue #96 reported AttributeError: 'ClusterPipeline' object has no attribute 'nodes_manager' when using AsyncRedisStore with Redis Cluster. The issue was fixed upstream in redisvl 0.9.0 via redisvl issue #365, which added a safe wrapper around get_protocol_version() to handle ClusterPipeline objects that don't have the nodes_manager attribute. This commit adds comprehensive regression tests to prevent this issue from recurring: - test_async_store_batch_put_no_attribute_error: Tests standalone mode - test_async_store_cluster_mode_batch_put: Tests cluster mode (exact scenario from #96) - test_async_store_large_batch_cluster_mode: Stress tests with 50 items - test_async_store_update_operations_cluster_mode: Tests delete+insert operations All tests pass with redisvl 0.9.1, confirming the fix works correctly. 
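For reference, a minimal sketch of the failing call sequence (redis_url is a placeholder for a reachable Redis deployment):

    from langgraph.store.redis import AsyncRedisStore

    store = AsyncRedisStore(redis_url, cluster_mode=True)
    await store.setup()
    # With redisvl < 0.9.0 the next call raised:
    #   AttributeError: 'ClusterPipeline' object has no attribute 'nodes_manager'
    await store.aput(("test", "ns"), "item_0", {"data": "value"})
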
Fixes #96 --- tests/test_issue_96_cluster_pipeline.py | 161 ++++++++++++++++++++++++ 1 file changed, 161 insertions(+) create mode 100644 tests/test_issue_96_cluster_pipeline.py diff --git a/tests/test_issue_96_cluster_pipeline.py b/tests/test_issue_96_cluster_pipeline.py new file mode 100644 index 0000000..5c79127 --- /dev/null +++ b/tests/test_issue_96_cluster_pipeline.py @@ -0,0 +1,161 @@ +"""Tests for issue #96: AttributeError with AsyncRedisStore in cluster mode. + +Issue: https://github.com/redis-developer/langgraph-redis/issues/96 + +The issue was caused by redisvl's get_protocol_version() accessing nodes_manager +attribute on ClusterPipeline objects which don't always have this attribute. + +Fixed in redisvl 0.9.0 via issue #365. + +This test suite focuses on AsyncRedisStore as that was where the issue was +originally reported. The issue occurred in redisvl's SearchIndex.load() method +which is used by both sync and async stores, so testing async coverage is +sufficient. +""" + +import pytest + +from langgraph.store.redis import AsyncRedisStore + + +@pytest.mark.asyncio +async def test_async_store_batch_put_no_attribute_error(redis_url: str) -> None: + """Test that AsyncRedisStore batch put operations don't raise AttributeError. + + This is the primary test for issue #96 which was originally reported + with AsyncRedisStore. + """ + store = AsyncRedisStore(redis_url, cluster_mode=False) + await store.setup() + + try: + namespace = ("test", "issue_96_async") + + # Put multiple items to trigger batch operations + items = [ + (f"async_item_{i}", {"data": f"async_value_{i}", "index": i}) + for i in range(10) + ] + + for key, value in items: + await store.aput(namespace, key, value) + + # Verify items were stored correctly + retrieved = await store.aget(namespace, "async_item_0") + assert retrieved is not None + assert retrieved.value["data"] == "async_value_0" + + finally: + # Cleanup + for key, _ in items: + await store.adelete(namespace, key) + await store._redis.aclose() + + +@pytest.mark.asyncio +async def test_async_store_cluster_mode_batch_put(redis_url: str) -> None: + """Test AsyncRedisStore with cluster_mode=True for batch operations. + + This is the exact scenario from issue #96 - using AsyncRedisStore with + cluster mode enabled, which should trigger the code path that was causing + the AttributeError about nodes_manager. + """ + store = AsyncRedisStore(redis_url, cluster_mode=True) + await store.setup() + + try: + namespace = ("test", "issue_96_async_cluster") + + # Put multiple items to trigger batch operations + items = [ + ( + f"async_cluster_item_{i}", + {"data": f"async_cluster_value_{i}", "index": i}, + ) + for i in range(10) + ] + + # This was raising AttributeError: 'ClusterPipeline' object has no attribute 'nodes_manager' + # Should work now with redisvl 0.9.0 + for key, value in items: + await store.aput(namespace, key, value) + + # Verify items were stored + retrieved = await store.aget(namespace, "async_cluster_item_0") + assert retrieved is not None + assert retrieved.value["data"] == "async_cluster_value_0" + + finally: + # Cleanup + for key, _ in items: + await store.adelete(namespace, key) + await store._redis.aclose() + + +@pytest.mark.asyncio +async def test_async_store_large_batch_cluster_mode(redis_url: str) -> None: + """Test AsyncRedisStore with larger batch to stress test the fix. + + This ensures the fix works with more substantial batch operations. 
+ """ + store = AsyncRedisStore(redis_url, cluster_mode=True) + await store.setup() + + try: + namespace = ("test", "issue_96_large_batch") + + # Put a larger batch of items + items = [ + (f"large_batch_item_{i}", {"data": f"large_batch_value_{i}", "index": i}) + for i in range(50) + ] + + # This should handle larger batches without AttributeError + for key, value in items: + await store.aput(namespace, key, value) + + # Verify some items were stored + retrieved_first = await store.aget(namespace, "large_batch_item_0") + assert retrieved_first is not None + assert retrieved_first.value["index"] == 0 + + retrieved_last = await store.aget(namespace, "large_batch_item_49") + assert retrieved_last is not None + assert retrieved_last.value["index"] == 49 + + finally: + # Cleanup + for key, _ in items: + await store.adelete(namespace, key) + await store._redis.aclose() + + +@pytest.mark.asyncio +async def test_async_store_update_operations_cluster_mode(redis_url: str) -> None: + """Test AsyncRedisStore update operations in cluster mode. + + Updates trigger both delete and insert operations in batch_put_ops, + exercising the code path that was problematic in issue #96. + """ + store = AsyncRedisStore(redis_url, cluster_mode=True) + await store.setup() + + try: + namespace = ("test", "issue_96_updates") + + # Initial put + await store.aput(namespace, "update_test", {"version": 1, "data": "initial"}) + + # Update the same key - this triggers delete + insert in batch_put_ops + await store.aput(namespace, "update_test", {"version": 2, "data": "updated"}) + + # Verify the update worked + retrieved = await store.aget(namespace, "update_test") + assert retrieved is not None + assert retrieved.value["version"] == 2 + assert retrieved.value["data"] == "updated" + + finally: + # Cleanup + await store.adelete(namespace, "update_test") + await store._redis.aclose() From 00e6c15c468e10ae3f0b73702407cc0278b8e9dc Mon Sep 17 00:00:00 2001 From: Brian Sam-Bodden Date: Wed, 1 Oct 2025 17:58:58 -0700 Subject: [PATCH 5/5] feat: add custom store prefix support and fix runtime warnings (#99) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add store_prefix and vector_prefix parameters to RedisStore and AsyncRedisStore - Enable multiple isolated stores in same Redis instance with custom prefixes - Replace module-level STORE_PREFIX constants with instance attributes - Fix RuntimeWarning by moving set_client_info() from BaseRedisStore to concrete implementations - Fix DeprecationWarning by replacing datetime.utcnow() with datetime.now(timezone.utc) 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude --- .../checkpoint/redis/message_exporter.py | 4 +- langgraph/store/redis/__init__.py | 52 ++-- langgraph/store/redis/aio.py | 118 ++++++--- langgraph/store/redis/base.py | 63 ++++- tests/test_issue_99_custom_store_prefixes.py | 236 ++++++++++++++++++ 5 files changed, 406 insertions(+), 67 deletions(-) create mode 100644 tests/test_issue_99_custom_store_prefixes.py diff --git a/langgraph/checkpoint/redis/message_exporter.py b/langgraph/checkpoint/redis/message_exporter.py index 34da74c..b0ba3d3 100644 --- a/langgraph/checkpoint/redis/message_exporter.py +++ b/langgraph/checkpoint/redis/message_exporter.py @@ -1,6 +1,6 @@ """Message exporter for extracting conversation messages from checkpoints.""" -from datetime import datetime +from datetime import datetime, timezone from typing import Any, Dict, List, Optional, Protocol import orjson @@ -162,5 
+162,5 @@ def export_thread(self, thread_id: str) -> Dict[str, Any]: return { "thread_id": thread_id, "messages": messages, - "export_timestamp": datetime.utcnow().isoformat(), + "export_timestamp": datetime.now(timezone.utc).isoformat(), } diff --git a/langgraph/store/redis/__init__.py b/langgraph/store/redis/__init__.py index 5eea3fe..b79a1ec 100644 --- a/langgraph/store/redis/__init__.py +++ b/langgraph/store/redis/__init__.py @@ -33,8 +33,6 @@ from langgraph.store.redis.aio import AsyncRedisStore from langgraph.store.redis.base import ( REDIS_KEY_SEPARATOR, - STORE_PREFIX, - STORE_VECTOR_PREFIX, BaseRedisStore, RedisDocument, _decode_ns, @@ -86,11 +84,21 @@ def __init__( index: Optional[IndexConfig] = None, ttl: Optional[TTLConfig] = None, cluster_mode: Optional[bool] = None, + store_prefix: str = "store", + vector_prefix: str = "store_vectors", ) -> None: BaseStore.__init__(self) BaseRedisStore.__init__( - self, conn, index=index, ttl=ttl, cluster_mode=cluster_mode + self, + conn, + index=index, + ttl=ttl, + cluster_mode=cluster_mode, + store_prefix=store_prefix, + vector_prefix=vector_prefix, ) + # Set client info for monitoring (sync store can call this safely) + self.set_client_info() # Detection will happen in setup() @classmethod @@ -101,14 +109,22 @@ def from_conn_string( *, index: Optional[IndexConfig] = None, ttl: Optional[TTLConfig] = None, + store_prefix: str = "store", + vector_prefix: str = "store_vectors", ) -> Iterator[RedisStore]: """Create store from Redis connection string.""" client = None try: client = RedisConnectionFactory.get_redis_connection(conn_string) - store = cls(client, index=index, ttl=ttl) - # Client info will already be set in __init__, but we set it up here - # to make the method behavior consistent with AsyncRedisStore + store = cls( + client, + index=index, + ttl=ttl, + store_prefix=store_prefix, + vector_prefix=vector_prefix, + ) + # Client info is set in __init__, but set it again here to ensure + # it's available even if called before setup() store.set_client_info() yield store finally: @@ -259,7 +275,7 @@ def _batch_get_ops( # Also add vector keys for the same document doc_uuid = doc_id.split(":")[-1] vector_key = ( - f"{STORE_VECTOR_PREFIX}{REDIS_KEY_SEPARATOR}{doc_uuid}" + f"{self.vector_prefix}{REDIS_KEY_SEPARATOR}{doc_uuid}" ) refresh_keys_by_idx[idx].append(vector_key) @@ -338,7 +354,9 @@ def _batch_put_ops( doc_ids[(namespace, op.key)] = generated_doc_id # Track TTL for this document if specified if hasattr(op, "ttl") and op.ttl is not None: - main_key = f"{STORE_PREFIX}{REDIS_KEY_SEPARATOR}{generated_doc_id}" + main_key = ( + f"{self.store_prefix}{REDIS_KEY_SEPARATOR}{generated_doc_id}" + ) ttl_tracking[main_key] = ([], op.ttl) # Load store docs with explicit keys @@ -352,7 +370,7 @@ def _batch_put_ops( doc.pop("expires_at", None) store_docs.append(doc) - redis_key = f"{STORE_PREFIX}{REDIS_KEY_SEPARATOR}{doc_id}" + redis_key = f"{self.store_prefix}{REDIS_KEY_SEPARATOR}{doc_id}" store_keys.append(redis_key) if store_docs: @@ -408,11 +426,11 @@ def _batch_put_ops( "updated_at": datetime.now(timezone.utc).timestamp(), } ) - redis_vector_key = f"{STORE_VECTOR_PREFIX}{REDIS_KEY_SEPARATOR}{doc_id}" + redis_vector_key = f"{self.vector_prefix}{REDIS_KEY_SEPARATOR}{doc_id}" vector_keys.append(redis_vector_key) # Add this vector key to the related keys list for TTL - main_key = f"{STORE_PREFIX}{REDIS_KEY_SEPARATOR}{doc_id}" + main_key = f"{self.store_prefix}{REDIS_KEY_SEPARATOR}{doc_id}" if main_key in ttl_tracking: 
ttl_tracking[main_key][0].append(redis_vector_key) @@ -472,7 +490,9 @@ def _batch_search_ops( ) if doc_id: doc_uuid = doc_id.split(":")[1] - store_key = f"{STORE_PREFIX}{REDIS_KEY_SEPARATOR}{doc_uuid}" + store_key = ( + f"{self.store_prefix}{REDIS_KEY_SEPARATOR}{doc_uuid}" + ) result_map[store_key] = doc # Fetch individually in cluster mode store_doc_item = self._redis.json().get(store_key) @@ -489,7 +509,9 @@ def _batch_search_ops( if not doc_id: continue doc_uuid = doc_id.split(":")[1] - store_key = f"{STORE_PREFIX}{REDIS_KEY_SEPARATOR}{doc_uuid}" + store_key = ( + f"{self.store_prefix}{REDIS_KEY_SEPARATOR}{doc_uuid}" + ) result_map[store_key] = doc pipe.json().get(store_key) # Execute all lookups in one batch @@ -554,7 +576,7 @@ def _batch_search_ops( # Also find associated vector keys with same ID doc_id = store_key.split(":")[-1] vector_key = ( - f"{STORE_VECTOR_PREFIX}{REDIS_KEY_SEPARATOR}{doc_id}" + f"{self.vector_prefix}{REDIS_KEY_SEPARATOR}{doc_id}" ) refresh_keys.append(vector_key) @@ -625,7 +647,7 @@ def _batch_search_ops( # Also find associated vector keys with same ID doc_id = doc.id.split(":")[-1] vector_key = ( - f"{STORE_VECTOR_PREFIX}{REDIS_KEY_SEPARATOR}{doc_id}" + f"{self.vector_prefix}{REDIS_KEY_SEPARATOR}{doc_id}" ) refresh_keys.append(vector_key) diff --git a/langgraph/store/redis/aio.py b/langgraph/store/redis/aio.py index 7504fe1..2fd81b3 100644 --- a/langgraph/store/redis/aio.py +++ b/langgraph/store/redis/aio.py @@ -31,8 +31,6 @@ from langgraph.store.redis.base import ( REDIS_KEY_SEPARATOR, - STORE_PREFIX, - STORE_VECTOR_PREFIX, BaseRedisStore, RedisDocument, _decode_ns, @@ -74,8 +72,10 @@ def __init__( redis_client: Optional[AsyncRedis] = None, index: Optional[IndexConfig] = None, connection_args: Optional[dict[str, Any]] = None, - ttl: Optional[dict[str, Any]] = None, + ttl: Optional[TTLConfig] = None, cluster_mode: Optional[bool] = None, + store_prefix: str = "store", + vector_prefix: str = "store_vectors", ) -> None: """Initialize store with Redis connection and optional index config.""" if redis_url is None and redis_client is None: @@ -84,29 +84,7 @@ def __init__( # Initialize base classes AsyncBatchedBaseStore.__init__(self) - # Set up store configuration - self.index_config = index - self.ttl_config = ttl - - if self.index_config: - self.index_config = self.index_config.copy() - self.embeddings = ensure_embeddings( - self.index_config.get("embed"), - ) - fields = ( - self.index_config.get("text_fields", ["$"]) - or self.index_config.get("fields", ["$"]) - or [] - ) - if isinstance(fields, str): - fields = [fields] - - self.index_config["__tokenized_fields"] = [ - (p, tokenize_path(p)) if p != "$" else (p, p) - for p in (self.index_config.get("fields") or ["$"]) - ] - - # Configure client + # Configure client first self.configure_client( redis_url=redis_url, redis_client=redis_client, @@ -116,16 +94,60 @@ def __init__( # Validate and store cluster_mode; None means auto-detect later if cluster_mode is not None and not isinstance(cluster_mode, bool): raise TypeError("cluster_mode must be a boolean or None") - self.cluster_mode: Optional[bool] = cluster_mode - # Create store index + # Initialize BaseRedisStore with prefix parameters + BaseRedisStore.__init__( + self, + conn=self._redis, + index=index, + ttl=ttl, + cluster_mode=cluster_mode, + store_prefix=store_prefix, + vector_prefix=vector_prefix, + ) + + # Update store_index to async version + from copy import deepcopy + + store_schema = { + "index": { + "name": self.store_prefix, + "prefix": 
self.store_prefix + REDIS_KEY_SEPARATOR, + "storage_type": "json", + }, + "fields": [ + {"name": "prefix", "type": "text"}, + {"name": "key", "type": "tag"}, + {"name": "created_at", "type": "numeric"}, + {"name": "updated_at", "type": "numeric"}, + {"name": "ttl_minutes", "type": "numeric"}, + {"name": "expires_at", "type": "numeric"}, + ], + } self.store_index = AsyncSearchIndex.from_dict( - self.SCHEMAS[0], redis_client=self._redis + store_schema, redis_client=self._redis ) # Configure vector index if needed if self.index_config: - vector_schema = self.SCHEMAS[1].copy() + # Create custom vector schema with instance prefix + vector_schema = { + "index": { + "name": self.vector_prefix, + "prefix": self.vector_prefix + REDIS_KEY_SEPARATOR, + "storage_type": "json", + }, + "fields": [ + {"name": "prefix", "type": "text"}, + {"name": "key", "type": "tag"}, + {"name": "field_name", "type": "tag"}, + {"name": "embedding", "type": "vector"}, + {"name": "created_at", "type": "numeric"}, + {"name": "updated_at", "type": "numeric"}, + {"name": "ttl_minutes", "type": "numeric"}, + {"name": "expires_at", "type": "numeric"}, + ], + } vector_fields = vector_schema.get("fields", []) vector_field = None for f in vector_fields: @@ -301,10 +323,18 @@ async def from_conn_string( conn_string: str, *, index: Optional[IndexConfig] = None, - ttl: Optional[dict[str, Any]] = None, + ttl: Optional[TTLConfig] = None, + store_prefix: str = "store", + vector_prefix: str = "store_vectors", ) -> AsyncIterator[AsyncRedisStore]: """Create store from Redis connection string.""" - async with cls(redis_url=conn_string, index=index, ttl=ttl) as store: + async with cls( + redis_url=conn_string, + index=index, + ttl=ttl, + store_prefix=store_prefix, + vector_prefix=vector_prefix, + ) as store: await store.setup() # Set client information after setup await store.aset_client_info() @@ -460,7 +490,7 @@ async def _batch_get_ops( # Also add vector keys for the same document doc_uuid = doc_id.split(":")[-1] vector_key = ( - f"{STORE_VECTOR_PREFIX}{REDIS_KEY_SEPARATOR}{doc_uuid}" + f"{self.vector_prefix}{REDIS_KEY_SEPARATOR}{doc_uuid}" ) refresh_keys_by_idx[idx].append(vector_key) @@ -624,7 +654,9 @@ async def _batch_put_ops( doc_ids[(namespace, op.key)] = generated_doc_id # Track TTL for this document if specified if hasattr(op, "ttl") and op.ttl is not None: - main_key = f"{STORE_PREFIX}{REDIS_KEY_SEPARATOR}{generated_doc_id}" + main_key = ( + f"{self.store_prefix}{REDIS_KEY_SEPARATOR}{generated_doc_id}" + ) ttl_tracking[main_key] = ([], op.ttl) # Load store docs with explicit keys @@ -638,7 +670,7 @@ async def _batch_put_ops( doc.pop("expires_at", None) store_docs.append(doc) - redis_key = f"{STORE_PREFIX}{REDIS_KEY_SEPARATOR}{doc_id}" + redis_key = f"{self.store_prefix}{REDIS_KEY_SEPARATOR}{doc_id}" store_keys.append(redis_key) if store_docs: @@ -674,11 +706,11 @@ async def _batch_put_ops( "updated_at": datetime.now(timezone.utc).timestamp(), } ) - redis_vector_key = f"{STORE_VECTOR_PREFIX}{REDIS_KEY_SEPARATOR}{doc_id}" + redis_vector_key = f"{self.vector_prefix}{REDIS_KEY_SEPARATOR}{doc_id}" vector_keys.append(redis_vector_key) # Add this vector key to the related keys list for TTL - main_key = f"{STORE_PREFIX}{REDIS_KEY_SEPARATOR}{doc_id}" + main_key = f"{self.store_prefix}{REDIS_KEY_SEPARATOR}{doc_id}" if main_key in ttl_tracking: ttl_tracking[main_key][0].append(redis_vector_key) @@ -740,7 +772,9 @@ async def _batch_search_ops( ) if doc_id: doc_uuid = doc_id.split(":")[1] - store_key = 
f"{STORE_PREFIX}{REDIS_KEY_SEPARATOR}{doc_uuid}" + store_key = ( + f"{self.store_prefix}{REDIS_KEY_SEPARATOR}{doc_uuid}" + ) result_map[store_key] = doc # Fetch individually in cluster mode store_doc_item = await self._redis.json().get(store_key) # type: ignore @@ -760,7 +794,9 @@ async def _batch_search_ops( ) if doc_id: doc_uuid = doc_id.split(":")[1] - store_key = f"{STORE_PREFIX}{REDIS_KEY_SEPARATOR}{doc_uuid}" + store_key = ( + f"{self.store_prefix}{REDIS_KEY_SEPARATOR}{doc_uuid}" + ) result_map[store_key] = doc pipeline.json().get(store_key) store_docs_raw = await pipeline.execute() @@ -825,7 +861,7 @@ async def _batch_search_ops( # Also find associated vector keys with same ID doc_id = store_key.split(":")[-1] vector_key = ( - f"{STORE_VECTOR_PREFIX}{REDIS_KEY_SEPARATOR}{doc_id}" + f"{self.vector_prefix}{REDIS_KEY_SEPARATOR}{doc_id}" ) refresh_keys.append(vector_key) @@ -897,7 +933,7 @@ async def _batch_search_ops( # Also find associated vector keys with same ID doc_id = doc.id.split(":")[-1] vector_key = ( - f"{STORE_VECTOR_PREFIX}{REDIS_KEY_SEPARATOR}{doc_id}" + f"{self.vector_prefix}{REDIS_KEY_SEPARATOR}{doc_id}" ) refresh_keys.append(vector_key) diff --git a/langgraph/store/redis/base.py b/langgraph/store/redis/base.py index 34a404c..bd278ec 100644 --- a/langgraph/store/redis/base.py +++ b/langgraph/store/redis/base.py @@ -205,14 +205,29 @@ def __init__( index: Optional[IndexConfig] = None, ttl: Optional[TTLConfig] = None, # Corrected type hint for ttl cluster_mode: Optional[bool] = None, + store_prefix: str = "store", + vector_prefix: str = "store_vectors", ) -> None: - """Initialize store with Redis connection and optional index config.""" + """Initialize store with Redis connection and optional index config. + + Args: + conn: Redis client connection + index: Optional index configuration for vector search + ttl: Optional TTL configuration + cluster_mode: Optional cluster mode setting (None = auto-detect) + store_prefix: Prefix for store keys (default: "store") + vector_prefix: Prefix for vector keys (default: "store_vectors") + """ self.index_config = index self.ttl_config = ttl self._redis = conn # Store cluster_mode; None means auto-detect in RedisStore or AsyncRedisStore self.cluster_mode = cluster_mode + # Store custom prefixes + self.store_prefix = store_prefix + self.vector_prefix = vector_prefix + if self.index_config: self.index_config = self.index_config.copy() self.embeddings = ensure_embeddings( @@ -225,10 +240,25 @@ def __init__( (p, tokenize_path(p)) if p != "$" else (p, p) for p in fields ] + # Create custom schemas with instance prefixes + store_schema = { + "index": { + "name": self.store_prefix, + "prefix": self.store_prefix + REDIS_KEY_SEPARATOR, + "storage_type": "json", + }, + "fields": [ + {"name": "prefix", "type": "text"}, + {"name": "key", "type": "tag"}, + {"name": "created_at", "type": "numeric"}, + {"name": "updated_at", "type": "numeric"}, + {"name": "ttl_minutes", "type": "numeric"}, + {"name": "expires_at", "type": "numeric"}, + ], + } + # Initialize search indices - self.store_index = SearchIndex.from_dict( - self.SCHEMAS[0], redis_client=self._redis - ) + self.store_index = SearchIndex.from_dict(store_schema, redis_client=self._redis) # Configure vector index if needed if self.index_config: @@ -237,9 +267,24 @@ def __init__( index_dict = dict(self.index_config) vector_storage_type = index_dict.get("vector_storage_type", "json") - vector_schema: Dict[str, Any] = copy.deepcopy(self.SCHEMAS[1]) - # Update storage type in schema - 
vector_schema["index"]["storage_type"] = vector_storage_type + # Create custom vector schema with instance prefix + vector_schema: Dict[str, Any] = { + "index": { + "name": self.vector_prefix, + "prefix": self.vector_prefix + REDIS_KEY_SEPARATOR, + "storage_type": vector_storage_type, + }, + "fields": [ + {"name": "prefix", "type": "text"}, + {"name": "key", "type": "tag"}, + {"name": "field_name", "type": "tag"}, + {"name": "embedding", "type": "vector"}, + {"name": "created_at", "type": "numeric"}, + {"name": "updated_at", "type": "numeric"}, + {"name": "ttl_minutes", "type": "numeric"}, + {"name": "expires_at", "type": "numeric"}, + ], + } vector_fields = vector_schema.get("fields", []) vector_field = None @@ -274,8 +319,8 @@ def __init__( vector_schema, redis_client=self._redis ) - # Set client information in Redis - self.set_client_info() + # Note: set_client_info() should be called by concrete implementations + # after initialization to avoid async/sync conflicts def set_client_info(self) -> None: """Set client info for Redis monitoring.""" diff --git a/tests/test_issue_99_custom_store_prefixes.py b/tests/test_issue_99_custom_store_prefixes.py new file mode 100644 index 0000000..88c6064 --- /dev/null +++ b/tests/test_issue_99_custom_store_prefixes.py @@ -0,0 +1,236 @@ +"""Tests for issue #99: Support customization of store prefixes. + +Issue: https://github.com/redis-developer/langgraph-redis/issues/99 + +Feature request to allow customization of DEFAULT_STORE_PREFIX and +DEFAULT_STORE_VECTOR_PREFIX to enable multiple isolated stores in the same Redis instance. +""" + +import pytest + +from langgraph.store.redis import AsyncRedisStore, RedisStore + + +def test_default_store_prefix_is_store(redis_url: str) -> None: + """Test that default store prefix is 'store'.""" + with RedisStore.from_conn_string(redis_url) as store: + store.setup() + + # Default prefix should be "store" + assert store.store_prefix == "store" + assert store.vector_prefix == "store_vectors" + + # Verify keys use default prefix + namespace = ("test", "default_prefix") + store.put(namespace, "key1", {"data": "value1"}) + + # Check that the key was created with default prefix + keys = list(store._redis.scan_iter("store:*")) + assert len(keys) > 0 + assert any(b"store:" in key for key in keys) + + +def test_custom_store_prefix_sync(redis_url: str) -> None: + """Test RedisStore with custom store prefix.""" + custom_prefix = "myapp_store" + custom_vector_prefix = "myapp_vectors" + + with RedisStore.from_conn_string( + redis_url, store_prefix=custom_prefix, vector_prefix=custom_vector_prefix + ) as store: + store.setup() + + # Verify custom prefixes are set + assert store.store_prefix == custom_prefix + assert store.vector_prefix == custom_vector_prefix + + namespace = ("test", "custom_prefix") + store.put(namespace, "key1", {"data": "value1"}) + + # Verify keys use custom prefix + keys = list(store._redis.scan_iter(f"{custom_prefix}:*")) + assert len(keys) > 0 + + # Verify default prefix is NOT used + default_keys = list(store._redis.scan_iter("store:*")) + # Filter out any keys from other tests + default_keys = [k for k in default_keys if b"custom_prefix" in k] + assert len(default_keys) == 0 + + # Verify data can be retrieved + item = store.get(namespace, "key1") + assert item is not None + assert item.value["data"] == "value1" + + +def test_custom_prefix_isolation(redis_url: str) -> None: + """Test that different prefixes create isolated stores.""" + namespace = ("test", "isolation") + + # Store 1 with prefix "app1" + 
with RedisStore.from_conn_string(redis_url, store_prefix="app1") as store1: + store1.setup() + store1.put(namespace, "shared_key", {"app": "app1", "value": "from_app1"}) + + # Store 2 with prefix "app2" + with RedisStore.from_conn_string(redis_url, store_prefix="app2") as store2: + store2.setup() + store2.put(namespace, "shared_key", {"app": "app2", "value": "from_app2"}) + + # Verify isolation - each store should only see its own data + with RedisStore.from_conn_string(redis_url, store_prefix="app1") as store1: + store1.setup() + item1 = store1.get(namespace, "shared_key") + assert item1 is not None + assert item1.value["app"] == "app1" + assert item1.value["value"] == "from_app1" + + with RedisStore.from_conn_string(redis_url, store_prefix="app2") as store2: + store2.setup() + item2 = store2.get(namespace, "shared_key") + assert item2 is not None + assert item2.value["app"] == "app2" + assert item2.value["value"] == "from_app2" + + +@pytest.mark.asyncio +async def test_custom_store_prefix_async(redis_url: str) -> None: + """Test AsyncRedisStore with custom store prefix.""" + custom_prefix = "async_store" + custom_vector_prefix = "async_vectors" + + store = AsyncRedisStore( + redis_url, store_prefix=custom_prefix, vector_prefix=custom_vector_prefix + ) + await store.setup() + + try: + # Verify custom prefixes are set + assert store.store_prefix == custom_prefix + assert store.vector_prefix == custom_vector_prefix + + namespace = ("test", "async_custom") + await store.aput(namespace, "key1", {"data": "async_value"}) + + # Verify keys use custom prefix + keys = [] + async for key in store._redis.scan_iter(f"{custom_prefix}:*"): + keys.append(key) + assert len(keys) > 0 + + # Verify data can be retrieved + item = await store.aget(namespace, "key1") + assert item is not None + assert item.value["data"] == "async_value" + finally: + await store._redis.aclose() + + +@pytest.mark.asyncio +async def test_custom_vector_prefix_async(redis_url: str) -> None: + """Test that custom vector prefix is set correctly.""" + custom_vector_prefix = "custom_vectors" + + store = AsyncRedisStore( + redis_url, + store_prefix="custom_store", + vector_prefix=custom_vector_prefix, + ) + await store.setup() + + try: + # Verify custom vector prefix is set + assert store.vector_prefix == custom_vector_prefix + assert store.store_prefix == "custom_store" + + # Verify the vector index name uses custom prefix + if hasattr(store, "vector_index"): + assert custom_vector_prefix == store.vector_index.schema.index.name + finally: + await store._redis.aclose() + + +def test_custom_prefix_with_special_characters(redis_url: str) -> None: + """Test that custom prefixes work with various characters.""" + custom_prefix = "app-v1.2.3_store" + + with RedisStore.from_conn_string(redis_url, store_prefix=custom_prefix) as store: + store.setup() + + namespace = ("test", "special_chars") + store.put(namespace, "key1", {"data": "value1"}) + + # Verify the custom prefix is preserved + keys = list(store._redis.scan_iter(f"{custom_prefix}:*")) + assert len(keys) > 0 + + item = store.get(namespace, "key1") + assert item is not None + assert item.value["data"] == "value1" + + +def test_index_names_reflect_custom_prefix(redis_url: str) -> None: + """Test that Redis search index names reflect custom prefixes.""" + custom_prefix = "myindex" + custom_vector_prefix = "myvectors" + + with RedisStore.from_conn_string( + redis_url, store_prefix=custom_prefix, vector_prefix=custom_vector_prefix + ) as store: + store.setup() + + # Index names should 
incorporate the custom prefix + # This prevents index name collisions when using multiple stores + assert custom_prefix in store.store_index.schema.index.name + + # If vector index exists, it should also use custom prefix + if hasattr(store, "vector_index"): + assert custom_vector_prefix in store.vector_index.schema.index.name + + +@pytest.mark.asyncio +async def test_put_and_get_with_custom_prefix(redis_url: str) -> None: + """Test that put and get work correctly with custom prefix.""" + custom_prefix = "ns_test" + + store = AsyncRedisStore(redis_url, store_prefix=custom_prefix) + await store.setup() + + try: + # Add items to different namespaces + await store.aput(("app", "users"), "user1", {"name": "Alice"}) + await store.aput(("app", "products"), "prod1", {"name": "Widget"}) + + # Retrieve items - should work with custom prefix + user = await store.aget(("app", "users"), "user1") + assert user is not None + assert user.value["name"] == "Alice" + + product = await store.aget(("app", "products"), "prod1") + assert product is not None + assert product.value["name"] == "Widget" + + # Verify keys use custom prefix + keys = [] + async for key in store._redis.scan_iter(f"{custom_prefix}:*"): + keys.append(key) + assert len(keys) == 2 + finally: + await store._redis.aclose() + + +def test_backward_compatibility_no_prefix_specified(redis_url: str) -> None: + """Test that not specifying prefix uses defaults (backward compatibility).""" + with RedisStore.from_conn_string(redis_url) as store: + store.setup() + + # Should use defaults + assert store.store_prefix == "store" + assert store.vector_prefix == "store_vectors" + + namespace = ("test", "backward_compat") + store.put(namespace, "key1", {"data": "value1"}) + + item = store.get(namespace, "key1") + assert item is not None + assert item.value["data"] == "value1"
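
Taken together, the store_prefix and vector_prefix parameters introduced in PATCH 5/5 let multiple applications share one Redis instance with isolated keyspaces and index names. A minimal usage sketch (redis_url is a placeholder for a valid connection string):

    from langgraph.store.redis import RedisStore

    # Two stores in the same Redis instance, isolated by key prefix
    with RedisStore.from_conn_string(redis_url, store_prefix="app1") as store_a:
        store_a.setup()
        store_a.put(("users",), "u1", {"name": "Alice"})

    with RedisStore.from_conn_string(redis_url, store_prefix="app2") as store_b:
        store_b.setup()
        # app2's keyspace does not contain app1's data
        assert store_b.get(("users",), "u1") is None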