ソースを参照

祝福意气风发上线

罗俊辉 1 年間 前
コミット
4f723df021

+ 0 - 0
zhufuyiqifengfa/__init__.py


+ 0 - 0
zhufuyiqifengfa/zhufuyiqifengfa_main/__init__.py


+ 154 - 0
zhufuyiqifengfa/zhufuyiqifengfa_main/run_zfyqff_recommend.py

@@ -0,0 +1,154 @@
+import argparse
+from mq_http_sdk.mq_client import *
+from mq_http_sdk.mq_consumer import *
+from mq_http_sdk.mq_exception import MQExceptionBase
+
+sys.path.append(os.getcwd())
+from common.public import task_fun_mq, get_consumer, ack_message
+from common.scheduling_db import MysqlHelper
+from common import AliyunLogger
+from zhufuyiqifengfa.zhufuyiqifengfa_recommend import YiQiFengFaRecommend
+
+
+def main(log_type, crawler, topic_name, group_id, env):
+    consumer = get_consumer(topic_name, group_id)
+    # 长轮询表示如果Topic没有消息,则客户端请求会在服务端挂起3秒,3秒内如果有消息可以消费则立即返回响应。
+    # 长轮询时间3秒(最多可设置为30秒)。
+    wait_seconds = 30
+    # 一次最多消费3条(最多可设置为16条)。
+    batch = 1
+    AliyunLogger.logging(
+        code="1000",
+        platform=crawler,
+        mode=log_type,
+        env=env,
+        message=f'{10 * "="}Consume And Ack Message From Topic{10 * "="}\n'
+        f"WaitSeconds:{wait_seconds}\n"
+        f"TopicName:{topic_name}\n"
+        f"MQConsumer:{group_id}",
+    )
+    while True:
+        try:
+            # 长轮询消费消息。
+            recv_msgs = consumer.consume_message(batch, wait_seconds)
+            for msg in recv_msgs:
+                AliyunLogger.logging(
+                    code="1000",
+                    platform=crawler,
+                    mode=log_type,
+                    env=env,
+                    message=f"Receive\n"
+                    f"MessageId:{msg.message_id}\n"
+                    f"MessageBodyMD5:{msg.message_body_md5}\n"
+                    f"MessageTag:{msg.message_tag}\n"
+                    f"ConsumedTimes:{msg.consumed_times}\n"
+                    f"PublishTime:{msg.publish_time}\n"
+                    f"Body:{msg.message_body}\n"
+                    f"NextConsumeTime:{msg.next_consume_time}\n"
+                    f"ReceiptHandle:{msg.receipt_handle}\n"
+                    f"Properties:{msg.properties}",
+                )
+                # ack_mq_message
+                ack_message(
+                    log_type=log_type,
+                    crawler=crawler,
+                    recv_msgs=recv_msgs,
+                    consumer=consumer,
+                )
+                # 解析 task_dict
+                task_dict = task_fun_mq(msg.message_body)["task_dict"]
+                AliyunLogger.logging(
+                    code="1000",
+                    platform=crawler,
+                    mode=log_type,
+                    env=env,
+                    message="f调度任务:{task_dict}",
+                )
+                # 解析 rule_dict
+                rule_dict = task_fun_mq(msg.message_body)["rule_dict"]
+                AliyunLogger.logging(
+                    code="1000",
+                    platform=crawler,
+                    mode=log_type,
+                    env=env,
+                    message=f"抓取规则:{rule_dict}\n",
+                )
+                # 解析 user_list
+                task_id = task_dict["id"]
+                select_user_sql = (
+                    f"""select * from crawler_user_v3 where task_id={task_id}"""
+                )
+                user_list = MysqlHelper.get_values(
+                    log_type, crawler, select_user_sql, env, action=""
+                )
+                AliyunLogger.logging(
+                    code="1003",
+                    platform=crawler,
+                    mode=log_type,
+                    env=env,
+                    message="开始抓取"
+                )
+                AliyunLogger.logging(
+                    code="1000",
+                    platform=crawler,
+                    mode=log_type,
+                    env=env,
+                    message="开始抓取祝福意气风发——推荐",
+                )
+                main_process = YiQiFengFaRecommend(
+                    platform=crawler,
+                    mode=log_type,
+                    rule_dict=rule_dict,
+                    user_list=user_list,
+                    env=env
+                )
+                token = "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJpYXQiOjE3MDIyNjIzMjUsIm5iZiI6MTcwMjI2MjMyNSwiZXhwIjoxNzAyMjY5NTI1LCJkYXRhIjp7InVzZXJfaWQiOjIzOTI0Nzc0MH19.LKxtz3OKw4ADlcLwU2sWMB5AJ5__aPNdVGqtRuk1ps4"
+                main_process.get_video_list(token)
+                AliyunLogger.logging(
+                    code="1000",
+                    platform=crawler,
+                    mode=log_type,
+                    env=env,
+                    message="完成抓取——祝福意气风发",
+                )
+                AliyunLogger.logging(
+                    code="1004", platform=crawler, mode=log_type, env=env,message="结束一轮抓取"
+                )
+
+        except MQExceptionBase as err:
+            # Topic中没有消息可消费。
+            if err.type == "MessageNotExist":
+                AliyunLogger.logging(
+                    code="2000",
+                    platform=crawler,
+                    mode=log_type,
+                    env=env,
+                    message=f"No new message! RequestId:{err.req_id}\n",
+                )
+                continue
+            AliyunLogger.logging(
+                code="2000",
+                platform=crawler,
+                mode=log_type,
+                env=env,
+                message=f"Consume Message Fail! Exception:{err}\n",
+            )
+            time.sleep(2)
+            continue
+
+
+if __name__ == "__main__":
+    parser = argparse.ArgumentParser()  ## 新建参数解释器对象
+    parser.add_argument("--log_type", type=str)  ## 添加参数,注明参数类型
+    parser.add_argument("--crawler")  ## 添加参数
+    parser.add_argument("--topic_name")  ## 添加参数
+    parser.add_argument("--group_id")  ## 添加参数
+    parser.add_argument("--env")  ## 添加参数
+    args = parser.parse_args()  ### 参数赋值,也可以通过终端赋值
+    main(
+        log_type=args.log_type,
+        crawler=args.crawler,
+        topic_name=args.topic_name,
+        group_id=args.group_id,
+        env=args.env,
+    )

