from collections import Counter
from contextlib import nullcontext
import json
import logging
import os
from pathlib import Path
from pprint import pformat
import shutil
import tempfile
import time
from typing import Any, Callable, Dict, List, Literal, Mapping, Optional, Sequence, Tuple, Union
import uuid
import warnings
from zipfile import ZipFile
from transformers import AutoConfig, PretrainedConfig
from furiosa_llm.optimum.model_configs import find_canonical_model_id
from furiosa_llm.optimum.modeling import (
_EXPORTED_MODEL_QCKPT,
_QFORMAT_YAML,
_QPARAM_NPY,
CANONICAL_MODEL_IDS,
FURIOSA_CONFIG_JSON,
_load_quantized_model_meta,
is_quantized_model_path,
)
from furiosa_llm.optimum.types import FuriosaConfig, ModelClass, ModelKind
from furiosa_llm.parallelize.export.tensor import (
_INDEX_TENSOR_NAME,
ParamfileFormat,
ParamFileMetadata,
)
from furiosa_llm.parallelize.mppp.config import Device
from furiosa_llm.parallelize.pipeline.types import ParamFileInfo
from furiosa_llm.tokenizer.tokenizer import get_tokenizer
from furiosa_llm.utils import get_list_with_no_dup_with_order_preserved
from ..device import NUM_PES_PER_NPU, get_device_mesh
from ..models import AttentionType, ModelMetadata, QuantizationConfig
from ..models.config_types import (
Bucket,
GeneratorConfig,
KvCacheSharingAcrossBeamsConfig,
ManualBucketConfig,
ModelRewritingConfig,
PagedAttentionConfig,
ParallelConfig,
)
from ..parallelize.compiler_config import CompilerConfigContext
from ..parallelize.model_creation_info import ModelCreationInfo
from ..parallelize.mppp.api import PipelineParallelismMppp
from ..parallelize.pipeline import Pipeline
from ..parallelize.pipeline.builder.api import _get_commit_id
from ..parallelize.pipeline.types import SuperTaskKind
from ..parallelize.trace import (
PARAM_FILE_CACHE_SUBDIR_NAME,
get_param_file_with_cache,
)
from .helper import (
build_pipelines,
get_buckets_with_output_logits_size,
prestep_for_remote_code_model,
)
from .types.latest import SCHEMA_VERSION, Artifact, ArtifactMetadata, ModelArtifact
logger = logging.getLogger(__name__)
# Default position id for padding
_POSITION_ID_PAD = 1
# Default param file name
_PARAM_FILE_NAME = "params.safetensors"
# Default index of the padding block when paged attention model is used.
DEFAULT_PAGED_ATTENTION_PADDING_BLOCK_IDX = 0
CACHE_DIR = Path(os.getenv("XDG_CACHE_HOME", Path.home() / ".cache")) / "furiosa" / "llm"
BINARY_BUNDLE_ZIP_FILE_NAME = "binary_bundle"
def is_mlperf_optimized_model(canonical_model_id: str) -> bool:
return canonical_model_id in CANONICAL_MODEL_IDS
def get_tp_group_device(tp_size: int, start_idx: int) -> Device:
if tp_size <= NUM_PES_PER_NPU:
return Device(f"npu:{start_idx}:0-{tp_size-1}")
else:
assert tp_size % NUM_PES_PER_NPU == 0
return Device(",".join([f"npu:{start_idx + i}" for i in range(tp_size // NUM_PES_PER_NPU)]))
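# For illustration only (assuming NUM_PES_PER_NPU == 8; the actual value depends on the target device):
#   get_tp_group_device(4, 0)  -> Device("npu:0:0-3")    # 4 PEs fused within a single NPU
#   get_tp_group_device(16, 2) -> Device("npu:2,npu:3")  # two whole NPUs starting at NPU index 2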
class ArtifactBuilder:
"""The artifact builder to use in the Furiosa LLM.
Args:
model_id_or_path: The Hugging Face model id or a local path. This corresponds to
``pretrained_model_name_or_path`` in Hugging Face Transformers.
name: The name of the artifact to build.
tensor_parallel_size: The number of PEs for each tensor parallelism group. The default is 4.
pipeline_parallel_size: The pipeline parallelism size that will be used by default when loading the model. The default is 1.
prefill_buckets: Specify the bucket sizes for the prefill phase.
decode_buckets: Specify the bucket sizes for the decode phase.
max_seq_len_to_capture: Maximum sequence length covered by the LLM engine. Sequences with a longer context than this will not be covered.
If no bucket is explicitly specified, a single bucket with batch size 1 and a context length of this value is created.
max_prompt_len: Maximum prompt sequence length covered by the LLM engine. Prompts longer than this cannot be handled.
If not given, it will be derived from the buckets and other configs.
num_hidden_layers: Number of hidden layers in the Transformer encoder.
seed_for_random_weight: The seed to initialize the random number generator
for creating random weights.
calculate_logit_only_for_last_token: Whether the model has the last-block-slice optimization applied. Deprecated; use ``optimize_logit_shape`` instead.
auto_bfloat16_cast: Whether to cast the model to bfloat16 automatically. This option is required when the model is neither trained with bfloat16 nor quantized.
quantize_artifact_path: Specifies the path where quantization artifacts
generated by the furiosa-model-compressor are saved.
compiler_config_overrides: Overrides for the compiler config.
This is a dictionary that includes the configuration for the compiler.
do_decompositions_for_model_rewrite: Whether to decompose some ops to describe various parallelism strategies
with an mppp config. When the value is True, an mppp config that matches the decomposed FX graph must be given.
use_blockwise_compile: If True, each task will be compiled at the granularity of a transformer block,
and the compilation result for a transformer block is generated once and reused. The default is ``True``.
num_blocks_per_supertask: The number of transformer blocks that will be merged into one supertask. This option is valid
only when `use_blockwise_compile=True`. The default is 1.
num_blocks_per_pp_stage: The number of transformer blocks per pipeline parallelism stage. If not given, transformer blocks will be
distributed equally across stages.
embed_all_constants_into_graph: Whether to embed constant tensors into the graph, or make them inputs of the graph and save them as separate files.
The default is ``False``.
optimize_logit_shape: Add a logit slice or removal operation to the graph for optimized performance.
kv_cache_sharing_across_beams_config: Configuration for sharing kv cache across beams. This argument must be given if and only if
the model is optimized to share kv cache across beams. If this argument is given, decode phase buckets with batch size of
``batch_size`` \* ``kv_cache_sharing_across_beams_config.beam_width`` will be created.
prefill_chunk_size: Chunk size used for chunked prefill. If the value is `None`, chunked prefill is not used.
paged_attention_block_size: The maximum number of tokens that can be stored in a single paged attention block. This argument must be given
if the model uses paged attention.
trust_remote_code: Trust remote code when downloading the model and tokenizer from HuggingFace.
copies_from_model: Relative paths of files to copy from the model directory (e.g., LICENSE.txt)
copies_from_local: Relative paths of files to copy from the local directory (e.g., README.md)
bundle_binaries: If `True`, all compiled binaries will be bundled into a single archive file (binary_bundle.zip) in the output directory. The default is ``True``.
speculative_model: The speculative model for speculative decoding. Note that speculative decoding is an experimental feature and the build may be unstable.
num_speculative_tokens: The number of tokens that the speculative model will generate speculatively during each iteration of the decoding process.
Note that speculative decoding is an experimental feature and the build may be unstable.
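Example:
    A minimal usage sketch (the model id, bucket sizes, and output directory below are
    illustrative assumptions, not recommended defaults)::

        from furiosa_llm.artifact import ArtifactBuilder  # adjust the import path to your installation

        builder = ArtifactBuilder(
            "meta-llama/Meta-Llama-3.1-8B-Instruct",  # any Hugging Face model id or local path
            tensor_parallel_size=4,
            prefill_buckets=[(1, 1024)],
            decode_buckets=[(1, 2048)],
        )
        builder.build("./llama-3.1-8b-artifact")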
"""
def __init__(
self,
model_id_or_path: Union[str, Path],
name: str = "",
*,
# Parallelize Config
tensor_parallel_size: int = 4,
pipeline_parallel_size: int = 1,
# Bucket Config
prefill_buckets: Sequence[Tuple[int, int]] = [],
decode_buckets: Sequence[Tuple[int, int]] = [],
max_seq_len_to_capture: int = 2048,
max_prompt_len: Optional[int] = None,
# Model Config
num_hidden_layers: Optional[int] = None,
seed_for_random_weight: Optional[int] = None,
calculate_logit_only_for_last_token: Optional[bool] = False,
# Quantize Config
auto_bfloat16_cast: Optional[bool] = None,
quantize_artifact_path: Optional[os.PathLike] = None,
# Compiler Config
compiler_config_overrides: Optional[Mapping] = None,
do_decompositions_for_model_rewrite: bool = False,
use_blockwise_compile: bool = True,
num_blocks_per_supertask: Union[
Union[int, Sequence[int]], Callable[[Bucket], Union[int, Sequence[int]]]
] = 1,
embed_all_constants_into_graph: bool = False,
optimize_logit_shape: bool = True,
kv_cache_sharing_across_beams_config: Optional[KvCacheSharingAcrossBeamsConfig] = None,
prefill_chunk_size: Optional[int] = None,
# PagedAttention Config
paged_attention_block_size: int = 1,
# Huggingface-related Config
trust_remote_code: Optional[bool] = None,
revision: Optional[str] = None,
copies_from_model: Optional[List[str]] = None,
copies_from_local: Optional[List[os.PathLike]] = None,
bundle_binaries: bool = True,
# Speculative-decoding Config
speculative_model: Optional["ArtifactBuilder"] = None,
num_speculative_tokens: Optional[int] = None,
_allow_empty_speculative_model: bool = False,
_embedding_layer_as_single_block: bool = False,
_enable_bf16_partial_sum_for_split: bool = True,
_use_2d_attention_masks: bool = False,
_merge_kv_cache_indices: bool = False,
**kwargs,
):
# TODO: remove the following later
if kwargs.pop("preferred_pipeline_parallel_size", None):
raise ValueError(
"`preferred_pipeline_parallel_size` is deprecated. Use `pipeline_parallel_size` instead."
)
model_config = AutoConfig.from_pretrained(
model_id_or_path,
trust_remote_code=trust_remote_code,
revision=revision,
)
canonical_model_id = find_canonical_model_id(model_config)
if canonical_model_id is None:
raise ValueError(f"ArtifactBuilder doesn't support {type(model_config)} class")
# Constants default values
self.artifact_id = str(uuid.uuid4())
self.model_id_or_path = model_id_or_path
self.canonical_model_id = canonical_model_id
self.optimize_paged_attention_block_loading = True
self.sparse_select_version = "v1.5"
self.one_supertask_per_device = True
self.quantize_artifact_path = quantize_artifact_path
self.prefill_chunk_size = prefill_chunk_size
self.trust_remote_code = trust_remote_code
self.revision = revision
self.copies_from_model = copies_from_model
self.copies_from_local = copies_from_local
self.enable_bf16_partial_sum_for_split = _enable_bf16_partial_sum_for_split
self.use_2d_attention_masks = _use_2d_attention_masks
self.merge_kv_cache_indices = _merge_kv_cache_indices
self.bundle_binaries = bundle_binaries
if calculate_logit_only_for_last_token:
warnings.warn(
"calculate_logit_only_for_last_token is deprecated and will be removed in the future. Use `optimize_logit_shape` instead."
)
optimize_logit_shape = calculate_logit_only_for_last_token
# Pre-step for model configurations
if is_quantized_model_path(model_id_or_path):
self.is_from_quantized_model = True
self.model_path = (
Path(model_id_or_path)
if not isinstance(model_id_or_path, Path)
else model_id_or_path
)
self.model_metadata = get_model_metadata_from_quantized_model(
self.model_path, model_config, trust_remote_code=trust_remote_code
)
# Warnings for unsupported options
if seed_for_random_weight is not None:
logging.warning("Random weight initialization is not supported for a quantized model.")
if auto_bfloat16_cast is not None:
logging.warning("auto_bfloat16_cast is not supported for a quantized model.")
self.seed_for_random_weight = None
else:
self.is_from_quantized_model = False
# Pre-steps for a model prepared by model id
self.model_metadata = get_model_metadata_from_model_id(
model_id=self.canonical_model_id,
model_id_or_path=self.model_id_or_path,
num_hidden_layers=num_hidden_layers,
quantize_artifact_path=quantize_artifact_path,
prefill_chunk_size=self.prefill_chunk_size,
trust_remote_code=trust_remote_code,
revision=self.revision,
auto_bfloat16_cast=auto_bfloat16_cast,
num_speculative_tokens=num_speculative_tokens,
use_2d_attention_masks=_use_2d_attention_masks,
merge_kv_cache_indices=_merge_kv_cache_indices,
)
self.seed_for_random_weight = seed_for_random_weight
# Check if the files to copy exist before building the model
self._validate_copies_from_local()
self._validate_copies_from_model()
self._name = name
# The following are dummy specifications for parallel size and device configuration, required for compilation.
# These values are expected to be provided by the user when loading artifacts via `LLM.load_artifact`.
self.tensor_parallel_size = tensor_parallel_size
self.pipeline_parallel_size = pipeline_parallel_size
self.dummy_data_parallel_size = 1
num_chip_per_tp_group = (self.tensor_parallel_size - 1) // NUM_PES_PER_NPU + 1
self.dummy_normalized_mesh = [
[dev for tp_group in pp_tp_group for dev in tp_group]
for pp_tp_group in get_device_mesh(
[
get_tp_group_device(self.tensor_parallel_size, i)
for i in range(
0,
self.dummy_data_parallel_size
* self.pipeline_parallel_size
* num_chip_per_tp_group,
num_chip_per_tp_group,
)
],
tensor_parallel_size,
self.pipeline_parallel_size,
self.dummy_data_parallel_size,
)
]
self.max_seq_len_to_capture = max_seq_len_to_capture
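# `max_prompt_len` is only used with chunked prefill: when `prefill_chunk_size` is set, fall back to
# `max_seq_len_to_capture` if `max_prompt_len` is not given; otherwise it is left as None.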
self.max_prompt_len = (
(max_prompt_len or max_seq_len_to_capture) if prefill_chunk_size else None
)
# Bucket Configuration
if not prefill_buckets and not decode_buckets:
self.prefill_buckets = [(1, max_seq_len_to_capture)]
self.decode_buckets = (
[(1, max_seq_len_to_capture)] if self.model_metadata.is_generative_model else []
)
else:
duplicate_prefill_buckets = [
bucket for bucket, cnt in Counter(prefill_buckets).items() if cnt > 1
]
duplicate_decode_buckets = [
bucket for bucket, cnt in Counter(decode_buckets).items() if cnt > 1
]
if duplicate_prefill_buckets:
logging.warning(
f"Duplicate prefill buckets found: {duplicate_prefill_buckets}."
)
if duplicate_decode_buckets:
logging.warning(f"Duplicate decode buckets found: {duplicate_decode_buckets}.")
self.prefill_buckets = get_list_with_no_dup_with_order_preserved(prefill_buckets)
self.decode_buckets = get_list_with_no_dup_with_order_preserved(decode_buckets)
self.compiler_config_overrides = compiler_config_overrides
self.model_rewriting_config = ModelRewritingConfig(
do_decompositions_for_model_rewrite=do_decompositions_for_model_rewrite,
use_blockwise_compile=use_blockwise_compile,
embedding_layer_as_single_block=_embedding_layer_as_single_block,
embed_all_constants_into_graph=embed_all_constants_into_graph,
optimize_logit_shape=optimize_logit_shape,
)
self.num_blocks_per_supertask = num_blocks_per_supertask
self.kv_cache_sharing_across_beams_config = kv_cache_sharing_across_beams_config
if self.model_metadata.attention_type is AttentionType.PAGED_ATTENTION:
if paged_attention_block_size != 1:
raise NotImplementedError(
"Currently, only paged attention with block_size=1 is supported."
)
padding_block_idx = (
DEFAULT_PAGED_ATTENTION_PADDING_BLOCK_IDX
if self.optimize_paged_attention_block_loading
else None
)
self.paged_attention_config: Optional[PagedAttentionConfig] = PagedAttentionConfig(
paged_attention_block_size, padding_block_idx
)
else:
self.paged_attention_config = None
self.num_speculative_tokens = num_speculative_tokens
self.speculative_model_artifact_builder = speculative_model
self._allow_empty_speculative_model = _allow_empty_speculative_model
if self.num_speculative_tokens:
if not (self.speculative_model_artifact_builder or self._allow_empty_speculative_model):
raise ValueError(
"`num_speculative_tokens` and `speculative_model` must be provided together."
)
@property
def model_id(self) -> str:
if self.is_from_quantized_model:
furiosa_config_file = self.model_path / FURIOSA_CONFIG_JSON
furiosa_config = json.loads(furiosa_config_file.read_text())
return furiosa_config['model_id']
else:
return str(self.model_id_or_path)
@property
def name(self) -> str:
if self._name:
return self._name
return self.model_id
@property
def quant_ckpt_file_path(self):
if self.is_from_quantized_model:
assert self.model_path
return self.model_path / _EXPORTED_MODEL_QCKPT
if self.quantize_artifact_path:
quant_ckpt_file_path = Path(f"{self.quantize_artifact_path}/exported_model.qckpt")
return quant_ckpt_file_path if os.path.exists(quant_ckpt_file_path) else None
return None
@property
def qformat_path(self) -> Union[None, Path]:
if self.is_from_quantized_model:
assert self.model_path
return self.model_path / _QFORMAT_YAML
if self.quantize_artifact_path:
qformat_path = Path(f"{self.quantize_artifact_path}/qformat.yaml")
if os.path.exists(qformat_path):
return qformat_path
else:
raise ValueError(
"The quantize_artifact_path is specified, but the qformat.yaml file does not exist."
)
return None
@property
def qparam_path(self) -> Union[None, Path]:
if self.is_from_quantized_model:
assert self.model_path
return self.model_path / _QPARAM_NPY
if self.quantize_artifact_path:
qparam_path = Path(f"{self.quantize_artifact_path}/qparam.npy")
if os.path.exists(qparam_path):
return qparam_path
else:
raise ValueError(
"The quantize_artifact_path is specified, but the qparam.npy file does not exist."
)
return None
def _validate_copies_from_local(self):
"""Check if the files exist in the local directory."""
if self.copies_from_local is None:
return
for file_copy in self.copies_from_local:
if not os.path.exists(file_copy):
raise ValueError(
f"The file {file_copy} of copies_from_local does not exist in the local directory."
)
def _validate_copies_from_model(self):
"""Check if the files are downloadable from the Hugging Face Hub."""
from huggingface_hub import hf_hub_download
if self.copies_from_model is None:
return
for file_copy in self.copies_from_model:
try:
_ = hf_hub_download(self.model_metadata.pretrained_id, file_copy)
except Exception as e:
raise ValueError(
f"Failed to download {file_copy} from the Hugging Face Hub repository {self.model_metadata.pretrained_id}."
) from e
def _build_model_artifact(
self,
*,
num_pipeline_builder_workers: int = 1,
num_compile_workers: int = 1,
cache_dir: Optional[os.PathLike] = CACHE_DIR,
param_file_path: Optional[Union[os.PathLike, str]] = None,
param_saved_format: Literal["safetensors", "pt"] = "safetensors",
param_file_max_shard_size: Optional[Union[str, int]] = "5GB",
_cleanup: bool = True,
**kwargs,
) -> Tuple[ModelArtifact, List[Pipeline]]:
model_config = self.model_metadata.config
# Please refer to the example at https://huggingface.co/docs/transformers/en/main_classes/text_generation#transformers.GenerationMixin.greedy_search.example
# Some models like GPT-2 may not have pad_token_id. However, when we run a batch of sequence
# generations, pad_token_id is required to fill the batch with padding. With Hugging Face Transformers,
# users have to handle this issue themselves. To provide better usability, we handle it within the LLM class.
model_config.pad_token_id = model_config.eos_token_id
kv_cache_dtype = self.model_metadata.kv_cache_dtype
original_model_type = self.model_metadata.get_optimized_cls()
original_model_name = f"{original_model_type.__module__}.{original_model_type.__name__}"
(
prefill_buckets_with_output_size,
decode_buckets_with_output_size,
other_buckets_with_output_size,
) = get_buckets_with_output_logits_size(
self.model_metadata,
ManualBucketConfig(
prefill_buckets=self.prefill_buckets,
decode_buckets=self.decode_buckets,
),
self.max_prompt_len if self.max_prompt_len else self.max_seq_len_to_capture,
self.max_seq_len_to_capture,
num_speculative_tokens=self.num_speculative_tokens,
prefill_chunk_size=self.prefill_chunk_size,
optimize_bucket_output_logits_size=self.model_rewriting_config.optimize_logit_shape,
)
prefill_buckets = [
bucket_with_output_size.bucket
for bucket_with_output_size in prefill_buckets_with_output_size
]
decode_buckets = [
bucket_with_output_size.bucket
for bucket_with_output_size in decode_buckets_with_output_size
]
other_buckets = [
bucket_with_output_size.bucket
for bucket_with_output_size in other_buckets_with_output_size
]
packing_type: Literal["IDENTITY"] = (
"IDENTITY" # TODO: enum PackingType has `IDENTITY` and `GREEDY`
)
generator_config = GeneratorConfig(
_POSITION_ID_PAD,
get_list_with_no_dup_with_order_preserved(
(*prefill_buckets, *decode_buckets, *other_buckets)
),
original_model_name,
self.paged_attention_config,
packing_type,
self.kv_cache_sharing_across_beams_config,
self.num_speculative_tokens,
None,
)
parallel_config = ParallelConfig(
tensor_parallel_size=self.tensor_parallel_size,
pipeline_parallel_size=self.pipeline_parallel_size,
)
model_creation_info = ModelCreationInfo(
self.model_metadata,
self.seed_for_random_weight is not None,
seed=self.seed_for_random_weight,
qformat_path=self.qformat_path,
qparam_path=self.qparam_path,
quant_ckpt_file_path=self.quant_ckpt_file_path,
)
beam_size_or_none = (
None
if self.kv_cache_sharing_across_beams_config is None
else self.kv_cache_sharing_across_beams_config.beam_width
)
compiler_config_context = CompilerConfigContext(
model_metadata=self.model_metadata,
beam_size=beam_size_or_none,
compiler_config_overrides=self.compiler_config_overrides,
enable_bf16_partial_sum_for_split=self.enable_bf16_partial_sum_for_split,
embedding_as_single_block=self.model_rewriting_config.embedding_layer_as_single_block,
)
if _cleanup:
self.tmp_dir = tempfile.TemporaryDirectory()
tmp_dir_path = Path(self.tmp_dir.name)
else:
tmp_dir_path = Path(tempfile.mkdtemp())
if param_file_path:
param_file_metadata = ParamFileMetadata.load(
param_file_path, ParamfileFormat.from_str(param_saved_format)
)
else:
if cache_dir and model_creation_info.is_hashable():
param_file_cache_dir = Path(cache_dir) / PARAM_FILE_CACHE_SUBDIR_NAME
param_file_metadata = get_param_file_with_cache(
model_creation_info,
param_file_cache_dir,
max_shard_size=param_file_max_shard_size,
)
else:
assert isinstance(tmp_dir_path, Path)
param_file_metadata = get_param_file_with_cache(
model_creation_info, tmp_dir_path, max_shard_size=param_file_max_shard_size
)
cache_dir = None if cache_dir is None else Path(cache_dir)
# For now, `PipelineParallelismMppp` supports all valid cases because only pipeline parallelism needs to be expressed within one pipeline.
# TODO: Make the mppp configuration specific to the speculative model,
# e.g., by introducing a keyword argument (such as "speculative_mppp") for this purpose.
mppp = kwargs.pop("mppp", None) or PipelineParallelismMppp()
logger.info(
f"Buckets with output sizes: {pformat([*prefill_buckets_with_output_size, *decode_buckets_with_output_size, *other_buckets_with_output_size])}"
)
if self.trust_remote_code:
prestep_for_remote_code_model(self.model_metadata, num_pipeline_builder_workers)
pipelines_with_metadata = build_pipelines(
model_creation_info,
[
*prefill_buckets_with_output_size,
*decode_buckets_with_output_size,
*other_buckets_with_output_size,
],
self.dummy_normalized_mesh[0],
param_file_metadata,
cache_dir,
mppp,
SuperTaskKind.EDF,
self.one_supertask_per_device,
self.model_rewriting_config.use_blockwise_compile,
self.model_rewriting_config.embedding_layer_as_single_block,
self.model_rewriting_config.do_decompositions_for_model_rewrite,
kv_cache_dtype.to_torch_dtype() if kv_cache_dtype else None,
generator_config.paged_attention_config,
self.sparse_select_version,
generator_config.kv_cache_sharing_across_beams_config,
tmp_dir_path,
self.model_metadata,
compiler_config_context,
num_pipeline_builder_workers,
num_compile_workers,
self.model_rewriting_config.embed_all_constants_into_graph,
self.num_blocks_per_supertask,
self.model_metadata.is_generative_model,
param_saved_format,
**kwargs,
)
if len(pipelines_with_metadata) == 0:
raise ValueError("No pipeline is generated")
pipeline_metadata_list = [metadata for _, metadata in pipelines_with_metadata]
pipelines = [pipeline for pipeline, _ in pipelines_with_metadata]
# Instead of properly casting pipelines into ModelArtifact.pipelines,
# this function returns a Sequence[Pipeline] as one element of the tuple,
# as these pipelines will later be passed to __save_artifacts.
return (
ModelArtifact(
generator_config=generator_config,
hf_config=self.model_metadata.config.to_dict(),
model_metadata=self.model_metadata,
model_rewriting_config=self.model_rewriting_config,
parallel_config=parallel_config,
pipeline_metadata_list=pipeline_metadata_list,
max_prompt_len=self.max_prompt_len,
),
pipelines,
)
def build(
self,
save_dir: Union[str, os.PathLike],
*,
num_pipeline_builder_workers: int = 1,
num_compile_workers: int = 1,
cache_dir: Optional[os.PathLike] = CACHE_DIR,
param_file_path: Optional[os.PathLike] = None,
param_saved_format: Literal["safetensors", "pt"] = "safetensors",
param_file_max_shard_size: Optional[Union[str, int]] = "5GB",
_cleanup: bool = True,
**kwargs,
):
"""Build the artifacts for given model configurations.
Args:
save_dir: The path to save the artifacts. With artifacts, you can create ``LLM`` without quantizing or compiling the model again.
num_pipeline_builder_workers: The number of workers used for building pipelines (except for compilation). The default is 1 (no parallelism).
Setting this value larger than 1 reduces pipeline building time, especially for large models, but requires much more memory.
num_compile_workers: The number of workers used for compilation. The default is 1 (no parallelism).
cache_dir: The cache directory for all generated files for this LLM instance.
When its value is ``None``, caching is disabled. The default is "$HOME/.cache/furiosa/llm".
param_file_path: The path to the parameter file to use for pipeline generation.
If not specified, the parameters will be saved in a temporary file which will be
deleted when ``LLM`` is destroyed.
param_saved_format: The format of the parameter file. The only possible value is "safetensors" for now.
The default is "safetensors".
param_file_max_shard_size: The maximum size of a single parameter file. The parameter file will be
split into smaller files, each smaller than this size. The default is "5GB".
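Example:
    A minimal sketch (the save directory and worker counts below are illustrative)::

        builder = ArtifactBuilder("meta-llama/Meta-Llama-3.1-8B-Instruct")
        builder.build(
            "./my-artifact",
            num_pipeline_builder_workers=2,
            num_compile_workers=2,
        )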
"""
import furiosa.native_compiler
metadata = ArtifactMetadata(
artifact_id=self.artifact_id,
name=self.name,
timestamp=int(time.time()),
furiosa_llm_version=_get_commit_id(),
furiosa_compiler_version=furiosa.native_compiler.compiler_git_short_hash(),
)
# Make sure the model is local, get a model hash, and set it in the ModelMetadata
self.model_metadata.ensure_model_and_update_weight_hash()
target_model_artifact, target_model_pipelines = self._build_model_artifact(
num_pipeline_builder_workers=num_pipeline_builder_workers,
num_compile_workers=num_compile_workers,
cache_dir=cache_dir,
param_file_path=param_file_path,
param_saved_format=param_saved_format,
_cleanup=_cleanup,
param_file_max_shard_size=param_file_max_shard_size,
**kwargs,
)
speculative_model_artifact = None
speculative_model_pipelines: List[Pipeline] = []
if self.speculative_model_artifact_builder:
# Make sure the model is local, get a model hash, and set it in the ModelMetadata
self.speculative_model_artifact_builder.model_metadata.ensure_model_and_update_weight_hash()
speculative_model_artifact, speculative_model_pipelines = (
self.speculative_model_artifact_builder._build_model_artifact(
num_pipeline_builder_workers=num_pipeline_builder_workers,
num_compile_workers=num_compile_workers,
cache_dir=cache_dir,
param_file_path=param_file_path,
param_saved_format=param_saved_format,
param_file_max_shard_size=param_file_max_shard_size,
_cleanup=_cleanup,
)
)
artifact = Artifact(
metadata=metadata,
model=target_model_artifact,
speculative_model=speculative_model_artifact,
prefill_chunk_size=self.prefill_chunk_size,
version=SCHEMA_VERSION,
)
ArtifactBuilder.__save_artifacts(
save_dir,
artifact,
target_model_pipelines,
speculative_model_pipelines,
self.copies_from_model,
self.copies_from_local,
self.bundle_binaries,
)
@staticmethod
def __preprocess_for_pipeline_save(
pipelines: Sequence[Pipeline],
path: Union[str, os.PathLike],
bundle_binaries: bool = True,
) -> List[Dict[str, Any]]:
from furiosa.native_compiler import CompiledGraph
def save_blob_to_zip(zip_file: ZipFile, file_name: str, blob: Any) -> None:
zip_file.writestr(file_name, blob)
def save_blob_to_dir(base_path: str, file_name: str, blob: Any) -> None:
mode = "w" if isinstance(blob, str) else "wb"
with open(os.path.join(base_path, file_name), mode) as f:
f.write(blob)
def copy_param_file(
param_file_info: ParamFileInfo, base_path: Union[str, os.PathLike]
) -> None:
param_file_path = Path(param_file_info.path)
parent_path = os.fspath(param_file_path.parent)
if _INDEX_TENSOR_NAME in os.listdir(parent_path):
# This param file is a shard in a safetensors file set.
# Copy the whole directory.
new_parent_path = f"{base_path}/{os.path.basename(parent_path)}"
if not os.path.exists(new_parent_path):
shutil.copytree(parent_path, new_parent_path)
new_file_path = os.path.join(new_parent_path, os.path.basename(param_file_path))
else:
new_file_path = f"{base_path}/{os.path.basename(param_file_path)}"
if not os.path.exists(new_file_path):
shutil.copy(param_file_path, new_file_path)
param_file_info.path = os.path.relpath(new_file_path, base_path)
processed_pipelines = []
zip_path = os.path.join(path, f"{BINARY_BUNDLE_ZIP_FILE_NAME}.zip")
save_ftn = save_blob_to_zip if bundle_binaries else save_blob_to_dir
# NOTE: Each EDF file can be several gigabytes in size, so we should avoid
# unnecessarily storing EDF blob data elsewhere in the program (e.g., in a temporary dictionary).
zip_mode = "a" if os.path.exists(zip_path) else "w"
with ZipFile(zip_path, mode=zip_mode) if bundle_binaries else nullcontext() as zip_file: # type: ignore[call-overload]
save_target = zip_file if bundle_binaries else path
for pipeline in pipelines:
for id, blob in pipeline.blobs.items():
kind = pipeline.get_blob_kind().get(id)
if kind == SuperTaskKind.FX:
assert isinstance(blob, str)
file_name = f"{id}.fx"
data = blob
elif kind == SuperTaskKind.EDF:
assert isinstance(blob, CompiledGraph)
file_name = f"{id}.edf"
data = blob.serialize() # type: ignore[assignment]
else:
raise NotImplementedError(f"SuperTask [{kind}] is not supported to save")
save_ftn(save_target, file_name, data) # type: ignore[arg-type]
pipeline.blobs[id] = None # type: ignore[assignment]
for param_file_info in pipeline.param_files.values():
copy_param_file(param_file_info, path)
processed_pipelines.append(json.loads(pipeline.to_json()))
return processed_pipelines
@staticmethod
def __save_artifacts(
path: Union[str, os.PathLike],
artifact: Artifact,
target_model_pipelines: Sequence[Pipeline],
speculative_model_pipelines: Sequence[Pipeline],
copies_from_model: Optional[Sequence[str]],
copies_from_local: Optional[Sequence[os.PathLike]],
bundle_binaries: bool = True,
):
import shutil
from huggingface_hub import hf_hub_download
path = Path(path)
path.mkdir(parents=True, exist_ok=True)
artifact.model = artifact.model.model_copy(
update={
"pipelines": ArtifactBuilder.__preprocess_for_pipeline_save(
target_model_pipelines, path, bundle_binaries
),
},
)
if artifact.speculative_model:
artifact.speculative_model = artifact.speculative_model.model_copy(
update={
"pipelines": ArtifactBuilder.__preprocess_for_pipeline_save(
speculative_model_pipelines, path, bundle_binaries
),
},
)
artifact.export(f"{path}/artifact.json")
model_metadata = artifact.model.model_metadata
# Save Transformers configs to make an artifact more compatible with Hugging Face Transformers
tokenizer = get_tokenizer(model_metadata._model_id_or_path)
tokenizer.save_pretrained(path)
hf_config = model_metadata.config
hf_config.save_pretrained(path)
# Usually, this copies important files like README.md, LICENSE.txt, etc.
# So, we should fail if copying fails.
if copies_from_model is not None:
for hub_file in copies_from_model:
cache_path = hf_hub_download(model_metadata.pretrained_id, hub_file)
shutil.copy(cache_path, path)
if copies_from_local is not None:
for local_file in copies_from_local:
shutil.copy(local_file, path)
# Save furiosa_config.json
furiosa_config = FuriosaConfig(
model_id=model_metadata.pretrained_id,
model_kinds=[ModelKind.ARTIFACT],
model_class=ModelClass.from_class(model_metadata.get_optimized_cls()),
llm_config=model_metadata.llm_config,
)
furiosa_config_json_path = path / FURIOSA_CONFIG_JSON
furiosa_config.export(furiosa_config_json_path)
def get_model_metadata_from_quantized_model(
model_path: Union[str, Path], config: PretrainedConfig, trust_remote_code: Optional[bool]
) -> ModelMetadata:
model_path = Path(model_path) if not isinstance(model_path, Path) else model_path
(furiosa_config, _, _, _) = _load_quantized_model_meta(model_path)
model_metadata = ModelMetadata(
pretrained_id=furiosa_config.model_id,
llm_config=furiosa_config.llm_config,
trust_remote_code=trust_remote_code,
model_id_or_path=model_path,
)
model_metadata = model_metadata.with_hf_configs(config.to_diff_dict())
assert model_metadata.is_generative_model
return model_metadata
def get_model_metadata_from_model_id(
model_id: str,
model_id_or_path: Optional[Union[str, Path]] = None,
num_hidden_layers: Optional[int] = None,
quantize_artifact_path: Optional[os.PathLike] = None,
prefill_chunk_size: Optional[int] = None,
trust_remote_code: Optional[bool] = None,
revision: Optional[str] = None,
auto_bfloat16_cast: Optional[bool] = None,
num_speculative_tokens: Optional[int] = None,
use_2d_attention_masks: bool = False,
merge_kv_cache_indices: bool = False,
) -> ModelMetadata:
model_metadata = (
ModelMetadata.init_with_mlperf_optim_options(
model_id,
model_id_or_path=model_id_or_path,
trust_remote_code=trust_remote_code,
revision=revision,
)
if is_mlperf_optimized_model(model_id)
else ModelMetadata(
model_id,
model_id_or_path=model_id_or_path,
trust_remote_code=trust_remote_code,
revision=revision,
)
)
if auto_bfloat16_cast:
model_metadata = model_metadata.with_auto_bfloat16_cast()
# Override model config, optimization config
if num_hidden_layers:
model_metadata = model_metadata.with_num_layers(num_hidden_layers)
if prefill_chunk_size or num_speculative_tokens:
# For chunked prefill, a model optimized for speculative decoding is needed, which can accept decode buckets with input_ids length > 1.
model_metadata = model_metadata.with_optimizations("optimized_for_speculative_decoding")
if use_2d_attention_masks:
# 2d mask optimization always goes with speculative decoding optimization.
model_metadata = model_metadata.with_optimizations(
{
"optimized_for_speculative_decoding": True,
"use_2d_masks": True,
}
)
if merge_kv_cache_indices:
# KV cache indices merge optimization always goes with speculative decoding optimization.
model_metadata = model_metadata.with_optimizations(
{
"optimized_for_speculative_decoding": True,
"merged_kv_indices": True,
}
)
## Quantization Configuration
if quantize_artifact_path:
# model-compressor specific paths
qformat_path = Path(f"{quantize_artifact_path}/qformat.yaml")
qparam_path = Path(f"{quantize_artifact_path}/qparam.npy")
if not os.path.exists(qformat_path):
raise ValueError(
f"The quantize_artifact_path {quantize_artifact_path} is specified, but the qformat.yaml file does not exist."
)
if not os.path.exists(qparam_path):
raise ValueError(
"The quantize_artifact_path is specified, but the qparam.npy file does not exist."
)
quantization_config = QuantizationConfig.from_qformat(qformat_path)
model_metadata = model_metadata.with_quantization_config(quantization_config)
return model_metadata