import streamlit as st
import json
import ee
import os
import pandas as pd
import geopandas as gpd
from datetime import datetime
import leafmap.foliumap as leafmap
import re
# Set up the page layout
st.set_page_config(layout="wide")

# Custom button styling.
# NOTE(review): the CSS payload is empty in this copy of the source;
# kept as a placeholder so the markdown hook stays in place.
m = st.markdown(
    """
    """,
    unsafe_allow_html=True,
)

# Logo (HTML content empty in this copy of the source; placeholder kept).
st.write(
    """
    """,
    unsafe_allow_html=True,
)

# Title / tagline banner.
st.markdown(
    """
    Precision Analysis for Vegetation, Water, and Air Quality
    """,
    unsafe_allow_html=True,
)

# Section heading for the user-input controls.
# FIX: the original string literal was split across two physical lines
# without triple quotes (a syntax error); reconstructed as one string.
st.write("User Inputs", unsafe_allow_html=True)
# Authenticate and initialize Earth Engine.
# Credentials are injected via the EE_Authentication environment variable
# (e.g. a hosting-platform secret).
earthengine_credentials = os.environ.get("EE_Authentication")

# FIX: the original wrote the value unconditionally; a missing env var
# made f.write(None) fail with an opaque TypeError. Fail fast instead.
if not earthengine_credentials:
    st.error("Earth Engine credentials not found. Set the 'EE_Authentication' environment variable.")
    st.stop()

# Write the secret credentials where the earthengine-api client expects them.
os.makedirs(os.path.expanduser("~/.config/earthengine/"), exist_ok=True)
with open(os.path.expanduser("~/.config/earthengine/credentials"), "w") as f:
    f.write(earthengine_credentials)

ee.Initialize(project='ee-yashsacisro24')

# Load the Sentinel dataset options (category -> {sub_options: {...}})
# from the bundled JSON file.
with open("sentinel_datasets.json") as f:
    data = json.load(f)

# Display the title for the Streamlit app
st.title("Sentinel Dataset")
# Select dataset category (main selection)
main_selection = st.selectbox("Select Sentinel Dataset Category", list(data.keys()))

# If a category is selected, display the sub-options (specific datasets)
if main_selection:
    sub_options = data[main_selection]["sub_options"]
    sub_selection = st.selectbox("Select Specific Dataset ID", list(sub_options.keys()))

    # Display the selected dataset ID based on user input
    if sub_selection:
        st.write(f"You selected: {main_selection} -> {sub_selection}")
        st.write(f"Dataset ID: {sub_options[sub_selection]}")
        # Fetch the correct dataset ID from the sub-selection
        dataset_id = sub_options[sub_selection]

# Earth Engine Index Calculator Section
st.header("Earth Engine Index Calculator")
index_choice = st.selectbox(
    "Select an Index or Enter Custom Formula",
    ['NDVI', 'NDWI', 'Average NO₂', 'Custom Formula'],
)

# Holds the user-entered band pair for the 'Custom Formula' option
# (expected format: "B5,B4").
custom_formula = ""

# Display the corresponding formula for the selected index (case-insensitive).
if index_choice.lower() == 'ndvi':
    st.write("Formula for NDVI: NDVI = (B8 - B4) / (B8 + B4)")
elif index_choice.lower() == 'ndwi':
    st.write("Formula for NDWI: NDWI = (B3 - B8) / (B3 + B8)")
elif index_choice.lower() == 'average no₂':
    st.write("Formula for Average NO₂: Average NO₂ = Mean(NO2 band)")
elif index_choice.lower() == 'custom formula':
    custom_formula = st.text_input("Enter Custom Formula (e.g., B5,B4)")
    if not custom_formula:
        st.warning("Please enter a custom formula before proceeding.")
    else:
        # FIX: the original printed a fixed "(band1 - band2) / (band1 + band2)"
        # template here although its comment promised to echo the user's
        # input; show the actual formula instead.
        st.write(f"Custom Formula: {custom_formula}")
