Przeglądaj źródła

add 老年团队

zhangyong 1 rok temu
rodzic
commit
1ac3a3995c

+ 0 - 0
laoniantuandui/__init__.py


+ 0 - 0
laoniantuandui/laoniantuandui_main/__init__.py


+ 112 - 0
laoniantuandui/laoniantuandui_main/run_lntd_recommend.py

@@ -0,0 +1,112 @@
+# -*- coding: utf-8 -*-
+
+import argparse
+import random
+from mq_http_sdk.mq_client import *
+from mq_http_sdk.mq_consumer import *
+from mq_http_sdk.mq_exception import MQExceptionBase
+
+
+sys.path.append(os.getcwd())
+from laoniantuandui.laoniantuandui_recommend.lntd_recommend import LntdRecommend
+
+from common.common import Common
+from common.public import get_consumer, ack_message, task_fun_mq
+from common.scheduling_db import MysqlHelper
+
+
+def main(log_type, crawler, topic_name, group_id, env):
+    consumer = get_consumer(topic_name, group_id)
+    # 长轮询表示如果Topic没有消息,则客户端请求会在服务端挂起3秒,3秒内如果有消息可以消费则立即返回响应。
+    # 长轮询时间3秒(最多可设置为30秒)。
+    wait_seconds = 30
+    # 一次最多消费3条(最多可设置为16条)。
+    batch = 1
+    Common.logger(log_type, crawler).info(f'{10 * "="}Consume And Ack Message From Topic{10 * "="}\n'
+                                          f'WaitSeconds:{wait_seconds}\n'
+                                          f'TopicName:{topic_name}\n'
+                                          f'MQConsumer:{group_id}')
+    Common.logging(log_type, crawler, env, f'{10 * "="}Consume And Ack Message From Topic{10 * "="}\n'
+                                           f'WaitSeconds:{wait_seconds}\n'
+                                           f'TopicName:{topic_name}\n'
+                                           f'MQConsumer:{group_id}')
+    while True:
+        try:
+            # 长轮询消费消息。
+            recv_msgs = consumer.consume_message(batch, wait_seconds)
+            for msg in recv_msgs:
+                Common.logger(log_type, crawler).info(f"Receive\n"
+                                                      f"MessageId:{msg.message_id}\n"
+                                                      f"MessageBodyMD5:{msg.message_body_md5}\n"
+                                                      f"MessageTag:{msg.message_tag}\n"
+                                                      f"ConsumedTimes:{msg.consumed_times}\n"
+                                                      f"PublishTime:{msg.publish_time}\n"
+                                                      f"Body:{msg.message_body}\n"
+                                                      f"NextConsumeTime:{msg.next_consume_time}\n"
+                                                      f"ReceiptHandle:{msg.receipt_handle}\n"
+                                                      f"Properties:{msg.properties}")
+                Common.logging(log_type, crawler, env, f"Receive\n"
+                                                       f"MessageId:{msg.message_id}\n"
+                                                       f"MessageBodyMD5:{msg.message_body_md5}\n"
+                                                       f"MessageTag:{msg.message_tag}\n"
+                                                       f"ConsumedTimes:{msg.consumed_times}\n"
+                                                       f"PublishTime:{msg.publish_time}\n"
+                                                       f"Body:{msg.message_body}\n"
+                                                       f"NextConsumeTime:{msg.next_consume_time}\n"
+                                                       f"ReceiptHandle:{msg.receipt_handle}\n"
+                                                       f"Properties:{msg.properties}")
+                # ack_mq_message
+                ack_message(log_type=log_type, crawler=crawler, recv_msgs=recv_msgs, consumer=consumer)
+
+                # 处理爬虫业务
+                task_dict = task_fun_mq(msg.message_body)['task_dict']
+                rule_dict = task_fun_mq(msg.message_body)['rule_dict']
+                task_id = task_dict['id']
+                select_user_sql = f"""select * from crawler_user_v3 where task_id={task_id}"""
+                user_list = MysqlHelper.get_values(log_type, crawler, select_user_sql, env, action="")
+                our_uid_list = []
+                for user in user_list:
+                    our_uid_list.append(user["uid"])
+                our_uid = random.choice(our_uid_list)
+                Common.logger(log_type, crawler).info(f"调度任务:{task_dict}")
+                Common.logging(log_type, crawler, env, f"调度任务:{task_dict}")
+                Common.logger(log_type, crawler).info(f"抓取规则:{rule_dict}")
+                Common.logging(log_type, crawler, env, f"抓取规则:{rule_dict}")
+                Common.logger(log_type, crawler).info(f"用户列表:{user_list}\n")
+                Common.logger(log_type, crawler).info(f'开始抓取:{task_dict["taskName"]}\n')
+                Common.logging(log_type, crawler, env, f'开始抓取:{task_dict["taskName"]}\n')
+                LntdRecommend.get_videoList(log_type=log_type,
+                                                crawler=crawler,
+                                                rule_dict=rule_dict,
+                                                our_uid=our_uid,
+                                                env=env)
+                Common.del_charles_files(log_type, crawler)
+                Common.logger(log_type, crawler).info('抓取一轮结束\n')
+                Common.logging(log_type, crawler, env, '抓取一轮结束\n')
+
+        except MQExceptionBase as err:
+            # Topic中没有消息可消费。
+            if err.type == "MessageNotExist":
+                Common.logger(log_type, crawler).info(f"No new message! RequestId:{err.req_id}\n")
+                Common.logging(log_type, crawler, env, f"No new message! RequestId:{err.req_id}\n")
+                continue
+
+            Common.logger(log_type, crawler).info(f"Consume Message Fail! Exception:{err}\n")
+            Common.logging(log_type, crawler, env, f"Consume Message Fail! Exception:{err}\n")
+            time.sleep(2)
+            continue
+
+
+if __name__ == "__main__":
+    parser = argparse.ArgumentParser()  ## 新建参数解释器对象
+    parser.add_argument('--log_type', type=str)  ## 添加参数,注明参数类型
+    parser.add_argument('--crawler')  ## 添加参数
+    parser.add_argument('--topic_name')  ## 添加参数
+    parser.add_argument('--group_id')  ## 添加参数
+    parser.add_argument('--env')  ## 添加参数
+    args = parser.parse_args()  ### 参数赋值,也可以通过终端赋值
+    main(log_type=args.log_type,
+         crawler=args.crawler,
+         topic_name=args.topic_name,
+         group_id=args.group_id,
+         env=args.env)

