Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions lib/redis.client.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { ClientProxy, ReadPacket, WritePacket } from '@nestjs/microservices';
import { Injectable, Logger } from '@nestjs/common';
import { CONNECT_EVENT, ERROR_EVENT } from '@nestjs/microservices/constants';
import { RedisEventsMap } from '@nestjs/microservices/events/redis.events';
import { ClientConstructorOptions, RedisInstance } from './interfaces';
import { createRedisConnection } from './redis.utils';
import { RequestsMap } from './requests-map';
Expand Down Expand Up @@ -37,7 +37,7 @@ export class RedisStreamClient extends ClientProxy {
this.handleError(this.redis);

// when server instance connect, bind handlers.
this.redis.on(CONNECT_EVENT, () => {
this.redis.on(RedisEventsMap.CONNECT, () => {
this.logger.log(
'Redis Client Responses Listener connected successfully on ' +
(this.options.connection?.url ??
Expand Down Expand Up @@ -69,7 +69,7 @@ export class RedisStreamClient extends ClientProxy {

this.client = createRedisConnection(this.options?.connection);
this.connection = await firstValueFrom(
this.connect$(this.client, ERROR_EVENT, CONNECT_EVENT).pipe(share()),
this.connect$(this.client, RedisEventsMap.ERROR, RedisEventsMap.CONNECT).pipe(share()),
);
this.handleError(this.client);
return this.connection;
Expand Down Expand Up @@ -408,9 +408,13 @@ export class RedisStreamClient extends ClientProxy {
}

public handleError(stream: any) {
stream.on(ERROR_EVENT, (err: any) => {
stream.on(RedisEventsMap.ERROR, (err: any) => {
this.logger.error('Redis Streams Client ' + err);
this.close();
});
}

public unwrap<T>(): T {
return this.client as T;
}
}
32 changes: 23 additions & 9 deletions lib/redis.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import {
} from './interfaces';

import { createRedisConnection } from './redis.utils';
import { CONNECT_EVENT, ERROR_EVENT } from '@nestjs/microservices/constants';
import { RedisEventsMap } from '@nestjs/microservices/events/redis.events';
import { deserialize, serialize } from './streams.utils';
import { RedisStreamContext } from './stream.context';
import { Observable } from 'rxjs';
Expand Down Expand Up @@ -37,7 +37,7 @@ export class RedisStreamStrategy
this.handleError(this.client);

// when server instance connect, bind handlers.
this.redis.on(CONNECT_EVENT, () => {
this.redis.on(RedisEventsMap.CONNECT, () => {
this.logger.log(
'Redis connected successfully on ' +
(this.options.connection?.url ??
Expand Down Expand Up @@ -150,13 +150,13 @@ export class RedisStreamStrategy
if (!this.client) throw new Error('Redis client instance not found.');

const commandArgs: RedisValue[] = [];
if(this.options.streams?.maxLen){
commandArgs.push("MAXLEN")
commandArgs.push("~")
commandArgs.push(this.options.streams.maxLen.toString())
if (this.options.streams?.maxLen) {
commandArgs.push('MAXLEN');
commandArgs.push('~');
commandArgs.push(this.options.streams.maxLen.toString());
}
commandArgs.push("*")
commandArgs.push('*');

await this.client.xadd(
responseObj.stream,
...commandArgs,
Expand Down Expand Up @@ -339,7 +339,7 @@ export class RedisStreamStrategy

// for redis instances. need to add mechanism to try to connect back.
public handleError(stream: any) {
stream.on(ERROR_EVENT, (err: any) => {
stream.on(RedisEventsMap.ERROR, (err: any) => {
this.logger.error('Redis instance error: ' + err);
this.close();
});
Expand All @@ -350,4 +350,18 @@ export class RedisStreamStrategy
this.redis && this.redis.quit();
this.client && this.client.quit();
}

public on<
EventKey extends string = string,
EventCallback extends Function = Function,
>(event: EventKey, callback: EventCallback): this {
if (this.redis) {
this.redis.on(event, callback as unknown as (...args: unknown[]) => void);
}
return this;
}

public unwrap<T>(): T {
return this.redis as T;
}
}
4 changes: 2 additions & 2 deletions lib/streams.utils.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { RedisStreamContext } from './stream.context';
import { Logger } from '@nestjs/common';
import { v4 as uuidv4 } from 'uuid';
import { randomUUID } from 'node:crypto';

let logger = new Logger('RedisStreams/streams-utils');

Expand Down Expand Up @@ -87,5 +87,5 @@ export function stringifyMessage(messageObj: Record<string, string>): string[] {
}

export function generateCorrelationId() {
return uuidv4();
return randomUUID();
}
Loading
Loading