"""Thin wrapper around the Aliyun SLS ``LogClient`` for writing log entries."""

import asyncio
import functools
import time
# NOTE(review): `result` appears unused in this file (the `result` parameter
# of `async_put_log` shadows it); kept so the module's namespace is unchanged.
from unittest import result

from aliyun.log import LogClient, PutLogsRequest, LogItem

from app.core.config import settings
from app.core.logger import logger


class AliyunLogClient:
    """Write log entries to one Aliyun SLS project/logstore pair.

    Writing is gated by ``settings.ENABLE_MONITOR``: when it is falsy, the
    write methods return without contacting the service.
    """

    def __init__(self, project, logstore):
        """Create the underlying ``LogClient`` from application settings.

        Args:
            project: Name of the SLS project that log entries are sent to.
            logstore: Name of the logstore inside ``project``.
        """
        self.project = project
        self.logstore = logstore
        self.client = LogClient(
            f"{settings.ALIYUN_REGION_ID}.log.aliyuncs.com",
            settings.ALIYUN_ACCESS_KEY_ID,
            settings.ALIYUN_ACCESS_KEY_SECRET,
        )
        self.enabled = settings.ENABLE_MONITOR

    def write_log(self, message, level="INFO", **kwargs) -> None:
        """Send a single log entry to Aliyun SLS (blocking call).

        Any exception raised while building or sending the entry is caught
        and reported through the application logger instead of propagating,
        so a logging failure does not break the caller.

        Args:
            message: Log message; converted with ``str()``.
            level: Severity label stored in the ``level`` field.
            **kwargs: Extra fields; each key and value is converted with
                ``str()`` and appended after the standard fields.
        """
        if not self.enabled:
            return

        try:
            # Sample the clock once so the item timestamp and the
            # "timestamp" field can never disagree across a second boundary.
            now = int(time.time())

            log_item = LogItem(timestamp=now)
            contents = [
                ("level", level),
                ("message", str(message)),
                ("timestamp", str(now)),
            ]
            contents.extend((str(key), str(value)) for key, value in kwargs.items())
            log_item.set_contents(contents)

            request = PutLogsRequest(
                project=self.project,
                logstore=self.logstore,
                logitems=[log_item],
            )
            self.client.put_logs(request)
        except Exception as e:
            # Deliberately best-effort: report the failure and carry on.
            logger.error(f"Failed to write log to Aliyun SLS: {e}")

    async def async_write_log(self, message, level="INFO", **kwargs) -> None:
        """Send a log entry without blocking the event loop.

        Runs :meth:`write_log` in the loop's default executor and awaits
        its completion.

        Args:
            message: Log message; forwarded to :meth:`write_log`.
            level: Severity label; forwarded to :meth:`write_log`.
            **kwargs: Extra fields; forwarded to :meth:`write_log`.
        """
        if not self.enabled:
            return

        # A coroutine always executes inside a running loop, so ask for that
        # loop explicitly rather than going through get_event_loop().
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(
            None, functools.partial(self.write_log, message, level, **kwargs)
        )

    async def async_put_log(self, keyword, status, step="", msg="", result="") -> None:
        """Record the status of an API request asynchronously.

        The entry is logged at ``INFO`` level when ``status`` equals
        ``"success"`` and at ``ERROR`` level otherwise.

        Args:
            keyword: Value stored in the ``keyword`` field.
            status: Value stored in the ``status`` field; also selects the
                level.
            step: Value stored in the ``step_name`` field.
            msg: Log message text.
            result: Value stored in the ``result`` field.
        """
        await self.async_write_log(
            level="INFO" if status == "success" else "ERROR",
            keyword=keyword,
            status=status,
            message=msg,
            step_name=step,
            result=result,
        )


# Usage example: a module-level client instance.
ali_log_client = AliyunLogClient("crawler-log-prod", "wx_trend")