فهرست منبع

提交main函数线下测试版本(无阿里云日志)

罗俊辉 1 سال پیش
والد
کامیت
96ca32deb9
1فایلهای تغییر یافته به همراه96 افزوده شده و 0 حذف شده
  1. 96 0
      app/main_dev_version.py

+ 96 - 0
app/main_dev_version.py

@@ -0,0 +1,96 @@
+"""
+Created on January 4, 2024,
+@author: luojunhui
+description: 测试版本
+"""
+import asyncio
+import json
+
+from mq_http_sdk.mq_consumer import *
+from mq_http_sdk.mq_exception import MQExceptionBase
+
+sys.path.append(os.getcwd())
+
+from application.common import get_consumer, ack_message
+from application.config import TopicGroup
+
+
+async def run(task_id, mode, platform):
+    """
+    传入参数,然后根据参数执行爬虫代码
+    :param task_id: 任务id
+    :param mode: 任务类型
+    :param platform: 哪个抓取平台
+    :return: None
+    """
+    # 创建一个aliyun日志对象
+
+    message = "{}: 开始一轮抓取".format(platform)
+    print(message)
+    # 创建并一个子进程
+    await asyncio.create_subprocess_shell(
+        "python3 scheduler/run_spider_online.py --task_id {} --mode {} --platform {}".format(task_id, mode, platform)
+    )
+    print("successfully run spider")
+
+
+async def consume_single_message(spider):
+    """
+    消费单个消息,若消费成功则启动爬虫新协程;
+    :param spider: 爬虫类
+    """
+    topic = spider['topic']
+    group = spider['group']
+    platform = spider['platform']
+    mode = spider['mode']
+    consumer = get_consumer(topic, group)
+    try:
+        messages = consumer.consume_message(wait_seconds=10, batch_size=1)
+        if messages:
+            # 在这里消费消息,做一些数据处理分析
+            for single_message in messages:
+                ack_message(
+                    mode=mode,
+                    platform=platform,
+                    recv_msgs=messages,
+                    consumer=consumer
+                )
+                message="successfully consumed message"
+                print(message)
+                message_body = single_message.message_body
+                task_id = json.loads(message_body)['id']
+                # 创建爬虫task
+                await asyncio.create_task(run(task_id, spider['mode'], spider['platform']))
+                message="successfully created task"
+                print(message)
+        else:
+            message="Messages Queue is Empty"
+            print(message)
+
+    except MQExceptionBase as err:
+        # Topic中没有消息可消费。
+        if err.type == "MessageNotExist":
+            message = "No new message! RequestId:{}\n".format(err.req_id)
+            print(message)
+        else:
+            message = "Consume Message Fail! Exception:{}\n".format(err)
+            print(message)
+
+
+async def main():
+    """
+    主函数
+    """
+    spider_list = TopicGroup().produce()
+    while spider_list:
+        async_tasks = []
+        for spider in spider_list:
+            task = asyncio.create_task(consume_single_message(spider))
+            async_tasks.append(task)
+        await asyncio.gather(*async_tasks)
+        # await asyncio.sleep(60)  # 每分钟接收一次MQ,
+
+
+if __name__ == '__main__':
+    # 运行主事件循环
+    asyncio.run(main())