Skip to content

Conversation

@matino
Copy link
Contributor

@matino matino commented Jun 3, 2025

Description

Fixes #111 🦕

Warning

Backwards incompatible change, as EventQueue.enqueue_event() becomes async.

The implementation of the AgentExecutor usually looks like this:

class SomeAgentExecutor(AgentExecutor):

    async def execute(self, context: RequestContext, event_queue: EventQueue) -> None:
        task_updater = TaskUpdater(event_queue, context.task_id, context.context_id)
        task_updater.start_work()
        
        async for event in self._run_async(...):
            if event.is_final_response():
                task_updater.add_artifact(parts)
                task_updater.complete()
                break
            task_updater.update_status(
                TaskState.working,
                message=task_updater.new_agent_message(parts),
            )

The issue is that the loop inside execute() calls task_updater.update_status() synchronously.
If update_status is not an async function, then the execute() method does not yield control back to the event loop during each iteration. That’s why everything gets blocked until the loop finishes.

So the first step is to make sure that update_status() and event_queue.enqueue_event() are async functions.
Still, this is not enough, since EventQueue uses put_nowait(), so it never actually suspends.
So the 2nd step is to switch from queue.put_nowait() to queue.put().

This is still not enough.
By default, an asyncio.Queue is unbounded , so queue.put never actually suspends the producer.
Therefore, the producer loop in SomeAgentExecutor still runs full-speed without yielding.
So the 3rd step is to make the queue bounded. I have set the size to 1024, but it can be actually lower or higher if needed.

@matino matino requested a review from a team as a code owner June 3, 2025 11:09
@matino matino force-pushed the fix_message_stream branch from be45b34 to 2d7bcba Compare June 3, 2025 11:19
@pstephengoogle pstephengoogle merged commit efd9080 into a2aproject:main Jun 3, 2025
6 checks passed
holtskinner pushed a commit that referenced this pull request Jun 9, 2025
🤖 I have created a release *beep* *boop*
---


##
[0.2.6](v0.2.5...v0.2.6)
(2025-06-09)


### ⚠ BREAKING CHANGES

* Add FastAPI JSONRPC Application
([#104](#104))

### Features

* Add FastAPI JSONRPC Application
([#104](#104))
([0e66e1f](0e66e1f))
* Add gRPC server and client support
([#162](#162))
([a981605](a981605))
* add reject method to task_updater
([#147](#147))
([2a6ef10](2a6ef10))
* Add timestamp to `TaskStatus` updates on `TaskUpdater`
([#140](#140))
([0c9df12](0c9df12))
* **spec:** Add an optional iconUrl field to the AgentCard 🤖
([a1025f4](a1025f4))


### Bug Fixes

* Correctly adapt starlette BaseUser to A2A User
([#133](#133))
([88d45eb](88d45eb))
* Event consumer should stop on input_required
([#167](#167))
([51c2d8a](51c2d8a))
* Fix Release Version
([#161](#161))
([011d632](011d632))
* generate StrEnum types for enums
([#134](#134))
([0c49dab](0c49dab))
* library should released as 0.2.6
([d8187e8](d8187e8))
* remove error types from enqueable events
([#138](#138))
([511992f](511992f))
* **stream:** don't block event loop in EventQueue
([#151](#151))
([efd9080](efd9080))
* **task_updater:** fix potential duplicate artifact_id from default v…
([#156](#156))
([1f0a769](1f0a769))


### Documentation

* remove final and metadata fields from docstring
([#66](#66))
([3c50ee1](3c50ee1))
* Update Links to Documentation Site
([5e7d418](5e7d418))

---
This PR was generated with [Release
Please](https://github.com/googleapis/release-please). See
[documentation](https://github.com/googleapis/release-please#release-please).

Co-authored-by: release-please[bot] <55107282+release-please[bot]@users.noreply.github.com>
daavoo added a commit to mozilla-ai/any-agent that referenced this pull request Jun 10, 2025
Changed in a2aproject/a2a-python#151.

- Increase `timeout` in `test_load_and_run_multi_agent_a2a_tool`.
daavoo added a commit to mozilla-ai/any-agent that referenced this pull request Jun 10, 2025
* Add missing `id` in `SendMessageRequest`.

* Add missing id

* fix(a2a): Add missing `await` in `event_queue.enqueue_event`.

Changed in a2aproject/a2a-python#151.

- Increase `timeout` in `test_load_and_run_multi_agent_a2a_tool`.

* bump "a2a-sdk>=0.2.6"

* Fix pre-commit
CharesrelaSutton added a commit to CharesrelaSutton/any-agent that referenced this pull request Nov 26, 2025
* Add missing `id` in `SendMessageRequest`.

* Add missing id

* fix(a2a): Add missing `await` in `event_queue.enqueue_event`.

Changed in a2aproject/a2a-python#151.

- Increase `timeout` in `test_load_and_run_multi_agent_a2a_tool`.

* bump "a2a-sdk>=0.2.6"

* Fix pre-commit
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[Bug]: message/stream does not stream properly

3 participants