zhangliang hai 8 horas
pai
achega
9a7cfedb14
Modificáronse 1 ficheiros con 3 adicións e 5 borrados
  1. 3 5
      scheduler/async_consumer.py

+ 3 - 5
scheduler/async_consumer.py

@@ -40,8 +40,6 @@ async def async_handle_topic(topic: str, stop_event: asyncio.Event):
                     data=payload,
                     account=topic
                 )
-                # 确认消息(单条消息处理成功后才 Ack) 由于获取到消息到确认消息有超时问题,暂时先把ask提前,后面改为使用sdk
-                await consumer.ack_message(message.receipt_handle)
 
                 # 从数据库查询配置
                 async with AsyncMysqlService() as mysql:
@@ -54,10 +52,10 @@ async def async_handle_topic(topic: str, stop_event: asyncio.Event):
                     rule_dict=rule_dict,
                     user_list=user_list,
                 )
+                await crawler.run()  # 爬虫成功执行后再确认消息
 
-
-
-                await crawler.run()
+                # 确认消息(单条消息处理成功后才 Ack)
+                await consumer.ack_message(message.receipt_handle)
 
                 logger.info(f"[{topic}]任务 {task_id} 执行成功并已 Ack")
                 aliyun_logger.logging(