# Page-capture residue from the hosting "Spaces" page (status banner: Sleeping); not part of the program.
# Standard library imports | |
import datetime | |
import base64 | |
import os | |
# Related third-party imports | |
import streamlit as st | |
from google_auth_oauthlib.flow import Flow | |
from googleapiclient.discovery import build | |
from dotenv import load_dotenv | |
import pandas as pd | |
import searchconsole | |
import cohere | |
from sklearn.metrics.pairwise import cosine_similarity | |
import requests | |
from bs4 import BeautifulSoup | |
from apify_client import ApifyClient | |
import urllib.parse | |
# Pull variables from a local .env file (if any) into the process environment.
load_dotenv()

# API credentials. The Apify token is optional at import time (None when
# missing); the Cohere key is mandatory and raises KeyError when absent.
APIFY_API_TOKEN = os.environ.get('APIFY_API_TOKEN')
COHERE_API_KEY = os.environ["COHERE_API_KEY"]

# Cohere client shared by every embedding call in this module.
co = cohere.Client(COHERE_API_KEY)

# NOTE(review): this emits a Streamlit element at import time, i.e. before
# setup_streamlit() calls st.set_page_config -- confirm that ordering is OK.
if not APIFY_API_TOKEN:
    st.error("APIFY_API_TOKEN is not set in the environment variables. Please set it and restart the application.")

# Apify client used for SERP lookups; it is created even when the token is missing.
client = ApifyClient(APIFY_API_TOKEN)

# Configuration: set to True when running locally, False when running hosted.
IS_LOCAL = False
# Constants

# Search types offered in the search-type selector.
SEARCH_TYPES = ["web", "image", "video", "news", "discover", "googleNews"]

# Labels of the date-range selector; the numeric windows live in calc_date_range.
DATE_RANGE_OPTIONS = [
    "Last 7 Days", "Last 30 Days",
    "Last 3 Months", "Last 6 Months",
    "Last 12 Months", "Last 16 Months",
    "Custom Range",
]

# Device choices; "All Devices" means no device filter is applied.
DEVICE_OPTIONS = ["All Devices", "desktop", "mobile", "tablet"]

# Dimensions that are always selectable.
BASE_DIMENSIONS = ["page", "query", "country", "date"]

# Row limit passed to the Search Console query.
MAX_ROWS = 250_000

# Number of rows shown in the dataframe preview.
DF_PREVIEW_ROWS = 100
# ------------- | |
# Streamlit App Configuration | |
# ------------- | |
def setup_streamlit():
    """Configure the page (title, wide layout) and render the app heading."""
    st.set_page_config(
        page_title="Keyword Relevance Test",
        layout="wide",
    )
    st.title("Keyword Relevance Test Using Vector Embedding")
    st.divider()
def init_session_state():
    """Seed ``st.session_state`` with defaults for every key the UI reads.

    Keys that are already present are left untouched, so user selections
    survive Streamlit reruns.
    """
    today = datetime.date.today()
    a_week_ago = today - datetime.timedelta(days=7)
    defaults = {
        'selected_property': None,
        'selected_search_type': 'web',
        'selected_date_range': 'Last 7 Days',
        'start_date': a_week_ago,
        'end_date': today,
        'selected_dimensions': ['page', 'query'],
        'selected_device': 'All Devices',
        'custom_start_date': a_week_ago,
        'custom_end_date': today,
    }
    for key, value in defaults.items():
        if key not in st.session_state:
            st.session_state[key] = value
# ------------- | |
# Data Processing Functions | |
# ------------- | |
def generate_embeddings(text_list, model_type):
    """Embed a list of texts with the module-level Cohere client.

    Args:
        text_list: Texts to embed. An empty or ``None`` value short-circuits
            and no API call is made.
        model_type: ``'english'`` selects ``embed-english-v3.0``; any other
            value selects ``embed-multilingual-v3.0``.

    Returns:
        The ``embeddings`` attribute of the Cohere response, or ``[]`` when
        there is nothing to embed.
    """
    if not text_list:
        # The module never imports `logging`, so the original warning call
        # raised NameError on this path. Import it locally to keep the warning.
        import logging
        logging.warning("Text list is empty, returning empty embeddings")
        return []

    model = 'embed-english-v3.0' if model_type == 'english' else 'embed-multilingual-v3.0'
    response = co.embed(model=model, texts=text_list, input_type='search_document')
    return response.embeddings
