Source code for furiosa_llm.artifact.builder

from collections import Counter
import json
import logging
import os
from pathlib import Path
from pprint import pformat
import tempfile
import time
from typing import Any, Dict, List, Literal, Mapping, Optional, Sequence, Tuple, Union
import uuid
import warnings

from transformers import AutoConfig, AutoTokenizer

from furiosa_llm.optimum.modeling import (
    _EXPORTED_MODEL_QCKPT,
    _QFORMAT_YAML,
    _QPARAM_NPY,
    FURIOSA_CONFIG_JSON,
    _load_quantized_model_meta,
    is_quantized_model_path,
)
from furiosa_llm.optimum.types import FuriosaConfig, ModelClass, ModelKind
from furiosa_llm.parallelize.mppp.config import Device
from furiosa_llm.utils import get_list_with_no_dup_with_order_preserved

from ..device import get_device_mesh
from ..models import AttentionType, ModelMetadata, QuantizationConfig
from ..models.config_types import (
    GeneratorConfig,
    KvCacheSharingAcrossBeamsConfig,
    LLMBackend,
    ManualBucketConfig,
    ModelRewritingConfig,
    PagedAttentionConfig,
    ParallelConfig,
)
from ..parallelize.compiler_config import CompilerConfigContext
from ..parallelize.model_creation_info import ModelCreationInfo
from ..parallelize.mppp.api import PipelineParallelismMppp
from ..parallelize.pipeline import Pipeline
from ..parallelize.pipeline.builder.api import _get_commit_id
from ..parallelize.pipeline.types import SuperTaskKind
from ..parallelize.trace import get_param_file_with_cache
from .helper import (
    build_pipelines,
    get_buckets_with_output_logits_size,
    prestep_for_remote_code_model,
)
from .types.latest import SCHEMA_VERSION, Artifact, ArtifactMetadata, ModelArtifact

logger = logging.getLogger(__name__)

# Default position id for padding
_POSITION_ID_PAD = 1

# Default param file name
_PARAM_FILE_NAME = "params.safetensors"

# Default index of the padding block when paged attention model is used.
DEFAULT_PAGED_ATTENTION_PADDING_BLOCK_IDX = 0

CACHE_DIR = Path(os.getenv("XDG_CACHE_HOME", Path.home() / ".cache")) / "furiosa" / "llm"


def is_mlperf_optimized_model(model_id: str) -> bool:
    return model_id in (
        "furiosa-ai/mlperf-bert-large",
        "furiosa-ai/mlperf-gpt-j-6b",
        "deepseek-ai/DeepSeek-R1-Distill-Llama-8B",
        "deepseek-ai/DeepSeek-R1-Distill-Llama-70B",
        "meta-llama/Llama-2-70b-chat-hf",
        "meta-llama/Llama-3.1-8B-Instruct",
        "meta-llama/Llama-3.1-70B-Instruct",
        "meta-llama/Llama-3.3-70B-Instruct",
        "eliceai/helpy-edu-b-llama3.1",
        "upstage/SOLAR-10.7B-Instruct-v1.0",
        "furiosa-ai/EXAONE-3.0-7.8B-Instruct-converted",
        "LGAI-EXAONE/EXAONE-3.0-7.8B-Instruct",
        "LGAI-EXAONE/EXAONE-3.5-2.4B-Instruct",
        "LGAI-EXAONE/EXAONE-3.5-7.8B-Instruct",
        "LGAI-EXAONE/EXAONE-3.5-32B-Instruct",
    )


