Преглед изворног кода

xng账号监测,pipeline 增加了 account 参数

罗俊辉 пре 1 године
родитељ
комит
9720e82337

+ 2 - 1
application/functions/__init__.py

@@ -1,2 +1,3 @@
 from .get_redirect_url import get_redirect_url
-from .clean_title import clean_title
+from .clean_title import clean_title
+from .read_mysql_config import get_config_from_mysql

+ 14 - 6
application/functions/read_mysql_config.py

@@ -1,16 +1,24 @@
+import json
+
 from application.common.mysql import MysqlHelper
 
 
-def get_config_from_mysql(log_type, source, env, text, action=""):
-    select_sql = f"""select * from crawler_config where source="{source}" """
-    contents = MysqlHelper.get_values(log_type, source, select_sql, env, action=action)
+def get_config_from_mysql(log_type, source, text):
+    """
+    :param log_type: mode
+    :param source: platform
+    :param text:
+    :return:
+    """
+    select_sql = f"""select config from crawler_config where source="{source}" """
+    MySQL = MysqlHelper(mode=log_type, platform=select_sql)
+    configs = MySQL.select(select_sql)
     title_list = []
     filter_list = []
     emoji_list = []
     search_word_list = []
-    for content in contents:
-        config = content["config"]
-        config_dict = eval(config)
+    for config in configs:
+        config_dict = json.loads(config[0])
         for k, v in config_dict.items():
             if k == "title":
                 title_list_config = v.split(",")

+ 6 - 2
application/pipeline/pipeline.py

@@ -8,12 +8,12 @@ sys.path.append(os.getcwd())
 from application.common import MysqlHelper, AliyunLogger
 
 
-class PiaoQuanPipeline:
+class PiaoQuanPipeline(object):
     """
     爬虫管道——爬虫规则判断
     """
 
-    def __init__(self, platform, mode, rule_dict, env, item, trace_id):
+    def __init__(self, platform, mode, rule_dict, env, item, trace_id, account=None):
         self.platform = platform
         self.mode = mode
         self.item = item
@@ -22,6 +22,7 @@ class PiaoQuanPipeline:
         self.trace_id = trace_id
         self.mysql = MysqlHelper(env=env, mode=mode, platform=platform)
         self.aliyun_log = AliyunLogger(platform=platform, mode=mode, env=env)
+        self.account = account
 
     def publish_time_flag(self):
         """
@@ -79,6 +80,7 @@ class PiaoQuanPipeline:
                 trace_id=self.trace_id,
                 message="标题中包含敏感词",
                 data=self.item,
+                account=self.account
             )
             return False
         return True
@@ -111,6 +113,7 @@ class PiaoQuanPipeline:
                                 max_value,
                                 flag,
                             ),
+                            account=self.account
                         )
                         return flag
             else:
@@ -134,6 +137,7 @@ class PiaoQuanPipeline:
                 trace_id=self.trace_id,
                 message="重复的视频",
                 data=self.item,
+                account=self.account
             )
             return False
         return True

+ 261 - 42
spider/crawler_author/xiaoniangao.py

@@ -10,85 +10,304 @@ import uuid
 import random
 import asyncio
 import aiohttp
-import datetime
 
 sys.path.append(os.getcwd())
 
-from application.items import VideoItem
 from application.pipeline import PiaoQuanPipeline
 from application.common.messageQueue import MQ
-from application.common.log import AliyunLogger
+from application.common import AliyunLogger, MysqlHelper
+from application.functions import get_config_from_mysql, clean_title
 
 
-class XiaoNianGaoAuthor(object):
+class ImportantXiaoNianGaoAuthor(object):
     """
     小年糕账号爬虫
     """
 
-    def __init__(self, platform, mode, rule_dict, user_list, env="prod"):
+    def __init__(self, platform, mode, rule_dict, env="prod"):
+        self.download_count = 0
         self.platform = platform
         self.mode = mode
         self.rule_dict = rule_dict
-        self.user_list = user_list
         self.env = env
         self.download_cnt = 0
         self.mq = MQ(topic_name="topic_crawler_etl_" + self.env)
-        self.expire_flag = False
         self.aliyun_log = AliyunLogger(platform=self.platform, mode=self.mode)
+        self.important_accounts = {
+            "154002316": "58527274",
+            "1163011474": "58527278",
+            "1122750580": "58527291",
+            "37660529": "58527302",
+            "156490323": "58527304",
+            "262696881": "58527313",
+            "1160417236": "58527318",
+            "307419007": "58527399",
+            "1162974507": "58527564",
+            "194287386": "58527570",
+            "1163003217": "58527580",
+            "1162991035": "58527582",
+            "50262268": "58527612",
+            "209764266": "58527649",
+            "26055443": "58527781",
+            "1162977406": "58528076",
+            "605290310": "58528077",
+            "1160417201": "58528085",
+            "32290307": "58528104",
+            "1160417318": "58528114",
+            "306386778": "58528122",
+            "1161593386": "58528130",
+            "1161593368": "58528245",
+            "260159327": "58528249",
+            "801020924": "58528269",
+            "287637208": "58528273",
+            "555866418": "58528298",
+            "303943127": "59758578",
+            "1162953017": "60450745",
+            "1163013756": "63618095",
+            "1162982920": "63642197",
+            "15324740": "65487736",
+            "170182913": "66807289",
+            "1160417241": "66807294",
+            "1220202407": "66807300",
+            "20680": "66807304",
+            "294317767": "66807306",
+            "1162980250": "58527284",
+            "1163008965": "58527307",
+            "230841899": "58527626",
+            "1162998153": "58527790",
+            "1162954764": "58528095",
+            "1160417133": "58528263",
+            "1163005063": "58528268",
+            "1161593366": "58528275",
+            "1162958849": "58528281",
+            "1161593379": "58528286",
+            "1161593373": "58528334",
+            "1163006779": "60450865",
+            "311848591": "63642204",
+        }
 
-    def split_accounts(self):
+    def read_important_accounts(self):
         """
         操作 user_list,把重要账号挑选出来
