Source code for furiosa_llm.artifact.builder

import json
import logging
import os
from pathlib import Path
from pprint import pformat
import tempfile
import time
from typing import Literal, Mapping, Optional, Sequence, Tuple, Union
import uuid

from transformers import AutoConfig

from furiosa_llm.models.metadata import LLMConfig
from furiosa_llm.optimum.modeling import (
    _EXPORTED_MODEL_QCKPT,
    _FURIOSA_CONFIG_JSON,
    _QFORMAT_YAML,
    _QPARAM_NPY,
    _load_quantized_model_meta,
    is_quantized_model_path,
)
from furiosa_llm.utils import get_list_with_no_dup_with_order_preserved

from ..device import get_device_mesh, parse_devices_str
from ..models import AttentionType, ModelMetadata, QuantizationConfig
from ..models.config_types import (
    GeneratorConfig,
    KvCacheSharingAcrossBeamsConfig,
    LLMBackend,
    ManualBucketConfig,
    ModelRewritingConfig,
    PagedAttentionConfig,
    ParallelConfig,
    SchedulerConfig,
)
from ..parallelize.compiler_config import CompilerConfigContext
from ..parallelize.model_creation_info import ModelCreationInfo
from ..parallelize.mppp.api import PipelineParallelismMppp
from ..parallelize.pipeline import Pipeline
from ..parallelize.pipeline.builder.api import _get_commit_id
from ..parallelize.pipeline.types import SuperTaskKind
from ..parallelize.trace import get_param_file_with_cache
from .helper import (
    build_pipelines,
    get_buckets_with_output_logits_size,
    prestep_for_remote_code_model,
)
from .types import Artifact, ArtifactMetadata, ArtifactVersion, RuntimeConfig

logger = logging.getLogger(__name__)

# Default position id for padding
_POSITION_ID_PAD = 1

# Default param file name
_PARAM_FILE_NAME = "params.safetensors"

# Default index of the padding block when paged attention model is used.
DEFAULT_PAGED_ATTENTION_PADDING_BLOCK_IDX = 0

CACHE_DIR = Path(os.getenv("XDG_CACHE_HOME", Path.home() / ".cache")) / "furiosa" / "llm"


def is_mlperf_optimized_model(model_id: str) -> bool:
    return model_id in (
        "furiosa-ai/mlperf-bert-large",
        "furiosa-ai/mlperf-gpt-j-6b",
        "meta-llama/Llama-2-70b-chat-hf",
        "meta-llama/Meta-Llama-3.1-8B-Instruct",
        "meta-llama/Meta-Llama-3.1-70B-Instruct",
        "eliceai/helpy-edu-b-llama3.1",
        "upstage/SOLAR-10.7B-Instruct-v1.0",
        "furiosa-ai/EXAONE-3.0-7.8B-Instruct-converted",
        "LGAI-EXAONE/EXAONE-3.5-2.4B-Instruct",
        "LGAI-EXAONE/EXAONE-3.5-7.8B-Instruct",
        "LGAI-EXAONE/EXAONE-3.5-32B-Instruct",
    )


class ArtifactBuilder:
    """The artifact builder to use in the Furiosa LLM.

    Args:
        model_id_or_path: The Huggingface model id or a local path. This corresponds to
            pretrained_model_name_or_path in HuggingFace Transformers.
        name: The name of the artifact to build.
        tensor_parallel_size: The number of PEs for each tensor parallelism group. The default is 4.
        pipeline_parallel_size: The number of pipeline stages for pipeline parallelism. The default
            is 1. This param configures the default pipeline parallelism degree for the artifact;
            pipeline_parallel_size can be overridden when the artifact is loaded.
        data_parallel_size: The size of the data parallelism group. If not given, it will be
            derived from the total available PEs and the other parallelism degrees.
        prefill_buckets: Specify the bucket sizes for prefill.
        decode_buckets: Specify the bucket sizes for decode.
        max_seq_len_to_capture: Maximum sequence length covered by the LLM engine. Sequences with
            a larger context than this will not be covered. If no bucket is explicitly specified,
            a single-batch bucket with a context length of this value is created.
        max_prompt_len: Maximum prompt sequence length covered by the LLM engine. Prompts larger
            than this cannot be handled. If not given, it will be derived from the buckets and
            other configs.
        num_hidden_layers: Number of hidden layers in the Transformer encoder.
        seed_for_random_weight: The seed to initialize the random number generator for creating
            random weights.
        calculate_logit_only_for_last_token: Whether the model has the last-block-slice
            optimization applied.
        quantize_artifact_path: Specifies the path where quantization artifacts generated by
            furiosa-model-compressor are saved.
        compiler_config_overrides: Overrides for the compiler config. This is a dictionary that
            includes the configuration for the compiler.
        do_decompositions_for_model_rewrite: Whether to decompose some ops to describe various
            parallelism strategies with an mppp config. When the value is True, an mppp config
            that matches the decomposed FX graph should be given.
        use_blockwise_compile: If True, each task will be compiled in the unit of a transformer
            block, and the compilation result for a transformer block is generated once and
            reused. The default is ``True``.
        num_blocks_per_supertask: The number of transformer blocks that will be merged into one
            supertask. This option is valid only when `use_blockwise_compile=True`. The default
            is 1.
        num_blocks_per_pp_stage: The number of transformer blocks per pipeline parallelism stage.
            If not given, transformer blocks will be distributed equally.
        embed_all_constants_into_graph: Whether to embed constant tensors into the graph or make
            them inputs of the graph and save them as separate files. The default is False.
        optimize_logit_shape: Add a logit slice or removal operation to the graph for optimized
            performance.
        kv_cache_sharing_across_beams_config: Configuration for sharing the kv cache across beams.
            This argument must be given if and only if the model is optimized to share the kv
            cache across beams. If this argument is given, decode-phase buckets with a batch size
            of ``batch_size`` \* ``kv_cache_sharing_across_beams_config.beam_width`` will be
            created.
        paged_attention_block_size: The maximum number of tokens that can be stored in a single
            paged attention block. This argument must be given if the model uses paged attention.
        default_scheduler_config: Default configuration for the scheduler, which sets the maximum
            number of tasks that can be queued to the HW, the maximum number of samples that can
            be processed by the scheduler, and the ratio of spare blocks reserved by the
            scheduler.
        trust_remote_code: Trust remote code when downloading the model and tokenizer from
            HuggingFace.
    """

    def __init__(
        self,
        model_id_or_path: str,
        name: str = "",
        *,
        # Parallelize Config
        tensor_parallel_size: int = 4,
        pipeline_parallel_size: int = 1,
        data_parallel_size: Optional[int] = None,
        # Bucket Config
        prefill_buckets: Sequence[Tuple[int, int]] = [],
        decode_buckets: Sequence[Tuple[int, int]] = [],
        max_seq_len_to_capture: int = 2048,
        max_prompt_len: Optional[int] = None,
        # Model Config
        num_hidden_layers: Optional[int] = None,
        seed_for_random_weight: Optional[int] = None,
        calculate_logit_only_for_last_token: Optional[bool] = True,
        # Quantize Config
        quantize_artifact_path: Optional[os.PathLike] = None,
        # Compiler Config
        compiler_config_overrides: Optional[Mapping] = None,
        do_decompositions_for_model_rewrite: bool = False,
        use_blockwise_compile: bool = True,
        num_blocks_per_supertask: int = 1,
        num_blocks_per_pp_stage: Optional[Sequence[int]] = None,
        embed_all_constants_into_graph: bool = False,
        optimize_logit_shape: bool = True,
        kv_cache_sharing_across_beams_config: Optional[KvCacheSharingAcrossBeamsConfig] = None,
        # PagedAttention Config
        paged_attention_block_size: int = 1,
        # Scheduler Config for RuntimeConfig
        default_scheduler_config: SchedulerConfig = SchedulerConfig(),
        # Other configs
        trust_remote_code: Optional[bool] = None,
        **kwargs,
    ):
        self.artifact_id = str(uuid.uuid4())

        # Constants default values
        self.optimize_paged_attention_block_loading = True
        self.sparse_select_version = "v1.5"
        self.one_supertask_per_device = True

        self.quantize_artifact_path = quantize_artifact_path
        self.model_id_or_path = model_id_or_path
        self.trust_remote_code = trust_remote_code

        # Pre-step for model configurations
        if is_quantized_model_path(model_id_or_path):
            self.is_from_quantized_model = True
            self.model_path = (
                Path(model_id_or_path)
                if not isinstance(model_id_or_path, Path)
                else model_id_or_path
            )
            self.model_metadata = get_model_metadata_from_quantized_model(
                self.model_path, trust_remote_code=trust_remote_code
            )

            # Warnings for unsupported options
            if seed_for_random_weight is not None:
                logging.warning("Random weight model is not supported for quantized model.")
            self.seed_for_random_weight = None
            if calculate_logit_only_for_last_token is not None:
                logging.warning(
                    "calculate_logit_only_for_last_token is not supported for quantized model."
                )
            self.calculate_logit_only_for_last_token = None
        else:
            self.is_from_quantized_model = False
            # Pre-steps for a model prepared by model id
            self.model_metadata = get_model_metadata_from_model_id(
                model_id=self.model_id,
                num_hidden_layers=num_hidden_layers,
                calculate_logit_only_for_last_token=calculate_logit_only_for_last_token,
                quantize_artifact_path=quantize_artifact_path,
                prefill_chunk_size=default_scheduler_config.prefill_chunk_size,
                trust_remote_code=trust_remote_code,
            )
            self.seed_for_random_weight = seed_for_random_weight
            self.calculate_logit_only_for_last_token = calculate_logit_only_for_last_token

        self._name = name

        # The following are required for compilation
        self.devices = ",".join(f"npu:{i}" for i in range(0, pipeline_parallel_size))
        self.tensor_parallel_size = tensor_parallel_size
        self.pipeline_parallel_size = pipeline_parallel_size
        self.data_parallel_size = data_parallel_size

        self.max_seq_len_to_capture = max_seq_len_to_capture
        self.max_prompt_len = max_prompt_len

        # Bucket Configuration
        if not prefill_buckets and not decode_buckets:
            self.prefill_buckets = [(1, max_seq_len_to_capture)]
            self.decode_buckets = (
                [(1, max_seq_len_to_capture)] if self.model_metadata.is_generative_model else []
            )
        else:
            self.prefill_buckets = list(prefill_buckets)
            self.decode_buckets = list(decode_buckets)

        self.compiler_config_overrides = compiler_config_overrides

        self.model_rewriting_config = ModelRewritingConfig(
            do_decompositions_for_model_rewrite=do_decompositions_for_model_rewrite,
            use_blockwise_compile=use_blockwise_compile,
            num_blocks_per_supertask=num_blocks_per_supertask,
            embed_all_constants_into_graph=embed_all_constants_into_graph,
            optimize_logit_shape=optimize_logit_shape,
        )
        self.num_blocks_per_pp_stage = num_blocks_per_pp_stage

        self.kv_cache_sharing_across_beams_config = kv_cache_sharing_across_beams_config

        if self.model_metadata.attention_type is AttentionType.PAGED_ATTENTION:
            paged_attention_num_blocks = 65536  # A dummy number until we remove this option
            if paged_attention_block_size != 1:
                raise NotImplementedError(
                    "Currently, only paged attention with block_size=1 is supported."
                )
            padding_block_idx = (
                DEFAULT_PAGED_ATTENTION_PADDING_BLOCK_IDX
                if self.optimize_paged_attention_block_loading
                else None
            )
            self.paged_attention_config: Optional[PagedAttentionConfig] = PagedAttentionConfig(
                paged_attention_num_blocks, paged_attention_block_size, padding_block_idx
            )
        else:
            self.paged_attention_config = None

        self.prefill_chunk_size = default_scheduler_config.prefill_chunk_size

        self.runtime_config = RuntimeConfig(
            npu_queue_limit=default_scheduler_config.npu_queue_limit,
            max_processing_samples=default_scheduler_config.max_processing_samples,
            spare_blocks_ratio=default_scheduler_config.spare_blocks_ratio,
            is_offline=default_scheduler_config.is_offline,
            paged_attention_num_blocks=paged_attention_num_blocks,
            prefill_buckets=self.prefill_buckets,
            decode_buckets=self.decode_buckets,
            prefill_chunk_size=default_scheduler_config.prefill_chunk_size,
        )

    @property
    def model_id(self) -> str:
        if self.is_from_quantized_model:
            furiosa_config_file = self.model_path / _FURIOSA_CONFIG_JSON
            furiosa_config = json.loads(furiosa_config_file.read_text())
            return furiosa_config['model_id']
        else:
            return self.model_id_or_path

    @property
    def name(self) -> str:
        if self._name:
            return self._name
        return self.model_id

    @property
    def quant_ckpt_file_path(self):
        if self.is_from_quantized_model:
            assert self.model_path
            return self.model_path / _EXPORTED_MODEL_QCKPT
        if self.quantize_artifact_path:
            quant_ckpt_file_path = Path(f"{self.quantize_artifact_path}/exported_model.qckpt")
            return quant_ckpt_file_path if os.path.exists(quant_ckpt_file_path) else None
        return None

    @property
    def qformat_path(self) -> Union[None, Path]:
        if self.is_from_quantized_model:
            assert self.model_path
            return self.model_path / _QFORMAT_YAML
        if self.quantize_artifact_path:
            qformat_path = Path(f"{self.quantize_artifact_path}/qformat.yaml")
            if os.path.exists(qformat_path):
                return qformat_path
            else:
                raise ValueError(
                    "The quantize_artifact_path is specified, but the qformat.yaml file does not exist."
                )
        return None

    @property
    def qparam_path(self) -> Union[None, Path]:
        if self.is_from_quantized_model:
            assert self.model_path
            return self.model_path / _QPARAM_NPY
        if self.quantize_artifact_path:
            qparam_path = Path(f"{self.quantize_artifact_path}/qparam.npy")
            if os.path.exists(qparam_path):
                return qparam_path
            else:
                raise ValueError(
                    "The quantize_artifact_path is specified, but the qparam.npy file does not exist."
                )
        return None
    def build(
        self,
        save_dir: Union[str, os.PathLike],
        *,
        num_pipeline_builder_workers: int = 1,
        num_compile_workers: int = 1,
        cache_dir: Optional[os.PathLike] = CACHE_DIR,
        param_file_path: Optional[os.PathLike] = None,
        param_saved_format: Literal["safetensors", "pt"] = "safetensors",
        _cleanup: bool = True,
        **kwargs,
    ):
        """Build the artifacts for the given model configurations.

        Args:
            save_dir: The path to save the artifacts. With the artifacts, you can create an
                ``LLM`` without quantizing or compiling the model again.
            num_pipeline_builder_workers: The number of workers used for building pipelines
                (except for compilation). The default is 1 (no parallelism). Setting this value
                larger than 1 reduces pipeline building time, especially for large models, but
                requires much more memory.
            num_compile_workers: The number of workers used for compilation. The default is 1
                (no parallelism).
            cache_dir: The cache directory for all files generated for this LLM instance. When
                its value is ``None``, caching is disabled. The default is
                "$HOME/.cache/furiosa/llm".
            param_file_path: The path to the parameter file to use for pipeline generation. If
                not specified, the parameters will be saved in a temporary file which will be
                deleted when the ``LLM`` is destroyed.
            param_saved_format: The format of the parameter file. The only possible value is
                "safetensors" for now. The default is "safetensors".
        """
        import furiosa.native_compiler

        model_config = self.model_metadata.config

        # Please refer to the example at
        # https://huggingface.co/docs/transformers/en/main_classes/text_generation#transformers.GenerationMixin.greedy_search.example
        # Some models like GPT-2 may not have pad_token_id, but pad_token_id is required to fill
        # a batch of sequence generations with padding. With HuggingFace Transformers, users are
        # expected to handle this themselves; to provide better usability, we handle it within
        # the LLM class.
        model_config.pad_token_id = model_config.eos_token_id

        kv_cache_dtype = self.model_metadata.kv_cache_dtype

        original_model_type = self.model_metadata.get_optimized_cls()
        original_model_name = f"{original_model_type.__module__}.{original_model_type.__name__}"

        (
            prefill_buckets_with_output_size,
            decode_buckets_with_output_size,
            other_buckets_with_output_size,
        ) = get_buckets_with_output_logits_size(
            self.model_metadata,
            ManualBucketConfig(
                prefill_buckets=self.prefill_buckets,
                decode_buckets=self.decode_buckets,
            ),
            self.max_prompt_len if self.max_prompt_len else self.max_seq_len_to_capture,
            self.max_seq_len_to_capture,
            num_speculative_tokens=None,
            prefill_chunk_size=self.prefill_chunk_size,
            optimize_bucket_output_logits_size=self.model_rewriting_config.optimize_logit_shape,
        )

        prefill_buckets = [
            bucket_with_output_size.bucket
            for bucket_with_output_size in prefill_buckets_with_output_size
        ]
        decode_buckets = [
            bucket_with_output_size.bucket
            for bucket_with_output_size in decode_buckets_with_output_size
        ]
        other_buckets = [
            bucket_with_output_size.bucket
            for bucket_with_output_size in other_buckets_with_output_size
        ]

        packing_type: Literal["IDENTITY"] = (
            "IDENTITY"  # TODO: enum PackingType has `IDENTITY` and `GREEDY`
        )

        generator_config = GeneratorConfig(
            _POSITION_ID_PAD,
            get_list_with_no_dup_with_order_preserved(
                (*prefill_buckets, *decode_buckets, *other_buckets)
            ),
            original_model_name,
            self.paged_attention_config,
            packing_type,
            self.kv_cache_sharing_across_beams_config,
            None,
            None,
        )

        model_creation_info = ModelCreationInfo(
            self.model_metadata,
            self.seed_for_random_weight is not None,
            seed=self.seed_for_random_weight,
            qformat_path=self.qformat_path,
            qparam_path=self.qparam_path,
            quant_ckpt_file_path=self.quant_ckpt_file_path,
        )

        beam_size_or_none = (
            None
            if self.kv_cache_sharing_across_beams_config is None
            else self.kv_cache_sharing_across_beams_config.beam_width
        )

        compiler_config_context = CompilerConfigContext(
            model_metadata=self.model_metadata,
            beam_size=beam_size_or_none,
            compiler_config_overrides=self.compiler_config_overrides,
        )

        if _cleanup:
            self.tmp_dir = tempfile.TemporaryDirectory()
            tmp_dir_path = Path(self.tmp_dir.name)
        else:
            tmp_dir_path = Path(tempfile.mkdtemp())

        if not param_file_path:
            if cache_dir and model_creation_info.is_hashable():
                param_file_cache_dir = Path(cache_dir) / "param_files"
                param_file_path = get_param_file_with_cache(
                    model_creation_info, param_file_cache_dir
                )
            else:
                assert isinstance(tmp_dir_path, Path)
                param_file_path = get_param_file_with_cache(model_creation_info, tmp_dir_path)

        cache_dir = None if cache_dir is None else Path(cache_dir)

        # For now, `PipelineParallelismMppp` supports all valid cases because only pipeline
        # parallelism needs to be expressed within one pipeline.
        if self.num_blocks_per_pp_stage and "mppp" in kwargs:
            logging.warning(
                "`num_blocks_per_pp_stage` and a custom `mppp` are given at the same time. "
                "`num_blocks_per_pp_stage` is ignored."
            )
        mppp = kwargs.pop("mppp", None) or PipelineParallelismMppp(self.num_blocks_per_pp_stage)

        devices = parse_devices_str(self.devices)
        normalized_mesh = [
            [dev for tp_group in pp_tp_group for dev in tp_group]
            for pp_tp_group in get_device_mesh(
                devices,
                self.tensor_parallel_size,
                self.pipeline_parallel_size,
                self.data_parallel_size,
            )
        ]
        self.data_parallel_size = len(normalized_mesh)

        parallel_config = ParallelConfig(
            tensor_parallel_size=self.tensor_parallel_size,
            pipeline_parallel_size=self.pipeline_parallel_size,
        )

        logger.info(
            f"Buckets with output sizes: {pformat([*prefill_buckets_with_output_size, *decode_buckets_with_output_size, *other_buckets_with_output_size])}"
        )

        if self.trust_remote_code:
            prestep_for_remote_code_model(self.model_metadata, num_pipeline_builder_workers)

        pipelines_with_metadata = build_pipelines(
            model_creation_info,
            [
                *prefill_buckets_with_output_size,
                *decode_buckets_with_output_size,
                *other_buckets_with_output_size,
            ],
            normalized_mesh[0],
            param_file_path,
            cache_dir,
            LLMBackend.FURIOSA_RT_V2,
            mppp,
            SuperTaskKind.EDF,
            self.one_supertask_per_device,
            self.model_rewriting_config.use_blockwise_compile,
            self.model_rewriting_config.do_decompositions_for_model_rewrite,
            kv_cache_dtype.to_torch_dtype() if kv_cache_dtype else None,
            generator_config.paged_attention_config,
            self.sparse_select_version,
            generator_config.kv_cache_sharing_across_beams_config,
            tmp_dir_path,
            self.model_metadata,
            compiler_config_context,
            num_pipeline_builder_workers,
            num_compile_workers,
            self.model_rewriting_config.embed_all_constants_into_graph,
            self.model_rewriting_config.num_blocks_per_supertask,
            self.model_metadata.is_generative_model,
            param_saved_format,
            **kwargs,
        )

        if len(pipelines_with_metadata) == 0:
            raise ValueError("No pipeline is generated")

        metadata = ArtifactMetadata(
            artifact_id=self.artifact_id,
            name=self.name,
            timestamp=int(time.time()),
            version=ArtifactVersion(
                furiosa_llm=_get_commit_id(),
                furiosa_compiler=furiosa.native_compiler.compiler_git_short_hash(),
            ),
        )

        pipeline_metadata_list = [metadata for _, metadata in pipelines_with_metadata]
        pipelines = [pipeline for pipeline, _ in pipelines_with_metadata]

        artifact = Artifact(
            metadata=metadata,
            devices=self.devices,
            generator_config=generator_config,
            hf_config=self.model_metadata.config.to_dict(),
            model_metadata=self.model_metadata,
            model_rewriting_config=self.model_rewriting_config,
            parallel_config=parallel_config,
            pipeline_metadata_list=pipeline_metadata_list,
            max_prompt_len=self.max_prompt_len,
        )

        ArtifactBuilder.__save_artifacts(
            save_dir,
            artifact,
            pipelines,
            self.runtime_config,
        )
    @staticmethod
    def __save_artifacts(
        path: Union[str, os.PathLike],
        artifact: Artifact,
        pipelines: Sequence[Pipeline],
        runtime_config: RuntimeConfig,
    ):
        import shutil

        path = Path(path)
        path.mkdir(parents=True, exist_ok=True)

        for idx, pipeline in enumerate(pipelines):
            blob_kind = pipeline.get_blob_kind()
            for id, blob in pipeline.blobs.items():
                kind = blob_kind.get(id)
                if kind == SuperTaskKind.FX:
                    with open(f"{path}/{id}.fx", "w") as fp:
                        fp.write(blob)
                elif kind == SuperTaskKind.EDF:
                    with open(f"{path}/{id}.edf", "wb") as fp:
                        fp.write(blob.serialize())  # type: ignore[attr-defined]
                else:
                    raise NotImplementedError(f"SuperTask [{kind}] is not supported to save")
                pipeline.blobs[id] = None  # type: ignore[assignment]

            for param_idx, param_file in pipeline.param_files.items():
                filename = os.path.basename(param_file.path)
                new_path = Path(f"{path}/{filename}")
                if not new_path.exists():
                    shutil.copyfile(param_file.path, new_path)
                pipeline.param_files[param_idx].path = filename

            artifact.append_pipeline(json.loads(pipeline.to_json()))

        artifact.export(f"{path}/artifact.json")
        runtime_config.export(f"{path}/runtime_config.json")
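# Illustration only, not part of the library source: a minimal, hedged usage sketch of the
# ArtifactBuilder API documented above. The bucket sizes, parallelism degrees, artifact name,
# and save directory below are assumptions chosen for the example; the model id is one of the
# MLPerf-optimized ids listed in is_mlperf_optimized_model().
def _example_build_artifact():
    # Hypothetical helper; only constructor/build parameters defined above are used.
    builder = ArtifactBuilder(
        "meta-llama/Meta-Llama-3.1-8B-Instruct",  # HuggingFace model id or a local path
        name="llama3.1-8b-instruct",
        tensor_parallel_size=4,                   # PEs per tensor-parallel group
        pipeline_parallel_size=1,
        prefill_buckets=[(1, 1024), (1, 2048)],   # (batch size, context length) pairs
        decode_buckets=[(16, 2048)],
        max_seq_len_to_capture=2048,
    )
    # build() writes pipeline blobs (*.edf / *.fx), parameter files, artifact.json, and
    # runtime_config.json into save_dir (see __save_artifacts above).
    builder.build(
        "./llama3.1-8b-artifact",
        num_pipeline_builder_workers=2,
        num_compile_workers=2,
    )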
def get_model_metadata_from_quantized_model(
    model_path: Union[str, Path], trust_remote_code: Optional[bool]
) -> ModelMetadata:
    model_path = Path(model_path) if not isinstance(model_path, Path) else model_path
    (furiosa_config, optimization_config, quantization_config, _, _, _) = (
        _load_quantized_model_meta(model_path)
    )
    hf_configs = AutoConfig.from_pretrained(model_path)

    model_metadata = ModelMetadata(
        pretrained_id=furiosa_config['model_id'],
        llm_config=LLMConfig(
            optimization_config=optimization_config,
            quantization_config=quantization_config,
        ),
        trust_remote_code=trust_remote_code,
    )
    model_metadata = model_metadata.with_hf_configs(hf_configs.to_diff_dict())
    assert model_metadata.is_generative_model

    return model_metadata


def get_model_metadata_from_model_id(
    model_id,
    num_hidden_layers: Optional[int] = None,
    calculate_logit_only_for_last_token: Optional[bool] = False,
    quantize_artifact_path: Optional[os.PathLike] = None,
    prefill_chunk_size: Optional[int] = None,
    trust_remote_code: Optional[bool] = None,
) -> ModelMetadata:
    model_metadata = (
        ModelMetadata.init_with_mlperf_optim_options(model_id, trust_remote_code=trust_remote_code)
        if is_mlperf_optimized_model(model_id)
        else ModelMetadata(model_id, trust_remote_code=trust_remote_code)
    )

    # Override model config, optimization config
    if num_hidden_layers:
        model_metadata = model_metadata.with_num_layers(num_hidden_layers)

    if calculate_logit_only_for_last_token:
        model_metadata = model_metadata.with_optimizations(
            {"calculate_logit_only_for_last_token": calculate_logit_only_for_last_token}
        )

    if prefill_chunk_size:
        # For chunked prefill, a model optimized for speculative decoding is needed, which can
        # accept decode buckets with input_ids_len > 1.
        model_metadata = model_metadata.with_optimizations(
            {"optimized_for_speculative_decoding": True}
        )

    ## Quantization Configuration
    if quantize_artifact_path:
        # model-compressor specific paths
        qformat_path = Path(f"{quantize_artifact_path}/qformat.yaml")
        qparam_path = Path(f"{quantize_artifact_path}/qparam.npy")

        if not os.path.exists(qformat_path):
            raise ValueError(
                "The quantize_artifact_path is specified, but the qformat.yaml file does not exist."
            )
        if not os.path.exists(qparam_path):
            raise ValueError(
                "The quantize_artifact_path is specified, but the qparam.npy file does not exist."
            )

        quantization_config = QuantizationConfig.from_qformat(qformat_path)
        model_metadata = model_metadata.with_quantization_config(quantization_config)

    return model_metadata
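
# Illustration only, not part of the library source: a minimal, hedged sketch of calling
# get_model_metadata_from_model_id() with quantization artifacts. The "./quantized" directory
# name is an assumption for the example; the files it must contain are the ones the checks
# above look for (the output of furiosa-model-compressor):
#
#   ./quantized/
#   ├── qformat.yaml   # quantization format, read via QuantizationConfig.from_qformat()
#   └── qparam.npy     # quantization parameters
def _example_metadata_with_quantize_artifacts():
    # Hypothetical helper; the model id is one of the MLPerf-optimized ids listed in
    # is_mlperf_optimized_model().
    return get_model_metadata_from_model_id(
        "furiosa-ai/mlperf-gpt-j-6b",
        quantize_artifact_path="./quantized",
    )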