
main function v1 [to be optimized]

Crawler template v1 [to be optimized] going live
罗俊辉 1 year ago
Parent
Commit
845c8e2c48

+ 3 - 3
application/common/log/local_log.py

@@ -18,12 +18,12 @@ class Local(object):
 
     # Generate logs with the logger module
     @staticmethod
-    def logger(log_type, crawler):
+    def logger(platform, mode):
         """
         Generate logs with the logger module
         """
         # Log path
-        log_dir = f"./{crawler}/logs/"
+        log_dir = f"./{platform}/logs/"
         log_path = os.getcwd() + os.sep + log_dir
         if not os.path.isdir(log_path):
             os.makedirs(log_path)
@@ -32,7 +32,7 @@ class Local(object):
         # log_name = time.strftime("%Y-%m-%d", time.localtime(time.time())) + f'-{crawler}-{log_type}.log'
         # log_name = datetime.datetime.now().strftime('%Y-%m-%d') + f'-{crawler}-{log_type}.log'
         # log_name = f"{date.today():%Y-%m-%d}-{crawler}-{log_type}.log"
-        log_name = f"{crawler}-{log_type}-{datetime.now().date().strftime('%Y-%m-%d')}.log"
+        log_name = f"{platform}-{mode}-{datetime.now().date().strftime('%Y-%m-%d')}.log"
 
         # Do not print logs to the console
         logger.remove(handler_id=None)
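
A note on the rename: call sites now pass the platform first and the mode second (see ack_message.py below). A minimal usage sketch, assuming the "test" platform and "recommend" mode used elsewhere in this commit:

    from application.common.log import Local

    # Writes to ./test/logs/test-recommend-<YYYY-MM-DD>.log under the current working directory
    Local.logger("test", "recommend").info("spider started")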

+ 3 - 3
application/common/messageQueue/ack_message.py

@@ -2,14 +2,14 @@ from application.common.log import Local
 from mq_http_sdk.mq_exception import MQExceptionBase
 
 
-def ack_message(log_type, crawler, recv_msgs, consumer):
+def ack_message(mode, platform, recv_msgs, consumer):
     # If a message is not acknowledged before msg.next_consume_time, it will be consumed again.
     # The receipt handle carries a timestamp; each consumption of the same message returns a different handle.
     try:
         receipt_handle_list = [msg.receipt_handle for msg in recv_msgs]
         consumer.ack_message(receipt_handle_list)
-        Local.logger(log_type, crawler).info(
+        Local.logger(platform, mode).info(
             f"Ack {len(receipt_handle_list)} Message Succeed.\n"
         )
     except MQExceptionBase as err:
-        Local.logger(log_type, crawler).info(f"Ack Message Fail! Exception:{err}\n")
+        Local.logger(platform, mode).info(f"Ack Message Fail! Exception:{err}\n")

+ 2 - 1
application/config/__init__.py

@@ -1,3 +1,4 @@
 from .ipconfig import ip_config
 from .mysql_config import env_dict
-from .topic_group_queue import TopicGroup
+from .topic_group_queue import TopicGroup
+from .spider_map import spider_map

+ 18 - 0
application/config/spider_map.py

@@ -0,0 +1,18 @@
+"""
+Spider configuration map: one big dict.
+The key is the spider's platform;
+the sub-keys are the modes (e.g. recommend, author);
+the value is the wrapped spider class.
+"""
+from application.spider.crawler_online import *
+
+spider_map = {
+    # 祝万物复苏
+    "zhuwanwufusu": {
+        "recommend": "ZhuWanWuFuSuRecommend"
+    },
+    # Test script
+    "test": {
+        "recommend": TestClass
+    }
+}
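
One caveat worth flagging: the "zhuwanwufusu" entry maps to the string "ZhuWanWuFuSuRecommend", whereas the "test" entry maps to the TestClass object itself. OnlineManager.start_crawl (further down in this commit) instantiates whatever it looks up here, so the string entry would raise TypeError: 'str' object is not callable if that platform were ever scheduled. A minimal sketch of the lookup as start_crawl performs it, using the test values:

    from application.config import spider_map

    spider_class = spider_map["test"]["recommend"]    # -> TestClass
    crawler = spider_class(platform="test", mode="recommend",
                           env="prod", rule_dict={}, user_list=[])
    crawler.run()                                     # prints its fields, per test.py below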

+ 6 - 3
application/config/topic_group_queue.py

@@ -1,15 +1,18 @@
 class TopicGroup(object):
     def __init__(self):
         self.spider_list = [
-            ("zwwfs", "recommend"),
-            ("zchqs", "recommend"),
+            ("test", "recommend")
+            # ("zwwfs", "recommend"),
+            # ("zchqs", "recommend"),
         ]
 
     def produce(self):
         result = [
             {
                 "topic": "{}_{}_prod".format(i[0], i[1]),
-                "group": "{}_{}_prod".format(i[0], i[1])
+                "group": "{}_{}_prod".format(i[0], i[1]),
+                "mode": i[1],
+                "platform": i[0]
             } for i in self.spider_list
         ]
         return result
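
With the current spider_list, produce() returns a single consumer configuration, roughly:

    TopicGroup().produce()
    # [{'topic': 'test_recommend_prod',
    #   'group': 'test_recommend_prod',
    #   'mode': 'recommend',
    #   'platform': 'test'}]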

The diff for this file is not shown because it is too large
+ 0 - 0
application/spider/crawler_offline/test.html


+ 1 - 0
application/spider/crawler_online/__init__.py

@@ -0,0 +1 @@
+from .test import TestClass

+ 15 - 0
application/spider/crawler_online/test.py

@@ -0,0 +1,15 @@
+class TestClass(object):
+    def __init__(self, platform, mode, env, rule_dict, user_list):
+        self.platform = platform
+        self.mode = mode
+        self.env = env
+        self.rule_dict = rule_dict
+        self.user_list = user_list
+
+    def run(self):
+        print("爬虫成功启动")
+        print(self.platform)
+        print(self.mode)
+        print(self.env)
+        print(self.rule_dict)
+        print(self.user_list)

+ 31 - 13
main.py

@@ -1,6 +1,6 @@
 import asyncio
+import json
 
-from mq_http_sdk.mq_client import *
 from mq_http_sdk.mq_consumer import *
 from mq_http_sdk.mq_exception import MQExceptionBase
 
@@ -12,31 +12,49 @@ from application.common.mysql import MysqlHelper
 from application.config import TopicGroup
 
 
-async def run():
+async def run(task_id, mode, platform):
     """
     Receive the parameters and run the spider code accordingly
-    :return:
+    :return: None
     """
     # Create and wait for a subprocess
-    process = await asyncio.create_subprocess_shell("python3 test2.py")
-    # 等待子进程完成
-    await process.wait()
+    await asyncio.create_subprocess_shell(
+        "python3 scheduler/run_spider_online.py --task_id {} --mode {} --platform {}".format(task_id, mode, platform))
 
 
 async def main():
     spider_list = TopicGroup().produce()
-    async_tasks = []  # 异步任务池
     while spider_list:
         for spider in spider_list:
+            # Fetch messages
             topic = spider['topic']
             group = spider['group']
             consumer = get_consumer(topic, group)
-            messages = consumer.consume_message(batch=1, wait_seconds=5)
-            if messages:
-                task = asyncio.create_task(run())
-                async_tasks.append(task)
-            else:
-                continue
+            try:
+                messages = consumer.consume_message(wait_seconds=10, batch_size=1)
+                if messages:
+                    # Consume the messages here and do some processing / analysis
+                    for single_message in messages:
+                        ack_message(mode=spider['mode'], platform=spider['platform'], recv_msgs=messages,
+                                    consumer=consumer)
+                        message_body = single_message.message_body
+                        task_id = json.loads(message_body)['id']
+                        print(message_body)
+                        # Create the spider task
+                        await asyncio.create_task(run(task_id, spider['mode'], spider['platform']))
+                else:
+                    message = "Messages Queue is Empty"
+                    print(message)
+
+            except MQExceptionBase as err:
+                # No messages available in the Topic to consume.
+                if err.type == "MessageNotExist":
+                    message = f"No new message! RequestId:{err.req_id}\n"
+                    print(message)
+                    continue
+                else:
+                    message = f"Consume Message Fail! Exception:{err}\n"
+                    print(message)
 
 
 if __name__ == '__main__':
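
The loop reads the task id with json.loads(message_body)['id'], so it assumes every MQ message body is a JSON object carrying the task id under an "id" key. A hypothetical body that would satisfy this consumer (any other fields are ignored here):

    import json

    message_body = json.dumps({"id": 95})        # hypothetical payload; only "id" is read
    task_id = json.loads(message_body)["id"]     # 95, forwarded to scheduler/run_spider_online.py

Note also that ack_message is called inside the per-message loop but acknowledges the whole recv_msgs batch; with batch_size=1 that is the same single message.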

+ 0 - 0
scheduler/auto_ads_schedule.py


+ 55 - 144
scheduler/run_spider_online.py

@@ -1,155 +1,66 @@
+import json
+import os
+import sys
 import argparse
-import time
-import random
-from mq_http_sdk.mq_client import *
-from mq_http_sdk.mq_consumer import *
-from mq_http_sdk.mq_exception import MQExceptionBase
 
 sys.path.append(os.getcwd())
-# from common.public import task_fun_mq, get_consumer, ack_message
-# from common.scheduling_db import MysqlHelper
-# from common import AliyunLogger
-# from zhuwanwufusu.zhuwanwufusu_recommend import ZhuWanWuFuSuRecommend
 
+from application.common.mysql import MysqlHelper
+from application.config import spider_map
+from application.spider.crawler_online import *
 
-def main(platform, mode, env):
-    # consumer = get_consumer(topic_name, group_id)
-    # # Long polling: if the Topic has no messages, the request hangs on the server for 3 s; if a message becomes consumable within that window, it returns immediately.
-    # # Long-polling time: 3 s (can be set up to 30 s).
-    # wait_seconds = 30
-    # # Consume at most 3 messages per call (can be set up to 16).
-    # batch = 1
-    # AliyunLogger.logging(
-    #     code="1000",
-    #     platform=crawler,
-    #     mode=log_type,
-    #     env=env,
-    #     message=f'{10 * "="}Consume And Ack Message From Topic{10 * "="}\n'
-    #     f"WaitSeconds:{wait_seconds}\n"
-    #     f"TopicName:{topic_name}\n"
-    #     f"MQConsumer:{group_id}",
-    # )
-    while True:
-        try:
-            # Long-poll for messages.
-            recv_msgs = consumer.consume_message(batch, wait_seconds)
-            for msg in recv_msgs:
-                AliyunLogger.logging(
-                    code="1000",
-                    platform=crawler,
-                    mode=log_type,
-                    env=env,
-                    message=f"Receive\n"
-                    f"MessageId:{msg.message_id}\n"
-                    f"MessageBodyMD5:{msg.message_body_md5}\n"
-                    f"MessageTag:{msg.message_tag}\n"
-                    f"ConsumedTimes:{msg.consumed_times}\n"
-                    f"PublishTime:{msg.publish_time}\n"
-                    f"Body:{msg.message_body}\n"
-                    f"NextConsumeTime:{msg.next_consume_time}\n"
-                    f"ReceiptHandle:{msg.receipt_handle}\n"
-                    f"Properties:{msg.properties}",
-                )
-                # ack_mq_message
-                ack_message(
-                    log_type=log_type,
-                    crawler=crawler,
-                    recv_msgs=recv_msgs,
-                    consumer=consumer,
-                )
-                # Parse task_dict
-                task_dict = task_fun_mq(msg.message_body)["task_dict"]
-                AliyunLogger.logging(
-                    code="1000",
-                    platform=crawler,
-                    mode=log_type,
-                    env=env,
-                    message="f调度任务:{task_dict}",
-                )
-                # Parse rule_dict
-                rule_dict = task_fun_mq(msg.message_body)["rule_dict"]
-                AliyunLogger.logging(
-                    code="1000",
-                    platform=crawler,
-                    mode=log_type,
-                    env=env,
-                    message=f"抓取规则:{rule_dict}\n",
-                )
-                # Parse user_list
-                task_id = task_dict["id"]
-                select_user_sql = (
-                    f"""select * from crawler_user_v3 where task_id={task_id}"""
-                )
-                user_list = MysqlHelper.get_values(
-                    log_type, crawler, select_user_sql, env, action=""
-                )
-                AliyunLogger.logging(
-                    code="1003",
-                    platform=crawler,
-                    mode=log_type,
-                    env=env,
-                    message="开始抓取"
-                )
-                AliyunLogger.logging(
-                    code="1000",
-                    platform=crawler,
-                    mode=log_type,
-                    env=env,
-                    message="开始抓取祝万物复苏——推荐",
-                )
-                main_process = ZhuWanWuFuSuRecommend(
-                    platform=crawler,
-                    mode=log_type,
-                    rule_dict=rule_dict,
-                    user_list=user_list,
-                    env=env
-                )
-                main_process.schedule()
-                AliyunLogger.logging(
-                    code="1000",
-                    platform=crawler,
-                    mode=log_type,
-                    env=env,
-                    message="完成抓取——祝万物复苏",
-                )
-                AliyunLogger.logging(
-                    code="1004", platform=crawler, mode=log_type, env=env,message="结束一轮抓取"
-                )
 
-        except MQExceptionBase as err:
-            # No messages available in the Topic to consume.
-            if err.type == "MessageNotExist":
-                AliyunLogger.logging(
-                    code="2000",
-                    platform=crawler,
-                    mode=log_type,
-                    env=env,
-                    message=f"No new message! RequestId:{err.req_id}\n",
-                )
-                continue
-            AliyunLogger.logging(
-                code="2000",
-                platform=crawler,
-                mode=log_type,
-                env=env,
-                message=f"Consume Message Fail! Exception:{err}\n",
+class OnlineManager(object):
+    def __init__(self, task_id, mode, platform):
+        self.env = "prod"
+        self.task_id = task_id
+        self.mode = mode
+        self.platform = platform
+        self.MySQL = MysqlHelper(mode=self.mode, platform=self.platform, env=self.env)
+
+    def get_task_rule(self):
+        """
+        :return: the task's rule dict (task_rule)
+        """
+        rule_dict = {}
+        task_rule_sql = f"SELECT rule FROM crawler_task_v3 WHERE id = {self.task_id};"
+        data = self.MySQL.select(task_rule_sql)
+        if data:
+            rule_list = json.loads(data[0][0])
+            for item in rule_list:
+                for key in item:
+                    rule_dict[key] = item[key]
+        return rule_dict
+
+    def get_task_user_list(self):
+        """
+        :return: the task's user list
+        """
+        task_user_list_sql = f"SELECT uid, link from crawler_user_v3 where task_id = {self.task_id};"
+        uid_list = self.MySQL.select(task_user_list_sql)
+        user_list = [{"uid": i[0], "link": i[1]} for i in uid_list] if uid_list else []
+        return user_list
+
+    def start_crawl(self):
+        rule_dict = self.get_task_rule()
+        user_list = self.get_task_user_list()
+        if rule_dict and user_list:
+            spider_class = spider_map[self.platform][self.mode]
+            main_process = spider_class(
+                platform=self.platform,
+                mode=self.mode,
+                rule_dict=rule_dict,
+                user_list=user_list,
+                env=self.env
             )
-            time.sleep(2)
-            continue
+            main_process.run()
 
 
 if __name__ == "__main__":
     parser = argparse.ArgumentParser()  ## create the argument parser
-    parser.add_argument("--log_type", type=str)  ## add an argument and declare its type
-    parser.add_argument("--crawler")  ## add an argument
-    parser.add_argument("--topic_name")  ## add an argument
-    parser.add_argument("--group_id")  ## add an argument
-    parser.add_argument("--env")  ## add an argument
-    args = parser.parse_args()  ### read the arguments; they can also be passed from the terminal
-    main(
-        log_type=args.log_type,
-        crawler=args.crawler,
-        topic_name=args.topic_name,
-        group_id=args.group_id,
-        env=args.env,
-    )
+    parser.add_argument("--task_id")
+    parser.add_argument("--mode")
+    parser.add_argument("--platform")
+    args = parser.parse_args()
+    M = OnlineManager(task_id=args.task_id, mode=args.mode, platform=args.platform)
+    M.start_crawl()
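
main.py launches this script through a shell subprocess; the equivalent manual invocation, using the test values from this commit, would be:

    python3 scheduler/run_spider_online.py --task_id 95 --mode recommend --platform test

get_task_rule assumes crawler_task_v3.rule stores a JSON list of single-key objects and flattens it into one rule_dict before the spider class from spider_map is instantiated and run.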

+ 25 - 0
tt.py

@@ -0,0 +1,25 @@
+import asyncio
+import time
+
+
+async def run(task_id, mode, platform):
+    """
+    Receive the parameters and run the spider code accordingly
+    :return: None
+    """
+    # Create and wait for a subprocess
+    await asyncio.create_subprocess_shell("python3 scheduler/run_spider_online.py --task_id {} --mode {} --platform {}".format(task_id, mode, platform))
+
+
+async def main():
+    # Create the spider tasks
+    while True:
+        for task_id in range(95, 96):
+            print("start:{:02},  {}".format(task_id, int(time.time())))
+            await asyncio.create_task(run(task_id, "recommend", "test"))
+            time.sleep(1)
+
+
+if __name__ == '__main__':
+    # Run the main event loop
+    asyncio.run(main())
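
One small caveat: time.sleep(1) inside the async main() blocks the event loop while it waits. If non-blocking pacing is preferred, the usual substitute would be:

    await asyncio.sleep(1)  # non-blocking pause in place of time.sleep(1)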

Some files were not shown because too many files changed in this diff