From 3f62c36b1086b40c458cc234b077351dee10835c Mon Sep 17 00:00:00 2001 From: Dave Paul Date: Fri, 2 Sep 2022 09:45:22 -0400 Subject: [PATCH 1/3] allows Bus to operate with 11-bit CAN messages --- j1939/__init__.py | 148 +++++++++++++++++++++++++++------------------- 1 file changed, 88 insertions(+), 60 deletions(-) diff --git a/j1939/__init__.py b/j1939/__init__.py index 6de336e..bf38461 100644 --- a/j1939/__init__.py +++ b/j1939/__init__.py @@ -64,7 +64,7 @@ fileHandler.setLevel(lLevel) logger.addHandler(fileHandler) - can_set_logging_level('debug') + # can_set_logging_level('debug') class j1939Listner(canListener): @@ -82,6 +82,13 @@ class Bus(BusABC): """ A CAN Bus that implements the J1939 Protocol. + :param bool strict: + + indicates whether to operate on strictly J1939 message (True) + or also handle 11-bit CAN messages. If False, then any specfified + `can_filters` will be honored; incoming 11-bit messages will be + accepted; and outgoing 11-bit messages will be sent. + :param list j1939_filters: a list of dictionaries that specify filters that messages must match to be received by this Bus. Messages can match any of the @@ -94,7 +101,7 @@ class Bus(BusABC): channel_info = "j1939 bus" - def __init__(self, pdu_type=PDU, broadcast=True, *args, **kwargs): + def __init__(self, pdu_type=PDU, broadcast=True, strict=True, *args, **kwargs): logger.debug("Creating a new j1939 bus") #self.rx_can_message_queue = Queue() @@ -104,6 +111,7 @@ def __init__(self, pdu_type=PDU, broadcast=True, *args, **kwargs): super(Bus, self).__init__(kwargs.get('channel'), kwargs.get('can_filters')) self._pdu_type = pdu_type + self._strict = strict self.timeout = 1 self._long_message_throttler = threading.Thread(target=self._throttler_function) self._long_message_throttler.daemon = True @@ -129,7 +137,7 @@ def __init__(self, pdu_type=PDU, broadcast=True, *args, **kwargs): if 'j1939_filters' in kwargs and kwargs['j1939_filters'] is not None: filters = kwargs.pop('j1939_filters') logger.debug("Got filters: {}".format(filters)) - can_filters = [] + can_filters = [] if self._strict else kwargs.get('can_filters', []) for filt in filters: can_id, can_mask = 0, 0 if 'pgn' in filt: @@ -159,13 +167,15 @@ def __init__(self, pdu_type=PDU, broadcast=True, *args, **kwargs): raise ValueError("Bad timeout type") logger.debug("Creating a new can bus") - self.can_bus = RawCanBus(*args, **kwargs) + self.can_bus = self._make_can_bus(*args, **kwargs) canListener = j1939Listner(self.notification) self.can_notifier = canNotifier(self.can_bus, [canListener], timeout=self.timeout) self._long_message_throttler.start() + def _make_can_bus(self, *args, **kwargs): + return RawCanBus(*args, **kwargs) def notification(self, inboundMessage): #self.rx_can_message_queue.put(inboundMessage) @@ -176,55 +186,64 @@ def notification(self, inboundMessage): if isinstance(inboundMessage, Message): logger.info('\n\n{}: Got a Message from CAN: {}'.format(inspect.stack()[0][3],inboundMessage)) if inboundMessage.id_type: - # Extended ID - # Only J1939 messages (i.e. 29-bit IDs) should go further than this point. - # Non-J1939 systems can co-exist with J1939 systems, but J1939 doesn't care - # about the content of their messages. - logger.info('{}: Message is j1939 msg'.format(inspect.stack()[0][3])) - - # - # Need to determine if it's a broadcast message or - # limit to listening nodes only - # - arbitration_id = ArbitrationID() - arbitration_id.can_id = inboundMessage.arbitration_id - logger.info("{}: ArbitrationID = {}, inboundMessage.arbitration_id: 0x{:08x}".format(inspect.stack()[0][3],arbitration_id, inboundMessage.arbitration_id)) - - for (node, l_notifier) in self.node_queue_list: - logger.debug("notification: node=%s" % (node)) - logger.debug(" notifier=%s" % (l_notifier)) - logger.debug(" arbitration_id.pgn=%s" % (arbitration_id.pgn)) - logger.debug(" destination_address=%s" % (arbitration_id.destination_address)) - - # redirect the AC stuff to the node processors. the rest can go - # to the main queue. - if node and (arbitration_id.pgn in [PGN_AC_ADDRESS_CLAIMED, PGN_AC_COMMANDED_ADDRESS, PGN_REQUEST_FOR_PGN]): - logger.info("{}: sending to notifier queue".format(inspect.stack()[0][3])) - # send the PDU to the node processor. - l_notifier.queue.put(inboundMessage) - - # if node has the destination address, do something with the PDU - elif node and (arbitration_id.destination_address in node.address_list): - logger.info("{}: sending to process_incoming_message".format(inspect.stack()[0][3])) - rx_pdu = self._process_incoming_message(inboundMessage) - if rx_pdu: - logger.info("WP02: notification: sent to general queue: %s QQ=%s" % (rx_pdu, self.queue)) - self.queue.put(rx_pdu) - elif node and (arbitration_id.destination_address is None): - logger.info("{}: sending broadcast to general queue".format(inspect.stack()[0][3])) - rx_pdu = self._process_incoming_message(inboundMessage) - logger.info("WP01: notification: sent broadcast to general queue: %s QQ=%s" % (rx_pdu, self.queue)) - self.queue.put(rx_pdu) - elif node is None: - # always send the message to the logging queue - logger.info("{}: sending to general queue".format(inspect.stack()[0][3])) - rx_pdu = self._process_incoming_message(inboundMessage) - logger.info("WP03: notification: sent pdu [%s] to general queue" % rx_pdu) - self.queue.put(rx_pdu) - else: - logger.info("WP04: notification: pdu dropped: %s\n\n" % inboundMessage) + self._handle_pdu_msg(inboundMessage) + else: + self._handle_non_pdu_msg(inboundMessage) + + def _handle_pdu_msg(self, inboundMessage): + # Extended ID + # Only J1939 messages (i.e. 29-bit IDs) should go further than this point. + # Non-J1939 systems can co-exist with J1939 systems, but J1939 doesn't care + # about the content of their messages. + logger.info('{}: Message is j1939 msg'.format(inspect.stack()[0][3])) + + # + # Need to determine if it's a broadcast message or + # limit to listening nodes only + # + arbitration_id = ArbitrationID() + arbitration_id.can_id = inboundMessage.arbitration_id + logger.info("{}: ArbitrationID = {}, inboundMessage.arbitration_id: 0x{:08x}".format(inspect.stack()[0][3],arbitration_id, inboundMessage.arbitration_id)) + + for (node, l_notifier) in self.node_queue_list: + logger.debug("notification: node=%s" % (node)) + logger.debug(" notifier=%s" % (l_notifier)) + logger.debug(" arbitration_id.pgn=%s" % (arbitration_id.pgn)) + logger.debug(" destination_address=%s" % (arbitration_id.destination_address)) + + # redirect the AC stuff to the node processors. the rest can go + # to the main queue. + if node and (arbitration_id.pgn in [PGN_AC_ADDRESS_CLAIMED, PGN_AC_COMMANDED_ADDRESS, PGN_REQUEST_FOR_PGN]): + logger.info("{}: sending to notifier queue".format(inspect.stack()[0][3])) + # send the PDU to the node processor. + l_notifier.queue.put(inboundMessage) + + # if node has the destination address, do something with the PDU + elif node and (arbitration_id.destination_address in node.address_list): + logger.info("{}: sending to process_incoming_message".format(inspect.stack()[0][3])) + rx_pdu = self._process_incoming_message(inboundMessage) + if rx_pdu: + logger.info("WP02: notification: sent to general queue: %s QQ=%s" % (rx_pdu, self.queue)) + self.queue.put(rx_pdu) + elif node and (arbitration_id.destination_address is None): + logger.info("{}: sending broadcast to general queue".format(inspect.stack()[0][3])) + rx_pdu = self._process_incoming_message(inboundMessage) + logger.info("WP01: notification: sent broadcast to general queue: %s QQ=%s" % (rx_pdu, self.queue)) + self.queue.put(rx_pdu) + elif node is None: + # always send the message to the logging queue + logger.info("{}: sending to general queue".format(inspect.stack()[0][3])) + rx_pdu = self._process_incoming_message(inboundMessage) + logger.info("WP03: notification: sent pdu [%s] to general queue" % rx_pdu) + self.queue.put(rx_pdu) else: - logger.info("Received non J1939 message (ignoring)") + logger.info("WP04: notification: pdu dropped: %s\n\n" % inboundMessage) + + def _handle_non_pdu_msg(self, inboundMessage): + if self._strict: + logger.info("Received non J1939 message (ignoring)") + else: + self.queue.put(inboundMessage) def connect(self, node): """ @@ -263,7 +282,13 @@ def recv(self, timeout=None): # listener.on_error_received(rx_error) def send(self, msg, timeout=None): - logger.info("j1939.send: msg={}".format(msg)) + logger.info("j1939.send: msg=%s", msg) + if isinstance(msg, self._pdu_type): + self._send_pdu(msg) + elif not self._strict: + self._send(msg) + + def _send_pdu(self, msg): messages = [] if len(msg.data) > 8: logger.info("j1939.send: message is > than 8 bytes") @@ -403,13 +428,16 @@ def send(self, msg, timeout=None): dlc=len(msg.data), data=msg.data) - logger.debug("j1939.send: calling can_bus_send: can-msg: {}".format(can_message)) - try: - self.can_bus.send(can_message) - except CanError: - if self._ignore_can_send_error: - pass - raise + self._send(can_message) + + def _send(self, can_message): + logger.debug("j1939.send: calling can_bus_send: can-msg: %s", can_message) + try: + self.can_bus.send(can_message) + except CanError: + if self._ignore_can_send_error: + pass + raise def shutdown(self): self.can_notifier._running = False From 0653564424b046610b452fad289ed6e4d0efa53e Mon Sep 17 00:00:00 2001 From: Dave Paul Date: Fri, 2 Sep 2022 09:47:34 -0400 Subject: [PATCH 2/3] tidiness --- j1939/__init__.py | 1 - 1 file changed, 1 deletion(-) diff --git a/j1939/__init__.py b/j1939/__init__.py index bf38461..6a5a23a 100644 --- a/j1939/__init__.py +++ b/j1939/__init__.py @@ -83,7 +83,6 @@ class Bus(BusABC): A CAN Bus that implements the J1939 Protocol. :param bool strict: - indicates whether to operate on strictly J1939 message (True) or also handle 11-bit CAN messages. If False, then any specfified `can_filters` will be honored; incoming 11-bit messages will be From 29a99f4c9e03712a37c2713c9707b9d59528b421 Mon Sep 17 00:00:00 2001 From: Dave Paul Date: Fri, 2 Sep 2022 10:01:40 -0400 Subject: [PATCH 3/3] better description of strict operation --- j1939/__init__.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/j1939/__init__.py b/j1939/__init__.py index 6a5a23a..e557f24 100644 --- a/j1939/__init__.py +++ b/j1939/__init__.py @@ -84,9 +84,12 @@ class Bus(BusABC): :param bool strict: indicates whether to operate on strictly J1939 message (True) - or also handle 11-bit CAN messages. If False, then any specfified - `can_filters` will be honored; incoming 11-bit messages will be - accepted; and outgoing 11-bit messages will be sent. + or also handle 11-bit CAN messages (False). In strict operation, + specification of `j1939_filters` will override any specification of + `can_filters`; inbound and outbound 11-bit CAN messages will be + dropped. In non-strict operation, `j1939_filters` will add to + `can_filters`; inbound and outbound 11-bit CAN messages will be + accepted and sent, respectively. :param list j1939_filters: a list of dictionaries that specify filters that messages must