"""FastAPI service exposing a feed-forward regression model.

Endpoints:
    GET  /generate/{...six numeric features...} -> single prediction
    POST /train                                 -> continue training the
                                                   in-memory model on uploaded
                                                   CSVs and persist the weights
"""

import csv
import io
from typing import Tuple

import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from fastapi import FastAPI, File, HTTPException, UploadFile
from sklearn.metrics import mean_squared_error
from sklearn.model_selection import train_test_split

# File the weights are loaded from at startup and written back to by /train.
WEIGHTS_PATH = "model_weights.pth"
# Name of the regression target column expected in uploaded CSV files.
TARGET_COLUMN = "Yield_kg_per_hectare"


# Define the DNN model
class DNN(nn.Module):
    """Fully connected network: input layer, N hidden blocks, output layer.

    The attribute names (``fc1``, ``relu1``, ``hidden_layers``, ``fc3``) and
    the Linear/ReLU interleaving inside ``hidden_layers`` determine the keys
    of the ``state_dict``; renaming or reordering them would make previously
    saved weight files fail to load.
    """

    def __init__(self, input_size, hidden_size, output_size, num_hidden_layers):
        """Build the layers.

        Args:
            input_size: Number of input features.
            hidden_size: Width of every hidden layer.
            output_size: Number of output values.
            num_hidden_layers: Number of (Linear, ReLU) pairs placed between
                the input layer and the output layer.
        """
        super().__init__()
        self.fc1 = nn.Linear(input_size, hidden_size)
        self.relu1 = nn.ReLU()
        self.hidden_layers = nn.ModuleList()
        for _ in range(num_hidden_layers):
            self.hidden_layers.append(nn.Linear(hidden_size, hidden_size))
            self.hidden_layers.append(nn.ReLU())
        self.fc3 = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        """Apply the input layer, each hidden module in order, then the output layer."""
        x = self.relu1(self.fc1(x))
        for layer in self.hidden_layers:
            x = layer(x)
        return self.fc3(x)


# Load the model. The architecture arguments must match the saved weights,
# and the weights file must already exist when the module is imported.
model = DNN(input_size=6, hidden_size=64, output_size=1, num_hidden_layers=32)
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
model = model.to(device)
model.load_state_dict(torch.load(WEIGHTS_PATH, map_location=device))

# Create a new FastAPI app instance (Swagger UI is served at the root path).
app = FastAPI(docs_url="/", redoc_url="/new_redoc")


def _load_xy(raw: bytes, source: str) -> Tuple[np.ndarray, np.ndarray]:
    """Parse an uploaded CSV into float32 feature and target arrays.

    Args:
        raw: Raw bytes of the uploaded file (expected to be UTF-8 CSV).
        source: Name of the upload field, used only in error messages.

    Returns:
        ``(features, targets)`` where ``features`` has one column per
        non-target CSV column and ``targets`` is one-dimensional.

    Raises:
        HTTPException: With status 400 when the file cannot be decoded or
            parsed, lacks the target column, has no rows, contains
            non-numeric or non-finite values, or has a feature count that
            does not match the model's input layer.
    """
    try:
        frame = pd.read_csv(io.StringIO(raw.decode("utf-8")))
    except (
        UnicodeDecodeError,
        pd.errors.ParserError,
        pd.errors.EmptyDataError,
    ) as exc:
        raise HTTPException(
            status_code=400, detail=f"{source}: could not parse CSV ({exc})"
        ) from exc

    if TARGET_COLUMN not in frame.columns:
        raise HTTPException(
            status_code=400,
            detail=f"{source}: missing required column '{TARGET_COLUMN}'",
        )
    if frame.empty:
        raise HTTPException(
            status_code=400, detail=f"{source}: file contains no data rows"
        )

    try:
        features = frame.drop(columns=[TARGET_COLUMN]).to_numpy(dtype=np.float32)
        targets = frame[TARGET_COLUMN].to_numpy(dtype=np.float32)
    except (ValueError, TypeError) as exc:
        raise HTTPException(
            status_code=400, detail=f"{source}: non-numeric value in data ({exc})"
        ) from exc

    expected = model.fc1.in_features
    if features.shape[1] != expected:
        raise HTTPException(
            status_code=400,
            detail=(
                f"{source}: expected {expected} feature columns, "
                f"got {features.shape[1]}"
            ),
        )
    # NaN/inf would propagate into the gradients and overwrite the live
    # model's parameters with NaN on the first optimizer step.
    if not (np.isfinite(features).all() and np.isfinite(targets).all()):
        raise HTTPException(
            status_code=400, detail=f"{source}: data contains NaN or infinite values"
        )
    return features, targets


