Skip to content

Commit 2e56a4f

Browse files
[service-bus] Updating samples to fix a few issues: (Azure#15430)
Updating samples to fix a few issues: - Show the right way to restart your receiver when using sessions (which don't do automatic retries like non-sessions). - Some of the samples printed almost too little console output to indicate what they were doing or didn't demonstrate the right techniques (asking for multiple messages at a time, etc..). Those have been updated as well. Fixes Azure#14531
1 parent 75e2780 commit 2e56a4f

File tree

12 files changed

+253
-106
lines changed

12 files changed

+253
-106
lines changed

sdk/servicebus/service-bus/samples-dev/browseMessages.ts

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -29,14 +29,18 @@ export async function main() {
2929
const queueReceiver = sbClient.createReceiver(queueName);
3030

3131
try {
32-
for (let i = 0; i < 20; i++) {
33-
const [message] = await queueReceiver.peekMessages(1);
34-
if (!message) {
35-
console.log("No more messages to peek");
36-
break;
37-
}
38-
console.log(`Peeking message #${i}: ${message.body}`);
32+
// peeking messages does not lock or remove messages from a queue or subscription.
33+
// For locking and/or removal, look at the `receiveMessagesLoop` or `receiveMessagesStreaming` samples,
34+
// which cover using a receiver with a `receiveMode`.
35+
console.log(`Attempting to peek 10 messages at a time`);
36+
const peekedMessages = await queueReceiver.peekMessages(10);
37+
38+
console.log(`Got ${peekedMessages.length} messages.`);
39+
40+
for (let i = 0; i < peekedMessages.length; ++i) {
41+
console.log(`Peeked message #${i}: ${peekedMessages[i].body}`);
3942
}
43+
4044
await queueReceiver.close();
4145
} finally {
4246
await sbClient.close();

sdk/servicebus/service-bus/samples-dev/receiveMessagesLoop.ts

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,19 +31,33 @@ export async function main() {
3131
// To receive messages from sessions, use getSessionReceiver instead of getReceiver or look at
3232
// the sample in sessions.ts file
3333
try {
34-
for (let i = 0; i < 10; i++) {
35-
const messages = await queueReceiver.receiveMessages(1, {
36-
maxWaitTimeInMs: 5000
34+
let allMessages = [];
35+
36+
console.log(`Receiving 10 messages...`);
37+
38+
while (allMessages.length < 10) {
39+
// NOTE: asking for 10 messages does not guarantee that we will return
40+
// all 10 at once so we must loop until we get all the messages we expected.
41+
const messages = await queueReceiver.receiveMessages(10, {
42+
maxWaitTimeInMs: 60 * 1000
3743
});
3844

3945
if (!messages.length) {
4046
console.log("No more messages to receive");
4147
break;
4248
}
4349

44-
console.log(`Received message #${i}: ${messages[0].body}`);
45-
await queueReceiver.completeMessage(messages[0]);
50+
console.log(`Received ${messages.length} messages`);
51+
allMessages.push(...messages);
52+
53+
for (let message of messages) {
54+
console.log(` Message: '${message.body}'`);
55+
56+
// completing the message will remove it from the remote queue or subscription.
57+
await queueReceiver.completeMessage(message);
58+
}
4659
}
60+
4761
await queueReceiver.close();
4862
} finally {
4963
await sbClient.close();

sdk/servicebus/service-bus/samples-dev/sendMessages.ts

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
* @azsdk-weight 100
1515
*/
1616

17-
import { ServiceBusClient, ServiceBusMessage } from "@azure/service-bus";
17+
import { ServiceBusClient, ServiceBusMessage, ServiceBusMessageBatch } from "@azure/service-bus";
1818

1919
// Load the .env file if it exists
2020
import * as dotenv from "dotenv";
@@ -24,12 +24,15 @@ dotenv.config();
2424
const connectionString = process.env.SERVICEBUS_CONNECTION_STRING || "<connection string>";
2525
const queueName = process.env.QUEUE_NAME || "<queue name>";
2626

27-
const messages: ServiceBusMessage[] = [
27+
const firstSetOfMessages: ServiceBusMessage[] = [
2828
{ body: "Albert Einstein" },
2929
{ body: "Werner Heisenberg" },
3030
{ body: "Marie Curie" },
3131
{ body: "Steven Hawking" },
32-
{ body: "Isaac Newton" },
32+
{ body: "Isaac Newton" }
33+
];
34+
35+
const secondSetOfMessages: ServiceBusMessage[] = [
3336
{ body: "Niels Bohr" },
3437
{ body: "Michael Faraday" },
3538
{ body: "Galileo Galilei" },
@@ -46,27 +49,29 @@ export async function main() {
4649
try {
4750
// Tries to send all messages in a single batch.
4851
// Will fail if the messages cannot fit in a batch.
49-
await sender.sendMessages(messages);
52+
console.log(`Sending the first 5 scientists (as an array)`);
53+
await sender.sendMessages(firstSetOfMessages);
5054

5155
// Sends all messages using one or more ServiceBusMessageBatch objects as required
52-
let batch = await sender.createMessageBatch();
56+
let batch: ServiceBusMessageBatch = await sender.createMessageBatch();
5357

54-
for (let i = 0; i < messages.length; i++) {
55-
const message = messages[i];
58+
for (const message of secondSetOfMessages) {
5659
if (!batch.tryAddMessage(message)) {
5760
// Send the current batch as it is full and create a new one
5861
await sender.sendMessages(batch);
5962
batch = await sender.createMessageBatch();
6063

61-
if (!batch.tryAddMessage(messages[i])) {
64+
if (!batch.tryAddMessage(message)) {
6265
throw new Error("Message too big to fit in a batch");
6366
}
6467
}
6568
}
6669
// Send the batch
70+
console.log(`Sending the last 5 scientists (as a ServiceBusMessageBatch)`);
6771
await sender.sendMessages(batch);
6872

6973
// Close the sender
74+
console.log(`Done sending, closing...`);
7075
await sender.close();
7176
} finally {
7277
await sbClient.close();

sdk/servicebus/service-bus/samples-dev/session.ts

Lines changed: 42 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -43,12 +43,14 @@ export async function main() {
4343
const sbClient = new ServiceBusClient(connectionString);
4444

4545
try {
46+
console.log(`Sending 5 messages to 'session-1'`);
4647
await sendMessage(sbClient, listOfScientists[0], "session-1");
4748
await sendMessage(sbClient, listOfScientists[1], "session-1");
4849
await sendMessage(sbClient, listOfScientists[2], "session-1");
4950
await sendMessage(sbClient, listOfScientists[3], "session-1");
5051
await sendMessage(sbClient, listOfScientists[4], "session-1");
5152

53+
console.log(`Sending 5 messages to 'session-2'`);
5254
await sendMessage(sbClient, listOfScientists[5], "session-2");
5355
await sendMessage(sbClient, listOfScientists[6], "session-2");
5456
await sendMessage(sbClient, listOfScientists[7], "session-2");
@@ -80,22 +82,46 @@ async function sendMessage(sbClient: ServiceBusClient, scientist: any, sessionId
8082

8183
async function receiveMessages(sbClient: ServiceBusClient, sessionId: string) {
8284
// If receiving from a subscription you can use the acceptSession(topic, subscription, sessionId) overload
83-
const receiver = await sbClient.acceptSession(queueName, sessionId);
84-
85-
const processMessage = async (message: ServiceBusMessage) => {
86-
console.log(`Received: ${message.sessionId} - ${message.body} `);
87-
};
88-
const processError = async (args: ProcessErrorArgs) => {
89-
console.log(`>>>>> Error from error source ${args.errorSource} occurred: `, args.error);
90-
};
91-
receiver.subscribe({
92-
processMessage,
93-
processError
94-
});
95-
96-
await delay(5000);
97-
98-
await receiver.close();
85+
let endDate: number | undefined;
86+
87+
while (true) {
88+
console.log(`Creating session receiver for session '${sessionId}'`);
89+
const receiver = await sbClient.acceptSession(queueName, sessionId);
90+
91+
const subscribePromise = new Promise((_, reject) => {
92+
const processMessage = async (message: ServiceBusMessage) => {
93+
console.log(`Received: ${message.sessionId} - ${message.body} `);
94+
};
95+
const processError = async (args: ProcessErrorArgs) => {
96+
console.log(`>>>>> Error from error source ${args.errorSource} occurred: `, args.error);
97+
reject(args.error);
98+
};
99+
100+
receiver.subscribe({
101+
processMessage,
102+
processError
103+
});
104+
});
105+
106+
const now = Date.now();
107+
endDate = endDate ?? now + 20000;
108+
let remainingTime: number = endDate - now;
109+
110+
console.log(`Waiting for ${remainingTime} milliseconds for messages to arrive.`);
111+
112+
try {
113+
await Promise.race([subscribePromise, delay(remainingTime)]);
114+
115+
// wait time has expired, we can stop listening.
116+
console.log(`Time has expired, closing receiver for session '${sessionId}'`);
117+
118+
await receiver.close();
119+
break;
120+
} catch (err) {
121+
// `err` was already logged part of `processError` above.
122+
await receiver.close();
123+
}
124+
}
99125
}
100126

101127
main().catch((err) => {

sdk/servicebus/service-bus/samples/v7/javascript/browseMessages.js

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -28,14 +28,18 @@ async function main() {
2828
const queueReceiver = sbClient.createReceiver(queueName);
2929

3030
try {
31-
for (let i = 0; i < 20; i++) {
32-
const [message] = await queueReceiver.peekMessages(1);
33-
if (!message) {
34-
console.log("No more messages to peek");
35-
break;
36-
}
37-
console.log(`Peeking message #${i}: ${message.body}`);
31+
// peeking messages does not lock or remove messages from a queue or subscription.
32+
// For locking and/or removal, look at the `receiveMessagesLoop` or `receiveMessagesStreaming` samples,
33+
// which cover using a receiver with a `receiveMode`.
34+
console.log(`Attempting to peek 10 messages at a time`);
35+
const peekedMessages = await queueReceiver.peekMessages(10);
36+
37+
console.log(`Got ${peekedMessages.length} messages.`);
38+
39+
for (let i = 0; i < peekedMessages.length; ++i) {
40+
console.log(`Peeked message #${i}: ${peekedMessages[i].body}`);
3841
}
42+
3943
await queueReceiver.close();
4044
} finally {
4145
await sbClient.close();

sdk/servicebus/service-bus/samples/v7/javascript/receiveMessagesLoop.js

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,19 +30,33 @@ async function main() {
3030
// To receive messages from sessions, use getSessionReceiver instead of getReceiver or look at
3131
// the sample in sessions.ts file
3232
try {
33-
for (let i = 0; i < 10; i++) {
34-
const messages = await queueReceiver.receiveMessages(1, {
35-
maxWaitTimeInMs: 5000
33+
let allMessages = [];
34+
35+
console.log(`Receiving 10 messages...`);
36+
37+
while (allMessages.length < 10) {
38+
// NOTE: asking for 10 messages does not guarantee that we will return
39+
// all 10 at once so we must loop until we get all the messages we expected.
40+
const messages = await queueReceiver.receiveMessages(10, {
41+
maxWaitTimeInMs: 60 * 1000
3642
});
3743

3844
if (!messages.length) {
3945
console.log("No more messages to receive");
4046
break;
4147
}
4248

43-
console.log(`Received message #${i}: ${messages[0].body}`);
44-
await queueReceiver.completeMessage(messages[0]);
49+
console.log(`Received ${messages.length} messages`);
50+
allMessages.push(...messages);
51+
52+
for (let message of messages) {
53+
console.log(` Message: '${message.body}'`);
54+
55+
// completing the message will remove it from the remote queue or subscription.
56+
await queueReceiver.completeMessage(message);
57+
}
4558
}
59+
4660
await queueReceiver.close();
4761
} finally {
4862
await sbClient.close();

sdk/servicebus/service-bus/samples/v7/javascript/sendMessages.js

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,15 @@ dotenv.config();
2323
const connectionString = process.env.SERVICEBUS_CONNECTION_STRING || "<connection string>";
2424
const queueName = process.env.QUEUE_NAME || "<queue name>";
2525

26-
const messages = [
26+
const firstSetOfMessages = [
2727
{ body: "Albert Einstein" },
2828
{ body: "Werner Heisenberg" },
2929
{ body: "Marie Curie" },
3030
{ body: "Steven Hawking" },
31-
{ body: "Isaac Newton" },
31+
{ body: "Isaac Newton" }
32+
];
33+
34+
const secondSetOfMessages = [
3235
{ body: "Niels Bohr" },
3336
{ body: "Michael Faraday" },
3437
{ body: "Galileo Galilei" },
@@ -45,27 +48,29 @@ async function main() {
4548
try {
4649
// Tries to send all messages in a single batch.
4750
// Will fail if the messages cannot fit in a batch.
48-
await sender.sendMessages(messages);
51+
console.log(`Sending the first 5 scientists (as an array)`);
52+
await sender.sendMessages(firstSetOfMessages);
4953

5054
// Sends all messages using one or more ServiceBusMessageBatch objects as required
5155
let batch = await sender.createMessageBatch();
5256

53-
for (let i = 0; i < messages.length; i++) {
54-
const message = messages[i];
57+
for (const message of secondSetOfMessages) {
5558
if (!batch.tryAddMessage(message)) {
5659
// Send the current batch as it is full and create a new one
5760
await sender.sendMessages(batch);
5861
batch = await sender.createMessageBatch();
5962

60-
if (!batch.tryAddMessage(messages[i])) {
63+
if (!batch.tryAddMessage(message)) {
6164
throw new Error("Message too big to fit in a batch");
6265
}
6366
}
6467
}
6568
// Send the batch
69+
console.log(`Sending the last 5 scientists (as a ServiceBusMessageBatch)`);
6670
await sender.sendMessages(batch);
6771

6872
// Close the sender
73+
console.log(`Done sending, closing...`);
6974
await sender.close();
7075
} finally {
7176
await sbClient.close();

sdk/servicebus/service-bus/samples/v7/javascript/session.js

Lines changed: 42 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -42,12 +42,14 @@ async function main() {
4242
const sbClient = new ServiceBusClient(connectionString);
4343

4444
try {
45+
console.log(`Sending 5 messages to 'session-1'`);
4546
await sendMessage(sbClient, listOfScientists[0], "session-1");
4647
await sendMessage(sbClient, listOfScientists[1], "session-1");
4748
await sendMessage(sbClient, listOfScientists[2], "session-1");
4849
await sendMessage(sbClient, listOfScientists[3], "session-1");
4950
await sendMessage(sbClient, listOfScientists[4], "session-1");
5051

52+
console.log(`Sending 5 messages to 'session-2'`);
5153
await sendMessage(sbClient, listOfScientists[5], "session-2");
5254
await sendMessage(sbClient, listOfScientists[6], "session-2");
5355
await sendMessage(sbClient, listOfScientists[7], "session-2");
@@ -79,22 +81,46 @@ async function sendMessage(sbClient, scientist, sessionId) {
7981

8082
async function receiveMessages(sbClient, sessionId) {
8183
// If receiving from a subscription you can use the acceptSession(topic, subscription, sessionId) overload
82-
const receiver = await sbClient.acceptSession(queueName, sessionId);
83-
84-
const processMessage = async (message) => {
85-
console.log(`Received: ${message.sessionId} - ${message.body} `);
86-
};
87-
const processError = async (args) => {
88-
console.log(`>>>>> Error from error source ${args.errorSource} occurred: `, args.error);
89-
};
90-
receiver.subscribe({
91-
processMessage,
92-
processError
93-
});
94-
95-
await delay(5000);
96-
97-
await receiver.close();
84+
let endDate;
85+
86+
while (true) {
87+
console.log(`Creating session receiver for session '${sessionId}'`);
88+
const receiver = await sbClient.acceptSession(queueName, sessionId);
89+
90+
const subscribePromise = new Promise((_, reject) => {
91+
const processMessage = async (message) => {
92+
console.log(`Received: ${message.sessionId} - ${message.body} `);
93+
};
94+
const processError = async (args) => {
95+
console.log(`>>>>> Error from error source ${args.errorSource} occurred: `, args.error);
96+
reject(args.error);
97+
};
98+
99+
receiver.subscribe({
100+
processMessage,
101+
processError
102+
});
103+
});
104+
105+
const now = Date.now();
106+
endDate = endDate ?? now + 20000;
107+
let remainingTime = endDate - now;
108+
109+
console.log(`Waiting for ${remainingTime} milliseconds for messages to arrive.`);
110+
111+
try {
112+
await Promise.race([subscribePromise, delay(remainingTime)]);
113+
114+
// wait time has expired, we can stop listening.
115+
console.log(`Time has expired, closing receiver for session '${sessionId}'`);
116+
117+
await receiver.close();
118+
break;
119+
} catch (err) {
120+
// `err` was already logged part of `processError` above.
121+
await receiver.close();
122+
}
123+
}
98124
}
99125

100126
main().catch((err) => {

0 commit comments

Comments
 (0)