Skip to content

Commit 75487d4

Browse files
author
Andrei Marculescu
committed
#84: adding support for rabbit mq virtual hosts. Adding WorkerId to StateMachineRefHeader to avoid performance issue with context based messages (without the worker id, the message goes to all the workers of the statemachine. Adding message ids to all the events (context based or not) to be able to use the api against a xcomponent cluster
1 parent 49c0750 commit 75487d4

File tree

16 files changed

+323
-309
lines changed

16 files changed

+323
-309
lines changed

ReactiveXComponent/Common/Header.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
using System;
2+
13
namespace ReactiveXComponent.Common
24
{
35
public class Header
@@ -17,5 +19,7 @@ public class Header
1719
public string PublishTopic { get; set; }
1820

1921
public string ErrorMessage { get; set; }
22+
23+
public string MessageId { get; set; } = Guid.NewGuid().ToString();
2024
}
2125
}

ReactiveXComponent/Common/HeaderElement.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,5 +13,7 @@ public static class HeaderElement
1313
public const string SessionData = "SessionData";
1414
public const string EventType = "EventType";
1515
public const string ErrorMessage = "ErrorMessage";
16+
public const string MessageId = "MessageId";
17+
public const string WorkerId = "WorkerId";
1618
}
1719
}
Lines changed: 6 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,37 +1,10 @@
11

2+
using System;
3+
24
namespace ReactiveXComponent.Common
35
{
46
public class StateMachineRefHeader
57
{
6-
protected bool Equals(StateMachineRefHeader other)
7-
{
8-
return StateMachineId == other.StateMachineId && StateCode == other.StateCode && StateMachineCode == other.StateMachineCode && ComponentCode == other.ComponentCode && string.Equals(MessageType, other.MessageType) && string.Equals(PrivateTopic, other.PrivateTopic) && string.Equals(SessionData, other.SessionData) && string.Equals(ErrorMessage, other.ErrorMessage);
9-
}
10-
11-
public override bool Equals(object obj)
12-
{
13-
if (ReferenceEquals(null, obj)) return false;
14-
if (ReferenceEquals(this, obj)) return true;
15-
if (obj.GetType() != this.GetType()) return false;
16-
return Equals((StateMachineRefHeader) obj);
17-
}
18-
19-
public override int GetHashCode()
20-
{
21-
unchecked
22-
{
23-
var hashCode = (StateMachineId != null ? StateMachineId.GetHashCode() : 0);
24-
hashCode = (hashCode * 397) ^ StateCode;
25-
hashCode = (hashCode * 397) ^ StateMachineCode;
26-
hashCode = (hashCode * 397) ^ ComponentCode;
27-
hashCode = (hashCode * 397) ^ (MessageType != null ? MessageType.GetHashCode() : 0);
28-
hashCode = (hashCode * 397) ^ (PrivateTopic != null ? PrivateTopic.GetHashCode() : 0);
29-
hashCode = (hashCode * 397) ^ (SessionData != null ? SessionData.GetHashCode() : 0);
30-
hashCode = (hashCode * 397) ^ (ErrorMessage != null ? ErrorMessage.GetHashCode() : 0);
31-
return hashCode;
32-
}
33-
}
34-
358
public string StateMachineId { get; set; }
369

3710
public int StateCode { get; set; }
@@ -47,5 +20,9 @@ public override int GetHashCode()
4720
public string SessionData { get; set; }
4821

4922
public string ErrorMessage { get; set; }
23+
24+
public string MessageId { get; set; } = Guid.NewGuid().ToString();
25+
26+
public int WorkerId { get; set; }
5027
}
5128
}

ReactiveXComponent/Configuration/BusDetails.cs

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,14 @@ public BusDetails()
88

99
}
1010

11-
public BusDetails(string username, string password, string host, int port)
11+
public BusDetails(string username, string password, string host, string virtualHost, int port)
1212
{
1313
Username = username;
1414
Password = password;
1515
Host = host;
16+
VirtualHost = virtualHost;
1617
Port = port;
18+
1719
}
1820

1921
public string Username { get; set; }
@@ -22,14 +24,17 @@ public BusDetails(string username, string password, string host, int port)
2224

2325
public string Host { get; set; }
2426

