"""Streamlit app: keyword relevance test using vector embeddings.

The app signs a user in with Google, pulls Search Console data for a selected
property, scores how well each page matches its top query (cosine similarity
of Cohere embeddings) and can compare that score with the pages returned by an
Apify SERP actor for the same query.
"""

# Standard library imports
import base64
import datetime
import html
import logging
import os
import urllib.parse

# Related third-party imports
import cohere
import pandas as pd
import requests
import searchconsole
import streamlit as st
from apify_client import ApifyClient
from bs4 import BeautifulSoup
from dotenv import load_dotenv
from google_auth_oauthlib.flow import Flow
from googleapiclient.discovery import build
from sklearn.metrics.pairwise import cosine_similarity
from streamlit_elements import elements

# Configure logging
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

load_dotenv()
logger.info("Environment variables loaded")

# Initialize Cohere client
APIFY_API_TOKEN = os.environ.get('APIFY_API_TOKEN')
COHERE_API_KEY = os.environ["COHERE_API_KEY"]
co = cohere.Client(COHERE_API_KEY)
logger.info("Cohere client initialized")

if not APIFY_API_TOKEN:
    # Only log at import time. The user-facing st.error() is raised from main()
    # after the page is configured, because Streamlit versions that require
    # st.set_page_config() to be the first Streamlit command reject an earlier call.
    logger.error("APIFY_API_TOKEN is not set in the environment variables.")

# Initialize the ApifyClient with the API token
client = ApifyClient(APIFY_API_TOKEN)
logger.info("ApifyClient initialized")

# Configuration: Set to True if running locally, False if running on Streamlit Cloud
IS_LOCAL = False

# Constants
SEARCH_TYPES = ["web", "image", "video", "news", "discover", "googleNews"]
DATE_RANGE_OPTIONS = [
    "Last 7 Days",
    "Last 30 Days",
    "Last 3 Months",
    "Last 6 Months",
    "Last 12 Months",
    "Last 16 Months",
    "Custom Range",
]
DEVICE_OPTIONS = ["All Devices", "desktop", "mobile", "tablet"]
BASE_DIMENSIONS = ["page", "query", "country", "date"]
MAX_ROWS = 250_000
DF_PREVIEW_ROWS = 100

# Columns of the processed report, in display order.
GSC_REPORT_COLUMNS = ['page', 'query', 'clicks', 'impressions', 'ctr', 'position', 'relevancy_score']
# Placeholder returned by list_gsc_properties() when the account has no properties.
NO_PROPERTIES_PLACEHOLDER = "No properties found"
# Cohere model used for each option offered by show_model_type_selector().
EMBEDDING_MODELS = {
    "english": "embed-english-v3.0",
    "multilingual": "embed-multilingual-v3.0",
}
# Number of texts sent to Cohere per embed request.
EMBED_BATCH_SIZE = 96


# -------------
# Streamlit App Configuration
# -------------

def setup_streamlit():
    """Configure the page layout and render the app title."""
    st.set_page_config(page_title="Keyword Relevance Test", layout="wide")
    st.title("Keyword Relevance Test Using Vector Embedding")
    st.divider()
    logger.info("Streamlit app configured")


def init_session_state():
    """Populate ``st.session_state`` with defaults for keys that are not set yet."""
    today = datetime.date.today()
    week_ago = today - datetime.timedelta(days=7)
    defaults = {
        'selected_property': None,
        'selected_search_type': 'web',
        'selected_date_range': 'Last 7 Days',
        'start_date': week_ago,
        'end_date': today,
        'selected_dimensions': ['page', 'query'],
        'selected_device': 'All Devices',
        'custom_start_date': week_ago,
        'custom_end_date': today,
    }
    for key, value in defaults.items():
        if key not in st.session_state:
            st.session_state[key] = value
    logger.info("Session state initialized")


# -------------
# Data Processing Functions
# -------------

