Skip to content

vllm.entrypoints.openai.translations.speech_to_text

Modules:

Name Description
envs

PromptType module-attribute

Schema for any prompt, regardless of model type.

This is the input format accepted by most LLM APIs.

EncoderDecoderDictPrompt

Bases: TypedDict

A EncoderDecoderPrompt that has been standardized into a dictionary.

Source code in vllm/renderers/inputs/preprocess.py
class EncoderDecoderDictPrompt(TypedDict):
    """
    A [`EncoderDecoderPrompt`][vllm.inputs.data.EncoderDecoderPrompt]
    that has been standardized into a dictionary.
    """

    encoder_prompt: EncoderDictPrompt

    decoder_prompt: DecoderDictPrompt | None

EngineClient

Bases: ABC

Protocol class for Clients to Engine

Source code in vllm/engine/protocol.py
class EngineClient(ABC):
    """Protocol class for Clients to Engine"""

    vllm_config: VllmConfig
    model_config: ModelConfig
    input_processor: InputProcessor
    io_processor: IOProcessor | None

    @property
    @abstractmethod
    def renderer(self) -> BaseRenderer: ...

    @property
    @abstractmethod
    def is_running(self) -> bool: ...

    @property
    @abstractmethod
    def is_stopped(self) -> bool: ...

    @property
    @abstractmethod
    def errored(self) -> bool: ...

    @property
    @abstractmethod
    def dead_error(self) -> BaseException: ...

    @abstractmethod
    def generate(
        self,
        prompt: EngineCoreRequest
        | PromptType
        | DictPrompt
        | TokPrompt
        | AsyncGenerator[StreamingInput, None],
        sampling_params: SamplingParams,
        request_id: str,
        *,
        prompt_text: str | None = None,
        lora_request: LoRARequest | None = None,
        tokenization_kwargs: dict[str, Any] | None = None,
        trace_headers: Mapping[str, str] | None = None,
        priority: int = 0,
        data_parallel_rank: int | None = None,
    ) -> AsyncGenerator[RequestOutput, None]:
        """Generate outputs for a request."""
        ...

    @abstractmethod
    def encode(
        self,
        prompt: PromptType | DictPrompt | TokPrompt,
        pooling_params: PoolingParams,
        request_id: str,
        lora_request: LoRARequest | None = None,
        trace_headers: Mapping[str, str] | None = None,
        priority: int = 0,
        tokenization_kwargs: dict[str, Any] | None = None,
    ) -> AsyncGenerator[PoolingRequestOutput, None]:
        """Generate outputs for a request from a pooling model."""
        ...

    @abstractmethod
    async def abort(self, request_id: str | Iterable[str]) -> None:
        """Abort a request.

        Args:
            request_id: The unique id of the request,
                        or an iterable of such ids.
        """
        ...

    @abstractmethod
    async def is_tracing_enabled(self) -> bool: ...

    @abstractmethod
    async def do_log_stats(self) -> None: ...

    @abstractmethod
    async def check_health(self) -> None:
        """Raise if unhealthy"""
        ...

    @abstractmethod
    async def start_profile(self) -> None:
        """Start profiling the engine"""
        ...

    @abstractmethod
    async def stop_profile(self) -> None:
        """Stop profiling the engine"""
        ...

    @abstractmethod
    async def reset_mm_cache(self) -> None:
        """Reset the multi-modal cache"""
        ...

    @abstractmethod
    async def reset_encoder_cache(self) -> None:
        """Reset the encoder cache"""
        ...

    @abstractmethod
    async def reset_prefix_cache(
        self, reset_running_requests: bool = False, reset_connector: bool = False
    ) -> bool:
        """Reset the prefix cache and optionally any configured connector cache"""
        ...

    @abstractmethod
    async def sleep(self, level: int = 1) -> None:
        """Sleep the engine"""
        ...

    @abstractmethod
    async def wake_up(self, tags: list[str] | None = None) -> None:
        """Wake up the engine"""
        ...

    @abstractmethod
    async def is_sleeping(self) -> bool:
        """Check whether the engine is sleeping"""
        ...

    @abstractmethod
    async def add_lora(self, lora_request: LoRARequest) -> bool:
        """Load a new LoRA adapter into the engine for future requests."""
        ...

    @abstractmethod
    async def pause_generation(
        self,
        *,
        mode: "PauseMode" = "abort",
        wait_for_inflight_requests: bool = False,
        clear_cache: bool = True,
    ) -> None:
        """Pause new generation/encoding requests.

        Args:
            mode: How to handle in-flight requests:
                - ``"abort"``: Abort all in-flight requests immediately
                  and return partial results with "abort" reason (default).
                - ``"wait"``: Wait for in-flight requests to complete.
                - ``"keep"``: Freeze requests in queue; they resume on
                  :meth:`resume_generation`.
            wait_for_inflight_requests: DEPRECATED. Use ``mode="wait"`` instead.
            clear_cache: DEPRECATED. Whether to clear KV and prefix caches
                after draining.
        """
        ...

    @abstractmethod
    async def resume_generation(self) -> None:
        """Resume accepting generation/encoding requests."""
        ...

    @abstractmethod
    async def is_paused(self) -> bool:
        """Return whether the engine is currently paused."""
        ...

    async def scale_elastic_ep(
        self, new_data_parallel_size: int, drain_timeout: int = 300
    ) -> None:
        """Scale the engine"""
        raise NotImplementedError

    async def collective_rpc(
        self,
        method: str,
        timeout: float | None = None,
        args: tuple = (),
        kwargs: dict | None = None,
    ):
        """Perform a collective RPC call to the given path."""
        raise NotImplementedError

    async def get_supported_tasks(self) -> tuple[SupportedTask, ...]:
        """Get supported tasks"""
        raise NotImplementedError

    async def init_weight_transfer_engine(
        self, init_request: WeightTransferInitRequest
    ) -> None:
        """Initialize weight transfer for RL training."""
        raise NotImplementedError

    async def update_weights(self, request: WeightTransferUpdateRequest) -> None:
        """Batched weight update for RL training."""
        raise NotImplementedError

abort abstractmethod async

abort(request_id: str | Iterable[str]) -> None

Abort a request.

Parameters:

Name Type Description Default
request_id str | Iterable[str]

The unique id of the request, or an iterable of such ids.

required
Source code in vllm/engine/protocol.py
@abstractmethod
async def abort(self, request_id: str | Iterable[str]) -> None:
    """Abort a request.

    Args:
        request_id: The unique id of the request,
                    or an iterable of such ids.
    """
    ...

add_lora abstractmethod async

add_lora(lora_request: LoRARequest) -> bool

Load a new LoRA adapter into the engine for future requests.

Source code in vllm/engine/protocol.py
@abstractmethod
async def add_lora(self, lora_request: LoRARequest) -> bool:
    """Load a new LoRA adapter into the engine for future requests."""
    ...

check_health abstractmethod async

check_health() -> None

Raise if unhealthy

Source code in vllm/engine/protocol.py
@abstractmethod
async def check_health(self) -> None:
    """Raise if unhealthy"""
    ...

collective_rpc async

collective_rpc(
    method: str,
    timeout: float | None = None,
    args: tuple = (),
    kwargs: dict | None = None,
)

Perform a collective RPC call to the given path.

Source code in vllm/engine/protocol.py
async def collective_rpc(
    self,
    method: str,
    timeout: float | None = None,
    args: tuple = (),
    kwargs: dict | None = None,
):
    """Perform a collective RPC call to the given path."""
    raise NotImplementedError

encode abstractmethod

encode(
    prompt: PromptType | DictPrompt | TokPrompt,
    pooling_params: PoolingParams,
    request_id: str,
    lora_request: LoRARequest | None = None,
    trace_headers: Mapping[str, str] | None = None,
    priority: int = 0,
    tokenization_kwargs: dict[str, Any] | None = None,
) -> AsyncGenerator[PoolingRequestOutput, None]

Generate outputs for a request from a pooling model.

Source code in vllm/engine/protocol.py
@abstractmethod
def encode(
    self,
    prompt: PromptType | DictPrompt | TokPrompt,
    pooling_params: PoolingParams,
    request_id: str,
    lora_request: LoRARequest | None = None,
    trace_headers: Mapping[str, str] | None = None,
    priority: int = 0,
    tokenization_kwargs: dict[str, Any] | None = None,
) -> AsyncGenerator[PoolingRequestOutput, None]:
    """Generate outputs for a request from a pooling model."""
    ...

generate abstractmethod

generate(
    prompt: EngineCoreRequest
    | PromptType
    | DictPrompt
    | TokPrompt
    | AsyncGenerator[StreamingInput, None],
    sampling_params: SamplingParams,
    request_id: str,
    *,
    prompt_text: str | None = None,
    lora_request: LoRARequest | None = None,
    tokenization_kwargs: dict[str, Any] | None = None,
    trace_headers: Mapping[str, str] | None = None,
    priority: int = 0,
    data_parallel_rank: int | None = None,
) -> AsyncGenerator[RequestOutput, None]

Generate outputs for a request.

Source code in vllm/engine/protocol.py
@abstractmethod
def generate(
    self,
    prompt: EngineCoreRequest
    | PromptType
    | DictPrompt
    | TokPrompt
    | AsyncGenerator[StreamingInput, None],
    sampling_params: SamplingParams,
    request_id: str,
    *,
    prompt_text: str | None = None,
    lora_request: LoRARequest | None = None,
    tokenization_kwargs: dict[str, Any] | None = None,
    trace_headers: Mapping[str, str] | None = None,
    priority: int = 0,
    data_parallel_rank: int | None = None,
) -> AsyncGenerator[RequestOutput, None]:
    """Generate outputs for a request."""
    ...

get_supported_tasks async

get_supported_tasks() -> tuple[SupportedTask, ...]

Get supported tasks

Source code in vllm/engine/protocol.py
async def get_supported_tasks(self) -> tuple[SupportedTask, ...]:
    """Get supported tasks"""
    raise NotImplementedError

init_weight_transfer_engine async

init_weight_transfer_engine(
    init_request: WeightTransferInitRequest,
) -> None

Initialize weight transfer for RL training.

Source code in vllm/engine/protocol.py
async def init_weight_transfer_engine(
    self, init_request: WeightTransferInitRequest
) -> None:
    """Initialize weight transfer for RL training."""
    raise NotImplementedError

is_paused abstractmethod async

is_paused() -> bool

Return whether the engine is currently paused.

Source code in vllm/engine/protocol.py
@abstractmethod
async def is_paused(self) -> bool:
    """Return whether the engine is currently paused."""
    ...

is_sleeping abstractmethod async

is_sleeping() -> bool

Check whether the engine is sleeping

Source code in vllm/engine/protocol.py
@abstractmethod
async def is_sleeping(self) -> bool:
    """Check whether the engine is sleeping"""
    ...

pause_generation abstractmethod async

pause_generation(
    *,
    mode: PauseMode = "abort",
    wait_for_inflight_requests: bool = False,
    clear_cache: bool = True,
) -> None

Pause new generation/encoding requests.

Parameters:

Name Type Description Default
mode PauseMode

How to handle in-flight requests: - "abort": Abort all in-flight requests immediately and return partial results with "abort" reason (default). - "wait": Wait for in-flight requests to complete. - "keep": Freeze requests in queue; they resume on :meth:resume_generation.

'abort'
wait_for_inflight_requests bool

DEPRECATED. Use mode="wait" instead.

False
clear_cache bool

DEPRECATED. Whether to clear KV and prefix caches after draining.

True
Source code in vllm/engine/protocol.py
@abstractmethod
async def pause_generation(
    self,
    *,
    mode: "PauseMode" = "abort",
    wait_for_inflight_requests: bool = False,
    clear_cache: bool = True,
) -> None:
    """Pause new generation/encoding requests.

    Args:
        mode: How to handle in-flight requests:
            - ``"abort"``: Abort all in-flight requests immediately
              and return partial results with "abort" reason (default).
            - ``"wait"``: Wait for in-flight requests to complete.
            - ``"keep"``: Freeze requests in queue; they resume on
              :meth:`resume_generation`.
        wait_for_inflight_requests: DEPRECATED. Use ``mode="wait"`` instead.
        clear_cache: DEPRECATED. Whether to clear KV and prefix caches
            after draining.
    """
    ...

reset_encoder_cache abstractmethod async

reset_encoder_cache() -> None

Reset the encoder cache

Source code in vllm/engine/protocol.py
@abstractmethod
async def reset_encoder_cache(self) -> None:
    """Reset the encoder cache"""
    ...

reset_mm_cache abstractmethod async

reset_mm_cache() -> None

Reset the multi-modal cache

Source code in vllm/engine/protocol.py
@abstractmethod
async def reset_mm_cache(self) -> None:
    """Reset the multi-modal cache"""
    ...

reset_prefix_cache abstractmethod async

reset_prefix_cache(
    reset_running_requests: bool = False,
    reset_connector: bool = False,
) -> bool

Reset the prefix cache and optionally any configured connector cache

Source code in vllm/engine/protocol.py
@abstractmethod
async def reset_prefix_cache(
    self, reset_running_requests: bool = False, reset_connector: bool = False
) -> bool:
    """Reset the prefix cache and optionally any configured connector cache"""
    ...

resume_generation abstractmethod async

resume_generation() -> None

Resume accepting generation/encoding requests.

Source code in vllm/engine/protocol.py
@abstractmethod
async def resume_generation(self) -> None:
    """Resume accepting generation/encoding requests."""
    ...

scale_elastic_ep async

scale_elastic_ep(
    new_data_parallel_size: int, drain_timeout: int = 300
) -> None

Scale the engine

Source code in vllm/engine/protocol.py
async def scale_elastic_ep(
    self, new_data_parallel_size: int, drain_timeout: int = 300
) -> None:
    """Scale the engine"""
    raise NotImplementedError

sleep abstractmethod async

sleep(level: int = 1) -> None

Sleep the engine

Source code in vllm/engine/protocol.py
@abstractmethod
async def sleep(self, level: int = 1) -> None:
    """Sleep the engine"""
    ...

start_profile abstractmethod async

start_profile() -> None

Start profiling the engine

Source code in vllm/engine/protocol.py
@abstractmethod
async def start_profile(self) -> None:
    """Start profiling the engine"""
    ...

stop_profile abstractmethod async

stop_profile() -> None

Stop profiling the engine

Source code in vllm/engine/protocol.py
@abstractmethod
async def stop_profile(self) -> None:
    """Stop profiling the engine"""
    ...

update_weights async

update_weights(
    request: WeightTransferUpdateRequest,
) -> None

Batched weight update for RL training.

Source code in vllm/engine/protocol.py
async def update_weights(self, request: WeightTransferUpdateRequest) -> None:
    """Batched weight update for RL training."""
    raise NotImplementedError

wake_up abstractmethod async

wake_up(tags: list[str] | None = None) -> None

Wake up the engine

Source code in vllm/engine/protocol.py
@abstractmethod
async def wake_up(self, tags: list[str] | None = None) -> None:
    """Wake up the engine"""
    ...

FlatLogprobs dataclass

Bases: MutableSequence[LogprobsOnePosition | None]

Flat logprobs of a request into multiple primitive type lists.

Compared to list[dict[int, Logprob]], this data structure reduced GC overhead significantly. As it flattened logprob information for all positions and ranks in to multiple primitive type lists (i.e. logprobs, token_ids, ranks per token_ids, decoded_tokens). So regardless of the sequence length and top_logprobs setup, FlatLogprobs would only introduce a constant amount of objects.

As each position might contains different amount of ranks, start_indices_per_position would be used to access the logprob ranges for different positions.

NOTE: To reduce the migration overhead and improve backward compatibility, we support the key Sequence APIs of list, so it could act as list[LogprobsOnePosition]

Source code in vllm/logprobs.py
@dataclass
class FlatLogprobs(MutableSequence[LogprobsOnePosition | None]):
    """
    Flat logprobs of a request into multiple primitive type lists.

    Compared to list[dict[int, Logprob]], this data structure reduced GC
    overhead significantly. As it flattened logprob information for
    all positions and ranks in to multiple primitive type lists (i.e.
    logprobs, token_ids, ranks per token_ids, decoded_tokens).
    So regardless of the sequence length and top_logprobs setup,
    FlatLogprobs would only introduce a constant amount of objects.

    As each position might contains different amount of ranks,
    start_indices_per_position would be used to access the logprob ranges
    for different positions.

    NOTE: To reduce the migration overhead and improve backward compatibility,
    we support the key Sequence APIs of list, so it could act as
    list[LogprobsOnePosition]
    """

    # Start / end indices to indicate the range of logprobs for each position.
    start_indices: list[int] = field(default_factory=list)
    end_indices: list[int] = field(default_factory=list)

    # Flatten Logprob information for (each position, rank).
    # For position <i>, the logprobs are ranged
    # from self.start_indices[i] to self.end_indices[i] (exclusive).
    token_ids: list[int] = field(default_factory=list)
    logprobs: list[float] = field(default_factory=list)
    ranks: list[int | None] = field(default_factory=list)
    decoded_tokens: list[str | None] = field(default_factory=list)

    def append(self, logprobs_one_position: LogprobsOnePosition | None) -> None:
        """Appends the container with logprobs for the next position"""
        self.start_indices.append(len(self.logprobs))
        if logprobs_one_position:
            for token_id, logprob in logprobs_one_position.items():
                self.token_ids.append(token_id)
                self.logprobs.append(logprob.logprob)
                self.ranks.append(logprob.rank)
                self.decoded_tokens.append(logprob.decoded_token)
        self.end_indices.append(len(self.logprobs))

    def append_fast(
        self,
        token_ids: list[int],
        logprobs: list[float],
        ranks: itertools.chain[int],
        decoded_tokens: Iterable[str | None],
    ) -> None:
        """
        Appends logprobs for the next position without creating
        the intermediate logprob dictionary.
        """
        self.start_indices.append(len(self.logprobs))
        for token_id, logprob, rank, decoded_token in zip(
            token_ids, logprobs, ranks, decoded_tokens
        ):
            self.token_ids.append(token_id)
            self.logprobs.append(logprob)
            self.ranks.append(rank)
            self.decoded_tokens.append(decoded_token)
        self.end_indices.append(len(self.logprobs))

    def extend(self, logprobs_multi_positions) -> None:
        """Extends the container with logprobs for the next multiple positions"""
        for logprobs_one_position in logprobs_multi_positions:
            self.append(logprobs_one_position)

    def __len__(self) -> int:
        """Gets number of positions stored in the container"""
        return len(self.start_indices)

    @overload
    def __getitem__(self, position: int) -> LogprobsOnePosition: ...

    @overload
    def __getitem__(self, s: slice, /) -> "FlatLogprobs": ...

    def __getitem__(self, index: int | slice):
        """Extracts logprobs of a given position or slice"""
        if isinstance(index, int):
            return {
                self.token_ids[i]: Logprob(
                    logprob=self.logprobs[i],
                    rank=self.ranks[i],
                    decoded_token=self.decoded_tokens[i],
                )
                for i in range(self.start_indices[index], self.end_indices[index])
            }
        elif isinstance(index, slice):
            min_index = self.start_indices[index][0]
            max_index = self.end_indices[index][-1]
            return FlatLogprobs(
                # Shift updated start_indices and end_indices to
                # be 0-indexed
                start_indices=[i - min_index for i in self.start_indices[index]],
                end_indices=[i - min_index for i in self.end_indices[index]],
                token_ids=self.token_ids[min_index:max_index],
                logprobs=self.logprobs[min_index:max_index],
                ranks=self.ranks[min_index:max_index],
                decoded_tokens=self.decoded_tokens[min_index:max_index],
            )
        else:
            raise TypeError(f"Invalid index type: {type(index)}")

    def __setitem__(self, item, value) -> None:
        raise TypeError("Cannot set logprobs in FlatLogprobs")

    def __delitem__(self, item) -> None:
        raise TypeError("Cannot delete logprobs from FlatLogprobs")

    def insert(self, index: int, value: dict[int, Logprob] | None) -> None:
        raise TypeError("Cannot insert logprobs to FlatLogprobs")

    def __iter__(self) -> Iterator[LogprobsOnePosition]:
        """
        Iterates the container and yields LogprobsOnePosition for
        each position.
        """
        for i in range(0, len(self.start_indices)):
            yield self.__getitem__(i)