# Map a user-friendly reducer name onto the Earth Engine reducer factory.
def get_reducer(reducer_name):
    """Return the Earth Engine reducer matching *reducer_name*.

    Lookup is case-insensitive; unrecognized names fall back to the
    mean reducer.
    """
    factories = {
        'mean': ee.Reducer.mean,
        'sum': ee.Reducer.sum,
        'median': ee.Reducer.median,
        'min': ee.Reducer.min,
        'max': ee.Reducer.max,
        'count': ee.Reducer.count,
    }
    # Only the selected reducer is instantiated (default: mean).
    make_reducer = factories.get(reducer_name.lower(), ee.Reducer.mean)
    return make_reducer()
# Selectbox controlling how pixel values are reduced over each region.
_REDUCER_OPTIONS = ['mean', 'sum', 'median', 'min', 'max', 'count']
reducer_choice = st.selectbox(
    "Select Reducer",
    _REDUCER_OPTIONS,
    index=0,  # default to 'mean'
)
# Validate a polygon geometry and convert it to an Earth Engine geometry.
def convert_to_ee_geometry(geometry):
    """Convert a valid shapely-style geometry to an ``ee.Geometry``.

    Relies on the object's ``__geo_interface__`` GeoJSON representation.

    Raises:
        ValueError: if ``geometry.is_valid`` is False.
    """
    if not geometry.is_valid:
        raise ValueError("Invalid geometry: The polygon geometry is not valid.")
    return ee.Geometry(geometry.__geo_interface__)
# Load point records from a CSV file.
def read_csv(file_path):
    """Return the CSV at *file_path* as a pandas DataFrame."""
    return pd.read_csv(file_path)
# Load features from a GeoJSON file.
def read_geojson(file_path):
    """Return the GeoJSON at *file_path* as a GeoDataFrame."""
    return gpd.read_file(file_path)
# Load features from a KML file.
def read_kml(file_path):
    """Return the KML at *file_path* as a GeoDataFrame (uses the KML driver)."""
    return gpd.read_file(file_path, driver='KML')
# Ask which geometry kind to process, then request a matching upload.
shape_type = st.selectbox("Do you want to process 'Point' or 'Polygon' data?", ["Point", "Polygon"])
file_upload = st.file_uploader(
    f"Upload your {shape_type} data (CSV, GeoJSON, KML)",
    type=["csv", "geojson", "kml"],
)
def _load_uploaded_file(uploaded):
    """Read an uploaded CSV/GeoJSON/KML file into a (Geo)DataFrame.

    Shows an error and returns an empty DataFrame for unsupported types.
    FIX: this logic was duplicated verbatim in both the point and polygon
    branches below; factored out here.
    """
    if uploaded.name.endswith('.csv'):
        return pd.read_csv(uploaded)
    if uploaded.name.endswith('.geojson'):
        return gpd.read_file(uploaded)
    if uploaded.name.endswith('.kml'):
        # NOTE(review): the original read KML here WITHOUT driver='KML',
        # unlike the read_kml helper above; behavior kept as-is — TODO confirm.
        return gpd.read_file(uploaded)
    st.error("Unsupported file format. Please upload CSV, GeoJSON, or KML.")
    return pd.DataFrame()