def get_serp_results(query):
    """Return the organic result URLs the Apify SERP actor finds for ``query``.

    Returns an empty list when the Apify token is missing, when the first
    dataset item has no ``organicResults`` or when any error occurs; problems
    are reported in the UI and the log instead of being raised.
    """
    if not APIFY_API_TOKEN:
        st.error("Apify API token is not set. Unable to fetch SERP results.")
        return []

    run_input = {
        "queries": query,
        "resultsPerPage": 5,
        "maxPagesPerQuery": 1,
        "languageCode": "",
        "mobileResults": False,
        "includeUnfilteredResults": False,
        "saveHtml": False,
        "saveHtmlToKeyValueStore": False,
        "includeIcons": False,
    }

    try:
        logger.debug("Calling Apify Actor with input: %s", run_input)
        # Run the Actor and wait for it to finish
        run = client.actor("nFJndFXA5zjCTuudP").call(run_input=run_input)
        logger.info("Apify Actor run completed. Run ID: %s", run.get('id'))

        # Fetch results from the run's dataset
        logger.debug("Fetching results from dataset ID: %s", run.get('defaultDatasetId'))
        results = list(client.dataset(run["defaultDatasetId"]).iterate_items())
        logger.info("Fetched %d results from Apify dataset", len(results))

        if results and 'organicResults' in results[0]:
            urls = [item['url'] for item in results[0]['organicResults']]
            logger.info("Extracted %d URLs from organic results", len(urls))
            return urls

        logger.warning("No organic results found in the SERP data.")
        st.warning("No organic results found in the SERP data.")
        return []
    except Exception as e:  # UI boundary: report the failure instead of crashing the app
        logger.exception("Error fetching SERP results: %s", e)
        st.error(f"Error fetching SERP results: {str(e)}")
        return []


def fetch_content(url):
    """Download ``url`` and return its visible text, or ``""`` if the request fails."""
    logger.info("Fetching content from URL: %s", url)
    try:
        # Decode URL-encoded characters
        decoded_url = urllib.parse.unquote(url)
        response = requests.get(decoded_url, timeout=10)
        response.raise_for_status()
        soup = BeautifulSoup(response.text, 'html.parser')
        return soup.get_text(separator=' ', strip=True)
    except requests.RequestException as e:
        logger.error("Error fetching content from %s: %s", url, e)
        st.warning(f"Error fetching content from {url}: {e}")
        return ""


def calculate_relevance_score(page_content, query, co):
    """Return the cosine similarity between the embeddings of a page and a query.

    Args:
        page_content: Text of the page; empty content scores 0 without an API call.
        query: Search query to compare against.
        co: Cohere client used to create the embeddings.

    Returns:
        The similarity score, or 0 when the content is empty or an error occurs.
    """
    logger.info("Calculating relevance score for query: %s", query)
    try:
        if not page_content:
            logger.warning("Empty page content. Returning score 0.")
            return 0

        page_embedding = co.embed(
            texts=[page_content], model='embed-english-v3.0', input_type='search_document'
        ).embeddings[0]
        query_embedding = co.embed(
            texts=[query], model='embed-english-v3.0', input_type='search_query'
        ).embeddings[0]
        score = cosine_similarity([query_embedding], [page_embedding])[0][0]
        logger.debug("Relevance score calculated: %s", score)
        return score
    except Exception as e:  # UI boundary: a failed score must not abort the analysis
        logger.exception("Error calculating relevance score: %s", e)
        st.error(f"Error calculating relevance score: {str(e)}")
        return 0


def generate_embeddings(texts, model_type, input_type='search_document'):
    """Embed ``texts`` with the Cohere model selected by ``model_type``.

    Args:
        texts: Sequence of non-empty strings to embed.
        model_type: Key of ``EMBEDDING_MODELS``; unknown values use the English model.
        input_type: Cohere input type, e.g. ``'search_document'`` or ``'search_query'``.

    Returns:
        A list with one embedding per input text, in input order.
    """
    model = EMBEDDING_MODELS.get(model_type, EMBEDDING_MODELS["english"])
    texts = list(texts)
    embeddings = []
    # Send the texts in batches so a large report does not exceed the per-request limit.
    for start in range(0, len(texts), EMBED_BATCH_SIZE):
        batch = texts[start:start + EMBED_BATCH_SIZE]
        response = co.embed(texts=batch, model=model, input_type=input_type)
        embeddings.extend(response.embeddings)
    return embeddings


