Просмотр исходного кода

feat: 增加aigc平台解构调度任务

jihuaqiang 1 неделя назад
Родитель
Commit
c706089c18
3 измененных файлов с 174 добавлено и 90 удалено
  1. 113 69
      scheduler/decode_dispatch_job.py
  2. 2 2
      scheduler/odps_fetch.py
  3. 59 19
      utils/sync_mysql_help.py

+ 113 - 69
scheduler/decode_dispatch_job.py

@@ -51,6 +51,29 @@ def _today_dt() -> str:
     return datetime.now(ZoneInfo("Asia/Shanghai")).strftime("%Y%m%d")
 
 
+def _is_decode_submit_open() -> bool:
+    """
+    Gate for submitting NEW decode tasks, read from the latest row of
+    aigc_topic_decode_task_oprate; polling of existing tasks is unaffected.
+    """
+    sql = """
+        SELECT is_open
+        FROM aigc_topic_decode_task_oprate
+        ORDER BY id DESC
+        LIMIT 1
+    """
+    try:
+        row = mysql.fetchone(sql)
+        if not row:
+            # Fail-open when the table is empty so the absence of a switch row
+            # never blocks submission by default.
+            return True
+        return int(row.get("is_open") or 0) == 1  # submit only when is_open == 1
+    except Exception as exc:
+        # Fail-closed on query errors: skip submitting this cycle when the
+        # switch state cannot be confirmed.
+        logger.exception("查询解构开关失败,本轮不发起新解构任务: {}", exc)
+        return False
+
+
 def _fetch_today_pending_vids(dt: str) -> List[str]:
     sql = """
         SELECT DISTINCT vid
@@ -198,11 +221,20 @@ def _poll_decode_results_for_today(dt: str, vids: List[str]) -> None:
 def _build_posts_payload(records: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
     posts: List[Dict[str, Any]] = []
     for item in records:
+        extend_raw = item.get("extend")
+        extend_obj: Dict[str, Any]
+        if isinstance(extend_raw, dict):
+            extend_obj = extend_raw
+        else:
+            extend_obj = _safe_json_loads(str(extend_raw)) if extend_raw is not None else {}
+        cover_url = extend_obj.get("cover_url") or ""
+        images = [cover_url] if cover_url else []
         posts.append(
             {
                 "channelContentId": item.get("vid") or "",
                 "title": item.get("title") or "",
                 "video": item.get("url") or "",
+                "images": images,
                 "contentModal": 4,
                 "channel": 10,
             }
@@ -307,7 +339,8 @@ def _insert_task_result_row(
 ) -> None:
     extend = _safe_json_loads(source.get("extend"))
     cover_url = extend.get("cover_url", "")
-    images_text = cover_url if isinstance(cover_url, str) else ""
+    cover_text = cover_url if isinstance(cover_url, str) else ""
+    images_text = source.get("url") or ""
     sql = """
         INSERT INTO aigc_topic_decode_task_result
         (task_id, status, err_msg, vid, title, cover, video_url, images, type, channel, cate1, cate2, dt, data_content, html)
