File size: 3,706 Bytes
7ec53ba
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
import akshare as ak
import pandas as pd
import datetime
from pymongo import MongoClient
import time
import pytz

client = MongoClient(
    "mongodb://wth000:wth000@43.159.47.250:27017/dbname?authSource=wth000")
db = client["wth000"]
names = ["可转债"]

# 从akshare获取可转债的代码和名称
df = ak.bond_zh_hs_cov_spot()
df["代码"] = df["symbol"]
for name in names:
    try:
        collection = db[f"股票{name}"]
        # 遍历目标指数代码,获取其日K线数据
        for code in df["代码"]:
            latest = list(collection.find(
                {"代码": code}, {"timestamp": 1}).sort("timestamp", -1).limit(1))
            if len(latest) == 0:
                upsert_docs = True
            else:
                upsert_docs = False
                latest_timestamp = latest[0]["timestamp"]
            try:
                k_data = ak.bond_zh_hs_cov_daily(symbol=code)
                k_data["代码"] = code
                k_data = k_data.rename(columns={"date": "日期", "open": "开盘",
                                                "high": "最高", "low": "最低",
                                                "close": "收盘", "volume": "成交量",
                                                })
                k_data["日期"] = k_data["日期"].apply(lambda x: datetime.datetime.strftime(x, "%Y-%m-%d"))
                k_data["成交量"] = k_data["成交量"].apply(lambda x: float(x))
                k_data["成交额"] = k_data["成交量"]*k_data["开盘"]
                k_data["timestamp"] = k_data["日期"].apply(lambda x: float(datetime.datetime.strptime(x, "%Y-%m-%d").replace(tzinfo=pytz.timezone("Asia/Shanghai")).timestamp()))
                # k_data["timestamp"] = k_data["日期"].apply(lambda x: float(datetime.datetime.strptime(x, "%Y-%m-%d %H:%M:%S").replace(tzinfo=pytz.timezone("Asia/Shanghai")).timestamp()))
                k_data = k_data.sort_values(by=["代码", "日期"])
                docs_to_update = k_data.to_dict("records")
                time.sleep(60.0)
                if upsert_docs:
                    # print(f"{name}({code}) 新增数据")
                    try:
                        collection.insert_many(docs_to_update)
                    except Exception as e:
                        pass
                else:
                    bulk_insert = []
                    for doc in docs_to_update:
                        if doc["timestamp"] > latest_timestamp:
                            # 否则,加入插入列表
                            bulk_insert.append(doc)
                        if doc["timestamp"] == float(latest_timestamp):
                            try:
                                collection.update_many({"代码": doc["代码"], "日期": doc["日期"]}, {
                                    "$set": doc}, upsert=True)
                            except Exception as e:
                                pass
                    # 执行批量插入操作
                    if bulk_insert:
                        try:
                            collection.insert_many(bulk_insert)
                        except Exception as e:
                            pass
            except Exception as e:
                print(e, f"因为{code}停牌")
        print("任务已经完成")
    except Exception as e:
        print(e)
# limit = 600000
# if collection.count_documents({}) >= limit:
#     oldest_data = collection.find().sort([("日期", 1)]).limit(
#         collection.count_documents({})-limit)
#     ids_to_delete = [data["_id"] for data in oldest_data]
#     collection.delete_many({"_id": {"$in": ids_to_delete}})
# print("数据清理成功")