code / 数据获取 /get股票分钟.py
tradequant's picture
Upload 59 files
7ec53ba
import akshare as ak
import pandas as pd
import datetime
from pymongo import MongoClient
import time
import pytz
client = MongoClient(
"mongodb://wth000:wth000@43.159.47.250:27017/dbname?authSource=wth000")
db = client["wth000"]
names = [("000", "001", "002", "600", "601", "603", "605")]
# # 获取当前日期
# start_date = "20190101"
# current_date = datetime.datetime.now()
# end_date = current_date.strftime("%Y%m%d")
# # 从akshare获取A股主板股票的代码和名称
codes = ak.stock_zh_a_spot_em()
# # 过滤掉危险股票
# codes = codes[~codes["名称"].str.contains("S")]
# # 过滤掉退市股票
# codes = codes[~codes["名称"].str.contains("退")]
for name in names:
try:
collection = db[f"股票30分钟{name}"]
df = pd.DataFrame()
df = codes[codes["代码"].str.startswith(name)][["代码", "名称"]].copy()
# 遍历目标指数代码,获取其分钟K线数据
for code in df["代码"]:
# print(code)
latest = list(collection.find({"代码": float(code)}, {
"timestamp": 1}).sort("timestamp", -1).limit(1))
# print(latest)
if len(latest) == 0:
upsert_docs = True
# start_date_query = start_date
else:
upsert_docs = False
latest_timestamp = latest[0]["timestamp"]
try:
# 通过 akshare 获取目标指数的日K线数据
k_data = ak.stock_zh_a_hist_min_em(symbol=code, period="30")
k_data["代码"] = float(code)
k_data["成交量"] = k_data["成交量"].apply(lambda x: float(x))
k_data["日期"] = k_data["时间"]
# k_data["timestamp"] = k_data["日期"].apply(lambda x: float(
# datetime.datetime.strptime(x, "%Y-%m-%d").replace(tzinfo=pytz.timezone("Asia/Shanghai")).timestamp()))
k_data["timestamp"] = k_data["日期"].apply(lambda x: float(
datetime.datetime.strptime(x, "%Y-%m-%d %H:%M:%S").replace(tzinfo=pytz.timezone("Asia/Shanghai")).timestamp()))
k_data = k_data.sort_values(by=["代码", "日期"])
docs_to_update = k_data.to_dict("records")
if upsert_docs:
# print(f"{name}({code}) 新增数据")
try:
collection.insert_many(docs_to_update)
except Exception as e:
pass
else:
bulk_insert = []
for doc in docs_to_update:
if doc["timestamp"] > latest_timestamp:
# 否则,加入插入列表
bulk_insert.append(doc)
if doc["timestamp"] == float(latest_timestamp):
try:
collection.update_many({"代码": doc["代码"], "日期": doc["日期"]}, {
"$set": doc}, upsert=True)
except Exception as e:
pass
# 执行批量插入操作
if bulk_insert:
try:
collection.insert_many(bulk_insert)
except Exception as e:
pass
except Exception as e:
print(e, f"因为{code}停牌")
print("任务已经完成")
except Exception as e:
print(e)
# limit = 600000
# if collection.count_documents({}) >= limit:
# oldest_data = collection.find().sort([("日期", 1)]).limit(
# collection.count_documents({})-limit)
# ids_to_delete = [data["_id"] for data in oldest_data]
# collection.delete_many({"_id": {"$in": ids_to_delete}})
# print("数据清理成功")