Kaynağa Gözat

佳节祝福喜事多多上线
修改 pipelineq去重 sql

罗俊辉 1 yıl önce
ebeveyn
işleme
aae8efbd29

+ 2 - 1
application/config/topic_group_queue.py

@@ -10,7 +10,8 @@ class TopicGroup(object):
             ('zjsjmn', 'recommend', 'zhujinshanjinmeinew'),
             ('hhxxzfdn', 'recommend', 'huanhuanxixizhufudaonew'),
             ('bqzf', 'recommend', 'boqingzhufu'),
-            ('syzf', 'recommend', 'sharkzhufu')
+            ('syzf', 'recommend', 'sharkzhufu'),
+            ("xsdd", 'recommend', 'xishiduoduo')
         ]
 
     def produce(self):

+ 2 - 1
application/pipeline/pipeline.py

@@ -125,8 +125,9 @@ class PiaoQuanPipeline:
         """
         # sql = f""" select * from crawler_video where platform="公众号" and out_video_id="{video_id}"; """
         out_id = self.item["out_video_id"]
-        sql = f""" select * from crawler_video where platform = "{self.platform}" and out_video_id="{out_id}"; """
+        sql = f""" select 1 from crawler_video where platform = "{self.platform}" and out_video_id="{out_id}"; """
         repeat_video = self.mysql.select(sql=sql)
+        # print(repeat_video)
         if repeat_video:
             self.aliyun_log.logging(
                 code="2002",

+ 2 - 1
spider/crawler_online/__init__.py

@@ -4,4 +4,5 @@ from .zhujinshanjinmei_2 import ZhuJinShanJinMeiRecommend
 from .huanhuanxixizhufudao_2 import HHXXZFDRecommend
 from .fuxiaoshun import FuXiaoShunRecommend
 from .boqingzhufu import BoQingZhuFu
-from .shayuzhufu import SharkZhuFuRecommend
+from .shayuzhufu import SharkZhuFuRecommend
+from .jiajiezhufuxishiduoduo import XiShiDuoDuoRecommend

+ 183 - 1
spider/crawler_online/jiajiezhufuxishiduoduo.py

@@ -1,5 +1,187 @@
 """
 佳节祝福喜事多多——推荐爬虫
