Async randomly dying
Checks
-
I agree to follow the MicroPython Code of Conduct to ensure a safe and respectful space for everyone.
-
I've searched for existing issues matching this bug, and didn't find any.
Port, board and/or hardware
RP2
MicroPython version
MicroPython v1.22.2 on 2024-02-22; Raspberry Pi Pico with RP2040
Reproduction
from machine import UART, Pin
from uasyncio import StreamReader, StreamWriter, Event, Lock, sleep_ms, run
LoRa_bands = {'EU': 868.1, 'US': 915.0}
class LoRa:
def __init__(self, uart=0, tx_pin=0, rx_pin=1, reset_pin=4, baudrate=115200,
band='US', address=2):
self.uart = UART(uart, baudrate=baudrate, tx=tx_pin, rx=rx_pin)
self.reset_pin = Pin(reset_pin, Pin.OUT)
self.reader = StreamReader(self.uart)
self.writer = StreamWriter(self.uart, {})
self._band = band
self._address = address
self.initialized = False
self.seat = Lock()
self._recv = Event()
self._received = Event()
def start(self):
from uasyncio import create_task
create_task(self.dev_init())
create_task(self.readloop())
async def send_query(self, query):
return await self.send_command(query, True)
async def send_command(self, cmd, query=False):
""" Sends an AT command to the device """
# Fix capitalization
if loc := cmd.find('=') + 1:
cmd = cmd[:loc].upper() + cmd[loc:]
else:
cmd = cmd.upper()
# Add prefix
if not cmd.startswith('AT+'):
orig_cmd = cmd
cmd = 'AT+' + cmd
else:
orig_cmd = cmd[3:]
# Add suffix for query
if query and cmd[-1] != '?':
cmd += '?'
# Add termination
cmd += '\r\n'
async with self.seat:
print("Sending command: ", cmd.rstrip())
await self.writer.awrite(cmd)
await sleep_ms(10 if query else 50)
while not self.uart.any():
print("Waiting for response...")
await sleep_ms(250)
response = await self.recv()
response = response[:-2]
print(response)
if query:
if response.startswith(f"+{orig_cmd}="):
response = response[len(f"+{orig_cmd}="):]
else:
raise ValueError("Invalid response: %s" % response)
elif response != '+OK':
raise ValueError("Error: %s" % response)
return response
async def reset(self):
print("Resetting LoRa module.")
self.reset_pin.off()
await sleep_ms(100)
self.reset_pin.on()
status = await self.recv()
if status != '+READY\r\n':
raise ValueError("Module not ready after reset: %s" % status)
async def dev_init(self):
await self.reset()
if not self.version or not self.uid:
raise ValueError("Unknown module.")
self.address = self._address
self.band = self._band
async def recv(self):
self._recv.set()
await self._received.wait()
self._received.clear()
return self._data
async def readloop(self):
while True:
await self._recv.wait()
raw_data = await self.reader.readline()
self._data = raw_data.decode()
self._recv.clear()
self._received.set()
print("wtf")
@property
def address(self):
return run(self.send_query('address'))
@address.setter
def address(self, address=None):
if address is None:
address = self._address
elif address >= 0 and address < 2 ** 16:
self._address = address
else:
raise ValueError("Invalid LoRa address: %s", address)
return run(self.send_command(f'ADDRESS={address}'))
@property
def band(self):
return run(self.send_query('band'))
@band.setter
def band(self, band=None):
if band is None:
band = self._band
else:
self._band = band
band_freq = int(LoRa_bands[band] * 1_000_000)
return run(self.send_command(f'BAND={band_freq}'))
@property
def parameter(self):
return run(self.send_query('parameter'))
@parameter.setter
def set_parameters(self, spreading_factor, bandwidth, coding_rate):
return run(self.send_command(f'PARAMETER={spreading_factor},{bandwidth},{coding_rate}'))
@property
def uid(self):
return run(self.send_query('uid'))
@property
def version(self):
return run(self.send_query('ver'))
from asyncio import get_event_loop
from lora import LoRa
lora = LoRa()
loop = get_event_loop()
lora.start()
loop.run_forever()
print("Ended.")
Connected to MicroPython at /dev/ttyACM2
Use Ctrl-] or Ctrl-x to exit this shell
MPY: soft reboot
Resetting LoRa module.
Sending command: AT+VER?
+VER=RYLR998_REYAX_V1.2.2
Sending command: AT+UID?
+UID=000500110457F1B400003E76
Sending command: AT+ADDRESS=2
+OK
Sending command: AT+BAND=915000000
+OK
Ended.
MicroPython v1.22.2 on 2024-02-22; Raspberry Pi Pico with RP2040
Type "help()" for more information.
The infinite loop seems to end withot reason, and then the run_forever is not followed.
Expected behaviour
This script should loop infinitely, instead it seems to stop after dev_init completes.
Observed behaviour
The loop stops with no exceptions, even though it's an infinite loop.
Additional Information
No, I've provided everything above.
Give me a solution for the esp01 response throught Micropython
Checks
-
I agree to follow the MicroPython Code of Conduct to ensure a safe and respectful space for everyone.
-
I've searched for existing issues matching this bug, and didn't find any.
Port, board and/or hardware
Raspberry Pi Pico
MicroPython version
MicroPython v1.22.1 on 2024-01-05; Raspberry Pi Pico with RP2040
Reproduction
when i run this python code :
from machine import UART, Pin
import utime
esp = UART(0, baudrate=115200)
def send_at_command(cmd):
esp.write(cmd)
response = waitResp(esp)
# print(response.decode())
return response
def waitResp(uart, timeout=2000):
prvMills = utime.ticks_ms()
resp = b""
while (utime.ticks_ms()-prvMills) < timeout:
if uart.any():
resp = b"".join([resp, uart.readline()])
print("resp:")
try:
print(resp.decode())
except UnicodeError:
print("E:", resp)
return resp
send_at_command(b'AT')
Expected behaviour
the output should be:
resp:
AT
OK
Observed behaviour
but giving only:
resp:
At
Additional Information
No, I've provided everything above.