123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147 |
- # -*- coding: utf-8 -*-
- """
- 公共方法,包含:生成log / 查询log
- """
- import json
- from aliyun.log import LogClient, GetLogsRequest
- from datetime import date, timedelta
- from datetime import datetime
- from typing import Optional
- from aliyun.log import PutLogsRequest, LogClient, LogItem
class AliyunLogger:
    """Query and write Aliyun Log Service records for video cover processing.

    Both public methods are best-effort: any failure is printed and
    swallowed so a logging problem never interrupts the caller.
    """

    # SECURITY(review): this access key pair is committed in source. It should
    # be rotated and loaded from the environment or a secret store instead.
    _ACCESS_KEY_ID = "LTAIWYUujJAm7CbH"
    _ACCESS_KEY = "RfSjdiWwED1sGFlsjXv0DlfTnZTG1P"
    _ENDPOINT = "cn-hangzhou.log.aliyuncs.com"

    # Records whose video ID is below this value are ignored.
    # NOTE(review): presumably the first ID covered by this pipeline - confirm.
    _MIN_VIDEO_ID = 30418085

    @staticmethod
    def _new_client():
        """Return a LogClient for the shared endpoint and credentials."""
        return LogClient(
            AliyunLogger._ENDPOINT,
            AliyunLogger._ACCESS_KEY_ID,
            AliyunLogger._ACCESS_KEY,
        )

    @staticmethod
    def _to_text(value):
        """Return *value* as str, decoding bytes as UTF-8 with replacement."""
        if isinstance(value, bytes):
            return value.decode('utf-8', errors='replace')
        return str(value)

    @staticmethod
    def query_logs_by_status():
        """Return video IDs found in the last minute of request logs.

        Searches the ``request-log`` logstore of project ``manager-new`` for
        ``updateVideoRecommendStatus`` calls whose extracted status is ``-6``.
        Each ID at or above ``_MIN_VIDEO_ID`` is reported through
        :meth:`logging` and collected.

        Returns:
            list: the matching video IDs as strings, in response order. The
            list is empty when nothing matched, and holds whatever was
            collected so far when the query failed part-way.
        """
        pq_list = []
        # Query window: from one minute ago until now, as epoch seconds.
        now = datetime.now()
        start_time = int((now - timedelta(minutes=1)).timestamp())
        end_time = int(now.timestamp())
        query = "* and url: updateVideoRecommendStatus | select trim(split(requestBody, ',')[1], '[]') as videoid, trim(split(requestBody, ',')[3], '[]') as videorecomendstatus HAVING videorecomendstatus='-6'"
        try:
            client = AliyunLogger._new_client()
            request = GetLogsRequest(
                project="manager-new",
                logstore="request-log",
                fromTime=start_time,
                toTime=end_time,
                topic="",
                query=query,
                line=100,
                offset=0,
                reverse=False
            )
            print(f"请求参数: {request}")
            response = client.get_logs(request)
            if response.get_count() <= 0:
                print("没有符合条件的日志")
                return pq_list
            for log in response.get_logs():
                # Normalise every field to str so mixed bytes/str payloads
                # cannot break the parsing below.
                fields = {
                    key: AliyunLogger._to_text(value)
                    for key, value in log.get_contents().items()
                }
                # A single malformed record (missing or non-numeric videoid)
                # is skipped instead of aborting the rest of the batch.
                try:
                    video_id = fields['videoid']
                    eligible = int(video_id) >= AliyunLogger._MIN_VIDEO_ID
                except (KeyError, ValueError) as e:
                    print(f"跳过无效的日志记录: {e}")
                    continue
                # NOTE(review): the indentation of the original was lost in
                # this copy; this assumes only IDs passing the threshold are
                # collected - confirm against the upstream file.
                if eligible:
                    AliyunLogger.logging(video_id, "扫描到一条视频ID,等待封面处理", "1001")
                    pq_list.append(video_id)
        except Exception as e:
            # Best-effort boundary: report the failure and return what we have.
            print(f"查询日志时出错: {e}")
        return pq_list

    @staticmethod
    def logging(video_id: str,
                message: str,
                code: str,
                data: Optional[str] = None,
                old_cover_url: Optional[str] = None,
                new_cover_url: Optional[str] = None
                ):
        """Write one record to the ``video_cover_info`` logstore.

        Test store: https://sls.console.aliyun.com/lognext/project/crawler-log-dev/logsearch/crawler-log-dev
        Production store: https://sls.console.aliyun.com/lognext/project/crawler-log-prod/logsearch/crawler-log-prod

        Args:
            video_id: ID of the video the record is about; stored as str.
            message: human-readable description of the event.
            code: status code string of the event.
            data: optional payload; JSON-encoded when truthy, else stored
                as an empty string.
            old_cover_url: previous cover URL; stored as "" when None.
            new_cover_url: new cover URL; stored as "" when None.

        Returns:
            None. Failures are printed and swallowed.
        """
        try:
            contents = [
                ("video_id", str(video_id)),
                ("message", message),
                ("code", code),
                ("old_cover_url", old_cover_url if old_cover_url is not None else ""),
                ("new_cover_url", new_cover_url if new_cover_url is not None else ""),
                ("data", json.dumps(data, ensure_ascii=False) if data else ""),
            ]
            log_item = LogItem()
            log_item.set_contents(contents)
            request = PutLogsRequest(
                project="crawler-log-prod",
                logstore="video_cover_info",
                topic="",
                source="",
                logitems=[log_item],
                compress=False,
            )
            AliyunLogger._new_client().put_logs(request)
        except Exception as e:
            print(f"写入日志失败: {e}")
# Example usage: run a single scan of the last minute of request logs.
# query_logs_by_status() computes its own time window, so nothing needs to
# be prepared here.
if __name__ == "__main__":
    AliyunLogger.query_logs_by_status()
|