# tosin2013's picture
# Update app.py
# 7a28f2a verified
import gradio as gr
import numpy as np
import pandas as pd
#from pandas_datareader import data as wb
import matplotlib.pyplot as plt
from scipy.stats import norm, gmean, cauchy
import seaborn as sns
from datetime import datetime
import os
import json
#%matplotlib inline
from alpha_vantage.timeseries import TimeSeries
import pandas as pd
from datetime import datetime
# Function to import stock data using Alpha Vantage
def import_stock_data_alphavantage(tickers, api_key, start='2023-1-01', end=None):
    """
    Download daily adjusted close prices from Alpha Vantage.

    Parameters:
    tickers (str or list of str): A single ticker symbol or a list of symbols.
    api_key (str): Alpha Vantage API key.
    start (str): First date to keep (inclusive).
    end (str or None): Last date to keep (inclusive). None means "today",
        resolved when the function is called.

    Returns:
    pandas.DataFrame: One column per ticker (named after the ticker), outer-joined
        on the date index and sorted in ascending index order. Empty when no
        tickers are given.
    """
    if end is None:
        # Resolve "today" per call: a default computed in the signature is
        # evaluated once at import time and goes stale in a long-running app.
        end = datetime.today().strftime('%Y-%m-%d')
    if isinstance(tickers, str):
        tickers = [tickers]  # Accept a single ticker as well as a list

    ts = TimeSeries(key=api_key, output_format='pandas')
    frames = []
    for ticker in tickers:
        df, _meta_data = ts.get_daily_adjusted(ticker, outputsize='full')
        # Keep only the adjusted close, labelled with the ticker symbol
        prices = df['5. adjusted close'].rename(ticker).to_frame()
        frames.append(prices[(prices.index >= start) & (prices.index <= end)])

    if not frames:
        return pd.DataFrame()

    # Collecting frames and concatenating once keeps every ticker's column even
    # when an earlier ticker has no rows in the requested window.
    data = pd.concat(frames, axis=1, join='outer')
    # Downstream code (pct_change, iloc[-1] as "latest price") needs rows ordered
    # oldest-to-newest, whatever order the API delivers them in.
    return data.sort_index()
def log_returns(data):
    """
    Compute period-over-period logarithmic returns.

    Parameters:
    data (pandas.DataFrame): Price data, one row per period.

    Returns:
    pandas.DataFrame: log(1 + percentage change) for every entry; the first
        row has no predecessor and is therefore NaN.
    """
    growth_factor = 1 + data.pct_change()
    return np.log(growth_factor)
def simple_returns(data):
    """
    Compute period-over-period simple (arithmetic) returns.

    Parameters:
    data (pandas.Series): Price data, one entry per period.

    Returns:
    pandas.Series: current / previous - 1 for every entry; the first entry
        has no predecessor and is therefore NaN.
    """
    previous = data.shift(1)
    ratio = data / previous
    return ratio - 1
def market_data_combination(data, mark_ticker="SPY", start='2022-1-1'):
    """
    Append market-proxy prices to a price table and compute the market's annualised return.

    Parameters:
    data (pandas.DataFrame): Stock prices indexed by date.
    mark_ticker (str): Ticker used as the market proxy.
    start (str): First date of market data to download.

    Returns:
    tuple: (data merged with the market column on the index,
            numpy array holding the market's annualised return, assuming 252 periods per year).
    """
    api_key = os.environ.get('ALPHAVANTAGE_API_KEY')
    # Forward `start`: it was previously accepted but ignored, so the market
    # window always fell back to the downloader's own default.
    market_data = import_stock_data_alphavantage(mark_ticker, api_key, start=start)
    market_rets = log_returns(market_data).dropna()
    ann_return = np.exp(market_rets.mean() * 252).values - 1
    # Default merge is an inner join: only dates present on both sides survive
    data = data.merge(market_data, left_index=True, right_index=True)
    # Debug output
    print("Market data shape:", market_data.shape)
    print("Number of non-NaN entries in market data:", int(market_data.notna().values.sum()))
    print("First few rows of market data:\n", market_data.head())
    return data, ann_return