def get_serp_results(query):
    """Return the organic result URLs for ``query`` via the Apify actor.

    Every failure mode (missing token, no organic results, any exception
    raised while running the actor or reading its dataset) is reported in
    the Streamlit UI and yields an empty list.
    """
    if not APIFY_API_TOKEN:
        st.error("Apify API token is not set. Unable to fetch SERP results.")
        return []

    actor_input = {
        "queries": query,
        "resultsPerPage": 5,
        "maxPagesPerQuery": 1,
        "languageCode": "",
        "mobileResults": False,
        "includeUnfilteredResults": False,
        "saveHtml": False,
        "saveHtmlToKeyValueStore": False,
        "includeIcons": False,
    }

    try:
        # Run the actor and wait for it to finish, then read its default dataset.
        run = client.actor("nFJndFXA5zjCTuudP").call(run_input=actor_input)
        dataset_items = list(client.dataset(run["defaultDatasetId"]).iterate_items())

        # Only the first dataset item is inspected for organic results.
        if not dataset_items or 'organicResults' not in dataset_items[0]:
            st.warning("No organic results found in the SERP data.")
            return []

        return [entry['url'] for entry in dataset_items[0]['organicResults']]
    except Exception as e:
        st.error(f"Error fetching SERP results: {str(e)}")
        return []
def fetch_content(url):
    """Download ``url`` and return its visible text, or ``""`` on request errors.

    The URL is percent-decoded before the request. The response is parsed
    with BeautifulSoup's ``html.parser`` and the text nodes are joined with
    single spaces.
    """
    try:
        target = urllib.parse.unquote(url)
        page = requests.get(target, timeout=10)
        page.raise_for_status()
        parsed = BeautifulSoup(page.text, 'html.parser')
        return parsed.get_text(separator=' ', strip=True)
    except requests.RequestException:
        # Best effort: an unreachable page is treated as having no content.
        return ""
def calculate_relevance_score(page_content, query, co):
    """Return the cosine similarity between a page's text and a query.

    Both texts are embedded with ``embed-english-v3.0``: the page as a
    ``search_document`` and the query as a ``search_query``. Empty page
    content scores 0. Any exception is shown in the Streamlit UI and also
    yields 0.
    """
    if not page_content:
        return 0

    try:
        doc_vector = co.embed(
            texts=[page_content],
            model='embed-english-v3.0',
            input_type='search_document',
        ).embeddings[0]
        query_vector = co.embed(
            texts=[query],
            model='embed-english-v3.0',
            input_type='search_query',
        ).embeddings[0]
        similarity = cosine_similarity([query_vector], [doc_vector])
        return similarity[0][0]
    except Exception as e:
        st.error(f"Error calculating relevance score: {str(e)}")
        return 0
def analyze_competitors(row, co, custom_url=None):
    """Score the top SERP competitors for a row's query alongside our own page.

    Args:
        row: Mapping with at least ``'query'``, ``'page'`` and
            ``'relevancy_score'`` (our page's already-computed score).
        co: Cohere client forwarded to ``calculate_relevance_score``.
        custom_url: Optional extra URL to score in addition to the SERP results.

    Returns:
        DataFrame with columns ``url`` and ``relevancy_score``, sorted by score
        descending. The index is NOT reset, so it reflects insertion order.
    """
    query = row['query']
    our_url = row['page']

    # Keep at most five usable competitors. Relative '/search' links are not
    # real result pages. Our own URL is skipped because it is appended below
    # with its existing score; scoring it here too produced a duplicate row
    # and a redundant fetch + embed.
    competitor_urls = [
        url for url in get_serp_results(query)
        if not url.startswith('/search') and url != our_url
    ][:5]

    if custom_url and custom_url != our_url and custom_url not in competitor_urls:
        competitor_urls.append(custom_url)

    results = [
        {
            'url': url,
            'relevancy_score': calculate_relevance_score(fetch_content(url), query, co),
        }
        for url in competitor_urls
    ]
    results.append({'url': our_url, 'relevancy_score': row['relevancy_score']})

    return pd.DataFrame(results).sort_values('relevancy_score', ascending=False)
