| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495 |
- import os
- import json
- import sys
- from pathlib import Path
- from datetime import datetime
- from typing import Any, Dict, List, Optional
- from zoneinfo import ZoneInfo
- from odps import ODPS
- from dotenv import find_dotenv, load_dotenv
# Support running this file directly as a script: python scheduler/odps_fetch.py
# The directory two levels above this file is prepended to sys.path (if absent);
# presumably this is what lets the `utils` package import below resolve.
PROJECT_ROOT = Path(__file__).resolve().parent.parent
if str(PROJECT_ROOT) not in sys.path:
    sys.path.insert(0, str(PROJECT_ROOT))
from utils.scheduler_logger import get_scheduler_logger
# Module-level logger shared by every function in this file.
logger = get_scheduler_logger()
# Load variables from a discovered .env file; override=False leaves variables
# that are already set in the process environment untouched.
load_dotenv(find_dotenv(), override=False)
# Fixed project and endpoint (per requirements, these are not read from environment variables).
ODPS_PROJECT = "loghubods"
# NOTE(review): the endpoint uses plain http — confirm whether the https endpoint should be used instead.
ODPS_ENDPOINT = "http://service.cn-hangzhou.maxcompute.aliyun.com/api"
# Table that fetch_priority_posts selects from.
ODPS_TABLE = "dwd_topic_decode_input_vids_di"
- def _today_dt() -> str:
- return datetime.now(ZoneInfo("Asia/Shanghai")).strftime("%Y%m%d")
def _build_odps_client() -> ODPS:
    """Create an ODPS client for the fixed project and endpoint.

    Credentials are read only from the ``ODPS_ACCESS_KEY_ID`` and
    ``ODPS_ACCESS_KEY_SECRET`` environment variables (which may be populated
    from a ``.env`` file at import time).

    Returns:
        An ``ODPS`` client bound to ``ODPS_PROJECT`` at ``ODPS_ENDPOINT``.

    Raises:
        ValueError: If either credential variable is unset or empty.
    """
    # SECURITY: no literal fallback credentials. Secrets must never live in
    # source control; any key that was ever committed should be treated as
    # exposed and rotated.
    ak = os.getenv("ODPS_ACCESS_KEY_ID")
    sk = os.getenv("ODPS_ACCESS_KEY_SECRET")
    if not ak or not sk:
        raise ValueError("missing ODPS_ACCESS_KEY_ID or ODPS_ACCESS_KEY_SECRET")
    return ODPS(ak, sk, ODPS_PROJECT, endpoint=ODPS_ENDPOINT)
def fetch_priority_posts(limit: int = 10, offset: int = 0, dt: Optional[str] = None) -> List[Dict[str, Any]]:
    """Fetch one page of rows with level 0, 1 or 2 for a given ``dt`` value.

    Rows are ordered by ``count`` descending, then ``level`` ascending, and
    paged with ``LIMIT offset, limit``.

    Args:
        limit: Maximum number of rows to return; must convert to a
            non-negative ``int``.
        offset: Number of leading rows to skip; must convert to a
            non-negative ``int``.
        dt: Value matched against the ``dt`` column, as eight ASCII digits
            (``YYYYMMDD``). When omitted or empty, today's date in
            Asia/Shanghai is used.

    Returns:
        A list of dicts, one per row, keyed by the selected column names.

    Raises:
        ValueError: If ``dt`` is not eight ASCII digits, or ``limit`` /
            ``offset`` is negative or not a valid integer.
    """
    target_dt = dt or _today_dt()

    # These three values are interpolated into the SQL text, so they are
    # restricted to shapes that cannot change the statement's structure.
    if not (
        isinstance(target_dt, str)
        and len(target_dt) == 8
        and target_dt.isascii()
        and target_dt.isdigit()
    ):
        raise ValueError(f"dt must be an 8-digit YYYYMMDD string, got {target_dt!r}")
    limit = int(limit)
    offset = int(offset)
    if limit < 0 or offset < 0:
        raise ValueError(f"limit and offset must be non-negative, got limit={limit} offset={offset}")

    # Single source of truth for both the SELECT list and the result keys.
    columns = (
        "type_id", "type", "channel", "vid", "cate1", "cate2", "title", "url",
        "level", "reason", "count", "extend", "dt",
    )
    select_list = ", ".join(columns)
    sql = f"""
    SELECT
        {select_list}
    FROM {ODPS_TABLE}
    WHERE dt = '{target_dt}'
      AND level IN (0, 1, 2)
    ORDER BY count DESC, level ASC
    LIMIT {offset}, {limit}
    """

    logger.info("开始执行ODPS查询 dt={} limit={} offset={}", target_dt, limit, offset)
    odps = _build_odps_client()
    instance = odps.execute_sql(sql)

    dt_index = len(columns) - 1
    records: List[Dict[str, Any]] = []
    with instance.open_reader() as reader:
        for row in reader:
            # Rows are read by position; every column except the trailing dt
            # is taken unconditionally.
            record: Dict[str, Any] = {
                name: row[idx] for idx, name in enumerate(columns[:dt_index])
            }
            # Fall back to the requested dt if the row carries no dt column.
            record["dt"] = row[dt_index] if len(row) > dt_index else target_dt
            records.append(record)

    logger.info("ODPS查询完成 dt={} 返回数量={}", target_dt, len(records))
    return records
def fetch_top_priority_posts(limit: int = 10) -> List[Dict[str, Any]]:
    """Return the first ``limit`` rows by delegating to ``fetch_priority_posts`` with offset 0."""
    first_page = fetch_priority_posts(limit=limit, offset=0)
    return first_page
if __name__ == "__main__":
    # Manual smoke test: fetch the first page (size taken from ODPS_TEST_LIMIT,
    # default 10), print the row count and then the rows as pretty-printed JSON.
    try:
        row_cap = int(os.getenv("ODPS_TEST_LIMIT", "10"))
        posts = fetch_top_priority_posts(limit=row_cap)
        print(f"fetch_count={len(posts)}")
        print(json.dumps(posts, ensure_ascii=False, indent=2))
    except Exception as err:
        # Log with traceback, then re-raise so the failure still propagates.
        logger.exception("ODPS查询测试失败: {}", err)
        raise
|