def analyze_competitors(row, co):
    """Score the SERP competitors for ``row['query']`` next to our own page.

    Our page is listed with the score already stored in ``row['relevancy_score']``;
    every competitor URL is fetched and scored with ``calculate_relevance_score``.

    Returns:
        A DataFrame with ``url`` and ``relevancy_score`` columns sorted by score,
        highest first, with a fresh 0-based index so that index + 1 is the rank.
    """
    query = row['query']
    our_url = row['page']
    our_score = row['relevancy_score']
    logger.info("Analyzing competitors for query: %s", query)

    results = []
    for url in get_serp_results(query):
        logger.debug("Processing competitor URL: %s", url)
        score = calculate_relevance_score(fetch_content(url), query, co)
        logger.info("Competitor URL: %s, Score: %s", url, score)
        results.append({'url': url, 'relevancy_score': score})
    results.append({'url': our_url, 'relevancy_score': our_score})

    results_df = (
        pd.DataFrame(results)
        .sort_values('relevancy_score', ascending=False)
        # Without this the index keeps the insertion order and cannot be used as a rank.
        .reset_index(drop=True)
    )
    logger.info("Competitor analysis completed. %d results obtained.", len(results))
    return results_df


def show_competitor_analysis(row, co):
    """Render a "Check Competitors" button and, once clicked, the comparison for ``row``."""
    if not st.button("Check Competitors", key=f"comp_{row['page']}"):
        return

    logger.info("Competitor analysis requested for page: %s", row['page'])
    with st.spinner('Analyzing competitors...'):
        results_df = analyze_competitors(row, co)

    st.write("Relevancy Score Comparison:")
    st.dataframe(results_df)

    our_data = results_df[results_df['url'] == row['page']]
    if our_data.empty:
        st.error(
            f"Our page '{row['page']}' is not in the results. "
            "This indicates an error in fetching or processing the page."
        )
        logger.error("Our page '%s' is missing from the results.", row['page'])
        return

    our_rank = our_data.index[0] + 1
    total_results = len(results_df)
    our_score = our_data['relevancy_score'].values[0]

    logger.info("Our page ranks %s out of %s in terms of relevancy score.", our_rank, total_results)
    st.write(
        f"Our page ('{row['page']}') ranks {our_rank} out of {total_results} "
        "in terms of relevancy score."
    )
    st.write(f"Our relevancy score: {our_score:.4f}")

    if our_score == 0:
        st.warning(
            "Our page's relevancy score is 0. "
            "This might indicate an issue with content fetching or score calculation."
        )
        # Additional debugging information
        st.write("Debugging Information:")
        content = fetch_content(row['page'])
        st.json({
            "content_length": len(content),
            "content_preview": content[:500] if content else "No content fetched",
            "query": row['query'],
        })
    elif our_rank == 1:
        st.success("Your page has the highest relevancy score!")
    elif our_rank <= 3:
        st.info("Your page is among the top 3 most relevant results.")
    elif our_rank > total_results / 2:
        st.warning(
            "Your page's relevancy score is in the lower half of the results. "
            "Consider optimizing your content."
        )


def process_gsc_data(df):
    """Reduce raw GSC rows to one row per page: the row with the most impressions.

    A ``relevancy_score`` column filled with 0 is added when the input has none;
    an existing score stays attached to the row that is kept.

    Returns:
        A DataFrame with the columns in ``GSC_REPORT_COLUMNS``; it is empty when
        ``df`` is empty.
    """
    logger.info("Processing GSC data")
    if df.empty:
        # An empty result may not carry the expected columns at all.
        return pd.DataFrame(columns=GSC_REPORT_COLUMNS)

    df_unique = (
        df.sort_values('impressions', ascending=False)
        .drop_duplicates(subset='page', keep='first')
        .copy()  # own the data before adding a column
    )
    if 'relevancy_score' not in df_unique.columns:
        df_unique['relevancy_score'] = 0

    result = df_unique[GSC_REPORT_COLUMNS]
    logger.info("GSC data processed successfully")
    return result


# -------------
# Google Authentication Functions
# -------------

def load_config():
    """Build the Google OAuth client configuration from environment variables.

    Raises:
        KeyError: If ``CLIENT_ID`` or ``CLIENT_SECRET`` is not set.
    """
    logger.info("Loading Google client configuration")
    client_config = {
        "web": {
            "client_id": os.environ["CLIENT_ID"],
            "client_secret": os.environ["CLIENT_SECRET"],
            "auth_uri": "https://accounts.google.com/o/oauth2/auth",
            "token_uri": "https://oauth2.googleapis.com/token",
            "redirect_uris": ["https://poemsforaphrodite-gscpro.hf.space/"],
        }
    }
    logger.info("Google client configuration loaded")
    return client_config


