← index #13770Issue #13758
Off-topic · high · value 3.061
QUERY · ISSUE

Async randomly dying

open · by desultory · opened 2024-02-27 · updated 2024-03-28
bug

Checks

  • I agree to follow the MicroPython Code of Conduct to ensure a safe and respectful space for everyone.

  • I've searched for existing issues matching this bug, and didn't find any.

Port, board and/or hardware

RP2

MicroPython version

MicroPython v1.22.2 on 2024-02-22; Raspberry Pi Pico with RP2040

Reproduction

from machine import UART, Pin
from uasyncio import StreamReader, StreamWriter, Event, Lock, sleep_ms, run


# Band frequency per region key. The LoRa.band setter multiplies the value by
# 1_000_000 before sending it in the BAND command (so presumably MHz — confirm).
LoRa_bands = {'EU': 868.1, 'US': 915.0}


class LoRa:
    """Asynchronous AT-command driver for a LoRa module attached to a UART.

    Commands are written through a ``StreamWriter``; replies are read line by
    line by the background ``readloop`` task, and ``recv`` hands one reply at
    a time from that task to the caller using a pair of events.
    """

    def __init__(self, uart=0, tx_pin=0, rx_pin=1, reset_pin=4, baudrate=115200,
                 band='US', address=2):
        """Create the UART, reset pin, streams and synchronisation objects.

        ``band`` (a key of ``LoRa_bands``) and ``address`` are only stored
        here; ``dev_init`` sends them to the module later.
        """
        self.uart = UART(uart, baudrate=baudrate, tx=tx_pin, rx=rx_pin)
        self.reset_pin = Pin(reset_pin, Pin.OUT)
        self.reader = StreamReader(self.uart)
        self.writer = StreamWriter(self.uart, {})

        self._band = band
        self._address = address

        self.initialized = False
        # Serialises command/response exchanges on the UART.
        self.seat = Lock()
        # _recv: a caller wants a line; _received: readloop has stored one.
        self._recv = Event()
        self._received = Event()

    def start(self):
        """Schedule ``dev_init`` and ``readloop`` as tasks."""
        from uasyncio import create_task
        create_task(self.dev_init())
        create_task(self.readloop())

    async def send_query(self, query):
        """Send ``query`` as an AT query and return the value from the reply."""
        return await self.send_command(query, True)

    async def send_command(self, cmd, query=False):
        """Send an AT command to the device and return its reply.

        The command name is upper-cased (any text after ``=`` is left as is),
        ``AT+`` is prepended if missing, ``?`` is appended for queries and the
        line is terminated with CR LF.

        Returns the text following ``+<CMD>=`` for a query, otherwise the
        literal ``'+OK'``.

        Raises ValueError if the reply is not the expected one.
        """
        # Fix capitalization
        if loc := cmd.find('=') + 1:
            cmd = cmd[:loc].upper() + cmd[loc:]
        else:
            cmd = cmd.upper()

        # Add prefix
        if not cmd.startswith('AT+'):
            orig_cmd = cmd
            cmd = 'AT+' + cmd
        else:
            orig_cmd = cmd[3:]

        # Add suffix for query
        if query and cmd[-1] != '?':
            cmd += '?'

        # Add termination
        cmd += '\r\n'

        async with self.seat:
            print("Sending command: ", cmd.rstrip())
            await self.writer.awrite(cmd)
            await sleep_ms(10 if query else 50)
            # Poll until the UART reports pending data before asking readloop
            # for a line.
            while not self.uart.any():
                print("Waiting for response...")
                await sleep_ms(250)
            response = await self.recv()

        # Drop the trailing CR LF.
        response = response[:-2]
        print(response)

        if query:
            if response.startswith(f"+{orig_cmd}="):
                response = response[len(f"+{orig_cmd}="):]
            else:
                raise ValueError("Invalid response: %s" % response)
        elif response != '+OK':
            raise ValueError("Error: %s" % response)
        return response

    async def reset(self):
        """Pulse the reset pin low for 100 ms and wait for ``+READY``.

        Raises ValueError if the first line received is not ``+READY``.
        """
        print("Resetting LoRa module.")
        self.reset_pin.off()
        await sleep_ms(100)
        self.reset_pin.on()
        status = await self.recv()
        if status != '+READY\r\n':
            raise ValueError("Module not ready after reset: %s" % status)

    async def dev_init(self):
        """Reset the module, check that it identifies itself, then configure it.

        Raises ValueError if the version or UID query returns an empty value.
        """
        await self.reset()
        # NOTE(review): version/uid and the two setters below call run() from
        # inside this coroutine, i.e. while the scheduler is already running;
        # confirm the asyncio implementation in use supports nested run().
        if not self.version or not self.uid:
            raise ValueError("Unknown module.")
        self.address = self._address
        self.band = self._band

    async def recv(self):
        """Ask ``readloop`` for one line and return it once it has arrived."""
        self._recv.set()
        await self._received.wait()
        self._received.clear()
        return self._data

    async def readloop(self):
        """Background task: read one line from the UART per ``recv`` request."""
        while True:
            await self._recv.wait()
            raw_data = await self.reader.readline()
            self._data = raw_data.decode()
            self._recv.clear()
            self._received.set()
        # Unreachable: the loop above never breaks.
        print("wtf")

    @property
    def address(self):
        """Module address as reported by the ``ADDRESS`` query."""
        return run(self.send_query('address'))

    @address.setter
    def address(self, address=None):
        """Set the module address (0..65535); ``None`` re-sends the stored one.

        Raises ValueError for an out-of-range address.
        """
        if address is None:
            address = self._address
        elif address >= 0 and address < 2 ** 16:
            self._address = address
        else:
            # Format the value into the message; passing it as a second
            # argument would leave a literal "%s" in the exception text.
            raise ValueError("Invalid LoRa address: %s" % address)

        return run(self.send_command(f'ADDRESS={address}'))

    @property
    def band(self):
        """Band as reported by the ``BAND`` query."""
        return run(self.send_query('band'))

    @band.setter
    def band(self, band=None):
        """Select a band by ``LoRa_bands`` key; ``None`` re-sends the stored one.

        Raises KeyError if ``band`` is not a key of ``LoRa_bands``.
        """
        if band is None:
            band = self._band
        else:
            self._band = band

        band_freq = int(LoRa_bands[band] * 1_000_000)
        return run(self.send_command(f'BAND={band_freq}'))

    @property
    def parameter(self):
        """Value reported by the ``PARAMETER`` query."""
        return run(self.send_query('parameter'))

    @parameter.setter
    def parameter(self, params):
        """Set the parameters from a ``(spreading_factor, bandwidth, coding_rate)`` tuple."""
        # A property setter receives exactly one value, so the three settings
        # are passed together and unpacked here.
        spreading_factor, bandwidth, coding_rate = params
        return run(self.send_command(f'PARAMETER={spreading_factor},{bandwidth},{coding_rate}'))

    def set_parameters(self, spreading_factor, bandwidth, coding_rate):
        """Send the ``PARAMETER`` command with the three given values."""
        return run(self.send_command(f'PARAMETER={spreading_factor},{bandwidth},{coding_rate}'))

    @property
    def uid(self):
        """Value reported by the ``UID`` query."""
        return run(self.send_query('uid'))

    @property
    def version(self):
        """Value reported by the ``VER`` query."""
        return run(self.send_query('ver'))