__getitem__

__getitem__(position: int) -> LogprobsOnePosition
__getitem__(s: slice) -> FlatLogprobs
__getitem__(index: int | slice)

Extracts logprobs of a given position or slice

Source code in vllm/logprobs.py
def __getitem__(self, index: int | slice):
    """Extracts logprobs of a given position or slice"""
    if isinstance(index, int):
        return {
            self.token_ids[i]: Logprob(
                logprob=self.logprobs[i],
                rank=self.ranks[i],
                decoded_token=self.decoded_tokens[i],
            )
            for i in range(self.start_indices[index], self.end_indices[index])
        }
    elif isinstance(index, slice):
        min_index = self.start_indices[index][0]
        max_index = self.end_indices[index][-1]
        return FlatLogprobs(
            # Shift updated start_indices and end_indices to
            # be 0-indexed
            start_indices=[i - min_index for i in self.start_indices[index]],
            end_indices=[i - min_index for i in self.end_indices[index]],
            token_ids=self.token_ids[min_index:max_index],
            logprobs=self.logprobs[min_index:max_index],
            ranks=self.ranks[min_index:max_index],
            decoded_tokens=self.decoded_tokens[min_index:max_index],
        )
    else:
        raise TypeError(f"Invalid index type: {type(index)}")

__iter__

__iter__() -> Iterator[LogprobsOnePosition]

Iterates the container and yields LogprobsOnePosition for each position.

Source code in vllm/logprobs.py
def __iter__(self) -> Iterator[LogprobsOnePosition]:
    """
    Iterates the container and yields LogprobsOnePosition for
    each position.
    """
    for i in range(0, len(self.start_indices)):
        yield self.__getitem__(i)

__len__

__len__() -> int

Gets number of positions stored in the container

Source code in vllm/logprobs.py
def __len__(self) -> int:
    """Gets number of positions stored in the container"""
    return len(self.start_indices)

append

append(
    logprobs_one_position: LogprobsOnePosition | None,
) -> None

Appends the container with logprobs for the next position

Source code in vllm/logprobs.py
def append(self, logprobs_one_position: LogprobsOnePosition | None) -> None:
    """Appends the container with logprobs for the next position"""
    self.start_indices.append(len(self.logprobs))
    if logprobs_one_position:
        for token_id, logprob in logprobs_one_position.items():
            self.token_ids.append(token_id)
            self.logprobs.append(logprob.logprob)
            self.ranks.append(logprob.rank)
            self.decoded_tokens.append(logprob.decoded_token)
    self.end_indices.append(len(self.logprobs))

append_fast

append_fast(
    token_ids: list[int],
    logprobs: list[float],
    ranks: chain[int],
    decoded_tokens: Iterable[str | None],
) -> None

Appends logprobs for the next position without creating the intermediate logprob dictionary.

Source code in vllm/logprobs.py
def append_fast(
    self,
    token_ids: list[int],
    logprobs: list[float],
    ranks: itertools.chain[int],
    decoded_tokens: Iterable[str | None],
) -> None:
    """
    Appends logprobs for the next position without creating
    the intermediate logprob dictionary.
    """
    self.start_indices.append(len(self.logprobs))
    for token_id, logprob, rank, decoded_token in zip(
        token_ids, logprobs, ranks, decoded_tokens
    ):
        self.token_ids.append(token_id)
        self.logprobs.append(logprob)
        self.ranks.append(rank)
        self.decoded_tokens.append(decoded_token)
    self.end_indices.append(len(self.logprobs))

extend

extend(logprobs_multi_positions) -> None

Extends the container with logprobs for the next multiple positions

Source code in vllm/logprobs.py
def extend(self, logprobs_multi_positions) -> None:
    """Extends the container with logprobs for the next multiple positions"""
    for logprobs_one_position in logprobs_multi_positions:
        self.append(logprobs_one_position)

Logprob dataclass

Infos for supporting OpenAI compatible logprobs and token ranks.

Attributes:

Name Type Description
logprob float

The logprob of chosen token

rank int | None

The vocab rank of chosen token (>=1)

decoded_token str | None

The decoded chosen token index

Source code in vllm/logprobs.py
@dataclass
class Logprob:
    """Infos for supporting OpenAI compatible logprobs and token ranks.

    Attributes:
        logprob: The logprob of chosen token
        rank: The vocab rank of chosen token (>=1)
        decoded_token: The decoded chosen token index
    """

    logprob: float
    rank: int | None = None
    decoded_token: str | None = None

OpenAIServing

Source code in vllm/entrypoints/openai/engine/serving.py
 219
 220
 221
 222
 223
 224
 225
 226
 227
 228
 229
 230
 231
 232
 233
 234
 235
 236
 237
 238
 239
 240
 241
 242
 243
 244
 245
 246
 247
 248
 249
 250
 251
 252
 253
 254
 255
 256
 257
 258
 259
 260
 261
 262
 263
 264
 265
 266
 267
 268
 269
 270
 271
 272
 273
 274
 275
 276
 277
 278
 279
 280
 281
 282
 283
 284
 285
 286
 287
 288
 289
 290
 291
 292
 293
 294
 295
 296
 297
 298
 299
 300
 301
 302
 303
 304
 305
 306
 307
 308
 309
 310
 311
 312
 313
 314
 315
 316
 317
 318
 319
 320
 321
 322
 323
 324
 325
 326
 327
 328
 329
 330
 331
 332
 333
 334
 335
 336
 337
 338
 339
 340
 341
 342
 343
 344
 345
 346
 347
 348
 349
 350
 351
 352
 353
 354
 355
 356
 357
 358
 359
 360
 361
 362
 363
 364
 365
 366
 367
 368
 369
 370
 371
 372
 373
 374
 375
 376
 377
 378
 379
 380
 381
 382
 383
 384
 385
 386
 387
 388
 389
 390
 391
 392
 393
 394
 395
 396
 397
 398
 399
 400
 401
 402
 403
 404
 405
 406
 407
 408
 409
 410
 411
 412
 413
 414
 415
 416
 417
 418
 419
 420
 421
 422
 423
 424
 425
 426
 427
 428
 429
 430
 431
 432
 433
 434
 435
 436
 437
 438
 439
 440
 441
 442
 443
 444
 445
 446
 447
 448
 449
 450
 451
 452
 453
 454
 455
 456
 457
 458
 459
 460
 461
 462
 463
 464
 465
 466
 467
 468
 469
 470
 471
 472
 473
 474
 475
 476
 477
 478
 479
 480
 481
 482
 483
 484
 485
 486
 487
 488
 489
 490
 491
 492
 493
 494
 495
 496
 497
 498
 499
 500
 501
 502
 503
 504
 505
 506
 507
 508
 509
 510
 511
 512
 513
 514
 515
 516
 517
 518
 519
 520
 521
 522
 523
 524
 525
 526
 527
 528
 529
 530
 531
 532
 533
 534
 535
 536
 537
 538
 539
 540
 541
 542
 543
 544
 545
 546
 547
 548
 549
 550
 551
 552
 553
 554
 555
 556
 557
 558
 559
 560
 561
 562
 563
 564
 565
 566
 567
 568
 569
 570
 571
 572
 573
 574
 575
 576
 577
 578
 579
 580
 581
 582
 583
 584
 585
 586
 587
 588
 589
 590
 591
 592
 593
 594
 595
 596
 597
 598
 599
 600
 601
 602
 603
 604
 605
 606
 607
 608
 609
 610
 611
 612
 613
 614
 615
 616
 617
 618
 619
 620
 621
 622
 623
 624
 625
 626
 627
 628
 629
 630
 631
 632
 633
 634
 635
 636
 637
 638
 639
 640
 641
 642
 643
 644
 645
 646
 647
 648
 649
 650
 651
 652
 653
 654
 655
 656
 657
 658
 659
 660
 661
 662
 663
 664
 665
 666
 667
 668
 669
 670
 671
 672
 673
 674
 675
 676
 677
 678
 679
 680
 681
 682
 683
 684
 685
 686
 687
 688
 689
 690
 691
 692
 693
 694
 695
 696
 697
 698
 699
 700
 701
 702
 703
 704
 705
 706
 707
 708
 709
 710
 711
 712
 713
 714
 715
 716
 717
 718
 719
 720
 721
 722
 723
 724
 725
 726
 727
 728
 729
 730
 731
 732
 733
 734
 735
 736
 737
 738
 739
 740
 741
 742
 743
 744
 745
 746
 747
 748
 749
 750
 751
 752
 753
 754
 755
 756
 757
 758
 759
 760
 761
 762
 763
 764
 765
 766
 767
 768
 769
 770
 771
 772
 773
 774
 775
 776
 777
 778
 779
 780
 781
 782
 783
 784
 785
 786
 787
 788
 789
 790
 791
 792
 793
 794
 795
 796
 797
 798
 799
 800
 801
 802
 803
 804
 805
 806
 807
 808
 809
 810
 811
 812
 813
 814
 815
 816
 817
 818
 819
 820
 821
 822
 823
 824
 825
 826
 827
 828
 829
 830
 831
 832
 833
 834
 835
 836
 837
 838
 839
 840
 841
 842
 843
 844
 845
 846
 847
 848
 849
 850
 851
 852
 853
 854
 855
 856
 857
 858
 859
 860
 861
 862
 863
 864
 865
 866
 867
 868
 869
 870
 871
 872
 873
 874
 875
 876
 877
 878
 879
 880
 881
 882
 883
 884
 885
 886
 887
 888
 889
 890
 891
 892
 893
 894
 895
 896
 897
 898
 899
 900
 901
 902
 903
 904
 905
 906
 907
 908
 909
 910
 911
 912
 913
 914
 915
 916
 917
 918
 919
 920
 921
 922
 923
 924
 925
 926
 927
 928
 929
 930
 931
 932
 933
 934
 935
 936
 937
 938
 939
 940
 941
 942
 943
 944
 945
 946
 947
 948
 949
 950
 951
 952
 953
 954
 955
 956
 957
 958
 959
 960
 961
 962
 963
 964
 965
 966
 967
 968
 969
 970
 971
 972
 973
 974
 975
 976
 977
 978
 979
 980
 981
 982
 983
 984
 985
 986
 987
 988
 989
 990
 991
 992
 993
 994
 995
 996
 997
 998
 999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