def show_competitor_analysis(row, co):
    """Render a "Check Competitors" button and, when clicked, the comparison.

    Args:
        row: Mapping with ``'page'``, ``'query'`` and ``'relevancy_score'``.
        co: Cohere client forwarded to ``analyze_competitors``.
    """
    if not st.button("Check Competitors", key=f"comp_{row['page']}"):
        return

    with st.spinner('Analyzing competitors...'):
        results_df = analyze_competitors(row, co)

    # analyze_competitors returns a score-sorted frame whose index is still the
    # insertion order (our page is always appended last). Renumber it so the
    # index is the 0-based rank; otherwise our page always ranks last.
    results_df = results_df.reset_index(drop=True)

    st.write("Relevancy Score Comparison:")
    st.dataframe(results_df)

    our_data = results_df[results_df['url'] == row['page']]
    if our_data.empty:
        st.error(f"Our page '{row['page']}' is not in the results. This indicates an error in fetching or processing the page.")
        return

    our_rank = our_data.index[0] + 1
    total_results = len(results_df)
    our_score = our_data['relevancy_score'].values[0]

    st.write(f"Our page ('{row['page']}') ranks {our_rank} out of {total_results} in terms of relevancy score.")
    st.write(f"Our relevancy score: {our_score:.4f}")

    if our_score == 0:
        st.warning("Our page's relevancy score is 0. This might indicate an issue with content fetching or score calculation.")
    elif our_rank == 1:
        st.success("Your page has the highest relevancy score!")
    elif our_rank <= 3:
        st.info("Your page is among the top 3 most relevant results.")
    elif our_rank > total_results / 2:
        st.warning("Your page's relevancy score is in the lower half of the results. Consider optimizing your content.")
def process_gsc_data(df):
    """Reduce raw Search Console rows to one row per page (its top query).

    Rows are ordered by impressions, highest first, and only the first row
    per ``page`` is kept. A ``relevancy_score`` column is added (as 0.0) when
    the input has none; an existing column is carried over from the kept row.

    Args:
        df: DataFrame with at least ``page``, ``query``, ``clicks``,
            ``impressions``, ``ctr`` and ``position`` columns.

    Returns:
        A new DataFrame with columns ``page, query, clicks, impressions, ctr,
        position, relevancy_score``; the input frame is not modified.
    """
    df_sorted = df.sort_values('impressions', ascending=False)
    # .copy() so the column assignment below never writes into a view of `df`.
    df_unique = df_sorted.drop_duplicates(subset='page', keep='first').copy()

    if 'relevancy_score' not in df_unique.columns:
        # Float placeholder: real scores written later are floats, and an int
        # column would have to be upcast on the first write.
        df_unique['relevancy_score'] = 0.0
    # else: the kept row already carries its own score. The previous
    # groupby('page').first().values re-assignment was ordered by page name,
    # not by impressions, so it attached scores to the wrong pages.

    return df_unique[['page', 'query', 'clicks', 'impressions', 'ctr', 'position', 'relevancy_score']]
# ------------- | |
# Google Authentication Functions | |
# ------------- | |
def load_config():
    """Build the Google OAuth "web" client configuration from the environment.

    Reads ``CLIENT_ID`` and ``CLIENT_SECRET`` from ``os.environ`` (KeyError if
    either is missing). The auth/token endpoints and the redirect URI are
    hard-coded.
    """
    web_settings = dict(
        client_id=os.environ["CLIENT_ID"],
        client_secret=os.environ["CLIENT_SECRET"],
        auth_uri="https://accounts.google.com/o/oauth2/auth",
        token_uri="https://oauth2.googleapis.com/token",
        redirect_uris=["https://poemsforaphrodite-gscpro.hf.space/"],
    )
    return {"web": web_settings}
def init_oauth_flow(client_config):
    """Create an OAuth ``Flow`` for read-only Search Console access.

    The redirect URI is the first entry of the config's
    ``web.redirect_uris`` list.
    """
    redirect_target = client_config["web"]["redirect_uris"][0]
    return Flow.from_client_config(
        client_config,
        scopes=["https://www.googleapis.com/auth/webmasters.readonly"],
        redirect_uri=redirect_target,
    )