from asyncio import get_event_loop
from lora import LoRa

# Build the driver with its default UART, pins, band and address.
lora = LoRa()

loop = get_event_loop()

# Schedules dev_init() and readloop() as tasks.
lora.start()

loop.run_forever()

# Only reached if run_forever() returns.
print("Ended.")
Connected to MicroPython at /dev/ttyACM2
Use Ctrl-] or Ctrl-x to exit this shell

MPY: soft reboot
Resetting LoRa module.
Sending command:  AT+VER?
+VER=RYLR998_REYAX_V1.2.2
Sending command:  AT+UID?
+UID=000500110457F1B400003E76
Sending command:  AT+ADDRESS=2
+OK
Sending command:  AT+BAND=915000000
+OK
Ended.
MicroPython v1.22.2 on 2024-02-22; Raspberry Pi Pico with RP2040
Type "help()" for more information.

The infinite loop seems to end without reason, and then run_forever returns instead of running forever.

Expected behaviour

This script should loop infinitely, instead it seems to stop after dev_init completes.

Observed behaviour

The loop stops with no exceptions, even though it's an infinite loop.

Additional Information

No, I've provided everything above.

CANDIDATE · ISSUE

Give me a solution for the esp01 response through MicroPython

closed · by Jayakrishna112 · opened 2024-02-26 · updated 2024-02-27
bug

Checks

  • I agree to follow the MicroPython Code of Conduct to ensure a safe and respectful space for everyone.

  • I've searched for existing issues matching this bug, and didn't find any.

Port, board and/or hardware

Raspberry Pi Pico

MicroPython version

MicroPython v1.22.1 on 2024-01-05; Raspberry Pi Pico with RP2040

Reproduction

When I run this Python code:

from machine import UART, Pin
import utime

# UART 0 at 115200 baud; send_at_command writes to and reads from this port.
esp = UART(0, baudrate=115200)

def send_at_command(cmd):
    """Write ``cmd`` to the ``esp`` UART and return what ``waitResp`` collects."""
    esp.write(cmd)
    return waitResp(esp)

def waitResp(uart, timeout=2000):
    """Collect everything received on ``uart`` for ``timeout`` milliseconds.

    The function always waits for the full timeout, appending each line that
    becomes available, then prints the result (decoded when possible, raw
    bytes otherwise) and returns it.

    Args:
        uart: object providing ``any()`` and ``readline()``.
        timeout: how long to keep listening, in milliseconds.

    Returns:
        The received bytes; ``b""`` if nothing arrived.
    """
    start = utime.ticks_ms()
    resp = b""
    # ticks_ms() wraps around, so elapsed time has to be computed with
    # ticks_diff() rather than plain subtraction.
    while utime.ticks_diff(utime.ticks_ms(), start) < timeout:
        if uart.any():
            line = uart.readline()
            # readline() may return None on timeout; skip it rather than
            # trying to concatenate None with bytes.
            if line:
                resp += line
    print("resp:")
    try:
        print(resp.decode())
    except UnicodeError:
        print("E:", resp)
    return resp

# NOTE(review): the command is sent without a trailing CR/LF; AT firmwares
# usually require "\r\n" to terminate a command — confirm against the module.
send_at_command(b'AT')

Expected behaviour

the output should be:

resp:
AT

OK

Observed behaviour

but giving only:

resp:
At

Additional Information

No, I've provided everything above.

Keyboard

j / / n
next pair
k / / p
previous pair
1 / / h
show query pane
2 / / l
show candidate pane
c
copy suggested comment
r
toggle reasoning
g i
go to index
?
show this help
esc
close overlays

press ? or esc to close

copied