odps_module.py 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175
  1. """
  2. ODPS(MaxCompute)数据拉取模块
  3. """
  4. import logging
  5. from typing import Optional, List, Dict
  6. from datetime import datetime, timedelta
  7. logger = logging.getLogger(__name__)
  8. class ODPSClient:
  9. """ODPS客户端封装"""
  10. def __init__(self, project="loghubods"):
  11. """初始化ODPS客户端
  12. Args:
  13. project: ODPS项目名称
  14. """
  15. self.accessId = "LTAIWYUujJAm7CbH"
  16. self.accessSecret = "RfSjdiWwED1sGFlsjXv0DlfTnZTG1P"
  17. self.endpoint = "http://service.odps.aliyun.com/api"
  18. self.tunnelUrl = "http://dt.cn-hangzhou.maxcompute.aliyun-inc.com"
  19. self.project = project
  20. self._odps = None
  21. self._initialized = False
  22. def _ensure_initialized(self):
  23. """确保ODPS客户端已初始化"""
  24. if self._initialized:
  25. return True
  26. try:
  27. from odps import ODPS
  28. self._odps = ODPS(
  29. access_id=self.accessId,
  30. secret_access_key=self.accessSecret,
  31. project=self.project,
  32. endpoint=self.endpoint,
  33. tunnel_endpoint=self.tunnelUrl
  34. )
  35. self._initialized = True
  36. logger.info(f"ODPS客户端初始化成功,项目: {self.project}")
  37. return True
  38. except ImportError:
  39. logger.error("ODPS SDK 未安装,请运行: pip install pyodps")
  40. return False
  41. except Exception as e:
  42. logger.error(f"ODPS客户端初始化失败: {e}")
  43. return False
  44. def query(self, sql: str, limit: Optional[int] = None) -> List[Dict]:
  45. """执行SQL查询
  46. Args:
  47. sql: SQL查询语句
  48. limit: 最大返回行数
  49. Returns:
  50. 查询结果列表(每行为字典)
  51. """
  52. if not self._ensure_initialized():
  53. logger.warning("ODPS客户端未初始化,无法执行查询")
  54. return []
  55. try:
  56. logger.info(f"执行ODPS查询: {sql[:100]}...")
  57. instance = self._odps.execute_sql(sql)
  58. instance.wait_for_success()
  59. with instance.open_reader() as reader:
  60. results = []
  61. for i, record in enumerate(reader):
  62. if limit and i >= limit:
  63. break
  64. # 将Record对象转换为字典
  65. row_dict = {col: record[i] for i, col in enumerate(reader._schema.names)}
  66. results.append(row_dict)
  67. logger.info(f"查询完成,返回 {len(results)} 行")
  68. return results
  69. except Exception as e:
  70. logger.error(f"ODPS查询失败: {e}")
  71. return []
  72. def fetch_creative_data(
  73. self,
  74. start_date: str,
  75. end_date: str,
  76. account_ids: Optional[List[int]] = None
  77. ) -> List[Dict]:
  78. """从ODPS拉取创意数据
  79. Args:
  80. start_date: 开始日期 (YYYYMMDD)
  81. end_date: 结束日期 (YYYYMMDD)
  82. account_ids: 账户ID列表,为None时拉取所有账户
  83. Returns:
  84. 创意数据列表
  85. """
  86. # 构建WHERE条件
  87. where_clauses = [f"dt BETWEEN '{start_date}' AND '{end_date}'"]
  88. if account_ids:
  89. account_ids_str = ",".join(map(str, account_ids))
  90. where_clauses.append(f"account_id IN ({account_ids_str})")
  91. where_clause = " AND ".join(where_clauses)
  92. # SQL查询(根据实际表结构调整)
  93. sql = f"""
  94. SELECT
  95. dt,
  96. account_id,
  97. adgroup_id AS ad_id,
  98. adgroup_name AS ad_name,
  99. bid_amount,
  100. configured_status,
  101. system_status,
  102. cost,
  103. view_count,
  104. valid_click_count,
  105. conversions_count,
  106. cost_per_conversion,
  107. thousand_display_price,
  108. ctr,
  109. create_time
  110. FROM tencent_ad_creative_daily
  111. WHERE {where_clause}
  112. ORDER BY dt, account_id, ad_id
  113. """
  114. return self.query(sql)
  115. def test_connection(self) -> bool:
  116. """测试ODPS连接
  117. Returns:
  118. 连接是否成功
  119. """
  120. if not self._ensure_initialized():
  121. return False
  122. try:
  123. # 执行简单查询测试连接
  124. sql = "SELECT 1 AS test"
  125. result = self.query(sql, limit=1)
  126. return len(result) > 0
  127. except Exception as e:
  128. logger.error(f"ODPS连接测试失败: {e}")
  129. return False
  130. # 全局ODPS客户端实例
  131. _odps_client: Optional[ODPSClient] = None
  132. def get_odps_client(project="loghubods") -> Optional[ODPSClient]:
  133. """获取全局ODPS客户端实例
  134. Args:
  135. project: ODPS项目名称
  136. Returns:
  137. ODPS客户端实例,初始化失败时返回None
  138. """
  139. global _odps_client
  140. if _odps_client is None:
  141. _odps_client = ODPSClient(project=project)
  142. if _odps_client._ensure_initialized():
  143. return _odps_client
  144. else:
  145. return None