|
import akshare as ak |
|
import pandas as pd |
|
import datetime |
|
from pymongo import MongoClient |
|
import time |
|
import pytz |
|
|
|
client = MongoClient( |
|
"mongodb://wth000:wth000@43.159.47.250:27017/dbname?authSource=wth000") |
|
db = client["wth000"] |
|
names = ["可转债30分钟"] |
|
|
|
|
|
df = ak.bond_zh_hs_cov_spot() |
|
df["代码"] = df["symbol"] |
|
for name in names: |
|
try: |
|
collection = db[f"股票{name}"] |
|
|
|
for code in df["代码"]: |
|
latest = list(collection.find( |
|
{"代码": code}, {"timestamp": 1}).sort("timestamp", -1).limit(1)) |
|
if len(latest) == 0: |
|
upsert_docs = True |
|
else: |
|
upsert_docs = False |
|
latest_timestamp = latest[0]["timestamp"] |
|
try: |
|
k_data = ak.bond_zh_hs_cov_min(symbol=code, period=60) |
|
k_data["代码"] = code |
|
k_data = k_data.rename(columns={"时间": "日期"}) |
|
|
|
k_data["成交量"] = k_data["成交量"].apply(lambda x: float(x)) |
|
k_data["成交额"] = k_data["成交量"]*k_data["开盘"] |
|
|
|
k_data["timestamp"] = k_data["日期"].apply(lambda x: float(datetime.datetime.strptime( |
|
x, "%Y-%m-%d %H:%M:%S").replace(tzinfo=pytz.timezone("Asia/Shanghai")).timestamp())) |
|
k_data = k_data.sort_values(by=["代码", "日期"]) |
|
docs_to_update = k_data.to_dict("records") |
|
if upsert_docs: |
|
|
|
try: |
|
collection.insert_many(docs_to_update) |
|
except Exception as e: |
|
pass |
|
else: |
|
bulk_insert = [] |
|
for doc in docs_to_update: |
|
if doc["timestamp"] > latest_timestamp: |
|
|
|
bulk_insert.append(doc) |
|
if doc["timestamp"] == float(latest_timestamp): |
|
try: |
|
collection.update_many({"代码": doc["代码"], "日期": doc["日期"]}, { |
|
"$set": doc}, upsert=True) |
|
except Exception as e: |
|
pass |
|
|
|
if bulk_insert: |
|
try: |
|
collection.insert_many(bulk_insert) |
|
except Exception as e: |
|
pass |
|
except Exception as e: |
|
print(e, f"因为{code}停牌") |
|
print("任务已经完成") |
|
except Exception as e: |
|
print(e) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|