# mirix's picture
# Update app.py
# a8154a6 verified
from datetime import datetime, timedelta
import pandas as pd
import numpy as np
import os
import srtm
elevation_data = srtm.get_data()
import json
import geopy
from geopy import distance
from beaufort_scale.beaufort_scale import beaufort_scale_kmh
import requests
import requests_cache
import openmeteo_requests
from retry_requests import retry
from dash import Dash, dcc, html, Input, Output, callback, no_update
from dash_extensions import Purify
import plotly.graph_objects as go
### UPDATE PEAK LIST ###
# Reference point in decimal degrees: compute_bbox() is called with it as the
# centre of the search area and eukarney() measures peak distances from it.
lat, lon = 49.610755, 6.13268
# Presumably the elevation of the reference point; it is not referenced
# anywhere else in the visible code.
ele = 310
# Distance handed to compute_bbox(); geopy treats a bare number as kilometres.
dist = 100
# Overpass API endpoint queried by update_peaks().
overpass_url = 'https://overpass.private.coffee/api/interpreter'
def add_ele(row):
    """Make sure a peak row carries a usable altitude.

    The altitude is kept when it rounds to a non-negative integer. In every
    other case (negative, NaN, infinite or missing value) it is replaced by
    the SRTM elevation looked up at the row's coordinates.

    Args:
        row: Mapping-like record (e.g. a pandas Series) with the keys
            'altitude', 'latitude' and 'longitude'.

    Returns:
        The same ``row`` object, with 'altitude' possibly overwritten.
    """
    try:
        usable = int(round(row['altitude'], 0)) >= 0
    except (TypeError, ValueError, OverflowError):
        # None cannot be rounded; NaN and infinity cannot be converted to int.
        # All of them mean "no altitude available", not a fatal error.
        usable = False
    if not usable:
        row['altitude'] = elevation_data.get_elevation(row['latitude'], row['longitude'], 0)
    return row
def eukarney(lat1, lon1):
    """Return the distance in metres between a point and the reference point.

    Args:
        lat1: Latitude of the point, in decimal degrees.
        lon1: Longitude of the point, in decimal degrees.

    Returns:
        The geopy distance from ``(lat1, lon1)`` to the module-level
        ``(lat, lon)`` reference point, read through its ``.m`` attribute.
    """
    return distance.distance((lat1, lon1), (lat, lon)).m
def compute_bbox(lat, lon, dist):
    """Return two corner points located ``dist`` away from a centre point.

    Args:
        lat: Latitude of the centre, in decimal degrees.
        lon: Longitude of the centre, in decimal degrees.
        dist: Distance from the centre to each corner, passed unchanged to
            ``geopy.distance.distance``.

    Returns:
        A flat list ``[lat_225, lon_225, lat_45, lon_45]`` holding the
        destination reached along bearing 225 followed by the one reached
        along bearing 45.
    """
    centre = geopy.Point(lat, lon)
    corners = []
    for heading in (225, 45):
        corner = distance.distance(dist).destination(centre, heading)
        corners += [corner.latitude, corner.longitude]
    return corners