+ 1 - 0
zhufuyiqifengfa/zhufuyiqifengfa_recommend/__init__.py

@@ -0,0 +1 @@
+from .zhufuyiqifengfa_recommend import YiQiFengFaRecommend

+ 36 - 0
zhufuyiqifengfa/zhufuyiqifengfa_recommend/main_request.py

@@ -0,0 +1,36 @@
+import json
+import requests
+
+headers = {
+    'Host': 'api.xiahong.top',
+    # 'ik': 'b326b5062b2f0e69046810717534cb09',
+    'xweb_xhr': '1',
+    'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/107.0.0.0 Safari/537.36 MicroMessenger/6.8.0(0x16080000) NetType/WIFI MiniProgramEnv/Mac MacWechat/WMPF MacWechat/3.8.5(0x13080510)XWEB/1100',
+    'token': 'eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJpYXQiOjE3MDIwMjU1MTgsIm5iZiI6MTcwMjAyNTUxOCwiZXhwIjoxNzAyMDMyNzE4LCJkYXRhIjp7InVzZXJfaWQiOiIyMzkyNDc3NDAifX0.SGfKqlSL11OxDtx5t2XvYXsxTC0hSR--5QpdOmWPIKU',
+    'Content-Type': 'application/json',
+    'Accept': '*/*',
+    'Sec-Fetch-Site': 'cross-site',
+    'Sec-Fetch-Mode': 'cors',
+    'Sec-Fetch-Dest': 'empty',
+    'Referer': 'https://servicewechat.com/wxe5389a260a0a4ee2/2/page-frame.html',
+    'Accept-Language': 'en-US,en;q=0.9',
+}
+
+params = {
+    's': 'mobile/Video/getList',
+    'cid': '1',
+    'page': '11',
+    'api_version': '4',
+    'appid': 'wxe5389a260a0a4ee2',
+    'version': '1.9.1',
+    'env_version': 'release',
+    'scene': '1089'
+}
+
+response = requests.get('https://api.xiahong.top/index.php', headers=headers, params=params)
+
+result = response.json()
+
+
+print(json.dumps(result, ensure_ascii=False, indent=4))
+print(len(result['data']['list']))

+ 148 - 0
zhufuyiqifengfa/zhufuyiqifengfa_recommend/zhufuyiqifengfa_recommend.py

