Selaa lähdekoodia

福小顺上线(异步),
爬虫模版异步化

罗俊辉 1 vuosi sitten
vanhempi
commit
7b447ee91b

+ 32 - 14
application/pipeline/pipeline.py

@@ -9,6 +9,10 @@ from application.common import MysqlHelper, AliyunLogger
 
 
 class PiaoQuanPipeline:
+    """
+    爬虫管道——爬虫规则判断
+    """
+
     def __init__(self, platform, mode, rule_dict, env, item, trace_id):
         self.platform = platform
         self.mode = mode
@@ -16,11 +20,14 @@ class PiaoQuanPipeline:
         self.rule_dict = rule_dict
         self.env = env
         self.trace_id = trace_id
-        self.mysql = MysqlHelper(env=env,mode=mode, platform=platform)
+        self.mysql = MysqlHelper(env=env, mode=mode, platform=platform)
         self.aliyun_log = AliyunLogger(platform=platform, mode=mode, env=env)
 
-    # 视频的发布时间限制, 属于是规则过滤
     def publish_time_flag(self):
+        """
+        判断发布时间是否过期
+        :return: True or False
+        """
         # 判断发布时间
         publish_time_stamp = self.item["publish_time_stamp"]
         update_time_stamp = self.item["update_time_stamp"]
