Skip to content

Commit 7ab9c10

Browse files
author
Andrei Marculescu
authored
Merge pull request #85 from xcomponent/ISSUE-84
Adding support for rabbitmq virtual hosts. Adding WorkerId and MessageIds
2 parents 49c0750 + 75487d4 commit 7ab9c10

File tree

16 files changed

+323
-309
lines changed

16 files changed

+323
-309
lines changed

ReactiveXComponent/Common/Header.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
using System;
2+
13
namespace ReactiveXComponent.Common
24
{
35
public class Header
@@ -17,5 +19,7 @@ public class Header
1719
public string PublishTopic { get; set; }
1820

1921
public string ErrorMessage { get; set; }
22+
23+
public string MessageId { get; set; } = Guid.NewGuid().ToString();
2024
}
2125
}

ReactiveXComponent/Common/HeaderElement.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,5 +13,7 @@ public static class HeaderElement
1313
public const string SessionData = "SessionData";
1414
public const string EventType = "EventType";
1515
public const string ErrorMessage = "ErrorMessage";
16+
public const string MessageId = "MessageId";
17+
public const string WorkerId = "WorkerId";
1618
}
1719
}
Lines changed: 6 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,37 +1,10 @@
11

2+
using System;
3+
24
namespace ReactiveXComponent.Common
35
{
46
public class StateMachineRefHeader
57
{
6-
protected bool Equals(StateMachineRefHeader other)
7-
{
8-
return StateMachineId == other.StateMachineId && StateCode == other.StateCode && StateMachineCode == other.StateMachineCode && ComponentCode == other.ComponentCode && string.Equals(MessageType, other.MessageType) && string.Equals(PrivateTopic, other.PrivateTopic) && string.Equals(SessionData, other.SessionData) && string.Equals(ErrorMessage, other.ErrorMessage);
9-
}
10-
11-
public override bool Equals(object obj)
12-
{
13-
if (ReferenceEquals(null, obj)) return false;
14-
if (ReferenceEquals(this, obj)) return true;
15-
if (obj.GetType() != this.GetType()) return false;
16-
return Equals((StateMachineRefHeader) obj);
17-
}
18-
19-
public override int GetHashCode()
20-
{
21-
unchecked
22-
{
23-
var hashCode = (StateMachineId != null ? StateMachineId.GetHashCode() : 0);
24-
hashCode = (hashCode * 397) ^ StateCode;
25-
hashCode = (hashCode * 397) ^ StateMachineCode;
26-
hashCode = (hashCode * 397) ^ ComponentCode;
27-
hashCode = (hashCode * 397) ^ (MessageType != null ? MessageType.GetHashCode() : 0);
28-
hashCode = (hashCode * 397) ^ (PrivateTopic != null ? PrivateTopic.GetHashCode() : 0);
29-
hashCode = (hashCode * 397) ^ (SessionData != null ? SessionData.GetHashCode() : 0);
30-
hashCode = (hashCode * 397) ^ (ErrorMessage != null ? ErrorMessage.GetHashCode() : 0);
31-
return hashCode;
32-
}
33-
}
34-
358
public string StateMachineId { get; set; }
369

3710
public int StateCode { get; set; }
@@ -47,5 +20,9 @@ public override int GetHashCode()
4720
public string SessionData { get; set; }
4821

4922
public string ErrorMessage { get; set; }
23+
24+
public string MessageId { get; set; } = Guid.NewGuid().ToString();
25+
26+
public int WorkerId { get; set; }
5027
}
5128
}

ReactiveXComponent/Configuration/BusDetails.cs

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,14 @@ public BusDetails()
88

99
}
1010

11-
public BusDetails(string username, string password, string host, int port)
11+
public BusDetails(string username, string password, string host, string virtualHost, int port)
1212
{
1313
Username = username;
1414
Password = password;
1515
Host = host;
16+
VirtualHost = virtualHost;
1617
Port = port;
18+
1719
}
1820

1921
public string Username { get; set; }
@@ -22,14 +24,17 @@ public BusDetails(string username, string password, string host, int port)
2224

2325
public string Host { get; set; }
2426

27+
public string VirtualHost { get; set; }
28+
2529
public int Port { get; set; }
2630