@@ -0,0 +1,148 @@
+import os
+import random
+import sys
+import time
+import uuid
+
+import requests
+
+sys.path.append(os.getcwd())
+from common.video_item import VideoItem
+from common import PiaoQuanPipeline, AliyunLogger, tunnel_proxies
+from common.mq import MQ
+
+
+class YiQiFengFaRecommend(object):
+    def __init__(self, platform, mode, rule_dict, user_list, env):
+        self.platform = platform
+        self.mode = mode
+        self.rule_dict = rule_dict
+        self.user_list = user_list
+        self.env = env
+        self.download_cnt = 0
+        self.limit_flag = False
+        self.mq = MQ(topic_name="topic_crawler_etl_" + self.env)
+
+    def get_video_list(self, token):
+        """
+        推荐流大约110条数据,目前暂时不会更新
+        """
+        headers = {
+            'Host': 'api.xiahong.top',
+            'ik': 'b326b5062b2f0e69046810717534cb09',
+            'xweb_xhr': '1',
+            'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/107.0.0.0 Safari/537.36 MicroMessenger/6.8.0(0x16080000) NetType/WIFI MiniProgramEnv/Mac MacWechat/WMPF MacWechat/3.8.5(0x13080510)XWEB/1100',
+            'token': token,
+            'Content-Type': 'application/json',
+            'Accept': '*/*',
+            'Sec-Fetch-Site': 'cross-site',
+            'Sec-Fetch-Mode': 'cors',
+            'Sec-Fetch-Dest': 'empty',
+            'Referer': 'https://servicewechat.com/wxe5389a260a0a4ee2/2/page-frame.html',
+            'Accept-Language': 'en-US,en;q=0.9',
+        }
+        while True:
+            time.sleep(random.randint(1, 10))
+            try:
+                if self.limit_flag:
+                    message = "本轮已经抓取到足够的数据,自动退出\t{}".format(self.download_cnt)
+                    AliyunLogger.logging(
+                        code="2000",
+                        platform=self.platform,
+                        env=self.env,
+                        mode=self.mode,
+                        message=message
+                    )
+                    return
+                else:
+                    params = {
+                        's': 'mobile/Video/getList',
+                        'cid': '1',
+                        'page': '11',
+                        'api_version': '4',
+                        'appid': 'wxe5389a260a0a4ee2',
+                        'version': '1.9.1',
+                        'env_version': 'release',
+                        'scene': '1089'
+                    }
+                    response = requests.get('https://api.xiahong.top/index.php', headers=headers, params=params, proxies=tunnel_proxies())
+                    video_list = response.json()['data']['list']
+                    if video_list:
+                        for index, video_obj in enumerate(video_list, 1):
+                            try:
+                                AliyunLogger.logging(
+                                    code="1001",
+                                    platform=self.platform,
+                                    mode=self.mode,
+                                    env=self.env,
+                                    message="扫描到一条视频",
+                                    data=video_obj
+                                )
+                                self.process_video_obj(video_obj)
+                            except Exception as e:
+                                AliyunLogger.logging(
+                                    code="3000",
+                                    platform=self.platform,
+                                    mode=self.mode,
+                                    env=self.env,
+                                    message="抓取第{}条的时候出现问题, 报错信息是{}".format(index, e)
+                                )
+                    else:
+                        AliyunLogger.logging(
+                            code="2000",
+                            platform=self.platform,
+                            mode=self.mode,
+                            env=self.env,
+                            message="视频已经抓完",
+                        )
+                        return
+            except Exception as e:
+                AliyunLogger.logging(
+                    code="3000",
+                    platform=self.platform,
+                    mode=self.mode,
+                    env=self.env,
+                    message="抓取推荐页的时候出现错误, 报错信息是{}".format(e)
+                )
+
+    def process_video_obj(self, video_obj):
+        trace_id = self.platform + str(uuid.uuid1())
+        our_user = random.choice(self.user_list)
+        item = VideoItem()
+        item.add_video_info("user_id", our_user["uid"])
+        item.add_video_info("user_name", our_user["nick_name"])
+        item.add_video_info("video_id", video_obj["id"])
+        item.add_video_info("video_title", video_obj["title"])
+        item.add_video_info("video_url", video_obj['video_url'])
+        item.add_video_info("cover_url", video_obj["video_cover"])
+        item.add_video_info("play_cnt", video_obj['visited'])
+        item.add_video_info("share_cnt", video_obj['shared'])
+        item.add_video_info("out_video_id", video_obj["id"])
+        item.add_video_info("platform", self.platform)
+        item.add_video_info("strategy", self.mode)
+        item.add_video_info("session", "{}-{}".format(self.platform, int(time.time())))
+        mq_obj = item.produce_item()
+        pipeline = PiaoQuanPipeline(
+            platform=self.platform,
+            mode=self.mode,
+            rule_dict=self.rule_dict,
+            env=self.env,
+            item=mq_obj,
+            trace_id=trace_id,
+        )
+        if pipeline.process_item():
+            self.download_cnt += 1
+            self.mq.send_msg(mq_obj)
+            AliyunLogger.logging(
+                code="1002",
+                platform=self.platform,
+                mode=self.mode,
+                env=self.env,
+                message="成功发送至 ETL",
+                data=mq_obj,
+                trace_id=trace_id
+            )
+            if self.download_cnt >= int(
+                self.rule_dict.get("videos_cnt", {}).get("min", 200)
+            ):
+                self.limit_flag = True