if file_upload is not None:
    if shape_type.lower() == "point":
        locations_df = _load_uploaded_file(file_upload)

        # Reject polygon geometries when the user selected "Point".
        if 'geometry' in locations_df.columns:
            if locations_df.geometry.geom_type.isin(['Polygon', 'MultiPolygon']).any():
                st.warning("The uploaded file contains polygon data. Please select 'Polygon' for processing.")
                st.stop()  # Stop further processing if polygons are detected

        # Processing the point data
        with st.spinner('Processing data...'):
            if locations_df is not None and not locations_df.empty:
                # GeoJSON/KML input: pull lat/lon out of the geometry column.
                if 'geometry' in locations_df.columns:
                    locations_df['latitude'] = locations_df['geometry'].y
                    locations_df['longitude'] = locations_df['geometry'].x

                # Ensure the necessary columns exist in the dataframe.
                if 'latitude' not in locations_df.columns or 'longitude' not in locations_df.columns:
                    st.error("Uploaded file is missing required 'latitude' or 'longitude' columns.")
                else:
                    # Display a preview of the points data.
                    st.write("Preview of the uploaded points data:")
                    st.dataframe(locations_df.head())

                    # Map centered on the mean coordinates of all points.
                    m = leafmap.Map(
                        center=[locations_df['latitude'].mean(), locations_df['longitude'].mean()],
                        zoom=10,
                    )
                    for _, row in locations_df.iterrows():
                        latitude = row['latitude']
                        longitude = row['longitude']
                        # Skip rows without usable coordinates.
                        if pd.isna(latitude) or pd.isna(longitude):
                            continue
                        m.add_marker(location=[latitude, longitude], popup=row.get('name', 'No Name'))

                    st.write("Map of Uploaded Points:")
                    m.to_streamlit()
                    # Keep the map across reruns.
                    st.session_state.map_data = m

    elif shape_type.lower() == "polygon":
        locations_df = _load_uploaded_file(file_upload)

        # Reject point geometries when the user selected "Polygon".
        if 'geometry' in locations_df.columns:
            if locations_df.geometry.geom_type.isin(['Point', 'MultiPoint']).any():
                st.warning("The uploaded file contains point data. Please select 'Point' for processing.")
                st.stop()  # Stop further processing if point data is detected

        # Processing the polygon data
        with st.spinner('Processing data...'):
            if locations_df is not None and not locations_df.empty:
                # Ensure the 'geometry' column exists in the dataframe.
                if 'geometry' not in locations_df.columns:
                    st.error("Uploaded file is missing required 'geometry' column.")
                else:
                    # Display a preview of the polygons data.
                    st.write("Preview of the uploaded polygons data:")
                    st.dataframe(locations_df.head())

                    # Center the map on the mean polygon centroid.
                    # NOTE(review): centroid on a geographic CRS is approximate
                    # (geopandas warns about this) — acceptable for map centering.
                    centroid_lat = locations_df.geometry.centroid.y.mean()
                    centroid_lon = locations_df.geometry.centroid.x.mean()
                    m = leafmap.Map(center=[centroid_lat, centroid_lon], zoom=10)

                    for _, row in locations_df.iterrows():
                        polygon = row['geometry']
                        if polygon.is_valid:  # only draw valid polygons
                            gdf = gpd.GeoDataFrame([row], geometry=[polygon], crs=locations_df.crs)
                            m.add_gdf(gdf=gdf, layer_name=row.get('name', 'Unnamed Polygon'))

                    st.write("Map of Uploaded Polygons:")
                    m.to_streamlit()
                    # Keep the map across reruns.
                    st.session_state.map_data = m
# Date range inputs; Earth Engine expects 'YYYY-MM-DD' strings.
start_date = st.date_input("Start Date", value=pd.to_datetime('2020-01-01'))
end_date = st.date_input("End Date", value=pd.to_datetime('2020-12-31'))
start_date_str = start_date.strftime('%Y-%m-%d')
end_date_str = end_date.strftime('%Y-%m-%d')

# Aggregation period selection (defaults to Daily).
aggregation_period = st.selectbox(
    "Select Aggregation Period",
    ["Daily", "Weekly", "Monthly", "Yearly"],
    index=0,
)

# One-time initialization of the session state shared across reruns.
for _key, _default in (('results', []), ('last_params', {}), ('map_data', None)):
    if _key not in st.session_state:
        st.session_state[_key] = _default
def _current_params():
    """Snapshot every user input that affects the computed results.

    FIX: the original tracked only the first seven inputs, so changing
    the aggregation period, reducer, or custom formula did NOT clear
    stale results; those are now tracked too.
    """
    return {
        'main_selection': main_selection,
        'dataset_id': dataset_id,
        'index_choice': index_choice,
        'start_date_str': start_date_str,
        'end_date_str': end_date_str,
        'shape_type': shape_type,
        'file_upload': file_upload,
        'aggregation_period': aggregation_period,
        'reducer_choice': reducer_choice,
        'custom_formula': custom_formula,
    }