27+
public string VirtualHost { get; set; }
28+
2529
public int Port { get; set; }
2630

2731
public BusDetails Clone()
2832
{
2933
return new BusDetails(
30-
(string)Username?.Clone(),
31-
(string)Password?.Clone(),
32-
(string)Host?.Clone(),
34+
Username,
35+
Password,
36+
Host,
37+
VirtualHost,
3338
Port);
3439
}
3540
}

ReactiveXComponent/Configuration/ConfigurationOverrides.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ public class ConfigurationOverrides
66
{
77
public string Host { get; set; }
88

9+
public string VirtualHost { get; set; }
10+
911
public string Port { get; set; }
1012

1113
public string Username { get; set; }

ReactiveXComponent/Parser/XCApiConfigParser.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -186,6 +186,7 @@ public BusDetails GetBusDetails()
186186
busInfos?.Attribute("user")?.Value,
187187
busInfos?.Attribute("password")?.Value,
188188
busInfos?.Attribute("host")?.Value,
189+
busInfos?.Attribute("virtualHost")?.Value,
189190
Convert.ToInt32(busInfos?.Attribute("port")?.Value));
190191

191192
return busDetails;

ReactiveXComponent/RabbitMq/RabbitMqConnection.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,11 @@ public IXCSession CreateSession(ConfigurationOverrides configurationOverrides =
3030
busDetails.Host = configurationOverrides.Host;
3131
}
3232