+ 22 - 0
laoniantuandui/laoniantuandui_main/run_lntd_recommend_dev.py

@@ -0,0 +1,22 @@
+# -*- coding: utf-8 -*-
+
+import os
+import sys
+
+from laoniantuandui.laoniantuandui_recommend.lntd_recommend import LntdRecommend
+
+sys.path.append(os.getcwd())
+from common.common import Common
+
+
+def zhufushenghuo_main(log_type, crawler, env):
+    Common.logger(log_type, crawler).info(f'开始抓取老年团队-精彩看点:\n')
+    LntdRecommend.get_videoList(log_type=log_type,
+                                    crawler=crawler,
+                                    rule_dict={},
+                                    our_uid=6267140,
+                                    env=env)
+
+
+if __name__ == "__main__":
+    zhufushenghuo_main(log_type="recommend", crawler="laoniantuandui", env="dev")

+ 0 - 0
laoniantuandui/laoniantuandui_recommend/__init__.py


+ 193 - 0
laoniantuandui/laoniantuandui_recommend/lntd_recommend.py

@@ -0,0 +1,193 @@
+import json
+import random
+import time
+import uuid
+
+import requests
+
+from common.common import Common
+from common.scheduling_db import MysqlHelper
+from common.mq import MQ
+from common.public import download_rule, get_config_from_mysql
+
+proxies = {"http": None, "https": None}
+headers = {
+  'Host': 'kkj.xinhuachuanmeijs.com',
+  'accept': '*/*',
+  'content-type': 'application/x-www-form-urlencoded',
+  'accept-language': 'zh-cn',
+  'user-agent': 'Mozilla/5.0 (iPhone; CPU iPhone OS 11_3 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E217 MicroMessenger/6.8.0(0x16080000) NetType/WIFI Language/en Branch/Br_trunk MiniProgramEnv/Mac',
+  'referer': 'https://servicewechat.com/wxb22a8cfe60d6cfe9/9/page-frame.html',
+}
+
+
+
+class LntdRecommend:
+    platform = ("老年团队-精彩看点")
+    download_cnt = 0
+    element_list = []
+    i = 0
+
+    @classmethod
+    def get_video_url(cls, vid, log_type, crawler, env):
+        try:
+            url = "https://kkj.xinhuachuanmeijs.com/app/index.php?i=304&t=0&m=jyt_txvideo&v=1.0&from=wxapp&c=entry&a=wxapp&do=videoinfo&vid={}&version=1.0.3".format(vid)
+            payload={}
+
+            r = requests.request("GET", url, headers=headers, data=payload, proxies=proxies, verify=False)
+            if "data" not in r.text or r.status_code != 200:
+                Common.logger(log_type, crawler).warning(f"get_videoList:{r.text}\n")
+                Common.logging(log_type, crawler, env, f"get_videoList:{r.text}\n")
+                return
+            elif "data" not in r.json():
+                Common.logger(log_type, crawler).info(f"get_videoList:{r.json()}\n")
+                Common.logging(log_type, crawler, env, f"get_videoList:{r.json()}\n")
+                return
+            elif len(r.json()["data"]) == 0:
+                Common.logger(log_type, crawler).warning(f"get_videoList:{r.json()['data']}\n")
+                Common.logging(log_type, crawler, env, f"get_videoList:{r.json()['data']}\n")
+                return
+            else:
+                # 获取视频url
+                data = r.json()["data"]
+                return data
+        except Exception as e:
+            Common.logger(log_type, crawler).error(f" 获取视频链接异常:{e}\n")
+            Common.logging(log_type, crawler, env, f"获取视频链接异常:{e}\n")
+    @classmethod
+    def repeat_video(cls, log_type, crawler, video_id, env):
+        sql = f""" select * from crawler_video where platform in ("{crawler}","{cls.platform}") and create_time>='2023-06-26' and out_video_id="{video_id}"; """
+        repeat_video = MysqlHelper.get_values(log_type, crawler, sql, env)
+        return len(repeat_video)
+
+    @classmethod
+    def get_vlist(cls, log_type, crawler, env):
+        url = "https://kkj.xinhuachuanmeijs.com/app/index.php?i=304&t=0&m=jyt_txvideo&v=1.0&from=wxapp&c=entry&a=wxapp&do=videolist"
+        payload = "category=3612&page=1&israndom=1&type=0&isview=&noauth=true"
+
+        r = requests.request("POST", url, headers=headers, data=payload, proxies=proxies, verify=False)
+        if "data" not in r.text or r.status_code != 200:
+            Common.logger(log_type, crawler).warning(f"get_videoList:{r.text}\n")
+            Common.logging(log_type, crawler, env, f"get_videoList:{r.text}\n")
+            return
+        elif "data" not in r.json():
+            Common.logger(log_type, crawler).info(f"get_videoList:{r.json()}\n")
+            Common.logging(log_type, crawler, env, f"get_videoList:{r.json()}\n")
+            return
+        elif len(r.json()["data"]) == 0:
+            Common.logger(log_type, crawler).warning(f"get_videoList:{r.json()['data']}\n")
+            Common.logging(log_type, crawler, env, f"get_videoList:{r.json()['data']}\n")
+            return
+        else:
+            return r
+    @classmethod
+    def get_videoList(cls, log_type, crawler, our_uid, rule_dict, env):
+        mq = MQ(topic_name="topic_crawler_etl_" + env)
+        for page in range(1, 101):
+            try:
+                Common.logger(log_type, crawler).info(f"正在抓取第{page}页")
+                Common.logging(log_type, crawler, env, f"正在抓取第{page}页")
+                # 获取列表
+                list = cls.get_vlist(log_type, crawler, env)
+                # 视频列表
+                feeds = list.json()["data"]
+                for i in range(len(feeds)):
+                    try:
+                        if cls.download_cnt >= int(rule_dict.get("videos_cnt", {}).get("min", 10)):
+                            cls.i = 0
+                            cls.download_cnt = 0
+                            cls.element_list = []
+                            return
+                        cls.i += 1
+                        video_title = feeds[i].get("vtitle", "").strip().replace("\n", "") \
+                            .replace("/", "").replace("\\", "").replace("\r", "") \
+                            .replace(":", "").replace("*", "").replace("?", "") \
+                            .replace("?", "").replace('"', "").replace("<", "") \
+                            .replace(">", "").replace("|", "").replace(" ", "") \
+                            .replace("&NBSP", "").replace(".", "。").replace(" ", "") \
+                            .replace("'", "").replace("#", "").replace("Merge", "")
+                        publish_time_stamp = int(feeds[i].get("create_time", 0))
+                        publish_time_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(publish_time_stamp))
+                        vid = feeds[i]["vid"]
+                        # 获取视频链接
+                        video_page = cls.get_video_url(vid, log_type, crawler, env)
+                        cover = video_page.get("cover", "")
+                        if cover == "":
+                            cover = feeds[i].get("poster", "")
+                        video_dict = {
+                            "video_title": video_title,
+                            "video_id": feeds[i]["vid"],  # 视频id
+                            "publish_time_stamp": publish_time_stamp,
+                            "publish_time_str": publish_time_str,
+                            "is_video": int(feeds[i].get("is_video", 0)),  # 类型
+                            "category_id": int(feeds[i].get("category_id", 0)),  # 视频来源
+                            "cover_url": cover,  # 视频封面
+                            "video_url": video_page.get("res"),  # 视频链接
+                            "click": int(feeds[i].get("click", 0)),  # 点击数
+                            "video_time": int(feeds[i].get("vtime", 0)),  # 视频时长(m)
+                            "video_width": int(feeds[i].get("vw", 0)),
+                            "video_height": int(feeds[i].get("vh", 0)),
+                            "user_name": feeds[i].get("source", "").strip().replace("\n", ""),
+                            "user_id": feeds[i].get("openid", ""),
+                            "play_cnt": 0,
+                            "like_cnt": 0,
+                            "comment_cnt": 0,
+                            "share_cnt": 0,
+                            # "duration": feeds[i].get("mediaDuration", 0),
+                            "session": ""
+
+                        }
+                        for k, v in video_dict.items():
+                            Common.logger(log_type, crawler).info(f"{k}:{v}")
+                        Common.logging(log_type, crawler, env, f"video_dict:{video_dict}")
+
+                        if video_dict["video_id"] == "" or video_dict["video_title"] == "" or video_dict[
+                            "video_url"] == "":
+                            Common.logger(log_type, crawler).info("无效视频\n")
+                            Common.logging(log_type, crawler, env, "无效视频\n")
+                        elif download_rule(log_type=log_type, crawler=crawler, video_dict=video_dict,
+                                           rule_dict=rule_dict) is False:
+                            Common.logger(log_type, crawler).info("不满足抓取规则\n")
+                            Common.logging(log_type, crawler, env, "不满足抓取规则\n")
+                        elif any(str(word) if str(word) in video_dict["video_title"] else False
+                                 for word in get_config_from_mysql(log_type=log_type,
+                                                                   source=crawler,
+                                                                   env=env,
+                                                                   text="filter",
+                                                                   action="")) is True:
+                            Common.logger(log_type, crawler).info('已中过滤词\n')
+                            Common.logging(log_type, crawler, env, '已中过滤词\n')
+                        elif cls.repeat_video(log_type, crawler, video_dict["video_id"], env) != 0:
+                            Common.logger(log_type, crawler).info('视频已下载\n')
+                            Common.logging(log_type, crawler, env, '视频已下载\n')
+
+                        else:
+                            video_dict["out_user_id"] = video_dict["user_id"]
+                            video_dict["platform"] = crawler
+                            video_dict["strategy"] = log_type
+                            video_dict["out_video_id"] = video_dict["video_id"]
+                            video_dict["width"] = video_dict["video_width"]
+                            video_dict["height"] = video_dict["video_height"]
+                            video_dict["crawler_rule"] = json.dumps(rule_dict)
+                            video_dict["user_id"] = our_uid
+                            video_dict["publish_time"] = video_dict["publish_time_str"]
+                            mq.send_msg(video_dict)
+                            cls.download_cnt += 1
+                            interval = random.randrange(5, 11)
+                            time.sleep(interval)
+                    except Exception as e:
+                        Common.logger(log_type, crawler).error(f"抓取单条视频异常:{e}\n")
+                        Common.logging(log_type, crawler, env, f"抓取单条视频异常:{e}\n")
+            except Exception as e:
+                Common.logger(log_type, crawler).error(f"抓取第{page}页时异常:{e}\n")
+                Common.logging(log_type, crawler, env, f"抓取第{page}页时异常:{e}\n")
+
+
+
+if __name__ == "__main__":
+    rule_dict1 = {"period": {"min": 365, "max": 365},
+                  "duration": {"min": 30, "max": 1800},
+                  "favorite_cnt": {"min": 0, "max": 0},
+                  "videos_cnt": {"min": 10, "max": 20},
+                  "share_cnt": {"min": 0, "max": 0}}
+    LntdRecommend.get_videoList("recommend", "laoniantuandui,", "16QspO", rule_dict1, 'dev')