""" ODPS(MaxCompute)数据拉取模块 """ import logging from typing import Optional, List, Dict from datetime import datetime, timedelta logger = logging.getLogger(__name__) class ODPSClient: """ODPS客户端封装""" def __init__(self, project="loghubods"): """初始化ODPS客户端 Args: project: ODPS项目名称 """ self.accessId = "LTAIWYUujJAm7CbH" self.accessSecret = "RfSjdiWwED1sGFlsjXv0DlfTnZTG1P" self.endpoint = "http://service.odps.aliyun.com/api" self.tunnelUrl = "http://dt.cn-hangzhou.maxcompute.aliyun-inc.com" self.project = project self._odps = None self._initialized = False def _ensure_initialized(self): """确保ODPS客户端已初始化""" if self._initialized: return True try: from odps import ODPS self._odps = ODPS( access_id=self.accessId, secret_access_key=self.accessSecret, project=self.project, endpoint=self.endpoint, tunnel_endpoint=self.tunnelUrl ) self._initialized = True logger.info(f"ODPS客户端初始化成功,项目: {self.project}") return True except ImportError: logger.error("ODPS SDK 未安装,请运行: pip install pyodps") return False except Exception as e: logger.error(f"ODPS客户端初始化失败: {e}") return False def query(self, sql: str, limit: Optional[int] = None) -> List[Dict]: """执行SQL查询 Args: sql: SQL查询语句 limit: 最大返回行数 Returns: 查询结果列表(每行为字典) """ if not self._ensure_initialized(): logger.warning("ODPS客户端未初始化,无法执行查询") return [] try: logger.info(f"执行ODPS查询: {sql[:100]}...") instance = self._odps.execute_sql(sql) instance.wait_for_success() with instance.open_reader() as reader: results = [] for i, record in enumerate(reader): if limit and i >= limit: break # 将Record对象转换为字典 row_dict = {col: record[i] for i, col in enumerate(reader._schema.names)} results.append(row_dict) logger.info(f"查询完成,返回 {len(results)} 行") return results except Exception as e: logger.error(f"ODPS查询失败: {e}") return [] def fetch_creative_data( self, start_date: str, end_date: str, account_ids: Optional[List[int]] = None ) -> List[Dict]: """从ODPS拉取创意数据 Args: start_date: 开始日期 (YYYYMMDD) end_date: 结束日期 (YYYYMMDD) account_ids: 账户ID列表,为None时拉取所有账户 Returns: 创意数据列表 """ # 构建WHERE条件 where_clauses = [f"dt BETWEEN '{start_date}' AND '{end_date}'"] if account_ids: account_ids_str = ",".join(map(str, account_ids)) where_clauses.append(f"account_id IN ({account_ids_str})") where_clause = " AND ".join(where_clauses) # SQL查询(根据实际表结构调整) sql = f""" SELECT dt, account_id, adgroup_id AS ad_id, adgroup_name AS ad_name, bid_amount, configured_status, system_status, cost, view_count, valid_click_count, conversions_count, cost_per_conversion, thousand_display_price, ctr, create_time FROM tencent_ad_creative_daily WHERE {where_clause} ORDER BY dt, account_id, ad_id """ return self.query(sql) def test_connection(self) -> bool: """测试ODPS连接 Returns: 连接是否成功 """ if not self._ensure_initialized(): return False try: # 执行简单查询测试连接 sql = "SELECT 1 AS test" result = self.query(sql, limit=1) return len(result) > 0 except Exception as e: logger.error(f"ODPS连接测试失败: {e}") return False # 全局ODPS客户端实例 _odps_client: Optional[ODPSClient] = None def get_odps_client(project="loghubods") -> Optional[ODPSClient]: """获取全局ODPS客户端实例 Args: project: ODPS项目名称 Returns: ODPS客户端实例,初始化失败时返回None """ global _odps_client if _odps_client is None: _odps_client = ODPSClient(project=project) if _odps_client._ensure_initialized(): return _odps_client else: return None