Spaces:
Runtime error
Runtime error
import streamlit as st | |
import requests | |
from datetime import datetime, timedelta | |
from pytz import timezone | |
from io import BytesIO | |
import time | |
import folium | |
import base64 | |
import pandas as pd | |
import numpy as np | |
from PIL import Image, ImageFilter, ImageEnhance | |
st.set_page_config(layout="wide", page_title="Rainfall Data Dashboard") | |
HONG_KONG_TZ = timezone('Asia/Hong_Kong') | |
RADAR_BASE_URL = "https://www.hko.gov.hk/wxinfo/radars/rad_064_png/2d064nradar_{}.jpg" | |
API_URL = "https://data.weather.gov.hk/weatherAPI/opendata/weather.php?dataType=rhrread&lang=en" | |
COLORS_TO_EXTRACT = [ | |
"#ed00f0", "#c3006a", "#dc0201", "#f00000", "#ed8202", | |
"#eeb000", "#fada04", "#e1cf00", "#8fff00", "#01f908", | |
"#01f808", "#00d002", "#01a835", "#008448", "#3b96ff", | |
"#008ff5", "#00c8fb" | |
] | |
COLORS_TO_EXTRACT_RGB = [tuple(int(color[i:i+2], 16) for i in (1, 3, 5)) for color in COLORS_TO_EXTRACT] | |
def get_nearest_6_minute_interval(time): | |
return time.replace(minute=(time.minute // 6) * 6, second=0, microsecond=0) | |
def get_backward_6_minute_intervals(current_time, hours=3): | |
intervals = [] | |
interval_time = get_nearest_6_minute_interval(current_time) | |
end_time = current_time - timedelta(hours=hours) | |
while interval_time >= end_time: | |
intervals.append(interval_time) | |
interval_time -= timedelta(minutes=6) | |
return intervals | |
def fetch_radar_image(timestamp): | |
url = RADAR_BASE_URL.format(timestamp.strftime('%Y%m%d%H%M')) | |
response = requests.get(url) | |
return Image.open(BytesIO(response.content)) if response.status_code == 200 else None | |
def fetch_radar_image_with_rollback(timestamp): | |
for i in range(31): # 30 steps of 6 minutes = 3 hours | |
image = fetch_radar_image(timestamp - timedelta(minutes=6 * i)) | |
if image: | |
return image, timestamp - timedelta(minutes=6 * i) | |
return None, None | |
def image_to_base64(image): | |
buffered = BytesIO() | |
image.save(buffered, format="PNG") | |
return base64.b64encode(buffered.getvalue()).decode() | |
def extract_color_pixels(img_array, colors, tolerance=30): | |
return np.any([np.all(np.abs(img_array - color) <= tolerance, axis=-1) for color in colors], axis=0) | |
def filter_image_by_color(image, colors_to_extract_rgb): | |
img_array = np.array(image.convert("RGBA")) | |
color_mask = extract_color_pixels(img_array[..., :3], colors_to_extract_rgb) | |
img_array[~color_mask] = [255, 255, 255, 0] | |
return Image.fromarray(img_array) | |
def smooth_image(image): | |
return image.filter(ImageFilter.GaussianBlur(radius=1)) | |
def enhance_contrast(image, factor=1.5): | |
enhancer = ImageEnhance.Contrast(image) | |
enhanced_image = enhancer.enhance(factor) | |
return enhanced_image | |
def create_map_with_radar_tile(image): | |
filtered_image = filter_image_by_color(image, COLORS_TO_EXTRACT_RGB) | |
smoothed_image = smooth_image(filtered_image) | |
enhanced_image = enhance_contrast(smoothed_image, factor=1.5) | |
m = folium.Map(location=[22.364, 114.148], zoom_start=10, min_zoom=10, max_zoom=19, | |
tiles='https://mapapi.geodata.gov.hk/gs/api/v1.0.0/xyz/imagery/wgs84/{z}/{x}/{y}.png', | |
attr="Map information from Lands Department", control_scale=True, name="Basemap") | |
folium.TileLayer( | |
tiles='https://mapapi.geodata.gov.hk/gs/api/v1.0.0/xyz/label/hk/en/wgs84/{z}/{x}/{y}.png', | |
attr="Map information from Lands Department", | |
overlay=True, | |
name="Labels" | |
).add_to(m) | |
img_url = f"data:image/png;base64,{image_to_base64(enhanced_image)}" | |
folium.raster_layers.ImageOverlay( | |
image=img_url, | |
name="HKO Radar Image", | |
bounds=[[22.893, 113.538], [21.716, 115.362]], | |
opacity=0.95, | |
interactive=False, | |
cross_origin=False, | |
zindex=1, | |
).add_to(m) | |
folium.LayerControl().add_to(m) | |
return m._repr_html_() | |
def fetch_and_process_rainfall_data(): | |
response = requests.get(API_URL) | |
data = response.json() | |
df = pd.DataFrame(data['rainfall']['data']) | |
df['max'] = pd.to_numeric(df['max'], errors='coerce') | |
return df | |
# Main app | |
current_time_hkt = datetime.utcnow().replace(tzinfo=timezone('UTC')).astimezone(HONG_KONG_TZ) | |
time_intervals = get_backward_6_minute_intervals(current_time_hkt) | |
default_time = get_nearest_6_minute_interval(current_time_hkt) | |
col1, col2 = st.columns([2.2, 1]) | |
with col1: | |
st.subheader('Georeferenced Radar Image (64 km)') | |
slider = st.empty() | |
selected_time = slider.slider( | |
"Select Time:", | |
min_value=min(time_intervals), | |
max_value=max(time_intervals), | |
value=default_time, | |
format="YYYY-MM-DD HH:mm", | |
step=timedelta(minutes=6), | |
key="initial_time_slider" | |
) | |
map_placeholder = st.empty() | |
info_placeholder = st.empty() | |
cola1, cola2 = st.columns([1, 3]) | |
with cola1: | |
play = st.button("3-hour Sequence") | |
with cola2: | |
st.markdown(f""" | |
<style> | |
.color-bar {{ | |
height: 20px; | |
width: 100%; | |
background: linear-gradient(to right, {', '.join(COLORS_TO_EXTRACT)}); | |
}} | |
.color-labels {{ | |
display: flex; | |
justify-content: space-between; | |
font-size: 10px; | |
}} | |
</style> | |
<div class="color-labels">Rainfall rate (mm/h)</div> | |
<div class="color-bar"></div> | |
<div class="color-labels">{' '.join([f'<span>{label}</span>' for label in ['>300', '200-300', '150-200', '100-150', '75-100', '50-75', '30-50', '15-30', '10-15', '7-10', '5-7', '3-5', '2-3', '1-2', '0.50-1', '0.15-0.50']])}</div> | |
""", unsafe_allow_html=True) | |
if play: | |
for i, interval in enumerate(reversed(time_intervals)): | |
image, actual_time = fetch_radar_image_with_rollback(interval) | |
if image: | |
# Update slider with the actual time of the image | |
slider.slider( | |
"Select Time:", | |
min_value=min(time_intervals), | |
max_value=max(time_intervals), | |
value=actual_time, | |
format="YYYY-MM-DD HH:mm", | |
step=timedelta(minutes=6), | |
key=f"time_slider_{i}" | |
) | |
# Create and display the map | |
map_html = create_map_with_radar_tile(image) | |
map_placeholder.empty() | |
map_placeholder = st.components.v1.html(map_html,height=750) | |
if actual_time != interval: | |
info_placeholder.warning( | |
f"Showing the nearest available image from {actual_time.strftime('%Y-%m-%d %H:%M')}.") | |
else: | |
info_placeholder.empty() | |
time.sleep(0.01) | |
else: | |
info_placeholder.error(f"Could not fetch any radar image for {interval.strftime('%Y-%m-%d %H:%M')}") | |
else: | |
# Fetch the radar image with rollback for the selected time | |
image, actual_time = fetch_radar_image_with_rollback(selected_time) | |
if image: | |
# Create and display the map | |
map_html = create_map_with_radar_tile(image) | |
map_placeholder.empty() | |
map_placeholder = st.components.v1.html(map_html, height=750) | |
if actual_time != selected_time: | |
info_placeholder.warning( | |
f"Showing the nearest available image from {actual_time.strftime('%Y-%m-%d %H:%M')}.") | |
# Update slider to match the actual image time | |
slider.slider( | |
"Select Time:", | |
min_value=min(time_intervals), | |
max_value=max(time_intervals), | |
value=actual_time, | |
format="YYYY-MM-DD HH:mm", | |
step=timedelta(minutes=6), | |
key="adjusted_time_slider" | |
) | |
else: | |
info_placeholder.empty() | |
else: | |
info_placeholder.error( | |
f"Could not fetch any radar image within the last 3 hours of {selected_time.strftime('%Y-%m-%d %H:%M')}") | |
with col2: | |
df = fetch_and_process_rainfall_data() | |
areas_with_rainfall = df[df['max'] > 0]['place'].tolist() | |
areas_with_no_rainfall = df[df['max'] == 0]['place'].tolist() | |
st.caption('The following is the past hour rainfall from HKO Automatic Weather Station, updated hourly.') | |
col_1, col_2 = st.columns(2) | |
st.markdown( | |
""" | |
<style> | |
[data-testid="stMetricValue"] { | |
font-size: 26px; | |
} | |
</style> | |
""", | |
unsafe_allow_html=True, | |
) | |
with col_1: | |
st.metric("Average Rainfall", f"{df['max'].mean():.2f} mm") | |
st.metric("Maximum Rainfall", f"{df['max'].max()} mm") | |
with col_2: | |
st.metric("Areas with Rainfall", f"{len(areas_with_rainfall)}") | |
st.metric("Areas with No Rainfall", f"{len(areas_with_no_rainfall)}") | |
st.dataframe(df.sort_values(by='max', ascending=False)[['place', 'max']], use_container_width=True, height=480) | |
# JavaScript for auto-reloading every 5 minutes | |
st.markdown( | |
""" | |
<script> | |
function reloadPage() { | |
window.location.reload(); | |
} | |
setTimeout(reloadPage, 100000); | |
</script> | |
""", | |
unsafe_allow_html=True | |
) | |