33+
if (!string.IsNullOrEmpty(configurationOverrides.VirtualHost))
34+
{
35+
busDetails.VirtualHost = configurationOverrides.VirtualHost;
36+
}
37+
3338
if (configurationOverrides.Port != null)
3439
{
3540
busDetails.Port = int.Parse(configurationOverrides.Port);

ReactiveXComponent/RabbitMq/RabbitMqHeaderConverter.cs

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,10 @@ public static Dictionary<string, object> ConvertHeader(Header header)
2121
{HeaderElement.IncomingEventType, header?.IncomingEventType},
2222
{HeaderElement.PublishTopic, header?.PublishTopic != null ? encoding.GetBytes(header.PublishTopic) : encoding.GetBytes(string.Empty) },
2323
{HeaderElement.MessageType, header?.MessageType != null ? encoding.GetBytes(header.MessageType) : encoding.GetBytes(string.Empty)},
24-
{HeaderElement.ErrorMessage, header?.ErrorMessage != null ? encoding.GetBytes(header.ErrorMessage) : encoding.GetBytes(string.Empty)}
24+
{HeaderElement.ErrorMessage, header?.ErrorMessage != null ? encoding.GetBytes(header.ErrorMessage) : encoding.GetBytes(string.Empty)},
25+
{HeaderElement.MessageId, header?.MessageId != null ? encoding.GetBytes(header.MessageId) : encoding.GetBytes(string.Empty) },
26+
27+
2528
};
2629
return dico;
2730
}
@@ -40,7 +43,10 @@ public static Dictionary<string, object> CreateHeaderFromStateMachineRefHeader(S
4043
{HeaderElement.MessageType, stateMachineRefHeader?.MessageType != null ? encoding.GetBytes(stateMachineRefHeader.MessageType) : encoding.GetBytes(string.Empty)},
4144
{HeaderElement.EventType, eventCode},
4245
{HeaderElement.IncomingEventType, (int)incomingEventType},
43-
{HeaderElement.ErrorMessage, stateMachineRefHeader?.ErrorMessage != null ? encoding.GetBytes(stateMachineRefHeader.ErrorMessage) : encoding.GetBytes(string.Empty)}
46+
{HeaderElement.ErrorMessage, stateMachineRefHeader?.ErrorMessage != null ? encoding.GetBytes(stateMachineRefHeader.ErrorMessage) : encoding.GetBytes(string.Empty)},
47+
{HeaderElement.MessageId, stateMachineRefHeader?.MessageId != null ? encoding.GetBytes(stateMachineRefHeader.MessageId) : encoding.GetBytes(string.Empty) },
48+
{HeaderElement.WorkerId, stateMachineRefHeader?.WorkerId},
49+
4450
};
4551
return dico;
4652
}
@@ -56,6 +62,8 @@ public static StateMachineRefHeader ConvertStateMachineRefHeader(IDictionary<str
5662
var messageType = string.Empty;
5763
var sessionData = string.Empty;
5864
var errorMessage = string.Empty;
65+
var messageId = string.Empty;
66+
var workerId = -1;
5967

6068
if (stateMachineRefHeader.ContainsKey(HeaderElement.StateMachineId) && stateMachineRefHeader[HeaderElement.StateMachineId] != null)
6169
stateMachineId = Encoding.UTF8.GetString((byte[])stateMachineRefHeader[HeaderElement.StateMachineId]);
@@ -73,6 +81,10 @@ public static StateMachineRefHeader ConvertStateMachineRefHeader(IDictionary<str
7381
sessionData = encoding.GetString(stateMachineRefHeader[HeaderElement.SessionData] as byte[]);
7482
if (stateMachineRefHeader.ContainsKey(HeaderElement.ErrorMessage))
7583
errorMessage = encoding.GetString(stateMachineRefHeader[HeaderElement.ErrorMessage] as byte[]);
84+
if (stateMachineRefHeader.ContainsKey(HeaderElement.MessageId))
85+
messageId = encoding.GetString(stateMachineRefHeader[HeaderElement.MessageId] as byte[]);
86+
if (stateMachineRefHeader.ContainsKey(HeaderElement.WorkerId))
87+
workerId = Convert.ToInt32(stateMachineRefHeader[HeaderElement.WorkerId]);
7688

7789
return new StateMachineRefHeader()
7890
{
@@ -83,7 +95,9 @@ public static StateMachineRefHeader ConvertStateMachineRefHeader(IDictionary<str
8395
MessageType = messageType,
8496
PrivateTopic = publishTopic,
8597
SessionData = sessionData,
86-
ErrorMessage = errorMessage
98+
ErrorMessage = errorMessage,
99+
MessageId = messageId,
100+
WorkerId = workerId,
87101
};
88102
}
89103
}

ReactiveXComponent/RabbitMq/RabbitMqPublisher.cs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ private Header CreateHeader(string component, string stateMachine, object messag
110110
StateCode = defaultValue,
111111
EventCode = _configuration.GetPublisherEventCode(messageType),
112112
IncomingEventType = (int)IncomingEventType.Transition,
113-
PublishTopic = visibility == Visibility.Private && !string.IsNullOrEmpty(_privateCommunicationIdentifier)? _privateCommunicationIdentifier : string.Empty
113+
PublishTopic = visibility == Visibility.Private && !string.IsNullOrEmpty(_privateCommunicationIdentifier)? _privateCommunicationIdentifier : string.Empty,
114114
};
115115

116116
return header;
@@ -133,7 +133,9 @@ private StateMachineRefHeader CreateStateMachineRefHeader(StateMachineRefHeader
133133
StateMachineCode = stateMachineRefHeader.StateMachineCode,
134134
ComponentCode = stateMachineRefHeader.ComponentCode,
135135
MessageType = messageType,
136-
PrivateTopic = visibility == Visibility.Private && !string.IsNullOrEmpty(_privateCommunicationIdentifier) ? _privateCommunicationIdentifier : string.Empty
136+
PrivateTopic = visibility == Visibility.Private && !string.IsNullOrEmpty(_privateCommunicationIdentifier) ? _privateCommunicationIdentifier : string.Empty,
137+
MessageId = stateMachineRefHeader.MessageId,
138+
WorkerId = stateMachineRefHeader.WorkerId,
137139
};
138140

139141
return stateMachineRefheader;

ReactiveXComponent/RabbitMq/RabbitMqSession.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ private void InitConnection(BusDetails busDetails)
3333
{
3434
UserName = busDetails.Username ?? XCApiTags.DefaultUserName,
3535
Password = busDetails.Password ?? XCApiTags.DefaultPassword,
36-
VirtualHost = ConnectionFactory.DefaultVHost,
36+
VirtualHost = string.IsNullOrEmpty(busDetails.VirtualHost) ? ConnectionFactory.DefaultVHost : busDetails.VirtualHost,
3737
HostName = busDetails.Host,
3838
Port = busDetails.Port,
3939
Protocol = Protocols.DefaultProtocol

0 commit comments

Comments
 (0)