# Search area around the reference point as a comma-separated string of the
# four numbers returned by compute_bbox(); update_peaks() splices it into the
# Overpass query. The box is computed once (the previous duplicate call whose
# result was immediately overwritten has been dropped).
bbox = ','.join(str(x) for x in compute_bbox(lat, lon, dist))
# CSV file in which update_peaks() stores the peak list.
peak_list = 'peak_list.csv'
def update_peaks():
    """Download peaks and hills from Overpass and store them in ``peak_list``.

    Every ``natural=peak`` / ``natural=hill`` element inside ``bbox`` becomes
    one row with its name, coordinates, altitude and distance (metres) to the
    reference point. The altitude comes from the ``ele`` tag when that tag is
    a finite number and from the SRTM data otherwise.

    Returns:
        The DataFrame that was written to ``peak_list``, with the columns
        'name', 'latitude', 'longitude', 'altitude' and 'distances'.

    Raises:
        requests.RequestException: If the Overpass request fails, times out
            or returns an HTTP error status.
    """
    # 'center' makes Overpass attach a representative point to ways and
    # relations, which carry no lat/lon of their own in plain 'out body'.
    overpass_query = ('[out:json];(nwr[natural=peak](' + bbox + ');nwr[natural=hill](' + bbox
                      + '););out body center;')
    response = requests.get(overpass_url, params={'data': overpass_query}, timeout=180)
    # Fail with a clear HTTP error rather than a JSON decoding error when the
    # server answers with an error page.
    response.raise_for_status()
    elements = response.json().get('elements', [])

    peak_dict = {'name': [], 'latitude': [], 'longitude': [], 'altitude': []}
    for e in elements:
        # Nodes expose lat/lon directly, ways and relations under 'center'.
        position = e if ('lat' in e and 'lon' in e) else e.get('center', {})
        if 'lat' not in position or 'lon' not in position:
            continue
        latitude = float(position['lat'])
        longitude = float(position['lon'])
        tags = e.get('tags', {})

        # 'ele' is free text in OpenStreetMap; anything that is not a finite
        # number (missing tag, '310 m', ...) falls back to the SRTM lookup.
        try:
            altitude = float(tags['ele'])
        except (KeyError, TypeError, ValueError):
            altitude = None
        if altitude is None or not np.isfinite(altitude):
            altitude = elevation_data.get_elevation(latitude, longitude, 0)
        if altitude is None:
            # No elevation available from either source: the peak cannot be
            # ranked by altitude later on, so it is left out.
            continue

        peak_dict['name'].append(tags.get('name', 'Unnamed hill'))
        peak_dict['latitude'].append(latitude)
        peak_dict['longitude'].append(longitude)
        peak_dict['altitude'].append(altitude)

    df = pd.DataFrame.from_dict(peak_dict)
    df = df.apply(add_ele, axis=1)
    # Built from a plain list so that an empty result set still yields a
    # well-formed (empty) column.
    distances = [eukarney(la, lo) for la, lo in zip(df['latitude'], df['longitude'])]
    df['distances'] = pd.Series(distances, index=df.index, dtype=float).fillna(0)
    df['altitude'] = pd.to_numeric(df['altitude']).round(0).astype(int)
    df.to_csv(peak_list, index=False)
    return df
### WEATHER FORECAST ###
# HTTP session with an on-disk cache ('.cache', entries expire after 3600 s),
# wrapped with retries, and used as the transport of the Open-Meteo client.
cache_session = requests_cache.CachedSession('.cache', expire_after=3600)
retry_session = retry(cache_session, retries=5, backoff_factor=0.2)
openmeteo = openmeteo_requests.Client(session=retry_session)
# Open Meteo weather forecast API
url = 'https://api.open-meteo.com/v1/forecast'
# Request parameters shared by every forecast call. get_weather() reads the
# hourly variables back by position, so the order of this list matters.
params = {
    'timezone': 'auto',
    'hourly': ['temperature_2m', 'is_day', 'rain', 'weather_code', 'wind_speed_10m', 'snow_depth']
}
# Load the JSON files mapping weather codes to descriptions and icons.
# The encoding is explicit so the result does not depend on the platform's
# default locale.
with open('weather_icons_custom.json', 'r', encoding='utf-8') as file:
    icons = json.load(file)
# Weather icons URL
icon_url = 'https://raw.githubusercontent.com/basmilius/weather-icons/refs/heads/dev/production/fill/svg/'
def map_icons(df):
    """Attach the weather icon URL and the weather description to a row.

    Args:
        df: A single record (despite the name) exposing 'weather_code' and
            'is_day'. The day variant of the ``icons`` entry is used when
            'is_day' equals 1, the night variant when it equals 0.

    Returns:
        The same record with 'Weather' (icon URL) and 'Weather outline'
        (description) filled in.
    """
    code_key = str(df['weather_code'])
    daylight = df['is_day']
    if daylight == 1:
        variant = icons[code_key]['day']
    elif daylight == 0:
        variant = icons[code_key]['night']
    icon_file, outline = variant['icon'], variant['description']
    df['Weather'] = icon_url + icon_file
    df['Weather outline'] = outline
    return df
