Просмотр исходного кода

Merge branch 'dev_api_init' of weapp/video_decode into master

jihuaqiang 12 часов назад
Родитель
Сommit
53547f4a8d

+ 380 - 0
scheduler/add_score_job.py

@@ -0,0 +1,380 @@
+import argparse
+import concurrent.futures
+import datetime
+import json
+import sys
+from pathlib import Path
+from typing import Any, Dict, List, Optional, Tuple
+
+import requests
+
+# 支持直接以脚本方式运行:python scheduler/add_score_job.py
+_PROJECT_ROOT = Path(__file__).resolve().parent.parent
+if str(_PROJECT_ROOT) not in sys.path:
+    sys.path.insert(0, str(_PROJECT_ROOT))
+
+from utils.scheduler_logger import get_scheduler_logger
+from utils.sync_mysql_help import mysql
+
+
+logger = get_scheduler_logger()
+
+SCORE_API_URL = "http://47.236.83.130:8200/process_note"
+DEFAULT_CUTOFF_DT = "20260429"
+DEFAULT_TIMEOUT = 2400
+DEFAULT_WORKERS = 3
+DEFAULT_START_ID = 7150
+FETCH_BATCH_SIZE = 20
+EXISTING_SCORE_KEYS = {"分享意愿度", "消费意愿度", "点击意愿度"}
+
+
+def _safe_json_loads(text: str) -> Optional[Dict[str, Any]]:
+    if not text:
+        return None
+    try:
+        payload = json.loads(text)
+    except Exception:
+        return None
+    if not isinstance(payload, dict):
+        return None
+    return payload
+
+
+def _extract_words_section(content: Dict[str, Any], key: str) -> List[Dict[str, Any]]:
+    section = content.get(key)
+    if not isinstance(section, list):
+        return []
+    result: List[Dict[str, Any]] = []
+    for item in section:
+        if not isinstance(item, dict):
+            continue
+        words = item.get("分词结果")
+        if not isinstance(words, list):
+            continue
+        filtered_words: List[Dict[str, str]] = []
+        for w in words:
+            if not isinstance(w, dict):
+                continue
+            word = str(w.get("词") or "").strip()
+            desc = str(w.get("详细描述") or "").strip()
+            if not word:
+                continue
+            filtered_words.append({"词": word, "详细描述": desc})
+        if filtered_words:
+            result.append({"分词结果": filtered_words})
+    return result
+
+
+def _normalize_target_post(content: Dict[str, Any]) -> Dict[str, Any]:
+    target_post = content.get("target_post")
+    if not isinstance(target_post, dict):
+        target_post = {}
+    images = target_post.get("images")
+    if not isinstance(images, list):
+        images = []
+    note_id = (
+        str(target_post.get("note_id") or "").strip()
+        or str(target_post.get("channel_content_id") or "").strip()
+        or str(content.get("帖子ID") or "").strip()
+    )
+    return {
+        "note_id": note_id,
+        "images": [],
+        "body_text": str(target_post.get("body_text") or "").strip(),
+        "title": str(target_post.get("title") or "").strip(),
+        "video": images[0] if images else "",
+    }
+
+
+def build_score_payload(content: Dict[str, Any]) -> Dict[str, Any]:
+    return {
+        "灵感点": _extract_words_section(content, "灵感点"),
+        "目的点": _extract_words_section(content, "目的点"),
+        "关键点": _extract_words_section(content, "关键点"),
+        "target_post": _normalize_target_post(content),
+    }
+
+
+def _extract_contribution_results(resp_body: Dict[str, Any]) -> Optional[List[Dict[str, Any]]]:
+    direct = resp_body.get("contribution_results")
+    if isinstance(direct, list):
+        return direct
+    data = resp_body.get("data")
+    if isinstance(data, dict):
+        nested = data.get("contribution_results")
+        if isinstance(nested, list):
+            return nested
+    return None
+
+
+def request_score(payload: Dict[str, Any], timeout: int) -> List[Dict[str, Any]]:
+    response = requests.post(SCORE_API_URL, json=payload, timeout=timeout)
+    response.raise_for_status()
+    body = response.json()
+    if not isinstance(body, dict):
+        raise ValueError("score_api_response_not_dict")
+    contribution_results = _extract_contribution_results(body)
+    if contribution_results is None:
+        raise ValueError(f"missing_contribution_results: {body}")
+    return contribution_results
+
+
+def _has_existing_scores(content: Dict[str, Any]) -> bool:
+    contribution_results = content.get("contribution_results")
+    if not isinstance(contribution_results, list):
+        return False
+    for item in contribution_results:
+        if not isinstance(item, dict):
+            continue
+        if any(key in item for key in EXISTING_SCORE_KEYS):
+            return True
+    return False
+
+
+def _fetch_rows(
+    cutoff_dt: str, worker_idx: int, workers: int, last_id: int, limit: int
+) -> Tuple[Dict[str, Any], ...]:
+    sql = """
+        SELECT id, dt, vid, data_content
+        FROM aigc_topic_decode_task_result
+        WHERE dt < %s
+          AND id > %s
+          AND MOD(id, %s) = %s
+          AND id IS NOT NULL
+          AND data_content IS NOT NULL
+          AND data_content != ''
+        ORDER BY id
+        LIMIT %s
+    """
+    return mysql.fetchall(sql, (cutoff_dt, last_id, workers, worker_idx, limit))
+
+
+def _update_data_content(row_id: int, new_data_content: str) -> None:
+    sql = """
+        UPDATE aigc_topic_decode_task_result
+        SET data_content = %s, update_time = %s
+        WHERE id = %s
+    """
+    mysql.execute(sql, (new_data_content, datetime.datetime.now(), row_id))
+
+
+def _process_worker_rows(
+    cutoff_dt: str,
+    timeout: int,
+    dry_run: bool,
+    worker_idx: int,
+    workers: int,
+    start_id: int,
+) -> Dict[str, int]:
+    total = 0
+    updated = 0
+    skipped = 0
+    failed = 0
+    last_id = start_id
+
+    while True:
+        rows = _fetch_rows(
+            cutoff_dt=cutoff_dt,
+            worker_idx=worker_idx,
+            workers=workers,
+            last_id=last_id,
+            limit=FETCH_BATCH_SIZE,
+        )
+        if not rows:
+            break
+        last_id = int(rows[-1]["id"])
+        logger.info(
+            "add_score worker={} 读取批次 count={} last_id={}", worker_idx, len(rows), last_id
+        )
+
+        for row in rows:
+            total += 1
+            row_id_raw = row.get("id")
+            try:
+                row_id = int(row_id_raw)
+            except (TypeError, ValueError):
+                skipped += 1
+                logger.warning(
+                    "add_score worker={} 缺少合法id,跳过 total={} id={}",
+                    worker_idx,
+                    total,
+                    row_id_raw,
+                )
+                continue
+            dt = str(row.get("dt") or "").strip()
+            vid = str(row.get("vid") or "").strip()
+            raw_content = row.get("data_content")
+            if not dt or not vid or not isinstance(raw_content, str) or not raw_content.strip():
+                skipped += 1
+                logger.warning(
+                    "add_score worker={} 跳过非法行 total={} id={} dt={} vid={}",
+                    worker_idx,
+                    total,
+                    row_id,
+                    dt,
+                    vid,
+                )
+                continue
+            logger.info(
+                "add_score worker={} 处理记录 id={} dt={} vid={}", worker_idx, row_id, dt, vid
+            )
+
+            content = _safe_json_loads(raw_content)
+            if content is None:
+                skipped += 1
+                logger.warning(
+                    "add_score worker={} data_content非合法JSON,跳过 id={} dt={} vid={}",
+                    worker_idx,
+                    row_id,
+                    dt,
+                    vid,
+                )
+                continue
+            if _has_existing_scores(content):
+                skipped += 1
+                logger.info(
+                    "add_score worker={} 跳过记录,已存在目标打分字段 id={} dt={} vid={}",
+                    worker_idx,
+                    row_id,
+                    dt,
+                    vid,
+                )
+                continue
+
+            try:
+                score_payload = build_score_payload(content)
+                contribution_results = request_score(score_payload, timeout=timeout)
+            except Exception as exc:
+                failed += 1
+                logger.exception(
+                    "add_score worker={} 打分接口调用失败 id={} dt={} vid={} err={}",
+                    worker_idx,
+                    row_id,
+                    dt,
+                    vid,
+                    exc,
+                )
+                continue
+
+            content["contribution_results"] = contribution_results
+            new_data_content = json.dumps(content, ensure_ascii=False)
+            if dry_run:
+                updated += 1
+                logger.info(
+                    "add_score worker={} dry-run: 已生成新contribution_results id={} dt={} vid={} count={}",
+                    worker_idx,
+                    row_id,
+                    dt,
+                    vid,
+                    len(contribution_results),
+                )
+                continue
+
+            try:
+                _update_data_content(row_id, new_data_content)
+                updated += 1
+                logger.info(
+                    "add_score worker={} 更新成功 id={} dt={} vid={} contribution_count={}",
+                    worker_idx,
+                    row_id,
+                    dt,
+                    vid,
+                    len(contribution_results),
+                )
+            except Exception as exc:
+                failed += 1
+                logger.exception(
+                    "add_score worker={} 数据库回写失败 id={} dt={} vid={} err={}",
+                    worker_idx,
+                    row_id,
+                    dt,
+                    vid,
+                    exc,
+                )
+
+    logger.info(
+        "add_score worker={} 任务结束 cutoff_dt={} total={} updated={} skipped={} failed={} dry_run={}",
+        worker_idx,
+        cutoff_dt,
+        total,
+        updated,
+        skipped,
+        failed,
+        dry_run,
+    )
+    return {"total": total, "updated": updated, "skipped": skipped, "failed": failed}
+
+
+def run_add_score_job(
+    *, cutoff_dt: str, timeout: int, dry_run: bool, workers: int, start_id: int
+) -> None:
+    total = 0
+    updated = 0
+    skipped = 0
+    failed = 0
+
+    with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as executor:
+        futures = [
+            executor.submit(
+                _process_worker_rows,
+                cutoff_dt,
+                timeout,
+                dry_run,
+                worker_idx,
+                workers,
+                start_id,
+            )
+            for worker_idx in range(workers)
+        ]
+        for future in concurrent.futures.as_completed(futures):
+            worker_result = future.result()
+            total += worker_result["total"]
+            updated += worker_result["updated"]
+            skipped += worker_result["skipped"]
+            failed += worker_result["failed"]
+
+    logger.info(
+        "add_score 并行任务结束 cutoff_dt={} workers={} total={} updated={} skipped={} failed={} dry_run={}",
+        cutoff_dt,
+        workers,
+        total,
+        updated,
+        skipped,
+        failed,
+        dry_run,
+    )
+
+
+def _parse_args() -> argparse.Namespace:
+    parser = argparse.ArgumentParser(
+        description="为 aigc_topic_decode_task_result 历史记录补充 contribution_results 分数"
+    )
+    parser.add_argument("--cutoff-dt", default=DEFAULT_CUTOFF_DT, help="仅处理 dt < cutoff_dt 的数据")
+    parser.add_argument("--timeout", type=int, default=DEFAULT_TIMEOUT, help="打分接口超时(秒)")
+    parser.add_argument("--dry-run", action="store_true", help="只调用接口并打印日志,不写回数据库")
+    parser.add_argument("--workers", type=int, default=DEFAULT_WORKERS, help="并发worker数量")
+    parser.add_argument("--start-id", type=int, default=DEFAULT_START_ID, help="仅处理 id > start_id 的数据")
+    return parser.parse_args()
+
+
+def main() -> None:
+    args = _parse_args()
+    logger.info(
+        "add_score 任务启动 cutoff_dt={} timeout={} dry_run={} workers={} start_id={}",
+        args.cutoff_dt,
+        args.timeout,
+        args.dry_run,
+        args.workers,
+        args.start_id,
+    )
+    run_add_score_job(
+        cutoff_dt=str(args.cutoff_dt),
+        timeout=max(1, int(args.timeout)),
+        dry_run=bool(args.dry_run),
+        workers=max(1, int(args.workers)),
+        start_id=max(0, int(args.start_id)),
+    )
+
+
+if __name__ == "__main__":
+    main()

