diff --git a/pulsar/__init__.py b/pulsar/__init__.py index 7dda4cd..77e7f95 100644 --- a/pulsar/__init__.py +++ b/pulsar/__init__.py @@ -244,6 +244,12 @@ def schema_version(self): """ return self._message.schema_version() + def producer_name(self) -> str: + """ + Get the producer name which produced this message + """ + return self._message.producer_name() + @staticmethod def _wrap(_message): self = Message() diff --git a/src/message.cc b/src/message.cc index dd263b6..e18861a 100644 --- a/src/message.cc +++ b/src/message.cc @@ -105,7 +105,8 @@ void export_message(py::module_& m) { .def("topic_name", &Message::getTopicName, return_value_policy::copy) .def("redelivery_count", &Message::getRedeliveryCount) .def("int_schema_version", &Message::getLongSchemaVersion) - .def("schema_version", &Message::getSchemaVersion, return_value_policy::copy); + .def("schema_version", &Message::getSchemaVersion, return_value_policy::copy) + .def("producer_name", &Message::getProducerName, return_value_policy::copy); MessageBatch& (MessageBatch::*MessageBatchParseFromString)(const std::string& payload, uint32_t batchSize) = &MessageBatch::parseFrom; diff --git a/tests/pulsar_test.py b/tests/pulsar_test.py index ee10579..3603d84 100755 --- a/tests/pulsar_test.py +++ b/tests/pulsar_test.py @@ -238,12 +238,14 @@ def test_producer_is_connected(self): def test_producer_consumer(self): client = Client(self.serviceUrl) consumer = client.subscribe("my-python-topic-producer-consumer", "my-sub", consumer_type=ConsumerType.Shared) - producer = client.create_producer("my-python-topic-producer-consumer") + producer = client.create_producer("my-python-topic-producer-consumer", + producer_name="my-producer") producer.send(b"hello") msg = consumer.receive(TM) self.assertTrue(msg) self.assertEqual(msg.data(), b"hello") + self.assertEqual(msg.producer_name(), "my-producer") with self.assertRaises(pulsar.Timeout): consumer.receive(100)