4 changes: 2 additions & 2 deletions langgraph/checkpoint/redis/message_exporter.py
@@ -1,6 +1,6 @@
"""Message exporter for extracting conversation messages from checkpoints."""

from datetime import datetime
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional, Protocol

import orjson
@@ -162,5 +162,5 @@ def export_thread(self, thread_id: str) -> Dict[str, Any]:
return {
"thread_id": thread_id,
"messages": messages,
"export_timestamp": datetime.utcnow().isoformat(),
"export_timestamp": datetime.now(timezone.utc).isoformat(),
}
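
The export-timestamp change swaps the deprecated, naive `datetime.utcnow()` for a timezone-aware `datetime.now(timezone.utc)`. A minimal sketch of the difference (output values are illustrative):

```python
from datetime import datetime, timezone

# Deprecated since Python 3.12; returns a naive datetime, so the ISO string
# carries no UTC offset and consumers must guess the timezone.
naive = datetime.utcnow().isoformat()           # e.g. '2024-01-01T12:00:00.000000'

# Timezone-aware replacement used above; the offset is explicit.
aware = datetime.now(timezone.utc).isoformat()  # e.g. '2024-01-01T12:00:00.000000+00:00'
```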
52 changes: 37 additions & 15 deletions langgraph/store/redis/__init__.py
@@ -33,8 +33,6 @@
from langgraph.store.redis.aio import AsyncRedisStore
from langgraph.store.redis.base import (
REDIS_KEY_SEPARATOR,
STORE_PREFIX,
STORE_VECTOR_PREFIX,
BaseRedisStore,
RedisDocument,
_decode_ns,
@@ -86,11 +84,21 @@ def __init__(
index: Optional[IndexConfig] = None,
ttl: Optional[TTLConfig] = None,
cluster_mode: Optional[bool] = None,
store_prefix: str = "store",
vector_prefix: str = "store_vectors",
) -> None:
BaseStore.__init__(self)
BaseRedisStore.__init__(
self, conn, index=index, ttl=ttl, cluster_mode=cluster_mode
self,
conn,
index=index,
ttl=ttl,
cluster_mode=cluster_mode,
store_prefix=store_prefix,
vector_prefix=vector_prefix,
)
# Set client info for monitoring (sync store can call this safely)
self.set_client_info()
# Detection will happen in setup()

@classmethod
@@ -101,14 +109,22 @@ def from_conn_string(
*,
index: Optional[IndexConfig] = None,
ttl: Optional[TTLConfig] = None,
store_prefix: str = "store",
vector_prefix: str = "store_vectors",
) -> Iterator[RedisStore]:
"""Create store from Redis connection string."""
client = None
try:
client = RedisConnectionFactory.get_redis_connection(conn_string)
store = cls(client, index=index, ttl=ttl)
# Client info will already be set in __init__, but we set it up here
# to make the method behavior consistent with AsyncRedisStore
store = cls(
client,
index=index,
ttl=ttl,
store_prefix=store_prefix,
vector_prefix=vector_prefix,
)
# Client info is set in __init__, but set it again here to ensure
# it's available even if called before setup()
store.set_client_info()
yield store
finally:
@@ -259,7 +275,7 @@ def _batch_get_ops(
# Also add vector keys for the same document
doc_uuid = doc_id.split(":")[-1]
vector_key = (
f"{STORE_VECTOR_PREFIX}{REDIS_KEY_SEPARATOR}{doc_uuid}"
f"{self.vector_prefix}{REDIS_KEY_SEPARATOR}{doc_uuid}"
)
refresh_keys_by_idx[idx].append(vector_key)

@@ -338,7 +354,9 @@ def _batch_put_ops(
doc_ids[(namespace, op.key)] = generated_doc_id
# Track TTL for this document if specified
if hasattr(op, "ttl") and op.ttl is not None:
main_key = f"{STORE_PREFIX}{REDIS_KEY_SEPARATOR}{generated_doc_id}"
main_key = (
f"{self.store_prefix}{REDIS_KEY_SEPARATOR}{generated_doc_id}"
)
ttl_tracking[main_key] = ([], op.ttl)

# Load store docs with explicit keys
@@ -352,7 +370,7 @@ def _batch_put_ops(
doc.pop("expires_at", None)

store_docs.append(doc)
redis_key = f"{STORE_PREFIX}{REDIS_KEY_SEPARATOR}{doc_id}"
redis_key = f"{self.store_prefix}{REDIS_KEY_SEPARATOR}{doc_id}"
store_keys.append(redis_key)

if store_docs:
@@ -408,11 +426,11 @@ def _batch_put_ops(
"updated_at": datetime.now(timezone.utc).timestamp(),
}
)
redis_vector_key = f"{STORE_VECTOR_PREFIX}{REDIS_KEY_SEPARATOR}{doc_id}"
redis_vector_key = f"{self.vector_prefix}{REDIS_KEY_SEPARATOR}{doc_id}"
vector_keys.append(redis_vector_key)

# Add this vector key to the related keys list for TTL
main_key = f"{STORE_PREFIX}{REDIS_KEY_SEPARATOR}{doc_id}"
main_key = f"{self.store_prefix}{REDIS_KEY_SEPARATOR}{doc_id}"
if main_key in ttl_tracking:
ttl_tracking[main_key][0].append(redis_vector_key)

@@ -472,7 +490,9 @@ def _batch_search_ops(
)
if doc_id:
doc_uuid = doc_id.split(":")[1]
store_key = f"{STORE_PREFIX}{REDIS_KEY_SEPARATOR}{doc_uuid}"
store_key = (
f"{self.store_prefix}{REDIS_KEY_SEPARATOR}{doc_uuid}"
)
result_map[store_key] = doc
# Fetch individually in cluster mode
store_doc_item = self._redis.json().get(store_key)
@@ -489,7 +509,9 @@ def _batch_search_ops(
if not doc_id:
continue
doc_uuid = doc_id.split(":")[1]
store_key = f"{STORE_PREFIX}{REDIS_KEY_SEPARATOR}{doc_uuid}"
store_key = (
f"{self.store_prefix}{REDIS_KEY_SEPARATOR}{doc_uuid}"
)
result_map[store_key] = doc
pipe.json().get(store_key)
# Execute all lookups in one batch
@@ -554,7 +576,7 @@ def _batch_search_ops(
# Also find associated vector keys with same ID
doc_id = store_key.split(":")[-1]
vector_key = (
f"{STORE_VECTOR_PREFIX}{REDIS_KEY_SEPARATOR}{doc_id}"
f"{self.vector_prefix}{REDIS_KEY_SEPARATOR}{doc_id}"
)
refresh_keys.append(vector_key)

@@ -625,7 +647,7 @@ def _batch_search_ops(
# Also find associated vector keys with same ID
doc_id = doc.id.split(":")[-1]
vector_key = (
f"{STORE_VECTOR_PREFIX}{REDIS_KEY_SEPARATOR}{doc_id}"
f"{self.vector_prefix}{REDIS_KEY_SEPARATOR}{doc_id}"
)
refresh_keys.append(vector_key)

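With the new `store_prefix` / `vector_prefix` parameters, Redis keys are built from instance attributes instead of the module-level `STORE_PREFIX` / `STORE_VECTOR_PREFIX` constants, so multiple stores can share one Redis deployment without key collisions. A minimal sketch of the sync usage (URL, prefixes, namespace, and values are illustrative; `put`/`get` follow the standard BaseStore signature):

```python
from langgraph.store.redis import RedisStore

# Documents for this store are keyed under "agent_a" / "agent_a_vectors"
# instead of the default "store" / "store_vectors" prefixes.
with RedisStore.from_conn_string(
    "redis://localhost:6379",
    store_prefix="agent_a",
    vector_prefix="agent_a_vectors",
) as store:
    store.setup()  # creates the search indexes for this prefix
    store.put(("users", "alice"), "profile", {"name": "Alice"})
    item = store.get(("users", "alice"), "profile")
```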
118 changes: 77 additions & 41 deletions langgraph/store/redis/aio.py
@@ -31,8 +31,6 @@

from langgraph.store.redis.base import (
REDIS_KEY_SEPARATOR,
STORE_PREFIX,
STORE_VECTOR_PREFIX,
BaseRedisStore,
RedisDocument,
_decode_ns,
@@ -74,8 +72,10 @@ def __init__(
redis_client: Optional[AsyncRedis] = None,
index: Optional[IndexConfig] = None,
connection_args: Optional[dict[str, Any]] = None,
ttl: Optional[dict[str, Any]] = None,
ttl: Optional[TTLConfig] = None,
cluster_mode: Optional[bool] = None,
store_prefix: str = "store",
vector_prefix: str = "store_vectors",
) -> None:
"""Initialize store with Redis connection and optional index config."""
if redis_url is None and redis_client is None:
@@ -84,29 +84,7 @@
# Initialize base classes
AsyncBatchedBaseStore.__init__(self)

# Set up store configuration
self.index_config = index
self.ttl_config = ttl

if self.index_config:
self.index_config = self.index_config.copy()
self.embeddings = ensure_embeddings(
self.index_config.get("embed"),
)
fields = (
self.index_config.get("text_fields", ["$"])
or self.index_config.get("fields", ["$"])
or []
)
if isinstance(fields, str):
fields = [fields]

self.index_config["__tokenized_fields"] = [
(p, tokenize_path(p)) if p != "$" else (p, p)
for p in (self.index_config.get("fields") or ["$"])
]

# Configure client
# Configure client first
self.configure_client(
redis_url=redis_url,
redis_client=redis_client,
@@ -116,16 +94,60 @@
# Validate and store cluster_mode; None means auto-detect later
if cluster_mode is not None and not isinstance(cluster_mode, bool):
raise TypeError("cluster_mode must be a boolean or None")
self.cluster_mode: Optional[bool] = cluster_mode

# Create store index
# Initialize BaseRedisStore with prefix parameters
BaseRedisStore.__init__(
self,
conn=self._redis,
index=index,
ttl=ttl,
cluster_mode=cluster_mode,
store_prefix=store_prefix,
vector_prefix=vector_prefix,
)

# Update store_index to async version
from copy import deepcopy

store_schema = {
"index": {
"name": self.store_prefix,
"prefix": self.store_prefix + REDIS_KEY_SEPARATOR,
"storage_type": "json",
},
"fields": [
{"name": "prefix", "type": "text"},
{"name": "key", "type": "tag"},
{"name": "created_at", "type": "numeric"},
{"name": "updated_at", "type": "numeric"},
{"name": "ttl_minutes", "type": "numeric"},
{"name": "expires_at", "type": "numeric"},
],
}
self.store_index = AsyncSearchIndex.from_dict(
self.SCHEMAS[0], redis_client=self._redis
store_schema, redis_client=self._redis
)

# Configure vector index if needed
if self.index_config:
vector_schema = self.SCHEMAS[1].copy()
# Create custom vector schema with instance prefix
vector_schema = {
"index": {
"name": self.vector_prefix,
"prefix": self.vector_prefix + REDIS_KEY_SEPARATOR,
"storage_type": "json",
},
"fields": [
{"name": "prefix", "type": "text"},
{"name": "key", "type": "tag"},
{"name": "field_name", "type": "tag"},
{"name": "embedding", "type": "vector"},
{"name": "created_at", "type": "numeric"},
{"name": "updated_at", "type": "numeric"},
{"name": "ttl_minutes", "type": "numeric"},
{"name": "expires_at", "type": "numeric"},
],
}
vector_fields = vector_schema.get("fields", [])
vector_field = None
for f in vector_fields:
@@ -301,10 +323,18 @@ async def from_conn_string(
conn_string: str,
*,
index: Optional[IndexConfig] = None,
ttl: Optional[dict[str, Any]] = None,
ttl: Optional[TTLConfig] = None,
store_prefix: str = "store",
vector_prefix: str = "store_vectors",
) -> AsyncIterator[AsyncRedisStore]:
"""Create store from Redis connection string."""
async with cls(redis_url=conn_string, index=index, ttl=ttl) as store:
async with cls(
redis_url=conn_string,
index=index,
ttl=ttl,
store_prefix=store_prefix,
vector_prefix=vector_prefix,
) as store:
await store.setup()
# Set client information after setup
await store.aset_client_info()
@@ -460,7 +490,7 @@ async def _batch_get_ops(
# Also add vector keys for the same document
doc_uuid = doc_id.split(":")[-1]
vector_key = (
f"{STORE_VECTOR_PREFIX}{REDIS_KEY_SEPARATOR}{doc_uuid}"
f"{self.vector_prefix}{REDIS_KEY_SEPARATOR}{doc_uuid}"
)
refresh_keys_by_idx[idx].append(vector_key)

@@ -624,7 +654,9 @@ async def _batch_put_ops(
doc_ids[(namespace, op.key)] = generated_doc_id
# Track TTL for this document if specified
if hasattr(op, "ttl") and op.ttl is not None:
main_key = f"{STORE_PREFIX}{REDIS_KEY_SEPARATOR}{generated_doc_id}"
main_key = (
f"{self.store_prefix}{REDIS_KEY_SEPARATOR}{generated_doc_id}"
)
ttl_tracking[main_key] = ([], op.ttl)

# Load store docs with explicit keys
@@ -638,7 +670,7 @@ async def _batch_put_ops(
doc.pop("expires_at", None)

store_docs.append(doc)
redis_key = f"{STORE_PREFIX}{REDIS_KEY_SEPARATOR}{doc_id}"
redis_key = f"{self.store_prefix}{REDIS_KEY_SEPARATOR}{doc_id}"
store_keys.append(redis_key)

if store_docs:
@@ -674,11 +706,11 @@ async def _batch_put_ops(
"updated_at": datetime.now(timezone.utc).timestamp(),
}
)
redis_vector_key = f"{STORE_VECTOR_PREFIX}{REDIS_KEY_SEPARATOR}{doc_id}"
redis_vector_key = f"{self.vector_prefix}{REDIS_KEY_SEPARATOR}{doc_id}"
vector_keys.append(redis_vector_key)

# Add this vector key to the related keys list for TTL
main_key = f"{STORE_PREFIX}{REDIS_KEY_SEPARATOR}{doc_id}"
main_key = f"{self.store_prefix}{REDIS_KEY_SEPARATOR}{doc_id}"
if main_key in ttl_tracking:
ttl_tracking[main_key][0].append(redis_vector_key)

@@ -740,7 +772,9 @@ async def _batch_search_ops(
)
if doc_id:
doc_uuid = doc_id.split(":")[1]
store_key = f"{STORE_PREFIX}{REDIS_KEY_SEPARATOR}{doc_uuid}"
store_key = (
f"{self.store_prefix}{REDIS_KEY_SEPARATOR}{doc_uuid}"
)
result_map[store_key] = doc
# Fetch individually in cluster mode
store_doc_item = await self._redis.json().get(store_key) # type: ignore
@@ -760,7 +794,9 @@ async def _batch_search_ops(
)
if doc_id:
doc_uuid = doc_id.split(":")[1]
store_key = f"{STORE_PREFIX}{REDIS_KEY_SEPARATOR}{doc_uuid}"
store_key = (
f"{self.store_prefix}{REDIS_KEY_SEPARATOR}{doc_uuid}"
)
result_map[store_key] = doc
pipeline.json().get(store_key)
store_docs_raw = await pipeline.execute()
@@ -825,7 +861,7 @@ async def _batch_search_ops(
# Also find associated vector keys with same ID
doc_id = store_key.split(":")[-1]
vector_key = (
f"{STORE_VECTOR_PREFIX}{REDIS_KEY_SEPARATOR}{doc_id}"
f"{self.vector_prefix}{REDIS_KEY_SEPARATOR}{doc_id}"
)
refresh_keys.append(vector_key)

@@ -897,7 +933,7 @@ async def _batch_search_ops(
# Also find associated vector keys with same ID
doc_id = doc.id.split(":")[-1]
vector_key = (
f"{STORE_VECTOR_PREFIX}{REDIS_KEY_SEPARATOR}{doc_id}"
f"{self.vector_prefix}{REDIS_KEY_SEPARATOR}{doc_id}"
)
refresh_keys.append(vector_key)

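The async store mirrors the same change: `BaseRedisStore.__init__` now receives the prefixes, and the store/vector index schemas are rebuilt per instance from `self.store_prefix` / `self.vector_prefix` rather than the shared `SCHEMAS` constants. A minimal sketch of the async usage, assuming `from_conn_string` is used as an async context manager as shown in the diff (URL, prefixes, and data are illustrative; `aput`/`aget` follow the standard BaseStore signature):

```python
import asyncio

from langgraph.store.redis.aio import AsyncRedisStore


async def main() -> None:
    # from_conn_string opens the client, calls setup(), and sets client info;
    # each prefix gets its own search index, so tenants stay isolated.
    async with AsyncRedisStore.from_conn_string(
        "redis://localhost:6379",
        store_prefix="tenant_b",
        vector_prefix="tenant_b_vectors",
    ) as store:
        await store.aput(("users", "bob"), "profile", {"name": "Bob"})
        item = await store.aget(("users", "bob"), "profile")


asyncio.run(main())
```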