data_query_tools.py 2.2 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576
  1. from odps import ODPS
  2. from odps.errors import ODPSError
  3. from datetime import date, timedelta
  4. from agent import tool
  5. def get_odps_data(sql):
  6. # 配置信息
  7. access_id = 'LTAI9EBa0bd5PrDa'
  8. access_key = 'vAalxds7YxhfOA2yVv8GziCg3Y87v5'
  9. project = 'loghubods'
  10. endpoint = 'http://service.odps.aliyun.com/api'
  11. # 1. 初始化 ODPS 入口
  12. o = ODPS(access_id, access_key, project, endpoint=endpoint)
  13. try:
  14. # 2. 执行 SQL 并获取结果
  15. # execute_sql 会等待任务完成,使用 open_reader 读取数据
  16. with o.execute_sql(sql).open_reader() as reader:
  17. # reader 类似于 Java 中的 List<Record>
  18. # 我们可以直接将其转换为 Python 的 list
  19. records = [record for record in reader]
  20. return records
  21. except ODPSError as e:
  22. print(f"ODPS 错误: {e}")
  23. return None
  24. def get_rov_by_merge_leve2_and_video_ids(merge_leve2, video_ids):
  25. merge_level_in_clause = f"'{merge_leve2}'"
  26. video_ids_in_clause = ", ".join([f"'{video_id}'" for video_id in video_ids])
  27. end_date = (date.today() - timedelta(days=1)).strftime("%Y%m%d")
  28. start_date = (date.today() - timedelta(days=14)).strftime("%Y%m%d")
  29. sql_query = f'''
  30. SELECT
  31. v.videoid,
  32. CASE
  33. WHEN COALESCE(SUM(COALESCE(t3.`当日分发曝光pv`, 0)), 0) < 1000 THEN 0
  34. ELSE COALESCE(AVG(NULLIF(t3.rov_t0, 0)), 0)
  35. END AS avg_rov_t0
  36. FROM
  37. (
  38. SELECT
  39. t2.videoid,
  40. t2.merge_leve2
  41. FROM videoods.content_profile t1
  42. JOIN loghubods.video_merge_tag t2
  43. ON t1.content_id = t2.videoid
  44. WHERE
  45. t1.status = 3
  46. AND t1.is_deleted = 0
  47. AND t2.merge_leve2 IN ({merge_level_in_clause})
  48. ) v
  49. LEFT JOIN loghubods.video_dimension_detail_add_column t3
  50. ON v.videoid = t3.视频id
  51. AND t3.dt >= '{start_date}'
  52. AND t3.dt <= '{end_date}'
  53. WHERE v.videoid in ({video_ids_in_clause})
  54. GROUP BY
  55. v.videoid
  56. ;
  57. '''
  58. data = get_odps_data(sql_query)
  59. result_dict = {}
  60. if data:
  61. result_dict = {r[0]: r[1] for r in data}
  62. return result_dict
  63. if __name__ == '__main__':
  64. videos = ["64429933"]
  65. print(get_rov_by_merge_leve2_and_video_ids('历史名人', videos))