Browse source code

feat: add scheduled job, AIGC decode

jihuaqiang 2 weeks ago
parent
commit
6f03c91c9a
8 changed files with 618 additions and 2 deletions
  1. .env (+4 -1)
  2. docker-compose.yaml (+2 -0)
  3. main.py (+19 -1)
  4. requirements.txt (+2 -0)
  5. scheduler/bootstrap.py (+44 -0)
  6. scheduler/decode_dispatch_job.py (+427 -0)
  7. scheduler/odps_fetch.py (+94 -0)
  8. utils/scheduler_logger.py (+26 -0)

+ 4 - 1
.env

@@ -8,4 +8,7 @@ LOG_LEVEL=INFO
 
 
 # local = local environment, prod = production
-APP_ENV=prod  
+APP_ENV=prod 
+
+ODPS_ACCESS_KEY_ID=LTAI5t9b7RnUiCy5v3gqXf9Y
+ODPS_ACCESS_KEY_SECRET=1HVHQe0AV7xaoOWHsM8k9XN3gEaal7

+ 2 - 0
docker-compose.yaml

@@ -10,6 +10,8 @@ services:
     restart: always
     ports:
       - "8000:8000"
+    volumes:
+      - ./logs:/video_decode/logs
     env_file:
       - .env
     environment:

+ 19 - 1
main.py

@@ -10,6 +10,7 @@ from tasks.decode import begin_decode_task
 from tasks.detail import get_decode_detail_by_task_id
 from tasks.pattern import begin_pattern_task
 from tasks.topic_search import search_topics
+from scheduler.bootstrap import run_dispatch_once, start_scheduler, stop_scheduler
 
 from loguru import logger
 import sys
@@ -49,6 +50,16 @@ app.add_middleware(
 )
 
 
+@app.on_event("startup")
+async def on_startup() -> None:
+    start_scheduler()
+
+
+@app.on_event("shutdown")
+async def on_shutdown() -> None:
+    stop_scheduler()
+
+
 @app.middleware("http")
 async def api_access_log_middleware(request: Request, call_next):
     """记录每个接口的请求(路径、查询、body)与响应(状态码、body)。"""
@@ -179,4 +190,11 @@ def pattern_content(param: PatternContentParam):
 def search_content_topics(param: TopicSearchParam):
     """视频选题检索:根据关键词在解构结果中匹配,返回匹配度最高的 top5"""
     results = search_topics(param)
-    return _build_api_response(code=0, data=results)
+    return _build_api_response(code=0, data=results)
+
+
+@app.post("/api/v1/content/tasks/scheduler/decode/run-once")
+def run_decode_scheduler_once():
+    """手动触发一次17点解构调度任务,便于验证。"""
+    run_dispatch_once()
+    return _build_api_response(code=0, data={"triggered": True})

+ 2 - 0
requirements.txt

@@ -19,6 +19,8 @@ cryptography>=41.0.0
 
 # HTTP 客户端
 requests>=2.31.0
+apscheduler>=3.10.4
+pyodps>=0.12.3
 
 # Excel 读写(mysql_work 等脚本)
 pandas>=2.0.0

+ 44 - 0
scheduler/bootstrap.py

@@ -0,0 +1,44 @@
+from typing import Optional
+from datetime import datetime
+from zoneinfo import ZoneInfo
+
+from apscheduler.schedulers.background import BackgroundScheduler
+from apscheduler.triggers.cron import CronTrigger
+
+from scheduler.decode_dispatch_job import run_decode_dispatch_job
+from utils.scheduler_logger import get_scheduler_logger
+
+
+logger = get_scheduler_logger()
+_scheduler: Optional[BackgroundScheduler] = None
+
+
+def start_scheduler() -> None:
+    global _scheduler
+    if _scheduler and _scheduler.running:
+        logger.info("调度器已在运行,跳过重复启动")
+        return
+
+    _scheduler = BackgroundScheduler(timezone=ZoneInfo("Asia/Shanghai"))
+    _scheduler.add_job(
+        run_decode_dispatch_job,
+        trigger=CronTrigger(minute="*/5"),
+        id="decode_dispatch_every_5min",
+        replace_existing=True,
+        next_run_time=datetime.now(ZoneInfo("Asia/Shanghai")),
+    )
+    _scheduler.start()
+    logger.info("调度器已启动:立即触发一次,之后每5分钟执行一次")
+
+
+def stop_scheduler() -> None:
+    global _scheduler
+    if _scheduler and _scheduler.running:
+        _scheduler.shutdown(wait=False)
+        logger.info("调度器已停止")
+    _scheduler = None
+
+
+def run_dispatch_once() -> None:
+    logger.info("手动触发执行一次调度任务")
+    run_decode_dispatch_job()
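
One thing worth knowing about this wiring: a dispatch cycle that polls many HTTP chunks can outlast the 5-minute interval, and APScheduler's default `max_instances=1` then skips the overlapping tick with a warning. Making that overlap policy explicit keeps the behavior intentional; a sketch using standard APScheduler job options (the values are suggestions, not part of this commit):

```python
_scheduler.add_job(
    run_decode_dispatch_job,
    trigger=CronTrigger(minute="*/5"),
    id="decode_dispatch_every_5min",
    replace_existing=True,
    next_run_time=datetime.now(ZoneInfo("Asia/Shanghai")),
    coalesce=True,          # collapse a backlog of missed runs into one
    max_instances=1,        # never start a second concurrent cycle
    misfire_grace_time=60,  # still fire if the tick is up to 60s late
)
```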

+ 427 - 0
scheduler/decode_dispatch_job.py

@@ -0,0 +1,427 @@
+import json
+from datetime import datetime
+from typing import Any, Dict, List, Optional, Tuple
+from zoneinfo import ZoneInfo
+
+import requests
+
+from scheduler.odps_fetch import fetch_priority_posts
+from utils.scheduler_logger import get_scheduler_logger
+from utils.sync_mysql_help import mysql
+
+
+logger = get_scheduler_logger()
+
+CONFIG_ID = "57"
+DECODE_URL = "https://aigc-api.aiddit.com/aigc/api/task/decode"
+DECODE_RESULT_URL = "https://aigc-api.aiddit.com/aigc/api/task/decode/result"
+BATCH_SIZE = 10
+ODPS_PAGE_SIZE = 200
+RESULT_POLL_CHUNK = 50
+
+
+def _map_api_status_to_int(api_status: str, vid: str) -> int:
+    """Map upstream status string to DB status: 0待执行 1执行中 2成功 3失败."""
+    s = (api_status or "").strip().upper()
+    if s == "SUCCESS":
+        return 2
+    if s in ("FAILED", "FAILURE", "ERROR", "FAIL"):
+        return 3
+    if s in ("RUNNING", "PROCESSING", "DOING"):
+        return 1
+    if s in ("PENDING", "WAITING", "INIT", "QUEUED"):
+        return 0
+    if not s:
+        return 0
+    logger.warning("未知解码状态,按执行中处理 status={} vid={}", api_status, vid)
+    return 1
+
+
+def _safe_json_loads(text: Optional[str]) -> Dict[str, Any]:
+    if not text:
+        return {}
+    try:
+        data = json.loads(text)
+        return data if isinstance(data, dict) else {}
+    except Exception:
+        return {}
+
+
+def _today_dt() -> str:
+    return datetime.now(ZoneInfo("Asia/Shanghai")).strftime("%Y%m%d")
+
+
+def _fetch_today_pending_vids(dt: str) -> List[str]:
+    sql = """
+        SELECT DISTINCT vid
+        FROM aigc_topic_decode_task_result
+        WHERE dt = %s AND status IN (0, 1) AND vid IS NOT NULL AND vid != ''
+        ORDER BY vid
+    """
+    rows = mysql.fetchall(sql, (dt,))
+    return [str(row["vid"]) for row in rows if row.get("vid")]
+
+
+def _count_today_non_terminal(dt: str) -> int:
+    sql = """
+        SELECT COUNT(1) AS total
+        FROM aigc_topic_decode_task_result
+        WHERE dt = %s AND status IN (0, 1)
+    """
+    result = mysql.fetchone(sql, (dt,))
+    return int((result or {}).get("total", 0))
+
+
+def _submit_decode_result_chunk(
+    channel_content_ids: List[str],
+) -> Tuple[bool, str, Dict[str, Any]]:
+    payload = {"params": {"configId": CONFIG_ID, "channelContentIds": channel_content_ids}}
+    try:
+        resp = requests.post(DECODE_RESULT_URL, json=payload, timeout=60)
+        if resp.status_code != 200:
+            return False, f"http_status_{resp.status_code}", {}
+        body = resp.json()
+        ok = body.get("code") == 0
+        return ok, body.get("msg") or "", body
+    except Exception as exc:
+        return False, str(exc), {}
+
+
+def _apply_result_row_to_db(dt: str, item: Dict[str, Any]) -> None:
+    vid = str(item.get("channelContentId") or "").strip()
+    if not vid:
+        return
+    api_status_raw = item.get("status") or ""
+    err_msg = (item.get("err_msg") or item.get("errorMessage") or "") or ""
+    data_content = item.get("dataContent")
+    if data_content is not None and not isinstance(data_content, str):
+        data_content = json.dumps(data_content, ensure_ascii=False)
+    html = item.get("html")
+
+    base_status = _map_api_status_to_int(str(api_status_raw), vid)
+
+    sql = """
+        UPDATE aigc_topic_decode_task_result
+        SET status = %s,
+            err_msg = %s,
+            data_content = %s,
+            html = %s
+        WHERE dt = %s AND vid = %s
+    """
+    mysql.execute(
+        sql,
+        (
+            base_status,
+            err_msg[:512] if err_msg else "",
+            data_content if data_content is not None else "",
+            html if html is not None else None,
+            dt,
+            vid,
+        ),
+    )
+
+
+def _poll_decode_results_for_today(dt: str, vids: List[str]) -> None:
+    if not vids:
+        return
+    total = len(vids)
+    logger.info("开始查询解码结果 dt={} 总vid数={}", dt, total)
+    overall_success = 0
+    overall_returned = 0
+    for i in range(0, total, RESULT_POLL_CHUNK):
+        chunk = vids[i : i + RESULT_POLL_CHUNK]
+        logger.info(
+            "查询解码结果 dt={} 分片序号={} 分片大小={} 总数={}",
+            dt,
+            i // RESULT_POLL_CHUNK,
+            len(chunk),
+            total,
+        )
+        ok, msg, body = _submit_decode_result_chunk(chunk)
+        if not ok:
+            logger.error(
+                "查询解码结果接口失败 dt={} msg={} body={}",
+                dt,
+                msg,
+                body,
+            )
+            continue
+        data_list = body.get("data")
+        if not isinstance(data_list, list):
+            logger.warning("查询解码结果返回中缺少data列表 body={}", body)
+            continue
+        chunk_success = 0
+        returned_ids = {str(x.get("channelContentId") or "") for x in data_list}
+        missing = set(chunk) - returned_ids
+        if missing:
+            logger.warning(
+                "查询解码结果返回缺少{}个vid,示例={}",
+                len(missing),
+                list(missing)[:5],
+            )
+        for item in data_list:
+            if not isinstance(item, dict):
+                continue
+            vid = str(item.get("channelContentId") or "").strip()
+            api_status = str(item.get("status") or "")
+            mapped_status = _map_api_status_to_int(api_status, vid)
+            if mapped_status == 2:
+                chunk_success += 1
+            err_msg = (item.get("err_msg") or item.get("errorMessage") or "") or ""
+            logger.info(
+                "解码结果明细 dt={} vid={} 接口状态={} 映射状态={} 错误信息={}",
+                dt,
+                vid,
+                api_status,
+                mapped_status,
+                err_msg[:512] if err_msg else "",
+            )
+            _apply_result_row_to_db(dt, item)
+        overall_success += chunk_success
+        overall_returned += len(data_list)
+        logger.info(
+            "解码结果分片处理完成 dt={} 查询数={} 返回数={} 成功数={}",
+            dt,
+            len(chunk),
+            len(data_list),
+            chunk_success,
+        )
+    logger.info(
+        "解码结果查询完成 dt={} 查询总数={} 返回总数={} 成功总数={}",
+        dt,
+        total,
+        overall_returned,
+        overall_success,
+    )
+
+
+def _build_posts_payload(records: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
+    posts: List[Dict[str, Any]] = []
+    for item in records:
+        posts.append(
+            {
+                "channelContentId": item.get("vid") or "",
+                "title": item.get("title") or "",
+                "video": item.get("url") or "",
+                "contentModal": 4,
+                "channel": 10,
+            }
+        )
+    return posts
+
+
+def _submit_decode(posts: List[Dict[str, Any]]) -> Tuple[bool, str, Dict[str, Any]]:
+    payload = {"params": {"configId": CONFIG_ID, "posts": posts}}
+    try:
+        resp = requests.post(DECODE_URL, json=payload, timeout=60)
+        if resp.status_code != 200:
+            return False, f"http_status_{resp.status_code}", {}
+        body = resp.json()
+        ok = body.get("code") == 0
+        return ok, body.get("msg") or "", body
+    except Exception as exc:
+        return False, str(exc), {}
+
+
+def _load_existing_vids(dt: str) -> set[str]:
+    sql = """
+        SELECT DISTINCT vid
+        FROM aigc_topic_decode_task_result
+        WHERE dt = %s AND vid IS NOT NULL AND vid != ''
+    """
+    rows = mysql.fetchall(sql, (dt,))
+    return {str(row["vid"]) for row in rows if row.get("vid")}
+
+
+def _pick_candidate_records(dt: str, batch_size: int = BATCH_SIZE) -> List[Dict[str, Any]]:
+    existing_vids = _load_existing_vids(dt)
+    selected: List[Dict[str, Any]] = []
+    selected_vids: set[str] = set()
+    offset = 0
+
+    while len(selected) < batch_size:
+        page = fetch_priority_posts(limit=ODPS_PAGE_SIZE, offset=offset, dt=dt)
+        if not page:
+            break
+        for item in page:
+            vid = str(item.get("vid") or "")
+            if not vid or vid in existing_vids or vid in selected_vids:
+                continue
+            selected.append(item)
+            selected_vids.add(vid)
+            if len(selected) >= batch_size:
+                break
+        offset += ODPS_PAGE_SIZE
+
+    logger.info(
+        "候选数据筛选完成 dt={} 已选数量={} 扫描offset={}",
+        dt,
+        len(selected),
+        offset,
+    )
+    if selected:
+        vid_title_pairs = [
+            {"vid": str(item.get("vid") or ""), "title": item.get("title") or ""}
+            for item in selected
+        ]
+        logger.info("已选候选数据 dt={} items={}", dt, vid_title_pairs)
+    return selected
+
+
+def _row_status_after_decode_submit(
+    vid: str, row_in_resp: Optional[Dict[str, Any]], full_body: Dict[str, Any]
+) -> Tuple[int, str, str, Optional[str]]:
+    """Returns (status, err_msg, data_content, html) for INSERT."""
+    if not row_in_resp:
+        payload = json.dumps({"decode_submit_response": full_body}, ensure_ascii=False)
+        return 1, "", payload, None
+
+    api_status_raw = row_in_resp.get("status") or ""
+    err_msg = (row_in_resp.get("err_msg") or row_in_resp.get("errorMessage") or "") or ""
+    mapped = _map_api_status_to_int(str(api_status_raw), vid)
+    payload = json.dumps(
+        {"decode_submit_item": row_in_resp, "decode_submit_response": full_body},
+        ensure_ascii=False,
+    )
+
+    if mapped == 3:
+        return 3, err_msg[:512], payload, None
+
+    if mapped == 2:
+        # New submit API only returns status/errorMessage.
+        # Keep SUCCESS as terminal success; detailed result is queried via decode/result.
+        return 2, "", payload, None
+
+    if mapped == 0:
+        return 0, err_msg[:512], payload, None
+
+    return 1, err_msg[:512], payload, None
+
+
+def _insert_task_result_row(
+    source: Dict[str, Any],
+    status: int,
+    err_msg: str,
+    data_content: str,
+    html: Optional[str],
+) -> None:
+    extend = _safe_json_loads(source.get("extend"))
+    cover_url = extend.get("cover_url", "")
+    images_text = cover_url if isinstance(cover_url, str) else ""
+    sql = """
+        INSERT INTO aigc_topic_decode_task_result
+        (task_id, status, err_msg, vid, title, cover, video_url, images, type, channel, cate1, cate2, dt, data_content, html)
+        VALUES
+        (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
+    """
+    params = (
+        None,
+        status,
+        err_msg or "",
+        str(source.get("vid") or ""),
+        source.get("title") or "",
+        images_text,
+        source.get("url") or "",
+        images_text,
+        source.get("type") or "",
+        source.get("channel") or "",
+        source.get("cate1") or "",
+        source.get("cate2") or "",
+        source.get("dt") or _today_dt(),
+        data_content,
+        html,
+    )
+    mysql.execute(sql, params)
+
+
+def _insert_rows_after_decode_submit(records: List[Dict[str, Any]], body: Dict[str, Any]) -> None:
+    data_list = body.get("data") if isinstance(body.get("data"), list) else []
+    by_vid = {str(x.get("channelContentId") or ""): x for x in data_list if isinstance(x, dict)}
+
+    for item in records:
+        vid = str(item.get("vid") or "")
+        row = by_vid.get(vid)
+        status, err_msg, data_content, html = _row_status_after_decode_submit(vid, row, body)
+        _insert_task_result_row(item, status, err_msg, data_content, html)
+
+
+def run_decode_dispatch_job() -> None:
+    logger.info("解码调度任务开始执行")
+    dt = _today_dt()
+
+    # Startup guard: if there are in-flight tasks today, poll only in this run.
+    # New batch submit will wait for next scheduler cycle after all are terminal.
+    initial_non_terminal = _count_today_non_terminal(dt)
+    if initial_non_terminal > 0:
+        logger.info(
+            "启动时发现当天存在进行中任务,本轮仅查询不发起新批次 dt={} count={}",
+            dt,
+            initial_non_terminal,
+        )
+        pending_vids = _fetch_today_pending_vids(dt)
+        if pending_vids:
+            logger.info("查询当天待执行/执行中记录 dt={} count={}", dt, len(pending_vids))
+            _poll_decode_results_for_today(dt, pending_vids)
+        else:
+            logger.warning(
+                "存在非终态记录但未获取到可查询vid dt={} count={}",
+                dt,
+                initial_non_terminal,
+            )
+
+        remaining_non_terminal = _count_today_non_terminal(dt)
+        if remaining_non_terminal > 0:
+            logger.info(
+                "查询后仍有待执行/执行中任务,跳过新批次发起 dt={} count={}",
+                dt,
+                remaining_non_terminal,
+            )
+        else:
+            logger.info(
+                "查询后当天进行中任务已清空,将在下一轮发起新批次 dt={}",
+                dt,
+            )
+        logger.info("解码调度任务结束(启动保护:仅查询)")
+        return
+
+    records = _pick_candidate_records(dt=dt, batch_size=BATCH_SIZE)
+    if not records:
+        logger.info("无可发起的新批次候选数据 dt={}", dt)
+        logger.info("解码调度任务结束(无新增任务)")
+        return
+
+    posts = _build_posts_payload(records)
+    ok, err_msg, body = _submit_decode(posts)
+    logger.info(
+        "解码提交接口执行完成 success={} records={} msg={} body={}",
+        ok,
+        len(records),
+        err_msg,
+        body,
+    )
+
+    if not ok:
+        fail_body = json.dumps({"decode_submit_response": body}, ensure_ascii=False)
+        for item in records:
+            _insert_task_result_row(
+                item,
+                status=3,
+                err_msg=err_msg or "decode submit failed",
+                data_content=fail_body,
+                html=None,
+            )
+    else:
+        if isinstance(body.get("data"), list) and body["data"]:
+            _insert_rows_after_decode_submit(records, body)
+        else:
+            payload = json.dumps({"decode_submit_response": body}, ensure_ascii=False)
+            for item in records:
+                _insert_task_result_row(
+                    item,
+                    status=1,
+                    err_msg="",
+                    data_content=payload,
+                    html=None,
+                )
+
+    logger.info("解码调度任务结束,本轮新发起数量={}", len(records))

+ 94 - 0
scheduler/odps_fetch.py

@@ -0,0 +1,94 @@
+import os
+import json
+import sys
+from pathlib import Path
+from datetime import datetime
+from typing import Any, Dict, List, Optional
+from zoneinfo import ZoneInfo
+
+from odps import ODPS
+from dotenv import find_dotenv, load_dotenv
+
+# Allow running directly as a script: python scheduler/odps_fetch.py
+PROJECT_ROOT = Path(__file__).resolve().parent.parent
+if str(PROJECT_ROOT) not in sys.path:
+    sys.path.insert(0, str(PROJECT_ROOT))
+
+from utils.scheduler_logger import get_scheduler_logger
+
+
+logger = get_scheduler_logger()
+load_dotenv(find_dotenv(), override=False)
+
+# Fixed project and endpoint (by design, not read from environment variables)
+ODPS_PROJECT = "loghubods"
+ODPS_ENDPOINT = "http://service.cn-hangzhou.maxcompute.aliyun.com/api"
+ODPS_TABLE = "dwd_topic_decode_input_vids_di"
+
+
+def _today_dt() -> str:
+    return datetime.now(ZoneInfo("Asia/Shanghai")).strftime("%Y%m%d")
+
+
+def _build_odps_client() -> ODPS:
+    ak = os.getenv("ODPS_ACCESS_KEY_ID", "")
+    sk = os.getenv("ODPS_ACCESS_KEY_SECRET", "")
+    if not ak or not sk:
+        raise ValueError("missing ODPS_ACCESS_KEY_ID or ODPS_ACCESS_KEY_SECRET")
+    return ODPS(ak, sk, ODPS_PROJECT, endpoint=ODPS_ENDPOINT)
+
+
+def fetch_priority_posts(limit: int = 10, offset: int = 0, dt: Optional[str] = None) -> List[Dict[str, Any]]:
+    target_dt = dt or _today_dt()
+    sql = f"""
+        SELECT
+            type_id, type, channel, vid, cate1, cate2, title, url,
+            level, reason, count, extend, dt
+        FROM {ODPS_TABLE}
+        WHERE dt = '{target_dt}'
+        ORDER BY level ASC
+        LIMIT {offset}, {limit}
+    """
+    logger.info("开始执行ODPS查询 dt={} limit={} offset={}", target_dt, limit, offset)
+
+    odps = _build_odps_client()
+    instance = odps.execute_sql(sql)
+    records: List[Dict[str, Any]] = []
+    with instance.open_reader() as reader:
+        for row in reader:
+            records.append(
+                {
+                    "type_id": row[0],
+                    "type": row[1],
+                    "channel": row[2],
+                    "vid": row[3],
+                    "cate1": row[4],
+                    "cate2": row[5],
+                    "title": row[6],
+                    "url": row[7],
+                    "level": row[8],
+                    "reason": row[9],
+                    "count": row[10],
+                    "extend": row[11],
+                    "dt": row[12] if len(row) > 12 else target_dt,
+                }
+            )
+
+    logger.info("ODPS查询完成 dt={} 返回数量={}", target_dt, len(records))
+    return records
+
+
+def fetch_top_priority_posts(limit: int = 10) -> List[Dict[str, Any]]:
+    return fetch_priority_posts(limit=limit, offset=0)
+
+
+if __name__ == "__main__":
+    try:
+        test_limit = int(os.getenv("ODPS_TEST_LIMIT", "10"))
+        data = fetch_top_priority_posts(limit=test_limit)
+        print(f"fetch_count={len(data)}")
+        preview = data
+        print(json.dumps(preview, ensure_ascii=False, indent=2))
+    except Exception as exc:
+        logger.exception("ODPS查询测试失败: {}", exc)
+        raise
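
Positional indexing (`row[0]` through `row[12]`) silently misassigns fields if the SELECT column order ever changes. pyodps records also support access by column name, so a defensive variant of the read loop could look like this (a sketch, assuming the same column list as the query above):

```python
COLUMNS = (
    "type_id", "type", "channel", "vid", "cate1", "cate2",
    "title", "url", "level", "reason", "count", "extend", "dt",
)

with instance.open_reader() as reader:
    for row in reader:
        # pyodps Record supports row["col"]; a renamed or dropped column
        # fails loudly here instead of shifting every later field by one.
        records.append({col: row[col] for col in COLUMNS})
```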

+ 26 - 0
utils/scheduler_logger.py

@@ -0,0 +1,26 @@
+from pathlib import Path
+
+from loguru import logger
+
+
+_SCHEDULER_LOGGER_CONFIGURED = False
+
+
+def get_scheduler_logger():
+    """Return scheduler logger with daily file sink."""
+    global _SCHEDULER_LOGGER_CONFIGURED
+    if not _SCHEDULER_LOGGER_CONFIGURED:
+        log_dir = Path("logs/scheduler")
+        log_dir.mkdir(parents=True, exist_ok=True)
+        logger.add(
+            str(log_dir / "scheduler_{time:YYYY-MM-DD}.log"),
+            level="INFO",
+            rotation="00:00",
+            retention="30 days",
+            encoding="utf-8",
+            enqueue=True,
+            backtrace=True,
+            diagnose=False,
+        )
+        _SCHEDULER_LOGGER_CONFIGURED = True
+    return logger
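
Worth noting: loguru exposes a single process-global `logger`, so this sink also receives records from every other module that logs through loguru; the module-level flag only prevents the sink from being added twice. If scheduler output should stay isolated, the usual loguru pattern is a bound logger plus a sink filter. A sketch (the `component` tag is an assumption, not in this commit):

```python
from pathlib import Path
from loguru import logger

log_dir = Path("logs/scheduler")
log_dir.mkdir(parents=True, exist_ok=True)
logger.add(
    str(log_dir / "scheduler_{time:YYYY-MM-DD}.log"),
    level="INFO",
    rotation="00:00",
    retention="30 days",
    encoding="utf-8",
    enqueue=True,
    # Only records emitted via the bound logger below land in this file.
    filter=lambda record: record["extra"].get("component") == "scheduler",
)
scheduler_logger = logger.bind(component="scheduler")
```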