fix(stream): don't block event loop in EventQueue #151
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Description
Fixes #111 🦕
Warning
Backwards incompatible change, as
EventQueue.enqueue_event()becomes async.The implementation of the
AgentExecutorusually looks like this:The issue is that the loop inside
execute()callstask_updater.update_status()synchronously.If
update_statusis not an async function, then theexecute()method does not yield control back to the event loop during each iteration. That’s why everything gets blocked until the loop finishes.So the first step is to make sure that
update_status()andevent_queue.enqueue_event()are async functions.Still, this is not enough, since
EventQueueusesput_nowait(), so it never actually suspends.So the 2nd step is to switch from
queue.put_nowait()toqueue.put().This is still not enough.
By default, an
asyncio.Queueis unbounded , soqueue.putnever actually suspends the producer.Therefore, the producer loop in
SomeAgentExecutorstill runs full-speed without yielding.So the 3rd step is to make the queue bounded. I have set the size to 1024, but it can be actually lower or higher if needed.