2731
public BusDetails Clone()
2832
{
2933
return new BusDetails(
30-
(string)Username?.Clone(),
31-
(string)Password?.Clone(),
32-
(string)Host?.Clone(),
34+
Username,
35+
Password,
36+
Host,
37+
VirtualHost,
3338
Port);
3439
}
3540
}

ReactiveXComponent/Configuration/ConfigurationOverrides.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ public class ConfigurationOverrides
66
{
77
public string Host { get; set; }
88

9+
public string VirtualHost { get; set; }
10+
911
public string Port { get; set; }
1012

1113
public string Username { get; set; }

ReactiveXComponent/Parser/XCApiConfigParser.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -186,6 +186,7 @@ public BusDetails GetBusDetails()
186186
busInfos?.Attribute("user")?.Value,
187187
busInfos?.Attribute("password")?.Value,
188188
busInfos?.Attribute("host")?.Value,
189+
busInfos?.Attribute("virtualHost")?.Value,
189190
Convert.ToInt32(busInfos?.Attribute("port")?.Value));
190191

191192
return busDetails;

ReactiveXComponent/RabbitMq/RabbitMqConnection.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,11 @@ public IXCSession CreateSession(ConfigurationOverrides configurationOverrides =
3030
busDetails.Host = configurationOverrides.Host;
3131
}
3232