-        :return:
+        :return: [ Int ]
+        """
+        return self.important_accounts.keys()
+
+    async def scan_important_accounts(self, accounts):
         """
-        return self.user_list
+        批量扫描重要账号
+        :param accounts:重要账号
+        """
+        tasks = [self.get_user_videos(account) for account in accounts]
+        await asyncio.gather(*tasks)
 
-    async def get_user_videos(self, user_dict):
+    async def get_user_videos(self, link):
         """
-        小年糕执行代码
+        小年糕执行代码, 跳出条件为扫描到三天之前的视频,否则继续抓取
+        :param link: 外部账号 id
         """
         url = "https://kapi-xng-app.xiaoniangao.cn/v1/album/user_public"
         headers = {
-            'Host': 'kapi-xng-app.xiaoniangao.cn',
-            'content-type': 'application/json; charset=utf-8',
-            'accept': '*/*',
-            'authorization': 'hSNQ2s9pvPxvFn4LaQJxKQ6/7Is=',
-            'verb': 'POST',
-            'content-md5': 'c7b7f8663984e8800e3bcd9b44465083',
-            'x-b3-traceid': '2f9da41f960ae077',
-            'accept-language': 'zh-cn',
-            'date': 'Mon, 19 Jun 2023 06:41:17 GMT',
-            'x-token-id': '',
-            'x-signaturemethod': 'hmac-sha1',
-            'user-agent': 'xngapp/157 CFNetwork/1335.0.3.1 Darwin/21.6.0'
+            "Host": "kapi-xng-app.xiaoniangao.cn",
+            "content-type": "application/json; charset=utf-8",
+            "accept": "*/*",
+            "authorization": "hSNQ2s9pvPxvFn4LaQJxKQ6/7Is=",
+            "verb": "POST",
+            "content-md5": "c7b7f8663984e8800e3bcd9b44465083",
+            "x-b3-traceid": "2f9da41f960ae077",
+            "accept-language": "zh-cn",
+            "date": "Mon, 19 Jun 2023 06:41:17 GMT",
+            "x-token-id": "",
+            "x-signaturemethod": "hmac-sha1",
+            "user-agent": "xngapp/157 CFNetwork/1335.0.3.1 Darwin/21.6.0",
         }
         async with aiohttp.ClientSession() as session:
             next_index = -1
-            # 只抓取更新的视频,如果刷到已经更新的立即退出
-            while True:
-                payload = {
-                    "token": "",
-                    "limit": 20,
-                    "start_t": next_index,
-                    "visited_mid": int(user_dict["link"]),
-                    "share_width": 300,
-                    "share_height": 240,
-                }
-                async with session.post(url, headers=headers, data=json.dumps(payload)) as response:
-                    data = await response.json()
-                print(data)
+            payload = {
+                "token": "",
+                "limit": 20,
+                "start_t": next_index,
+                "visited_mid": int(link),
+                "share_width": 300,
+                "share_height": 240,
+            }
+            async with session.post(url, headers=headers, json=payload) as response:
+                data = await response.json()
+                # data_list = data["data"]["list"]
+                # if data_list:
+                #     await self.process_video_pages(data_list, link)
+                try:
+                    data_list = data["data"]["list"]
+                    if data_list:
+                        await self.process_video_pages(data_list, link)
+                except Exception as e:
+                    self.aliyun_log.logging(
+                        code=3000,
+                        message="在抓取账号out_side:{}\t inside:{} 时报错, 报错原因是{}".format(link,
+                                                                                                self.important_accounts[
+                                                                                                    link], e),
+                        account=self.important_accounts[link]
+                    )
 
-    async def scan_important_accounts(self, accounts):
+    async def process_video_pages(self, video_list, link):
         """
