Browse Source

Merge remote-tracking branch 'origin/master'

zhangyong 1 year ago
parent
commit
d366f0266f

+ 0 - 27
common/pipeline.py

@@ -79,19 +79,6 @@ class PiaoQuanPipeline:
 
     # 视频基础下载规则
     def download_rule_flag(self):
-        # 格式化 video_dict:publish_time_stamp
-        # if self.item.get("publish_time_stamp"):
-        #     self.item["publish_time"] = self.item["publish_time_stamp"] * 1000
-        # # 格式化 video_dict:period
-        # if (
-        #     self.item.get("publish_time")
-        #     and self.item.get("period", "noperiod") == "noperiod"
-        # ):
-        #     self.item["period"] = int(
-        #         (int(time.time() * 1000) - self.item["publish_time"])
-        #         / (3600 * 24 * 1000)
-        #     )
-        # 格式化 rule_dict 最大值取值为 0 的问题
         for key in self.item:
             if self.rule_dict.get(key):
                 max_value = (
@@ -101,20 +88,6 @@ class PiaoQuanPipeline:
                 )
                 if key == "peroid": # peroid是抓取周期天数
                     continue
-                    # flag = 0 <= int(self.item[key]) <= max_value
-                    # if not flag:
-                    #     AliyunLogger.logging(
-                    #         code="2004",
-                    #         trace_id=self.trace_id,
-                    #         platform=self.platform,
-                    #         mode=self.mode,
-                    #         env=self.env,
-                    #         data=self.item,
-                    #         message="{}: 0 <= {} <= {}, {}".format(
-                    #             key, self.item[key], max_value, flag
-                    #         ),
-                    #     )
-                    #     return flag
                 else:
                     flag = int(self.rule_dict[key]["min"]) <= int(self.item[key]) <= max_value
                     if not flag:

+ 6 - 1
main/process_mq.sh

@@ -41,7 +41,12 @@ elif [ ${crawler} = "sph" ] && [ ${log_type} = "search" ];then
   profile_path=/etc/profile
   python=python3
   log_path=${piaoquan_crawler_dir}main/main_logs/process-mq-$(date +%Y-%m-%d).log
-elif [ ${crawler} = "kyk" ] || [ ${crawler} = "sph" ];then
+elif [ ${crawler} = "kyk" ];then
+  piaoquan_crawler_dir=/Users/lieyunye/Desktop/crawler/piaoquan_crawler/
+  profile_path=./base_profile
+  python=python3
+  log_path=${piaoquan_crawler_dir}main/main_logs/process-mq-$(date +%Y-%m-%d).log
+elif [ ${crawler} = "sph" ] && [ ${log_type} = "recommend" ];then
   piaoquan_crawler_dir=/Users/lieyunye/Desktop/crawler/piaoquan_crawler/
   profile_path=./base_profile
   python=python3

+ 1 - 0
shipinhao/shipinhao_author/__init__.py

@@ -0,0 +1 @@
+from .shipinhao_scheduling import ShiPinHaoAccount

+ 243 - 0
shipinhao/shipinhao_author/shipinhao_author_test.py

