File size: 3,706 Bytes
7ec53ba |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 |
import datetime
import os
import time

import akshare as ak
import pandas as pd
import pytz
from pymongo import MongoClient
# MongoDB connection for the crawler. The MONGO_URI environment variable takes
# precedence; the literal below is kept only as a backward-compatible fallback.
# NOTE(review): this fallback URI embeds a username and password in source
# control. Those credentials should be rotated and the fallback removed once
# every deployment sets MONGO_URI.
MONGO_URI = os.environ.get(
    "MONGO_URI",
    "mongodb://wth000:wth000@43.159.47.250:27017/dbname?authSource=wth000",
)
client = MongoClient(MONGO_URI)
db = client["wth000"]
# Asset classes to crawl; each one is stored in the collection f"股票{name}".
names = ["可转债"]
# Fetch the convertible-bond codes and names from akshare.
df = ak.bond_zh_hs_cov_spot()
# Expose the bond identifier under the "代码" (code) key used everywhere below.
df["代码"] = df["symbol"]
# pytz zone used to convert trading dates into epoch timestamps (hoisted out
# of the per-row conversion).
_SHANGHAI = pytz.timezone("Asia/Shanghai")


def _date_to_timestamp(day):
    """Return the epoch timestamp (float seconds) for a "YYYY-MM-DD" string.

    NOTE(review): attaching a pytz zone via ``replace(tzinfo=...)`` uses the
    zone's earliest offset (Shanghai LMT, about UTC+08:06) rather than CST
    (UTC+08:00). Each value is therefore roughly 6 minutes earlier than
    midnight CST. This is kept on purpose: documents this script already
    wrote carry these values, and the incremental sync compares against them.
    Switching to ``_SHANGHAI.localize(...)`` would require migrating stored
    data.
    """
    parsed = datetime.datetime.strptime(day, "%Y-%m-%d")
    return parsed.replace(tzinfo=_SHANGHAI).timestamp()


def _normalize_kline(k_data, code):
    """Convert one bond's raw daily K-line frame into MongoDB-ready records.

    Args:
        k_data: frame returned by ``ak.bond_zh_hs_cov_daily``; expected to
            contain ``date``, ``open``, ``high``, ``low``, ``close`` and
            ``volume`` columns.
        code: bond code, stored in the ``代码`` field of every record.

    Returns:
        A list of dicts, one per row, sorted by date. Each record carries the
        renamed price columns plus ``代码``, ``成交额`` and ``timestamp``.

    Raises:
        KeyError / ValueError / TypeError if the frame is missing the expected
        columns or holds unparseable values.
    """
    k_data["代码"] = code
    k_data = k_data.rename(columns={"date": "日期", "open": "开盘",
                                    "high": "最高", "low": "最低",
                                    "close": "收盘", "volume": "成交量",
                                    })
    # The date column may hold datetime.date objects, Timestamps or strings
    # depending on the akshare version (TODO confirm for the installed one).
    # pd.to_datetime accepts all of them, whereas
    # datetime.datetime.strftime raises TypeError on plain datetime.date.
    k_data["日期"] = pd.to_datetime(k_data["日期"]).dt.strftime("%Y-%m-%d")
    k_data["成交量"] = k_data["成交量"].astype(float)
    # Approximation only: turnover is estimated as volume x opening price.
    k_data["成交额"] = k_data["成交量"] * k_data["开盘"]
    k_data["timestamp"] = k_data["日期"].apply(_date_to_timestamp)
    k_data = k_data.sort_values(by=["代码", "日期"])
    return k_data.to_dict("records")


def _sync_docs(collection, code, docs, latest_timestamp):
    """Write one bond's records into ``collection``, adding only new ones.

    If ``latest_timestamp`` is None, the bond has no stored rows and every
    record is inserted. Otherwise:
    - records strictly newer than ``latest_timestamp`` are inserted;
    - the record with exactly that timestamp is overwritten (upserted) by
      ``代码`` and ``日期``, presumably to refresh a bar captured before the
      session ended (TODO confirm intent).

    Database errors are printed and skipped so one bond cannot stop the crawl.
    """
    if latest_timestamp is None:
        to_insert = docs
    else:
        to_insert = [doc for doc in docs if doc["timestamp"] > latest_timestamp]
        for doc in docs:
            if doc["timestamp"] == latest_timestamp:
                try:
                    collection.update_many(
                        {"代码": doc["代码"], "日期": doc["日期"]},
                        {"$set": doc}, upsert=True)
                except Exception as e:
                    print(e, f"{code}更新失败")
    # insert_many rejects an empty list, so skip the call when nothing is new.
    if to_insert:
        try:
            collection.insert_many(to_insert)
        except Exception as e:
            print(e, f"{code}写入失败")


for name in names:
    try:
        collection = db[f"股票{name}"]
        # Iterate over every bond code and sync its daily K-line history.
        for code in df["代码"]:
            # Most recent stored row for this code (None if nothing is stored).
            latest = collection.find_one(
                {"代码": code}, {"timestamp": 1}, sort=[("timestamp", -1)])
            latest_timestamp = (
                None if latest is None else float(latest["timestamp"]))
            try:
                docs_to_update = _normalize_kline(
                    ak.bond_zh_hs_cov_daily(symbol=code), code)
            except Exception as e:
                # A fetch/parse failure is reported as a suspension; the
                # printed exception shows the actual cause.
                print(e, f"因为{code}停牌")
                continue
            # Pause between successful fetches, presumably to stay under the
            # upstream source's rate limit.
            time.sleep(60.0)
            _sync_docs(collection, code, docs_to_update, latest_timestamp)
        print("任务已经完成")
    except Exception as e:
        print(e)
# limit = 600000
# if collection.count_documents({}) >= limit:
# oldest_data = collection.find().sort([("日期", 1)]).limit(
# collection.count_documents({})-limit)
# ids_to_delete = [data["_id"] for data in oldest_data]
# collection.delete_many({"_id": {"$in": ids_to_delete}})
# print("数据清理成功")
|