+ 97 - 0
scheduler/change_history_status.py

@@ -0,0 +1,97 @@
+import sys
+from pathlib import Path
+from typing import Dict, List
+
+# 支持直接以脚本方式运行:python scheduler/change_history_status.py
+_PROJECT_ROOT = Path(__file__).resolve().parent.parent
+if str(_PROJECT_ROOT) not in sys.path:
+    sys.path.insert(0, str(_PROJECT_ROOT))
+
+from scheduler.decode_dispatch_job import (
+    RESULT_POLL_CHUNK,
+    _apply_result_row_to_db,
+    _submit_decode_result_chunk,
+)
+from utils.scheduler_logger import get_scheduler_logger
+from utils.sync_mysql_help import mysql
+
+
+logger = get_scheduler_logger()
+
+
+def _fetch_pending_vids_by_dt() -> Dict[str, List[str]]:
+    sql = """
+        SELECT DISTINCT dt, vid
+        FROM aigc_topic_decode_task_result
+        WHERE status IN (0, 1)
+          AND dt < DATE_FORMAT(CURDATE(), '%Y%m%d')
+          AND vid IS NOT NULL
+          AND vid != ''
+        ORDER BY dt, vid
+    """
+    rows = mysql.fetchall(sql)
+    grouped: Dict[str, List[str]] = {}
+    for row in rows:
+        dt = str(row.get("dt") or "").strip()
+        vid = str(row.get("vid") or "").strip()
+        if not dt or not vid:
+            continue
+        grouped.setdefault(dt, []).append(vid)
+    return grouped
+
+
+def _refresh_result_for_dt(dt: str, vids: List[str]) -> None:
+    if not vids:
+        return
+    logger.info("开始回填历史解码结果 dt={} 待查询数量={}", dt, len(vids))
+    for i in range(0, len(vids), RESULT_POLL_CHUNK):
+        chunk = vids[i : i + RESULT_POLL_CHUNK]
+        ok, msg, body = _submit_decode_result_chunk(chunk)
+        if not ok:
+            logger.error(
+                "历史解码结果查询失败 dt={} msg={} body={}",
+                dt,
+                msg,
+                body,
+            )
+            continue
+        data_list = body.get("data")
+        if not isinstance(data_list, list):
+            logger.warning("历史解码结果返回data非列表 dt={} body={}", dt, body)
+            continue
+        returned_ids = {str(x.get("channelContentId") or "").strip() for x in data_list}
+        missing = set(chunk) - returned_ids
+        if missing:
+            logger.warning(
+                "历史解码结果返回缺少{}个vid dt={} 示例={}",
+                len(missing),
+                dt,
+                list(missing)[:5],
+            )
+        for item in data_list:
+            if not isinstance(item, dict):
+                continue
+            _apply_result_row_to_db(dt, item)
+        logger.info(
+            "历史解码结果分片处理完成 dt={} 分片大小={} 返回数量={}",
+            dt,
+            len(chunk),
+            len(data_list),
+        )
+
+
+def run_change_history_status_job() -> None:
+    logger.info("历史状态回填任务开始")
+    try:
+        vids_by_dt = _fetch_pending_vids_by_dt()
+        total_rows = sum(len(v) for v in vids_by_dt.values())
+        logger.info("历史状态回填待处理总量={} 涉及dt数={}", total_rows, len(vids_by_dt))
+        for dt, vids in vids_by_dt.items():
+            _refresh_result_for_dt(dt, vids)
+        logger.info("历史状态回填任务结束")
+    except Exception as exc:
+        logger.exception("历史状态回填任务异常: {}", exc)
+
+
+if __name__ == "__main__":
+    run_change_history_status_job()