@@ -0,0 +1,243 @@
+import json
+import os
+import sys
+import time
+import uuid
+
+import requests
+
+sys.path.append(os.getcwd())
+from common.pipeline import PiaoQuanPipelineTest
+from common.db import MysqlHelper
+
+
+def find_target_user(name, user_list):
+    for obj in user_list:
+        if obj["nickname"] == name:
+            return obj
+        else:
+            continue
+    return False
+
+
+class ShiPinHaoAccount:
+    def __init__(self, platform, mode, rule_dict, user_dict, env):
+        self.cookie = None
+        self.token = None
+        self.account_name = user_dict["link"]
+        self.platform = platform
+        self.mode = mode
+        self.rule_dict = rule_dict
+        self.user_dict = user_dict
+        self.env = env
+        self.download_cnt = 0
+        self.token_count = 0
+
+    def get_token_from_mysql(self):
+        select_sql = f"""SELECT config from crawler_config where source = '{ self.platform }'; """
+        # print(select_sql)
+        configs = MysqlHelper.get_values(
+            log_type=self.mode,
+            crawler=self.platform,
+            sql=select_sql,
+            env=self.env,
+            machine="",
+        )
+        token_config = configs[0][0]
+        token_info = json.loads(token_config)
+        self.token = token_info["token"]
+        self.cookie = token_info["cookie"]
+
+    def get_history_id(self):
+        """
+        从数据库表中读取 id
+        """
+        select_user_sql = f"""select name_id from accounts where name = "{self.account_name}" and platform = "{self.platform}" and useful = 1 limit 1"""
+        name_id = MysqlHelper.get_values(
+            log_type=self.mode,
+            crawler=self.platform,
+            sql=select_user_sql,
+            env=self.env,
+            machine="",
+        )
+        if name_id:
+            return name_id[0]
+        else:
+            return False
+
+    def get_account_id(self):
+        # 读历史数据,如果存在 id,则直接返回 id
+        history_id = self.get_history_id()
+        if history_id:
+            return history_id
+        else:
+            url = "https://mp.weixin.qq.com/cgi-bin/videosnap"
+            params = {
+                "action": "search",
+                "scene": "1",
+                "buffer": "",
+                "query": self.account_name,
+                "count": "21",
+                "token": self.token,
+                "lang": "zh_CN",
+                "f": "json",
+                "ajax": "1",
+            }
+            headers = {
+                "authority": "mp.weixin.qq.com",
+                "accept": "*/*",
+                "accept-language": "en,zh-CN;q=0.9,zh;q=0.8",
+                "cookie": self.cookie,
+                "referer": "https://mp.weixin.qq.com/cgi-bin/appmsg?t=media/appmsg_edit_v2&action=edit&isNew=1&type=77&createType=0&token={}&lang=zh_CN".format(
+                    self.token
+                ),
+                "user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36",
+                "x-requested-with": "XMLHttpRequest",
+            }
+            response = requests.request("GET", url, headers=headers, params=params)
+            self.token_count += 1
+            user_list = response.json()["acct_list"]
+            target_user = find_target_user(name=self.account_name, user_list=user_list)
+            # 写入 MySql 数据库
+            if target_user:
+                update_sql = f"""INSERT INTO accounts (name, name_id, platform) values ("{self.account_name}", "{target_user['username']}", "{self.platform}")"""
+                # print(update_sql)
+                MysqlHelper.update_values(
+                    log_type=self.mode,
+                    crawler=self.platform,
+                    sql=update_sql,
+                    env=self.env,
+                    machine="",
+                )
+                return target_user["username"]
+            else:
+                return False
+
+    def get_account_videos(self):
+        # 一个账号最多抓 30 条数据
+        self.get_token_from_mysql()
+        user_id = self.get_account_id()
+        if user_id:
+            url = "https://mp.weixin.qq.com/cgi-bin/videosnap"
+            headers = {
+                "authority": "mp.weixin.qq.com",
+                "accept": "*/*",
+                "accept-language": "en,zh-CN;q=0.9,zh;q=0.8",
+                "cookie": self.cookie,
+                "referer": "https://mp.weixin.qq.com/cgi-bin/appmsg?t=media/appmsg_edit_v2&action=edit&isNew=1&type=77&createType=0&token={}&lang=zh_CN".format(
+                    self.token
+                ),
+                "user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36",
+                "x-requested-with": "XMLHttpRequest",
+            }
+            buffer = ""  # 翻页指示器
+            while True:
+                if self.download_cnt >= 30:
+                    return
+                params = {
+                    "action": "get_feed_list",
+                    "username": user_id,
+                    "buffer": buffer,
+                    "count": "15",
+                    "scene": "1",
+                    "token": self.token,
+                    "lang": "zh_CN",
+                    "f": "json",
+                    "ajax": "1",
+                }
+                response = requests.request("GET", url, headers=headers, params=params)
+                self.token_count += 1
+                res_json = response.json()
+                # 开始判断视频是否有信息,是否频控
+                if res_json["base_resp"]["err_msg"] == "invalid session":
+                    print(
+                        f"status_code:{response.status_code}, get_videoList:{response.text}\n"
+                    )
+                    time.sleep(60 * 15)
+                    continue
+                if res_json["base_resp"]["err_msg"] == "freq control":
+                    print(
+                        f"status_code:{response.status_code}, get_videoList:{response.text}\n"
+                    )
+                    time.sleep(60 * 15)
+                    continue
+                if not res_json["list"]:
+                    print("没有更多视频了")
+                    return
+                else:
+                    buffer = res_json["last_buff"]
+                    for obj in res_json["list"]:
+                        print("扫描到一条视频", self.token_count)
+                        # repeat_flag = self.process_video_obj(obj)
+                        # if not repeat_flag:
+                        #     return
+                        try:
+                            print("扫描到一条视频")
+                            repeat_flag = self.process_video_obj(obj)
+                            if not repeat_flag:
+                                return
+                        except Exception as e:
+                            print(f"抓取单条视频异常:{e}\n")
+        else:
+            print("{}\t获取 id 失败".format(self.account_name))
+
+    def process_video_obj(self, video_obj):
+        trace_id = self.platform + str(uuid.uuid1())
+        # print(json.dumps(video_obj, ensure_ascii=False, indent=4))
+        video_dict = {
+            "video_id": video_obj["nonce_id"],
+            "out_video_id": video_obj["nonce_id"],
+            "video_title": video_obj["desc"],
+            "publish_time_stamp": int(time.time()),
+            "publish_time_str": time.strftime(
+                "%Y-%m-%d %H:%M:%S", time.localtime(int(time.time()))
+            ),
+            "play_cnt": 0,
+            "comment_cnt": 0,
+            "like_cnt": 0,
+            "share_cnt": 0,
+            "user_id": self.user_dict["user_id"],
+            "cover_url": video_obj["media"][0]["cover_url"],
+            "video_url": video_obj["media"][0]["url"],
+            "avatar_url": video_obj["head_url"],
+            "width": video_obj["media"][0]["width"],
+            "height": video_obj["media"][0]["height"],
+            "duration": video_obj["media"][0]["video_play_len_s"],
+            "platform": self.platform,
+            "strategy": self.mode,
+            "crawler_rule": self.rule_dict,
+            "session": f"shipinhao-author-{int(time.time())}",
+        }
+        # 无更新时间,去重即可
+        pipeline = PiaoQuanPipelineTest(
+            platform=self.platform,
+            mode=self.mode,
+            item=video_dict,
+            rule_dict=self.rule_dict,
+            env=self.env,
+            trace_id=trace_id,
+        )
+        if not pipeline.repeat_video():
+            return False
+        else:
+            video_dict["out_user_id"] = video_dict["user_id"]
+            video_dict["user_id"] = self.user_dict["uid"]
+            video_dict["publish_time"] = video_dict["publish_time_str"]
+            print(video_dict)
+            print("成功发送 MQ 至 ETL")
+            self.download_cnt += 1
+        return True
+
+
+if __name__ == "__main__":
+    temp_token = "2080949641"
+    temp_cookie = "ua_id=bw4VuFJr6fAuSkwdAAAAAClaW0m9Aua-6IfHaXU_zpo=; wxuin=95302180931488; mm_lang=zh_CN; RK=kreEMgtMMJ; ptcz=8fd1b267c98a1185bbe6455a081f1264048ee388363ca305d9ef4812892c7900; qq_domain_video_guid_verify=2ba78a5010233582; poc_sid=HOinP2Wj322Ex737kV651Zqy6y8fSprOUUvaegBg; _qimei_q36=; _qimei_h38=9eea33ea92afe8a922333fce03000001317916; pgv_pvid=9056371236; _clck=3930572231|1|fgk|0; uuid=6562bbd8859230ce4120dfa063c76997; rand_info=CAESIGAatjSIjvxVJVDxRDN7F/CNFWMifvAVqje98rd++8UY; slave_bizuin=3236647229; data_bizuin=3236647229; bizuin=3236647229; data_ticket=qm3i6jRhObs1yKHttGh0gVI02Mz7FTPfatn0RMLdaWyD7Ukcokm5Dc3mmYLQUZPg; slave_sid=UWxjZnhBREZRRTNKZ3dYZTlYRE9Db2lxQUhOM3lZUlRoMkV0MG1wdVVudGpQTWxnVkxzYW5pV2c3NjB3bnAyQ2lPaXBBVVRPazEybWtKSVEzTnUyazZ6WEJsdnFaWWVDaUFrM3pTTXRkeUNJS3RNVTc2NFRBWkZiVGQzYllacEFRalBBZ2tXZlltblJYS2VS; slave_user=gh_d284c09295eb; xid=cb96e6ba4b4960d74a22869b1bb21406; _clsk=z77guf|1699532621466|4|1|mp.weixin.qq.com/weheat-agent/payload/record"
+    SP = ShiPinHaoAccount(
+        platform="shipinhao",
+        mode="author",
+        user_dict={"uid": "123456", "link": "心煤", "user_id": "1234565"},
+        rule_dict={},
+        env="prod",
+    )
+
+    SP.get_account_videos()

