Skip to content

TypeError: extra keyword arguments given #82

@skylin008

Description

@skylin008

Description

Thank you shared this impressive project, I used the version:2.3.7 umodbus code, when run this code, the error shows: TypeError:extra keyword arguments given. Follow as code:
`# modbus master driver for lidar

6 pieces lidar for system, address from 1 to 6

from right to left side, count closewize

system package

from machine import Pin
import logging
import gc
from InitConfig import config
import uasyncio as asyncio

log = logging.MiniLog("RtuMaster", level = logging.DEBUG)
DEBUG = config.get('DEBUG', False)

MasterUartId = config.get("UART_LIDAR")

self defined package

from umodbus.serial import Serial as RTUMaster
from umodbus import version

gc.collect()

version = '1.0'
author = 'skylin'

register_definitions = {
"HREGS":{ # read hold registers
"slave_address":{
"1": 1,
"2": 2,
"3": 3,
"4": 4,
"5": 5,
"6": 6
},
"register_address": 0x94,
"length": 0x02,
},
}

class Frame(object):
def init(self,data = None):
""" Data struct: """
if data is None:
self.data = bytearray()
else:
self.data = bytearray(data)

def __len__(self):
    return len(self.data)

class FrameBuffer(object):
def init(self, size):
"""
A FrameBuffer is used to buffer frames received from the Network
over the active Bus.The FrameBuffer is designed as a FIFO and is
useful to overcome the limited storing capability
"""
self._size = size
self._data = [[memoryview(bytearray(6))] for _ in range(size)]
self._index_write = 0
self._index_read = 0
self._null_sink = [0, 0, 0, 0]
self._count = 0

@micropython.native
def put(self, v):
    self._data[self._index_write] = v
    next_index = (self._index_write + 1) % self._size
    if next_index != self._index_read:
        self._index_write = self.next_index
        self._count += 1
    else:
        self.clear()
        if DEBUG:
            raise Exception("frame buffer over flow")

@micropython.native
def get(self):
    if self._index_read == self._index_write:
        return None

    lin_id, _, data = self._data[self._index_read]
    self._index_read = (self._index_read +1) % self._size
    self._count -= 1
    return Frame(data)  # Omit the data length


def any(self):
    if self._index_read == self._index_write:
        return False
    return True

def clear(self):
    self._index_write = 0
    self._index_read  = 0

class Master(object):
def init(self, uart_id, baudrate = 115200, ctrlPin = None, fbSize = 128):
self.rtu_pins = (Pin('A2'), Pin('A3'))
self.uart_id = uart_id
self.ctrlPin = ctrlPin
self.baudrate = baudrate

    self.host = RTUMaster(
        pins = self.rtu_pins,
        uart_id = self.uart_id,
        baudrate = self.baudrate,
        ctrl_pin = self.ctrlPin
        )

    self.framebuf = FrameBuffer(fbSize)

def write(self, registerdefinitions, idx):     
    slave_addr = registerdefinitions['HREGS']['slave_address']['idx']
    register_addr = registerdefinitions['HREGS']['register_address']
    qty = registerdefinitions['HREGS']['length']
     
    register_value = self.host.read_holding_registers(
                    slave_addr = slave_addr,
                    starting_addr = register_addr,
                    register_qty = qty,
                    signed = False
)   
    if not register_value:
        self.framebuf.put(register_value)

def read(self):
    if self.framebuf.any():
        return self.framebuf.get()

if name == 'main':

async def test():            
    ctrl_Pin = '485_TR1'
    m = Master(uart_id = 2, ctrlPin = '485_TR1')
    while True:
        for idx in range(7):
            w = m.write(register_definitions, idx)
                
            if w is not None:
                log.info(f"Lidar{idx} return data is:{w}")
            await asyncio.sleep_ms(30)  


async def start():
    asyncio.create_task(test())
    while True:
        await asyncio.sleep_ms(10)

app = None
async def main():
    import time
    t = time.localtime()
    log.info(t)
    gc.collect()
    global app
    
    await start()

try:
    gc.collect()
    asyncio.run(main())

except KeyboardInterrupt:
    log.info('Interrupt')

finally:
    asyncio.new_event_loop()
    log.info("run again")

`

Reproduction steps

...

MicroPython version

V1.22

MicroPython board

pyboard

MicroPython Modbus version

# e.g. v2.3.3
# use the following command to get the used version
import os
from umodbus import version
print('MicroPython infos:', os.uname())
print('Used micropthon-modbus version:', version.__version__))

Relevant log output

>>> %Run -c $EDITOR_CONTENT