def google_auth(client_config):
    """Return ``(flow, auth_url)`` for starting the Google sign-in.

    The authorization URL is requested with ``prompt="consent"``; the second
    value returned by ``authorization_url`` is discarded.
    """
    oauth_flow = init_oauth_flow(client_config)
    url_and_state = oauth_flow.authorization_url(prompt="consent")
    return oauth_flow, url_and_state[0]
def auth_search_console(client_config, credentials):
    """Authenticate the ``searchconsole`` wrapper with existing credentials.

    The credentials object is flattened into the plain dict that
    ``searchconsole.authenticate`` is called with. ``id_token`` is optional
    and falls back to ``None`` when the attribute is absent.
    """
    required_fields = (
        "token",
        "refresh_token",
        "token_uri",
        "client_id",
        "client_secret",
        "scopes",
    )
    token = {field: getattr(credentials, field) for field in required_fields}
    token["id_token"] = getattr(credentials, "id_token", None)
    return searchconsole.authenticate(client_config=client_config, credentials=token)
# ------------- | |
# Data Fetching Functions | |
# ------------- | |
def list_gsc_properties(credentials):
    """List the site URLs available to ``credentials`` via the webmasters v3 API.

    Returns:
        A list of ``siteUrl`` strings. When the account has none, the list
        holds the single placeholder string ``"No properties found"`` -- note
        that this makes the result always truthy.
    """
    service = build('webmasters', 'v3', credentials=credentials)
    response = service.sites().list().execute()
    site_urls = [entry['siteUrl'] for entry in response.get('siteEntry', [])]
    if not site_urls:
        return ["No properties found"]
    return site_urls
def fetch_gsc_data(webproperty, search_type, start_date, end_date, dimensions, device_type=None):
    """Query Search Console and return the processed per-page report.

    A device filter is applied only when ``'device'`` is one of the requested
    dimensions and a specific device (not ``'All Devices'``) is given. Errors
    raised while running the query or processing its result are shown through
    ``show_error`` and an empty DataFrame is returned.
    """
    # Building the query happens outside the try block, as before.
    query = webproperty.query.range(start_date, end_date)
    query = query.search_type(search_type)
    query = query.dimension(*dimensions)

    filter_by_device = 'device' in dimensions and device_type and device_type != 'All Devices'
    if filter_by_device:
        query = query.filter('device', 'equals', device_type.lower())

    try:
        raw_report = query.limit(MAX_ROWS).get().to_dataframe()
        return process_gsc_data(raw_report)
    except Exception as e:
        show_error(e)
        return pd.DataFrame()
def calculate_relevancy_scores(df, model_type):
    """Return ``df`` with a ``relevancy_score`` column for each (query, page) row.

    Page texts are fetched and embedded, queries are embedded, and the
    diagonal of the query-by-page cosine-similarity matrix gives the score of
    each row. On any exception a warning is shown and every score is set to 0.
    """
    with st.spinner('Calculating relevancy scores...'):
        try:
            page_texts = list(map(fetch_content, df['page']))
            page_vectors = generate_embeddings(page_texts, model_type)
            query_vectors = generate_embeddings(df['query'].tolist(), model_type)
            similarity_matrix = cosine_similarity(query_vectors, page_vectors)
            # Entry (i, i) pairs row i's query with row i's own page.
            df = df.assign(relevancy_score=similarity_matrix.diagonal())
        except Exception as e:
            st.warning(f"Error calculating relevancy scores: {e}")
            df = df.assign(relevancy_score=0)
    return df
# ------------- | |
# Utility Functions | |
# ------------- | |
def update_dimensions(selected_search_type):
    """Return the dimensions selectable for ``selected_search_type``.

    Search types listed in ``SEARCH_TYPES`` get ``'device'`` appended to the
    base dimensions (as a new list); anything else gets ``BASE_DIMENSIONS``
    itself.
    """
    if selected_search_type in SEARCH_TYPES:
        return BASE_DIMENSIONS + ['device']
    return BASE_DIMENSIONS