def init_oauth_flow(client_config):
    """Create an OAuth flow with read-only Search Console scope for ``client_config``."""
    logger.info("Initializing OAuth flow")
    scopes = ["https://www.googleapis.com/auth/webmasters.readonly"]
    flow = Flow.from_client_config(
        client_config,
        scopes=scopes,
        redirect_uri=client_config["web"]["redirect_uris"][0],
    )
    logger.info("OAuth flow initialized")
    return flow


def google_auth(client_config):
    """Return ``(flow, auth_url)`` for starting the Google sign-in."""
    logger.info("Starting Google authentication")
    flow = init_oauth_flow(client_config)
    auth_url, _ = flow.authorization_url(prompt="consent")
    logger.info("Google authentication URL generated")
    return flow, auth_url


def auth_search_console(client_config, credentials):
    """Authenticate the ``searchconsole`` library with existing OAuth ``credentials``."""
    token = {
        "token": credentials.token,
        "refresh_token": credentials.refresh_token,
        "token_uri": credentials.token_uri,
        "client_id": credentials.client_id,
        "client_secret": credentials.client_secret,
        "scopes": credentials.scopes,
        "id_token": getattr(credentials, "id_token", None),
    }
    return searchconsole.authenticate(client_config=client_config, credentials=token)


# -------------
# Data Fetching Functions
# -------------

def list_gsc_properties(credentials):
    """Return the site URLs the account can access.

    When the account has no properties the list contains only
    ``NO_PROPERTIES_PLACEHOLDER``.
    """
    logger.info("Listing GSC properties")
    service = build('webmasters', 'v3', credentials=credentials)
    site_list = service.sites().list().execute()
    return [site['siteUrl'] for site in site_list.get('siteEntry', [])] or [NO_PROPERTIES_PLACEHOLDER]


def fetch_gsc_data(webproperty, search_type, start_date, end_date, dimensions, device_type=None):
    """Query Search Console and return the processed report.

    The device filter is applied only when ``'device'`` is one of ``dimensions``
    and ``device_type`` names a specific device.

    Returns:
        The output of ``process_gsc_data``; an empty DataFrame when the request
        fails (the error is shown in the UI).
    """
    try:
        # Built inside the try block so query-construction errors are reported too.
        query = (
            webproperty.query
            .range(start_date, end_date)
            .search_type(search_type)
            .dimension(*dimensions)
        )
        if 'device' in dimensions and device_type and device_type != 'All Devices':
            query = query.filter('device', 'equals', device_type.lower())

        df = query.limit(MAX_ROWS).get().to_dataframe()
        logger.info("GSC data fetched successfully")
        return process_gsc_data(df)
    except Exception as e:  # UI boundary: show the problem and keep the app running
        logger.error("Error fetching GSC data: %s", e)
        show_error(e)
        return pd.DataFrame()


def calculate_relevancy_scores(df, model_type):
    """Return ``df`` with a ``relevancy_score`` computed for every row.

    Each page is fetched and compared with the query of its row. Rows whose page
    yields no content score 0 and are not sent to the embedding API. If anything
    fails, a warning is shown and every score is set to 0.
    """
    logger.info("Calculating relevancy scores")
    with st.spinner('Calculating relevancy scores...'):
        try:
            queries = df['query'].tolist()
            page_contents = [fetch_content(url) for url in df['page']]

            scores = [0.0] * len(page_contents)
            scorable = [i for i, content in enumerate(page_contents) if content]
            if scorable:
                page_embeddings = generate_embeddings(
                    [page_contents[i] for i in scorable], model_type, 'search_document'
                )
                query_embeddings = generate_embeddings(
                    [queries[i] for i in scorable], model_type, 'search_query'
                )
                # Compare each query only with its own page; the full
                # query-by-page matrix would grow quadratically with the rows.
                for i, query_vec, page_vec in zip(scorable, query_embeddings, page_embeddings):
                    scores[i] = float(cosine_similarity([query_vec], [page_vec])[0][0])

            df = df.assign(relevancy_score=scores)
            logger.info("Relevancy scores calculated successfully")
        except Exception as e:  # UI boundary: fall back to zero scores
            logger.error("Error calculating relevancy scores: %s", e)
            st.warning(f"Error calculating relevancy scores: {e}")
            df = df.assign(relevancy_score=0)
    return df


# -------------
# Utility Functions
# -------------

