import csv
import io
import os

import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from fastapi import FastAPI, File, UploadFile
from joblib import dump, load
from sklearn.metrics import mean_squared_error
from sklearn.model_selection import KFold
from sklearn.preprocessing import OneHotEncoder


# Define the DNN model: a simple feed-forward regressor with two hidden layers.
class DNN(nn.Module):
    def __init__(self, input_size, hidden_size, output_size):
        super().__init__()
        self.fc1 = nn.Linear(input_size, hidden_size)
        self.relu1 = nn.ReLU()
        self.fc2 = nn.Linear(hidden_size, hidden_size)
        self.relu2 = nn.ReLU()
        self.fc3 = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        x = self.fc1(x)
        x = self.relu1(x)
        x = self.fc2(x)
        x = self.relu2(x)
        x = self.fc3(x)
        return x


# Instantiate the model. input_size=7 assumes four numeric features
# (squareFeet, bedrooms, bathrooms, yearBuilt) plus a three-category
# one-hot encoding of the neighborhood column.
model = DNN(input_size=7, hidden_size=128, output_size=1)

# Initialize the OneHotEncoder; it is fitted inside /train.
encoder = OneHotEncoder(handle_unknown="ignore")

# Restore previously trained state if /train has saved it.
# (encoder.joblib is this file's own convention, added so that /generate
# still works after a restart; without the fitted encoder the restored
# model could not be fed correctly shaped inputs.)
if os.path.exists("model.joblib"):
    model = load("model.joblib")
if os.path.exists("encoder.joblib"):
    encoder = load("encoder.joblib")

# Create a new FastAPI app instance.
app = FastAPI(docs_url="/", redoc_url="/new_redoc")


# Create a GET endpoint that predicts a price from path parameters.
@app.get("/generate/{squareFeet}/{bedrooms}/{bathrooms}/{neighborhood}/{yearBuilt}")
def generate(
    squareFeet: float,
    bedrooms: float,
    bathrooms: float,
    neighborhood: str,
    yearBuilt: float,
):
    global model, encoder

    # The encoder is only fitted once /train has been called.
    if not hasattr(encoder, "categories_"):
        return {"error": "Model has not been trained yet; call /train first."}

    # One-hot encode the neighborhood input.
    neighborhood_encoded = encoder.transform([[neighborhood]]).toarray()[0]

    # Combine all inputs in the same order used during training:
    # numeric features, then the one-hot neighborhood, then yearBuilt.
    input_data = [squareFeet, bedrooms, bathrooms, *neighborhood_encoded, yearBuilt]
    input_tensor = torch.tensor([input_data], dtype=torch.float32)

    with torch.no_grad():
        prediction = model(input_tensor)
    return {"output": prediction.item()}


@app.post("/train")
async def train(file: UploadFile = File(...)):
    global model, encoder

    contents = await file.read()
    data = list(csv.reader(io.StringIO(contents.decode("utf-8"))))

    # Skip the header row; keep cells as strings for now.
    data_np = np.array(data[1:], dtype=object)

    # One-hot encode the neighborhood column (index 3), then rebuild the
    # feature matrix in the same order /generate uses:
    # squareFeet, bedrooms, bathrooms, one-hot neighborhood, yearBuilt.
    encoded_columns = encoder.fit_transform(data_np[:, 3].reshape(-1, 1)).toarray()
    X = np.concatenate(
        (
            data_np[:, :3].astype(float),    # squareFeet, bedrooms, bathrooms
            encoded_columns,                 # one-hot neighborhood
            data_np[:, 4:-1].astype(float),  # yearBuilt
        ),
        axis=1,
    )

    # The target (price) is the last column.
    y = np.ravel(data_np[:, -1].astype(float))

    # Convert data to torch tensors
    X = torch.tensor(X, dtype=torch.float32)
    y = torch.tensor(y, dtype=torch.float32)

    # Define loss function and optimizer
    criterion = nn.MSELoss()
    optimizer = optim.Adam(model.parameters(), lr=0.0001)

    # Fit the model with 4-fold cross-validation, tracking RMSE per epoch.
    kf = KFold(n_splits=4)
    epochs = 25  # Define the number of epochs
    average_rmse = None
    for epoch in range(epochs):
        fold_rmses = []
        for train_index, test_index in kf.split(X):
            X_train, X_test = X[train_index], X[test_index]
            y_train, y_test = y[train_index], y[test_index]

            optimizer.zero_grad()

            # Forward pass
            outputs = model(X_train)
            loss = criterion(outputs, y_train.unsqueeze(1))

            # Backward pass and optimization
            loss.backward()
            optimizer.step()

            # Evaluate on the held-out fold without tracking gradients.
            with torch.no_grad():
                predictions = model(X_test)
            rmse = np.sqrt(mean_squared_error(y_test.numpy(), predictions.numpy()))
            fold_rmses.append(rmse)

        average_rmse = sum(fold_rmses) / len(fold_rmses)
        print(f"Epoch: {epoch + 1}, Average RMSE: {average_rmse}")

    # Persist the trained model and fitted encoder so they can be
    # restored on startup.
    dump(model, "model.joblib")
    dump(encoder, "encoder.joblib")

    return {"filename": file.filename, "average_rmse": average_rmse}
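

# --- Usage sketch (illustrative, not part of the app) ---
# The /train endpoint expects a CSV with a header row and columns in this
# positional order (the column names here are assumptions; only the
# positions matter to the code above):
#   SquareFeet,Bedrooms,Bathrooms,Neighborhood,YearBuilt,Price
#
# Assuming the app is saved as main.py and served locally with
# `uvicorn main:app --reload`, a hypothetical client could call it like so:
#
#   import requests
#
#   with open("housing.csv", "rb") as f:
#       resp = requests.post("http://127.0.0.1:8000/train", files={"file": f})
#   print(resp.json())  # {"filename": "housing.csv", "average_rmse": ...}
#
#   resp = requests.get("http://127.0.0.1:8000/generate/2000/3/2/Suburb/1995")
#   print(resp.json())  # {"output": ...}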
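

# Optional convenience entry point, a minimal sketch assuming the file is
# run directly (e.g. `python main.py`; the filename is hypothetical).
if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="127.0.0.1", port=8000)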