def calc_date_range(selection, custom_start=None, custom_end=None):
    """Translate a date-range label into a ``(start, end)`` pair of dates.

    ``'Custom Range'`` returns the supplied dates when both are given and the
    last 7 days otherwise. Known labels map to a fixed number of days back
    from today; an unknown label maps to 0 days, i.e. ``(today, today)``.
    """
    today = datetime.date.today()

    if selection == 'Custom Range':
        if custom_start and custom_end:
            return custom_start, custom_end
        return today - datetime.timedelta(days=7), today

    days_by_label = {
        'Last 7 Days': 7,
        'Last 30 Days': 30,
        'Last 3 Months': 90,
        'Last 6 Months': 180,
        'Last 12 Months': 365,
        'Last 16 Months': 480,
    }
    days_back = days_by_label.get(selection, 0)
    return today - datetime.timedelta(days=days_back), today
def show_error(e):
    """Display ``e`` as a Streamlit error message."""
    message = f"An error occurred: {e}"
    st.error(message)
def property_change():
    """Selectbox callback: copy the widget's value into ``selected_property``."""
    chosen = st.session_state['selected_property_selector']
    st.session_state.selected_property = chosen
# ------------- | |
# File & Download Operations | |
# ------------- | |
def show_dataframe(report):
    """Show the first ``DF_PREVIEW_ROWS`` rows of ``report`` inside an expander."""
    preview = st.expander("Preview the First 100 Rows (Unique Pages with Top Query)")
    with preview:
        st.dataframe(report.head(DF_PREVIEW_ROWS))
def download_csv_link(report):
    """Render an HTML link that downloads ``report`` as a CSV file.

    The CSV (without the index) is embedded in the link as a base64 data URI
    and offered under the file name ``search_console_data.csv``.
    """
    # With no path/buffer, to_csv returns a str and its `encoding` argument has
    # no effect, so the BOM that 'utf-8-sig' was meant to add never made it
    # into the download. Apply the encoding when converting to bytes instead.
    csv_text = report.to_csv(index=False)
    b64_csv = base64.b64encode(csv_text.encode('utf-8-sig')).decode()
    href = f'<a href="data:file/csv;base64,{b64_csv}" download="search_console_data.csv">Download CSV File</a>'
    st.markdown(href, unsafe_allow_html=True)
# ------------- | |
# Streamlit UI Components | |
# ------------- | |
def show_google_sign_in(auth_url):
    """Sidebar sign-in button; once clicked, show the Google authorization link."""
    with st.sidebar:
        clicked = st.button("Sign in with Google")
        if not clicked:
            return
        st.write('Please click the link below to sign in:')
        st.markdown(f'[Google Sign-In]({auth_url})', unsafe_allow_html=True)
def show_property_selector(properties, account):
    """Render the property selectbox and return ``account[<selected property>]``.

    The previously selected property (from session state) is pre-selected
    when it is still in ``properties``; otherwise the first entry is used.
    """
    remembered = st.session_state.selected_property
    if remembered in properties:
        default_index = properties.index(remembered)
    else:
        default_index = 0

    chosen = st.selectbox(
        "Select a Search Console Property:",
        properties,
        index=default_index,
        key='selected_property_selector',
        on_change=property_change,
    )
    return account[chosen]
def show_search_type_selector():
    """Render the search-type selectbox, pre-selecting the session's current type."""
    current_position = SEARCH_TYPES.index(st.session_state.selected_search_type)
    return st.selectbox(
        "Select Search Type:",
        SEARCH_TYPES,
        index=current_position,
        key='search_type_selector',
    )
def show_model_type_selector():
    """Render the embedding-model selectbox (``english`` or ``multilingual``)."""
    model_choices = ["english", "multilingual"]
    return st.selectbox(
        "Select the embedding model:",
        model_choices,
        key='model_type_selector',
    )
def calculate_single_relevancy(row):
    """Fetch the row's page and score it against the row's query.

    Uses the module-level Cohere client ``co``.
    """
    text = fetch_content(row['page'])
    return calculate_relevance_score(text, row['query'], co)
