Spaces:
Running
Running
import gradio as gr | |
import requests | |
import folium | |
from datetime import datetime, timedelta | |
import tempfile | |
import os | |
from PIL import Image, ImageDraw, ImageFont | |
import io | |
import boto3 | |
import botocore | |
# Montana mountain peaks used for the map markers and the quick-access buttons.
# Maps a display name to a (latitude, longitude) tuple in decimal degrees;
# negative longitude means west.
MONTANA_PEAKS = {
    "Lone Peak (Big Sky)": (45.27806, -111.45028),
    "Sacajawea Peak": (45.89583, -110.96861),
    "Pioneer Mountain": (45.231835, -111.450505)
}
def _nexrad_key_timestamp(key, product):
    """Return the datetime encoded in a matching S3 key's file name, else None.

    A key matches when its file name contains ``_{product}.`` and splits on
    underscores into at least ``{station}_{YYYYMMDD}_{HHMM}...``.
    """
    name = key.rsplit("/", 1)[-1]
    if f"_{product}." not in name:
        return None
    parts = name.split("_")
    if len(parts) < 3:
        return None
    # The time field may carry a non-digit suffix; keep only its digits.
    time_digits = "".join(ch for ch in parts[2] if ch.isdigit())
    try:
        return datetime.strptime(parts[1] + time_digits, "%Y%m%d%H%M")
    except ValueError:
        return None


def get_nexrad_file(station, product="N0S", hours_back=6):
    """
    Download the newest matching radar file for a station from NOAA's S3 bucket.

    Lists the ``noaa-nexrad-level2`` bucket under every day prefix
    ``{station}/{YYYY}/{MM}/{DD}/`` that overlaps the search window and picks
    the most recent object whose file name looks like
    ``{station}_{YYYYMMDD}_{HHMM}_{product}.gz``.

    NOTE(review): the file-name pattern above is what this code assumes;
    confirm it against the bucket's real object names.

    Args:
        station: Radar station ID used as the top-level key prefix (e.g. "KMSX").
        product: Product code that must appear in the file name as ``_{product}.``.
        hours_back: Length of the search window in hours. The window ends
            20 minutes before the current UTC time to allow for upload delay.

    Returns:
        A tuple ``(local_file_path, s3_key)`` for the downloaded file, or
        ``(None, "")`` if no object inside the window matched.

    Raises:
        Whatever ``download_file`` raises when the download itself fails; the
        temporary file is removed before the error propagates.
    """
    # Imported locally so this function is self-contained; the file only
    # imports the top-level ``botocore`` package.
    from botocore import UNSIGNED
    from botocore.config import Config

    # The bucket is public: use unsigned requests so the call also works on
    # hosts that have no AWS credentials configured.
    s3 = boto3.client("s3", config=Config(signature_version=UNSIGNED))
    bucket = "noaa-nexrad-level2"
    now = datetime.utcnow() - timedelta(minutes=20)  # allow for delay
    start_time = now - timedelta(hours=hours_back)

    # Keys are grouped per day, so stepping hour by hour would list the same
    # prefix repeatedly. Collapse to the distinct day prefixes, in order.
    prefixes = list(dict.fromkeys(
        f"{station}/{(start_time + timedelta(hours=i)).strftime('%Y/%m/%d')}/"
        for i in range(hours_back + 1)
    ))

    paginator = s3.get_paginator("list_objects_v2")
    files = []
    for prefix in prefixes:
        try:
            # Paginate: a single list call returns at most 1000 keys.
            for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
                for obj in page.get("Contents", []):
                    file_dt = _nexrad_key_timestamp(obj["Key"], product)
                    if file_dt is not None and start_time <= file_dt <= now:
                        files.append((file_dt, obj["Key"]))
        except botocore.exceptions.ClientError:
            # Best effort: a prefix that cannot be listed is skipped.
            continue

    if not files:
        return None, ""

    latest_file_key = max(files, key=lambda item: item[0])[1]

    # Reserve a path and close the handle right away; the caller owns the
    # file afterwards (delete=False).
    with tempfile.NamedTemporaryFile(delete=False, suffix=".gz") as tmp_file:
        local_path = tmp_file.name
    try:
        s3.download_file(bucket, latest_file_key, local_path)
    except Exception:
        # Do not leave an empty or partial temp file behind on failure.
        try:
            os.remove(local_path)
        except OSError:
            pass
        raise
    return local_path, latest_file_key
