from contextlib import nullcontext
from itertools import chain
import json
import logging
import os
from pathlib import Path
from pprint import pformat
import shutil
import tempfile
import time
from typing import (
Any,
Dict,
List,
Literal,
Optional,
Sequence,
Tuple,
Union,
)
import uuid
from zipfile import ZipFile
from furiosa_llm.artifact.types.config import (
ArtifactConfig,
BucketConfig,
CompilerConfig,
ModelConfig,
ParallelConfig,
)
from furiosa_llm.artifact.types.next_gen import Artifact as NextGenArtifact
from furiosa_llm.artifact.types.next_gen import ModelArtifact as NextGenModelArtifact
from furiosa_llm.generation_config import save_generation_config
from furiosa_llm.metadata.next_gen_config_types import ComposableKernelMetadata, CompositionType
from furiosa_llm.parallelize.export.tensor import (
_INDEX_TENSOR_NAME,
ParamfileFormat,
ParamFileMetadata,
)
import furiosa_llm.parallelize.new_pipeline_builder as next_gen
import furiosa_llm.parallelize.pipeline.next_gen as next_gen_pipe
from furiosa_llm.parallelize.pipeline.types import ParamFileInfo
from furiosa_llm.parallelize.pipeline_build_configurer import (
ComposableKernelPipelineBuildConfigGenerator,
)
from furiosa_llm.tokenizer.tokenizer import get_tokenizer
from furiosa_llm.utils import zip_equal
from ..metadata import AttentionType, ModelMetadata, load_hf_config
from ..metadata.config_types import PagedAttentionConfig
from ..parallelize.model_creation_info import ModelCreationInfo
from ..parallelize.mppp.api import PipelineParallelismMppp
from ..parallelize.mppp.config import Device
from ..parallelize.pipeline import Pipeline
from ..parallelize.pipeline.builder.api import _get_commit_id
from ..parallelize.pipeline.types import SuperTaskKind
from ..parallelize.trace import PARAM_FILE_CACHE_SUBDIR_NAME, get_param_file_with_cache
from ..utils import maybe_register_config_serialize_by_value
from .resolver import (
ResolvedBuckets,
resolve_device_mesh,
resolve_max_model_len,
resolve_model_metadata,
)
from .types import next_gen as next_gen_artifact_types
from .types.latest import ArtifactMetadata
from .validator import (
validate_artifact_config,
validate_bucket_config,
validate_hf_config,
validate_parallel_config,
)
logger = logging.getLogger(__name__)
# Default index of the padding block when paged attention model is used.
DEFAULT_PAGED_ATTENTION_PADDING_BLOCK_IDX = 0
CACHE_DIR = Path(os.getenv("XDG_CACHE_HOME", Path.home() / ".cache")) / "furiosa" / "llm"
BINARY_BUNDLE_ZIP_FILE_NAME = "binary_bundle"
[docs]
class ArtifactBuilder:
"""The artifact builder to use in the Furiosa LLM.
Args:
model_id_or_path: The HuggingFace model id or a local path. This corresponds to
pretrained_model_name_or_path in HuggingFace Transformers.
name: The name of the artifact to build. If not provided, defaults to model_id_or_path.
model_config: Configuration for model HuggingFace settings (trust_remote_code, etc.).
parallel_config: Configuration for parallelization (tensor and pipeline parallelism).
Defaults to tensor_parallel_size=8, pipeline_parallel_size=1.
bucket_config: Configuration for attention buckets and sequence lengths.
When all bucket fields are empty, a matching bucket preset is
automatically applied based on the model metadata.
compiler_config: Configuration for compiler and model rewriting options.
artifact_config: Configuration for artifact export options.
"""
# Config
_model_id_or_path: Union[str, Path]
_name: str
_artifact_id: str
_model_config: ModelConfig
_parallel_config: ParallelConfig
_compiler_config: CompilerConfig
_artifact_config: ArtifactConfig
# Resolved
_model_metadata: ModelMetadata
_max_model_len: int
_device_mesh: List[Device]
_buckets: ResolvedBuckets
def __init__(
self,
model_id_or_path: Union[str, Path],
name: str = "",
*,
model_config: Optional[ModelConfig] = None,
parallel_config: Optional[ParallelConfig] = None,
bucket_config: Optional[BucketConfig] = None,
compiler_config: Optional[CompilerConfig] = None,
artifact_config: Optional[ArtifactConfig] = None,
):
# ── Config ────────────────────────────────────────────────────
if model_config is None:
model_config = ModelConfig()
if parallel_config is None:
parallel_config = ParallelConfig()
if bucket_config is None:
bucket_config = BucketConfig()
if compiler_config is None:
compiler_config = CompilerConfig()
if artifact_config is None:
artifact_config = ArtifactConfig()
self._model_id_or_path = model_id_or_path
self._name = name
self._artifact_id = str(uuid.uuid4())
self._model_config = model_config
self._parallel_config = parallel_config
self._compiler_config = compiler_config
self._artifact_config = artifact_config
hf_config = load_hf_config(
model_id_or_path,
trust_remote_code=model_config.trust_remote_code,
revision=model_config.revision,
hf_overrides=model_config.hf_overrides,
)
# ── Validate ──────────────────────────────────────────────────
validate_artifact_config(artifact_config, model_id_or_path=str(model_id_or_path))
validate_parallel_config(parallel_config)
if bucket_config.has_explicit_buckets() and not bucket_config.skip_validation:
validate_bucket_config(bucket_config)
validate_hf_config(hf_config)
# ── Resolve ───────────────────────────────────────────────────
self._model_metadata = resolve_model_metadata(
model_id_or_path=model_id_or_path,
model_config=model_config,
hf_config=hf_config,
)
self._max_model_len: int = resolve_max_model_len(hf_config, model_config.max_model_len)
self._device_mesh = resolve_device_mesh(parallel_config)
self._buckets = ResolvedBuckets.resolve(
self._model_metadata, bucket_config, self._max_model_len
)
def _build_model_artifact(
self,
*,
num_pipeline_builder_workers: int,
num_compile_workers: int,
num_cpu_per_pipeline_build_worker: int,
num_cpu_per_compile_worker: int,
cache_dir: Optional[os.PathLike] = CACHE_DIR,
param_file_path: Optional[Union[os.PathLike, str]] = None,
param_saved_format: Literal["safetensors", "pt"] = "safetensors",
param_file_max_shard_size: Optional[Union[str, int]] = "5GB",
_use_composable_kernel: bool = False,
_use_activation_dq: bool = False,
_cleanup: bool = True,
_raise_error_if_compile: bool = False,
_includes_composable_ir: bool = False,
**kwargs,
) -> Tuple[
NextGenModelArtifact,
Union[List[Pipeline], List[next_gen.Pipeline]],
]:
if self._model_metadata.attention_type is AttentionType.PAGED_ATTENTION:
assert (
self._compiler_config.paged_attention_block_size == 1
), "Currently, only paged attention with block_size=1 is supported."
paged_attention_config: Optional[PagedAttentionConfig] = PagedAttentionConfig(
self._compiler_config.paged_attention_block_size,
DEFAULT_PAGED_ATTENTION_PADDING_BLOCK_IDX,
)
else:
paged_attention_config = None
model_config = self._model_metadata.config
# Please refer to an example at https://huggingface.co/docs/transformers/en/main_classes/text_generation#transformers.GenerationMixin.greedy_search.example
# Some models like GPT-2 may not have pad_token_id. BTW, when we run a batch of sequence generations,
# We must need pad_token_id to fill the batch with pad. With Hugging Face Transformers,
# users should handle this issue. Our goal is to provide a better useability for users.
# We handle this issue within LLM class.
model_config.pad_token_id = model_config.eos_token_id
original_model_type = self._model_metadata.get_optimized_cls()
model_creation_info = ModelCreationInfo(
self._model_metadata,
False if self._model_config.seed_for_random_weight is None else True,
seed=self._model_config.seed_for_random_weight,
)
if _cleanup:
self.tmp_dir = tempfile.TemporaryDirectory()
tmp_dir_path = Path(self.tmp_dir.name)
else:
tmp_dir_path = Path(tempfile.mkdtemp())
if param_file_path:
param_file_metadata = ParamFileMetadata.load(
param_file_path, ParamfileFormat.from_str(param_saved_format)
)
else:
if cache_dir and model_creation_info.is_hashable():
param_file_cache_dir = Path(cache_dir) / PARAM_FILE_CACHE_SUBDIR_NAME
param_file_metadata = get_param_file_with_cache(
model_creation_info,
param_file_cache_dir,
max_shard_size=param_file_max_shard_size,
)
else:
assert isinstance(tmp_dir_path, Path)
param_file_metadata = get_param_file_with_cache(
model_creation_info, tmp_dir_path, max_shard_size=param_file_max_shard_size
)
cache_dir = None if cache_dir is None else Path(cache_dir)
# For now, `PipelineParallelismMppp` supports all valid cases because only pipeline parallelism is needed to be expressed within one pipeline.
mppp = kwargs.pop("mppp", None) or PipelineParallelismMppp()
logger.info(
f"Attention buckets: {pformat([*self._buckets.prefill_buckets, *self._buckets.decode_buckets, *self._buckets.append_buckets])}"
)
if self._model_config.trust_remote_code and num_pipeline_builder_workers > 1:
# Ensure remote model config is registered in transformers for parallel pipeline building.
self._model_metadata.config
maybe_register_config_serialize_by_value()
if _use_composable_kernel:
build_config_generator = ComposableKernelPipelineBuildConfigGenerator(
attention_buckets=list(
chain(
self._buckets.prefill_buckets,
self._buckets.decode_buckets,
self._buckets.append_buckets,
)
),
tokenwise_seq_lens=self._buckets.tokenwise_seq_lens,
)
pipeline = next_gen.build_pipeline(
f"{original_model_type.__module__}.{original_model_type.__name__}",
model_creation_info,
self._device_mesh,
"precommandgen+edf" if _includes_composable_ir else "edf",
mppp,
build_config_generator,
param_file_metadata,
tmp_dir_path,
paged_attention_block_size=(
paged_attention_config.block_size if paged_attention_config else None
),
padding_block_idx=(
paged_attention_config.padding_block_idx if paged_attention_config else None
),
# TODO: expose valid_length_input_tensor option.
add_valid_length_input_tensor=False,
use_activation_dq=_use_activation_dq,
logits_slice_config=None,
embed_all_constants_into_graph=self._compiler_config.embed_all_constants_into_graph,
compiler_config_overrides=self._compiler_config.compiler_config_overrides,
cache_dir=cache_dir,
num_pipeline_builder_workers=num_pipeline_builder_workers,
num_compile_workers=num_compile_workers,
num_cpu_per_pipeline_build_worker=num_cpu_per_pipeline_build_worker,
num_cpu_per_compile_worker=num_cpu_per_compile_worker,
_raise_error_if_compile=_raise_error_if_compile,
)[0]
pipe_meta = ComposableKernelMetadata(
attention_buckets=list(build_config_generator.attention_buckets),
tokenwise_buckets=list(self._buckets.tokenwise_seq_lens),
composition_type=CompositionType.KERNELWISE,
)
return (
NextGenModelArtifact(
model_metadata=self._model_metadata,
parallel_config=self._parallel_config,
pipelines=[],
pipeline_metadata_list=[pipe_meta],
max_prompt_len=None,
),
[pipeline],
)
else:
raise ValueError("Block-wise artifact is not supported anymore.")
[docs]
def build(
self,
save_dir: Union[str, os.PathLike],
*,
num_pipeline_builder_workers: int = 1,
num_compile_workers: int = 1,
num_cpu_per_pipeline_build_worker: int = 1,
num_cpu_per_compile_worker: int = 1,
cache_dir: Optional[os.PathLike] = CACHE_DIR,
param_file_path: Optional[os.PathLike] = None,
param_saved_format: Literal["safetensors", "pt"] = "safetensors",
param_file_max_shard_size: Optional[Union[str, int]] = "5GB",
_cleanup: bool = True,
_raise_error_if_compile: bool = False,
**kwargs,
):
"""Build the artifacts for given model configurations.
Args:
save_dir: The path to save the artifacts. With artifacts, you can create ``LLM`` without quantizing or compiling the model again.
num_pipeline_builder_workers: The number of workers used for building pipelines (except for compilation). The default is 1 (no parallelism).
Setting this value larger than 1 reduces pipeline building time, especially for large models, but requires much more memory.
num_compile_workers: The number of workers used for compilation. The default is 1 (no parallelism).
num_cpu_per_pipeline_build_worker : The number of cpu cores allocated for each pipeline build worker. The default is 1.
num_cpu_per_compile_worker : The number of cpu cores allocated for each compile worker. The default is 1.
cache_dir: The cache directory for all generated files for this LLM instance.
When its value is ``None``, caching is disabled. The default is "$HOME/.cache/furiosa/llm".
param_file_path: The path to the parameter file to use for pipeline generation.
If not specified, the parameters will be saved in a temporary file which will be
deleted when ``LLM`` is destroyed.
param_saved_format: The format of the parameter file. Only possible value is "safetensors" now.
The default is "safetensors".
param_file_max_shard_size: The maximum size of single parameter file. Parameter file will be
split into smaller files to be less than this size. The default is "5GB".
"""
from furiosa.native_common.compiler import compiler_git_short_hash
metadata = ArtifactMetadata( # type: ignore[assignment]
artifact_id=self._artifact_id,
name=self._name,
timestamp=int(time.time()),
furiosa_llm_version=_get_commit_id(),
furiosa_compiler_version=compiler_git_short_hash(),
includes_composable_ir=self._compiler_config.includes_composable_ir,
)
# Make sure the model local, get a model hash, and set it to ModelMetadata
self._model_metadata.ensure_model_and_update_weight_hash()
target_model_artifact, target_model_pipelines = self._build_model_artifact(
num_pipeline_builder_workers=num_pipeline_builder_workers,
num_compile_workers=num_compile_workers,
num_cpu_per_pipeline_build_worker=num_cpu_per_pipeline_build_worker,
num_cpu_per_compile_worker=num_cpu_per_compile_worker,
cache_dir=cache_dir,
param_file_path=param_file_path,
param_saved_format=param_saved_format,
_cleanup=_cleanup,
param_file_max_shard_size=param_file_max_shard_size,
_use_composable_kernel=self._compiler_config.composable_kernel,
_use_activation_dq=self._compiler_config.use_activation_dq,
_includes_composable_ir=self._compiler_config.includes_composable_ir,
_raise_error_if_compile=_raise_error_if_compile,
**kwargs,
)
if self._compiler_config.composable_kernel:
assert isinstance(target_model_artifact, NextGenModelArtifact)
artifact: NextGenArtifact = NextGenArtifact(
metadata=metadata,
model=target_model_artifact,
generator_config=next_gen_artifact_types.GeneratorConfig(
num_speculative_tokens=None,
),
version=next_gen_artifact_types.SCHEMA_VERSION,
)
else:
raise ValueError("Block-wise artifact is not supported anymore.")
ArtifactBuilder.__save_artifacts(
save_dir,
artifact,
target_model_pipelines,
self._artifact_config.copies_from_model,
self._artifact_config.copies_from_local,
self._artifact_config.bundle_binaries,
)
@staticmethod
def __preprocess_for_pipeline_save(
pipelines: Any,
path: Union[str, os.PathLike],
bundle_binaries: bool = True,
) -> List[Dict[str, Any]]:
from furiosa.native_common.compiler import CompiledGraph
def save_blob_to_zip(zip_file: ZipFile, file_name: str, blob: Any) -> None:
zip_file.writestr(file_name, blob)
def save_blob_to_dir(base_path: str, file_name: str, blob: Any) -> None:
mode = "w" if isinstance(blob, str) else "wb"
with open(os.path.join(base_path, file_name), mode) as f:
f.write(blob)
def copy_param_file(
param_file_info: ParamFileInfo, base_path: Union[str, os.PathLike]
) -> None:
param_file_path = Path(param_file_info.path)
parent_path = os.fspath(param_file_path.parent)
if _INDEX_TENSOR_NAME in os.listdir(parent_path):
# This param file is a shard in saftensors file set.
# Copy whole directory
new_parent_path = f"{base_path}/{os.path.basename(parent_path)}"
if not os.path.exists(new_parent_path):
shutil.copytree(parent_path, new_parent_path)
new_file_path = os.path.join(new_parent_path, os.path.basename(param_file_path))
else:
new_file_path = f"{base_path}/{os.path.basename(param_file_path)}"
if not os.path.exists(new_file_path):
shutil.copy(param_file_path, new_file_path)
param_file_info.path = os.path.relpath(new_file_path, base_path)
processed_pipelines = []
zip_path = os.path.join(path, f"{BINARY_BUNDLE_ZIP_FILE_NAME}.zip")
save_ftn = save_blob_to_zip if bundle_binaries else save_blob_to_dir
# NOTE: Each EDF file can be several gigabytes in size, so we should avoid
# unnecessarily storing EDF blob data elsewhere in the program (e.g., in a temporary dictionary).
zip_mode = "a" if os.path.exists(zip_path) else "w"
with ZipFile(zip_path, mode=zip_mode) if bundle_binaries else nullcontext() as zip_file: # type: ignore[call-overload]
save_target = zip_file if bundle_binaries else path
for pipeline in pipelines:
for id, blob in pipeline.blobs.items():
kind = pipeline.get_blob_kind().get(id)
if kind in (SuperTaskKind.FX, next_gen.TaskKind.FX):
assert isinstance(blob, str)
file_names = [f"{id}.fx"]
data = [blob]
elif kind in (SuperTaskKind.EDF, next_gen.TaskKind.EDF):
assert isinstance(blob, CompiledGraph)
file_names = [f"{id}.edf"]
data = [blob.serialize()] # type: ignore[list-item]
elif kind is next_gen.TaskKind.PRECOMMANDGEN_EDF:
assert pipeline.composable_blobs
assert isinstance(pipeline.composable_blobs.get(id), CompiledGraph)
file_names = [
f"{id}.edf",
f"{id}.precommandgen",
]
data = [blob.serialize(), pipeline.composable_blobs[id].serialize()] # type: ignore[assignment]
pipeline.composable_blobs[id] = None
else:
raise NotImplementedError(f"SuperTask {kind} is not supported to save")
for f, d in zip_equal(file_names, data):
save_ftn(save_target, f, d) # type: ignore[arg-type]
pipeline.blobs[id] = None
for param_file_info in pipeline.param_files.values():
copy_param_file(param_file_info, path)
processed_pipelines.append(json.loads(pipeline.to_json()))
return processed_pipelines
@staticmethod
def __save_artifacts(
path: Union[str, os.PathLike],
artifact: NextGenArtifact,
target_model_pipelines: Union[Sequence[Pipeline], Sequence[next_gen_pipe.Pipeline]],
copies_from_model: Optional[Sequence[str]],
copies_from_local: Optional[Sequence[os.PathLike]],
bundle_binaries: bool = True,
):
import shutil
from huggingface_hub import hf_hub_download
path = Path(path)
path.mkdir(parents=True, exist_ok=True)
artifact.model = artifact.model.model_copy(
update={
"pipelines": ArtifactBuilder.__preprocess_for_pipeline_save(
target_model_pipelines, path, bundle_binaries
),
},
)
artifact.export(f"{path}/artifact.json")
model_metadata = artifact.model.model_metadata
assert isinstance(model_metadata, ModelMetadata)
# Save Transformers configs to make an artifact more compatible with Hugging Face Transformers
assert model_metadata._model_id_or_path is not None
tokenizer = get_tokenizer(model_metadata._model_id_or_path)
tokenizer.save_pretrained(path)
# Save config.json for target model
hf_config = model_metadata.config
hf_config.save_pretrained(path)
# Save generation_config.json if present in the source model
save_generation_config(model_metadata._model_id_or_path, path)
# Usually, this copies important files like README.md, LICENSE.txt, etc.
# So, we should fail if copying fails.
if copies_from_model is not None:
for hub_file in copies_from_model:
cache_path = hf_hub_download(str(model_metadata._model_id_or_path), hub_file)
shutil.copy(cache_path, path)
if copies_from_local is not None:
for local_file in copies_from_local:
shutil.copy(local_file, path)
# Backward-compatible alias — use resolve_model_metadata from .resolver instead.
get_model_metadata_from_model_id = resolve_model_metadata