odps_fetch.py 3.1 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495
  1. import os
  2. import json
  3. import sys
  4. from pathlib import Path
  5. from datetime import datetime
  6. from typing import Any, Dict, List, Optional
  7. from zoneinfo import ZoneInfo
  8. from odps import ODPS
  9. from dotenv import find_dotenv, load_dotenv
# Allow running this file directly as a script: python scheduler/odps_fetch.py
# Prepend the project root (two directories above this file) to sys.path so the
# project-local `utils` import below resolves. This must run before that import.
PROJECT_ROOT = Path(__file__).resolve().parent.parent
if str(PROJECT_ROOT) not in sys.path:
    sys.path.insert(0, str(PROJECT_ROOT))

from utils.scheduler_logger import get_scheduler_logger

logger = get_scheduler_logger()
# Load variables from the .env file located by find_dotenv(); with
# override=False, variables already set in the process environment win.
load_dotenv(find_dotenv(), override=False)

# Project and endpoint are fixed here (by requirement, not read from env vars).
ODPS_PROJECT = "loghubods"
# NOTE(review): plain-http endpoint; confirm whether the https variant can be used.
ODPS_ENDPOINT = "http://service.cn-hangzhou.maxcompute.aliyun.com/api"
# Table queried by fetch_priority_posts.
ODPS_TABLE = "dwd_topic_decode_input_vids_di"
  21. def _today_dt() -> str:
  22. return datetime.now(ZoneInfo("Asia/Shanghai")).strftime("%Y%m%d")
  23. def _build_odps_client() -> ODPS:
  24. ak = os.getenv("ODPS_ACCESS_KEY_ID", "LTAI5t9b7RnUiCy5v3gqXf9Y")
  25. sk = os.getenv("ODPS_ACCESS_KEY_SECRET", "1HVHQe0AV7xaoOWHsM8k9XN3gEaal7")
  26. if not ak or not sk:
  27. raise ValueError("missing ODPS_ACCESS_KEY_ID or ODPS_ACCESS_KEY_SECRET")
  28. return ODPS(ak, sk, ODPS_PROJECT, endpoint=ODPS_ENDPOINT)
  29. def fetch_priority_posts(limit: int = 10, offset: int = 0, dt: Optional[str] = None) -> List[Dict[str, Any]]:
  30. target_dt = dt or _today_dt()
  31. sql = f"""
  32. SELECT
  33. type_id, type, channel, vid, cate1, cate2, title, url,
  34. level, reason, count, extend, dt
  35. FROM {ODPS_TABLE}
  36. WHERE dt = '{target_dt}'
  37. AND level IN (0, 1, 2)
  38. ORDER BY count DESC, level ASC
  39. LIMIT {offset}, {limit}
  40. """
  41. logger.info("开始执行ODPS查询 dt={} limit={} offset={}", target_dt, limit, offset)
  42. odps = _build_odps_client()
  43. instance = odps.execute_sql(sql)
  44. records: List[Dict[str, Any]] = []
  45. with instance.open_reader() as reader:
  46. for row in reader:
  47. records.append(
  48. {
  49. "type_id": row[0],
  50. "type": row[1],
  51. "channel": row[2],
  52. "vid": row[3],
  53. "cate1": row[4],
  54. "cate2": row[5],
  55. "title": row[6],
  56. "url": row[7],
  57. "level": row[8],
  58. "reason": row[9],
  59. "count": row[10],
  60. "extend": row[11],
  61. "dt": row[12] if len(row) > 12 else target_dt,
  62. }
  63. )
  64. logger.info("ODPS查询完成 dt={} 返回数量={}", target_dt, len(records))
  65. return records
  66. def fetch_top_priority_posts(limit: int = 10) -> List[Dict[str, Any]]:
  67. return fetch_priority_posts(limit=limit, offset=0)
  68. if __name__ == "__main__":
  69. try:
  70. test_limit = int(os.getenv("ODPS_TEST_LIMIT", "10"))
  71. data = fetch_top_priority_posts(limit=test_limit)
  72. print(f"fetch_count={len(data)}")
  73. preview = data
  74. print(json.dumps(preview, ensure_ascii=False, indent=2))
  75. except Exception as exc:
  76. logger.exception("ODPS查询测试失败: {}", exc)
  77. raise