def get_noaa_forecast(lat, lon):
    """Get the NOAA text forecast for a point using the api.weather.gov points API.

    Args:
        lat: Latitude in decimal degrees.
        lon: Longitude in decimal degrees.

    Returns:
        A multi-line string with one section per forecast period (name,
        temperature, wind, detailed text). A snow warning line is appended to
        a period whose detailed forecast mentions a snow-related keyword.
        On any failure the function does not raise; it returns a string
        starting with "Error getting forecast: ".
    """
    snow_keywords = ('snow', 'flurries', 'wintry mix', 'blizzard')
    try:
        points_url = f"https://api.weather.gov/points/{lat},{lon}"
        points_response = requests.get(points_url, timeout=10)
        # Surface HTTP errors as such instead of a confusing KeyError below.
        points_response.raise_for_status()
        forecast_url = points_response.json()['properties']['forecast']

        forecast_response = requests.get(forecast_url, timeout=10)
        forecast_response.raise_for_status()
        forecast = forecast_response.json()

        chunks = ["Weather Forecast:\n\n"]
        for period in forecast['properties']['periods']:
            detailed = period['detailedForecast']
            chunks.append(f"{period['name']}:\n")
            chunks.append(f"Temperature: {period['temperature']}°{period['temperatureUnit']}\n")
            chunks.append(f"Wind: {period['windSpeed']} {period['windDirection']}\n")
            chunks.append(f"{detailed}\n\n")
            if any(word in detailed.lower() for word in snow_keywords):
                chunks.append("⚠️ SNOW EVENT PREDICTED ⚠️\n\n")
        return "".join(chunks)
    except Exception as e:
        # UI boundary: report the problem as text rather than crashing the app.
        return f"Error getting forecast: {str(e)}"
def get_forecast_products(lat, lon):
    """Fetch the graphical forecast images, crop and label them, return file paths.

    Each product image is downloaded, cropped around (lat, lon), stamped with
    its title and the current UTC time, and saved to a temporary PNG file.
    A product that fails at any step is silently skipped.

    Returns:
        List of paths to the PNG files that were written (possibly empty).
    """
    base_url = "https://graphical.weather.gov/images/conus"
    products = [
        ("MaxT1_conus.png", "Maximum Temperature"),
        ("MinT1_conus.png", "Minimum Temperature"),
        ("QPF06_conus.png", "6-Hour Precipitation"),
        ("QPF12_conus.png", "12-Hour Precipitation"),
        ("QPF24_conus.png", "24-Hour Precipitation"),
        ("Snow1_conus.png", "Snowfall Amount"),
        ("Snow2_conus.png", "Snowfall Day 2"),
        ("Wx1_conus.png", "Weather Type"),
    ]
    # One timestamp shared by every image of this request.
    stamp = datetime.utcnow().strftime('%Y-%m-%d %H:%M UTC')

    saved_paths = []
    for image_name, label in products:
        try:
            reply = requests.get(f"{base_url}/{image_name}", timeout=10)
            if reply.status_code != 200:
                continue
            picture = Image.open(io.BytesIO(reply.content)).convert('RGB')
            picture = crop_to_region(picture, lat, lon)
            caption = f"{label}\n{stamp}"
            draw_text_with_outline(ImageDraw.Draw(picture), caption, (10, 10))
            # delete=False: the file must outlive this function so the
            # gallery can read it.
            with tempfile.NamedTemporaryFile(delete=False, suffix='.png') as out_file:
                picture.save(out_file.name)
            saved_paths.append(out_file.name)
        except Exception:
            # Best effort: a failing product must not block the others.
            continue
    return saved_paths
