# NOTE: This module is imported at the top level of furiosa_llm, so it must not
# import packages that require native packages (e.g.,
# furiosa-native-runtime, furiosa-native-llm-common, furiosa-torch-ext) at
# module scope. Use lazy imports inside methods or TYPE_CHECKING blocks instead,
# so that `import furiosa_llm` works without those packages installed.
# This is necessary because the npu-tools and download CI environments depend
# only on furiosa-models and do not install the native packages.
from functools import cached_property
import logging
import os
from pathlib import Path
from typing import (
TYPE_CHECKING,
Any,
AsyncGenerator,
Dict,
List,
Literal,
Optional,
Sequence,
Union,
cast,
)
from pydantic_core import to_json
from furiosa_llm.generation_config import get_diff_sampling_params
from furiosa_llm.vllm_compat import (
PromptType,
apply_prompt_truncation,
get_score_prompt,
preprocess_prompt,
prompt_to_str,
)
if TYPE_CHECKING:
from furiosa.native_runtime.llm import NativeLLMEngine
from furiosa_llm.artifact.types.config import ParallelConfig
from furiosa_llm.parallelize.pipeline.types import Device
from openai.types.chat import ChatCompletionMessageParam
from tests.utils import FakeNativeLLMEngine
import uuid
import torch
from transformers.tokenization_utils import PreTrainedTokenizer
from transformers.tokenization_utils_base import BatchEncoding
from transformers.tokenization_utils_fast import PreTrainedTokenizerFast
from furiosa_llm.metadata.tasks import POOLING_TASKS, PoolingTask
from furiosa_llm.server.utils import is_list_of
from furiosa_llm.version import FURIOSA_LLM_VERSION
from .device import resolve_devices
from .metadata.config_types import SchedulerConfig
from .outputs import (
CompletionOutput,
EmbeddingRequestOutput,
Logprob,
PoolingOutput,
PoolingRequestOutput,
RequestOutput,
ScoringRequestOutput,
)
from .sampling_params import PoolingParams, SamplingParams
from .tokenizer import encode_auto, get_tokenizer
from .utils import get_logger_with_tz
logger = get_logger_with_tz(logging.getLogger(__name__))
# Default index of the padding block when a paged attention model is used.
DEFAULT_PAGED_ATTENTION_PADDING_BLOCK_IDX = 0
CACHE_DIR: Path = Path(os.getenv("XDG_CACHE_HOME", Path.home() / ".cache")) / "furiosa" / "llm"
# JIT Wiring default configuration
DEFAULT_JIT_THRESHOLD = 5
DEFAULT_JIT_MAX_WORKERS = 15
DEFAULT_JIT_UNIT_SIZE = 8
TokenizerModeType = Literal["auto", "slow"]
ChatTemplateContentFormatOption = Literal["string"]
RAY_LOG_PREFIX = "[furiosa-llm]"
STREAMING_MAX_DECODE_TRIAL = 2
class LLM:
"""An LLM for generating texts from given prompts and sampling parameters."""
max_seq_len_to_capture: Optional[int]
engine: Union["NativeLLMEngine", "FakeNativeLLMEngine"]
@classmethod
def load_artifact(
cls,
model_id_or_path: Union[str, os.PathLike],
**kwargs,
) -> "LLM":
"""Deprecated: Use LLM() constructor directly.
This method is kept for backward compatibility and will be removed in a future release.
"""
import warnings
warnings.warn(
"LLM.load_artifact() is deprecated. Use LLM() constructor directly.",
DeprecationWarning,
stacklevel=2,
)
return cls(model_id_or_path, **kwargs)
def __init__(
self,
model_id_or_path: Union[str, os.PathLike],
*,
# V3 engine Configuration
fxb: Optional[Union[str, os.PathLike]] = None,
# Repo Configuration
revision: Optional[str] = None,
# Runtime Configuration
devices: Optional[Union[str, Sequence["Device"]]] = None,
data_parallel_size: Optional[int] = None,
pipeline_parallel_size: Optional[int] = None,
num_blocks_per_pp_stage: Optional[Sequence[int]] = None,
max_io_memory_mb: int = 2048,
# Pipeline selection related Configs
max_model_len: Optional[int] = None, # TODO: limit model max length via bucket overrides
scheduler_config: Optional[SchedulerConfig] = None,
# Structured outputs related Configuration
structured_outputs_backend: Literal["auto", "guidance", "xgrammar"] = "auto",
# Other Configuration
tokenizer: Optional[Union[str, PreTrainedTokenizer, PreTrainedTokenizerFast]] = None,
tokenizer_mode: TokenizerModeType = "auto",
seed: Optional[int] = None,
cache_dir: os.PathLike = CACHE_DIR,
skip_engine: bool = False,
enable_jit_compilation: bool = False,
jit_threshold: int = DEFAULT_JIT_THRESHOLD,
jit_max_workers: int = DEFAULT_JIT_MAX_WORKERS,
jit_unit_size: int = DEFAULT_JIT_UNIT_SIZE,
served_model_name: Optional[str] = None,
**kwargs,
) -> None:
"""Instantiate LLM from saved artifacts or from a v3 engine (FXB).
When ``fxb`` is provided, the v3 engine path is used: ``model_id_or_path`` is
treated as a HuggingFace model id or directory, and model metadata is derived from it.
Otherwise, ``model_id_or_path`` is treated as a furiosa-llm artifact path.
Args:
            model_id_or_path: A path to a furiosa-llm engine artifact or a HuggingFace model id.
fxb: Path to the FXB artifact directory. When provided, the v3 engine is used
and ``model_id_or_path`` is treated as a HuggingFace model id or directory.
revision: The revision of the model, if `model_id_or_path` is a HuggingFace model id.
devices: The devices to run the model. It can be a single device or a list of devices.
Each device can be either "npu:X" or "npu:X:\\*" where X is a specific device index.
If not given, all available devices will be used.
data_parallel_size: The size of the data parallelism group. If not given, it will be inferred from
total available PEs and other parallelism degrees.
pipeline_parallel_size: The size of the pipeline parallelism.
If not given, it will use the value from artifact.
            num_blocks_per_pp_stage: The number of transformer blocks per pipeline parallelism stage.
If only `pipeline_parallel_size` is provided, transformer blocks will be distributed equally.
            max_io_memory_mb: Maximum NPU memory (in MB) to be used for I/O tensors; the usable
                range is 0 to 48 GB. Defaults to 2048.
max_model_len: The maximum context length to use. If given, decode buckets with attention size
larger than this value will be ignored.
            scheduler_config: Configuration for the scheduler, which controls the maximum number
                of tasks that can be queued to hardware, the maximum number of samples the
                scheduler can process at once, and the ratio of spare blocks reserved by the
                scheduler. If not given, the scheduler config saved in the artifact will be used.
structured_outputs_backend: The backend for structured outputs. "auto" will automatically select the
best backend based on the model. "guidance" will use the guidance library. "xgrammar" will
use the xgrammar library.
tokenizer: The name or path of a HuggingFace Transformers tokenizer.
tokenizer_mode: The tokenizer mode. "auto" will use the fast tokenizer
if available, and "slow" will always use the slow tokenizer.
seed: The seed to initialize the random number generator for sampling.
cache_dir: The cache directory for all generated files for this LLM instance.
When its value is ``None``, caching is disabled. The default is "$HOME/.cache/furiosa/llm".
            skip_engine: If True, the native runtime engine will not be initialized. This is
                useful when you need the pipelines for purposes other than running them with
                the engine.
enable_jit_compilation: [EXPERIMENTAL] If True, JIT compilation will be enabled.
jit_threshold: [EXPERIMENTAL] Number of requests before triggering JIT compilation.
jit_max_workers: [EXPERIMENTAL] Maximum concurrent background JIT compilations.
jit_unit_size: [EXPERIMENTAL] Number of stages to compile together. Must be >= 2.
served_model_name: The model name used in metrics and API responses.
If not specified, defaults to ``model_id_or_path``.
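        Example:
            A minimal sketch; the artifact path and device string below are
            hypothetical::

                llm = LLM("./Llama-3.1-8B-Instruct-artifact", devices="npu:0")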
"""
if jit_unit_size < 2:
raise ValueError(f"jit_unit_size must be >= 2, got {jit_unit_size}")
if fxb is not None:
self._init_from_v3_engine(
model_id_or_path=model_id_or_path,
fxb_path=fxb,
devices=devices,
data_parallel_size=data_parallel_size,
pipeline_parallel_size=pipeline_parallel_size,
max_io_memory_mb=max_io_memory_mb,
scheduler_config=scheduler_config,
structured_outputs_backend=structured_outputs_backend,
tokenizer=tokenizer,
tokenizer_mode=tokenizer_mode,
enable_jit_compilation=enable_jit_compilation,
jit_threshold=jit_threshold,
jit_max_workers=jit_max_workers,
jit_unit_size=jit_unit_size,
served_model_name=served_model_name,
cache_dir=cache_dir,
)
else:
self._init_from_artifact(
model_id_or_path=model_id_or_path,
revision=revision,
devices=devices,
data_parallel_size=data_parallel_size,
pipeline_parallel_size=pipeline_parallel_size,
num_blocks_per_pp_stage=num_blocks_per_pp_stage,
max_io_memory_mb=max_io_memory_mb,
scheduler_config=scheduler_config,
structured_outputs_backend=structured_outputs_backend,
tokenizer=tokenizer,
tokenizer_mode=tokenizer_mode,
cache_dir=cache_dir,
skip_engine=skip_engine,
enable_jit_compilation=enable_jit_compilation,
jit_threshold=jit_threshold,
jit_max_workers=jit_max_workers,
jit_unit_size=jit_unit_size,
served_model_name=served_model_name,
)
def _init_from_v3_engine(
self,
model_id_or_path: Union[str, os.PathLike],
fxb_path: Union[str, os.PathLike],
*,
devices: Optional[Union[str, Sequence["Device"]]] = None,
data_parallel_size: Optional[int] = None,
pipeline_parallel_size: Optional[int] = None,
max_io_memory_mb: int = 2048,
scheduler_config: Optional[SchedulerConfig] = None,
structured_outputs_backend: Literal["auto", "guidance", "xgrammar"] = "auto",
tokenizer: Optional[Union[str, PreTrainedTokenizer, PreTrainedTokenizerFast]] = None,
tokenizer_mode: TokenizerModeType = "auto",
enable_jit_compilation: bool = False,
jit_threshold: int = DEFAULT_JIT_THRESHOLD,
jit_max_workers: int = DEFAULT_JIT_MAX_WORKERS,
jit_unit_size: int = DEFAULT_JIT_UNIT_SIZE,
served_model_name: Optional[str] = None,
cache_dir: os.PathLike = CACHE_DIR,
) -> None:
from transformers import AutoConfig
from furiosa_llm.metadata.metadata import ModelMetadataBase
model_id_or_path = str(model_id_or_path)
fxb_path = str(fxb_path)
# Load tokenizer from HF model directory
self.tokenizer = get_tokenizer(model_id_or_path, tokenizer, tokenizer_mode=tokenizer_mode)
# Build model metadata from HF config
hf_config = AutoConfig.from_pretrained(model_id_or_path)
hf_configs = hf_config.to_dict()
model_metadata = ModelMetadataBase(
model_type=hf_config.model_type,
task="generate",
hf_configs=hf_configs,
)
# Derive sequence length limits from HF config
max_position_embeddings = hf_configs.get("max_position_embeddings")
if max_position_embeddings is None:
raise ValueError("'max_position_embeddings' not found in the model config.")
# Set instance attributes
self.served_model_name = served_model_name or model_id_or_path
self.model_metadata = model_metadata
self.model_config = hf_configs
self.artifact_id = Path(fxb_path).name
self.is_generative_model = True
# TODO: clean up - these are placeholders using max_position_embeddings;
# actual values are set later during engine creation.
self.max_seq_len_to_capture = max_position_embeddings
self.prompt_max_seq_len = max_position_embeddings
self.pipeline_metadata: list = []
self.parallel_config: Optional["ParallelConfig"] = None
self.pipelines: list = []
self.default_generation_config: dict[str, Any] = get_diff_sampling_params(model_id_or_path)
devices = resolve_devices(devices)
from furiosa.native_runtime.llm import NativeLLMEngine
self.engine = NativeLLMEngine(
fxb_path,
None, # draft_artifact_path
devices,
data_parallel_size,
pipeline_parallel_size,
max_io_memory_mb,
self._serialize_obj(scheduler_config or SchedulerConfig()),
structured_outputs_backend,
self.tokenizer.backend_tokenizer.to_str(),
None, # num_speculative_tokens
enable_jit_compilation,
jit_threshold,
jit_max_workers,
jit_unit_size,
str(cache_dir) if cache_dir else None,
self.served_model_name,
model_id_or_path, # model_poc_hf_model_dir
)
def _init_from_artifact(
self,
model_id_or_path: Union[str, os.PathLike],
*,
revision: Optional[str] = None,
devices: Optional[Union[str, Sequence["Device"]]] = None,
data_parallel_size: Optional[int] = None,
pipeline_parallel_size: Optional[int] = None,
num_blocks_per_pp_stage: Optional[Sequence[int]] = None,
max_io_memory_mb: int = 2048,
scheduler_config: Optional[SchedulerConfig] = None,
structured_outputs_backend: Literal["auto", "guidance", "xgrammar"] = "auto",
tokenizer: Optional[Union[str, PreTrainedTokenizer, PreTrainedTokenizerFast]] = None,
tokenizer_mode: TokenizerModeType = "auto",
cache_dir: os.PathLike = CACHE_DIR,
skip_engine: bool = False,
enable_jit_compilation: bool = False,
jit_threshold: int = DEFAULT_JIT_THRESHOLD,
jit_max_workers: int = DEFAULT_JIT_MAX_WORKERS,
jit_unit_size: int = DEFAULT_JIT_UNIT_SIZE,
served_model_name: Optional[str] = None,
) -> None:
from furiosa.native_llm_common import NextGenArtifact
from furiosa_llm.utils import compute_bucket_lengths, resolve_artifact_path
artifact_path = resolve_artifact_path(model_id_or_path, revision, FURIOSA_LLM_VERSION)
tokenizer = get_tokenizer(artifact_path, tokenizer, tokenizer_mode=tokenizer_mode)
artifact = NextGenArtifact.load_without_blob(artifact_path)
model_metadata = artifact.model.model_metadata
devices = resolve_devices(devices)
artifact.override_with(
artifact_path,
num_blocks_per_pp_stage,
str(cache_dir) if cache_dir else None,
)
max_prefill_bucket_len, max_decode_bucket_len = compute_bucket_lengths(
artifact.model.pipeline_metadata_list
)
_max_model_len = max(max_decode_bucket_len, max_prefill_bucket_len)
# Set instance attributes
self.served_model_name = served_model_name or str(model_id_or_path)
self.model_metadata = model_metadata
self.model_config = model_metadata.hf_configs
self.artifact_id = artifact.metadata.artifact_id
self.is_generative_model = model_metadata.is_generative_model
self.pipeline_metadata = artifact.model.pipeline_metadata_list
self.parallel_config = artifact.model.parallel_config
self.max_seq_len_to_capture = _max_model_len
self.prompt_max_seq_len = max_prefill_bucket_len
self.tokenizer = tokenizer
# Fields only used for testing and debugging purpose.
self.pipelines = artifact.model.pipelines # type: ignore[assignment]
self.default_generation_config = get_diff_sampling_params(artifact_path)
if not skip_engine:
from furiosa.native_runtime.llm import NativeLLMEngine
self.engine = NativeLLMEngine(
artifact_path,
None, # draft_artifact_path (speculative decoding not exposed)
devices,
data_parallel_size,
pipeline_parallel_size,
max_io_memory_mb,
self._serialize_obj(scheduler_config or SchedulerConfig()),
structured_outputs_backend,
tokenizer.backend_tokenizer.to_str(),
None, # num_speculative_tokens (speculative decoding not exposed)
enable_jit_compilation,
jit_threshold,
jit_max_workers,
jit_unit_size,
str(cache_dir) if cache_dir else None,
self.served_model_name,
)
def get_default_sampling_params(self) -> SamplingParams:
"""Return SamplingParams reflecting model's generation_config defaults.
If the model has no generation_config or it matches HF defaults,
returns a default SamplingParams().
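        Example:
            A minimal sketch, assuming ``llm`` is an already-constructed
            :class:`LLM`::

                params = llm.get_default_sampling_params()
                params.max_tokens = 128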
"""
if self.default_generation_config:
return SamplingParams.from_optional(**self.default_generation_config)
return SamplingParams()
@classmethod
def _serialize_obj(
cls,
obj: Any,
) -> str:
return to_json(obj).decode("utf-8")
def generate(
self,
prompts: Union[str, List[str]],
sampling_params: Optional[SamplingParams] = None,
prompt_token_ids: Optional[BatchEncoding] = None,
tokenizer_kwargs: Optional[Dict[str, Any]] = None,
) -> Union[RequestOutput, List[RequestOutput]]:
"""Generate texts from given prompts and sampling parameters.
Args:
            prompts: The prompts to generate texts from, as a single string or a list of strings.
sampling_params: The sampling parameters for generating texts.
If None, model's generation config defaults are used.
prompt_token_ids: Pre-tokenized prompt input as a `BatchEncoding` object.
If not provided, the prompt will be tokenized internally using the tokenizer.
            tokenizer_kwargs: Additional keyword arguments passed to the tokenizer's
                `encode` method, such as `{"add_special_tokens": True}`.
Returns:
            A `RequestOutput` object for a single prompt, or a list of
            `RequestOutput` objects in the same order as the input prompts.
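        Example:
            A minimal sketch, assuming ``llm`` is an already-constructed
            generative :class:`LLM`; the sampling parameter values below are
            illustrative::

                params = SamplingParams(temperature=0.0, max_tokens=64)
                outputs = llm.generate(["Hello, my name is"], params)
                print(outputs[0].outputs[0].text)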
"""
if sampling_params is None:
sampling_params = self.get_default_sampling_params()
if not self.is_generative_model:
raise ValueError("generate API can only be used for generative models.")
if prompt_token_ids is None:
if tokenizer_kwargs is None:
tokenizer_kwargs = {}
prompt_token_ids = encode_auto(self.tokenizer, prompts, **tokenizer_kwargs)
input_ids = prompt_token_ids.input_ids
if input_ids and isinstance(input_ids[0], list):
            longest_prompt_len = max(len(ids) for ids in input_ids)
else:
longest_prompt_len = len(input_ids)
assert (
self.max_seq_len_to_capture is not None
), "Generative models must have max_seq_len_to_capture set."
if longest_prompt_len > self.prompt_max_seq_len:
raise ValueError(
f"This model's maximum input context length is {self.prompt_max_seq_len} tokens."
f" However, your messages resulted in {longest_prompt_len} tokens."
" Please reduce the length of the messages."
)
native_outputs = self.engine.generate(prompt_token_ids, sampling_params)
return self._generate_postprocess(native_outputs, prompts, prompt_token_ids)
def chat(
self,
messages: Union[
List["ChatCompletionMessageParam"], List[List["ChatCompletionMessageParam"]]
],
sampling_params: Optional[SamplingParams] = None,
chat_template: Optional[str] = None,
chat_template_content_format: ChatTemplateContentFormatOption = "string",
add_generation_prompt: bool = True,
continue_final_message: bool = False,
tools: Optional[List[Dict[str, Any]]] = None,
chat_template_kwargs: Optional[Dict[str, Any]] = None,
) -> List[RequestOutput]:
"""
Generate responses for a chat conversation.
        The chat conversation is converted into a text prompt using the
        tokenizer's chat template, and the :meth:`generate` method is called
        to generate the responses.
Args:
messages: A list of conversations or a single conversation.
- Each conversation is represented as a list of messages.
- Each message is a dictionary with 'role' and 'content' keys.
sampling_params: The sampling parameters for text generation.
chat_template: The template to use for structuring the chat.
If not provided, the model's default chat template will be used.
chat_template_content_format: The format to render message content.
Currently only "string" is supported.
            add_generation_prompt: If True, appends the generation prompt to
                the rendered conversation.
continue_final_message: If True, continues the final message in
the conversation instead of starting a new one. Cannot be
``True`` if ``add_generation_prompt`` is also ``True``.
tools: Optional list of tools to use in the chat.
chat_template_kwargs: Additional keyword arguments to pass to the
chat template rendering function.
Returns:
A list of ``RequestOutput`` objects containing the generated
responses in the same order as the input messages.
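        Example:
            A minimal sketch, assuming ``llm`` wraps a chat-tuned generative
            model::

                messages = [
                    {"role": "system", "content": "You are a helpful assistant."},
                    {"role": "user", "content": "What is the capital of France?"},
                ]
                outputs = llm.chat(messages)
                print(outputs[0].outputs[0].text)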
"""
if sampling_params is None:
sampling_params = self.get_default_sampling_params()
if continue_final_message and add_generation_prompt:
raise ValueError(
"continue_final_message cannot be True when add_generation_prompt is True."
)
messages_list: List[List["ChatCompletionMessageParam"]]
if is_list_of(messages, list):
messages_list = cast(List[List["ChatCompletionMessageParam"]], messages)
else:
messages_list = [cast(List["ChatCompletionMessageParam"], messages)]
_chat_template_kwargs: dict[str, Any] = dict(
chat_template=chat_template,
add_generation_prompt=add_generation_prompt,
continue_final_message=continue_final_message,
tools=tools,
)
_chat_template_kwargs.update(chat_template_kwargs or {})
rendered_prompts = self.tokenizer.apply_chat_template(
messages_list, # type: ignore[arg-type]
tokenize=False,
**_chat_template_kwargs,
)
return self.generate(
rendered_prompts, sampling_params, tokenizer_kwargs={"add_special_tokens": False} # type: ignore
)
async def stream_generate(
self,
prompt: str,
sampling_params: Optional[SamplingParams] = None,
prompt_token_ids: Optional[BatchEncoding] = None,
tokenizer_kwargs: Optional[Dict[str, Any]] = None,
is_demo: bool = False,
) -> AsyncGenerator[str, None]:
"""Generate texts from given prompt and sampling parameters.
Args:
            prompt: The prompt to generate text from. Note that unlike `generate`,
this API supports only a single prompt.
sampling_params: The sampling parameters for generating texts.
prompt_token_ids: Pre-tokenized prompt input as a `BatchEncoding` object.
If not provided, the prompt will be tokenized internally using the tokenizer.
            tokenizer_kwargs: Additional keyword arguments passed to the tokenizer's
                `encode` method, such as `{"add_special_tokens": True}`.
        Yields:
            Incremental chunks of the generated text as they are decoded.
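        Example:
            A minimal sketch, assuming ``llm`` is already constructed and
            ``asyncio`` is imported::

                async def main():
                    async for chunk in llm.stream_generate("Hello, my name is"):
                        print(chunk, end="", flush=True)

                asyncio.run(main())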
"""
if sampling_params is None:
sampling_params = self.get_default_sampling_params()
try:
from furiosa.native_runtime.llm import NativeRequestOutput
except ImportError:
pass
if not self.is_generative_model:
raise ValueError("generate API can only be used for generative models.")
if not isinstance(prompt, str):
raise ValueError("prompt must be a single string.")
if prompt_token_ids is None:
if tokenizer_kwargs is None:
tokenizer_kwargs = {}
prompt_token_ids = encode_auto(self.tokenizer, prompt, **tokenizer_kwargs)
input_ids = prompt_token_ids.input_ids
if input_ids and isinstance(input_ids[0], list):
            max_prompt_len = max(len(ids) for ids in input_ids)
else:
max_prompt_len = len(input_ids)
assert (
self.max_seq_len_to_capture is not None
), "Generative models must have max_seq_len_to_capture set."
if max_prompt_len > self.prompt_max_seq_len:
raise ValueError(
f"This model's maximum input context length is {self.prompt_max_seq_len} tokens."
f" However, your messages resulted in {max_prompt_len} tokens."
" Please reduce the length of the messages."
)
# FIXME: LLM.__init__() should take max_tokens to determine the maximum sequence length through bucket generations
# and use the config value to raise an error.
if is_demo and len(prompt_token_ids.input_ids) > 1024: # type: ignore
raise ValueError("The length of the prompt is larger than 1024 tokens")
# NOTE: type of engine.stream_generate() is AsyncGenerator[RequestOutput, None]
token_buffer: List[int] = []
request_output: Union[RequestOutput, NativeRequestOutput]
async for request_output in self.engine.stream_generate(prompt_token_ids, sampling_params):
num_decode_trials = STREAMING_MAX_DECODE_TRIAL
for completion_output in request_output.outputs:
token_buffer.extend(completion_output.token_ids)
num_decode_trials = min(num_decode_trials, len(completion_output.token_ids))
if num_decode_trials == 0:
continue
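            # Try decoding with 0..num_decode_trials-1 trailing tokens withheld:
            # a trailing U+FFFD (replacement character) means the buffer currently
            # ends mid UTF-8 sequence, so hold those tokens back until a later
            # chunk completes the character.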
for tokens_to_discard in range(num_decode_trials):
end_offset = len(token_buffer) - 1 - tokens_to_discard
new_text = self.tokenizer.decode(
token_buffer[: end_offset + 1], skip_special_tokens=True
)
assert type(new_text) is str
if not new_text.endswith("�"):
break
else:
continue
token_buffer = token_buffer[end_offset + 1 :]
yield new_text
if token_buffer:
decode_res = self.tokenizer.decode(token_buffer, skip_special_tokens=True)
assert type(decode_res) is str
yield decode_res
def _generate_postprocess(
self,
native_outputs,
prompts: Union[str, List[str]],
prompt_token_ids: BatchEncoding,
) -> Union[RequestOutput, List[RequestOutput]]:
# Convert one prompt and multiple generated sequences into a RequestOutput
def convert(prompt: str, prompt_token_ids: List[int], request_output):
outputs = []
prompt_logprobs = None
for output in request_output.outputs:
text = self.tokenizer.decode(
output.token_ids, skip_special_tokens=True, clean_up_tokenization_spaces=True
)
assert type(text) is str
logprobs = None
if output.logprobs is not None:
# output: NativeCompletionOutput (not CompletionOutput)
# output.logprobs: List[List[Tuple[int, Logprob]]]
# token_id_to_logprob: List[Tuple[int, Logprob]]
logprobs = [
{
token_id: Logprob(logprob.logprob, logprob.rank, logprob.decoded_token)
for token_id, logprob in token_id_to_logprob
}
for token_id_to_logprob in output.logprobs
]
# Convert prompt_logprobs from native format
if output.prompt_logprobs is not None and prompt_logprobs is None:
prompt_logprobs = [
(
{
token_id: Logprob(
logprob.logprob, logprob.rank, logprob.decoded_token
)
for token_id, logprob in token_id_to_logprob
}
if token_id_to_logprob is not None
else None
)
for token_id_to_logprob in output.prompt_logprobs
]
outputs.append(
CompletionOutput(
output.index, text, output.token_ids, logprobs, output.finish_reason
)
)
return RequestOutput(
                request_id=str(uuid.uuid4()),
prompt=prompt,
prompt_token_ids=prompt_token_ids,
prompt_logprobs=prompt_logprobs,
outputs=outputs,
finished=True,
num_cached_tokens=request_output.num_cached_tokens or 0,
)
if isinstance(native_outputs, list):
assert isinstance(prompts, list)
return [
convert(req[0], req[1], req[2])
for req in zip(prompts, prompt_token_ids.input_ids, native_outputs) # type: ignore
]
else:
assert isinstance(prompts, str)
return convert(prompts, prompt_token_ids.input_ids, native_outputs) # type: ignore
    # XXX(n0gu):
    # More pooling APIs should be implemented - classify, reward, score.
    # However, as of 2025.11 only the embed API will be used. We support Qwen3-Reranker,
    # but that model uses slightly different scoring logic, so it is not supported by
    # vLLM's LLM.score() either.
    # See: https://huggingface.co/Qwen/Qwen3-Reranker-0.6B#vllm-usage
def encode(
self,
prompts: Union[PromptType, Sequence[PromptType]],
pooling_params: Optional[Union[PoolingParams, Sequence[PoolingParams]]] = None,
*,
pooling_task: Optional[PoolingTask] = None,
) -> List[PoolingRequestOutput]:
"""
Apply pooling to the hidden states corresponding to the input prompts.
Args:
prompts: The prompts to the LLM. You may pass a sequence of prompts
for batch inference.
pooling_params: The pooling parameters for pooling.
pooling_task: Override the pooling task to use.
Returns:
A list of `PoolingRequestOutput` objects containing the
pooled hidden states in the same order as the input prompts.
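        Example:
            A minimal sketch, assuming ``llm`` wraps a pooling-capable model;
            ``outputs.data`` is a ``torch.Tensor``::

                results = llm.encode(["What is the capital of France?"])
                vector = results[0].outputs.data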
"""
task_type_from_model = self.model_metadata.task
if task_type_from_model not in POOLING_TASKS:
raise ValueError("Pooling API is not supported by this model.")
if not isinstance(prompts, list):
prompt_list = [cast(PromptType, prompts)]
else:
prompt_list = prompts
coroutines = []
prompt_token_ids = []
for i, prompt in enumerate(prompt_list):
param: PoolingParams
if isinstance(pooling_params, (list, tuple)):
param = pooling_params[i]
elif pooling_params is None:
param = PoolingParams()
elif isinstance(pooling_params, PoolingParams):
param = pooling_params
else:
raise TypeError(
f"pooling_params must be PoolingParams, Sequence[PoolingParams], or None, "
f"got {type(pooling_params)}"
)
batch_encoding, _ = preprocess_prompt(prompt, self.tokenizer)
# apply truncation if specified
apply_prompt_truncation(
batch_encoding,
param.truncate_prompt_tokens,
self.prompt_max_seq_len,
)
# Set pooling task by precedence (highest to lowest).
# 1. Use the `pooling_task` argument if provided.
# 2. Otherwise, use the task already set in `params.task`.
# 3. If neither is set, infer the task from the model metadata.
from furiosa_llm.utils import coalesce
param.task = coalesce(pooling_task, param.task, cast(PoolingTask, task_type_from_model))
assert param.task is not None, "pooling task must be set at this point."
coroutines.append(
self.engine.encode(
batch_encoding,
param,
None, # TODO: set request id
)
)
prompt_token_ids.append(batch_encoding.input_ids)
        request_id = str(uuid.uuid4())
from furiosa_llm.utils import async_gather, run_sync
native_outputs_list = run_sync(async_gather(*coroutines))
return [
PoolingRequestOutput(
request_id=request_id,
prompt_token_ids=prompt_token_ids[i],
outputs=PoolingOutput(data=torch.Tensor(native_outputs[0].data)),
finished=True,
)
for i, native_outputs in enumerate(native_outputs_list)
]
def embed(
self,
prompts: Union[PromptType, Sequence[PromptType]],
pooling_params: Optional[Union[PoolingParams, Sequence[PoolingParams]]] = None,
) -> List[EmbeddingRequestOutput]:
"""
Generate an embedding vector for each prompt. Only applicable to embedding models.
Args:
prompts: The prompts to the LLM. You may pass a sequence of prompts
for batch embedding.
pooling_params: The pooling parameters for pooling.
Returns:
A list of `EmbeddingRequestOutput` objects containing the
embedding vectors in the same order as the input prompts.
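        Example:
            A minimal sketch, assuming ``llm`` wraps an embedding model and the
            vLLM-compatible ``outputs.embedding`` attribute::

                items = llm.embed(["roses are red", "violets are blue"])
                vectors = [item.outputs.embedding for item in items]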
"""
if "embed" != self.model_metadata.task:
raise ValueError("Embedding API is not supported by this model.")
items = self.encode(
prompts,
pooling_params=pooling_params,
pooling_task="embed",
)
return [EmbeddingRequestOutput.from_base(item) for item in items]
def score(
self,
data_1: PromptType | Sequence[PromptType],
data_2: PromptType | Sequence[PromptType],
/,
*,
truncate_prompt_tokens: int | None = None,
pooling_params: PoolingParams | None = None,
chat_template: str | None = None,
) -> list[ScoringRequestOutput]:
"""Generate similarity scores for all pairs `<text,text_pair>`.
The inputs can be `1 -> 1`, `1 -> N` or `N -> N`.
        In the `1 -> N` case the `data_1` input will be replicated `N`
        times to pair with the `data_2` inputs.
Args:
data_1: Can be a single prompt or a list of prompts.
When a list, it must have the same length as the `data_2` list.
data_2: The data to pair with the query to form the input to
the LLM.
truncate_prompt_tokens: The number of tokens to truncate the prompt to.
pooling_params: The pooling parameters for pooling. If None, we
use the default pooling parameters.
chat_template: The chat template to use for the scoring. If None, we
use the model's default chat template.
Returns:
A list of `ScoringRequestOutput` objects containing the
generated scores in the same order as the input prompts.
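        Example:
            A minimal sketch, assuming ``llm`` wraps a binary-classification
            scoring model and the vLLM-compatible ``outputs.score`` attribute::

                results = llm.score(
                    "What is the capital of France?",
                    ["Paris is the capital of France.", "The sky is blue."],
                )
                scores = [r.outputs.score for r in results]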
"""
# XXX: As of 2026.1, this implementation supports only models converted via as_binary_seq_cls_model.
model_metadata = self.model_metadata
if model_metadata.task != "score" and not model_metadata.use_binary_seq_class:
raise ValueError("LLM.score() is only supported for binary classification models.")
# Validate inputs and create pairs
# Convert single prompts to lists for uniform processing
is_data_1_list = isinstance(data_1, list)
is_data_2_list = isinstance(data_2, list)
# Normalize inputs to List[PromptType]
data_1_list: list[PromptType]
data_2_list: list[PromptType]
if not is_data_1_list and not is_data_2_list:
# 1 -> 1 case
data_1_list = [cast(PromptType, data_1)]
data_2_list = [cast(PromptType, data_2)]
elif not is_data_1_list and is_data_2_list:
# 1 -> N case: replicate data_1
data_2_list = cast(List[PromptType], data_2)
data_1_list = [cast(PromptType, data_1)] * len(data_2_list)
elif is_data_1_list and is_data_2_list:
# N -> N case: must have same length
data_1_list = cast(List[PromptType], data_1)
data_2_list = cast(List[PromptType], data_2)
if len(data_1_list) != len(data_2_list):
raise ValueError(
f"When both data_1 and data_2 are lists, they must have the same length. "
f"Got {len(data_1_list)} and {len(data_2_list)}."
)
else:
# data_1 is list, data_2 is not - this is not a standard case for scoring
raise ValueError(
"Invalid input combination. data_1 is a list but data_2 is not. "
"Expected patterns: (single, single), (single, list), or (list, list)."
)
# Normalize inputs to list[str]
data_1_strs: list[str] = [prompt_to_str(d, self.tokenizer) for d in data_1_list]
data_2_strs: list[str] = [prompt_to_str(d, self.tokenizer) for d in data_2_list]
# Construct prompts for each pair
prompts: list[PromptType] = []
for str_1, str_2 in zip(data_1_strs, data_2_strs):
_, prompt = get_score_prompt(
self.tokenizer,
str_1,
str_2,
score_template=chat_template,
)
prompts.append(prompt)
# Set up pooling parameters with truncation if specified
if pooling_params is None:
pooling_params = PoolingParams()
if truncate_prompt_tokens is not None:
pooling_params.truncate_prompt_tokens = truncate_prompt_tokens
# Call encode with the constructed prompts
items = self.encode(
prompts,
pooling_params=pooling_params,
pooling_task="score",
)
return [ScoringRequestOutput.from_base(item) for item in items]
def shutdown(self):
"""Shutdown the LLM engine gracefully."""
if hasattr(self, "engine"):
self.engine.shutdown()
def __del__(self):
self.shutdown()
        # Remove the tmp directory if it exists.
tmp_dir = getattr(self, "tmp_dir", None)
if tmp_dir is not None:
tmp_dir.cleanup()
@cached_property
def model_max_seq_len(self) -> int:
possible_keys = [
# OPT, LLaMA, BERT
"max_position_embeddings",
# GPT-2, GPT-J
"n_positions",
# MPT
"max_seq_len",
# ChatGLM2
"seq_length",
# Command-R
"model_max_length",
# Others
"max_sequence_length",
"max_seq_length",
"seq_len",
]
for attr_name in possible_keys:
if attr_name in self.model_config:
model_max_seq_len = self.model_config[attr_name]
break
else:
# If none of the keys were found in the config, use a default and
# log a warning.
default_max_len = 2048
model_max_seq_len = default_max_len
logger.warning(
"The model's config.json does not contain any of the following "
"keys to determine the original maximum length of the model: "
"%s. Assuming the model's maximum length is %d.",
possible_keys,
default_max_len,
)
return model_max_seq_len