# Create a GET endpoint
@app.get(
    "/generate/{Soil_Quality}/{Seed_Variety}/{Fertilizer_Amount_kg_per_hectare}/{Sunny_Days}/{Rainfall_mm}/{Irrigation_Schedule}"
)
def generate(
    Soil_Quality: float,
    Seed_Variety: float,
    Fertilizer_Amount_kg_per_hectare: float,
    Sunny_Days: float,
    Rainfall_mm: float,
    Irrigation_Schedule: float,
):
    """Run the model on one feature vector taken from the URL path.

    The six path parameters are fed to the model in the order they appear in
    the signature.

    Returns:
        ``{"prediction": <float>}`` with the model's single output value.
    """
    # Combine all inputs into a batch containing a single row.
    features = torch.tensor(
        [
            [
                Soil_Quality,
                Seed_Variety,
                Fertilizer_Amount_kg_per_hectare,
                Sunny_Days,
                Rainfall_mm,
                Irrigation_Schedule,
            ]
        ],
        dtype=torch.float32,
        device=device,
    )
    # Inference only: no autograd graph is needed.
    with torch.no_grad():
        prediction = model(features)
    return {"prediction": prediction.item()}


@app.post("/train")
async def train(
    trainDatafile: UploadFile = File(...),
    testDatafile: UploadFile = File(...),
    epochs: int = 100,
):
    """Continue training the in-memory model and persist the new weights.

    Both uploads must be CSV files containing the target column
    ``Yield_kg_per_hectare``; every other column is used as a feature.
    Each epoch is one full-batch Adam step on the training data followed by
    an RMSE evaluation on the test data.

    Args:
        trainDatafile: CSV used for optimization.
        testDatafile: CSV used only for per-epoch evaluation.
        epochs: Number of full-batch optimization steps.

    Returns:
        ``{"rmse": [...]}`` with the test RMSE after each epoch.

    Raises:
        HTTPException: With status 400 when either upload is invalid.
    """
    # NOTE(review): the training loop below runs synchronously inside this
    # coroutine, so the event loop is occupied until it finishes.
    # Validate both files before touching the model so bad input never
    # modifies the live parameters.
    train_x, train_y = _load_xy(await trainDatafile.read(), "trainDatafile")
    test_x, test_y = _load_xy(await testDatafile.read(), "testDatafile")

    # Convert data to torch tensors. Targets get a trailing dimension so
    # they match the model's (batch, 1) output in the loss.
    X_train = torch.tensor(train_x, dtype=torch.float32, device=device)
    y_train = torch.tensor(train_y, dtype=torch.float32, device=device).unsqueeze(1)
    X_test = torch.tensor(test_x, dtype=torch.float32, device=device)

    # Define loss function and optimizer
    criterion = nn.MSELoss()
    optimizer = optim.Adam(model.parameters(), lr=0.001)

    rmse_history = []
    for epoch in range(epochs):
        optimizer.zero_grad()

        # Forward pass
        outputs = model(X_train)
        loss = criterion(outputs, y_train)

        # Backward pass and optimization
        loss.backward()
        optimizer.step()

        # Evaluation does not need gradients.
        with torch.no_grad():
            predictions = model(X_test)
        rmse = np.sqrt(
            mean_squared_error(test_y, predictions.squeeze(1).cpu().numpy())
        )
        print(
            f"Epoch: {epoch+1}, RMSE: {float(rmse)}, Loss: {float(np.sqrt(loss.cpu().detach().numpy()))}"
        )
        rmse_history.append(float(rmse))

    torch.save(model.state_dict(), WEIGHTS_PATH)
    return {"rmse": rmse_history}