
Close OpenAI stream on asyncio cancellation to save tokens #10133

@ChienNQuangHolistics

Description

I'm trying to implement cancellation for agents/pipelines. Since run_async integrates well with asyncio, I went with asyncio's task.cancel().

From some experiments, I see that OpenAI doesn't log the /chat/completions request if we close the stream soon enough, which can save tokens on long-running requests. However, the OpenAI generators don't take any exception into account, so the stream is left open until all chunks have been returned.
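To illustrate the problem, here is a minimal, self-contained sketch. `FakeStream` is a hypothetical stand-in for openai's AsyncStream (it only tracks whether close() was called); the real consumer is Haystack's handler, not this `consume` function. When the consuming task is cancelled mid-iteration, the loop is simply abandoned and close() never runs:

```python
import asyncio

# Hypothetical stand-in for openai.AsyncStream: just records whether
# close() was ever called.
class FakeStream:
    def __init__(self):
        self.closed = False

    def __aiter__(self):
        return self

    async def __anext__(self):
        await asyncio.sleep(0.05)  # simulate waiting for the next chunk
        return "chunk"

    async def close(self):
        self.closed = True

async def consume(stream):
    # Mirrors a handler that iterates chunks without catching
    # CancelledError: on cancellation the loop is abandoned and
    # close() never runs.
    async for _ in stream:
        pass

async def main():
    stream = FakeStream()
    task = asyncio.create_task(consume(stream))
    await asyncio.sleep(0.01)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return stream.closed

print(asyncio.run(main()))  # → False: the stream was left open
```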

If OpenAIChatGenerator._handle_async_stream_response closed the chat_completions: AsyncStream when asyncio.CancelledError is raised, token usage could be reduced. I’m currently monkey-patching this behavior:

import asyncio

from wrapt import wrap_function_wrapper


async def _handle_async_stream_response_wrapper(wrapped, instance, args, kwargs):
    # The AsyncStream is the first positional argument of the wrapped method.
    async_stream = args[0]
    try:
        return await wrapped(*args, **kwargs)
    except asyncio.CancelledError:
        # Shield close() so it completes even though this task is cancelled.
        await asyncio.shield(async_stream.close())
        raise


def patch_openai():
    wrap_function_wrapper(
        "haystack.components.generators.chat.openai",
        "OpenAIChatGenerator._handle_async_stream_response",
        _handle_async_stream_response_wrapper,
    )
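The close-under-shield pattern in the patch can be demonstrated without haystack or the openai client. Reusing the same hypothetical `FakeStream` stand-in, the consumer catches asyncio.CancelledError, awaits the close() under asyncio.shield so the close coroutine is not itself cancelled, and then re-raises:

```python
import asyncio

# Hypothetical stand-in for openai.AsyncStream; the real close() releases
# the underlying HTTP response.
class FakeStream:
    def __init__(self):
        self.closed = False

    def __aiter__(self):
        return self

    async def __anext__(self):
        await asyncio.sleep(0.05)  # simulate waiting for the next chunk
        return "chunk"

    async def close(self):
        self.closed = True

async def consume(stream):
    try:
        async for _ in stream:
            pass
    except asyncio.CancelledError:
        # shield() lets close() run to completion even though this task
        # has already been cancelled; then propagate the cancellation.
        await asyncio.shield(stream.close())
        raise

async def main():
    stream = FakeStream()
    task = asyncio.create_task(consume(stream))
    await asyncio.sleep(0.01)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return stream.closed

print(asyncio.run(main()))  # → True: close() ran before the cancellation propagated
```

Re-raising after the cleanup matters: swallowing CancelledError would make the task appear to finish normally and break structured cancellation further up the stack.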

I'd like to ask what you think about whether this change makes sense. Cancellation isn't supported across Haystack agents/pipelines yet, and I'm only confident about the OpenAI generator side.

Metadata

Labels

P1: High priority, add to the next sprint
