Skip to content

Conversation

@JavaSaBr
Copy link
Owner

@JavaSaBr JavaSaBr commented Dec 3, 2025

Improve publish delivering to subscribers, part 1

@JavaSaBr JavaSaBr requested a review from crazyrokr December 3, 2025 19:11
@JavaSaBr JavaSaBr self-assigned this Dec 3, 2025
@github-actions
Copy link

github-actions bot commented Dec 3, 2025

Test Coverage Report

Overall Project 85.02% -3.44% 🍏
Files changed 50.09%

File Coverage
AclRulesLoader.groovy 100% 🍏
PublishHandlingResult.java 100% 🍏
MqttServerConnectionConfig.java 100% 🍏
PublishCompleteMqttInMessageHandler.java 100% 🍏
UnsubscribeMqttInMessageHandler.java 100% 🍏
PublishReceiveMqttInMessageHandler.java 100% 🍏
ProcessingOutPublishesMqttInMessageHandler.java 100% 🍏
PublishAckMqttInMessageHandler.java 100% 🍏
SubscribeMqttInMessageHandler.java 100% 🍏
InMemorySubscriptionService.java 100% 🍏
DenySubscribeRule.java 100% 🍏
AllowSubscribeRule.java 100% 🍏
DenyPublishRule.java 100% 🍏
SubscriptionService.java 100% 🍏
Qos0MqttPublishOutMessageHandler.java 100% 🍏
Publish.java 98.75% 🍏
AbstractTopic.java 91.96% 🍏
InMemoryActiveSubscriptions.java 90.98% -9.02%
MqttBrokerSpringConfig.java 89.57% 🍏
ConnectInMqttInMessageHandler.java 80.54% 🍏
Qos1MqttPublishInMessageHandler.java 80.45% 🍏
AbstractNetworkMqttUser.java 79.35% -5.81% 🍏
InMemoryMessageTacker.java 78.33% -10%
InMemoryTopicNameMapping.java 77.33% -16%
PublishRetryer.java 75% 🍏
InMemoryProcessingPublishes.java 73.86% -6.82% 🍏
InMemoryNetworkMqttSession.java 70.83% -25%
Qos2MqttPublishInMessageHandler.java 65.33% -6.81%
TrackableMqttPublishOutMessageHandler.java 64.2% -35.8% 🍏
AbstractMqttPublishInMessageHandler.java 63.41% -9.35% 🍏
AbstractMqttPublishOutMessageHandler.java 61.98% -34.71%
AbstractMqttInMessageHandler.java 58.21% 🍏
InMemoryMqttSessionService.java 45.86% -11.72%
PublishReleaseMqttInMessageHandler.java 40.79% -6.58%
Qos2MqttPublishOutMessageHandler.java 38.83% -61.17%
Qos1MqttPublishOutMessageHandler.java 30.68% -69.32%


abstract class QosMqttPublishOutMessageHandlerTest extends IntegrationServiceSpecification {
static {
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the the static block required here?

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@crazyrokr the second part will be focused on tests and I will use it

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I advise against committing code for the future, because unclear intentions are confusing. If you need to add a static block in the future, it will be easier to do so than to remember a redundant empty block that needs to be removed if for some reason you don't need it.

"mqtt.external.connection.subscription.id.available",
boolean.class,
MqttProperties.SUBSCRIPTION_IDENTIFIER_AVAILABLE_DEFAULT),
false), // not implemented
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the comment required here?

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@crazyrokr it's description why I disabled it now

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The current comment doesn't clarify the original intent and is confusing. Please add more details to the comment or delete it.

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@crazyrokr updated comments

Copy link
Collaborator

@crazyrokr crazyrokr left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please cover new code with tests

protected void retryDeliveringImpl(ExternalNetworkMqttUser user, MqttSession session, Publish publish) {
int messageId = publish.messageId();
MessageTacker messageTacker = session.outMessageTracker();
TrackedMessageMeta messageMeta = messageTacker.stored(messageId);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we consider to rename messageTacker.stored to messageTacker.getStoredMessageMeta ?

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@crazyrokr message tracker contains only message meta, why do I need to repeat it in the method name?

Copy link
Collaborator

@crazyrokr crazyrokr Dec 4, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Neither the class name nor the variable name of the message tracker contains any reference to message metadataю Moreover, in the current implementation, calling the stored method looks like calling a setter, which does not imply returning a value from the method.

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@crazyrokr it's a standard way for fluent API


PublishReleaseReasonCode releaseResult;
// we unknown this flow
if (trackedMessageMeta == null) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It might make sense to use } else if { branch here

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@crazyrokr how o_O

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Something like this

PublishReleaseReasonCode releaseResult;
if (reasonCode != PublishReceivedReasonCode.SUCCESS) {
  log.warning(clientId, reasonCode, messageId,
      "[%s] Received error response:[%s] for publish:[%s]"::formatted);
  // we can cancel the flow
  if (trackedMessageMeta != null) {
    messageTacker.remove(messageId);
  }
  return true;
} else if (trackedMessageMeta == null) { // we unknown this flow
  releaseResult = PublishReleaseReasonCode.PACKET_IDENTIFIER_NOT_FOUND;
} else {
  releaseResult = PublishReleaseReasonCode.SUCCESS;
  messageTacker.update(messageId, MqttMessageType.PUBLISH_RELEASE, reasonCode);
}

}

// finish the flow
MessageTacker messageTacker = session.outMessageTracker();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it make sense to use default branch here?

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@crazyrokr yes, why not?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So it would be something like this

PublishCompletedReasonCode reasonCode = publishComplete.reasonCode();
if (trackedMessageMeta == null) {
  log.warning(clientId, messageId, "[%s] No any stored information for messageId:[%d]"::formatted);
  return;
} else if (trackedMessageMeta.messageType() != MqttMessageType.PUBLISH_RELEASE) {
  log.warning(clientId, trackedMessageMeta, messageId,
      "[%s] No expected message meta:[%s] for messageId:[%d]"::formatted);
  return;
} else if (reasonCode != PublishCompletedReasonCode.SUCCESS) {
  log.warning(clientId, reasonCode, messageId,
      "[%s] Received error response:[%s] for publish:[%s]"::formatted);
} else {
  // finish the flow
  MessageTacker messageTacker = session.outMessageTracker();
  messageTacker.remove(messageId);
}

}

PublishCompletedReasonCode reasonCode = publishComplete.reasonCode();
if (reasonCode != PublishCompletedReasonCode.SUCCESS) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It might make sense to use } else if { branch here

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants