|
import akshare as ak |
|
import pandas as pd |
|
import datetime |
|
from pymongo import MongoClient |
|
import time |
|
import pytz |
|
|
|
client = MongoClient( |
|
"mongodb://wth000:wth000@43.159.47.250:27017/dbname?authSource=wth000") |
|
db = client["wth000"] |
|
names = [("000", "001", "002", "600", "601", "603", "605")] |
|
|
|
|
|
|
|
|
|
|
|
|
|
codes = ak.stock_zh_a_spot_em() |
|
|
|
|
|
|
|
|
|
for name in names: |
|
try: |
|
collection = db[f"股票30分钟{name}"] |
|
df = pd.DataFrame() |
|
df = codes[codes["代码"].str.startswith(name)][["代码", "名称"]].copy() |
|
|
|
for code in df["代码"]: |
|
|
|
latest = list(collection.find({"代码": float(code)}, { |
|
"timestamp": 1}).sort("timestamp", -1).limit(1)) |
|
|
|
if len(latest) == 0: |
|
upsert_docs = True |
|
|
|
else: |
|
upsert_docs = False |
|
latest_timestamp = latest[0]["timestamp"] |
|
try: |
|
|
|
k_data = ak.stock_zh_a_hist_min_em(symbol=code, period="30") |
|
k_data["代码"] = float(code) |
|
k_data["成交量"] = k_data["成交量"].apply(lambda x: float(x)) |
|
k_data["日期"] = k_data["时间"] |
|
|
|
|
|
k_data["timestamp"] = k_data["日期"].apply(lambda x: float( |
|
datetime.datetime.strptime(x, "%Y-%m-%d %H:%M:%S").replace(tzinfo=pytz.timezone("Asia/Shanghai")).timestamp())) |
|
k_data = k_data.sort_values(by=["代码", "日期"]) |
|
docs_to_update = k_data.to_dict("records") |
|
if upsert_docs: |
|
|
|
try: |
|
collection.insert_many(docs_to_update) |
|
except Exception as e: |
|
pass |
|
else: |
|
bulk_insert = [] |
|
for doc in docs_to_update: |
|
if doc["timestamp"] > latest_timestamp: |
|
|
|
bulk_insert.append(doc) |
|
if doc["timestamp"] == float(latest_timestamp): |
|
try: |
|
collection.update_many({"代码": doc["代码"], "日期": doc["日期"]}, { |
|
"$set": doc}, upsert=True) |
|
except Exception as e: |
|
pass |
|
|
|
if bulk_insert: |
|
try: |
|
collection.insert_many(bulk_insert) |
|
except Exception as e: |
|
pass |
|
except Exception as e: |
|
print(e, f"因为{code}停牌") |
|
print("任务已经完成") |
|
except Exception as e: |
|
print(e) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|