Skip to content

Commit 8cd99fe

Browse files
committed
unskip reliable topic tests
1 parent a10f08f commit 8cd99fe

File tree

9 files changed

+48
-36
lines changed

9 files changed

+48
-36
lines changed

src/Address.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,8 @@
1616

1717
import {IdentifiedDataSerializable} from './serialization/Serializable';
1818
import {DataInput, DataOutput} from './serialization/Data';
19-
import {ADDRESS_CLASS_ID, CLUSTER_DATA_FACTORY_ID} from './ClusterDataFactory';
2019
import * as net from 'net';
20+
import {ClusterDataFactoryHelper} from './ClusterDataFactoryHelper';
2121

2222
class Address implements IdentifiedDataSerializable {
2323

@@ -48,11 +48,11 @@ class Address implements IdentifiedDataSerializable {
4848
}
4949

5050
getFactoryId(): number {
51-
return CLUSTER_DATA_FACTORY_ID;
51+
return ClusterDataFactoryHelper.FACTORY_ID;
5252
}
5353

5454
getClassId(): number {
55-
return ADDRESS_CLASS_ID;
55+
return ClusterDataFactoryHelper.ADDRESS_ID;
5656
}
5757

5858
equals(other: Address): boolean {

src/ClusterDataFactory.ts

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,15 +16,12 @@
1616

1717
import {IdentifiedDataSerializableFactory, IdentifiedDataSerializable} from './serialization/Serializable';
1818
import Address = require('./Address');
19-
20-
export const ADDRESS_CLASS_ID = 1;
21-
export const CLUSTER_DATA_FACTORY_ID = 0;
19+
import {ClusterDataFactoryHelper} from './ClusterDataFactoryHelper';
2220

2321
export class ClusterDataFactory implements IdentifiedDataSerializableFactory {
2422

2523
create(type: number): IdentifiedDataSerializable {
26-
27-
if (type === ADDRESS_CLASS_ID) {
24+
if (type === ClusterDataFactoryHelper.ADDRESS_ID) {
2825
return new Address();
2926
}
3027

src/ClusterDataFactoryHelper.ts

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
/*
2+
* Copyright (c) 2008-2018, Hazelcast, Inc. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
export class ClusterDataFactoryHelper {
18+
static readonly FACTORY_ID = 0;
19+
static readonly ADDRESS_ID = 1;
20+
}

src/proxy/RingbufferProxy.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ export class RingbufferProxy<E> extends PartitionSpecificProxy implements IRingb
9191
.then<Array<E>>((raw: any) => {
9292
return raw['items'].map((r: Data) => {
9393
return this.toObject(r);
94-
});
94+
}, this);
9595
});
9696
}
9797
}

src/proxy/topic/RawTopicMessage.ts

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,6 @@ export const RELIABLE_TOPIC_MESSAGE_FACTORY_ID = -18;
2323
export const RELIABLE_TOPIC_CLASS_ID = 2;
2424

2525
export class RawTopicMessage implements IdentifiedDataSerializable {
26-
27-
2826
publishTime: Long;
2927
publisherAddress: Address;
3028
payload: Data;
@@ -52,11 +50,9 @@ export class RawTopicMessage implements IdentifiedDataSerializable {
5250

5351
export class ReliableTopicMessageFactory implements IdentifiedDataSerializableFactory {
5452
create(type: number): IdentifiedDataSerializable {
55-
5653
if (type === RELIABLE_TOPIC_CLASS_ID) {
5754
return new RawTopicMessage();
5855
}
59-
6056
return null;
6157
}
6258
}

src/proxy/topic/ReliableTopicListenerRunner.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ export class ReliableTopicListenerRunner<E> {
5454
this.ringbuffer.readMany(this.sequenceNumber, 1, this.batchSize).then((result: Array<RawTopicMessage>) => {
5555
if (!this.cancelled) {
5656
result.forEach((raw: RawTopicMessage) => {
57-
var msg = new TopicMessage<E>();
57+
let msg = new TopicMessage<E>();
5858
msg.messageObject = this.serializationService.toObject(raw.payload);
5959
msg.publisher = raw.publisherAddress;
6060
msg.publishingTime = raw.publishTime;
@@ -80,7 +80,7 @@ export class ReliableTopicListenerRunner<E> {
8080
return;
8181
}
8282

83-
var message = 'Listener of topic "' + this.proxy.getName() + '" caught an exception, terminating listener';
83+
var message = 'Listener of topic "' + this.proxy.getName() + '" caught an exception, terminating listener. ' + e;
8484
this.loggingService.warn('ReliableTopicListenerRunner', message);
8585

8686
this.proxy.removeMessageListener(this.listenerId);

src/proxy/topic/TopicMessageListener.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,8 @@
1313
* See the License for the specific language governing permissions and
1414
* limitations under the License.
1515
*/
16+
import {TopicMessage} from './TopicMessage';
1617

1718
export interface TopicMessageListener<E> {
18-
(item: E): void;
19+
(message: TopicMessage<E>): void;
1920
}

src/serialization/SerializationService.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,9 @@ import {PortableSerializer} from './portable/PortableSerializer';
3030
import {IdentifiedDataSerializableFactory} from './Serializable';
3131
import * as DefaultPredicates from './DefaultPredicates';
3232
import {PredicateFactory, PREDICATE_FACTORY_ID} from './PredicateFactory';
33-
import {PartitionAware} from '../core/PartitionAware';
3433
import {RELIABLE_TOPIC_MESSAGE_FACTORY_ID, ReliableTopicMessageFactory} from '../proxy/topic/RawTopicMessage';
35-
import {CLUSTER_DATA_FACTORY_ID, ClusterDataFactory} from '../ClusterDataFactory';
34+
import {ClusterDataFactoryHelper} from '../ClusterDataFactoryHelper';
35+
import {ClusterDataFactory} from '../ClusterDataFactory';
3636

3737
export interface SerializationService {
3838
toData(object: any, paritioningStrategy?: any) : Data;
@@ -257,7 +257,7 @@ export class SerializationServiceV1 implements SerializationService {
257257
}
258258
factories[PREDICATE_FACTORY_ID] = new PredicateFactory(DefaultPredicates);
259259
factories[RELIABLE_TOPIC_MESSAGE_FACTORY_ID] = new ReliableTopicMessageFactory();
260-
factories[CLUSTER_DATA_FACTORY_ID] = new ClusterDataFactory();
260+
factories[ClusterDataFactoryHelper.FACTORY_ID] = new ClusterDataFactory();
261261
this.registerSerializer('identified', new IdentifiedDataSerializableSerializer(factories));
262262
}
263263

test/topic/TopicTest.js

Lines changed: 15 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ var Controller = require('./../RC');
2424
var RawTopicMessage = require('../../lib/proxy/topic/RawTopicMessage').RawTopicMessage;
2525
var fs = require('fs');
2626
var Long = require('long');
27-
2827
var Promise = require('bluebird');
2928

3029
var createConfig = function () {
@@ -53,7 +52,7 @@ var generateItems = function (client, howMany) {
5352
}
5453
return all;
5554
};
56-
describe.skip("Reliable Topic Proxy", function () {
55+
describe("Reliable Topic Proxy", function () {
5756

5857
var cluster;
5958
var clientOne;
@@ -68,7 +67,6 @@ describe.skip("Reliable Topic Proxy", function () {
6867
cluster = response;
6968
return Controller.startMember(cluster.id);
7069
}).then(function () {
71-
7270
var config = createConfig();
7371

7472
return Promise.all([
@@ -90,8 +88,9 @@ describe.skip("Reliable Topic Proxy", function () {
9088
});
9189

9290
it("writes and reads messages", function (done) {
93-
var topicOne = clientOne.getReliableTopic("t");
94-
var topicTwo = clientTwo.getReliableTopic("t");
91+
var topicName = 't' + Math.random();
92+
var topicOne = clientOne.getReliableTopic(topicName);
93+
var topicTwo = clientTwo.getReliableTopic(topicName);
9594

9695
topicTwo.addMessageListener(function (msg) {
9796
if (msg.messageObject["value"] === "foo") {
@@ -105,15 +104,16 @@ describe.skip("Reliable Topic Proxy", function () {
105104
});
106105

107106
it('removed message listener does not receive items after removal', function (done) {
108-
var topicOne = clientOne.getReliableTopic("t");
109-
var topicTwo = clientTwo.getReliableTopic("t");
107+
var topicName = 't' + Math.random();
108+
var topicOne = clientOne.getReliableTopic(topicName);
109+
var topicTwo = clientTwo.getReliableTopic(topicName);
110110

111111
var receivedMessages = 0;
112112

113113
var id = topicTwo.addMessageListener(function (msg) {
114114
receivedMessages++;
115115
if (receivedMessages > 2) {
116-
done(new Error('Keep receiving messages after removal.'));
116+
done(new Error('Kept receiving messages after message listener is removed.'));
117117
}
118118
});
119119

@@ -129,11 +129,11 @@ describe.skip("Reliable Topic Proxy", function () {
129129
}, 500);
130130
});
131131

132-
it("blocks when there is no more space", function (done) {
132+
it("blocks when there is no more space", function () {
133133
var topic = clientOne.getReliableTopic("blocking");
134134
var ringbuffer = topic.getRingbuffer();
135135

136-
ringbuffer.capacity().then(function (capacity) {
136+
return ringbuffer.capacity().then(function (capacity) {
137137
var all = [];
138138

139139
for (var i = 0; i < capacity.toNumber() + 1; i++) {
@@ -142,19 +142,19 @@ describe.skip("Reliable Topic Proxy", function () {
142142

143143
return ringbuffer.addAll(all);
144144
}).then(function () {
145-
var startTime = new Date().getTime();
146-
topic.publish(-50).then(function () {
145+
var startTime = Date.now();
146+
return topic.publish(-50).then(function () {
147147
/*
148148
Here we check that the call was indeed blocking
149149
until the TTL of the first inserted entry has passed
150150
*/
151151

152-
var elapsed = new Date().getTime() - startTime;
152+
var elapsed = Date.now() - startTime;
153153

154154
if (elapsed > 2000) {
155-
done();
155+
return;
156156
} else {
157-
done(new Error("Message was published too fast, expected at least a 2 second delay, got: " + elapsed));
157+
throw new Error("Message was published too fast, expected at least a 2 second delay, got: " + elapsed);
158158
}
159159
});
160160
});
@@ -173,7 +173,6 @@ describe.skip("Reliable Topic Proxy", function () {
173173

174174
var all = generateItems(clientOne, 20);
175175

176-
177176
ringbuffer.addAll(all);
178177
});
179178

@@ -195,7 +194,6 @@ describe.skip("Reliable Topic Proxy", function () {
195194
});
196195
});
197196

198-
199197
it("overwrites the oldest item when there is no more space", function () {
200198
var topic = clientOne.getReliableTopic("overwrite");
201199
var ringbuffer = topic.getRingbuffer();

0 commit comments

Comments
 (0)