class OpenAIServing:
    request_id_prefix: ClassVar[str] = """
    A short string prepended to every request’s ID (e.g. "embd", "classify")
    so you can easily tell “this ID came from Embedding vs Classification.”
    """

    def __init__(
        self,
        engine_client: EngineClient,
        models: OpenAIServingModels,
        *,
        request_logger: RequestLogger | None,
        return_tokens_as_token_ids: bool = False,
        log_error_stack: bool = False,
    ):
        super().__init__()

        self.engine_client = engine_client

        self.models = models

        self.request_logger = request_logger
        self.return_tokens_as_token_ids = return_tokens_as_token_ids

        self.log_error_stack = log_error_stack

        self.input_processor = self.models.input_processor
        self.io_processor = self.models.io_processor
        self.renderer = self.models.renderer
        self.model_config = self.models.model_config
        self.max_model_len = self.model_config.max_model_len

    async def beam_search(
        self,
        prompt: TokPrompt,
        request_id: str,
        params: BeamSearchParams,
        lora_request: LoRARequest | None = None,
        trace_headers: Mapping[str, str] | None = None,
    ) -> AsyncGenerator[RequestOutput, None]:
        beam_width = params.beam_width
        max_tokens = params.max_tokens
        ignore_eos = params.ignore_eos
        temperature = params.temperature
        length_penalty = params.length_penalty
        include_stop_str_in_output = params.include_stop_str_in_output

        input_processor = self.input_processor
        tokenizer = input_processor.tokenizer
        if tokenizer is None:
            raise VLLMValidationError(
                "You cannot use beam search when `skip_tokenizer_init=True`",
                parameter="skip_tokenizer_init",
                value=True,
            )

        eos_token_id: int = tokenizer.eos_token_id  # type: ignore

        if isinstance(prompt, dict) and "encoder_prompt" in prompt:
            raise NotImplementedError("Encoder-decoder prompt not supported")

        prompt_text: str | None = prompt.get("prompt")  # type: ignore
        prompt_token_ids: list[int] = prompt.get("prompt_token_ids", [])  # type: ignore
        multi_modal_data: MultiModalDataDict | None = prompt.get("multi_modal_data")  # type: ignore

        mm_processor_kwargs: dict[str, Any] | None = None

        # This is a workaround to fix multimodal beam search; this is a
        # bandaid fix for 2 small problems:
        # 1. Multi_modal_data on the processed_inputs currently resolves to
        #    `None`.
        # 2. preprocessing above expands the multimodal placeholders. However,
        #    this happens again in generation, so the double expansion causes
        #    a mismatch.
        # TODO - would be ideal to handle this more gracefully.

        tokenized_length = len(prompt_token_ids)

        sort_beams_key = create_sort_beams_key_function(eos_token_id, length_penalty)

        logprobs_num = 2 * beam_width
        beam_search_params = SamplingParams(
            logprobs=logprobs_num,
            max_tokens=1,
            temperature=temperature,
        )
        all_beams = [
            BeamSearchSequence(
                tokens=prompt_token_ids,
                cum_logprob=0,
                logprobs=[],
                multi_modal_data=multi_modal_data,
                mm_processor_kwargs=mm_processor_kwargs,
                lora_request=lora_request,
            )
        ]
        completed = []

        for _ in range(max_tokens):
            prompts_batch, lora_req_batch = zip(
                *[
                    (
                        TokensPrompt(
                            prompt_token_ids=beam.tokens,
                            multi_modal_data=beam.multi_modal_data,
                            mm_processor_kwargs=beam.mm_processor_kwargs,
                        ),
                        beam.lora_request,
                    )
                    for beam in all_beams
                ]
            )

            tasks = []
            request_id_batch = f"{request_id}-{random_uuid()}"

            for i, (individual_prompt, lora_req) in enumerate(
                zip(prompts_batch, lora_req_batch)
            ):
                request_id_item = f"{request_id_batch}-beam-{i}"
                task = asyncio.create_task(
                    collect_from_async_generator(
                        self.engine_client.generate(
                            individual_prompt,
                            beam_search_params,
                            request_id_item,
                            lora_request=lora_req,
                            trace_headers=trace_headers,
                        )
                    )
                )
                tasks.append(task)

            output = [x[0] for x in await asyncio.gather(*tasks)]

            new_beams = []
            # Store all new tokens generated by beam
            all_beams_token_id = []
            # Store the cumulative probability of all tokens
            # generated by beam search
            all_beams_logprob = []
            # Iterate through all beam inference results
            for i, result in enumerate(output):
                current_beam = all_beams[i]

                # check for error finish reason and abort beam search
                if result.outputs[0].finish_reason == "error":
                    # yield error output and terminate beam search
                    yield RequestOutput(
                        request_id=request_id,
                        prompt=prompt_text,
                        outputs=[
                            CompletionOutput(
                                index=0,
                                text="",
                                token_ids=[],
                                cumulative_logprob=None,
                                logprobs=None,
                                finish_reason="error",
                            )
                        ],
                        finished=True,
                        prompt_token_ids=prompt_token_ids,
                        prompt_logprobs=None,
                    )
                    return

                if result.outputs[0].logprobs is not None:
                    logprobs = result.outputs[0].logprobs[0]
                    all_beams_token_id.extend(list(logprobs.keys()))
                    all_beams_logprob.extend(
                        [
                            current_beam.cum_logprob + obj.logprob
                            for obj in logprobs.values()
                        ]
                    )

            # Handle the token for the end of sentence (EOS)
            all_beams_token_id = np.array(all_beams_token_id)
            all_beams_logprob = np.array(all_beams_logprob)

            if not ignore_eos:
                # Get the index position of eos token in all generated results
                eos_idx = np.where(all_beams_token_id == eos_token_id)[0]
                for idx in eos_idx:
                    current_beam = all_beams[idx // logprobs_num]
                    result = output[idx // logprobs_num]
                    assert result.outputs[0].logprobs is not None
                    logprobs_entry = result.outputs[0].logprobs[0]
                    completed.append(
                        BeamSearchSequence(
                            tokens=current_beam.tokens + [eos_token_id]
                            if include_stop_str_in_output
                            else current_beam.tokens,
                            logprobs=current_beam.logprobs + [logprobs_entry],
                            cum_logprob=float(all_beams_logprob[idx]),
                            finish_reason="stop",
                            stop_reason=eos_token_id,
                        )
                    )
                # After processing, set the log probability of the eos condition
                # to negative infinity.
                all_beams_logprob[eos_idx] = -np.inf

            # Processing non-EOS tokens
            # Get indices of the top beam_width probabilities
            topn_idx = np.argpartition(np.negative(all_beams_logprob), beam_width)[
                :beam_width
            ]

            for idx in topn_idx:
                current_beam = all_beams[idx // logprobs_num]
                result = output[idx // logprobs_num]
                token_id = int(all_beams_token_id[idx])
                assert result.outputs[0].logprobs is not None
                logprobs_entry = result.outputs[0].logprobs[0]
                new_beams.append(
                    BeamSearchSequence(
                        tokens=current_beam.tokens + [token_id],
                        logprobs=current_beam.logprobs + [logprobs_entry],
                        lora_request=current_beam.lora_request,
                        cum_logprob=float(all_beams_logprob[idx]),
                        multi_modal_data=current_beam.multi_modal_data,
                        mm_processor_kwargs=current_beam.mm_processor_kwargs,
                    )
                )

            all_beams = new_beams

        completed.extend(all_beams)
        sorted_completed = sorted(completed, key=sort_beams_key, reverse=True)
        best_beams = sorted_completed[:beam_width]

        for beam in best_beams:
            if beam.tokens[-1] == eos_token_id and not ignore_eos:
                # Skip the eos token in the text.
                tokens = beam.tokens[tokenized_length:-1]
            else:
                tokens = beam.tokens[tokenized_length:]
            beam.text = tokenizer.decode(tokens)

        yield RequestOutput(
            request_id=request_id,
            prompt=prompt_text,
            outputs=[
                CompletionOutput(
                    text=beam.text,  # type: ignore
                    cumulative_logprob=beam.cum_logprob,
                    token_ids=beam.tokens[tokenized_length:],
                    index=i,
                    logprobs=beam.logprobs,
                    finish_reason=beam.finish_reason
                    if beam.finish_reason is not None
                    else "length",
                    stop_reason=beam.stop_reason,
                )
                for (i, beam) in enumerate(best_beams)
            ],
            finished=True,
            prompt_token_ids=prompt_token_ids,
            prompt_logprobs=None,
        )

    async def _preprocess(
        self,
        ctx: ServeContext,
    ) -> ErrorResponse | None:
        """
        Default preprocessing hook. Subclasses may override
        to prepare `ctx` (classification, embedding, etc.).
        """
        return None

    def _build_response(
        self,
        ctx: ServeContext,
    ) -> AnyResponse | ErrorResponse:
        """
        Default response builder. Subclass may override this method
        to return the appropriate response object.
        """
        return self.create_error_response("unimplemented endpoint")

    async def handle(
        self,
        ctx: ServeContext,
    ) -> AnyResponse | ErrorResponse:
        async for response in self._pipeline(ctx):
            return response

        return self.create_error_response("No response yielded from pipeline")

    async def _pipeline(
        self,
        ctx: ServeContext,
    ) -> AsyncGenerator[AnyResponse | ErrorResponse, None]:
        """Execute the request processing pipeline yielding responses."""
        if error := await self._check_model(ctx.request):
            yield error
        if error := self._validate_request(ctx):
            yield error

        preprocess_ret = await self._preprocess(ctx)
        if isinstance(preprocess_ret, ErrorResponse):
            yield preprocess_ret

        generators_ret = await self._prepare_generators(ctx)
        if isinstance(generators_ret, ErrorResponse):
            yield generators_ret

        collect_ret = await self._collect_batch(ctx)
        if isinstance(collect_ret, ErrorResponse):
            yield collect_ret

        yield self._build_response(ctx)

    def _validate_request(self, ctx: ServeContext) -> ErrorResponse | None:
        truncate_prompt_tokens = getattr(ctx.request, "truncate_prompt_tokens", None)

        if (
            truncate_prompt_tokens is not None
            and truncate_prompt_tokens > self.max_model_len
        ):
            return self.create_error_response(
                "truncate_prompt_tokens value is "
                "greater than max_model_len."
                " Please, select a smaller truncation size."
            )
        return None

    def _create_pooling_params(
        self,
        ctx: ServeContext,
    ) -> PoolingParams | ErrorResponse:
        if not hasattr(ctx.request, "to_pooling_params"):
            return self.create_error_response(
                "Request type does not support pooling parameters"
            )

        return ctx.request.to_pooling_params()

    async def _prepare_generators(
        self,
        ctx: ServeContext,
    ) -> ErrorResponse | None:
        """Schedule the request and get the result generator."""
        generators: list[AsyncGenerator[PoolingRequestOutput, None]] = []

        try:
            trace_headers = (
                None
                if ctx.raw_request is None
                else await self._get_trace_headers(ctx.raw_request.headers)
            )

            pooling_params = self._create_pooling_params(ctx)
            if isinstance(pooling_params, ErrorResponse):
                return pooling_params

            if ctx.engine_prompts is None:
                return self.create_error_response("Engine prompts not available")

            for i, engine_prompt in enumerate(ctx.engine_prompts):
                request_id_item = f"{ctx.request_id}-{i}"

                self._log_inputs(
                    request_id_item,
                    engine_prompt,
                    params=pooling_params,
                    lora_request=ctx.lora_request,
                )

                generator = self.engine_client.encode(
                    engine_prompt,
                    pooling_params,
                    request_id_item,
                    lora_request=ctx.lora_request,
                    trace_headers=trace_headers,
                    priority=getattr(ctx.request, "priority", 0),
                )

                generators.append(generator)

            ctx.result_generator = merge_async_iterators(*generators)

            return None

        except Exception as e:
            return self.create_error_response(e)

    async def _collect_batch(
        self,
        ctx: ServeContext,
    ) -> ErrorResponse | None:
        """Collect batch results from the result generator."""
        try:
            if ctx.engine_prompts is None:
                return self.create_error_response("Engine prompts not available")

            num_prompts = len(ctx.engine_prompts)
            final_res_batch: list[PoolingRequestOutput | None]
            final_res_batch = [None] * num_prompts

            if ctx.result_generator is None:
                return self.create_error_response("Result generator not available")

            async for i, res in ctx.result_generator:
                final_res_batch[i] = res

            if None in final_res_batch:
                return self.create_error_response(
                    "Failed to generate results for all prompts"
                )

            ctx.final_res_batch = [res for res in final_res_batch if res is not None]

            return None

        except Exception as e:
            return self.create_error_response(e)

    def create_error_response(
        self,
        message: str | Exception,
        err_type: str = "BadRequestError",
        status_code: HTTPStatus = HTTPStatus.BAD_REQUEST,
        param: str | None = None,
    ) -> ErrorResponse:
        exc: Exception | None = None

        if isinstance(message, Exception):
            exc = message

            from vllm.exceptions import VLLMValidationError

            if isinstance(exc, VLLMValidationError):
                err_type = "BadRequestError"
                status_code = HTTPStatus.BAD_REQUEST
                param = exc.parameter
            elif isinstance(exc, (ValueError, TypeError, RuntimeError, OverflowError)):
                # Common validation errors from user input
                err_type = "BadRequestError"
                status_code = HTTPStatus.BAD_REQUEST
                param = None
            elif isinstance(exc, NotImplementedError):
                err_type = "NotImplementedError"
                status_code = HTTPStatus.NOT_IMPLEMENTED
                param = None
            elif exc.__class__.__name__ == "TemplateError":
                # jinja2.TemplateError (avoid importing jinja2)
                err_type = "BadRequestError"
                status_code = HTTPStatus.BAD_REQUEST
                param = None
            else:
                err_type = "InternalServerError"
                status_code = HTTPStatus.INTERNAL_SERVER_ERROR
                param = None

            message = str(exc)

        if self.log_error_stack:
            exc_type, _, _ = sys.exc_info()
            if exc_type is not None:
                traceback.print_exc()
            else:
                traceback.print_stack()

        return ErrorResponse(
            error=ErrorInfo(
                message=sanitize_message(message),
                type=err_type,
                code=status_code.value,
                param=param,
            )
        )

    def create_streaming_error_response(
        self,
        message: str | Exception,
        err_type: str = "BadRequestError",
        status_code: HTTPStatus = HTTPStatus.BAD_REQUEST,
        param: str | None = None,
    ) -> str:
        json_str = json.dumps(
            self.create_error_response(
                message=message,
                err_type=err_type,
                status_code=status_code,
                param=param,
            ).model_dump()
        )
        return json_str

    def _raise_if_error(self, finish_reason: str | None, request_id: str) -> None:
        """Raise GenerationError if finish_reason indicates an error."""
        if finish_reason == "error":
            logger.error(
                "Request %s failed with an internal error during generation",
                request_id,
            )
            raise GenerationError("Internal server error")

    def _convert_generation_error_to_response(
        self, e: GenerationError
    ) -> ErrorResponse:
        """Convert GenerationError to ErrorResponse."""
        return self.create_error_response(
            str(e),
            err_type="InternalServerError",
            status_code=e.status_code,
        )

    def _convert_generation_error_to_streaming_response(
        self, e: GenerationError
    ) -> str:
        """Convert GenerationError to streaming error response."""
        return self.create_streaming_error_response(
            str(e),
            err_type="InternalServerError",
            status_code=e.status_code,
        )

    async def _check_model(
        self,
        request: AnyRequest,
    ) -> ErrorResponse | None:
        error_response = None

        if self._is_model_supported(request.model):
            return None
        if request.model in self.models.lora_requests:
            return None
        if (
            envs.VLLM_ALLOW_RUNTIME_LORA_UPDATING
            and request.model
            and (load_result := await self.models.resolve_lora(request.model))
        ):
            if isinstance(load_result, LoRARequest):
                return None
            if (
                isinstance(load_result, ErrorResponse)
                and load_result.error.code == HTTPStatus.BAD_REQUEST.value
            ):
                error_response = load_result

        return error_response or self.create_error_response(
            message=f"The model `{request.model}` does not exist.",
            err_type="NotFoundError",
            status_code=HTTPStatus.NOT_FOUND,
            param="model",
        )

    def _get_active_default_mm_loras(self, request: AnyRequest) -> LoRARequest | None:
        """Determine if there are any active default multimodal loras."""
        # TODO: Currently this is only enabled for chat completions
        # to be better aligned with only being enabled for .generate
        # when run offline. It would be nice to support additional
        # tasks types in the future.
        message_types = self._get_message_types(request)
        default_mm_loras = set()

        for lora in self.models.lora_requests.values():
            # Best effort match for default multimodal lora adapters;
            # There is probably a better way to do this, but currently
            # this matches against the set of 'types' in any content lists
            # up until '_', e.g., to match audio_url -> audio
            if lora.lora_name in message_types:
                default_mm_loras.add(lora)

        # Currently only support default modality specific loras if
        # we have exactly one lora matched on the request.
        if len(default_mm_loras) == 1:
            return default_mm_loras.pop()
        return None

    def _maybe_get_adapters(
        self,
        request: AnyRequest,
        supports_default_mm_loras: bool = False,
    ) -> LoRARequest | None:
        if request.model in self.models.lora_requests:
            return self.models.lora_requests[request.model]

        # Currently only support default modality specific loras
        # if we have exactly one lora matched on the request.
        if supports_default_mm_loras:
            default_mm_lora = self._get_active_default_mm_loras(request)
            if default_mm_lora is not None:
                return default_mm_lora

        if self._is_model_supported(request.model):
            return None

        # if _check_model has been called earlier, this will be unreachable
        raise ValueError(f"The model `{request.model}` does not exist.")

    def _get_message_types(self, request: AnyRequest) -> set[str]:
        """Retrieve the set of types from message content dicts up
        until `_`; we use this to match potential multimodal data
        with default per modality loras.
        """
        message_types: set[str] = set()

        if not hasattr(request, "messages"):
            return message_types

        messages = request.messages
        if messages is None or isinstance(messages, (str, bytes)):
            return message_types

        for message in messages:
            if (
                isinstance(message, dict)
                and "content" in message
                and isinstance(message["content"], list)
            ):
                for content_dict in message["content"]:
                    if "type" in content_dict:
                        message_types.add(content_dict["type"].split("_")[0])
        return message_types

    def _validate_input(
        self,
        request: object,
        input_ids: list[int],
        input_text: str,
    ) -> TokensPrompt:
        token_num = len(input_ids)

        # Note: EmbeddingRequest, ClassificationRequest,
        # and ScoreRequest doesn't have max_tokens
        if isinstance(
            request,
            (
                EmbeddingChatRequest,
                EmbeddingCompletionRequest,
                ScoreDataRequest,
                ScoreTextRequest,
                ScoreQueriesDocumentsRequest,
                RerankRequest,
                ClassificationCompletionRequest,
                ClassificationChatRequest,
            ),
        ):
            # Note: input length can be up to the entire model context length
            # since these requests don't generate tokens.
            if token_num > self.max_model_len:
                operations: dict[type[AnyRequest], str] = {
                    ScoreDataRequest: "score",
                    ScoreTextRequest: "score",
                    ScoreQueriesDocumentsRequest: "score",
                    ClassificationCompletionRequest: "classification",
                    ClassificationChatRequest: "classification",
                }
                operation = operations.get(type(request), "embedding generation")
                raise VLLMValidationError(
                    f"This model's maximum context length is "
                    f"{self.max_model_len} tokens. However, you requested "
                    f"{token_num} tokens in the input for {operation}. "
                    f"Please reduce the length of the input.",
                    parameter="input_tokens",
                    value=token_num,
                )
            return TokensPrompt(prompt=input_text, prompt_token_ids=input_ids)

        # Note: TokenizeRequest and DetokenizeRequest doesn't have max_tokens
        # and does not require model context length validation
        if isinstance(
            request,
            (TokenizeCompletionRequest, TokenizeChatRequest, DetokenizeRequest),
        ):
            return TokensPrompt(prompt=input_text, prompt_token_ids=input_ids)

        # chat completion endpoint supports max_completion_tokens
        if isinstance(request, ChatCompletionRequest):
            # TODO(#9845): remove max_tokens when field dropped from OpenAI API
            max_tokens = request.max_completion_tokens or request.max_tokens
        else:
            max_tokens = getattr(request, "max_tokens", None)

        # Note: input length can be up to model context length - 1 for
        # completion-like requests.
        if token_num >= self.max_model_len:
            raise VLLMValidationError(
                f"This model's maximum context length is "
                f"{self.max_model_len} tokens. However, your request has "
                f"{token_num} input tokens. Please reduce the length of "
                "the input messages.",
                parameter="input_tokens",
                value=token_num,
            )

        if max_tokens is not None and token_num + max_tokens > self.max_model_len:
            raise VLLMValidationError(
                "'max_tokens' or 'max_completion_tokens' is too large: "
                f"{max_tokens}. This model's maximum context length is "
                f"{self.max_model_len} tokens and your request has "
                f"{token_num} input tokens ({max_tokens} > {self.max_model_len}"
                f" - {token_num}).",
                parameter="max_tokens",
                value=max_tokens,
            )

        return TokensPrompt(prompt=input_text, prompt_token_ids=input_ids)

    def _validate_chat_template(
        self,
        request_chat_template: str | None,
        chat_template_kwargs: dict[str, Any] | None,
        trust_request_chat_template: bool,
    ) -> ErrorResponse | None:
        if not trust_request_chat_template and (
            request_chat_template is not None
            or (
                chat_template_kwargs
                and chat_template_kwargs.get("chat_template") is not None
            )
        ):
            return self.create_error_response(
                "Chat template is passed with request, but "
                "--trust-request-chat-template is not set. "
                "Refused request with untrusted chat template."
            )
        return None

    @staticmethod
    def _prepare_extra_chat_template_kwargs(
        request_chat_template_kwargs: dict[str, Any] | None = None,
        default_chat_template_kwargs: dict[str, Any] | None = None,
    ) -> dict[str, Any]:
        """Helper to merge server-default and request-specific chat template kwargs."""
        request_chat_template_kwargs = request_chat_template_kwargs or {}
        if default_chat_template_kwargs is None:
            return request_chat_template_kwargs
        # Apply server defaults first, then request kwargs override.
        return default_chat_template_kwargs | request_chat_template_kwargs

    async def _preprocess_completion(
        self,
        request: RendererRequest,
        prompt_input: str | list[str] | list[int] | list[list[int]] | None,
        prompt_embeds: bytes | list[bytes] | None,
    ) -> list[TokPrompt]:
        renderer = self.renderer
        model_config = self.model_config

        prompts = list[SingletonPrompt | bytes]()
        if prompt_embeds is not None:  # embeds take higher priority
            prompts.extend(prompt_to_seq(prompt_embeds))
        if prompt_input is not None:
            prompts.extend(prompt_to_seq(prompt_input))

        parsed_prompts = [
            (
                prompt
                if isinstance(prompt, bytes)
                else parse_model_prompt(model_config, prompt)
            )
            for prompt in prompts
        ]
        tok_params = request.build_tok_params(model_config)

        return await renderer.render_cmpl_async(
            parsed_prompts,
            tok_params,
            prompt_extras={
                k: v
                for k in ("mm_processor_kwargs", "cache_salt")
                if (v := getattr(request, k, None)) is not None
            },
        )

    async def _preprocess_chat(
        self,
        request: RendererChatRequest,
        messages: list[ChatCompletionMessageParam],
        default_template: str | None,
        default_template_content_format: ChatTemplateContentFormatOption,
        default_template_kwargs: dict[str, Any] | None,
        tool_dicts: list[dict[str, Any]] | None = None,
        tool_parser: Callable[[TokenizerLike], ToolParser] | None = None,
    ) -> tuple[list[ConversationMessage], list[TokPrompt]]:
        from vllm.tokenizers.mistral import MistralTokenizer

        renderer = self.renderer

        default_template_kwargs = merge_kwargs(
            default_template_kwargs,
            dict(
                tools=tool_dicts,
                tokenize=isinstance(renderer.tokenizer, MistralTokenizer),
            ),
        )

        tok_params = request.build_tok_params(self.model_config)
        chat_params = request.build_chat_params(
            default_template, default_template_content_format
        ).with_defaults(default_template_kwargs)

        (conversation,), (engine_prompt,) = await renderer.render_chat_async(
            [messages],
            chat_params,
            tok_params,
            prompt_extras={
                k: v
                for k in ("mm_processor_kwargs", "cache_salt")
                if (v := getattr(request, k, None)) is not None
            },
        )

        # tool parsing is done only if a tool_parser has been set and if
        # tool_choice is not "none" (if tool_choice is "none" but a tool_parser
        # is set, we want to prevent parsing a tool_call hallucinated by the LLM
        if tool_parser is not None:
            tool_choice = getattr(request, "tool_choice", "none")
            if tool_choice != "none":
                if not isinstance(request, ChatCompletionRequest | ResponsesRequest):
                    msg = (
                        "Tool usage is only supported for Chat Completions API "
                        "or Responses API requests."
                    )
                    raise NotImplementedError(msg)

                # TODO: Update adjust_request to accept ResponsesRequest
                tokenizer = renderer.get_tokenizer()
                request = tool_parser(tokenizer).adjust_request(request=request)  # type: ignore[arg-type]

        return conversation, [engine_prompt]

    def _extract_prompt_components(self, prompt: object):
        return extract_prompt_components(self.model_config, prompt)

    def _extract_prompt_text(self, prompt: object):
        return self._extract_prompt_components(prompt).text

    def _extract_prompt_len(self, prompt: object):
        return extract_prompt_len(self.model_config, prompt)

    async def _render_next_turn(
        self,
        request: ResponsesRequest,
        messages: list[ResponseInputOutputItem],
        tool_dicts: list[dict[str, Any]] | None,
        tool_parser: Callable[[TokenizerLike], ToolParser] | None,
        chat_template: str | None,
        chat_template_content_format: ChatTemplateContentFormatOption,
    ):
        new_messages = construct_input_messages(
            request_input=messages,
        )

        _, engine_prompts = await self._preprocess_chat(
            request,
            new_messages,
            default_template=chat_template,
            default_template_content_format=chat_template_content_format,
            default_template_kwargs=None,
            tool_dicts=tool_dicts,
            tool_parser=tool_parser,
        )
        return engine_prompts

    async def _generate_with_builtin_tools(
        self,
        request_id: str,
        engine_prompt: TokPrompt,
        sampling_params: SamplingParams,
        tok_params: TokenizeParams,
        context: ConversationContext,
        lora_request: LoRARequest | None = None,
        priority: int = 0,
        trace_headers: Mapping[str, str] | None = None,
    ):
        prompt_text = self._extract_prompt_text(engine_prompt)

        orig_priority = priority
        sub_request = 0
        while True:
            # Ensure that each sub-request has a unique request id.
            sub_request_id = f"{request_id}_{sub_request}"

            self._log_inputs(
                sub_request_id,
                engine_prompt,
                params=sampling_params,
                lora_request=lora_request,
            )

            tokenization_kwargs = tok_params.get_encode_kwargs()
            engine_request = self.input_processor.process_inputs(
                sub_request_id,
                engine_prompt,
                sampling_params,
                lora_request=lora_request,
                tokenization_kwargs=tokenization_kwargs,
                trace_headers=trace_headers,
                priority=priority,
            )

            generator = self.engine_client.generate(
                engine_request,
                sampling_params,
                sub_request_id,
                lora_request=lora_request,
                trace_headers=trace_headers,
                priority=priority,
                prompt_text=prompt_text,
                tokenization_kwargs=tokenization_kwargs,
            )

            async for res in generator:
                context.append_output(res)
                # NOTE(woosuk): The stop condition is handled by the engine.
                yield context

            if not context.need_builtin_tool_call():
                # The model did not ask for a tool call, so we're done.
                break

            # Call the tool and update the context with the result.
            tool_output = await context.call_tool()
            context.append_tool_output(tool_output)

            # TODO: uncomment this and enable tool output streaming
            # yield context

            # Create inputs for the next turn.
            # Render the next prompt token ids and update sampling_params.
            if isinstance(context, (HarmonyContext, StreamingHarmonyContext)):
                token_ids = context.render_for_completion()
                engine_prompt = TokensPrompt(prompt_token_ids=token_ids)

                sampling_params.max_tokens = self.max_model_len - len(token_ids)
            elif isinstance(context, ParsableContext):
                engine_prompts = await self._render_next_turn(
                    context.request,
                    context.parser.response_messages,
                    context.tool_dicts,
                    context.tool_parser_cls,
                    context.chat_template,
                    context.chat_template_content_format,
                )
                engine_prompt = engine_prompts[0]
                prompt_text = self._extract_prompt_text(engine_prompt)

                sampling_params.max_tokens = get_max_tokens(
                    self.max_model_len,
                    context.request.max_output_tokens,
                    self._extract_prompt_len(engine_prompt),
                    self.default_sampling_params,  # type: ignore
                )

            # OPTIMIZATION
            priority = orig_priority - 1
            sub_request += 1

    def _log_inputs(
        self,
        request_id: str,
        inputs: PromptType | TokPrompt,
        params: SamplingParams | PoolingParams | BeamSearchParams | None,
        lora_request: LoRARequest | None,
    ) -> None:
        if self.request_logger is None:
            return

        components = self._extract_prompt_components(inputs)

        self.request_logger.log_inputs(
            request_id,
            components.text,
            components.token_ids,
            components.embeds,
            params=params,
            lora_request=lora_request,
        )

    async def _get_trace_headers(
        self,
        headers: Headers,
    ) -> Mapping[str, str] | None:
        is_tracing_enabled = await self.engine_client.is_tracing_enabled()

        if is_tracing_enabled:
            return extract_trace_headers(headers)

        if contains_trace_headers(headers):
            log_tracing_disabled_warning()

        return None

    @staticmethod
    def _base_request_id(
        raw_request: Request | None, default: str | None = None
    ) -> str | None:
        """Pulls the request id to use from a header, if provided"""
        if raw_request is not None and (
            (req_id := raw_request.headers.get("X-Request-Id")) is not None
        ):
            return req_id

        return random_uuid() if default is None else default

    @staticmethod
    def _get_data_parallel_rank(raw_request: Request | None) -> int | None:
        """Pulls the data parallel rank from a header, if provided"""
        if raw_request is None:
            return None

        rank_str = raw_request.headers.get("X-data-parallel-rank")
        if rank_str is None:
            return None

        try:
            return int(rank_str)
        except ValueError:
            return None

    @staticmethod
    def _parse_tool_calls_from_content(
        request: ResponsesRequest | ChatCompletionRequest,
        tokenizer: TokenizerLike | None,
        enable_auto_tools: bool,
        tool_parser_cls: Callable[[TokenizerLike], ToolParser] | None,
        content: str | None = None,
    ) -> tuple[list[FunctionCall] | None, str | None]:
        function_calls = list[FunctionCall]()
        if request.tool_choice and isinstance(request.tool_choice, ToolChoiceFunction):
            assert content is not None
            # Forced Function Call
            function_calls.append(
                FunctionCall(name=request.tool_choice.name, arguments=content)
            )
            content = None  # Clear content since tool is called.
        elif request.tool_choice and isinstance(
            request.tool_choice, ChatCompletionNamedToolChoiceParam
        ):
            assert content is not None
            # Forced Function Call
            function_calls.append(
                FunctionCall(name=request.tool_choice.function.name, arguments=content)
            )
            content = None  # Clear content since tool is called.
        elif request.tool_choice == "required":
            assert content is not None
            tool_calls = TypeAdapter(list[FunctionDefinition]).validate_json(content)
            function_calls.extend(
                [
                    FunctionCall(
                        name=tool_call.name,
                        arguments=json.dumps(tool_call.parameters, ensure_ascii=False),
                    )
                    for tool_call in tool_calls
                ]
            )
            content = None  # Clear content since tool is called.
        elif (
            tool_parser_cls
            and enable_auto_tools
            and (request.tool_choice == "auto" or request.tool_choice is None)
        ):
            if tokenizer is None:
                raise ValueError(
                    "Tokenizer not available when `skip_tokenizer_init=True`"
                )

            # Automatic Tool Call Parsing
            try:
                tool_parser = tool_parser_cls(tokenizer)
            except RuntimeError as e:
                logger.exception("Error in tool parser creation.")
                raise e
            tool_call_info = tool_parser.extract_tool_calls(
                content if content is not None else "",
                request=request,  # type: ignore
            )
            if tool_call_info is not None and tool_call_info.tools_called:
                # extract_tool_calls() returns a list of tool calls.
                function_calls.extend(
                    FunctionCall(
                        id=tool_call.id,
                        name=tool_call.function.name,
                        arguments=tool_call.function.arguments,
                    )
                    for tool_call in tool_call_info.tool_calls
                )
                content = tool_call_info.content
                if content and content.strip() == "":
                    content = None
            else:
                # No tool calls.
                return None, content

        return function_calls, content

    @staticmethod
    def _get_decoded_token(
        logprob: Logprob,
        token_id: int,
        tokenizer: TokenizerLike | None,
        return_as_token_id: bool = False,
    ) -> str:
        if return_as_token_id:
            return f"token_id:{token_id}"

        if logprob.decoded_token is not None:
            return logprob.decoded_token

        if tokenizer is None:
            raise ValueError(
                "Unable to get tokenizer because `skip_tokenizer_init=True`"
            )

        return tokenizer.decode([token_id])

    def _is_model_supported(self, model_name: str | None) -> bool:
        if not model_name:
            return True
        return self.models.is_base_model(model_name)

_base_request_id staticmethod

_base_request_id(
    raw_request: Request | None, default: str | None = None
) -> str | None

Pulls the request id to use from a header, if provided

Source code in vllm/entrypoints/openai/engine/serving.py
@staticmethod
def _base_request_id(
    raw_request: Request | None, default: str | None = None
) -> str | None:
    """Pulls the request id to use from a header, if provided"""
    if raw_request is not None and (
        (req_id := raw_request.headers.get("X-Request-Id")) is not None
    ):
        return req_id

    return random_uuid() if default is None else default

_build_response

_build_response(
    ctx: ServeContext,
) -> AnyResponse | ErrorResponse

Default response builder. Subclass may override this method to return the appropriate response object.

Source code in vllm/entrypoints/openai/engine/serving.py
def _build_response(
    self,
    ctx: ServeContext,
) -> AnyResponse | ErrorResponse:
    """
    Default response builder. Subclass may override this method
    to return the appropriate response object.
    """
    return self.create_error_response("unimplemented endpoint")

_collect_batch async

_collect_batch(ctx: ServeContext) -> ErrorResponse | None

Collect batch results from the result generator.

Source code in vllm/entrypoints/openai/engine/serving.py
async def _collect_batch(
    self,
    ctx: ServeContext,
) -> ErrorResponse | None:
    """Collect batch results from the result generator."""
    try:
        if ctx.engine_prompts is None:
            return self.create_error_response("Engine prompts not available")

        num_prompts = len(ctx.engine_prompts)
        final_res_batch: list[PoolingRequestOutput | None]
        final_res_batch = [None] * num_prompts

        if ctx.result_generator is None:
            return self.create_error_response("Result generator not available")

        async for i, res in ctx.result_generator:
            final_res_batch[i] = res

        if None in final_res_batch:
            return self.create_error_response(
                "Failed to generate results for all prompts"
            )

        ctx.final_res_batch = [res for res in final_res_batch if res is not None]

        return None

    except Exception as e:
        return self.create_error_response(e)

_convert_generation_error_to_response

_convert_generation_error_to_response(
    e: GenerationError,
) -> ErrorResponse

Convert GenerationError to ErrorResponse.

Source code in vllm/entrypoints/openai/engine/serving.py
def _convert_generation_error_to_response(
    self, e: GenerationError
) -> ErrorResponse:
    """Convert GenerationError to ErrorResponse."""
    return self.create_error_response(
        str(e),
        err_type="InternalServerError",
        status_code=e.status_code,
    )

_convert_generation_error_to_streaming_response

_convert_generation_error_to_streaming_response(
    e: GenerationError,
) -> str

Convert GenerationError to streaming error response.

Source code in vllm/entrypoints/openai/engine/serving.py
def _convert_generation_error_to_streaming_response(
    self, e: GenerationError
) -> str:
    """Convert GenerationError to streaming error response."""
    return self.create_streaming_error_response(
        str(e),
        err_type="InternalServerError",
        status_code=e.status_code,
    )

_get_active_default_mm_loras

_get_active_default_mm_loras(
    request: AnyRequest,
) -> LoRARequest | None

Determine if there are any active default multimodal loras.

Source code in vllm/entrypoints/openai/engine/serving.py
def _get_active_default_mm_loras(self, request: AnyRequest) -> LoRARequest | None:
    """Determine if there are any active default multimodal loras."""
    # TODO: Currently this is only enabled for chat completions
    # to be better aligned with only being enabled for .generate
    # when run offline. It would be nice to support additional
    # tasks types in the future.
    message_types = self._get_message_types(request)
    default_mm_loras = set()

    for lora in self.models.lora_requests.values():
        # Best effort match for default multimodal lora adapters;
        # There is probably a better way to do this, but currently
        # this matches against the set of 'types' in any content lists
        # up until '_', e.g., to match audio_url -> audio
        if lora.lora_name in message_types:
            default_mm_loras.add(lora)

    # Currently only support default modality specific loras if
    # we have exactly one lora matched on the request.
    if len(default_mm_loras) == 1:
        return default_mm_loras.pop()
    return None

_get_data_parallel_rank staticmethod

_get_data_parallel_rank(
    raw_request: Request | None,
) -> int | None

Pulls the data parallel rank from a header, if provided

Source code in vllm/entrypoints/openai/engine/serving.py
@staticmethod
def _get_data_parallel_rank(raw_request: Request | None) -> int | None:
    """Pulls the data parallel rank from a header, if provided"""
    if raw_request is None:
        return None

    rank_str = raw_request.headers.get("X-data-parallel-rank")
    if rank_str is None:
        return None

    try:
        return int(rank_str)
    except ValueError:
        return None

_get_message_types

_get_message_types(request: AnyRequest) -> set[str]

Retrieve the set of types from message content dicts up until _; we use this to match potential multimodal data with default per modality loras.

Source code in vllm/entrypoints/openai/engine/serving.py
def _get_message_types(self, request: AnyRequest) -> set[str]:
    """Retrieve the set of types from message content dicts up
    until `_`; we use this to match potential multimodal data
    with default per modality loras.
    """
    message_types: set[str] = set()

    if not hasattr(request, "messages"):
        return message_types

    messages = request.messages
    if messages is None or isinstance(messages, (str, bytes)):
        return message_types

    for message in messages:
        if (
            isinstance(message, dict)
            and "content" in message
            and isinstance(message["content"], list)
        ):
            for content_dict in message["content"]:
                if "type" in content_dict:
                    message_types.add(content_dict["type"].split("_")[0])
    return message_types

_pipeline async

_pipeline(
    ctx: ServeContext,
) -> AsyncGenerator[AnyResponse | ErrorResponse, None]

Execute the request processing pipeline yielding responses.

Source code in vllm/entrypoints/openai/engine/serving.py
async def _pipeline(
    self,
    ctx: ServeContext,
) -> AsyncGenerator[AnyResponse | ErrorResponse, None]:
    """Execute the request processing pipeline yielding responses."""
    if error := await self._check_model(ctx.request):
        yield error
    if error := self._validate_request(ctx):
        yield error

    preprocess_ret = await self._preprocess(ctx)
    if isinstance(preprocess_ret, ErrorResponse):
        yield preprocess_ret

    generators_ret = await self._prepare_generators(ctx)
    if isinstance(generators_ret, ErrorResponse):
        yield generators_ret

    collect_ret = await self._collect_batch(ctx)
    if isinstance(collect_ret, ErrorResponse):
        yield collect_ret

    yield self._build_response(ctx)

_prepare_extra_chat_template_kwargs staticmethod

_prepare_extra_chat_template_kwargs(
    request_chat_template_kwargs: dict[str, Any]
    | None = None,
    default_chat_template_kwargs: dict[str, Any]
    | None = None,
) -> dict[str, Any]

Helper to merge server-default and request-specific chat template kwargs.

Source code in vllm/entrypoints/openai/engine/serving.py
@staticmethod
def _prepare_extra_chat_template_kwargs(
    request_chat_template_kwargs: dict[str, Any] | None = None,
    default_chat_template_kwargs: dict[str, Any] | None = None,
) -> dict[str, Any]:
    """Helper to merge server-default and request-specific chat template kwargs."""
    request_chat_template_kwargs = request_chat_template_kwargs or {}
    if default_chat_template_kwargs is None:
        return request_chat_template_kwargs
    # Apply server defaults first, then request kwargs override.
    return default_chat_template_kwargs | request_chat_template_kwargs

_prepare_generators async

_prepare_generators(
    ctx: ServeContext,
) -> ErrorResponse | None

Schedule the request and get the result generator.

Source code in vllm/entrypoints/openai/engine/serving.py
async def _prepare_generators(
    self,
    ctx: ServeContext,
) -> ErrorResponse | None:
    """Schedule the request and get the result generator."""
    generators: list[AsyncGenerator[PoolingRequestOutput, None]] = []

    try:
        trace_headers = (
            None
            if ctx.raw_request is None
            else await self._get_trace_headers(ctx.raw_request.headers)
        )

        pooling_params = self._create_pooling_params(ctx)
        if isinstance(pooling_params, ErrorResponse):
            return pooling_params

        if ctx.engine_prompts is None:
            return self.create_error_response("Engine prompts not available")

        for i, engine_prompt in enumerate(ctx.engine_prompts):
            request_id_item = f"{ctx.request_id}-{i}"

            self._log_inputs(
                request_id_item,
                engine_prompt,
                params=pooling_params,
                lora_request=ctx.lora_request,
            )

            generator = self.engine_client.encode(
                engine_prompt,
                pooling_params,
                request_id_item,
                lora_request=ctx.lora_request,
                trace_headers=trace_headers,
                priority=getattr(ctx.request, "priority", 0),
            )

            generators.append(generator)

        ctx.result_generator = merge_async_iterators(*generators)

        return None

    except Exception as e:
        return self.create_error_response(e)

_preprocess async

_preprocess(ctx: ServeContext) -> ErrorResponse | None

Default preprocessing hook. Subclasses may override to prepare ctx (classification, embedding, etc.).

Source code in vllm/entrypoints/openai/engine/serving.py
async def _preprocess(
    self,
    ctx: ServeContext,
) -> ErrorResponse | None:
    """
    Default preprocessing hook. Subclasses may override
    to prepare `ctx` (classification, embedding, etc.).
    """
    return None

_raise_if_error

_raise_if_error(
    finish_reason: str | None, request_id: str
) -> None

Raise GenerationError if finish_reason indicates an error.

Source code in vllm/entrypoints/openai/engine/serving.py
def _raise_if_error(self, finish_reason: str | None, request_id: str) -> None:
    """Raise GenerationError if finish_reason indicates an error."""
    if finish_reason == "error":
        logger.error(
            "Request %s failed with an internal error during generation",
            request_id,
        )
        raise GenerationError("Internal server error")

OpenAIServingModels

Shared instance to hold data about the loaded base model(s) and adapters.

Handles the routes: - /v1/models - /v1/load_lora_adapter - /v1/unload_lora_adapter

Source code in vllm/entrypoints/openai/models/serving.py
class OpenAIServingModels:
    """Shared instance to hold data about the loaded base model(s) and adapters.

    Handles the routes:
    - /v1/models
    - /v1/load_lora_adapter
    - /v1/unload_lora_adapter
    """

    def __init__(
        self,
        engine_client: EngineClient,
        base_model_paths: list[BaseModelPath],
        *,
        lora_modules: list[LoRAModulePath] | None = None,
    ):
        super().__init__()

        self.engine_client = engine_client
        self.base_model_paths = base_model_paths

        self.static_lora_modules = lora_modules
        self.lora_requests: dict[str, LoRARequest] = {}
        self.lora_id_counter = AtomicCounter(0)

        self.lora_resolvers: list[LoRAResolver] = []
        for lora_resolver_name in LoRAResolverRegistry.get_supported_resolvers():
            self.lora_resolvers.append(
                LoRAResolverRegistry.get_resolver(lora_resolver_name)
            )
        self.lora_resolver_lock: dict[str, Lock] = defaultdict(Lock)

        self.input_processor = self.engine_client.input_processor
        self.io_processor = self.engine_client.io_processor
        self.renderer = self.engine_client.renderer
        self.model_config = self.engine_client.model_config
        self.max_model_len = self.model_config.max_model_len

    async def init_static_loras(self):
        """Loads all static LoRA modules.
        Raises if any fail to load"""
        if self.static_lora_modules is None:
            return
        for lora in self.static_lora_modules:
            load_request = LoadLoRAAdapterRequest(
                lora_path=lora.path, lora_name=lora.name
            )
            load_result = await self.load_lora_adapter(
                request=load_request, base_model_name=lora.base_model_name
            )
            if isinstance(load_result, ErrorResponse):
                raise ValueError(load_result.error.message)

    def is_base_model(self, model_name) -> bool:
        return any(model.name == model_name for model in self.base_model_paths)

    def model_name(self, lora_request: LoRARequest | None = None) -> str:
        """Returns the appropriate model name depending on the availability
        and support of the LoRA or base model.
        Parameters:
        - lora: LoRARequest that contain a base_model_name.
        Returns:
        - str: The name of the base model or the first available model path.
        """
        if lora_request is not None:
            return lora_request.lora_name
        return self.base_model_paths[0].name

    async def show_available_models(self) -> ModelList:
        """Show available models. This includes the base model and all
        adapters"""
        model_cards = [
            ModelCard(
                id=base_model.name,
                max_model_len=self.max_model_len,
                root=base_model.model_path,
                permission=[ModelPermission()],
            )
            for base_model in self.base_model_paths
        ]
        lora_cards = [
            ModelCard(
                id=lora.lora_name,
                root=lora.path,
                parent=lora.base_model_name
                if lora.base_model_name
                else self.base_model_paths[0].name,
                permission=[ModelPermission()],
            )
            for lora in self.lora_requests.values()
        ]
        model_cards.extend(lora_cards)
        return ModelList(data=model_cards)

    async def load_lora_adapter(
        self, request: LoadLoRAAdapterRequest, base_model_name: str | None = None
    ) -> ErrorResponse | str:
        lora_name = request.lora_name

        # Ensure atomicity based on the lora name
        async with self.lora_resolver_lock[lora_name]:
            error_check_ret = await self._check_load_lora_adapter_request(request)
            if error_check_ret is not None:
                return error_check_ret

            lora_path = request.lora_path
            lora_int_id = (
                self.lora_requests[lora_name].lora_int_id
                if lora_name in self.lora_requests
                else self.lora_id_counter.inc(1)
            )
            lora_request = LoRARequest(
                lora_name=lora_name,
                lora_int_id=lora_int_id,
                lora_path=lora_path,
                load_inplace=request.load_inplace,
            )
            if base_model_name is not None and self.is_base_model(base_model_name):
                lora_request.base_model_name = base_model_name

            # Validate that the adapter can be loaded into the engine
            # This will also preload it for incoming requests
            try:
                await self.engine_client.add_lora(lora_request)
            except Exception as e:
                error_type = "BadRequestError"
                status_code = HTTPStatus.BAD_REQUEST
                if "No adapter found" in str(e):
                    error_type = "NotFoundError"
                    status_code = HTTPStatus.NOT_FOUND

                return create_error_response(
                    message=str(e), err_type=error_type, status_code=status_code
                )

            self.lora_requests[lora_name] = lora_request
            logger.info(
                "Loaded new LoRA adapter: name '%s', path '%s'", lora_name, lora_path
            )
            return f"Success: LoRA adapter '{lora_name}' added successfully."

    async def unload_lora_adapter(
        self, request: UnloadLoRAAdapterRequest
    ) -> ErrorResponse | str:
        lora_name = request.lora_name

        # Ensure atomicity based on the lora name
        async with self.lora_resolver_lock[lora_name]:
            error_check_ret = await self._check_unload_lora_adapter_request(request)
            if error_check_ret is not None:
                return error_check_ret

            # Safe to delete now since we hold the lock
            del self.lora_requests[lora_name]
            logger.info("Removed LoRA adapter: name '%s'", lora_name)
            return f"Success: LoRA adapter '{lora_name}' removed successfully."

    async def _check_load_lora_adapter_request(
        self, request: LoadLoRAAdapterRequest
    ) -> ErrorResponse | None:
        # Check if both 'lora_name' and 'lora_path' are provided
        if not request.lora_name or not request.lora_path:
            return create_error_response(
                message="Both 'lora_name' and 'lora_path' must be provided.",
                err_type="InvalidUserInput",
                status_code=HTTPStatus.BAD_REQUEST,
            )

        # If not loading inplace
        # Check if the lora adapter with the given name already exists
        if not request.load_inplace and request.lora_name in self.lora_requests:
            return create_error_response(
                message=f"The lora adapter '{request.lora_name}' has already been "
                "loaded. If you want to load the adapter in place, set 'load_inplace'"
                " to True.",
                err_type="InvalidUserInput",
                status_code=HTTPStatus.BAD_REQUEST,
            )

        return None

    async def _check_unload_lora_adapter_request(
        self, request: UnloadLoRAAdapterRequest
    ) -> ErrorResponse | None:
        # Check if 'lora_name' is not provided return an error
        if not request.lora_name:
            return create_error_response(
                message="'lora_name' needs to be provided to unload a LoRA adapter.",
                err_type="InvalidUserInput",
                status_code=HTTPStatus.BAD_REQUEST,
            )

        # Check if the lora adapter with the given name exists
        if request.lora_name not in self.lora_requests:
            return create_error_response(
                message=f"The lora adapter '{request.lora_name}' cannot be found.",
                err_type="NotFoundError",
                status_code=HTTPStatus.NOT_FOUND,
            )

        return None

    async def resolve_lora(self, lora_name: str) -> LoRARequest | ErrorResponse:
        """Attempt to resolve a LoRA adapter using available resolvers.

        Args:
            lora_name: Name/identifier of the LoRA adapter

        Returns:
            LoRARequest if found and loaded successfully.
            ErrorResponse (404) if no resolver finds the adapter.
            ErrorResponse (400) if adapter(s) are found but none load.
        """
        async with self.lora_resolver_lock[lora_name]:
            # First check if this LoRA is already loaded
            if lora_name in self.lora_requests:
                return self.lora_requests[lora_name]

            base_model_name = self.model_config.model
            unique_id = self.lora_id_counter.inc(1)
            found_adapter = False

            # Try to resolve using available resolvers
            for resolver in self.lora_resolvers:
                lora_request = await resolver.resolve_lora(base_model_name, lora_name)

                if lora_request is not None:
                    found_adapter = True
                    lora_request.lora_int_id = unique_id

                    try:
                        await self.engine_client.add_lora(lora_request)
                        self.lora_requests[lora_name] = lora_request
                        logger.info(
                            "Resolved and loaded LoRA adapter '%s' using %s",
                            lora_name,
                            resolver.__class__.__name__,
                        )
                        return lora_request
                    except BaseException as e:
                        logger.warning(
                            "Failed to load LoRA '%s' resolved by %s: %s. "
                            "Trying next resolver.",
                            lora_name,
                            resolver.__class__.__name__,
                            e,
                        )
                        continue

            if found_adapter:
                # An adapter was found, but all attempts to load it failed.
                return create_error_response(
                    message=(
                        f"LoRA adapter '{lora_name}' was found but could not be loaded."
                    ),
                    err_type="BadRequestError",
                    status_code=HTTPStatus.BAD_REQUEST,
                )
            else:
                # No adapter was found
                return create_error_response(
                    message=f"LoRA adapter {lora_name} does not exist",
                    err_type="NotFoundError",
                    status_code=HTTPStatus.NOT_FOUND,
                )

init_static_loras async

init_static_loras()

Loads all static LoRA modules. Raises if any fail to load

Source code in vllm/entrypoints/openai/models/serving.py
async def init_static_loras(self):
    """Loads all static LoRA modules.
    Raises if any fail to load"""
    if self.static_lora_modules is None:
        return
    for lora in self.static_lora_modules:
        load_request = LoadLoRAAdapterRequest(
            lora_path=lora.path, lora_name=lora.name
        )
        load_result = await self.load_lora_adapter(
            request=load_request, base_model_name=lora.base_model_name
        )
        if isinstance(load_result, ErrorResponse):
            raise ValueError(load_result.error.message)

model_name

model_name(lora_request: LoRARequest | None = None) -> str

Returns the appropriate model name depending on the availability and support of the LoRA or base model. Parameters: - lora: LoRARequest that contain a base_model_name. Returns: - str: The name of the base model or the first available model path.

Source code in vllm/entrypoints/openai/models/serving.py
def model_name(self, lora_request: LoRARequest | None = None) -> str:
    """Returns the appropriate model name depending on the availability
    and support of the LoRA or base model.
    Parameters:
    - lora: LoRARequest that contain a base_model_name.
    Returns:
    - str: The name of the base model or the first available model path.
    """
    if lora_request is not None:
        return lora_request.lora_name
    return self.base_model_paths[0].name

resolve_lora async

resolve_lora(lora_name: str) -> LoRARequest | ErrorResponse

Attempt to resolve a LoRA adapter using available resolvers.

Parameters:

Name Type Description Default
lora_name str

Name/identifier of the LoRA adapter

required

Returns:

Type Description
LoRARequest | ErrorResponse

LoRARequest if found and loaded successfully.

LoRARequest | ErrorResponse

ErrorResponse (404) if no resolver finds the adapter.

LoRARequest | ErrorResponse

ErrorResponse (400) if adapter(s) are found but none load.

Source code in vllm/entrypoints/openai/models/serving.py
async def resolve_lora(self, lora_name: str) -> LoRARequest | ErrorResponse:
    """Attempt to resolve a LoRA adapter using available resolvers.

    Args:
        lora_name: Name/identifier of the LoRA adapter

    Returns:
        LoRARequest if found and loaded successfully.
        ErrorResponse (404) if no resolver finds the adapter.
        ErrorResponse (400) if adapter(s) are found but none load.
    """
    async with self.lora_resolver_lock[lora_name]:
        # First check if this LoRA is already loaded
        if lora_name in self.lora_requests:
            return self.lora_requests[lora_name]

        base_model_name = self.model_config.model
        unique_id = self.lora_id_counter.inc(1)
        found_adapter = False

        # Try to resolve using available resolvers
        for resolver in self.lora_resolvers:
            lora_request = await resolver.resolve_lora(base_model_name, lora_name)

            if lora_request is not None:
                found_adapter = True
                lora_request.lora_int_id = unique_id

                try:
                    await self.engine_client.add_lora(lora_request)
                    self.lora_requests[lora_name] = lora_request
                    logger.info(
                        "Resolved and loaded LoRA adapter '%s' using %s",
                        lora_name,
                        resolver.__class__.__name__,
                    )
                    return lora_request
                except BaseException as e:
                    logger.warning(
                        "Failed to load LoRA '%s' resolved by %s: %s. "
                        "Trying next resolver.",
                        lora_name,
                        resolver.__class__.__name__,
                        e,
                    )
                    continue

        if found_adapter:
            # An adapter was found, but all attempts to load it failed.
            return create_error_response(
                message=(
                    f"LoRA adapter '{lora_name}' was found but could not be loaded."
                ),
                err_type="BadRequestError",
                status_code=HTTPStatus.BAD_REQUEST,
            )
        else:
            # No adapter was found
            return create_error_response(
                message=f"LoRA adapter {lora_name} does not exist",
                err_type="NotFoundError",
                status_code=HTTPStatus.NOT_FOUND,
            )

show_available_models async

show_available_models() -> ModelList

Show available models. This includes the base model and all adapters

Source code in vllm/entrypoints/openai/models/serving.py
async def show_available_models(self) -> ModelList:
    """Show available models. This includes the base model and all
    adapters"""
    model_cards = [
        ModelCard(
            id=base_model.name,
            max_model_len=self.max_model_len,
            root=base_model.model_path,
            permission=[ModelPermission()],
        )
        for base_model in self.base_model_paths
    ]
    lora_cards = [
        ModelCard(
            id=lora.lora_name,
            root=lora.path,
            parent=lora.base_model_name
            if lora.base_model_name
            else self.base_model_paths[0].name,
            permission=[ModelPermission()],
        )
        for lora in self.lora_requests.values()
    ]
    model_cards.extend(lora_cards)
    return ModelList(data=model_cards)

OpenAISpeechToText

Bases: OpenAIServing

Base class for speech-to-text operations like transcription and translation.

Source code in vllm/entrypoints/openai/speech_to_text/speech_to_text.py
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
class OpenAISpeechToText(OpenAIServing):
    """Base class for speech-to-text operations like transcription and
    translation."""

    def __init__(
        self,
        engine_client: EngineClient,
        models: OpenAIServingModels,
        *,
        request_logger: RequestLogger | None,
        return_tokens_as_token_ids: bool = False,
        task_type: Literal["transcribe", "translate"] = "transcribe",
        log_error_stack: bool = False,
        enable_force_include_usage: bool = False,
    ):
        super().__init__(
            engine_client=engine_client,
            models=models,
            request_logger=request_logger,
            return_tokens_as_token_ids=return_tokens_as_token_ids,
            log_error_stack=log_error_stack,
        )

        self.default_sampling_params = self.model_config.get_diff_sampling_param()
        self.task_type: Final = task_type

        self.asr_config = self.model_cls.get_speech_to_text_config(
            self.model_config, task_type
        )

        self.enable_force_include_usage = enable_force_include_usage

        self.max_audio_filesize_mb = envs.VLLM_MAX_AUDIO_CLIP_FILESIZE_MB
        if self.model_cls.supports_segment_timestamp:
            self.tokenizer = cast(
                PreTrainedTokenizerBase,
                get_tokenizer(
                    tokenizer_name=self.model_config.tokenizer,
                    tokenizer_mode=self.model_config.tokenizer_mode,
                ),
            )

        if self.default_sampling_params:
            logger.info(
                "Overwriting default completion sampling param with: %s",
                self.default_sampling_params,
            )

        # Warm up audio preprocessing to avoid first-request latency
        self._warmup_audio_preprocessing()
        # Warm up input processor with dummy audio
        self._warmup_input_processor()

    def _warmup_audio_preprocessing(self) -> None:
        """Warm up audio processing libraries to avoid first-request latency.

        The first call to librosa functions (load, get_duration, mel-spectrogram)
        triggers JIT compilation and library initialization which can take ~7s.
        This method warms up these operations during server initialization.
        """
        # Skip warmup if librosa is not installed (optional dependency)
        if isinstance(librosa, PlaceholderModule):
            return

        # Skip warmup if model doesn't support transcription
        if not supports_transcription(self.model_cls):
            return

        if getattr(self.model_cls, "skip_warmup_audio_preprocessing", False):
            return

        try:
            warmup_start = time.perf_counter()
            logger.info("Warming up audio preprocessing libraries...")

            # Create a minimal dummy audio (1 second of silence at target sample rate)
            dummy_audio = np.zeros(int(self.asr_config.sample_rate), dtype=np.float32)

            # Warm up librosa.load by using librosa functions on the dummy data
            # This initializes FFTW, numba JIT, and other audio processing libraries
            _ = librosa.get_duration(y=dummy_audio, sr=self.asr_config.sample_rate)

            # Warm up mel-spectrogram computation with model-specific parameters
            from vllm.transformers_utils.processor import cached_processor_from_config

            processor = cached_processor_from_config(self.model_config)
            feature_extractor = None
            if hasattr(processor, "feature_extractor"):
                feature_extractor = processor.feature_extractor
            elif hasattr(processor, "audio_processor"):
                # For models like GraniteSpeech that use audio_processor
                audio_proc = processor.audio_processor
                if hasattr(audio_proc, "feature_extractor"):
                    feature_extractor = audio_proc.feature_extractor
                # If audio_processor doesn't have feature_extractor,
                # skip mel-spectrogram warmup for these models

            if feature_extractor is not None:
                _ = librosa.feature.melspectrogram(
                    y=dummy_audio,
                    sr=self.asr_config.sample_rate,
                    n_mels=getattr(feature_extractor, "n_mels", 128),
                    n_fft=getattr(feature_extractor, "n_fft", 400),
                    hop_length=getattr(feature_extractor, "hop_length", 160),
                )

            warmup_elapsed = time.perf_counter() - warmup_start
            logger.info("Audio preprocessing warmup completed in %.2fs", warmup_elapsed)
        except Exception:
            # Don't fail initialization if warmup fails - log exception and continue
            logger.exception(
                "Audio preprocessing warmup failed (non-fatal): %s. "
                "First request may experience higher latency.",
            )

    def _warmup_input_processor(self) -> None:
        """Warm up input processor with dummy audio to avoid first-request latency.

        The first call to input_processor.process_inputs() with multimodal audio
        triggers multimodal processing initialization which can take ~2.5s.
        This method processes a dummy audio request to warm up the pipeline.
        """
        # Skip warmup if model doesn't support transcription
        if not supports_transcription(self.model_cls):
            return

        # Only warm up if model supports transcription methods
        if not hasattr(self.model_cls, "get_generation_prompt"):
            return

        try:
            from vllm.sampling_params import SamplingParams

            warmup_start = time.perf_counter()
            logger.info("Warming up multimodal input processor...")

            # Create minimal dummy audio (1 second of silence)
            dummy_audio = np.zeros(int(self.asr_config.sample_rate), dtype=np.float32)

            # Use the same method that _preprocess_speech_to_text uses
            # to create the prompt
            dummy_prompt = self.model_cls.get_generation_prompt(
                audio=dummy_audio,
                stt_config=self.asr_config,
                model_config=self.model_config,
                language="en",
                task_type=self.task_type,
                request_prompt="",
                to_language=None,
            )

            # Create minimal sampling params
            dummy_params = SamplingParams(
                max_tokens=1,
                temperature=0.0,
                skip_clone=True,  # Internal warmup, safe to skip clone
            )

            # Process the dummy input through the input processor
            # This will trigger all the multimodal processing initialization
            _ = self.input_processor.process_inputs(
                request_id="warmup",
                prompt=dummy_prompt,
                params=dummy_params,
            )

            warmup_elapsed = time.perf_counter() - warmup_start
            logger.info("Input processor warmup completed in %.2fs", warmup_elapsed)
        except Exception:
            # Don't fail initialization if warmup fails - log warning and continue
            logger.exception(
                "Input processor warmup failed (non-fatal): %s. "
                "First request may experience higher latency."
            )

    @cached_property
    def model_cls(self) -> type[SupportsTranscription]:
        from vllm.model_executor.model_loader import get_model_cls

        model_cls = get_model_cls(self.model_config)
        return cast(type[SupportsTranscription], model_cls)

    async def _preprocess_speech_to_text(
        self,
        request: SpeechToTextRequest,
        audio_data: bytes,
    ) -> tuple[list[PromptType], float]:
        # Validate request
        language = self.model_cls.validate_language(request.language)
        # Skip to_language validation to avoid extra logging for Whisper.
        to_language = (
            self.model_cls.validate_language(request.to_language)
            if request.to_language
            else None
        )

        if len(audio_data) / 1024**2 > self.max_audio_filesize_mb:
            raise VLLMValidationError(
                "Maximum file size exceeded",
                parameter="audio_filesize_mb",
                value=len(audio_data) / 1024**2,
            )

        with io.BytesIO(audio_data) as bytes_:
            # NOTE resample to model SR here for efficiency. This is also a
            # pre-requisite for chunking, as it assumes Whisper SR.
            y, sr = librosa.load(bytes_, sr=self.asr_config.sample_rate)

        duration = librosa.get_duration(y=y, sr=sr)
        do_split_audio = (
            self.asr_config.allow_audio_chunking
            and duration > self.asr_config.max_audio_clip_s
        )
        chunks = [y] if not do_split_audio else self._split_audio(y, int(sr))
        prompts = []
        for chunk in chunks:
            # The model has control over the construction, as long as it
            # returns a valid PromptType.
            prompt = self.model_cls.get_generation_prompt(
                audio=chunk,
                stt_config=self.asr_config,
                model_config=self.model_config,
                language=language,
                task_type=self.task_type,
                request_prompt=request.prompt,
                to_language=to_language,
            )
            if request.response_format == "verbose_json":
                prompt = self._preprocess_verbose_prompt(parse_enc_dec_prompt(prompt))

            prompts.append(prompt)

        return prompts, duration

    def _preprocess_verbose_prompt(self, prompt: EncoderDecoderDictPrompt):
        dec_prompt = prompt["decoder_prompt"]

        if not (isinstance(dec_prompt, dict) and "prompt" in dec_prompt):
            raise VLLMValidationError(
                "Expected decoder_prompt to contain text",
                parameter="decoder_prompt",
                value=type(dec_prompt).__name__,
            )

        dec_prompt["prompt"] = dec_prompt["prompt"].replace(
            "<|notimestamps|>", "<|0.00|>"
        )

        return prompt

    def _get_verbose_segments(
        self,
        tokens: tuple,
        log_probs: FlatLogprobs | list[dict[int, Logprob]],
        request: SpeechToTextRequest,
        segment_class: type[SpeechToTextSegment],
        start_time: float = 0,
    ) -> list[SpeechToTextSegment]:
        """
        Convert tokens to verbose segments.

        This method expects the model to produce
        timestamps as tokens (similar to Whisper).
        If the tokens do not include timestamp information,
        the segments may not be generated correctly.

        Note: No_speech_prob field is not supported
        in this implementation and will be None. See docs for details.
        """
        BASE_OFFSET = 0.02
        init_token = self.tokenizer.encode("<|0.00|>", add_special_tokens=False)[0]
        if tokens[-1] == self.tokenizer.eos_token_id:
            tokens = tokens[:-1]

        tokens_with_start = (init_token,) + tokens
        segments: list[SpeechToTextSegment] = []
        last_timestamp_start = 0

        if tokens_with_start[-2] < init_token and tokens_with_start[-1] >= init_token:
            tokens_with_start = tokens_with_start + (tokens_with_start[-1],)
        avg_logprob = 0.0
        for idx in range(1, len(tokens_with_start)):
            # Timestamp tokens (e.g., <|0.00|>) are assumed to be sorted.
            # If the ordering is violated, this slicing may produce incorrect results.
            token = tokens_with_start[idx]
            if token >= init_token and tokens_with_start[idx - 1] >= init_token:
                sliced_timestamp_tokens = tokens_with_start[last_timestamp_start:idx]
                start_timestamp = sliced_timestamp_tokens[0] - init_token
                end_timestamp = sliced_timestamp_tokens[-1] - init_token
                text = self.tokenizer.decode(sliced_timestamp_tokens[1:-1])
                text_bytes = text.encode("utf-8")

                casting_segment = cast(
                    SpeechToTextSegment,
                    segment_class(
                        id=len(segments),
                        seek=start_time,
                        start=start_time + BASE_OFFSET * start_timestamp,
                        end=start_time + BASE_OFFSET * end_timestamp,
                        temperature=request.temperature,
                        text=text,
                        # The compression ratio measures
                        # how compressible the generated text is.
                        # A higher ratio indicates more repetitive content,
                        # which is a strong sign of hallucination in outputs.
                        compression_ratio=len(text_bytes)
                        / len(zlib.compress(text_bytes)),
                        tokens=sliced_timestamp_tokens[1:-1],
                        avg_logprob=avg_logprob / (idx - last_timestamp_start),
                    ),
                )
                segments.append(casting_segment)
                last_timestamp_start = idx
                avg_logprob = 0
            else:
                avg_logprob += log_probs[idx - 1][token].logprob
        return segments

    async def _create_speech_to_text(
        self,
        audio_data: bytes,
        request: SpeechToTextRequest,
        raw_request: Request,
        response_class: type[ResponseType],
        stream_generator_method: Callable[..., AsyncGenerator[str, None]],
    ) -> T | V | AsyncGenerator[str, None] | ErrorResponse:
        """Base method for speech-to-text operations like transcription and
        translation."""
        error_check_ret = await self._check_model(request)
        if error_check_ret is not None:
            return error_check_ret

        # If the engine is dead, raise the engine's DEAD_ERROR.
        # This is required for the streaming case, where we return a
        # success status before we actually start generating text :).
        if self.engine_client.errored:
            raise self.engine_client.dead_error

        if request.response_format not in ["text", "json", "verbose_json"]:
            return self.create_error_response(
                "Currently only support response_format: "
                "`text`, `json` or `verbose_json`"
            )

        if (
            request.response_format == "verbose_json"
            and not self.model_cls.supports_segment_timestamp
        ):
            return self.create_error_response(
                f"Currently do not support verbose_json for {request.model}"
            )

        if request.response_format == "verbose_json" and request.stream:
            return self.create_error_response(
                "verbose_json format doesn't support streaming case"
            )
        request_id = f"{self.task_type}-{self._base_request_id(raw_request)}"

        request_metadata = RequestResponseMetadata(request_id=request_id)
        if raw_request:
            raw_request.state.request_metadata = request_metadata

        try:
            lora_request = self._maybe_get_adapters(request)

            prompts, duration_s = await self._preprocess_speech_to_text(
                request=request,
                audio_data=audio_data,
            )

        except ValueError as e:
            logger.exception("Error in preprocessing prompt inputs")
            return self.create_error_response(e)

        list_result_generator: list[AsyncGenerator[RequestOutput, None]] | None = None
        try:
            # Unlike most decoder-only models, whisper generation length is not
            # constrained by the size of the input audio, which is mapped to a
            # fixed-size log-mel-spectogram. Still, allow for fewer tokens to be
            # generated by respecting the extra completion tokens arg.
            if request.max_completion_tokens is None:
                default_max_tokens = self.model_config.max_model_len
            else:
                default_max_tokens = min(
                    self.model_config.max_model_len, request.max_completion_tokens
                )
            sampling_params = request.to_sampling_params(
                default_max_tokens, self.default_sampling_params
            )
            if request.response_format == "verbose_json":
                sampling_params.logprobs = 1

            self._log_inputs(
                request_id,
                # It will not display special tokens like <|startoftranscript|>
                request.prompt,
                params=sampling_params,
                lora_request=lora_request,
            )

            trace_headers = (
                None
                if raw_request is None
                else await self._get_trace_headers(raw_request.headers)
            )

            list_result_generator = []
            for i, prompt in enumerate(prompts):
                request_id_item = f"{request_id}_{i}"
                engine_request = self.input_processor.process_inputs(
                    request_id_item,
                    prompt,
                    sampling_params,
                    lora_request=lora_request,
                    trace_headers=trace_headers,
                    priority=0,
                )
                list_result_generator.append(
                    self.engine_client.generate(
                        engine_request,
                        sampling_params,
                        request_id_item,
                        lora_request=lora_request,
                    )
                )
        except ValueError as e:
            return self.create_error_response(e)

        if request.stream:
            return stream_generator_method(
                request, list_result_generator, request_id, request_metadata, duration_s
            )
        # Non-streaming response.
        total_segments = []
        text_parts = []
        try:
            assert list_result_generator is not None
            segments_types: dict[str, type[SpeechToTextSegment]] = {
                "transcribe": TranscriptionSegment,
                "translate": TranslationSegment,
            }
            segment_class: type[SpeechToTextSegment] = segments_types[self.task_type]
            text = ""
            chunk_size_in_s = self.asr_config.max_audio_clip_s
            if chunk_size_in_s is None:
                assert len(list_result_generator) == 1, (
                    "`max_audio_clip_s` is set to None, audio cannot be chunked"
                )
            for idx, result_generator in enumerate(list_result_generator):
                start_time = (
                    float(idx * chunk_size_in_s) if chunk_size_in_s is not None else 0.0
                )
                async for op in result_generator:
                    if request.response_format == "verbose_json":
                        assert op.outputs[0].logprobs
                        segments: list[SpeechToTextSegment] = (
                            self._get_verbose_segments(
                                tokens=tuple(op.outputs[0].token_ids),
                                segment_class=segment_class,
                                request=request,
                                start_time=start_time,
                                log_probs=op.outputs[0].logprobs,
                            )
                        )

                        total_segments.extend(segments)
                        text_parts.extend([seg.text for seg in segments])
                    else:
                        raw_text = op.outputs[0].text
                        text_parts.append(self.model_cls.post_process_output(raw_text))
            text = "".join(text_parts)
            if self.task_type == "transcribe":
                final_response: ResponseType
                # add usage in TranscriptionResponse.
                usage = {
                    "type": "duration",
                    # rounded up as per openAI specs
                    "seconds": int(math.ceil(duration_s)),
                }
                if request.response_format != "verbose_json":
                    final_response = cast(
                        T, TranscriptionResponse(text=text, usage=usage)
                    )
                else:
                    final_response = cast(
                        V,
                        TranscriptionResponseVerbose(
                            text=text,
                            language=request.language,
                            duration=str(duration_s),
                            segments=total_segments,
                        ),
                    )
            else:
                # no usage in response for translation task
                if request.response_format != "verbose_json":
                    final_response = cast(T, TranslationResponse(text=text))
                else:
                    final_response = cast(
                        V,
                        TranslationResponseVerbose(
                            text=text,
                            language=request.language,
                            duration=str(duration_s),
                            segments=total_segments,
                        ),
                    )
            return final_response
        except asyncio.CancelledError:
            return self.create_error_response("Client disconnected")
        except ValueError as e:
            return self.create_error_response(e)

    async def _speech_to_text_stream_generator(
        self,
        request: SpeechToTextRequest,
        list_result_generator: list[AsyncGenerator[RequestOutput, None]],
        request_id: str,
        request_metadata: RequestResponseMetadata,
        audio_duration_s: float,
        chunk_object_type: Literal["translation.chunk", "transcription.chunk"],
        response_stream_choice_class: type[TranscriptionResponseStreamChoice]
        | type[TranslationResponseStreamChoice],
        stream_response_class: type[TranscriptionStreamResponse]
        | type[TranslationStreamResponse],
    ) -> AsyncGenerator[str, None]:
        created_time = int(time.time())
        model_name = request.model

        completion_tokens = 0
        num_prompt_tokens = 0

        include_usage = self.enable_force_include_usage or request.stream_include_usage
        include_continuous_usage = (
            request.stream_continuous_usage_stats
            if include_usage and request.stream_continuous_usage_stats
            else False
        )

        try:
            for result_generator in list_result_generator:
                async for res in result_generator:
                    # On first result.
                    if res.prompt_token_ids is not None:
                        num_prompt_tokens = len(res.prompt_token_ids)
                        if audio_tokens := self.model_cls.get_num_audio_tokens(
                            audio_duration_s, self.asr_config, self.model_config
                        ):
                            num_prompt_tokens += audio_tokens

                    # We need to do it here, because if there are exceptions in
                    # the result_generator, it needs to be sent as the FIRST
                    # response (by the try...catch).

                    # Just one output (n=1) supported.
                    assert len(res.outputs) == 1
                    output = res.outputs[0]

                    # TODO: For models that output structured formats (e.g.,
                    # Qwen3-ASR with "language X<asr_text>" prefix), streaming
                    # would need buffering to strip the prefix properly since
                    # deltas may split the tag across chunks.
                    delta_message = DeltaMessage(content=output.text)
                    completion_tokens += len(output.token_ids)

                    if output.finish_reason is None:
                        # Still generating, send delta update.
                        choice_data = response_stream_choice_class(delta=delta_message)
                    else:
                        # Model is finished generating.
                        choice_data = response_stream_choice_class(
                            delta=delta_message,
                            finish_reason=output.finish_reason,
                            stop_reason=output.stop_reason,
                        )

                    chunk = stream_response_class(
                        id=request_id,
                        object=chunk_object_type,
                        created=created_time,
                        choices=[choice_data],
                        model=model_name,
                    )

                    # handle usage stats if requested & if continuous
                    if include_continuous_usage:
                        chunk.usage = UsageInfo(
                            prompt_tokens=num_prompt_tokens,
                            completion_tokens=completion_tokens,
                            total_tokens=num_prompt_tokens + completion_tokens,
                        )

                    data = chunk.model_dump_json(exclude_unset=True)
                    yield f"data: {data}\n\n"

            # Once the final token is handled, if stream_options.include_usage
            # is sent, send the usage.
            if include_usage:
                final_usage = UsageInfo(
                    prompt_tokens=num_prompt_tokens,
                    completion_tokens=completion_tokens,
                    total_tokens=num_prompt_tokens + completion_tokens,
                )

                final_usage_chunk = stream_response_class(
                    id=request_id,
                    object=chunk_object_type,
                    created=created_time,
                    choices=[],
                    model=model_name,
                    usage=final_usage,
                )
                final_usage_data = final_usage_chunk.model_dump_json(
                    exclude_unset=True, exclude_none=True
                )
                yield f"data: {final_usage_data}\n\n"

            # report to FastAPI middleware aggregate usage across all choices
            request_metadata.final_usage_info = UsageInfo(
                prompt_tokens=num_prompt_tokens,
                completion_tokens=completion_tokens,
                total_tokens=num_prompt_tokens + completion_tokens,
            )

        except Exception as e:
            logger.exception("Error in %s stream generator.", self.task_type)
            data = self.create_streaming_error_response(e)
            yield f"data: {data}\n\n"
        # Send the final done message after all response.n are finished
        yield "data: [DONE]\n\n"

    def _split_audio(
        self, audio_data: np.ndarray, sample_rate: int
    ) -> list[np.ndarray]:
        assert self.asr_config.max_audio_clip_s is not None, (
            f"{self.asr_config.max_audio_clip_s=} cannot be None to"
            " split audio into chunks."
        )
        chunk_size = sample_rate * self.asr_config.max_audio_clip_s
        overlap_size = sample_rate * self.asr_config.overlap_chunk_second
        chunks = []
        i = 0
        while i < audio_data.shape[-1]:
            if i + chunk_size >= audio_data.shape[-1]:
                # handle last chunk
                chunks.append(audio_data[..., i:])
                break

            # Find the best split point in the overlap region
            search_start = i + chunk_size - overlap_size
            search_end = min(i + chunk_size, audio_data.shape[-1])
            split_point = self._find_split_point(audio_data, search_start, search_end)

            # Extract chunk up to the split point
            chunks.append(audio_data[..., i:split_point])
            i = split_point
        return chunks

    def _find_split_point(self, wav: np.ndarray, start_idx: int, end_idx: int) -> int:
        """Find the best point to split audio by
        looking for silence or low amplitude.
        Args:
            wav: Audio tensor [1, T]
            start_idx: Start index of search region
            end_idx: End index of search region
        Returns:
            Index of best splitting point
        """
        segment = wav[start_idx:end_idx]

        # Calculate RMS energy in small windows
        min_energy = math.inf
        quietest_idx = 0
        min_energy_window = self.asr_config.min_energy_split_window_size
        assert min_energy_window is not None
        for i in range(0, len(segment) - min_energy_window, min_energy_window):
            window = segment[i : i + min_energy_window]
            energy = (window**2).mean() ** 0.5
            if energy < min_energy:
                quietest_idx = i + start_idx
                min_energy = energy
        return quietest_idx

_create_speech_to_text async

_create_speech_to_text(
    audio_data: bytes,
    request: SpeechToTextRequest,
    raw_request: Request,
    response_class: type[ResponseType],
    stream_generator_method: Callable[
        ..., AsyncGenerator[str, None]
    ],
) -> T | V | AsyncGenerator[str, None] | ErrorResponse

Base method for speech-to-text operations like transcription and translation.

Source code in vllm/entrypoints/openai/speech_to_text/speech_to_text.py
async def _create_speech_to_text(
    self,
    audio_data: bytes,
    request: SpeechToTextRequest,
    raw_request: Request,
    response_class: type[ResponseType],
    stream_generator_method: Callable[..., AsyncGenerator[str, None]],
) -> T | V | AsyncGenerator[str, None] | ErrorResponse:
    """Base method for speech-to-text operations like transcription and
    translation."""
    error_check_ret = await self._check_model(request)
    if error_check_ret is not None:
        return error_check_ret

    # If the engine is dead, raise the engine's DEAD_ERROR.
    # This is required for the streaming case, where we return a
    # success status before we actually start generating text :).
    if self.engine_client.errored:
        raise self.engine_client.dead_error

    if request.response_format not in ["text", "json", "verbose_json"]:
        return self.create_error_response(
            "Currently only support response_format: "
            "`text`, `json` or `verbose_json`"
        )

    if (
        request.response_format == "verbose_json"
        and not self.model_cls.supports_segment_timestamp
    ):
        return self.create_error_response(
            f"Currently do not support verbose_json for {request.model}"
        )

    if request.response_format == "verbose_json" and request.stream:
        return self.create_error_response(
            "verbose_json format doesn't support streaming case"
        )
    request_id = f"{self.task_type}-{self._base_request_id(raw_request)}"

    request_metadata = RequestResponseMetadata(request_id=request_id)
    if raw_request:
        raw_request.state.request_metadata = request_metadata

    try:
        lora_request = self._maybe_get_adapters(request)

        prompts, duration_s = await self._preprocess_speech_to_text(
            request=request,
            audio_data=audio_data,
        )

    except ValueError as e:
        logger.exception("Error in preprocessing prompt inputs")
        return self.create_error_response(e)

    list_result_generator: list[AsyncGenerator[RequestOutput, None]] | None = None
    try:
        # Unlike most decoder-only models, whisper generation length is not
        # constrained by the size of the input audio, which is mapped to a
        # fixed-size log-mel-spectogram. Still, allow for fewer tokens to be
        # generated by respecting the extra completion tokens arg.
        if request.max_completion_tokens is None:
            default_max_tokens = self.model_config.max_model_len
        else:
            default_max_tokens = min(
                self.model_config.max_model_len, request.max_completion_tokens
            )
        sampling_params = request.to_sampling_params(
            default_max_tokens, self.default_sampling_params
        )
        if request.response_format == "verbose_json":
            sampling_params.logprobs = 1

        self._log_inputs(
            request_id,
            # It will not display special tokens like <|startoftranscript|>
            request.prompt,
            params=sampling_params,
            lora_request=lora_request,
        )

        trace_headers = (
            None
            if raw_request is None
            else await self._get_trace_headers(raw_request.headers)
        )

        list_result_generator = []
        for i, prompt in enumerate(prompts):
            request_id_item = f"{request_id}_{i}"
            engine_request = self.input_processor.process_inputs(
                request_id_item,
                prompt,
                sampling_params,
                lora_request=lora_request,
                trace_headers=trace_headers,
                priority=0,
            )
            list_result_generator.append(
                self.engine_client.generate(
                    engine_request,
                    sampling_params,
                    request_id_item,
                    lora_request=lora_request,
                )
            )
    except ValueError as e:
        return self.create_error_response(e)

    if request.stream:
        return stream_generator_method(
            request, list_result_generator, request_id, request_metadata, duration_s
        )
    # Non-streaming response.
    total_segments = []
    text_parts = []
    try:
        assert list_result_generator is not None
        segments_types: dict[str, type[SpeechToTextSegment]] = {
            "transcribe": TranscriptionSegment,
            "translate": TranslationSegment,
        }
        segment_class: type[SpeechToTextSegment] = segments_types[self.task_type]
        text = ""
        chunk_size_in_s = self.asr_config.max_audio_clip_s
        if chunk_size_in_s is None:
            assert len(list_result_generator) == 1, (
                "`max_audio_clip_s` is set to None, audio cannot be chunked"
            )
        for idx, result_generator in enumerate(list_result_generator):
            start_time = (
                float(idx * chunk_size_in_s) if chunk_size_in_s is not None else 0.0
            )
            async for op in result_generator:
                if request.response_format == "verbose_json":
                    assert op.outputs[0].logprobs
                    segments: list[SpeechToTextSegment] = (
                        self._get_verbose_segments(
                            tokens=tuple(op.outputs[0].token_ids),
                            segment_class=segment_class,
                            request=request,
                            start_time=start_time,
                            log_probs=op.outputs[0].logprobs,
                        )
                    )

                    total_segments.extend(segments)
                    text_parts.extend([seg.text for seg in segments])
                else:
                    raw_text = op.outputs[0].text
                    text_parts.append(self.model_cls.post_process_output(raw_text))
        text = "".join(text_parts)
        if self.task_type == "transcribe":
            final_response: ResponseType
            # add usage in TranscriptionResponse.
            usage = {
                "type": "duration",
                # rounded up as per openAI specs
                "seconds": int(math.ceil(duration_s)),
            }
            if request.response_format != "verbose_json":
                final_response = cast(
                    T, TranscriptionResponse(text=text, usage=usage)
                )
            else:
                final_response = cast(
                    V,
                    TranscriptionResponseVerbose(
                        text=text,
                        language=request.language,
                        duration=str(duration_s),
                        segments=total_segments,
                    ),
                )
        else:
            # no usage in response for translation task
            if request.response_format != "verbose_json":
                final_response = cast(T, TranslationResponse(text=text))
            else:
                final_response = cast(
                    V,
                    TranslationResponseVerbose(
                        text=text,
                        language=request.language,
                        duration=str(duration_s),
                        segments=total_segments,
                    ),
                )
        return final_response
    except asyncio.CancelledError:
        return self.create_error_response("Client disconnected")
    except ValueError as e:
        return self.create_error_response(e)

_find_split_point

_find_split_point(
    wav: ndarray, start_idx: int, end_idx: int
) -> int

Find the best point to split audio by looking for silence or low amplitude. Args: wav: Audio tensor [1, T] start_idx: Start index of search region end_idx: End index of search region Returns: Index of best splitting point

Source code in vllm/entrypoints/openai/speech_to_text/speech_to_text.py
def _find_split_point(self, wav: np.ndarray, start_idx: int, end_idx: int) -> int:
    """Find the best point to split audio by
    looking for silence or low amplitude.
    Args:
        wav: Audio tensor [1, T]
        start_idx: Start index of search region
        end_idx: End index of search region
    Returns:
        Index of best splitting point
    """
    segment = wav[start_idx:end_idx]

    # Calculate RMS energy in small windows
    min_energy = math.inf
    quietest_idx = 0
    min_energy_window = self.asr_config.min_energy_split_window_size
    assert min_energy_window is not None
    for i in range(0, len(segment) - min_energy_window, min_energy_window):
        window = segment[i : i + min_energy_window]
        energy = (window**2).mean() ** 0.5
        if energy < min_energy:
            quietest_idx = i + start_idx
            min_energy = energy
    return quietest_idx

_get_verbose_segments

_get_verbose_segments(
    tokens: tuple,
    log_probs: FlatLogprobs | list[dict[int, Logprob]],
    request: SpeechToTextRequest,
    segment_class: type[SpeechToTextSegment],
    start_time: float = 0,
) -> list[SpeechToTextSegment]

Convert tokens to verbose segments.

This method expects the model to produce timestamps as tokens (similar to Whisper). If the tokens do not include timestamp information, the segments may not be generated correctly.

Note: No_speech_prob field is not supported in this implementation and will be None. See docs for details.

Source code in vllm/entrypoints/openai/speech_to_text/speech_to_text.py
def _get_verbose_segments(
    self,
    tokens: tuple,
    log_probs: FlatLogprobs | list[dict[int, Logprob]],
    request: SpeechToTextRequest,
    segment_class: type[SpeechToTextSegment],
    start_time: float = 0,
) -> list[SpeechToTextSegment]:
    """
    Convert tokens to verbose segments.

    This method expects the model to produce
    timestamps as tokens (similar to Whisper).
    If the tokens do not include timestamp information,
    the segments may not be generated correctly.

    Note: No_speech_prob field is not supported
    in this implementation and will be None. See docs for details.
    """
    BASE_OFFSET = 0.02
    init_token = self.tokenizer.encode("<|0.00|>", add_special_tokens=False)[0]
    if tokens[-1] == self.tokenizer.eos_token_id:
        tokens = tokens[:-1]

    tokens_with_start = (init_token,) + tokens
    segments: list[SpeechToTextSegment] = []
    last_timestamp_start = 0

    if tokens_with_start[-2] < init_token and tokens_with_start[-1] >= init_token:
        tokens_with_start = tokens_with_start + (tokens_with_start[-1],)
    avg_logprob = 0.0
    for idx in range(1, len(tokens_with_start)):
        # Timestamp tokens (e.g., <|0.00|>) are assumed to be sorted.
        # If the ordering is violated, this slicing may produce incorrect results.
        token = tokens_with_start[idx]
        if token >= init_token and tokens_with_start[idx - 1] >= init_token:
            sliced_timestamp_tokens = tokens_with_start[last_timestamp_start:idx]
            start_timestamp = sliced_timestamp_tokens[0] - init_token
            end_timestamp = sliced_timestamp_tokens[-1] - init_token
            text = self.tokenizer.decode(sliced_timestamp_tokens[1:-1])
            text_bytes = text.encode("utf-8")

            casting_segment = cast(
                SpeechToTextSegment,
                segment_class(
                    id=len(segments),
                    seek=start_time,
                    start=start_time + BASE_OFFSET * start_timestamp,
                    end=start_time + BASE_OFFSET * end_timestamp,
                    temperature=request.temperature,
                    text=text,
                    # The compression ratio measures
                    # how compressible the generated text is.
                    # A higher ratio indicates more repetitive content,
                    # which is a strong sign of hallucination in outputs.
                    compression_ratio=len(text_bytes)
                    / len(zlib.compress(text_bytes)),
                    tokens=sliced_timestamp_tokens[1:-1],
                    avg_logprob=avg_logprob / (idx - last_timestamp_start),
                ),
            )
            segments.append(casting_segment)
            last_timestamp_start = idx
            avg_logprob = 0
        else:
            avg_logprob += log_probs[idx - 1][token].logprob
    return segments

_warmup_audio_preprocessing

_warmup_audio_preprocessing() -> None

Warm up audio processing libraries to avoid first-request latency.

The first call to librosa functions (load, get_duration, mel-spectrogram) triggers JIT compilation and library initialization which can take ~7s. This method warms up these operations during server initialization.

Source code in vllm/entrypoints/openai/speech_to_text/speech_to_text.py
def _warmup_audio_preprocessing(self) -> None:
    """Warm up audio processing libraries to avoid first-request latency.

    The first call to librosa functions (load, get_duration, mel-spectrogram)
    triggers JIT compilation and library initialization which can take ~7s.
    This method warms up these operations during server initialization.
    """
    # Skip warmup if librosa is not installed (optional dependency)
    if isinstance(librosa, PlaceholderModule):
        return

    # Skip warmup if model doesn't support transcription
    if not supports_transcription(self.model_cls):
        return

    if getattr(self.model_cls, "skip_warmup_audio_preprocessing", False):
        return

    try:
        warmup_start = time.perf_counter()
        logger.info("Warming up audio preprocessing libraries...")

        # Create a minimal dummy audio (1 second of silence at target sample rate)
        dummy_audio = np.zeros(int(self.asr_config.sample_rate), dtype=np.float32)

        # Warm up librosa.load by using librosa functions on the dummy data
        # This initializes FFTW, numba JIT, and other audio processing libraries
        _ = librosa.get_duration(y=dummy_audio, sr=self.asr_config.sample_rate)

        # Warm up mel-spectrogram computation with model-specific parameters
        from vllm.transformers_utils.processor import cached_processor_from_config

        processor = cached_processor_from_config(self.model_config)
        feature_extractor = None
        if hasattr(processor, "feature_extractor"):
            feature_extractor = processor.feature_extractor
        elif hasattr(processor, "audio_processor"):
            # For models like GraniteSpeech that use audio_processor
            audio_proc = processor.audio_processor
            if hasattr(audio_proc, "feature_extractor"):
                feature_extractor = audio_proc.feature_extractor
            # If audio_processor doesn't have feature_extractor,
            # skip mel-spectrogram warmup for these models

        if feature_extractor is not None:
            _ = librosa.feature.melspectrogram(
                y=dummy_audio,
                sr=self.asr_config.sample_rate,
                n_mels=getattr(feature_extractor, "n_mels", 128),
                n_fft=getattr(feature_extractor, "n_fft", 400),
                hop_length=getattr(feature_extractor, "hop_length", 160),
            )

        warmup_elapsed = time.perf_counter() - warmup_start
        logger.info("Audio preprocessing warmup completed in %.2fs", warmup_elapsed)
    except Exception:
        # Don't fail initialization if warmup fails - log exception and continue
        logger.exception(
            "Audio preprocessing warmup failed (non-fatal): %s. "
            "First request may experience higher latency.",
        )

_warmup_input_processor

_warmup_input_processor() -> None

Warm up input processor with dummy audio to avoid first-request latency.

The first call to input_processor.process_inputs() with multimodal audio triggers multimodal processing initialization which can take ~2.5s. This method processes a dummy audio request to warm up the pipeline.

Source code in vllm/entrypoints/openai/speech_to_text/speech_to_text.py
def _warmup_input_processor(self) -> None:
    """Warm up input processor with dummy audio to avoid first-request latency.

    The first call to input_processor.process_inputs() with multimodal audio
    triggers multimodal processing initialization which can take ~2.5s.
    This method processes a dummy audio request to warm up the pipeline.
    """
    # Skip warmup if model doesn't support transcription
    if not supports_transcription(self.model_cls):
        return

    # Only warm up if model supports transcription methods
    if not hasattr(self.model_cls, "get_generation_prompt"):
        return

    try:
        from vllm.sampling_params import SamplingParams

        warmup_start = time.perf_counter()
        logger.info("Warming up multimodal input processor...")

        # Create minimal dummy audio (1 second of silence)
        dummy_audio = np.zeros(int(self.asr_config.sample_rate), dtype=np.float32)

        # Use the same method that _preprocess_speech_to_text uses
        # to create the prompt
        dummy_prompt = self.model_cls.get_generation_prompt(
            audio=dummy_audio,
            stt_config=self.asr_config,
            model_config=self.model_config,
            language="en",
            task_type=self.task_type,
            request_prompt="",
            to_language=None,
        )

        # Create minimal sampling params
        dummy_params = SamplingParams(
            max_tokens=1,
            temperature=0.0,
            skip_clone=True,  # Internal warmup, safe to skip clone
        )

        # Process the dummy input through the input processor
        # This will trigger all the multimodal processing initialization
        _ = self.input_processor.process_inputs(
            request_id="warmup",
            prompt=dummy_prompt,
            params=dummy_params,
        )

        warmup_elapsed = time.perf_counter() - warmup_start
        logger.info("Input processor warmup completed in %.2fs", warmup_elapsed)
    except Exception:
        # Don't fail initialization if warmup fails - log warning and continue
        logger.exception(
            "Input processor warmup failed (non-fatal): %s. "
            "First request may experience higher latency."
        )

PlaceholderModule

Bases: _PlaceholderBase

A placeholder object to use when a module does not exist.

This enables more informative errors when trying to access attributes of a module that does not exist.

Source code in vllm/utils/import_utils.py
class PlaceholderModule(_PlaceholderBase):
    """
    A placeholder object to use when a module does not exist.

    This enables more informative errors when trying to access attributes
    of a module that does not exist.
    """

    def __init__(self, name: str) -> None:
        super().__init__()

        # Apply name mangling to avoid conflicting with module attributes
        self.__name = name

    def placeholder_attr(self, attr_path: str):
        return _PlaceholderModuleAttr(self, attr_path)

    def __getattr__(self, key: str) -> Never:
        name = self.__name

        try:
            importlib.import_module(name)
        except ImportError as exc:
            for extra, names in get_vllm_optional_dependencies().items():
                if name in names:
                    msg = f"Please install vllm[{extra}] for {extra} support"
                    raise ImportError(msg) from exc

            raise exc

        raise AssertionError(
            "PlaceholderModule should not be used "
            "when the original module can be imported"
        )

RequestOutput

The output data of a completion request to the LLM.

Parameters:

Name Type Description Default
request_id str

The unique ID of the request.

required
prompt str | None

The prompt string of the request. For encoder/decoder models, this is the decoder input prompt.

required
prompt_token_ids list[int] | None

The token IDs of the prompt. For encoder/decoder models, this is the decoder input prompt token ids.

required
prompt_logprobs PromptLogprobs | None

The log probabilities to return per prompt token.

required
outputs list[CompletionOutput]

The output sequences of the request.

required
finished bool

Whether the whole request is finished.

required
metrics RequestStateStats | None

Metrics associated with the request.

None
lora_request LoRARequest | None

The LoRA request that was used to generate the output.

None
encoder_prompt str | None

The encoder prompt string of the request. None if decoder-only.

None
encoder_prompt_token_ids list[int] | None

The token IDs of the encoder prompt. None if decoder-only.

None
num_cached_tokens int | None

The number of tokens with prefix cache hit.

None
kv_transfer_params dict[str, Any] | None

The params for remote K/V transfer.

None
Source code in vllm/outputs.py
class RequestOutput:
    """The output data of a completion request to the LLM.

    Args:
        request_id: The unique ID of the request.
        prompt: The prompt string of the request.
                For encoder/decoder models, this is the
                decoder input prompt.
        prompt_token_ids: The token IDs of the prompt.
                          For encoder/decoder models, this is the
                          decoder input prompt token ids.
        prompt_logprobs: The log probabilities to return per prompt token.
        outputs: The output sequences of the request.
        finished: Whether the whole request is finished.
        metrics: Metrics associated with the request.
        lora_request: The LoRA request that was used to generate the output.
        encoder_prompt: The encoder prompt string of the request.
                        None if decoder-only.
        encoder_prompt_token_ids: The token IDs of the encoder prompt.
                                  None if decoder-only.
        num_cached_tokens: The number of tokens with prefix cache hit.
        kv_transfer_params: The params for remote K/V transfer.
    """

    def __init__(
        self,
        request_id: str,
        prompt: str | None,
        prompt_token_ids: list[int] | None,
        prompt_logprobs: PromptLogprobs | None,
        outputs: list[CompletionOutput],
        finished: bool,
        metrics: RequestStateStats | None = None,
        lora_request: LoRARequest | None = None,
        encoder_prompt: str | None = None,
        encoder_prompt_token_ids: list[int] | None = None,
        num_cached_tokens: int | None = None,
        *,
        multi_modal_placeholders: MultiModalPlaceholderDict | None = None,
        kv_transfer_params: dict[str, Any] | None = None,
        # Forward compatibility, code that uses args added in new release can
        # still run with older versions of vLLM without breaking.
        **kwargs: Any,
    ) -> None:
        if kwargs:
            logger.warning_once(
                "RequestOutput: Ignoring extra arguments: %s", str(kwargs)
            )
        self.request_id = request_id
        self.prompt = prompt
        self.prompt_token_ids = prompt_token_ids
        self.multi_modal_placeholders = multi_modal_placeholders or {}
        self.prompt_logprobs = prompt_logprobs
        self.outputs = outputs
        self.finished = finished
        self.metrics = metrics
        self.lora_request = lora_request
        self.encoder_prompt = encoder_prompt
        self.encoder_prompt_token_ids = encoder_prompt_token_ids
        self.num_cached_tokens = num_cached_tokens
        self.kv_transfer_params = kv_transfer_params

    def add(self, next_output: "RequestOutput", aggregate: bool) -> None:
        """Merge subsequent RequestOutput into this one"""

        self.finished |= next_output.finished
        self.kv_transfer_params = next_output.kv_transfer_params

        for next_completion in next_output.outputs:
            for i, completion in enumerate(self.outputs):
                if completion.index == next_completion.index:
                    if aggregate:
                        # Merge outputs with same index
                        completion.text += next_completion.text
                        if not isinstance(completion.token_ids, MutableSequence):
                            completion.token_ids = list(completion.token_ids)
                        completion.token_ids.extend(next_completion.token_ids)
                        if next_completion.logprobs:
                            assert completion.logprobs is not None
                            completion.logprobs.extend(next_completion.logprobs)
                        completion.cumulative_logprob = (
                            next_completion.cumulative_logprob
                        )
                        completion.finish_reason = next_completion.finish_reason
                        completion.stop_reason = next_completion.stop_reason
                    else:
                        # Replace the output with the new one
                        self.outputs[i] = next_completion
                    break
            else:
                self.outputs.append(next_completion)

    def __repr__(self) -> str:
        return (
            f"RequestOutput(request_id={self.request_id}, "
            f"prompt={self.prompt!r}, "
            f"prompt_token_ids={self.prompt_token_ids}, "
            f"encoder_prompt={self.encoder_prompt!r}, "
            f"encoder_prompt_token_ids={self.encoder_prompt_token_ids}, "
            f"prompt_logprobs={self.prompt_logprobs}, "
            f"outputs={self.outputs}, "
            f"finished={self.finished}, "
            f"metrics={self.metrics}, "
            f"lora_request={self.lora_request}, "
            f"num_cached_tokens={self.num_cached_tokens}, "
            f"multi_modal_placeholders={self.multi_modal_placeholders})"
        )

add

add(next_output: RequestOutput, aggregate: bool) -> None

Merge subsequent RequestOutput into this one

Source code in vllm/outputs.py
def add(self, next_output: "RequestOutput", aggregate: bool) -> None:
    """Merge subsequent RequestOutput into this one"""

    self.finished |= next_output.finished
    self.kv_transfer_params = next_output.kv_transfer_params

    for next_completion in next_output.outputs:
        for i, completion in enumerate(self.outputs):
            if completion.index == next_completion.index:
                if aggregate:
                    # Merge outputs with same index
                    completion.text += next_completion.text
                    if not isinstance(completion.token_ids, MutableSequence):
                        completion.token_ids = list(completion.token_ids)
                    completion.token_ids.extend(next_completion.token_ids)
                    if next_completion.logprobs:
                        assert completion.logprobs is not None
                        completion.logprobs.extend(next_completion.logprobs)
                    completion.cumulative_logprob = (
                        next_completion.cumulative_logprob
                    )
                    completion.finish_reason = next_completion.finish_reason
                    completion.stop_reason = next_completion.stop_reason
                else:
                    # Replace the output with the new one
                    self.outputs[i] = next_completion
                break
        else:
            self.outputs.append(next_completion)

SupportsTranscription

Bases: Protocol

The interface required for all models that support transcription.

Source code in vllm/model_executor/models/interfaces.py
@runtime_checkable
class SupportsTranscription(Protocol):
    """The interface required for all models that support transcription."""

    # Mapping from ISO639_1 language codes: language names
    supported_languages: ClassVar[Mapping[str, str]]

    supports_transcription: ClassVar[Literal[True]] = True

    supports_transcription_only: ClassVar[bool] = False
    """
    Transcription models can opt out of text generation by setting this to
    `True`.
    """
    supports_segment_timestamp: ClassVar[bool] = False
    """
    Enables the segment timestamp option for supported models by setting this to `True`.
    """

    def __init_subclass__(cls, **kwargs):
        super().__init_subclass__(**kwargs)
        # language codes in supported_languages
        # that don't exist in the full language map
        invalid = set(cls.supported_languages) - set(LANGUAGES.keys())
        if invalid:
            raise ValueError(
                f"{cls.__name__}.supported_languages contains invalid "
                f"language codes: {sorted(invalid)}\n. "
                f"Valid choices are: {sorted(LANGUAGES.keys())}"
            )

    @classmethod
    def get_generation_prompt(
        cls,
        audio: np.ndarray,
        stt_config: SpeechToTextConfig,
        model_config: ModelConfig,
        language: str | None,
        task_type: Literal["transcribe", "translate"],
        request_prompt: str,
        to_language: str | None,
    ) -> PromptType:
        """Get the prompt for the ASR model.
        The model has control over the construction, as long as it
        returns a valid PromptType."""
        ...

    @classmethod
    def get_other_languages(cls) -> Mapping[str, str]:
        # other possible language codes from the whisper map
        return {k: v for k, v in LANGUAGES.items() if k not in cls.supported_languages}

    @classmethod
    def validate_language(cls, language: str | None) -> str | None:
        """
        Ensure the language specified in the transcription request
        is a valid ISO 639-1 language code. If the request language is
        valid, but not natively supported by the model, trigger a
        warning (but not an exception).
        """
        if language is None or language in cls.supported_languages:
            return language
        elif language in cls.get_other_languages():
            logger.warning(
                "Language %r is not natively supported by %s; "
                "results may be less accurate. Supported languages: %r",
                language,
                cls.__name__,
                list(cls.supported_languages.keys()),
            )
            return language
        else:
            raise ValueError(
                f"Unsupported language: {language!r}.  Must be one of "
                f"{list(cls.supported_languages.keys())}."
            )

    @classmethod
    def get_speech_to_text_config(
        cls, model_config: ModelConfig, task_type: Literal["transcribe", "translate"]
    ) -> SpeechToTextConfig:
        """Get the speech to text config for the ASR model."""
        ...

    @classmethod
    def get_num_audio_tokens(
        cls,
        audio_duration_s: float,
        stt_config: SpeechToTextConfig,
        model_config: ModelConfig,
    ) -> int | None:
        """
        Map from audio duration to number of audio tokens produced by the ASR
        model, without running a forward pass.
        This is used for estimating the amount of processing for this audio.
        """
        return None

    @classmethod
    def post_process_output(cls, text: str) -> str:
        """
        Post-process the raw model output text.

        Some ASR models output structured formats (e.g., language tags,
        special tokens) that need to be stripped before returning to the user.

        Args:
            text: Raw decoded text from the model.

        Returns:
            Cleaned transcription text.
        """
        return text

supports_segment_timestamp class-attribute

supports_segment_timestamp: bool = False

Enables the segment timestamp option for supported models by setting this to True.

supports_transcription_only class-attribute

supports_transcription_only: bool = False

Transcription models can opt out of text generation by setting this to True.

get_generation_prompt classmethod

get_generation_prompt(
    audio: ndarray,
    stt_config: SpeechToTextConfig,
    model_config: ModelConfig,
    language: str | None,
    task_type: Literal["transcribe", "translate"],
    request_prompt: str,
    to_language: str | None,
) -> PromptType

Get the prompt for the ASR model. The model has control over the construction, as long as it returns a valid PromptType.

Source code in vllm/model_executor/models/interfaces.py
@classmethod
def get_generation_prompt(
    cls,
    audio: np.ndarray,
    stt_config: SpeechToTextConfig,
    model_config: ModelConfig,
    language: str | None,
    task_type: Literal["transcribe", "translate"],
    request_prompt: str,
    to_language: str | None,
) -> PromptType:
    """Get the prompt for the ASR model.
    The model has control over the construction, as long as it
    returns a valid PromptType."""
    ...

get_num_audio_tokens classmethod

get_num_audio_tokens(
    audio_duration_s: float,
    stt_config: SpeechToTextConfig,
    model_config: ModelConfig,
) -> int | None

Map from audio duration to number of audio tokens produced by the ASR model, without running a forward pass. This is used for estimating the amount of processing for this audio.

Source code in vllm/model_executor/models/interfaces.py
@classmethod
def get_num_audio_tokens(
    cls,
    audio_duration_s: float,
    stt_config: SpeechToTextConfig,
    model_config: ModelConfig,
) -> int | None:
    """
    Map from audio duration to number of audio tokens produced by the ASR
    model, without running a forward pass.
    This is used for estimating the amount of processing for this audio.
    """
    return None

get_speech_to_text_config classmethod

get_speech_to_text_config(
    model_config: ModelConfig,
    task_type: Literal["transcribe", "translate"],
) -> SpeechToTextConfig

Get the speech to text config for the ASR model.

Source code in vllm/model_executor/models/interfaces.py
@classmethod
def get_speech_to_text_config(
    cls, model_config: ModelConfig, task_type: Literal["transcribe", "translate"]
) -> SpeechToTextConfig:
    """Get the speech to text config for the ASR model."""
    ...

post_process_output classmethod

post_process_output(text: str) -> str

Post-process the raw model output text.

Some ASR models output structured formats (e.g., language tags, special tokens) that need to be stripped before returning to the user.

Parameters:

Name Type Description Default
text str

Raw decoded text from the model.

required

Returns:

Type Description
str

Cleaned transcription text.

Source code in vllm/model_executor/models/interfaces.py
@classmethod
def post_process_output(cls, text: str) -> str:
    """
    Post-process the raw model output text.

    Some ASR models output structured formats (e.g., language tags,
    special tokens) that need to be stripped before returning to the user.

    Args:
        text: Raw decoded text from the model.

    Returns:
        Cleaned transcription text.
    """
    return text

validate_language classmethod

validate_language(language: str | None) -> str | None

Ensure the language specified in the transcription request is a valid ISO 639-1 language code. If the request language is valid, but not natively supported by the model, trigger a warning (but not an exception).

Source code in vllm/model_executor/models/interfaces.py
@classmethod
def validate_language(cls, language: str | None) -> str | None:
    """
    Ensure the language specified in the transcription request
    is a valid ISO 639-1 language code. If the request language is
    valid, but not natively supported by the model, trigger a
    warning (but not an exception).
    """
    if language is None or language in cls.supported_languages:
        return language
    elif language in cls.get_other_languages():
        logger.warning(
            "Language %r is not natively supported by %s; "
            "results may be less accurate. Supported languages: %r",
            language,
            cls.__name__,
            list(cls.supported_languages.keys()),
        )
        return language
    else:
        raise ValueError(
            f"Unsupported language: {language!r}.  Must be one of "
            f"{list(cls.supported_languages.keys())}."
        )

TranscriptionResponse

Bases: OpenAIBaseModel

Source code in vllm/entrypoints/openai/speech_to_text/protocol.py
class TranscriptionResponse(OpenAIBaseModel):
    text: str
    """The transcribed text."""
    usage: TranscriptionUsageAudio

text instance-attribute

text: str

The transcribed text.

TranscriptionResponseVerbose

Bases: OpenAIBaseModel

Source code in vllm/entrypoints/openai/speech_to_text/protocol.py
class TranscriptionResponseVerbose(OpenAIBaseModel):
    duration: str
    """The duration of the input audio."""

    language: str
    """The language of the input audio."""

    text: str
    """The transcribed text."""

    segments: list[TranscriptionSegment] | None = None
    """Segments of the transcribed text and their corresponding details."""

    words: list[TranscriptionWord] | None = None
    """Extracted words and their corresponding timestamps."""

duration instance-attribute

duration: str

The duration of the input audio.

language instance-attribute

language: str

The language of the input audio.

segments class-attribute instance-attribute

segments: list[TranscriptionSegment] | None = None

Segments of the transcribed text and their corresponding details.

text instance-attribute

text: str

The transcribed text.

words class-attribute instance-attribute

words: list[TranscriptionWord] | None = None

Extracted words and their corresponding timestamps.

TranscriptionSegment

Bases: OpenAIBaseModel

Source code in vllm/entrypoints/openai/speech_to_text/protocol.py
class TranscriptionSegment(OpenAIBaseModel):
    id: int
    """Unique identifier of the segment."""

    avg_logprob: float
    """Average logprob of the segment.

    If the value is lower than -1, consider the logprobs failed.
    """

    compression_ratio: float
    """Compression ratio of the segment.

    If the value is greater than 2.4, consider the compression failed.
    """

    end: float
    """End time of the segment in seconds."""

    no_speech_prob: float | None = None
    """Probability of no speech in the segment.

    If the value is higher than 1.0 and the `avg_logprob` is below -1, consider
    this segment silent.
    """

    seek: int
    """Seek offset of the segment."""

    start: float
    """Start time of the segment in seconds."""

    temperature: float
    """Temperature parameter used for generating the segment."""

    text: str
    """Text content of the segment."""

    tokens: list[int]
    """Array of token IDs for the text content."""

avg_logprob instance-attribute

avg_logprob: float

Average logprob of the segment.

If the value is lower than -1, consider the logprobs failed.

compression_ratio instance-attribute

compression_ratio: float

Compression ratio of the segment.

If the value is greater than 2.4, consider the compression failed.

end instance-attribute

end: float

End time of the segment in seconds.

id instance-attribute

id: int

Unique identifier of the segment.

no_speech_prob class-attribute instance-attribute

no_speech_prob: float | None = None

Probability of no speech in the segment.

If the value is higher than 1.0 and the avg_logprob is below -1, consider this segment silent.

seek instance-attribute

seek: int

Seek offset of the segment.

start instance-attribute

start: float

Start time of the segment in seconds.

temperature instance-attribute

temperature: float

Temperature parameter used for generating the segment.

text instance-attribute

text: str

Text content of the segment.

tokens instance-attribute

tokens: list[int]

Array of token IDs for the text content.

TranslationResponse

Bases: OpenAIBaseModel

Source code in vllm/entrypoints/openai/speech_to_text/protocol.py
class TranslationResponse(OpenAIBaseModel):
    text: str
    """The translated text."""

text instance-attribute

text: str

The translated text.

TranslationResponseVerbose

Bases: OpenAIBaseModel

Source code in vllm/entrypoints/openai/speech_to_text/protocol.py
class TranslationResponseVerbose(OpenAIBaseModel):
    duration: str
    """The duration of the input audio."""

    language: str
    """The language of the input audio."""

    text: str
    """The translated text."""

    segments: list[TranslationSegment] | None = None
    """Segments of the translated text and their corresponding details."""

    words: list[TranslationWord] | None = None
    """Extracted words and their corresponding timestamps."""

duration instance-attribute

duration: str

The duration of the input audio.

language instance-attribute

language: str

The language of the input audio.

segments class-attribute instance-attribute

segments: list[TranslationSegment] | None = None

Segments of the translated text and their corresponding details.

text instance-attribute

text: str

The translated text.

words class-attribute instance-attribute

words: list[TranslationWord] | None = None

Extracted words and their corresponding timestamps.

TranslationSegment

Bases: OpenAIBaseModel

Source code in vllm/entrypoints/openai/speech_to_text/protocol.py
class TranslationSegment(OpenAIBaseModel):
    id: int
    """Unique identifier of the segment."""

    avg_logprob: float
    """Average logprob of the segment.

    If the value is lower than -1, consider the logprobs failed.
    """

    compression_ratio: float
    """Compression ratio of the segment.

    If the value is greater than 2.4, consider the compression failed.
    """

    end: float
    """End time of the segment in seconds."""

    no_speech_prob: float | None = None
    """Probability of no speech in the segment.

    If the value is higher than 1.0 and the `avg_logprob` is below -1, consider
    this segment silent.
    """

    seek: int
    """Seek offset of the segment."""

    start: float
    """Start time of the segment in seconds."""

    temperature: float
    """Temperature parameter used for generating the segment."""

    text: str
    """Text content of the segment."""

    tokens: list[int]
    """Array of token IDs for the text content."""

avg_logprob instance-attribute

avg_logprob: float

Average logprob of the segment.

If the value is lower than -1, consider the logprobs failed.

compression_ratio instance-attribute

compression_ratio: float

Compression ratio of the segment.

If the value is greater than 2.4, consider the compression failed.

end instance-attribute

end: float

End time of the segment in seconds.

id instance-attribute

id: int

Unique identifier of the segment.

no_speech_prob class-attribute instance-attribute

no_speech_prob: float | None = None

Probability of no speech in the segment.

If the value is higher than 1.0 and the avg_logprob is below -1, consider this segment silent.

seek instance-attribute

seek: int

Seek offset of the segment.

start instance-attribute

start: float

Start time of the segment in seconds.

temperature instance-attribute

temperature: float

Temperature parameter used for generating the segment.

text instance-attribute

text: str

Text content of the segment.

tokens instance-attribute

tokens: list[int]

Array of token IDs for the text content.

VLLMValidationError

Bases: ValueError

vLLM-specific validation error for request validation failures.

Parameters:

Name Type Description Default
message str

The error message describing the validation failure.

required
parameter str | None

Optional parameter name that failed validation.

None
value Any

Optional value that was rejected during validation.

None
Source code in vllm/exceptions.py
class VLLMValidationError(ValueError):
    """vLLM-specific validation error for request validation failures.

    Args:
        message: The error message describing the validation failure.
        parameter: Optional parameter name that failed validation.
        value: Optional value that was rejected during validation.
    """

    def __init__(
        self,
        message: str,
        *,
        parameter: str | None = None,
        value: Any = None,
    ) -> None:
        super().__init__(message)
        self.parameter = parameter
        self.value = value

    def __str__(self):
        base = super().__str__()
        extras = []
        if self.parameter is not None:
            extras.append(f"parameter={self.parameter}")
        if self.value is not None:
            extras.append(f"value={self.value}")
        return f"{base} ({', '.join(extras)})" if extras else base

get_tokenizer

get_tokenizer(
    tokenizer_name: str | Path,
    *args,
    tokenizer_cls: type[_T] = TokenizerLike,
    trust_remote_code: bool = False,
    revision: str | None = None,
    download_dir: str | None = None,
    **kwargs,
) -> _T

Gets a tokenizer for the given model name via HuggingFace or ModelScope.

Source code in vllm/tokenizers/registry.py
def get_tokenizer(
    tokenizer_name: str | Path,
    *args,
    tokenizer_cls: type[_T] = TokenizerLike,  # type: ignore[assignment]
    trust_remote_code: bool = False,
    revision: str | None = None,
    download_dir: str | None = None,
    **kwargs,
) -> _T:
    """Gets a tokenizer for the given model name via HuggingFace or ModelScope."""
    tokenizer_mode, tokenizer_name, args, kwargs = cached_resolve_tokenizer_args(
        tokenizer_name,
        *args,
        trust_remote_code=trust_remote_code,
        revision=revision,
        download_dir=download_dir,
        **kwargs,
    )

    if tokenizer_cls == TokenizerLike:
        tokenizer_cls_ = TokenizerRegistry.load_tokenizer_cls(tokenizer_mode)
    else:
        tokenizer_cls_ = tokenizer_cls

    tokenizer = tokenizer_cls_.from_pretrained(tokenizer_name, *args, **kwargs)
    if not tokenizer.is_fast:
        logger.warning(
            "Using a slow tokenizer. This might cause a significant "
            "slowdown. Consider using a fast tokenizer instead."
        )

    return tokenizer  # type: ignore

init_logger

init_logger(name: str) -> _VllmLogger

The main purpose of this function is to ensure that loggers are retrieved in such a way that we can be sure the root vllm logger has already been configured.

Source code in vllm/logger.py
def init_logger(name: str) -> _VllmLogger:
    """The main purpose of this function is to ensure that loggers are
    retrieved in such a way that we can be sure the root vllm logger has
    already been configured."""

    logger = logging.getLogger(name)

    for method_name, method in _METHODS_TO_PATCH.items():
        setattr(logger, method_name, MethodType(method, logger))

    return cast(_VllmLogger, logger)

parse_enc_dec_prompt

parse_enc_dec_prompt(
    prompt: object,
) -> EncoderDecoderDictPrompt

Parse a prompt for an encoder-decoder model and normalize it to a dictionary.

Source code in vllm/renderers/inputs/preprocess.py
def parse_enc_dec_prompt(prompt: object) -> EncoderDecoderDictPrompt:
    """
    Parse a prompt for an encoder-decoder model and normalize it to a dictionary.
    """
    if isinstance(prompt, dict) and "encoder_prompt" in prompt:
        enc_prompt: object = prompt["encoder_prompt"]  # type: ignore[typeddict-item]
        dec_prompt: object | None = prompt["decoder_prompt"]  # type: ignore[typeddict-item]
    else:
        enc_prompt = prompt
        dec_prompt = None

    return EncoderDecoderDictPrompt(
        encoder_prompt=_parse_enc_prompt(enc_prompt),
        decoder_prompt=None if dec_prompt is None else _parse_dec_prompt(dec_prompt),
    )