"""Fetch priority posts for a given day from an ODPS (MaxCompute) table.

Supports being run directly as a script: ``python scheduler/odps_fetch.py``.
"""

import json
import os
import re
import sys
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional
from zoneinfo import ZoneInfo

from dotenv import find_dotenv, load_dotenv
from odps import ODPS

# Allow running directly as a script: python scheduler/odps_fetch.py
PROJECT_ROOT = Path(__file__).resolve().parent.parent
if str(PROJECT_ROOT) not in sys.path:
    sys.path.insert(0, str(PROJECT_ROOT))

from utils.scheduler_logger import get_scheduler_logger

logger = get_scheduler_logger()

load_dotenv(find_dotenv(), override=False)

# Fixed project and endpoint (per requirement, not read from the environment)
ODPS_PROJECT = "loghubods"
ODPS_ENDPOINT = "http://service.cn-hangzhou.maxcompute.aliyun.com/api"
ODPS_TABLE = "dwd_topic_decode_input_vids_di"

# Columns selected by fetch_priority_posts, in SELECT order.
_COLUMNS = (
    "type_id", "type", "channel", "vid", "cate1", "cate2",
    "title", "url", "level", "reason", "count", "extend", "dt",
)

# Partition values look like 20240131; anything else is rejected before
# being interpolated into SQL.
_DT_PATTERN = re.compile(r"^\d{8}$")


def _today_dt() -> str:
    """Return today's date in Asia/Shanghai as a YYYYMMDD partition string."""
    return datetime.now(ZoneInfo("Asia/Shanghai")).strftime("%Y%m%d")


def _validate_dt(dt: str) -> str:
    """Return *dt* if it is exactly 8 digits; raise otherwise.

    Guards the f-string SQL below against injection via a caller-supplied
    partition value.
    """
    if not _DT_PATTERN.match(dt):
        raise ValueError(f"invalid dt partition value: {dt!r}")
    return dt


def _build_odps_client() -> ODPS:
    """Build an ODPS client from environment credentials.

    Raises:
        ValueError: if ODPS_ACCESS_KEY_ID / ODPS_ACCESS_KEY_SECRET are unset.
    """
    # SECURITY: credentials must come from the environment only. The previous
    # hard-coded AccessKey fallbacks were leaked secrets and have been removed;
    # rotate those keys if they were ever pushed to a shared repository.
    ak = os.getenv("ODPS_ACCESS_KEY_ID")
    sk = os.getenv("ODPS_ACCESS_KEY_SECRET")
    if not ak or not sk:
        raise ValueError("missing ODPS_ACCESS_KEY_ID or ODPS_ACCESS_KEY_SECRET")
    return ODPS(ak, sk, ODPS_PROJECT, endpoint=ODPS_ENDPOINT)


def fetch_priority_posts(
    limit: int = 10,
    offset: int = 0,
    dt: Optional[str] = None,
) -> List[Dict[str, Any]]:
    """Fetch up to *limit* priority rows (level 0-3) for partition *dt*.

    Args:
        limit: page size.
        offset: number of rows to skip (MaxCompute ``LIMIT offset, count``).
        dt: YYYYMMDD partition; defaults to today in Asia/Shanghai.

    Returns:
        A list of dicts keyed by the selected column names.

    Raises:
        ValueError: on a malformed *dt* or missing ODPS credentials.
    """
    target_dt = _validate_dt(dt or _today_dt())
    # int() ensures only integers are interpolated into the LIMIT clause.
    limit = int(limit)
    offset = int(offset)
    sql = f"""
        SELECT type_id, type, channel, vid, cate1, cate2, title, url,
               level, reason, count, extend, dt
        FROM {ODPS_TABLE}
        WHERE dt = '{target_dt}'
          AND level IN (0, 1, 2, 3)
        ORDER BY count DESC, level ASC
        LIMIT {offset}, {limit}
    """
    logger.info("开始执行ODPS查询 dt={} limit={} offset={}", target_dt, limit, offset)
    odps = _build_odps_client()
    instance = odps.execute_sql(sql)
    records: List[Dict[str, Any]] = []
    with instance.open_reader() as reader:
        for row in reader:
            # Map positional row values onto column names instead of 13
            # hand-written indices.
            record = {
                col: row[i] for i, col in enumerate(_COLUMNS) if i < len(row)
            }
            # Rows missing the trailing dt column fall back to the query
            # partition (preserves the original row[12] fallback behavior).
            record.setdefault("dt", target_dt)
            records.append(record)
    logger.info("ODPS查询完成 dt={} 返回数量={}", target_dt, len(records))
    return records


def fetch_top_priority_posts(limit: int = 10) -> List[Dict[str, Any]]:
    """Convenience wrapper: first page of today's priority posts."""
    return fetch_priority_posts(limit=limit, offset=0)


def count_priority_posts(dt: Optional[str] = None) -> int:
    """Count rows with level in (0, 1, 2, 3) in partition *dt* (default today).

    Returns 0 when the partition is empty or the count is NULL.
    """
    target_dt = _validate_dt(dt or _today_dt())
    sql = f"""
        SELECT COUNT(1) AS total_count
        FROM {ODPS_TABLE}
        WHERE dt = '{target_dt}'
          AND level IN (0, 1, 2, 3)
    """
    logger.info("开始执行ODPS计数 dt={}", target_dt)
    odps = _build_odps_client()
    instance = odps.execute_sql(sql)
    with instance.open_reader() as reader:
        for row in reader:
            return int(row[0] or 0)
    return 0


if __name__ == "__main__":
    try:
        test_limit = int(os.getenv("ODPS_TEST_LIMIT", "10"))
        data = fetch_top_priority_posts(limit=test_limit)
        print(f"fetch_count={len(data)}")
        preview = data
        print(json.dumps(preview, ensure_ascii=False, indent=2))
    except Exception as exc:
        logger.exception("ODPS查询测试失败: {}", exc)
        raise