def parameters_changed():
    """Return True when any tracked input differs from the previous run."""
    last = st.session_state.last_params
    return any(last.get(key) != value for key, value in _current_params().items())


# If parameters have changed, reset the results and remember the new inputs.
if parameters_changed():
    st.session_state.results = []  # Clear the previous results
    st.session_state.last_params = _current_params()
# NDVI = (B8 - B4) / (B8 + B4) for a Sentinel-2 image.
def calculate_ndvi(image, geometry, reducer_choice):
    """Return the NDVI band for *image*.

    ``geometry`` and ``reducer_choice`` are accepted for signature parity
    with the other index calculators but are unused here — the spatial
    reduction happens later in ``reduceRegion``.
    """
    return image.normalizedDifference(['B8', 'B4']).rename('NDVI')
# NDWI = (B3 - B8) / (B3 + B8) for a Sentinel-2 image.
def calculate_ndwi(image, geometry, reducer_choice):
    """Return the NDWI band for *image*.

    ``geometry`` and ``reducer_choice`` are accepted for signature parity
    with the other index calculators but are unused here — the spatial
    reduction happens later in ``reduceRegion``.
    """
    return image.normalizedDifference(['B3', 'B8']).rename('NDWI')
def calculate_custom_formula(image, geometry, custom_formula, reducer_choice, scale=30):
    """Compute a normalized difference from a user-supplied band pair.

    *custom_formula* must be "BAND1,BAND2" (e.g. "B5,B4"); the result is
    (BAND1 - BAND2) / (BAND1 + BAND2) named 'custom formula'.

    FIX: the original sliced on ``str.find(",")``, so input without a comma
    produced garbage band names (find() == -1); band names are now parsed
    with split() and stripped of whitespace, and malformed input yields an
    error message plus None (which the callers already skip).

    ``geometry``, ``reducer_choice`` and ``scale`` are kept for signature
    parity — the spatial reduction happens later in ``reduceRegion``.
    """
    parts = [part.strip() for part in custom_formula.split(',')]
    if len(parts) != 2 or not all(parts):
        st.error("Custom formula must be two band names separated by a comma, e.g. 'B5,B4'.")
        return None
    band1, band2 = parts
    return image.normalizedDifference([band1, band2]).rename('custom formula')
def _aggregate_by_period(collection, prop, date_format):
    """Group an ImageCollection by a formatted date label and mean each group.

    Each image is tagged with property *prop* set to its
    ``system:time_start`` formatted with *date_format*; images sharing a
    label are averaged and the label is preserved on the aggregated image.

    FIX: this logic was copy-pasted four times (daily/weekly/monthly/yearly)
    below; consolidated here with identical behavior.
    """
    collection = collection.map(
        lambda image: image.set(prop, ee.Date(image.get('system:time_start')).format(date_format))
    )
    # Distinct period labels present in the collection.
    distinct_labels = collection.aggregate_array(prop).distinct()

    def _mean_for_label(label):
        # Average all images that fall into this period.
        period_collection = collection.filter(ee.Filter.eq(prop, label))
        return period_collection.mean().set(prop, label)

    return ee.ImageCollection(ee.List(distinct_labels.map(_mean_for_label)))


def aggregate_data_daily(collection):
    """One mean image per day; tagged with 'day' = 'YYYY-MM-dd'."""
    return _aggregate_by_period(collection, 'day', 'YYYY-MM-dd')


def aggregate_data_weekly(collection):
    """One mean image per week; tagged with 'week' = 'YYYY-ww'."""
    return _aggregate_by_period(collection, 'week', 'YYYY-ww')


def aggregate_data_monthly(collection):
    """One mean image per month; tagged with 'month' = 'YYYY-MM'."""
    return _aggregate_by_period(collection, 'month', 'YYYY-MM')


def aggregate_data_yearly(collection):
    """One mean image per year; tagged with 'year' = 'YYYY'."""
    return _aggregate_by_period(collection, 'year', 'YYYY')
