import gradio as gr
import numpy as np
import pandas as pd
#from pandas_datareader import data as wb
import matplotlib.pyplot as plt
from scipy.stats import norm, gmean, cauchy
import seaborn as sns
from datetime import datetime
import os
import json
#%matplotlib inline
from alpha_vantage.timeseries import TimeSeries
# Function to import stock data using Alpha Vantage
def import_stock_data_alphavantage(tickers, api_key, start='2023-01-01', end=datetime.today().strftime('%Y-%m-%d')):
    data = pd.DataFrame()
    ts = TimeSeries(key=api_key, output_format='pandas')  # Initialize TimeSeries with your API key
    if isinstance(tickers, str):
        tickers = [tickers]  # Convert to a list if only one ticker is provided
    for ticker in tickers:
        # Get the daily adjusted price history for the ticker
        df, meta_data = ts.get_daily_adjusted(ticker, outputsize='full')
        # Keep only the '5. adjusted close' column and rename it to the ticker
        df = df['5. adjusted close'].rename(ticker).to_frame()
        # Ensure the index is in chronological order before filtering
        df = df.sort_index()
        # Filter the data based on the start and end dates
        df = df[(df.index >= start) & (df.index <= end)]
        if data.empty:
            # First ticker: initialize the result with this DataFrame
            data = df
        else:
            # Otherwise join the new column onto the existing data
            data = data.join(df, how='outer')
    return data
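# Illustrative usage sketch (added for clarity; assumes the ALPHAVANTAGE_API_KEY
# environment variable is set and the key has access to the daily-adjusted endpoint):
#   prices = import_stock_data_alphavantage(["AAPL", "MSFT"],
#                                           api_key=os.environ["ALPHAVANTAGE_API_KEY"],
#                                           start="2023-01-01")
#   print(prices.tail())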
def log_returns(data):
    """
    Calculate the log returns of a given dataset.

    Parameters:
        data (pandas.DataFrame): The dataset for which log returns are calculated.

    Returns:
        pandas.DataFrame: The log returns of the dataset.
    """
    return np.log(1 + data.pct_change())
def simple_returns(data):
    """
    Calculate the simple returns of a given dataset.

    Parameters:
        data (pandas.Series or pandas.DataFrame): The dataset for which to calculate the simple returns.

    Returns:
        pandas.Series or pandas.DataFrame: The simple returns of the dataset.
    """
    return (data / data.shift(1)) - 1
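# Quick sanity check (illustrative, not executed): for small daily moves the two
# return definitions are close, since log(1 + r) is approximately r.
#   prices = pd.Series([100.0, 101.0, 99.5])
#   simple_returns(prices)  # -> NaN, 0.0100, -0.0149 (approximately)
#   log_returns(prices)     # -> NaN, 0.00995, -0.0150 (approximately)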
def market_data_combination(data, mark_ticker="SPY", start='2022-01-01'):
    api_key = os.environ.get('ALPHAVANTAGE_API_KEY')
    market_data = import_stock_data_alphavantage(mark_ticker, api_key, start=start)
    market_rets = log_returns(market_data).dropna()
    # Annualized market return as a plain scalar (252 trading days per year)
    ann_return = np.exp(market_rets.mean() * 252).iloc[0] - 1
    data = data.merge(market_data, left_index=True, right_index=True)
    # Debugging output
    print("Market data shape:", market_data.shape)
    print("Number of non-NaN entries in market data:", sum(~market_data.isna().values.flatten()))
    print("First few rows of market data:\n", market_data.head())
    return data, ann_return
def beta_sharpe(data, mark_ticker="SPY", start='2010-1-1', riskfree=0.025):
    """
    Input:
    1. data: dataframe of stock price data
    2. mark_ticker: ticker of the market proxy used to compute CAPM metrics (default is SPY)
    3. start: date from which to download data (default Jan 1st 2010)
    4. riskfree: the assumed risk-free yield (US 10 Year Bond is assumed: 2.5%)
    Output:
    1. Dataframe with CAPM metrics computed against the specified market proxy
    """
    # Beta
    dd, mark_ret = market_data_combination(data, mark_ticker, start)
    print("printing dd")
    print(dd.head())
    print("printing mark_ret")
    print(mark_ret)
    log_ret = log_returns(dd)
    covar = log_ret.cov() * 252
    covar = pd.DataFrame(covar.iloc[:-1, -1])
    mrk_var = log_ret.iloc[:, -1].var() * 252
    beta = covar / mrk_var
    # Annualized standard deviation of returns (252 trading days, consistent with the covariance above)
    stdev_ret = (log_ret.std() * 252 ** 0.5)[:-1].to_frame('STD')
    beta = beta.merge(stdev_ret, left_index=True, right_index=True)
    # CAPM
    for i, row in beta.iterrows():
        beta.at[i, 'CAPM'] = riskfree + (row[mark_ticker] * (mark_ret - riskfree))
    # Sharpe
    for i, row in beta.iterrows():
        beta.at[i, 'Sharpe'] = (row['CAPM'] - riskfree) / row['STD']
    beta.rename(columns={mark_ticker: "Beta"}, inplace=True)
    return beta
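# Illustrative usage sketch (assumes `prices` is a DataFrame of adjusted closes
# produced by import_stock_data_alphavantage):
#   metrics = beta_sharpe(prices)   # columns: Beta, STD, CAPM, Sharpe (vs. SPY)
#   print(metrics)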
def drift_calc(data, return_type='log'):
    try:
        if return_type == 'log':
            lr = log_returns(data)
        elif return_type == 'simple':
            lr = simple_returns(data)
        else:
            raise ValueError(f"Unknown return_type: {return_type}")
        u = lr.mean()
        var = lr.var()
        drift = u - (0.5 * var)
        return drift.values
    except Exception as e:
        print(f"Error in drift_calc: {str(e)}")
        print("Please check the input data and return type")
        return None
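# Note: drift_calc returns the geometric-Brownian-motion drift term, mu - 0.5 * sigma^2,
# where mu is the mean and sigma^2 the variance of the daily (log or simple) returns;
# it is combined with random shocks in daily_returns() below.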
def daily_returns(data, days, iterations, return_type='log'):
    ft = drift_calc(data, return_type)
    if ft is None:
        raise ValueError("drift_calc failed; cannot simulate daily returns")
    if return_type == 'log':
        try:
            stv = log_returns(data).std().values
        except:
            stv = log_returns(data).std()
    elif return_type == 'simple':
        try:
            stv = simple_returns(data).std().values
        except:
            stv = simple_returns(data).std()
    # Note: empirical return distributions often have fatter tails than the normal
    # (closer to a Cauchy distribution), but this simulation draws from a normal
    # distribution, i.e. standard geometric Brownian motion.
    dr = np.exp(ft + stv * norm.ppf(np.random.rand(days, iterations)))
    return dr
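# Illustrative shape check (not executed; assumes `prices` is a one-column DataFrame):
# daily_returns yields a (days x iterations) matrix of simulated gross daily moves,
# exp(drift + sigma * Z) with Z drawn from a standard normal distribution.
#   dr = daily_returns(prices, days=30, iterations=1000)
#   dr.shape  # -> (30, 1000)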
def probs_find(predicted, higherthan, on='value'):
    """
    Calculates the probability of a stock ending above a certain threshold, which can be
    defined as a value (final stock price) or a return rate (percentage change).
    Input:
    1. predicted: dataframe with all the predicted prices (days and simulations)
    2. higherthan: specified threshold for which to compute the probability (e.g. 0 on 'return' gives the probability of at least breaking even)
    3. on: 'return' or 'value', i.e. the return of the stock or the final value of the stock for every simulation over the time specified
    """
    if on == 'return':
        predicted0 = predicted.iloc[0, 0]
        predicted = predicted.iloc[-1]
        predList = list(predicted)
        over = [(i * 100) / predicted0 for i in predList if ((i - predicted0) * 100) / predicted0 >= higherthan]
        less = [(i * 100) / predicted0 for i in predList if ((i - predicted0) * 100) / predicted0 < higherthan]
    elif on == 'value':
        predicted = predicted.iloc[-1]
        predList = list(predicted)
        over = [i for i in predList if i >= higherthan]
        less = [i for i in predList if i < higherthan]
    else:
        raise ValueError("'on' must be either 'value' or 'return'")
    return len(over) / (len(over) + len(less))
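# Illustrative usage sketch (assumes `sims` is a (days x iterations) DataFrame of
# simulated prices, e.g. pd.DataFrame(price_list) from simulate_mc below):
#   probs_find(sims, 0, on='return')    # probability of at least breaking even
#   probs_find(sims, 150, on='value')   # probability the final price is >= 150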
def simulate_mc(data, days, iterations, return_type='log', plot=True):
    # Generate daily returns
    returns = daily_returns(data, days, iterations, return_type)
    # Create empty matrix
    price_list = np.zeros_like(returns)
    # Put the last actual price in the first row of the matrix
    price_list[0] = data.iloc[-1]
    # Calculate the price of each day
    for t in range(1, days):
        price_list[t] = price_list[t - 1] * returns[t]
    # Plot option
    if plot:
        x = pd.DataFrame(price_list).iloc[-1]
        fig, ax = plt.subplots(1, 2, figsize=(14, 4))
        # sns.distplot was removed in recent seaborn releases; use histplot/ecdfplot instead
        sns.histplot(x, kde=True, ax=ax[0])
        sns.ecdfplot(x, ax=ax[1])
        plt.xlabel("Stock Price")
        plt.savefig('stock_price_distribution.png')
        plt.show()
    # CAPM and Sharpe Ratio
    # Printing information about the stock
    try:
        [print(nam) for nam in data.columns]
    except:
        print(data.name)
    final_prices = pd.DataFrame(price_list).iloc[-1]
    start_price = price_list[0, 0]
    expected_value = round(final_prices.mean(), 2)
    # Expected return relative to the starting price
    expected_return = round(100 * (final_prices.mean() - start_price) / start_price, 2)
    breakeven_prob = probs_find(pd.DataFrame(price_list), 0, on='return')
    print(f"Days: {days-1}")
    print(f"Expected Value: ${expected_value}")
    print(f"Return: {expected_return}%")
    print(f"Probability of Breakeven: {breakeven_prob}")
    output = {
        "Days": days - 1,
        # Cast numpy scalars to plain floats so the result is JSON serializable
        "Expected Value": float(expected_value),
        "Return": float(expected_return),
        "Probability of Breakeven": float(breakeven_prob)
    }
    return output
    # return pd.DataFrame(price_list)
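# Illustrative usage sketch (assumes `prices` is a one-column DataFrame of adjusted closes):
#   summary = simulate_mc(prices, days=252, iterations=1000, return_type='log', plot=False)
#   print(summary)  # {'Days': 251, 'Expected Value': ..., 'Return': ..., 'Probability of Breakeven': ...}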
# Gradio callback: fetch data and run the Monte Carlo simulation
def get_stock_data(tickers, days):
    api_key = os.getenv('ALPHAVANTAGE_API_KEY')
    # Parse the comma-separated ticker string from the textbox
    tickers = [t.strip() for t in tickers.split(',') if t.strip()]
    # gr.Number returns a float; the simulation needs an integer day count
    days = int(days)
    data = import_stock_data_alphavantage(tickers, api_key)
    print(data.head())
    log_return = log_returns(data)
    print(data)
    beta_sharpe(data)
    print(drift_calc(data))
    # simulate_mc expects a single price series, so run it once per ticker
    results = {}
    for ticker in data.columns:
        results[ticker] = simulate_mc(data[[ticker]], days, 2000, 'log')
    print(results)
    return json.dumps(results)
# Create Gradio interface elements
ticker_input = gr.Textbox(lines=1, label="Tickers", placeholder="Enter tickers separated by commas (e.g., AAPL, TSLA)")
days_input = gr.Number(value=180, label="Number of Days")
iface = gr.Interface(
    fn=get_stock_data,
    inputs=[ticker_input, days_input],
    outputs="text",
    title="Stock Data Analyzer"
)
iface.launch(share=True)