def crop_to_region(img, lat, lon, zoom=1.5):
    """Crop an image around a location and scale the crop back to full size.

    The image is treated as a linear lat/lon grid covering 25..50 N and
    125..65 W. NOTE(review): the source images may use a different map
    projection, in which case the centre is only approximate.

    The crop window always has size ``(width / zoom, height / zoom)``. When
    the location is close to an edge the window is shifted back inside the
    image rather than shrunk, so the aspect ratio is preserved by the resize.

    Args:
        img: Image object exposing ``size``, ``crop`` and ``resize``.
        lat: Latitude of the point of interest, decimal degrees.
        lon: Longitude of the point of interest, decimal degrees.
        zoom: Magnification factor; values below 1 behave like 1 (no crop).

    Returns:
        A new image of the same size as ``img`` showing the cropped region.

    Raises:
        ValueError: if ``zoom`` is not positive.
    """
    if zoom <= 0:
        raise ValueError("zoom must be positive")

    img_width, img_height = img.size
    lat_min, lat_max = 25.0, 50.0
    lon_min, lon_max = -125.0, -65.0

    # Pixel position of the requested point (y grows downwards).
    x = (lon - lon_min) / (lon_max - lon_min) * img_width
    y = (lat_max - lat) / (lat_max - lat_min) * img_height

    # The window can never be larger than the image itself.
    crop_width = min(img_width, img_width / zoom)
    crop_height = min(img_height, img_height / zoom)

    # Centre the window on the point, then slide it so it stays fully inside
    # the image; its size never changes.
    x1 = min(max(x - crop_width / 2, 0), img_width - crop_width)
    y1 = min(max(y - crop_height / 2, 0), img_height - crop_height)

    box = (
        round(x1),
        round(y1),
        round(x1 + crop_width),
        round(y1 + crop_height),
    )
    cropped = img.crop(box)
    return cropped.resize((img_width, img_height), Image.Resampling.LANCZOS)
def draw_text_with_outline(draw, text, pos, font_size=20, center=False):
    """Draw white text with a black outline for better visibility.

    Args:
        draw: Drawing context providing ``text`` and ``textbbox``.
        text: The string to render (may contain newlines).
        pos: ``(x, y)`` position; the top-left corner of the text, or its
            centre when ``center`` is true.
        font_size: Point size requested for the DejaVu Sans font.
        center: If true, shift the text so ``pos`` is its centre.
    """
    try:
        font = ImageFont.truetype("/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf", font_size)
    except Exception:
        # Best effort: if the font cannot be loaded for any reason, use the
        # built-in default. Unlike a bare ``except`` this does not swallow
        # KeyboardInterrupt or SystemExit.
        font = ImageFont.load_default()

    x, y = pos
    if center:
        bbox = draw.textbbox((0, 0), text, font=font)
        text_width = bbox[2] - bbox[0]
        text_height = bbox[3] - bbox[1]
        x = x - text_width // 2
        y = y - text_height // 2

    # Outline: paint the text in black at the four diagonal one-pixel offsets,
    # then the white text on top.
    for dx, dy in [(-1, -1), (-1, 1), (1, -1), (1, 1)]:
        draw.text((x + dx, y + dy), text, fill='black', font=font)
    draw.text((x, y), text, fill='white', font=font)
def get_map(lat, lon):
    """Build a folium map centred on (lat, lon) and return it as embeddable HTML.

    Every entry of MONTANA_PEAKS gets a marker. The selected location gets an
    extra marker unless it coincides exactly with one of the peaks. The map
    also gets folium's click-for-lat/lng helper.
    """
    peak_map = folium.Map(location=[lat, lon], zoom_start=9)

    for name, position in MONTANA_PEAKS.items():
        peak_marker = folium.Marker(
            position,
            popup=f"{name}<br>Lat: {position[0]:.4f}, Lon: {position[1]:.4f}",
            tooltip=name,
        )
        peak_marker.add_to(peak_map)

    selected = (lat, lon)
    if selected not in MONTANA_PEAKS.values():
        selected_marker = folium.Marker(
            [lat, lon],
            popup=f"Selected Location<br>Lat: {lat:.4f}, Lon: {lon:.4f}",
        )
        selected_marker.add_to(peak_map)

    peak_map.add_child(folium.ClickForLatLng())
    return peak_map._repr_html_()
def update_weather(lat, lon, station, product):
    """Update weather info and retrieve raw radar data from AWS.

    Args:
        lat: Latitude; anything ``float()`` accepts.
        lon: Longitude; anything ``float()`` accepts.
        station: Radar station ID as typed by the user.
        product: Radar product code as typed by the user.

    Returns:
        A 5-tuple matching the UI outputs:
        ``(forecast_text, gallery_paths, radar_file_path, map_html, radar_key)``.
        ``radar_file_path`` is the text "No radar data" whenever no file is
        available. On invalid coordinates or any error, the first element
        carries the message and the map falls back to (45.5, -111.0).
    """
    try:
        lat = float(lat)
        lon = float(lon)
        if not (-90 <= lat <= 90 and -180 <= lon <= 180):
            return "Invalid coordinates", [], "No radar data", get_map(45.5, -111.0), ""

        # The values come from free-text boxes. S3 key prefixes are
        # case-sensitive and station IDs are upper-case, so normalise them.
        station = str(station).strip().upper()
        product = str(product).strip()

        forecast_text = get_noaa_forecast(lat, lon)
        # Retrieve raw radar data file from AWS (returns local file path and S3 key)
        radar_file_path, radar_key = get_nexrad_file(station, product, hours_back=6)
        # Get forecast product images
        gallery_data = get_forecast_products(lat, lon)
        map_html = get_map(lat, lon)

        # Show the same placeholder text as the other paths when nothing
        # was found, instead of handing None to the textbox.
        radar_display = radar_file_path or "No radar data"
        return forecast_text, gallery_data, radar_display, map_html, radar_key
    except Exception as e:
        # UI boundary: report the problem in the forecast box.
        return f"Error: {str(e)}", [], "No radar data", get_map(45.5, -111.0), ""
