|
import yfinance as yf |
|
import pandas as pd |
|
import numpy as np |
|
from datetime import date, timedelta, datetime |
|
import logging |
|
import sys |
|
import os |
|
from concurrent.futures import ThreadPoolExecutor, as_completed |
|
import requests |
|
import re |
|
|
|
|
|
logging.basicConfig(level=logging.INFO, stream=sys.stdout, |
|
format='%(asctime)s - %(levelname)s - %(message)s') |
|
logger = logging.getLogger() |
|
|
|
def get_last_market_date(): |
|
today = date.today() |
|
while today.weekday() >= 5: |
|
today -= timedelta(days=1) |
|
return today.strftime("%Y-%m-%d") |
|
|
|
def get_start_date(interval): |
|
today = date.today() |
|
if interval in ['1d', '1wk', '1mo']: |
|
years_ago = 5 |
|
days_ago = 365*years_ago |
|
else: |
|
years_ago = 2 |
|
days_ago = 365*years_ago |
|
start_date = today - timedelta(days=days_ago) |
|
return start_date.strftime("%Y-%m-%d") |
|
|
|
def fetch_asset_data(symbol, start_date, end_date, interval='1d', asset_type='stock'): |
|
try: |
|
if asset_type == 'crypto': |
|
symbol = f"{symbol}-USD" |
|
asset = yf.Ticker(symbol) |
|
data = asset.history(start=start_date, end=end_date, interval=interval) |
|
if data.empty: |
|
logger.warning(f"No data fetched for {asset_type} {symbol}. Please check the symbol and date range.") |
|
return None |
|
return data |
|
except Exception as e: |
|
logger.warning(f"Error fetching data for {asset_type} {symbol}: {e}") |
|
return None |
|
|
|
def calculate_atr(data, period=14): |
|
high = data['High'] |
|
low = data['Low'] |
|
close = data['Close'] |
|
tr1 = high - low |
|
tr2 = abs(high - close.shift()) |
|
tr3 = abs(low - close.shift()) |
|
tr = pd.concat([tr1, tr2, tr3], axis=1).max(axis=1) |
|
atr = tr.rolling(window=period).mean() |
|
return atr |
|
|
|
def calculate_supertrend(data, atr_period, multiplier): |
|
hl2 = (data['High'] + data['Low']) / 2 |
|
atr = calculate_atr(data, atr_period) |
|
|
|
upper_band = hl2 + (multiplier * atr) |
|
lower_band = hl2 - (multiplier * atr) |
|
|
|
supertrend = pd.Series(index=data.index, dtype=float) |
|
direction = pd.Series(index=data.index, dtype=int) |
|
|
|
for i in range(1, len(data)): |
|
if data['Close'].iloc[i] > upper_band.iloc[i-1]: |
|
direction.iloc[i] = 1 |
|
elif data['Close'].iloc[i] < lower_band.iloc[i-1]: |
|
direction.iloc[i] = -1 |
|
else: |
|
direction.iloc[i] = direction.iloc[i-1] |
|
|
|
if direction.iloc[i] == 1 and lower_band.iloc[i] < lower_band.iloc[i-1]: |
|
lower_band.iloc[i] = lower_band.iloc[i-1] |
|
if direction.iloc[i] == -1 and upper_band.iloc[i] > upper_band.iloc[i-1]: |
|
upper_band.iloc[i] = upper_band.iloc[i-1] |
|
|
|
if direction.iloc[i] == 1: |
|
supertrend.iloc[i] = lower_band.iloc[i] |
|
else: |
|
supertrend.iloc[i] = upper_band.iloc[i] |
|
|
|
|
|
signals = pd.Series(index=data.index, dtype=str) |
|
signals.iloc[0] = '' |
|
for i in range(1, len(data)): |
|
if direction.iloc[i] == 1 and direction.iloc[i-1] == -1: |
|
signals.iloc[i] = 'BUY' |
|
elif direction.iloc[i] == -1 and direction.iloc[i-1] == 1: |
|
signals.iloc[i] = 'SELL' |
|
else: |
|
signals.iloc[i] = '' |
|
|
|
return supertrend, signals |
|
|
|
def ema(series, period): |
|
return series.ewm(span=period, adjust=False).mean() |
|
|
|
def range_size(x, qty, n): |
|
wper = (n * 2) - 1 |
|
avrng = ema(abs(x - x.shift(1)), n) |
|
AC = ema(avrng, wper) * qty |
|
return AC |
|
|
|
def range_filter(x, rng_, n): |
|
r = rng_ |
|
rfilt = pd.Series(index=x.index, dtype=float) |
|
rfilt.iloc[0] = x.iloc[0] |
|
|
|
for i in range(1, len(x)): |
|
if x.iloc[i] - r.iloc[i] > rfilt.iloc[i-1]: |
|
rfilt.iloc[i] = x.iloc[i] - r.iloc[i] |
|
elif x.iloc[i] + r.iloc[i] < rfilt.iloc[i-1]: |
|
rfilt.iloc[i] = x.iloc[i] + r.iloc[i] |
|
else: |
|
rfilt.iloc[i] = rfilt.iloc[i-1] |
|
|
|
return rfilt |
|
|
|
def vumanchu_swing(data, rng_per, rng_qty): |
|
close = data['Close'] |
|
r = range_size(close, rng_qty, rng_per) |
|
filt = range_filter(close, r, rng_per) |
|
|
|
fdir = pd.Series(index=data.index, dtype=float) |
|
fdir.iloc[0] = 0 |
|
|
|
for i in range(1, len(data)): |
|
if filt.iloc[i] > filt.iloc[i-1]: |
|
fdir.iloc[i] = 1 |
|
elif filt.iloc[i] < filt.iloc[i-1]: |
|
fdir.iloc[i] = -1 |
|
else: |
|
fdir.iloc[i] = fdir.iloc[i-1] |
|
|
|
upward = (fdir == 1).astype(int) |
|
downward = (fdir == -1).astype(int) |
|
|
|
longCond = ((close > filt) & (close > close.shift(1)) & (upward > 0)) | \ |
|
((close > filt) & (close < close.shift(1)) & (upward > 0)) |
|
shortCond = ((close < filt) & (close < close.shift(1)) & (downward > 0)) | \ |
|
((close < filt) & (close > close.shift(1)) & (downward > 0)) |
|
|
|
CondIni = pd.Series(0, index=data.index) |
|
for i in range(1, len(data)): |
|
if longCond.iloc[i]: |
|
CondIni.iloc[i] = 1 |
|
elif shortCond.iloc[i]: |
|
CondIni.iloc[i] = -1 |
|
else: |
|
CondIni.iloc[i] = CondIni.iloc[i-1] |
|
|
|
signals = pd.Series(index=data.index, dtype=str) |
|
signals.iloc[0] = '' |
|
for i in range(1, len(data)): |
|
if CondIni.iloc[i] == 1 and CondIni.iloc[i-1] == -1: |
|
signals.iloc[i] = 'BUY' |
|
elif CondIni.iloc[i] == -1 and CondIni.iloc[i-1] == 1: |
|
signals.iloc[i] = 'SELL' |
|
else: |
|
signals.iloc[i] = '' |
|
|
|
return filt, signals |
|
|
|
def analyze_asset(symbol, start_date, end_date, interval, asset_type='stock'): |
|
data = fetch_asset_data(symbol, start_date, end_date, interval, asset_type) |
|
|
|
if data is None or len(data) < 100: |
|
logger.warning(f"Insufficient data for {symbol}. Data points: {len(data) if data is not None else 0}") |
|
return None |
|
|
|
data['SuperTrend_1x'], data['Signal_1x'] = calculate_supertrend(data, 10, 1) |
|
data['SuperTrend_2x'], data['Signal_2x'] = calculate_supertrend(data, 11, 2) |
|
data['SuperTrend_3x'], data['Signal_3x'] = calculate_supertrend(data, 12, 3) |
|
|
|
|
|
swing_period = 20 |
|
swing_multiplier = 3.5 |
|
data['VuManchu'], data['VuManchu_Signal'] = vumanchu_swing(data, swing_period, swing_multiplier) |
|
|
|
return data |
|
|
|
def get_sp500_tickers(): |
|
url = "https://en.wikipedia.org/wiki/List_of_S%26P_500_companies" |
|
response = requests.get(url) |
|
tables = pd.read_html(response.text) |
|
df = tables[0] |
|
return df['Symbol'].tolist() |
|
|
|
def get_signals(symbol, start_date, end_date, interval): |
|
data = analyze_asset(symbol, start_date, end_date, interval) |
|
if data is not None: |
|
if interval == '1d': |
|
signals = data.last('7D') |
|
elif interval == '1wk': |
|
signals = data.last('8W') |
|
else: |
|
signals = data.last('7D') |
|
|
|
signals = signals[['Close', 'Signal_1x', 'Signal_2x', 'Signal_3x', 'VuManchu_Signal']].copy() |
|
signals['Symbol'] = symbol |
|
signals['Date'] = signals.index.date |
|
logger.info(f"Generated signals for {symbol}:\n{signals}") |
|
return signals |
|
return None |
|
|
|
def process_batch(symbols, start_date, end_date, interval): |
|
results = [] |
|
with ThreadPoolExecutor(max_workers=10) as executor: |
|
future_to_stock = {executor.submit(get_signals, symbol, start_date, end_date, interval): symbol for symbol in symbols} |
|
for future in as_completed(future_to_stock): |
|
symbol = future_to_stock[future] |
|
try: |
|
signals = future.result() |
|
if signals is not None and not signals.empty: |
|
results.append(signals) |
|
else: |
|
logger.warning(f"No signals generated for {symbol}") |
|
except Exception as exc: |
|
logger.error(f'{symbol} generated an exception: {exc}') |
|
return results |
|
|
|
def main(): |
|
stocks_input = input("Enter stock symbol(s) to analyze (comma-separated) or press Enter for S&P 500: ").strip().upper() |
|
interval = input("Enter time interval (1d or 1wk): ").lower() |
|
|
|
if interval not in ['1d', '1wk']: |
|
logger.warning("Invalid interval. Defaulting to 1d.") |
|
interval = '1d' |
|
|
|
if stocks_input: |
|
|
|
stocks = re.findall(r'\b[A-Z]+\b', stocks_input) |
|
else: |
|
logger.info("Fetching S&P 500 stocks...") |
|
stocks = get_sp500_tickers() |
|
|
|
end_date = get_last_market_date() |
|
start_date = (datetime.strptime(end_date, "%Y-%m-%d") - timedelta(days=365*2)).strftime("%Y-%m-%d") |
|
|
|
logger.info(f"Analyzing {len(stocks)} stocks from {start_date} to {end_date}...") |
|
|
|
all_signals = [] |
|
batch_size = 50 |
|
total_batches = (len(stocks) + batch_size - 1) // batch_size |
|
|
|
for i in range(0, len(stocks), batch_size): |
|
batch = stocks[i:i+batch_size] |
|
logger.info(f"Processing batch {i//batch_size + 1} of {total_batches}...") |
|
batch_results = process_batch(batch, start_date, end_date, interval) |
|
all_signals.extend(batch_results) |
|
logger.info(f"Completed batch {i//batch_size + 1} of {total_batches}") |
|
|
|
if all_signals: |
|
combined_signals = pd.concat(all_signals, ignore_index=True) |
|
combined_signals = combined_signals[['Date', 'Symbol', 'Close', 'Signal_1x', 'Signal_2x', 'Signal_3x', 'VuManchu_Signal']] |
|
|
|
output_dir = 'vumanchu/output' |
|
os.makedirs(output_dir, exist_ok=True) |
|
output_file = os.path.join(output_dir, f'all_stocks_signals_{interval}_{datetime.now().strftime("%Y%m%d_%H%M%S")}.csv') |
|
combined_signals.to_csv(output_file, index=False) |
|
print(f"\nSignals for all analyzed stocks exported to {output_file}") |
|
|
|
print("\nSample of the results:") |
|
print(combined_signals.head(15)) |
|
else: |
|
print("No signals generated for any stock.") |
|
|
|
if __name__ == "__main__": |
|
main() |