# -*- coding: utf-8 -*-
"""
Shared helpers: writing logs to / querying logs from Aliyun SLS.
"""
import json
from datetime import datetime, timedelta
from typing import Optional

from aliyun.log import GetLogsRequest, LogClient, LogItem, PutLogsRequest


class AliyunLogger:

    # Query Aliyun logs for video IDs whose recommend status is -6
    @staticmethod
    def query_logs_by_status():
        pq_list = []
        # Query window: from one minute ago up to now
        now = datetime.now()
        start_time = int((now - timedelta(minutes=1)).timestamp())
        end_time = int(now.timestamp())

        accessKeyId = "LTAIWYUujJAm7CbH"
        accessKey = "RfSjdiWwED1sGFlsjXv0DlfTnZTG1P"
        project = "manager-new"
        logstore = "request-log"
        endpoint = "cn-hangzhou.log.aliyuncs.com"

        try:
            # Create the LogClient instance
            client = LogClient(endpoint, accessKeyId, accessKey)
            # Extract video id / recommend status from updateVideoRecommendStatus
            # request bodies; keep only rows whose status is -6
            query = (
                "* and url: updateVideoRecommendStatus "
                "| select trim(split(requestBody, ',')[1], '[]') as videoid, "
                "trim(split(requestBody, ',')[3], '[]') as videorecomendstatus "
                "HAVING videorecomendstatus='-6'"
            )
            # Build the query request
            request = GetLogsRequest(
                project=project,
                logstore=logstore,
                fromTime=start_time,
                toTime=end_time,
                topic="",
                query=query,
                line=100,
                offset=0,
                reverse=False,
            )
            print(f"Request parameters: {request}")

            # Execute the query
            response = client.get_logs(request)
            if response.get_count() > 0:
                for log in response.get_logs():
                    contents = log.get_contents()
                    # Normalize every field to str so later access is safe
                    log_str = {}
                    for k, v in contents.items():
                        if isinstance(v, bytes):
                            # Decode bytes, replacing undecodable characters
                            log_str[k] = v.decode("utf-8", errors="replace")
                        elif isinstance(v, str):
                            log_str[k] = v
                        else:
                            log_str[k] = str(v)
                    video_id = log_str["videoid"]
                    AliyunLogger.logging(video_id, "Found a video ID; awaiting cover processing", "1001")
                    pq_list.append(video_id)
                return pq_list
            else:
                print("No matching logs")
                return pq_list
        except Exception as e:
            print(f"Error querying logs: {e}")
            return pq_list

    # Write a record to Aliyun SLS
    @staticmethod
    def logging(
        video_id: str,
        message: str,
        code: str,
        data: Optional[str] = None,
        old_cover_url: Optional[str] = None,
        new_cover_url: Optional[str] = None,
    ):
        """
        Write to the Aliyun log store.
        Test store:       https://sls.console.aliyun.com/lognext/project/crawler-log-dev/logsearch/crawler-log-dev
        Production store: https://sls.console.aliyun.com/lognext/project/crawler-log-prod/logsearch/crawler-log-prod
        """
        accessKeyId = "LTAIWYUujJAm7CbH"
        accessKey = "RfSjdiWwED1sGFlsjXv0DlfTnZTG1P"
        project = "crawler-log-prod"
        logstore = "video_cover_info"
        endpoint = "cn-hangzhou.log.aliyuncs.com"

        try:
            # if data:
            #     data = dict(item.split(":", 1) for item in data.split(","))
            contents = [
                ("video_id", str(video_id)),
                ("message", message),
                ("code", code),
                ("old_cover_url", old_cover_url if old_cover_url is not None else ""),
                ("new_cover_url", new_cover_url if new_cover_url is not None else ""),
                ("data", json.dumps(data, ensure_ascii=False) if data else ""),
            ]
            # Create the LogClient instance
            client = LogClient(endpoint, accessKeyId, accessKey)
            log_item = LogItem()
            log_item.set_contents(contents)
            # Write the log group
            request = PutLogsRequest(
                project=project,
                logstore=logstore,
                topic="",
                source="",
                logitems=[log_item],
                compress=False,
            )
            client.put_logs(request)
        except Exception as e:
            print(f"Failed to write log: {e}")


# Example usage
if __name__ == "__main__":
    AliyunLogger.query_logs_by_status()
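    # For clarity, a pure-Python mirror of what the SLS query above computes.
    # The sample requestBody below is hypothetical; the real field layout
    # lives in the request-log store. Note that SLS/Presto split() arrays are
    # 1-based, so [1] and [3] in the query map to Python indices 0 and 2.
    sample_body = "[123, someField, -6, tail]"
    parts = sample_body.split(",")
    videoid = parts[0].strip().strip("[]")   # -> "123"
    status = parts[2].strip().strip("[]")    # -> "-6", passes the HAVING filter
    print(videoid, status)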
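    # A minimal sketch of the write path with all optional fields, assuming
    # hypothetical values: the id, code, message, and URLs below are
    # illustrative only and do not come from a real query result.
    AliyunLogger.logging(
        video_id="12345",                             # hypothetical id
        message="cover replaced",                     # free-form status text
        code="1002",                                  # assumed code; only "1001" appears above
        old_cover_url="https://example.com/old.jpg",  # placeholder URL
        new_cover_url="https://example.com/new.jpg",  # placeholder URL
    )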