by Grayson Adkins, updated February 19, 2024
This notebook provides several utility functions I use for creating and preparing datasets for training and testing.
You might find this notebook useful if you want to learn how to:
- Convert PDFs to raw text
- Use an LLM to clean up raw text and generate matching test data
- Generate question-and-answer (Q&A) pairs from raw text, using either OpenAI models or an open-source model hosted on Runpod
- Convert Q&A pairs and raw text into CSV files for training and testing
- Upload datasets to the Hugging Face Hub
- Find the training examples most similar to a given question using embeddings and cosine similarity
Note
If you're interested in web-scale datasets like Common Crawl, I recommend checking out the data pipeline scripts from Hugging Face used to generate the OBELICS dataset. They include advanced techniques for deduplication, extraction, cleaning, and filtering Web ARChive (WARC) files.
Crafting quality datasets is one of the most important steps in training or fine-tuning machine learning systems. By "quality" I mean that the data is relevant to your target domain and representative of the task your system is being designed for. Quality data is also unbiased and free of excessive errors or noise. While quality datasets in machine learning are often large, they don't have to be, especially when fine-tuning an LLM that has already been imbued with advanced reasoning (see Fine-tuning Mistral 7B for Function Calling for an example of a small but high-quality dataset).
In addition to data quality, data format is also important. The format of your data should depend on its intended use case.
Supervised Fine-tuning
Fine-tuning for chat or function calling is done via supervised learning, where the dataset used for fine-tuning is often in a question-and-answer (Q&A) format, with the questions and answers corresponding to inputs and labels, respectively. In this notebook, I illustrate how you can use existing language models to convert raw text into a Q&A format for supervised fine-tuning.
Note that when fine-tuning an LLM that will be used in a chat or function calling format, it is best to start with a model that has already been fine-tuned for chat (as opposed to starting with a base model).
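For concreteness, here is a minimal sketch of what a single supervised training record boils down to. The field names mirror the prompt/completion columns produced by qa_to_csv.py later in this notebook, and the content is drawn from the WHO COVID-19 Epidemiological Update examples used below.
# One supervised fine-tuning example: the prompt is the model input and
# the completion is the label the model is trained to reproduce.
qa_example = {
    "prompt": "In the context of the WHO COVID-19 Epidemiological Update, "
              "which WHO Region reported the most new COVID-19 cases?",
    "completion": "Europe reported the highest number of new COVID-19 cases "
                  "for the reporting period.",
}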
Unsupervised Fine-tuning
When fine-tuning a large base model, on the other hand, it's often impractical or too expensive to format data as Q&A pairs, given the scale of data required for effectively fine-tuning a model with billions of parameters. Therefore, we typically fine-tune on sequences of text in an unsupervised manner, simply performing next-token prediction. It's important to remember, though, that if the fine-tuned model will be used for a chat application, you'll need to do supervised fine-tuning afterwards with a smaller Q&A dataset.
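As a minimal sketch of what that looks like in practice (the tokenizer and block size here are illustrative assumptions, not a prescription), the raw text is simply tokenized and packed into fixed-length blocks, and each block serves as both input and label for next-token prediction:
from transformers import AutoTokenizer

# Any causal LM tokenizer works here; this one is used later in the notebook
tokenizer = AutoTokenizer.from_pretrained("TheBloke/Llama-2-13B-chat-GPTQ")

# Read the raw training text produced by the utilities below
with open("data/train.txt", "r", encoding="utf-8") as f:
    raw_text = f.read()

# Tokenize once and pack into fixed-length blocks
block_size = 512
input_ids = tokenizer(raw_text)["input_ids"]
blocks = [input_ids[i:i + block_size] for i in range(0, len(input_ids), block_size)]

# For causal language modeling, the labels are the input IDs themselves;
# the training framework shifts them internally for next-token prediction
examples = [{"input_ids": block, "labels": list(block)} for block in blocks]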
For many of the functions below, I give examples based on the WHO COVID-19 Epidemiological Update (Edition 163), which serves as the raw source text throughout this notebook. First, mount Google Drive and set up a cache directory:
from google.colab import drive
drive.mount('/content/drive')
import os
cache_dir = "/content/drive/My Drive/my_dataset_cache"
os.makedirs(cache_dir, exist_ok=True) # Ensure the directory exists
# https://stackoverflow.com/questions/56081324/why-are-google-colab-shell-commands-not-working
import locale
def getpreferredencoding(do_setlocale = True):
return "UTF-8"
locale.getpreferredencoding = getpreferredencoding
# pdf_to_txt.py
import PyPDF2
import os
def pdf_to_text(pdf_path, txt_path):
with open(pdf_path, 'rb') as pdf_file:
pdf_reader = PyPDF2.PdfReader(pdf_file)
text = ""
for page_num in range(len(pdf_reader.pages)):
page = pdf_reader.pages[page_num]
text += page.extract_text()
with open(txt_path, 'w', encoding='utf-8') as f:
f.write(text)
# For PDFs stored locally
os.makedirs('data', exist_ok=True)  # Ensure the output directory exists
summary = []
paths_to_check = ['path/to/train.pdf', 'path/to/test.pdf']
for pdf_path in paths_to_check:
# Extract the base name to create txt file name
base_name = os.path.basename(pdf_path).split('.')[0]
txt_path = f'data/{base_name}.txt'
# Check if PDF exists
if os.path.exists(pdf_path):
pdf_to_text(pdf_path, txt_path)
summary.append(f"Converted {pdf_path} to {txt_path}.")
else:
summary.append(f"{pdf_path} was not found.")
# Print summary
print("Summary:")
for item in summary:
print(f"- {item}")
In this function, we use GPT-3.5-turbo to clean up a dataset of raw text, simplifying the formatting and removing special characters and markdown notation. Additionally, we ask the LLM to create test data based on the raw input data.
⚠️ Warning
OpenAI Terms of Service forbid using ChatGPT or the APIs to generate datasets for use in training competitive models. However, you can serve your own open-source model, such as Llama 2 70B, behind an API and use it to create Q&A datasets for commercial purposes. See the next section for how to host your own open-source model on Runpod to perform this cleaning step.
# clean_train_create_test.py
import os
import time
import openai
from dotenv import load_dotenv
import tiktoken
# Function to count tokens using tiktoken
def count_tokens(text):
encoding = tiktoken.get_encoding("cl100k_base")
token_count = sum(1 for _ in encoding.encode(text))
return token_count
# Function to read and chunk the text file
def read_and_chunk_txt(file_path):
chunks = []
chunk = ""
with open(file_path, 'r', encoding='utf-8') as f:
for line in f:
text = line.strip()
if count_tokens(chunk + text) > 4000:
chunks.append(chunk.strip())
chunk = text
else:
chunk += " " + text
if chunk:
chunks.append(chunk.strip())
return chunks
# Load environment variables for OpenAI API key
load_dotenv()
openai.api_key = os.getenv("OPENAI_API_KEY")
# Read and chunk the text from the txt file
chunks = read_and_chunk_txt("data/raw_train.txt")
# Inform user about the total number of tokens and estimated cost
total_tokens = sum(count_tokens(chunk) for chunk in chunks)
print(f"Total tokens in all chunks: {total_tokens}")
estimated_input_tokens = total_tokens * 2 # Input tokens (each chunk is sent once per prompt, and there are two prompts)
estimated_output_tokens = total_tokens * 1.2 # Output tokens (approximate)
total_estimated_tokens = estimated_input_tokens + estimated_output_tokens
estimated_cost_gpt4 = estimated_input_tokens / 1000 * 0.03 + estimated_output_tokens / 1000 * 0.06
estimated_cost_gpt35turbo16k = estimated_input_tokens / 1000 * 0.003 + estimated_output_tokens / 1000 * 0.004
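# Worked example with illustrative numbers: if the chunks total 100,000 tokens,
# the two prompts send roughly 200,000 input tokens and return roughly 120,000
# output tokens, so gpt-4 comes to about 200 * 0.03 + 120 * 0.06 = $13.20 and
# gpt-3.5-turbo-16k to about 200 * 0.003 + 120 * 0.004 = $1.08 at these rates.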
print(f"Estimated cost with gpt-4: ${estimated_cost_gpt4:.2f}")
print(f"Estimated cost with gpt-3.5-turbo-16k: ${estimated_cost_gpt35turbo16k:.2f}")
# Ask the user how many chunks they want to process
while True:
process_option = input("Do you want to process one chunk (type 'one') or all chunks (type 'all')? ").strip().lower()
if process_option in ['one', 'all']:
break
else:
print("Invalid option. Please enter 'one' or 'all'.")
# Open output files
with open("data/train.txt", "w", encoding='utf-8') as train_file, \
open("data/test.txt", "w", encoding='utf-8') as test_file:
for snippet in [
"Clean up the above information and respond with the complete text. \
Avoid using special characters. Keep formatting very simple and suitable \
for a .txt file. Do not respond in markdown.",
"Select and re-phrase ten key takeaways from the above text without \
changing their meaning. Respond in plain text. Leave out any numbers or \
bullets."
]:
for idx, chunk in enumerate(chunks):
if process_option == 'one' and idx > 0:
break
prompt = f"{chunk}\n\n{snippet}"
# Make the API call
completion = openai.ChatCompletion.create(
# model="gpt-4",
model="gpt-3.5-turbo-16k",
temperature=0,
messages=[
{"role": "system", "content": "You are a helpful assistant."},
{"role": "user", "content": prompt}
]
)
# Extract and write the response
response = completion.choices[0].message['content']
if snippet.startswith("Clean up"):
train_file.write(response + "\n\n")
train_file.flush() # Flush the buffer to ensure the content is written
else:
test_file.write(response + "\n\n")
test_file.flush() # Flush the buffer to ensure the content is written
# Sleep for 200 ms between API calls
time.sleep(0.2)
This function uses LLMs to generate question-and-answer pairs from chunks of raw text. Q&A datasets are useful when fine-tuning a model that has already been fine-tuned for chat.
⚠️ Warning
OpenAI Terms of Service forbid using ChatGPT or the APIs to generate datasets for use in training competitive models. However, you can serve your own open-source model, such as Llama 2 70B, behind an API and use it to create Q&A datasets for commercial purposes. In the example function below, the user can choose between using OpenAI or an open-source model hosted on Runpod.
# create_qa.py
import os
import time
import openai
from dotenv import load_dotenv
import tiktoken
import requests
import json
import concurrent.futures
from concurrent.futures import ThreadPoolExecutor
# Context
# If this is too small, the model will be dealing with lots of fragmented inputs
model_context_length = 1000
context = 'Context: WHO COVID-19 Epidemiological Update - Edition 163'
# Reduce this parameter to increase the granularity of questions
# !!! Reducing too much may cause language model to hallucinate content
tokens_per_question = 30
# Approx. 60 tokens per Q&A pair. Note: GPT tends to get confused if asked to generate too many questions at once
chunk_size = min(model_context_length / (1 + 60 / tokens_per_question), 25 * tokens_per_question)
# chunk_size = 200 # for testing, set a smaller chunk size.
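# Worked example with the defaults above: 60 / tokens_per_question = 2, so
# chunk_size = min(1000 / (1 + 2), 25 * 30) = min(333.3, 750), i.e. about 333 tokens,
# which yields int(333 / 30) = 11 training questions (and 1 test question) per chunk.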
train_sample = "According to Edition 163 of the WHO COVID-19 Epidemiolocical Update, \
which WHO Region saw the highest number of new COVID-19 cases?\nEurope had the \
highest number of new COVID-19 cases with 701,053, or 63% of new cases worldwide \
for the period of 11 December 2023 to 7 January 2024."
test_sample = "How many countries reported new ICU admissions in Edition 163 of the \
WHO COVID-19 Epidemiological Update?\n42 countries reported new ICU admissions in \
Edition 163 of the WHO COVID-19 Epidemiological Update."
questions_per_chunk_train = int(chunk_size / tokens_per_question)
questions_per_chunk_test = max(int(questions_per_chunk_train / 10),1)
print(f'Setting {questions_per_chunk_train} questions per {int(chunk_size)}-token chunk for QA train dataset generation.')
def count_tokens(text):
encoding = tiktoken.get_encoding("cl100k_base")
token_count = sum(1 for _ in encoding.encode(text))
return token_count
def read_and_chunk_txt(file_path):
chunks = []
chunk = ""
with open(file_path, 'r', encoding='utf-8') as f:
for line in f:
text = line.strip()
if count_tokens(chunk + text) > chunk_size:
chunks.append(chunk.strip())
chunk = text
else:
chunk += " " + text
if chunk:
chunks.append(chunk.strip())
return chunks
# OpenAI Terms of Service forbid using ChatGPT to generate datasets for use
# in training competitive models. By using your own open-source model via API
# hosted on Runpod, you can create Q&A datasets for commercial purposes.
def query_runpod(pod_id, prompt, max_tokens):
url = f"https://{pod_id}-8080.proxy.runpod.net/generate"
prompt = f'[INST] {prompt} [/INST]\n\n'
payload = {
"inputs": prompt,
"parameters": {
"max_new_tokens": max_tokens,
"do_sample": False,
"stop": [
"</s>",
"[INST]"
]
}
}
headers = {'Content-Type': 'application/json'}
response = requests.post(url, data=json.dumps(payload), headers=headers)
# print(url)
if response.status_code == 200:
return json.loads(response.text)["generated_text"]
else:
return None
load_dotenv()
api_choice = input("Choose the API to use (openai/runpod): ").strip().lower()
if api_choice == "openai":
openai.api_key = os.getenv("OPENAI_API_KEY")
if not openai.api_key:
print("OpenAI API key is missing. Exiting.")
exit(1)
elif api_choice == "runpod":
pod_id = os.getenv("RUNPOD_POD_ID")
if not pod_id:
print("RunPod Pod ID is missing. Exiting.")
exit(1)
else:
print("Invalid API choice. Exiting.")
exit(1)
chunks = read_and_chunk_txt("data/raw_train.txt")
total_tokens = sum(count_tokens(chunk) for chunk in chunks)
print(f"Total tokens in all chunks: {total_tokens}")
estimated_input_tokens = total_tokens * 1.1
estimated_output_tokens = total_tokens * 50/tokens_per_question
total_estimated_tokens = estimated_input_tokens + estimated_output_tokens
estimated_cost_gpt4 = (estimated_input_tokens / 1000 * 0.03) + \
(estimated_output_tokens / 1000 * 0.06)
estimated_cost_gpt35turbo = (estimated_input_tokens / 1000 * 0.003) + \
(estimated_output_tokens / 1000 * 0.004)
print(f"Estimated cost with gpt-4: ${estimated_cost_gpt4:.2f}")
print(f"Estimated cost with gpt-3.5-turbo-16k: ${estimated_cost_gpt35turbo:.2f}")
while True:
process_option = input("Do you want to process one chunk (type 'one') or all \
chunks (type 'all')? ").strip().lower()
if process_option in ['one', 'all']:
break
else:
print("Invalid option. Please enter 'one' or 'all'.")
snippets = [
f"Provide {questions_per_chunk_train} question and answer pair(s) based on \
the text above. The questions must begin with \"In the context of ...\". The \
answers should borrow, verbatim, from the text above. In providing each \
question, consider that the reader does not see or have access to any of the \
other questions for context. Vary the style and format of questions. Respond \
in plain text on a new line for each question and answer. Do not include \
question numbers. Here is an example of two question answer pairs:\n\n{train_sample}",
f"Provide {questions_per_chunk_test} question and answer pair(s) based on the \
text above. The questions must begin with \"In the context of ...\". The \
answers should NOT borrow verbatim from the text above, but they should \
maintain the meaning. In providing each question, consider that the reader \
does not see or have access to any of the other questions for context. Vary \
the style and format of questions. Respond in plain text on a new line for \
each question and answer. Do not include question numbers. Here is an example \
of two question answer pairs:\n\n{test_sample}"
]
for idx, snippet in enumerate(snippets):
print(snippet)
output_filename = "data/train.txt" if idx == 0 else "data/test.txt"
with open(output_filename, "w", encoding='utf-8') as output_file:
if api_choice == "openai":
for chunk_idx, chunk in enumerate(chunks):
prompt = f"{context}\n\n{chunk}\n\n{snippet}"
if process_option == 'one':
print(f"\n\n{prompt}")
if process_option == 'one' and chunk_idx > 0:
break
completion = openai.ChatCompletion.create(
# model="gpt-4",
model="gpt-3.5-turbo-16k",
temperature=0,
messages=[
{"role": "system", "content": "You are a helpful assistant."},
{"role": "user", "content": prompt}
]
)
response = completion.choices[0].message['content']
output_file.write(response + "\n\n")
output_file.flush()
time.sleep(0.2)
elif api_choice == "runpod":
max_tokens = int(model_context_length * 0.9)
if process_option == 'all':
with ThreadPoolExecutor(max_workers=8) as executor:
future_to_chunk = {executor.submit(query_runpod, pod_id, \
f"{context}\n\n{chunk}\n\n{snippet}", \
max_tokens): chunk for chunk in chunks}
for future in concurrent.futures.as_completed(future_to_chunk):
chunk = future_to_chunk[future]
try:
response = future.result()
except Exception as exc:
print(f"Generated an exception: {exc}")
else:
output_file.write(response + "\n\n")
output_file.flush()
else:
for chunk in chunks:
response = query_runpod(pod_id, f"{context}\n\n{chunk}\n\n{snippet}", \
max_tokens)
output_file.write(response + "\n\n")
output_file.flush()
if process_option == 'one':
break
# qa_to_csv.py
import csv
import os
from transformers import AutoTokenizer
# Read questions and answers from txt file and write to CSV file
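# Expected input format (as produced by create_qa.py above): each question on
# one line and its answer on the next, with pairs optionally separated by blank
# lines. Each pair becomes one prompt/completion row in the output CSV.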
def q_and_a_to_csv(input_file_path, output_file_path):
    tokenizer = AutoTokenizer.from_pretrained("TheBloke/Yarn-Llama-2-7B-128K-GPTQ", use_fast=True)
    max_tokens = 0
    with open(input_file_path, 'r') as input_file:
        lines = input_file.readlines()
    prompts_and_completions = []
    # Remove empty lines and strip leading and trailing whitespace
    lines = [line.strip() for line in lines if line.strip()]
    # Loop through lines two at a time to get question and answer pairs;
    # an unpaired trailing line, if any, is ignored
    for i in range(0, len(lines) - 1, 2):
        prompt = lines[i]
        completion = lines[i + 1]
        # Tokenize the lines and update max_tokens if necessary
        prompt_tokens = tokenizer.tokenize(prompt)
        completion_tokens = tokenizer.tokenize(completion)
        max_tokens = max(max_tokens, len(prompt_tokens) + len(completion_tokens))
        prompts_and_completions.append((prompt, completion))
    # Write list of (prompt, completion) pairs to CSV file
    with open(output_file_path, 'w', newline='') as output_file:
        csv_writer = csv.writer(output_file, quotechar='"', quoting=csv.QUOTE_ALL)
        csv_writer.writerow(['prompt', 'completion'])  # Write the header row
        csv_writer.writerows(prompts_and_completions)
    print(f"The maximum number of tokens (prompt + completion) in a row of "
          f"{output_file_path} is {max_tokens}")
train_input_file_path = 'data/train.txt'
train_output_file_path = 'data/train.csv'
q_and_a_to_csv(train_input_file_path, train_output_file_path)
# If test.txt exists, convert it to test.csv
test_input_file_path = 'data/test.txt'
test_output_file_path = 'data/test.csv'
if os.path.exists(test_input_file_path):
    q_and_a_to_csv(test_input_file_path, test_output_file_path)
else:
    print("test.txt does not exist, skipping its conversion.")
# txt_to_csv.py
import csv
import os
def find_sentence_end(text):
    # Return the index just past the last sentence boundary so chunks end on complete sentences
    sentence_endings = [". ", "! ", "? ", "</s>"]
    for i in reversed(range(len(text))):
        for ending in sentence_endings:
            if text[i:i + len(ending)] == ending:
                return i + len(ending.rstrip())  # Keep the sentence-ending character(s)
    return len(text)
def text_to_csv(txt_path, csv_path, max_tokens):
if os.path.exists(txt_path):
with open(txt_path, 'r') as f:
text = f.read()
words = text.split()
total_tokens = 0
chunks = []
chunk_words = []
approx_tokens_in_chunk = 0
for word in words:
approx_tokens_in_word = len(word.split()) * 1.33 # ~1.33 tokens per word
if (approx_tokens_in_chunk + approx_tokens_in_word) < max_tokens:
chunk_words.append(word)
approx_tokens_in_chunk += approx_tokens_in_word
else:
chunk_text = " ".join(chunk_words)
end_index = find_sentence_end(chunk_text)
final_chunk = chunk_text[:end_index].strip()
total_tokens += len(final_chunk.split()) * 1.33
chunks.append(final_chunk)
chunk_words = chunk_words[len(final_chunk.split()):]
chunk_words.append(word)
approx_tokens_in_chunk = len(" ".join(chunk_words).split()) * 1.33
# For the remaining text
if chunk_words:
final_chunk = " ".join(chunk_words)
chunks.append(final_chunk)
total_tokens += len(final_chunk.split()) * 1.33
# Write chunks to CSV
with open(csv_path, 'w', newline='') as f:
writer = csv.writer(f)
writer.writerow(["Text"])
for chunk in chunks:
writer.writerow([chunk])
return f"Converted {txt_path} to {csv_path} with approximately \
{total_tokens} tokens.", total_tokens
else:
return f"{txt_path} was not found.", 0
def main():
data_length = input("Enter the maximum number of tokens for each data row \
(default is 1000): ")
if not data_length:
data_length = 1000
else:
data_length = int(data_length)
summary = []
total_tokens = {}
for file_name in ['train', 'test']:
txt_path = f"data/{file_name}.txt"
csv_path = f"data/{file_name}.csv"
action_summary, token_count = text_to_csv(txt_path, csv_path, data_length)
summary.append(action_summary)
total_tokens[file_name] = token_count
print("Summary:")
for item in summary:
print(f"- {item}")
print(f"Total tokens in train.txt: {total_tokens.get('train', 'N/A')}")
print(f"Total tokens in test.txt: {total_tokens.get('test', 'N/A')}")
if __name__ == "__main__":
main()
# upload_to_hf.py
from huggingface_hub import HfApi, login
import os
def upload_to_hf_hub(repo_id):
api = HfApi()
files_to_upload = ["data/train.csv", "data/test.csv", "data/README.md"]
uploaded_files = []
# Upload each file if it exists
for file_path in files_to_upload:
if os.path.exists(file_path):
print(f"Uploading {file_path}...")
api.upload_file(
path_or_fileobj=file_path,
path_in_repo=os.path.basename(file_path), # Filename, not path
repo_id=repo_id,
repo_type="dataset",
)
print(f"Uploaded {file_path}.")
uploaded_files.append(file_path)
else:
print(f"{file_path} does not exist, skipping.")
print("\nSummary:")
if uploaded_files:
print("Uploaded files:")
for file in uploaded_files:
print(f"- {file}")
else:
print("No files were uploaded.")
def main():
print("Logging in to Hugging Face account...")
login()
repo_id = input("Enter the path to the Hugging Face dataset repo (e.g. username/repo_name): ")
upload_to_hf_hub(repo_id)
if __name__ == "__main__":
main()
This function performs a similarity search between a given question and a set of training examples using cosine similarity on embeddings generated by a pre-trained language model. It then prints out the top similar examples along with their similarity scores.
# GPTQ
!pip install -q -U transformers peft accelerate optimum
!pip install auto-gptq --extra-index-url https://huggingface.github.io/autogptq-index/whl/cu118/
!pip install -q datasets
from transformers import AutoTokenizer, AutoModelForCausalLM
# Define the model ID
# model_id = "TheBloke/Llama-2-7B-chat-GPTQ"
model_id = "TheBloke/Llama-2-13B-chat-GPTQ"
# Load the model from pretrained
model = AutoModelForCausalLM.from_pretrained(
model_id, # Model identifier
device_map="auto", # Automatically choose device
    cache_dir=cache_dir # Cache directory defined earlier in this notebook
)
# Instantiate the tokenizer
tokenizer = AutoTokenizer.from_pretrained(model_id)
from datasets import load_dataset
# Load a dataset from the Hugging Face Hub (replace with your own dataset repo)
data = load_dataset("username/dataset-name")
# Print first row of 'train' and 'test'
print("First row of train:", data['train'][0])
print("First row of test:", data['test'][0])
# Map tokenizer function to 'Text' column of each sample in the dataset
data = data.map(
lambda samples: tokenizer(samples["Text"]), # Tokenize 'Text' column
batched=True # Process in batches
)
import torch
from torch.nn.functional import cosine_similarity
import numpy as np
# Number of samples to find
n_samples = 3
# Function to get average embeddings
def get_avg_embedding(input_ids):
with torch.no_grad():
# Compute embeddings
embeddings = model.get_input_embeddings()(input_ids)
# Calculate average embedding
avg_embedding = torch.mean(embeddings, dim=1)
return avg_embedding
# Calculate average embeddings for each row in 'train' dataset
train_embeddings = []
for example in data['train']:
# Convert input_ids to tensor and move to CUDA
input_ids = torch.tensor(example['input_ids']).unsqueeze(0).to("cuda")
# Compute average embedding
avg_embedding = get_avg_embedding(input_ids)
# Append to list of train embeddings
train_embeddings.append(avg_embedding)
# Stack train embeddings and remove singleton dimensions
train_embeddings = torch.stack(train_embeddings).squeeze()
# Tokenize and get embedding for the question
question = "Which countries in the Americas saw a decrease in new hospitalizations \
compared to the previous period?" # Correct answer: Canada, Saint Lucia, Honduras
# Tokenize the question and move to CUDA
question_input_ids = tokenizer(question, return_tensors="pt", truncation=True, max_length=500)["input_ids"].to("cuda")
# Compute average embedding for the question and remove singleton dimensions
question_embedding = get_avg_embedding(question_input_ids).squeeze()
# Calculate cosine similarity between question and each row in 'train'
cosine_similarities = cosine_similarity(question_embedding.unsqueeze(0), train_embeddings, dim=1).cpu().float()  # then move to CPU and convert to float32
# Sort and find top n_samples most similar rows
top_indices = torch.topk(cosine_similarities, n_samples).indices.tolist()
for idx in top_indices:
print("Similarity Score:", cosine_similarities[idx].item())
print("Text:", data['train'][idx]['Text'])
print("---")