# aliyun_log.py
# -*- coding: utf-8 -*-
"""
Common helpers, covering: writing logs / querying logs.
"""
import json
import os
from datetime import date, timedelta
from datetime import datetime
from typing import Optional

from aliyun.log import LogClient, GetLogsRequest
from aliyun.log import PutLogsRequest, LogClient, LogItem
  11. class AliyunLogger:
  12. # 查询阿里云日志
  13. @staticmethod
  14. def query_logs_by_status():
  15. pq_list = []
  16. # 当前时间
  17. now = datetime.now()
  18. # 设置开始时间为当前时间的 1 分钟前
  19. start_time = int((now - timedelta(minutes=1)).timestamp())
  20. # 设置结束时间为当前时间
  21. end_time = int(now.timestamp())
  22. accessKeyId = "LTAIWYUujJAm7CbH"
  23. accessKey = "RfSjdiWwED1sGFlsjXv0DlfTnZTG1P"
  24. project = "manager-new"
  25. logstore = "request-log"
  26. endpoint = "cn-hangzhou.log.aliyuncs.com"
  27. try:
  28. # 创建 LogClient 实例
  29. client = LogClient(endpoint, accessKeyId, accessKey)
  30. # 新的查询条件
  31. query = "* and url: updateVideoRecommendStatus | select trim(split(requestBody, ',')[1], '[]') as videoid, trim(split(requestBody, ',')[3], '[]') as videorecomendstatus HAVING videorecomendstatus='-6'"
  32. # 创建查询日志的请求
  33. request = GetLogsRequest(
  34. project=project,
  35. logstore=logstore,
  36. fromTime=start_time,
  37. toTime=end_time,
  38. topic="",
  39. query=query,
  40. line=100,
  41. offset=0,
  42. reverse=False
  43. )
  44. # 打印请求参数
  45. print(f"请求参数: {request}")
  46. # 执行查询请求
  47. response = client.get_logs(request)
  48. if response.get_count() > 0:
  49. logs = response.get_logs()
  50. for log in logs:
  51. contents = log.get_contents()
  52. # 确保处理内容时不产生错误
  53. log_str = {}
  54. for k, v in contents.items():
  55. if isinstance(v, bytes):
  56. log_str[k] = v.decode('utf-8', errors='replace') # 解码字节为字符串,替换错误字符
  57. elif isinstance(v, str):
  58. log_str[k] = v # 保持字符串类型
  59. else:
  60. log_str[k] = str(v) # 其他类型转换为字符串
  61. video_id = log_str['videoid']
  62. if int(video_id) >= 30418085:
  63. AliyunLogger.logging(video_id, "扫描到一条视频ID,等待封面处理", "1001")
  64. # 输出日志内容
  65. pq_list.append(video_id)
  66. return pq_list
  67. else:
  68. print("没有符合条件的日志")
  69. return pq_list
  70. except Exception as e:
  71. print(f"查询日志时出错: {e}")
  72. return pq_list
  73. # 写入阿里云日志
  74. @staticmethod
  75. def logging(video_id: str,
  76. message: str,
  77. code: str,
  78. data: Optional[str] = None,
  79. old_cover_url: Optional[str] = None,
  80. new_cover_url: Optional[str] = None
  81. ):
  82. """
  83. 写入阿里云日志
  84. 测试库: https://sls.console.aliyun.com/lognext/project/crawler-log-dev/logsearch/crawler-log-dev
  85. 正式库: https://sls.console.aliyun.com/lognext/project/crawler-log-prod/logsearch/crawler-log-prod
  86. """
  87. accessKeyId = "LTAIWYUujJAm7CbH"
  88. accessKey = "RfSjdiWwED1sGFlsjXv0DlfTnZTG1P"
  89. project = "crawler-log-prod"
  90. logstore = "video_cover_info"
  91. endpoint = "cn-hangzhou.log.aliyuncs.com"
  92. try:
  93. # if data:
  94. # data = dict(item.split(":", 1) for item in data.split(","))
  95. contents = [
  96. ("video_id", str(video_id)),
  97. ("message", message),
  98. ("code", code),
  99. ("old_cover_url", old_cover_url if old_cover_url is not None else ""),
  100. ("new_cover_url", new_cover_url if new_cover_url is not None else ""),
  101. ("data", json.dumps(data, ensure_ascii=False) if data else ""),
  102. ]
  103. # 创建 LogClient 实例
  104. client = LogClient(endpoint, accessKeyId, accessKey)
  105. log_group = []
  106. log_item = LogItem()
  107. log_item.set_contents(contents)
  108. log_group.append(log_item)
  109. # 写入日志
  110. request = PutLogsRequest(
  111. project=project,
  112. logstore=logstore,
  113. topic="",
  114. source="",
  115. logitems=log_group,
  116. compress=False,
  117. )
  118. client.put_logs(request)
  119. except Exception as e:
  120. print(f"写入日志失败: {e}")
  121. # 示例使用
  122. if __name__ == "__main__":
  123. # 当前时间
  124. now = datetime.now()
  125. # 设置开始时间为当前时间的 1 分钟前
  126. start_time = int((now - timedelta(minutes=1)).timestamp())
  127. # 设置结束时间为当前时间
  128. end_time = int(now.timestamp())
  129. AliyunLogger.query_logs_by_status()