Source code for furiosa_llm.artifact.builder

from collections import Counter
from contextlib import nullcontext
import json
import logging
import os
from pathlib import Path
from pprint import pformat
import shutil
import tempfile
import time
from typing import Any, Callable, Dict, List, Literal, Mapping, Optional, Sequence, Tuple, Union
import uuid
import warnings
from zipfile import ZipFile

from transformers import AutoConfig, PretrainedConfig

from furiosa_llm.optimum.model_configs import find_canonical_model_id
from furiosa_llm.optimum.modeling import (
    _EXPORTED_MODEL_QCKPT,
    _QFORMAT_YAML,
    _QPARAM_NPY,
    CANONICAL_MODEL_IDS,
    FURIOSA_CONFIG_JSON,
    _load_quantized_model_meta,
    is_quantized_model_path,
)
from furiosa_llm.optimum.types import FuriosaConfig, ModelClass, ModelKind
from furiosa_llm.parallelize.export.tensor import (
    _INDEX_TENSOR_NAME,
    ParamfileFormat,
    ParamFileMetadata,
)
from furiosa_llm.parallelize.mppp.config import Device
from furiosa_llm.parallelize.pipeline.types import ParamFileInfo
from furiosa_llm.tokenizer.tokenizer import get_tokenizer
from furiosa_llm.utils import get_list_with_no_dup_with_order_preserved

from ..device import NUM_PES_PER_NPU, get_device_mesh
from ..models import AttentionType, ModelMetadata, QuantizationConfig
from ..models.config_types import (
    Bucket,
    GeneratorConfig,
    KvCacheSharingAcrossBeamsConfig,
    ManualBucketConfig,
    ModelRewritingConfig,
    PagedAttentionConfig,
    ParallelConfig,
)
from ..parallelize.compiler_config import CompilerConfigContext
from ..parallelize.model_creation_info import ModelCreationInfo
from ..parallelize.mppp.api import PipelineParallelismMppp
from ..parallelize.pipeline import Pipeline
from ..parallelize.pipeline.builder.api import _get_commit_id
from ..parallelize.pipeline.types import SuperTaskKind
from ..parallelize.trace import (
    PARAM_FILE_CACHE_SUBDIR_NAME,
    get_param_file_with_cache,
)
from .helper import (
    build_pipelines,
    get_buckets_with_output_logits_size,
    prestep_for_remote_code_model,
)
from .types.latest import SCHEMA_VERSION, Artifact, ArtifactMetadata, ModelArtifact

# Module-level logger; named after this module so records can be filtered per module.
logger = logging.getLogger(__name__)

# Default position id for padding
_POSITION_ID_PAD = 1

# Default param file name
_PARAM_FILE_NAME = "params.safetensors"

# Default index of the padding block when paged attention model is used.
DEFAULT_PAGED_ATTENTION_PADDING_BLOCK_IDX = 0

# Default cache root: "$XDG_CACHE_HOME/furiosa/llm", falling back to
# "~/.cache/furiosa/llm" when XDG_CACHE_HOME is not set. Used as the default
# value of the `cache_dir` argument of `ArtifactBuilder.build`.
CACHE_DIR = Path(os.getenv("XDG_CACHE_HOME", Path.home() / ".cache")) / "furiosa" / "llm"

# Base name (without the ".zip" suffix) of the archive that holds the bundled binaries.
BINARY_BUNDLE_ZIP_FILE_NAME = "binary_bundle"


def is_mlperf_optimized_model(canonical_model_id: str) -> bool:
    """Return True when ``canonical_model_id`` is listed in ``CANONICAL_MODEL_IDS``."""
    known_model_ids = CANONICAL_MODEL_IDS
    return canonical_model_id in known_model_ids


def get_tp_group_device(tp_size: int, start_idx: int) -> Device:
    """Build the ``Device`` string describing one tensor-parallel group.

    Args:
        tp_size: Number of PEs in the tensor-parallel group.
        start_idx: Index of the first NPU used by the group.

    Returns:
        ``Device("npu:<start_idx>:0-<tp_size-1>")`` when the group fits in a
        single NPU (``tp_size <= NUM_PES_PER_NPU``); otherwise a comma-joined
        list of whole NPUs ``npu:<start_idx>``, ``npu:<start_idx + 1>``, ...
    """
    if tp_size > NUM_PES_PER_NPU:
        # A group spanning several NPUs must occupy each of them completely.
        assert tp_size % NUM_PES_PER_NPU == 0
        npu_count = tp_size // NUM_PES_PER_NPU
        npu_names = [f"npu:{start_idx + offset}" for offset in range(npu_count)]
        return Device(",".join(npu_names))

    last_pe_idx = tp_size - 1
    return Device(f"npu:{start_idx}:0-{last_pe_idx}")


