File size: 3,800 Bytes
7ec53ba
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
import datetime
import os
import time

import akshare as ak
import pandas as pd
import pytz
from pymongo import MongoClient

# SECURITY NOTE: the fallback URI below embeds credentials in source code.
# Set the MONGO_URI environment variable to supply the connection string
# without editing this file; when it is unset the original URI is used.
client = MongoClient(
    os.environ.get(
        "MONGO_URI",
        "mongodb://wth000:wth000@43.159.47.250:27017/dbname?authSource=wth000",
    )
)
db = client["wth000"]

# Parameters: `name` is the MongoDB collection the bars are written to.
name = "行业"

collection = db[f"{name}"]
# Query window: a fixed start date up to "today". The current date is taken in
# Asia/Shanghai (the same zone used for the stored bar timestamps) so that
# end_date does not depend on the timezone of the host running the script.
start_date = "20190101"
current_date = datetime.datetime.now(pytz.timezone("Asia/Shanghai"))
end_date = current_date.strftime("%Y%m%d")
# Board names (the 板块名称 column of akshare's industry-board list); each one
# is passed as `symbol` when the history is requested.
codes = ak.stock_board_industry_name_em()["板块名称"]
# Timezone attached to every bar date; looked up once instead of once per row.
shanghai_tz = pytz.timezone("Asia/Shanghai")

try:
    # Walk every industry board and sync its daily K-line data into MongoDB.
    for code in codes:
        # Newest stored bar for this board (only its timestamp is projected).
        latest = list(collection.find({"代码": str(code)}, {
                      "timestamp": 1}).sort("timestamp", -1).limit(1))
        if not latest:
            # Nothing stored yet: fetch the full window and insert everything.
            is_first_sync = True
            start_date_query = start_date
        else:
            # Incremental run: restart from the date of the newest stored bar
            # so that bar is refreshed and anything newer is appended.
            # fromtimestamp() converts using the host's local timezone.
            is_first_sync = False
            latest_timestamp = latest[0]["timestamp"]
            start_date_query = datetime.datetime.fromtimestamp(
                latest_timestamp).strftime("%Y%m%d")
        # Fetch the daily bars twice: once with adjust="hfq" (the main series)
        # and once with adjust="" (only its open is kept, as 真实价格).
        k_data = ak.stock_board_industry_hist_em(
            symbol=code, start_date=start_date_query, end_date=end_date, period="日k", adjust="hfq")
        k_data_true = ak.stock_board_industry_hist_em(
            symbol=code, start_date=start_date_query, end_date=end_date, period="日k", adjust="")
        try:
            k_data_true = k_data_true[["日期", "开盘"]].rename(columns={"开盘": "真实价格"})
            k_data = pd.merge(k_data, k_data_true, on="日期", how="left")
            k_data["代码"] = str(code)
            k_data["成交量"] = k_data["成交量"].apply(lambda x: float(x))
            # NOTE(review): pytz documents that passing its zones via
            # replace(tzinfo=...) may attach a historical (LMT) offset instead
            # of +08:00; localize() is the recommended call. Left unchanged on
            # purpose: already-stored timestamps and the comparisons against
            # latest_timestamp below rely on the values produced here.
            k_data["timestamp"] = k_data["日期"].apply(lambda x: float(
                datetime.datetime.strptime(x, "%Y-%m-%d").replace(tzinfo=shanghai_tz).timestamp()))
            # Disabled alternative for dates that carry a time component:
            # k_data["timestamp"] = k_data["日期"].apply(lambda x: float(
            #     datetime.datetime.strptime(x, "%Y-%m-%d %H:%M:%S").replace(tzinfo=pytz.timezone("Asia/Shanghai")).timestamp()))

            k_data = k_data.sort_values(by=["代码", "日期"])
            docs_to_update = k_data.to_dict("records")
            if is_first_sync:
                # insert_many rejects an empty list, so skip it explicitly
                # instead of relying on a swallowed exception.
                if docs_to_update:
                    try:
                        collection.insert_many(docs_to_update)
                    except Exception as e:
                        # Best effort: report the failure and move on.
                        print(e, f"{name}({code}) insert_many failed")
            else:
                bulk_insert = []
                for doc in docs_to_update:
                    if doc["timestamp"] > latest_timestamp:
                        # Newer than anything stored: queue for insertion.
                        bulk_insert.append(doc)
                    elif doc["timestamp"] == float(latest_timestamp):
                        # Same bar as the newest stored one: overwrite it.
                        try:
                            collection.update_many({"代码": doc["代码"], "日期": doc["日期"]}, {
                                "$set": doc}, upsert=True)
                        except Exception as e:
                            print(e, f"{name}({code}) update_many failed")
                # Insert all queued bars in one batch.
                if bulk_insert:
                    try:
                        collection.insert_many(bulk_insert)
                    except Exception as e:
                        print(e, f"{name}({code}) insert_many failed")
        except Exception as e:
            # Any failure while shaping or storing this board's data is
            # reported with the original message and the loop continues.
            print(e, f"因为{code}停牌")
    print("任务已经完成")
    # Disabled clean-up that would cap the collection at `limit` documents by
    # deleting the oldest ones:
    # limit = 600000
    # if collection.count_documents({}) >= limit:
    #     oldest_data = collection.find().sort([("日期", 1)]).limit(
    #         collection.count_documents({})-limit)
    #     ids_to_delete = [data["_id"] for data in oldest_data]
    #     collection.delete_many({"_id": {"$in": ids_to_delete}})
    # print("数据清理成功")
except Exception as e:
    # Errors outside the per-board handler (e.g. the history requests or the
    # lookup of the newest stored bar) end the run here.
    print(e)