import streamlit as st
import json
import ee
import os
import pandas as pd
import geopandas as gpd
from datetime import datetime
import leafmap.foliumap as leafmap
import time
import re

# Set up the page layout
st.set_page_config(layout="wide")

# Custom button styling
m = st.markdown(
    """ """,
    unsafe_allow_html=True,
)

# Logo
st.write(
    f"""
    """,
    unsafe_allow_html=True,
)

# Authenticate and initialize Earth Engine
earthengine_credentials = os.environ.get("EE_Authentication")
if not earthengine_credentials:
    st.error("Earth Engine credentials not found in the EE_Authentication environment variable.")
    st.stop()

# Initialize Earth Engine with secret credentials
os.makedirs(os.path.expanduser("~/.config/earthengine/"), exist_ok=True)
with open(os.path.expanduser("~/.config/earthengine/credentials"), "w") as f:
    f.write(earthengine_credentials)

ee.Initialize(project='ee-yashsacisro24')

# Load Sentinel dataset options from JSON file
with open("sentinel_datasets.json") as f:
    data = json.load(f)

# Display the title and dataset selection
st.title("Sentinel Dataset")

# Select dataset category and specific dataset ID
main_selection = st.selectbox("Select Sentinel Dataset Category", list(data.keys()))
if main_selection:
    sub_options = data[main_selection]["sub_options"]
    sub_selection = st.selectbox("Select Specific Dataset ID", list(sub_options.keys()))
# Earth Engine Index Calculator Section
st.header("Earth Engine Index Calculator")

# Choose an index or a custom formula (choices are compared case-insensitively below)
index_choice = st.selectbox(
    "Select an Index or Enter Custom Formula",
    ['NDVI', 'NDWI', 'Average NO₂', 'Custom Formula']
)

# Initialize custom_formula variable
custom_formula = ""

# Display the corresponding formula for the selected index
if index_choice.lower() == 'ndvi':
    st.write("Formula for NDVI: NDVI = (B8 - B4) / (B8 + B4)")
elif index_choice.lower() == 'ndwi':
    st.write("Formula for NDWI: NDWI = (B3 - B8) / (B3 + B8)")
elif index_choice.lower() == 'average no₂':
    st.write("Formula for Average NO₂: Average NO₂ = Mean(NO2 band)")
elif index_choice.lower() == 'custom formula':
    custom_formula = st.text_input("Enter Custom Formula (e.g., '(B5 - B4) / (B5 + B4)')")
    st.write(f"Custom Formula: {custom_formula}")  # Echo the formula the user entered

# Check that a polygon geometry is valid and convert it to an Earth Engine geometry
def convert_to_ee_geometry(geometry):
    if geometry.is_valid:
        # Convert the Shapely geometry to GeoJSON, then to an Earth Engine geometry
        geojson = geometry.__geo_interface__
        return ee.Geometry(geojson)
    else:
        raise ValueError("Invalid geometry: The polygon geometry is not valid.")

# Function to read points from CSV
def read_csv(file_path):
    df = pd.read_csv(file_path)
    return df

# Function to read features from GeoJSON
def read_geojson(file_path):
    gdf = gpd.read_file(file_path)
    return gdf

# Function to read features from KML (the KML driver may need to be enabled in
# some GeoPandas/Fiona installations)
def read_kml(file_path):
    gdf = gpd.read_file(file_path, driver='KML')
    return gdf

# Ask the user whether to process 'Point' or 'Polygon' data
shape_type = st.selectbox("Do you want to process 'Point' or 'Polygon' data?", ["Point", "Polygon"])

# Ask the user to upload a file matching the chosen shape type
file_upload = st.file_uploader(
    f"Upload your {shape_type} data (CSV, GeoJSON, KML)",
    type=["csv", "geojson", "kml"]
)

# Date inputs for the start and end of the analysis period
start_date = st.date_input("Start Date", value=pd.to_datetime('2020-01-01'))
end_date = st.date_input("End Date", value=pd.to_datetime('2020-12-31'))

# Convert start_date and end_date to string format for Earth Engine
start_date_str = start_date.strftime('%Y-%m-%d')
end_date_str = end_date.strftime('%Y-%m-%d')

# Initialize session state for storing results if not already done
if 'results' not in st.session_state:
    st.session_state.results = []
if 'last_params' not in st.session_state:
    st.session_state.last_params = {}
if 'map_data' not in st.session_state:
    st.session_state.map_data = None  # Initialize map_data
if 'file_upload' not in st.session_state:
    st.session_state.file_upload = None  # Track the last uploaded file

# Function to check if parameters have changed
def parameters_changed():
    return (
        st.session_state.last_params.get('main_selection') != main_selection or
        st.session_state.last_params.get('sub_selection') != sub_selection or
        st.session_state.last_params.get('index_choice') != index_choice or
        st.session_state.last_params.get('start_date_str') != start_date_str or
        st.session_state.last_params.get('end_date_str') != end_date_str
    )

# If parameters have changed, reset the results
if parameters_changed():
    st.session_state.results = []  # Clear the previous results

# Update the last parameters to the current ones
st.session_state.last_params = {
    'main_selection': main_selection,
    'sub_selection': sub_selection,
    'index_choice': index_choice,
    'start_date_str': start_date_str,
    'end_date_str': end_date_str
}

# Functions to perform index calculations
def calculate_ndvi(image, geometry):
    ndvi = image.normalizedDifference(['B8', 'B4']).rename('NDVI')
    result = ndvi.reduceRegion(
        reducer=ee.Reducer.mean(),
        geometry=geometry,
        scale=30
    )
    return result.get('NDVI')

def calculate_ndwi(image, geometry):
    ndwi = image.normalizedDifference(['B3', 'B8']).rename('NDWI')
    result = ndwi.reduceRegion(
        reducer=ee.Reducer.mean(),
        geometry=geometry,
        scale=30
    )
    return result.get('NDWI')

def calculate_avg_no2_sentinel5p(image, geometry):
    # Assumes the selected dataset exposes a band literally named 'NO2'; the
    # Sentinel-5P L3 products in the Earth Engine catalog name this band
    # differently (e.g. 'NO2_column_number_density'), so adjust if needed.
    no2 = image.select('NO2').reduceRegion(
        reducer=ee.Reducer.mean(),
        geometry=geometry,
        scale=1000
    ).get('NO2')
    return no2

def calculate_custom_formula(image, geometry, formula):
    # The formula is evaluated with ee.Image.expression, so it can reference the
    # band names of the selected image directly (e.g. '(B8 - B4) / (B8 + B4)').
    result = image.expression(formula).rename('Custom Index').reduceRegion(
        reducer=ee.Reducer.mean(),
        geometry=geometry,
        scale=30
    )
    return result.get('Custom Index')

# Reset stored results and map data when a new file is uploaded
def reset_session_state_for_new_file():
    st.session_state.results = []
    st.session_state.map_data = None

# Check if the file uploaded is different from the previous file uploaded
if st.session_state.file_upload != file_upload:
    reset_session_state_for_new_file()
    st.session_state.file_upload = file_upload  # Remember the current file
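# --- Illustrative assumption: expected attributes for point uploads ---
# The point-processing loop below reads row['latitude'], row['longitude'] and an
# optional row['name'] from the uploaded table, so a minimal CSV would look like
# the made-up sample kept here purely as documentation (unused by the app):
_EXAMPLE_POINT_CSV = """name,latitude,longitude
Site A,28.6139,77.2090
Site B,19.0760,72.8777
"""
# GeoJSON/KML point files are expected to carry the same latitude, longitude and
# name attributes for that loop to work unchanged.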
# Process each point or polygon
if file_upload:
    locations_df = None  # Will hold point data, if any
    polygons_df = None   # Will hold polygon data, if any
    file_extension = os.path.splitext(file_upload.name)[1].lower()  # Extension in lowercase

    # Read the file based on the selected shape type
    if shape_type.lower() == 'point':
        if file_extension == '.csv':
            locations_df = read_csv(file_upload)
        elif file_extension == '.geojson':
            locations_df = read_geojson(file_upload)
        elif file_extension == '.kml':
            locations_df = read_kml(file_upload)
        else:
            st.error("Unsupported file type. Please upload a CSV, GeoJSON, or KML file for points.")
    elif shape_type.lower() == 'polygon':
        if file_extension == '.geojson':
            polygons_df = read_geojson(file_upload)
        elif file_extension == '.kml':
            polygons_df = read_kml(file_upload)
        else:
            st.error("Unsupported file type. Please upload a GeoJSON or KML file for polygons.")

    # Check if locations_df is populated for points
    if locations_df is not None:
        # Display a preview of the points data
        st.write("Preview of the uploaded points data:")
        st.dataframe(locations_df.head())

        # Create a Leafmap object centred on the uploaded points
        m = leafmap.Map(
            center=[locations_df['latitude'].mean(), locations_df['longitude'].mean()],
            zoom=10
        )

        # Add points to the map using a loop
        for _, row in locations_df.iterrows():
            latitude = row['latitude']
            longitude = row['longitude']

            # Skip rows with missing coordinates
            if pd.isna(latitude) or pd.isna(longitude):
                continue

            m.add_marker(location=[latitude, longitude], popup=row.get('name', 'No Name'))

        # Display map
        st.write("Map of Uploaded Points:")
        m.to_streamlit()

        # Store the map in session_state
        st.session_state.map_data = m

        # Process each point for index calculation
        for idx, row in locations_df.iterrows():
            latitude = row['latitude']
            longitude = row['longitude']
            location_name = row.get('name', f"Point_{idx}")

            # Skip processing if latitude or longitude is NaN
            if pd.isna(latitude) or pd.isna(longitude):
                continue

            # Define the region of interest (ROI)
            roi = ee.Geometry.Point([longitude, latitude])

            # Load the selected image collection for the date range and location
            collection = ee.ImageCollection(sub_options[sub_selection]) \
                .filterDate(ee.Date(start_date_str), ee.Date(end_date_str)) \
                .filterBounds(roi)

            # Check if the collection has images for the selected date range
            image_count = collection.size().getInfo()
            if image_count == 0:
                st.warning(f"No images found for {location_name}.")
            else:
                st.write(f"Found {image_count} images for {location_name}.")
                image = collection.first()

                # Perform the calculation based on user selection
                result = None
                if index_choice.lower() == 'ndvi':
                    result = calculate_ndvi(image, roi)
                elif index_choice.lower() == 'ndwi':
                    result = calculate_ndwi(image, roi)
                elif index_choice.lower() == 'average no₂':
                    if 'NO2' in image.bandNames().getInfo():
                        result = calculate_avg_no2_sentinel5p(image, roi)
                    else:
                        st.warning(
                            f"No NO2 band found for {location_name}. "
                            "Please use Sentinel-5P for NO₂ data."
                        )
                elif index_choice.lower() == 'custom formula' and custom_formula:
                    result = calculate_custom_formula(image, roi, custom_formula)

                if result is not None:
                    # Only store the numeric value (not the server-side object)
                    calculated_value = result.getInfo()

                    # Store the result in session state
                    st.session_state.results.append({
                        'Location Name': location_name,
                        'Latitude': latitude,
                        'Longitude': longitude,
                        'Calculated Value': calculated_value
                    })
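    # --- Optional alternative (sketch only, not wired in): composite over the range ---
    # The loops above and below use collection.first(), i.e. only the first image in
    # the selected period. If a single scene is unrepresentative (e.g. cloudy), the
    # same helpers could be applied to a median composite instead, roughly:
    #
    #     composite = collection.median()
    #     value = calculate_ndvi(composite, roi).getInfo()
    #
    # This is kept as a comment so the original first-image behaviour is unchanged.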
    # Check if polygons_df is populated for polygons
    if polygons_df is not None:
        # Display a preview of the polygons data
        st.write("Preview of the uploaded polygons data:")
        st.dataframe(polygons_df.head())

        # Create a Leafmap object centred on the polygon centroids
        m = leafmap.Map(
            center=[polygons_df.geometry.centroid.y.mean(), polygons_df.geometry.centroid.x.mean()],
            zoom=10
        )

        # Add polygons to the map
        for _, row in polygons_df.iterrows():
            polygon = row['geometry']
            if polygon.is_valid:  # Only add valid geometries
                # Wrap the single row in a GeoDataFrame so it can be added as a layer
                gdf = gpd.GeoDataFrame([row], geometry=[polygon], crs=polygons_df.crs)
                m.add_gdf(gdf=gdf, layer_name=row.get('name', 'Unnamed Polygon'))

        # Display map
        st.write("Map of Uploaded Polygons:")
        m.to_streamlit()

        # Store the map in session_state
        st.session_state.map_data = m

        # Process each polygon for index calculation
        for idx, row in polygons_df.iterrows():
            polygon = row['geometry']
            location_name = row.get('name', f"Polygon_{idx}")

            # Define the region of interest (ROI)
            try:
                roi = convert_to_ee_geometry(polygon)
            except ValueError as e:
                st.error(str(e))
                continue  # Skip this polygon if the geometry is invalid

            # Load the selected image collection for the date range and polygon
            collection = ee.ImageCollection(sub_options[sub_selection]) \
                .filterDate(ee.Date(start_date_str), ee.Date(end_date_str)) \
                .filterBounds(roi)

            # Check if the collection has images for the selected date range
            image_count = collection.size().getInfo()
            if image_count == 0:
                st.warning(f"No images found for {location_name}.")
            else:
                st.write(f"Found {image_count} images for {location_name}.")
                image = collection.first()

                # Perform the calculation based on user selection
                result = None
                if index_choice.lower() == 'ndvi':
                    result = calculate_ndvi(image, roi)
                elif index_choice.lower() == 'ndwi':
                    result = calculate_ndwi(image, roi)
                elif index_choice.lower() == 'average no₂':
                    if 'NO2' in image.bandNames().getInfo():
                        result = calculate_avg_no2_sentinel5p(image, roi)
                    else:
                        st.warning(
                            f"No NO2 band found for {location_name}. "
                            "Please use Sentinel-5P for NO₂ data."
                        )
                elif index_choice.lower() == 'custom formula' and custom_formula:
                    result = calculate_custom_formula(image, roi, custom_formula)

                if result is not None:
                    # Only store the numeric value (not the server-side object)
                    calculated_value = result.getInfo()

                    # Store the result in session state
                    st.session_state.results.append({
                        'Location Name': location_name,
                        'Calculated Value': calculated_value
                    })
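# For reference, st.session_state.results is a plain list of dicts built above:
# point entries carry 'Location Name', 'Latitude', 'Longitude' and 'Calculated Value',
# while polygon entries carry only 'Location Name' and 'Calculated Value', e.g.
# (values made up for illustration):
#   [{'Location Name': 'Site A', 'Latitude': 28.6139, 'Longitude': 77.2090,
#     'Calculated Value': 0.42}]
# The results table and CSV download below are built directly from this list.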
# After processing, show the results
if st.session_state.results:
    # Convert the results to a DataFrame for better visualization
    result_df = pd.DataFrame(st.session_state.results)

    # If the shape type is 'Point', include 'Latitude' and 'Longitude'
    if shape_type.lower() == 'point':
        st.write("Processed Results Table (Points):")
        st.dataframe(result_df[['Location Name', 'Latitude', 'Longitude', 'Calculated Value']])
    else:
        # For polygons, only show the Location Name and Calculated Value
        st.write("Processed Results Table (Polygons):")
        st.dataframe(result_df[['Location Name', 'Calculated Value']])

    # Generate a dynamic filename (dates use '-' so the name remains a valid filename)
    filename = f"{main_selection}_{sub_selection}_{start_date.strftime('%Y-%m-%d')}_{end_date.strftime('%Y-%m-%d')}_{shape_type}.csv"

    # Offer the results as a CSV download
    st.download_button(
        label="Download results as CSV",
        data=result_df.to_csv(index=False).encode('utf-8'),
        file_name=filename,
        mime='text/csv'
    )
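# To run this app locally (assuming this script is saved as app.py, the packages
# imported above -- streamlit, earthengine-api, pandas, geopandas, leafmap -- are
# installed, and the EE_Authentication environment variable holds valid Earth
# Engine credentials):
#
#   streamlit run app.py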