import logging
# Standard library imports
import datetime
import base64
import os
# Related third-party imports
import streamlit as st
from streamlit_elements import elements
from google_auth_oauthlib.flow import Flow
from googleapiclient.discovery import build
from dotenv import load_dotenv
import pandas as pd
import searchconsole
import cohere
from sklearn.metrics.pairwise import cosine_similarity
import requests
from bs4 import BeautifulSoup
from apify_client import ApifyClient
import urllib.parse
# Configure logging first so that every start-up message below is captured.
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Merge values from a local .env file (if present) into the process environment.
load_dotenv()
logger.info("Environment variables loaded")

# Cohere client used for all embedding calls. The key is mandatory: a missing
# COHERE_API_KEY raises KeyError right here.
COHERE_API_KEY = os.environ["COHERE_API_KEY"]
co = cohere.Client(COHERE_API_KEY)
logger.info("Cohere client initialized")

# Apify client used for SERP look-ups. The token is optional at import time.
APIFY_API_TOKEN = os.environ.get('APIFY_API_TOKEN')
if not APIFY_API_TOKEN:
    # Log only. Emitting st.error() here would run before the
    # st.set_page_config() call made later by setup_streamlit(), and Streamlit
    # has documented set_page_config as having to be the first Streamlit
    # command of a run. get_serp_results() already reports the missing token
    # to the user when a look-up is attempted.
    logger.error("APIFY_API_TOKEN is not set in the environment variables.")
client = ApifyClient(APIFY_API_TOKEN)
logger.info("ApifyClient initialized")
# Deployment switch: True when running locally, False on Streamlit Cloud.
IS_LOCAL = False

# --- Constants ---------------------------------------------------------------
# Search types offered in the "Select Search Type" drop-down.
SEARCH_TYPES = ["web", "image", "video", "news", "discover", "googleNews"]

# Labels offered in the "Select Date Range" drop-down; calc_date_range()
# translates them into concrete dates.
DATE_RANGE_OPTIONS = [
    "Last 7 Days",
    "Last 30 Days",
    "Last 3 Months",
    "Last 6 Months",
    "Last 12 Months",
    "Last 16 Months",
    "Custom Range",
]

# Device filter choices; "All Devices" means no filter is applied.
DEVICE_OPTIONS = ["All Devices", "desktop", "mobile", "tablet"]

# Dimensions that are always selectable; update_dimensions() may append 'device'.
BASE_DIMENSIONS = ["page", "query", "country", "date"]

# Row cap passed to the Search Console query.
MAX_ROWS = 250_000

# Number of rows shown by show_dataframe().
DF_PREVIEW_ROWS = 100
# -------------
# Streamlit App Configuration
# -------------
def setup_streamlit():
    """Set the page metadata and draw the app header (title plus divider)."""
    st.set_page_config(layout="wide", page_title="Keyword Relevance Test")
    st.title("Keyword Relevance Test Using Vector Embedding")
    st.divider()
    logging.info("Streamlit app configured")
def init_session_state():
    """Seed ``st.session_state`` with a default for every key the UI reads.

    Keys that are already present are left untouched.
    """
    today = datetime.date.today()
    a_week_ago = today - datetime.timedelta(days=7)
    defaults = {
        'selected_property': None,
        'selected_search_type': 'web',
        'selected_date_range': 'Last 7 Days',
        'start_date': a_week_ago,
        'end_date': today,
        'selected_dimensions': ['page', 'query'],
        'selected_device': 'All Devices',
        'custom_start_date': a_week_ago,
        'custom_end_date': today,
    }
    for key, value in defaults.items():
        if key not in st.session_state:
            st.session_state[key] = value
    logging.info("Session state initialized")
# -------------
# Data Processing Functions
# -------------
def get_serp_results(query):
    """Return the organic result URLs that the Apify actor reports for *query*.

    An empty list is returned when the Apify token is missing, when the first
    dataset item carries no ``organicResults`` entry, or when any exception is
    raised along the way; each of those cases is also reported in the UI.
    """
    if not APIFY_API_TOKEN:
        st.error("Apify API token is not set. Unable to fetch SERP results.")
        return []

    run_input = {
        "queries": query,
        "resultsPerPage": 5,
        "maxPagesPerQuery": 1,
        "languageCode": "",
        "mobileResults": False,
        "includeUnfilteredResults": False,
        "saveHtml": False,
        "saveHtmlToKeyValueStore": False,
        "includeIcons": False,
    }

    try:
        logger.debug(f"Calling Apify Actor with input: {run_input}")
        # Start the actor and block until the run has finished.
        actor = client.actor("nFJndFXA5zjCTuudP")
        run = actor.call(run_input=run_input)
        logger.info(f"Apify Actor run completed. Run ID: {run.get('id')}")

        # Read every item of the run's default dataset.
        logger.debug(f"Fetching results from dataset ID: {run.get('defaultDatasetId')}")
        dataset = client.dataset(run["defaultDatasetId"])
        results = list(dataset.iterate_items())
        logger.info(f"Fetched {len(results)} results from Apify dataset")

        if not (results and 'organicResults' in results[0]):
            logger.warning("No organic results found in the SERP data.")
            st.warning("No organic results found in the SERP data.")
            return []

        urls = [entry['url'] for entry in results[0]['organicResults']]
        logger.info(f"Extracted {len(urls)} URLs from organic results")
        return urls
    except Exception as e:
        logger.exception(f"Error fetching SERP results: {e}")
        st.error(f"Error fetching SERP results: {e}")
        return []
