zhangliang 3 viikkoa sitten
vanhempi
commit
d87a19e969
2 muutettua tiedostoa jossa 26 lisäystä ja 25 poistoa
  1. 25 24
      app/main.py
  2. 1 1
      application/common/messageQueue/ack_message.py

+ 25 - 24
app/main.py

@@ -6,6 +6,7 @@ description: 票圈线上代码控制程序,分布式抓取消费核心
 import asyncio
 import json
 import time
+import traceback
 import uuid
 from mq_http_sdk.mq_consumer import *
 from mq_http_sdk.mq_exception import MQExceptionBase
@@ -17,7 +18,7 @@ from application.config import TopicGroup
 from application.common.log import Local
 
 
-async def run(task_id, mode, platform):
+async def run(task_id, mode, platform,trace_id):
     """
     传入参数,然后根据参数执行爬虫代码
     :param task_id: 任务id
@@ -36,17 +37,17 @@ async def run(task_id, mode, platform):
         )
         # 检查进程是否已经启动
         if process.returncode is None:
-            logger.logging(code=1005, message="{}: 启动进程成功,进程ID:{}".format(platform,process.pid))
+            logger.logging(code=1005, message="{}: 启动进程成功,进程ID:{}".format(platform,process.pid),trace_id=trace_id)
         else:
-            logger.logging(code=1006, message="{}: 启动进程失败".format(platform))
+            logger.logging(code=1006, message="{}: 启动进程失败".format(platform),trace_id=trace_id)
         # logger.logging(code=5002, message="successfully run spider")
     except Exception as e:
-        logger.logging(code=1007,message= f"发生未知错误: {e}")
+        logger.logging(code=1007,message= f"发生未知错误: {e}\n {traceback.format_exc()}",trace_id=trace_id)
 
-# def generate_trace_id():
-#     timestamp = str(int(time.time() * 1000))
-#     unique_id = str(uuid.uuid4())
-#     return f"{timestamp}-{unique_id}"
+def generate_trace_id():
+    timestamp = str(int(time.time() * 1000))
+    unique_id = str(uuid.uuid4())
+    return f"{timestamp}-{unique_id}"
 
 
 async def consume_single_message(spider):
@@ -60,44 +61,44 @@ async def consume_single_message(spider):
     mode = spider["mode"]
     logger = AliyunLogger(platform=platform, mode=mode)
     consumer = get_consumer(topic, group)
+    trace_id = generate_trace_id()
     try:
-        try:
-            messages = consumer.consume_message(wait_seconds=10, batch_size=1)
-        except Exception as err:
-            logger.logging(code="5005", message=f"Consume Message Fail! Exception:{err}")
-            raise
+        messages = consumer.consume_message(wait_seconds=10, batch_size=1)
         if messages:
             # 在这里消费消息,做一些数据处理分析
             for single_message in messages:
-                Local.logger(platform, mode).info("收到一条消息\t{}{}".format(single_message, single_message.message_body))
+                message_id = single_message.message_id
+                Local.logger(platform, mode).info("收到一条消息\t{}{},trace_id={},message_id = {}".format(single_message, single_message.message_body, trace_id,message_id))
                 ack_message(
-                    mode=mode, platform=platform, recv_msgs=messages, consumer=consumer
+                    mode=mode, platform=platform, recv_msgs=single_message, consumer=consumer
                 )
                 logger.logging(
                     code=5000,
-                    message="successfully consumed message",
+                    message=f"successfully consumed message [message_id] ={message_id}",
                     data=single_message.message_body,
+                    trace_id=trace_id
                 )
                 message_body = single_message.message_body
                 task_id = json.loads(message_body)["id"]
                 Local.logger(platform, mode).info(
-                    f"task_id=={task_id}")
+                    f"task_id=={task_id} trace_id={trace_id} message_id ={message_id}")
                 # 创建爬虫task
                 await asyncio.create_task(
-                    run(task_id, spider["mode"], spider["platform"])
+                    run(task_id, spider["mode"], spider["platform"],trace_id)
                 )
-                logger.logging(code=5001, message="successfully created task")
+                logger.logging(code=5001, message=f"successfully created task [message_id] = {message_id}",trace_id=trace_id)
         else:
-            logger.logging(code=5003, message="Messages Queue is Empty")
+            logger.logging(code=5003, message="Messages Queue is Empty",trace_id=trace_id)
 
     except MQExceptionBase as err:
+        tb = traceback.format_exc()
         # Topic中没有消息可消费。
         if err.type == "MessageNotExist":
-            message = "No new message! RequestId:{}\n".format(err.req_id)
-            logger.logging(code="5004", message=message)
+            message = "No new message! RequestId:{}\n{}".format(err.req_id,tb)
+            logger.logging(code="5004", message=message,trace_id=trace_id)
         else:
-            message = "Consume Message Fail! Exception:{}\n".format(err)
-            logger.logging(code="5004", message=message)
+            message = "Consume Message Fail! Exception:{}\n{}".format(err,tb)
+            logger.logging(code="5004", message=message,trace_id=trace_id)
 
 
 async def main():

+ 1 - 1
application/common/messageQueue/ack_message.py

@@ -12,7 +12,7 @@ def ack_message(mode, platform, recv_msgs, consumer):
     :param consumer:
     """
     try:
-        receipt_handle_list = [msg.receipt_handle for msg in recv_msgs]
+        receipt_handle_list = recv_msgs.receipt_handle
         consumer.ack_message(receipt_handle_list)
         Local.logger(platform, mode).info(
             f"Ack {len(receipt_handle_list)} Message Succeed.\n"