def show_tabular_data(df, co):
    """Render the paginated report table with selection, scoring and competitor checks.

    Args:
        df: Report with ``page, query, clicks, impressions, ctr, position,
            relevancy_score`` columns. Scores computed here are written into
            ``df`` in place.
        co: Cohere client forwarded to ``analyze_competitors``.

    Returns:
        The same ``df`` object, including any scores written during this run.
    """
    st.write("Data Table with Relevancy Scores")

    # --- Pagination -------------------------------------------------------
    rows_per_page = 10
    total_rows = len(df)
    total_pages = max(1, (total_rows - 1) // rows_per_page + 1)
    if 'current_page' not in st.session_state:
        st.session_state.current_page = 1
    # A page number left over from a larger data set must not point past the end.
    st.session_state.current_page = min(max(st.session_state.current_page, 1), total_pages)

    def _shift_page(delta):
        # Runs as a widget callback, i.e. before the script reruns, so the page
        # label and the buttons' disabled state are already correct on redraw.
        st.session_state.current_page += delta

    col1, col2, col3 = st.columns([1, 3, 1])
    with col1:
        st.button("< Prev", disabled=st.session_state.current_page == 1,
                  on_click=_shift_page, args=(-1,))
    with col2:
        st.write(f"Page {st.session_state.current_page} of {total_pages}")
    with col3:
        st.button("Next >", disabled=st.session_state.current_page == total_pages,
                  on_click=_shift_page, args=(1,))

    start_idx = (st.session_state.current_page - 1) * rows_per_page
    end_idx = start_idx + rows_per_page

    # --- Row selection state ----------------------------------------------
    if 'selected_rows' not in st.session_state or len(st.session_state.selected_rows) != len(df):
        st.session_state.selected_rows = [False] * len(df)

    def _sync_selection(idx, widget_key):
        # Mirror the checkbox's actual state. The previous callback always
        # stored True, so a row could never be deselected.
        selection = list(st.session_state.selected_rows)
        selection[idx] = bool(st.session_state[widget_key])
        st.session_state.selected_rows = selection

    # --- Score the selected rows -----------------------------------------
    if st.button("Calculate Relevancy for Selected"):
        selected_indices = [i for i, selected in enumerate(st.session_state.selected_rows) if selected]
        # Scores are floats; convert a non-float placeholder column once rather
        # than relying on an implicit upcast during the element-wise writes.
        if not pd.api.types.is_float_dtype(df['relevancy_score']):
            df['relevancy_score'] = df['relevancy_score'].astype(float)
        score_col = df.columns.get_loc('relevancy_score')
        with st.spinner('Calculating relevancy scores...'):
            for index in selected_indices:
                current = df.iloc[index, score_col]
                # Only rows without a usable score are (re)computed.
                if pd.isna(current) or current == 0:
                    df.iloc[index, score_col] = calculate_single_relevancy(df.iloc[index])
        st.success(f"Calculated relevancy scores for {len(selected_indices)} selected rows.")
        # st.rerun replaces the deprecated st.experimental_rerun.
        st.rerun()

    # --- Table header -------------------------------------------------------
    column_widths = [0.5, 3, 2, 1, 1, 1, 1, 1, 1]
    headers = ['Select', 'Page', 'Query', 'Clicks', 'Impressions', 'CTR', 'Position', 'Relevancy Score', 'Competitors']
    for col, header in zip(st.columns(column_widths), headers):
        col.write(f"**{header}**")

    # --- Table rows ---------------------------------------------------------
    for i, row in enumerate(df.iloc[start_idx:end_idx].itertuples(), start=start_idx):
        cols = st.columns(column_widths)
        has_score = not pd.isna(row.relevancy_score) and row.relevancy_score != 0

        widget_key = f"select_{i}"
        cols[0].checkbox("", key=widget_key, value=st.session_state.selected_rows[i],
                         on_change=_sync_selection, args=(i, widget_key))

        # Truncate the visible URL but keep the full link target.
        truncated_url = row.page[:30] + '...' if len(row.page) > 30 else row.page
        cols[1].markdown(f"[{truncated_url}]({row.page})")
        cols[2].write(row.query)
        cols[3].write(row.clicks)
        cols[4].write(row.impressions)
        cols[5].write(f"{row.ctr:.2%}")
        cols[6].write(f"{row.position:.1f}")
        cols[7].write(f"{row.relevancy_score:.4f}" if has_score else "N/A")

        # Competitor analysis needs our own score, so it is disabled until one exists.
        competitor_button = cols[8].button("Show", key=f"comp_{i}", disabled=not has_score)
        if not competitor_button:
            continue

        st.write(f"Competitor Analysis for: {row.query}")
        with st.spinner('Analyzing competitors...'):
            results_df = analyze_competitors(row._asdict(), co)

        # Renumber after sorting so the index is the 0-based rank.
        results_df = results_df.sort_values('relevancy_score', ascending=False).reset_index(drop=True)
        our_positions = results_df.index[results_df['url'] == row.page].tolist()
        if not our_positions:
            st.error(f"Our page '{row.page}' is not in the results. This indicates an error in fetching or processing the page.")
            continue

        our_rank = our_positions[0] + 1
        total_results = len(results_df)
        our_score = results_df.loc[results_df['url'] == row.page, 'relevancy_score'].values[0]

        st.dataframe(results_df)
        st.write(f"Our page ranks {our_rank} out of {total_results} in terms of relevancy score.")
        st.write(f"Our relevancy score: {our_score:.4f}")

        if our_rank == 1:
            st.success("Your page has the highest relevancy score!")
        elif our_rank <= 3:
            st.info("Your page is among the top 3 most relevant results.")
        elif our_rank > total_results / 2:
            st.warning("Your page's relevancy score is in the lower half of the results. Consider optimizing your content.")

    return df  # Same object that was passed in, with any new scores.
def show_date_range_selector():
    """Render the date-range selectbox, pre-selecting the session's current range."""
    current_position = DATE_RANGE_OPTIONS.index(st.session_state.selected_date_range)
    return st.selectbox(
        "Select Date Range:",
        DATE_RANGE_OPTIONS,
        index=current_position,
        key='date_range_selector',
    )
def show_custom_date_inputs():
    """Render start/end date pickers and store their values in session state."""
    state = st.session_state
    state.custom_start_date = st.date_input("Start Date", state.custom_start_date)
    state.custom_end_date = st.date_input("End Date", state.custom_end_date)
def show_dimensions_selector(search_type):
    """Render the dimensions multiselect for ``search_type``.

    The options come from ``update_dimensions``; the default selection is the
    session's ``selected_dimensions``.
    """
    choices = update_dimensions(search_type)
    preselected = st.session_state.selected_dimensions
    return st.multiselect(
        "Select Dimensions:",
        choices,
        default=preselected,
        key='dimensions_selector',
    )
def show_paginated_dataframe(report, rows_per_page=20):
    """Render ``report`` as a sortable, paginated HTML table.

    Args:
        report: DataFrame with ``page, query, impressions, clicks, ctr,
            position, relevancy_score`` columns. It is not modified; all
            formatting is done on a copy.
        rows_per_page: Number of rows per page.
    """
    import html  # stdlib; local import so the block stays self-contained

    def format_ctr(x):
        try:
            return f"{float(x):.2%}"
        except ValueError:
            return x

    def format_relevancy_score(x):
        try:
            return f"{float(x):.2f}"
        except ValueError:
            return x

    def make_clickable(url):
        # The table is rendered as raw HTML, so the URL must be escaped both
        # as an attribute value and as link text.
        safe_url = html.escape(str(url), quote=True)
        return f'<a href="{safe_url}" target="_blank">{safe_url}</a>'

    def safe_float_convert(x):
        try:
            return float(x.rstrip('%')) / 100 if isinstance(x, str) and x.endswith('%') else float(x)
        except ValueError:
            return 0

    # Work on a copy: the original formatted ctr/relevancy_score into strings
    # and truncated position directly on the caller's frame.
    report = report.copy()
    report['position'] = report['position'].astype(int)
    report['impressions'] = pd.to_numeric(report['impressions'], errors='coerce')
    report['ctr'] = report['ctr'].apply(format_ctr)
    report['relevancy_score'] = report['relevancy_score'].apply(format_relevancy_score)
    report['clickable_url'] = report['page'].apply(make_clickable)

    columns = ['clickable_url', 'query', 'impressions', 'clicks', 'ctr', 'position', 'relevancy_score']
    # Explicit copy so the helper columns below are not assigned onto a slice.
    report = report[columns].copy()

    sort_column = st.selectbox("Sort by:", columns[1:], index=columns[1:].index('impressions'))
    sort_order = st.radio("Sort order:", ("Descending", "Ascending"))
    ascending = sort_order == "Ascending"

    # ctr and relevancy_score are display strings by now; sort on numeric twins.
    report['ctr_numeric'] = report['ctr'].apply(safe_float_convert)
    report['relevancy_score_numeric'] = report['relevancy_score'].apply(safe_float_convert)
    sort_column_numeric = sort_column + '_numeric' if sort_column in ['ctr', 'relevancy_score'] else sort_column
    report = report.sort_values(by=sort_column_numeric, ascending=ascending)
    report = report.drop(columns=['ctr_numeric', 'relevancy_score_numeric'])

    total_rows = len(report)
    total_pages = (total_rows - 1) // rows_per_page + 1
    if 'current_page' not in st.session_state:
        st.session_state.current_page = 1

    col1, col2, col3 = st.columns([1, 3, 1])
    with col1:
        if st.button("Previous", disabled=st.session_state.current_page == 1):
            st.session_state.current_page -= 1
    with col2:
        st.write(f"Page {st.session_state.current_page} of {total_pages}")
    with col3:
        if st.button("Next", disabled=st.session_state.current_page == total_pages):
            st.session_state.current_page += 1

    start_idx = (st.session_state.current_page - 1) * rows_per_page
    end_idx = start_idx + rows_per_page

    page_rows = report.iloc[start_idx:end_idx].copy()
    # Queries are free text typed by searchers; escape them (after sorting, so
    # the order is unaffected) because to_html(escape=False) emits cells verbatim.
    page_rows['query'] = page_rows['query'].map(lambda q: html.escape(str(q)))
    st.markdown(page_rows.to_html(escape=False, index=False), unsafe_allow_html=True)
# ------------- | |
# Main Streamlit App Function | |
# ------------- | |
def main():
    """Run the app: sign in with Google, choose report options, fetch and show data."""
    setup_streamlit()
    # (A leftover debug print("hello") that ran on every rerun was removed here.)
    client_config = load_config()

    # Create the OAuth flow and its authorization URL once per session.
    if 'auth_flow' not in st.session_state or 'auth_url' not in st.session_state:
        st.session_state.auth_flow, st.session_state.auth_url = google_auth(client_config)

    # A "code" query parameter is exchanged for credentials once per session.
    auth_code = st.query_params.get("code", None)
    if auth_code and 'credentials' not in st.session_state:
        st.session_state.auth_flow.fetch_token(code=auth_code)
        st.session_state.credentials = st.session_state.auth_flow.credentials

    if 'credentials' not in st.session_state:
        show_google_sign_in(st.session_state.auth_url)
        return

    init_session_state()
    account = auth_search_console(client_config, st.session_state.credentials)
    properties = list_gsc_properties(st.session_state.credentials)
    if not properties:
        return

    # list_gsc_properties reports "nothing found" as a placeholder entry; it is
    # not a real property, so stop here instead of offering it for selection.
    if properties == ["No properties found"]:
        st.warning("No Search Console properties were found for this account.")
        return

    webproperty = show_property_selector(properties, account)
    search_type = show_search_type_selector()
    date_range_selection = show_date_range_selector()
    model_type = show_model_type_selector()  # rendered for the user; not used further in this function

    if date_range_selection == 'Custom Range':
        show_custom_date_inputs()
        start_date, end_date = st.session_state.custom_start_date, st.session_state.custom_end_date
    else:
        start_date, end_date = calc_date_range(date_range_selection)

    selected_dimensions = show_dimensions_selector(search_type)

    if 'report_data' not in st.session_state:
        st.session_state.report_data = None

    if st.button("Fetch Data"):
        with st.spinner('Fetching data...'):
            st.session_state.report_data = fetch_gsc_data(
                webproperty, search_type, start_date, end_date, selected_dimensions
            )

    report = st.session_state.report_data
    if report is None:
        # Nothing has been fetched yet in this session.
        return

    if report.empty:
        st.warning("No data found for the selected criteria.")
        return

    st.write("Data fetched successfully.")
    st.session_state.report_data = show_tabular_data(report, co)
    download_csv_link(st.session_state.report_data)
# Script entry point: only start the app when this file is run directly.
if __name__ == "__main__":
    main()