[docs] class ArtifactBuilder: """The artifact builder to use in the Furiosa LLM. Args: model_id_or_path: The Huggingface model id or a local path. This corresponds to pretrained_model_name_or_path in HuggingFace Transformers. name: The name of the artifact to build. tensor_parallel_size: The number of PEs for each tensor parallelism group. The default is 4. pipeline_parallel_size : The pipeline parallel size that will be used when loading the model by default. The default value is 1. prefill_buckets: Specify the bucket size for prefill decode_buckets: Specify the bucket size for decode max_seq_len_to_capture: Maximum sequence length covered by LLM engine. Sequence with larger context than this will not be covered. If no bucket is explicitly specified, a single batch bucket with a context length of this value is created. max_prompt_len: Maximum prompt sequence length covered by LLM engine. Prompt larger than this cannot be handled. If not given, will be obtained from bucket and other configs. num_hidden_layers: Number of hidden layers in the Transformer encoder. seed_for_random_weight: The seed to initialize the random number generator for creating random weight. calculate_logit_only_for_last_token: Whether the model has last block slice optimization applied. auto_bfloat16_cast: Whether to cast the model to bfloat16 automatically. This option is required when neither the model is trained with bfloat16 nor quantized. quantize_artifact_path: Specifies the path where quantization artifacts generated by the furiosa-model-compressor are saved. compiler_config_overrides: Overrides for the compiler config. This is a dictionary that includes the configuration for the compiler. do_decompositions_for_model_rewrite: Whether to decompose some ops to describe various parallelism strategies with mppp config. When the value is True, mppp config that matches with the decomposed FX graph should be given. use_blockwise_compile: If True, each task will be compiled in the unit of transformer block, and compilation result for transformer block is generated once and reused. The default is ``True``. num_blocks_per_supertask: The number of transformer blocks that will be merged into one supertask. This option is valid only when `use_blockwise_compile=True`. The default is 1. num_blocks_per_pp_stage: The number of transformers blocks per each pipeline parallelism stage. If not given, transformer blocks will be distributed equally. embed_all_constants_into_graph: Whether to embed constant tensors into graph or make them as input of the graph and save them as separate files. The default is False. optimize_logit_shape: Add logit slice or removal operation in the graph for optimized performance. kv_cache_sharing_across_beams_config: Configuration for sharing kv cache across beams. This argument must be given if and only if the model is optimized to share kv cache across beams. If this argument is given, decode phase buckets with batch size of ``batch_size`` \* ``kv_cache_sharing_across_beams_config.beam_width`` will be created. prefill_chunk_size: Chunk size used for chunked prefill. If the value is `None`, chunked prefill is not used. paged_attention_block_size: The maximum number of tokens that can be stored in a single paged attention block. This argument must be given if model uses paged attention. trust_remote_code: Trust remote code when downloading the model and tokenizer from HuggingFace. copies_from_model: Relative paths of files to copy from the model directory (e.g., LICENSE.txt) copies_from_local: Relative paths of files to copy from the local directory (e.g., README.md) speculative_model : Speculative model for speculative decoding. Note that speculative decoding is an experimental feature and the build may be unstable. num_speculative_tokens : The number of tokens that specualtive model will generate speculatively during each iteration of the decoding process. Note that speculative decoding is an experimental feature and the build may be unstable. """ def __init__( self, model_id_or_path: str, name: str = "", *, # Parallelize Config tensor_parallel_size: int = 4, pipeline_parallel_size: int = 1, # Bucket Config prefill_buckets: Sequence[Tuple[int, int]] = [], decode_buckets: Sequence[Tuple[int, int]] = [], max_seq_len_to_capture: int = 2048, max_prompt_len: Optional[int] = None, # Model Config num_hidden_layers: Optional[int] = None, seed_for_random_weight: Optional[int] = None, calculate_logit_only_for_last_token: Optional[bool] = False, # Quantize Config auto_bfloat16_cast: Optional[bool] = None, quantize_artifact_path: Optional[os.PathLike] = None, # Compiler Config compiler_config_overrides: Optional[Mapping] = None, do_decompositions_for_model_rewrite: bool = False, use_blockwise_compile: bool = True, num_blocks_per_supertask: int = 1, embed_all_constants_into_graph: bool = False, optimize_logit_shape: bool = True, kv_cache_sharing_across_beams_config: Optional[KvCacheSharingAcrossBeamsConfig] = None, prefill_chunk_size: Optional[int] = None, # PagedAttention Config paged_attention_block_size: int = 1, # Huggingface-related Config trust_remote_code: Optional[bool] = None, copies_from_model: Optional[List[str]] = None, copies_from_local: Optional[List[os.PathLike]] = None, # Speculative-decoding Config speculative_model: Optional["ArtifactBuilder"] = None, num_speculative_tokens: Optional[int] = None, _allow_empty_speculative_model: bool = False, _embedding_layer_as_single_block: bool = False, **kwargs, ): # TODO : remove following later if kwargs.pop("preferred_pipeline_parallel_size", None): raise ValueError( "`preferred_pipeline_parallel_size` is depreacted. Use `pipeline_parallel_size` instead." ) self.artifact_id = str(uuid.uuid4()) # Constants default values self.optimize_paged_attention_block_loading = True self.sparse_select_version = "v1.5" self.one_supertask_per_device = True self.quantize_artifact_path = quantize_artifact_path self.model_id_or_path = model_id_or_path self.prefill_chunk_size = prefill_chunk_size self.trust_remote_code = trust_remote_code self.copies_from_model = copies_from_model self.copies_from_local = copies_from_local if calculate_logit_only_for_last_token: warnings.warn( "calculate_logit_only_for_last_token is deprecated and will be removed in the future. Use `optimize_logit_shape` instead." ) optimize_logit_shape = calculate_logit_only_for_last_token # Pre-step for model configurations if is_quantized_model_path(model_id_or_path): self.is_from_quantized_model = True self.model_path = ( Path(model_id_or_path) if not isinstance(model_id_or_path, Path) else model_id_or_path ) self.model_metadata = get_model_metadata_from_quantized_model( self.model_path, trust_remote_code=trust_remote_code ) # Warnings for unsupported options if seed_for_random_weight is not None: logging.warning("Random weight model is not supported for quantized model.") if auto_bfloat16_cast is not None: logging.warning("auto_bfloat16_cast is not supported for quantized model.") self.seed_for_random_weight = None else: self.is_from_quantized_model = False # Presteps for model prepared by model id self.model_metadata = get_model_metadata_from_model_id( model_id=self.model_id, num_hidden_layers=num_hidden_layers, quantize_artifact_path=quantize_artifact_path, prefill_chunk_size=self.prefill_chunk_size, trust_remote_code=trust_remote_code, auto_bfloat16_cast=auto_bfloat16_cast, num_speculative_tokens=num_speculative_tokens, ) self.seed_for_random_weight = seed_for_random_weight # Check if the files to copy exist before building the model self._validate_copies_from_local() self._validate_copies_from_model() self._name = name # Followings are dummy specifications for parallel size and device configuration, required for compilation. # These values are expected to be provided by the user when loading artifacts via `LLM.load_artifact`. self.tensor_parallel_size = tensor_parallel_size self.pipeline_parallel_size = pipeline_parallel_size self.dummy_data_parallel_size = 1 self.dummy_normalized_mesh = [ [dev for tp_group in pp_tp_group for dev in tp_group] for pp_tp_group in get_device_mesh( [ Device(f"npu:{i}:0-{self.tensor_parallel_size-1}") for i in range(self.dummy_data_parallel_size * self.pipeline_parallel_size) ], tensor_parallel_size, self.pipeline_parallel_size, self.dummy_data_parallel_size, ) ] self.max_seq_len_to_capture = max_seq_len_to_capture self.max_prompt_len = max_prompt_len # Bucket Configuration if not prefill_buckets and not decode_buckets: self.prefill_buckets = [(1, max_seq_len_to_capture)] self.decode_buckets = ( [(1, max_seq_len_to_capture)] if self.model_metadata.is_generative_model else [] ) else: duplicate_prefill_buckets = [ bucket for bucket, cnt in Counter(prefill_buckets).items() if cnt > 1 ] duplicate_decode_buckets = [ bucket for bucket, cnt in Counter(decode_buckets).items() if cnt > 1 ] if duplicate_prefill_buckets: logging.warning( f"Duplicate prefill buckets are found: {duplicate_prefill_buckets}. " ) if duplicate_decode_buckets: logging.warning(f"Duplicate decode buckets are found: {duplicate_decode_buckets}. ") self.prefill_buckets = get_list_with_no_dup_with_order_preserved(prefill_buckets) self.decode_buckets = get_list_with_no_dup_with_order_preserved(decode_buckets) self.compiler_config_overrides = compiler_config_overrides self.model_rewriting_config = ModelRewritingConfig( do_decompositions_for_model_rewrite=do_decompositions_for_model_rewrite, use_blockwise_compile=use_blockwise_compile, embedding_layer_as_single_block=_embedding_layer_as_single_block, num_blocks_per_supertask=num_blocks_per_supertask, embed_all_constants_into_graph=embed_all_constants_into_graph, optimize_logit_shape=optimize_logit_shape, ) self.kv_cache_sharing_across_beams_config = kv_cache_sharing_across_beams_config if self.model_metadata.attention_type is AttentionType.PAGED_ATTENTION: if paged_attention_block_size != 1: raise NotImplementedError( "Currently, only paged attention with block_size=1 is supported." ) padding_block_idx = ( DEFAULT_PAGED_ATTENTION_PADDING_BLOCK_IDX if self.optimize_paged_attention_block_loading else None ) self.paged_attention_config: Optional[PagedAttentionConfig] = PagedAttentionConfig( paged_attention_block_size, padding_block_idx ) else: self.paged_attention_config = None self.num_speculative_tokens = num_speculative_tokens self.speculative_model_artifact_builder = speculative_model self._allow_empty_speculative_model = _allow_empty_speculative_model if self.num_speculative_tokens: if not (self.speculative_model_artifact_builder or self._allow_empty_speculative_model): raise ValueError( "`num_speculative_tokens` and `speculative_model_artifact_builder` must be presented at the same time." ) @property def model_id(self) -> str: if self.is_from_quantized_model: furiosa_config_file = self.model_path / FURIOSA_CONFIG_JSON furiosa_config = json.loads(furiosa_config_file.read_text()) return furiosa_config['model_id'] else: return self.model_id_or_path @property def name(self) -> str: if self._name: return self._name return self.model_id @property def quant_ckpt_file_path(self): if self.is_from_quantized_model: assert self.model_path return self.model_path / _EXPORTED_MODEL_QCKPT if self.quantize_artifact_path: quant_ckpt_file_path = Path(f"{self.quantize_artifact_path}/exported_model.qckpt") return quant_ckpt_file_path if os.path.exists(quant_ckpt_file_path) else None return None @property def qformat_path(self) -> Union[None, Path]: if self.is_from_quantized_model: assert self.model_path return self.model_path / _QFORMAT_YAML if self.quantize_artifact_path: qformat_path = Path(f"{self.quantize_artifact_path}/qformat.yaml") if os.path.exists(qformat_path): return qformat_path else: raise ValueError( "The quantize_artifact_path is specified, but the qformat.yaml file does not exist." ) return None @property def qparam_path(self) -> Union[None, Path]: if self.is_from_quantized_model: assert self.model_path return self.model_path / _QPARAM_NPY if self.quantize_artifact_path: qparam_path = Path(f"{self.quantize_artifact_path}/qparam.npy") if os.path.exists(qparam_path): return qparam_path else: raise ValueError( "The quantize_artifact_path is specified, but the qparam.npy file does not exist." ) return None def _validate_copies_from_local(self): """Check if the files exist in the local directory.""" if self.copies_from_local is None: return for file_copy in self.copies_from_local: if not os.path.exists(file_copy): raise ValueError( f"The file {file_copy} of copies_from_local does not exist in the local directory." ) def _validate_copies_from_model(self): """Check if the files are downloadable from the Hugging Face Hub.""" from huggingface_hub import hf_hub_download if self.copies_from_model is None: return for file_copy in self.copies_from_model: try: _ = hf_hub_download(self.model_metadata.pretrained_id, file_copy) except Exception as e: raise ValueError( f"Fail to download {file_copy} from Hugging Face Hub's repository {self.model_metadata.pretrained_id}." ) from e def _build_model_artifact( self, *, num_pipeline_builder_workers: int = 1, num_compile_workers: int = 1, cache_dir: Optional[os.PathLike] = CACHE_DIR, param_file_path: Optional[os.PathLike] = None, param_saved_format: Literal["safetensors", "pt"] = "safetensors", _cleanup: bool = True, **kwargs, ) -> Tuple[ModelArtifact, List[Pipeline]]: model_config = self.model_metadata.config # Please refer to an example at https://huggingface.co/docs/transformers/en/main_classes/text_generation#transformers.GenerationMixin.greedy_search.example # Some models like GPT-2 may not have pad_token_id. BTW, when we run a batch of sequence generations, # We must need pad_token_id to fill the batch with pad. With Hugging Face Transformers, # users should handle this issue. Our goal is to provide a better useability for users. # We handle this issue within LLM class. model_config.pad_token_id = model_config.eos_token_id kv_cache_dtype = self.model_metadata.kv_cache_dtype original_model_type = self.model_metadata.get_optimized_cls() original_model_name = f"{original_model_type.__module__}.{original_model_type.__name__}" ( prefill_buckets_with_output_size, decode_buckets_with_output_size, other_buckets_with_output_size, ) = get_buckets_with_output_logits_size( self.model_metadata, ManualBucketConfig( prefill_buckets=self.prefill_buckets, decode_buckets=self.decode_buckets, ), self.max_prompt_len if self.max_prompt_len else self.max_seq_len_to_capture, self.max_seq_len_to_capture, num_speculative_tokens=self.num_speculative_tokens, prefill_chunk_size=self.prefill_chunk_size, optimize_bucket_output_logits_size=self.model_rewriting_config.optimize_logit_shape, ) prefill_buckets = [ bucket_with_output_size.bucket for bucket_with_output_size in prefill_buckets_with_output_size ] decode_buckets = [ bucket_with_output_size.bucket for bucket_with_output_size in decode_buckets_with_output_size ] other_buckets = [ bucket_with_output_size.bucket for bucket_with_output_size in other_buckets_with_output_size ] packing_type: Literal["IDENTITY"] = ( "IDENTITY" # TODO: enum PackingType has `IDENTITY` and `GREEDY` ) generator_config = GeneratorConfig( _POSITION_ID_PAD, get_list_with_no_dup_with_order_preserved( (*prefill_buckets, *decode_buckets, *other_buckets) ), original_model_name, self.paged_attention_config, packing_type, self.kv_cache_sharing_across_beams_config, self.num_speculative_tokens, None, ) parallel_config = ParallelConfig( tensor_parallel_size=self.tensor_parallel_size, pipeline_parallel_size=self.pipeline_parallel_size, ) model_creation_info = ModelCreationInfo( self.model_metadata, False if self.seed_for_random_weight is None else True, seed=self.seed_for_random_weight, qformat_path=self.qformat_path, qparam_path=self.qparam_path, quant_ckpt_file_path=self.quant_ckpt_file_path, ) beam_size_or_none = ( None if self.kv_cache_sharing_across_beams_config is None else self.kv_cache_sharing_across_beams_config.beam_width ) compiler_config_context = CompilerConfigContext( model_metadata=self.model_metadata, beam_size=beam_size_or_none, compiler_config_overrides=self.compiler_config_overrides, ) if _cleanup: self.tmp_dir = tempfile.TemporaryDirectory() tmp_dir_path = Path(self.tmp_dir.name) else: tmp_dir_path = Path(tempfile.mkdtemp()) if not param_file_path: if cache_dir and model_creation_info.is_hashable(): param_file_cache_dir = Path(cache_dir) / "param_files" param_file_path = get_param_file_with_cache( model_creation_info, param_file_cache_dir ) else: assert isinstance(tmp_dir_path, Path) param_file_path = get_param_file_with_cache(model_creation_info, tmp_dir_path) cache_dir = None if cache_dir is None else Path(cache_dir) # For now, `PipelineParallelismMppp` supports all valid cases because only pipeline parallelism is needed to be expressed within one pipeline. # TODO: Make the mppp configuration specific to the speculative_model. # for e.g., consider introducing a keyword argument (e.g., "speculative_mppp") for this purpose mppp = kwargs.pop("mppp", None) or PipelineParallelismMppp() logger.info( f"Buckets with output sizes: {pformat([*prefill_buckets_with_output_size, *decode_buckets_with_output_size, *other_buckets_with_output_size])}" ) if self.trust_remote_code: prestep_for_remote_code_model(self.model_metadata, num_pipeline_builder_workers) pipelines_with_metadata = build_pipelines( model_creation_info, [ *prefill_buckets_with_output_size, *decode_buckets_with_output_size, *other_buckets_with_output_size, ], self.dummy_normalized_mesh[0], param_file_path, cache_dir, LLMBackend.FURIOSA_RT_V2, mppp, SuperTaskKind.EDF, self.one_supertask_per_device, self.model_rewriting_config.use_blockwise_compile, self.model_rewriting_config.embedding_layer_as_single_block, self.model_rewriting_config.do_decompositions_for_model_rewrite, kv_cache_dtype.to_torch_dtype() if kv_cache_dtype else None, generator_config.paged_attention_config, self.sparse_select_version, generator_config.kv_cache_sharing_across_beams_config, tmp_dir_path, self.model_metadata, compiler_config_context, num_pipeline_builder_workers, num_compile_workers, self.model_rewriting_config.embed_all_constants_into_graph, self.model_rewriting_config.num_blocks_per_supertask, self.model_metadata.is_generative_model, param_saved_format, **kwargs, ) if len(pipelines_with_metadata) == 0: raise ValueError("No pipeline is generated") pipeline_metadata_list = [metadata for _, metadata in pipelines_with_metadata] pipelines = [pipeline for pipeline, _ in pipelines_with_metadata] # Instead of properly casting pipelines into ModelArtifact.pipelines, # this function returns a Sequence[Pipeline] as one of tuple element. # as these pipelines will later be passed to __save_artifacts. return ( ModelArtifact( generator_config=generator_config, hf_config=self.model_metadata.config.to_dict(), model_metadata=self.model_metadata, model_rewriting_config=self.model_rewriting_config, parallel_config=parallel_config, pipeline_metadata_list=pipeline_metadata_list, max_prompt_len=self.max_prompt_len, ), pipelines, )
[docs] def build( self, save_dir: Union[str, os.PathLike], *, num_pipeline_builder_workers: int = 1, num_compile_workers: int = 1, cache_dir: Optional[os.PathLike] = CACHE_DIR, param_file_path: Optional[os.PathLike] = None, param_saved_format: Literal["safetensors", "pt"] = "safetensors", _cleanup: bool = True, **kwargs, ): """Build the artifacts for given model configurations. Args: save_dir: The path to save the artifacts. With artifacts, you can create ``LLM`` without quantizing or compiling the model again. num_pipeline_builder_workers: The number of workers used for building pipelines (except for compilation). The default is 1 (no parallelism). Setting this value larger than 1 reduces pipeline building time, especially for large models, but requires much more memory. num_compile_workers: The number of workers used for compilation. The default is 1 (no parallelism). cache_dir: The cache directory for all generated files for this LLM instance. When its value is ``None``, caching is disabled. The default is "$HOME/.cache/furiosa/llm". param_file_path: The path to the parameter file to use for pipeline generation. If not specified, the parameters will be saved in a temporary file which will be deleted when ``LLM`` is destroyed. param_saved_format: The format of the parameter file. Only possible value is "safetensors" now. The default is "safetensors". """ import furiosa.native_compiler metadata = ArtifactMetadata( artifact_id=self.artifact_id, name=self.name, timestamp=int(time.time()), furiosa_llm_version=_get_commit_id(), furiosa_compiler_version=furiosa.native_compiler.compiler_git_short_hash(), ) target_model_artifact, target_model_pipelines = self._build_model_artifact( num_pipeline_builder_workers=num_pipeline_builder_workers, num_compile_workers=num_compile_workers, cache_dir=cache_dir, param_file_path=param_file_path, param_saved_format=param_saved_format, _cleanup=_cleanup, **kwargs, ) speculative_model_artifact = None speculative_model_pipelines: List[Pipeline] = [] if self.speculative_model_artifact_builder: speculative_model_artifact, speculative_model_pipelines = ( self.speculative_model_artifact_builder._build_model_artifact( num_pipeline_builder_workers=num_pipeline_builder_workers, num_compile_workers=num_compile_workers, cache_dir=cache_dir, param_file_path=param_file_path, param_saved_format=param_saved_format, _cleanup=_cleanup, ) ) artifact = Artifact( metadata=metadata, model=target_model_artifact, speculative_model=speculative_model_artifact, prefill_chunk_size=self.prefill_chunk_size, version=SCHEMA_VERSION, ) ArtifactBuilder.__save_artifacts( save_dir, artifact, target_model_pipelines, speculative_model_pipelines, self.copies_from_model, self.copies_from_local, )
@staticmethod def __preprocess_for_pipeline_save( pipelines: Sequence[Pipeline], path: Union[str, os.PathLike] ) -> List[Dict[str, Any]]: import shutil processed_pipelines = list() for _, pipeline in enumerate(pipelines): blob_kind = pipeline.get_blob_kind() for id, blob in pipeline.blobs.items(): kind = blob_kind.get(id) if kind == SuperTaskKind.FX: with open(f"{path}/{id}.fx", "w") as fp: fp.write(blob) elif kind == SuperTaskKind.EDF: with open(f"{path}/{id}.edf", "wb") as fp: fp.write(blob.serialize()) # type: ignore[attr-defined] else: raise NotImplementedError(f"SuperTask [{kind}] is not supported to save") pipeline.blobs[id] = None # type: ignore[assignment] for param_idx, param_file in pipeline.param_files.items(): filename = os.path.basename(param_file.path) new_path = Path(f"{path}/{filename}") if not new_path.exists(): shutil.copyfile(param_file.path, new_path) pipeline.param_files[param_idx].path = filename processed_pipelines.append(json.loads(pipeline.to_json())) return processed_pipelines @staticmethod def __save_artifacts( path: Union[str, os.PathLike], artifact: Artifact, target_model_pipelines: Sequence[Pipeline], speculative_model_pipelines: Sequence[Pipeline], copies_from_model: Optional[Sequence[str]], copies_from_local: Optional[Sequence[os.PathLike]], ): import shutil from huggingface_hub import hf_hub_download path = Path(path) path.mkdir(parents=True, exist_ok=True) artifact.model = artifact.model.model_copy( update={ "pipelines": ArtifactBuilder.__preprocess_for_pipeline_save( target_model_pipelines, path ), }, ) if artifact.speculative_model: artifact.speculative_model = artifact.speculative_model.model_copy( update={ "pipelines": ArtifactBuilder.__preprocess_for_pipeline_save( speculative_model_pipelines, path ), }, ) artifact.export(f"{path}/artifact.json") model_metadata = artifact.model.model_metadata # Save Transformers configs to make an artifact more compatible with Hugging Face Transformers tokenizer = AutoTokenizer.from_pretrained(model_metadata.pretrained_id) tokenizer.save_pretrained(path) hf_config = model_metadata.config hf_config.save_pretrained(path) # Usually, this copies important files like README.md, LICENSE.txt, etc. # So, we should fail if copying fails. if copies_from_model is not None: for hub_file in copies_from_model: cache_path = hf_hub_download(model_metadata.pretrained_id, hub_file) shutil.copy(cache_path, path) if copies_from_local is not None: for local_file in copies_from_local: shutil.copy(local_file, path) # Save furiosa_config.json furiosa_config = FuriosaConfig( model_id=model_metadata.pretrained_id, model_kinds=[ModelKind.ARTIFACT], model_class=ModelClass.from_class(model_metadata.get_optimized_cls()), llm_config=model_metadata.llm_config, ) furiosa_config_json_path = path / FURIOSA_CONFIG_JSON furiosa_config.export(furiosa_config_json_path)
def get_model_metadata_from_quantized_model( model_path: Union[str, Path], trust_remote_code: Optional[bool] ) -> ModelMetadata: model_path = Path(model_path) if not isinstance(model_path, Path) else model_path (furiosa_config, _, _, _) = _load_quantized_model_meta(model_path) hf_configs = AutoConfig.from_pretrained(model_path, trust_remote_code=trust_remote_code) model_metadata = ModelMetadata( pretrained_id=furiosa_config.model_id, llm_config=furiosa_config.llm_config, trust_remote_code=trust_remote_code, ) model_metadata = model_metadata.with_hf_configs(hf_configs.to_diff_dict()) assert model_metadata.is_generative_model return model_metadata def get_model_metadata_from_model_id( model_id, num_hidden_layers: Optional[int] = None, quantize_artifact_path: Optional[os.PathLike] = None, prefill_chunk_size: Optional[int] = None, trust_remote_code: Optional[bool] = None, auto_bfloat16_cast: Optional[bool] = None, num_speculative_tokens: Optional[int] = None, ) -> ModelMetadata: model_metadata = ( ModelMetadata.init_with_mlperf_optim_options( model_id, trust_remote_code=trust_remote_code, ) if is_mlperf_optimized_model(model_id) else ModelMetadata( model_id, trust_remote_code=trust_remote_code, ) ) if auto_bfloat16_cast: model_metadata = model_metadata.with_auto_bfloat16_cast() # Override model config, optimization config if num_hidden_layers: model_metadata = model_metadata.with_num_layers(num_hidden_layers) if prefill_chunk_size or num_speculative_tokens: # For chunked prefill, model optimized for spec dec is needed, which can accept decode buckets with inputs_ids_len > 1. model_metadata = model_metadata.with_optimizations( {"optimized_for_speculative_decoding": True} ) ## Quantization Configuration if quantize_artifact_path: # model-compressor specific paths qformat_path = Path(f"{quantize_artifact_path}/qformat.yaml") qparam_path = Path(f"{quantize_artifact_path}/qparam.npy") if os.path.exists(qformat_path): qformat_path = qformat_path else: raise ValueError( f"The quantize_artifact_path {quantize_artifact_path} is specified, but the qformat.yaml file does not exist." ) if os.path.exists(qparam_path): qparam_path = qparam_path else: raise ValueError( "The quantize_artifact_path is specified, but the qparam.npy file does not exist." ) quantization_config = QuantizationConfig.from_qformat(qformat_path) model_metadata = model_metadata.with_quantization_config(quantization_config) return model_metadata