# Quantitative pluviometry to natural language
def rain_intensity(precipt):
    """Translate an hourly rain amount into a descriptive label.

    Args:
        precipt: Rain amount (the caller passes the Open-Meteo 'rain' value,
            shown in the app as mm/h).

    Returns:
        'Extreme rain' for 50 and above, 'Very heavy rain' for [16, 50),
        'Heavy rain' for [4, 16), 'Moderate rain' for [1, 4), 'Light rain'
        for [0.25, 1), 'Light drizzle' for (0, 0.25) and
        'No rain / No info' for anything else (zero, negative values, NaN).
    """
    # Thresholds are tested from the highest down, so each branch only needs
    # its lower bound. NaN fails every comparison and reaches the last branch.
    if precipt >= 50:
        rain = 'Extreme rain'
    elif precipt >= 16:
        rain = 'Very heavy rain'
    elif precipt >= 4:
        rain = 'Heavy rain'
    elif precipt >= 1:
        rain = 'Moderate rain'
    elif precipt >= 0.25:
        rain = 'Light rain'
    elif precipt > 0:
        rain = 'Light drizzle'
    else:
        rain = 'No rain / No info'
    return rain
# Obtain the weather forecast for each waypoint at each specific time
def get_weather(df):
    """Fetch the weather for one peak and add it to its record.

    Args:
        df: A single record (despite the name) exposing 'latitude',
            'longitude' and 'altitude'.

    Returns:
        The same record extended with 'Temp (°C)', 'weather_code', 'is_day',
        'Rain level', 'Wind level', 'Rain (mm/h)', 'Wind (km/h)' and
        'Snow depth (cm)'.
    """
    now = datetime.now()
    # Build a per-call copy: the module-level ``params`` is shared by every
    # call and must not be left holding the previous peak's coordinates.
    # NOTE(review): ``now`` is the server's naive local time while the
    # request uses 'timezone': 'auto' — confirm the requested window matches
    # the intended hour at the peak's location.
    request_params = dict(params)
    request_params.update({
        'latitude': df['latitude'],
        'longitude': df['longitude'],
        'elevation': df['altitude'],
        'start_hour': (now - timedelta(seconds=3600)).strftime('%Y-%m-%dT%H:%M'),
        'end_hour': now.strftime('%Y-%m-%dT%H:%M'),
    })
    responses = openmeteo.weather_api(url, params=request_params)
    # A single location is requested, so only the first response is read.
    hourly = responses[0].Hourly()

    # Variables come back in the order listed in params['hourly']; only the
    # first value of each series is used.
    temperature = hourly.Variables(0).ValuesAsNumpy()[0]
    is_day = hourly.Variables(1).ValuesAsNumpy()[0]
    rain = hourly.Variables(2).ValuesAsNumpy()[0]
    weather_code = hourly.Variables(3).ValuesAsNumpy()[0]
    wind_speed = hourly.Variables(4).ValuesAsNumpy()[0]
    snow_depth = hourly.Variables(5).ValuesAsNumpy()[0]

    df['Temp (°C)'] = temperature
    df['weather_code'] = weather_code
    df['is_day'] = is_day
    # The values are scalars, so the labelling helpers are called directly
    # (on plain floats) instead of through np.vectorize.
    df['Rain level'] = rain_intensity(float(rain))
    df['Wind level'] = beaufort_scale_kmh(float(wind_speed), language='en')
    df['Rain (mm/h)'] = rain
    df['Wind (km/h)'] = wind_speed
    # Scaled by 100 for the 'Snow depth (cm)' column.
    df['Snow depth (cm)'] = (snow_depth * 100).round(1)
    return df
