b32e2fd803
- 新增 fetch_sector_leaders() 从 market.json 读取热门行业领涨股 - 三路并行:同花顺技术榜 + 行业领涨 + 东财热榜(502降级) - 优先级排序:行业领涨 > 同花顺榜 > 东财热榜 - 名称→代码映射使用本地缓存,避免频繁调用akshare - 更新文档
349 lines
13 KiB
Python
349 lines
13 KiB
Python
#!/usr/bin/env python3
|
|
"""xiaoguo_scanner.py — 小果独立扫描线
|
|
|
|
每5分钟跑一轮,全市场排行榜主动发现潜在标的。
|
|
不依赖 trend_detector 信号,独立产出到 signal_news。
|
|
"""
|
|
|
|
import json, os, re, time, urllib.request
|
|
from pathlib import Path
|
|
from datetime import datetime
|
|
|
|
try:
|
|
import akshare as ak
|
|
HAS_AKSHARE = True
|
|
except ImportError:
|
|
HAS_AKSHARE = False
|
|
|
|
DATA_DIR = Path(__file__).parent / "data"
|
|
DB_PATH = DATA_DIR / "mofin.db"
|
|
XIAOGUO_API = "http://192.168.1.122:18003/v1/chat/completions"
|
|
XIAOGUO_MODEL = "Qwen3.6-27B-MTPLX-Optimized-Speed"
|
|
SCAN_INTERVAL = 3600 # 同一只股1小时内不重复搜
|
|
MAX_STOCKS_PER_RUN = 15
|
|
ARTICLES_PER_STOCK = 3
|
|
|
|
# 同花顺看多榜(挖掘潜力股)
|
|
BULLISH_BOARDS = [
|
|
("创新高", "stock_rank_cxg_ths"),
|
|
("量价齐升", "stock_rank_ljqs_ths"),
|
|
("向上突破", "stock_rank_xstp_ths"),
|
|
("连续上涨", "stock_rank_lxsz_ths"),
|
|
("持续放量", "stock_rank_cxfl_ths"),
|
|
("险资举牌", "stock_rank_xzjp_ths"),
|
|
]
|
|
|
|
# 同花顺看空榜(持仓风险预警)
|
|
BEARISH_BOARDS = [
|
|
("创新低", "stock_rank_cxd_ths"),
|
|
("持续缩量", "stock_rank_cxsl_ths"),
|
|
("量价齐跌", "stock_rank_ljqd_ths"),
|
|
("连续下跌", "stock_rank_lxxd_ths"),
|
|
("向下突破", "stock_rank_xxtp_ths"),
|
|
]
|
|
|
|
ALL_BOARDS = BULLISH_BOARDS + BEARISH_BOARDS
|
|
BULLISH_COUNT = len(BULLISH_BOARDS)
|
|
|
|
# 行业领涨股扫描(不轮换,每轮都跑)
|
|
# 从 market.json 读热门行业领涨股
|
|
SECTOR_HOT_THRESHOLD = 2.5 # 板块涨幅>2.5%时捞它的领涨股
|
|
|
|
|
|
def clean_proxy():
|
|
for k in ['http_proxy','https_proxy','HTTP_PROXY','HTTPS_PROXY']:
|
|
os.environ.pop(k, None)
|
|
|
|
|
|
def get_conn():
|
|
import sqlite3
|
|
conn = sqlite3.connect(str(DB_PATH))
|
|
conn.row_factory = sqlite3.Row
|
|
return conn
|
|
|
|
|
|
def fetch_hot_board():
|
|
"""东方财富热榜"""
|
|
if not HAS_AKSHARE:
|
|
return []
|
|
try:
|
|
clean_proxy()
|
|
df = ak.stock_hot_rank_em()
|
|
if df is None or len(df) == 0:
|
|
return []
|
|
# 东方财富热榜列名变化较大,自动检测
|
|
cols = list(df.columns)
|
|
code_candidates = [c for c in cols if any(x in c for x in ['代码', 'code', 'CODE'])]
|
|
name_candidates = [c for c in cols if any(x in c for x in ['简称', '名称', 'name', 'NAME'])]
|
|
code_col = code_candidates[0] if code_candidates else cols[1]
|
|
name_col = name_candidates[0] if name_candidates else cols[2]
|
|
return [{"code": str(r[code_col]).zfill(6).strip(), "name": str(r[name_col]).strip(),
|
|
"rank": i+1, "source": "东方财富热榜"}
|
|
for i, (_, r) in enumerate(df.head(30).iterrows())]
|
|
except Exception:
|
|
pass
|
|
return []
|
|
|
|
|
|
def fetch_rotating_board():
|
|
"""同花顺轮流榜(每轮一个),返回 (股票列表, 是否看多)"""
|
|
if not HAS_AKSHARE:
|
|
return [], True
|
|
conn = get_conn()
|
|
row = conn.execute("SELECT val FROM state_meta WHERE key='xiaoguo_board_round'").fetchone()
|
|
round_idx = (int(row[0]) if row else 0) % len(ALL_BOARDS)
|
|
conn.execute("INSERT OR REPLACE INTO state_meta (key, val) VALUES ('xiaoguo_board_round', ?)",
|
|
(str((round_idx + 1) % len(ALL_BOARDS)),))
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
board_name, func_name = ALL_BOARDS[round_idx]
|
|
is_bullish = round_idx < BULLISH_COUNT
|
|
print(f" 同花顺榜: {board_name} {'📈看多' if is_bullish else '📉看空'}", flush=True)
|
|
|
|
try:
|
|
clean_proxy()
|
|
fn = getattr(ak, func_name)
|
|
df = fn()
|
|
cols = list(df.columns)
|
|
code_col = [c for c in cols if '代码' in c][0]
|
|
name_col = [c for c in cols if '简称' in c or '名称' in c][0]
|
|
return [{"code": str(r[code_col]).zfill(6), "name": str(r[name_col]).strip(),
|
|
"source": f"同花顺{board_name}"}
|
|
for _, r in df.head(15).iterrows()], is_bullish
|
|
except Exception as e:
|
|
print(f" {board_name}失败: {e}", flush=True)
|
|
return [], is_bullish
|
|
|
|
|
|
def fetch_sector_leaders():
|
|
"""从 market.json 读热门行业领涨股"""
|
|
mkt_path = DATA_DIR / "market.json"
|
|
if not mkt_path.exists():
|
|
return []
|
|
try:
|
|
mkt = json.loads(mkt_path.read_text())
|
|
sectors = mkt.get("sectors", [])
|
|
|
|
# 代码→名称映射(优先用本地缓存,避免每次跑都调akshare)
|
|
cache_path = DATA_DIR / "stock_name_code_cache.json"
|
|
name_to_code = {}
|
|
if cache_path.exists():
|
|
name_to_code = json.loads(cache_path.read_text())
|
|
if not name_to_code:
|
|
try:
|
|
import akshare as ak
|
|
df = ak.stock_info_a_code_name()
|
|
for _, r in df.iterrows():
|
|
name_to_code[r["简称"]] = r["代码"]
|
|
cache_path.write_text(json.dumps(name_to_code, ensure_ascii=False))
|
|
print(f" 名称代码映射: {len(name_to_code)}只已缓存", flush=True)
|
|
except Exception as e:
|
|
print(f" 名称代码映射加载失败: {e}", flush=True)
|
|
|
|
leaders = []
|
|
seen = set()
|
|
for s in sectors:
|
|
chg = s.get("change", 0) or 0
|
|
lead_name = s.get("lead_stock", "")
|
|
if chg < SECTOR_HOT_THRESHOLD or not lead_name or lead_name in seen:
|
|
continue
|
|
seen.add(lead_name)
|
|
code = name_to_code.get(lead_name, "")
|
|
if not code:
|
|
continue
|
|
leaders.append({
|
|
"code": code,
|
|
"name": lead_name,
|
|
"source": f"行业领涨-{s['name']}+{chg:+.1f}%",
|
|
})
|
|
return leaders
|
|
except Exception as e:
|
|
print(f" 行业领涨获取失败: {e}", flush=True)
|
|
return []
|
|
|
|
|
|
def get_scanned_codes(conn):
|
|
"""取1小时内已扫描过的代码"""
|
|
rows = conn.execute(
|
|
"SELECT code FROM xiaoguo_scan_tracker WHERE datetime(last_scanned_at) > datetime('now', '-1 hour')"
|
|
).fetchall()
|
|
return {r[0] for r in rows}
|
|
|
|
|
|
def mark_scanned(conn, code, name, found):
|
|
conn.execute(
|
|
"INSERT OR REPLACE INTO xiaoguo_scan_tracker (code, name, last_scanned_at, found_count) "
|
|
"VALUES (?, ?, datetime('now','localtime'), COALESCE((SELECT found_count FROM xiaoguo_scan_tracker WHERE code=?),0)+?)",
|
|
(code, name, code, 1 if found else 0)
|
|
)
|
|
conn.commit()
|
|
|
|
|
|
def search_news(code, max_results=3):
|
|
"""akshare搜个股新闻"""
|
|
articles = []
|
|
if not HAS_AKSHARE:
|
|
return articles
|
|
try:
|
|
clean_proxy()
|
|
df = ak.stock_news_em(symbol=code)
|
|
for _, r in df.head(max_results).iterrows():
|
|
title = r.get('新闻标题', '')
|
|
content = r.get('新闻内容', '')
|
|
if title and len(title) > 5:
|
|
articles.append({"title": title, "content": content})
|
|
except:
|
|
pass
|
|
return articles
|
|
|
|
|
|
def check_stock(code, name, articles):
|
|
"""小果LLM判断这只股票是否有料(一次调用判断所有文章)"""
|
|
if not articles:
|
|
return None, None
|
|
|
|
lines = [f"{i+1}. {a['title']}" for i, a in enumerate(articles[:3])]
|
|
prompt = f"""以下是最新关于{name}({code})的新闻标题。
|
|
该股今日上了人气热榜/技术榜单。
|
|
|
|
新闻:
|
|
{chr(10).join(lines)}
|
|
|
|
这只股上榜是否跟这些新闻有关?有关的话是利好还是利空?
|
|
回答格式:有关(利好|利空|中性) 或 无关
|
|
回答:"""
|
|
|
|
payload = json.dumps({
|
|
"model": XIAOGUO_MODEL,
|
|
"messages": [{"role": "user", "content": prompt}],
|
|
"temperature": 0.1, "max_tokens": 100,
|
|
}).encode()
|
|
|
|
opener = urllib.request.build_opener(urllib.request.ProxyHandler({}))
|
|
req = urllib.request.Request(XIAOGUO_API, data=payload,
|
|
headers={"Content-Type": "application/json"}, method="POST")
|
|
try:
|
|
resp = opener.open(req, timeout=30)
|
|
reply = json.loads(resp.read())["choices"][0]["message"]["content"]
|
|
if "有关" in reply or "利好" in reply or "利空" in reply:
|
|
for s in ["利好", "利空", "中性"]:
|
|
if s in reply:
|
|
return True, s
|
|
return True, "中性"
|
|
except:
|
|
pass
|
|
return None, None
|
|
|
|
|
|
def main():
|
|
start_time = time.time()
|
|
conn = get_conn()
|
|
|
|
# 1. 拉板
|
|
hot = fetch_hot_board()
|
|
rotating, is_bullish = fetch_rotating_board()
|
|
leaders = fetch_sector_leaders()
|
|
elapsed = time.time() - start_time
|
|
print(f"榜单: 东方财富{len(hot)}只, 同花顺{len(rotating)}只, 行业领涨{len(leaders)}只 ({elapsed:.0f}s)", flush=True)
|
|
|
|
if not hot and not rotating and not leaders:
|
|
conn.close()
|
|
return
|
|
|
|
# 加载持仓代码(用于看空榜比对)
|
|
holdings = set()
|
|
if not is_bullish:
|
|
cur = conn.execute("SELECT code FROM holdings WHERE is_active=1")
|
|
holdings = {r[0].lstrip("0") for r in cur.fetchall()}
|
|
# 也查自选
|
|
cur2 = conn.execute("SELECT code FROM watchlist_stocks")
|
|
holdings.update({r[0].lstrip("0") for r in cur2.fetchall()})
|
|
|
|
# 2. 合并去重 + 看空榜只保留持仓股
|
|
all_stocks = {}
|
|
# 行业领涨优先(热门板块龙头)
|
|
for s in (leaders if is_bullish else []) + hot + rotating:
|
|
code = s["code"]
|
|
code_stripped = code.lstrip("0")
|
|
if not is_bullish:
|
|
# 看空榜:只处理持仓/自选中的股票
|
|
if code_stripped not in holdings:
|
|
continue
|
|
if code not in all_stocks:
|
|
all_stocks[code] = {"code": code, "name": s["name"], "sources": []}
|
|
all_stocks[code]["sources"].append(s["source"])
|
|
|
|
if not all_stocks:
|
|
if is_bullish:
|
|
print("榜单为空", flush=True)
|
|
else:
|
|
print(f"看空榜无持仓股命中", flush=True)
|
|
conn.close()
|
|
return
|
|
|
|
# 3. 排除已搜索过的(看空榜不排除——每次都要检查风险)
|
|
scanned = get_scanned_codes(conn)
|
|
if is_bullish:
|
|
candidates = [s for code, s in all_stocks.items()
|
|
if code not in scanned and len(code) == 6 and code.isdigit()][:MAX_STOCKS_PER_RUN]
|
|
else:
|
|
# 看空榜:不限数量,全检
|
|
candidates = [s for code, s in all_stocks.items()
|
|
if len(code) == 6 and code.isdigit()]
|
|
|
|
if not candidates:
|
|
print(f"无新候选(已有 {len(scanned)} 只已扫描)", flush=True)
|
|
conn.close()
|
|
return
|
|
|
|
print(f"待扫描: {len(candidates)} 只({'看多' if is_bullish else '看空'}榜)", flush=True)
|
|
|
|
# 4. 逐只处理
|
|
found_any = False
|
|
for stock in candidates:
|
|
code, name = stock["code"], stock["name"]
|
|
sources = "|".join(stock["sources"])
|
|
|
|
articles = search_news(code, ARTICLES_PER_STOCK) if is_bullish else []
|
|
if not articles and is_bullish:
|
|
mark_scanned(conn, code, name, False)
|
|
continue
|
|
|
|
has_found = False
|
|
if is_bullish:
|
|
ok, sentiment = check_stock(code, name, articles)
|
|
if ok:
|
|
has_found = True
|
|
found_any = True
|
|
conn.execute(
|
|
"INSERT INTO signal_news (signal_id, sector, overall_sentiment, summary, key_articles, searched_stocks, source) "
|
|
"VALUES (NULL, ?, ?, ?, ?, ?, 'xiaoguo')",
|
|
(f"扫描-{name}", sentiment, f"[{sources}] {articles[0]['title'][:80]}",
|
|
json.dumps([{"title": a["title"], "sentiment": sentiment, "summary": (a.get("content") or "")[:100]} for a in articles[:3]], ensure_ascii=False),
|
|
json.dumps([name], ensure_ascii=False))
|
|
)
|
|
print(f" ✅ {name}({code}) [{sources}] {sentiment}: {articles[0]['title'][:50]}", flush=True)
|
|
mark_scanned(conn, code, name, has_found)
|
|
else:
|
|
# 看空榜:直接写入风险信号
|
|
has_found = True
|
|
found_any = True
|
|
conn.execute(
|
|
"INSERT INTO signal_news (signal_id, sector, overall_sentiment, summary, key_articles, searched_stocks, source) "
|
|
"VALUES (NULL, ?, ?, ?, ?, ?, 'xiaoguo_risk')",
|
|
(f"预警-{name}", "偏空", f"[{sources}] {name}登上{sources}榜,需关注持仓风险",
|
|
json.dumps([{"title": name, "sentiment": "偏空", "summary": f"上榜{sources}"}], ensure_ascii=False),
|
|
json.dumps([name], ensure_ascii=False))
|
|
)
|
|
print(f" ⚠️ {name}({code}) [{sources}] 持仓风险信号", flush=True)
|
|
mark_scanned(conn, code, name, has_found)
|
|
|
|
total_time = time.time() - start_time
|
|
print(f"完成: {len(candidates)}只{'看多' if is_bullish else '看空'}扫描, {'有发现' if found_any else '无发现'} ({total_time:.0f}s)", flush=True)
|
|
conn.close()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|