# Dispatch to the index calculator matching the user's selection.
def calculate_index_for_period(image, roi, index_choice, reducer_choice, custom_formula):
    """Return the ee.Image for the selected index, or None if unrecognized.

    NOTE(review): the 'Average NO₂' branch calls ``.mean()`` on a
    single-band ee.Image and selects a band literally named 'NO2' —
    presumably matching the chosen dataset's band naming; TODO confirm
    against the dataset IDs in sentinel_datasets.json.
    """
    choice = index_choice.lower()
    if choice == 'ndvi':
        return calculate_ndvi(image, roi, reducer_choice)
    if choice == 'ndwi':
        return calculate_ndwi(image, roi, reducer_choice)
    if choice == 'average no₂':
        return image.select('NO2').mean().rename('Average NO₂')
    if choice == 'custom formula':
        # Pass the user's formula, not the index_choice label.
        return calculate_custom_formula(image, roi, custom_formula, reducer_choice)
    st.write("Please Select any one option...." + choice)
    return None
def _apply_period_aggregation(collection, aggregation_period):
    """Aggregate *collection* by the selected period; unknown periods pass through."""
    period = aggregation_period.lower()
    if period == 'daily':
        return aggregate_data_daily(collection)
    if period == 'weekly':
        return aggregate_data_weekly(collection)
    if period == 'monthly':
        return aggregate_data_monthly(collection)
    if period == 'yearly':
        return aggregate_data_yearly(collection)
    return collection


def _period_property(aggregation_period):
    """Image property holding the period label (set by the aggregators above)."""
    return {'daily': 'day', 'weekly': 'week', 'monthly': 'month', 'yearly': 'year'}.get(
        aggregation_period.lower()
    )


def _update_progress(progress_bar, progress_text, idx, total_steps):
    """Advance the progress bar/label after finishing location *idx*."""
    progress_percentage = (idx + 1) / total_steps
    progress_bar.progress(progress_percentage)
    progress_text.markdown(f"Processing: {int(progress_percentage * 100)}%")


def _collect_roi_results(roi, location_name, dataset_id, index_choice, reducer_choice,
                         aggregation_period, custom_formula, start_date_str, end_date_str,
                         extra_fields):
    """Compute per-period index values for one region of interest.

    Filters the dataset to the date range and ROI, aggregates by period,
    reduces each aggregated image over the ROI, and returns a list of
    result-row dicts. *extra_fields* (e.g. lat/lon for points) is merged
    into every row. This was previously duplicated between the point and
    polygon branches of process_aggregation.
    """
    results = []
    collection = ee.ImageCollection(dataset_id) \
        .filterDate(ee.Date(start_date_str), ee.Date(end_date_str)) \
        .filterBounds(roi)
    collection = _apply_period_aggregation(collection, aggregation_period)
    prop = _period_property(aggregation_period)

    # Process each aggregated image in the collection.
    image_list = collection.toList(collection.size())
    for i in range(image_list.size().getInfo()):
        image = ee.Image(image_list.get(i))
        # The aggregators stored the period label under `prop`.
        date = ee.Date(image.get(prop)).format('YYYY-MM-dd').getInfo()

        index_image = calculate_index_for_period(image, roi, index_choice, reducer_choice, custom_formula)
        if index_image is None:
            st.warning(f"Index calculation failed for {location_name} on {date}. Skipping this entry.")
            continue

        # Reduce the region to get the aggregated scalar value.
        try:
            index_value = index_image.reduceRegion(
                reducer=get_reducer(reducer_choice),
                geometry=roi,
                scale=30
            ).get(index_image.bandNames().get(0))
            calculated_value = index_value.getInfo()
            if isinstance(calculated_value, (int, float)):
                row = {'Location Name': location_name}
                row.update(extra_fields)
                row.update({'Date': date, 'Calculated Value': calculated_value})
                results.append(row)
            else:
                st.warning(f"Skipping invalid value for {location_name} on {date}")
        except Exception as e:
            st.error(f"Error retrieving value for {location_name}: {e}")
    return results


