Source code for furiosa_llm.api

from functools import cached_property
import logging
import os
from pathlib import Path
from typing import (
    TYPE_CHECKING,
    Any,
    AsyncGenerator,
    Dict,
    List,
    Literal,
    Optional,
    Sequence,
    Union,
    cast,
)

from pydantic_core import to_json

from furiosa.native_llm_common import NextGenArtifact
from furiosa.native_runtime.llm import NativeLLMEngine
from furiosa_llm.utils import (
    async_gather,
    coalesce,
    compute_bucket_lengths,
    resolve_artifact_path,
    run_sync,
)
from furiosa_llm.vllm_compat import (
    PromptType,
    apply_prompt_truncation,
    get_score_prompt,
    preprocess_prompt,
    prompt_to_str,
)

if TYPE_CHECKING:

    from tests.utils import FakeNativeLLMEngine

import uuid

from openai.types.chat import ChatCompletionMessageParam
import torch
from transformers.tokenization_utils import PreTrainedTokenizer
from transformers.tokenization_utils_base import BatchEncoding
from transformers.tokenization_utils_fast import PreTrainedTokenizerFast

from furiosa_llm.models.tasks import POOLING_TASKS, PoolingTask
from furiosa_llm.parallelize.pipeline.types import Device
from furiosa_llm.server.utils import is_list_of
from furiosa_llm.version import FURIOSA_LLM_VERSION

from .device import resolve_devices
from .models.config_types import SchedulerConfig
from .outputs import (
    CompletionOutput,
    EmbeddingRequestOutput,
    Logprob,
    PoolingOutput,
    PoolingRequestOutput,
    RequestOutput,
    ScoringRequestOutput,
)
from .sampling_params import PoolingParams, SamplingParams
from .tokenizer import encode_auto, get_tokenizer
from .utils import get_logger_with_tz

logger = get_logger_with_tz(logging.getLogger(__name__))

# Default index of the padding block when paged attention model is used.
DEFAULT_PAGED_ATTENTION_PADDING_BLOCK_IDX = 0

CACHE_DIR: Path = Path(os.getenv("XDG_CACHE_HOME", Path.home() / ".cache")) / "furiosa" / "llm"

TokenizerModeType = Literal["auto", "slow"]
ChatTemplateContentFormatOption = Literal["string"]

RAY_LOG_PREFIX = "[furiosa-llm]"

STREAMING_MAX_DECODE_TRIAL = 2


class LLM:
    """An LLM for generating texts from given prompts and sampling parameters."""

    max_seq_len_to_capture: Optional[int]
    engine: Union[NativeLLMEngine, "FakeNativeLLMEngine"]

    @classmethod
    def load_artifact(
        cls,
        model_id_or_path: Union[str, os.PathLike],
        **kwargs,
    ) -> "LLM":
        """Deprecated: Use the LLM() constructor directly.

        This method is kept for backward compatibility and will be removed in a future release.
        """
        import warnings

        warnings.warn(
            "LLM.load_artifact() is deprecated. Use LLM() constructor directly.",
            DeprecationWarning,
            stacklevel=2,
        )
        return cls(model_id_or_path, **kwargs)

    def __init__(
        self,
        model_id_or_path: Union[str, os.PathLike],
        *,
        # Repo Configuration
        revision: Optional[str] = None,
        # Runtime Configuration
        devices: Optional[Union[str, Sequence[Device]]] = None,
        data_parallel_size: Optional[int] = None,
        pipeline_parallel_size: Optional[int] = None,
        num_blocks_per_pp_stage: Optional[Sequence[int]] = None,
        max_io_memory_mb: int = 2048,
        # Pipeline selection related Configs
        max_model_len: Optional[int] = None,  # TODO: limit model max length via bucket overrides
        scheduler_config: Optional[SchedulerConfig] = None,
        # Guided-decoding related Configuration
        guided_decoding_backend: Literal["auto", "guidance", "xgrammar"] = "auto",
        # Other Configuration
        tokenizer: Optional[Union[str, PreTrainedTokenizer, PreTrainedTokenizerFast]] = None,
        tokenizer_mode: TokenizerModeType = "auto",
        seed: Optional[int] = None,
        cache_dir: os.PathLike = CACHE_DIR,
        skip_engine: bool = False,
        enable_jit_wiring: bool = False,
        **kwargs,
    ) -> None:
        """Instantiate an LLM from saved artifacts without quantization or compilation.

        Args:
            model_id_or_path: A path to a furiosa-llm engine artifact or a HuggingFace model id.
            revision: The revision of the model, if `model_id_or_path` is a HuggingFace model id.
            devices: The devices to run the model on. It can be a single device or a list of
                devices. Each device can be either "npu:X" or "npu:X:\\*" where X is a specific
                device index. If not given, all available devices will be used.
            data_parallel_size: The size of the data parallelism group. If not given, it will be
                inferred from the total available PEs and the other parallelism degrees.
            pipeline_parallel_size: The size of the pipeline parallelism.
            num_blocks_per_pp_stage: The number of transformer blocks per pipeline parallelism
                stage. If only `pipeline_parallel_size` is provided, transformer blocks will be
                distributed equally.
            max_io_memory_mb: Maximum NPU memory to be used for I/O tensors, which can range from
                0 to 48GB. If unspecified, the default value of 2048 will be used.
            max_model_len: The maximum context length to use. If given, decode buckets with an
                attention size larger than this value will be ignored.
            scheduler_config: Configuration for the scheduler, allowing to set the maximum number
                of tasks which can be queued to HW, the maximum number of samples that can be
                processed by the scheduler, and the ratio of spare blocks reserved by the
                scheduler. If this is not given, the scheduler config saved in the artifacts
                will be used.
            guided_decoding_backend: The backend for guided decoding. "auto" will automatically
                select the best backend based on the model. "guidance" will use the guidance
                library. "xgrammar" will use the xgrammar library.
            tokenizer: The name or path of a HuggingFace Transformers tokenizer.
            tokenizer_mode: The tokenizer mode. "auto" will use the fast tokenizer if available,
                and "slow" will always use the slow tokenizer.
            seed: The seed to initialize the random number generator for sampling.
            cache_dir: The cache directory for all generated files for this LLM instance.
                When its value is ``None``, caching is disabled. The default is
                "$HOME/.cache/furiosa/llm".
            skip_engine: If True, the native runtime engine will not be initialized. This is
                useful when you need the pipelines for other purposes than running them with
                the engine.
            enable_jit_wiring: If True, JIT wiring will be enabled.
        """
        artifact_path = resolve_artifact_path(model_id_or_path, revision, FURIOSA_LLM_VERSION)
        tokenizer = get_tokenizer(artifact_path, tokenizer, tokenizer_mode=tokenizer_mode)

        artifact = NextGenArtifact.load_without_blob(artifact_path)
        model_metadata = artifact.model.model_metadata

        devices = resolve_devices(devices, artifact.model.parallel_config.tensor_parallel_size)

        artifact.override_with(
            artifact_path,
            devices,
            data_parallel_size,
            pipeline_parallel_size,
            num_blocks_per_pp_stage,
            None,  # speculative_model
            None,  # speculative_draft_data_parallel_size
            None,  # speculative_draft_pipeline_parallel_size
            None,  # speculative_draft_num_blocks_per_pp_stage
            False,  # skip_speculative_model_load
            str(cache_dir) if cache_dir else None,
        )

        max_prefill_bucket_len, max_decode_bucket_len = compute_bucket_lengths(
            artifact.model.pipeline_metadata_list
        )
        _max_model_len = max(max_decode_bucket_len, max_prefill_bucket_len)

        # Set instance attributes
        self.model_metadata = model_metadata
        self.model_config = model_metadata.config_dict
        self.artifact_id = artifact.metadata.artifact_id
        self.is_generative_model = model_metadata.is_generative_model
        self.pipeline_metadata = artifact.model.pipeline_metadata_list
        self.parallel_config = artifact.model.parallel_config
        self.max_seq_len_to_capture = _max_model_len
        self.prompt_max_seq_len = max_prefill_bucket_len
        self.tokenizer = tokenizer

        # Fields only used for testing and debugging purposes.
        self.pipelines = artifact.model.pipelines  # type: ignore[assignment]

        if not skip_engine:
            self.engine = NativeLLMEngine(
                artifact_path,
                devices,
                data_parallel_size,
                max_io_memory_mb,
                self._serialize_obj(scheduler_config or SchedulerConfig()),
                guided_decoding_backend,
                tokenizer.backend_tokenizer.to_str(),
                enable_jit_wiring,
                str(cache_dir) if cache_dir else None,
            )

    @classmethod
    def _serialize_obj(
        cls,
        obj: Any,
    ) -> str:
        return to_json(obj).decode("utf-8")

    def generate(
        self,
        prompts: Union[str, List[str]],
        sampling_params: SamplingParams = SamplingParams(),
        prompt_token_ids: Optional[BatchEncoding] = None,
        tokenizer_kwargs: Optional[Dict[str, Any]] = None,
    ) -> Union[RequestOutput, List[RequestOutput]]:
        """Generate texts from given prompts and sampling parameters.

        Args:
            prompts: The prompts to generate texts from.
            sampling_params: The sampling parameters for generating texts.
            prompt_token_ids: Pre-tokenized prompt input as a `BatchEncoding` object.
                If not provided, the prompt will be tokenized internally using the tokenizer.
            tokenizer_kwargs: Additional keyword arguments passed to the tokenizer's
                `encode` method, such as `{"use_special_tokens": True}`.

        Returns:
            A `RequestOutput` object for a single prompt, or a list of `RequestOutput` objects
            containing the generated completions in the same order as the input prompts.
        """
        if not self.is_generative_model:
            raise ValueError("generate API can only be used for generative models.")

        if prompt_token_ids is None:
            if tokenizer_kwargs is None:
                tokenizer_kwargs = {}
            prompt_token_ids = encode_auto(self.tokenizer, prompts, **tokenizer_kwargs)

        input_ids = prompt_token_ids.input_ids
        if input_ids and isinstance(input_ids[0], list):
            longest_prompt_len = max(len(prompt) for prompt in input_ids)
        else:
            longest_prompt_len = len(input_ids)

        assert (
            self.max_seq_len_to_capture is not None
        ), "Generative models must have max_seq_len_to_capture set."
        sampling_params.verify_and_finalize_max_tokens(
            longest_prompt_len, self.prompt_max_seq_len, self.max_seq_len_to_capture
        )

        native_outputs = self.engine.generate(prompt_token_ids, sampling_params)
        return self._generate_postprocess(native_outputs, prompts, prompt_token_ids)

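    # Usage sketch (not part of the original module): batch generation on an already constructed
    # `llm`. The SamplingParams field names (max_tokens, temperature) are assumed to follow the
    # vLLM-style interface used elsewhere in this package.
    #
    #     params = SamplingParams(max_tokens=64, temperature=0.8)
    #     outputs = llm.generate(["Hello, my name is", "The capital of France is"], params)
    #     for out in outputs:
    #         print(out.outputs[0].text)
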
    def chat(
        self,
        messages: Union[List[ChatCompletionMessageParam], List[List[ChatCompletionMessageParam]]],
        sampling_params: SamplingParams = SamplingParams(),
        chat_template: Optional[str] = None,
        chat_template_content_format: ChatTemplateContentFormatOption = "string",
        add_generation_prompt: bool = True,
        continue_final_message: bool = False,
        tools: Optional[List[Dict[str, Any]]] = None,
        chat_template_kwargs: Optional[Dict[str, Any]] = None,
    ) -> List[RequestOutput]:
        """
        Generate responses for a chat conversation.

        The chat conversation is converted into a text prompt using the tokenizer, and the
        :meth:`generate` method is called to generate the responses.

        Args:
            messages: A list of conversations or a single conversation.

                - Each conversation is represented as a list of messages.
                - Each message is a dictionary with 'role' and 'content' keys.

            sampling_params: The sampling parameters for text generation.
            chat_template: The template to use for structuring the chat.
                If not provided, the model's default chat template will be used.
            chat_template_content_format: The format to render message content.
                Currently only "string" is supported.
            add_generation_prompt: If True, adds a generation template to each message.
            continue_final_message: If True, continues the final message in the conversation
                instead of starting a new one. Cannot be ``True`` if ``add_generation_prompt``
                is also ``True``.
            tools: Optional list of tools to use in the chat.
            chat_template_kwargs: Additional keyword arguments to pass to the chat template
                rendering function.

        Returns:
            A list of ``RequestOutput`` objects containing the generated responses in the same
            order as the input messages.
        """
        if continue_final_message and add_generation_prompt:
            raise ValueError(
                "continue_final_message cannot be True when add_generation_prompt is True."
            )

        messages_list: List[List[ChatCompletionMessageParam]]
        if is_list_of(messages, list):
            messages_list = cast(List[List[ChatCompletionMessageParam]], messages)
        else:
            messages_list = [cast(List[ChatCompletionMessageParam], messages)]

        _chat_template_kwargs: dict[str, Any] = dict(
            chat_template=chat_template,
            add_generation_prompt=add_generation_prompt,
            continue_final_message=continue_final_message,
            tools=tools,
        )
        _chat_template_kwargs.update(chat_template_kwargs or {})

        rendered_prompts = self.tokenizer.apply_chat_template(
            messages_list,  # type: ignore[arg-type]
            tokenize=False,
            **_chat_template_kwargs,
        )

        return self.generate(
            rendered_prompts,
            sampling_params,
            tokenizer_kwargs={"add_special_tokens": False},  # type: ignore
        )

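    # Usage sketch (not part of the original module): a single-turn conversation rendered with
    # the model's default chat template. The message contents are arbitrary examples.
    #
    #     messages = [
    #         {"role": "system", "content": "You are a helpful assistant."},
    #         {"role": "user", "content": "Summarize what an NPU is in one sentence."},
    #     ]
    #     outputs = llm.chat(messages, SamplingParams(max_tokens=128))
    #     print(outputs[0].outputs[0].text)
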
    async def stream_generate(
        self,
        prompt: str,
        sampling_params: SamplingParams = SamplingParams(),
        prompt_token_ids: Optional[BatchEncoding] = None,
        tokenizer_kwargs: Optional[Dict[str, Any]] = None,
        is_demo: bool = False,
    ) -> AsyncGenerator[str, None]:
        """Generate texts from a given prompt and sampling parameters.

        Args:
            prompt: The prompt to generate texts from. Note that unlike `generate`,
                this API supports only a single prompt.
            sampling_params: The sampling parameters for generating texts.
            prompt_token_ids: Pre-tokenized prompt input as a `BatchEncoding` object.
                If not provided, the prompt will be tokenized internally using the tokenizer.
            tokenizer_kwargs: Additional keyword arguments passed to the tokenizer's
                `encode` method, such as `{"use_special_tokens": True}`.

        Returns:
            A stream of generated text chunks.
        """
        try:
            from furiosa.native_runtime.llm import NativeRequestOutput
        except ImportError:
            pass

        if not self.is_generative_model:
            raise ValueError("generate API can only be used for generative models.")
        if not isinstance(prompt, str):
            raise ValueError("prompt must be a single string.")

        if prompt_token_ids is None:
            if tokenizer_kwargs is None:
                tokenizer_kwargs = {}
            prompt_token_ids = encode_auto(self.tokenizer, prompt, **tokenizer_kwargs)

        input_ids = prompt_token_ids.input_ids
        if input_ids and isinstance(input_ids[0], list):
            max_prompt_len = max(len(prompt) for prompt in input_ids)
        else:
            max_prompt_len = len(input_ids)

        assert (
            self.max_seq_len_to_capture is not None
        ), "Generative models must have max_seq_len_to_capture set."
        sampling_params.verify_and_finalize_max_tokens(
            max_prompt_len, self.prompt_max_seq_len, self.max_seq_len_to_capture
        )

        # FIXME: LLM.__init__() should take max_tokens to determine the maximum sequence length
        # through bucket generations and use the config value to raise an error.
        if is_demo and len(prompt_token_ids.input_ids) > 1024:  # type: ignore
            raise ValueError("The length of the prompt is larger than 1024 tokens")

        # NOTE: the type of engine.stream_generate() is AsyncGenerator[RequestOutput, None]
        token_buffer: List[int] = []
        request_output: Union[RequestOutput, NativeRequestOutput]
        async for request_output in self.engine.stream_generate(prompt_token_ids, sampling_params):
            num_decode_trials = STREAMING_MAX_DECODE_TRIAL
            for completion_output in request_output.outputs:
                token_buffer.extend(completion_output.token_ids)
                num_decode_trials = min(num_decode_trials, len(completion_output.token_ids))

            if num_decode_trials == 0:
                continue

            for tokens_to_discard in range(num_decode_trials):
                end_offset = len(token_buffer) - 1 - tokens_to_discard
                new_text = self.tokenizer.decode(
                    token_buffer[: end_offset + 1], skip_special_tokens=True
                )
                if not new_text.endswith("�"):
                    break
            else:
                continue

            token_buffer = token_buffer[end_offset + 1 :]
            yield new_text

        if token_buffer:
            yield self.tokenizer.decode(token_buffer, skip_special_tokens=True)

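    # Usage sketch (not part of the original module): consuming the streaming API from asyncio.
    #
    #     import asyncio
    #
    #     async def main() -> None:
    #         params = SamplingParams(max_tokens=32)
    #         async for chunk in llm.stream_generate("Once upon a time", params):
    #             print(chunk, end="", flush=True)
    #
    #     asyncio.run(main())
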
    def _generate_postprocess(
        self,
        native_outputs,
        prompts: Union[str, List[str]],
        prompt_token_ids: BatchEncoding,
    ) -> Union[RequestOutput, List[RequestOutput]]:
        # Convert one prompt and multiple generated sequences into a RequestOutput
        def convert(prompt: str, prompt_token_ids: List[int], request_output):
            outputs = []
            prompt_logprobs = None
            for output in request_output.outputs:
                text = self.tokenizer.decode(
                    output.token_ids, skip_special_tokens=True, clean_up_tokenization_spaces=True
                )
                logprobs = None
                if output.logprobs is not None:
                    # output: NativeCompletionOutput (not CompletionOutput)
                    # output.logprobs: List[List[Tuple[int, Logprob]]]
                    # token_id_to_logprob: List[Tuple[int, Logprob]]
                    logprobs = [
                        {
                            token_id: Logprob(logprob.logprob, logprob.rank, logprob.decoded_token)
                            for token_id, logprob in token_id_to_logprob
                        }
                        for token_id_to_logprob in output.logprobs
                    ]
                # Convert prompt_logprobs from native format
                if output.prompt_logprobs is not None and prompt_logprobs is None:
                    prompt_logprobs = [
                        (
                            {
                                token_id: Logprob(
                                    logprob.logprob, logprob.rank, logprob.decoded_token
                                )
                                for token_id, logprob in token_id_to_logprob
                            }
                            if token_id_to_logprob is not None
                            else None
                        )
                        for token_id_to_logprob in output.prompt_logprobs
                    ]
                outputs.append(
                    CompletionOutput(
                        output.index, text, output.token_ids, logprobs, output.finish_reason
                    )
                )
            return RequestOutput(
                request_id=uuid.uuid4().__str__(),
                prompt=prompt,
                prompt_token_ids=prompt_token_ids,
                prompt_logprobs=prompt_logprobs,
                outputs=outputs,
                finished=True,
            )

        if isinstance(native_outputs, list):
            assert isinstance(prompts, list)
            return [
                convert(req[0], req[1], req[2])
                for req in zip(prompts, prompt_token_ids.input_ids, native_outputs)  # type: ignore
            ]
        else:
            assert isinstance(prompts, str)
            return convert(prompts, prompt_token_ids.input_ids, native_outputs)  # type: ignore

    # XXX(n0gu):
    # More pooling APIs should be implemented - classify, reward, score.
    # However, as of 2025.11 only the embed API will be used. We support Qwen3-Reranker,
    # but this model uses slightly different scoring logic, so it is not supported by
    # vLLM's LLM.score() either.
    # See: https://huggingface.co/Qwen/Qwen3-Reranker-0.6B#vllm-usage

    def encode(
        self,
        prompts: Union[PromptType, Sequence[PromptType]],
        pooling_params: Optional[Union[PoolingParams, Sequence[PoolingParams]]] = None,
        *,
        pooling_task: Optional[PoolingTask] = None,
    ) -> List[PoolingRequestOutput]:
        """
        Apply pooling to the hidden states corresponding to the input prompts.

        Args:
            prompts: The prompts to the LLM. You may pass a sequence of prompts for batch
                inference.
            pooling_params: The pooling parameters for pooling.
            pooling_task: Override the pooling task to use.

        Returns:
            A list of `PoolingRequestOutput` objects containing the pooled hidden states in the
            same order as the input prompts.
        """
        task_type_from_model = self.model_metadata.task_type
        if task_type_from_model not in POOLING_TASKS:
            raise ValueError("Pooling API is not supported by this model.")

        if not isinstance(prompts, list):
            prompt_list = [cast(PromptType, prompts)]
        else:
            prompt_list = prompts

        coroutines = []
        prompt_token_ids = []
        for i, prompt in enumerate(prompt_list):
            param: PoolingParams
            if isinstance(pooling_params, (list, tuple)):
                param = pooling_params[i]
            elif pooling_params is None:
                param = PoolingParams()
            elif isinstance(pooling_params, PoolingParams):
                param = pooling_params
            else:
                raise TypeError(
                    f"pooling_params must be PoolingParams, Sequence[PoolingParams], or None, "
                    f"got {type(pooling_params)}"
                )

            batch_encoding, _ = preprocess_prompt(prompt, self.tokenizer)

            # Apply truncation if specified
            apply_prompt_truncation(
                batch_encoding,
                param.truncate_prompt_tokens,
                self.prompt_max_seq_len,
            )

            # Set pooling task by precedence (highest to lowest).
            # 1. Use the `pooling_task` argument if provided.
            # 2. Otherwise, use the task already set in `params.task`.
            # 3. If neither is set, infer the task from the model metadata.
            param.task = coalesce(pooling_task, param.task, cast(PoolingTask, task_type_from_model))
            assert param.task is not None, "pooling task must be set at this point."

            coroutines.append(
                self.engine.encode(
                    batch_encoding,
                    param,
                    None,  # TODO: set request id
                )
            )
            prompt_token_ids.append(batch_encoding.input_ids)

        request_id = uuid.uuid4().__str__()
        native_outputs_list = run_sync(async_gather(*coroutines))
        return [
            PoolingRequestOutput(
                request_id=request_id,
                prompt_token_ids=prompt_token_ids[i],
                outputs=PoolingOutput(data=torch.Tensor(native_outputs[0].data)),
                finished=True,
            )
            for i, native_outputs in enumerate(native_outputs_list)
        ]

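    # Usage sketch (not part of the original module): pooling hidden states for a batch of
    # prompts, forcing the "embed" task regardless of what is set in PoolingParams.
    #
    #     results = llm.encode(["first document", "second document"], pooling_task="embed")
    #     vectors = [r.outputs.data for r in results]  # one torch.Tensor per prompt
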
    def embed(
        self,
        prompts: Union[PromptType, Sequence[PromptType]],
        pooling_params: Optional[Union[PoolingParams, Sequence[PoolingParams]]] = None,
    ) -> List[EmbeddingRequestOutput]:
        """
        Generate an embedding vector for each prompt. Only applicable to embedding models.

        Args:
            prompts: The prompts to the LLM. You may pass a sequence of prompts for batch
                embedding.
            pooling_params: The pooling parameters for pooling.

        Returns:
            A list of `EmbeddingRequestOutput` objects containing the embedding vectors in the
            same order as the input prompts.
        """
        if self.model_metadata.task_type != "embed":
            raise ValueError("Embedding API is not supported by this model.")

        items = self.encode(
            prompts,
            pooling_params=pooling_params,
            pooling_task="embed",
        )
        return [EmbeddingRequestOutput.from_base(item) for item in items]

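    # Usage sketch (not part of the original module): embedding two texts with an embedding
    # model. The exact field layout of EmbeddingRequestOutput is not shown here; each returned
    # item wraps the pooled vector produced by encode() above.
    #
    #     results = llm.embed(["a cat sits on the mat", "a kitten rests on a rug"])
    #     assert len(results) == 2  # one EmbeddingRequestOutput per input prompt
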
    def score(
        self,
        data_1: PromptType | Sequence[PromptType],
        data_2: PromptType | Sequence[PromptType],
        /,
        *,
        truncate_prompt_tokens: int | None = None,
        pooling_params: PoolingParams | None = None,
        chat_template: str | None = None,
    ) -> list[ScoringRequestOutput]:
        """Generate similarity scores for all pairs `<text, text_pair>`.

        The inputs can be `1 -> 1`, `1 -> N`, or `N -> N`. In the `1 -> N` case, the `data_1`
        input will be replicated `N` times to pair with the `data_2` inputs.

        Args:
            data_1: Can be a single prompt or a list of prompts. When a list, it must have the
                same length as the `data_2` list.
            data_2: The data to pair with the query to form the input to the LLM.
            truncate_prompt_tokens: The number of tokens to truncate the prompt to.
            pooling_params: The pooling parameters for pooling. If None, the default pooling
                parameters are used.
            chat_template: The chat template to use for the scoring. If None, the model's
                default chat template is used.

        Returns:
            A list of `ScoringRequestOutput` objects containing the generated scores in the same
            order as the input prompts.
        """
        # XXX: As of 2026.1, this implementation supports only models converted via
        # as_binary_seq_cls_model.
        model_metadata = self.model_metadata
        if model_metadata.task_type != "score" and not model_metadata.use_binary_seq_class:
            raise ValueError("LLM.score() is only supported for binary classification models.")

        # Validate inputs and create pairs.
        # Convert single prompts to lists for uniform processing.
        is_data_1_list = isinstance(data_1, list)
        is_data_2_list = isinstance(data_2, list)

        # Normalize inputs to List[PromptType]
        data_1_list: list[PromptType]
        data_2_list: list[PromptType]
        if not is_data_1_list and not is_data_2_list:
            # 1 -> 1 case
            data_1_list = [cast(PromptType, data_1)]
            data_2_list = [cast(PromptType, data_2)]
        elif not is_data_1_list and is_data_2_list:
            # 1 -> N case: replicate data_1
            data_2_list = cast(List[PromptType], data_2)
            data_1_list = [cast(PromptType, data_1)] * len(data_2_list)
        elif is_data_1_list and is_data_2_list:
            # N -> N case: must have the same length
            data_1_list = cast(List[PromptType], data_1)
            data_2_list = cast(List[PromptType], data_2)
            if len(data_1_list) != len(data_2_list):
                raise ValueError(
                    f"When both data_1 and data_2 are lists, they must have the same length. "
                    f"Got {len(data_1_list)} and {len(data_2_list)}."
                )
        else:
            # data_1 is a list but data_2 is not - this is not a standard case for scoring
            raise ValueError(
                "Invalid input combination. data_1 is a list but data_2 is not. "
                "Expected patterns: (single, single), (single, list), or (list, list)."
            )

        # Normalize inputs to list[str]
        data_1_strs: list[str] = [prompt_to_str(d, self.tokenizer) for d in data_1_list]
        data_2_strs: list[str] = [prompt_to_str(d, self.tokenizer) for d in data_2_list]

        # Construct prompts for each pair
        prompts: list[PromptType] = []
        for str_1, str_2 in zip(data_1_strs, data_2_strs):
            _, prompt = get_score_prompt(
                self.tokenizer,
                str_1,
                str_2,
                score_template=chat_template,
            )
            prompts.append(prompt)

        # Set up pooling parameters with truncation if specified
        if pooling_params is None:
            pooling_params = PoolingParams()
        if truncate_prompt_tokens is not None:
            pooling_params.truncate_prompt_tokens = truncate_prompt_tokens

        # Call encode with the constructed prompts
        items = self.encode(
            prompts,
            pooling_params=pooling_params,
            pooling_task="score",
        )
        return [ScoringRequestOutput.from_base(item) for item in items]

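    # Usage sketch (not part of the original module): scoring one query against several
    # documents, i.e. the 1 -> N pattern described in the docstring above. The query and
    # documents are arbitrary examples.
    #
    #     query = "What is the capital of France?"
    #     docs = ["Paris is the capital of France.", "NPUs accelerate inference."]
    #     results = llm.score(query, docs)
    #     for doc, res in zip(docs, results):
    #         print(doc, res.outputs)
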
    def shutdown(self):
        """Shutdown the LLM engine gracefully."""
        if hasattr(self, "engine"):
            self.engine.shutdown()

    def __del__(self):
        self.shutdown()
        # Remove the tmp directory if it exists.
        tmp_dir = getattr(self, "tmp_dir", None)
        if tmp_dir is not None:
            tmp_dir.cleanup()

    @cached_property
    def model_max_seq_len(self) -> int:
        possible_keys = [
            # OPT, LLaMA, BERT
            "max_position_embeddings",
            # GPT-2, GPT-J
            "n_positions",
            # MPT
            "max_seq_len",
            # ChatGLM2
            "seq_length",
            # Command-R
            "model_max_length",
            # Others
            "max_sequence_length",
            "max_seq_length",
            "seq_len",
        ]

        for attr_name in possible_keys:
            if attr_name in self.model_config:
                model_max_seq_len = self.model_config[attr_name]
                break
        else:
            # If none of the keys were found in the config, use a default and
            # log a warning.
            default_max_len = 2048
            model_max_seq_len = default_max_len
            logger.warning(
                "The model's config.json does not contain any of the following "
                "keys to determine the original maximum length of the model: "
                "%s. Assuming the model's maximum length is %d.",
                possible_keys,
                default_max_len,
            )

        return model_max_seq_len