import yfinance as yf import pandas as pd import numpy as np from datetime import date, timedelta, datetime import logging import sys import os from concurrent.futures import ThreadPoolExecutor, as_completed import requests import re # Set up logging logging.basicConfig(level=logging.INFO, stream=sys.stdout, format='%(asctime)s - %(levelname)s - %(message)s') logger = logging.getLogger() def get_last_market_date(): today = date.today() while today.weekday() >= 5: today -= timedelta(days=1) return today.strftime("%Y-%m-%d") def get_start_date(interval): today = date.today() if interval in ['1d', '1wk', '1mo']: years_ago = 5 days_ago = 365*years_ago else: years_ago = 2 days_ago = 365*years_ago start_date = today - timedelta(days=days_ago) return start_date.strftime("%Y-%m-%d") def fetch_asset_data(symbol, start_date, end_date, interval='1d', asset_type='stock'): try: if asset_type == 'crypto': symbol = f"{symbol}-USD" asset = yf.Ticker(symbol) data = asset.history(start=start_date, end=end_date, interval=interval) if data.empty: logger.warning(f"No data fetched for {asset_type} {symbol}. Please check the symbol and date range.") return None return data except Exception as e: logger.warning(f"Error fetching data for {asset_type} {symbol}: {e}") return None def calculate_atr(data, period=14): high = data['High'] low = data['Low'] close = data['Close'] tr1 = high - low tr2 = abs(high - close.shift()) tr3 = abs(low - close.shift()) tr = pd.concat([tr1, tr2, tr3], axis=1).max(axis=1) atr = tr.rolling(window=period).mean() return atr def calculate_supertrend(data, atr_period, multiplier): hl2 = (data['High'] + data['Low']) / 2 atr = calculate_atr(data, atr_period) upper_band = hl2 + (multiplier * atr) lower_band = hl2 - (multiplier * atr) supertrend = pd.Series(index=data.index, dtype=float) direction = pd.Series(index=data.index, dtype=int) for i in range(1, len(data)): if data['Close'].iloc[i] > upper_band.iloc[i-1]: direction.iloc[i] = 1 elif data['Close'].iloc[i] < lower_band.iloc[i-1]: direction.iloc[i] = -1 else: direction.iloc[i] = direction.iloc[i-1] if direction.iloc[i] == 1 and lower_band.iloc[i] < lower_band.iloc[i-1]: lower_band.iloc[i] = lower_band.iloc[i-1] if direction.iloc[i] == -1 and upper_band.iloc[i] > upper_band.iloc[i-1]: upper_band.iloc[i] = upper_band.iloc[i-1] if direction.iloc[i] == 1: supertrend.iloc[i] = lower_band.iloc[i] else: supertrend.iloc[i] = upper_band.iloc[i] # Generate buy/sell signals signals = pd.Series(index=data.index, dtype=str) signals.iloc[0] = '' for i in range(1, len(data)): if direction.iloc[i] == 1 and direction.iloc[i-1] == -1: signals.iloc[i] = 'BUY' elif direction.iloc[i] == -1 and direction.iloc[i-1] == 1: signals.iloc[i] = 'SELL' else: signals.iloc[i] = '' return supertrend, signals def ema(series, period): return series.ewm(span=period, adjust=False).mean() def range_size(x, qty, n): wper = (n * 2) - 1 avrng = ema(abs(x - x.shift(1)), n) AC = ema(avrng, wper) * qty return AC def range_filter(x, rng_, n): r = rng_ rfilt = pd.Series(index=x.index, dtype=float) rfilt.iloc[0] = x.iloc[0] for i in range(1, len(x)): if x.iloc[i] - r.iloc[i] > rfilt.iloc[i-1]: rfilt.iloc[i] = x.iloc[i] - r.iloc[i] elif x.iloc[i] + r.iloc[i] < rfilt.iloc[i-1]: rfilt.iloc[i] = x.iloc[i] + r.iloc[i] else: rfilt.iloc[i] = rfilt.iloc[i-1] return rfilt def vumanchu_swing(data, rng_per, rng_qty): close = data['Close'] r = range_size(close, rng_qty, rng_per) filt = range_filter(close, r, rng_per) fdir = pd.Series(index=data.index, dtype=float) fdir.iloc[0] = 0 for i in range(1, len(data)): if filt.iloc[i] > filt.iloc[i-1]: fdir.iloc[i] = 1 elif filt.iloc[i] < filt.iloc[i-1]: fdir.iloc[i] = -1 else: fdir.iloc[i] = fdir.iloc[i-1] upward = (fdir == 1).astype(int) downward = (fdir == -1).astype(int) longCond = ((close > filt) & (close > close.shift(1)) & (upward > 0)) | \ ((close > filt) & (close < close.shift(1)) & (upward > 0)) shortCond = ((close < filt) & (close < close.shift(1)) & (downward > 0)) | \ ((close < filt) & (close > close.shift(1)) & (downward > 0)) CondIni = pd.Series(0, index=data.index) for i in range(1, len(data)): if longCond.iloc[i]: CondIni.iloc[i] = 1 elif shortCond.iloc[i]: CondIni.iloc[i] = -1 else: CondIni.iloc[i] = CondIni.iloc[i-1] signals = pd.Series(index=data.index, dtype=str) signals.iloc[0] = '' for i in range(1, len(data)): if CondIni.iloc[i] == 1 and CondIni.iloc[i-1] == -1: signals.iloc[i] = 'BUY' elif CondIni.iloc[i] == -1 and CondIni.iloc[i-1] == 1: signals.iloc[i] = 'SELL' else: signals.iloc[i] = '' return filt, signals def analyze_asset(symbol, start_date, end_date, interval, asset_type='stock'): data = fetch_asset_data(symbol, start_date, end_date, interval, asset_type) if data is None or len(data) < 100: logger.warning(f"Insufficient data for {symbol}. Data points: {len(data) if data is not None else 0}") return None data['SuperTrend_1x'], data['Signal_1x'] = calculate_supertrend(data, 10, 1) data['SuperTrend_2x'], data['Signal_2x'] = calculate_supertrend(data, 11, 2) data['SuperTrend_3x'], data['Signal_3x'] = calculate_supertrend(data, 12, 3) # VuManchu Swing swing_period = 20 swing_multiplier = 3.5 data['VuManchu'], data['VuManchu_Signal'] = vumanchu_swing(data, swing_period, swing_multiplier) return data def get_sp500_tickers(): url = "https://en.wikipedia.org/wiki/List_of_S%26P_500_companies" response = requests.get(url) tables = pd.read_html(response.text) df = tables[0] return df['Symbol'].tolist() def get_signals(symbol, start_date, end_date, interval): data = analyze_asset(symbol, start_date, end_date, interval) if data is not None: if interval == '1d': signals = data.last('7D') elif interval == '1wk': signals = data.last('8W') # Changed to 8 weeks else: signals = data.last('7D') # Default to 1 week for other intervals signals = signals[['Close', 'Signal_1x', 'Signal_2x', 'Signal_3x', 'VuManchu_Signal']].copy() signals['Symbol'] = symbol signals['Date'] = signals.index.date logger.info(f"Generated signals for {symbol}:\n{signals}") return signals return None def process_batch(symbols, start_date, end_date, interval): results = [] with ThreadPoolExecutor(max_workers=10) as executor: future_to_stock = {executor.submit(get_signals, symbol, start_date, end_date, interval): symbol for symbol in symbols} for future in as_completed(future_to_stock): symbol = future_to_stock[future] try: signals = future.result() if signals is not None and not signals.empty: results.append(signals) else: logger.warning(f"No signals generated for {symbol}") except Exception as exc: logger.error(f'{symbol} generated an exception: {exc}') return results def main(): stocks_input = input("Enter stock symbol(s) to analyze (comma-separated) or press Enter for S&P 500: ").strip().upper() interval = input("Enter time interval (1d or 1wk): ").lower() if interval not in ['1d', '1wk']: logger.warning("Invalid interval. Defaulting to 1d.") interval = '1d' if stocks_input: # Use regex to split the input string into individual stock symbols stocks = re.findall(r'\b[A-Z]+\b', stocks_input) else: logger.info("Fetching S&P 500 stocks...") stocks = get_sp500_tickers() end_date = get_last_market_date() start_date = (datetime.strptime(end_date, "%Y-%m-%d") - timedelta(days=365*2)).strftime("%Y-%m-%d") logger.info(f"Analyzing {len(stocks)} stocks from {start_date} to {end_date}...") all_signals = [] batch_size = 50 total_batches = (len(stocks) + batch_size - 1) // batch_size for i in range(0, len(stocks), batch_size): batch = stocks[i:i+batch_size] logger.info(f"Processing batch {i//batch_size + 1} of {total_batches}...") batch_results = process_batch(batch, start_date, end_date, interval) all_signals.extend(batch_results) logger.info(f"Completed batch {i//batch_size + 1} of {total_batches}") if all_signals: combined_signals = pd.concat(all_signals, ignore_index=True) combined_signals = combined_signals[['Date', 'Symbol', 'Close', 'Signal_1x', 'Signal_2x', 'Signal_3x', 'VuManchu_Signal']] output_dir = 'vumanchu/output' os.makedirs(output_dir, exist_ok=True) output_file = os.path.join(output_dir, f'all_stocks_signals_{interval}_{datetime.now().strftime("%Y%m%d_%H%M%S")}.csv') combined_signals.to_csv(output_file, index=False) print(f"\nSignals for all analyzed stocks exported to {output_file}") print("\nSample of the results:") print(combined_signals.head(15)) # Increased to show more rows else: print("No signals generated for any stock.") if __name__ == "__main__": main()