class ArtifactBuilder:
    r"""The artifact builder to use in the Furiosa LLM.

    Args:
        model_id_or_path: The Huggingface model id or a local path. This corresponds to
            pretrained_model_name_or_path in HuggingFace Transformers.
        name: The name of the artifact to build.
        tensor_parallel_size: The number of PEs for each tensor parallelism group.
            The default is 4.
        pipeline_parallel_size : The pipeline parallel size that will be used when loading
            the model by default. The default value is 1.
        prefill_buckets: Specify the bucket size for prefill
        decode_buckets: Specify the bucket size for decode
        max_seq_len_to_capture: Maximum sequence length covered by LLM engine. Sequence with
            larger context than this will not be covered. If no bucket is explicitly specified,
            a single batch bucket with a context length of this value is created.
        max_prompt_len: Maximum prompt sequence length covered by LLM engine. Prompt larger
            than this cannot be handled. If not given, will be obtained from bucket and
            other configs.
        num_hidden_layers: Number of hidden layers in the Transformer encoder.
        seed_for_random_weight: The seed to initialize the random number generator for
            creating random weight.
        calculate_logit_only_for_last_token: Whether the model has last block slice
            optimization applied. Deprecated; use ``optimize_logit_shape`` instead.
        auto_bfloat16_cast: Whether to cast the model to bfloat16 automatically.
            This option is required when neither the model is trained with bfloat16
            nor quantized.
        quantize_artifact_path: Specifies the path where quantization artifacts generated
            by the furiosa-model-compressor are saved.
        compiler_config_overrides: Overrides for the compiler config. This is a dictionary
            that includes the configuration for the compiler.
        do_decompositions_for_model_rewrite: Whether to decompose some ops to describe various
            parallelism strategies with mppp config. When the value is True, mppp config that
            matches with the decomposed FX graph should be given.
        use_blockwise_compile: If True, each task will be compiled in the unit of transformer
            block, and compilation result for transformer block is generated once and reused.
            The default is ``True``.
        num_blocks_per_supertask: The number of transformer blocks that will be merged into
            one supertask. This option is valid only when `use_blockwise_compile=True`.
            The default is 1.
        num_blocks_per_pp_stage: The number of transformers blocks per each pipeline
            parallelism stage. If not given, transformer blocks will be distributed equally.
            (Note: this is not an explicit parameter of ``__init__``.)
        embed_all_constants_into_graph: Whether to embed constant tensors into graph or make
            them as input of the graph and save them as separate files. The default is False.
        optimize_logit_shape: Add logit slice or removal operation in the graph for optimized
            performance.
        kv_cache_sharing_across_beams_config: Configuration for sharing kv cache across beams.
            This argument must be given if and only if the model is optimized to share kv cache
            across beams. If this argument is given, decode phase buckets with batch size of
            ``batch_size`` \* ``kv_cache_sharing_across_beams_config.beam_width`` will be
            created.
        prefill_chunk_size: Chunk size used for chunked prefill. If the value is `None`,
            chunked prefill is not used.
        paged_attention_block_size: The maximum number of tokens that can be stored in a single
            paged attention block. This argument must be given if model uses paged attention.
        trust_remote_code: Trust remote code when downloading the model and tokenizer from
            HuggingFace.
        revision: Forwarded as ``revision`` to ``AutoConfig.from_pretrained`` and to the
            model metadata.
        copies_from_model: Relative paths of files to copy from the model directory
            (e.g., LICENSE.txt)
        copies_from_local: Relative paths of files to copy from the local directory
            (e.g., README.md)
        bundle_binaries: If `True`, all compiled binaries will be bundled into a single archive
            file (binary_bundle.zip) in the output directory. The default is ``True``.
        speculative_model : Speculative model for speculative decoding.
            Note that speculative decoding is an experimental feature and the build may be
            unstable.
        num_speculative_tokens : The number of tokens that speculative model will generate
            speculatively during each iteration of the decoding process.
            Note that speculative decoding is an experimental feature and the build may be
            unstable.
    """

    def __init__(
        self,
        model_id_or_path: Union[str, Path],
        name: str = "",
        *,
        # Parallelize Config
        tensor_parallel_size: int = 4,
        pipeline_parallel_size: int = 1,
        # Bucket Config
        # Immutable defaults: an empty tuple behaves like the former empty list
        # without sharing one mutable object across calls.
        prefill_buckets: Sequence[Tuple[int, int]] = (),
        decode_buckets: Sequence[Tuple[int, int]] = (),
        max_seq_len_to_capture: int = 2048,
        max_prompt_len: Optional[int] = None,
        # Model Config
        num_hidden_layers: Optional[int] = None,
        seed_for_random_weight: Optional[int] = None,
        calculate_logit_only_for_last_token: Optional[bool] = False,
        # Quantize Config
        auto_bfloat16_cast: Optional[bool] = None,
        quantize_artifact_path: Optional[os.PathLike] = None,
        # Compiler Config
        compiler_config_overrides: Optional[Mapping] = None,
        do_decompositions_for_model_rewrite: bool = False,
        use_blockwise_compile: bool = True,
        num_blocks_per_supertask: Union[
            Union[int, Sequence[int]], Callable[[Bucket], Union[int, Sequence[int]]]
        ] = 1,
        embed_all_constants_into_graph: bool = False,
        optimize_logit_shape: bool = True,
        kv_cache_sharing_across_beams_config: Optional[KvCacheSharingAcrossBeamsConfig] = None,
        prefill_chunk_size: Optional[int] = None,
        # PagedAttention Config
        paged_attention_block_size: int = 1,
        # Huggingface-related Config
        trust_remote_code: Optional[bool] = None,
        revision: Optional[str] = None,
        copies_from_model: Optional[List[str]] = None,
        copies_from_local: Optional[List[os.PathLike]] = None,
        bundle_binaries: bool = True,
        # Speculative-decoding Config
        speculative_model: Optional["ArtifactBuilder"] = None,
        num_speculative_tokens: Optional[int] = None,
        _allow_empty_speculative_model: bool = False,
        _embedding_layer_as_single_block: bool = False,
        _enable_bf16_partial_sum_for_split: bool = True,
        _use_2d_attention_masks: bool = False,
        _merge_kv_cache_indices: bool = False,
        **kwargs,
    ):
        # TODO : remove following later
        if kwargs.pop("preferred_pipeline_parallel_size", None):
            raise ValueError(
                "`preferred_pipeline_parallel_size` is deprecated. Use `pipeline_parallel_size` instead."
            )

        model_config = AutoConfig.from_pretrained(
            model_id_or_path,
            trust_remote_code=trust_remote_code,
            revision=revision,
        )
        canonical_model_id = find_canonical_model_id(model_config)
        if canonical_model_id is None:
            raise ValueError(f"ArtifactBuilder doesn't support {type(model_config)} class")

        # Constants default values
        self.artifact_id = str(uuid.uuid4())
        self.model_id_or_path = model_id_or_path
        self.canonical_model_id = canonical_model_id
        self.optimize_paged_attention_block_loading = True
        self.sparse_select_version = "v1.5"
        self.one_supertask_per_device = True
        self.quantize_artifact_path = quantize_artifact_path
        self.prefill_chunk_size = prefill_chunk_size
        self.trust_remote_code = trust_remote_code
        self.revision = revision
        self.copies_from_model = copies_from_model
        self.copies_from_local = copies_from_local
        self.enable_bf16_partial_sum_for_split = _enable_bf16_partial_sum_for_split
        self.use_2d_attention_masks = _use_2d_attention_masks
        self.merge_kv_cache_indices = _merge_kv_cache_indices
        self.bundle_binaries = bundle_binaries

        if calculate_logit_only_for_last_token:
            warnings.warn(
                "calculate_logit_only_for_last_token is deprecated and will be removed in the future. Use `optimize_logit_shape` instead."
            )
            # The deprecated flag, when truthy, takes over `optimize_logit_shape`.
            optimize_logit_shape = calculate_logit_only_for_last_token

        # Pre-step for model configurations
        if is_quantized_model_path(model_id_or_path):
            self.is_from_quantized_model = True
            self.model_path = (
                model_id_or_path if isinstance(model_id_or_path, Path) else Path(model_id_or_path)
            )
            self.model_metadata = get_model_metadata_from_quantized_model(
                self.model_path, model_config, trust_remote_code=trust_remote_code
            )

            # Warnings for unsupported options
            if seed_for_random_weight is not None:
                logger.warning("Random weight model is not supported for quantized model.")
            if auto_bfloat16_cast is not None:
                logger.warning("auto_bfloat16_cast is not supported for quantized model.")

            self.seed_for_random_weight = None
        else:
            self.is_from_quantized_model = False
            # Presteps for model prepared by model id
            self.model_metadata = get_model_metadata_from_model_id(
                model_id=self.canonical_model_id,
                model_id_or_path=self.model_id_or_path,
                num_hidden_layers=num_hidden_layers,
                quantize_artifact_path=quantize_artifact_path,
                prefill_chunk_size=self.prefill_chunk_size,
                trust_remote_code=trust_remote_code,
                revision=self.revision,
                auto_bfloat16_cast=auto_bfloat16_cast,
                num_speculative_tokens=num_speculative_tokens,
                use_2d_attention_masks=_use_2d_attention_masks,
                merge_kv_cache_indices=_merge_kv_cache_indices,
            )
            self.seed_for_random_weight = seed_for_random_weight

        # Check if the files to copy exist before building the model
        self._validate_copies_from_local()
        self._validate_copies_from_model()

        self._name = name

        # Followings are dummy specifications for parallel size and device configuration,
        # required for compilation. These values are expected to be provided by the user
        # when loading artifacts via `LLM.load_artifact`.
        self.tensor_parallel_size = tensor_parallel_size
        self.pipeline_parallel_size = pipeline_parallel_size
        self.dummy_data_parallel_size = 1

        # Ceiling division: number of NPUs each tensor-parallel group occupies.
        num_chip_per_tp_group = (self.tensor_parallel_size - 1) // NUM_PES_PER_NPU + 1
        num_total_chips = (
            self.dummy_data_parallel_size * self.pipeline_parallel_size * num_chip_per_tp_group
        )
        tp_group_devices = [
            get_tp_group_device(self.tensor_parallel_size, start_idx)
            for start_idx in range(0, num_total_chips, num_chip_per_tp_group)
        ]
        device_mesh = get_device_mesh(
            tp_group_devices,
            tensor_parallel_size,
            self.pipeline_parallel_size,
            self.dummy_data_parallel_size,
        )
        # Flatten the tensor-parallel groups inside each entry of the mesh.
        self.dummy_normalized_mesh = [
            [dev for tp_group in pp_tp_group for dev in tp_group] for pp_tp_group in device_mesh
        ]

        self.max_seq_len_to_capture = max_seq_len_to_capture
        # NOTE(review): parentheses make the original operator precedence explicit.
        # `max_prompt_len` is only honoured when chunked prefill is enabled; otherwise
        # the attribute is None even if the caller passed a value - confirm this is intended.
        self.max_prompt_len = (
            (max_prompt_len or max_seq_len_to_capture) if prefill_chunk_size else None
        )

        # Bucket Configuration
        if not prefill_buckets and not decode_buckets:
            self.prefill_buckets = [(1, max_seq_len_to_capture)]
            self.decode_buckets = (
                [(1, max_seq_len_to_capture)] if self.model_metadata.is_generative_model else []
            )
        else:
            duplicate_prefill_buckets = [
                bucket for bucket, cnt in Counter(prefill_buckets).items() if cnt > 1
            ]
            duplicate_decode_buckets = [
                bucket for bucket, cnt in Counter(decode_buckets).items() if cnt > 1
            ]
            if duplicate_prefill_buckets:
                logger.warning(
                    f"Duplicate prefill buckets are found: {duplicate_prefill_buckets}. "
                )
            if duplicate_decode_buckets:
                logger.warning(f"Duplicate decode buckets are found: {duplicate_decode_buckets}. ")
            self.prefill_buckets = get_list_with_no_dup_with_order_preserved(prefill_buckets)
            self.decode_buckets = get_list_with_no_dup_with_order_preserved(decode_buckets)

        self.compiler_config_overrides = compiler_config_overrides
        self.model_rewriting_config = ModelRewritingConfig(
            do_decompositions_for_model_rewrite=do_decompositions_for_model_rewrite,
            use_blockwise_compile=use_blockwise_compile,
            embedding_layer_as_single_block=_embedding_layer_as_single_block,
            embed_all_constants_into_graph=embed_all_constants_into_graph,
            optimize_logit_shape=optimize_logit_shape,
        )
        self.num_blocks_per_supertask = num_blocks_per_supertask
        self.kv_cache_sharing_across_beams_config = kv_cache_sharing_across_beams_config

        if self.model_metadata.attention_type is AttentionType.PAGED_ATTENTION:
            if paged_attention_block_size != 1:
                raise NotImplementedError(
                    "Currently, only paged attention with block_size=1 is supported."
                )
            padding_block_idx = (
                DEFAULT_PAGED_ATTENTION_PADDING_BLOCK_IDX
                if self.optimize_paged_attention_block_loading
                else None
            )
            self.paged_attention_config: Optional[PagedAttentionConfig] = PagedAttentionConfig(
                paged_attention_block_size, padding_block_idx
            )
        else:
            self.paged_attention_config = None

        self.num_speculative_tokens = num_speculative_tokens
        self.speculative_model_artifact_builder = speculative_model
        self._allow_empty_speculative_model = _allow_empty_speculative_model

        if self.num_speculative_tokens:
            if not (self.speculative_model_artifact_builder or self._allow_empty_speculative_model):
                raise ValueError(
                    "`num_speculative_tokens` and `speculative_model_artifact_builder` must be presented at the same time."
                )

    @property
    def model_id(self) -> str:
        """Model id: read from the furiosa config file for a quantized model directory,
        otherwise the string form of ``model_id_or_path``."""
        if self.is_from_quantized_model:
            furiosa_config_file = self.model_path / FURIOSA_CONFIG_JSON
            furiosa_config = json.loads(furiosa_config_file.read_text())
            return furiosa_config['model_id']
        return str(self.model_id_or_path)

    @property
    def name(self) -> str:
        """Artifact name; falls back to ``model_id`` when no name was given."""
        if self._name:
            return self._name
        return self.model_id

    @property
    def quant_ckpt_file_path(self):
        """Path of the quantization checkpoint, or None when it is unavailable.

        Unlike ``qformat_path`` / ``qparam_path``, a missing file under
        ``quantize_artifact_path`` yields None instead of raising.
        """
        if self.is_from_quantized_model:
            assert self.model_path
            return self.model_path / _EXPORTED_MODEL_QCKPT
        if self.quantize_artifact_path:
            quant_ckpt_file_path = Path(f"{self.quantize_artifact_path}/exported_model.qckpt")
            return quant_ckpt_file_path if os.path.exists(quant_ckpt_file_path) else None
        return None

    @property
    def qformat_path(self) -> Union[None, Path]:
        """Path of ``qformat.yaml``, or None when no quantization source is configured.

        Raises:
            ValueError: ``quantize_artifact_path`` is set but the file does not exist.
        """
        if self.is_from_quantized_model:
            assert self.model_path
            return self.model_path / _QFORMAT_YAML
        if self.quantize_artifact_path:
            qformat_path = Path(f"{self.quantize_artifact_path}/qformat.yaml")
            if not os.path.exists(qformat_path):
                raise ValueError(
                    "The quantize_artifact_path is specified, but the qformat.yaml file does not exist."
                )
            return qformat_path
        return None

    @property
    def qparam_path(self) -> Union[None, Path]:
        """Path of ``qparam.npy``, or None when no quantization source is configured.

        Raises:
            ValueError: ``quantize_artifact_path`` is set but the file does not exist.
        """
        if self.is_from_quantized_model:
            assert self.model_path
            return self.model_path / _QPARAM_NPY
        if self.quantize_artifact_path:
            qparam_path = Path(f"{self.quantize_artifact_path}/qparam.npy")
            if not os.path.exists(qparam_path):
                raise ValueError(
                    "The quantize_artifact_path is specified, but the qparam.npy file does not exist."
                )
            return qparam_path
        return None

    def _validate_copies_from_local(self):
        """Check if the files exist in the local directory."""
        if self.copies_from_local is None:
            return

        for file_copy in self.copies_from_local:
            if not os.path.exists(file_copy):
                raise ValueError(
                    f"The file {file_copy} of copies_from_local does not exist in the local directory."
                )

    def _validate_copies_from_model(self):
        """Check if the files are downloadable from the Hugging Face Hub."""
        if self.copies_from_model is None:
            return

        # Imported only when there is something to download.
        from huggingface_hub import hf_hub_download

        for file_copy in self.copies_from_model:
            try:
                _ = hf_hub_download(self.model_metadata.pretrained_id, file_copy)
            except Exception as e:
                raise ValueError(
                    f"Fail to download {file_copy} from Hugging Face Hub's repository {self.model_metadata.pretrained_id}."
                ) from e

    def _build_model_artifact(
        self,
        *,
        num_pipeline_builder_workers: int = 1,
        num_compile_workers: int = 1,
        cache_dir: Optional[os.PathLike] = CACHE_DIR,
        param_file_path: Optional[Union[os.PathLike, str]] = None,
        param_saved_format: Literal["safetensors", "pt"] = "safetensors",
        param_file_max_shard_size: Optional[Union[str, int]] = "5GB",
        _cleanup: bool = True,
        **kwargs,
    ) -> Tuple[ModelArtifact, List[Pipeline]]:
        """Build the pipelines for this builder's model and wrap them in a ``ModelArtifact``.

        Args:
            num_pipeline_builder_workers: Forwarded to ``build_pipelines`` (and to the
                remote-code pre-step when ``trust_remote_code`` is set).
            num_compile_workers: Forwarded to ``build_pipelines``.
            cache_dir: Cache directory; ``None`` disables caching of the parameter file.
            param_file_path: Existing parameter file to load instead of generating one.
            param_saved_format: Format of the parameter file.
            param_file_max_shard_size: Maximum shard size used when generating the
                parameter file.
            _cleanup: When True the scratch directory is a ``TemporaryDirectory`` kept on
                ``self.tmp_dir``; otherwise it is created with ``tempfile.mkdtemp`` and is
                not removed by this method.
            **kwargs: ``mppp`` is popped here; the rest is forwarded to ``build_pipelines``.

        Returns:
            The ``ModelArtifact`` (without pipelines attached) and the built pipelines.

        Raises:
            ValueError: No pipeline was generated.
        """
        model_config = self.model_metadata.config

        # Please refer to an example at https://huggingface.co/docs/transformers/en/main_classes/text_generation#transformers.GenerationMixin.greedy_search.example
        # Some models like GPT-2 may not have pad_token_id. BTW, when we run a batch of sequence generations,
        # We must need pad_token_id to fill the batch with pad. With Hugging Face Transformers,
        # users should handle this issue. Our goal is to provide a better useability for users.
        # We handle this issue within LLM class.
        model_config.pad_token_id = model_config.eos_token_id

        kv_cache_dtype = self.model_metadata.kv_cache_dtype
        original_model_type = self.model_metadata.get_optimized_cls()
        original_model_name = f"{original_model_type.__module__}.{original_model_type.__name__}"

        (
            prefill_buckets_with_output_size,
            decode_buckets_with_output_size,
            other_buckets_with_output_size,
        ) = get_buckets_with_output_logits_size(
            self.model_metadata,
            ManualBucketConfig(
                prefill_buckets=self.prefill_buckets,
                decode_buckets=self.decode_buckets,
            ),
            self.max_prompt_len if self.max_prompt_len else self.max_seq_len_to_capture,
            self.max_seq_len_to_capture,
            num_speculative_tokens=self.num_speculative_tokens,
            prefill_chunk_size=self.prefill_chunk_size,
            optimize_bucket_output_logits_size=self.model_rewriting_config.optimize_logit_shape,
        )
        # Every bucket in prefill -> decode -> other order; reused for logging and building.
        all_buckets_with_output_size = [
            *prefill_buckets_with_output_size,
            *decode_buckets_with_output_size,
            *other_buckets_with_output_size,
        ]

        prefill_buckets = [entry.bucket for entry in prefill_buckets_with_output_size]
        decode_buckets = [entry.bucket for entry in decode_buckets_with_output_size]
        other_buckets = [entry.bucket for entry in other_buckets_with_output_size]

        packing_type: Literal["IDENTITY"] = (
            "IDENTITY"  # TODO: enum PackingType has `IDENTITY` and `GREEDY`
        )
        generator_config = GeneratorConfig(
            _POSITION_ID_PAD,
            get_list_with_no_dup_with_order_preserved(
                (*prefill_buckets, *decode_buckets, *other_buckets)
            ),
            original_model_name,
            self.paged_attention_config,
            packing_type,
            self.kv_cache_sharing_across_beams_config,
            self.num_speculative_tokens,
            None,
        )

        parallel_config = ParallelConfig(
            tensor_parallel_size=self.tensor_parallel_size,
            pipeline_parallel_size=self.pipeline_parallel_size,
        )

        model_creation_info = ModelCreationInfo(
            self.model_metadata,
            # Second positional argument is True exactly when a random-weight seed is set.
            self.seed_for_random_weight is not None,
            seed=self.seed_for_random_weight,
            qformat_path=self.qformat_path,
            qparam_path=self.qparam_path,
            quant_ckpt_file_path=self.quant_ckpt_file_path,
        )

        beam_size_or_none = (
            None
            if self.kv_cache_sharing_across_beams_config is None
            else self.kv_cache_sharing_across_beams_config.beam_width
        )

        compiler_config_context = CompilerConfigContext(
            model_metadata=self.model_metadata,
            beam_size=beam_size_or_none,
            compiler_config_overrides=self.compiler_config_overrides,
            enable_bf16_partial_sum_for_split=self.enable_bf16_partial_sum_for_split,
            embedding_as_single_block=self.model_rewriting_config.embedding_layer_as_single_block,
        )

        if _cleanup:
            # Keep the handle on the instance so the directory outlives this call.
            self.tmp_dir = tempfile.TemporaryDirectory()
            tmp_dir_path = Path(self.tmp_dir.name)
        else:
            tmp_dir_path = Path(tempfile.mkdtemp())

        if param_file_path:
            param_file_metadata = ParamFileMetadata.load(
                param_file_path, ParamfileFormat.from_str(param_saved_format)
            )
        elif cache_dir and model_creation_info.is_hashable():
            param_file_cache_dir = Path(cache_dir) / PARAM_FILE_CACHE_SUBDIR_NAME
            param_file_metadata = get_param_file_with_cache(
                model_creation_info,
                param_file_cache_dir,
                max_shard_size=param_file_max_shard_size,
            )
        else:
            assert isinstance(tmp_dir_path, Path)
            param_file_metadata = get_param_file_with_cache(
                model_creation_info, tmp_dir_path, max_shard_size=param_file_max_shard_size
            )

        cache_dir = None if cache_dir is None else Path(cache_dir)

        # For now, `PipelineParallelismMppp` supports all valid cases because only pipeline
        # parallelism is needed to be expressed within one pipeline.
        # TODO: Make the mppp configuration specific to the speculative_model.
        # for e.g., consider introducing a keyword argument (e.g., "speculative_mppp") for this purpose
        mppp = kwargs.pop("mppp", None) or PipelineParallelismMppp()

        logger.info(f"Buckets with output sizes: {pformat(all_buckets_with_output_size)}")

        if self.trust_remote_code:
            prestep_for_remote_code_model(self.model_metadata, num_pipeline_builder_workers)

        pipelines_with_metadata = build_pipelines(
            model_creation_info,
            all_buckets_with_output_size,
            self.dummy_normalized_mesh[0],
            param_file_metadata,
            cache_dir,
            mppp,
            SuperTaskKind.EDF,
            self.one_supertask_per_device,
            self.model_rewriting_config.use_blockwise_compile,
            self.model_rewriting_config.embedding_layer_as_single_block,
            self.model_rewriting_config.do_decompositions_for_model_rewrite,
            kv_cache_dtype.to_torch_dtype() if kv_cache_dtype else None,
            generator_config.paged_attention_config,
            self.sparse_select_version,
            generator_config.kv_cache_sharing_across_beams_config,
            tmp_dir_path,
            self.model_metadata,
            compiler_config_context,
            num_pipeline_builder_workers,
            num_compile_workers,
            self.model_rewriting_config.embed_all_constants_into_graph,
            self.num_blocks_per_supertask,
            self.model_metadata.is_generative_model,
            param_saved_format,
            **kwargs,
        )

        if len(pipelines_with_metadata) == 0:
            raise ValueError("No pipeline is generated")

        pipeline_metadata_list = [metadata for _, metadata in pipelines_with_metadata]
        pipelines = [pipeline for pipeline, _ in pipelines_with_metadata]

        # Instead of properly casting pipelines into ModelArtifact.pipelines,
        # this function returns a Sequence[Pipeline] as one of tuple element.
        # as these pipelines will later be passed to __save_artifacts.
        model_artifact = ModelArtifact(
            generator_config=generator_config,
            hf_config=self.model_metadata.config.to_dict(),
            model_metadata=self.model_metadata,
            model_rewriting_config=self.model_rewriting_config,
            parallel_config=parallel_config,
            pipeline_metadata_list=pipeline_metadata_list,
            max_prompt_len=self.max_prompt_len,
        )
        return model_artifact, pipelines

    def build(
        self,
        save_dir: Union[str, os.PathLike],
        *,
        num_pipeline_builder_workers: int = 1,
        num_compile_workers: int = 1,
        cache_dir: Optional[os.PathLike] = CACHE_DIR,
        param_file_path: Optional[os.PathLike] = None,
        param_saved_format: Literal["safetensors", "pt"] = "safetensors",
        param_file_max_shard_size: Optional[Union[str, int]] = "5GB",
        _cleanup: bool = True,
        **kwargs,
    ):
        """Build the artifacts for given model configurations.

        Args:
            save_dir: The path to save the artifacts. With artifacts, you can create
                ``LLM`` without quantizing or compiling the model again.
            num_pipeline_builder_workers: The number of workers used for building pipelines
                (except for compilation). The default is 1 (no parallelism).
                Setting this value larger than 1 reduces pipeline building time,
                especially for large models, but requires much more memory.
            num_compile_workers: The number of workers used for compilation.
                The default is 1 (no parallelism).
            cache_dir: The cache directory for all generated files for this LLM instance.
                When its value is ``None``, caching is disabled.
                The default is "$HOME/.cache/furiosa/llm".
            param_file_path: The path to the parameter file to use for pipeline generation.
                If not specified, the parameters will be saved in a temporary file which will
                be deleted when ``LLM`` is destroyed.
            param_saved_format: The format of the parameter file. Only possible value is
                "safetensors" now. The default is "safetensors".
            param_file_max_shard_size: The maximum size of single parameter file.
                Parameter file will be split into smaller files to be less than this size.
                The default is "5GB".
        """
        import furiosa.native_compiler

        metadata = ArtifactMetadata(
            artifact_id=self.artifact_id,
            name=self.name,
            timestamp=int(time.time()),
            furiosa_llm_version=_get_commit_id(),
            furiosa_compiler_version=furiosa.native_compiler.compiler_git_short_hash(),
        )

        # Make sure the model local, get a model hash, and set it to ModelMetadata
        self.model_metadata.ensure_model_and_update_weight_hash()

        target_model_artifact, target_model_pipelines = self._build_model_artifact(
            num_pipeline_builder_workers=num_pipeline_builder_workers,
            num_compile_workers=num_compile_workers,
            cache_dir=cache_dir,
            param_file_path=param_file_path,
            param_saved_format=param_saved_format,
            _cleanup=_cleanup,
            param_file_max_shard_size=param_file_max_shard_size,
            **kwargs,
        )

        speculative_model_artifact = None
        speculative_model_pipelines: List[Pipeline] = []
        speculative_builder = self.speculative_model_artifact_builder
        if speculative_builder:
            # Make sure the model local, get a model hash, and set it to ModelMetadata
            speculative_builder.model_metadata.ensure_model_and_update_weight_hash()

            # NOTE(review): the speculative model receives the same `param_file_path` as the
            # target model and none of the extra **kwargs - confirm this is intended.
            speculative_model_artifact, speculative_model_pipelines = (
                speculative_builder._build_model_artifact(
                    num_pipeline_builder_workers=num_pipeline_builder_workers,
                    num_compile_workers=num_compile_workers,
                    cache_dir=cache_dir,
                    param_file_path=param_file_path,
                    param_saved_format=param_saved_format,
                    param_file_max_shard_size=param_file_max_shard_size,
                    _cleanup=_cleanup,
                )
            )

        artifact = Artifact(
            metadata=metadata,
            model=target_model_artifact,
            speculative_model=speculative_model_artifact,
            prefill_chunk_size=self.prefill_chunk_size,
            version=SCHEMA_VERSION,
        )

        ArtifactBuilder.__save_artifacts(
            save_dir,
            artifact,
            target_model_pipelines,
            speculative_model_pipelines,
            self.copies_from_model,
            self.copies_from_local,
            self.bundle_binaries,
        )

    @staticmethod
    def __preprocess_for_pipeline_save(
        pipelines: Sequence[Pipeline],
        path: Union[str, os.PathLike],
        bundle_binaries: bool = True,
    ) -> List[Dict[str, Any]]:
        """Write pipeline blobs and parameter files under ``path`` and return JSON-ready pipelines.

        Each blob is written as ``<blob id>.fx`` or ``<blob id>.edf``, either into the
        binary bundle zip (``bundle_binaries=True``) or directly into ``path``. After
        being written, the blob entry in the pipeline is replaced with ``None``, and each
        parameter file path is rewritten relative to ``path``.

        Args:
            pipelines: Pipelines to save. They are mutated in place.
            path: Output directory.
            bundle_binaries: Whether to store blobs in a single zip archive.

        Returns:
            One dict per pipeline, parsed from ``pipeline.to_json()``.

        Raises:
            NotImplementedError: A blob has a kind other than FX or EDF.
        """
        from furiosa.native_compiler import CompiledGraph

        def save_blob_to_zip(zip_file: ZipFile, file_name: str, blob: Any) -> None:
            zip_file.writestr(file_name, blob)

        def save_blob_to_dir(base_path: str, file_name: str, blob: Any) -> None:
            # Text blobs are written in text mode, everything else as bytes.
            mode = "w" if isinstance(blob, str) else "wb"
            with open(os.path.join(base_path, file_name), mode) as f:
                f.write(blob)

        def copy_param_file(
            param_file_info: ParamFileInfo, base_path: Union[str, os.PathLike]
        ) -> None:
            param_file_path = Path(param_file_info.path)
            parent_path = os.fspath(param_file_path.parent)

            if _INDEX_TENSOR_NAME in os.listdir(parent_path):
                # This param file is a shard in safetensors file set.
                # Copy whole directory
                new_parent_path = f"{base_path}/{os.path.basename(parent_path)}"
                if not os.path.exists(new_parent_path):
                    shutil.copytree(parent_path, new_parent_path)
                new_file_path = os.path.join(new_parent_path, os.path.basename(param_file_path))
            else:
                new_file_path = f"{base_path}/{os.path.basename(param_file_path)}"
                if not os.path.exists(new_file_path):
                    shutil.copy(param_file_path, new_file_path)
            param_file_info.path = os.path.relpath(new_file_path, base_path)

        processed_pipelines = []
        zip_path = os.path.join(path, f"{BINARY_BUNDLE_ZIP_FILE_NAME}.zip")
        save_ftn = save_blob_to_zip if bundle_binaries else save_blob_to_dir

        # NOTE: Each EDF file can be several gigabytes in size, so we should avoid
        # unnecessarily storing EDF blob data elsewhere in the program (e.g., in a temporary dictionary).
        # Append when the archive already exists (this method runs once per model).
        zip_mode = "a" if os.path.exists(zip_path) else "w"
        archive_ctx = ZipFile(zip_path, mode=zip_mode) if bundle_binaries else nullcontext()
        with archive_ctx as zip_file:  # type: ignore[call-overload]
            save_target = zip_file if bundle_binaries else path
            for pipeline in pipelines:
                blob_kinds = pipeline.get_blob_kind()
                for blob_id, blob in pipeline.blobs.items():
                    kind = blob_kinds.get(blob_id)
                    if kind == SuperTaskKind.FX:
                        assert isinstance(blob, str)
                        file_name = f"{blob_id}.fx"
                        data = blob
                    elif kind == SuperTaskKind.EDF:
                        assert isinstance(blob, CompiledGraph)
                        file_name = f"{blob_id}.edf"
                        data = blob.serialize()  # type: ignore[assignment]
                    else:
                        raise NotImplementedError(f"SuperTask [{kind}] is not supported to save")

                    save_ftn(save_target, file_name, data)  # type: ignore[arg-type]
                    # Replacing the value of an existing key is safe while iterating.
                    pipeline.blobs[blob_id] = None  # type: ignore[assignment]

                for param_file_info in pipeline.param_files.values():
                    copy_param_file(param_file_info, path)

                processed_pipelines.append(json.loads(pipeline.to_json()))

        return processed_pipelines

    @staticmethod
    def __save_artifacts(
        path: Union[str, os.PathLike],
        artifact: Artifact,
        target_model_pipelines: Sequence[Pipeline],
        speculative_model_pipelines: Sequence[Pipeline],
        copies_from_model: Optional[Sequence[str]],
        copies_from_local: Optional[Sequence[os.PathLike]],
        bundle_binaries: bool = True,
    ):
        """Write the artifact, tokenizer/config files, extra copies and furiosa config to ``path``.

        Args:
            path: Output directory; created (with parents) when missing.
            artifact: Artifact to export as ``artifact.json``. Its ``model`` (and
                ``speculative_model`` when present) are replaced by copies that carry
                the processed pipelines.
            target_model_pipelines: Pipelines of the target model.
            speculative_model_pipelines: Pipelines of the speculative model.
            copies_from_model: Files to download from the Hugging Face Hub repository
                and copy into ``path``.
            copies_from_local: Local files to copy into ``path``.
            bundle_binaries: Whether blobs are bundled into a single zip archive.
        """
        from huggingface_hub import hf_hub_download

        path = Path(path)
        path.mkdir(parents=True, exist_ok=True)

        artifact.model = artifact.model.model_copy(
            update={
                "pipelines": ArtifactBuilder.__preprocess_for_pipeline_save(
                    target_model_pipelines, path, bundle_binaries
                ),
            },
        )

        if artifact.speculative_model:
            artifact.speculative_model = artifact.speculative_model.model_copy(
                update={
                    "pipelines": ArtifactBuilder.__preprocess_for_pipeline_save(
                        speculative_model_pipelines, path, bundle_binaries
                    ),
                },
            )

        artifact.export(f"{path}/artifact.json")

        model_metadata = artifact.model.model_metadata

        # Save Transformers configs to make an artifact more compatible with Hugging Face Transformers
        tokenizer = get_tokenizer(model_metadata._model_id_or_path)
        tokenizer.save_pretrained(path)
        hf_config = model_metadata.config
        hf_config.save_pretrained(path)

        # Usually, this copies important files like README.md, LICENSE.txt, etc.
        # So, we should fail if copying fails.
        if copies_from_model is not None:
            for hub_file in copies_from_model:
                cache_path = hf_hub_download(model_metadata.pretrained_id, hub_file)
                shutil.copy(cache_path, path)

        if copies_from_local is not None:
            for local_file in copies_from_local:
                shutil.copy(local_file, path)

        # Save furiosa_config.json
        furiosa_config = FuriosaConfig(
            model_id=model_metadata.pretrained_id,
            model_kinds=[ModelKind.ARTIFACT],
            model_class=ModelClass.from_class(model_metadata.get_optimized_cls()),
            llm_config=model_metadata.llm_config,
        )
        furiosa_config_json_path = path / FURIOSA_CONFIG_JSON
        furiosa_config.export(furiosa_config_json_path)
