← index #5857PR #287
Likely Duplicate · medium · value 1.261
QUERY · ISSUE

extmod/uasyncio Provide means of scheduling I/O with high priority

openby peterhinchopened 2020-04-02updated 2020-05-04

This was discussed at length in the past, and prototyped with early code of this uasyncio. There is known demand for it in high speed UART applications and audio processing. Salient points:

  • Enable I/O readiness to be tested on every pass of the scheduler.
  • The facility to be available on a per-interface basis. A subset of active interfaces can be selected to run at high priority.

This would improve real time throughput and reduce buffering requirements

CANDIDATE · PULL REQUEST

uasyncio fast_io EventLoop option reduces I/O scheduling latency

openby peterhinchopened 2018-06-20updated 2019-02-24

This PR offers a solution to the issue discussed in MicroPython issue 2664 and provides a way of accessing the full capability of MicroPython PR 3836; namely the ability to support user-written stream I/O drivers with low latency.

An optional ioq_len=0 arg to get_event_loop() is added. The default behaviour is unchanged from standard: if I/O is pending in the presence of coros on the run queue, the I/O task is appended to the run queue and is scheduled after all pending tasks have yielded.

If ioq_len is > 0, pending I/O tasks will be scheduled in preference to tasks on the run queue, substantially reducing latency. This reduces the need for buffering of fast I/O devices and improves the response time of user-written I/O drivers.

A test program demonstrating this effect may be found here.

3 comments
peterhinch · 2018-06-25

To add to this the I/O latency of the current code is twice as long as might be expected. Assume there are N coros which issue await asyncio.sleep(0) and one StreamReader. The StreamReader gets to run only after the N coros have run twice. This may be seen with the test program referenced above (run test() against the current library).

The reason for this is as follows. Assume user coros as per the test script like

    async def dummy(self):
        while True:
            await asyncio.sleep(0)
            utime.sleep_ms(10)  # Emulate time consuming user code

Assume an initial condition where the run queue contains N such coros. Each will be scheduled before the scheduler's .wait method gives I/O a chance to poll the device. But when each coro yields, it appends itself to the run queue. So, when .wait gets to run, .runq already contains the N coros awaiting scheduling. When .wait runs and finds the device ready it appends the I/O read task to the queue.

So in the next iteration through the run queue the N coros run a second time before the I/O read is re-scheduled. At the time when this takes place each coro has yielded and been put on the run queue. So after the I/O read has been scheduled the run queue contains the N coros recreating the assumed initial condition.

This PR does not alter this default behaviour. But when fast_io is specified I/O tasks alternate with other tasks on the run queue, reducing latency by a factor of 2N.

dpgeorge · 2018-06-29

From my understanding there are two main things going on here: 1) implementing priorities for coros (low and high); 2) pumping the IO poller faster to give high-priority IO a chance to schedule itself faster.

For part 1): would it be simpler to just push the callback to the head of the runq, rather than having a separate queue?

Also, it seems the way to specify that the coro has priority here is to make it yield IOReadDone/IOWriteDone (via the appropriate StreamReader/StreamWriter class). Might there not be a more general way to specify that a coro is higher priority, like registering itself as high-prio with the event loop?

peterhinch · 2018-06-29

My initial plan was indeed to push the coro to the head of runq but ucollections.deque doesn't have an appendleft method; I was wary of the performance implications of implementing one. The penalty of an extra queue seems small, especially as it's only instantiated if required. But there is another issue.

The inner loop runs only those tasks which were on runq at the start (tasks are appended as the loop runs). If .wait(0) added an arbitrary number of tasks to the top (left) of runq keeping track of the tasks (and args) which need to be popped off could get involved: you don't know if args are queued until you've established the type of the object popped. So you can't just compare the length of the queue before and after .wait(0). I concluded an I/O queue was a simpler (and possibly faster) solution.

The forthcoming fix for the read/write class bug means that the StreamReader and StreamWriter bound coroutines must always yield IOReadDone or IOWriteDone. This is unrelated to priority: it is for the correct management of the poll flags for read/write devices which is done in Python. The code can be seen in this branch which combines this PR with the forthcoming proposal to fix the read/write bug. Yielding a "done" instance triggers the ._unregister() method which handles the poll flags.

I/O coros register themselves as high priority by ._call_io which places them on ioq or runq depending on whether an ioq has been instantiated.

I'm unsure what you're suggesting regarding a more general solution.

Keyboard

j / / n
next pair
k / / p
previous pair
1 / / h
show query pane
2 / / l
show candidate pane
c
copy suggested comment
r
toggle reasoning
g i
go to index
?
show this help
esc
close overlays

press ? or esc to close

copied