Skip to content

Commit 9b0eb84

Browse files
authored
EH/SB remove Message.LinkName (Azure#20520)
One more change from go-amqp to pick up.
1 parent 3ae758d commit 9b0eb84

File tree

8 files changed

+20
-42
lines changed

8 files changed

+20
-42
lines changed

sdk/messaging/azeventhubs/internal/go-amqp/message.go

Lines changed: 2 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -102,9 +102,8 @@ type Message struct {
102102
// encryption details).
103103
Footer Annotations
104104

105-
rcvr *Receiver // the receiving link
106-
deliveryID uint32 // used when sending disposition
107-
settled bool // whether transfer was settled by sender
105+
deliveryID uint32 // used when sending disposition
106+
settled bool // whether transfer was settled by sender
108107
}
109108

110109
// NewMessage returns a *Message with data as the payload.
@@ -127,14 +126,6 @@ func (m *Message) GetData() []byte {
127126
return m.Data[0]
128127
}
129128

130-
// LinkName returns the receiving link name or the empty string.
131-
func (m *Message) LinkName() string {
132-
if m.rcvr != nil {
133-
return m.rcvr.l.key.name
134-
}
135-
return ""
136-
}
137-
138129
// MarshalBinary encodes the message into binary form.
139130
func (m *Message) MarshalBinary() ([]byte, error) {
140131
buf := &buffer.Buffer{}

sdk/messaging/azeventhubs/internal/go-amqp/receiver.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,6 @@ func (r *Receiver) Prefetched() *Message {
9090
}
9191

9292
debug.Log(3, "RX (Receiver): prefetched delivery ID %d", msg.deliveryID)
93-
msg.rcvr = r
9493

9594
if msg.settled {
9695
r.onSettlement(1)
@@ -121,7 +120,6 @@ func (r *Receiver) Receive(ctx context.Context, opts *ReceiveOptions) (*Message,
121120
msg := q.Dequeue()
122121
debug.Assert(msg != nil)
123122
debug.Log(3, "RX (Receiver): received delivery ID %d", msg.deliveryID)
124-
msg.rcvr = r
125123
r.messagesQ.Release(q)
126124
if msg.settled {
127125
r.onSettlement(1)

sdk/messaging/azservicebus/amqp_message.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -222,7 +222,7 @@ func copyAnnotations(src map[any]any) amqp.Annotations {
222222
return dest
223223
}
224224

225-
func newAMQPAnnotatedMessage(goAMQPMessage *amqp.Message) *AMQPAnnotatedMessage {
225+
func newAMQPAnnotatedMessage(goAMQPMessage *amqp.Message, receivingLinkName string) *AMQPAnnotatedMessage {
226226
var header *AMQPAnnotatedMessageHeader
227227

228228
if goAMQPMessage.Header != nil {
@@ -273,7 +273,7 @@ func newAMQPAnnotatedMessage(goAMQPMessage *amqp.Message) *AMQPAnnotatedMessage
273273
DeliveryTag: goAMQPMessage.DeliveryTag,
274274
Footer: footer,
275275
Header: header,
276-
linkName: goAMQPMessage.LinkName(),
276+
linkName: receivingLinkName,
277277
Properties: properties,
278278
inner: goAMQPMessage,
279279
}

sdk/messaging/azservicebus/internal/go-amqp/message.go

Lines changed: 2 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -102,9 +102,8 @@ type Message struct {
102102
// encryption details).
103103
Footer Annotations
104104

105-
rcvr *Receiver // the receiving link
106-
deliveryID uint32 // used when sending disposition
107-
settled bool // whether transfer was settled by sender
105+
deliveryID uint32 // used when sending disposition
106+
settled bool // whether transfer was settled by sender
108107
}
109108

110109
// NewMessage returns a *Message with data as the payload.
@@ -127,14 +126,6 @@ func (m *Message) GetData() []byte {
127126
return m.Data[0]
128127
}
129128

130-
// LinkName returns the receiving link name or the empty string.
131-
func (m *Message) LinkName() string {
132-
if m.rcvr != nil {
133-
return m.rcvr.l.key.name
134-
}
135-
return ""
136-
}
137-
138129
// MarshalBinary encodes the message into binary form.
139130
func (m *Message) MarshalBinary() ([]byte, error) {
140131
buf := &buffer.Buffer{}

sdk/messaging/azservicebus/internal/go-amqp/receiver.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,6 @@ func (r *Receiver) Prefetched() *Message {
9090
}
9191

9292
debug.Log(3, "RX (Receiver): prefetched delivery ID %d", msg.deliveryID)
93-
msg.rcvr = r
9493

9594
if msg.settled {
9695
r.onSettlement(1)
@@ -121,7 +120,6 @@ func (r *Receiver) Receive(ctx context.Context, opts *ReceiveOptions) (*Message,
121120
msg := q.Dequeue()
122121
debug.Assert(msg != nil)
123122
debug.Log(3, "RX (Receiver): received delivery ID %d", msg.deliveryID)
124-
msg.rcvr = r
125123
r.messagesQ.Release(q)
126124
if msg.settled {
127125
r.onSettlement(1)

sdk/messaging/azservicebus/message.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -312,9 +312,9 @@ func (m *Message) toAMQPMessage() *amqp.Message {
312312
// newReceivedMessage creates a received message from an AMQP message.
313313
// NOTE: this converter assumes that the Body of this message will be the first
314314
// serialized byte array in the Data section of the messsage.
315-
func newReceivedMessage(amqpMsg *amqp.Message) *ReceivedMessage {
315+
func newReceivedMessage(amqpMsg *amqp.Message, receivingLinkName string) *ReceivedMessage {
316316
msg := &ReceivedMessage{
317-
RawAMQPMessage: newAMQPAnnotatedMessage(amqpMsg),
317+
RawAMQPMessage: newAMQPAnnotatedMessage(amqpMsg, receivingLinkName),
318318
State: MessageStateActive,
319319
}
320320

sdk/messaging/azservicebus/message_test.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ func TestMessageUnitTest(t *testing.T) {
5050
func TestAMQPMessageToReceivedMessage(t *testing.T) {
5151
t.Run("empty_message", func(t *testing.T) {
5252
// nothing should blow up.
53-
rm := newReceivedMessage(&amqp.Message{})
53+
rm := newReceivedMessage(&amqp.Message{}, "receiving_link")
5454
require.NotNil(t, rm)
5555
})
5656

@@ -73,7 +73,7 @@ func TestAMQPMessageToReceivedMessage(t *testing.T) {
7373
},
7474
}
7575

76-
receivedMessage := newReceivedMessage(amqpMessage)
76+
receivedMessage := newReceivedMessage(amqpMessage, "receiving_link")
7777

7878
require.Equal(t, []byte("hello"), receivedMessage.Body)
7979
require.EqualValues(t, lockedUntil, *receivedMessage.LockedUntil)
@@ -134,7 +134,7 @@ func TestAMQPMessageToMessage(t *testing.T) {
134134
Data: [][]byte{[]byte("foo")},
135135
}
136136

137-
msg := newReceivedMessage(amqpMsg)
137+
msg := newReceivedMessage(amqpMsg, "receiving_link")
138138

139139
require.EqualValues(t, msg.MessageID, amqpMsg.Properties.MessageID, "messageID")
140140
require.EqualValues(t, msg.SessionID, amqpMsg.Properties.GroupID, "groupID")
@@ -179,40 +179,40 @@ func TestMessageState(t *testing.T) {
179179
Annotations: amqp.Annotations{
180180
messageStateAnnotation: td.PropValue,
181181
},
182-
})
182+
}, "receiving_link")
183183
require.EqualValues(t, td.Expected, m.State)
184184
})
185185
}
186186

187187
t.Run("NoAnnotations", func(t *testing.T) {
188188
m := newReceivedMessage(&amqp.Message{
189189
Annotations: nil,
190-
})
190+
}, "receiving_link")
191191
require.EqualValues(t, MessageStateActive, m.State)
192192
})
193193
}
194194

195195
func TestMessageWithIncorrectBody(t *testing.T) {
196196
// these are cases where the simple ReceivedMessage can't represent the AMQP message's
197197
// payload.
198-
message := newReceivedMessage(&amqp.Message{})
198+
message := newReceivedMessage(&amqp.Message{}, "receiving_link")
199199
require.Nil(t, message.Body)
200200

201201
message = newReceivedMessage(&amqp.Message{
202202
Value: "hello",
203-
})
203+
}, "receiving_link")
204204
require.Nil(t, message.Body)
205205

206206
message = newReceivedMessage(&amqp.Message{
207207
Sequence: [][]any{},
208-
})
208+
}, "receiving_link")
209209
require.Nil(t, message.Body)
210210

211211
message = newReceivedMessage(&amqp.Message{
212212
Data: [][]byte{
213213
[]byte("hello"),
214214
[]byte("world"),
215215
},
216-
})
216+
}, "receiving_link")
217217
require.Nil(t, message.Body)
218218
}