def fetch_content(url):
    """Download *url* and return its visible text, or ``""`` on a request error.

    The URL is percent-decoded before the request is made. Request failures
    (including non-2xx responses) are logged and shown as a UI warning.
    """
    logger.info(f"Fetching content from URL: {url}")
    try:
        # Decode URL-encoded characters before issuing the request.
        target = urllib.parse.unquote(url)
        response = requests.get(target, timeout=10)
        response.raise_for_status()
    except requests.RequestException as e:
        logger.error(f"Error fetching content from {url}: {e}")
        st.warning(f"Error fetching content from {url}: {e}")
        return ""
    else:
        parsed = BeautifulSoup(response.text, 'html.parser')
        return parsed.get_text(separator=' ', strip=True)
def calculate_relevance_score(page_content, query, co):
    """Return the cosine similarity between *query* and *page_content*.

    Both texts are embedded with the Cohere client *co*. ``0`` is returned
    for empty page content and when any exception is raised.

    NOTE: a function with this same name is defined again further down the
    module; that later definition is the one bound at call time.
    """
    logger.info(f"Calculating relevance score for query: {query}")
    embedding_model = 'embed-english-v3.0'
    try:
        if page_content:
            document_vector = co.embed(
                texts=[page_content],
                model=embedding_model,
                input_type='search_document',
            ).embeddings[0]
            query_vector = co.embed(
                texts=[query],
                model=embedding_model,
                input_type='search_query',
            ).embeddings[0]
            similarity = cosine_similarity([query_vector], [document_vector])
            score = similarity[0][0]
            logger.debug(f"Relevance score calculated: {score}")
            return score
        logger.warning("Empty page content. Returning score 0.")
        return 0
    except Exception as e:
        logger.exception(f"Error calculating relevance score: {e}")
        st.error(f"Error calculating relevance score: {e}")
        return 0
def analyze_competitors(row, co):
    """Score our page and the SERP competitors for the query in *row*.

    Args:
        row: Mapping with at least ``'query'`` and ``'page'`` entries.
        co: Cohere client forwarded to ``calculate_relevance_score``.

    Returns:
        DataFrame with columns ``url`` and ``relevancy_score``, sorted by
        score in descending order and re-indexed from 0, so a row's index is
        its zero-based rank. Pages whose content could not be fetched are
        left out. The frame is empty (but has both columns) when nothing
        could be scored.

    NOTE: a function with this same name is defined again further down the
    module; that later definition is the one bound at call time.
    """
    query = row['query']
    our_url = row['page']
    logger.info(f"Analyzing competitors for query: {query}")

    competitor_urls = get_serp_results(query)
    results = []

    # Score our own page first.
    our_content = fetch_content(our_url)
    if our_content:
        our_score = calculate_relevance_score(our_content, query, co)
        results.append({'url': our_url, 'relevancy_score': our_score})
        logger.info(f"Our URL: {our_url}, Score: {our_score}")
    else:
        logger.warning(f"No content fetched for our URL: {our_url}")

    # Then score every competitor; one failing URL must not abort the rest.
    for url in competitor_urls:
        try:
            logger.debug(f"Processing competitor URL: {url}")
            content = fetch_content(url)
            if not content:
                logger.warning(f"No content fetched for competitor URL: {url}")
                continue
            score = calculate_relevance_score(content, query, co)
            logger.info(f"Competitor URL: {url}, Score: {score}")
            results.append({'url': url, 'relevancy_score': score})
        except Exception as e:
            logger.exception(f"Error processing URL {url}: {e}")
            st.error(f"Error processing URL {url}: {e}")

    # Explicit columns keep sort_values() from raising KeyError when nothing
    # was scored; reset_index() makes the index reflect the sorted order
    # instead of the order in which rows were appended.
    results_df = (
        pd.DataFrame(results, columns=['url', 'relevancy_score'])
        .sort_values('relevancy_score', ascending=False)
        .reset_index(drop=True)
    )
    logger.info(f"Competitor analysis completed. {len(results)} results obtained.")
    return results_df
