| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172 |
- import time
- import asyncio
- from unittest import result
- from aliyun.log import LogClient, PutLogsRequest, LogItem
- from app.core.config import settings
- from app.core.logger import logger
class AliyunLogClient:
    """Best-effort writer of log entries to one Aliyun SLS project/logstore.

    Every write is a no-op when ``self.enabled`` is falsy, and any exception
    raised while building or sending an entry is reported through the
    application ``logger`` instead of propagating to the caller.
    """

    def __init__(self, project, logstore):
        """Create a client bound to *project* and *logstore*.

        Args:
            project: Project name forwarded to ``PutLogsRequest``.
            logstore: Logstore name forwarded to ``PutLogsRequest``.
        """
        self.project = project
        self.logstore = logstore
        # Endpoint and credentials all come from the application settings.
        self.client = LogClient(
            f"{settings.ALIYUN_REGION_ID}.log.aliyuncs.com",
            settings.ALIYUN_ACCESS_KEY_ID,
            settings.ALIYUN_ACCESS_KEY_SECRET,
        )
        # Read once at construction time; later changes to the setting are
        # not picked up by an existing instance.
        self.enabled = settings.ENABLE_MONITOR

    @staticmethod
    def _build_contents(message, level, timestamp, extra):
        """Return the ``(key, value)`` pairs that make up one log entry.

        Args:
            message: Log message; converted with ``str()``.
            level: Level label, stored as given.
            timestamp: Epoch seconds; converted with ``str()``.
            extra: Mapping of additional fields; keys and values are both
                converted with ``str()``.

        Returns:
            A list of 2-tuples: the three fixed fields followed by the extra
            fields in iteration order.
        """
        contents = [
            ("level", level),
            ("message", str(message)),
            ("timestamp", str(timestamp)),
        ]
        contents.extend((str(key), str(value)) for key, value in extra.items())
        return contents

    def write_log(self, message, level="INFO", **kwargs):
        """Write one log entry to Aliyun SLS (synchronous).

        Args:
            message: Log message; stored as ``str(message)``.
            level: Level label stored in the ``level`` field.
            **kwargs: Extra fields appended to the entry as strings.

        Returns:
            None. Returns immediately when the client is disabled. Failures
            are logged, not raised.
        """
        if not self.enabled:
            return
        try:
            # Capture the clock once so the item timestamp and the
            # "timestamp" content field can never disagree.
            now = int(time.time())
            log_item = LogItem(timestamp=now)
            log_item.set_contents(
                self._build_contents(message, level, now, kwargs)
            )
            request = PutLogsRequest(
                project=self.project,
                logstore=self.logstore,
                logitems=[log_item],
            )
            self.client.put_logs(request)
        except Exception as e:
            # Deliberately broad: shipping a log entry must never break the
            # caller, so every failure is reported and swallowed here.
            logger.error(f"Failed to write log to Aliyun SLS: {e}")

    async def async_write_log(self, message, level="INFO", **kwargs):
        """Write one log entry without blocking the event loop.

        Runs :meth:`write_log` in the loop's default executor and waits for
        it to finish.

        Args:
            message: Log message; stored as ``str(message)``.
            level: Level label stored in the ``level`` field.
            **kwargs: Extra fields appended to the entry as strings.
        """
        if not self.enabled:
            return
        # Inside a coroutine a loop is always running; get_running_loop()
        # is the preferred way to obtain it.
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(
            None, lambda: self.write_log(message, level, **kwargs)
        )

    async def async_put_log(self, keyword, status, step="", msg="", result=""):
        """Asynchronously record the status of an API request.

        The entry level is ``"INFO"`` when *status* equals ``"success"`` and
        ``"ERROR"`` for any other value.

        Args:
            keyword: Stored in the ``keyword`` field.
            status: Stored in the ``status`` field; also selects the level.
            step: Stored in the ``step_name`` field.
            msg: Used as the log message.
            result: Stored in the ``result`` field.
        """
        await self.async_write_log(
            level="INFO" if status == "success" else "ERROR",
            keyword=keyword,
            status=status,
            message=msg,
            step_name=step,
            result=result,
        )
# Usage example: module-level client instance bound to the
# "crawler-log-prod" project and the "wx_trend" logstore.
ali_log_client = AliyunLogClient(
    project="crawler-log-prod",
    logstore="wx_trend",
)
|