def format_peaks():
    """Build the table of peaks that currently have snow or snowy weather.

    The cached peak list is (re)generated when it is missing or more than 30
    days old. The peaks in the top altitude quartile are kept, the 600
    closest of them receive a weather lookup, all values are formatted as
    display strings and an HTML summary is stored in 'dist_read'.

    Returns:
        A DataFrame restricted to the rows whose snow depth is not
        '0.0 cm' or whose weather description mentions snow.
    """
    # Refresh the cached peak list when needed.
    if not os.path.isfile(peak_list):
        update_peaks()
    checked_at = datetime.today()
    cached_at = datetime.fromtimestamp(os.path.getmtime(peak_list))
    if (checked_at - cached_at).days > 30:
        update_peaks()

    # Highest quartile, nearest first, at most 600 weather lookups.
    peaks = pd.read_csv(peak_list)
    peaks = peaks[peaks['altitude'] >= peaks['altitude'].quantile(3/4)].copy()
    peaks = (peaks.sort_values(by='distances', ascending=True)
                  .reset_index(drop=True)
                  .head(600)
                  .copy())
    peaks = peaks.apply(get_weather, axis=1)

    # Turn the numeric columns into display strings.
    peaks['Temp (°C)'] = peaks['Temp (°C)'].round(0).astype(int).astype(str) + '°C'
    for column in ('Wind (km/h)', 'Rain (mm/h)'):
        peaks[column] = peaks[column].round(1).astype(str).replace('0.0', '')
    peaks['distances'] = (peaks['distances'] / 1000).round(1).astype(str) + ' km'
    peaks['Snow depth (cm)'] = peaks['Snow depth (cm)'].astype(str) + ' cm'
    peaks['altitude'] = peaks['altitude'].astype(str) + ' m'

    # map_icons() compares and looks up these two columns as integers.
    for column in ('is_day', 'weather_code'):
        peaks[column] = peaks[column].astype(int)
    peaks = peaks.apply(map_icons, axis=1)
    for column in ('Rain level', 'Wind level'):
        peaks[column] = peaks[column].astype(str)
    peaks = peaks.rename(columns={'distances': 'Distance (km)'})

    # HTML snippet shown in the hover tooltip, concatenated left to right.
    pieces = [
        '<p style="font-family:sans; font-size:12px;">',
        peaks['name'], '<br>',
        peaks['altitude'], ' | ', peaks['Distance (km)'], '<br><br>',
        'Snow: ', peaks['Snow depth (cm)'], '<br><br>',
        '<b>', peaks['Weather outline'], '</b><br><br>',
        peaks['Temp (°C)'], '<br><br>',
        peaks['Rain level'], '<br>',
        peaks['Wind level'],
    ]
    summary = pieces[0]
    for piece in pieces[1:]:
        summary = summary + piece
    peaks['dist_read'] = summary

    has_snow_cover = peaks['Snow depth (cm)'] != '0.0 cm'
    snow_in_outline = peaks['Weather outline'].str.lower().str.contains('snow')
    return peaks[has_snow_cover | snow_in_outline].copy()
def snow_color(row):
    """Set the marker colour of a peak according to its snow depth.

    Args:
        row: Mapping-like record whose 'Snow depth (cm)' entry is the
            formatted depth string.

    Returns:
        The same ``row`` with 'snow_colour' set to 'goldenrod' when the
        depth string is exactly '0.0 cm' and to 'aqua' otherwise.
    """
    snow_free = row['Snow depth (cm)'] == '0.0 cm'
    row['snow_colour'] = 'goldenrod' if snow_free else 'aqua'
    return row