sdk/messaging/azservicebus/receiver.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -229,7 +229,7 @@ func (r *Receiver) ReceiveDeferredMessages(ctx context.Context, sequenceNumbers
229229
}
230230

231231
for _, amqpMsg := range amqpMessages {
232-
receivedMsg := newReceivedMessage(amqpMsg)
232+
receivedMsg := newReceivedMessage(amqpMsg, lwid.Receiver.LinkName())
233233
receivedMsg.deferred = true
234234

235235
receivedMessages = append(receivedMessages, receivedMsg)
@@ -279,7 +279,7 @@ func (r *Receiver) PeekMessages(ctx context.Context, maxMessageCount int, option
279279
receivedMessages = make([]*ReceivedMessage, len(messages))
280280

281281
for i := 0; i < len(messages); i++ {
282-
receivedMessages[i] = newReceivedMessage(messages[i])
282+
receivedMessages[i] = newReceivedMessage(messages[i], links.Receiver.LinkName())
283283
}
284284

285285
if len(receivedMessages) > 0 && updateInternalSequenceNumber {
@@ -440,7 +440,7 @@ func (r *Receiver) receiveMessagesImpl(ctx context.Context, maxMessages int, opt
440440
var receivedMessages []*ReceivedMessage
441441

442442
for _, msg := range result.Messages {
443-
receivedMessages = append(receivedMessages, newReceivedMessage(msg))
443+
receivedMessages = append(receivedMessages, newReceivedMessage(msg, linksWithID.Receiver.LinkName()))
444444
}
445445

446446
return receivedMessages, nil

0 commit comments

Comments
 (0)