-@author: LuoJunhui
+@author: 罗俊辉
 """
+import json
+import os
+import sys
+import time
+import uuid
+import random
+import asyncio
+import aiohttp
+import datetime
 
+sys.path.append(os.getcwd())
+
+from application.items import VideoItem
+from application.pipeline import PiaoQuanPipeline
+from application.common.messageQueue import MQ
+from application.common.proxies import tunnel_proxies
+from application.common.log import AliyunLogger
+
+
+class XiShiDuoDuoRecommend(object):
+    """
+    佳节祝福喜乐多多,推荐接口,无需逆向
+    """
+
+    def __init__(self, platform, mode, rule_dict, user_list, env="prod"):
+        self.platform = platform
+        self.mode = mode
+        self.rule_dict = rule_dict
+        self.user_list = user_list
+        self.env = env
+        self.download_cnt = 0
+        self.mq = MQ(topic_name="topic_crawler_etl_" + self.env)
+        self.expire_flag = False
+        self.aliyun_log = AliyunLogger(platform=self.platform, mode=self.mode)
+
+    async def process_video_obj(self, video_obj):
+        """
+        处理每一个视频内容
+        :return: None
+        """
+        trace_id = self.platform + str(uuid.uuid1())
+        our_user = random.choice(self.user_list)
+        # print(video_obj)
+        publish_time_stamp = datetime.datetime.strptime(
+            video_obj["createtime"], "%Y-%m-%d"
+        ).timestamp()
+        item = VideoItem()
+        item.add_video_info("user_id", our_user["uid"])
+        item.add_video_info("user_name", our_user["nick_name"])
+        item.add_video_info("video_id", video_obj["id"])
+        item.add_video_info("video_title", video_obj["title"])
+        item.add_video_info("publish_time_str", video_obj["createtime"])
+        item.add_video_info("publish_time_stamp", int(publish_time_stamp))
+        item.add_video_info("video_url", video_obj["url"])
+        item.add_video_info("cover_url", video_obj["thumb"])
+        # item.add_video_info("like_cnt", video_obj["num_like"])
+        item.add_video_info("play_cnt", video_obj["browse"])
+        # item.add_video_info("comment_cnt", video_obj["num_comment"])
+        item.add_video_info("platform", self.platform)
+        item.add_video_info("strategy", self.mode)
+        item.add_video_info("out_video_id", video_obj["id"])
+        item.add_video_info("session", "{}-{}".format(self.platform, int(time.time())))
+        mq_obj = item.produce_item()
+        pipeline = PiaoQuanPipeline(
+            platform=self.platform,
+            mode=self.mode,
+            rule_dict=self.rule_dict,
+            env=self.env,
+            item=mq_obj,
+            trace_id=trace_id,
+        )
+        if pipeline.process_item():
+            self.download_cnt += 1
+            # print(json.dumps(mq_obj, ensure_ascii=False, indent=4))
+            self.mq.send_msg(mq_obj)
+            self.aliyun_log.logging(
+                code="1002",
+                message="成功发送至 ETL",
+                data=mq_obj,
+            )
+            if self.download_cnt >= int(
+                    self.rule_dict.get("videos_cnt", {}).get("min", 200)
+            ):
+                self.expire_flag = True
+
+    async def get_recommend_list(self, session, page_index):
+        """
+        获取推荐页面的video_list
+        :param session: aiohttp 的session
+        :param page_index: 页码
+        :return: None
+        """
+        if self.expire_flag:
+            self.aliyun_log.logging(
+                code="2000",
+                message="本轮已经抓取到足够的数据,自动退出\t{}".format(self.download_cnt),
+            )
+            return
+        headers = {
+            'Host': 'www.angjukk.cn',
+            'xweb_xhr': '1',
+            'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/107.0.0.0 Safari/537.36 MicroMessenger/6.8.0(0x16080000) NetType/WIFI MiniProgramEnv/Mac MacWechat/WMPF MacWechat/3.8.6(0x13080610) XWEB/1156',
+            'content-type': 'application/json',  # 修改为 application/json
+            'accept': '*/*',
+            'sec-fetch-site': 'cross-site',
+            'sec-fetch-mode': 'cors',
+            'sec-fetch-dest': 'empty',
+            'referer': 'https://servicewechat.com/wx3e999feb402aa372/4/page-frame.html',
+            'accept-language': 'en-US,en;q=0.9',
+        }
+        data = {
+            'time': int(time.time()) * 1000,
+            'str_data': 'TLyWgSmb',
+            'page': page_index,
+            'limit': 10,
+            'appid': 'wx3e999feb402aa372',
+            'version': '1.4.2',
+        }
+        url = "https://www.angjukk.cn/index/home/get_home_list.html"
+        await asyncio.sleep(5)
+        async with session.post(
+                url, headers=headers, json=data, proxy=tunnel_proxies()['https']
+        ) as response:
+            video_list = await response.json()
+            # print(video_list)
+            video_list = video_list['data']['video_list']['data']
+            # print(json.dumps(video_list))
+            for index, video_obj in enumerate(video_list, 1):
+                try:
+                    self.aliyun_log.logging(
+                        code="1001",
+                        message="扫描到一条视频",
+                        data=video_obj,
+                    )
+                    await self.process_video_obj(video_obj)
+                except Exception as e:
+                    self.aliyun_log.logging(
+                        code="3000",
+                        message="抓取第{}条的时候出现问题, 报错信息是{}".format(index, e),
+                    )
+
+    async def run(self):
+        """
+        执行代码
+        :return: None
+        """
+        async with aiohttp.ClientSession() as session:
+            for page in range(1, 90):
+                if self.expire_flag:
+                    self.aliyun_log.logging(
+                        code="2000",
+                        message="本轮已经抓取到足够的数据,自动退出\t{}".format(self.download_cnt),
+                    )
+                    message = "本轮已经抓取到足够的数据,自动退出\t{}".format(self.download_cnt)
+                    # print(message)
+                    return
+                else:
+                    try:
+                        await self.get_recommend_list(session, page_index=page)
+                    except Exception as e:
+                        self.aliyun_log.logging(
+                            code="3000",
+                            message="抓取第{}页时候出现错误, 报错信息是{}".format(page, e),
+                        )
+
+
+# if __name__ == '__main__':
+#     async def test():
+#         W = XiLeDuoDuoRecommend(
+#             platform="喜事多多",
+#             mode="recommend",
+#             rule_dict={},
+#             user_list=[{
+#                 "uid": "test",
+#                 "nick_name": "luojunhuishuaige"
+#             }]
+#         )
+#         async with aiohttp.ClientSession() as session:
+#             await W.get_recommend_list(session=session, page_index=1)
+#
+#
+#     loop = asyncio.get_event_loop()
+#     loop.run_until_complete(test())

+ 4 - 0
spider/spider_map.py

@@ -34,5 +34,9 @@ spider_map = {
     # 鲸鱼祝福
     "sharkzhufu": {
         "recommend": SharkZhuFuRecommend
+    },
+    # 佳节祝福喜事多多
+    "xishiduoduo": {
+        "recommend": XiShiDuoDuoRecommend
     }
 }