def update_dimensions(selected_search_type):
    """Return the dimensions offered for ``selected_search_type``.

    ``'device'`` is added to ``BASE_DIMENSIONS`` for every type in ``SEARCH_TYPES``.
    """
    if selected_search_type in SEARCH_TYPES:
        return BASE_DIMENSIONS + ['device']
    return BASE_DIMENSIONS


def calc_date_range(selection, custom_start=None, custom_end=None):
    """Translate a date-range option into a ``(start_date, end_date)`` tuple.

    ``'Custom Range'`` returns the given custom dates, or the last 7 days when
    either is missing. An unknown ``selection`` yields ``(today, today)``.
    """
    range_map = {
        'Last 7 Days': 7,
        'Last 30 Days': 30,
        'Last 3 Months': 90,
        'Last 6 Months': 180,
        'Last 12 Months': 365,
        'Last 16 Months': 480,
    }
    today = datetime.date.today()

    if selection == 'Custom Range':
        if custom_start and custom_end:
            logger.debug("Custom date range: %s to %s", custom_start, custom_end)
            return custom_start, custom_end
        logger.debug("Defaulting custom date range to last 7 days")
        return today - datetime.timedelta(days=7), today

    return today - datetime.timedelta(days=range_map.get(selection, 0)), today


def show_error(e):
    """Log ``e`` and display it in the UI."""
    logger.error("An error occurred: %s", e)
    st.error(f"An error occurred: {e}")


def property_change():
    """Copy the property chosen in the selectbox into ``selected_property``."""
    st.session_state.selected_property = st.session_state['selected_property_selector']


def _format_ctr(value):
    """Format ``value`` as a percentage; values that are not numeric are returned as is."""
    try:
        return f"{float(value):.2%}"
    except (ValueError, TypeError):
        return value


def _format_relevancy_score(value):
    """Format ``value`` with two decimals; values that are not numeric are returned as is."""
    try:
        return f"{float(value):.2f}"
    except (ValueError, TypeError):
        return value


def _to_sortable_number(value):
    """Convert a number or a ``'12.3%'`` style string to a float; 0 if not convertible."""
    try:
        if isinstance(value, str) and value.endswith('%'):
            return float(value.rstrip('%')) / 100
        return float(value)
    except (ValueError, TypeError):
        return 0


def _make_clickable(url):
    """Return an HTML link to ``url`` that opens in a new tab."""
    safe_url = html.escape(str(url), quote=True)
    return f'<a href="{safe_url}" target="_blank" rel="noopener noreferrer">{safe_url}</a>'


# -------------
# File & Download Operations
# -------------

def show_dataframe(report):
    """Show the first ``DF_PREVIEW_ROWS`` rows of ``report`` inside an expander."""
    logger.info("Showing dataframe preview")
    with st.expander("Preview the First 100 Rows (Unique Pages with Top Query)"):
        st.dataframe(report.head(DF_PREVIEW_ROWS))


def download_csv_link(report):
    """Render a link that downloads ``report`` as a CSV file."""
    logger.info("Generating CSV download link")
    csv = report.to_csv(index=False, encoding='utf-8-sig')
    b64_csv = base64.b64encode(csv.encode()).decode()
    # The CSV travels inside the link itself as a base64 data URI.
    href = (
        f'<a href="data:file/csv;base64,{b64_csv}" '
        'download="search_console_data.csv">Download CSV File</a>'
    )
    st.markdown(href, unsafe_allow_html=True)
    logger.info("CSV download link generated")


# -------------
# Streamlit UI Components
# -------------

def show_google_sign_in(auth_url):
    """Render the sidebar sign-in button; once clicked, show the link to ``auth_url``."""
    logger.info("Showing Google sign-in button")
    with st.sidebar:
        if st.button("Sign in with Google"):
            st.write('Please click the link below to sign in:')
            st.markdown(f'[Google Sign-In]({auth_url})', unsafe_allow_html=True)


def show_property_selector(properties, account):
    """Render the property selectbox and return ``account[selected_property]``."""
    logger.info("Showing property selector")
    current = st.session_state.selected_property
    selected_property = st.selectbox(
        "Select a Search Console Property:",
        properties,
        index=properties.index(current) if current in properties else 0,
        key='selected_property_selector',
        on_change=property_change,
    )
    return account[selected_property]


