import streamlit as st import json import ee import os import pandas as pd import geopandas as gpd from datetime import datetime import leafmap.foliumap as leafmap import re # Set up the page layout st.set_page_config(layout="wide") # Custom button styling m = st.markdown( """ """, unsafe_allow_html=True, ) # Logo st.write( f"""
""", unsafe_allow_html=True, ) # Title st.markdown( f"""

Precision Analysis for Vegetation, Water, and Air Quality

""", unsafe_allow_html=True, ) st.write("

User Inputs

", unsafe_allow_html=True) # Authenticate and initialize Earth Engine earthengine_credentials = os.environ.get("EE_Authentication") # Initialize Earth Engine with secret credentials os.makedirs(os.path.expanduser("~/.config/earthengine/"), exist_ok=True) with open(os.path.expanduser("~/.config/earthengine/credentials"), "w") as f: f.write(earthengine_credentials) ee.Initialize(project='ee-yashsacisro24') # Load the Sentinel dataset options from JSON file with open("sentinel_datasets.json") as f: data = json.load(f) # Display the title for the Streamlit app st.title("Sentinel Dataset") # Select dataset category (main selection) main_selection = st.selectbox("Select Sentinel Dataset Category", list(data.keys())) # If a category is selected, display the sub-options (specific datasets) if main_selection: sub_options = data[main_selection]["sub_options"] sub_selection = st.selectbox("Select Specific Dataset ID", list(sub_options.keys())) # Display the selected dataset ID based on user input if sub_selection: st.write(f"You selected: {main_selection} -> {sub_selection}") st.write(f"Dataset ID: {sub_options[sub_selection]}") # Fetch the correct dataset ID from the sub-selection dataset_id = sub_options[sub_selection] # Earth Engine Index Calculator Section st.header("Earth Engine Index Calculator") index_choice = st.selectbox("Select an Index or Enter Custom Formula", ['NDVI', 'NDWI', 'Average NO₂', 'Custom Formula']) # Initialize custom_formula variable custom_formula = "" # Display corresponding formula based on the index selected (case-insensitive) if index_choice.lower() == 'ndvi': st.write("Formula for NDVI: NDVI = (B8 - B4) / (B8 + B4)") elif index_choice.lower() == 'ndwi': st.write("Formula for NDWI: NDWI = (B3 - B8) / (B3 + B8)") elif index_choice.lower() == 'average no₂': st.write("Formula for Average NO₂: Average NO₂ = Mean(NO2 band)") elif index_choice.lower() == 'custom formula': custom_formula = st.text_input("Enter Custom Formula (e.g., B5,B4)") # Check if custom formula is empty and show warning if not custom_formula: st.warning("Please enter a custom formula before proceeding.") else: st.write(f"Custom Formula: (band1 - band2) / (band1 + band2)") # Display the custom formula after the user inputs it # Function to get the corresponding reducer based on user input def get_reducer(reducer_name): """ Map user-friendly reducer names to Earth Engine reducer objects. """ reducers = { 'mean': ee.Reducer.mean(), 'sum': ee.Reducer.sum(), 'median': ee.Reducer.median(), 'min': ee.Reducer.min(), 'max': ee.Reducer.max(), 'count': ee.Reducer.count(), } # Default to 'mean' if the reducer_name is not recognized return reducers.get(reducer_name.lower(), ee.Reducer.mean()) # Streamlit selectbox for reducer choice reducer_choice = st.selectbox( "Select Reducer", ['mean', 'sum', 'median', 'min', 'max', 'count'], index=0 # Default to 'mean' ) # Function to check if the polygon geometry is valid and convert it to the correct format def convert_to_ee_geometry(geometry): if geometry.is_valid: geojson = geometry.__geo_interface__ return ee.Geometry(geojson) else: raise ValueError("Invalid geometry: The polygon geometry is not valid.") # Function to read points from CSV def read_csv(file_path): df = pd.read_csv(file_path) return df # Function to read points from GeoJSON def read_geojson(file_path): gdf = gpd.read_file(file_path) return gdf # Function to read points from KML def read_kml(file_path): gdf = gpd.read_file(file_path, driver='KML') return gdf # Ask user whether they want to process 'Point' or 'Polygon' data (case-insensitive) shape_type = st.selectbox("Do you want to process 'Point' or 'Polygon' data?", ["Point", "Polygon"]) # Ask user to upload a file based on shape type (case-insensitive) file_upload = st.file_uploader(f"Upload your {shape_type} data (CSV, GeoJSON, KML)", type=["csv", "geojson", "kml"]) if file_upload is not None: # Read the user-uploaded file if shape_type.lower() == "point": # Handle different file types for Point data if file_upload.name.endswith('.csv'): locations_df = pd.read_csv(file_upload) elif file_upload.name.endswith('.geojson'): locations_df = gpd.read_file(file_upload) elif file_upload.name.endswith('.kml'): locations_df = gpd.read_file(file_upload) else: st.error("Unsupported file format. Please upload CSV, GeoJSON, or KML.") locations_df = pd.DataFrame() # Check if the file contains polygons when the user selected "Point" if 'geometry' in locations_df.columns: # Check if the geometry type is Polygon or MultiPolygon if locations_df.geometry.geom_type.isin(['Polygon', 'MultiPolygon']).any(): st.warning("The uploaded file contains polygon data. Please select 'Polygon' for processing.") st.stop() # Stop further processing if polygons are detected # Processing the point data with st.spinner('Processing data...'): if locations_df is not None and not locations_df.empty: # For GeoJSON data, the coordinates are in the geometry column if 'geometry' in locations_df.columns: # Extract latitude and longitude from the geometry column locations_df['latitude'] = locations_df['geometry'].y locations_df['longitude'] = locations_df['geometry'].x # Ensure the necessary columns exist in the dataframe if 'latitude' not in locations_df.columns or 'longitude' not in locations_df.columns: st.error("Uploaded file is missing required 'latitude' or 'longitude' columns.") else: # Display a preview of the points data st.write("Preview of the uploaded points data:") st.dataframe(locations_df.head()) # Create a LeafMap object to display the points m = leafmap.Map(center=[locations_df['latitude'].mean(), locations_df['longitude'].mean()], zoom=10) # Add points to the map using a loop for _, row in locations_df.iterrows(): latitude = row['latitude'] longitude = row['longitude'] # Check if latitude or longitude are NaN and skip if they are if pd.isna(latitude) or pd.isna(longitude): continue # Skip this row and move to the next one m.add_marker(location=[latitude, longitude], popup=row.get('name', 'No Name')) # Display map st.write("Map of Uploaded Points:") m.to_streamlit() # Store the map in session_state st.session_state.map_data = m elif shape_type.lower() == "polygon": # Handle different file types for Polygon data: if file_upload.name.endswith('.csv'): locations_df = pd.read_csv(file_upload) elif file_upload.name.endswith('.geojson'): locations_df = gpd.read_file(file_upload) elif file_upload.name.endswith('.kml'): locations_df = gpd.read_file(file_upload) else: st.error("Unsupported file format. Please upload CSV, GeoJSON, or KML.") locations_df = pd.DataFrame() # Check if the file contains points when the user selected "Polygon" if 'geometry' in locations_df.columns: # Check if the geometry type is Point or MultiPoint if locations_df.geometry.geom_type.isin(['Point', 'MultiPoint']).any(): st.warning("The uploaded file contains point data. Please select 'Point' for processing.") st.stop() # Stop further processing if point data is detected # Processing the polygon data with st.spinner('Processing data...'): if locations_df is not None and not locations_df.empty: # Ensure the 'geometry' column exists in the dataframe if 'geometry' not in locations_df.columns: st.error("Uploaded file is missing required 'geometry' column.") else: # Display a preview of the polygons data st.write("Preview of the uploaded polygons data:") st.dataframe(locations_df.head()) # Create a LeafMap object to display the polygons # Calculate the centroid of the polygons for the map center centroid_lat = locations_df.geometry.centroid.y.mean() centroid_lon = locations_df.geometry.centroid.x.mean() m = leafmap.Map(center=[centroid_lat, centroid_lon], zoom=10) # Add polygons to the map using a loop for _, row in locations_df.iterrows(): polygon = row['geometry'] if polygon.is_valid: # Check if polygon is valid # Create a GeoDataFrame for this polygon gdf = gpd.GeoDataFrame([row], geometry=[polygon], crs=locations_df.crs) m.add_gdf(gdf=gdf, layer_name=row.get('name', 'Unnamed Polygon')) # Display map st.write("Map of Uploaded Polygons:") m.to_streamlit() # Store the map in session_state st.session_state.map_data = m # Date Input for Start and End Dates start_date = st.date_input("Start Date", value=pd.to_datetime('2020-01-01')) end_date = st.date_input("End Date", value=pd.to_datetime('2020-12-31')) # Convert start_date and end_date to string format for Earth Engine start_date_str = start_date.strftime('%Y-%m-%d') end_date_str = end_date.strftime('%Y-%m-%d') # Aggregation period selection aggregation_period = st.selectbox("Select Aggregation Period", ["Daily", "Weekly", "Monthly", "Yearly"], index=0) # Initialize session state for storing results if not already done if 'results' not in st.session_state: st.session_state.results = [] if 'last_params' not in st.session_state: st.session_state.last_params = {} if 'map_data' not in st.session_state: st.session_state.map_data = None # Initialize map_data # Function to check if parameters have changed def parameters_changed(): return ( st.session_state.last_params.get('main_selection') != main_selection or st.session_state.last_params.get('dataset_id') != dataset_id or st.session_state.last_params.get('index_choice') != index_choice or st.session_state.last_params.get('start_date_str') != start_date_str or st.session_state.last_params.get('end_date_str') != end_date_str or st.session_state.last_params.get('shape_type') != shape_type or st.session_state.last_params.get('file_upload') != file_upload ) # If parameters have changed, reset the results if parameters_changed(): st.session_state.results = [] # Clear the previous results st.session_state.last_params = { 'main_selection': main_selection, 'dataset_id': dataset_id, 'index_choice': index_choice, 'start_date_str': start_date_str, 'end_date_str': end_date_str, 'shape_type': shape_type, 'file_upload': file_upload } # Function to calculate NDVI with the selected reducer def calculate_ndvi(image, geometry, reducer_choice): ndvi = image.normalizedDifference(['B8', 'B4']).rename('NDVI') return ndvi # Function to calculate NDWI def calculate_ndwi(image, geometry, reducer_choice): ndwi = image.normalizedDifference(['B3', 'B8']).rename('NDWI') return ndwi def calculate_custom_formula(image, geometry, custom_formula, reducer_choice, scale=30): # Calculate NDWI using the user-specified bands band1 = custom_formula[:custom_formula.find(",")] band2 = custom_formula[custom_formula.find(",")+1:] custom_formula = image.normalizedDifference([band1, band2]).rename('custom formula') return custom_formula # Modify aggregation functions to return the correct time period and aggregated results def aggregate_data_daily(collection): # Extract day from the image date (using the exact date) collection = collection.map(lambda image: image.set('day', ee.Date(image.get('system:time_start')).format('YYYY-MM-dd'))) # Group images by day (distinct days) grouped_by_day = collection.aggregate_array('day').distinct() def calculate_daily_mean(day): # Filter the collection by the specific day daily_collection = collection.filter(ee.Filter.eq('day', day)) daily_mean = daily_collection.mean() # Calculate mean for the day return daily_mean.set('day', day) # Calculate the daily mean for each day daily_images = ee.List(grouped_by_day.map(calculate_daily_mean)) return ee.ImageCollection(daily_images) def aggregate_data_weekly(collection): # Extract week and year from the image date collection = collection.map(lambda image: image.set('week', ee.Date(image.get('system:time_start')).format('YYYY-ww'))) # Group images by week grouped_by_week = collection.aggregate_array('week').distinct() def calculate_weekly_mean(week): weekly_collection = collection.filter(ee.Filter.eq('week', week)) weekly_mean = weekly_collection.mean() return weekly_mean.set('week', week) # Calculate the weekly mean for each week weekly_images = ee.List(grouped_by_week.map(calculate_weekly_mean)) return ee.ImageCollection(weekly_images) def aggregate_data_monthly(collection): # Extract month and year from the image date collection = collection.map(lambda image: image.set('month', ee.Date(image.get('system:time_start')).format('YYYY-MM'))) # Group images by month grouped_by_month = collection.aggregate_array('month').distinct() def calculate_monthly_mean(month): monthly_collection = collection.filter(ee.Filter.eq('month', month)) monthly_mean = monthly_collection.mean() return monthly_mean.set('month', month) # Calculate the monthly mean for each month monthly_images = ee.List(grouped_by_month.map(calculate_monthly_mean)) return ee.ImageCollection(monthly_images) def aggregate_data_yearly(collection): # Extract year from the image date collection = collection.map(lambda image: image.set('year', ee.Date(image.get('system:time_start')).format('YYYY'))) # Group images by year grouped_by_year = collection.aggregate_array('year').distinct() def calculate_yearly_mean(year): yearly_collection = collection.filter(ee.Filter.eq('year', year)) yearly_mean = yearly_collection.mean() return yearly_mean.set('year', year) # Calculate the yearly mean for each year yearly_images = ee.List(grouped_by_year.map(calculate_yearly_mean)) return ee.ImageCollection(yearly_images) # Function to calculate index based on the selected choice def calculate_index_for_period(image, roi, index_choice, reducer_choice, custom_formula): if index_choice.lower() == 'ndvi': return calculate_ndvi(image, roi, reducer_choice) elif index_choice.lower() == 'ndwi': return calculate_ndwi(image, roi, reducer_choice) elif index_choice.lower() == 'average no₂': mean_no2 = image.select('NO2').mean().rename('Average NO₂') return mean_no2 elif index_choice.lower() == 'custom formula': # Pass the custom formula here, not the index_choice return calculate_custom_formula(image, roi, custom_formula, reducer_choice) else: st.write("Please Select any one option...."+ index_choice.lower()) def process_aggregation(locations_df, start_date_str, end_date_str, dataset_id, index_choice, reducer_choice, shape_type, aggregation_period, custom_formula=""): aggregated_results = [] # Check if the index_choice is 'custom formula' and the custom formula is empty if index_choice.lower() == 'custom_formula' and not custom_formula: st.error("Custom formula cannot be empty. Please provide a formula.") return aggregated_results # Return early to avoid further processing # Initialize progress bar total_steps = len(locations_df) progress_bar = st.progress(0) progress_text = st.empty() with st.spinner('Processing data...'): if shape_type.lower() == "point": for idx, row in locations_df.iterrows(): # Check if the latitude and longitude columns exist and have values latitude = row.get('latitude') longitude = row.get('longitude') if pd.isna(latitude) or pd.isna(longitude): st.warning(f"Skipping location {idx} with missing latitude or longitude") continue location_name = row.get('name', f"Location_{idx}") roi = ee.Geometry.Point([longitude, latitude]) collection = ee.ImageCollection(dataset_id) \ .filterDate(ee.Date(start_date_str), ee.Date(end_date_str)) \ .filterBounds(roi) # Aggregate data based on the selected period if aggregation_period.lower() == 'daily': collection = aggregate_data_daily(collection) elif aggregation_period.lower() == 'weekly': collection = aggregate_data_weekly(collection) elif aggregation_period.lower() == 'monthly': collection = aggregate_data_monthly(collection) elif aggregation_period.lower() == 'yearly': collection = aggregate_data_yearly(collection) # Process each image in the collection image_list = collection.toList(collection.size()) for i in range(image_list.size().getInfo()): image = ee.Image(image_list.get(i)) if aggregation_period.lower() == 'daily': timestamp = image.get('day') elif aggregation_period.lower() == 'weekly': timestamp = image.get('week') elif aggregation_period.lower() == 'monthly': timestamp = image.get('month') elif aggregation_period.lower() == 'yearly': timestamp = image.get('year') date = ee.Date(timestamp).format('YYYY-MM-dd').getInfo() # Calculate the index for each period index_image = calculate_index_for_period(image, roi, index_choice, reducer_choice, custom_formula) # Skip if index_image is None if index_image is None: st.warning(f"Index calculation failed for {location_name} on {date}. Skipping this entry.") continue # Reduce the region to get the aggregated value try: index_value = index_image.reduceRegion( reducer=get_reducer(reducer_choice), geometry=roi, scale=30 ).get(index_image.bandNames().get(0)) calculated_value = index_value.getInfo() # Append the results if valid if isinstance(calculated_value, (int, float)): aggregated_results.append({ 'Location Name': location_name, 'Latitude': latitude, 'Longitude': longitude, 'Date': date, 'Calculated Value': calculated_value }) else: st.warning(f"Skipping invalid value for {location_name} on {date}") except Exception as e: st.error(f"Error retrieving value for {location_name}: {e}") # Update progress bar progress_percentage = (idx + 1) / total_steps progress_bar.progress(progress_percentage) progress_text.markdown(f"Processing: {int(progress_percentage * 100)}%") elif shape_type.lower() == "polygon": for idx, row in locations_df.iterrows(): polygon_name = row.get('name', f"Polygon_{idx}") polygon_geometry = row.get('geometry') location_name = polygon_name try: roi = convert_to_ee_geometry(polygon_geometry) except ValueError as e: st.warning(f"Skipping invalid polygon {polygon_name}: {e}") continue collection = ee.ImageCollection(dataset_id) \ .filterDate(ee.Date(start_date_str), ee.Date(end_date_str)) \ .filterBounds(roi) # Aggregate data based on the selected period if aggregation_period.lower() == 'daily': collection = aggregate_data_daily(collection) elif aggregation_period.lower() == 'weekly': collection = aggregate_data_weekly(collection) elif aggregation_period.lower() == 'monthly': collection = aggregate_data_monthly(collection) elif aggregation_period.lower() == 'yearly': collection = aggregate_data_yearly(collection) # Process each image in the collection image_list = collection.toList(collection.size()) for i in range(image_list.size().getInfo()): image = ee.Image(image_list.get(i)) if aggregation_period.lower() == 'daily': timestamp = image.get('day') elif aggregation_period.lower() == 'weekly': timestamp = image.get('week') elif aggregation_period.lower() == 'monthly': timestamp = image.get('month') elif aggregation_period.lower() == 'yearly': timestamp = image.get('year') date = ee.Date(timestamp).format('YYYY-MM-dd').getInfo() # Calculate the index for each period index_image = calculate_index_for_period(image, roi, index_choice, reducer_choice, custom_formula) # Skip if index_image is None if index_image is None: st.warning(f"Index calculation failed for {location_name} on {date}. Skipping this entry.") continue # Reduce the region to get the aggregated value try: index_value = index_image.reduceRegion( reducer=get_reducer(reducer_choice), geometry=roi, scale=30 ).get(index_image.bandNames().get(0)) calculated_value = index_value.getInfo() # Append the results if valid if isinstance(calculated_value, (int, float)): aggregated_results.append({ 'Location Name': location_name, 'Date': date, 'Calculated Value': calculated_value }) else: st.warning(f"Skipping invalid value for {location_name} on {date}") except Exception as e: st.error(f"Error retrieving value for {location_name}: {e}") # Update progress bar progress_percentage = (idx + 1) / total_steps progress_bar.progress(progress_percentage) progress_text.markdown(f"Processing: {int(progress_percentage * 100)}%") return aggregated_results # When the user clicks the process button, start the calculation if st.button(f"Calculate ({index_choice})"): if file_upload is not None: # Read the user-uploaded file if shape_type.lower() == "point": # Process results for the selected aggregation period results = process_aggregation( locations_df, start_date_str, end_date_str, dataset_id, index_choice, reducer_choice, shape_type, aggregation_period, custom_formula ) # Display the results in a DataFrame if results: result_df = pd.DataFrame(results) st.write(f"Processed Results Table ({aggregation_period}):") st.dataframe(result_df) # Provide a download button for the result CSV file filename = f"{main_selection}_{dataset_id}_{start_date.strftime('%Y/%m/%d')}_{end_date.strftime('%Y/%m/%d')}_{aggregation_period.lower()}.csv" st.download_button( label="Download results as CSV", data=result_df.to_csv(index=False).encode('utf-8'), file_name=filename, mime='text/csv' ) # Once processing is complete, hide the spinner st.spinner('') # This will stop the spinner st.success('Processing complete!') else: st.warning("No results were generated.") elif shape_type.lower() == "polygon": # Process results for the selected aggregation period results = process_aggregation( locations_df, start_date_str, end_date_str, dataset_id, index_choice, reducer_choice, shape_type, aggregation_period, custom_formula ) # Display the results in a DataFrame if results: result_df = pd.DataFrame(results) st.write(f"Processed Results Table ({aggregation_period}):") st.dataframe(result_df) # Provide a download button for the result CSV file filename = f"{main_selection}_{dataset_id}_{start_date.strftime('%Y/%m/%d')}_{end_date.strftime('%Y/%m/%d')}_{aggregation_period.lower()}.csv" st.download_button( label="Download results as CSV", data=result_df.to_csv(index=False).encode('utf-8'), file_name=filename, mime='text/csv' ) # Once processing is complete, hide the spinner st.spinner('') # This will stop the spinner st.success('Processing complete!') else: st.warning("No results were generated.") else: st.warning("Please upload a file.")