+ 298 - 0
shipinhao/shipinhao_author/shipinhao_scheduling.py

@@ -0,0 +1,298 @@
+import os
+import json
+import sys
+import datetime
+import time
+import uuid
+
+import requests
+
+sys.path.append(os.getcwd())
+from common import PiaoQuanPipeline, AliyunLogger
+from common.feishu import Feishu
+from common.db import MysqlHelper
+from common.mq import MQ
+
+
+def find_target_user(name, user_list):
+    for obj in user_list:
+        if obj["nickname"] == name:
+            return obj
+        else:
+            continue
+    return False
+
+
+class ShiPinHaoAccount:
+    def __init__(self, platform, mode, rule_dict, user_dict, env):
+        # self.token = token
+        # self.cookie = cookie
+        self.account_name = user_dict["link"]
+        self.platform = platform
+        self.mode = mode
+        self.rule_dict = rule_dict
+        self.user_dict = user_dict
+        self.env = env
+        self.download_cnt = 0
+        self.mq = MQ(topic_name="topic_crawler_etl_" + self.env)
+
+    def get_token_from_mysql(self):
+        select_sql = (
+            f"""SELECT config from crawler_config where source = '{self.platform}'; """
+        )
+        # print(select_sql)
+        configs = MysqlHelper.get_values(
+            log_type=self.mode,
+            crawler=self.platform,
+            sql=select_sql,
+            env=self.env,
+            machine="",
+        )
+        token_config = configs[0][0]
+        token_info = json.loads(token_config)
+        self.token = token_info["token"]
+        self.cookie = token_info["cookie"]
+
+    def get_history_id(self):
+        """
+        从数据库表中读取 id
+        """
+        select_user_sql = f"""select name_id from accounts where name = "{self.account_name}" and platform = "{self.platform}" and useful = 1 limit 1"""
+        name_id = MysqlHelper.get_values(
+            log_type=self.mode,
+            crawler=self.platform,
+            sql=select_user_sql,
+            env=self.env,
+            machine="",
+        )
+        if name_id:
+            return name_id[0]
+        else:
+            return False
+
+    def get_account_id(self):
+        # 读历史数据,如果存在 id,则直接返回 id
+        history_id = self.get_history_id()
+        if history_id:
+            return history_id
+        else:
+            url = "https://mp.weixin.qq.com/cgi-bin/videosnap"
+            params = {
+                "action": "search",
+                "scene": "1",
+                "buffer": "",
+                "query": self.account_name,
+                "count": "21",
+                "token": self.token,
+                "lang": "zh_CN",
+                "f": "json",
+                "ajax": "1",
+            }
+            headers = {
+                "authority": "mp.weixin.qq.com",
+                "accept": "*/*",
+                "accept-language": "en,zh-CN;q=0.9,zh;q=0.8",
+                "cookie": self.cookie,
+                "referer": "https://mp.weixin.qq.com/cgi-bin/appmsg?t=media/appmsg_edit_v2&action=edit&isNew=1&type=77&createType=0&token={}&lang=zh_CN".format(
+                    self.token
+                ),
+                "user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36",
+                "x-requested-with": "XMLHttpRequest",
+            }
+            response = requests.request("GET", url, headers=headers, params=params)
+            user_list = response.json()["acct_list"]
+            target_user = find_target_user(name=self.account_name, user_list=user_list)
+            # 写入 MySql 数据库
+            if target_user:
+                update_sql = f"""INSERT INTO accounts (name, name_id, platform) values ("{self.account_name}", "{target_user['username']}", "{self.platform}")"""
+                # print(update_sql)
+                MysqlHelper.update_values(
+                    log_type=self.mode,
+                    crawler=self.platform,
+                    sql=update_sql,
+                    env=self.env,
+                    machine="",
+                )
+                return target_user["username"]
+            else:
+                return False
+
+    def get_account_videos(self):
+        # 一个账号最多抓取 30 条数据
+        self.get_token_from_mysql()
+        user_id = self.get_account_id()
+        if user_id:
+            url = "https://mp.weixin.qq.com/cgi-bin/videosnap"
+            headers = {
+                "authority": "mp.weixin.qq.com",
+                "accept": "*/*",
+                "accept-language": "en,zh-CN;q=0.9,zh;q=0.8",
+                "cookie": self.cookie,
+                "referer": "https://mp.weixin.qq.com/cgi-bin/appmsg?t=media/appmsg_edit_v2&action=edit&isNew=1&type=77&createType=0&token={}&lang=zh_CN".format(
+                    self.token
+                ),
+                "user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36",
+                "x-requested-with": "XMLHttpRequest",
+            }
+            buffer = ""  # 翻页指示器
+            while True:
+                if self.download_cnt >= int(
+                    self.rule_dict.get("videos_cnt", {}).get("min", 30)
+                ):
+                    return
+                params = {
+                    "action": "get_feed_list",
+                    "username": user_id,
+                    "buffer": buffer,
+                    "count": "15",
+                    "scene": "1",
+                    "token": self.token,
+                    "lang": "zh_CN",
+                    "f": "json",
+                    "ajax": "1",
+                }
+                response = requests.request("GET", url, headers=headers, params=params)
+                res_json = response.json()
+                # 开始判断视频是否有信息,是否频控
+                if res_json["base_resp"]["err_msg"] == "invalid session":
+                    AliyunLogger.logging(
+                        code="2000",
+                        platform=self.platform,
+                        mode=self.mode,
+                        env=self.env,
+                        message=f"status_code:{response.status_code}, get_videoList:{response.text}\n",
+                    )
+                    if 20 >= datetime.datetime.now().hour >= 10:
+                        Feishu.bot(
+                            log_type=self.mode,
+                            crawler=self.platform,
+                            text="视频号Token 过期啦"
+                            # text=f"{token_dict['title']}\n操作人:{token_dict['operator']}\n更换日期:{token_dict['update_time']} \n频控啦,请扫码更换其他公众号token\nhttps://mp.weixin.qq.com/"
+                        )
+                    time.sleep(60 * 15)
+                    continue
+                if res_json["base_resp"]["err_msg"] == "freq control":
+                    AliyunLogger.logging(
+                        code="2000",
+                        platform=self.platform,
+                        mode=self.mode,
+                        env=self.env,
+                        message=f"status_code:{response.status_code}, get_videoList:{response.text}\n",
+                    )
+                    if 20 >= datetime.datetime.now().hour >= 10:
+                        Feishu.bot(
+                            log_type=self.mode,
+                            crawler=self.platform,
+                            text="视频号Token 过期啦"
+                            # text=f"{token_dict['title']}\n操作人:{token_dict['operator']}\n更换日期:{token_dict['update_time']} \n频控啦,请扫码更换其他公众号token\nhttps://mp.weixin.qq.com/"
+                        )
+                    time.sleep(60 * 15)
+                    continue
+                if not res_json.get("list"):
+                    AliyunLogger.logging(
+                        code="2000",
+                        platform=self.platform,
+                        mode=self.mode,
+                        env=self.env,
+                        message="没有更多视频了",
+                    )
+                    return
+                else:
+                    buffer = res_json["last_buff"]
+                    for obj in res_json["list"]:
+                        try:
+                            AliyunLogger.logging(
+                                code="1001",
+                                platform=self.platform,
+                                mode=self.mode,
+                                message="扫描到一条视频",
+                                env=self.env,
+                                data=obj,
+                            )
+                            repeat_flag = self.process_video_obj(obj)
+                            if not repeat_flag:
+                                return
+                        except Exception as e:
+                            AliyunLogger.logging(
+                                code="3000",
+                                platform=self.platform,
+                                mode=self.mode,
+                                env=self.env,
+                                message=f"抓取单条视频异常:{e}\n",
+                            )
+        else:
+            AliyunLogger.logging(
+                code="3000",
+                platform=self.platform,
+                mode=self.mode,
+                env=self.env,
+                message="{}\t获取 id 失败".format(self.account_name),
+            )
+
+    def process_video_obj(self, video_obj):
+        trace_id = self.platform + str(uuid.uuid1())
+        video_dict = {
+            "video_id": video_obj["nonce_id"],
+            "video_title": video_obj["desc"],
+            "out_video_id": video_obj["nonce_id"],
+            "publish_time_stamp": int(time.time()),
+            "publish_time_str": time.strftime(
+                "%Y-%m-%d %H:%M:%S", time.localtime(int(time.time()))
+            ),
+            "play_cnt": 0,
+            "comment_cnt": 0,
+            "like_cnt": 0,
+            "share_cnt": 0,
+            "user_id": self.user_dict["uid"],
+            "cover_url": video_obj["media"][0]["cover_url"],
+            "video_url": video_obj["media"][0]["video_url"],
+            "avatar_url": video_obj["head_url"],
+            "width": video_obj["media"][0]["width"],
+            "height": video_obj["media"][0]["height"],
+            "duration": video_obj["media"][0]["video_play_len_s"],
+            "platform": self.platform,
+            "strategy": self.mode,
+            "crawler_rule": self.rule_dict,
+            "session": f"shipinhao-author-{int(time.time())}",
+        }
+        # video_dict["out_user_id"] = video_dict["user_id"]
+        # 无更新时间,去重即可
+        pipeline = PiaoQuanPipeline(
+            platform=self.platform,
+            mode=self.mode,
+            item=video_dict,
+            rule_dict=self.rule_dict,
+            env=self.env,
+            trace_id=trace_id,
+        )
+        if not pipeline.repeat_video():
+            return False
+        else:
+            video_dict["publish_time"] = video_dict["publish_time_str"]
+            self.mq.send_msg(video_dict)
+            self.download_cnt += 1
+            AliyunLogger.logging(
+                code="1002",
+                platform=self.platform,
+                mode=self.mode,
+                env=self.env,
+                data=video_dict,
+                trace_id=trace_id,
+                message="成功发送 MQ 至 ETL",
+            )
+        return True
+
+
+# if __name__ == "__main__":
+#     temp_token = "2080949641"
+#     temp_cookie = "ua_id=bw4VuFJr6fAuSkwdAAAAAClaW0m9Aua-6IfHaXU_zpo=; wxuin=95302180931488; mm_lang=zh_CN; RK=kreEMgtMMJ; ptcz=8fd1b267c98a1185bbe6455a081f1264048ee388363ca305d9ef4812892c7900; qq_domain_video_guid_verify=2ba78a5010233582; poc_sid=HOinP2Wj322Ex737kV651Zqy6y8fSprOUUvaegBg; _qimei_q36=; _qimei_h38=9eea33ea92afe8a922333fce03000001317916; pgv_pvid=9056371236; _clck=3930572231|1|fgk|0; uuid=6562bbd8859230ce4120dfa063c76997; rand_info=CAESIGAatjSIjvxVJVDxRDN7F/CNFWMifvAVqje98rd++8UY; slave_bizuin=3236647229; data_bizuin=3236647229; bizuin=3236647229; data_ticket=qm3i6jRhObs1yKHttGh0gVI02Mz7FTPfatn0RMLdaWyD7Ukcokm5Dc3mmYLQUZPg; slave_sid=UWxjZnhBREZRRTNKZ3dYZTlYRE9Db2lxQUhOM3lZUlRoMkV0MG1wdVVudGpQTWxnVkxzYW5pV2c3NjB3bnAyQ2lPaXBBVVRPazEybWtKSVEzTnUyazZ6WEJsdnFaWWVDaUFrM3pTTXRkeUNJS3RNVTc2NFRBWkZiVGQzYllacEFRalBBZ2tXZlltblJYS2VS; slave_user=gh_d284c09295eb; xid=cb96e6ba4b4960d74a22869b1bb21406; _clsk=z77guf|1699532621466|4|1|mp.weixin.qq.com/weheat-agent/payload/record"
+#     SP = ShiPinHaoAccount(
+#         token=temp_token,
+#         cookie=temp_cookie,
+#         account_name="心煤",
+#         platform="shipinhao",
+#         mode="author",
+#         rule_dict={},
+#         env="prod",
+#     )
+#     SP.get_account_videos()