+ 91 - 8
scheduler/decode_dispatch_job.py

@@ -54,7 +54,7 @@ def _today_dt() -> str:
 
 def _is_allowed_level(level_value: Any) -> bool:
     try:
-        return int(level_value) in (0, 1, 2)
+        return int(level_value) in (0, 1, 2, 3)
     except (TypeError, ValueError):
         return False
 
@@ -93,6 +93,27 @@ def _fetch_today_pending_vids(dt: str) -> List[str]:
     return [str(row["vid"]) for row in rows if row.get("vid")]
 
 
+def _fetch_history_pending_vids_by_dt(today_dt: str) -> Dict[str, List[str]]:
+    sql = """
+        SELECT DISTINCT dt, vid
+        FROM aigc_topic_decode_task_result
+        WHERE status IN (0, 1)
+          AND dt < %s
+          AND vid IS NOT NULL
+          AND vid != ''
+        ORDER BY dt, vid
+    """
+    rows = mysql.fetchall(sql, (today_dt,))
+    grouped: Dict[str, List[str]] = {}
+    for row in rows:
+        dt = str(row.get("dt") or "").strip()
+        vid = str(row.get("vid") or "").strip()
+        if not dt or not vid:
+            continue
+        grouped.setdefault(dt, []).append(vid)
+    return grouped
+
+
 def _count_today_non_terminal(dt: str) -> int:
     sql = """
         SELECT COUNT(1) AS total
@@ -103,6 +124,27 @@ def _count_today_non_terminal(dt: str) -> int:
     return int((result or {}).get("total", 0))
 
 
+def _count_today_total(dt: str) -> int:
+    sql = """
+        SELECT COUNT(1) AS total
+        FROM aigc_topic_decode_task_result
+        WHERE dt = %s
+    """
+    result = mysql.fetchone(sql, (dt,))
+    return int((result or {}).get("total", 0))
+
+
+def _fetch_decode_daily_limit() -> int:
+    sql = """
+        SELECT `max` AS daily_limit
+        FROM aigc_topic_decode_task_oprate
+        ORDER BY id DESC
+        LIMIT 1
+    """
+    row = mysql.fetchone(sql)
+    return int((row or {}).get("daily_limit") or 0)
+
+
 def _submit_decode_result_chunk(
     channel_content_ids: List[str],
 ) -> Tuple[bool, str, Dict[str, Any]]:
@@ -226,6 +268,27 @@ def _poll_decode_results_for_today(dt: str, vids: List[str]) -> None:
     )
 
 
+def _poll_decode_results_for_history(today_dt: str) -> None:
+    vids_by_dt = _fetch_history_pending_vids_by_dt(today_dt)
+    if not vids_by_dt:
+        return
+    total = sum(len(v) for v in vids_by_dt.values())
+    logger.info(
+        "开始查询历史未完成解码结果 today_dt={} 涉及dt数={} 总vid数={}",
+        today_dt,
+        len(vids_by_dt),
+        total,
+    )
+    for dt, vids in vids_by_dt.items():
+        _poll_decode_results_for_today(dt, vids)
+    logger.info(
+        "历史未完成解码结果查询结束 today_dt={} 涉及dt数={} 总vid数={}",
+        today_dt,
+        len(vids_by_dt),
+        total,
+    )
+
+
 def _build_posts_payload(records: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
     posts: List[Dict[str, Any]] = []
     for item in records:
@@ -235,14 +298,16 @@ def _build_posts_payload(records: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
             extend_obj = extend_raw
         else:
             extend_obj = _safe_json_loads(str(extend_raw)) if extend_raw is not None else {}
-        cover_url = extend_obj.get("cover_url") or ""
-        images = [cover_url] if cover_url else []
+        # cover_url = extend_obj.get("cover_url") or ""
+        # images = [cover_url] if cover_url else []
         posts.append(
             {
                 "channelContentId": item.get("vid") or "",
                 "title": item.get("title") or "",
                 "video": item.get("url") or "",
-                "images": images,
+                "images": [],
+                "mergeLeve1": item.get("cate1") or "",
+                "mergeLeve2": item.get("cate2") or "",
                 "contentModal": 4,
                 "channel": 10,
             }
@@ -395,6 +460,7 @@ def run_decode_dispatch_job() -> None:
     logger.info("解码调度任务开始执行")
     try:
         dt = _today_dt()
+        _poll_decode_results_for_history(dt)
 
         before_poll_non_terminal = _count_today_non_terminal(dt)
         if before_poll_non_terminal > 0:
@@ -428,8 +494,8 @@ def run_decode_dispatch_job() -> None:
             logger.info("解码调度任务结束(开关关闭:不发起新任务)")
             return
 
-        need = max(0, BATCH_SIZE - after_poll_non_terminal)
-        if need == 0:
+        window_need = max(0, BATCH_SIZE - after_poll_non_terminal)
+        if window_need == 0:
             logger.info(
                 "解构中已满{}条,本轮无需补充 dt={} non_terminal={}",
                 BATCH_SIZE,
@@ -439,11 +505,29 @@ def run_decode_dispatch_job() -> None:
             logger.info("解码调度任务结束(窗口已满)")
             return
 
+        daily_limit = _fetch_decode_daily_limit()
+        today_total = _count_today_total(dt)
+        daily_remaining = max(0, daily_limit - today_total)
+        if daily_remaining == 0:
+            logger.info(
+                "当日解构已达到上限,不再发起新任务 dt={} daily_limit={} today_total={}",
+                dt,
+                daily_limit,
+                today_total,
+            )
+            logger.info("解码调度任务结束(达到当日上限)")
+            return
+
+        need = min(window_need, daily_remaining)
         logger.info(
-            "动态窗口补充 dt={} 当前解构中={} 目标={} 本次补充={}",
+            "动态窗口补充 dt={} 当前解构中={} 目标={} 窗口缺口={} 日上限={} 当日已发起={} 当日剩余额度={} 本次补充={}",
             dt,
             after_poll_non_terminal,
             BATCH_SIZE,
+            window_need,
+            daily_limit,
+            today_total,
+            daily_remaining,
             need,
         )
         records = _pick_candidate_records(dt=dt, batch_size=need)
@@ -451,7 +535,6 @@ def run_decode_dispatch_job() -> None:
             logger.info("无可发起的新批次候选数据 dt={} need={}", dt, need)
             logger.info("解码调度任务结束(无新增任务)")
             return
-        logger.info("解码提交接口执行开始 records={}", records)
         posts = _build_posts_payload(records)
         logger.info("解码提交接口执行开始 posts={}", posts)
         ok, err_msg, body = _submit_decode(posts)

+ 19 - 3
scheduler/decode_hourly_stats_job.py

@@ -17,7 +17,6 @@ if str(_PROJECT_ROOT) not in sys.path:
 
 from utils.scheduler_logger import get_scheduler_logger
 from utils.sync_mysql_help import mysql
-from scheduler.odps_fetch import count_priority_posts
 
 
 logger = get_scheduler_logger()
@@ -78,7 +77,7 @@ def _fetch_today_stats(now: datetime) -> Dict[str, int]:
         ),
     )
     executed_total = int((row or {}).get("executed_count") or 0)
-    demand_total = int(count_priority_posts(now.strftime("%Y%m%d")) or 0)
+    demand_total = _fetch_decode_daily_limit()
     pending_total = max(demand_total - executed_total, 0)
     return {
         "executed_total": executed_total,
@@ -86,6 +85,17 @@ def _fetch_today_stats(now: datetime) -> Dict[str, int]:
     }
 
 
+def _fetch_decode_daily_limit() -> int:
+    sql = """
+        SELECT `max` AS daily_limit
+        FROM aigc_topic_decode_task_oprate
+        ORDER BY id DESC
+        LIMIT 1
+    """
+    row = mysql.fetchone(sql)
+    return int((row or {}).get("daily_limit") or 0)
+
+
 def _has_overdue_pending_task(now: datetime) -> bool:
     overdue_before = now - timedelta(minutes=30)
     sql = """