def show_competitor_analysis(row, co):
    """Render a "Check Competitors" button and, when clicked, the comparison.

    Args:
        row: Mapping with at least ``'page'`` and ``'query'`` entries.
        co: Cohere client forwarded to ``analyze_competitors``.

    The rank shown is the page's position in the score-sorted results.
    """
    page_url = row['page']
    if not st.button("Check Competitors", key=f"comp_{page_url}"):
        return

    logger.info(f"Competitor analysis requested for page: {page_url}")
    with st.spinner('Analyzing competitors...'):
        results_df = analyze_competitors(row, co)

    st.write("Relevancy Score Comparison:")
    st.dataframe(results_df)

    # The frame is sorted by score, but its index may still carry the labels
    # from before the sort. Renumber so that index + 1 is the actual rank.
    ranked = results_df.reset_index(drop=True)
    our_data = ranked[ranked['url'] == page_url]

    if our_data.empty:
        st.error(f"Our page '{page_url}' is not in the results. This indicates an error in fetching or processing the page.")
        logger.error(f"Our page '{page_url}' is missing from the results.")
        return

    our_rank = int(our_data.index[0]) + 1
    total_results = len(ranked)
    our_score = our_data['relevancy_score'].values[0]

    logger.info(f"Our page ranks {our_rank} out of {total_results} in terms of relevancy score.")
    st.write(f"Our page ('{page_url}') ranks {our_rank} out of {total_results} in terms of relevancy score.")
    st.write(f"Our relevancy score: {our_score:.4f}")

    if our_score == 0:
        st.warning("Our page's relevancy score is 0. This might indicate an issue with content fetching or score calculation.")
        # Re-fetch the page so the user can see what was actually scored.
        st.write("Debugging Information:")
        content = fetch_content(page_url)
        st.json({
            "content_length": len(content),
            "content_preview": content[:500] if content else "No content fetched",
            "query": row['query'],
        })
    elif our_rank == 1:
        st.success("Your page has the highest relevancy score!")
    elif our_rank <= 3:
        st.info("Your page is among the top 3 most relevant results.")
    elif our_rank > total_results / 2:
        st.warning("Your page's relevancy score is in the lower half of the results. Consider optimizing your content.")
def analyze_competitors(row, co):
    """Rank our page against the SERP competitors for the query in *row*.

    Our page keeps the score already stored in ``row['relevancy_score']``;
    each competitor page is fetched and scored here.

    Args:
        row: Mapping with ``'query'``, ``'page'`` and ``'relevancy_score'``.
        co: Cohere client forwarded to ``calculate_relevance_score``.

    Returns:
        DataFrame with columns ``url`` and ``relevancy_score``, sorted by
        score in descending order and re-indexed from 0, so a row's index is
        its zero-based rank.
    """
    query = row['query']
    our_url = row['page']
    our_score = row['relevancy_score']

    results = [
        {'url': url, 'relevancy_score': calculate_relevance_score(fetch_content(url), query, co)}
        for url in get_serp_results(query)
    ]
    results.append({'url': our_url, 'relevancy_score': our_score})

    # Without reset_index() the sorted frame keeps the insertion index, so
    # our page (appended last) would always look like the last-ranked row to
    # callers that derive the rank from the index.
    return (
        pd.DataFrame(results, columns=['url', 'relevancy_score'])
        .sort_values('relevancy_score', ascending=False)
        .reset_index(drop=True)
    )
def process_gsc_data(df):
    """Reduce a Search Console frame to one row per page (its top query).

    Rows are ordered by impressions (descending) and, for every page, only
    the first row, i.e. the one with the most impressions, is kept.

    Args:
        df: Frame with at least the columns ``page``, ``query``, ``clicks``,
            ``impressions``, ``ctr`` and ``position``. A ``relevancy_score``
            column is optional.

    Returns:
        A new frame with the columns ``page``, ``query``, ``clicks``,
        ``impressions``, ``ctr``, ``position`` and ``relevancy_score``. The
        score is 0 when the input has no such column; otherwise it is the
        first non-null score found for that page. *df* is not modified. An
        empty input yields an empty frame with those columns.
    """
    logging.info("Processing GSC data")
    columns = ['page', 'query', 'clicks', 'impressions', 'ctr', 'position', 'relevancy_score']

    if df.empty:
        # An empty result may not even carry the expected columns.
        logging.info("GSC data processed successfully")
        return pd.DataFrame(columns=columns)

    df_sorted = df.sort_values('impressions', ascending=False)
    # Copy so that the assignments below never write into a view of df_sorted.
    df_unique = df_sorted.drop_duplicates(subset='page', keep='first').copy()

    if 'relevancy_score' in df_sorted.columns:
        # groupby() orders its result by page name, not by impressions, so
        # the scores are matched back by page rather than by position.
        first_scores = df_sorted.groupby('page')['relevancy_score'].first()
        df_unique['relevancy_score'] = df_unique['page'].map(first_scores)
    else:
        df_unique['relevancy_score'] = 0

    result = df_unique[columns]
    logging.info("GSC data processed successfully")
    return result
