import datetime
import os
import time

import akshare as ak
import pandas as pd
import pytz
from pymongo import MongoClient

# MongoDB connection.  The MONGO_URI environment variable, when set to a
# non-empty value, overrides the built-in connection string so credentials do
# not have to live in the source file.
# NOTE(review): the fallback below still embeds a username and password;
# rotate them and drop the default once every deployment sets MONGO_URI.
client = MongoClient(
    os.environ.get("MONGO_URI")
    or "mongodb://wth000:wth000@43.159.47.250:27017/dbname?authSource=wth000"
)
db = client["wth000"]

# Collection that receives the industry-board daily bars.
name = "行业"
collection = db[name]

# Date window for downloads: boards with nothing stored yet are fetched from
# start_date; end_date is today's date in the machine's local time.
start_date = "20190101"
current_date = datetime.datetime.now()
end_date = current_date.strftime("%Y%m%d")

# The "板块名称" column of akshare's industry-board listing; each value is used
# both as the download symbol and as the stored "代码" field.
codes = ak.stock_board_industry_name_em()["板块名称"]

# Incrementally sync the daily bars of every industry board into MongoDB.
#
# For each board name in `codes`:
#   * look up the newest stored bar for that board;
#   * download back-adjusted ("hfq") bars plus the unadjusted opening price
#     (stored as "真实价格") from the date of that newest bar -- or from
#     `start_date` when nothing is stored -- through `end_date`;
#   * insert the bars newer than the stored one and refresh the stored bar of
#     the newest day in place.
# A failure while downloading or writing one board is reported and the loop
# moves on to the next board.  The outer handler only sees errors raised
# outside the per-board handling (e.g. the MongoDB lookup) and ends the run.

# Resolved once up front instead of once per row.
shanghai_tz = pytz.timezone("Asia/Shanghai")

try:
    for code in codes:
        # Newest stored bar for this board, if any.
        latest = list(
            collection.find({"代码": str(code)}, {"timestamp": 1})
            .sort("timestamp", -1)
            .limit(1)
        )

        if not latest:
            # Nothing stored yet: load the full history.
            upsert_docs = True
            start_date_query = start_date
        else:
            upsert_docs = False
            latest_timestamp = latest[0]["timestamp"]
            # fromtimestamp() renders the epoch value in the machine's local
            # time zone, so the resulting date depends on where this runs.
            start_date_query = datetime.datetime.fromtimestamp(
                latest_timestamp
            ).strftime("%Y%m%d")

        try:
            # The downloads are inside the per-board handler so that one board
            # failing to download no longer aborts all remaining boards.
            k_data = ak.stock_board_industry_hist_em(
                symbol=code,
                start_date=start_date_query,
                end_date=end_date,
                period="日k",
                adjust="hfq",
            )
            k_data_true = ak.stock_board_industry_hist_em(
                symbol=code,
                start_date=start_date_query,
                end_date=end_date,
                period="日k",
                adjust="",
            )

            # Attach the unadjusted open of each day as "真实价格".
            k_data_true = k_data_true[["日期", "开盘"]].rename(
                columns={"开盘": "真实价格"}
            )
            k_data = pd.merge(k_data, k_data_true, on="日期", how="left")
            k_data["代码"] = str(code)
            k_data["成交量"] = k_data["成交量"].apply(float)

            # NOTE(review): pytz documents that attaching one of its zones via
            # tzinfo=/replace() (rather than tz.localize()) does not apply the
            # zone's normal UTC offset for many zones; for Asia/Shanghai this
            # presumably yields a local-mean-time offset instead of +08:00.
            # Deliberately left unchanged: the timestamps already stored were
            # produced this way, and the incremental comparison below relies
            # on exact equality with them.
            k_data["timestamp"] = k_data["日期"].apply(
                lambda x: float(
                    datetime.datetime.strptime(x, "%Y-%m-%d")
                    .replace(tzinfo=shanghai_tz)
                    .timestamp()
                )
            )

            k_data = k_data.sort_values(by=["代码", "日期"])
            docs_to_update = k_data.to_dict("records")

            if upsert_docs:
                # First load: every downloaded bar is new.
                bulk_insert = docs_to_update
            else:
                bulk_insert = []
                for doc in docs_to_update:
                    if doc["timestamp"] > latest_timestamp:
                        bulk_insert.append(doc)
                    elif doc["timestamp"] == float(latest_timestamp):
                        # The newest stored day is rewritten in place with the
                        # freshly downloaded values.
                        try:
                            collection.update_many(
                                {"代码": doc["代码"], "日期": doc["日期"]},
                                {"$set": doc},
                                upsert=True,
                            )
                        except Exception as e:
                            # Still best-effort, but no longer silent.
                            print(e, f"{code}更新失败")

            # insert_many() rejects an empty list, so only call it when there
            # is something to write.
            if bulk_insert:
                try:
                    collection.insert_many(bulk_insert)
                except Exception as e:
                    # Still best-effort, but no longer silent.
                    print(e, f"{code}写入失败")
        except Exception as e:
            print(e, f"因为{code}停牌")

    print("任务已经完成")
except Exception as e:
    print(e)