@Pr0Wh1teGivee Pr0Wh1teGivee commented Dec 6, 2025

Purpose

For background, please check the RFC proposed by Huawei: #30101

Test Plan

Test Result


Essential Elements of an Effective PR Description Checklist
  • The purpose of the PR, such as "Fix some issue (link existing issues this PR will resolve)".
  • The test plan, such as providing the test command.
  • The test results, such as pasting a comparison of the results before and after, or e2e results.
  • (Optional) The necessary documentation update, such as updating supported_models.md and examples for a new model.
  • (Optional) Release notes update. If your change is user facing, please update the release notes draft in the Google Doc.

@gemini-code-assist gemini-code-assist bot left a comment

Code Review

This pull request introduces a BufferResponseProcessor to manage response buffering based on Service Level Objectives (SLOs), which is a valuable addition for controlling response latency. The integration into AsyncLLM is mostly well-handled. However, I've identified a critical runtime error due to an undefined variable and a file duplication issue that should be addressed to improve maintainability.

await asyncio.sleep(0)

if self.buffer_response:
    self.buffer_response_processor.abort_request(request_id)

critical

The variable request_id is not defined in this scope, which will cause a NameError at runtime. The requests that need to be aborted are available in processed_outputs.reqs_to_abort. This line should be replaced with a loop that iterates over processed_outputs.reqs_to_abort and calls abort_request for each request ID.

Suggested change
-    self.buffer_response_processor.abort_request(request_id)
+    for request_id_to_abort in processed_outputs.reqs_to_abort:
+        self.buffer_response_processor.abort_request(request_id_to_abort)

Comment on lines 1 to 168
from typing import Any, Optional, Callable, Dict, Union
from dataclasses import dataclass, field
from rwlock import RWLock

import time
import threading
import queue

SECOND_TO_MS = 1000


@dataclass
class GlobalSLORequirement:
    ttft_slo: Optional[Union[int, float]] = 1000  # ms
    tpot_slo: Optional[Union[int, float]] = 50  # ms


@dataclass
class BufferedResponse:
    request_id: str
    req_slo_requirement: Optional[GlobalSLORequirement] = None
    output: Union[queue.Queue, Any] = None
    is_ended: Optional[bool] = False
    have_sent_prefill: Optional[bool] = False
    last_processed_time: Optional[float] = 0.0
    engine_index: Optional[int] = 0
    is_aborted: Optional[bool] = False


class BufferResponseProcessor:
    def __init__(self,
                 process_callback: Callable[[Any], Any],
                 global_slo_requirement: Optional[GlobalSLORequirement] = GlobalSLORequirement(),
                 engine_num: Optional[int] = 1):
        """
        Init BufferResponseProcessor; the instance belongs to async_llm.
        :param process_callback: func to release responses to a request when it meets SLO requirements
        :param engine_num: Optional, records the engine num so async_llm can route logs to the corresponding loggers
        """
        self.process_callback = process_callback
        self.slo_send_factor = 0.95
        self.default_slo_requirement = global_slo_requirement
        self.engine_num = engine_num
        self.response_container: Dict[str, BufferedResponse] = {}
        self._rw_lock = RWLock()
        self._running = True
        self._buffer_response_thread = threading.Thread(
            target=self._process_buffered_response,
            daemon=True
        )
        self._buffer_response_thread.start()

    def add_response(self, response: BufferedResponse) -> None:
        """
        Add a BufferedResponse to the BufferResponseProcessor.
        :param response: BufferedResponse with request_id, output and slo_requirement (optional)
        :return: None
        """
        with self._rw_lock.writer_lock:
            if response.request_id in self.response_container:
                # update output, engine_index (DP) and is_ended for the request in response_container
                self.response_container[response.request_id].output.put_nowait(response.output)
                self.response_container[response.request_id].engine_index = response.engine_index
                self.response_container[response.request_id].is_ended = response.is_ended
            else:
                # add a new request to response_container
                if not response.req_slo_requirement:
                    response.req_slo_requirement = self.default_slo_requirement
                self.response_container[response.request_id] = response

    def abort_request(self, request_id: str) -> None:
        """
        Mark the request in response_container as aborted; it is removed on the next checker pass.
        :param request_id: str
        :return: None
        """
        with self._rw_lock.writer_lock:
            if request_id in self.response_container:
                self.response_container[request_id].is_aborted = True

    def _slo_checker(self) -> Dict[int, list]:
        """
        Filter outputs that are approaching their SLO requirements.
        :return: Dict[int, list] mapping engine index to outputs that are ready to send
        """
        to_send_outputs = {i: [] for i in range(self.engine_num)}
        to_update_response = []
        with self._rw_lock.reader_lock:
            for req_id, req_response in self.response_container.items():
                if req_response.is_aborted or req_response.is_ended:
                    to_update_response.append((req_id, req_response.engine_index))
                else:
                    processing_slo = "tpot_slo" if req_response.have_sent_prefill else "ttft_slo"
                    if (((time.time() - req_response.last_processed_time) * SECOND_TO_MS >
                            self.slo_send_factor * getattr(req_response.req_slo_requirement, processing_slo))
                            and req_response.output.qsize() > 0):
                        to_update_response.append((req_id, req_response.engine_index))

        for req_id, engine_index in to_update_response:
            outputs = self._update_response_container_and_get_output(req_id)
            to_send_outputs[engine_index].extend(outputs)
        return to_send_outputs

    def _process_buffered_response(self) -> None:
        """
        Loop: check SLOs in response_container and release buffered responses.
        :return: None
        """
        while self._running:
            to_send_output = self._slo_checker()
            for engine_index in range(self.engine_num):
                if len(to_send_output[engine_index]) > 0:
                    self.process_callback(outputs=to_send_output[engine_index], engine_index=engine_index)
            time.sleep(0.001)

    def _update_response_container_and_get_output(self, req_id: str):
        """
        Update the request's info in response_container and drain its sendable outputs.
        :param req_id: str, request id
        :return: list of outputs (possibly empty)
        """
        with self._rw_lock.writer_lock:
            response = self.response_container[req_id]
            result = []
            if response.is_aborted:
                del self.response_container[req_id]
            elif response.is_ended:
                while True:
                    try:
                        result.append(response.output.get_nowait())
                    except queue.Empty:
                        break
                del self.response_container[req_id]
            else:
                # _slo_checker ensures the queue is not empty, but guard anyway
                try:
                    result.append(response.output.get_nowait())
                    # record that the first token has been sent
                    if not response.have_sent_prefill:
                        self.response_container[req_id].have_sent_prefill = True
                    self.response_container[req_id].last_processed_time = time.time()
                except queue.Empty:
                    pass
            return result

    def stop(self) -> None:
        """
        Stop buffer_response_thread.
        :return: None
        """
        self._running = False
        if self._buffer_response_thread and self._buffer_response_thread.is_alive():
            self._buffer_response_thread.join(timeout=10.0)

    @staticmethod
    def bind_fixed_params(*fixed_args: Any, **fixed_kwargs: Any):
        """
        Decorator to bind fixed parameters or arguments to a callback function.
        :param fixed_args: fixed positional arguments
        :param fixed_kwargs: fixed named arguments
        :return: callback func with fixed parameters or arguments
        """
        def decorator(callback: Callable[..., None]) -> Callable[..., None]:
            def wrapped(*dynamic_args: Any, **dynamic_kwargs: Any) -> None:
                callback(*fixed_args, *dynamic_args, **fixed_kwargs, **dynamic_kwargs)
            return wrapped
        return decorator
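
For reviewers, a minimal usage sketch of the API in the file above. The class and method names come from the diff; the import path, callback, request id, SLO values, and timing are illustrative assumptions, and the sketch inherits the same rwlock dependency the file imports:

import queue
import time

# Assumed import path, following the file location mentioned in the reviews.
from vllm.v1.core.sched.policy.buffer_response_processor import (
    BufferedResponse, BufferResponseProcessor, GlobalSLORequirement)

# bind_fixed_params prepends fixed positional args before the dynamic
# (outputs=..., engine_index=...) kwargs the processor passes in.
@BufferResponseProcessor.bind_fixed_params("engine-0-log")
def logging_callback(tag, outputs, engine_index):
    # Hypothetical sink; in the PR this role is played by async_llm's output path.
    print(f"[{tag}] engine {engine_index}: releasing {len(outputs)} buffered outputs")

processor = BufferResponseProcessor(
    process_callback=logging_callback,
    global_slo_requirement=GlobalSLORequirement(ttft_slo=1000, tpot_slo=50),  # ms
    engine_num=1,
)

# The first add registers the request; its `output` field must be the queue
# that later payloads are pushed into (see add_response above).
processor.add_response(BufferedResponse(request_id="req-0", output=queue.Queue()))
# Subsequent adds for the same request_id enqueue the payload itself.
processor.add_response(BufferedResponse(request_id="req-0", output="token-0"))

time.sleep(0.05)  # give the background thread a chance to release the output
processor.stop()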

high

This file appears to be an exact duplicate of vllm/v1/core/sched/policy/buffer_response_processor.py. The async_llm.py module imports from buffer_response_processor.py, which suggests this __init__.py file may be unnecessary and could cause confusion. To improve maintainability, please consider removing this file. If this file is intended to be the package's __init__.py, it should instead import the necessary classes from buffer_response_processor.py to avoid code duplication.
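
If the file is meant to stay as the package's __init__.py, the re-export approach the comment describes could look like the following sketch; the module path is taken from the review comment above, not verified against the branch:

# vllm/v1/core/sched/policy/__init__.py
from .buffer_response_processor import (
    BufferedResponse,
    BufferResponseProcessor,
    GlobalSLORequirement,
)

__all__ = ["BufferedResponse", "BufferResponseProcessor", "GlobalSLORequirement"]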


@chatgpt-codex-connector chatgpt-codex-connector bot left a comment


💡 Codex Review

Here are some automated review suggestions for this pull request.


Comment on lines 580 to 581
if self.buffer_response:
    self.buffer_response_processor.abort_request(request_id)


P1: Abort buffered requests with a defined request id

When buffer_response is enabled, the output handler tries to call self.buffer_response_processor.abort_request(request_id) after processing buffered outputs, but request_id is undefined in that scope (lines 580-581 of vllm/v1/engine/async_llm.py). As soon as the first EngineCoreOutputs with is_buffered_outputs=True is processed, this raises NameError and kills the output handler task, so buffered responses can never be delivered to callers.
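
For concreteness, a sketch of the corrected block, assuming (as both reviews state) that processed_outputs.reqs_to_abort carries the request IDs; the surrounding names follow the quoted snippet rather than the full async_llm.py context:

await asyncio.sleep(0)

if self.buffer_response:
    # Abort every buffered request the engine reported, instead of
    # referencing an undefined request_id.
    for request_id_to_abort in processed_outputs.reqs_to_abort:
        self.buffer_response_processor.abort_request(request_id_to_abort)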



mergify bot commented Dec 6, 2025

Hi @Pr0Wh1teGivee, the pre-commit checks have failed. Please run:

uv pip install pre-commit
pre-commit install
pre-commit run --all-files

Then, commit the changes and push to your branch.

For future commits, pre-commit will run automatically on changed files before each commit.