+ 172 - 0
shipinhao/shipinhao_main/run_sph_author.py

@@ -0,0 +1,172 @@
+# -*- coding: utf-8 -*-
+# @Author: wangkun
+# @Time: 2023/6/30
+import argparse
+from multiprocessing import Process
+from mq_http_sdk.mq_client import *
+from mq_http_sdk.mq_consumer import *
+from mq_http_sdk.mq_exception import MQExceptionBase
+
+sys.path.append(os.getcwd())
+from common.public import task_fun_mq, get_consumer, ack_message
+from common.scheduling_db import MysqlHelper
+from common import AliyunLogger
+from shipinhao.shipinhao_author import ShiPinHaoAccount
+
+
+def main(log_type, crawler, topic_name, group_id, env):
+    consumer = get_consumer(topic_name, group_id)
+    # 长轮询表示如果Topic没有消息,则客户端请求会在服务端挂起3秒,3秒内如果有消息可以消费则立即返回响应。
+    # 长轮询时间3秒(最多可设置为30秒)。
+    wait_seconds = 30
+    # 一次最多消费3条(最多可设置为16条)。
+    batch = 1
+    AliyunLogger.logging(
+        code="1000",
+        platform=crawler,
+        mode=log_type,
+        env=env,
+        message=f'{10 * "="}Consume And Ack Message From Topic{10 * "="}\n'
+        f"WaitSeconds:{wait_seconds}\n"
+        f"TopicName:{topic_name}\n"
+        f"MQConsumer:{group_id}",
+    )
+    while True:
+        try:
+            # 长轮询消费消息。
+            recv_msgs = consumer.consume_message(batch, wait_seconds)
+            for msg in recv_msgs:
+                AliyunLogger.logging(
+                    code="1000",
+                    platform=crawler,
+                    mode=log_type,
+                    env=env,
+                    message=f"Receive\n"
+                    f"MessageId:{msg.message_id}\n"
+                    f"MessageBodyMD5:{msg.message_body_md5}\n"
+                    f"MessageTag:{msg.message_tag}\n"
+                    f"ConsumedTimes:{msg.consumed_times}\n"
+                    f"PublishTime:{msg.publish_time}\n"
+                    f"Body:{msg.message_body}\n"
+                    f"NextConsumeTime:{msg.next_consume_time}\n"
+                    f"ReceiptHandle:{msg.receipt_handle}\n"
+                    f"Properties:{msg.properties}",
+                )
+                # ack_mq_message
+                ack_message(
+                    log_type=log_type,
+                    crawler=crawler,
+                    recv_msgs=recv_msgs,
+                    consumer=consumer,
+                )
+
+                # 解析 task_dict
+                task_dict = task_fun_mq(msg.message_body)["task_dict"]
+                AliyunLogger.logging(
+                    code="1000",
+                    platform=crawler,
+                    mode=log_type,
+                    env=env,
+                    message="f调度任务:{task_dict}",
+                )
+                # 解析 rule_dict
+                rule_dict = task_fun_mq(msg.message_body)["rule_dict"]
+                AliyunLogger.logging(
+                    code="1000",
+                    platform=crawler,
+                    mode=log_type,
+                    env=env,
+                    message=f"抓取规则:{rule_dict}\n",
+                )
+
+                # 解析 user_list
+                task_id = task_dict["id"]
+                select_user_sql = (
+                    f"""select * from crawler_user_v3 where task_id={task_id}"""
+                )
+                user_list = MysqlHelper.get_values(
+                    log_type, crawler, select_user_sql, env, action=""
+                )
+                AliyunLogger.logging(
+                    code="1003",
+                    platform=crawler,
+                    mode=log_type,
+                    env=env,
+                    message="开始抓取"
+                )
+                for user_dict in user_list:
+                    try:
+                        AliyunLogger.logging(
+                            code="1000",
+                            platform=crawler,
+                            mode=log_type,
+                            env=env,
+                            message="开始抓取视频号{}".format(user_dict["name"]),
+                        )
+                        # 初始化
+                        SPHA = ShiPinHaoAccount(
+                            platform=crawler,
+                            mode=log_type,
+                            rule_dict=rule_dict,
+                            user_dict=user_dict,
+                            env=env,
+                        )
+                        SPHA.get_account_videos()
+                        AliyunLogger.logging(
+                            code="1000",
+                            platform=crawler,
+                            mode=log_type,
+                            env=env,
+                            message="完成抓取视频号{}".format(user_dict["name"]),
+                        )
+                    except Exception as e:
+                        AliyunLogger.logging(
+                            code="3000",
+                            platform=crawler,
+                            mode=log_type,
+                            env=env,
+                            message="抓取视频号{}出现问题, 报错为{}".format(user_dict["name"], e),
+                        )
+
+                AliyunLogger.logging(
+                    code="1004", platform=crawler, mode=log_type, message="结束一轮抓取"
+                )
+
+        except MQExceptionBase as err:
+            # Topic中没有消息可消费。
+            if err.type == "MessageNotExist":
+                AliyunLogger.logging(
+                    code="2000",
+                    platform=crawler,
+                    mode=log_type,
+                    env=env,
+                    message=f"No new message! RequestId:{err.req_id}\n",
+                )
+                continue
+            AliyunLogger.logging(
+                code="2000",
+                platform=crawler,
+                mode=log_type,
+                env=env,
+                message=f"Consume Message Fail! Exception:{err}\n",
+            )
+
+            time.sleep(2)
+            continue
+
+
+if __name__ == "__main__":
+    parser = argparse.ArgumentParser()  ## 新建参数解释器对象
+    parser.add_argument("--log_type", type=str)  ## 添加参数,注明参数类型
+    parser.add_argument("--crawler")  ## 添加参数
+    parser.add_argument("--topic_name")  ## 添加参数
+    parser.add_argument("--group_id")  ## 添加参数
+    parser.add_argument("--env")  ## 添加参数
+    args = parser.parse_args()  ### 参数赋值,也可以通过终端赋值
+    main(
+        log_type=args.log_type,
+        crawler=args.crawler,
+        topic_name=args.topic_name,
+        group_id=args.group_id,
+        env=args.env,
+    )

