소스 검색

新框架 bug 再测试

罗俊辉 1 년 전
부모
커밋
0987c63c13
2개의 변경된 파일7개의 추가작업 그리고 4개의 파일을 삭제
  1. 6 4
      app/main.py
  2. 1 0
      application/common/messageQueue/ack_message.py

+ 6 - 4
app/main.py

@@ -14,6 +14,7 @@ sys.path.append(os.getcwd())
 
 from application.common import AliyunLogger, get_consumer, ack_message
 from application.config import TopicGroup
+from application.common.log import Local
 
 
 async def run(task_id, mode, platform):
@@ -52,6 +53,7 @@ async def consume_single_message(spider):
         if messages:
             # 在这里消费消息,做一些数据处理分析
             for single_message in messages:
+                Local.logger(platform, mode).info("收到一条消息\t{}{}".format(single_message, single_message.message_body))
                 ack_message(
                     mode=mode, platform=platform, recv_msgs=messages, consumer=consumer
                 )
@@ -86,10 +88,10 @@ async def main():
     """
     spider_list = TopicGroup().produce()
     while spider_list:
-        print(spider_list)
-        # tasks = [consume_single_message(spider) for spider in spider_list]
-        # await asyncio.gather(*tasks)
-        await asyncio.sleep(2000)
+        # print(spider_list)
+        tasks = [consume_single_message(spider) for spider in spider_list]
+        await asyncio.gather(*tasks)
+        await asyncio.sleep(20)
 
 
         # print("Hello World {}".format(" ".join(spider_list)))

+ 1 - 0
application/common/messageQueue/ack_message.py

@@ -17,5 +17,6 @@ def ack_message(mode, platform, recv_msgs, consumer):
         Local.logger(platform, mode).info(
             f"Ack {len(receipt_handle_list)} Message Succeed.\n"
         )
+
     except MQExceptionBase as err:
         Local.logger(platform, mode).info(f"Ack Message Fail! Exception:{err}\n")