from collections import Counter
from contextlib import nullcontext
from itertools import chain
import json
import logging
import os
from pathlib import Path
from pprint import pformat
import shutil
import tempfile
import time
from typing import (
Any,
Dict,
List,
Literal,
Mapping,
Optional,
Sequence,
Tuple,
Union,
)
import uuid
import warnings
from zipfile import ZipFile
from more_itertools import zip_equal
from transformers import AutoConfig
from furiosa.native_llm_common import compute_limits
from furiosa_llm.artifact.types.config import (
ArtifactConfig,
BucketConfig,
CompilerConfig,
ModelConfig,
ParallelConfig,
SpeculativeDecodingConfig,
)
from furiosa_llm.artifact.types.next_gen import Artifact as NextGenArtifact
from furiosa_llm.artifact.types.next_gen import ModelArtifact as NextGenModelArtifact
from furiosa_llm.models import tasks
from furiosa_llm.models.config_types import (
BucketWithOutputLogitsSize,
TokenwiseBucket,
)
from furiosa_llm.models.next_gen_config_types import ComposableKernelMetadata, CompositionType
from furiosa_llm.optimum.model_configs import find_canonical_model_id
from furiosa_llm.optimum.modeling import FURIOSA_CONFIG_JSON
from furiosa_llm.optimum.types import FuriosaConfig, ModelClass, ModelKind
from furiosa_llm.parallelize.export.tensor import (
_INDEX_TENSOR_NAME,
ParamfileFormat,
ParamFileMetadata,
)
from furiosa_llm.parallelize.mppp.config import Device
import furiosa_llm.parallelize.new_pipeline_builder as next_gen
import furiosa_llm.parallelize.pipeline.next_gen as next_gen_pipe
from furiosa_llm.parallelize.pipeline.types import ParamFileInfo
from furiosa_llm.parallelize.pipeline_build_configurer import (
ComposableKernelPipelineBuildConfigGenerator,
)
from furiosa_llm.tokenizer.tokenizer import get_tokenizer
from furiosa_llm.utils import get_list_with_no_dup_with_order_preserved
from ..device import NUM_PES_PER_NPU, get_device_mesh
from ..models import AttentionType, ModelMetadata
from ..models.config_types import (
AttentionBucket,
ManualBucketConfig,
ModelRewritingConfig,
PagedAttentionConfig,
)
from ..parallelize.model_creation_info import ModelCreationInfo
from ..parallelize.mppp.api import PipelineParallelismMppp
from ..parallelize.pipeline import Pipeline
from ..parallelize.pipeline.builder.api import _get_commit_id
from ..parallelize.pipeline.types import SuperTaskKind
from ..parallelize.trace import PARAM_FILE_CACHE_SUBDIR_NAME, get_param_file_with_cache
from .helper import (
get_buckets_with_output_logits_size,
prestep_for_remote_code_model,
)
from .types import next_gen as next_gen_artifact_types
from .types.latest import Artifact, ArtifactMetadata, ModelArtifact
logger = logging.getLogger(__name__)
# Default index of the padding block when paged attention model is used.
DEFAULT_PAGED_ATTENTION_PADDING_BLOCK_IDX = 0
CACHE_DIR = Path(os.getenv("XDG_CACHE_HOME", Path.home() / ".cache")) / "furiosa" / "llm"
BINARY_BUNDLE_ZIP_FILE_NAME = "binary_bundle"
def get_tp_group_device(tp_size: int, start_idx: int) -> Device:
if tp_size <= NUM_PES_PER_NPU:
return Device(f"npu:{start_idx}:0-{tp_size-1}")
else:
assert tp_size % NUM_PES_PER_NPU == 0
return Device(",".join([f"npu:{start_idx + i}" for i in range(tp_size // NUM_PES_PER_NPU)]))
[docs]
class ArtifactBuilder:
"""The artifact builder to use in the Furiosa LLM.
Args:
model_id_or_path: The HuggingFace model id or a local path. This corresponds to
pretrained_model_name_or_path in HuggingFace Transformers.
name: The name of the artifact to build. If not provided, defaults to model_id_or_path.
model_config: Configuration for model HuggingFace settings (trust_remote_code, etc.).
parallel_config: Configuration for parallelization (tensor and pipeline parallelism).
Defaults to tensor_parallel_size=8, pipeline_parallel_size=1.
bucket_config: Configuration for attention buckets and sequence lengths.
Defaults to max_seq_len_to_capture=2048 with auto-generated buckets.
compiler_config: Configuration for compiler and model rewriting options.
speculative_config: Configuration for speculative decoding (experimental).
artifact_config: Configuration for artifact export options.
tensor_parallel_size: Deprecated. Use parallel_config instead.
pipeline_parallel_size: Deprecated. Use parallel_config instead.
max_seq_len_to_capture: Deprecated. Use bucket_config instead.
"""
def __init__(
self,
model_id_or_path: Union[str, Path],
name: str = "",
*,
model_config: Optional[ModelConfig] = None,
parallel_config: Optional[ParallelConfig] = None,
bucket_config: Optional[BucketConfig] = None,
compiler_config: Optional[CompilerConfig] = None,
speculative_config: Optional[SpeculativeDecodingConfig] = None,
artifact_config: Optional[ArtifactConfig] = None,
# Deprecated parameters (for backward compatibility)
tensor_parallel_size: Optional[int] = None,
pipeline_parallel_size: Optional[int] = None,
max_seq_len_to_capture: Optional[int] = None,
):
# Handle deprecated parameters with warnings
# Check for conflicts between deprecated params and config objects first
if (
tensor_parallel_size is not None or pipeline_parallel_size is not None
) and parallel_config is not None:
raise ValueError(
"Cannot specify both deprecated parallelism parameters "
"(tensor_parallel_size, pipeline_parallel_size) and 'parallel_config'. "
"Use parallel_config=ParallelConfig(...) instead."
)
if max_seq_len_to_capture is not None and bucket_config is not None:
raise ValueError(
"Cannot specify both 'max_seq_len_to_capture' and 'bucket_config'. "
"Use bucket_config=BucketConfig(max_seq_len_to_capture=...) instead."
)
# Apply deprecated parameters with warnings
# When using deprecated parameters, fall back to the same defaults used by
# ParallelConfig for any value that is not explicitly specified.
if tensor_parallel_size is not None or pipeline_parallel_size is not None:
effective_tensor_parallel_size = (
tensor_parallel_size if tensor_parallel_size is not None else 8
)
effective_pipeline_parallel_size = (
pipeline_parallel_size if pipeline_parallel_size is not None else 1
)
if tensor_parallel_size is not None:
warnings.warn(
"tensor_parallel_size parameter is deprecated. "
"Use parallel_config=ParallelConfig(tensor_parallel_size=...) instead. "
"When only tensor_parallel_size is provided, "
"pipeline_parallel_size defaults to 1.",
DeprecationWarning,
stacklevel=2,
)
if pipeline_parallel_size is not None:
warnings.warn(
"pipeline_parallel_size parameter is deprecated. "
"Use parallel_config=ParallelConfig(pipeline_parallel_size=...) instead. "
"When only pipeline_parallel_size is provided, "
"tensor_parallel_size defaults to 8.",
DeprecationWarning,
stacklevel=2,
)
parallel_config = ParallelConfig(
tensor_parallel_size=effective_tensor_parallel_size,
pipeline_parallel_size=effective_pipeline_parallel_size,
)
if max_seq_len_to_capture is not None:
warnings.warn(
"max_seq_len_to_capture parameter is deprecated. "
"Use bucket_config=BucketConfig(max_seq_len_to_capture=...) instead.",
DeprecationWarning,
stacklevel=2,
)
bucket_config = BucketConfig(max_seq_len_to_capture=max_seq_len_to_capture)
# Apply defaults for optional configs
if model_config is None:
model_config = ModelConfig()
if parallel_config is None:
parallel_config = ParallelConfig()
if bucket_config is None:
bucket_config = BucketConfig()
if compiler_config is None:
compiler_config = CompilerConfig()
if speculative_config is None:
speculative_config = SpeculativeDecodingConfig()
if artifact_config is None:
artifact_config = ArtifactConfig()
# Store config objects for reference
self._model_config = model_config
self._parallel_config = parallel_config
self._bucket_config = bucket_config
self._compiler_config = compiler_config
self._speculative_config = speculative_config
self._artifact_config = artifact_config
# Store primary fields
self.model_id_or_path = model_id_or_path
self._name = name
# Load HuggingFace model config and find canonical model ID
hf_model_config = AutoConfig.from_pretrained(
model_id_or_path,
trust_remote_code=model_config.trust_remote_code,
revision=model_config.revision,
)
canonical_model_id = find_canonical_model_id(
hf_model_config,
None if isinstance(model_id_or_path, Path) else model_id_or_path,
task=model_config.task,
)
if canonical_model_id is None:
raise ValueError(f"ArtifactBuilder doesn't support {type(hf_model_config)} class")
# Constants default values
self.artifact_id = str(uuid.uuid4())
self.canonical_model_id = canonical_model_id
self.optimize_paged_attention_block_loading = True
self.sparse_select_version = "v1.5"
self.one_supertask_per_device = True
# Presteps for model prepared by model id
self.model_metadata = get_model_metadata_from_model_id(
model_id=self.canonical_model_id,
model_id_or_path=self.model_id_or_path,
hf_overrides=model_config.hf_overrides,
trust_remote_code=model_config.trust_remote_code,
revision=model_config.revision,
task=model_config.task,
use_binary_seq_class=model_config.binary_classification_head,
)
logger.info(f"Derived Model Metadata is {repr(self.model_metadata)}")
# Check if the files to copy exist before building the model
self._validate_copies_from_local()
self._validate_copies_from_model()
# Followings are dummy specifications for parallel size and device configuration, required for compilation.
# These values are expected to be provided by the user when loading artifacts via `LLM.__init__()`.
self.dummy_data_parallel_size = 1
num_chip_per_tp_group = (parallel_config.tensor_parallel_size - 1) // NUM_PES_PER_NPU + 1
self.dummy_normalized_mesh = [
[dev for tp_group in pp_tp_group for dev in tp_group]
for pp_tp_group in get_device_mesh(
[
get_tp_group_device(parallel_config.tensor_parallel_size, i)
for i in range(
0,
self.dummy_data_parallel_size
* parallel_config.pipeline_parallel_size
* num_chip_per_tp_group,
num_chip_per_tp_group,
)
],
parallel_config.tensor_parallel_size,
parallel_config.pipeline_parallel_size,
self.dummy_data_parallel_size,
)
]
# Bucket Configuration
prefill_buckets = list(bucket_config.prefill_buckets)
decode_buckets = list(bucket_config.decode_buckets)
if not prefill_buckets and not decode_buckets:
self.prefill_buckets = [(1, bucket_config.max_seq_len_to_capture)]
self.decode_buckets = (
[(1, bucket_config.max_seq_len_to_capture)]
if self.model_metadata.is_generative_model
else []
)
else:
duplicate_prefill_buckets = [
bucket for bucket, cnt in Counter(prefill_buckets).items() if cnt > 1
]
duplicate_decode_buckets = [
bucket for bucket, cnt in Counter(decode_buckets).items() if cnt > 1
]
if duplicate_prefill_buckets:
logging.warning(
f"Duplicate prefill buckets are found: {duplicate_prefill_buckets}. "
)
if duplicate_decode_buckets:
logging.warning(f"Duplicate decode buckets are found: {duplicate_decode_buckets}. ")
self.prefill_buckets = get_list_with_no_dup_with_order_preserved(prefill_buckets)
self.decode_buckets = get_list_with_no_dup_with_order_preserved(decode_buckets)
append_buckets = list(bucket_config.append_buckets)
if append_buckets:
duplicate_append_buckets = [
bucket for bucket, cnt in Counter(append_buckets).items() if cnt > 1
]
if duplicate_append_buckets:
logging.warning(f"duplicate append buckets are found: {duplicate_append_buckets}")
self.append_buckets = get_list_with_no_dup_with_order_preserved(append_buckets)
else:
self.append_buckets = []
self.model_rewriting_config = ModelRewritingConfig(
do_decompositions_for_model_rewrite=compiler_config.do_decompositions_for_model_rewrite,
use_blockwise_compile=compiler_config.use_blockwise_compile,
embedding_layer_as_single_block=compiler_config.embedding_layer_as_single_block,
embed_all_constants_into_graph=compiler_config.embed_all_constants_into_graph,
optimize_logit_shape=compiler_config.optimize_logit_shape,
max_embeddable_constant_size=compiler_config.max_embeddable_constant_size,
)
if self.model_metadata.attention_type is AttentionType.PAGED_ATTENTION:
if compiler_config.paged_attention_block_size != 1:
raise NotImplementedError(
"Currently, only paged attention with block_size=1 is supported."
)
padding_block_idx = (
DEFAULT_PAGED_ATTENTION_PADDING_BLOCK_IDX
if self.optimize_paged_attention_block_loading
else None
)
self.paged_attention_config: Optional[PagedAttentionConfig] = PagedAttentionConfig(
compiler_config.paged_attention_block_size, padding_block_idx
)
else:
self.paged_attention_config = None
if speculative_config.num_speculative_tokens:
if not (
speculative_config.speculative_model
or speculative_config.allow_empty_speculative_model
):
raise ValueError(
"`num_speculative_tokens` and `speculative_model` must be presented at the same time."
)
@property
def model_id(self) -> str:
return str(self.model_id_or_path)
@property
def name(self) -> str:
if self._name:
return self._name
return self.model_id
def _validate_copies_from_local(self):
"""Check if the files exist in the local directory."""
if self._artifact_config.copies_from_local is None:
return
for file_copy in self._artifact_config.copies_from_local:
if not os.path.exists(file_copy):
raise ValueError(
f"The file {file_copy} of copies_from_local does not exist in the local directory."
)
def _validate_copies_from_model(self):
"""Check if the files are downloadable from the Hugging Face Hub."""
from huggingface_hub import hf_hub_download
if self._artifact_config.copies_from_model is None:
return
for file_copy in self._artifact_config.copies_from_model:
try:
_ = hf_hub_download(self.model_metadata.pretrained_id, file_copy)
except Exception as e:
raise ValueError(
f"Fail to download {file_copy} from Hugging Face Hub's repository {self.model_metadata.pretrained_id}."
) from e
def _build_model_artifact(
self,
*,
num_pipeline_builder_workers: int,
num_compile_workers: int,
num_cpu_per_pipeline_build_worker: int,
num_cpu_per_compile_worker: int,
cache_dir: Optional[os.PathLike] = CACHE_DIR,
param_file_path: Optional[Union[os.PathLike, str]] = None,
param_saved_format: Literal["safetensors", "pt"] = "safetensors",
param_file_max_shard_size: Optional[Union[str, int]] = "5GB",
_use_composable_kernel: bool = False,
_use_activation_dq: bool = False,
_cleanup: bool = True,
_raise_error_if_compile: bool = False,
_includes_composable_ir: bool = False,
**kwargs,
) -> Tuple[
Union[ModelArtifact, NextGenModelArtifact],
Union[List[Pipeline], List[next_gen.Pipeline]],
]:
model_config = self.model_metadata.config
# Please refer to an example at https://huggingface.co/docs/transformers/en/main_classes/text_generation#transformers.GenerationMixin.greedy_search.example
# Some models like GPT-2 may not have pad_token_id. BTW, when we run a batch of sequence generations,
# We must need pad_token_id to fill the batch with pad. With Hugging Face Transformers,
# users should handle this issue. Our goal is to provide a better useability for users.
# We handle this issue within LLM class.
model_config.pad_token_id = model_config.eos_token_id
original_model_type = self.model_metadata.get_optimized_cls()
max_seq_len_to_capture = self._bucket_config.max_seq_len_to_capture
max_prompt_len = (
self._bucket_config.max_prompt_len or max_seq_len_to_capture
if self._bucket_config.prefill_chunk_size
else None
)
(
prefill_buckets_with_output_size,
decode_buckets_with_output_size,
other_buckets_with_output_size,
) = get_buckets_with_output_logits_size(
self.model_metadata,
ManualBucketConfig(
prefill_buckets=self.prefill_buckets,
decode_buckets=self.decode_buckets,
),
max_prompt_len if max_prompt_len else max_seq_len_to_capture,
max_seq_len_to_capture,
num_speculative_tokens=self._speculative_config.num_speculative_tokens,
prefill_chunk_size=self._bucket_config.prefill_chunk_size,
optimize_bucket_output_logits_size=self.model_rewriting_config.optimize_logit_shape,
)
other_buckets_with_output_size += [
BucketWithOutputLogitsSize(
(bucket := AttentionBucket(b[0], b[1], b[1] - b[2])),
self.model_metadata.get_output_logits_size(bucket),
)
for b in self.append_buckets
]
prefill_buckets = [
bucket_with_output_size.bucket
for bucket_with_output_size in prefill_buckets_with_output_size
]
decode_buckets = [
bucket_with_output_size.bucket
for bucket_with_output_size in decode_buckets_with_output_size
]
other_buckets = [
bucket_with_output_size.bucket
for bucket_with_output_size in other_buckets_with_output_size
]
parallel_config = self._parallel_config
model_creation_info = ModelCreationInfo(
self.model_metadata,
False if self._model_config.seed_for_random_weight is None else True,
seed=self._model_config.seed_for_random_weight,
)
if _cleanup:
self.tmp_dir = tempfile.TemporaryDirectory()
tmp_dir_path = Path(self.tmp_dir.name)
else:
tmp_dir_path = Path(tempfile.mkdtemp())
if param_file_path:
param_file_metadata = ParamFileMetadata.load(
param_file_path, ParamfileFormat.from_str(param_saved_format)
)
else:
if cache_dir and model_creation_info.is_hashable():
param_file_cache_dir = Path(cache_dir) / PARAM_FILE_CACHE_SUBDIR_NAME
param_file_metadata = get_param_file_with_cache(
model_creation_info,
param_file_cache_dir,
max_shard_size=param_file_max_shard_size,
)
else:
assert isinstance(tmp_dir_path, Path)
param_file_metadata = get_param_file_with_cache(
model_creation_info, tmp_dir_path, max_shard_size=param_file_max_shard_size
)
cache_dir = None if cache_dir is None else Path(cache_dir)
# For now, `PipelineParallelismMppp` supports all valid cases because only pipeline parallelism is needed to be expressed within one pipeline.
# TODO: Make the mppp configuration specific to the speculative_model.
# for e.g., consider introducing a keyword argument (e.g., "speculative_mppp") for this purpose
mppp = kwargs.pop("mppp", None) or PipelineParallelismMppp()
logger.info(
f"Buckets with output sizes: {pformat([*prefill_buckets_with_output_size, *decode_buckets_with_output_size, *other_buckets_with_output_size])}"
)
if self._model_config.trust_remote_code:
prestep_for_remote_code_model(self.model_metadata, num_pipeline_builder_workers)
bucket_limits = compute_limits(
tokenwise_buckets=[TokenwiseBucket(x) for x in self._bucket_config.tokenwise_seq_lens],
attention_buckets=list(chain(prefill_buckets, decode_buckets, other_buckets)),
)
logger.info(f"The computed bucket limits are {bucket_limits.max_executable_len}")
max_model_len = getattr(self.model_metadata.config, "max_position_embeddings", None)
if max_model_len is None:
logger.warning(
"Unable to determine 'max_model_len' from the provided model configuration."
"Skipping validation for the artifact's supported model length limits."
)
if max_model_len is not None and max_model_len < bucket_limits.max_executable_len:
raise ValueError(
f"The maximum executable length {bucket_limits.max_executable_len} derived from bucket configuration "
f"exceeds the model's maximum position embeddings {max_model_len}. "
f"Please reduce the bucket sizes or adjust tokenwise_seq_lens to fit within the model's limit."
)
if _use_composable_kernel:
pipeline = next_gen.build_pipeline(
f"{original_model_type.__module__}.{original_model_type.__name__}",
model_creation_info,
self.dummy_normalized_mesh[0],
"postlower+edf" if _includes_composable_ir else "edf",
mppp,
ComposableKernelPipelineBuildConfigGenerator(
attention_buckets=list(chain(prefill_buckets, decode_buckets, other_buckets)),
tokenwise_seq_lens=self._bucket_config.tokenwise_seq_lens,
),
param_file_metadata,
tmp_dir_path,
paged_attention_block_size=(
self.paged_attention_config.block_size if self.paged_attention_config else None
),
padding_block_idx=(
self.paged_attention_config.padding_block_idx
if self.paged_attention_config
else None
),
# TODO: expose valid_length_input_tensor option.
add_valid_length_input_tensor=False,
use_activation_dq=_use_activation_dq,
logits_slice_config=None,
embed_all_constants_into_graph=self.model_rewriting_config.embed_all_constants_into_graph,
compiler_config_overrides=self._compiler_config.compiler_config_overrides,
cache_dir=cache_dir,
num_pipeline_builder_workers=num_pipeline_builder_workers,
num_compile_workers=num_compile_workers,
num_cpu_per_pipeline_build_worker=num_cpu_per_pipeline_build_worker,
num_cpu_per_compile_worker=num_cpu_per_compile_worker,
_raise_error_if_compile=_raise_error_if_compile,
)[0]
pipe_meta = ComposableKernelMetadata(
attention_buckets=list(chain(prefill_buckets, decode_buckets, other_buckets)),
tokenwise_buckets=list(self._bucket_config.tokenwise_seq_lens),
composition_type=CompositionType.KERNELWISE,
)
return (
NextGenModelArtifact(
hf_config=self.model_metadata.config.to_dict(),
model_metadata=self.model_metadata,
parallel_config=parallel_config,
pipelines=[],
pipeline_metadata_list=[pipe_meta],
max_prompt_len=max_prompt_len,
),
[pipeline],
)
else:
raise ValueError("Block-wise artifact is not supported anymore.")
[docs]
def build(
self,
save_dir: Union[str, os.PathLike],
*,
num_pipeline_builder_workers: int = 1,
num_compile_workers: int = 1,
num_cpu_per_pipeline_build_worker: int = 1,
num_cpu_per_compile_worker: int = 1,
cache_dir: Optional[os.PathLike] = CACHE_DIR,
param_file_path: Optional[os.PathLike] = None,
param_saved_format: Literal["safetensors", "pt"] = "safetensors",
param_file_max_shard_size: Optional[Union[str, int]] = "5GB",
_cleanup: bool = True,
_raise_error_if_compile: bool = False,
**kwargs,
):
"""Build the artifacts for given model configurations.
Args:
save_dir: The path to save the artifacts. With artifacts, you can create ``LLM`` without quantizing or compiling the model again.
num_pipeline_builder_workers: The number of workers used for building pipelines (except for compilation). The default is 1 (no parallelism).
Setting this value larger than 1 reduces pipeline building time, especially for large models, but requires much more memory.
num_compile_workers: The number of workers used for compilation. The default is 1 (no parallelism).
num_cpu_per_pipeline_build_worker : The number of cpu cores allocated for each pipeline build worker. The default is 1.
num_cpu_per_compile_worker : The number of cpu cores allocated for each compile worker. The default is 1.
cache_dir: The cache directory for all generated files for this LLM instance.
When its value is ``None``, caching is disabled. The default is "$HOME/.cache/furiosa/llm".
param_file_path: The path to the parameter file to use for pipeline generation.
If not specified, the parameters will be saved in a temporary file which will be
deleted when ``LLM`` is destroyed.
param_saved_format: The format of the parameter file. Only possible value is "safetensors" now.
The default is "safetensors".
param_file_max_shard_size: The maximum size of single parameter file. Parameter file will be
split into smaller files to be less than this size. The default is "5GB".
"""
import furiosa.native_compiler
metadata = ArtifactMetadata( # type: ignore[assignment]
artifact_id=self.artifact_id,
name=self.name,
timestamp=int(time.time()),
furiosa_llm_version=_get_commit_id(),
furiosa_compiler_version=furiosa.native_compiler.compiler_git_short_hash(),
includes_composable_ir=self._compiler_config.includes_composable_ir,
)
# Make sure the model local, get a model hash, and set it to ModelMetadata
self.model_metadata.ensure_model_and_update_weight_hash()
target_model_artifact, target_model_pipelines = self._build_model_artifact(
num_pipeline_builder_workers=num_pipeline_builder_workers,
num_compile_workers=num_compile_workers,
num_cpu_per_pipeline_build_worker=num_cpu_per_pipeline_build_worker,
num_cpu_per_compile_worker=num_cpu_per_compile_worker,
cache_dir=cache_dir,
param_file_path=param_file_path,
param_saved_format=param_saved_format,
_cleanup=_cleanup,
param_file_max_shard_size=param_file_max_shard_size,
_use_composable_kernel=self._compiler_config.composable_kernel,
_use_activation_dq=self._compiler_config.use_activation_dq,
_includes_composable_ir=self._compiler_config.includes_composable_ir,
_raise_error_if_compile=_raise_error_if_compile,
**kwargs,
)
speculative_model_artifact = None
speculative_model_pipelines: Union[List[Pipeline], List[next_gen.Pipeline]] = []
if self._compiler_config.composable_kernel:
assert isinstance(target_model_artifact, NextGenModelArtifact)
artifact: Union[Artifact, NextGenArtifact] = NextGenArtifact(
metadata=metadata,
model=target_model_artifact,
generator_config=next_gen_artifact_types.GeneratorConfig(
num_speculative_tokens=self._speculative_config.num_speculative_tokens,
),
speculative_model=speculative_model_artifact,
version=next_gen_artifact_types.SCHEMA_VERSION,
)
else:
raise ValueError("Block-wise artifact is not supported anymore.")
ArtifactBuilder.__save_artifacts(
save_dir,
artifact,
target_model_pipelines,
speculative_model_pipelines,
self._artifact_config.copies_from_model,
self._artifact_config.copies_from_local,
self._artifact_config.bundle_binaries,
)
@staticmethod
def __preprocess_for_pipeline_save(
pipelines: Any,
path: Union[str, os.PathLike],
bundle_binaries: bool = True,
) -> List[Dict[str, Any]]:
from furiosa.native_compiler import CompiledGraph
def save_blob_to_zip(zip_file: ZipFile, file_name: str, blob: Any) -> None:
zip_file.writestr(file_name, blob)
def save_blob_to_dir(base_path: str, file_name: str, blob: Any) -> None:
mode = "w" if isinstance(blob, str) else "wb"
with open(os.path.join(base_path, file_name), mode) as f:
f.write(blob)
def copy_param_file(
param_file_info: ParamFileInfo, base_path: Union[str, os.PathLike]
) -> None:
param_file_path = Path(param_file_info.path)
parent_path = os.fspath(param_file_path.parent)
if _INDEX_TENSOR_NAME in os.listdir(parent_path):
# This param file is a shard in saftensors file set.
# Copy whole directory
new_parent_path = f"{base_path}/{os.path.basename(parent_path)}"
if not os.path.exists(new_parent_path):
shutil.copytree(parent_path, new_parent_path)
new_file_path = os.path.join(new_parent_path, os.path.basename(param_file_path))
else:
new_file_path = f"{base_path}/{os.path.basename(param_file_path)}"
if not os.path.exists(new_file_path):
shutil.copy(param_file_path, new_file_path)
param_file_info.path = os.path.relpath(new_file_path, base_path)
processed_pipelines = []
zip_path = os.path.join(path, f"{BINARY_BUNDLE_ZIP_FILE_NAME}.zip")
save_ftn = save_blob_to_zip if bundle_binaries else save_blob_to_dir
# NOTE: Each EDF file can be several gigabytes in size, so we should avoid
# unnecessarily storing EDF blob data elsewhere in the program (e.g., in a temporary dictionary).
zip_mode = "a" if os.path.exists(zip_path) else "w"
with ZipFile(zip_path, mode=zip_mode) if bundle_binaries else nullcontext() as zip_file: # type: ignore[call-overload]
save_target = zip_file if bundle_binaries else path
for pipeline in pipelines:
for id, blob in pipeline.blobs.items():
kind = pipeline.get_blob_kind().get(id)
if kind in (SuperTaskKind.FX, next_gen.TaskKind.FX):
assert isinstance(blob, str)
file_names = [f"{id}.fx"]
data = [blob]
elif kind in (SuperTaskKind.EDF, next_gen.TaskKind.EDF):
assert isinstance(blob, CompiledGraph)
file_names = [f"{id}.edf"]
data = [blob.serialize()] # type: ignore[list-item]
elif kind is next_gen.TaskKind.POSTLOWER_EDF:
assert pipeline.composable_blobs
assert isinstance(pipeline.composable_blobs.get(id), CompiledGraph)
file_names = [
f"{id}.edf",
f"{id}.postlower",
]
data = [blob.serialize(), pipeline.composable_blobs[id].serialize()] # type: ignore[assignment]
pipeline.composable_blobs[id] = None
else:
raise NotImplementedError(f"SuperTask {kind} is not supported to save")
for f, d in zip_equal(file_names, data):
save_ftn(save_target, f, d) # type: ignore[arg-type]
pipeline.blobs[id] = None
for param_file_info in pipeline.param_files.values():
copy_param_file(param_file_info, path)
processed_pipelines.append(json.loads(pipeline.to_json()))
return processed_pipelines
@staticmethod
def __save_artifacts(
path: Union[str, os.PathLike],
artifact: Union[Artifact, NextGenArtifact],
target_model_pipelines: Union[Sequence[Pipeline], Sequence[next_gen_pipe.Pipeline]],
speculative_model_pipelines: Union[Sequence[Pipeline], Sequence[next_gen_pipe.Pipeline]],
copies_from_model: Optional[Sequence[str]],
copies_from_local: Optional[Sequence[os.PathLike]],
bundle_binaries: bool = True,
):
import shutil
from huggingface_hub import hf_hub_download
path = Path(path)
path.mkdir(parents=True, exist_ok=True)
artifact.model = artifact.model.model_copy(
update={
"pipelines": ArtifactBuilder.__preprocess_for_pipeline_save(
target_model_pipelines, path, bundle_binaries
),
},
)
if artifact.speculative_model:
artifact.speculative_model = artifact.speculative_model.model_copy(
update={
"pipelines": ArtifactBuilder.__preprocess_for_pipeline_save(
speculative_model_pipelines, path, bundle_binaries
),
},
)
artifact.export(f"{path}/artifact.json")
model_metadata = artifact.model.model_metadata
# Save Transformers configs to make an artifact more compatible with Hugging Face Transformers
tokenizer = get_tokenizer(model_metadata.model_id_or_path)
tokenizer.save_pretrained(path)
# Save the configuration file for the draft model.
# Rename it to `draft_model_config.json` to avoid conflicts with the target model's `config.json`.
if artifact.speculative_model:
draft_model_config = artifact.speculative_model.model_metadata.config
draft_model_config.save_pretrained(path)
os.rename(path / "config.json", path / "draft_model_config.json")
# Save config.json for target model
hf_config = model_metadata.config
hf_config.save_pretrained(path)
# Usually, this copies important files like README.md, LICENSE.txt, etc.
# So, we should fail if copying fails.
if copies_from_model is not None:
for hub_file in copies_from_model:
cache_path = hf_hub_download(model_metadata.pretrained_id, hub_file)
shutil.copy(cache_path, path)
if copies_from_local is not None:
for local_file in copies_from_local:
shutil.copy(local_file, path)
# Save furiosa_config.json
furiosa_config = FuriosaConfig(
model_id=model_metadata.pretrained_id,
model_kinds=[ModelKind.ARTIFACT],
model_class=ModelClass.from_class(model_metadata.get_optimized_cls()),
llm_config=model_metadata.llm_config,
)
furiosa_config_json_path = path / FURIOSA_CONFIG_JSON
furiosa_config.export(furiosa_config_json_path)
def get_model_metadata_from_model_id(
model_id: str,
model_id_or_path: Optional[Union[str, Path]] = None,
hf_overrides: Mapping[str, Any] = {},
trust_remote_code: Optional[bool] = None,
revision: Optional[str] = None,
task: Optional["tasks.GenerationTask | tasks.PoolingTask"] = None,
use_binary_seq_class: bool = False,
) -> ModelMetadata:
use_binary_seq_class = use_binary_seq_class and task == tasks.SCORE
model_metadata = ModelMetadata(
model_id,
model_id_or_path=model_id_or_path,
trust_remote_code=trust_remote_code,
revision=revision,
task_type=task,
use_binary_seq_class=use_binary_seq_class,
)
# Override model config
if hf_overrides:
model_metadata = model_metadata.with_hf_configs(hf_overrides)
return model_metadata
# Resolve forward reference for SpeculativeDecodingConfig
# This must be called after ArtifactBuilder is defined
SpeculativeDecodingConfig.model_rebuild()