from flask import Flask, request
import tensorflow as tf
from datetime import datetime, timedelta
import logging
import requests
import requests_cache
import pandas as pd
import json
import numpy as np
import pickle
import math
import pytz
from flask_cors import CORS, cross_origin
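# HTTP session with a local cache (requests-cache), so repeated requests for historical observations don't re-hit the weather API.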
session = requests_cache.CachedSession('requests-cache')
app = Flask(__name__)
cors = CORS(app)
app.config['CORS_HEADERS'] = 'Content-Type'
app.logger.setLevel(logging.INFO)
DATE_FORMAT = "%Y-%m-%d %H:%M:%S"
API_KEY = "e1f10a1e78da46f5b10a1e78da96f525"
BASE_URL = "https://api.weather.com/v1/location/KDCA:9:US/observations/historical.json?apiKey={api_key}&units=e&startDate={start_date}&endDate={end_date}"
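# Load the trained forecasting model and the feature scaler saved alongside it (assumed to be the scaler fitted on the training data).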
model = tf.keras.models.load_model('/app/model', compile=False)
scaler = pickle.load(open('./model/scaler.pkl','rb'))
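# Feature columns that are passed through the scaler before being fed to the model.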
cols_to_scale = ["pressure", "wspd","heat_index","dewPt", "rh", "vis", "wc", "wdir_degree", "clds_ordinal",
"day_sin", "day_cos", "year_sin", "year_cos", "wdir_sin", "wdir_cos"]
def get_NaN_counts(df):
    nan_counts = df.isna().sum()
    return pd.concat([nan_counts, ((nan_counts/len(df))*100).round(2)],
                     axis=1,
                     keys=["NaN count", "Percentage"])
def clds_to_ordinal(row):
    mapping = {
        "SKC": 0,
        "CLR": 0,
        "FEW": 1,
        "SCT": 2,
        "BKN": 3,
        "OVC": 4,
        "VV": 5
    }
    clds = row["clds"]
    if pd.isnull(clds):
        return np.NaN
    return mapping[clds]
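# Treat calm conditions as zero wind speed.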
def clean_wspd(row):
    if row["wdir_cardinal"] == "CALM":
        return 0
    return row["wspd"]
def restrict_wspd(row):
    if row["wspd"] < 0:
        return 0
    return row["wspd"]
def restrict_rh(row):
    if row["rh"] < 0:
        return 0
    if row["rh"] > 100:
        return 100
    return row["rh"]
def clean_wdir(row):
    if row["wdir_cardinal"] == "CALM":
        return 0
    return row["wdir"]
def wdir_cardinal_to_deg(row):
    wdir = row["wdir"]
    if not pd.isnull(wdir):
        return wdir
    cardinal_directions = {
        'N': 0,
        'NNE': 22.5,
        'NE': 45,
        'ENE': 67.5,
        'E': 90,
        'ESE': 112.5,
        'SE': 135,
        'SSE': 157.5,
        'S': 180,
        'SSW': 202.5,
        'SW': 225,
        'WSW': 247.5,
        'W': 270,
        'WNW': 292.5,
        'NW': 315,
        'NNW': 337.5,
        'CALM': 0,
        'VAR': -1
    }
    wdir_cardinal = row["wdir_cardinal"]
    return cardinal_directions[wdir_cardinal] if wdir_cardinal in cardinal_directions else np.NaN
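# Reindex the observations onto an hourly grid, fill gaps (backfill / polynomial interpolation),
# add cyclic time-of-day, time-of-year and wind-direction features, and drop the raw columns.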
def prepare_dataframe(_df, start_timestamp, end_timestamp):
    dates_df = pd.DataFrame()
    dates_df["obs_timestamp"] = pd.date_range(start_timestamp, end_timestamp, freq="H")
    _df = dates_df.merge(_df, how='left', on='obs_timestamp')
    _df = _df.astype(
        {
            'temp': 'float',
            'pressure': 'float',
            'wspd': 'float',
            'heat_index': 'float'
        },
    )
    _df["wdir_cardinal"].fillna(method="bfill", inplace=True)
    _df["wdir_degree"] = _df.apply(wdir_cardinal_to_deg, axis=1)
    _df["clds_ordinal"] = _df.apply(clds_to_ordinal, axis=1)
    _df["temp"].interpolate("polynomial", order=2, inplace=True)
    _df["pressure"].interpolate("polynomial", order=2, inplace=True)
    _df["heat_index"].interpolate("polynomial", order=2, inplace=True)
    _df["wdir"].fillna(method="bfill", inplace=True)
    _df["wdir"] = _df.apply(clean_wdir, axis=1)
    _df["wspd"] = _df.apply(clean_wspd, axis=1)
    _df["wspd"].interpolate("polynomial", order=2, inplace=True)
    _df["wspd"] = _df.apply(restrict_wspd, axis=1)
    _df["clds"].fillna(method="bfill", inplace=True)
    _df["clds_ordinal"].interpolate("linear", inplace=True)
    _df["dewPt"].interpolate("polynomial", order=2, inplace=True)
    _df["rh"].interpolate("polynomial", order=2, inplace=True)
    _df["rh"] = _df.apply(restrict_rh, axis=1)
    _df["wc"].interpolate("polynomial", order=2, inplace=True)
    _df["vis"].fillna(method="bfill", inplace=True)
    _df.drop(["wdir", "wdir_cardinal", "clds"], axis=1, inplace=True)
    _df = _df.dropna()
    _df = _df.sort_values(by=['obs_timestamp'])
    date_time = _df.pop('obs_timestamp')
    timestamp_s = date_time.map(pd.Timestamp.timestamp)
    day = 24*60*60
    year = (365.2425)*day
    _df['day_sin'] = np.sin(timestamp_s * (2 * np.pi / day))
    _df['day_cos'] = np.cos(timestamp_s * (2 * np.pi / day))
    _df['year_sin'] = np.sin(timestamp_s * (2 * np.pi / year))
    _df['year_cos'] = np.cos(timestamp_s * (2 * np.pi / year))
    _df['wdir_sin'] = np.sin(_df["wdir_degree"])
    _df['wdir_cos'] = np.cos(_df["wdir_degree"])
    return _df, date_time
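# Build the model's feature frame from the raw API observations covering roughly the eight days before target_date.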
def map_data_to_dataframe(data, target_date):
    end_timestamp = target_date - timedelta(minutes=8)
    start_timestamp = end_timestamp - timedelta(days=8) + timedelta(hours=1)
    df = pd.read_json(json.dumps(data))
    df["obs_timestamp"] = df.apply(lambda x: datetime.fromtimestamp(x["valid_time_gmt"]).strftime(DATE_FORMAT), axis=1)
    df = df.astype({'obs_timestamp': 'datetime64[ns]'})
    initial_cols = ["temp", "obs_timestamp", "pressure", "wspd", "heat_index", "dewPt", "rh", "vis", "wc", "wdir", "wdir_cardinal", "clds"]
    df = df[initial_cols]
    df, _ = prepare_dataframe(df, start_timestamp.strftime(DATE_FORMAT), end_timestamp.strftime(DATE_FORMAT))
    return df
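# Pair each of the 24 hourly predictions with its timestamp (the 24 hours following target_date).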
def map_to_timestamp(predictions, target_date):
    start = target_date + timedelta(hours=1)
    end = start + timedelta(hours=23)
    target_hours = [x.to_pydatetime().strftime(DATE_FORMAT) for x in pd.date_range(start, end, freq="H")]
    return {h: predictions[idx] for idx, h in enumerate(target_hours)}
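# Scale the most recent 168 hours (7 days) of features and run the model on them.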
def predict(df):
    predict_df = df[-168:].copy()  # copy so the scaled values don't mutate the caller's frame
    predict_df_features = predict_df[cols_to_scale]
    predict_df_features = scaler.transform(predict_df_features.values)
    predict_df[cols_to_scale] = predict_df_features
    predictions = model(predict_df.to_numpy().reshape(1, 168, 16))
    return predictions
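# Fetch historical observations leading up to target_date, run the model, and return a
# {timestamp: predicted temperature} mapping for the 24 hours after target_date.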
def predict_for_date(target_date):
    date_format = "%Y%m%d"
    start_date = target_date - timedelta(days=9)
    res = session.get(BASE_URL.format(api_key=API_KEY, start_date=start_date.strftime(date_format), end_date=target_date.strftime(date_format)))
    data = res.json()
    df = map_data_to_dataframe(data["observations"], target_date)
    predictions = predict(df)
    flattened = list(map(lambda x: math.floor(x), predictions.numpy().flatten().tolist()))
    return map_to_timestamp(flattened, target_date)
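# Fetch observed temperatures around target_date (America/New_York time) and return a
# {timestamp: temperature} mapping, with None where no observation is available.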
def get_actual_temperatures(target_date):
    date_format = "%Y%m%d"
    start_date = target_date - timedelta(days=1)  # widened by a day because the API uses UTC
    end_date = target_date + timedelta(days=1)
    start_date_str = (start_date - timedelta(days=1)).strftime(date_format)
    end_date_str = end_date.strftime(date_format)
    today = datetime.today().astimezone(pytz.timezone("America/New_York")).date()
    req_url = BASE_URL.format(api_key=API_KEY, start_date=start_date_str, end_date=end_date_str)
    if target_date.date() < today:
        res = session.get(req_url)
    else:
        res = requests.get(req_url)
    start_timestamp = target_date + timedelta(minutes=52)
    end_timestamp = end_date + timedelta(days=1) - timedelta(minutes=8)
    data = res.json()
    df = pd.read_json(json.dumps(data["observations"]))
    df["obs_timestamp"] = df.apply(lambda x: datetime.fromtimestamp(x["valid_time_gmt"]).astimezone(pytz.timezone("America/New_York")).strftime(DATE_FORMAT), axis=1)
    df = df.astype({'obs_timestamp': 'datetime64[ns]'})
    initial_cols = ["temp", "obs_timestamp"]
    df = df[initial_cols]
    dates_df = pd.DataFrame()
    dates_df["obs_timestamp"] = pd.date_range(start_timestamp, end_timestamp, freq="H")
    df = dates_df.merge(df, how='left', on='obs_timestamp')
    df["obs_timestamp"] = df.apply(lambda x: (x["obs_timestamp"] + timedelta(minutes=8)).strftime(DATE_FORMAT), axis=1)
    dicts = df.to_dict("records")
    reduced = {k["obs_timestamp"]: k["temp"] for k in dicts}
    for k in reduced:
        if np.isnan(reduced[k]):
            reduced[k] = None
    return reduced
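# GET /predictions?target_date=YYYY-MM-DD
# Returns {"<timestamp>": {"predicted": ..., "actual": ...}} for each hour of the day after target_date;
# "actual" is None for dates that are still in the future.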
@app.route("/predictions")
@cross_origin()
def get_predictions():
    today = datetime.today().astimezone(pytz.timezone("America/New_York")).date()
    target_date = datetime.strptime(request.args["target_date"], "%Y-%m-%d")
    app.logger.info(today)
    app.logger.info(target_date)
    # target_dates = list(filter(lambda x: x < today, [x.to_pydatetime() for x in pd.date_range(start_date, end_date, freq="D").to_list()]))
    predictions = predict_for_date(target_date)
    actual_temp = get_actual_temperatures(target_date) if target_date.date() <= today else None
    merged = {k: {"predicted": predictions[k], "actual": actual_temp[k] if actual_temp else None} for k in predictions}
    response = app.response_class(response=json.dumps(merged),
                                  status=200,
                                  mimetype='application/json')
    return response
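# Example request, assuming the app is served locally on port 5000 (e.g. via `flask run` or gunicorn):
#   curl "http://localhost:5000/predictions?target_date=2023-01-15"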