+ 29 - 14
xigua/xigua_author/xigua_author.py

@@ -783,6 +783,7 @@ class XiGuaAuthor:
                 ),
             )
             return False
+
         pipeline = PiaoQuanPipeline(
             platform=self.platform,
             mode=self.mode,
@@ -791,20 +792,34 @@ class XiGuaAuthor:
             item=video_dict,
             trace_id=trace_id,
         )
-        flag = pipeline.process_item()
-        if flag:
-            print(json.dumps(video_dict, ensure_ascii=False, indent=4))
-            self.mq.send_msg(video_dict)
-            self.download_count += 1
-            AliyunLogger.logging(
-                code="1002",
-                platform=self.platform,
-                mode=self.mode,
-                env=self.env,
-                data=video_dict,
-                trace_id=trace_id,
-                message="成功发送 MQ 至 ETL",
-            )
+        title_flag = pipeline.title_flag()
+        repeat_flag = pipeline.repeat_video()
+        if title_flag and repeat_flag:
+            if int(video_dict['play_cnt']) >= int(self.rule_dict.get("play_cnt", {}).get("min", 100000)):
+                self.mq.send_msg(video_dict)
+                self.download_count += 1
+                AliyunLogger.logging(
+                    code="1002",
+                    platform=self.platform,
+                    mode=self.mode,
+                    env=self.env,
+                    data=video_dict,
+                    trace_id=trace_id,
+                    message="成功发送 MQ 至 ETL",
+                )
+            else:
+                if float(video_dict['like_cnt']) / float(video_dict['play_cnt']) >= 0.04:
+                    self.mq.send_msg(video_dict)
+                    self.download_count += 1
+                    AliyunLogger.logging(
+                        code="1002",
+                        platform=self.platform,
+                        mode=self.mode,
+                        env=self.env,
+                        data=video_dict,
+                        trace_id=trace_id,
+                        message="成功发送 MQ 至 ETL",
+                    )
         return True
 
     def get_video_info(self, item_id, trace_id):