def beta_sharpe(data, mark_ticker="SPY", start='2010-1-1', riskfree=0.025):
    """
    Compute CAPM metrics for each stock against a market proxy.

    Parameters:
    data (pandas.DataFrame): Stock price data, one column per stock.
    mark_ticker (str): Ticker of the market proxy (default "SPY").
    start (str): Date from which to download market data (default Jan 1st 2010).
    riskfree (float): Assumed risk-free yield (default 2.5%).

    Returns:
    pandas.DataFrame: One row per stock with columns 'Beta', 'STD'
        (annualised standard deviation of log returns), 'CAPM' (expected return
        under CAPM) and 'Sharpe' ((CAPM - riskfree) / STD).
    """
    # Single annualisation factor so variance, covariance and STD agree
    trading_days = 252

    dd, mark_ret = market_data_combination(data, mark_ticker, start)
    print("printing dd")
    print(dd.head())
    print("printing mark_ret")
    print(mark_ret)
    # The market return arrives as a 1-element array; use it as a plain scalar
    # so that column arithmetic below yields scalars per row.
    market_return = float(np.ravel(mark_ret)[0])

    log_ret = log_returns(dd)
    # The market proxy is the last column of the merged table
    covar = log_ret.cov() * trading_days
    mrk_var = log_ret.iloc[:, -1].var() * trading_days

    # Name the column 'Beta' directly rather than renaming a hard-coded "SPY",
    # which left the column unrenamed for any other market ticker.
    beta = pd.DataFrame({
        'Beta': covar.iloc[:-1, -1] / mrk_var,
        'STD': log_ret.std().iloc[:-1] * trading_days ** 0.5,
    })
    # CAPM
    beta['CAPM'] = riskfree + beta['Beta'] * (market_return - riskfree)
    # Sharpe
    beta['Sharpe'] = (beta['CAPM'] - riskfree) / beta['STD']
    return beta
def drift_calc(data, return_type='log'):
    """
    Compute the drift term (mean return minus half the return variance).

    Parameters:
    data (pandas.DataFrame or pandas.Series): Price data.
    return_type (str): 'log' or 'simple', selecting how returns are computed.

    Returns:
    numpy.ndarray for DataFrame input (one drift per column), a scalar for
    Series input, or None if the computation failed (the error is printed).
    """
    try:
        if return_type == 'log':
            lr = log_returns(data)
        elif return_type == 'simple':
            lr = simple_returns(data)
        else:
            # Fail with a meaningful message instead of an unbound-local error
            raise ValueError(
                f"unsupported return_type {return_type!r}; expected 'log' or 'simple'"
            )
        drift = lr.mean() - (0.5 * lr.var())
        # DataFrame input yields a Series here (has .values); Series input
        # yields a bare scalar, which is returned unchanged.
        return getattr(drift, 'values', drift)
    except Exception as e:
        # Deliberately best-effort: report the problem and signal failure with None
        print(f"Error in drift_calc: {str(e)}")
        print("Please check the input data and return type")
        return None
def daily_returns(data, days, iterations, return_type='log'):
    """
    Simulate daily growth factors from the drift and volatility of historical returns.

    Parameters:
    data (pandas.DataFrame or pandas.Series): Historical price data.
    days (int): Number of rows (days) to simulate.
    iterations (int): Number of columns (independent simulations).
    return_type (str): 'log' or 'simple', selecting how returns are computed.

    Returns:
    numpy.ndarray: Array of shape (days, iterations) with exp(drift + std * z),
        where z are standard-normal draws.

    Raises:
    ValueError: If return_type is unsupported or the drift could not be computed.
    """
    # Validate up front so an unsupported type fails clearly instead of leaving
    # the volatility undefined further down.
    if return_type == 'log':
        returns_fn = log_returns
    elif return_type == 'simple':
        returns_fn = simple_returns
    else:
        raise ValueError(
            f"unsupported return_type {return_type!r}; expected 'log' or 'simple'"
        )

    ft = drift_calc(data, return_type)
    if ft is None:
        raise ValueError("drift could not be computed; see the drift_calc error output")

    std = returns_fn(data).std()
    # DataFrame input gives a Series (has .values); Series input gives a scalar
    stv = getattr(std, 'values', std)

    # Shocks are standard-normal draws obtained by pushing uniforms through the
    # normal inverse CDF. Real return distributions are often fatter-tailed
    # (e.g. Cauchy-like); this model does not capture that.
    shocks = norm.ppf(np.random.rand(days, iterations))
    return np.exp(ft + stv * shocks)
def probs_find(predicted, higherthan, on='value'):
    """
    Estimate the probability that a simulated stock ends at or above a threshold.

    The threshold can be a value (final stock price) or a return rate
    (percentage change relative to the first simulated price).

    Parameters:
    predicted (pandas.DataFrame): Predicted prices, rows are days and columns are simulations.
    higherthan (float): Threshold to compare against (e.g. 0 with on='return'
        gives the probability of at least breaking even).
    on (str): 'return' to compare percentage returns, 'value' to compare final prices.

    Returns:
    float: Fraction of simulations whose final row is >= higherthan. NaN
        entries count towards neither side.

    Raises:
    ValueError: If `on` is neither 'value' nor 'return'.
    ZeroDivisionError: If no simulation yields a comparable value.
    """
    if on not in ('return', 'value'):
        # Previously this printed the message and then crashed on an unbound name
        raise ValueError("'on' must be either value or return")

    final = predicted.iloc[-1]
    if on == 'return':
        # Percentage change relative to the starting price (top-left cell)
        predicted0 = predicted.iloc[0, 0]
        final = ((final - predicted0) * 100) / predicted0

    # Count both sides explicitly so NaN entries are excluded from the total
    over = int((final >= higherthan).sum())
    less = int((final < higherthan).sum())
    return over / (over + less)