@@ -130,7 +140,13 @@ def _build_feishu_card(
     )
     level_tag = "异常告警" if is_alert else "运行正常"
     template = "red" if is_alert else "blue"
-    status_line = "⚠️ 当前失败率已超过阈值,请尽快排查" if is_alert else "✅ 当前任务运行正常"
+    if is_alert:
+        if fail_rate > _ALERT_FAIL_RATE:
+            status_line = "⚠️ 当前失败率已超过阈值,请尽快排查"
+        else:
+            status_line = "⚠️ 当前成功率偏低且存在超时任务,请尽快排查"
+    else:
+        status_line = "✅ 当前任务运行正常"
 
     markdown_lines = [
         status_line,

+ 2 - 2
scheduler/odps_fetch.py

@@ -46,7 +46,7 @@ def fetch_priority_posts(limit: int = 10, offset: int = 0, dt: Optional[str] = N
             level, reason, count, extend, dt
         FROM {ODPS_TABLE}
         WHERE dt = '{target_dt}'
-          AND level IN (0, 1, 2)
+          AND level IN (0, 1, 2, 3)
         ORDER BY count DESC, level ASC
         LIMIT {offset}, {limit}
     """
@@ -89,7 +89,7 @@ def count_priority_posts(dt: Optional[str] = None) -> int:
         SELECT COUNT(1) AS total_count
         FROM {ODPS_TABLE}
         WHERE dt = '{target_dt}'
-          AND level IN (0, 1, 2)
+          AND level IN (0, 1, 2, 3)
     """
     logger.info("开始执行ODPS计数 dt={}", target_dt)
     odps = _build_odps_client()