-        批量扫描重要账号
-        :param accounts:重要账号
+        处理抓取到的某一页的视频
+        :param link: 外站 id link
+        :param video_list:
         """
-        tasks = [self.get_user_videos(account) for account in accounts]
+        tasks = [self.process_video(video, link) for video in video_list]
         await asyncio.gather(*tasks)
 
+    async def process_video(self, video, link):
+        """
+        :param link: 外站 id
+        :param video: 处理视频信息
+        """
+        trace_id = self.platform + str(uuid.uuid1())
+        # 标题,表情随机加在片头、片尾,或替代句子中间的标点符号
+        title = clean_title(video.get("title", ""))
+        # 随机取一个表情/符号
+        emoji = random.choice(
+            get_config_from_mysql(self.mode, self.platform, "emoji")
+        )
+        # 生成最终标题,标题list[表情+title, title+表情]随机取一个
+        video_title = random.choice([f"{emoji}{title}", f"{title}{emoji}"])
+        # 发布时间
+        publish_time_stamp = int(int(video.get("t", 0)) / 1000)
+        publish_time_str = time.strftime(
+            "%Y-%m-%d %H:%M:%S", time.localtime(publish_time_stamp)
+        )
+        # 用户名 / 头像
+        user_name = (
+            video.get("user", {})
+            .get("nick", "")
+            .strip()
+            .replace("\n", "")
+            .replace("/", "")
+            .replace(" ", "")
+            .replace(" ", "")
+            .replace("&NBSP", "")
+            .replace("\r", "")
+        )
+        video_dict = {
+            "video_title": video_title,
+            "video_id": video.get("vid", ""),
+            "duration": int(video.get("du", 0) / 1000),
+            "play_cnt": video.get("play_pv", 0),
+            "like_cnt": video.get("favor", {}).get("total", 0),
+            "comment_cnt": video.get("comment_count", 0),
+            "share_cnt": video.get("share", 0),
+            "user_name": user_name,
+            "publish_time_stamp": publish_time_stamp,
+            "publish_time_str": publish_time_str,
+            "update_time_stamp": int(time.time()),
+            "video_width": int(video.get("w", 0)),
+            "video_height": int(video.get("h", 0)),
+            "avatar_url": video.get("user", {}).get("hurl", ""),
+            "profile_id": video["id"],
+            "profile_mid": video.get("user", {}).get("mid", ""),
+            "cover_url": video.get("url", ""),
+            "video_url": video.get("v_url", ""),
+            "session": f"xiaoniangao-author-{int(time.time())}",
+            "out_user_id": video["id"],
+            "platform": self.platform,
+            "strategy": self.mode,
+            "out_video_id": video.get("vid", ""),
+        }
+        pipeline = PiaoQuanPipeline(
+            platform=self.platform,
+            mode=self.mode,
+            rule_dict=self.rule_dict,
+            env=self.env,
+            item=video_dict,
+            trace_id=trace_id,
+            account=self.important_accounts[link]
+        )
+        if int(time.time()) - publish_time_stamp > 3600 * 24 * int(
+                self.rule_dict.get("period", {}).get("max", 1000)
+        ):
+            self.aliyun_log.logging(
+                code="2004",
+                trace_id=trace_id,
+                data=video_dict,
+                message="发布时间超过{}天".format(
+                    int(self.rule_dict.get("period", {}).get("max", 1000))
+                ),
+                account=self.important_accounts[link],
+            )
+            return
+        flag = pipeline.process_item()
+        if flag:
+            video_dict["width"] = video_dict["video_width"]
+            video_dict["height"] = video_dict["video_height"]
+            video_dict["crawler_rule"] = json.dumps(self.rule_dict)
+            video_dict["user_id"] = self.important_accounts[link]
+            video_dict["publish_time"] = video_dict["publish_time_str"]
+            self.mq.send_msg(video_dict)
+            self.download_count += 1
+            self.aliyun_log.logging(
+                code="1002",
+                data=video_dict,
+                trace_id=trace_id,
+                message="成功发送 MQ 至 ETL",
+                account=self.important_accounts[link],
+            )
+
     async def run(self):
         """
         控制函数代码
         :return:
         """
-        self.split_acoounts()
+        user_list = self.read_important_accounts()
+        await self.scan_important_accounts(user_list)
+
+
+def get_task_rule():
+    """
+    :return: 返回任务的规则, task_rule
+    """
+
+    rule_dict = {}
+    task_rule_sql = f"SELECT rule FROM crawler_task_v3 WHERE id = 21;"
+    MySQL = MysqlHelper(mode="author", platform="xiaoniangao")
+    data = MySQL.select(task_rule_sql)
+    if data:
+        rule_list = json.loads(data[0][0])
+        for item in rule_list:
+            for key in item:
+                rule_dict[key] = item[key]
+    return rule_dict
+
+
+async def run_spider(rule_dict):
+    """
+    执行爬虫
+    :param rule_dict:
+    """
+    Ixng = ImportantXiaoNianGaoAuthor(
+        platform="xiaoniangao",
+        mode="author",
+        rule_dict=rule_dict
+    )
+    await Ixng.run()
+
+
+async def periodic_task():
+    """
+    定时执行异步任务
+    """
+    while True:
+        rule = get_task_rule()
+        await run_spider(rule_dict=rule)  # 直接在当前事件循环中运行异步任务
+        await asyncio.sleep(30 * 60)  # 等待30分钟
+
+
+async def main():
+    """
+    main 函数
+    """
+    await periodic_task()
+
+
+if __name__ == "__main__":
+    asyncio.run(main())