Description
I'm trying to implement cancellation for agents/pipelines. Since `run_async` works well with asyncio, I went with using asyncio's `task.cancel()`.
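For context, this is roughly the pattern I use to cancel a run (a minimal sketch; `pipeline` and `data` are placeholders for anything exposing `run_async`):

```python
import asyncio

async def run_with_deadline(pipeline, data, timeout: float):
    # On timeout, wait_for cancels the inner task, which raises
    # asyncio.CancelledError inside run_async.
    task = asyncio.create_task(pipeline.run_async(data))
    return await asyncio.wait_for(task, timeout)
```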
From some experiments, I see that OpenAI doesn't log `/chat/completions` requests if we close the stream soon enough, which can save tokens for long-running requests. However, Haystack's OpenAI generators don't take exceptions into account, so the stream is left open until all chunks are returned.
If `OpenAIChatGenerator._handle_async_stream_response` closed the `chat_completions: AsyncStream` when `asyncio.CancelledError` is raised, token usage could be reduced. I'm currently monkey-patching this behavior:
```python
import asyncio

from wrapt import wrap_function_wrapper


async def _handle_async_stream_response_wrapper(wrapped, instance, args, kwargs):
    # wrapt passes `self` separately as `instance`, so args[0] is the
    # chat_completions: AsyncStream positional argument.
    async_stream = args[0]
    try:
        return await wrapped(*args, **kwargs)
    except asyncio.CancelledError:
        # Shield the close so the cancellation that got us here
        # cannot also interrupt the cleanup itself.
        await asyncio.shield(async_stream.close())
        raise


def patch_openai():
    wrap_function_wrapper(
        "haystack.components.generators.chat.openai",
        "OpenAIChatGenerator._handle_async_stream_response",
        _handle_async_stream_response_wrapper,
    )
```
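The `asyncio.shield` around `close()` matters: without it, the `CancelledError` that triggered the cleanup could be delivered again and interrupt `close()` itself, leaving the connection open after all.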
I'd like to ask what you think about whether this change makes sense. Cancellation isn't supported across Haystack agents/pipelines yet, and I'm only confident about the OpenAI generator side.