Source code for lmflow.utils.data_utils

"""The program includes several functions: setting a random seed, 
loading data from a JSON file, batching data, and extracting answers from generated text.
"""

import random
import numpy as np
import torch
import json
import re
from typing import Union, List, TypedDict, Dict


[docs] def set_random_seed(seed: int): """ Set the random seed for `random`, `numpy`, `torch`, `torch.cuda`. Parameters ------------ seed : int The default seed. """ random.seed(seed) np.random.seed(seed) torch.manual_seed(seed) if torch.cuda.is_available(): torch.cuda.manual_seed_all(seed)
[docs] def load_data(file_name: str): """ Load data with file name. Parameters ------------ file_name : str. The dataset file name. Returns ------------ inputs : list. The input texts of the dataset. outputs : list. The output texts file datasets. len : int. The length of the dataset. """ inputs = [] outputs = [] type = "" with open(file_name, encoding='utf-8') as f: json_data = json.load(f) type = json_data["type"] for line in json_data["instances"]: inputs.append(line["input"]) outputs.append(line["output"]) print(f"load dataset {file_name} success.\n") print(f"Type : {type}, datasize : {len(outputs)}") return inputs, outputs, len(outputs)
[docs] def batchlize(examples: list, batch_size: int, random_shuffle: bool): """ Convert examples to a dataloader. Parameters ------------ examples : list. Data list. batch_size : int. random_shuffle : bool If true, the dataloader shuffle the training data. Returns ------------ dataloader: Dataloader with batch generator. """ size = 0 dataloader = [] length = len(examples) if (random_shuffle): random.shuffle(examples) while size < length: if length - size > batch_size: dataloader.append(examples[size : size+batch_size]) size += batch_size else: dataloader.append(examples[size : size+(length-size)]) size += (length - size) return dataloader
[docs] def answer_extraction(response, answer_type=None): #use this funtion to extract answers from generated text """ Use this funtion to extract answers from generated text Parameters ------------ args : Arguments. response : str plain string response. Returns ------------ answer: Decoded answer (such as A, B, C, D, E for mutiple-choice QA). """ # temp = response["generated_text"] temp = response if answer_type in ("gsm8k", "svamp", "asdiv", "addsub", "singleeq", "multiarith", "math"): temp = temp.replace(",", "") temp = [s for s in re.findall(r'-?\d+\.?\d*', temp)] elif answer_type in ("aqua", "csqa", "multiple_choice"): temp = re.findall(r'A|B|C|D|E', temp) elif answer_type in ("strategyqa", "coin_flip"): temp = temp.lower() temp = re.sub("\"|\'|\n|\.|\s|\:|\,"," ", temp) temp = temp.split(" ") temp = [i for i in temp if i in ("yes", "no")] elif answer_type in ("last_letters"): temp = re.sub("\"|\'|\n|\.|\s","", temp) temp = [temp] elif answer_type in ("pubmedqa", "binary_choice"): # pattern = "Output: (yes|no|maybe)" # sttr = re.search(pattern, temp) # answer = sttr.group(0)[8:] if sttr is not None else "N/A" pattern = "(answer|Answer|ANSWER|output|Output|OUTPUT|A): \(*(yes|Yes|YES|no|No|NO|maybe|Maybe|MAYBE)" sttr = re.search(pattern, temp) if sttr is not None: mid_answer = sttr.group(0) mid_answer = mid_answer.split(":")[-1].strip() answer = mid_answer.lower() else: pattern = "(yes|Yes|YES|no|No|NO|maybe|Maybe|MAYBE)(\.|\s)" sttr = re.search(pattern, temp) if sttr is not None: answer = sttr.group(0)[:-1].lower() else: answer = "N/A" return answer elif answer_type == "medmcqa": # pattern = "Output: (A|B|C|D)." # sttr = re.search(pattern, temp) # answer = sttr.group(0)[8:-1].lower() if sttr is not None else "N/A" pattern = "(answer|Answer|ANSWER|output|Output|OUTPUT|A): \(*(A|B|C|D|a|b|c|d)" sttr = re.search(pattern, temp) if sttr is not None: mid_answer = sttr.group(0) answer = mid_answer[-1].lower() else: pattern = "\(*(A|B|C|D|a|b|c|d)\)*(\.|\s)" sttr = re.search(pattern, temp) if sttr is not None: if '(' in sttr.group(0): answer = sttr.group(0)[1].lower() else: answer = sttr.group(0)[0].lower() else: answer = "N/A" return answer elif answer_type == "usmle": # pattern = "Output: (A|B|C|D)." # sttr = re.search(pattern, temp) # answer = sttr.group(0)[8:-1].lower() if sttr is not None else "N/A" pattern = "(Answer|Output|A): \(*(A|B|C|D|a|b|c|d)" sttr = re.search(pattern, temp) if sttr is not None: mid_answer = sttr.group(0) answer = mid_answer[-1].lower() else: pattern = "\(*(A|B|C|D|a|b|c|d)\)*(\.|\s)" sttr = re.search(pattern, temp) if sttr is not None: if '(' in sttr.group(0): answer = sttr.group(0)[1].lower() else: answer = sttr.group(0)[0].lower() else: answer = "N/A" return answer elif answer_type == "text": return response else: raise NotImplementedError(f"Unsupported answer type: {answer_type}") if len(temp) != 0: answer = temp[-1] # if there is . at the end of answer, remove it # e.g. answer = 64. if answer != "": if answer[-1] == ".": answer = answer[:-1] # round the answer to nearest integer if answer_type in ("gsm8k", "svamp"): try: answer = str(round(float(answer))) except: answer = "" # no sol or sol doesn't have valid format elif answer_type in ("last_letters"): try: answer = answer[-args.concat_length:] except: answer = "" else: answer = "" return answer
[docs] def process_image_flag(text, image_flag="<ImageHere>"): texts = text.split(image_flag) if len(texts) > 1: image_token_indexes = [len(text) for text in texts[:-1]] else: image_token_indexes = [] # cumsun image_token_indexes = list(np.cumsum(image_token_indexes)) texts = "".join(texts) return texts, image_token_indexes
[docs] class VLLMInferenceResultWithInput(TypedDict):
[docs] input: str
[docs] output: Union[List[str], List[List[int]]]
[docs] class RewardModelInferenceResultWithInput(TypedDict):
[docs] input: str
[docs] output: List[Dict[str, Union[str, float]]] # [{"score": 0.5, "text": "output text"}]