# -------------
# Google Authentication Functions
# -------------
def load_config():
    """Build the Google OAuth client configuration from the environment.

    Returns:
        A dict with a single ``"web"`` entry holding the client id and secret
        (read from ``CLIENT_ID`` / ``CLIENT_SECRET``), the auth and token
        endpoints and the redirect URI list.

    Raises:
        KeyError: If ``CLIENT_ID`` or ``CLIENT_SECRET`` is not set.
    """
    logging.info("Loading Google client configuration")
    web_settings = dict(
        client_id=os.environ["CLIENT_ID"],
        client_secret=os.environ["CLIENT_SECRET"],
        auth_uri="https://accounts.google.com/o/oauth2/auth",
        token_uri="https://oauth2.googleapis.com/token",
        redirect_uris=["https://poemsforaphrodite-gscpro.hf.space/"],
    )
    client_config = {"web": web_settings}
    logging.info("Google client configuration loaded")
    return client_config
def init_oauth_flow(client_config):
    """Create an OAuth ``Flow`` requesting read-only Search Console access.

    The first entry of ``client_config["web"]["redirect_uris"]`` is used as
    the redirect URI.
    """
    logging.info("Initializing OAuth flow")
    redirect_uri = client_config["web"]["redirect_uris"][0]
    flow = Flow.from_client_config(
        client_config,
        scopes=["https://www.googleapis.com/auth/webmasters.readonly"],
        redirect_uri=redirect_uri,
    )
    logging.info("OAuth flow initialized")
    return flow
def google_auth(client_config):
    """Return ``(flow, auth_url)`` for starting the Google sign-in.

    The second element returned by ``flow.authorization_url`` is discarded.
    """
    logging.info("Starting Google authentication")
    flow = init_oauth_flow(client_config)
    auth_url = flow.authorization_url(prompt="consent")[0]
    logging.info("Google authentication URL generated")
    return flow, auth_url
def auth_search_console(client_config, credentials):
    """Authenticate the ``searchconsole`` library with existing credentials.

    The credential attributes are copied into a plain dict, which is passed
    to ``searchconsole.authenticate`` together with *client_config*.
    ``id_token`` falls back to ``None`` when the attribute is missing.
    """
    copied_fields = (
        "token",
        "refresh_token",
        "token_uri",
        "client_id",
        "client_secret",
        "scopes",
    )
    token = {field: getattr(credentials, field) for field in copied_fields}
    token["id_token"] = getattr(credentials, "id_token", None)
    return searchconsole.authenticate(client_config=client_config, credentials=token)
# -------------
# Data Fetching Functions
# -------------
def list_gsc_properties(credentials):
    """Return the site URLs listed by the Webmasters v3 API.

    When the response contains no sites, the single-element list
    ``["No properties found"]`` is returned instead of an empty list.
    """
    logging.info("Listing GSC properties")
    service = build('webmasters', 'v3', credentials=credentials)
    response = service.sites().list().execute()
    site_urls = [entry['siteUrl'] for entry in response.get('siteEntry', [])]
    if not site_urls:
        return ["No properties found"]
    return site_urls
def fetch_gsc_data(webproperty, search_type, start_date, end_date, dimensions, device_type=None):
    """Query Search Console and return the processed report.

    A device filter is added only when ``'device'`` is among *dimensions* and
    *device_type* names a specific device (anything but ``'All Devices'``).
    On any error the error is shown in the UI and an empty DataFrame is
    returned.
    """
    query = webproperty.query.range(start_date, end_date)
    query = query.search_type(search_type)
    query = query.dimension(*dimensions)

    filter_by_device = 'device' in dimensions and device_type and device_type != 'All Devices'
    if filter_by_device:
        query = query.filter('device', 'equals', device_type.lower())

    try:
        report = query.limit(MAX_ROWS).get()
        df = report.to_dataframe()
        logging.info("GSC data fetched successfully")
        return process_gsc_data(df)
    except Exception as e:
        logging.error(f"Error fetching GSC data: {e}")
        show_error(e)
        return pd.DataFrame()