# Gradio user interface: inputs and map on top, text outputs below, then the
# forecast image gallery and the usage notes.
# NOTE(review): the nesting of the layout containers was reconstructed from
# the statement order; confirm it matches the intended layout.
with gr.Blocks(title="Montana Mountain Weather") as demo:
    gr.Markdown("# Montana Mountain Weather")

    with gr.Row():
        with gr.Column(scale=1):
            lat_input = gr.Number(label="Latitude", value=45.5, minimum=-90, maximum=90)
            lon_input = gr.Number(label="Longitude", value=-111.0, minimum=-180, maximum=180)
            station_input = gr.Textbox(label="Radar Station ID", value="KMSX")
            product_input = gr.Textbox(label="Radar Product Code", value="N0S")
            gr.Markdown("### Quick Access - Montana Peaks")
            # One button per peak, in the order of MONTANA_PEAKS.
            peak_buttons = [gr.Button(f"📍 {peak_name}") for peak_name in MONTANA_PEAKS]
            submit_btn = gr.Button("Get Weather", variant="primary")
        with gr.Column(scale=2):
            map_display = gr.HTML(get_map(45.5, -111.0))

    with gr.Row():
        with gr.Column(scale=1):
            forecast_output = gr.Textbox(
                label="Weather Forecast",
                lines=12,
                placeholder="Select a location to see the forecast...",
            )
        with gr.Column(scale=2):
            radar_data_output = gr.Textbox(
                label="Raw Radar Data File Path",
                placeholder="Radar data file path will appear here",
            )

    with gr.Row():
        forecast_gallery = gr.Gallery(
            label="Forecast Products",
            show_label=True,
            columns=4,
            height=600,
            object_fit="contain",
        )
    radar_key_output = gr.Textbox(
        label="S3 Key for Radar Data",
        placeholder="S3 key will appear here",
    )

    # The submit button and every peak button run the same refresh.
    weather_inputs = [lat_input, lon_input, station_input, product_input]
    weather_outputs = [forecast_output, forecast_gallery, radar_data_output,
                       map_display, radar_key_output]

    submit_btn.click(fn=update_weather, inputs=weather_inputs, outputs=weather_outputs)

    for button, peak_name in zip(peak_buttons, MONTANA_PEAKS):
        # Bind the name as a default argument so each button keeps its own
        # peak. The click first fills in the coordinates, then refreshes.
        button.click(
            fn=lambda name=peak_name: MONTANA_PEAKS[name],
            inputs=[],
            outputs=[lat_input, lon_input],
        ).then(
            fn=update_weather,
            inputs=weather_inputs,
            outputs=weather_outputs,
        )

    gr.Markdown("""
## Instructions
1. Use the quick access buttons to check specific Montana peaks.
2. Or enter coordinates manually (or click on the map).
3. Enter the Radar Station ID (e.g., KMSX) and Radar Product Code (e.g., N0S).
4. Click "Get Weather" to see the forecast, forecast product images, and to download the latest raw radar data file from NOAA’s AWS bucket.
**Montana Peaks Included:**
- Lone Peak (Big Sky): 45°16′41″N 111°27′01″W
- Sacajawea Peak: 45°53′45″N 110°58′7″W
- Pioneer Mountain: 45°13′55″N 111°27′2″W
**Radar Data:**
- This app now retrieves raw NEXRAD Level‑II data (e.g. the “N0S” product) from NOAA’s AWS S3 bucket.
- You can process this raw file with external tools (like Py‑ART) to generate images.
**Forecast Products:**
- Temperature (Max/Min)
- Precipitation (6/12/24 Hour)
- Snowfall Amount
- Weather Type
**Note:** NOAA’s raw radar data is available via AWS and covers nearly all U.S. radars. For global coverage, you may need to explore additional sources.
""")

demo.queue().launch()