View source code

Merge branch 'dev_api_init' of weapp/video_decode into master

jihuaqiang 5 days ago
Parent
Commit
0d328d2344

+ 4 - 1
.env

@@ -8,4 +8,7 @@ LOG_LEVEL=INFO
 
 
 # local = local environment, prod = production
-APP_ENV=prod  
+APP_ENV=prod 
+
+ODPS_ACCESS_KEY_ID=LTAI5t9b7RnUiCy5v3gqXf9Y
+ODPS_ACCESS_KEY_SECRET=1HVHQe0AV7xaoOWHsM8k9XN3gEaal7
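
Note: the ODPS keys land in .env as plain environment variables. A minimal sketch of how the scheduler code is expected to pick them up (mirroring scheduler/odps_fetch.py further down; assumes python-dotenv is installed):

    import os
    from dotenv import find_dotenv, load_dotenv

    # Load .env once; override=False keeps already-set environment variables authoritative.
    load_dotenv(find_dotenv(), override=False)

    ak = os.getenv("ODPS_ACCESS_KEY_ID")
    sk = os.getenv("ODPS_ACCESS_KEY_SECRET")
    if not ak or not sk:
        raise ValueError("missing ODPS_ACCESS_KEY_ID or ODPS_ACCESS_KEY_SECRET")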

+ 2 - 0
docker-compose.yaml

@@ -10,6 +10,8 @@ services:
     restart: always
     ports:
       - "8000:8000"
+    volumes:
+      - ./logs:/video_decode/logs
     env_file:
       - .env
     environment:

+ 19 - 1
main.py

@@ -10,6 +10,7 @@ from tasks.decode import begin_decode_task
 from tasks.detail import get_decode_detail_by_task_id
 from tasks.pattern import begin_pattern_task
 from tasks.topic_search import search_topics
+from scheduler.bootstrap import run_dispatch_once, start_scheduler, stop_scheduler
 
 from loguru import logger
 import sys
@@ -49,6 +50,16 @@ app.add_middleware(
 )
 
 
+@app.on_event("startup")
+async def on_startup() -> None:
+    start_scheduler()
+
+
+@app.on_event("shutdown")
+async def on_shutdown() -> None:
+    stop_scheduler()
+
+
 @app.middleware("http")
 async def api_access_log_middleware(request: Request, call_next):
     """记录每个接口的请求(路径、查询、body)与响应(状态码、body)。"""
@@ -179,4 +190,11 @@ def pattern_content(param: PatternContentParam):
 def search_content_topics(param: TopicSearchParam):
     """视频选题检索:根据关键词在解构结果中匹配,返回匹配度最高的 top5"""
     results = search_topics(param)
-    return _build_api_response(code=0, data=results)
+    return _build_api_response(code=0, data=results)
+
+
+@app.post("/api/v1/content/tasks/scheduler/decode/run-once")
+def run_decode_scheduler_once():
+    """手动触发一次17点解构调度任务,便于验证。"""
+    run_dispatch_once()
+    return _build_api_response(code=0, data={"triggered": True})
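
A quick way to exercise the new manual-trigger endpoint once the service is up, a sketch assuming the 8000:8000 port mapping from docker-compose.yaml (the route path is taken verbatim from the handler above):

    import requests

    # POST with no body; the route takes no parameters and runs the dispatch job synchronously.
    resp = requests.post(
        "http://localhost:8000/api/v1/content/tasks/scheduler/decode/run-once",
        timeout=300,  # generous: run_dispatch_once blocks until the job finishes
    )
    # Expected to return code=0 with data={"triggered": true}, wrapped by
    # _build_api_response (whose exact envelope is not shown in this diff).
    print(resp.json())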

+ 2 - 0
requirements.txt

@@ -19,6 +19,8 @@ cryptography>=41.0.0
 
 # HTTP 客户端
 requests>=2.31.0
+apscheduler>=3.10.4
+pyodps>=0.12.3
 
 # Excel 读写(mysql_work 等脚本)
 pandas>=2.0.0

+ 51 - 0
scheduler/bootstrap.py

@@ -0,0 +1,51 @@
+from typing import Optional
+from datetime import datetime
+from zoneinfo import ZoneInfo
+
+from apscheduler.schedulers.background import BackgroundScheduler
+from apscheduler.triggers.cron import CronTrigger
+
+from scheduler.decode_dispatch_job import run_decode_dispatch_job
+from scheduler.decode_hourly_stats_job import run_decode_hourly_stats_job
+from utils.scheduler_logger import get_scheduler_logger
+
+
+logger = get_scheduler_logger()
+_scheduler: Optional[BackgroundScheduler] = None
+
+
+def start_scheduler() -> None:
+    global _scheduler
+    if _scheduler and _scheduler.running:
+        logger.info("调度器已在运行,跳过重复启动")
+        return
+
+    _scheduler = BackgroundScheduler(timezone=ZoneInfo("Asia/Shanghai"))
+    _scheduler.add_job(
+        run_decode_dispatch_job,
+        trigger=CronTrigger(minute="*/3"),
+        id="decode_dispatch_every_3min",
+        replace_existing=True,
+        next_run_time=datetime.now(ZoneInfo("Asia/Shanghai")),
+    )
+    _scheduler.add_job(
+        run_decode_hourly_stats_job,
+        trigger=CronTrigger(minute=6),
+        id="decode_hourly_stats",
+        replace_existing=True,
+    )
+    _scheduler.start()
+    logger.info("调度器已启动:解码任务每3分钟执行,统计任务每小时整点执行")
+
+
+def stop_scheduler() -> None:
+    global _scheduler
+    if _scheduler and _scheduler.running:
+        _scheduler.shutdown(wait=False)
+        logger.info("调度器已停止")
+    _scheduler = None
+
+
+def run_dispatch_once() -> None:
+    logger.info("手动触发执行一次调度任务")
+    run_decode_dispatch_job()
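
Worth noting: passing next_run_time=datetime.now(...) to add_job makes APScheduler 3.x fire the dispatch job once immediately at startup, on top of the */3 cron cadence. A minimal standalone sketch of the same pattern (the job body is illustrative only):

    from datetime import datetime
    from zoneinfo import ZoneInfo

    from apscheduler.schedulers.background import BackgroundScheduler
    from apscheduler.triggers.cron import CronTrigger

    tz = ZoneInfo("Asia/Shanghai")

    def job() -> None:  # stand-in for run_decode_dispatch_job
        print("tick", datetime.now(tz).isoformat())

    scheduler = BackgroundScheduler(timezone=tz)
    # next_run_time=now => one immediate run, then every 3 minutes per the cron trigger.
    scheduler.add_job(job, CronTrigger(minute="*/3"), next_run_time=datetime.now(tz))
    scheduler.start()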

+ 493 - 0
scheduler/decode_dispatch_job.py

@@ -0,0 +1,493 @@
+import json
+from datetime import datetime
+from typing import Any, Dict, List, Optional, Tuple
+from zoneinfo import ZoneInfo
+
+import requests
+
+from scheduler.odps_fetch import fetch_priority_posts
+from utils.scheduler_logger import get_scheduler_logger
+from utils.sync_mysql_help import mysql
+
+
+logger = get_scheduler_logger()
+
+CONFIG_ID = "57"
+DECODE_URL = "https://aigc-api.aiddit.com/aigc/api/task/decode"
+DECODE_RESULT_URL = "https://aigc-api.aiddit.com/aigc/api/task/decode/result"
+# Dynamic window: keep today's count of rows with status IN (0,1) at this value (top up by the shortfall when refilling).
+BATCH_SIZE = 40
+ODPS_PAGE_SIZE = 200
+RESULT_POLL_CHUNK = 50
+
+
+def _map_api_status_to_int(api_status: str, vid: str) -> int:
+    """Map upstream status string to DB status: 0待执行 1执行中 2成功 3失败."""
+    s = (api_status or "").strip().upper()
+    if s == "SUCCESS":
+        return 2
+    if s in ("FAILED", "FAILURE", "ERROR", "FAIL"):
+        return 3
+    if s in ("RUNNING", "PROCESSING", "DOING"):
+        return 1
+    if s in ("PENDING", "WAITING", "INIT", "QUEUED"):
+        return 0
+    if not s:
+        return 0
+    logger.warning("未知解码状态,按执行中处理 status={} vid={}", api_status, vid)
+    return 1
+
+
+def _safe_json_loads(text: Optional[str]) -> Dict[str, Any]:
+    if not text:
+        return {}
+    try:
+        data = json.loads(text)
+        return data if isinstance(data, dict) else {}
+    except Exception:
+        return {}
+
+
+def _today_dt() -> str:
+    return datetime.now(ZoneInfo("Asia/Shanghai")).strftime("%Y%m%d")
+
+
+def _is_allowed_level(level_value: Any) -> bool:
+    try:
+        return int(level_value) in (0, 1, 2)
+    except (TypeError, ValueError):
+        return False
+
+
+def _is_decode_submit_open() -> bool:
+    """
+    Gate for submitting NEW decode tasks.
+    Only controls whether to submit; polling/querying existing tasks is unaffected.
+    """
+    sql = """
+        SELECT is_open
+        FROM aigc_topic_decode_task_oprate
+        ORDER BY id DESC
+        LIMIT 1
+    """
+    try:
+        row = mysql.fetchone(sql)
+        if not row:
+            # Fail-open if table is empty to avoid blocking by default.
+            return True
+        return int(row.get("is_open") or 0) == 1
+    except Exception as exc:
+        # Conservative: if we cannot confirm switch is open, skip submit this cycle.
+        logger.exception("查询解构开关失败,本轮不发起新解构任务: {}", exc)
+        return False
+
+
+def _fetch_today_pending_vids(dt: str) -> List[str]:
+    sql = """
+        SELECT DISTINCT vid
+        FROM aigc_topic_decode_task_result
+        WHERE dt = %s AND status IN (0, 1) AND vid IS NOT NULL AND vid != ''
+        ORDER BY vid
+    """
+    rows = mysql.fetchall(sql, (dt,))
+    return [str(row["vid"]) for row in rows if row.get("vid")]
+
+
+def _count_today_non_terminal(dt: str) -> int:
+    sql = """
+        SELECT COUNT(1) AS total
+        FROM aigc_topic_decode_task_result
+        WHERE dt = %s AND status IN (0, 1)
+    """
+    result = mysql.fetchone(sql, (dt,))
+    return int((result or {}).get("total", 0))
+
+
+def _submit_decode_result_chunk(
+    channel_content_ids: List[str],
+) -> Tuple[bool, str, Dict[str, Any]]:
+    payload = {"params": {"configId": CONFIG_ID, "channelContentIds": channel_content_ids}}
+    try:
+        resp = requests.post(DECODE_RESULT_URL, json=payload, timeout=60)
+        if resp.status_code != 200:
+            return False, f"http_status_{resp.status_code}", {}
+        body = resp.json()
+        ok = body.get("code") == 0
+        return ok, body.get("msg") or "", body
+    except Exception as exc:
+        return False, str(exc), {}
+
+
+def _apply_result_row_to_db(dt: str, item: Dict[str, Any]) -> None:
+    vid = str(item.get("channelContentId") or "").strip()
+    if not vid:
+        return
+    api_status_raw = item.get("status") or ""
+    err_msg = (item.get("err_msg") or item.get("errorMessage") or "") or ""
+    data_content = item.get("dataContent")
+    if data_content is not None and not isinstance(data_content, str):
+        data_content = json.dumps(data_content, ensure_ascii=False)
+    html = item.get("html")
+
+    base_status = _map_api_status_to_int(str(api_status_raw), vid)
+
+    sql = """
+        UPDATE aigc_topic_decode_task_result
+        SET status = %s,
+            err_msg = %s,
+            data_content = %s,
+            html = %s
+        WHERE dt = %s AND vid = %s
+    """
+    mysql.execute(
+        sql,
+        (
+            base_status,
+            err_msg[:512] if err_msg else "",
+            data_content if data_content is not None else "",
+            html if html is not None else None,
+            dt,
+            vid,
+        ),
+    )
+
+
+def _poll_decode_results_for_today(dt: str, vids: List[str]) -> None:
+    if not vids:
+        return
+    total = len(vids)
+    logger.info("开始查询解码结果 dt={} 总vid数={}", dt, total)
+    overall_success = 0
+    overall_returned = 0
+    for i in range(0, total, RESULT_POLL_CHUNK):
+        chunk = vids[i : i + RESULT_POLL_CHUNK]
+        logger.info(
+            "查询解码结果 dt={} 分片序号={} 分片大小={} 总数={}",
+            dt,
+            i // RESULT_POLL_CHUNK,
+            len(chunk),
+            total,
+        )
+        ok, msg, body = _submit_decode_result_chunk(chunk)
+        if not ok:
+            logger.error(
+                "查询解码结果接口失败 dt={} msg={} body={}",
+                dt,
+                msg,
+                body,
+            )
+            continue
+        data_list = body.get("data")
+        if not isinstance(data_list, list):
+            logger.warning("查询解码结果返回中缺少data列表 body={}", body)
+            continue
+        chunk_success = 0
+        returned_ids = {str(x.get("channelContentId") or "") for x in data_list}
+        missing = set(chunk) - returned_ids
+        if missing:
+            logger.warning(
+                "查询解码结果返回缺少{}个vid,示例={}",
+                len(missing),
+                list(missing)[:5],
+            )
+        for item in data_list:
+            if not isinstance(item, dict):
+                continue
+            vid = str(item.get("channelContentId") or "").strip()
+            api_status = str(item.get("status") or "")
+            mapped_status = _map_api_status_to_int(api_status, vid)
+            if mapped_status == 2:
+                chunk_success += 1
+            err_msg = (item.get("err_msg") or item.get("errorMessage") or "") or ""
+            logger.info(
+                "解码结果明细 dt={} vid={} 接口状态={} 映射状态={} 错误信息={}",
+                dt,
+                vid,
+                api_status,
+                mapped_status,
+                err_msg[:512] if err_msg else "",
+            )
+            _apply_result_row_to_db(dt, item)
+        overall_success += chunk_success
+        overall_returned += len(data_list)
+        logger.info(
+            "解码结果分片处理完成 dt={} 查询数={} 返回数={} 成功数={}",
+            dt,
+            len(chunk),
+            len(data_list),
+            chunk_success,
+        )
+    logger.info(
+        "解码结果查询完成 dt={} 查询总数={} 返回总数={} 成功总数={}",
+        dt,
+        total,
+        overall_returned,
+        overall_success,
+    )
+
+
+def _build_posts_payload(records: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
+    posts: List[Dict[str, Any]] = []
+    for item in records:
+        extend_raw = item.get("extend")
+        extend_obj: Dict[str, Any]
+        if isinstance(extend_raw, dict):
+            extend_obj = extend_raw
+        else:
+            extend_obj = _safe_json_loads(str(extend_raw)) if extend_raw is not None else {}
+        cover_url = extend_obj.get("cover_url") or ""
+        images = [cover_url] if cover_url else []
+        posts.append(
+            {
+                "channelContentId": item.get("vid") or "",
+                "title": item.get("title") or "",
+                "video": item.get("url") or "",
+                "images": images,
+                "contentModal": 4,
+                "channel": 10,
+            }
+        )
+    return posts
+
+
+def _submit_decode(posts: List[Dict[str, Any]]) -> Tuple[bool, str, Dict[str, Any]]:
+    payload = {"params": {"configId": CONFIG_ID, "posts": posts}}
+    try:
+        resp = requests.post(DECODE_URL, json=payload, timeout=60)
+        if resp.status_code != 200:
+            return False, f"http_status_{resp.status_code}", {}
+        body = resp.json()
+        ok = body.get("code") == 0
+        return ok, body.get("msg") or "", body
+    except Exception as exc:
+        return False, str(exc), {}
+
+
+def _load_existing_vids(dt: str) -> set[str]:
+    sql = """
+        SELECT DISTINCT vid
+        FROM aigc_topic_decode_task_result
+        WHERE dt = %s AND vid IS NOT NULL AND vid != ''
+    """
+    rows = mysql.fetchall(sql, (dt,))
+    return {str(row["vid"]) for row in rows if row.get("vid")}
+
+
+def _pick_candidate_records(dt: str, batch_size: int = BATCH_SIZE) -> List[Dict[str, Any]]:
+    existing_vids = _load_existing_vids(dt)
+    selected: List[Dict[str, Any]] = []
+    selected_vids: set[str] = set()
+    offset = 0
+
+    while len(selected) < batch_size:
+        page = fetch_priority_posts(limit=ODPS_PAGE_SIZE, offset=offset, dt=dt)
+        if not page:
+            break
+        for item in page:
+            vid = str(item.get("vid") or "")
+            if (
+                not vid
+                or not _is_allowed_level(item.get("level"))
+                or vid in existing_vids
+                or vid in selected_vids
+            ):
+                continue
+            selected.append(item)
+            selected_vids.add(vid)
+            if len(selected) >= batch_size:
+                break
+        offset += ODPS_PAGE_SIZE
+
+    logger.info(
+        "候选数据筛选完成 dt={} 已选数量={} 扫描offset={}",
+        dt,
+        len(selected),
+        offset,
+    )
+    if selected:
+        vid_title_pairs = [
+            {"vid": str(item.get("vid") or ""), "title": item.get("title") or ""}
+            for item in selected
+        ]
+        logger.info("已选候选数据 dt={} items={}", dt, vid_title_pairs)
+    return selected
+
+
+def _row_status_after_decode_submit(
+    vid: str, row_in_resp: Optional[Dict[str, Any]], full_body: Dict[str, Any]
+) -> Tuple[int, str, str, Optional[str]]:
+    """Returns (status, err_msg, data_content, html) for INSERT."""
+    if not row_in_resp:
+        payload = json.dumps({"decode_submit_response": full_body}, ensure_ascii=False)
+        return 1, "", payload, None
+
+    api_status_raw = row_in_resp.get("status") or ""
+    err_msg = (row_in_resp.get("err_msg") or row_in_resp.get("errorMessage") or "") or ""
+    mapped = _map_api_status_to_int(str(api_status_raw), vid)
+    payload = json.dumps(
+        {"decode_submit_item": row_in_resp, "decode_submit_response": full_body},
+        ensure_ascii=False,
+    )
+
+    if mapped == 3:
+        return 3, err_msg[:512], payload, None
+
+    if mapped == 2:
+        # New submit API only returns status/errorMessage.
+        # Keep SUCCESS as terminal success; detailed result is queried via decode/result.
+        return 2, "", payload, None
+
+    if mapped == 0:
+        return 0, err_msg[:512], payload, None
+
+    return 1, err_msg[:512], payload, None
+
+
+def _insert_task_result_row(
+    source: Dict[str, Any],
+    status: int,
+    err_msg: str,
+    data_content: str,
+    html: Optional[str],
+) -> None:
+    extend_raw = source.get("extend")
+    # extend may already be a dict (ODPS row) or a JSON string; handle both, as _build_posts_payload does.
+    extend = extend_raw if isinstance(extend_raw, dict) else _safe_json_loads(extend_raw)
+    cover_url = extend.get("cover_url", "")
+    cover_text = cover_url if isinstance(cover_url, str) else ""
+    images_text = source.get("url") or ""
+    sql = """
+        INSERT INTO aigc_topic_decode_task_result
+        (task_id, status, err_msg, vid, title, cover, video_url, images, type, channel, cate1, cate2, dt, data_content, html)
+        VALUES
+        (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
+    """
+    params = (
+        None,
+        status,
+        err_msg or "",
+        str(source.get("vid") or ""),
+        source.get("title") or "",
+        cover_text,
+        source.get("url") or "",
+        images_text,
+        source.get("type") or "",
+        source.get("channel") or "",
+        source.get("cate1") or "",
+        source.get("cate2") or "",
+        source.get("dt") or _today_dt(),
+        data_content,
+        html,
+    )
+    mysql.execute(sql, params)
+
+
+def _insert_rows_after_decode_submit(records: List[Dict[str, Any]], body: Dict[str, Any]) -> None:
+    data_list = body.get("data") if isinstance(body.get("data"), list) else []
+    by_vid = {str(x.get("channelContentId") or ""): x for x in data_list if isinstance(x, dict)}
+
+    for item in records:
+        vid = str(item.get("vid") or "")
+        row = by_vid.get(vid)
+        status, err_msg, data_content, html = _row_status_after_decode_submit(vid, row, body)
+        _insert_task_result_row(item, status, err_msg, data_content, html)
+
+
+def run_decode_dispatch_job() -> None:
+    logger.info("解码调度任务开始执行")
+    try:
+        dt = _today_dt()
+
+        before_poll_non_terminal = _count_today_non_terminal(dt)
+        if before_poll_non_terminal > 0:
+            logger.info(
+                "当天存在待执行/执行中任务,先拉取解码结果 dt={} count={}",
+                dt,
+                before_poll_non_terminal,
+            )
+        pending_vids = _fetch_today_pending_vids(dt)
+        if pending_vids:
+            logger.info("查询当天待执行/执行中记录 dt={} count={}", dt, len(pending_vids))
+            _poll_decode_results_for_today(dt, pending_vids)
+        elif before_poll_non_terminal > 0:
+            logger.warning(
+                "存在非终态记录但未获取到可查询vid dt={} count={}",
+                dt,
+                before_poll_non_terminal,
+            )
+
+        after_poll_non_terminal = _count_today_non_terminal(dt)
+        if pending_vids or before_poll_non_terminal > 0:
+            logger.info(
+                "解码结果查询阶段结束 dt={} 查询前非终态={} 查询后非终态={}",
+                dt,
+                before_poll_non_terminal,
+                after_poll_non_terminal,
+            )
+
+        if not _is_decode_submit_open():
+            logger.info("解构开关关闭(is_open!=1),跳过本轮新批次发起 dt={}", dt)
+            logger.info("解码调度任务结束(开关关闭:不发起新任务)")
+            return
+
+        need = max(0, BATCH_SIZE - after_poll_non_terminal)
+        if need == 0:
+            logger.info(
+                "解构中已满{}条,本轮无需补充 dt={} non_terminal={}",
+                BATCH_SIZE,
+                dt,
+                after_poll_non_terminal,
+            )
+            logger.info("解码调度任务结束(窗口已满)")
+            return
+
+        logger.info(
+            "动态窗口补充 dt={} 当前解构中={} 目标={} 本次补充={}",
+            dt,
+            after_poll_non_terminal,
+            BATCH_SIZE,
+            need,
+        )
+        records = _pick_candidate_records(dt=dt, batch_size=need)
+        if not records:
+            logger.info("无可发起的新批次候选数据 dt={} need={}", dt, need)
+            logger.info("解码调度任务结束(无新增任务)")
+            return
+        logger.info("解码提交接口执行开始 records={}", records)
+        posts = _build_posts_payload(records)
+        logger.info("解码提交接口执行开始 posts={}", posts)
+        ok, err_msg, body = _submit_decode(posts)
+        logger.info(
+            "解码提交接口执行完成 success={} records={} msg={} body={}",
+            ok,
+            len(records),
+            err_msg,
+            body,
+        )
+
+        if not ok:
+            fail_body = json.dumps({"decode_submit_response": body}, ensure_ascii=False)
+            for item in records:
+                _insert_task_result_row(
+                    item,
+                    status=3,
+                    err_msg=err_msg or "解码提交失败",
+                    data_content=fail_body,
+                    html=None,
+                )
+        else:
+            if isinstance(body.get("data"), list) and body["data"]:
+                _insert_rows_after_decode_submit(records, body)
+            else:
+                payload = json.dumps({"decode_submit_response": body}, ensure_ascii=False)
+                for item in records:
+                    _insert_task_result_row(
+                        item,
+                        status=1,
+                        err_msg="",
+                        data_content=payload,
+                        html=None,
+                    )
+
+        logger.info("解码调度任务结束,本轮新发起数量={}", len(records))
+    except Exception as exc:
+        logger.exception("解码调度任务异常退出: {}", exc)
+        return
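
The status mapping is the contract every DB write relies on; a few illustrative cases (behavior as defined by _map_api_status_to_int above, importing the private helper just for a quick check):

    from scheduler.decode_dispatch_job import _map_api_status_to_int

    assert _map_api_status_to_int("SUCCESS", "v1") == 2        # terminal success
    assert _map_api_status_to_int("failed", "v1") == 3         # matching is case-insensitive
    assert _map_api_status_to_int("RUNNING", "v1") == 1        # still in flight
    assert _map_api_status_to_int("", "v1") == 0               # empty status => pending
    assert _map_api_status_to_int("SOMETHING_NEW", "v1") == 1  # unknown => running, plus a warning log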

+ 283 - 0
scheduler/decode_hourly_stats_job.py

@@ -0,0 +1,283 @@
+import base64
+import hashlib
+import hmac
+import sys
+import time
+from datetime import datetime, timedelta
+from pathlib import Path
+from typing import Dict
+from zoneinfo import ZoneInfo
+
+import requests
+
+# Allow running this module directly as a script: python scheduler/decode_hourly_stats_job.py
+_PROJECT_ROOT = Path(__file__).resolve().parent.parent
+if str(_PROJECT_ROOT) not in sys.path:
+    sys.path.insert(0, str(_PROJECT_ROOT))
+
+from utils.scheduler_logger import get_scheduler_logger
+from utils.sync_mysql_help import mysql
+from scheduler.odps_fetch import count_priority_posts
+
+
+logger = get_scheduler_logger()
+_TZ = ZoneInfo("Asia/Shanghai")
+_FEISHU_WEBHOOK = "https://open.feishu.cn/open-apis/bot/v2/hook/af94b535-ed47-47d8-87f4-d893e1077276"
+_FEISHU_SIGN_SECRET = "lebZtBVkKJrbaVFlss2Pcf"
+_ALERT_FAIL_RATE = 0.15
+_ALERT_SUCCESS_RATE = 0.20
+
+
+def _previous_hour_window(now: datetime) -> tuple[datetime, datetime]:
+    current_hour = now.replace(minute=0, second=0, microsecond=0)
+    return current_hour - timedelta(hours=1), current_hour
+
+
+def _fetch_hourly_stats(window_start: datetime, window_end: datetime) -> Dict[str, int]:
+    sql = """
+        SELECT
+            COUNT(1) AS total_count,
+            SUM(CASE WHEN status = 2 THEN 1 ELSE 0 END) AS success_count,
+            SUM(CASE WHEN status = 0 THEN 1 ELSE 0 END) AS pending_count,
+            SUM(CASE WHEN status = 1 THEN 1 ELSE 0 END) AS running_count,
+            SUM(CASE WHEN status = 3 THEN 1 ELSE 0 END) AS fail_count
+        FROM aigc_topic_decode_task_result
+        WHERE create_time >= %s
+          AND create_time < %s
+    """
+    row = mysql.fetchone(
+        sql,
+        (
+            window_start.strftime("%Y-%m-%d %H:%M:%S"),
+            window_end.strftime("%Y-%m-%d %H:%M:%S"),
+        ),
+    )
+    return {
+        "total": int((row or {}).get("total_count") or 0),
+        "success": int((row or {}).get("success_count") or 0),
+        "pending": int((row or {}).get("pending_count") or 0),
+        "running": int((row or {}).get("running_count") or 0),
+        "fail": int((row or {}).get("fail_count") or 0),
+    }
+
+
+def _fetch_today_stats(now: datetime) -> Dict[str, int]:
+    day_start = now.replace(hour=0, minute=0, second=0, microsecond=0)
+    sql = """
+        SELECT
+            SUM(CASE WHEN status IN (1, 2, 3) THEN 1 ELSE 0 END) AS executed_count
+        FROM aigc_topic_decode_task_result
+        WHERE create_time >= %s
+          AND create_time < %s
+    """
+    row = mysql.fetchone(
+        sql,
+        (
+            day_start.strftime("%Y-%m-%d %H:%M:%S"),
+            now.strftime("%Y-%m-%d %H:%M:%S"),
+        ),
+    )
+    executed_total = int((row or {}).get("executed_count") or 0)
+    demand_total = int(count_priority_posts(now.strftime("%Y%m%d")) or 0)
+    pending_total = max(demand_total - executed_total, 0)
+    return {
+        "executed_total": executed_total,
+        "pending_total": pending_total,
+    }
+
+
+def _has_overdue_pending_task(now: datetime) -> bool:
+    overdue_before = now - timedelta(minutes=30)
+    sql = """
+        SELECT COUNT(1) AS total_count
+        FROM aigc_topic_decode_task_result
+        WHERE status IN (0, 1)
+          AND create_time <= %s
+    """
+    row = mysql.fetchone(sql, (overdue_before.strftime("%Y-%m-%d %H:%M:%S"),))
+    return int((row or {}).get("total_count") or 0) > 0
+
+
+def _gen_feishu_sign(timestamp: str, secret: str) -> str:
+    # Per the Feishu docs: use timestamp + "\n" + secret as the HMAC-SHA256 key over an empty message, then Base64-encode.
+    # https://open.feishu.cn/document/client-docs/bot-v3/add-custom-bot
+    string_to_sign = "{}\n{}".format(timestamp, secret)
+    hmac_code = hmac.new(
+        string_to_sign.encode("utf-8"),
+        b"",
+        digestmod=hashlib.sha256,
+    ).digest()
+    return base64.b64encode(hmac_code).decode("utf-8")
+
+
+def _build_feishu_card(
+    *,
+    is_alert: bool,
+    window_start: datetime,
+    window_end: datetime,
+    total: int,
+    success: int,
+    pending: int,
+    running: int,
+    fail: int,
+    fail_rate: float,
+    today_executed_total: int,
+    today_pending_total: int,
+) -> Dict:
+    header_title = "票圈供给-AIGC选题解构任务小时统计"
+    header_subtitle = (
+        f"{window_start.strftime('%Y-%m-%d %H:%M')} - {window_end.strftime('%H:%M')}"
+    )
+    level_tag = "异常告警" if is_alert else "运行正常"
+    template = "red" if is_alert else "blue"
+    status_line = "⚠️ 当前失败率已超过阈值,请尽快排查" if is_alert else "✅ 当前任务运行正常"
+
+    markdown_lines = [
+        status_line,
+        f"**发起数**:{total}",
+        f"**成功数**:{success}",
+        f"**执行中数**:{pending + running}",
+        f"**失败数**:{fail}",
+        f"**失败率**:{fail_rate:.2%}",
+        f"**当日已执行总数**:{today_executed_total}",
+        f"**当日待执行总数**:{today_pending_total}",
+    ]
+    body_elements = [
+        {
+            "tag": "markdown",
+            "content": "\n".join(markdown_lines),
+            "text_align": "left",
+        }
+    ]
+    if is_alert:
+        body_elements.insert(
+            0,
+            {
+                "tag": "markdown",
+                "content": "<at id=all></at>",
+                "text_align": "left",
+            },
+        )
+
+    return {
+        "schema": "2.0",
+        "config": {
+            "enable_forward": True,
+            "update_multi": True,
+            "width_mode": "compact",
+        },
+        "header": {
+            "title": {
+                "tag": "plain_text",
+                "content": header_title,
+            },
+            "subtitle": {
+                "tag": "plain_text",
+                "content": header_subtitle,
+            },
+            "text_tag_list": [
+                {
+                    "tag": "text_tag",
+                    "text": {
+                        "tag": "plain_text",
+                        "content": level_tag,
+                    },
+                    "color": "red" if is_alert else "green",
+                }
+            ],
+            "template": template,
+            "padding": "12px 12px 12px 12px",
+        },
+        "body": {
+            "direction": "vertical",
+            "padding": "12px 12px 12px 12px",
+            "horizontal_align": "left",
+            "vertical_spacing": "8px",
+            "elements": body_elements,
+        },
+    }
+
+
+def _send_feishu_card(card: Dict) -> None:
+    timestamp = str(int(time.time()))
+    sign = _gen_feishu_sign(timestamp, _FEISHU_SIGN_SECRET)
+    payload = {
+        "timestamp": timestamp,
+        "sign": sign,
+        "msg_type": "interactive",
+        "card": card,
+    }
+    resp = requests.post(_FEISHU_WEBHOOK, json=payload, timeout=10)
+    if resp.status_code != 200:
+        raise RuntimeError(f"send_feishu_http_status_{resp.status_code}")
+    body = resp.json()
+    if body.get("code") != 0:
+        raise RuntimeError(f"send_feishu_failed code={body.get('code')} msg={body.get('msg')}")
+
+
+def run_decode_hourly_stats_job() -> None:
+    now = datetime.now(_TZ)
+    window_start, window_end = _previous_hour_window(now)
+    logger.info(
+        "解构小时统计任务开始,统计窗口=[{}, {})",
+        window_start.strftime("%Y-%m-%d %H:%M:%S"),
+        window_end.strftime("%Y-%m-%d %H:%M:%S"),
+    )
+    try:
+        stats = _fetch_hourly_stats(window_start, window_end)
+        total = stats["total"]
+        success = stats["success"]
+        pending = stats["pending"]
+        running = stats["running"]
+        fail = stats["fail"]
+        executed_in_window = success + running + fail
+        if executed_in_window <= 0:
+            logger.info(
+                "当前统计窗口无已执行任务,跳过推送 window=[{}, {}) total={} pending={}",
+                window_start.strftime("%Y-%m-%d %H:%M:%S"),
+                window_end.strftime("%Y-%m-%d %H:%M:%S"),
+                total,
+                pending,
+            )
+            return
+        fail_rate = (fail / total) if total > 0 else 0.0
+        success_rate = (success / total) if total > 0 else 0.0
+        today_stats = _fetch_today_stats(now)
+        today_executed_total = today_stats["executed_total"]
+        today_pending_total = today_stats["pending_total"]
+        has_overdue_pending_task = _has_overdue_pending_task(now)
+        is_alert = total > 0 and (
+            fail_rate > _ALERT_FAIL_RATE
+            or (success_rate < _ALERT_SUCCESS_RATE and has_overdue_pending_task)
+        )
+        card = _build_feishu_card(
+            is_alert=is_alert,
+            window_start=window_start,
+            window_end=window_end,
+            total=total,
+            success=success,
+            pending=pending,
+            running=running,
+            fail=fail,
+            fail_rate=fail_rate,
+            today_executed_total=today_executed_total,
+            today_pending_total=today_pending_total,
+        )
+        _send_feishu_card(card)
+        logger.info(
+            "解构小时统计推送完成 total={} success={} pending={} running={} fail={} fail_rate={:.2%} success_rate={:.2%} has_overdue_pending_task={} is_alert={}",
+            total,
+            success,
+            pending,
+            running,
+            fail,
+            fail_rate,
+            success_rate,
+            has_overdue_pending_task,
+            is_alert,
+        )
+    except Exception as exc:
+        logger.exception("解构小时统计任务失败: {}", exc)
+
+if __name__ == "__main__":
+    run_decode_hourly_stats_job()
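
The alert rule combines a failure-rate threshold with a low-success-plus-backlog condition; a worked example with made-up window numbers (the same arithmetic as run_decode_hourly_stats_job, not new behavior):

    _ALERT_FAIL_RATE = 0.15
    _ALERT_SUCCESS_RATE = 0.20

    # Hypothetical window: 40 launched, 5 succeeded, 7 failed, overdue pending tasks exist.
    total, success, fail, has_overdue = 40, 5, 7, True
    fail_rate = fail / total        # 0.175
    success_rate = success / total  # 0.125
    is_alert = total > 0 and (
        fail_rate > _ALERT_FAIL_RATE
        or (success_rate < _ALERT_SUCCESS_RATE and has_overdue)
    )
    assert is_alert  # 17.5% > 15% already trips the failure-rate branch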

+ 112 - 0
scheduler/odps_fetch.py

@@ -0,0 +1,112 @@
+import os
+import json
+import sys
+from pathlib import Path
+from datetime import datetime
+from typing import Any, Dict, List, Optional
+from zoneinfo import ZoneInfo
+
+from odps import ODPS
+from dotenv import find_dotenv, load_dotenv
+
+# Allow running this module directly as a script: python scheduler/odps_fetch.py
+PROJECT_ROOT = Path(__file__).resolve().parent.parent
+if str(PROJECT_ROOT) not in sys.path:
+    sys.path.insert(0, str(PROJECT_ROOT))
+
+from utils.scheduler_logger import get_scheduler_logger
+
+
+logger = get_scheduler_logger()
+load_dotenv(find_dotenv(), override=False)
+
+# Project and endpoint are fixed by requirement (not read from environment variables)
+ODPS_PROJECT = "loghubods"
+ODPS_ENDPOINT = "http://service.cn-hangzhou.maxcompute.aliyun.com/api"
+ODPS_TABLE = "dwd_topic_decode_input_vids_di"
+
+
+def _today_dt() -> str:
+    return datetime.now(ZoneInfo("Asia/Shanghai")).strftime("%Y%m%d")
+
+
+def _build_odps_client() -> ODPS:
+    ak = os.getenv("ODPS_ACCESS_KEY_ID", "LTAI5t9b7RnUiCy5v3gqXf9Y")
+    sk = os.getenv("ODPS_ACCESS_KEY_SECRET", "1HVHQe0AV7xaoOWHsM8k9XN3gEaal7")
+    if not ak or not sk:
+        raise ValueError("missing ODPS_ACCESS_KEY_ID or ODPS_ACCESS_KEY_SECRET")
+    return ODPS(ak, sk, ODPS_PROJECT, endpoint=ODPS_ENDPOINT)
+
+
+def fetch_priority_posts(limit: int = 10, offset: int = 0, dt: Optional[str] = None) -> List[Dict[str, Any]]:
+    target_dt = dt or _today_dt()
+    sql = f"""
+        SELECT
+            type_id, type, channel, vid, cate1, cate2, title, url,
+            level, reason, count, extend, dt
+        FROM {ODPS_TABLE}
+        WHERE dt = '{target_dt}'
+          AND level IN (0, 1, 2)
+        ORDER BY count DESC, level ASC
+        LIMIT {offset}, {limit}
+    """
+    logger.info("开始执行ODPS查询 dt={} limit={} offset={}", target_dt, limit, offset)
+
+    odps = _build_odps_client()
+    instance = odps.execute_sql(sql)
+    records: List[Dict[str, Any]] = []
+    with instance.open_reader() as reader:
+        for row in reader:
+            records.append(
+                {
+                    "type_id": row[0],
+                    "type": row[1],
+                    "channel": row[2],
+                    "vid": row[3],
+                    "cate1": row[4],
+                    "cate2": row[5],
+                    "title": row[6],
+                    "url": row[7],
+                    "level": row[8],
+                    "reason": row[9],
+                    "count": row[10],
+                    "extend": row[11],
+                    "dt": row[12] if len(row) > 12 else target_dt,
+                }
+            )
+
+    logger.info("ODPS查询完成 dt={} 返回数量={}", target_dt, len(records))
+    return records
+
+
+def fetch_top_priority_posts(limit: int = 10) -> List[Dict[str, Any]]:
+    return fetch_priority_posts(limit=limit, offset=0)
+
+
+def count_priority_posts(dt: Optional[str] = None) -> int:
+    target_dt = dt or _today_dt()
+    sql = f"""
+        SELECT COUNT(1) AS total_count
+        FROM {ODPS_TABLE}
+        WHERE dt = '{target_dt}'
+          AND level IN (0, 1, 2)
+    """
+    logger.info("开始执行ODPS计数 dt={}", target_dt)
+    odps = _build_odps_client()
+    instance = odps.execute_sql(sql)
+    with instance.open_reader() as reader:
+        for row in reader:
+            return int(row[0] or 0)
+    return 0
+
+
+if __name__ == "__main__":
+    try:
+        test_limit = int(os.getenv("ODPS_TEST_LIMIT", "10"))
+        data = fetch_top_priority_posts(limit=test_limit)
+        print(f"fetch_count={len(data)}")
+        preview = data
+        print(json.dumps(preview, ensure_ascii=False, indent=2))
+    except Exception as exc:
+        logger.exception("ODPS查询测试失败: {}", exc)
+        raise
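
Callers page through the daily partition the same way _pick_candidate_records does; a minimal usage sketch (the dt value here is hypothetical):

    from scheduler.odps_fetch import fetch_priority_posts

    offset, page_size = 0, 200
    while True:
        page = fetch_priority_posts(limit=page_size, offset=offset, dt="20240101")
        if not page:
            break
        for item in page:
            print(item["vid"], item["level"], item["count"])
        offset += page_size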

+ 26 - 0
utils/scheduler_logger.py

@@ -0,0 +1,26 @@
+from pathlib import Path
+
+from loguru import logger
+
+
+_SCHEDULER_LOGGER_CONFIGURED = False
+
+
+def get_scheduler_logger():
+    """Return scheduler logger with daily file sink."""
+    global _SCHEDULER_LOGGER_CONFIGURED
+    if not _SCHEDULER_LOGGER_CONFIGURED:
+        log_dir = Path("logs/scheduler")
+        log_dir.mkdir(parents=True, exist_ok=True)
+        logger.add(
+            str(log_dir / "scheduler_{time:YYYY-MM-DD}.log"),
+            level="INFO",
+            rotation="00:00",
+            retention="30 days",
+            encoding="utf-8",
+            enqueue=True,
+            backtrace=True,
+            diagnose=False,
+        )
+        _SCHEDULER_LOGGER_CONFIGURED = True
+    return logger
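
Since the sink is added to loguru's global logger rather than to a bound child logger, scheduler messages also flow to whatever other sinks the app configures; a usage sketch:

    from utils.scheduler_logger import get_scheduler_logger

    logger = get_scheduler_logger()
    # Goes to logs/scheduler/scheduler_YYYY-MM-DD.log and to any other configured loguru sinks.
    logger.info("scheduler heartbeat, window={}", "17:00-18:00")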

+ 59 - 19
utils/sync_mysql_help.py

@@ -1,4 +1,5 @@
 import os
+import time
 from loguru import logger
 
 import pymysql
@@ -34,31 +35,72 @@ class SyncMySQLHelper(object):
             database = os.getenv('DB_NAME', 'content-deconstruction-supply')
             logger.info(f"✅ 当前使用数据库 : {database}")
 
-
-            self._pool = PooledDB(
-                creator=pymysql,
-                mincached=10,
-                maxconnections=20,
-                blocking=True,
-                host=host,
-                port=port,
-                user=user,
-                password=password,
-                database=database)
+            # Prevent scheduler jobs from blocking forever on network/DB jitter:
+            # - connect_timeout: connection setup timeout (seconds)
+            # - read_timeout/write_timeout: per-read/per-write timeout (seconds)
+            # - blocking=False: raise immediately when the pool is exhausted instead of hanging the whole job
+            connect_timeout = int(os.getenv("DB_CONNECT_TIMEOUT", "30"))
+            read_timeout = int(os.getenv("DB_READ_TIMEOUT", "50"))
+            write_timeout = int(os.getenv("DB_WRITE_TIMEOUT", "50"))
+
+            # Note: a large mincached opens connections in bulk during init, so DB jitter can drag startup down.
+            # Use lazy initialization instead and open connections on demand.
+            mincached = int(os.getenv("DB_POOL_MINCACHED", "0"))
+            maxconnections = int(os.getenv("DB_POOL_MAXCONN", "20"))
+
+            last_exc: Optional[Exception] = None
+            for attempt in range(2):
+                try:
+                    self._pool = PooledDB(
+                        creator=pymysql,
+                        mincached=mincached,
+                        maxconnections=maxconnections,
+                        blocking=False,
+                        maxusage=1000,
+                        ping=1,
+                        host=host,
+                        port=port,
+                        user=user,
+                        password=password,
+                        database=database,
+                        connect_timeout=connect_timeout,
+                        read_timeout=read_timeout,
+                        write_timeout=write_timeout,
+                        charset="utf8mb4",
+                    )
+                    last_exc = None
+                    break
+                except Exception as exc:
+                    # Keep _pool as None so the next scheduler cycle can retry building the pool
+                    self._pool = None
+                    last_exc = exc
+                    logger.exception(f"❌ 初始化数据库连接池失败(第{attempt + 1}次): {exc}")
+                    time.sleep(0.5)
+
+            if last_exc is not None:
+                raise last_exc
 
         return self._pool
 
-    def fetchone(self, sql: str, data: Optional[Tuple[Any, ...]] = None) -> Dict[str, Any]:
+    def _conn(self):
         pool = self.get_pool()
-        with pool.connection() as conn:  
+        try:
+            # With blocking=False, DBUtils raises immediately when no connection is available.
+            # Catch and log here so the outer job fails loudly instead of hanging silently.
+            return pool.connection()
+        except Exception as exc:
+            logger.exception(f"❌ 获取数据库连接失败: {exc}")
+            raise
+
+    def fetchone(self, sql: str, data: Optional[Tuple[Any, ...]] = None) -> Dict[str, Any]:
+        with self._conn() as conn:
             with conn.cursor(DictCursor) as cursor: 
                 cursor.execute(sql, data)
                 result = cursor.fetchone()
                 return result
 
     def fetchall(self, sql: str, data: Optional[Tuple[Any, ...]] = None) -> Tuple[Dict[str, Any]]:
-        pool = self.get_pool()
-        with pool.connection() as conn: 
+        with self._conn() as conn:
             with conn.cursor(DictCursor) as cursor: 
                 cursor.execute(sql, data)
                 result = cursor.fetchall()
@@ -68,16 +110,14 @@ class SyncMySQLHelper(object):
                   sql: str,
                   data: Optional[Tuple[Any, ...]] = None,
                   size: Optional[int] = None) -> Tuple[Dict[str, Any]]:
-        pool = self.get_pool()
-        with pool.connection() as conn:  
+        with self._conn() as conn:
             with conn.cursor(DictCursor) as cursor: 
                 cursor.execute(sql, data)
                 result = cursor.fetchmany(size=size)
                 return result
 
     def execute(self, sql: str, data: Optional[Tuple[Any, ...]] = None):
-        pool = self.get_pool()
-        with pool.connection() as conn:  
+        with self._conn() as conn:
             with conn.cursor(DictCursor) as cursor:  
                 try:
                     cursor.execute(sql, data)
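
With the new timeouts and blocking=False, a slow or unreachable database now surfaces as an exception inside the calling job instead of an indefinite hang. Typical parameterized usage (table name from the jobs above; the dt value is hypothetical):

    from utils.sync_mysql_help import mysql

    # Placeholders are passed separately and never interpolated into the SQL string.
    row = mysql.fetchone(
        "SELECT COUNT(1) AS total FROM aigc_topic_decode_task_result "
        "WHERE dt = %s AND status IN (0, 1)",
        ("20240101",),
    )
    print(int((row or {}).get("total", 0)))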