def calculate_relevance_score(page_content, query, co):
    """Return the cosine similarity between *query* and *page_content*.

    Args:
        page_content: Text of the page; empty content scores 0.
        query: Search query text.
        co: Cohere client used to embed both texts.

    Returns:
        The similarity score, or ``0`` for empty content or on any error
        (errors are logged and shown in the UI).
    """
    logger.info(f"Calculating relevance score for query: {query}")
    model_name = 'embed-english-v3.0'
    try:
        if not page_content:
            logger.warning("Empty page content. Returning score 0.")
            return 0

        page_response = co.embed(texts=[page_content], model=model_name, input_type='search_document')
        page_vector = page_response.embeddings[0]
        query_response = co.embed(texts=[query], model=model_name, input_type='search_query')
        query_vector = query_response.embeddings[0]

        similarity_matrix = cosine_similarity([query_vector], [page_vector])
        score = similarity_matrix[0][0]
        logger.debug(f"Relevance score calculated: {score}")
        return score
    except Exception as e:
        logger.exception(f"Error calculating relevance score: {e}")
        st.error(f"Error calculating relevance score: {e}")
        return 0
def _embed_texts(texts, model, input_type, batch_size=96):
    """Embed *texts* with the module-level Cohere client, one batch at a time.

    Cohere's embed endpoint documents a limit of 96 texts per call, hence the
    default batch size.
    """
    vectors = []
    for start in range(0, len(texts), batch_size):
        response = co.embed(
            texts=texts[start:start + batch_size],
            model=model,
            input_type=input_type,
        )
        vectors.extend(response.embeddings)
    return vectors


def calculate_relevancy_scores(df, model_type):
    """Add a ``relevancy_score`` column: query vs. page-content similarity.

    Every page in ``df['page']`` is fetched, then the page text and the row's
    query are embedded and compared by cosine similarity.

    Args:
        df: Frame with ``page`` and ``query`` columns.
        model_type: ``"multilingual"`` selects the multilingual embedding
            model; any other value selects the English one.

    Returns:
        A copy of *df* with ``relevancy_score`` set. Rows whose page content
        could not be fetched score 0. If anything fails, a warning is shown
        and every row scores 0.
    """
    import numpy as np  # local import: numpy is not imported at module level

    logging.info("Calculating relevancy scores")
    model = 'embed-multilingual-v3.0' if model_type == 'multilingual' else 'embed-english-v3.0'

    with st.spinner('Calculating relevancy scores...'):
        try:
            page_contents = [fetch_content(url) for url in df['page']]
            queries = df['query'].tolist()

            scores = np.zeros(len(page_contents))
            # Only rows with fetched content are embedded; the others keep a
            # score of 0, as calculate_relevance_score() does for empty text.
            usable = [i for i, text in enumerate(page_contents) if text]
            if usable:
                page_vecs = np.asarray(
                    _embed_texts([page_contents[i] for i in usable], model, 'search_document'),
                    dtype=float,
                )
                query_vecs = np.asarray(
                    _embed_texts([str(queries[i]) for i in usable], model, 'search_query'),
                    dtype=float,
                )
                # Row-wise cosine similarity; avoids building the full n x n
                # matrix only to read its diagonal.
                dots = np.einsum('ij,ij->i', query_vecs, page_vecs)
                norms = np.linalg.norm(query_vecs, axis=1) * np.linalg.norm(page_vecs, axis=1)
                scores[usable] = np.divide(dots, norms, out=np.zeros_like(dots), where=norms > 0)

            df = df.assign(relevancy_score=scores)
            logging.info("Relevancy scores calculated successfully")
        except Exception as e:
            logging.exception(f"Error calculating relevancy scores: {e}")
            st.warning(f"Error calculating relevancy scores: {e}")
            df = df.assign(relevancy_score=0)
    return df
# -------------
# Utility Functions
# -------------
def update_dimensions(selected_search_type):
    """Return the dimensions selectable for *selected_search_type*.

    Known search types get ``BASE_DIMENSIONS`` plus ``'device'`` (as a new
    list); any other value gets ``BASE_DIMENSIONS`` itself.
    """
    if selected_search_type not in SEARCH_TYPES:
        return BASE_DIMENSIONS
    return BASE_DIMENSIONS + ['device']
