소스 검색

福小顺修改异步处理

罗俊辉 1 년 전
부모
커밋
e2b5e1654c
4개의 변경된 파일에서 120줄 추가, 32줄 삭제
  1. 0 0
      application/items/item_dev.py
  2. 41 30
      spider/crawler_online/fuxiaoshun.py
  3. 2 2
      spider/crawler_online/gongzhonghao_author.py
  4. 77 0
      spider/crawler_online/xiaoniangao.py

+ 0 - 0
application/items/item_dev.py


+ 41 - 30
spider/crawler_online/fuxiaoshun.py

@@ -31,10 +31,10 @@ def fxs_decrypt(ciphertext):
     :param ciphertext: 秘文
     :return: 原文
     """
-    password = 'xlc2ze7qnqg8xi1d'.encode()
+    password = "xlc2ze7qnqg8xi1d".encode()
     iv = password
     try:
-        ct = b64decode(ciphertext.encode('utf-8'))
+        ct = b64decode(ciphertext.encode("utf-8"))
         cipher = AES.new(password, AES.MODE_CBC, iv)
         pt = unpad(cipher.decrypt(ct), AES.block_size)
         return pt.decode()
@@ -107,7 +107,7 @@ class FuXiaoShunRecommend(object):
                 data=mq_obj,
             )
             if self.download_cnt >= int(
-                    self.rule_dict.get("videos_cnt", {}).get("min", 200)
+                self.rule_dict.get("videos_cnt", {}).get("min", 200)
             ):
                 self.expire_flag = True
 
@@ -118,6 +118,12 @@ class FuXiaoShunRecommend(object):
         :param page_index: 页码
         :return: None
         """
+        if self.expire_flag:
+            self.aliyun_log.logging(
+                code="2000",
+                message="本轮已经抓取到足够的数据,自动退出\t{}".format(self.download_cnt),
+            )
+            return
         headers = {
             "Host": "shun.nnjuxing.cn",
             "xweb_xhr": "1",
@@ -128,17 +134,16 @@ class FuXiaoShunRecommend(object):
             "sec-fetch-mode": "cors",
             "sec-fetch-dest": "empty",
             "referer": "https://servicewechat.com/wx5b89401c90c9f367/3/page-frame.html",
-            "accept-language": "en-US,en;q=0.9"
+            "accept-language": "en-US,en;q=0.9",
         }
         url = "https://shun.nnjuxing.cn/videos/api.videos/getItem"
-        params = {
-            "mark": "",
-            "page": page_index
-        }
-        async with session.get(url, headers=headers, params=params, proxy=tunnel_proxies()['https']) as response:
+        params = {"mark": "", "page": page_index}
+        async with session.get(
+            url, headers=headers, params=params, proxy=tunnel_proxies()["https"]
+        ) as response:
             cryp_data = await response.json()