def get_model_metadata_from_quantized_model(
    model_path: Union[str, Path], config: PretrainedConfig, trust_remote_code: Optional[bool]
) -> ModelMetadata:
    """Create ``ModelMetadata`` for an already-quantized model directory.

    Args:
        model_path: Directory of the quantized model.
        config: Hugging Face config; its ``to_diff_dict()`` is applied on top of the metadata.
        trust_remote_code: Forwarded to ``ModelMetadata``.

    Returns:
        Metadata built from the furiosa config stored with the quantized model.
    """
    if not isinstance(model_path, Path):
        model_path = Path(model_path)

    # Only the first of the four loaded items (the furiosa config) is needed here.
    furiosa_config, _, _, _ = _load_quantized_model_meta(model_path)

    base_metadata = ModelMetadata(
        pretrained_id=furiosa_config.model_id,
        llm_config=furiosa_config.llm_config,
        trust_remote_code=trust_remote_code,
        model_id_or_path=model_path,
    )
    hf_overrides = config.to_diff_dict()
    model_metadata = base_metadata.with_hf_configs(hf_overrides)
    assert model_metadata.is_generative_model

    return model_metadata


def get_model_metadata_from_model_id(
    model_id: str,
    model_id_or_path: Optional[Union[str, Path]] = None,
    num_hidden_layers: Optional[int] = None,
    quantize_artifact_path: Optional[os.PathLike] = None,
    prefill_chunk_size: Optional[int] = None,
    trust_remote_code: Optional[bool] = None,
    revision: Optional[str] = None,
    auto_bfloat16_cast: Optional[bool] = None,
    num_speculative_tokens: Optional[int] = None,
    use_2d_attention_masks: bool = False,
    merge_kv_cache_indices: bool = False,
) -> ModelMetadata:
    """Create ``ModelMetadata`` from a model id and apply the requested overrides.

    The overrides are applied in this order: bfloat16 cast, number of layers,
    speculative-decoding optimization, 2d attention masks, merged kv cache
    indices, and finally the quantization config.

    Raises:
        ValueError: ``quantize_artifact_path`` is given but ``qformat.yaml`` or
            ``qparam.npy`` is missing under it.
    """
    shared_kwargs = dict(
        model_id_or_path=model_id_or_path,
        trust_remote_code=trust_remote_code,
        revision=revision,
    )
    if is_mlperf_optimized_model(model_id):
        model_metadata = ModelMetadata.init_with_mlperf_optim_options(model_id, **shared_kwargs)
    else:
        model_metadata = ModelMetadata(model_id, **shared_kwargs)

    if auto_bfloat16_cast:
        model_metadata = model_metadata.with_auto_bfloat16_cast()

    # Override model config, optimization config
    if num_hidden_layers:
        model_metadata = model_metadata.with_num_layers(num_hidden_layers)

    if prefill_chunk_size or num_speculative_tokens:
        # For chunked prefill, model optimized for spec dec is needed,
        # which can accept decode buckets with inputs_ids_len > 1.
        model_metadata = model_metadata.with_optimizations("optimized_for_speculative_decoding")

    if use_2d_attention_masks:
        # 2d mask optimization always goes with speculative decoding optimization.
        mask_optimizations = {
            "optimized_for_speculative_decoding": True,
            "use_2d_masks": True,
        }
        model_metadata = model_metadata.with_optimizations(mask_optimizations)

    if merge_kv_cache_indices:
        # KV cache indices merge optimization always goes with speculative decoding optimization.
        kv_index_optimizations = {
            "optimized_for_speculative_decoding": True,
            "merged_kv_indices": True,
        }
        model_metadata = model_metadata.with_optimizations(kv_index_optimizations)

    ## Quantization Configuration
    if not quantize_artifact_path:
        return model_metadata

    # model-compressor specific paths
    qformat_path = Path(f"{quantize_artifact_path}/qformat.yaml")
    qparam_path = Path(f"{quantize_artifact_path}/qparam.npy")

    if not os.path.exists(qformat_path):
        raise ValueError(
            f"The quantize_artifact_path {quantize_artifact_path} is specified, but the qformat.yaml file does not exist."
        )
    if not os.path.exists(qparam_path):
        raise ValueError(
            "The quantize_artifact_path is specified, but the qparam.npy file does not exist."
        )

    quantization_config = QuantizationConfig.from_qformat(qformat_path)
    return model_metadata.with_quantization_config(quantization_config)