#!/usr/bin/env python
# coding=utf-8
# Copyright 2024 Statistics and Machine Learning Research Group. All rights reserved.
import copy
import hashlib
import logging
from typing import List, Union, Dict, Optional
import torch
from transformers.modeling_outputs import SequenceClassifierOutputWithPast
from lmflow.args import ModelArguments
from lmflow.datasets.dataset import Dataset, KEY_SCORE
from lmflow.models.interfaces.tunable import Tunable
from lmflow.models.hf_model_mixin import HFModelMixin
from lmflow.models.text_regression_model import TextRegressionModel
from lmflow.tokenization.hf_text_regression_model import (
paired_conversation_tokenize_function,
conversation_tokenize_function,
tokenize_function,
text_to_textlist_tokenize_function,
)
from lmflow.utils.conversation_template import PRESET_TEMPLATES
from lmflow.utils.constants import (
PAIRED_CONVERSATION_DATASET_DESCRIPTION,
TEXT2TEXT_DATASET_DESCRIPTION,
TEXT_ONLY_DATASET_DESCRIPTION,
TEXT_TO_TEXTLIST_DATASET_DESCRIPTION,
CONVERSATION_DATASET_DESCRIPTION,
)
from lmflow.utils.data_utils import RewardModelInferenceResultWithInput
from lmflow.utils.versioning import is_ray_available, is_vllm_available
from lmflow.utils.envs import is_accelerate_env
if is_ray_available():
import ray
import ray.data
if is_vllm_available():
from vllm import SamplingParams
logger = logging.getLogger(__name__)
class HFTextRegressionModel(TextRegressionModel, HFModelMixin, Tunable):
r"""
Initializes an HFTextRegressionModel instance.
Parameters
------------
model_args :
Model arguments such as model name, path, revision, etc.
do_train : bool, default False
Determines whether to prepare the model for training, including the distributed environment, model placement, quantization, lora, etc.
device : str, default "gpu"
The device to run the model on, e.g. "gpu" or "cpu".
args : Optional.
Positional arguments.
kwargs : Optional.
Keyword arguments.
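Examples
------------
A minimal construction sketch; the checkpoint name below is only illustrative:
>>> from lmflow.args import ModelArguments
>>> model_args = ModelArguments(
...     model_name_or_path="my-org/my-reward-model",  # hypothetical checkpoint
...     arch_type="text_regression",
... )
>>> model = HFTextRegressionModel(model_args, do_train=False)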
"""
def __init__(
self,
model_args: ModelArguments,
do_train: bool = False,
device = "gpu",
*args,
**kwargs
):
assert model_args.arch_type == "text_regression", (
f"Invalid model architecture type: {model_args.arch_type}. "
f"Expected: text_regression"
)
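# A reward/regression model predicts a single scalar per sequence, so the
# underlying sequence classification head is configured with one label.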
config_additional_args = {"num_labels": 1}
HFModelMixin.__init__(
self,
model_args=model_args,
do_train=do_train,
device=device,
hf_auto_model_additional_args=config_additional_args,
*args,
**kwargs
)
def tokenize(
self,
dataset: Dataset,
add_special_tokens=True,
*args,
**kwargs
):
"""
Tokenize the full dataset.
Parameters
------------
dataset : lmflow.datasets.Dataset.
args : Optional.
Positional arguments.
kwargs : Optional.
Keyword arguments.
Returns
------------
tokenized_datasets :
The tokenized dataset, without any leading or trailing special
tokens (normally they are Begin-Of-Sentence or End-Of-Sentence
tokens).
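Examples
------------
A minimal usage sketch, assuming ``data_args`` describes a supported dataset type:
>>> dataset = Dataset(data_args)
>>> tokenized_dataset = model.tokenize(dataset)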
"""
# Preprocessing the datasets.
# First we tokenize all the texts.
if dataset.get_backend() != "huggingface":
raise NotImplementedError(
"Tokenization of datasets with a non-huggingface backend is "
"not supported yet."
)
dataset_type = dataset.get_type()
model_args = self.model_args
raw_datasets = dataset
hf_raw_datasets = dataset.get_backend_dataset()
column_names = list(hf_raw_datasets.features)  # for paired_conversation datasets, for example, these would be 'chosen' and 'rejected'
data_args = raw_datasets.get_data_args()
# Whether to truncate long sequences to fit into max_length
use_truncation = False
if model_args.use_lora or data_args.disable_group_texts:
use_truncation = True
# Requires three types of information for tokenizing different datasets
# 1) Which fields require tokenization, e.g.
# "text2float": "text", but not "float"
# "text2text": both "input" and "output"
# 2) How the tokenized sequences are concatenated together, e.g.
# "text_only": "text" -> "text"
# "text2text": "input", "output" -> "input" + "output"
# 3) Which fields contribute to the loss in the final computation, e.g.
# "text_only": "text"
# "text2text": "output" only
tokenize_fn = None
tokenize_fn_kwargs = {
"data_args": data_args,
"tokenizer": self.tokenizer,
"column_names": column_names,
}
if dataset_type == "text_only":
tokenize_fn = tokenize_function
text_only_tokenize_fn_kwargs = {
"tokenized_column_order": ["text"],
"label_columns": ["text"],
"add_special_tokens": add_special_tokens,
"use_truncation": use_truncation,
}
tokenize_fn_kwargs.update(text_only_tokenize_fn_kwargs)
elif dataset_type == "text2text":
tokenize_fn = tokenize_function
text2text_tokenize_fn_kwargs = {
"tokenized_column_order": ["input", "output"],
"label_columns": ["output"],
"add_special_tokens": False,
"use_truncation": use_truncation,
}
tokenize_fn_kwargs.update(text2text_tokenize_fn_kwargs)
elif dataset_type in ["conversation", "paired_conversation"]:
if dataset_type == "conversation":
tokenize_fn = conversation_tokenize_function
elif dataset_type == "paired_conversation":
tokenize_fn = paired_conversation_tokenize_function
if data_args.conversation_template:
if data_args.conversation_template in PRESET_TEMPLATES.keys():
conversation_template = PRESET_TEMPLATES[data_args.conversation_template]
else:
raise NotImplementedError(
f"Conversation template {data_args.conversation_template} is not supported yet."
)
else:
logger.warning("No conversation template provided. Using default template.")
conversation_template = PRESET_TEMPLATES['empty']
tokenize_fn_kwargs["conversation_template"] = conversation_template
logger.warning(f"Conversation template: {conversation_template}")
elif dataset_type == "text_to_textlist":
tokenize_fn = text_to_textlist_tokenize_function
text_to_textlist_tokenize_fn_kwargs = {
"add_special_tokens": False,
"use_truncation": use_truncation,
}
tokenize_fn_kwargs.update(text_to_textlist_tokenize_fn_kwargs)
else:
raise NotImplementedError(
f"Dataset type \"{dataset_type}\" is not supported, currently"
" only support following data types for HFTextRegressionModel:\n"
f" 1) [Inference]{TEXT_ONLY_DATASET_DESCRIPTION}\n"
f" 2) [Inference]{TEXT2TEXT_DATASET_DESCRIPTION}\n"
f" 3) [Training]{PAIRED_CONVERSATION_DATASET_DESCRIPTION}\n"
f" 4) [Inference]{CONVERSATION_DATASET_DESCRIPTION}\n"
f" 5) [Inference]{TEXT_TO_TEXTLIST_DATASET_DESCRIPTION}\n"
)
tokenize_kwargs = {}
if not data_args.streaming:
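# Build a deterministic fingerprint from the raw dataset and every setting that
# affects tokenization, so that `datasets.map` only reuses cached results when
# all of these settings are unchanged.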
fingerprint = hashlib.md5(
(
raw_datasets.get_fingerprint()
+ str(self.tokenizer)
+ f'###padding_side={self.tokenizer.padding_side}'
+ ('###conversation_template=' + str(conversation_template) if "conversation" in dataset_type else "")
+ f'###disable_group_texts={data_args.disable_group_texts}'
+ f'###block_size={data_args.block_size}'
).encode("utf-8")
).hexdigest()
tokenize_kwargs = {
"num_proc": data_args.preprocessing_num_workers,
"load_from_cache_file": not data_args.overwrite_cache,
"desc": "Running tokenizer on dataset",
"new_fingerprint": fingerprint,
}
tokenized_datasets = raw_datasets.map(
tokenize_fn,
batched=True,
remove_columns=column_names,
fn_kwargs=tokenize_fn_kwargs,
**tokenize_kwargs
)
return tokenized_datasets
def inference(
self,
inputs,
release_gpu: bool = False,
use_vllm: bool = False,
**kwargs
) -> Union[List[float], SequenceClassifierOutputWithPast]:
"""
Perform the inference process of the model, scoring the given inputs.
Parameters
------------
inputs :
The sequences used as inputs to the model.
When using VLLM inference, this should be a string or a list of strings.
When using normal inference, this should be a tensor of token ids.
release_gpu : bool, optional
Whether to release the GPU resource after inference, by default False.
use_vllm : bool, optional
Whether to use VLLM for inference, by default False.
kwargs : Optional.
Keyword arguments.
Returns
------------
outputs :
The model outputs, e.g. the predicted score(s) for the input sequences.
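Examples
------------
A minimal sketch with already-tokenized inputs; shapes and token ids are
illustrative only:
>>> input_ids = torch.tensor([[1, 2, 3, 4]])
>>> outputs = model.inference(input_ids)
>>> score = outputs.logits[0][0].item()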
"""
if use_vllm:
logger.warning(
"VLLM inference is not supported for text regression model, using normal inference instead."
)
use_vllm = False
if not self._activated:
self.activate_model_for_inference(
use_vllm=use_vllm,
**kwargs,
)
if use_vllm:
res = self.__vllm_inference(inputs, **kwargs)
else:
res = self.__inference(inputs, **kwargs)
if release_gpu:
self.deactivate_model_for_inference(use_vllm=use_vllm)
return res
def __inference(
self,
inputs,
**kwargs
):
"""
Perform the forward pass of the model on tokenized inputs.
Parameters
------------
inputs :
The **tokenized** sequences used as inputs to the model.
kwargs : Optional.
Keyword arguments.
Returns
------------
outputs :
The model outputs, optionally paired with the inputs when
``return_input=True`` is passed.
"""
with torch.no_grad():
if is_accelerate_env():
outputs = self.backend_model(
input_ids=inputs,
**kwargs,
)
else:
if self.device == "gpu":
# for scripts that run using 'deepspeed script.py'
outputs = self.ds_engine.module(
input_ids=inputs,
synced_gpus=True,
**kwargs,
)
elif self.device == "cpu":
outputs = self.backend_model(
input_ids=inputs,
synced_gpus=True,
**kwargs,
)
else:
raise NotImplementedError(
f"device \"{self.device}\" is not supported"
)
if kwargs.get('return_input', False):
outputs = {"input": inputs, "output": outputs}
return outputs
def __vllm_inference(
self,
inputs: Union[str, List[str]],
sampling_params: Optional['SamplingParams'] = None,
**kwargs,
) -> Union[List[List[str]], List[List[List[int]]]]:
"""Perform VLLM inference process of the model.
Parameters
----------
inputs : Union[str, List[str]]
Prompt(s), string or a list of strings.
sampling_params : Optional[SamplingParams], optional
vllm SamplingParams object, by default None.
Raises
-------
NotImplementedError
VLLM inference is not supported for text regression models.
"""
raise NotImplementedError(
"VLLM inference is not supported for text regression model."
)
def prepare_inputs_for_inference(
self,
dataset: Dataset,
enable_distributed_inference: bool = False,
use_vllm: bool = False,
**kwargs,
) -> Union[Dataset, 'ray.data.Dataset']:
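"""
Tokenize the dataset for inference. When ``enable_distributed_inference`` is
True, the tokenized dataset is converted into a ``ray.data.Dataset`` so it can
be sharded across workers (requires ray to be installed).
"""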
if use_vllm:
raise NotImplementedError(
"VLLM inference is not supported for text regression model."
)
inference_inputs = self.tokenize(dataset)
if enable_distributed_inference:
if not is_ray_available():
raise ValueError(
'Ray is not available. Please install ray via `pip install -e ".[ray]"`.'
)
inference_inputs.sanity_check(drop_invalid=True)
inference_inputs = inference_inputs.get_backend_dataset()
inference_inputs = ray.data.from_items(inference_inputs)
# -> Dict[str, np.ndarray]
# Example (batch size=2):
# {'input': array(['...','...'], dtype=object),
# 'output': array([array(["...", "..."], dtype=object), array(['...','...'], dtype=object)], dtype=object),
# 'input_ids': array(
# [
# array([array([ 27, 91, 882, ..., 128256, 128256, 128256]),
# array([ 27, 91, 882, ..., 128256, 128256, 128256])],
# dtype=object),
# array([array([ 27, 91, 882, ..., 128256, 128256, 128256]),
# array([ 27, 91, 882, ..., 128256, 128256, 128256])],
# dtype=object)
# ], dtype=object)}
return inference_inputs
@staticmethod
def postprocess_inference_outputs(
dataset: Dataset,
scores: Union[List[float], List[List[float]]],
):
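"""
Attach per-candidate scores to a ``text_to_textlist`` dataset, producing a
``text_to_scored_textlist`` dataset in which every output text carries its
score under ``KEY_SCORE``.
"""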
output_dict = {"type": "", "instances": []}
if dataset.get_type() == "text_to_textlist":
output_dict["type"] = "text_to_scored_textlist"
for idx, instance in enumerate(dataset.get_backend_dataset()):
if len(instance["output"]) < 2:
logger.warning(f"Instance {idx} has less than 2 outputs, skipping.")
output_dict["instances"].append(
{
"input": instance["input"],
"output": [{"text": text} for text in instance["output"]],
}
)
else:
raise NotImplementedError(f"Dataset type {dataset.get_type()} is not supported for reward model inference.")
for i, instance_scores in enumerate(scores):
for j, score in enumerate(instance_scores):
output_dict["instances"][i]["output"][j][KEY_SCORE] = score
output_dataset_args = copy.deepcopy(dataset.data_args)
output_dataset_args.dataset_path = None
output_dataset_args.dataset_name = f"{output_dataset_args.dataset_name}_scored"
output_dataset = Dataset(output_dataset_args)
output_dataset = output_dataset.from_dict(output_dict)
return output_dataset
@staticmethod
def postprocess_distributed_inference_outputs(
dataset: Dataset,
inference_result: List[RewardModelInferenceResultWithInput],
):
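"""
Wrap per-instance distributed inference results into a
``text_to_scored_textlist`` dataset with the same data args as the input
dataset.
"""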
output_dict = {"type": "text_to_scored_textlist", "instances": inference_result}
output_dataset_args = copy.deepcopy(dataset.data_args)
output_dataset_args.dataset_path = None
output_dataset_args.dataset_name = f"{output_dataset_args.dataset_name}_scored"
output_dataset = Dataset(output_dataset_args)
output_dataset = output_dataset.from_dict(output_dict)
return output_dataset
def save(self, dir, *args, **kwargs):
"""
Save the model and tokenizer to the specified directory.
Parameters
------------
dir :
The directory in which to save the model and tokenizer.
kwargs : Optional.
Keyword arguments.
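Examples
------------
A usage sketch; the output directory below is illustrative:
>>> model.save("output_models/my_reward_model")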
"""
self.get_tokenizer().save_pretrained(dir)
self.get_backend_model().save_pretrained(dir)