MPY: sync filesystems
MPY: soft reboot
Task exception wasn't retrieved
future: <Task> coro= <generator object 'test' at 2000c850>
Traceback (most recent call last):
  File "asyncio/core.py", line 1, in run_until_complete
  File "<stdin>", line 147, in test
  File "<stdin>", line 117, in __init__
  File "umodbus/serial.py", line 1, in __init__
TypeError: extra keyword arguments given

User code

# modbus master driver for lidar 
# 6 pieces lidar for system, address from 1 to 6
# from right to left side, count closewize

# system package
from machine import Pin
import logging
import gc
from InitConfig import config
import uasyncio as asyncio

log = logging.MiniLog("RtuMaster", level = logging.DEBUG)
DEBUG = config.get('DEBUG', False)

MasterUartId = config.get("UART_LIDAR")

# self defined package
from umodbus.serial import Serial as RTUMaster
from umodbus import version

gc.collect()

__version__ = '1.0'
__author__ = 'skylin'


register_definitions = {
    "HREGS":{   # read hold registers
        "slave_address":{
            "1": 1,
            "2": 2,
            "3": 3,
            "4": 4,
            "5": 5,
            "6": 6
        },
        "register_address": 0x94,
        "length": 0x02,
    },
}


class Frame(object):
    def __init__(self,data = None):
        """ Data struct: """        
        if data is None:
            self.data = bytearray()
        else:            
            self.data = bytearray(data)

    def __len__(self):
        return len(self.data)
    

class FrameBuffer(object):
    def __init__(self, size):
        """
        A FrameBuffer is used to buffer frames received from the Network
        over the active Bus.The FrameBuffer is designed as a FIFO and is 
        useful to overcome the limited storing capability
        """
        self._size = size
        self._data = [[memoryview(bytearray(6))] for _ in range(size)]
        self._index_write = 0
        self._index_read = 0
        self._null_sink = [0, 0, 0, 0]
        self._count = 0

    @micropython.native
    def put(self, v):
        self._data[self._index_write] = v
        next_index = (self._index_write + 1) % self._size
        if next_index != self._index_read:
            self._index_write = self.next_index
            self._count += 1
        else:
            self.clear()
            if DEBUG:
                raise Exception("frame buffer over flow")

    @micropython.native
    def get(self):
        if self._index_read == self._index_write:
            return None

        lin_id, _, data = self._data[self._index_read]
        self._index_read = (self._index_read +1) % self._size
        self._count -= 1
        return Frame(data)  # Omit the data length


    def any(self):
        if self._index_read == self._index_write:
            return False
        return True

    def clear(self):
        self._index_write = 0
        self._index_read  = 0

    


class Master(object):
    def __init__(self, uart_id, baudrate = 115200, ctrlPin = None, fbSize = 128):
        self.rtu_pins = (Pin('A2'), Pin('A3'))
        self.uart_id = uart_id
        self.ctrlPin = ctrlPin
        self.baudrate = baudrate

        

        self.host = RTUMaster(
            pins = self.rtu_pins,
            uart_id = self.uart_id,
            baudrate = self.baudrate,
            ctrl_pin = self.ctrlPin
            )

        self.framebuf = FrameBuffer(fbSize)

    def write(self, registerdefinitions, idx):     
        slave_addr = registerdefinitions['HREGS']['slave_address']['idx']
        register_addr = registerdefinitions['HREGS']['register_address']
        qty = registerdefinitions['HREGS']['length']
         
        register_value = self.host.read_holding_registers(
                        slave_addr = slave_addr,
                        starting_addr = register_addr,
                        register_qty = qty,
                        signed = False
    )   
        if not register_value:
            self.framebuf.put(register_value)

    def read(self):
        if self.framebuf.any():
            return self.framebuf.get()




if __name__ == '__main__':
    
    async def test():            
        ctrl_Pin = '485_TR1'
        m = Master(uart_id = 2, ctrlPin = '485_TR1')
        while True:
            for idx in range(7):
                w = m.write(register_definitions, idx)
                    
                if w is not None:
                    log.info(f"Lidar{idx} return data is:{w}")
                await asyncio.sleep_ms(30)  

    
    async def start():
        asyncio.create_task(test())
        while True:
            await asyncio.sleep_ms(10)

    app = None
    async def main():
        import time
        t = time.localtime()
        log.info(t)
        gc.collect()
        global app
        
        await start()

    try:
        gc.collect()
        asyncio.run(main())

    except KeyboardInterrupt:
        log.info('Interrupt')

    finally:
        asyncio.new_event_loop()
        log.info("run again")

Additional informations

No response

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions