import gradio as gr import numpy as np import pandas as pd #from pandas_datareader import data as wb import matplotlib.pyplot as plt from scipy.stats import norm, gmean, cauchy import seaborn as sns from datetime import datetime import os import json #%matplotlib inline from alpha_vantage.timeseries import TimeSeries import pandas as pd from datetime import datetime # Function to import stock data using Alpha Vantage def import_stock_data_alphavantage(tickers, api_key, start='2023-1-01', end=datetime.today().strftime('%Y-%m-%d')): data = pd.DataFrame() ts = TimeSeries(key=api_key, output_format='pandas') # Initialize TimeSeries with your API key if isinstance(tickers, str): tickers = [tickers] # Convert to list if only one ticker is provided for ticker in tickers: # Get the stock data df, meta_data = ts.get_daily_adjusted(ticker, outputsize='full') # Selecting only the '5. adjusted close' column and renaming it to the ticker df = df['5. adjusted close'].rename(ticker).to_frame() # Filter the data based on the start and end dates df = df[(df.index >= start) & (df.index <= end)] # If data is empty, initialize it with the current df if data.empty: data = df else: # If not empty, join the new df with the existing data data = data.join(df, how='outer') return data def log_returns(data): """ Calculate the log returns of a given dataset. Parameters: data (pandas.DataFrame): The dataset for which log returns are calculated. Returns: pandas.DataFrame: The log returns of the dataset. """ return (np.log(1+data.pct_change())) def simple_returns(data): """ Calculate the simple returns of a given dataset. Parameters: data (pandas.Series): The dataset for which to calculate the simple returns. Returns: pandas.Series: The simple returns of the dataset. """ return ((data/data.shift(1))-1) def market_data_combination(data, mark_ticker = "SPY", start='2022-1-1'): api_key = os.environ.get('ALPHAVANTAGE_API_KEY') market_data = import_stock_data_alphavantage(mark_ticker, api_key) market_rets = log_returns(market_data).dropna() ann_return = np.exp(market_rets.mean()*252).values-1 data = data.merge(market_data, left_index=True, right_index=True) # Add debugging statements here print("Market data shape:", market_data.shape) print("Number of non-NaN entries in market data:", sum(~market_data.isna().values.flatten())) print("First few rows of market data:\n", market_data.head()) return data, ann_return def beta_sharpe(data, mark_ticker = "SPY", start='2010-1-1', riskfree = 0.025): """ Input: 1. data: dataframe of stock price data 2. mark_ticker: ticker of the market data you want to compute CAPM metrics with (default is ^GSPC) 3. start: data from which to download data (default Jan 1st 2010) 4. riskfree: the assumed risk free yield (US 10 Year Bond is assumed: 2.5%) Output: 1. Dataframe with CAPM metrics computed against specified market procy """ # Beta dd, mark_ret = market_data_combination(data, mark_ticker, start) print("printing dd") print(dd.head()) print("printing mark_ret") print(mark_ret) log_ret = log_returns(dd) covar = log_ret.cov()*252 covar = pd.DataFrame(covar.iloc[:-1,-1]) mrk_var = log_ret.iloc[:,-1].var()*252 beta = covar/mrk_var stdev_ret = pd.DataFrame(((log_ret.std()*250**0.5)[:-1]), columns=['STD']) beta = beta.merge(stdev_ret, left_index=True, right_index=True) # CAPM for i, row in beta.iterrows(): beta.at[i,'CAPM'] = riskfree + (row[mark_ticker] * (mark_ret-riskfree)) # Sharpe for i, row in beta.iterrows(): beta.at[i,'Sharpe'] = ((row['CAPM']-riskfree)/(row['STD'])) beta.rename(columns={"SPY":"Beta"}, inplace=True) return beta def drift_calc(data, return_type='log'): try: if return_type == 'log': lr = log_returns(data) elif return_type == 'simple': lr = simple_returns(data) u = lr.mean() var = lr.var() drift = u - (0.5 * var) return drift.values except Exception as e: print(f"Error in drift_calc: {str(e)}") print("Please check the input data and return type") return None def daily_returns(data, days, iterations, return_type='log'): ft = drift_calc(data, return_type) if return_type == 'log': try: stv = log_returns(data).std().values except: stv = log_returns(data).std() elif return_type == 'simple': try: stv = simple_returns(data).std().values except: stv = simple_returns(data).std() # Oftentimes, we find that the distribution of returns is a variation of the normal distribution where it has a fat tail # This distribution is called cauchy distribution dr = np.exp(ft + stv * norm.ppf(np.random.rand(days, iterations))) return dr def probs_find(predicted, higherthan, on = 'value'): """ This function calculated the probability of a stock being above a certain threshhold, which can be defined as a value (final stock price) or return rate (percentage change) Input: 1. predicted: dataframe with all the predicted prices (days and simulations) 2. higherthan: specified threshhold to which compute the probability (ex. 0 on return will compute the probability of at least breakeven) 3. on: 'return' or 'value', the return of the stock or the final value of stock for every simulation over the time specified """ if on == 'return': predicted0 = predicted.iloc[0,0] predicted = predicted.iloc[-1] predList = list(predicted) over = [(i*100)/predicted0 for i in predList if ((i-predicted0)*100)/predicted0 >= higherthan] less = [(i*100)/predicted0 for i in predList if ((i-predicted0)*100)/predicted0 < higherthan] elif on == 'value': predicted = predicted.iloc[-1] predList = list(predicted) over = [i for i in predList if i >= higherthan] less = [i for i in predList if i < higherthan] else: print("'on' must be either value or return") return (len(over)/(len(over)+len(less))) import matplotlib.pyplot as plt import seaborn as sns def simulate_mc(data, days, iterations, return_type='log', plot=True): # Generate daily returns returns = daily_returns(data, days, iterations, return_type) # Create empty matrix price_list = np.zeros_like(returns) # Put the last actual price in the first row of matrix. price_list[0] = data.iloc[-1] # Calculate the price of each day for t in range(1,days): price_list[t] = price_list[t-1]*returns[t] # Plot Option if plot == True: x = pd.DataFrame(price_list).iloc[-1] fig, ax = plt.subplots(1,2, figsize=(14,4)) sns.distplot(x, ax=ax[0]) sns.distplot(x, hist_kws={'cumulative':True},kde_kws={'cumulative':True},ax=ax[1]) plt.xlabel("Stock Price") plt.savefig('stock_price_distribution.png') plt.show() #CAPM and Sharpe Ratio # Printing information about stock try: [print(nam) for nam in data.columns] except: print(data.name) print(f"Days: {days-1}") print(f"Expected Value: ${round(pd.DataFrame(price_list).iloc[-1].mean(),2)}") print(f"Return: {round(100*(pd.DataFrame(price_list).iloc[-1].mean()-price_list[0,1])/pd.DataFrame(price_list).iloc[-1].mean(),2)}%") print(f"Probability of Breakeven: {probs_find(pd.DataFrame(price_list),0, on='return')}") output = { "Days": days-1, "Expected Value": round(pd.DataFrame(price_list).iloc[-1].mean(), 2), "Return": round(100*(pd.DataFrame(price_list).iloc[-1].mean()-price_list[0,1])/pd.DataFrame(price_list).iloc[-1].mean(), 2), "Probability of Breakeven": probs_find(pd.DataFrame(price_list), 0, on='return') } return (output) # return pd.DataFrame(price_list) # set Variables def get_stock_data(tickers, days): api_key = os.getenv('ALPHAVANTAGE_API_KEY') # Placeholder - This is where you'd fetch and process the actual stock data # using libraries like pandas-datareader, yfinance, etc. data = import_stock_data_alphavantage(tickers, api_key) print(data.head()) log_return = log_returns(data) print(data) beta_sharpe(data) drift_calc(data) print(drift_calc(data)) dr = daily_returns(data, 2, 3) item = simulate_mc(data, days, 2000, 'log') print(item) return json.dumps(item) # Create Gradio interface elements ticker_input = gr.Textbox(lines=1, placeholder="Enter tickers separated by commas (e.g., AAPL, TSLA)") days_input = gr.Number(value=180, label="Number of Days") iface = gr.Interface( fn=get_stock_data, inputs=[ticker_input, days_input], outputs="text", title="Stock Data Analyzer" ) iface.launch(share=True)