def show_search_type_selector():
    """Render the search type selectbox and return the chosen type."""
    logger.info("Showing search type selector")
    return st.selectbox(
        "Select Search Type:",
        SEARCH_TYPES,
        index=SEARCH_TYPES.index(st.session_state.selected_search_type),
        key='search_type_selector',
    )


def show_model_type_selector():
    """Render the embedding model selectbox and return ``"english"`` or ``"multilingual"``."""
    logger.info("Showing model type selector")
    return st.selectbox(
        "Select the embedding model:",
        ["english", "multilingual"],
        key='model_type_selector',
    )


def show_tabular_data(df, co):
    """Render one expander per report row with its GSC data and a competitor check."""
    st.write("Data Table with Relevancy Scores and Competitor Analysis")

    for index, row in df.iterrows():
        with st.expander(f"Query: {row['query']} | Page: {row['page']}"):
            col1, col2 = st.columns(2)

            with col1:
                st.write("GSC Data:")
                st.write(f"URL: {row['page']}")
                st.write(f"Query: {row['query']}")
                st.write(f"Impressions: {row['impressions']}")
                st.write(f"Clicks: {row['clicks']}")
                st.write(f"Position: {row['position']}")
                st.write(f"Relevancy Score: {row['relevancy_score']:.4f}")

            with col2:
                st.write("Competitor Analysis:")
                if st.button("Analyze Competitors", key=f"comp_{index}"):
                    with st.spinner('Analyzing competitors...'):
                        results_df = analyze_competitors(row, co)
                    st.dataframe(results_df)

                    our_rows = results_df.index[results_df['url'] == row['page']]
                    if len(our_rows):
                        our_rank = our_rows[0] + 1
                        st.write(
                            f"Our page ranks {our_rank} out of {len(results_df)} "
                            "in terms of relevancy score."
                        )


def show_date_range_selector():
    """Render the date range selectbox and return the chosen option."""
    logger.info("Showing date range selector")
    return st.selectbox(
        "Select Date Range:",
        DATE_RANGE_OPTIONS,
        index=DATE_RANGE_OPTIONS.index(st.session_state.selected_date_range),
        key='date_range_selector',
    )


def show_custom_date_inputs():
    """Render start/end date inputs and store their values in the session state."""
    logger.info("Showing custom date inputs")
    st.session_state.custom_start_date = st.date_input("Start Date", st.session_state.custom_start_date)
    st.session_state.custom_end_date = st.date_input("End Date", st.session_state.custom_end_date)


def show_dimensions_selector(search_type):
    """Render the dimensions multiselect for ``search_type`` and return the selection."""
    logger.info("Showing dimensions selector")
    available_dimensions = update_dimensions(search_type)
    return st.multiselect(
        "Select Dimensions:",
        available_dimensions,
        default=st.session_state.selected_dimensions,
        key='dimensions_selector',
    )