def calc_date_range(selection, custom_start=None, custom_end=None):
    """Translate a date-range label into a ``(start, end)`` pair of dates.

    Args:
        selection: One of the date-range labels, e.g. ``'Last 30 Days'`` or
            ``'Custom Range'``.
        custom_start: Start date, used only with ``'Custom Range'``.
        custom_end: End date, used only with ``'Custom Range'``.

    Returns:
        ``(start, end)``. For ``'Custom Range'`` the two custom dates are
        returned when both are given, otherwise the last 7 days. A label that
        is not recognised yields ``(today, today)``.
    """
    today = datetime.date.today()

    if selection == 'Custom Range':
        if not (custom_start and custom_end):
            logging.debug("Defaulting custom date range to last 7 days")
            return today - datetime.timedelta(days=7), today
        logging.debug(f"Custom date range: {custom_start} to {custom_end}")
        return custom_start, custom_end

    days_by_label = {
        'Last 7 Days': 7,
        'Last 30 Days': 30,
        'Last 3 Months': 90,
        'Last 6 Months': 180,
        'Last 12 Months': 365,
        'Last 16 Months': 480,
    }
    days_back = days_by_label.get(selection, 0)
    start = today - datetime.timedelta(days=days_back)
    return start, today
def show_error(e):
    """Log *e* and show the same message as an error box in the UI."""
    message = f"An error occurred: {e}"
    logging.error(message)
    st.error(message)
def property_change():
    """Copy the property drop-down's current value into ``selected_property``.

    Registered as the ``on_change`` callback of the property selector.
    """
    chosen = st.session_state['selected_property_selector']
    st.session_state.selected_property = chosen
# -------------
# File & Download Operations
# -------------
def show_dataframe(report):
    """Show the first ``DF_PREVIEW_ROWS`` rows of *report* in an expander."""
    logging.info("Showing dataframe preview")
    preview = report.head(DF_PREVIEW_ROWS)
    with st.expander("Preview the First 100 Rows (Unique Pages with Top Query)"):
        st.dataframe(preview)
def download_csv_link(report):
    """Render a "Download CSV File" link that carries *report* as a data URI.

    Args:
        report: DataFrame to serialise (without its index).
    """
    logging.info("Generating CSV download link")
    # to_csv() without a target returns a str, so the byte encoding is chosen
    # when the text is encoded below. 'utf-8-sig' prepends a BOM, which
    # presumably is what the original encoding argument was after.
    csv_text = report.to_csv(index=False)
    b64_csv = base64.b64encode(csv_text.encode('utf-8-sig')).decode()
    # The payload has to sit inside an <a> element; the bare label alone
    # renders as plain text and never uses the encoded data.
    href = (
        f'<a href="data:text/csv;base64,{b64_csv}" '
        f'download="search_console_data.csv">Download CSV File</a>'
    )
    st.markdown(href, unsafe_allow_html=True)
    logging.info("CSV download link generated")
# -------------
# Streamlit UI Components
# -------------
def show_google_sign_in(auth_url):
    """Show a sidebar sign-in button; once clicked, reveal the *auth_url* link."""
    logging.info("Showing Google sign-in button")
    with st.sidebar:
        clicked = st.button("Sign in with Google")
        if clicked:
            st.write('Please click the link below to sign in:')
            st.markdown(f'[Google Sign-In]({auth_url})', unsafe_allow_html=True)
def show_property_selector(properties, account):
    """Show the property drop-down and return ``account[<chosen property>]``.

    The previously selected property is pre-selected when it is still in
    *properties*; otherwise the first entry is.
    """
    logging.info("Showing property selector")
    previous = st.session_state.selected_property
    default_index = properties.index(previous) if previous in properties else 0
    chosen = st.selectbox(
        "Select a Search Console Property:",
        properties,
        index=default_index,
        key='selected_property_selector',
        on_change=property_change,
    )
    return account[chosen]
def show_search_type_selector():
    """Show the search-type drop-down and return the chosen value.

    The entry stored in ``st.session_state.selected_search_type`` is
    pre-selected.
    """
    logging.info("Showing search type selector")
    preselected = SEARCH_TYPES.index(st.session_state.selected_search_type)
    choice = st.selectbox(
        "Select Search Type:",
        SEARCH_TYPES,
        index=preselected,
        key='search_type_selector',
    )
    return choice
def show_model_type_selector():
    """Show the embedding-model drop-down and return the chosen label."""
    logging.info("Showing model type selector")
    model_choices = ["english", "multilingual"]
    choice = st.selectbox(
        "Select the embedding model:",
        model_choices,
        key='model_type_selector',
    )
    return choice