import matplotlib.pyplot as plt
import seaborn as sns
def simulate_mc(data, days, iterations, return_type='log', plot=True):
    """
    Run a Monte Carlo price simulation and summarise the final-day distribution.

    Parameters:
    data (pandas.DataFrame or pandas.Series): Historical prices; the last row is the starting price.
    days (int): Number of rows to simulate. Row 0 holds the starting price, so
        the simulation spans days - 1 steps.
    iterations (int): Number of simulated paths.
    return_type (str): 'log' or 'simple', passed on to daily_returns.
    plot (bool): When true, draw the final-price distribution (density and
        cumulative), save it to 'stock_price_distribution.png' and show it.

    Returns:
    dict: 'Days', 'Expected Value' (mean final price), 'Return' (percentage
        change of the expected value relative to the starting price) and
        'Probability of Breakeven'.
    """
    # Generate daily growth factors
    returns = daily_returns(data, days, iterations, return_type)

    # Put the last actual price in the first row, then compound down the rows:
    # the cumulative product reproduces price[t] = price[t-1] * returns[t].
    price_list = np.array(returns, dtype=float)
    price_list[0] = data.iloc[-1]
    price_list = np.cumprod(price_list, axis=0)

    simulated = pd.DataFrame(price_list)
    final_prices = simulated.iloc[-1]

    # Plot Option
    if plot:
        fig, ax = plt.subplots(1, 2, figsize=(14, 4))
        sns.distplot(final_prices, ax=ax[0])
        sns.distplot(final_prices, hist_kws={'cumulative': True}, kde_kws={'cumulative': True}, ax=ax[1])
        plt.xlabel("Stock Price")
        plt.savefig('stock_price_distribution.png')
        plt.show()
        # Release the figure; otherwise every call leaves one open in memory
        plt.close(fig)

    # Printing information about stock: column names for a DataFrame,
    # the series name otherwise
    columns = getattr(data, 'columns', None)
    if columns is not None:
        for nam in columns:
            print(nam)
    else:
        print(data.name)

    # Every path starts from the same price, so column 0 is representative
    # (and, unlike column 1, exists even when iterations == 1).
    start_price = price_list[0, 0]
    expected_value = final_prices.mean()
    output = {
        "Days": days - 1,
        "Expected Value": round(expected_value, 2),
        # Return is measured relative to the starting price, not the final mean
        "Return": round(100 * (expected_value - start_price) / start_price, 2),
        "Probability of Breakeven": probs_find(simulated, 0, on='return'),
    }

    print(f"Days: {output['Days']}")
    print(f"Expected Value: ${output['Expected Value']}")
    print(f"Return: {output['Return']}%")
    print(f"Probability of Breakeven: {output['Probability of Breakeven']}")
    return output
# set Variables
def get_stock_data(tickers, days):
    """
    Gradio handler: download prices, print diagnostics and run the Monte Carlo simulation.

    Parameters:
    tickers (str): Ticker text from the UI, passed to the downloader as-is.
    days (int or float): Number of days to simulate; coerced to int.

    Returns:
    str: JSON-encoded summary returned by simulate_mc (2000 iterations, log returns).
    """
    api_key = os.getenv('ALPHAVANTAGE_API_KEY')
    # A numeric UI field may deliver a float (e.g. 180.0); numpy requires an
    # integer for array dimensions.
    days = int(days)
    # NOTE(review): the UI placeholder invites comma-separated tickers, but the
    # string is forwarded unsplit and so treated as one symbol - confirm intent.
    data = import_stock_data_alphavantage(tickers, api_key)
    print(data.head())
    print(data)
    # Result is not part of the response; the call is kept for its diagnostic output
    beta_sharpe(data)
    print(drift_calc(data))
    item = simulate_mc(data, days, 2000, 'log')
    print(item)
    return json.dumps(item)
# Gradio UI: a ticker text box and a day-count field feed get_stock_data,
# whose JSON string result is displayed as plain text.
ticker_input = gr.Textbox(
    placeholder="Enter tickers separated by commas (e.g., AAPL, TSLA)",
    lines=1,
)
days_input = gr.Number(label="Number of Days", value=180)
iface = gr.Interface(
    get_stock_data,
    [ticker_input, days_input],
    "text",
    title="Stock Data Analyzer",
)
# Start the web server and request a public share link
iface.launch(share=True)