import copy
from functools import cached_property
import glob
from itertools import chain, product
import json
import logging
import os
from pathlib import Path
from pprint import pformat
import tempfile
from typing import (
Any,
AsyncGenerator,
Dict,
List,
Literal,
Mapping,
Optional,
Sequence,
Tuple,
Type,
Union,
cast,
get_args,
)
import uuid
from furiosa_torch_ext.torch_ext import preprocess
import pydantic
from pydantic import RootModel
from pydantic.dataclasses import dataclass
from torch._subclasses import FakeTensorMode
import torch.distributed._tensor.ops.common_rules
from torch.fx import GraphModule
from torch.fx.passes.shape_prop import ShapeProp
import transformers
from transformers import (
MODEL_FOR_CAUSAL_LM_MAPPING,
BatchEncoding,
PretrainedConfig,
PreTrainedModel,
PreTrainedTokenizer,
PreTrainedTokenizerFast,
set_seed,
)
import yaml
from furiosa_llm.optimum.modeling import get_mapped_class_for_optimization
from furiosa_llm.parallelize.export.graphmodule import deserialize_gm
from furiosa_llm.parallelize.model_creation_info import ModelCreationInfo
from furiosa_llm.parallelize.mppp.api import PipelineParallelismMppp
from furiosa_llm.parallelize.mppp.config import DeviceId
from furiosa_llm.parallelize.pipeline.builder.converter import GraphModuleConverter
from furiosa_llm.parallelize.pipeline.types import (
CompSuperTask,
DataBlobId,
Device,
ParamInfo,
SuperTaskKind,
TensorInfo,
load_partial_param,
)
from furiosa_llm.parallelize.trace import get_param_file_with_cache
from .artifact.helper import (
build_pipelines,
get_buckets_with_output_logits_size,
get_pipeline_metadata,
override_pp_size_on_pipeline,
prestep_for_remote_code_model,
)
from .artifact.types import Artifact, ModelMetadataForArtifact, RuntimeConfig
from .device import get_device_mesh, parse_devices_str
from .models.config_types import (
Bucket,
BucketConfig,
BucketWithOutputLogitsSize,
DeprecatedBucket,
GeneratorConfig,
KvCacheSharingAcrossBeamsConfig,
LLMBackend,
ManualBucketConfig,
MinimalBucketConfig,
ModelRewritingConfig,
PagedAttentionConfig,
ParallelConfig,
PipelineMetadata,
PipelineWithMetadata,
SchedulerConfig,
)
from .models.metadata import LLMConfig, ModelMetadata, get_model_cls_from_pretrained_id
from .optimum import AttentionType, OptimizationConfig, QDtype, QuantizationConfig
from .optimum.types import get_kv_cache_dtype_from_qformat
from .outputs import CompletionOutput, RequestOutput
from .parallelize.compiler_config import CompilerConfigContext
from .parallelize.pipeline import Pipeline
from .sampling_params import SamplingParams
from .tokenizer import encode_auto, get_tokenizer
from .utils import get_list_with_no_dup_with_order_preserved, get_logger_with_tz, zip_equal
logger = get_logger_with_tz(logging.getLogger(__name__))
# Default position id for padding
_POSITION_ID_PAD = 1
# Default param file name
_HF_CAUSAL_LM_CLASS_NAMES = set(
model_cls.__name__ for model_cls in MODEL_FOR_CAUSAL_LM_MAPPING.values()
)
# Default index of the padding block when paged attention model is used.
DEFAULT_PAGED_ATTENTION_PADDING_BLOCK_IDX = 0
CACHE_DIR = Path(os.getenv("XDG_CACHE_HOME", Path.home() / ".cache")) / "furiosa" / "llm"
TokenizerModeType = Literal["auto", "slow"]
RAY_LOG_PREFIX = "[furiosa-llm]"
STREAMING_MAX_DECODE_TRIAL = 2
# FIXME: remove this after deprecating old generator config format.
@dataclass
class DeprecatedGeneratorConfig:
position_id_pad: int
prefill_buckets: List[DeprecatedBucket]
decode_buckets: List[DeprecatedBucket]
model_qname: str # qualified name of the model (module + class)
paged_attention_config: Optional[PagedAttentionConfig]
packing_type: Literal["IDENTITY"]
kv_cache_sharing_across_beams_config: Optional[KvCacheSharingAcrossBeamsConfig]
scheduler_config: Optional[SchedulerConfig]
def export(self, path: Union[str, os.PathLike]):
with open(path, "w") as f:
f.write(RootModel[DeprecatedGeneratorConfig](self).model_dump_json(indent=4))
@classmethod
def load(cls, path: Union[str, os.PathLike]) -> "DeprecatedGeneratorConfig":
with open(path) as f:
o = json.load(f)
return DeprecatedGeneratorConfig(**o)
def _get_available_devices() -> List[Device]:
try:
import furiosa_smi_py # type: ignore[import-untyped]
from furiosa_smi_py import ( # type: ignore[import-not-found, import-untyped]
CoreStatus,
list_devices,
)
except ModuleNotFoundError:
raise ModuleNotFoundError("Install furiosa_smi_py to get available devices automatically.")
except Exception as e:
raise ImportError(f"Failed to import furiosa_smi_py with error {e}")
try:
furiosa_smi_py.init()
devices = list_devices()
available_devs = []
for device in devices:
# e.g. npu1
name = device.device_info().name()
if name[:3] != "npu":
raise RuntimeError("Unexpected device name: {name}")
npu_idx = int(name[3:])
core_status = device.core_status()
for core_id, status in core_status.items():
# Cannot be compared by reference (using "is") because `CoreStatus` is a Rust enum exposed with PyO3.
if status == CoreStatus.Available:
available_devs.append(Device(f"npu:{npu_idx}:{core_id}"))
return available_devs
except Exception as e:
raise RuntimeError(f"Failed to get available devices with errror {e}")
def _get_bucket_from_pipeline_name(pipeline_name: str) -> Bucket:
# Returns: tuple of (is_prefill, bucket)
# Possible pipeline name formats:
# * f"{model_name}-{mode}-b{bucket.batch_size}-attn{bucket.attention_size} (will be deprecated)
# * f"{model_name}-kv{bucket.kv_cache_size}-b{bucket.batch_size}-attn{bucket.attention_size}
_, mode_or_kv_cache_size, b_batch_size, attn_attn_size = pipeline_name.split("-")
batch_size = int(b_batch_size[1:])
attn_size = int(attn_attn_size[4:])
if mode_or_kv_cache_size == "prefill":
kv_size = 0
elif mode_or_kv_cache_size == "decode":
kv_size = attn_size - 1
else:
assert mode_or_kv_cache_size.startswith("kv")
kv_size = int(mode_or_kv_cache_size[2:])
return Bucket(batch_size, attn_size, kv_size)
[docs]
class LLM:
"""An LLM for generating texts from given prompts and sampling parameters.
Args:
model: The name of the pretrained model. This corresponds to
pretrained_model_name_or_path in HuggingFace Transformers.
task_type: The type of the task. This corresponds to task in HuggingFace Transformers.
See https://huggingface.co/docs/transformers/main/en/quicktour#pipeline for more
details.
llm_config: The configuration for the LLM. This includes quantization and optimization
configurations.
qformat_path: The path to the quantization format file.
qparam_path: The path to the quantization parameter file.
quant_ckpt_file_path: The path to the quantized parameters checkpoint file.
hf_overrides: Additional HuggingFace Transformers model configuration. This is a dictionary
that includes the configuration for the model.
bucket_config: Config for bucket generating policy. If not given, the model will use single one batch, `max_seq_len_to_capture` attention size bucket per
each phase.
speculative_model: Specualtive model for speculative decoding.
speculative_model_llm_config: The configuration for the specualtive model. This includes quantization and optimization
configurations.
speculative_model_qformat_path: The path to the quantization format file for the specualtive model.
speculative_model_qparam_path: The path to the quantization parameter file for the specualtive model.
speculative_model_quant_ckpt_file_path: The path to the quantized parameters checkpoint file for the specualtive model.
speculative_model_config: Additional HuggingFace Transformers model configuration for the specualtive model. This is a dictionary
that includes the configuration for the model.
speculative_model_bucket_config: Config for bucket generating policy. If not given, the model will use single one batch, `max_seq_len_to_capture` attention size bucket per
each phase.
speculative_model_paged_attention_num_blocks: The maximum number of blocks that each k/v storage per layer can store for the specualtive model. This argument must be given
if the specualtive model uses paged attention.
num_speculative_tokens: The number of tokens that specualtive model will generate speculatively during each iteration of the decoding process
max_seq_len_to_capture: Maximum sequence length covered by LLM engine. Sequence with larger context than this will not be covered.
The default is 2048.
max_prompt_len: Maximum prompt sequence length covered by LLM engine. Prompt larger than this cannot be handled.
If not given, will be obtained from bucket and other configs.
tensor_parallel_size: The number of PEs for each tensor parallelism group. The default is 4.
pipeline_parallel_size: The number of pipeline stages for pipeline parallelism. The default is 1,
which means no pipeline parallelism.
data_parallel_size: The size of the data parallelism group. If not given, it will be inferred from
total avaialble PEs and other parallelism degrees.
trust_remote_code: Trust remote code when downloading the model and tokenizer from HuggingFace.
tokenizer: The name or path of a HuggingFace Transformers tokenizer.
tokenizer_mode: The tokenizer mode. "auto" will use the fast tokenizer
if available, and "slow" will always use the slow tokenizer.
seed: The seed to initialize the random number generator for sampling.
devices: The devices to run the model. It can be a single device or a list of devices.
Each device can be either "npu:X" or "npu:X:\*" where X is a specific device index.
If not given, available devices will be used.
param_file_path: The path to the parameter file to use for pipeline generation.
If not specified, the parameters will be saved in a temporary file which will be
deleted when ``LLM`` is destroyed.
param_saved_format: The format of the parameter file. Only possible value is "safetensors" now.
The default is "safetensors".
do_decompositions_for_model_rewrite: Whether to decompose some ops to describe various parallelism strategies
with mppp config. When the value is True, mppp config that matches with the decomposed FX graph should be given.
comp_supertask_kind: The format that pipeline's supertasks will be represented as.
Possible values are "fx","dfg", and "edf", and the default is "edf".
cache_dir: The cache directory for all generated files for this LLM instance.
When its value is ``None``, caching is disabled. The default is "$HOME/.cache/furiosa/llm".
backend: The backend implementation to run forward() of a model for the LLM.
If not specified, the backend will be chosen based on the device kind.
use_blockwise_compile: If True, each task will be compiled in the unit of transformer block,
and compilation result for transformer block is generated once and reused. The default is ``True``.
num_blocks_per_supertask: The number of transformer blocks that will be merged into one supertask. This option is valid
only when `use_blockwise_compile=True`. The default is 1.
num_blocks_per_pp_stage: The number of transformers blocks per each pipeline parallelism stage. If not given, transformer blocks will be
distributed equally.
embed_all_constants_into_graph: Whether to embed constant tensors into graph or make them as input of the graph and save them as separate files.
The default is False.
paged_attention_num_blocks: The maximum number of blocks that each k/v storage per layer can store. This argument must be given
if model uses paged attention.
paged_attention_block_size: The maximum number of tokens that can be stored in a single paged attention block. This argument must be given
if model uses paged attention.
kv_cache_sharing_across_beams_config: Configuration for sharing kv cache across beams. This argument must be given if and only if
the model is optimized to share kv cache across beams. If this argument is given, decode phase buckets with batch size of
``batch_size`` \* ``kv_cache_sharing_across_beams_config.beam_width`` will be created.
scheduler_config: Configuration for the scheduler, allowing to maximum number of tasks which can be queued to HW, maximum number of samples
that can be processed by the scheduler, and ratio of spare blocks that are reserved by scheduler.
packing_type: Packing algorithm. Possible values are "IDENTITY" only for now
compiler_config_overrides: Overrides for the compiler config. This is a dictionary that includes the configuration for the compiler.
use_random_weight: If True, the model will be initialized with random weights.
num_pipeline_builder_workers: number of workers used for building pipelines (except for compilation). The default is 1 (no parallelism).
Setting this value larger than 1 reduces pipeline building time, especially for large models, but requires much more memory.
num_compile_workers: number of workers used for compilation. The default is 1 (no parallelism).
skip_engine: If True, the native runtime engine will not be initialized. This is useful when you need
the pipelines for other purposes than running them with the engine.
artifacts_export_path: The path to export the artifacts. With artifacts, you can create ``LLM`` without quantizing or compiling the model again.
"""
max_seq_len_to_capture: Optional[int]
def __init__(
self,
model: str,
task_type: Optional[str] = None,
llm_config: Optional[LLMConfig] = None,
qformat_path: Optional[os.PathLike] = None, # FIXME: move to quantization_config
qparam_path: Optional[os.PathLike] = None, # FIXME: move to quantization_config
quant_ckpt_file_path: Optional[os.PathLike] = None,
hf_overrides: Dict[str, Any] = {}, # aka hf_config
bucket_config: Optional[BucketConfig] = None,
speculative_model: Optional[Union[str, "LLM"]] = None,
speculative_model_llm_config: Optional[LLMConfig] = None,
speculative_model_qformat_path: Optional[os.PathLike] = None,
speculative_model_qparam_path: Optional[os.PathLike] = None,
speculative_model_quant_ckpt_file_path: Optional[os.PathLike] = None,
speculative_model_config: Dict[str, Any] = {},
speculative_model_bucket_config: Optional[BucketConfig] = None,
speculative_model_paged_attention_num_blocks: Optional[int] = None,
num_speculative_tokens: Optional[int] = None,
max_seq_len_to_capture: int = 2048,
max_prompt_len: Optional[int] = None,
tensor_parallel_size: int = 4,
pipeline_parallel_size: int = 1,
data_parallel_size: Optional[int] = None,
tokenizer: Optional[Union[PreTrainedTokenizer, PreTrainedTokenizerFast]] = None,
tokenizer_mode: TokenizerModeType = "auto",
trust_remote_code: bool = False,
seed: Optional[int] = None,
# TODO: change devices default value to None and get devices from furiosa-smi.
devices: Optional[Union[str, Sequence[Device]]] = None,
param_file_path: Optional[os.PathLike] = None,
param_saved_format: Literal["safetensors", "pt"] = "safetensors",
do_decompositions_for_model_rewrite: bool = False, # FIXME: move to compiler_config
comp_supertask_kind: Optional[Literal["edf", "dfg", "fx"]] = None,
cache_dir: Optional[os.PathLike] = CACHE_DIR,
backend: Optional[LLMBackend] = None,
use_blockwise_compile: bool = True, # FIXME: move to compiler_config
num_blocks_per_supertask: int = 1, # FIXME: move to compiler_config
num_blocks_per_pp_stage: Optional[Sequence[int]] = None,
embed_all_constants_into_graph: bool = False,
paged_attention_num_blocks: Optional[int] = None, # FIXME: move to compiler_config
paged_attention_block_size: int = 1, # FIXME: move to compiler_config
kv_cache_sharing_across_beams_config: Optional[
KvCacheSharingAcrossBeamsConfig
] = None, # FIXME: move to compiler_config / leave this in LLM attr ??
scheduler_config: SchedulerConfig = SchedulerConfig(),
packing_type: Literal["IDENTITY"] = "IDENTITY",
compiler_config_overrides: Optional[Mapping] = None,
use_random_weight: bool = False,
num_pipeline_builder_workers: int = 1,
num_compile_workers: int = 1,
skip_engine: bool = False,
artifacts_export_path: Optional[Union[str, os.PathLike]] = None,
*,
_cleanup: bool = True,
_pipelines_with_metadata: Optional[Sequence[PipelineWithMetadata]] = None,
_custom_buckets: Sequence[Union[Bucket, BucketWithOutputLogitsSize]] = [],
_optimize_logit_shape: bool = False,
_model_metadata: Optional[ModelMetadata] = None,
_unpadded_vocab_size: Optional[int] = None,
**kwargs,
):
optimize_paged_attention_block_loading = kwargs.pop(
"optimize_paged_attention_block_loading", True
)
sparse_select_version = kwargs.pop("sparse_select_version", "v1.5")
one_supertask_per_device = kwargs.pop("one_supertask_per_device", True)
if config := kwargs.pop("config", None):
logger.warning("`config` is deprecated. use `hf_overrides` instead.")
if hf_overrides:
raise ValueError("`config` and `hf_overrides` are given at the same time.")
hf_overrides = config
# Set seed in order to guarantee the reproducibility with the same seed number
if seed is not None:
set_seed(seed)
LLM.__verify_tokenizer_mode(tokenizer_mode)
# Set logging options for ray.
if "RAY_COLOR_PREFIX" not in os.environ:
os.environ["RAY_COLOR_PREFIX"] = "1"
if "RAY_DEDUP_LOGS_ALLOW_REGEX" not in os.environ:
# For not to dedup our info logs.
os.environ["RAY_DEDUP_LOGS_ALLOW_REGEX"] = f"INFO:*{RAY_LOG_PREFIX}*"
if devices is None:
devices = _get_available_devices()
logger.info(f"Device is not given, using available device: {devices}")
assert devices is not None
# Normalize the devices
if isinstance(devices, str):
devices = parse_devices_str(devices)
LLM.__verify_devices(devices)
if num_pipeline_builder_workers < 1:
raise ValueError("`num_pipeline_builder_workers` must be larger than 0")
if llm_config is None:
opt_config = self._get_default_opt_config_from_pretrained_id(model, trust_remote_code)
quant_config = QuantizationConfig.from_qformat(qformat_path) if qformat_path else None
llm_config = LLMConfig(opt_config, quant_config)
# To use speculative decoding, special model optimized for speculative decoding is needed.
if num_speculative_tokens:
llm_config = LLMConfig(
llm_config.optimization_config.with_optimizations(
{"optimized_for_speculative_decoding": True}
),
llm_config.quantization_config,
)
if _model_metadata:
self.model_metadata = _model_metadata
else:
self.model_metadata = ModelMetadata(
pretrained_id=model,
task_type=task_type,
llm_config=llm_config,
hf_configs=hf_overrides.copy(),
trust_remote_code=trust_remote_code,
)
self.model_config = self.model_metadata.config_dict
self.is_generative_model = self.model_metadata.is_generative_model
kv_cache_dtype = self.model_metadata.kv_cache_dtype
if max_seq_len_to_capture > self.model_max_seq_len:
raise ValueError(
"`max_seq_len_to_capture` is larger than the model's max number of positions."
)
if bucket_config is None:
# TODO: alaways set max_seq_len to model's max_position_embeddings once compiler supports it.
bucket_config = MinimalBucketConfig(max_seq_len=max_seq_len_to_capture)
(
prefill_buckets_with_output_size,
decode_buckets_with_output_size,
other_buckets_with_output_size,
) = get_buckets_with_output_logits_size(
self.model_metadata,
bucket_config,
max_prompt_len or max_seq_len_to_capture,
max_seq_len_to_capture,
num_speculative_tokens,
scheduler_config.prefill_chunk_size,
_optimize_logit_shape,
)
if _custom_buckets:
for custom_bucket in _custom_buckets:
if isinstance(custom_bucket, BucketWithOutputLogitsSize):
bucket_with_output_size = custom_bucket
bucket = custom_bucket.bucket
else:
assert isinstance(custom_bucket, Bucket)
output_logits_size = self.model_metadata.get_output_logits_size(custom_bucket)
bucket_with_output_size = BucketWithOutputLogitsSize(
custom_bucket, output_logits_size=output_logits_size
)
bucket = custom_bucket
if bucket.is_prefill:
prefill_buckets_with_output_size.append(bucket_with_output_size)
elif bucket.input_ids_size == 1:
decode_buckets_with_output_size.append(bucket_with_output_size)
else:
other_buckets_with_output_size.append(bucket_with_output_size)
# remove duplication if any.
prefill_buckets_with_output_size = get_list_with_no_dup_with_order_preserved(
prefill_buckets_with_output_size
)
decode_buckets_with_output_size = get_list_with_no_dup_with_order_preserved(
decode_buckets_with_output_size
)
other_buckets_with_output_size = get_list_with_no_dup_with_order_preserved(
other_buckets_with_output_size
)
buckets_with_output_size = [
*prefill_buckets_with_output_size,
*decode_buckets_with_output_size,
*other_buckets_with_output_size,
]
# NOTE: Allow no prefill or decode bucket case with skip_engine=True for artifacts building and internal tests.
if not skip_engine and not prefill_buckets_with_output_size:
raise ValueError("Prefill buckets must be given.")
# Find or check max prompt length is valid and set as field.
if scheduler_config.prefill_chunk_size:
bucket_max_prompt_len = max_prompt_len or max_seq_len_to_capture
else:
bucket_max_prompt_len = (
max(
bucket_with_output_size.bucket.attention_size
for bucket_with_output_size in prefill_buckets_with_output_size
)
if prefill_buckets_with_output_size
else 0
)
if max_prompt_len is not None:
if bucket_max_prompt_len < max_prompt_len:
raise ValueError(
f"Generated buckets cannot handle prompts with `max_prompt_len` {max_prompt_len}. Generate larger buckets or decrease `max_prompt_len`."
)
self.prompt_max_seq_len = max_prompt_len
else:
self.prompt_max_seq_len = bucket_max_prompt_len
# Find the max attention_size of prefill/decode_buckets and set them as field.
if decode_buckets_with_output_size:
bucket_max_seq_len_to_capture = max(
bucket_with_output_size.bucket.attention_size
for bucket_with_output_size in decode_buckets_with_output_size
)
else:
bucket_max_seq_len_to_capture = max(
(
bucket_with_output_size.bucket.attention_size
for bucket_with_output_size in chain(
prefill_buckets_with_output_size, other_buckets_with_output_size
)
),
)
if bucket_max_seq_len_to_capture < max_seq_len_to_capture:
raise ValueError(
"There's no bucket to handle `max_seq_len_to_capture` length of sequence. Add bucket of size `max_seq_len_to_capture` or decrease `max_seq_len_to_capture`."
)
elif bucket_max_seq_len_to_capture > max_seq_len_to_capture:
logger.warning(
"Buckets with larger sequence length than `max_seq_len_to_capture` will be created. This will cause unnecessary overhead."
)
self.max_seq_len_to_capture = bucket_max_seq_len_to_capture
logger.info(
f"Prefill buckets with output size: {pformat(prefill_buckets_with_output_size)}"
)
logger.info(f"Decode buckets with output size: {pformat(decode_buckets_with_output_size)}")
logger.info(f"Other buckets with output size: {pformat(other_buckets_with_output_size)}")
prefill_buckets = [
bucket_with_output_size.bucket
for bucket_with_output_size in prefill_buckets_with_output_size
]
decode_buckets = [
bucket_with_output_size.bucket
for bucket_with_output_size in decode_buckets_with_output_size
]
other_buckets = [
bucket_with_output_size.bucket
for bucket_with_output_size in other_buckets_with_output_size
]
LLM.__verify_buckets(prefill_buckets, decode_buckets, kv_cache_sharing_across_beams_config)
if (
self.model_metadata.optimize_options.kv_cache_sharing_across_beams
and kv_cache_sharing_across_beams_config is None
):
raise ValueError(
"`kv_cache_sharing_across_beams_config` must be given if the model is optimized to share kv cache across beams."
)
padding_block_idx = (
DEFAULT_PAGED_ATTENTION_PADDING_BLOCK_IDX
if optimize_paged_attention_block_loading
else None
)
if self.model_metadata.attention_type is AttentionType.PAGED_ATTENTION:
if paged_attention_num_blocks is None:
# TODO: if `paged_attention_num_blocks` is not given, always calculate maximum possible num blocks and use that.
raise NotImplementedError(
"`paged_attention_num_blocks` must be given for paged attention models now."
)
if paged_attention_block_size != 1:
raise NotImplementedError(
"Currently, only paged attention with block_size=1 is supported."
)
assert paged_attention_num_blocks is not None
paged_attention_config = PagedAttentionConfig(
paged_attention_num_blocks, paged_attention_block_size, padding_block_idx
)
else:
paged_attention_config = None
self.generator_config = GeneratorConfig(
_POSITION_ID_PAD,
get_list_with_no_dup_with_order_preserved(
(*prefill_buckets, *decode_buckets, *other_buckets)
),
self.model_metadata.model_qname,
paged_attention_config,
packing_type,
kv_cache_sharing_across_beams_config,
num_speculative_tokens,
unpadded_vocab_size=_unpadded_vocab_size,
)
prefill_buckets_ = [
DeprecatedBucket(bucket.batch_size, bucket.attention_size)
for bucket in self.generator_config.buckets
if bucket.is_prefill
]
decode_buckets_ = [
DeprecatedBucket(bucket.batch_size, bucket.attention_size)
for bucket in self.generator_config.buckets
if bucket.is_decode and bucket.input_ids_size == 1
]
self.scheduler_config = scheduler_config
# FIXME: remove this after deprecating old generator config format.
self.generator_config_in_deprecated_format = DeprecatedGeneratorConfig(
self.generator_config.position_id_pad,
prefill_buckets_,
decode_buckets_,
self.generator_config.model_qname,
self.generator_config.paged_attention_config,
self.generator_config.packing_type,
self.generator_config.kv_cache_sharing_across_beams_config,
self.scheduler_config,
)
# FIXME: this is a temporary workaround to test decode buckets with more than one input_ids.
self.custom_buckets = _custom_buckets
self.model_rewriting_config = ModelRewritingConfig(
do_decompositions_for_model_rewrite=do_decompositions_for_model_rewrite,
use_blockwise_compile=use_blockwise_compile,
num_blocks_per_supertask=num_blocks_per_supertask,
embed_all_constants_into_graph=embed_all_constants_into_graph,
optimize_logit_shape=_optimize_logit_shape,
)
if device_sets_for_actual_use := kwargs.pop("device_sets_for_actual_use", None):
logger.warning(
"`device_sets_for_actual_use` is deprecated. Use `{tensor|pipeline|data}_parallel` options instead."
)
normalized_dev_mesh = [
parse_devices_str(device_set) if isinstance(device_set, str) else device_set
for device_set in device_sets_for_actual_use
]
else:
dev_mesh = get_device_mesh(
devices, tensor_parallel_size, pipeline_parallel_size, data_parallel_size
)
# Flatten pp_tp_groups to build pipeline. This is 2d-matrix whose elements are dp subgroups.
normalized_dev_mesh = [
[dev for tp_group in pp_tp_group for dev in tp_group] for pp_tp_group in dev_mesh
]
logger.info(
f"Device Mesh currently working is {normalized_dev_mesh} with tp_size={tensor_parallel_size}/pp_size={pipeline_parallel_size}/dp_size={data_parallel_size}"
)
if (
num_blocks_per_pp_stage is not None
and len(num_blocks_per_pp_stage) != pipeline_parallel_size
):
raise ValueError(
"`num_blocks_per_pp_stage` should have length of `pipeline_parallel_size`"
)
self.parallel_config = ParallelConfig(
tensor_parallel_size=tensor_parallel_size, pipeline_parallel_size=pipeline_parallel_size
)
data_parallel_size = len(normalized_dev_mesh)
# Build pipelines for first pp_tp_group and replicate them for other pp_tp_groups later.
first_dp_subgroup_devices = normalized_dev_mesh[0]
if backend is None:
dev_kind = devices[0].kind
if dev_kind == "npu":
backend = LLMBackend.FURIOSA_RT_V2
elif dev_kind == "cpu":
backend = LLMBackend.MOCK_BACKEND_V2
elif dev_kind == "cuda":
backend = LLMBackend.FURIOSA_RT_CUDA
else:
raise ValueError(f"Invalid device kind: {dev_kind}")
if comp_supertask_kind is None:
if backend in (LLMBackend.FURIOSA_RT_NPU, LLMBackend.FURIOSA_RT_V2):
comp_supertask_kind = "edf"
else:
comp_supertask_kind = "fx"
if comp_supertask_kind == "dfg":
logger.info("Using dfg as comp_supertask_kind")
LLM.__verify_comp_supertask_kind(comp_supertask_kind)
beam_size_or_none = (
None
if self.generator_config.kv_cache_sharing_across_beams_config is None
else self.generator_config.kv_cache_sharing_across_beams_config.beam_width
)
# Get Tokenizer
self.tokenizer = get_tokenizer(
model,
tokenizer,
tokenizer_mode,
trust_remote_code=trust_remote_code,
**kwargs,
)
# Please refer to an example at https://huggingface.co/docs/transformers/en/main_classes/text_generation#transformers.GenerationMixin.greedy_search.example
# Some models like GPT-2 may not have pad_token_id. BTW, when we run a batch of sequence generations,
# We must need pad_token_id to fill the batch with pad. With Hugging Face Transformers,
# users should handle this issue. Our goal is to provide a better useability for users.
# We handle this issue within LLM class.
self.model_config["pad_token_id"] = self.model_config["eos_token_id"]
if speculative_model is not None:
# mimic vllm's behavior
num_speculative_tokens = num_speculative_tokens or self.model_config.get("n_predict")
if num_speculative_tokens is None:
raise ValueError(
"`speculative_model` is given, but `num_speculative_tokens` is not given and cannot be obtained from model config."
)
compiler_config_context = CompilerConfigContext(
model_metadata=self.model_metadata,
beam_size=beam_size_or_none,
compiler_config_overrides=compiler_config_overrides,
)
if _pipelines_with_metadata is not None:
# FIXME: This pass exists only for supporting `LLM.from_artifacts` API.
# Only pick pipelines for given buckets.
buckets_with_output_size_to_include = set(buckets_with_output_size)
pipelines_with_bucket_info = [
(
BucketWithOutputLogitsSize(
_get_bucket_from_pipeline_name(pipeline_with_meta.pipeline.name),
pipeline_with_meta.metadata.output_logits_size,
),
pipeline_with_meta,
)
for pipeline_with_meta in _pipelines_with_metadata
]
pipelines_with_meta: List[PipelineWithMetadata] = [
pipeline_with_meta
for bucket_with_output_size, pipeline_with_meta in pipelines_with_bucket_info
if bucket_with_output_size in buckets_with_output_size_to_include
]
if len(pipelines_with_meta) != len(buckets_with_output_size_to_include):
needed_buckets = buckets_with_output_size_to_include - set(
bucket for bucket, _ in pipelines_with_bucket_info
)
raise ValueError(
f"Some needed buckets do not exist in the artifacts.\n{needed_buckets}"
)
# replace devices in pipelines
for pipeline_with_meta in pipelines_with_meta:
pipeline = pipeline_with_meta.pipeline
if len(pipeline.devices) != len(first_dp_subgroup_devices):
raise ValueError(
f"The number of devices in the pipeline {pipeline.devices} is different from the number of devices in the first dp subgroup {first_dp_subgroup_devices}."
)
pipeline.devices = {
DeviceId(str(i)): dev for i, dev in enumerate(first_dp_subgroup_devices)
}
self.pipelines_with_meta = pipelines_with_meta
else:
if self.model_metadata.need_quant_artifacts or qparam_path or qformat_path:
if not (qparam_path and qformat_path):
raise ValueError(
"To use quantized model, `qparam_path` and `qformat_path` should be given."
)
if not (os.path.exists(qparam_path) and os.path.exists(qformat_path)):
raise ValueError(
"`qparam_path` or `qformat_path` is invalid. The file does not exist."
)
if self.is_generative_model:
assert kv_cache_dtype
# Check model's kv cache dtype complies with description of qformat file.
self.__verify_kv_cache_dtype_with_qformat(
kv_cache_dtype, qformat_path, self.model_metadata
)
model_ = ModelCreationInfo(
self.model_metadata,
use_random_weight,
seed,
qformat_path=qformat_path,
qparam_path=qparam_path,
quant_ckpt_file_path=quant_ckpt_file_path,
)
if artifacts_export_path:
os.environ["FURIOSA_COMPILE_DUMP_PATH"] = str(artifacts_export_path)
try:
if trust_remote_code:
prestep_for_remote_code_model(model_.metadata, num_pipeline_builder_workers)
self.build_all_pipelines(
model_,
buckets_with_output_size,
first_dp_subgroup_devices,
backend,
comp_supertask_kind,
one_supertask_per_device,
use_blockwise_compile,
do_decompositions_for_model_rewrite,
kv_cache_dtype.to_torch_dtype() if kv_cache_dtype else None,
sparse_select_version,
num_pipeline_builder_workers,
num_compile_workers,
embed_all_constants_into_graph,
num_blocks_per_supertask,
num_blocks_per_pp_stage,
param_file_path,
param_saved_format,
compiler_config_context,
cache_dir,
_cleanup,
)
# Save artifacts berfoe copying pipelines according to `data_parallelism_size`.
if artifacts_export_path:
self._save_engine_artifacts(
artifacts_export_path,
comp_supertask_kind,
devices,
)
finally:
os.environ.pop("FURIOSA_COMPILE_DUMP_PATH", None)
# If data parallelism is used, replicate pipelines for each entity data parallelism subgroup.
if data_parallel_size > 1:
self.pipelines_with_meta: List[PipelineWithMetadata] = [ # type: ignore[no-redef]
PipelineWithMetadata(
pipeline_with_meta.pipeline.shallow_copy_with_replaced_devices(
dict(zip_equal(first_dp_subgroup_devices, flattened_pp_tp_group)) # type: ignore[arg-type]
),
pipeline_with_meta.metadata,
)
for pipeline_with_meta, flattened_pp_tp_group in product(
self.pipelines_with_meta, normalized_dev_mesh
)
]
if speculative_model:
if kv_cache_sharing_across_beams_config:
raise NotImplementedError(
"Speculative decoding with beam search is not supported yet."
)
if artifacts_export_path:
speculative_model_artifacts_export_path = os.path.join(
artifacts_export_path, "speculative_model"
)
else:
speculative_model_artifacts_export_path = None
if isinstance(speculative_model, LLM):
draft_llm = speculative_model
# TODO: do we need to check given arguments match?
elif isinstance(speculative_model, str):
draft_llm = LLM(
speculative_model,
task_type=self.model_metadata.task_type,
llm_config=speculative_model_llm_config,
qformat_path=speculative_model_qformat_path,
qparam_path=speculative_model_qparam_path,
quant_ckpt_file_path=speculative_model_quant_ckpt_file_path,
hf_overrides=speculative_model_config,
bucket_config=speculative_model_bucket_config,
max_seq_len_to_capture=max_seq_len_to_capture,
# TODO:Expose parallel config for specualtive model later?
tensor_parallel_size=tensor_parallel_size,
pipeline_parallel_size=pipeline_parallel_size,
data_parallel_size=data_parallel_size,
# NOTE: tokenizer is shared between main model and specualtive model.
seed=seed,
devices=devices,
# Use same configs of big model.
# TODO: do we need to expose param_file_path for the specualtive model?
do_decompositions_for_model_rewrite=do_decompositions_for_model_rewrite,
comp_supertask_kind=comp_supertask_kind,
cache_dir=cache_dir,
backend=backend,
use_blockwise_compile=use_blockwise_compile,
num_blocks_per_supertask=num_blocks_per_supertask,
embed_all_constants_into_graph=embed_all_constants_into_graph,
paged_attention_num_blocks=speculative_model_paged_attention_num_blocks,
paged_attention_block_size=paged_attention_block_size,
kv_cache_sharing_across_beams_config=None,
scheduler_config=scheduler_config,
packing_type=packing_type,
# TODO: Should we expose this?
compiler_config_overrides=compiler_config_overrides,
# TODO: Should we expose this?
use_random_weight=use_random_weight,
num_pipeline_builder_workers=num_pipeline_builder_workers,
num_compile_workers=num_compile_workers,
skip_engine=True,
artifacts_export_path=speculative_model_artifacts_export_path,
_optimize_logit_shape=_optimize_logit_shape,
)
else:
raise ValueError(
"`speculative_model` must be either a pretraiend model id or an instance of LLM."
)
self.draft_pipelines_with_meta: Optional[List[PipelineWithMetadata]] = (
draft_llm.pipelines_with_meta
)
self.draft_generator_config: Optional[GeneratorConfig] = draft_llm.generator_config
self.speculative_model_config: Optional[PretrainedConfig] = draft_llm.model_config
else:
self.draft_pipelines_with_meta = None
self.draft_generator_config = None
self.speculative_model_config = None
# for e2e testing purpose, it allows to skip to initialize the engine
if not skip_engine:
try:
from furiosa.native_runtime.llm import NativeLLMEngine # type: ignore
except ImportError:
logger.error(
"NativeLLMEngine is not available. Please make sure that the furiosa-native-runtime is installed.\n"
'You can install furiosa-native-runtime by running `pip install furiosa-llm`.\n'
"If you want to use the LLM without the native runtime, you can set `skip_engine=True` in the constructor."
)
raise
self.engine = NativeLLMEngine(
self.pipelines_with_meta,
self.draft_pipelines_with_meta,
self.generator_config,
self.draft_generator_config,
scheduler_config=self.scheduler_config,
backend=backend.value,
hf_config=self.model_config,
draft_hf_config=self.speculative_model_config,
)
[docs]
@classmethod
def load_artifacts(
cls,
path: Union[str, os.PathLike],
**kwargs,
) -> "LLM":
"""Instantiate LLM from saved artifacts without quantization and compilation.
Please note that this method is being deprecated. Use `load_artifact` instead.
Args:
path: A path to artifacts to load.
devices: The devices to run the model. It can be a single device or a list of devices.
Each device can be either "npu:X" or "npu:X:\*" where X is a specific device index.
If not given, all available devices will be used.
data_parallel_size: The size of the data parallelism group. If not given, it will be inferred from
total avaialble PEs and other parallelism degrees.
pipeline_parallel_size : The size of the pipeline parallelism. The argument is valid only for artifacts that use blockwise compilation.
num_blocks_per_pp_stage: The number of transformer blocks per each pipeline parallelism stage.
The argument is valid only for artifacts that use blockwise compilation. If only `pipeline_parallel_size` is provided, transformer blocks will be
distributed equally.
bucket_config: Config for bucket generating policy. If not given, all buckets in the artifacts will be used.
scheduler_config: Configuration for the scheduler, allowing to maximum number of tasks which can be queued to HW, maximum number of samples
that can be processed by the scheduler, and ratio of spare blocks that are reserved by scheduler. If this is not given, scheduler config
saved in the artifacts will be used.
paged_attention_num_blocks: The maximum number of blocks that each k/v storage per layer can store. This argument must be given
if model uses paged attention.
tokenizer: The name or path of a HuggingFace Transformers tokenizer.
tokenizer_mode: The tokenizer mode. "auto" will use the fast tokenizer
if available, and "slow" will always use the slow tokenizer.
seed: The seed to initialize the random number generator for sampling.
cache_dir: The cache directory for all generated files for this LLM instance.
When its value is ``None``, caching is disabled. The default is "$HOME/.cache/furiosa/llm".
skip_engine: If True, the native runtime engine will not be initialized. This is useful when you need
the pipelines for other purposes than running them with the engine.
"""
logger.warning(
"`LLM.load_artifacts()` is being deprecated. Use `LLM.load_artifact()` instead."
)
return cls.load_artifact(path, **kwargs)
[docs]
@classmethod
def load_artifact(
cls,
path: Union[str, os.PathLike],
*,
# Runtime Configuration
devices: Optional[Union[str, Sequence[Device]]] = None,
data_parallel_size: Optional[int] = None,
pipeline_parallel_size: Optional[int] = None,
num_blocks_per_pp_stage: Optional[Sequence[int]] = None,
bucket_config: Optional[BucketConfig] = None,
scheduler_config: Optional[SchedulerConfig] = None,
paged_attention_num_blocks: Optional[int] = None,
# Other Configuration
tokenizer: Optional[Union[PreTrainedTokenizer, PreTrainedTokenizerFast]] = None,
tokenizer_mode: TokenizerModeType = "auto",
seed: Optional[int] = None,
cache_dir: os.PathLike = CACHE_DIR,
skip_engine: bool = False,
**kwargs,
) -> "LLM":
"""Instantiate LLM from saved artifacts without quantization and compilation.
Args:
path: A path to artifacts to load.
devices: The devices to run the model. It can be a single device or a list of devices.
Each device can be either "npu:X" or "npu:X:\*" where X is a specific device index.
If not given, all available devices will be used.
data_parallel_size: The size of the data parallelism group. If not given, it will be inferred from
total avaialble PEs and other parallelism degrees.
pipeline_parallel_size : The size of the pipeline parallelism. The argument is valid only for artifacts that use blockwise compilation.
num_blocks_per_pp_stage: The number of transformer blocks per each pipeline parallelism stage.
The argument is valid only for artifacts that use blockwise compilation. If only `pipeline_parallel_size` is provided, transformer blocks will be
distributed equally.
bucket_config: Config for bucket generating policy. If not given, all buckets in the artifacts will be used.
scheduler_config: Configuration for the scheduler, allowing to maximum number of tasks which can be queued to HW, maximum number of samples
that can be processed by the scheduler, and ratio of spare blocks that are reserved by scheduler. If this is not given, scheduler config
saved in the artifacts will be used.
paged_attention_num_blocks: The maximum number of blocks that each k/v storage per layer can store. This argument must be given
if model uses paged attention.
tokenizer: The name or path of a HuggingFace Transformers tokenizer.
tokenizer_mode: The tokenizer mode. "auto" will use the fast tokenizer
if available, and "slow" will always use the slow tokenizer.
seed: The seed to initialize the random number generator for sampling.
cache_dir: The cache directory for all generated files for this LLM instance.
When its value is ``None``, caching is disabled. The default is "$HOME/.cache/furiosa/llm".
skip_engine: If True, the native runtime engine will not be initialized. This is useful when you need
the pipelines for other purposes than running them with the engine.
"""
# check path is valid
artifact_path = f"{path}/artifact.json"
if not os.path.exists(artifact_path):
raise ValueError("This artifacts is not valid.")
if pipeline_parallel_size and num_blocks_per_pp_stage:
raise ValueError(
"Only one of `pipeline_parallel_size` or `num_blocks_per_pp_stage` should be given as input, not both."
)
runtime_config_path = f"{path}/runtime_config.json"
if not os.path.exists(runtime_config_path):
raise ValueError("Runtime configuration file is not valid.")
# TODO: validation artifacts, pipelines, edf, safetensors
artifact = Artifact.load(artifact_path)
runtime_config = RuntimeConfig.load(runtime_config_path)
generator_config = artifact.generator_config
model_metadata = ModelMetadataForArtifact.from_metadata(
artifact.model_metadata, artifact.hf_config, generator_config.model_qname
)
model_metadata.trust_remote_code = False
model_rewriting_config = artifact.model_rewriting_config
parallel_config = artifact.parallel_config
max_prompt_len = kwargs.pop("max_prompt_len", artifact.max_prompt_len)
if not devices:
devices = _get_available_devices()
if not bucket_config:
if (
runtime_config.prefill_buckets is not None
and runtime_config.decode_buckets is not None
):
bucket_config = ManualBucketConfig(
prefill_buckets=runtime_config.prefill_buckets,
decode_buckets=runtime_config.decode_buckets,
)
else:
prefill_buckets = []
decode_buckets = []
for bucket in generator_config.buckets:
if bucket.is_prefill:
prefill_buckets.append(bucket)
elif bucket.is_decode and bucket.input_ids_size == 1:
decode_buckets.append(bucket)
else:
# Bucket for speculative decoding.
# speculative_decoding_buckets.append(bucket)
pass
bucket_config = ManualBucketConfig(
prefill_buckets=[
(bucket.batch_size, bucket.attention_size) for bucket in prefill_buckets
],
decode_buckets=[
(bucket.batch_size, bucket.attention_size) for bucket in decode_buckets
],
)
if not scheduler_config:
scheduler_config = SchedulerConfig(
npu_queue_limit=runtime_config.npu_queue_limit,
max_processing_samples=runtime_config.max_processing_samples,
spare_blocks_ratio=runtime_config.spare_blocks_ratio,
is_offline=runtime_config.is_offline,
prefill_chunk_size=runtime_config.prefill_chunk_size,
)
if not paged_attention_num_blocks:
paged_attention_num_blocks = runtime_config.paged_attention_num_blocks
# Find the max attention_size from the pipelines' buckets
if isinstance(bucket_config, ManualBucketConfig):
if bucket_config.decode_buckets:
assert model_metadata.is_generative_model
max_seq_len_to_capture = max([bucket[1] for bucket in bucket_config.decode_buckets])
else:
max_seq_len_to_capture = max(
[bucket[1] for bucket in bucket_config.prefill_buckets]
)
elif isinstance(bucket_config, MinimalBucketConfig):
max_seq_len_to_capture = bucket_config.max_seq_len
else:
raise ValueError("`bucket_config` must be an instance of BucketConfig.")
# Load all saved pipelines
pipelines = cls.__load_pipelines(
path,
try_from_lir=False,
try_from_dfg=False,
cache_dir=cache_dir,
artifact=artifact,
)
if artifact.pipeline_metadata_list is None:
# For B.C
pipelines_with_metadata = [
PipelineWithMetadata(pipeline, get_pipeline_metadata(model_metadata, bucket))
for bucket, pipeline in zip_equal(generator_config.buckets, pipelines)
]
else:
pipelines_with_metadata = [
PipelineWithMetadata(pipeline, pipeline_metadata)
for pipeline, pipeline_metadata in zip_equal(
pipelines, artifact.pipeline_metadata_list
)
]
# Overriding in PP size
pp_size_to_convey = None
if num_blocks_per_pp_stage or pipeline_parallel_size:
if not model_rewriting_config.use_blockwise_compile:
raise ValueError(
"Pipeline parallelism overriding is supported only for blockwise-compiled artifacts."
)
if isinstance(devices, str):
devices = parse_devices_str(devices)
# if `pipeline_parallel_size` is not None, `num_blocks_per_pp_stage` must be list of int
pp_size_to_convey = (
pipeline_parallel_size
if pipeline_parallel_size
else len(num_blocks_per_pp_stage) # type: ignore[arg-type]
)
dev_mesh = get_device_mesh(
devices,
parallel_config.tensor_parallel_size,
pp_size_to_convey,
data_parallel_size,
)
pipelines_with_metadata = [
PipelineWithMetadata(
override_pp_size_on_pipeline(
pipeline,
devices=dev_mesh[0],
pipeline_parallel_size=pipeline_parallel_size,
num_blocks_per_pp_stage=num_blocks_per_pp_stage,
),
meta,
)
for pipeline, meta in pipelines_with_metadata
]
return cls(
model_metadata.pretrained_id,
task_type=model_metadata.task_type,
llm_config=model_metadata.llm_config,
hf_overrides=model_metadata.hf_configs,
bucket_config=bucket_config,
speculative_model=None,
num_speculative_tokens=generator_config.num_speculative_tokens,
max_seq_len_to_capture=max_seq_len_to_capture,
max_prompt_len=max_prompt_len,
tensor_parallel_size=parallel_config.tensor_parallel_size,
data_parallel_size=data_parallel_size,
pipeline_parallel_size=(
pp_size_to_convey if pp_size_to_convey else parallel_config.pipeline_parallel_size
),
tokenizer=tokenizer,
tokenizer_mode=tokenizer_mode,
seed=seed,
devices=devices,
do_decompositions_for_model_rewrite=model_rewriting_config.do_decompositions_for_model_rewrite,
comp_supertask_kind="edf",
cache_dir=cache_dir,
backend=LLMBackend.FURIOSA_RT_V2,
use_blockwise_compile=model_rewriting_config.use_blockwise_compile,
num_blocks_per_pp_stage=num_blocks_per_pp_stage,
num_blocks_per_supertask=model_rewriting_config.num_blocks_per_supertask,
embed_all_constants_into_graph=model_rewriting_config.embed_all_constants_into_graph,
paged_attention_num_blocks=paged_attention_num_blocks,
kv_cache_sharing_across_beams_config=generator_config.kv_cache_sharing_across_beams_config,
scheduler_config=scheduler_config,
packing_type=generator_config.packing_type,
skip_engine=skip_engine,
_pipelines_with_metadata=pipelines_with_metadata,
_optimize_logit_shape=model_rewriting_config.optimize_logit_shape,
_model_metadata=model_metadata,
_unpadded_vocab_size=generator_config.unpadded_vocab_size,
**kwargs,
)
[docs]
@classmethod
def from_artifacts(
cls,
path: Union[str, os.PathLike],
bucket_config: Optional[BucketConfig] = None,
speculative_model: Optional["LLM"] = None,
num_speculative_tokens: Optional[int] = None,
speculative_model_bucket_config: Optional[BucketConfig] = None,
use_speculative_decoding_if_possible: bool = True,
data_parallel_size: Optional[int] = None,
pipeline_parallel_size: Optional[int] = None,
num_blocks_per_pp_stage: Optional[Sequence[int]] = None,
tokenizer: Optional[Union[PreTrainedTokenizer, PreTrainedTokenizerFast]] = None,
tokenizer_mode: TokenizerModeType = "auto",
seed: Optional[int] = None,
devices: Optional[Union[str, Sequence[Device]]] = None,
cache_dir: os.PathLike = CACHE_DIR,
backend: Optional[LLMBackend] = None,
paged_attention_num_blocks: Optional[int] = None,
scheduler_config: Optional[SchedulerConfig] = None,
speculative_model_paged_attention_num_blocks: Optional[int] = None,
packing_type: Literal["IDENTITY"] = "IDENTITY",
skip_engine: bool = False,
*,
_cleanup=True,
**kwargs,
) -> "LLM":
"""Instantiate LLM from saved artifacts without quantization and compilation.
Args:
path: A path to artifacts to load.
bucket_config: Config for bucket generating policy. If not given, all buckets in the artifacts will be used.
speculative_model: Draft model for speculative decoding.
num_speculative_tokens: The number of tokens that specualtive model will generate speculatively during each iteration of the decoding process.
speculative_model_bucket_config: Bucket generating config for speculative_model. If not given,
all specualtive model buckets (if exists) in the artifacts will be used.
use_speculative_decoding_if_possible: If True, speculative decoding will be used if possible
(`speculative_model` is given or there's artifacts for specualtive model in the artifacts.).
Otherwise, speculative decoding will not be used.
data_parallel_size: The size of the data parallelism group. If not given, it will be inferred from
total avaialble PEs and other parallelism degrees.
pipeline_parallel_size : The size of the pipeline parallelism. The argument is valid only for artifacts that use blockwise compilation.
num_blocks_per_pp_stage: The number of transformer blocks per each pipeline parallelism stage.
The argument is valid only for artifacts that use blockwise compilation. If only `pipeline_parallel_size` is provided, transformer blocks will be
distributed equally.
tokenizer: The name or path of a HuggingFace Transformers tokenizer.
tokenizer_mode: The tokenizer mode. "auto" will use the fast tokenizer
if available, and "slow" will always use the slow tokenizer.
seed: The seed to initialize the random number generator for sampling.
devices: The devices to run the model. It can be a single device or a list of devices.
Each device can be either "npu:X" or "npu:X:\*" where X is a specific device index.
If not given, devices saved in the artifacts will be used.
cache_dir: The cache directory for all generated files for this LLM instance.
When its value is ``None``, caching is disabled. The default is "$HOME/.cache/furiosa/llm".
backend: The backend implementation to run forward() of a model for the LLM.
The default is LLMBackend.TORCH_PIPELINE_RUNNER.
paged_attention_num_blocks: The maximum number of blocks that each k/v storage per layer can store. This argument must be given
if model uses paged attention.
scheduler_config: Configuration for the scheduler, allowing to maximum number of tasks which can be queued to HW, maximum number of samples
that can be processed by the scheduler, and ratio of spare blocks that are reserved by scheduler. If this is not given, scheduler config
saved in the artifacts will be used.
speculative_model_paged_attention_num_blocks: The maximum number of blocks that each k/v storage per layer can store for the specualtive model. This argument must be given
if the specualtive model uses paged attention.
packing_type: Packing algorithm. Possible values are "IDENTITY" only for now
skip_engine: If True, the native runtime engine will not be initialized. This is useful when you need
the pipelines for other purposes than running them with the engine.
"""
if not os.path.exists(f"{path}/ready"):
raise ValueError("This artifacts is not valid.")
if pipeline_parallel_size and num_blocks_per_pp_stage:
raise ValueError(
"Only one of `pipeline_parallel_size` or `num_blocks_per_pp_stage` should be given as input, not both."
)
# Load configs
try:
generator_config = GeneratorConfig.load(f"{path}/generator_config.json")
scheduler_config_ = SchedulerConfig.load(f"{path}/scheduler_config.json")
except pydantic.ValidationError:
# Generator config is in deprecated format.
generator_config_ = DeprecatedGeneratorConfig.load(f"{path}/generator_config.json")
new_buckets = [
*(
Bucket.prefill(bucket.batch_size, bucket.attention_size)
for bucket in generator_config_.prefill_buckets
),
*(
Bucket.decode(bucket.batch_size, bucket.attention_size)
for bucket in generator_config_.decode_buckets
),
]
# Artifacts' generator config was saved with previous version.
# TODO: remove this after deprecating previous version of generator config type.
generator_config = GeneratorConfig(
generator_config_.position_id_pad,
new_buckets,
generator_config_.model_qname,
generator_config_.paged_attention_config,
generator_config_.packing_type,
generator_config_.kv_cache_sharing_across_beams_config,
None,
None,
)
assert generator_config_.scheduler_config is not None
scheduler_config_ = generator_config_.scheduler_config
# Use saved scheduler config if not given.
scheduler_config = scheduler_config or scheduler_config_
with open(f"{path}/model_metadata.json", "r") as fp:
model_metadata = ModelMetadata.model_validate_json(fp.read())
with open(f"{path}/model_rewriting_config.json") as fp:
model_rewriting_config = ModelRewritingConfig.model_validate_json(fp.read())
with open(f"{path}/parallel_config.json") as fp:
parallel_config = ParallelConfig.model_validate_json(fp.read())
try:
with open(f"{path}/pipelines_metadata.json", "r") as fp:
pipelines_metadata = [
PipelineMetadata.model_validate(metadata) for metadata in json.load(fp)
]
except FileNotFoundError:
# PipelineMetadata doesn't exist. Get information from model metadata and bucket.
pipelines_metadata = [
get_pipeline_metadata(
model_metadata,
bucket,
)
for bucket in generator_config.buckets
]
with open(f"{path}/other_config.json") as fp:
other_configs = json.load(fp)
try_from_lir = os.environ.get("LLM_ENGINE_ARTIFACTS_TRY_FROM_LIR", "0") == "1"
try_from_dfg = os.environ.get("LLM_ENGINE_ARTIFACTS_TRY_FROM_DFG", "0") == "1"
# Load all saved pipelines
pipelines = cls.__load_pipelines(
path,
try_from_lir=try_from_lir,
try_from_dfg=try_from_dfg,
cache_dir=cache_dir,
)
if [
_get_bucket_from_pipeline_name(pipeline.name) for pipeline in pipelines
] != generator_config.buckets:
raise ValueError(
"Saved pipeline order does not comply with buckets in generator config."
)
pipelines_with_metadata = [
PipelineWithMetadata(pipeline, metadata)
for pipeline, metadata in zip_equal(pipelines, pipelines_metadata)
]
paged_attention_num_blocks = paged_attention_num_blocks or getattr(
generator_config.paged_attention_config, "num_blocks", None
)
paged_attention_block_size = getattr(
generator_config.paged_attention_config, "block_size", 1
)
comp_supertask_kind = other_configs["comp_supertask_kind"]
if devices is None:
devices = other_configs.get("devices")
if devices is None and not os.environ.get(
"DISABLE_LLM_ENGINE_ARTIFACTS_STRICT_LOAD", False
):
raise ValueError(
"Saved devices info is not found in the artifacts. Please provide devices explicitly or set `DISABLE_LLM_ENGINE_ARTIFACTS_STRICT_LOAD=1` to use all available devices."
)
if devices:
devices = [Device(device) for device in devices]
logger.info(
f"Device is not given, fall back to devices specified in artifact: {devices}"
)
pp_size_to_convey = None
if num_blocks_per_pp_stage or pipeline_parallel_size:
if not model_rewriting_config.use_blockwise_compile:
raise ValueError(
"Pipeline parallelism overriding is supported only for blockwise-compiled artifacts."
)
if isinstance(devices, str):
devices = parse_devices_str(devices)
# if `pipeline_parallel_size` is not None, `num_blocks_per_pp_stage` must be list of int
pp_size_to_convey = (
pipeline_parallel_size
if pipeline_parallel_size
else len(num_blocks_per_pp_stage) # type: ignore[arg-type]
)
dev_mesh = get_device_mesh(
devices,
parallel_config.tensor_parallel_size,
pp_size_to_convey,
data_parallel_size,
)
pipelines_with_metadata = [
PipelineWithMetadata(
override_pp_size_on_pipeline(
pipeline,
devices=dev_mesh[0],
pipeline_parallel_size=pipeline_parallel_size,
num_blocks_per_pp_stage=num_blocks_per_pp_stage,
),
metadata,
)
for pipeline, metadata in pipelines_with_metadata
]
if use_speculative_decoding_if_possible:
if speculative_model:
assert isinstance(speculative_model, LLM)
if speculative_model_paged_attention_num_blocks:
if not speculative_model.generator_config.paged_attention_config:
raise ValueError(
"Given specualtive model does not support paged attention but `speculative_model_paged_attention_num_blocks` is given."
)
speculative_model.generator_config.paged_attention_config.num_blocks = (
speculative_model_paged_attention_num_blocks
)
if speculative_model_bucket_config:
raise NotImplementedError(
"`speculative_model_bucket_config` is not supported when "
)
elif os.path.exists(f"{path}/speculative_model"):
# specualtive model exists.
speculative_model = cls.from_artifacts(
f"{path}/speculative_model",
bucket_config=speculative_model_bucket_config,
seed=seed,
devices=devices,
cache_dir=cache_dir,
paged_attention_num_blocks=speculative_model_paged_attention_num_blocks,
packing_type=packing_type,
skip_engine=True,
)
if num_speculative_tokens:
if (
generator_config.num_speculative_tokens
and generator_config.num_speculative_tokens != num_speculative_tokens
):
raise ValueError(
f"`num_speculative_tokens` must be same as saved `num_speculative_tokens` value in artifacts: {num_speculative_tokens} != {generator_config.num_speculative_tokens}"
)
if not speculative_model:
raise ValueError(
"Specualtive model given, but original model artifacts is not for speculative decoding."
)
generator_config.num_speculative_tokens = num_speculative_tokens
if generator_config.num_speculative_tokens and not speculative_model:
raise ValueError(
"You want to use speculative decoding but speculative model is not given and cannot be loaded."
)
else:
# Don't use speculative decoding.
if speculative_model:
logger.warning(
"Ignore `speculative_model` because `use_speculative_decoding_if_possible` is False."
)
if num_speculative_tokens:
logger.warning(
"Ignore `num_speculative_tokens` because `use_speculative_decoding_if_possible` is False."
)
num_speculative_tokens = None
generator_config.num_speculative_tokens = None
other_buckets = []
if not bucket_config:
# If bucket config is not given, load available buckets from the artifacts.
prefill_buckets_ = []
decode_buckets_ = []
for bucket in generator_config.buckets:
if bucket.is_prefill:
prefill_buckets_.append(bucket)
elif bucket.is_decode and bucket.input_ids_size == 1:
decode_buckets_.append(bucket)
else:
# Bucket for speculative decoding.
other_buckets.append(bucket)
if num_speculative_tokens:
# Check all needed verification buckets exist.
verification_buckets = set(
bucket
for bucket in other_buckets
if bucket.input_ids_size == num_speculative_tokens + 1
)
# There should be verification bucket with same attention size for each decode bucket.
if any(
Bucket(
decode_bucket.batch_size,
decode_bucket.attention_size,
decode_bucket.attention_size - num_speculative_tokens - 1,
)
not in verification_buckets
for decode_bucket in decode_buckets_
):
raise ValueError(
f"There is missing verfication buckets in the artifacts. Buckets with input_ids_length {num_speculative_tokens + 1} should exists."
)
prefill_buckets = [
(bucket.batch_size, bucket.attention_size) for bucket in prefill_buckets_
]
decode_buckets = [
(bucket.batch_size, bucket.attention_size) for bucket in decode_buckets_
]
bucket_config = ManualBucketConfig(prefill_buckets, decode_buckets)
# Find the max attention_size from the pipelines' buckets
if isinstance(bucket_config, ManualBucketConfig):
if bucket_config.decode_buckets:
assert model_metadata.is_generative_model
max_seq_len_to_capture = max([bucket[1] for bucket in bucket_config.decode_buckets])
else:
max_seq_len_to_capture = max(
[bucket[1] for bucket in bucket_config.prefill_buckets]
)
elif isinstance(bucket_config, MinimalBucketConfig):
max_seq_len_to_capture = bucket_config.max_seq_len
else:
raise ValueError("`bucket_config` must be an instance of BucketConfig.")
return cls(
model_metadata.pretrained_id,
task_type=model_metadata.task_type,
llm_config=model_metadata.llm_config,
hf_overrides=model_metadata.hf_configs,
bucket_config=bucket_config,
speculative_model=speculative_model,
num_speculative_tokens=generator_config.num_speculative_tokens,
max_seq_len_to_capture=max_seq_len_to_capture,
tensor_parallel_size=parallel_config.tensor_parallel_size,
pipeline_parallel_size=(
pp_size_to_convey if pp_size_to_convey else parallel_config.pipeline_parallel_size
),
data_parallel_size=data_parallel_size,
tokenizer=tokenizer,
tokenizer_mode=tokenizer_mode,
seed=seed,
devices=devices,
do_decompositions_for_model_rewrite=model_rewriting_config.do_decompositions_for_model_rewrite,
comp_supertask_kind=comp_supertask_kind,
cache_dir=cache_dir,
backend=backend,
use_blockwise_compile=model_rewriting_config.use_blockwise_compile,
num_blocks_per_pp_stage=num_blocks_per_pp_stage,
num_blocks_per_supertask=model_rewriting_config.num_blocks_per_supertask,
embed_all_constants_into_graph=model_rewriting_config.embed_all_constants_into_graph,
paged_attention_num_blocks=paged_attention_num_blocks,
paged_attention_block_size=paged_attention_block_size,
kv_cache_sharing_across_beams_config=generator_config.kv_cache_sharing_across_beams_config,
scheduler_config=scheduler_config,
packing_type=packing_type,
skip_engine=skip_engine,
_pipelines_with_metadata=pipelines_with_metadata,
_custom_buckets=other_buckets,
_optimize_logit_shape=model_rewriting_config.optimize_logit_shape,
_cleanup=_cleanup,
**kwargs,
)
@staticmethod
def __load_pipelines(
path: Union[str, os.PathLike],
try_from_lir: bool,
try_from_dfg: bool,
cache_dir: Optional[os.PathLike] = CACHE_DIR,
artifact: Optional[Artifact] = None,
) -> List[Pipeline]:
pipelines = []
if artifact and artifact.pipelines:
for pipeline_dict in artifact.pipelines:
pipeline = Pipeline.from_dict(pipeline_dict)
pipelines.append(pipeline)
else:
for idx in range(len(glob.glob(f"{path}/pipeline.*.json"))):
pipeline = Pipeline.load(f"{path}/pipeline.{idx}.json")
pipelines.append(pipeline)
for pipeline in pipelines:
blob_to_device: Dict[DataBlobId, Device] = {}
for _, task in pipeline.supertasks.items():
if isinstance(task, CompSuperTask) and task.kind == SuperTaskKind.EDF:
if task.data_blob is None:
continue
blob_to_device[task.data_blob] = pipeline.devices[task.device]
blob_kind = pipeline.get_blob_kind()
for id, _ in pipeline.blobs.items():
kind = blob_kind.get(id)
if kind == SuperTaskKind.FX:
with open(f"{path}/{id}.fx", "r") as fp:
pipeline.blobs[id] = fp.read()
elif kind == SuperTaskKind.EDF:
try:
from furiosa.native_compiler import ( # type: ignore[import]
CompiledGraph,
compile_from_path,
)
except ImportError:
logger.error("furiosa-native-compiler is required to load EDF format")
raise
compiler_config_yaml = f"{path}/{id}.config.yaml"
device = blob_to_device[id]
target_npu = GraphModuleConverter.get_target_npu_from_device(device)
# check if:
# - edf file does not exist,
# - try_from_lir is enabled,
# - and lir exists
# then, compile lir to edf
if (
not os.path.exists(f"{path}/{id}.edf")
and try_from_lir
and os.path.exists(f"{path}/{id}.lir")
):
if try_from_dfg:
logger.warning(
"Both TRY_FROM_LIR and TRY_FROM_DFG are enabled. In this case, TRY_FROM_LIR is prioritized."
)
compiler_config = try_compiler_config_from_yaml(compiler_config_yaml)
logger.info(
f"Compiling LIR to EDF for {id} with compiler config {compiler_config}"
)
out = compile_from_path(
f"{path}/{id}.lir",
target_npu,
target_ir="edf",
config=compiler_config,
dump_tag=id,
dump_path=str(path),
)
contents = CompiledGraph.serialize(out)
with open(f"{path}/{id}.edf", "wb") as fp: # type: ignore[assignment]
fp.write(contents) # type: ignore[arg-type]
# check if:
# - edf file does not exist,
# - try_from_dfg is enabled,
# - and dfg exists
# then, compile dfg to edf
if (
not os.path.exists(f"{path}/{id}.edf")
and try_from_dfg
and os.path.exists(f"{path}/{id}.dfg")
):
compiler_config = try_compiler_config_from_yaml(compiler_config_yaml)
logger.info(
f"Compiling DFG to EDF for {id} with compiler config {compiler_config}"
)
out = compile_from_path(
f"{path}/{id}.dfg",
target_npu,
target_ir="edf",
config=compiler_config,
dump_tag=id,
dump_path=str(path),
dump_lir=True,
)
contents = CompiledGraph.serialize(out)
with open(f"{path}/{id}.edf", "wb") as fp: # type: ignore[assignment]
fp.write(contents) # type: ignore[arg-type]
with open(f"{path}/{id}.edf", "rb") as fp:
pipeline.blobs[id] = CompiledGraph.deserialize(fp.read(), tag=id) # type: ignore[arg-type, assignment]
else:
raise NotImplementedError(f"SuperTask [{kind}] is not supported to load")
# Support both cases:
# 1. param file is located in the artifacts directory
# 2. param file is located in the global cache directory
for param_idx, param_file in pipeline.param_files.items():
# NOTE: param_file.path is already `os.path.basename`d
path_candidates = (
os.path.abspath(f"{path}/{param_file.path}"),
os.path.abspath(f"{cache_dir}/param_files/{param_file.path}"),
)
for candidate in path_candidates:
if os.path.exists(candidate):
param_file.path = candidate
break
else:
raise FileNotFoundError(
f"Param file {param_file.path} is not found in neither artifacts path nor cache directory."
)
return pipelines
@classmethod
def compute_prefill_buckets(
cls,
preferred_batch_size: int,
prefill_buckets_num: int,
prefill_buckets: Optional[Sequence[Tuple[int, int]]],
max_position_embeddings: int,
) -> List[Bucket]:
if prefill_buckets is not None:
return [
Bucket.prefill(batch_size, attn_size) for batch_size, attn_size in prefill_buckets
]
else:
# Generate the buckets automatically
percentage = 1.0 / prefill_buckets_num
interval = max_position_embeddings * percentage
atten_sizes = [int(interval * i) for i in range(1, prefill_buckets_num + 1)]
return [Bucket.prefill(preferred_batch_size, attn_size) for attn_size in atten_sizes]
@classmethod
def compute_decode_buckets(
cls,
preferred_batch_size: int,
decode_buckets_num: int,
decode_buckets: Optional[Sequence[Tuple[int, int]]],
max_position_embeddings: int,
) -> List[Bucket]:
if decode_buckets is not None:
return [
Bucket.decode(batch_size, attn_size) for batch_size, attn_size in decode_buckets
]
else:
# Generate the buckets automatically
percentage = 1.0 / decode_buckets_num
interval = max_position_embeddings * percentage
attn_sizes = [int(interval * i) for i in range(1, decode_buckets_num + 1)]
return [Bucket.decode(preferred_batch_size, attn_size) for attn_size in attn_sizes]
@classmethod
def __verify_buckets(
cls,
prefills: Sequence[Bucket],
decodes: Sequence[Bucket],
kv_cache_beam_sharing: Optional[KvCacheSharingAcrossBeamsConfig],
):
if kv_cache_beam_sharing is not None:
for bucket in decodes:
if bucket.batch_size % kv_cache_beam_sharing.beam_width != 0:
raise ValueError(
f"decode batch size must be a multiple of beam width, but got {bucket.batch_size} % {kv_cache_beam_sharing.beam_width} != 0"
)
if bucket.attention_size <= kv_cache_beam_sharing.max_new_tokens:
raise ValueError(
f"decode bucket's attention size must be greater than max_new_tokens, but got {bucket.attention_size} < {kv_cache_beam_sharing.max_new_tokens}"
)
@staticmethod
def __verify_comp_supertask_kind(kind: str) -> None:
if kind not in ("fx", "dfg", "edf"):
raise ValueError(
f"Unknown comp_supertask_kind: {kind}. Must be either 'fx', 'dfg', or 'edf'."
)
@staticmethod
def __verify_tokenizer_mode(tokenizer_mode: TokenizerModeType) -> None:
tokenizer_mode_lowered = tokenizer_mode.lower()
if tokenizer_mode_lowered not in get_args(TokenizerModeType):
valid_options = ",".join(get_args(TokenizerModeType))
raise ValueError(
f"Unknown tokenizer mode: {tokenizer_mode}. Must be one of '{valid_options}'."
)
@staticmethod
def __verify_devices(devices: Sequence[Device]) -> None:
if len(devices) == 0:
raise ValueError("No devices are given")
if not all(dev.kind == devices[0].kind for dev in devices):
raise ValueError("All devices must be the same kind.")
@staticmethod
def __is_generative_model(model_type: Union[str, Type[PreTrainedModel]]) -> bool:
"""Check if the model is a generative model."""
if isinstance(model_type, str):
return model_type in _HF_CAUSAL_LM_CLASS_NAMES
else:
return model_type.__name__ in _HF_CAUSAL_LM_CLASS_NAMES
def _get_default_opt_config_from_pretrained_id(
self, pretrained_id: str, trust_remote_code: Optional[bool]
) -> OptimizationConfig:
model_cls = get_model_cls_from_pretrained_id(pretrained_id, trust_remote_code)
model_cls = get_mapped_class_for_optimization(model_cls)
if model_cls is transformers.GPTJForCausalLM:
return OptimizationConfig(
attention_type=AttentionType.PAGED_ATTENTION,
optimize_rope=True,
optimize_packed=True,
causal_mask_free_decoding=True,
)
elif model_cls is transformers.BertForQuestionAnswering:
return OptimizationConfig(
use_unsplit_packed=True,
use_rngd_gelu=True,
)
elif model_cls is transformers.LlamaForCausalLM:
# Llama MLPerf slice model
return OptimizationConfig(
attention_type=AttentionType.PAGED_ATTENTION,
optimize_rope=True,
optimize_packed=True,
causal_mask_free_decoding=True,
calculate_logit_only_for_last_token=True,
)
else:
raise NotImplementedError(f"Unsupported model architecture: {model_cls}")
def build_all_pipelines(
self,
model: ModelCreationInfo,
buckets_with_output_size: Sequence[BucketWithOutputLogitsSize],
devices: Sequence[Device],
backend: LLMBackend,
comp_supertask_kind: str,
one_supertask_per_device: bool,
use_blockwise_compile: bool,
do_decompositions_for_model_rewrite: bool,
kv_cache_dtype: Optional[torch.dtype],
sparse_select_version: str,
num_pipeline_builder_workers: int,
num_compile_workers: int,
embed_all_constants_into_graph: bool,
num_blocks_per_supertask: int,
num_blocks_per_pp_stage: Optional[Sequence[int]],
param_file_path: Optional[os.PathLike],
param_saved_format: str,
compiler_config_context: CompilerConfigContext,
cache_dir: Optional[os.PathLike],
cleanup: bool,
**kwargs,
) -> None:
# If backend using Pipeline is used, create directory for temporary files.
tmp_dir_path = None
if backend.is_parallelism_supported():
if cleanup:
self.tmp_dir = tempfile.TemporaryDirectory()
tmp_dir_path = Path(self.tmp_dir.name)
else:
tmp_dir_path = Path(tempfile.mkdtemp())
# Save model parameters when param file path is not given
# and pipeline should be constructed.
if param_file_path is None and backend.is_parallelism_supported():
if cache_dir is not None and model.is_hashable():
param_file_cache_dir = Path(cache_dir) / "param_files"
param_file_path = get_param_file_with_cache(model, param_file_cache_dir)
else:
assert isinstance(tmp_dir_path, Path)
param_file_path = get_param_file_with_cache(model, tmp_dir_path)
cache_dir = None if cache_dir is None else Path(cache_dir)
# For now, `PipelineParallelismMppp` supports all valid cases because only pipeline parallelism is needed to be expressed within one pipeline.
if num_blocks_per_pp_stage and "mppp" in kwargs:
logger.warning(
"`num_blocks_per_pp_stage` and custom `mppp` is given at the same time.`num_blocks_per_pp_stage` is ignored."
)
mppp = kwargs.pop("mppp", None) or PipelineParallelismMppp(num_blocks_per_pp_stage)
# Build Pipelines for first dp subgroup.
self.pipelines_with_meta = build_pipelines(
model,
buckets_with_output_size,
devices,
param_file_path,
cache_dir,
backend,
mppp,
SuperTaskKind.from_str(comp_supertask_kind),
one_supertask_per_device,
use_blockwise_compile,
do_decompositions_for_model_rewrite,
kv_cache_dtype,
self.generator_config.paged_attention_config,
sparse_select_version,
self.generator_config.kv_cache_sharing_across_beams_config,
tmp_dir_path,
self.model_metadata,
compiler_config_context,
num_pipeline_builder_workers,
num_compile_workers,
embed_all_constants_into_graph,
num_blocks_per_supertask,
self.is_generative_model,
param_saved_format,
**kwargs,
)
if len(self.pipelines_with_meta) == 0:
raise ValueError("No pipeline is generated")
def _save_engine_artifacts(
self,
path: Union[str, os.PathLike],
comp_supertask_kind: str,
devices: Sequence[Device],
):
import shutil
path = Path(path)
path.mkdir(parents=True, exist_ok=True)
for idx, pipeline_with_metadata in enumerate(self.pipelines_with_meta):
pipeline = pipeline_with_metadata.pipeline
blobs = {}
param_files = copy.deepcopy(pipeline.param_files)
blob_kind = pipeline.get_blob_kind()
for id, blob in pipeline.blobs.items():
blobs[id] = blob
kind = blob_kind.get(id)
if kind == SuperTaskKind.FX:
with open(f"{path}/{id}.fx", "w") as fp:
fp.write(blob)
elif kind == SuperTaskKind.EDF:
with open(f"{path}/{id}.edf", "wb") as fp:
fp.write(blob.serialize()) # type: ignore[attr-defined]
else:
raise NotImplementedError(f"SuperTask [{kind}] is not supported to save")
pipeline.blobs[id] = None # type: ignore[assignment]
for param_idx, param_file in pipeline.param_files.items():
filename = os.path.basename(param_file.path)
new_path = Path(f"{path}/{filename}")
if not new_path.exists():
shutil.copyfile(param_file.path, new_path)
pipeline.param_files[param_idx].path = filename
pipeline.export(f"{path}/pipeline.{idx}.json")
pipeline.blobs = blobs
pipeline.param_files = param_files
# Save pipeline metadata
serialized_pipelines_metadata_list = [
pipeline_with_meta.metadata.model_dump()
for pipeline_with_meta in self.pipelines_with_meta
]
with open(f"{path}/pipelines_metadata.json", "w") as fp:
fp.write(json.dumps(serialized_pipelines_metadata_list))
self.generator_config.export(f"{path}/generator_config.json")
self.scheduler_config.export(f"{path}/scheduler_config.json")
with open(f"{path}/hf_config.json", "w") as fp:
json.dump(self.model_config, fp)
model_metadata_path = f"{path}/model_metadata.json"
with open(model_metadata_path, "w") as fp:
fp.write(self.model_metadata.model_dump_json())
model_rewriting_config_path = f"{path}/model_rewriting_config.json"
with open(model_rewriting_config_path, "w") as fp:
fp.write(self.model_rewriting_config.model_dump_json())
parallel_config_path = f"{path}/parallel_config.json"
with open(parallel_config_path, "w") as fp:
fp.write(self.parallel_config.model_dump_json())
other_config_path = f"{path}/other_config.json"
with open(other_config_path, "w") as fp:
fp.write(
json.dumps(
{
"comp_supertask_kind": comp_supertask_kind,
"devices": tuple(devices),
}
)
)
open(f"{path}/ready", "w").close()
def _verify_token_len_and_finalize_max_tokens(
self, sampling_params: SamplingParams, input: BatchEncoding
) -> None:
"""
If sampling_params.max_tokens is None, set it to the maximum allowed value.
If not, verify that max_tokens + len(prompt) is not larger than the maximum allowed value.
"""
input_ids = input.input_ids
if not isinstance(input_ids, list):
raise ValueError("input_ids must be a list of integers.")
if len(input_ids) > 0 and isinstance(input_ids[0], list):
max_input_seq_len = max(len(ids) for ids in input_ids)
else:
max_input_seq_len = len(input_ids)
if max_input_seq_len > self.prompt_max_seq_len:
# Similar format as OpenAI
raise ValueError(
f"This model's maximum input context length is {self.prompt_max_seq_len} tokens, however you requested {max_input_seq_len} tokens. Please reduce your prompt."
)
assert (
self.max_seq_len_to_capture is not None
), "Generative models must have max_seq_len_to_capture set."
if sampling_params.max_tokens is None:
sampling_params.max_tokens = self.max_seq_len_to_capture - max_input_seq_len
else:
if sampling_params.max_tokens + max_input_seq_len > self.max_seq_len_to_capture:
# Same error message as OpenAI's response
raise ValueError(
f"This model's maximum context length is {self.max_seq_len_to_capture} tokens, however you requested {sampling_params.max_tokens + max_input_seq_len} tokens ({max_input_seq_len} in your prompt; {sampling_params.max_tokens} for the completion). Please reduce your prompt; or completion length."
)
@staticmethod
def __verify_sampling_params_with_generator_config(
sampling_params: SamplingParams,
generator_config: GeneratorConfig,
):
if sampling_params.max_tokens is None:
raise ValueError("`sampling_params.max_tokens` must be specified at this point.")
if generator_config.kv_cache_sharing_across_beams_config is not None:
if not sampling_params.use_beam_search:
raise ValueError(
"`sampling_params.use_beam_search` is not consistent with generator config. The model was configured to use beam search, but `sampling_params.use_beam_search` is False."
)
if (
sampling_params.max_tokens
> generator_config.kv_cache_sharing_across_beams_config.max_new_tokens
):
raise ValueError(
"`sampling_params.max_tokens` is larger than `generator_config.kv_cache_sharing_across_beams_config.max_new_tokens`"
)
if (
sampling_params.best_of
!= generator_config.kv_cache_sharing_across_beams_config.beam_width
):
raise ValueError(
"`sampling_params.best_of` is different from beam width specified in `generator_config.kv_cache_sharing_across_beams_config.beam_width`."
)
[docs]
def generate(
self,
prompts: Union[str, List[str]],
sampling_params: SamplingParams = SamplingParams(),
prompt_token_ids: Optional[BatchEncoding] = None,
tokenizer_kwargs: Optional[Dict[str, Any]] = None,
) -> Union[RequestOutput, List[RequestOutput]]:
"""Generate texts from given prompts and sampling parameters.
Args:
prompts: The prompts to generate texts.
sampling_params: The sampling parameters for generating texts.
prompt_token_ids: Pre-tokenized prompt input as a `BatchEncoding` object.
If not provided, the prompt will be tokenized internally using the tokenizer.
tokenizer_kwargs: Additional keyword arguments passed to the tokenizer's
`encode` method, such as `{"use_special_tokens": True}`.
Returns:
A list of `RequestOutput` objects containing the generated
completions in the same order as the input prompts.
"""
if not self.is_generative_model:
raise ValueError("generate API can only be used for generative models.")
if prompt_token_ids is None:
if tokenizer_kwargs is None:
tokenizer_kwargs = {}
prompt_token_ids = encode_auto(self.tokenizer, prompts, **tokenizer_kwargs)
self._verify_token_len_and_finalize_max_tokens(sampling_params, prompt_token_ids)
LLM.__verify_sampling_params_with_generator_config(sampling_params, self.generator_config)
native_outputs = self.engine.generate(prompt_token_ids, sampling_params)
return self._generate_postprocess(native_outputs, prompts, prompt_token_ids)
[docs]
async def stream_generate(
self,
prompt: str,
sampling_params: SamplingParams = SamplingParams(),
prompt_token_ids: Optional[BatchEncoding] = None,
tokenizer_kwargs: Optional[Dict[str, Any]] = None,
is_demo: bool = False,
) -> AsyncGenerator[str, None]:
"""Generate texts from given prompt and sampling parameters.
Args:
prompt: The prompt to generate texts. Note that unlike `generate`,
this API supports only a single prompt.
sampling_params: The sampling parameters for generating texts.
prompt_token_ids: Pre-tokenized prompt input as a `BatchEncoding` object.
If not provided, the prompt will be tokenized internally using the tokenizer.
tokenizer_kwargs: Additional keyword arguments passed to the tokenizer's
`encode` method, such as `{"use_special_tokens": True}`.
Returns:
A stream of generated output tokens.
"""
if not self.is_generative_model:
raise ValueError("generate API can only be used for generative models.")
if not isinstance(prompt, str):
raise ValueError("prompt must be a single string.")
if prompt_token_ids is None:
if tokenizer_kwargs is None:
tokenizer_kwargs = {}
prompt_token_ids = encode_auto(self.tokenizer, prompt, **tokenizer_kwargs)
self._verify_token_len_and_finalize_max_tokens(sampling_params, prompt_token_ids)
LLM.__verify_sampling_params_with_generator_config(sampling_params, self.generator_config)
# FIXME: LLM.__init__() should take max_tokens to determine the maximum sequence length through bucket generations
# and use the config value to raise an error.
if is_demo and len(prompt_token_ids.input_ids) > 1024: # type: ignore
raise ValueError("The length of the prompt is larger than 1024 tokens")
# NOTE: type of engine.stream_generate() is AsyncGenerator[RequestOutput, None]
token_buffer = []
request_output: RequestOutput
async for request_output in self.engine.stream_generate(prompt_token_ids, sampling_params):
num_decode_trials = STREAMING_MAX_DECODE_TRIAL
for completion_output in request_output.outputs:
token_buffer.extend(completion_output.token_ids)
num_decode_trials = min(num_decode_trials, len(completion_output.token_ids))
if num_decode_trials == 0:
continue
for tokens_to_discard in range(num_decode_trials):
end_offset = len(token_buffer) - 1 - tokens_to_discard
new_text = self.tokenizer.decode(
token_buffer[: end_offset + 1], skip_special_tokens=True
)
if not new_text.endswith("�"):
break
else:
continue
token_buffer = token_buffer[end_offset + 1 :]
yield new_text
if token_buffer:
yield self.tokenizer.decode(token_buffer, skip_special_tokens=True)
def _generate_postprocess(
self,
native_outputs,
prompts: Union[str, List[str]],
prompt_token_ids: Union[List[int], List[List[int]]],
) -> Union[RequestOutput, List[RequestOutput]]:
skip_special_tokens = isinstance(prompts, list)
# Convert one prompt and multiple generated sequences into a RequestOutput
def convert(prompt: str, _prompt_token_ids: List[int], request_output):
outputs = [
CompletionOutput(
o.index,
self.tokenizer.decode(
o.token_ids, skip_special_tokens, clean_up_tokenization_spaces=True
),
o.token_ids,
o.finish_reason,
)
for o in request_output.outputs
]
return RequestOutput(
request_id=uuid.uuid4().__str__(),
prompt=prompt,
prompt_token_ids=_prompt_token_ids,
outputs=outputs,
finished=True,
)
if isinstance(native_outputs, list):
assert isinstance(prompts, list)
return [
convert(req[0], req[1], req[2])
for req in zip(prompts, prompt_token_ids.input_ids, native_outputs) # type: ignore
]
else:
assert isinstance(prompts, str)
return convert(prompts, prompt_token_ids.input_ids, native_outputs) # type: ignore
def bert_forward(
self,
prompts: Union[str, List[str]],
contexts: Union[str, List[str]],
) -> Union[RequestOutput, List[RequestOutput]]:
prompt_token_ids = encode_auto(self.tokenizer, prompts, text_pair=contexts)
native_outputs = self.engine.bert_forward(prompt_token_ids)
return self._generate_postprocess(native_outputs, prompts, prompt_token_ids)
def __del__(self):
# Remove tmp directory if exists.
tmp_dir = getattr(self, "tmp_dir", None)
if tmp_dir is not None:
tmp_dir.cleanup()
@staticmethod
def __get_gms_for_pipeline(
pipeline: Pipeline,
get_input_constants: bool = False,
) -> Union[
Tuple[GraphModule, ...], Tuple[Tuple[GraphModule, Tuple[Optional[torch.Tensor], ...]], ...]
]:
ret: List = []
gm_cache: Dict[Optional[DataBlobId], GraphModule] = {}
# Sort supertasks by id to guarantee consistent order.
sorted_supertasks = (
supertask
for _, supertask in sorted(pipeline.supertasks.items(), key=lambda x: int(x[0]))
)
for supertask in sorted_supertasks:
if not isinstance(supertask, CompSuperTask):
continue
if supertask.kind != SuperTaskKind.FX:
raise ValueError("Supertask is not FX graph supertask.")
param_load_cache: Dict[Any, Any] = {}
fake_mode = FakeTensorMode(allow_non_fake_inputs=True)
with fake_mode:
fake_example_inputs = tuple(
torch.zeros(
pipeline.tensors[input_].shape,
dtype=pipeline.tensors[input_].dtype.to_torch_dtype(),
)
for input_ in supertask.inputs
)
gm = gm_cache.get(supertask.data_blob, None)
if gm is None:
if supertask.data is not None:
data = supertask.data
else:
assert supertask.data_blob is not None
data = pipeline.blobs[supertask.data_blob]
gm = deserialize_gm(data)
# NOTE: This Shape propagation is required because tensor meta infomration is lost during serialization. We need to regenerate this.
ShapeProp(gm).propagate(*fake_example_inputs)
# preprocess gms for it to be compiled immediately
gm = preprocess(gm, fake_example_inputs)
if supertask.data_blob is not None:
gm_cache[supertask.data_blob] = cast(GraphModule, gm)
if get_input_constants:
# TODO: change this to share same tensor among slices.
def load_tensor(tensor_name) -> Optional[torch.Tensor]:
tensor_info = pipeline.tensors[tensor_name]
if isinstance(tensor_info, TensorInfo):
# If it's not an input constant tensor (i.e., input tensor not originated from constant tensor),
# just return None.
return None
else:
assert isinstance(tensor_info, ParamInfo)
param_value = tensor_info.value
param_file_info = pipeline.param_files[param_value.param_file]
return load_partial_param(
param_file_info.path,
param_value.name,
param_value.placements,
param_file_info.format,
cache=param_load_cache,
).contiguous()
example_input = tuple(load_tensor(input_name) for input_name in supertask.inputs)
ret.append((gm, example_input))
else:
ret.append(gm)
return tuple(ret)
def _get_splitted_gms(self, get_input_constants: bool = False) -> Dict[
str,
Union[
Tuple[GraphModule, ...],
Tuple[Tuple[GraphModule, Tuple[Optional[torch.Tensor], ...]], ...],
],
]:
"""Get sub GraphModules for each pipeline.
Returns:
Dict[str, Union[Tuple[GraphModule, ...], Tuple[Tuple[GraphModule, Tuple[Optional[torch.Tensor], ...]], ...],],]:
Dictionary whose key is the pipeline name and value is the tuple containing ``GraphModule``s (computation supertasks) and some additional information if necessary.
if ``get_input_constants==False``, each value is just a tuple of ``GraphModule``s in the pipeline.
Otherwise, each value is a tuple whose element is ``GraphModule`` in the pipeline and list of input constant tensors,
which were originally constant tensors, but converted to input. The list of input constant tensors has same length as corresponding ``GraphModule``'s number of inputs
with each element exactly corresponding to the input of the ``GraphModule`` with same index, but elements with original input tensor indexes are ``None``.
"""
if not (
isinstance(self.pipelines_with_meta, Sequence)
and isinstance(self.pipelines_with_meta[0], PipelineWithMetadata)
):
raise ValueError("get_splitted_gms is only supported for parallel backends")
return {
pipeline_with_meta.pipeline.name: LLM.__get_gms_for_pipeline(
pipeline_with_meta.pipeline, get_input_constants=get_input_constants
)
for pipeline_with_meta in self.pipelines_with_meta
}
@staticmethod
def __verify_kv_cache_dtype_with_qformat(
kv_cache_dtype: QDtype, qformat_path: os.PathLike, model_metadata: ModelMetadata
) -> None:
kv_cache_dtype_from_qformat = get_kv_cache_dtype_from_qformat(qformat_path)
if kv_cache_dtype != kv_cache_dtype_from_qformat:
raise ValueError(
f"kv_cache_dtype != qformat's kv_cache dtype: {kv_cache_dtype} != {kv_cache_dtype_from_qformat}"
)
@cached_property
def model_max_seq_len(self) -> int:
possible_keys = [
# OPT, LLaMA, BERT
"max_position_embeddings",
# GPT-2, GPT-J
"n_positions",
# MPT
"max_seq_len",
# ChatGLM2
"seq_length",
# Command-R
"model_max_length",
# Others
"max_sequence_length",
"max_seq_length",
"seq_len",
]
for attr_name in possible_keys:
if attr_name in self.model_config:
model_max_seq_len = self.model_config[attr_name]
break
else:
# If none of the keys were found in the config, use a default and
# log a warning.
default_max_len = 2048
model_max_seq_len = default_max_len
logger.warning(
"The model's config.json does not contain any of the following "
"keys to determine the original maximum length of the model: "
"%s. Assuming the model's maximum length is %d.",
possible_keys,
default_max_len,
)
return model_max_seq_len
def try_compiler_config_from_yaml(path: str):
if not os.path.exists(path):
raise FileNotFoundError(f"Compiler config must be given at: {path}")
with open(path, "r") as f:
return yaml.safe_load(f)
def load_artifact(
path: Union[str, os.PathLike],
*,
# Runtime Configuration
devices: Optional[Union[str, Sequence[Device]]] = None,
data_parallel_size: Optional[int] = None,
pipeline_parallel_size: Optional[int] = None,
num_blocks_per_pp_stage: Optional[Sequence[int]] = None,
bucket_config: Optional[BucketConfig] = None,
scheduler_config: Optional[SchedulerConfig] = None,
paged_attention_num_blocks: Optional[int] = None,
# Other Configuration
tokenizer: Optional[Union[PreTrainedTokenizer, PreTrainedTokenizerFast]] = None,
tokenizer_mode: TokenizerModeType = "auto",
seed: Optional[int] = None,
cache_dir: os.PathLike = CACHE_DIR,
**kwargs,
) -> LLM:
"""Instantiate LLM from saved artifacts without quantization and compilation.
Internally, this function calls :meth:`LLM.load_artifact`.
"""
kwargs.pop("backend")
return LLM.load_artifact(
path,
devices=devices,
data_parallel_size=data_parallel_size,
pipeline_parallel_size=pipeline_parallel_size,
num_blocks_per_pp_stage=num_blocks_per_pp_stage,
bucket_config=bucket_config,
scheduler_config=scheduler_config,
paged_attention_num_blocks=paged_attention_num_blocks,
tokenizer=tokenizer,
tokenizer_mode=tokenizer_mode,
seed=seed,
cache_dir=cache_dir,
**kwargs,
)