def show_paginated_dataframe(report, rows_per_page=20):
    """Render ``report`` as a sortable, paginated HTML table with clickable page URLs.

    Args:
        report: DataFrame with the columns in ``GSC_REPORT_COLUMNS``. It is not modified.
        rows_per_page: Number of rows shown per page (at least 1 is used).
    """
    logger.info("Showing paginated dataframe")
    columns = ['clickable_url', 'query', 'impressions', 'clicks', 'ctr', 'position', 'relevancy_score']
    rows_per_page = max(1, rows_per_page)

    # Work on a copy so display formatting never leaks into the caller's data
    # (the report lives in the session state and is reused on every rerun).
    table = report.copy()
    table['position'] = table['position'].astype(int)
    table['impressions'] = pd.to_numeric(table['impressions'], errors='coerce')

    sort_column = st.selectbox("Sort by:", columns[1:], index=columns[1:].index('impressions'))
    sort_order = st.radio("Sort order:", ("Descending", "Ascending"))
    ascending = sort_order == "Ascending"

    # Sort on numeric values before they are turned into display strings.
    if sort_column in ('ctr', 'relevancy_score'):
        table['_sort_key'] = table[sort_column].apply(_to_sortable_number)
        table = table.sort_values(by='_sort_key', ascending=ascending).drop(columns=['_sort_key'])
    else:
        table = table.sort_values(by=sort_column, ascending=ascending)

    table['ctr'] = table['ctr'].apply(_format_ctr)
    table['relevancy_score'] = table['relevancy_score'].apply(_format_relevancy_score)
    # The table is rendered with escape=False (needed for the links), so free
    # text has to be escaped here.
    table['query'] = table['query'].map(lambda value: html.escape(str(value)))
    table['clickable_url'] = table['page'].apply(_make_clickable)
    table = table[columns]

    total_pages = max(1, (len(table) - 1) // rows_per_page + 1)
    if 'current_page' not in st.session_state:
        st.session_state.current_page = 1
    # Keep the page valid when the data shrank since the last run.
    st.session_state.current_page = min(max(st.session_state.current_page, 1), total_pages)

    col1, col2, col3 = st.columns([1, 3, 1])
    with col1:
        if st.button("Previous", disabled=st.session_state.current_page == 1):
            if st.session_state.current_page > 1:
                st.session_state.current_page -= 1
    with col3:
        if st.button("Next", disabled=st.session_state.current_page == total_pages):
            if st.session_state.current_page < total_pages:
                st.session_state.current_page += 1
    with col2:
        # Written after both buttons are handled so the label matches the rows shown.
        st.write(f"Page {st.session_state.current_page} of {total_pages}")

    start_idx = (st.session_state.current_page - 1) * rows_per_page
    end_idx = start_idx + rows_per_page
    st.markdown(
        table.iloc[start_idx:end_idx].to_html(escape=False, index=False),
        unsafe_allow_html=True,
    )


# -------------
# Main Streamlit App Function
# -------------

def main():
    """Run the app: sign-in, report options, data fetch, scoring and results."""
    logger.info("Starting main function")
    setup_streamlit()

    if not APIFY_API_TOKEN:
        st.error(
            "APIFY_API_TOKEN is not set in the environment variables. "
            "Please set it and restart the application."
        )

    client_config = load_config()

    if 'auth_flow' not in st.session_state or 'auth_url' not in st.session_state:
        st.session_state.auth_flow, st.session_state.auth_url = google_auth(client_config)

    query_params = st.query_params
    auth_code = query_params.get("code", None)

    if auth_code and 'credentials' not in st.session_state:
        try:
            st.session_state.auth_flow.fetch_token(code=auth_code)
            st.session_state.credentials = st.session_state.auth_flow.credentials
        except Exception as e:  # e.g. a stale code in the URL; fall back to sign-in
            logger.exception("Error exchanging the authorization code: %s", e)
            show_error(e)

    if 'credentials' not in st.session_state:
        show_google_sign_in(st.session_state.auth_url)
        return

    init_session_state()
    account = auth_search_console(client_config, st.session_state.credentials)
    properties = list_gsc_properties(st.session_state.credentials)

    # The placeholder is not a real property and cannot be queried.
    if not properties or properties == [NO_PROPERTIES_PLACEHOLDER]:
        logger.warning("No Search Console properties found for this account.")
        st.warning("No Search Console properties found for this account.")
        return

    webproperty = show_property_selector(properties, account)
    search_type = show_search_type_selector()
    date_range_selection = show_date_range_selector()
    model_type = show_model_type_selector()

    if date_range_selection == 'Custom Range':
        show_custom_date_inputs()
        start_date = st.session_state.custom_start_date
        end_date = st.session_state.custom_end_date
    else:
        start_date, end_date = calc_date_range(date_range_selection)

    selected_dimensions = show_dimensions_selector(search_type)

    if 'report_data' not in st.session_state:
        st.session_state.report_data = None

    if st.button("Fetch Data"):
        with st.spinner('Fetching data...'):
            st.session_state.report_data = fetch_gsc_data(
                webproperty, search_type, start_date, end_date, selected_dimensions
            )

    report_data = st.session_state.report_data
    if report_data is not None and not report_data.empty:
        st.write("Data fetched successfully. Click the button below to calculate relevancy scores.")

        if st.button("Calculate Relevancy Scores"):
            logger.info("Calculating relevancy scores for all rows")
            st.session_state.report_data = calculate_relevancy_scores(
                st.session_state.report_data, model_type
            )

        # Rendered outside the button branch so the table survives the reruns
        # triggered by the per-row "Analyze Competitors" buttons.
        show_tabular_data(st.session_state.report_data, co)
        download_csv_link(st.session_state.report_data)
    elif report_data is not None:
        logger.warning("No data found for the selected criteria.")
        st.warning("No data found for the selected criteria.")


if __name__ == "__main__":
    logger.info("Running main function")
    main()
    logger.info("Script completed")