def plot_fig():
    """Rebuild the peak table and draw it on an OpenStreetMap figure.

    The module-level ``df`` is replaced as a side effect; display_hover()
    reads it to fill the tooltip.

    Returns:
        A plotly Figure with two marker traces over the same coordinates:
        'circles' (coloured by snow_color()) and 'peaks' (mountain symbols).
    """
    global df
    map_centre = dict(lat=49.8464, lon=6.0992)

    df = format_peaks()
    df['snow_colour'] = ''
    df = df.apply(snow_color, axis=1)

    # Trace name -> marker settings, in drawing order.
    layers = (
        ('circles', dict(size=24, color=df['snow_colour'], opacity=0.8, symbol='circle')),
        ('peaks', dict(size=8, opacity=1, symbol='mountain')),
    )

    fig = go.Figure()
    for layer_name, marker in layers:
        fig.add_trace(go.Scattermap(lon=df['longitude'],
                                    lat=df['latitude'],
                                    mode='markers',
                                    marker=marker,
                                    name=layer_name))
    fig.update_layout(map_style='open-street-map',
                      map=dict(center=map_centre, zoom=8))
    # Plotly's own hover labels are switched off on both traces.
    for layer_name, _ in layers:
        fig.update_traces(showlegend=False, hoverinfo='none', hovertemplate=None,
                          selector={'name': layer_name})
    return fig
# Dash application object.
app = Dash(__name__)
# The app's `server` attribute, exposed at module level.
server = app.server
# Initial figure, built at import time; plot_fig() also sets the global `df`
# that display_hover() reads.
fig = plot_fig()
def serve_layout():
    """Build the page layout around the current module-level ``fig``.

    Returns:
        An ``html.Div`` with id 'layout-content' containing the map graph
        (inside 'base-figure-div'), the hover tooltip and an interval timer
        set to 60 * 60 * 1000 ms.
    """
    map_graph = dcc.Graph(id='base-figure', figure=fig, clear_on_unhover=True,
                          style={'height': '99vh'})
    hover_tooltip = dcc.Tooltip(id='figure-tooltip')
    refresh_timer = dcc.Interval(id='interval-component',
                                 interval=60 * 60 * 1000,
                                 n_intervals=0)
    return html.Div(
        [html.Div([map_graph], id='base-figure-div'), hover_tooltip, refresh_timer],
        id='layout-content',
    )
# The function itself (not its return value) is assigned as the layout.
app.layout = serve_layout
@callback(Output('layout-content', 'children'),
          Output('base-figure', 'figure'),
          Input('interval-component', 'n_intervals'))
def refresh_layout(n):
    """Rebuild the figure and the page content when the interval fires.

    Args:
        n: Current 'n_intervals' value of the interval component (unused).

    Returns:
        A tuple in the order of the declared Outputs: the new children of
        'layout-content', then the new figure for 'base-figure'.
    """
    global fig
    fig = plot_fig()
    layout = serve_layout()
    # serve_layout() returns the 'layout-content' Div itself; only its
    # children are sent back, otherwise that Div would be nested inside
    # itself under a duplicate id. The values follow the Output order.
    return layout.children, fig
@callback(Output('figure-tooltip', 'show'),
          Output('figure-tooltip', 'bbox'),
          Output('figure-tooltip', 'children'),
          Input('base-figure', 'hoverData'))
def display_hover(hoverData):
    """Show the weather card of the hovered peak in the tooltip.

    Args:
        hoverData: Hover payload of the map graph, or None when nothing is
            hovered.

    Returns:
        ``(show, bbox, children)`` for the tooltip. With no hover data the
        tooltip is hidden and the other two properties are left untouched.
    """
    if hoverData is None:
        return False, no_update, no_update

    hovered = hoverData['points'][0]
    anchor_box = hovered['bbox']
    # The point number is used as a row position in the global peak table.
    peak = df.iloc[hovered['pointNumber']].copy()

    card = html.Div(
        [html.Img(src=peak['Weather'], style={'width': '100%'}), Purify(peak['dist_read']),],
        style={'width': '96px', 'white-space': 'normal'},
    )
    return True, anchor_box, [card]
# Run the app on all interfaces, port 7860, when executed as a script.
if __name__ == '__main__':
    app.run(debug=False, host='0.0.0.0', port=7860)