def process_aggregation(locations_df, start_date_str, end_date_str, dataset_id, index_choice, reducer_choice, shape_type, aggregation_period, custom_formula=""):
    """Compute the selected index for every uploaded location.

    Iterates the uploaded points or polygons (per *shape_type*),
    aggregates the dataset's ImageCollection by *aggregation_period*,
    reduces each aggregated image over the location with *reducer_choice*,
    and returns a list of result-row dicts ready for a DataFrame.
    """
    aggregated_results = []

    # FIX: the original compared against 'custom_formula' (underscore),
    # which never matches the UI option 'Custom Formula' — the
    # empty-formula guard was dead code.
    if index_choice.lower() == 'custom formula' and not custom_formula:
        st.error("Custom formula cannot be empty. Please provide a formula.")
        return aggregated_results  # Return early to avoid further processing

    # Progress reporting across the uploaded locations.
    total_steps = len(locations_df)
    progress_bar = st.progress(0)
    progress_text = st.empty()

    with st.spinner('Processing data...'):
        if shape_type.lower() == "point":
            for idx, row in locations_df.iterrows():
                latitude = row.get('latitude')
                longitude = row.get('longitude')
                if pd.isna(latitude) or pd.isna(longitude):
                    st.warning(f"Skipping location {idx} with missing latitude or longitude")
                    continue
                location_name = row.get('name', f"Location_{idx}")
                roi = ee.Geometry.Point([longitude, latitude])

                aggregated_results.extend(_collect_roi_results(
                    roi, location_name, dataset_id, index_choice, reducer_choice,
                    aggregation_period, custom_formula, start_date_str, end_date_str,
                    extra_fields={'Latitude': latitude, 'Longitude': longitude},
                ))
                _update_progress(progress_bar, progress_text, idx, total_steps)

        elif shape_type.lower() == "polygon":
            for idx, row in locations_df.iterrows():
                location_name = row.get('name', f"Polygon_{idx}")
                try:
                    roi = convert_to_ee_geometry(row.get('geometry'))
                except ValueError as e:
                    st.warning(f"Skipping invalid polygon {location_name}: {e}")
                    continue

                aggregated_results.extend(_collect_roi_results(
                    roi, location_name, dataset_id, index_choice, reducer_choice,
                    aggregation_period, custom_formula, start_date_str, end_date_str,
                    extra_fields={},
                ))
                _update_progress(progress_bar, progress_text, idx, total_steps)

    return aggregated_results
# When the user clicks the process button, start the calculation.
if st.button(f"Calculate ({index_choice})"):
    if file_upload is None:
        st.warning("Please upload a file.")
    elif shape_type.lower() in ("point", "polygon"):
        # FIX: the point and polygon branches were byte-identical copies;
        # process_aggregation already branches on shape_type internally.
        results = process_aggregation(
            locations_df,
            start_date_str,
            end_date_str,
            dataset_id,
            index_choice,
            reducer_choice,
            shape_type,
            aggregation_period,
            custom_formula
        )
        # Display the results in a DataFrame.
        if results:
            result_df = pd.DataFrame(results)
            st.write(f"Processed Results Table ({aggregation_period}):")
            st.dataframe(result_df)

            # FIX: the original formatted the dates with '%Y/%m/%d', putting
            # '/' characters into the download file name (invalid on every
            # platform); EE dataset IDs also contain '/', so sanitize them.
            safe_dataset_id = dataset_id.replace('/', '_')
            filename = (
                f"{main_selection}_{safe_dataset_id}_"
                f"{start_date.strftime('%Y-%m-%d')}_{end_date.strftime('%Y-%m-%d')}_"
                f"{aggregation_period.lower()}.csv"
            )
            st.download_button(
                label="Download results as CSV",
                data=result_df.to_csv(index=False).encode('utf-8'),
                file_name=filename,
                mime='text/csv'
            )
            # FIX: removed the bare st.spinner('') call — st.spinner is a
            # context manager; calling it here started a new (empty) spinner
            # instead of stopping anything.
            st.success('Processing complete!')
        else:
            st.warning("No results were generated.")