def show_tabular_data(df, co):
    """Render one expander per row with its GSC metrics and a competitor check.

    Args:
        df: Frame with ``page``, ``query``, ``impressions``, ``clicks``,
            ``position`` and ``relevancy_score`` columns.
        co: Cohere client forwarded to ``analyze_competitors``.

    Clicking "Analyze Competitors" in a row shows the score comparison and
    the page's rank, i.e. its position in the score-sorted results.
    """
    st.write("Data Table with Relevancy Scores and Competitor Analysis")
    for index, row in df.iterrows():
        with st.expander(f"Query: {row['query']} | Page: {row['page']}"):
            col1, col2 = st.columns(2)
            with col1:
                st.write("GSC Data:")
                st.write(f"URL: {row['page']}")
                st.write(f"Query: {row['query']}")
                st.write(f"Impressions: {row['impressions']}")
                st.write(f"Clicks: {row['clicks']}")
                st.write(f"Position: {row['position']}")
                st.write(f"Relevancy Score: {row['relevancy_score']:.4f}")
            with col2:
                st.write("Competitor Analysis:")
                if st.button("Analyze Competitors", key=f"comp_{index}"):
                    with st.spinner('Analyzing competitors...'):
                        results_df = analyze_competitors(row, co)
                    # The frame is sorted by score, but its index may still
                    # carry the labels from before the sort. Renumber so that
                    # index + 1 is the actual rank.
                    ranked = results_df.reset_index(drop=True)
                    st.dataframe(ranked)
                    matches = ranked.index[ranked['url'] == row['page']]
                    if len(matches) == 0:
                        st.warning("Our page is missing from the competitor results.")
                    else:
                        our_rank = int(matches[0]) + 1
                        st.write(f"Our page ranks {our_rank} out of {len(ranked)} in terms of relevancy score.")
def show_date_range_selector():
    """Show the date-range drop-down and return the chosen label.

    The label stored in ``st.session_state.selected_date_range`` is
    pre-selected.
    """
    logging.info("Showing date range selector")
    preselected = DATE_RANGE_OPTIONS.index(st.session_state.selected_date_range)
    choice = st.selectbox(
        "Select Date Range:",
        DATE_RANGE_OPTIONS,
        index=preselected,
        key='date_range_selector',
    )
    return choice
def show_custom_date_inputs():
    """Show start/end date pickers and store their values in session state."""
    logging.info("Showing custom date inputs")
    picked_start = st.date_input("Start Date", st.session_state.custom_start_date)
    st.session_state.custom_start_date = picked_start
    picked_end = st.date_input("End Date", st.session_state.custom_end_date)
    st.session_state.custom_end_date = picked_end
def show_dimensions_selector(search_type):
    """Show the dimensions multi-select for *search_type*; return the selection.

    The options come from ``update_dimensions(search_type)``; the default
    selection is ``st.session_state.selected_dimensions``.
    """
    logging.info("Showing dimensions selector")
    options = update_dimensions(search_type)
    chosen = st.multiselect(
        "Select Dimensions:",
        options,
        default=st.session_state.selected_dimensions,
        key='dimensions_selector',
    )
    return chosen
