# Source code for lmflow.models.hf_model_mixin

#!/usr/bin/env python
# coding=utf-8
# Copyright 2024 Statistics and Machine Learning Research Group. All rights reserved.
import gc
import os
import logging
from typing import Union, Optional, Dict, List
import copy

import torch
import deepspeed
from transformers import (
    CONFIG_MAPPING,
    AutoConfig,
    BitsAndBytesConfig,
    AutoTokenizer,
    PreTrainedTokenizer,
    PreTrainedTokenizerFast,
    AutoModelForCausalLM,
    AutoModelForSequenceClassification,
)
from peft import (
    LoraConfig,
    PeftModel,
    TaskType,
    get_peft_model,
    prepare_model_for_kbit_training
)
from peft.utils.constants import TRANSFORMERS_MODELS_TO_LORA_TARGET_MODULES_MAPPING

from lmflow.models.base_model import BaseModel
from lmflow.utils.constants import (
    LMFLOW_LORA_TARGET_MODULES_MAPPING
)
from lmflow.args import ModelArguments
from lmflow.utils.versioning import is_vllm_available

if is_vllm_available():
    from vllm import LLM, SamplingParams
    from vllm.distributed.parallel_state import destroy_model_parallel


# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Maps an LMFlow ``arch_type`` string to the Hugging Face auto-class used to
# load the corresponding backend model.
HF_AUTOMODEL_MAPPING = {
    "decoder_only": AutoModelForCausalLM,
    "text_regression": AutoModelForSequenceClassification,
}

# Type alias covering every auto-class listed in HF_AUTOMODEL_MAPPING.
HF_AUTOMODEL_TYPE = Union[
    AutoModelForCausalLM,
    AutoModelForSequenceClassification,
]