+ 145 - 0
zhufuyiqifengfa/zhufuyiqifengfa_recommend/zhufuyiqifengfa_recommend_dev.py

@@ -0,0 +1,145 @@
+import os
+import json
+import random
+import sys
+import time
+import uuid
+
+import requests
+
+sys.path.append(os.getcwd())
+from common.video_item import VideoItem
+from common import tunnel_proxies
+from common.pipeline import PiaoQuanPipelineTest
+
+
+class YiQiFengFaRecommend(object):
+    def __init__(self, platform, mode, rule_dict, user_list, env):
+        self.platform = platform
+        self.mode = mode
+        self.rule_dict = rule_dict
+        self.user_list = user_list
+        self.env = env
+        self.download_cnt = 0
+        self.limit_flag = False
+
+    def get_video_list(self, token):
+        """
+        推荐流大约110条数据,目前暂时不会更新
+        """
+        headers = {
+            'Host': 'api.xiahong.top',
+            'ik': 'b326b5062b2f0e69046810717534cb09',
+            'xweb_xhr': '1',
+            'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/107.0.0.0 Safari/537.36 MicroMessenger/6.8.0(0x16080000) NetType/WIFI MiniProgramEnv/Mac MacWechat/WMPF MacWechat/3.8.5(0x13080510)XWEB/1100',
+            'token': token,
+            'Content-Type': 'application/json',
+            'Accept': '*/*',
+            'Sec-Fetch-Site': 'cross-site',
+            'Sec-Fetch-Mode': 'cors',
+            'Sec-Fetch-Dest': 'empty',
+            'Referer': 'https://servicewechat.com/wxe5389a260a0a4ee2/2/page-frame.html',
+            'Accept-Language': 'en-US,en;q=0.9',
+        }
+        while True:
+            time.sleep(random.randint(1, 10))
+            try:
+                if self.limit_flag:
+                    message = "本轮已经抓取到足够的数据,自动退出\t{}".format(self.download_cnt)
+                    print(message)
+                    return
+                else:
+                    params = {
+                        's': 'mobile/Video/getList',
+                        'cid': '1',
+                        'page': '11',
+                        'api_version': '4',
+                        'appid': 'wxe5389a260a0a4ee2',
+                        'version': '1.9.1',
+                        'env_version': 'release',
+                        'scene': '1089'
+                    }
+                    response = requests.get('https://api.xiahong.top/index.php', headers=headers, params=params, proxies=tunnel_proxies())
+                    video_list = response.json()['data']['list']
+                    if video_list:
+                        for index, video_obj in enumerate(video_list, 1):
+                            try:
+                                print("扫描到一条视频")
+                                self.process_video_obj(video_obj)
+                            except Exception as e:
+                                print("抓取第{}条的时候出现问题, 报错信息是{}".format(index, e))
+                    else:
+                        print("已经抓完了,自动退出")
+                        return
+            except Exception as e:
+                print("抓取推荐页的时候出现错误, 报错信息是{}".format(e))
+
+    def process_video_obj(self, video_obj):
+        obj = {
+            "id": 224708,
+            "title": "🔴这个女博士的遭遇❗️让人心疼,群友们都看看吧!",
+            "images": "http://pic.weitunit.com/ca/a9/caa9007f30a99b6b427e1f3664fe527b.jpg",
+            "video_url": "https://api-hl.huoshan.com/hotsoon/item/video/_source/?video_id=v0200fg10000clhclejc77u23q1m9ojg&line=0&app_id=0&vquality=normal&watermark=0&long_video=0&sf=4&ts=1702261882&item_id=7305624354258275638",
+            "out_link_id": "",
+            "type": 0,
+            "share_title": "🔴这个女博士的遭遇❗️让人心疼,群友们都看看吧!",
+            "image_breathing": False,
+            "video_cover": "https://mmbiz.qpic.cn/sz_mmbiz_png/enW3mhIB3IULz7uuISC8VTFZvc6F86PdUpo9ZAdo0UfyBmFPic6l7zoXpKKWU2Dt0Vf2Q8XV9jKkX63yP20G0CA/640?wx_fmt=png&from=appmsg",
+            "end_title": "❤️请帮忙转发各大群里!拜托大家!🙏",
+            "end_cover": "https://mmbiz.qpic.cn/sz_mmbiz_png/enW3mhIB3IULz7uuISC8VTFZvc6F86PdCVQYE2xgQ9qyjLL43ib6aQ4EaFdFruicY79WUiapcxQK5a955sqia50KFA/640?wx_fmt=png&from=appmsg",
+            "author_nickname": "",
+            "author_picture": "",
+            "prompt_type": "",
+            "display_image": "https://mmbiz.qpic.cn/mmbiz_gif/NlUgcycicAT2CehvYKTx4YCTGSMJ8XFRXIQCwX1q6ibG9TprFAGicHre6aicHxQ1qxW0wzUgW5lmRDQDxPdyJNZxag/0?wx_fmt=gif",
+            "share_image": "http://pic.weitunit.com/ca/a9/caa9007f30a99b6b427e1f3664fe527b.jpg?x-oss-process=image/resize,m_fill,w_400,h_320,limit_0/watermark,image_bmV3X3dhdGVybWFyay5wbmc_eC1vc3MtcHJvY2Vzcz1pbWFnZS9yZXNpemUsUF81MA,g_center/watermark,image_dmlld3MucG5nP3gtb3NzLXByb2Nlc3M9aW1hZ2UvcmVzaXplLFBfNDA,g_sw,x_10,y_10/quality,Q_70",
+            "click_jump_path_index": "/pages/index/index?videoId=224708&from=video",
+            "share_jump_path_index": "/pages/index/index?videoId=224708&from=video",
+            "visited": 8376085,
+            "shared": 1159920,
+            "share_vid": 224708
+        }
+        trace_id = self.platform + str(uuid.uuid1())
+        our_user = random.choice(self.user_list)
+        item = VideoItem()
+        item.add_video_info("user_id", our_user["uid"])
+        item.add_video_info("user_name", our_user["nick_name"])
+        item.add_video_info("video_id", video_obj["id"])
+        item.add_video_info("video_title", video_obj["title"])
+        item.add_video_info("video_url", video_obj['video_url'])
+        item.add_video_info("cover_url", video_obj["video_cover"])
+        item.add_video_info("play_cnt", video_obj['visited'])
+        item.add_video_info("share_cnt", video_obj['shared'])
+        item.add_video_info("out_video_id", video_obj["id"])
+        item.add_video_info("platform", self.platform)
+        item.add_video_info("strategy", self.mode)
+        item.add_video_info("session", "{}-{}".format(self.platform, int(time.time())))
+        mq_obj = item.produce_item()
+        pipeline = PiaoQuanPipelineTest(
+            platform=self.platform,
+            mode=self.mode,
+            rule_dict=self.rule_dict,
+            env=self.env,
+            item=mq_obj,
+            trace_id=trace_id,
+        )
+        if pipeline.process_item():
+            self.download_cnt += 1
+            # self.mq.send_msg(mq_obj)
+            print(mq_obj)
+            print("成功发送至 ETL")
+            if self.download_cnt >= int(
+                self.rule_dict.get("videos_cnt", {}).get("min", 200)
+            ):
+                self.limit_flag = True
+
+
+if __name__ == '__main__':
+    S = YiQiFengFaRecommend(
+        platform="zhufuyiqifengfa",
+        mode="recommend",
+        env="dev",
+        rule_dict={},
+        user_list=[{'nick_name': "Ivring", 'uid': "1997"}, {'nick_name': "paul", 'uid': "1998"}]
+    )
+    token = "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJpYXQiOjE3MDIyNjIzMjUsIm5iZiI6MTcwMjI2MjMyNSwiZXhwIjoxNzAyMjY5NTI1LCJkYXRhIjp7InVzZXJfaWQiOjIzOTI0Nzc0MH19.LKxtz3OKw4ADlcLwU2sWMB5AJ5__aPNdVGqtRuk1ps4"
+    S.get_video_list(token)