@@ -29,11 +36,11 @@ class PiaoQuanPipeline:
         days = max_d if max_d > min_d else min_d
         if self.platform == "gongzhonghao":
             if (
-                int(time.time()) - publish_time_stamp
-                > 3600 * 24 * days
+                    int(time.time()) - publish_time_stamp
+                    > 3600 * 24 * days
             ) and (
-                int(time.time()) - update_time_stamp
-                > 3600 * 24 * days
+                    int(time.time()) - update_time_stamp
+                    > 3600 * 24 * days
             ):
                 self.aliyun_log.logging(
                     code="2004",
@@ -44,8 +51,8 @@ class PiaoQuanPipeline:
                 return False
         else:
             if (
-                int(time.time()) - publish_time_stamp
-                > 3600 * 24 * days
+                    int(time.time()) - publish_time_stamp
+                    > 3600 * 24 * days
             ):
                 self.aliyun_log.logging(
                     code="2004",
@@ -56,8 +63,11 @@ class PiaoQuanPipeline:
                 return False
         return True
 
-    # 视频标题是否满足需求
     def title_flag(self):
+        """
+        视频标题是否满足需求
+        :return:
+        """
         title = self.item["video_title"]
         cleaned_title = re.sub(r"[^\w]", " ", title)
         # 敏感词
@@ -73,8 +83,11 @@ class PiaoQuanPipeline:
             return False
         return True
 
-    # 视频基础下载规则
     def download_rule_flag(self):
+        """
+        视频基础下载规则
+        :return:
+        """
         for key in self.item:
             if self.rule_dict.get(key):
                 max_value = (
@@ -82,7 +95,7 @@ class PiaoQuanPipeline:
                     if int(self.rule_dict[key]["max"]) > 0
                     else 999999999999999
                 )
-                if key == "peroid": # peroid是抓取周期天数
+                if key == "peroid":  # peroid是抓取周期天数
                     continue
                 else:
                     flag = int(self.rule_dict[key]["min"]) <= int(self.item[key]) <= max_value
@@ -106,6 +119,10 @@ class PiaoQuanPipeline:
 
     # 按照某个具体平台来去重
     def repeat_video(self):
+        """
+        视频是否重复
+        :return:
+        """
         # sql = f""" select * from crawler_video where platform="公众号" and out_video_id="{video_id}"; """
         out_id = self.item["out_video_id"]
         sql = f""" select * from crawler_video where platform = "{self.platform}" and out_video_id="{out_id}"; """
@@ -121,6 +138,10 @@ class PiaoQuanPipeline:
         return True
 
     def process_item(self):
+        """
+        全规则判断,符合规则的数据则return True
+        :return:
+        """
         if not self.publish_time_flag():
             # 记录相关日志
             return False
@@ -134,6 +155,3 @@ class PiaoQuanPipeline:
             # 记录相关日志
             return False
         return True
-
-
-

+ 2 - 0
requirements.txt

@@ -105,3 +105,5 @@ workalendar==17.0.0
 wsproto==1.2.0
 you-get==0.4.1650
 zstandard==0.19.0
+
+aiohttp~=3.9.1

+ 3 - 1
scheduler/run_spider_online.py

@@ -1,6 +1,7 @@
 import json
 import os
 import sys
+import asyncio
 import argparse
 
 sys.path.append(os.getcwd())
@@ -71,7 +72,8 @@ class OnlineManager(object):
                     user_list=user_list,
                     env=self.env
                 )
-                main_process.run()
+                loop = asyncio.get_event_loop()
+                loop.run_until_complete(main_process.run())
                 self.logger.logging(code=1004, message="完成一轮抓取")
             except Exception as e:
                 self.logger.logging(code=1006, message="启动爬虫出现错误, 报错原因是: {}".format(e))

+ 2 - 1
spider/crawler_online/__init__.py

@@ -1,2 +1,3 @@
 from .test import TestClass
-from .zhuhaoshiduomo import ZhuHaoShiDuoMoRecommend
+from .zhuhaoshiduomo import ZhuHaoShiDuoMoRecommend
+from .fuxiaoshun import FuXiaoShunRecommend

+ 185 - 0
spider/crawler_online/fuxiaoshun.py

@@ -0,0 +1,185 @@
+"""
+福小顺推荐爬虫代码
+2024-01-22
+"""
+
+import os
+import sys
+import json
+import time
+import uuid
+import random
+import asyncio
+import aiohttp
+import datetime
+from base64 import b64decode
+from Crypto.Cipher import AES
+from Crypto.Util.Padding import unpad
+
+sys.path.append(os.getcwd())
+
+from application.items import VideoItem
+from application.pipeline import PiaoQuanPipeline
+from application.common.messageQueue import MQ
+from application.common.proxies import tunnel_proxies
+from application.common.log import AliyunLogger
+
+
def fxs_decrypt(ciphertext):
    """
    Decrypt a Fu Xiao Shun API payload (reverse-engineered scheme).

    AES-128-CBC where key == IV == b'xlc2ze7qnqg8xi1d'; the input is
    base64-encoded ciphertext.

    :param ciphertext: base64-encoded ciphertext string
    :return: decrypted plaintext string, or None when decryption fails
    """
    key = "xlc2ze7qnqg8xi1d".encode()
    try:
        raw = b64decode(ciphertext.encode("utf-8"))
        decryptor = AES.new(key, AES.MODE_CBC, key)
        return unpad(decryptor.decrypt(raw), AES.block_size).decode()
    except Exception as e:
        # Best-effort: report and signal failure instead of raising.
        print("Incorrect decryption {}".format(e))
        return None
+
+
class FuXiaoShunRecommend(object):
    """
    Fu Xiao Shun ("福小顺") recommend-feed crawler.

    The API response body is encrypted; reverse-engineering result:
    AES encryption with password = iv = 'xlc2ze7qnqg8xi1d'.encode()
    (see fxs_decrypt above).
    """

    def __init__(self, platform, mode, rule_dict, user_list, env="prod"):
        # Crawl identity/configuration, passed straight through to the
        # pipeline and the logger.
        self.platform = platform
        self.mode = mode
        self.rule_dict = rule_dict
        self.user_list = user_list
        self.env = env
        # Count of items that passed the pipeline and were sent to ETL.
        self.download_cnt = 0
        self.mq = MQ(topic_name="topic_crawler_etl_" + self.env)
        # Flipped to True once the download quota is reached; run() stops
        # paging when it sees this flag.
        self.expire_flag = False
        # NOTE(review): unlike PiaoQuanPipeline, the logger is created
        # without env= here — confirm AliyunLogger's default env is intended.
        self.aliyun_log = AliyunLogger(platform=self.platform, mode=self.mode)

    async def process_video_obj(self, video_obj):
        """
        Map one raw video object onto a VideoItem, run it through the rule
        pipeline, and publish it to the ETL message queue if it passes.

        :param video_obj: one decrypted video dict from the recommend API
        :return: None
        """
        trace_id = self.platform + str(uuid.uuid1())
        # Attribute the crawled video to a random account from user_list.
        our_user = random.choice(self.user_list)
        # "create_at" is a "%Y-%m-%d %H:%M:%S" string; parsed as local time.
        publish_time_stamp = datetime.datetime.strptime(
            video_obj["create_at"], "%Y-%m-%d %H:%M:%S"
        ).timestamp()
        item = VideoItem()
        item.add_video_info("user_id", our_user["uid"])
        item.add_video_info("user_name", our_user["nick_name"])
        item.add_video_info("video_id", video_obj["id"])
        item.add_video_info("video_title", video_obj["name"])
        item.add_video_info("publish_time_str", video_obj["create_at"])
        item.add_video_info("publish_time_stamp", int(publish_time_stamp))
        # NOTE(review): video_url is filled from the *cover* field — looks
        # like a copy/paste slip; confirm whether the API exposes a real
        # video URL.
        item.add_video_info("video_url", video_obj["cover"])
        item.add_video_info(
            "cover_url", video_obj["cover"] + "&vframe/png/offset/1/w/200"
        )
        item.add_video_info("like_cnt", video_obj["num_like"])
        item.add_video_info("play_cnt", video_obj["num_read"])
        item.add_video_info("comment_cnt", video_obj["num_comment"])
        item.add_video_info("out_video_id", video_obj["id"])
        item.add_video_info("platform", self.platform)
        item.add_video_info("strategy", self.mode)
        item.add_video_info("session", "{}-{}".format(self.platform, int(time.time())))
        mq_obj = item.produce_item()
        pipeline = PiaoQuanPipeline(
            platform=self.platform,
            mode=self.mode,
            rule_dict=self.rule_dict,
            env=self.env,
            item=mq_obj,
            trace_id=trace_id,
        )
        if pipeline.process_item():
            self.download_cnt += 1
            self.mq.send_msg(mq_obj)
            self.aliyun_log.logging(
                code="1002",
                message="成功发送至 ETL",
                data=mq_obj,
            )
            # Quota check — presumably "min" is the per-round target count
            # (default 200); verify against how rule_dict is populated.
            if self.download_cnt >= int(
                    self.rule_dict.get("videos_cnt", {}).get("min", 200)
            ):
                self.expire_flag = True

    async def get_recommend_list(self, session, page_index):
        """
        Fetch and process one page of the recommend feed.

        Decrypts the response payload with fxs_decrypt, then feeds every
        video object through process_video_obj; per-item failures are
        logged and skipped so one bad item does not abort the page.

        :param session: shared aiohttp ClientSession
        :param page_index: 1-based page number
        :return: None
        """
        headers = {
            "Host": "shun.nnjuxing.cn",
            "xweb_xhr": "1",
            "user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/107.0.0.0 Safari/537.36 MicroMessenger/6.8.0(0x16080000) NetType/WIFI MiniProgramEnv/Mac MacWechat/WMPF MacWechat/3.8.6(0x13080610) XWEB/1156",
            "content-type": "application/json",
            "accept": "*/*",
            "sec-fetch-site": "cross-site",
            "sec-fetch-mode": "cors",
            "sec-fetch-dest": "empty",
            "referer": "https://servicewechat.com/wx5b89401c90c9f367/3/page-frame.html",
            "accept-language": "en-US,en;q=0.9"
        }
        url = "https://shun.nnjuxing.cn/videos/api.videos/getItem"
        params = {
            "mark": "",
            "page": page_index
        }
        # Requests are routed through the tunnel proxy pool.
        async with session.get(url, headers=headers, params=params, proxy=tunnel_proxies()) as response:
            cryp_data = await response.json()
            # 'data' is an AES-encrypted, base64-encoded JSON string.
            data = json.loads(fxs_decrypt(cryp_data['data']))
            for index, video_obj in enumerate(data['list'], 1):
                try:
                    self.aliyun_log.logging(
                        code="1001",
                        message="扫描到一条视频",
                        data=video_obj,
                    )
                    await self.process_video_obj(video_obj)
                except Exception as e:
                    self.aliyun_log.logging(
                        code="3000",
                        message="抓取第{}条的时候出现问题, 报错信息是{}".format(index, e),
                    )

    async def run(self):
        """
        Crawl up to 99 recommend pages (1..99) with one shared HTTP
        session, stopping early once the download quota is reached.

        :return: None
        """
        async with aiohttp.ClientSession() as session:
            for page in range(1, 100):
                # Quota reached during the previous page — stop paging.
                if self.expire_flag:
                    self.aliyun_log.logging(
                        code="2000",
                        message="本轮已经抓取到足够的数据,自动退出\t{}".format(self.download_cnt)
                    )
                    return
                else:
                    try:
                        await self.get_recommend_list(session, page_index=page)
                    except Exception as e:
                        # Page-level failures are logged; the loop continues
                        # with the next page.
                        self.aliyun_log.logging(
                            code="3000",
                            message="抓取第{}页时候出现错误, 报错信息是{}".format(page, e),
                        )
+
+
if __name__ == '__main__':
    # Local smoke test: run one crawl round with a dummy user and no rules.
    FXS = FuXiaoShunRecommend(
        platform="fuxiaoshun",
        mode="recommend",
        rule_dict={},
        user_list=[{"uid": "12345678", "nick_name": "luojunhui"}]
    )
    # asyncio.run() replaces the deprecated get_event_loop() +
    # run_until_complete() pattern (DeprecationWarning since Python 3.10)
    # and guarantees the loop is created and closed cleanly.
    asyncio.run(FXS.run())