4 changes: 4 additions & 0 deletions .env.http.example
@@ -62,6 +62,10 @@ OPERATOR_KEY_MAIN= # Operator private key used to sign transaction
# ========== TRANSACTION POOL ========
# PENDING_TRANSACTION_STORAGE_TTL=30 # Time-to-live (TTL) in seconds for transaction payloads stored in Redis

# ========== LOCK SERVICE ===========
# LOCK_MAX_HOLD_MS=30000 # Maximum time in milliseconds a lock can be held before automatic force-release
# LOCAL_LOCK_MAX_ENTRIES=1000 # Maximum number of lock entries stored in memory

# ========== HBAR RATE LIMITING ==========
# HBAR_RATE_LIMIT_TINYBAR=25000000000 # Total HBAR budget (250 HBARs)
# HBAR_RATE_LIMIT_DURATION=86400000 # HBAR budget limit duration (1 day)
4 changes: 4 additions & 0 deletions .env.ws.example
@@ -59,6 +59,10 @@ SUBSCRIPTIONS_ENABLED=true # Must be true for the WebSocket server to func
# ========== TRANSACTION POOL ========
# PENDING_TRANSACTION_STORAGE_TTL=30 # Time-to-live (TTL) in seconds for transaction payloads stored in Redis

# ========== LOCK SERVICE ===========
# LOCK_MAX_HOLD_MS=30000 # Maximum time in milliseconds a lock can be held before automatic force-release
# LOCAL_LOCK_MAX_ENTRIES=1000 # Maximum number of lock entries stored in memory

# ========== OTHER SETTINGS ==========
# CLIENT_TRANSPORT_SECURITY=false # Enable or disable TLS for both networks
# USE_ASYNC_TX_PROCESSING=true # If true, returns tx hash immediately after prechecks
4 changes: 4 additions & 0 deletions charts/hedera-json-rpc-relay/values.yaml
@@ -123,6 +123,10 @@ config:
# REDIS_RECONNECT_DELAY_MS:
# MULTI_SET:

# ========== LOCK SERVICE CONFIGURATION ==========
# LOCK_MAX_HOLD_MS:
# LOCAL_LOCK_MAX_ENTRIES:

# ========== DEVELOPMENT & TESTING ==========
# LOG_LEVEL: 'trace'

2 changes: 1 addition & 1 deletion docs/configuration.md
@@ -69,7 +69,6 @@ Unless you need to set a non-default value, it is recommended to only populate o
| `JUMBO_TX_ENABLED` | "true" | Controls how large transactions are handled during `eth_sendRawTransaction`. When set to `true`, transactions up to 128KB can be sent directly to consensus nodes without using Hedera File Service (HFS), as long as contract bytecode doesn't exceed 24KB. When set to `false`, all transactions containing contract deployments use the traditional HFS approach. This feature leverages the increased transaction size limit to simplify processing of standard Ethereum transactions. |
| `LIMIT_DURATION` | "60000" | The maximum duration in ms applied to IP-method based rate limits. |
| `LOCAL_LOCK_MAX_ENTRIES` | "1000" | Maximum number of lock entries stored in memory. Prevents unbounded memory growth. |
| `LOCAL_LOCK_MAX_LOCK_TIME` | "30000" | Timer to auto-release if lock not manually released (in ms). |
| `MAX_GAS_ALLOWANCE_HBAR` | "0" | The maximum amount, in hbars, that the JSON-RPC Relay is willing to pay to complete the transaction in case the senders don't provide enough funds. Please note, in case of fully subsidized transactions, the sender must set the gas price to `0` and the JSON-RPC Relay must configure the `MAX_GAS_ALLOWANCE_HBAR` parameter high enough to cover the entire transaction cost. |
| `MAX_TRANSACTION_FEE_THRESHOLD` | "15000000" | Used to set the max transaction fee. This is the HAPI fee which is paid by the relay operator account. |
| `MIRROR_NODE_AGENT_CACHEABLE_DNS` | "true" | Flag to set if the mirror node agent should cacheable DNS lookups, using better-lookup library. |
@@ -107,6 +106,7 @@ Unless you need to set a non-default value, it is recommended to only populate o
| `TX_DEFAULT_GAS` | "400000" | Default gas for transactions that do not specify gas. |
| `TXPOOL_API_ENABLED` | "false" | Enables all txpool related methods. |
| `USE_ASYNC_TX_PROCESSING` | "true" | Set to `true` to enable `eth_sendRawTransaction` to return the transaction hash immediately after passing all prechecks, while processing the transaction asynchronously in the background. |
| `LOCK_MAX_HOLD_MS` | "30000" | Maximum time in milliseconds a lock can be held before automatic force-release. This TTL prevents deadlocks when transaction processing hangs or crashes. Default is 30 seconds. |
| `USE_MIRROR_NODE_MODULARIZED_SERVICES` | null | Controls routing of Mirror Node traffic through modularized services. When set to `true`, enables routing a percentage of traffic to modularized services. When set to `false`, ensures traffic follows the traditional non-modularized flow. When not set (i.e. `null` by default), no specific routing preference is applied. As Mirror Node gradually transitions to a fully modularized architecture across all networks, this setting will eventually default to `true`. |

## Server
10 changes: 5 additions & 5 deletions packages/config-service/src/services/globalConfig.ts
@@ -368,11 +368,6 @@ const _CONFIG = {
required: false,
defaultValue: 1000,
},
LOCAL_LOCK_MAX_LOCK_TIME: {
type: 'number',
required: false,
defaultValue: 30000,
},
LOG_LEVEL: {
type: 'string',
required: false,
@@ -659,6 +654,11 @@ const _CONFIG = {
required: false,
defaultValue: true,
},
LOCK_MAX_HOLD_MS: {
type: 'number',
required: false,
defaultValue: 30000,
},
USE_MIRROR_NODE_MODULARIZED_SERVICES: {
type: 'boolean',
required: false,
packages/relay/src/lib/services/lockService/LocalLockStrategy.ts
@@ -6,6 +6,8 @@ import { randomUUID } from 'crypto';
import { LRUCache } from 'lru-cache';
import { Logger } from 'pino';

import { LockService } from './LockService';

/**
* Represents the internal state for a lock associated with a given address.
*/
@@ -71,7 +73,7 @@ export class LocalLockStrategy {
// Start a 30-second timer to auto-release if lock not manually released
state.lockTimeoutId = setTimeout(() => {
this.forceReleaseExpiredLock(address, sessionKey);
}, ConfigService.get('LOCAL_LOCK_MAX_LOCK_TIME'));
}, ConfigService.get('LOCK_MAX_HOLD_MS'));

return sessionKey;
}
@@ -83,13 +85,13 @@ export class LocalLockStrategy {
* @param sessionKey - The session key of the lock holder
*/
async releaseLock(address: string, sessionKey: string): Promise<void> {
if (this.logger.isLevelEnabled('debug')) {
const holdTime = Date.now() - state.acquiredAt!;
const state = this.localLockStates.get(address);

if (this.logger.isLevelEnabled('debug') && state?.acquiredAt) {
const holdTime = Date.now() - state.acquiredAt;
this.logger.debug(`Releasing lock for address ${address} and session key ${sessionKey} held for ${holdTime}ms.`);
}

const state = this.localLockStates.get(address);

// Ensure only the lock owner can release
if (state?.sessionKey === sessionKey) {
await this.doRelease(state);
@@ -103,17 +105,17 @@
* @returns The LockState object associated with the address
*/
private getOrCreateState(address: string): LockState {
address = address.toLowerCase();
if (!this.localLockStates.has(address)) {
this.localLockStates.set(address, {
const normalizedAddress = LockService.normalizeAddress(address);
if (!this.localLockStates.has(normalizedAddress)) {
this.localLockStates.set(normalizedAddress, {
mutex: new Mutex(),
sessionKey: null,
acquiredAt: null,
lockTimeoutId: null,
});
}

return this.localLockStates.get(address)!;
return this.localLockStates.get(normalizedAddress)!;
}

/**
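The timer-based force-release in LocalLockStrategy can be illustrated outside the relay. Below is a minimal sketch, not the relay's code: it assumes an in-memory map keyed by lowercased address, and the `acquire`/`release` names are illustrative only.

```typescript
// Illustrative sketch of the LOCK_MAX_HOLD_MS auto-release pattern:
// a timer armed at acquisition clears the session key if the holder
// never releases, so a crashed caller cannot hold the lock forever.
interface SketchLockState {
  sessionKey: string | null;
  lockTimeoutId: ReturnType<typeof setTimeout> | null;
}

const states = new Map<string, SketchLockState>();

function acquire(address: string, maxHoldMs: number): string {
  const key = address.toLowerCase(); // normalize, as getOrCreateState does
  const state: SketchLockState = { sessionKey: `${key}:${Date.now()}`, lockTimeoutId: null };
  // Auto-release timer: fires only if release() is never called.
  state.lockTimeoutId = setTimeout(() => {
    state.sessionKey = null;
  }, maxHoldMs);
  states.set(key, state);
  return state.sessionKey!;
}

function release(address: string, sessionKey: string): void {
  const state = states.get(address.toLowerCase());
  // Only the lock owner (matching session key) may release.
  if (state?.sessionKey === sessionKey) {
    if (state.lockTimeoutId) clearTimeout(state.lockTimeoutId);
    state.sessionKey = null;
  }
}
```

As in `releaseLock`, a release attempt by a non-owner is a no-op, so a stale session key cannot free a lock that has since been re-acquired.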
14 changes: 12 additions & 2 deletions packages/relay/src/lib/services/lockService/LockService.ts
@@ -26,9 +26,9 @@ export class LockService {
* Blocks until the lock is available (no timeout on waiting).
*
* @param address - The sender address to acquire the lock for.
* @returns A promise that resolves to a unique session key.
* @returns A promise that resolves to a unique session key, or null if acquisition fails (fail open).
*/
async acquireLock(address: string): Promise<string> {
async acquireLock(address: string): Promise<string | null> {
return await this.strategy.acquireLock(address);
}

@@ -42,4 +42,14 @@
async releaseLock(address: string, sessionKey: string): Promise<void> {
await this.strategy.releaseLock(address, sessionKey);
}

/**
* Normalizes an address to lowercase for consistent key generation across lock strategies.
*
* @param address - The address to normalize.
* @returns The normalized address.
*/
static normalizeAddress(address: string): string {
return address.toLowerCase();
}
}
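To show how a caller might use this facade, here is a hedged sketch of the acquire/release discipline. The `InMemoryLockService` stand-in and the `sendWithLock` helper are illustrative assumptions, not relay code; what carries over is the fail-open contract (a null session key skips the release) and releasing in a `finally` block.

```typescript
// Stand-in with the same surface as LockService: acquireLock may return
// null (fail open), releaseLock only succeeds for the current owner.
class InMemoryLockService {
  private holders = new Map<string, string>();

  static normalizeAddress(address: string): string {
    return address.toLowerCase();
  }

  async acquireLock(address: string): Promise<string | null> {
    const key = InMemoryLockService.normalizeAddress(address);
    if (this.holders.has(key)) return null; // sketch only: no queueing here
    const sessionKey = `${key}:${Math.random().toString(36).slice(2)}`;
    this.holders.set(key, sessionKey);
    return sessionKey;
  }

  async releaseLock(address: string, sessionKey: string): Promise<void> {
    const key = InMemoryLockService.normalizeAddress(address);
    if (this.holders.get(key) === sessionKey) this.holders.delete(key);
  }
}

// Caller discipline: always release in finally, so the TTL force-release
// is only a safety net, never the normal path.
async function sendWithLock(locks: InMemoryLockService, address: string): Promise<string> {
  const sessionKey = await locks.acquireLock(address);
  try {
    return 'tx-hash'; // submit the transaction here
  } finally {
    if (sessionKey) await locks.releaseLock(address, sessionKey);
  }
}
```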
packages/relay/src/lib/services/lockService/LockStrategyFactory.ts
@@ -5,6 +5,7 @@ import { RedisClientType } from 'redis';

import { LockStrategy } from '../../types';
import { LocalLockStrategy } from './LocalLockStrategy';
import { RedisLockStrategy } from './RedisLockStrategy';

/**
* Factory for creating LockStrategy instances.
@@ -23,9 +24,8 @@
*/

static create(redisClient: RedisClientType | undefined, logger: Logger): LockStrategy {
// TODO: Remove placeholder errors once strategies are implemented
if (redisClient) {
// throw new Error('Redis lock strategy not yet implemented');
return new RedisLockStrategy(redisClient, logger.child({ name: 'redis-lock-strategy' }));
}

return new LocalLockStrategy(logger);
Contributor: Can be updated in the next follow up PR but can we add a name for this logger?

197 changes: 197 additions & 0 deletions packages/relay/src/lib/services/lockService/RedisLockStrategy.ts
@@ -0,0 +1,197 @@
// SPDX-License-Identifier: Apache-2.0

import { ConfigService } from '@hashgraph/json-rpc-config-service/dist/services';
import { randomUUID } from 'crypto';
import { Logger } from 'pino';
import { RedisClientType } from 'redis';

import { LockStrategy } from '../../types/lock';
import { LockService } from './LockService';

/**
* Redis-based distributed lock strategy implementing FIFO queue semantics.
*
* Uses Redis SET NX + LIST for distributed locking across multiple relay instances.
* Provides automatic TTL-based expiration and polling-based lock acquisition.
*
* @remarks
* - Lock keys: `{prefix}:{address}` stores current holder's session key
* - Queue keys: `{prefix}:queue:{address}` stores FIFO queue of waiters
* - TTL on lock keys provides automatic cleanup on crashes/hangs
*/
export class RedisLockStrategy implements LockStrategy {
private readonly redisClient: RedisClientType;
private readonly logger: Logger;
private readonly maxLockHoldMs: number;
private readonly pollIntervalMs = 50;
Contributor: I think I made a misleading comment earlier. I meant we just need the keyPrefix to be a hardcoded value, but pollIntervalMs can still be configurable, right? Also can address in next follow up PR

private readonly keyPrefix = 'lock';

constructor(redisClient: RedisClientType, logger: Logger) {
this.redisClient = redisClient;
this.logger = logger;
this.maxLockHoldMs = ConfigService.get('LOCK_MAX_HOLD_MS');
}

/**
* Acquires a lock for the specified address using FIFO queue semantics.
*
* @param address - The sender address to acquire the lock for (will be normalized).
* @returns A promise that resolves to a unique session key upon successful acquisition, or null if acquisition fails (fail open).
*/
async acquireLock(address: string): Promise<string | null> {
const sessionKey = this.generateSessionKey();
const lockKey = this.getLockKey(address);
const queueKey = this.getQueueKey(address);
const startTime = Date.now();
let joinedQueue = false;

try {
// Join FIFO queue
await this.redisClient.lPush(queueKey, sessionKey);
joinedQueue = true;

if (this.logger.isLevelEnabled('trace')) {
this.logger.trace(`Lock acquisition started: address=${address}, sessionKey=${sessionKey}`);
}

// Poll until first in queue and can acquire lock
while (true) {
// Check if first in line
const firstInQueue = await this.redisClient.lIndex(queueKey, -1);

if (firstInQueue === sessionKey) {
// Try to acquire lock with TTL
const acquired = await this.redisClient.set(lockKey, sessionKey, {
NX: true, // Only set if not exists
PX: this.maxLockHoldMs, // TTL in milliseconds
});

if (acquired) {
const acquisitionDuration = Date.now() - startTime;
const queueLength = await this.redisClient.lLen(queueKey);

this.logger.debug(
Contributor: Let's still add the log level guard check to make sure JS doesn't build the string if not necessary

`Lock acquired: address=${address}, sessionKey=${sessionKey}, duration=${acquisitionDuration}ms, queueLength=${queueLength}`,
);

return sessionKey;
}
}

// Wait before checking again
await this.sleep(this.pollIntervalMs);
}
} catch (error) {
this.logger.error(error, `Failed to acquire lock: address=${address}, sessionKey=${sessionKey}. Failing open.`);
return null;
} finally {
// Always remove from queue if we joined it (whether success or failure)
if (joinedQueue) {
Contributor: non blocking but looks like we don't need joinedQueue, right? lRem(queueKey, 1, sessionKey) will remove sessionKey if it's there or it will ignore if it's not there, right?

Contributor Author: In the case that lPush fails we would try to remove from the queue something that's guaranteed to not be there which would be an extra network call to Redis. It's a very small optimization but I suggest we keep it

Contributor: Oh yeah that's fair! Thanks for the explanation

await this.removeFromQueue(queueKey, sessionKey, address);
}
}
}

/**
* Releases a lock for the specified address.
* Only succeeds if the session key matches the current lock holder.
*
* @param address - The sender address to release the lock for (will be normalized).
* @param sessionKey - The session key proving ownership of the lock.
*/
async releaseLock(address: string, sessionKey: string): Promise<void> {
const lockKey = this.getLockKey(address);

try {
// Atomic check-and-delete using Lua script
// Only deletes the lock if the session key matches (ownership check)
const result = await this.redisClient.eval(
`
if redis.call("get", KEYS[1]) == ARGV[1] then
return redis.call("del", KEYS[1])
else
return 0
end
`,
{
keys: [lockKey],
arguments: [sessionKey],
},
);

if (result === 1) {
this.logger.debug(`Lock released: address=${address}, sessionKey=${sessionKey}`);
Contributor: Let's still add the log level guard check to make sure JS doesn't build the string if not necessary

} else {
// Lock was already released or owned by someone else - ignore
if (this.logger.isLevelEnabled('trace')) {
this.logger.trace(
`Lock release ignored (not owner or already released): address=${address}, sessionKey=${sessionKey}`,
);
}
}
} catch (error) {
this.logger.error(error, `Failed to release lock: address=${address}, sessionKey=${sessionKey}`);
Contributor: Should we make this only warn, not an error? I don't think it's critical enough to be an error yeah?

Contributor Author: This would likely be a Redis error due to an operational failure. I suggest we keep it as error

Contributor: yeah okay that works too

// Don't throw - release failures should not block the caller
}
}

/**
* Generates the Redis key for a lock.
* Automatically normalizes the address to ensure consistency.
*
* @param address - The sender address (will be normalized to lowercase).
* @returns The Redis lock key.
*/
private getLockKey(address: string): string {
const normalizedAddress = LockService.normalizeAddress(address);
return `${this.keyPrefix}:${normalizedAddress}`;
}

/**
* Generates the Redis key for a lock queue.
* Automatically normalizes the address to ensure consistency.
*
* @param address - The sender address (will be normalized to lowercase).
* @returns The Redis queue key.
*/
private getQueueKey(address: string): string {
const normalizedAddress = LockService.normalizeAddress(address);
return `${this.keyPrefix}:queue:${normalizedAddress}`;
}

/**
* Removes a session key from the queue.
* Used for cleanup after successful acquisition or on error.
*
* @param queueKey - The queue key.
* @param sessionKey - The session key to remove.
* @param address - The address (for logging).
*/
private async removeFromQueue(queueKey: string, sessionKey: string, address: string): Promise<void> {
try {
await this.redisClient.lRem(queueKey, 1, sessionKey);
this.logger.trace(`Removed from queue: address=${address}, sessionKey=${sessionKey}`);
Contributor: Let's still add the log level guard check to make sure JS doesn't build the string if not necessary

} catch (error) {
this.logger.error(error, `Failed to remove from queue: address=${address}, sessionKey=${sessionKey}`);
quiet-node (Contributor), Nov 20, 2025: Should we make this as warn level, not an error? I don't think it's critical enough to be an error yeah?

}
}

/**
* Generates a unique session key for lock acquisition.
* Protected to allow test mocking.
*
* @returns A unique session key.
*/
protected generateSessionKey(): string {
return randomUUID();
}

Comment on lines +192 to +195
Contributor: I think randomUUID() itself is clean and compact enough, and it doesn't need a wrapper method for it. Should we just directly use it to avoid indirection?

Contributor Author: crypto.randomUUID cannot be stubbed directly in the tests and results in the following error TypeError: Descriptor for property randomUUID is non-configurable and non-writable. This wrapper was added as a workaround to address that limitation. Let me know if you have a better approach for this scenario

Contributor: Ah I see thanks

/**
* Sleeps for the specified duration.
*
* @param ms - Duration in milliseconds.
*/
private sleep(ms: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, ms));
}
}
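The LPUSH / LINDEX -1 / LREM combination used above gives FIFO ordering: waiters are pushed to the head of the list and served from the tail. The following is an in-memory model of just that queue behavior, an assumption-laden sketch with no real Redis client; `FifoQueueModel` and its method names are illustrative, not part of the relay.

```typescript
// In-memory model of the Redis list operations used for the lock queue.
// LPUSH prepends, LINDEX key -1 reads the oldest entry, LREM key 1 v
// removes one occurrence, so arrival order is acquisition order.
class FifoQueueModel {
  private items: string[] = [];

  lPush(sessionKey: string): void {
    this.items.unshift(sessionKey); // new waiters go to the head
  }

  firstInLine(): string | null {
    // LINDEX key -1: the tail is the earliest waiter
    return this.items.length ? this.items[this.items.length - 1] : null;
  }

  lRem(sessionKey: string): void {
    const i = this.items.indexOf(sessionKey);
    if (i !== -1) this.items.splice(i, 1); // LREM key 1 sessionKey
  }
}
```

In the strategy above, a waiter polls until it sees its own session key at the front, then attempts the SET NX acquisition; removal from the queue happens in the finally block regardless of outcome.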