def show_paginated_dataframe(report, rows_per_page=20):
    """Render *report* as a sortable, paginated HTML table.

    Args:
        report: Frame with ``page``, ``query``, ``impressions``, ``clicks``,
            ``ctr``, ``position`` and ``relevancy_score`` columns. It is not
            modified.
        rows_per_page: Number of rows per page (values below 1 are treated
            as 1).

    The current page number is kept in ``st.session_state.current_page``.
    """
    import html  # local import: html is not imported at module level

    logging.info("Showing paginated dataframe")
    rows_per_page = max(1, int(rows_per_page))

    # Work on a copy so the caller's frame keeps its numeric columns;
    # formatting it in place would turn them into strings between reruns.
    report = report.copy()
    report['position'] = report['position'].astype(int)
    report['impressions'] = pd.to_numeric(report['impressions'], errors='coerce')

    columns = ['clickable_url', 'query', 'impressions', 'clicks', 'ctr', 'position', 'relevancy_score']

    sort_column = st.selectbox("Sort by:", columns[1:], index=columns[1:].index('impressions'))
    sort_order = st.radio("Sort order:", ("Descending", "Ascending"))
    ascending = sort_order == "Ascending"

    # Sort on the raw values (before formatting); values that cannot be read
    # as numbers sort as 0 for the two score-like columns.
    if sort_column in ('ctr', 'relevancy_score'):
        report['_sort_key'] = pd.to_numeric(report[sort_column], errors='coerce').fillna(0)
        report = report.sort_values(by='_sort_key', ascending=ascending).drop(columns='_sort_key')
    else:
        report = report.sort_values(by=sort_column, ascending=ascending)

    total_rows = len(report)
    total_pages = max(1, -(-total_rows // rows_per_page))  # ceil, at least 1

    if 'current_page' not in st.session_state:
        st.session_state.current_page = 1
    # Clamp in case the data shrank since the page number was stored.
    current_page = min(max(st.session_state.current_page, 1), total_pages)

    col1, col2, col3 = st.columns([1, 3, 1])
    with col1:
        if st.button("Previous", disabled=current_page <= 1):
            current_page -= 1
    with col3:
        if st.button("Next", disabled=current_page >= total_pages):
            current_page += 1
    current_page = min(max(current_page, 1), total_pages)
    st.session_state.current_page = current_page
    # Written after both buttons were evaluated so the label is never stale.
    with col2:
        st.write(f"Page {current_page} of {total_pages}")

    def format_number(value, spec):
        """Format *value* as a float with *spec*; return it unchanged on failure."""
        try:
            return format(float(value), spec)
        except (TypeError, ValueError):
            return value

    def make_clickable(url):
        """Wrap *url* in an anchor element, escaping it for HTML."""
        safe = html.escape(str(url), quote=True)
        return f'<a href="{safe}" target="_blank">{safe}</a>'

    # Only the visible slice is formatted.
    start_idx = (current_page - 1) * rows_per_page
    page_rows = report.iloc[start_idx:start_idx + rows_per_page].copy()
    page_rows['clickable_url'] = page_rows['page'].map(make_clickable)
    # to_html(escape=False) emits cells verbatim, so free text is escaped here.
    page_rows['query'] = page_rows['query'].map(lambda q: html.escape(str(q)))
    page_rows['ctr'] = page_rows['ctr'].map(lambda v: format_number(v, '.2%'))
    page_rows['relevancy_score'] = page_rows['relevancy_score'].map(lambda v: format_number(v, '.2f'))

    st.markdown(page_rows[columns].to_html(escape=False, index=False), unsafe_allow_html=True)
# -------------
# Main Streamlit App Function
# -------------
def main():
    """Run the app: sign in, pick a property and options, fetch and score data.

    Flow:
      1. Configure the page and build the OAuth flow (kept in session state).
      2. If the URL carries an OAuth ``code``, exchange it for credentials.
      3. Without credentials, show the sign-in button and stop.
      4. Otherwise show the selectors, fetch the report on demand, optionally
         compute relevancy scores, and render the table and download link.
    """
    logging.info("Starting main function")
    setup_streamlit()
    client_config = load_config()

    if 'auth_flow' not in st.session_state or 'auth_url' not in st.session_state:
        st.session_state.auth_flow, st.session_state.auth_url = google_auth(client_config)

    auth_code = st.query_params.get("code", None)
    if auth_code and 'credentials' not in st.session_state:
        try:
            st.session_state.auth_flow.fetch_token(code=auth_code)
            st.session_state.credentials = st.session_state.auth_flow.credentials
        except Exception as e:
            # A failed exchange should lead back to the sign-in button
            # instead of crashing the page.
            logger.exception("OAuth token exchange failed")
            show_error(e)

    if 'credentials' not in st.session_state:
        show_google_sign_in(st.session_state.auth_url)
        return

    init_session_state()
    account = auth_search_console(client_config, st.session_state.credentials)
    properties = list_gsc_properties(st.session_state.credentials)

    # list_gsc_properties() returns ["No properties found"] when the account
    # has no sites; that placeholder must not be looked up as a property.
    if not properties or properties == ["No properties found"]:
        logger.warning("No Search Console properties found for this account.")
        st.warning("No Search Console properties found for this account.")
        return

    webproperty = show_property_selector(properties, account)
    search_type = show_search_type_selector()
    date_range_selection = show_date_range_selector()
    model_type = show_model_type_selector()

    if date_range_selection == 'Custom Range':
        show_custom_date_inputs()
        start_date, end_date = st.session_state.custom_start_date, st.session_state.custom_end_date
    else:
        start_date, end_date = calc_date_range(date_range_selection)

    selected_dimensions = show_dimensions_selector(search_type)

    if 'report_data' not in st.session_state:
        st.session_state.report_data = None

    if st.button("Fetch Data"):
        with st.spinner('Fetching data...'):
            st.session_state.report_data = fetch_gsc_data(
                webproperty, search_type, start_date, end_date, selected_dimensions
            )

    report_data = st.session_state.report_data
    if report_data is None:
        return

    if report_data.empty:
        logger.warning("No data found for the selected criteria.")
        st.warning("No data found for the selected criteria.")
        return

    st.write("Data fetched successfully. Click the button below to calculate relevancy scores.")
    if st.button("Calculate Relevancy Scores"):
        logger.info("Calculating relevancy scores for all rows")
        st.session_state.report_data = calculate_relevancy_scores(st.session_state.report_data, model_type)
    show_tabular_data(st.session_state.report_data, co)
    download_csv_link(st.session_state.report_data)
# Script entry point: run the app, logging before and after.
if __name__ == "__main__":
    logging.info("Running main function")
    main()
    logger.info("Script completed")