from __future__ import annotations

import warnings
from pathlib import Path
from typing import TYPE_CHECKING

import click
import joblib
import pandas as pd
from numpy.random import RandomState
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score, classification_report
from sklearn.model_selection import train_test_split
from sklearn.pipeline import Pipeline

if TYPE_CHECKING:
    from sklearn.base import BaseEstimator

SEED = 42
DATASET_PATH = Path("data/training.1600000.processed.noemoticon.csv")
STOPWORDS_PATH = Path("data/stopwords-en.txt")
CHECKPOINT_PATH = Path("cache/pipeline.pkl")
MODELS_DIR = Path("models")
CACHE_DIR = Path("cache")
MAX_FEATURES = 10000  # Try 500000 for a much larger vocabulary (slower, more memory)
# Make sure the output directories exist
MODELS_DIR.mkdir(parents=True, exist_ok=True)
CACHE_DIR.mkdir(parents=True, exist_ok=True)
# Memory cache for sklearn pipelines
mem = joblib.Memory(CACHE_DIR, verbose=0)
# TODO: use xgboost (see the sketch after create_pipeline below)


def get_random_state(seed: int = SEED) -> RandomState:
    return RandomState(seed)


def load_data() -> tuple[list[str], list[int]]:
    """Load the Sentiment140 CSV and return texts and labels, where 1 is positive sentiment and 0 is negative."""
    data = pd.read_csv(
        DATASET_PATH,
        encoding="ISO-8859-1",
        names=[
            "target",  # 0 = negative, 2 = neutral, 4 = positive
            "id",  # The id of the tweet
            "date",  # The date of the tweet
            "flag",  # The query, NO_QUERY if not present
            "user",  # The user that tweeted
            "text",  # The text of the tweet
        ],
    )
    # Ignore rows with neutral sentiment
    data = data[data["target"] != 2]
    # Create a new column called "sentiment" with 1 for positive and 0 for negative
    data["sentiment"] = (data["target"] == 4).astype(int)
    # data = data.drop(columns=["target", "id", "date", "flag", "user"])  # NOTE: Not needed, since we return only the columns we use
    # Return as lists
    return list(data["text"]), list(data["sentiment"])
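

# A quick sanity check for the loader (hedged sketch; Sentiment140 ships 1,600,000
# tweets split evenly between negative and positive, so the label mean should be
# roughly 0.5 on an intact download):
#
#     texts, labels = load_data()
#     assert len(texts) == len(labels)
#     print(sum(labels) / len(labels))  # expect ~0.5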


def create_pipeline(clf: BaseEstimator) -> Pipeline:
    return Pipeline(
        [
            # Preprocess (TfidfVectorizer is CountVectorizer + TfidfTransformer in one step)
            # ("vectorize", CountVectorizer(stop_words="english", ngram_range=(1, 2), max_features=MAX_FEATURES)),
            # ("tfidf", TfidfTransformer()),
            ("vectorize", TfidfVectorizer(ngram_range=(1, 2), max_features=MAX_FEATURES)),
            # Classifier
            ("clf", clf),
        ],
        memory=mem,
    )
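

# The xgboost TODO near the top could be tackled by dropping an XGBClassifier into
# the same pipeline; a hedged sketch assuming the optional xgboost package
# (create_xgboost_pipeline and n_estimators=300 are illustrative, not the original):
def create_xgboost_pipeline(seed: int = SEED) -> Pipeline:
    # Imported lazily so the rest of the script runs without xgboost installed
    from xgboost import XGBClassifier

    return create_pipeline(XGBClassifier(n_estimators=300, random_state=seed))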


def evaluate_pipeline(pipeline: Pipeline, x: list[str], y: list[int]) -> float:
    y_pred = pipeline.predict(x)
    report = classification_report(y, y_pred)
    click.echo(report)
    # TODO: Confusion matrix (see the sketch below)
    return accuracy_score(y, y_pred)
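

# The confusion-matrix TODO above could be closed with sklearn's confusion_matrix;
# this helper is a hedged sketch (the name print_confusion_matrix is ours, not part
# of the original script) and is not wired into train() below.
def print_confusion_matrix(y_true: list[int], y_pred: list[int]) -> None:
    from sklearn.metrics import confusion_matrix

    # Rows are true labels, columns are predictions: [[tn, fp], [fn, tp]]
    click.echo(confusion_matrix(y_true, y_pred))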


def export_pipeline(pipeline: Pipeline, name: str) -> None:
    model_path = MODELS_DIR / f"{name}.pkl"
    joblib.dump(pipeline, model_path)
    click.echo(f"Model exported to {model_path}")


@click.command()
@click.option("--retrain", is_flag=True, help="Train the model even if a checkpoint exists.")
@click.option("--evaluate", is_flag=True, help="Evaluate the model.")
@click.option("--flush-cache", is_flag=True, help="Clear sklearn cache.")
@click.option("--seed", type=int, default=SEED, help="Random seed.")
def train(retrain: bool, evaluate: bool, flush_cache: bool, seed: int) -> None:
    rng = get_random_state(seed)

    # Clear sklearn cache
    if flush_cache:
        click.echo("Clearing cache... ", nl=False)
        mem.clear(warn=False)
        click.echo("DONE")

    # Load and split data
    click.echo("Loading data... ", nl=False)
    x, y = load_data()
    x_train, x_test, y_train, y_test = train_test_split(x, y, test_size=0.2, random_state=rng)
    click.echo("DONE")

    # Train model
    if retrain or not CHECKPOINT_PATH.exists():
        click.echo("Training model... ", nl=False)
        clf = LogisticRegression(max_iter=1000, random_state=rng)
        model = create_pipeline(clf)
        with warnings.catch_warnings():
            warnings.simplefilter("ignore")  # Ignore joblib warnings
            model.fit(x_train, y_train)
        joblib.dump(model, CHECKPOINT_PATH)
        click.echo("DONE")
    else:
        click.echo("Loading model... ", nl=False)
        model = joblib.load(CHECKPOINT_PATH)
        click.echo("DONE")

    # Evaluate model
    if evaluate:
        evaluate_pipeline(model, x_test, y_test)

    # Quick test
    test_text = ["I love this movie", "I hate this movie"]
    click.echo("Quick test:")
    for text in test_text:
        click.echo(f"\t{'positive' if model.predict([text])[0] else 'negative'}: {text}")

    # Export model
    click.echo("Exporting model... ", nl=False)
    export_pipeline(model, "logistic_regression")
    click.echo("DONE")


if __name__ == "__main__":
    train()
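
# Example invocations (the train.py filename is hypothetical; adjust to the actual
# script name):
#
#     python train.py                       # reuse the checkpoint if one exists
#     python train.py --retrain --evaluate  # force retraining and print metrics
#     python train.py --flush-cache         # clear the joblib pipeline cache first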