| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175 |
"""
ODPS (MaxCompute) data-fetching module.
"""
import logging
from typing import Optional, List, Dict
from datetime import datetime, timedelta

# Module-level logger named after this module; used by the code below for
# all status and error reporting.
logger = logging.getLogger(__name__)
class ODPSClient:
    """Wrapper around a lazily created PyODPS (MaxCompute) client.

    Constructing an instance only stores configuration; the ``odps`` SDK is
    imported and the underlying ``ODPS`` object is built on first use by
    ``_ensure_initialized``. Failures are logged and reported through return
    values (``False`` or ``[]``) instead of being raised.

    Credentials are resolved in this order: explicit constructor argument,
    then the ``ODPS_ACCESS_ID`` / ``ODPS_ACCESS_SECRET`` environment
    variables, then the legacy built-in values.
    """

    # SECURITY NOTE(review): these legacy credentials are committed to source
    # control. They are kept only as a last-resort fallback so existing
    # callers keep working; rotate the key and supply it through the
    # constructor or the environment instead.
    _LEGACY_ACCESS_ID = "LTAIWYUujJAm7CbH"
    _LEGACY_ACCESS_SECRET = "RfSjdiWwED1sGFlsjXv0DlfTnZTG1P"
    _DEFAULT_ENDPOINT = "http://service.odps.aliyun.com/api"
    _DEFAULT_TUNNEL_ENDPOINT = "http://dt.cn-hangzhou.maxcompute.aliyun-inc.com"

    def __init__(
        self,
        project="loghubods",
        *,
        access_id: Optional[str] = None,
        access_secret: Optional[str] = None,
        endpoint: Optional[str] = None,
        tunnel_endpoint: Optional[str] = None,
    ):
        """Store connection settings without connecting.

        Args:
            project: ODPS project name.
            access_id: Access key id; falls back to ``ODPS_ACCESS_ID`` and
                then to the legacy built-in value.
            access_secret: Access key secret; falls back to
                ``ODPS_ACCESS_SECRET`` and then to the legacy built-in value.
            endpoint: Service endpoint; defaults to the built-in endpoint.
            tunnel_endpoint: Tunnel endpoint; defaults to the built-in one.
        """
        # Local import: the module header does not import ``os``.
        import os

        self.accessId = (
            access_id
            or os.environ.get("ODPS_ACCESS_ID")
            or self._LEGACY_ACCESS_ID
        )
        self.accessSecret = (
            access_secret
            or os.environ.get("ODPS_ACCESS_SECRET")
            or self._LEGACY_ACCESS_SECRET
        )
        self.endpoint = endpoint or self._DEFAULT_ENDPOINT
        self.tunnelUrl = tunnel_endpoint or self._DEFAULT_TUNNEL_ENDPOINT
        self.project = project
        self._odps = None
        self._initialized = False

    def _ensure_initialized(self):
        """Build the underlying ODPS object if it does not exist yet.

        Returns:
            True when the client is ready, False when the SDK is missing or
            construction failed (the reason is logged). A failed attempt is
            not cached, so the next call tries again.
        """
        if self._initialized:
            return True

        try:
            # Imported lazily so this module can be loaded without pyodps.
            from odps import ODPS

            self._odps = ODPS(
                access_id=self.accessId,
                secret_access_key=self.accessSecret,
                project=self.project,
                endpoint=self.endpoint,
                tunnel_endpoint=self.tunnelUrl,
            )
            self._initialized = True
            logger.info("ODPS客户端初始化成功,项目: %s", self.project)
            return True
        except ImportError:
            logger.error("ODPS SDK 未安装,请运行: pip install pyodps")
            return False
        except Exception as e:
            logger.error("ODPS客户端初始化失败: %s", e)
            return False

    def query(self, sql: str, limit: Optional[int] = None) -> List[Dict]:
        """Run a SQL statement and return its rows as dictionaries.

        Args:
            sql: SQL statement to execute.
            limit: Maximum number of rows to return. ``None`` and ``0`` both
                mean "no limit".

        Returns:
            One dict per row, keyed by column name. An empty list is returned
            when the client is unavailable or the query fails.
        """
        if not self._ensure_initialized():
            logger.warning("ODPS客户端未初始化,无法执行查询")
            return []

        try:
            logger.info("执行ODPS查询: %s...", sql[:100])
            instance = self._odps.execute_sql(sql)
            instance.wait_for_success()

            rows: List[Dict] = []
            with instance.open_reader() as reader:
                # Column names do not change between rows, so read them once.
                # NOTE(review): this relies on the reader's private `_schema`
                # attribute, exactly as the previous implementation did.
                column_names = list(reader._schema.names)
                for row_index, record in enumerate(reader):
                    if limit and row_index >= limit:
                        break
                    rows.append(
                        {
                            name: record[position]
                            for position, name in enumerate(column_names)
                        }
                    )

            logger.info("查询完成,返回 %s 行", len(rows))
            return rows
        except Exception as e:
            logger.error("ODPS查询失败: %s", e)
            return []

    @staticmethod
    def _is_yyyymmdd(value) -> bool:
        """Return True when ``value`` is a string of exactly 8 ASCII digits."""
        return (
            isinstance(value, str)
            and len(value) == 8
            and all(ch in "0123456789" for ch in value)
        )

    def fetch_creative_data(
        self,
        start_date: str,
        end_date: str,
        account_ids: Optional[List[int]] = None
    ) -> List[Dict]:
        """Fetch creative rows from ``tencent_ad_creative_daily``.

        Both dates and every account id are validated before being placed in
        the SQL text, because the statement is assembled by string
        formatting.

        Args:
            start_date: First partition date, inclusive (YYYYMMDD).
            end_date: Last partition date, inclusive (YYYYMMDD).
            account_ids: Account ids to restrict to; ``None`` or an empty
                list fetches every account.

        Returns:
            The rows returned by ``query``. An empty list is returned when an
            argument is malformed (the problem is logged and no query is
            sent) or when the query itself fails.
        """
        if not (self._is_yyyymmdd(start_date) and self._is_yyyymmdd(end_date)):
            logger.error(
                "日期参数格式无效(应为YYYYMMDD): start_date=%r, end_date=%r",
                start_date,
                end_date,
            )
            return []

        conditions = [f"dt BETWEEN '{start_date}' AND '{end_date}'"]

        if account_ids:
            try:
                # int() guarantees only numeric literals reach the IN list.
                id_list = ",".join(str(int(account)) for account in account_ids)
            except (TypeError, ValueError):
                logger.error("account_ids 包含非整数值: %r", account_ids)
                return []
            conditions.append(f"account_id IN ({id_list})")

        where_clause = " AND ".join(conditions)

        # Column list mirrors the table layout; adjust if the table changes.
        # NOTE(review): MaxCompute may reject ORDER BY without LIMIT unless
        # odps.sql.validate.orderby.limit=false is set - confirm the project
        # setting.
        sql = f"""
            SELECT
                dt,
                account_id,
                adgroup_id AS ad_id,
                adgroup_name AS ad_name,
                bid_amount,
                configured_status,
                system_status,
                cost,
                view_count,
                valid_click_count,
                conversions_count,
                cost_per_conversion,
                thousand_display_price,
                ctr,
                create_time
            FROM tencent_ad_creative_daily
            WHERE {where_clause}
            ORDER BY dt, account_id, ad_id
        """
        return self.query(sql)

    def test_connection(self) -> bool:
        """Check connectivity by running a trivial query.

        Returns:
            True when ``SELECT 1`` returns at least one row, False otherwise.
        """
        if not self._ensure_initialized():
            return False

        try:
            result = self.query("SELECT 1 AS test", limit=1)
            return len(result) > 0
        except Exception as e:
            logger.error("ODPS连接测试失败: %s", e)
            return False
# Module-wide ODPS client, created on first use by get_odps_client().
_odps_client: Optional[ODPSClient] = None


def get_odps_client(project="loghubods") -> Optional[ODPSClient]:
    """Return the shared ``ODPSClient`` for ``project``.

    The client is created on the first call and reused afterwards. When a
    later call asks for a different project, the shared client is replaced
    by one bound to that project, so the caller never receives a client for
    a project it did not request.

    Args:
        project: ODPS project name.

    Returns:
        The shared client, or None when it could not be initialized.
    """
    global _odps_client

    if _odps_client is None or _odps_client.project != project:
        _odps_client = ODPSClient(project=project)

    # Initialization is retried on every call until it succeeds.
    if _odps_client._ensure_initialized():
        return _odps_client
    return None
|