import streamlit as st
import json
import ee
import os
import pandas as pd
import geopandas as gpd
from datetime import datetime
import leafmap.foliumap as leafmap
import time
import re
# Set up the page layout
st.set_page_config(layout="wide")
# Custom button styling
st.markdown(
    """
    """,
    unsafe_allow_html=True,
)
# Logo
st.write(
    """
    """,
    unsafe_allow_html=True,
)
# Authenticate and initialize Earth Engine
earthengine_credentials = os.environ.get("EE_Authentication")
if not earthengine_credentials:
    st.error("Earth Engine credentials were not found in the EE_Authentication environment variable.")
    st.stop()
# Write the secret credentials where the Earth Engine client library expects them
os.makedirs(os.path.expanduser("~/.config/earthengine/"), exist_ok=True)
with open(os.path.expanduser("~/.config/earthengine/credentials"), "w") as f:
    f.write(earthengine_credentials)
ee.Initialize(project='ee-yashsacisro24')
# Load Sentinel dataset options from JSON file
with open("sentinel_datasets.json") as f:
data = json.load(f)
# Display the title and dataset selection
st.title("Sentinel Dataset")
# Select dataset category and subcategory (case-insensitive selection)
main_selection = st.selectbox("Select Sentinel Dataset Category", list(data.keys()))
if main_selection:
sub_options = data[main_selection]["sub_options"]
sub_selection = st.selectbox("Select Specific Dataset ID", list(sub_options.keys()))
# Earth Engine Index Calculator Section
st.header("Earth Engine Index Calculator")
# Choose Index or Custom Formula (case-insensitive)
index_choice = st.selectbox("Select an Index or Enter Custom Formula", ['NDVI', 'NDWI', 'Average NO₂', 'Custom Formula'])
# Initialize custom_formula variable
custom_formula = ""
# Display corresponding formula based on the index selected (case-insensitive)
if index_choice.lower() == 'ndvi':
st.write("Formula for NDVI: NDVI = (B8 - B4) / (B8 + B4)")
elif index_choice.lower() == 'ndwi':
st.write("Formula for NDWI: NDWI = (B3 - B8) / (B3 + B8)")
elif index_choice.lower() == 'average no₂':
st.write("Formula for Average NO₂: Average NO₂ = Mean(NO2 band)")
elif index_choice.lower() == 'custom formula':
    custom_formula = st.text_input("Enter Custom Formula (e.g., '(B5 - B4) / (B5 + B4)')")
    st.write(f"Custom Formula: {custom_formula}")  # Echo the formula back to the user
# Function to check if the polygon geometry is valid and convert it to the correct format
def convert_to_ee_geometry(geometry):
# Ensure the polygon geometry is in the right format
if geometry.is_valid:
# Convert the geometry to GeoJSON format
geojson = geometry.__geo_interface__
# Convert to Earth Engine geometry
return ee.Geometry(geojson)
else:
raise ValueError("Invalid geometry: The polygon geometry is not valid.")
# Function to read points from CSV
def read_csv(file_path):
df = pd.read_csv(file_path)
return df
# Function to read points from GeoJSON
def read_geojson(file_path):
gdf = gpd.read_file(file_path)
return gdf
# Function to read points from KML (relies on a KML-capable driver in the installed GDAL/fiona/pyogrio stack)
def read_kml(file_path):
    gdf = gpd.read_file(file_path, driver='KML')
    return gdf
# Ask user whether they want to process 'Point' or 'Polygon' data (case-insensitive)
shape_type = st.selectbox("Do you want to process 'Point' or 'Polygon' data?", ["Point", "Polygon"])
# Ask user to upload a file based on shape type (case-insensitive)
file_upload = st.file_uploader(f"Upload your {shape_type} data (CSV, GeoJSON, KML)", type=["csv", "geojson", "kml"])
# Date Input for Start and End Dates
start_date = st.date_input("Start Date", value=pd.to_datetime('2020-01-01'))
end_date = st.date_input("End Date", value=pd.to_datetime('2020-12-31'))
# Convert start_date and end_date to string format for Earth Engine
start_date_str = start_date.strftime('%Y-%m-%d')
end_date_str = end_date.strftime('%Y-%m-%d')
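# Sanity check (optional): stop early if the selected date range is inverted
if start_date > end_date:
    st.error("Start Date must be on or before End Date.")
    st.stop()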
# Initialize session state for storing results if not already done
if 'results' not in st.session_state:
st.session_state.results = []
if 'last_params' not in st.session_state:
st.session_state.last_params = {}
if 'map_data' not in st.session_state:
st.session_state.map_data = None # Initialize map_data
if 'file_upload' not in st.session_state:
    st.session_state.file_upload = None  # Track the name of the last uploaded file
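# Minimal sketch of the reset helper used further below (assumed behaviour:
# clear the stored results and map whenever a new file is uploaded).
def reset_session_state_for_new_file():
    st.session_state.results = []     # Drop results computed for the previous file
    st.session_state.map_data = None  # Drop the previously rendered map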
# Function to check if parameters have changed
def parameters_changed():
return (
st.session_state.last_params.get('main_selection') != main_selection or
st.session_state.last_params.get('sub_selection') != sub_selection or
st.session_state.last_params.get('index_choice') != index_choice or
st.session_state.last_params.get('start_date_str') != start_date_str or
st.session_state.last_params.get('end_date_str') != end_date_str
)
# If parameters have changed, reset the results
if parameters_changed():
st.session_state.results = [] # Clear the previous results
# Update the last parameters to the current ones
st.session_state.last_params = {
'main_selection': main_selection,
'sub_selection': sub_selection,
'index_choice': index_choice,
'start_date_str': start_date_str,
'end_date_str': end_date_str
}
# Function to perform index calculations
def calculate_ndvi(image, geometry):
ndvi = image.normalizedDifference(['B8', 'B4']).rename('NDVI')
result = ndvi.reduceRegion(
reducer=ee.Reducer.mean(),
geometry=geometry,
scale=30
)
return result.get('NDVI')
def calculate_ndwi(image, geometry):
ndwi = image.normalizedDifference(['B3', 'B8']).rename('NDWI')
result = ndwi.reduceRegion(
reducer=ee.Reducer.mean(),
geometry=geometry,
scale=30
)
return result.get('NDWI')
def calculate_avg_no2_sentinel5p(image, geometry):
no2 = image.select('NO2').reduceRegion(
reducer=ee.Reducer.mean(),
geometry=geometry,
scale=1000
).get('NO2')
return no2
def calculate_custom_formula(image, geometry, formula):
    # Map each band token referenced in the formula (e.g. B4, B8, B8A) to the
    # matching image band so the expression can resolve the names.
    band_map = {band: image.select(band) for band in set(re.findall(r'B\d+A?', formula))}
    result = image.expression(formula, band_map).rename('Custom Index').reduceRegion(
        reducer=ee.Reducer.mean(),
        geometry=geometry,
        scale=30
    )
    return result.get('Custom Index')
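# Shared dispatcher used by both the point and polygon loops below (a small
# helper sketch so the index-selection logic is not duplicated).
def compute_selected_index(image, geometry, location_name):
    if index_choice.lower() == 'ndvi':
        return calculate_ndvi(image, geometry)
    elif index_choice.lower() == 'ndwi':
        return calculate_ndwi(image, geometry)
    elif index_choice.lower() == 'average no₂':
        if 'NO2' in image.bandNames().getInfo():
            return calculate_avg_no2_sentinel5p(image, geometry)
        st.warning(f"No NO2 band found for {location_name}. Please use Sentinel-5P for NO₂ data.")
        return None
    elif index_choice.lower() == 'custom formula' and custom_formula:
        return calculate_custom_formula(image, geometry, custom_formula)
    return None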
# Check whether the uploaded file differs from the previously uploaded file
if file_upload is not None and st.session_state.file_upload != file_upload.name:
    reset_session_state_for_new_file()  # Clear results/map kept for the previous file
    st.session_state.file_upload = file_upload.name  # Remember the current file name
# Process each point or polygon
if file_upload:
locations_df = None # Initialize locations_df to None
polygons_df = None # Initialize polygons_df to None
file_extension = os.path.splitext(file_upload.name)[1].lower() # Convert extension to lowercase
# Read file based on shape type (case-insensitive)
    if shape_type.lower() == 'point':
        if file_extension == '.csv':
            locations_df = read_csv(file_upload)
        elif file_extension == '.geojson':
            locations_df = read_geojson(file_upload)
        elif file_extension == '.kml':
            locations_df = read_kml(file_upload)
        else:
            st.error("Unsupported file type. Please upload a CSV, GeoJSON, or KML file for points.")
        # GeoJSON/KML point files carry coordinates in the geometry column, so
        # derive the latitude/longitude columns the rest of the app expects
        if locations_df is not None and 'geometry' in locations_df.columns:
            locations_df['longitude'] = locations_df.geometry.x
            locations_df['latitude'] = locations_df.geometry.y
elif shape_type.lower() == 'polygon':
if file_extension == '.geojson':
polygons_df = read_geojson(file_upload)
elif file_extension == '.kml':
polygons_df = read_kml(file_upload)
else:
st.error("Unsupported file type. Please upload a GeoJSON or KML file for polygons.")
# Check if locations_df is populated for points
if locations_df is not None:
# Display a preview of the points data
st.write("Preview of the uploaded points data:")
st.dataframe(locations_df.head())
# Create a LeafMap object to display the points
m = leafmap.Map(center=[locations_df['latitude'].mean(), locations_df['longitude'].mean()], zoom=10)
# Add points to the map using a loop
for _, row in locations_df.iterrows():
latitude = row['latitude']
longitude = row['longitude']
# Check if latitude or longitude are NaN and skip if they are
if pd.isna(latitude) or pd.isna(longitude):
continue # Skip this row and move to the next one
m.add_marker(location=[latitude, longitude], popup=row.get('name', 'No Name'))
# Display map
st.write("Map of Uploaded Points:")
m.to_streamlit()
# Store the map in session_state
st.session_state.map_data = m
# Process each point for index calculation
for idx, row in locations_df.iterrows():
latitude = row['latitude']
longitude = row['longitude']
location_name = row.get('name', f"Point_{idx}")
# Skip processing if latitude or longitude is NaN
if pd.isna(latitude) or pd.isna(longitude):
continue # Skip this row and move to the next one
# Define the region of interest (ROI)
roi = ee.Geometry.Point([longitude, latitude])
            # Load the selected Sentinel image collection
collection = ee.ImageCollection(sub_options[sub_selection]) \
.filterDate(ee.Date(start_date_str), ee.Date(end_date_str)) \
.filterBounds(roi)
# Check if the collection has images for the selected date range
image_count = collection.size().getInfo()
if image_count == 0:
st.warning(f"No images found for {location_name}.")
else:
st.write(f"Found {image_count} images for {location_name}.")
image = collection.first()
                # Perform the calculation based on user selection
                result = compute_selected_index(image, roi, location_name)
if result is not None:
# Only store the numeric value (not the dictionary structure)
calculated_value = result.getInfo() # Get the numeric value
# Store the result in session state
st.session_state.results.append({
'Location Name': location_name,
'Latitude': latitude,
'Longitude': longitude,
'Calculated Value': calculated_value
})
# Check if polygons_df is populated for polygons
if polygons_df is not None:
# Display a preview of the polygons data
st.write("Preview of the uploaded polygons data:")
st.dataframe(polygons_df.head())
# Create a LeafMap object to display the polygons
m = leafmap.Map(center=[polygons_df.geometry.centroid.y.mean(), polygons_df.geometry.centroid.x.mean()], zoom=10)
# Add polygons to the map
for _, row in polygons_df.iterrows():
polygon = row['geometry']
if polygon.is_valid: # Check if the geometry is valid
# Create a GeoDataFrame with the single row
gdf = gpd.GeoDataFrame([row], geometry=[polygon], crs=polygons_df.crs)
# Add the valid GeoDataFrame to the map
m.add_gdf(gdf=gdf, layer_name=row.get('name', 'Unnamed Polygon'))
# Display map
st.write("Map of Uploaded Polygons:")
m.to_streamlit()
# Store the map in session_state
st.session_state.map_data = m
# Process each polygon for index calculation
for idx, row in polygons_df.iterrows():
polygon = row['geometry']
location_name = row.get('name', f"Polygon_{idx}")
# Define the region of interest (ROI)
try:
roi = convert_to_ee_geometry(polygon)
except ValueError as e:
st.error(str(e))
continue # Skip this polygon if geometry is invalid
            # Load the selected Sentinel image collection
collection = ee.ImageCollection(sub_options[sub_selection]) \
.filterDate(ee.Date(start_date_str), ee.Date(end_date_str)) \
.filterBounds(roi)
# Check if the collection has images for the selected date range
image_count = collection.size().getInfo()
if image_count == 0:
st.warning(f"No images found for {location_name}.")
else:
st.write(f"Found {image_count} images for {location_name}.")
image = collection.first()
                # Perform the calculation based on user selection
                result = compute_selected_index(image, roi, location_name)
if result is not None:
# Only store the numeric value (not the dictionary structure)
calculated_value = result.getInfo() # Get the numeric value
# Store the result in session state
st.session_state.results.append({
'Location Name': location_name,
'Calculated Value': calculated_value
})
# After processing, show the results
if st.session_state.results:
# Convert the results to a DataFrame for better visualization
result_df = pd.DataFrame(st.session_state.results)
# If the shape type is 'Point', include 'Latitude' and 'Longitude'
if shape_type.lower() == 'point':
# Show the results in a table format with Latitude and Longitude
st.write("Processed Results Table (Points):")
st.dataframe(result_df[['Location Name', 'Latitude', 'Longitude', 'Calculated Value']])
else:
# For polygons, we only show the Location Name and Calculated Value
st.write("Processed Results Table (Polygons):")
st.dataframe(result_df[['Location Name', 'Calculated Value']])
    # Generate a dynamic filename for the download
    filename = f"{main_selection}_{sub_selection}_{start_date.strftime('%Y-%m-%d')}_{end_date.strftime('%Y-%m-%d')}_{shape_type}.csv"
    # Offer the results table as a CSV download
st.download_button(
label="Download results as CSV",
data=result_df.to_csv(index=False).encode('utf-8'),
file_name=filename,
mime='text/csv'
)
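# Typical local launch (assuming this script is saved as app.py):
#   streamlit run app.py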