@@ -320,7 +353,7 @@ def _insert_task_result_row(
         err_msg or "",
         str(source.get("vid") or ""),
         source.get("title") or "",
-        images_text,
+        cover_text,
         source.get("url") or "",
         images_text,
         source.get("type") or "",
@@ -347,81 +380,92 @@ def _insert_rows_after_decode_submit(records: List[Dict[str, Any]], body: Dict[s
 
 def run_decode_dispatch_job() -> None:
     logger.info("解码调度任务开始执行")
-    dt = _today_dt()
-
-    # Startup guard: if there are in-flight tasks today, poll only in this run.
-    # New batch submit will wait for next scheduler cycle after all are terminal.
-    initial_non_terminal = _count_today_non_terminal(dt)
-    if initial_non_terminal > 0:
-        logger.info(
-            "启动时发现当天存在进行中任务,本轮仅查询不发起新批次 dt={} count={}",
-            dt,
-            initial_non_terminal,
-        )
-        pending_vids = _fetch_today_pending_vids(dt)
-        if pending_vids:
-            logger.info("查询当天待执行/执行中记录 dt={} count={}", dt, len(pending_vids))
-            _poll_decode_results_for_today(dt, pending_vids)
-        else:
-            logger.warning(
-                "存在非终态记录但未获取到可查询vid dt={} count={}",
-                dt,
-                initial_non_terminal,
-            )
+    try:
+        dt = _today_dt()
 
-        remaining_non_terminal = _count_today_non_terminal(dt)
-        if remaining_non_terminal > 0:
-            logger.info(
-                "查询后仍有待执行/执行中任务,跳过新批次发起 dt={} count={}",
-                dt,
-                remaining_non_terminal,
-            )
-        else:
+        # Startup guard: if there are in-flight tasks today, poll them first in this run.
+        # Once polling shows none remain non-terminal, a new batch is submitted in this same run.
+        initial_non_terminal = _count_today_non_terminal(dt)
+        if initial_non_terminal > 0:
             logger.info(
-                "查询后当天进行中任务已清空,将在下一轮发起新批次 dt={}",
+                "启动时发现当天存在进行中任务,本轮仅查询不发起新批次 dt={} count={}",
                 dt,
+                initial_non_terminal,
             )
-        logger.info("解码调度任务结束(启动保护:仅查询)")
-        return
-
-    records = _pick_candidate_records(dt=dt, batch_size=BATCH_SIZE)
-    if not records:
-        logger.info("无可发起的新批次候选数据 dt={}", dt)
-        logger.info("解码调度任务结束(无新增任务)")
-        return
+            pending_vids = _fetch_today_pending_vids(dt)
+            if pending_vids:
+                logger.info("查询当天待执行/执行中记录 dt={} count={}", dt, len(pending_vids))
+                _poll_decode_results_for_today(dt, pending_vids)
+            else:
+                logger.warning(
+                    "存在非终态记录但未获取到可查询vid dt={} count={}",
+                    dt,
+                    initial_non_terminal,
+                )
 
-    posts = _build_posts_payload(records)
-    ok, err_msg, body = _submit_decode(posts)
-    logger.info(
-        "解码提交接口执行完成 success={} records={} msg={} body={}",
-        ok,
-        len(records),
-        err_msg,
-        body,
-    )
+            remaining_non_terminal = _count_today_non_terminal(dt)
+            if remaining_non_terminal > 0:
+                logger.info(
+                    "查询后仍有待执行/执行中任务,跳过新批次发起 dt={} count={}",
+                    dt,
+                    remaining_non_terminal,
+                )
+                logger.info("解码调度任务结束(启动保护:仅查询)")
+                return
+            else:
+                logger.info(
+                    "查询后当天进行中任务已清空,立即发起新批次 dt={}",
+                    dt,
+                )
+                # fallthrough: today's tasks are all terminal now, submit a new batch in this run
+
+        if not _is_decode_submit_open():
+            logger.info("解构开关关闭(is_open!=1),跳过本轮新批次发起 dt={}", dt)
+            logger.info("解码调度任务结束(开关关闭:不发起新任务)")
+            return
+
+        records = _pick_candidate_records(dt=dt, batch_size=BATCH_SIZE)
+        if not records:
+            logger.info("无可发起的新批次候选数据 dt={}", dt)
+            logger.info("解码调度任务结束(无新增任务)")
+            return
+        logger.info("解码提交接口执行开始 records={}", records)
+        posts = _build_posts_payload(records)
+        logger.info("解码提交接口执行开始 posts={}", posts)  # NOTE(review): same "执行开始" prefix as the previous log — consider distinct wording
+        ok, err_msg, body = _submit_decode(posts)
+        logger.info(
+            "解码提交接口执行完成 success={} records={} msg={} body={}",
+            ok,
+            len(records),
+            err_msg,
+            body,
+        )
 
-    if not ok:
-        fail_body = json.dumps({"decode_submit_response": body}, ensure_ascii=False)
-        for item in records:
-            _insert_task_result_row(
-                item,
-                status=3,
-                err_msg=err_msg or "解码提交失败",
-                data_content=fail_body,
-                html=None,
-            )
-    else:
-        if isinstance(body.get("data"), list) and body["data"]:
-            _insert_rows_after_decode_submit(records, body)
-        else:
-            payload = json.dumps({"decode_submit_response": body}, ensure_ascii=False)
+        if not ok:
+            fail_body = json.dumps({"decode_submit_response": body}, ensure_ascii=False)
             for item in records:
                 _insert_task_result_row(
                     item,
-                    status=1,
-                    err_msg="",
-                    data_content=payload,
+                    status=3,
+                    err_msg=err_msg or "解码提交失败",
+                    data_content=fail_body,
                     html=None,
                 )
-
-    logger.info("解码调度任务结束,本轮新发起数量={}", len(records))
+        else:
+            if isinstance(body.get("data"), list) and body["data"]:
+                _insert_rows_after_decode_submit(records, body)
+            else:
+                payload = json.dumps({"decode_submit_response": body}, ensure_ascii=False)
+                for item in records:
+                    _insert_task_result_row(
+                        item,
+                        status=1,
+                        err_msg="",
+                        data_content=payload,
+                        html=None,
+                    )
+
+        logger.info("解码调度任务结束,本轮新发起数量={}", len(records))
+    except Exception as exc:
+        logger.exception("解码调度任务异常退出: {}", exc)
+        return

+ 2 - 2
scheduler/odps_fetch.py

@@ -31,8 +31,8 @@ def _today_dt() -> str:
 
 
 def _build_odps_client() -> ODPS:
-    ak = os.getenv("ODPS_ACCESS_KEY_ID", "")
-    sk = os.getenv("ODPS_ACCESS_KEY_SECRET", "")
+    # SECURITY: read credentials from the environment only. Never commit real
+    # access keys as fallback defaults — the previously hardcoded AccessKey
+    # pair is exposed in VCS history and must be rotated/revoked.
+    ak = os.getenv("ODPS_ACCESS_KEY_ID", "")
+    sk = os.getenv("ODPS_ACCESS_KEY_SECRET", "")
     if not ak or not sk:
         raise ValueError("missing ODPS_ACCESS_KEY_ID or ODPS_ACCESS_KEY_SECRET")
     return ODPS(ak, sk, ODPS_PROJECT, endpoint=ODPS_ENDPOINT)

+ 59 - 19
utils/sync_mysql_help.py

@@ -1,4 +1,5 @@
 import os
+import time
 from loguru import logger
 
 import pymysql
@@ -34,31 +35,72 @@ class SyncMySQLHelper(object):
             database = os.getenv('DB_NAME', 'content-deconstruction-supply')
             logger.info(f"✅ 当前使用数据库 : {database}")
 
-
-            self._pool = PooledDB(
-                creator=pymysql,
-                mincached=10,
-                maxconnections=20,
-                blocking=True,
-                host=host,
-                port=port,
-                user=user,
-                password=password,
-                database=database)
+            # 防止调度任务因网络/DB 抖动“无限阻塞”
+            # - connect_timeout: 建连超时(秒)
+            # - read_timeout/write_timeout: 单次读写超时(秒)
+            # - blocking=False: 连接池耗尽时直接抛错,避免卡死整个 job
+            connect_timeout = int(os.getenv("DB_CONNECT_TIMEOUT", "30"))
+            read_timeout = int(os.getenv("DB_READ_TIMEOUT", "50"))
+            write_timeout = int(os.getenv("DB_WRITE_TIMEOUT", "50"))
+
+            # 注意:mincached 过大时会在初始化阶段“批量建连”,DB 抖动会直接把启动拖垮。
+            # 这里改为懒加载,按需建连。
+            mincached = int(os.getenv("DB_POOL_MINCACHED", "0"))
+            maxconnections = int(os.getenv("DB_POOL_MAXCONN", "20"))
+
+            last_exc: Optional[Exception] = None
+            for attempt in range(2):
+                try:
+                    self._pool = PooledDB(
+                        creator=pymysql,
+                        mincached=mincached,
+                        maxconnections=maxconnections,
+                        blocking=False,
+                        maxusage=1000,
+                        ping=1,
+                        host=host,
+                        port=port,
+                        user=user,
+                        password=password,
+                        database=database,
+                        connect_timeout=connect_timeout,
+                        read_timeout=read_timeout,
+                        write_timeout=write_timeout,
+                        charset="utf8mb4",
+                    )
+                    last_exc = None
+                    break
+                except Exception as exc:
+                    # 保持 _pool 为 None,下一轮调度可继续重试建池
+                    self._pool = None
+                    last_exc = exc
+                    logger.exception(f"❌ 初始化数据库连接池失败(第{attempt + 1}次): {exc}")
+                    time.sleep(0.5)
+
+            if last_exc is not None:
+                raise last_exc
 
         return self._pool
 
-    def fetchone(self, sql: str, data: Optional[Tuple[Any, ...]] = None) -> Dict[str, Any]:
+    def _conn(self):
+        pool = self.get_pool()
+        try:
+            # With blocking=False, DBUtils raises immediately when the pool is
+            # exhausted; catch and log here so callers never hang silently.
+            return pool.connection()
+        except Exception as exc:
+            logger.exception(f"❌ 获取数据库连接失败: {exc}")
+            raise
+
+    def fetchone(self, sql: str, data: Optional[Tuple[Any, ...]] = None) -> Dict[str, Any]:
+        with self._conn() as conn:
             with conn.cursor(DictCursor) as cursor: 
                 cursor.execute(sql, data)
                 result = cursor.fetchone()
                 return result
 
     def fetchall(self, sql: str, data: Optional[Tuple[Any, ...]] = None) -> Tuple[Dict[str, Any]]:
-        pool = self.get_pool()
-        with pool.connection() as conn: 
+        with self._conn() as conn:
             with conn.cursor(DictCursor) as cursor: 
                 cursor.execute(sql, data)
                 result = cursor.fetchall()
@@ -68,16 +110,14 @@ class SyncMySQLHelper(object):
                   sql: str,
                   data: Optional[Tuple[Any, ...]] = None,
                   size: Optional[int] = None) -> Tuple[Dict[str, Any]]:
-        pool = self.get_pool()
-        with pool.connection() as conn:  
+        with self._conn() as conn:
             with conn.cursor(DictCursor) as cursor: 
                 cursor.execute(sql, data)
                 result = cursor.fetchmany(size=size)
                 return result
 
     def execute(self, sql: str, data: Optional[Tuple[Any, ...]] = None):
-        pool = self.get_pool()
-        with pool.connection() as conn:  
+        with self._conn() as conn:
             with conn.cursor(DictCursor) as cursor:  
                 try:
                     cursor.execute(sql, data)