# Default LoRA target modules keyed by model type. Entries shipped with PEFT
# take priority; LMFlow's own table only fills in the keys PEFT does not have.
LORA_TARGET_MODULES_MAPPING = {
    **LMFLOW_LORA_TARGET_MODULES_MAPPING,
    **TRANSFORMERS_MODELS_TO_LORA_TARGET_MODULES_MAPPING,
}
class HFModelMixin(BaseModel):
    """Shared Hugging Face plumbing for LMFlow models.

    On construction the mixin prepares the tokenizer, torch dtype, model
    config, quantization config and PEFT config from ``model_args``. When
    ``do_train`` is true the backend model is loaded immediately; otherwise
    it is loaded on demand by ``activate_model_for_inference``.
    """

    def __init__(
        self,
        model_args: ModelArguments,
        do_train: bool,
        ds_config=None,
        device: Optional[str] = "gpu",
        use_accelerator: bool = False,
        hf_auto_model_additional_args: Optional[Dict] = None,
        *args,
        **kwargs
    ):
        """Initializes a HFModel instance.

        Parameters
        ----------
        model_args :
            Dictionary with model arguments such as model name, path, revision, etc.
        do_train : bool
            To prepare the model for training or inference.
        ds_config : optional
            Deepspeed configuration for distributed training, by default None
        device : str, optional
            By default "gpu"
        use_accelerator : bool, optional
            By default False
        hf_auto_model_additional_args : Optional[Dict], optional
            Extra keyword arguments merged into the kwargs used to build the
            Hugging Face model config, by default None
        """
        # See more about loading any type of standard or custom dataset (from
        # files, python dict, pandas DataFrame, etc) at
        # https://huggingface.co/docs/datasets/loading_datasets.html.

        # Load pretrained model and tokenizer
        #
        # Distributed training: The .from_pretrained methods guarantee that
        # only one local process can concurrently download model & vocab.
        self.device = device
        self.model_args = model_args
        self.hf_auto_model = HF_AUTOMODEL_MAPPING[model_args.arch_type]
        self.use_accelerator = use_accelerator
        self.ds_config = ds_config
        self.do_train = do_train

        # Order matters below: the quantization config reads self.do_train and
        # self.torch_dtype, and the PEFT config reads self.hf_model_config.
        self.tokenizer = self.__prepare_tokenizer(model_args)
        self.torch_dtype = self.__prepare_dtype(model_args)
        self.hf_model_config = self.__prepare_model_config(model_args, hf_auto_model_additional_args)
        self.quant_config = self.__prepare_quant_config(model_args)
        self.peft_config = self.__prepare_peft_config(model_args)
        self._activated = False  # for inference load and offload

        # Some implementations require custom modules to be injected into the model.
        self.__model_module_inject(model_args)

        if self.do_train:
            self.__prepare_model_for_training(model_args, self.hf_auto_model)

    def __prepare_tokenizer(
        self,
        model_args: ModelArguments,
    ) -> Union[PreTrainedTokenizer, PreTrainedTokenizerFast]:
        """Load the tokenizer named by ``model_args``.

        ``model_args.tokenizer_name`` is used when set, otherwise
        ``model_args.model_name_or_path``.

        Raises
        ------
        ValueError
            If neither a tokenizer name nor a model path is provided.
        """
        tokenizer_name = model_args.tokenizer_name or model_args.model_name_or_path
        if not tokenizer_name:
            raise ValueError(
                "You are instantiating a new tokenizer from scratch. This is"
                " not supported by this script. You can do it from another"
                " script, save it, and load it from here, using"
                " --tokenizer_name."
            )

        tokenizer_kwargs = {
            "cache_dir": model_args.cache_dir,
            "use_fast": model_args.use_fast_tokenizer,
            "revision": model_args.model_revision,
            "token": model_args.token,
            "trust_remote_code": model_args.trust_remote_code,
        }
        # 'auto' means: keep whatever padding side the tokenizer ships with.
        if model_args.padding_side != 'auto':
            tokenizer_kwargs["padding_side"] = model_args.padding_side

        try:
            tokenizer = AutoTokenizer.from_pretrained(tokenizer_name, **tokenizer_kwargs)
        except RecursionError:
            logger.warning(
                "The tokenizer_config.json file doesn't set the special tokens. Using default values: "
                "<unk>, <s>, </s> for unknown token, bos token and eos token respectively.")
            tokenizer = AutoTokenizer.from_pretrained(
                tokenizer_name,
                unk_token="<unk>",
                bos_token="<s>",
                eos_token="</s>",
                **tokenizer_kwargs
            )

        # Falsy overrides (None, "", 0) leave the tokenizer's own values untouched.
        tokenizer.truncation_side = model_args.truncation_side or tokenizer.truncation_side
        tokenizer.model_max_length = model_args.model_max_length or tokenizer.model_max_length

        return tokenizer

    def __prepare_dtype(
        self,
        model_args: ModelArguments,
    ) -> torch.dtype:
        """Resolve ``model_args.torch_dtype`` to the dtype used at model load.

        For ``text_regression`` models, "auto", None, "bf16" and "bfloat16"
        all resolve to ``torch.bfloat16``; any other name is looked up on
        ``torch`` and a warning is logged. For every other architecture,
        "auto" and None are returned unchanged and any other name is looked
        up on ``torch``.
        """
        requested = model_args.torch_dtype

        if model_args.arch_type == 'text_regression':
            if requested in ["auto", None, "bf16", "bfloat16"]:
                torch_dtype = torch.bfloat16
            else:
                torch_dtype = getattr(torch, requested)
                logger.warning(
                    f"If you are doing reward modeling,"
                    f" InstructGPT uses torch.bfloat16 for reward model, but you"
                    f" are using {torch_dtype} for your reward model init. Ignore"
                    f" this warning if it is intended.")
        elif requested in ["auto", None]:
            torch_dtype = requested
        else:
            torch_dtype = getattr(torch, requested)

        logger.debug(f"torch_dtype on init: {torch_dtype}")

        return torch_dtype

    def __prepare_model_config(
        self,
        model_args: ModelArguments,
        hf_auto_model_additional_args: Optional[Dict] = None,
    ):
        """Prepare model configuration for hf auto register,

        Parameters
        ----------
        model_args : ModelArguments
            LMFlow model arguments.
        hf_auto_model_additional_args : Optional[Dict], optional
            Special configurations such as `num_labels` in `AutoModelForSequenceClassification`
            (commonly used in reward modeling) will not preset in __prepare_model_config,
            so it should be passed in hf_auto_model_additional_args.

        Returns
        -------
        config : ModelConfig
            hf model config.
        """
        # model_name_or_path may be unset when a model is built from scratch,
        # so guard the substring test instead of running it against None.
        model_path = model_args.model_name_or_path
        config_kwargs = {
            "attn_implementation": "flash_attention_2" if model_args.use_flash_attention else None,
            "cache_dir": model_args.cache_dir,
            "revision": model_args.model_revision,
            "token": model_args.token,
            "trust_remote_code": model_args.trust_remote_code,
            "from_tf": bool(model_path) and ".ckpt" in model_path,
        }
        if hf_auto_model_additional_args is not None:
            config_kwargs.update(hf_auto_model_additional_args)

        if model_args.config_name:
            config = AutoConfig.from_pretrained(model_args.config_name, **config_kwargs)
        elif model_args.model_name_or_path:
            config = AutoConfig.from_pretrained(model_args.model_name_or_path, **config_kwargs)
        else:
            config = CONFIG_MAPPING[model_args.model_type]()
            logger.warning("You are instantiating a new config instance from scratch.")
            if model_args.config_overrides is not None:
                logger.info(f"Overriding config: {model_args.config_overrides}")
                config.update_from_string(model_args.config_overrides)
                logger.info(f"New config: {config}")

        return config

    def __prepare_quant_config(
        self,
        model_args: ModelArguments,
    ):
        """Build the bitsandbytes quantization config, or None when unused.

        In training mode a config is built only when ``model_args.use_qlora``
        is set; in inference mode only when ``model_args.use_int8`` is set.
        """
        quant_config = None
        if self.do_train:
            if model_args.use_qlora:
                quant_config = BitsAndBytesConfig(
                    load_in_4bit=model_args.bits == 4,
                    load_in_8bit=model_args.bits == 8,
                    llm_int8_threshold=6.0,
                    llm_int8_has_fp16_weight=False,
                    bnb_4bit_compute_dtype=self.torch_dtype,
                    bnb_4bit_use_double_quant=model_args.double_quant,
                    bnb_4bit_quant_type=model_args.quant_type,
                )
        else:  # inference
            if model_args.use_int8:
                quant_config = BitsAndBytesConfig(
                    load_in_8bit=model_args.use_int8,
                )

        return quant_config

    def __prepare_peft_config(
        self,
        model_args: ModelArguments,
    ):
        """Build the LoRA config, or return None when ``use_lora`` is off.

        Target modules come from ``model_args.lora_target_modules`` when
        given; otherwise they are looked up in LORA_TARGET_MODULES_MAPPING
        by the ``model_type`` of the prepared model config.
        """
        peft_config = None
        if model_args.use_lora:
            if model_args.lora_target_modules:
                lora_target_modules = model_args.lora_target_modules
            else:
                model_config = self.hf_model_config
                if hasattr(model_config, "to_dict"):
                    # Work on a plain dict so the fallback below never
                    # touches the real config object.
                    model_config = model_config.to_dict()
                if "model_type" not in model_config or not model_config["model_type"]:
                    logger.warning("It seems that your base model is a custom model, since "
                                   "model_type is not found in model_config when preparing peft config. "
                                   "Setting model_type to 'custom' as a fallback.")
                    model_config["model_type"] = "custom"

                lora_target_modules = LORA_TARGET_MODULES_MAPPING.get(model_config["model_type"], None)

            peft_config = LoraConfig(
                task_type=TaskType.CAUSAL_LM,
                inference_mode=False,
                r=model_args.lora_r,
                lora_alpha=model_args.lora_alpha,
                lora_dropout=model_args.lora_dropout,
                target_modules=lora_target_modules,
            )

        return peft_config

    def __model_module_inject(
        self,
        model_args: ModelArguments,
    ) -> None:
        """Override some model modules with custom implementations.

        Current implementations:
        - Position interpolation (model_args.do_rope_scaling):
            replace llama embeddings with condense embeddings.
        """
        # position interpolation
        if model_args.do_rope_scaling:
            # The prepared config is stored as ``hf_model_config``; its
            # ``architectures`` entry may be missing or None, so fall back to
            # an empty tuple rather than failing the membership test.
            architectures = getattr(self.hf_model_config, "architectures", None) or ()
            if "LlamaForCausalLM" in architectures:
                from lmflow.utils.position_interpolation.llama_rope_scaled_monkey_patch import (
                    replace_llama_with_condense,
                )
                replace_llama_with_condense(model_args.rope_pi_ratio, model_args.rope_ntk_ratio)

    def __prepare_model_for_training(
        self,
        model_args: ModelArguments,
        hf_auto_model: HF_AUTOMODEL_TYPE,
    ):
        """Load (or build from config) the backend model and set it up for training.

        Stores the model before any LoRA wrapping as ``self.backend_model_full``
        and the final, possibly LoRA-wrapped, model as ``self.backend_model``.
        """
        assert self.do_train, "To prepare the model for training, please set do_train=True."
        # TODO: change to accelerate
        logger.info("Preparing model for training")
        if model_args.model_name_or_path:
            model = hf_auto_model.from_pretrained(
                model_args.model_name_or_path,
                torch_dtype=self.torch_dtype,
                config=self.hf_model_config,
                quantization_config=self.quant_config,
            )

            if model_args.use_qlora:
                model.gradient_checkpointing_enable()
                model = prepare_model_for_kbit_training(model)
        else:
            model = hf_auto_model.from_config(self.hf_model_config)
            # Key by data pointer so parameters sharing storage count once.
            n_params = sum({p.data_ptr(): p.numel() for p in model.parameters()}.values())
            logger.info(f"Training new model from scratch - Total size={n_params/2**20:.2f}M params")
        self.backend_model_full = model

        if model_args.ignore_bias_buffers:
            # torch distributed hack
            # fix for DDP issues with LM bias/mask buffers - invalid scalar type, inplace operation.
            # See: https://github.com/huggingface/transformers/issues/22482#issuecomment-1595790992
            model._ddp_params_and_buffers_to_ignore = [
                name for name, buffer in model.named_buffers() if buffer.dtype == torch.bool
            ]

        if model_args.use_lora:
            model.enable_input_require_grads()
            if model_args.lora_model_path is not None:
                # Load model from LoRA weights
                model = PeftModel.from_pretrained(
                    model,
                    model_args.lora_model_path,
                    is_trainable=True,
                )
            else:
                # New LoRA Finetuning
                model = get_peft_model(model, self.peft_config)
            model.print_trainable_parameters()

        # We resize the embeddings only when necessary to avoid index errors.
        # If you are creating a model from scratch on a small vocab and want a
        # smaller embedding size, remove this test.
        with deepspeed.zero.GatheredParameters(model.get_input_embeddings().weight, modifier_rank=None):
            weights = model.get_input_embeddings().weight
            embedding_size = weights.shape[0]
        if len(self.tokenizer) > embedding_size:
            model.resize_token_embeddings(len(self.tokenizer))

        self.backend_model = model
        self.__prepare_model_post_process()

    def __prepare_model_for_inference(
        self,
        model_args: ModelArguments,
        hf_auto_model: HF_AUTOMODEL_TYPE,
        use_accelerator: bool,
        ds_config
    ):
        """Load the backend model for inference, or move an existing one back.

        If ``self.backend_model`` already exists it is only moved off the CPU
        (when it is there) and nothing is reloaded. Otherwise the model is
        loaded with ``from_pretrained``, optionally wrapped with LoRA weights
        from ``model_args.lora_model_path``, and, when not using the
        accelerator and ``self.device == "gpu"``, wrapped in a DeepSpeed engine
        stored as ``self.ds_engine``.
        """
        if hasattr(self, "backend_model"):
            logger.info(f"Backend model already initialized, moving to device: {self.device}")
            if self.backend_model.device == torch.device("cpu"):
                # LMFlow names the accelerator "gpu", which torch does not
                # accept as a device type; translate it to "cuda".
                target_device = "cuda" if self.device == "gpu" else self.device
                self.backend_model.to(target_device)
            return

        # TODO: change to accelerate
        logger.info("Preparing model for inference")
        inference_load_kwargs = {}
        inference_load_kwargs_bak = copy.deepcopy(inference_load_kwargs)
        ram_optimized_load_kwargs = {
            "device_map": "auto",
            "offload_folder": "offload",
            "offload_state_dict": True,
        }

        if use_accelerator or model_args.use_ram_optimized_load:
            inference_load_kwargs.update(ram_optimized_load_kwargs)

        if not use_accelerator:
            from transformers.integrations import HfDeepSpeedConfig
            # Deliberately bound to a local even though it is never read:
            # presumably the object has to stay referenced while
            # from_pretrained runs - confirm against the HF DeepSpeed docs.
            dschf = HfDeepSpeedConfig(ds_config)  # noqa: F841

        try:
            self.backend_model = hf_auto_model.from_pretrained(
                model_args.model_name_or_path,
                torch_dtype=self.torch_dtype,
                config=self.hf_model_config,
                quantization_config=self.quant_config,
                **inference_load_kwargs,
            )
        except Exception:
            # Catch Exception rather than everything so Ctrl-C and
            # SystemExit still propagate; keep the traceback in the log.
            logger.warning(
                "Failed to use RAM optimized load. Using original load instead.",
                exc_info=True,
            )
            self.backend_model = hf_auto_model.from_pretrained(
                model_args.model_name_or_path,
                torch_dtype=self.torch_dtype,
                config=self.hf_model_config,
                quantization_config=self.quant_config,
                **inference_load_kwargs_bak,
            )

        self.backend_model_full = self.backend_model

        if model_args.lora_model_path is not None:
            self.backend_model = PeftModel.from_pretrained(
                self.backend_model,
                model_args.lora_model_path,
            )

        if (not use_accelerator) and self.device == "gpu":
            deepspeed.init_distributed()
            self.ds_engine = deepspeed.initialize(model=self.backend_model, config_params=ds_config)[0]
            self.ds_engine.module.eval()

        self.__prepare_model_post_process()

    def __prepare_model_for_vllm_inference(
        self,
        model_args: ModelArguments,
        vllm_gpu_memory_utilization: float,
        vllm_tensor_parallel_size: int,
    ):
        """Create the vLLM engine and store it as ``self.backend_model_for_inference``.

        Raises
        ------
        ImportError
            If vLLM is not installed.
        """
        if not is_vllm_available():
            raise ImportError('VLLM is not available. Please install via `pip install -e ".[vllm]"`.')

        self.backend_model_for_inference = LLM(
            model=model_args.model_name_or_path,
            tokenizer=model_args.model_name_or_path,
            dtype=model_args.torch_dtype if model_args.torch_dtype else "auto",
            load_format="auto",
            gpu_memory_utilization=vllm_gpu_memory_utilization,
            tensor_parallel_size=vllm_tensor_parallel_size,
        )

    def __prepare_model_post_process(self):
        """Fill in missing eos/pad token settings on the tokenizer and model config."""
        model_config = self.backend_model.config

        # old models/tokenizers may not have these attributes, fixing
        if self.tokenizer.eos_token is None:
            # A model config is not guaranteed to carry an ``eos_token``
            # string, so only copy it over when it is actually there.
            config_eos_token = getattr(model_config, "eos_token", None)
            if config_eos_token is not None:
                self.tokenizer.eos_token = config_eos_token
        if self.tokenizer.eos_token_id is None:
            self.tokenizer.eos_token_id = model_config.eos_token_id

        if self.tokenizer.pad_token is None:
            self.tokenizer.pad_token = self.tokenizer.eos_token
        if self.tokenizer.pad_token_id is None:
            self.tokenizer.pad_token_id = self.tokenizer.eos_token_id

        if self.model_args.eos_padding:
            self.tokenizer.pad_token = self.tokenizer.eos_token

        if not hasattr(model_config, "pad_token_id"):
            logger.warning("pad_token_id not found in model config. Setting pad_token_id to eos_token_id.")
            model_config.pad_token_id = model_config.eos_token_id
        elif model_config.pad_token_id is None:
            logger.warning("pad_token_id is None in model config. Setting pad_token_id to eos_token_id.")
            model_config.pad_token_id = model_config.eos_token_id

    def activate_model_for_inference(
        self,
        use_vllm: bool = False,
        **kwargs,
    ):
        """Load the model for inference unless it is already activated.

        Parameters
        ----------
        use_vllm : bool, optional
            Use the vLLM backend instead of the Hugging Face one, by default False
        **kwargs :
            ``vllm_gpu_memory_utilization`` and ``vllm_tensor_parallel_size``
            are read from here when ``use_vllm`` is true; a missing key is
            passed on as None.
        """
        if self._activated:
            logger.warning("You are trying to activate the model for inference, but it is already activated.")
            return

        if use_vllm:
            self.__prepare_model_for_vllm_inference(
                model_args=self.model_args,
                vllm_gpu_memory_utilization=kwargs.get("vllm_gpu_memory_utilization"),
                vllm_tensor_parallel_size=kwargs.get("vllm_tensor_parallel_size"),
            )
        else:
            self.__prepare_model_for_inference(
                model_args=self.model_args,
                hf_auto_model=self.hf_auto_model,
                use_accelerator=self.use_accelerator,
                ds_config=self.ds_config,
            )

        self._activated = True

    def deactivate_model_for_inference(
        self,
        use_vllm: bool = False,
    ):
        """Deactivate the model and release the resources.

        NOTE: Currently, VLLM doesn't have an official way to do this, and the
        implementation below cannot release all gpu resources by our observation.
        Thus this method is just a placeholder for future implementation. See:
        [Github issue](https://github.com/vllm-project/vllm/issues/1908)

        Raises
        ------
        ImportError
            If ``use_vllm`` is true but vLLM is not installed.
        """
        if not self._activated:
            logger.warning("You are trying to deactivate the model for inference, but it is already deactivated.")
            return

        if use_vllm:
            # destroy_model_parallel is only imported when vLLM is installed;
            # fail with the same error the vLLM loading path uses.
            if not is_vllm_available():
                raise ImportError('VLLM is not available. Please install via `pip install -e ".[vllm]"`.')
            destroy_model_parallel()
            del self.backend_model_for_inference.llm_engine.model_executor.driver_worker
            del self.backend_model_for_inference
            gc.collect()
            torch.cuda.empty_cache()
        else:
            self.backend_model.to("cpu")

        self._activated = False

    def get_max_length(self):
        """
        Return max acceptable input length in terms of tokens.
        """
        return self.tokenizer.model_max_length

    def get_tokenizer(self):
        """
        Return the tokenizer of the model.
        """
        return self.tokenizer

    def get_backend_model(self):
        """
        Return the backend model.
        """
        return self.backend_model