-            data = json.loads(fxs_decrypt(cryp_data['data']))
-            for index, video_obj in enumerate(data['list'], 1):
+            data = json.loads(fxs_decrypt(cryp_data["data"]))
+            for index, video_obj in enumerate(data["list"], 1):
                 try:
                     self.aliyun_log.logging(
                         code="1001",
@@ -158,22 +163,28 @@ class FuXiaoShunRecommend(object):
         :return: None
         """
         async with aiohttp.ClientSession() as session:
-            for page in range(1, 100):
-                if self.expire_flag:
-                    self.aliyun_log.logging(
-                        code="2000",
-                        message="本轮已经抓取到足够的数据,自动退出\t{}".format(self.download_cnt)
-                    )
-                    message = "本轮已经抓取到足够的数据,自动退出\t{}".format(self.download_cnt)
-                    print(message)
-                    return
-                else:
-                    try:
-                        await self.get_recommend_list(session, page_index=page)
-                    except Exception as e:
-                        self.aliyun_log.logging(
-                            code="3000",
-                            message="抓取第{}页时候出现错误, 报错信息是{}".format(page, e),
-                        )
-
-
+            tasks = [self.get_recommend_list(session, index) for index in range(1, 100)]
+            await asyncio.gather(*tasks)
+            done, pending = await asyncio.wait(
+                tasks, return_when=asyncio.FIRST_COMPLETED
+            )
+            # 取消所有剩余的任务
+            for task in pending:
+                task.cancel()
+            # for page in range(1, 100):
+            #     if self.expire_flag:
+            #         self.aliyun_log.logging(
+            #             code="2000",
+            #             message="本轮已经抓取到足够的数据,自动退出\t{}".format(self.download_cnt),
+            #         )
+            #         message = "本轮已经抓取到足够的数据,自动退出\t{}".format(self.download_cnt)
+            #         print(message)
+            #         return
+            #     else:
+            #         try:
+            #             await self.get_recommend_list(session, page_index=page)
+            #         except Exception as e:
+            #             self.aliyun_log.logging(
+            #                 code="3000",
+            #                 message="抓取第{}页时候出现错误, 报错信息是{}".format(page, e),
+            #             )

+ 2 - 2
spider/crawler_online/gongzhonghao_author.py

@@ -5,9 +5,9 @@
 """
 
 
-class GongZhongHaoAuthor(object):
+class OfficialAccountAuthor(object):
     """
-    公众号账号爬虫
+    公众号账号爬虫
     """
 
     def __init__(self, platform, mode, user_list, rule_dict, env="prod"):

+ 77 - 0
spider/crawler_online/xiaoniangao.py

@@ -0,0 +1,77 @@
+"""
+@Author  : luojunhui
+小年糕账号爬虫
+"""
+import os
+import sys
+import json
+import time
+import uuid
+import random
+import asyncio
+import aiohttp
+import datetime
+
+sys.path.append(os.getcwd())
+
+from application.items import VideoItem
+from application.pipeline import PiaoQuanPipeline
+from application.common.messageQueue import MQ
+from application.common.log import AliyunLogger
+
+
+class XiaoNianGaoAuthor(object):
+    """
+    小年糕账号爬虫
+    """
+
+    def __init__(self, platform, mode, rule_dict, user_list, env="prod"):
+        self.platform = platform
+        self.mode = mode
+        self.rule_dict = rule_dict
+        self.user_list = user_list
+        self.env = env
+        self.download_cnt = 0
+        self.mq = MQ(topic_name="topic_crawler_etl_" + self.env)
+        self.expire_flag = False
+        self.aliyun_log = AliyunLogger(platform=self.platform, mode=self.mode)
+
+    async def get_user_videos(self, user_dict):
+        """
+        小年糕执行代码
+        """
+        url = "https://kapi-xng-app.xiaoniangao.cn/v1/album/user_public"
+        headers = {
+            'Host': 'kapi-xng-app.xiaoniangao.cn',
+            'content-type': 'application/json; charset=utf-8',
+            'accept': '*/*',
+            'authorization': 'hSNQ2s9pvPxvFn4LaQJxKQ6/7Is=',
+            'verb': 'POST',
+            'content-md5': 'c7b7f8663984e8800e3bcd9b44465083',
+            'x-b3-traceid': '2f9da41f960ae077',
+            'accept-language': 'zh-cn',
+            'date': 'Mon, 19 Jun 2023 06:41:17 GMT',
+            'x-token-id': '',
+            'x-signaturemethod': 'hmac-sha1',
+            'user-agent': 'xngapp/157 CFNetwork/1335.0.3.1 Darwin/21.6.0'
+        }
+        async with aiohttp.ClientSession() as session:
+            next_index = -1
+            # 只抓取更新的视频,如果刷到已经更新的立即退出
+            while True:
+                payload = {
+                    "token": "",
+                    "limit": 20,
+                    "start_t": next_index,
+                    "visited_mid": int(user_dict["link"]),
+                    "share_width": 300,
+                    "share_height": 240,
+                }
+                async with session.post(
+                    url,
+                    headers=headers,
+                    data=json.dumps(payload)
+                ) as response:
+                    data = await response.json()
+
+