Przeglądaj źródła

提交mysql——helper

罗俊辉 1 rok temu
rodzic
commit
a46007b3cb

+ 19 - 2
app/main.py

@@ -6,22 +6,35 @@ from mq_http_sdk.mq_exception import MQExceptionBase
 
 sys.path.append(os.getcwd())
 
-from application.common import MysqlHelper, AliyunLogger, get_consumer, ack_message
+from application.common import AliyunLogger, get_consumer, ack_message
 from application.config import TopicGroup
 
 
 async def run(task_id, mode, platform):
     """
     传入参数,然后根据参数执行爬虫代码
+    :param task_id: 任务id
+    :param mode: 任务类型
+    :param platform: 哪个抓取平台
     :return: None
     """
-    # 创建并等待一个子进程
+    # 创建一个aliyun日志对象
+    logger = AliyunLogger(platform=platform, mode=mode)
+    logger.logging(
+        code=1003,
+        message="{}: 开始一轮抓取".format(platform)
+    )
+    # 创建一个子进程(只启动,不等待其结束)
     await asyncio.create_subprocess_shell(
         "python3 scheduler/run_spider_online.py --task_id {} --mode {} --platform {}".format(task_id, mode, platform)
     )
 
 
 async def consume_single_message(spider):
+    """
+    消费单个消息,若消费成功则启动爬虫新协程。
+    :param spider: 爬虫类
+    """
     topic = spider['topic']
     group = spider['group']
     consumer = get_consumer(topic, group)
@@ -54,6 +67,9 @@ async def consume_single_message(spider):
 
 
 async def main():
+    """
+    主函数
+    """
     spider_list = TopicGroup().produce()
     while spider_list:
         async_tasks = []
@@ -61,6 +77,7 @@ async def main():
             task = asyncio.create_task(consume_single_message(spider))
             async_tasks.append(task)
         await asyncio.gather(*async_tasks)
+        await asyncio.sleep(60)    # 每轮消费结束后等待60秒,再接收下一轮MQ消息
 
 
 if __name__ == '__main__':

+ 0 - 25
app/tt.py

@@ -1,25 +0,0 @@
-import asyncio
-import time
-
-
-async def run(task_id, mode, platform):
-    """
-    传入参数,然后根据参数执行爬虫代码
-    :return: None
-    """
-    # 创建并等待一个子进程
-    await asyncio.create_subprocess_shell("python3 scheduler/run_spider_online.py --task_id {} --mode {} --platform {}".format(task_id, mode, platform))
-
-
-async def main():
-    # 创建爬虫task
-    while True:
-        for task_id in range(95, 96):
-            print("start:{:02},  {}".format(task_id, int(time.time())))
-            await asyncio.create_task(run(task_id, "recommend", "test"))
-            time.sleep(1)
-
-
-if __name__ == '__main__':
-    # 运行主事件循环
-    asyncio.run(main())

+ 4 - 3
application/common/log/aliyun_log.py

@@ -6,15 +6,16 @@
 """
 import json
 from aliyun.log import LogClient, PutLogsRequest, LogItem
-from datetime import date, timedelta
-from datetime import datetime
 import time
 
 proxies = {"http": None, "https": None}
 
 
 class AliyunLogger(object):
-    def __init__(self, platform, mode, env):
+    """
+    阿里云日志方法
+    """
+    def __init__(self, platform, mode, env="prod"):
         self.platform = platform
         self.mode = mode
         self.env = env

+ 1 - 1
application/common/mysql/mysql_helper.py

@@ -19,7 +19,7 @@ class MysqlHelper(object):
     """
     MySQL
     """
-    def __init__(self, env, mode, platform, action=''):
+    def __init__(self, env='', mode='', platform='', action=''):
         mysql_config = env_dict[env]
         self.connection = pymysql.connect(
             host=mysql_config['host'],  # 数据库IP地址,内网地址

+ 3 - 0
application/functions/crypt.py

@@ -0,0 +1,3 @@
+"""
+爬虫逆向加密算法
+"""

+ 12 - 0
spider/crawler_online/zhuhaoshiduomo.py

@@ -41,6 +41,9 @@ class AESCipher:
 
 
 class ZhuHaoShiDuoMoRecommend(object):
+    """
+    祝好事多磨小程序爬虫,测试版本
+    """
     def __init__(self, platform, mode, rule_dict, user_list, env):
         self.platform = platform
         self.mode = mode
@@ -53,6 +56,10 @@ class ZhuHaoShiDuoMoRecommend(object):
         self.cryptor = AESCipher()
 
     def get_recommend_list(self):
+        """
+        获取推荐流
+        :return:
+        """
         url = "https://api.lidongze.cn/jeecg-boot/ugc/getVideoListsEn2"
         headers = {
             'Host': 'api.lidongze.cn',
@@ -87,6 +94,11 @@ class ZhuHaoShiDuoMoRecommend(object):
                 self.process_video_obj(video_obj)
 
     def process_video_obj(self, video_obj):
+        """
+        处理视频信息,清洗,规范化,发送至ETL
+        :param video_obj: 视频信息
+        :return: None
+        """
         trace_id = self.platform + str(uuid.uuid1())
         play_cnt = int(video_obj['playnum'].replace("万+", "0000")) if "万+" in video_obj['playnum'] else int(
             video_obj['playnum'])