33+
if (!string.IsNullOrEmpty(configurationOverrides.VirtualHost))
34+
{
35+
busDetails.VirtualHost = configurationOverrides.VirtualHost;
36+
}
37+
3338
if (configurationOverrides.Port != null)
3439
{
3540
busDetails.Port = int.Parse(configurationOverrides.Port);

ReactiveXComponent/RabbitMq/RabbitMqHeaderConverter.cs

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,10 @@ public static Dictionary<string, object> ConvertHeader(Header header)
2121
{HeaderElement.IncomingEventType, header?.IncomingEventType},
2222
{HeaderElement.PublishTopic, header?.PublishTopic != null ? encoding.GetBytes(header.PublishTopic) : encoding.GetBytes(string.Empty) },
2323
{HeaderElement.MessageType, header?.MessageType != null ? encoding.GetBytes(header.MessageType) : encoding.GetBytes(string.Empty)},
24-
{HeaderElement.ErrorMessage, header?.ErrorMessage != null ? encoding.GetBytes(header.ErrorMessage) : encoding.GetBytes(string.Empty)}
24+
{HeaderElement.ErrorMessage, header?.ErrorMessage != null ? encoding.GetBytes(header.ErrorMessage) : encoding.GetBytes(string.Empty)},
25+
{HeaderElement.MessageId, header?.MessageId != null ? encoding.GetBytes(header.MessageId) : encoding.GetBytes(string.Empty) },
26+
27+
2528
};
2629
return dico;
2730
}
@@ -40,7 +43,10 @@ public static Dictionary<string, object> CreateHeaderFromStateMachineRefHeader(S
4043
{HeaderElement.MessageType, stateMachineRefHeader?.MessageType != null ? encoding.GetBytes(stateMachineRefHeader.MessageType) : encoding.GetBytes(string.Empty)},
4144
{HeaderElement.EventType, eventCode},
4245
{HeaderElement.IncomingEventType, (int)incomingEventType},
43-
{HeaderElement.ErrorMessage, stateMachineRefHeader?.ErrorMessage != null ? encoding.GetBytes(stateMachineRefHeader.ErrorMessage) : encoding.GetBytes(string.Empty)}
46+
{HeaderElement.ErrorMessage, stateMachineRefHeader?.ErrorMessage != null ? encoding.GetBytes(stateMachineRefHeader.ErrorMessage) : encoding.GetBytes(string.Empty)},
47+
{HeaderElement.MessageId, stateMachineRefHeader?.MessageId != null ? encoding.GetBytes(stateMachineRefHeader.MessageId) : encoding.GetBytes(string.Empty) },
48+
{HeaderElement.WorkerId, stateMachineRefHeader?.WorkerId},
49+
4450
};
4551
return dico;
4652
}
@@ -56,6 +62,8 @@ public static StateMachineRefHeader ConvertStateMachineRefHeader(IDictionary<str
5662
var messageType = string.Empty;
5763
var sessionData = string.Empty;
5864
var errorMessage = string.Empty;
65+
var messageId = string.Empty;
66+
var workerId = -1;
5967

6068
if (stateMachineRefHeader.ContainsKey(HeaderElement.StateMachineId) && stateMachineRefHeader[HeaderElement.StateMachineId] != null)
6169
stateMachineId = Encoding.UTF8.GetString((byte[])stateMachineRefHeader[HeaderElement.StateMachineId]);
@@ -73,6 +81,10 @@ public static StateMachineRefHeader ConvertStateMachineRefHeader(IDictionary<str
7381
sessionData = encoding.GetString(stateMachineRefHeader[HeaderElement.SessionData] as byte[]);
7482
if (stateMachineRefHeader.ContainsKey(HeaderElement.ErrorMessage))
7583
errorMessage = encoding.GetString(stateMachineRefHeader[HeaderElement.ErrorMessage] as byte[]);
84+
if (stateMachineRefHeader.ContainsKey(HeaderElement.MessageId))
85+
messageId = encoding.GetString(stateMachineRefHeader[HeaderElement.MessageId] as byte[]);
86+
if (stateMachineRefHeader.ContainsKey(HeaderElement.WorkerId))
87+
workerId = Convert.ToInt32(stateMachineRefHeader[HeaderElement.WorkerId]);
7688

7789
return new StateMachineRefHeader()
7890
{
@@ -83,7 +95,9 @@ public static StateMachineRefHeader ConvertStateMachineRefHeader(IDictionary<str
8395
MessageType = messageType,
8496
PrivateTopic = publishTopic,
8597
SessionData = sessionData,
86-
ErrorMessage = errorMessage
98+
ErrorMessage = errorMessage,
99+
MessageId = messageId,
100+
WorkerId = workerId,
87101
};
88102
}
89103
}

ReactiveXComponent/RabbitMq/RabbitMqPublisher.cs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ private Header CreateHeader(string component, string stateMachine, object messag
110110
StateCode = defaultValue,
111111
EventCode = _configuration.GetPublisherEventCode(messageType),
112112
IncomingEventType = (int)IncomingEventType.Transition,
113-
PublishTopic = visibility == Visibility.Private && !string.IsNullOrEmpty(_privateCommunicationIdentifier)? _privateCommunicationIdentifier : string.Empty
113+
PublishTopic = visibility == Visibility.Private && !string.IsNullOrEmpty(_privateCommunicationIdentifier)? _privateCommunicationIdentifier : string.Empty,
114114
};
115115

116116
return header;
@@ -133,7 +133,9 @@ private StateMachineRefHeader CreateStateMachineRefHeader(StateMachineRefHeader
133133
StateMachineCode = stateMachineRefHeader.StateMachineCode,
134134
ComponentCode = stateMachineRefHeader.ComponentCode,
135135
MessageType = messageType,
136-
PrivateTopic = visibility == Visibility.Private && !string.IsNullOrEmpty(_privateCommunicationIdentifier) ? _privateCommunicationIdentifier : string.Empty
136+
PrivateTopic = visibility == Visibility.Private && !string.IsNullOrEmpty(_privateCommunicationIdentifier) ? _privateCommunicationIdentifier : string.Empty,
137+
MessageId = stateMachineRefHeader.MessageId,
138+
WorkerId = stateMachineRefHeader.WorkerId,
137139
};
138140

139141
return stateMachineRefheader;

ReactiveXComponent/RabbitMq/RabbitMqSession.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ private void InitConnection(BusDetails busDetails)
3333
{
3434
UserName = busDetails.Username ?? XCApiTags.DefaultUserName,
3535
Password = busDetails.Password ?? XCApiTags.DefaultPassword,
36-
VirtualHost = ConnectionFactory.DefaultVHost,
36+
VirtualHost = string.IsNullOrEmpty(busDetails.VirtualHost) ? ConnectionFactory.DefaultVHost : busDetails.VirtualHost,
3737
HostName = busDetails.Host,
3838
Port = busDetails.Port,
3939
Protocol = Protocols.DefaultProtocol

0 commit comments

Comments
 (0)