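# Feature-extraction pipeline: encode Vietnamese text with vinai/phobert-base-v2,
# take the <s> (CLS) embedding of each document, and pickle the features together
# with one-hot category labels for downstream classifier training.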
import pickle
import time

import numpy as np
import pandas as pd
import torch
from pyspark.sql import SparkSession
from tqdm import tqdm
from transformers import AutoModel, AutoTokenizer
# Paths to JSON data files
TRAIN_DATA = "data/train_data_162k.json"
TEST_DATA = "data/test_data_162k.json"
VAL_DATA = "data/val_data_162k.json"
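# Note: spark.read.json expects JSON Lines (one record per line) by default;
# a single multi-line JSON array would need .option("multiLine", "true").
# Each record is assumed to carry "processed_content" and "category" fields,
# as used below.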
# Function to load the PhoBERT model and tokenizer
def load_bert():
    v_phobert = AutoModel.from_pretrained("vinai/phobert-base-v2")
    v_tokenizer = AutoTokenizer.from_pretrained("vinai/phobert-base-v2", use_fast=False)
    return v_phobert, v_tokenizer

# Load BERT model and tokenizer
phobert, tokenizer = load_bert()
print("Load model done!")
# Initialize SparkSession
spark = SparkSession.builder \
    .appName("Feature Extraction") \
    .master("local[*]") \
    .config("spark.executor.memory", "50g") \
    .config("spark.executor.instances", "4") \
    .config("spark.executor.cores", "12") \
    .config("spark.memory.offHeap.enabled", "true") \
    .config("spark.driver.memory", "50g") \
    .config("spark.memory.offHeap.size", "16g") \
    .config("spark.ui.showConsoleProgress", "false") \
    .config("spark.driver.maxResultSize", "16g") \
    .config("spark.log.level", "ERROR") \
    .getOrCreate()
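# Note: with master("local[*]") everything runs inside a single driver JVM,
# so the spark.executor.* settings above have no effect; the driver-side
# settings (spark.driver.memory, spark.driver.maxResultSize) are the ones
# that matter here.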
# Load JSON data into Spark DataFrames
train_data = spark.read.json(TRAIN_DATA)
test_data = spark.read.json(TEST_DATA)
val_data = spark.read.json(VAL_DATA)
print("Load data done!")
# Function to extract PhoBERT features from a list of texts
def make_bert_features(v_text, batch_size=32):
    v_tokenized = []
    max_len = 256  # Maximum sequence length

    # Tokenize using the PhoBERT tokenizer; tqdm displays progress
    for i_text in tqdm(v_text):
        line = tokenizer.encode(i_text, truncation=True, max_length=max_len)
        v_tokenized.append(line)

    # Pad sequences to ensure consistent length (PhoBERT's <pad> token id is 1)
    padded = []
    for i in v_tokenized:
        if len(i) < max_len:
            padded.append(i + [1] * (max_len - len(i)))  # Pad with <pad> (id 1)
        else:
            padded.append(i[:max_len])  # Truncate if sequence is too long
    padded = np.array(padded)

    # Create attention mask: 0 over padding, 1 over real tokens
    attention_mask = np.where(padded == 1, 0, 1)

    # Convert to PyTorch tensors
    padded = torch.tensor(padded).to(torch.long)
    attention_mask = torch.tensor(attention_mask)

    # Run the model in mini-batches so the whole dataset never goes through a
    # single forward pass, keeping the <s> (CLS) embedding of each text
    features = []
    with torch.no_grad():
        for start in range(0, len(padded), batch_size):
            out = phobert(input_ids=padded[start:start + batch_size],
                          attention_mask=attention_mask[start:start + batch_size])
            features.append(out[0][:, 0, :].numpy())
    v_features = np.concatenate(features, axis=0)
    print(v_features.shape)
    return v_features
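# Optional (sketch): extraction runs on CPU as written; if a CUDA GPU is
# available, moving the model and each batch to it speeds this up, e.g.:
#   device = "cuda" if torch.cuda.is_available() else "cpu"
#   phobert.to(device)
#   out = phobert(input_ids=batch_ids.to(device), attention_mask=batch_mask.to(device))
#   features.append(out[0][:, 0, :].cpu().numpy())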
# Collect the processed text and extract PhoBERT features for the train,
# test, and validation datasets
train_texts = train_data.select("processed_content").rdd.flatMap(lambda x: x).collect()
test_texts = test_data.select("processed_content").rdd.flatMap(lambda x: x).collect()
val_texts = val_data.select("processed_content").rdd.flatMap(lambda x: x).collect()
train_features = make_bert_features(train_texts)
test_features = make_bert_features(test_texts)
val_features = make_bert_features(val_texts)
# Convert category column to lists
category_list_train = train_data.select("category").rdd.flatMap(lambda x: x).collect()
category_list_test = test_data.select("category").rdd.flatMap(lambda x: x).collect()
category_list_val = val_data.select("category").rdd.flatMap(lambda x: x).collect()

# Convert to one-hot encoding using pd.get_dummies()
y_train = pd.get_dummies(category_list_train)
y_test = pd.get_dummies(category_list_test)
y_val = pd.get_dummies(category_list_val)
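# Note: pd.get_dummies orders columns alphabetically by category value, so the
# label columns line up across splits only if every category occurs in all
# three splits; reindexing against the training columns guards this, e.g.:
#   y_test = y_test.reindex(columns=y_train.columns, fill_value=0)
#   y_val = y_val.reindex(columns=y_train.columns, fill_value=0)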
# Save data to file using pickle
start_time = time.time()
print("Saving to file")
data_dict = {
    'X_train': train_features,
    'X_test': test_features,
    'X_val': val_features,
    'y_train': y_train,
    'y_test': y_test,
    'y_val': y_val
}
# Save dictionary to pickle file
with open('data/features_162k_phobertbase_v2.pkl', 'wb') as f:
    pickle.dump(data_dict, f)
end_time = time.time()
duration = end_time - start_time
print(f'Total feature extraction time: {duration:.2f} seconds')
print("Done!")