wangkun · 1 year ago · commit 333f4e8418

+ 14 - 0
README.MD

@@ -108,6 +108,20 @@ ps aux | grep run_zhiqingtiantiankan | grep -v grep | awk '{print $2}' | xargs k
 ps aux | grep Appium.app | grep -v grep | awk '{print $2}' | xargs kill -9
 ```
 
+#### 看一看 session
+```commandline
+# Local debugging
+sh kanyikan/kanyikan_main/run_kyk_session.sh "recommend" "kanyikan" "dev" "kykln"
+# Kanyikan recommend
+sh kanyikan/kanyikan_main/run_kyk_session.sh "recommend" "kanyikan" "dev" "kyk"
+# Kanyikan health
+sh kanyikan/kanyikan_main/run_kyk_session.sh "recommend" "kanyikan" "dev" "kykjk"
+# Kanyikan elderly
+sh kanyikan/kanyikan_main/run_kyk_session.sh "recommend" "kanyikan" "dev" "kykln"
+
+ps aux | grep kanyikan_session | grep -v grep | awk '{print $2}' | xargs kill -9
+```
+
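The fourth positional argument selects which `crawler_config` row the corresponding recommend module reads its WeChat session token from. A minimal sketch of that mapping, inferred from the `get_session()` implementations added later in this commit:

```python
# kanyikan_type CLI argument -> crawler_config title matched by get_session()
# (inferred from kyk_recommend.py / kykjk_recommend.py / kykln_recommend.py)
KANYIKAN_TYPE_TO_CONFIG_TITLE = {
    "kyk": "看一看推荐",    # Kanyikan recommend
    "kykjk": "看一看健康",  # Kanyikan health
    "kykln": "看一看老年",  # Kanyikan elderly
}
print(KANYIKAN_TYPE_TO_CONFIG_TITLE["kykjk"])  # -> 看一看健康
```
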
 #### 视频号
 ```commandline
 正式环境

+ 1 - 1
haitunzhufu/haitunzhufu_main/run_htzf_recommend.py

@@ -13,7 +13,7 @@ from haitunzhufu.haitunzhufu_recommend.haitunzhufu_recommend2 import HTZFRecomme
 class HTZFMain:
     @classmethod
     def main(cls, log_type, crawler, env):
-        videos_cnt = 50
+        videos_cnt = 100
         Common.logger(log_type, crawler).info('开始抓取"海豚祝福"')
         HTZFRecommend.start_wechat(log_type=log_type,
                                    crawler=crawler,

+ 13 - 1
haitunzhufu/haitunzhufu_recommend/haitunzhufu_recommend2.py

@@ -26,6 +26,7 @@ from common.scheduling_db import MysqlHelper
 
 class HTZFRecommend:
     platform = "海豚祝福"
+    element_list = []
 
     @classmethod
     def today_download_cnt(cls, log_type, crawler, env):
@@ -144,7 +145,7 @@ class HTZFRecommend:
         Common.logger(log_type, crawler).info('点击"推荐"列表成功\n')
 
         # while True:
-        for page in range(200):
+        for page in range(500):
             Common.logger(log_type, crawler).info(f"正在抓取第{page+1}页")
             if cls.search_elements(driver, '//*[@class="list"]') is None:
                 Common.logger(log_type, crawler).info("列表页窗口已销毁\n")
@@ -158,8 +159,19 @@ class HTZFRecommend:
             soup.prettify()
 
             video_list_elements = soup.findAll("wx-view", class_="img_bf")
+            # elements in video_list_elements that are not yet in cls.element_list
+            video_list_elements = list(set(video_list_elements).difference(set(cls.element_list)))
+            # union of video_list_elements and cls.element_list
+            cls.element_list = list(set(video_list_elements) | set(cls.element_list))
             Common.logger(log_type, crawler).info(f"第{page+1}页共:{len(video_list_elements)}条视频\n")
 
+            if len(video_list_elements) == 0:
+                for i in range(10):
+                    Common.logger(log_type, crawler).info(f"向上滑动第{i + 1}次")
+                    cls.swipe_up(driver)
+                    time.sleep(0.5)
+                continue
+
             for i, video_element in enumerate(video_list_elements):
                 try:
                     today_download = cls.today_download_cnt(log_type, crawler, env)
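
The new dedup keeps a class-level `element_list` across pages: each page first drops elements already seen, then merges the remainder into the running set. A minimal sketch of the pattern with plain strings standing in for the parsed `wx-view` tags (names are illustrative, not from the repo):

```python
# Cross-page dedup pattern from the diff above, demonstrated with strings.
seen = []  # plays the role of cls.element_list

def dedup_page(page_items):
    """Return only the items not seen on earlier pages; remember everything."""
    global seen
    fresh = list(set(page_items).difference(seen))  # new on this page
    seen = list(set(page_items) | set(seen))        # running union
    return fresh

print(dedup_page(["a", "b", "c"]))  # first page: all three are fresh
print(dedup_page(["b", "c", "d"]))  # second page: only "d" is fresh
```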

+ 25 - 0
kanyikan/kanyikan_main/run_kyk_dev.py

@@ -0,0 +1,25 @@
+# -*- coding: utf-8 -*-
+# @Author: wangkun
+# @Time: 2023/6/27
+import os
+import sys
+sys.path.append(os.getcwd())
+from common.common import Common
+from kanyikan.kanyikan_recommend.kyk_recommend import KanyikanRecommend
+# from kanyikan.kanyikan_recommend.kykjk_recommend import KanyikanRecommend
+# from kanyikan.kanyikan_recommend.kykln_recommend import KanyikanRecommend
+
+
+def kanyikan_main(log_type, crawler, env):
+    while True:
+        Common.logger(log_type, crawler).info(f'开始抓取:看一看推荐\n')
+        KanyikanRecommend.get_videoList(log_type=log_type,
+                                        crawler=crawler,
+                                        rule_dict={},
+                                        our_uid=6267140,
+                                        env=env)
+        Common.logger(log_type, crawler).info("抓取完一轮\n")
+
+
+if __name__ == "__main__":
+    kanyikan_main(log_type="recommend", crawler="kanyikan", env="dev")

+ 2 - 1
kanyikan/kanyikan_main/run_kyk_recommend.py

@@ -10,7 +10,8 @@ sys.path.append(os.getcwd())
 from common.common import Common
 from common.public import get_consumer, ack_message, task_fun_mq
 from common.scheduling_db import MysqlHelper
-from kanyikan.kanyikan_recommend.kanyikan_recommend0627 import KanyikanRecommend
+# from kanyikan.kanyikan_recommend.kanyikan_recommend0627 import KanyikanRecommend
+from kanyikan.kanyikan_recommend.kyk_recommend import KanyikanRecommend
 
 
 def main(log_type, crawler, topic_name, group_id, env):

+ 0 - 21
kanyikan/kanyikan_main/run_kyk_recommend_dev.py

@@ -1,21 +0,0 @@
-# -*- coding: utf-8 -*-
-# @Author: wangkun
-# @Time: 2023/6/27
-import os
-import sys
-sys.path.append(os.getcwd())
-from common.common import Common
-from kanyikan.kanyikan_recommend.kanyikan_recommend0627 import KanyikanRecommend
-
-
-def kanyikan_main(log_type, crawler, env):
-    Common.logger(log_type, crawler).info(f'开始抓取:看一看推荐\n')
-    KanyikanRecommend.get_videoList(log_type=log_type,
-                                    crawler=crawler,
-                                    rule_dict={},
-                                    our_uid=6267140,
-                                    env=env)
-
-
-if __name__ == "__main__":
-    kanyikan_main(log_type="recommend", crawler="kanyikan", env="dev")

+ 48 - 0
kanyikan/kanyikan_main/run_kyk_session.sh

@@ -0,0 +1,48 @@
+#!/bin/bash
+
+log_type=$1           # crawler strategy
+crawler=$2            # which crawler
+env=$3                # runtime environment: production = prod / testing = dev
+kanyikan_type=$4      # crawler sub-type: Kanyikan recommend (kyk) / Kanyikan health (kykjk) / Kanyikan elderly (kykln)
+
+if [ "${env}" = "dev" ];then
+  piaoquan_crawler_dir=/Users/wangkun/Desktop/crawler/piaoquan_crawler/
+  profile_path=/etc/profile
+  python=python3
+  log_path=${piaoquan_crawler_dir}main/main_logs/process-kyksession-$(date +%Y-%m-%d).log
+
+elif [ ${kanyikan_type} = "kyk" ];then
+  piaoquan_crawler_dir=/Users/lieyunye/Desktop/crawler/piaoquan_crawler/
+  profile_path=./base_profile
+  python=python3
+  log_path=${piaoquan_crawler_dir}main/main_logs/process-kyksession-$(date +%Y-%m-%d).log
+
+elif [ ${kanyikan_type} = "kykjk" ];then
+  piaoquan_crawler_dir=/Users/kanyikan/Desktop/crawler/piaoquan_crawler/
+  profile_path=/.base_profile
+  python=python3
+  log_path=${piaoquan_crawler_dir}main/main_logs/process-kyksession-$(date +%Y-%m-%d).log
+
+elif [ ${kanyikan_type} = "kykln" ];then
+  piaoquan_crawler_dir=/Users/piaoquan/Desktop/piaoquan_crawler/
+  profile_path=/etc/profile
+  python=python3
+  log_path=${piaoquan_crawler_dir}main/main_logs/process-kyksession-$(date +%Y-%m-%d).log
+fi
+
+time=$(date +%H:%M:%S)
+echo "$(date "+%Y-%m-%d %H:%M:%S") 开始监测 看一看session 进程状态" >> ${log_path}
+ps -ef | grep "kanyikan_session.py" | grep -v "grep" > /dev/null
+if [ "$?" -eq 1 ];then
+  echo "$(date "+%Y-%m-%d %H:%M:%S") 异常停止,正在重启!" >> ${log_path}
+  cd ${piaoquan_crawler_dir} && nohup ${python} -u kanyikan/kanyikan_session/kanyikan_session.py --log_type="${log_type}" --crawler="${crawler}" --env="${env}" --kanyikan_type="${kanyikan_type}">> kanyikan/logs/kanyikan-session.log 2>&1 &
+  echo "$(date "+%Y-%m-%d %H:%M:%S") 重启完成!" >> ${log_path}
+else
+  echo "$(date "+%Y-%m-%d %H:%M:%S") 看一看session 进程状态正常" >> ${log_path}
+fi
+
+# Clean up logs older than 10 days
+echo "$(date "+%Y-%m-%d %H:%M:%S") 开始清理 10 天前的日志文件" >> ${log_path}
+find ${piaoquan_crawler_dir}main/main_logs/ -mtime +10 -name "*.log" -exec rm -rf {} \;
+echo "$(date "+%Y-%m-%d %H:%M:%S") 日志文件清理完毕" >> ${log_path}
+exit 0
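
The check-and-restart logic of this script boils down to: grep the process table, and relaunch with `nohup` if nothing matches. A rough Python sketch of the liveness check only (illustrative; the process keyword mirrors the script, and this helper is not part of the commit):

```python
# Liveness check equivalent to: ps -ef | grep kanyikan_session.py | grep -v grep
import subprocess

def session_process_running(keyword: str = "kanyikan_session.py") -> bool:
    """True if any process command line contains `keyword` (excluding grep)."""
    out = subprocess.run(["ps", "-ef"], capture_output=True, text=True).stdout
    return any(keyword in line and "grep" not in line
               for line in out.splitlines())

if not session_process_running():
    print("kanyikan_session.py is down; the shell script would restart it here")
```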

+ 2 - 1
kanyikan/kanyikan_main/run_kykjk_recommend.py

@@ -10,7 +10,8 @@ sys.path.append(os.getcwd())
 from common.common import Common
 from common.public import get_consumer, ack_message, task_fun_mq
 from common.scheduling_db import MysqlHelper
-from kanyikan.kanyikan_recommend.kanyikan_recommend0705 import KanyikanRecommend
+# from kanyikan.kanyikan_recommend.kanyikan_recommend0705 import KanyikanRecommend
+from kanyikan.kanyikan_recommend.kykjk_recommend import KanyikanRecommend
 
 
 def main(log_type, crawler, topic_name, group_id, env):

+ 113 - 0
kanyikan/kanyikan_main/run_kykln_recommend.py

@@ -0,0 +1,113 @@
+# -*- coding: utf-8 -*-
+# @Author: wangkun
+# @Time: 2023/9/25
+import argparse
+import os
+import random
+import sys
+import time
+from mq_http_sdk.mq_client import *
+from mq_http_sdk.mq_consumer import *
+from mq_http_sdk.mq_exception import MQExceptionBase
+sys.path.append(os.getcwd())
+from common.common import Common
+from common.public import get_consumer, ack_message, task_fun_mq
+from common.scheduling_db import MysqlHelper
+from kanyikan.kanyikan_recommend.kykln_recommend import KanyikanRecommend
+
+
+def main(log_type, crawler, topic_name, group_id, env):
+    consumer = get_consumer(topic_name, group_id)
+    # Long polling: if the topic has no messages, the request blocks on the server
+    # until a message arrives, for at most wait_seconds (30 s, the maximum allowed).
+    wait_seconds = 30
+    # Messages consumed per request: 1 (the SDK allows up to 16).
+    batch = 1
+    Common.logger(log_type, crawler).info(f'{10 * "="}Consume And Ack Message From Topic{10 * "="}\n'
+                                          f'WaitSeconds:{wait_seconds}\n'
+                                          f'TopicName:{topic_name}\n'
+                                          f'MQConsumer:{group_id}')
+    Common.logging(log_type, crawler, env, f'{10 * "="}Consume And Ack Message From Topic{10 * "="}\n'
+                                           f'WaitSeconds:{wait_seconds}\n'
+                                           f'TopicName:{topic_name}\n'
+                                           f'MQConsumer:{group_id}')
+    while True:
+        try:
+            # Long-poll for messages.
+            recv_msgs = consumer.consume_message(batch, wait_seconds)
+            for msg in recv_msgs:
+                Common.logger(log_type, crawler).info(f"Receive\n"
+                                                      f"MessageId:{msg.message_id}\n"
+                                                      f"MessageBodyMD5:{msg.message_body_md5}\n"
+                                                      f"MessageTag:{msg.message_tag}\n"
+                                                      f"ConsumedTimes:{msg.consumed_times}\n"
+                                                      f"PublishTime:{msg.publish_time}\n"
+                                                      f"Body:{msg.message_body}\n"
+                                                      f"NextConsumeTime:{msg.next_consume_time}\n"
+                                                      f"ReceiptHandle:{msg.receipt_handle}\n"
+                                                      f"Properties:{msg.properties}")
+                Common.logging(log_type, crawler, env, f"Receive\n"
+                                                       f"MessageId:{msg.message_id}\n"
+                                                       f"MessageBodyMD5:{msg.message_body_md5}\n"
+                                                       f"MessageTag:{msg.message_tag}\n"
+                                                       f"ConsumedTimes:{msg.consumed_times}\n"
+                                                       f"PublishTime:{msg.publish_time}\n"
+                                                       f"Body:{msg.message_body}\n"
+                                                       f"NextConsumeTime:{msg.next_consume_time}\n"
+                                                       f"ReceiptHandle:{msg.receipt_handle}\n"
+                                                       f"Properties:{msg.properties}")
+                # ack_mq_message
+                ack_message(log_type=log_type, crawler=crawler, recv_msgs=recv_msgs, consumer=consumer)
+
+                # crawler business logic
+                task_dict = task_fun_mq(msg.message_body)['task_dict']
+                rule_dict = task_fun_mq(msg.message_body)['rule_dict']
+                task_id = task_dict['id']
+                select_user_sql = f"""select * from crawler_user_v3 where task_id={task_id}"""
+                user_list = MysqlHelper.get_values(log_type, crawler, select_user_sql, env, action="")
+                our_uid_list = []
+                for user in user_list:
+                    our_uid_list.append(user["uid"])
+                our_uid = random.choice(our_uid_list)
+                Common.logger(log_type, crawler).info(f"调度任务:{task_dict}")
+                Common.logging(log_type, crawler, env, f"调度任务:{task_dict}")
+                Common.logger(log_type, crawler).info(f"抓取规则:{rule_dict}")
+                Common.logging(log_type, crawler, env, f"抓取规则:{rule_dict}")
+                Common.logger(log_type, crawler).info(f"用户列表:{user_list}\n")
+                Common.logger(log_type, crawler).info(f'开始抓取:{task_dict["taskName"]}\n')
+                Common.logging(log_type, crawler, env, f'开始抓取:{task_dict["taskName"]}\n')
+                KanyikanRecommend.get_videoList(log_type=log_type,
+                                                crawler=crawler,
+                                                rule_dict=rule_dict,
+                                                our_uid=our_uid,
+                                                env=env)
+                Common.del_charles_files(log_type, crawler)
+                Common.logger(log_type, crawler).info('抓取一轮结束\n')
+                Common.logging(log_type, crawler, env, '抓取一轮结束\n')
+
+        except MQExceptionBase as err:
+            # The topic has no messages to consume.
+            if err.type == "MessageNotExist":
+                Common.logger(log_type, crawler).info(f"No new message! RequestId:{err.req_id}\n")
+                Common.logging(log_type, crawler, env, f"No new message! RequestId:{err.req_id}\n")
+                continue
+
+            Common.logger(log_type, crawler).info(f"Consume Message Fail! Exception:{err}\n")
+            Common.logging(log_type, crawler, env, f"Consume Message Fail! Exception:{err}\n")
+            time.sleep(2)
+            continue
+
+
+if __name__ == "__main__":
+    parser = argparse.ArgumentParser()  # build the CLI argument parser
+    parser.add_argument('--log_type', type=str)  # argument declared with an explicit type
+    parser.add_argument('--crawler')
+    parser.add_argument('--topic_name')
+    parser.add_argument('--group_id')
+    parser.add_argument('--env')
+    args = parser.parse_args()  # values are supplied on the command line
+    main(log_type=args.log_type,
+         crawler=args.crawler,
+         topic_name=args.topic_name,
+         group_id=args.group_id,
+         env=args.env)
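
Each consumed task fans out to one of the user accounts bound to it: the runner collects every `uid` from `crawler_user_v3` and picks one at random per round. A minimal sketch of that selection (row shape taken from the code above; the uid values are made up):

```python
# Random account selection as in main() above; the sample rows are fabricated.
import random

user_list = [{"uid": 6267140}, {"uid": 6267141}, {"uid": 6267142}]
our_uid_list = [user["uid"] for user in user_list]
our_uid = random.choice(our_uid_list)
print(our_uid)
```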

+ 0 - 299
kanyikan/kanyikan_recommend/kanyikan_recommend.py

@@ -1,299 +0,0 @@
-# -*- coding: utf-8 -*-
-# @Author: wangkun
-# @Time: 2023/6/21
-import json
-import os
-import random
-import shutil
-import sys
-import time
-from hashlib import md5
-import requests
-import urllib3
-
-from common.mq import MQ
-sys.path.append(os.getcwd())
-from common.common import Common
-from common.feishu import Feishu
-from common.publish import Publish
-from common.scheduling_db import MysqlHelper
-from common.public import get_config_from_mysql, download_rule
-proxies = {"http": None, "https": None}
-
-
-class KanyikanRecommend:
-    platform = "看一看"
-
-    @classmethod
-    def repeat_video(cls, log_type, crawler, video_id, env):
-        sql = f""" select * from crawler_video where platform in ("{crawler}","{cls.platform}") and create_time>='2023-06-26' and out_video_id="{video_id}"; """
-        repeat_video = MysqlHelper.get_values(log_type, crawler, sql, env)
-        return len(repeat_video)
-
-    @classmethod
-    def get_videoList(cls, log_type, crawler, our_uid, rule_dict, env):
-        mq = MQ(topic_name="topic_crawler_etl_" + env)
-        for page in range(1, 101):
-            try:
-                Common.logger(log_type, crawler).info(f"正在抓取第{page}页")
-                Common.logging(log_type, crawler, env, f"正在抓取第{page}页")
-                session = Common.get_session(log_type, crawler, env)
-                if session is None:
-                    time.sleep(1)
-                    continue
-                url = 'https://search.weixin.qq.com/cgi-bin/recwxa/recwxavideolist?'
-                header = {
-                    "Connection": "keep-alive",
-                    "content-type": "application/json",
-                    "Accept-Encoding": "gzip,compress,br,deflate",
-                    "User-Agent": "Mozilla/5.0 (iPhone; CPU iPhone OS 14_7_1 like Mac OS X) "
-                                  "AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148 MicroMessenger/8.0.18(0x18001236) "
-                                  "NetType/WIFI Language/zh_CN",
-                    "Referer": "https://servicewechat.com/wxbb9a805eb4f9533c/234/page-frame.html",
-                }
-                params = {
-                    'session': session,
-                    "offset": 0,
-                    "wxaVersion": "3.9.2",
-                    "count": "10",
-                    "channelid": "208",
-                    "scene": '310',
-                    "subscene": '1089',
-                    "clientVersion": '8.0.18',
-                    "sharesearchid": '0',
-                    "nettype": 'wifi',
-                    "switchprofile": "0",
-                    "switchnewuser": "0",
-                }
-                urllib3.disable_warnings()
-                response = requests.get(url=url, headers=header, params=params, proxies=proxies, verify=False)
-                if "data" not in response.text:
-                    Common.logger(log_type, crawler).info("获取视频list时,session过期,随机睡眠 31-50 秒")
-                    Common.logging(log_type, crawler, env, "获取视频list时,session过期,随机睡眠 31-50 秒")
-                    # 如果返回空信息,则随机睡眠 31-40 秒
-                    time.sleep(random.randint(31, 40))
-                    continue
-                elif "items" not in response.json()["data"]:
-                    Common.logger(log_type, crawler).info(f"get_feeds:{response.json()},随机睡眠 1-3 分钟")
-                    Common.logging(log_type, crawler, env, f"get_feeds:{response.json()},随机睡眠 1-3 分钟")
-                    # 如果返回空信息,则随机睡眠 1-3 分钟
-                    time.sleep(random.randint(60, 180))
-                    continue
-                feeds = response.json().get("data", {}).get("items", "")
-                if feeds == "":
-                    Common.logger(log_type, crawler).info(f"feeds:{feeds}")
-                    Common.logging(log_type, crawler, env, f"feeds:{feeds}")
-                    time.sleep(random.randint(31, 40))
-                    continue
-                for i in range(len(feeds)):
-                    try:
-                        video_title = feeds[i].get("title", "").strip().replace("\n", "") \
-                            .replace("/", "").replace("\\", "").replace("\r", "") \
-                            .replace(":", "").replace("*", "").replace("?", "") \
-                            .replace("?", "").replace('"', "").replace("<", "") \
-                            .replace(">", "").replace("|", "").replace(" ", "") \
-                            .replace("&NBSP", "").replace(".", "。").replace(" ", "") \
-                            .replace("'", "").replace("#", "").replace("Merge", "")
-                        publish_time_stamp = feeds[i].get("date", 0)
-                        publish_time_str = time.strftime("%Y/%m/%d %H:%M:%S", time.localtime(publish_time_stamp))
-                        # 获取播放地址
-                        if "videoInfo" not in feeds[i]:
-                            video_url = ""
-                        elif "mpInfo" in feeds[i]["videoInfo"]["videoCdnInfo"]:
-                            if len(feeds[i]["videoInfo"]["videoCdnInfo"]["mpInfo"]["urlInfo"]) > 2:
-                                video_url = feeds[i]["videoInfo"]["videoCdnInfo"]["mpInfo"]["urlInfo"][2]["url"]
-                            else:
-                                video_url = feeds[i]["videoInfo"]["videoCdnInfo"]["mpInfo"]["urlInfo"][0]["url"]
-                        elif "ctnInfo" in feeds[i]["videoInfo"]["videoCdnInfo"]:
-                            video_url = feeds[i]["videoInfo"]["videoCdnInfo"]["ctnInfo"]["urlInfo"][0]["url"]
-                        else:
-                            video_url = feeds[i]["videoInfo"]["videoCdnInfo"]["urlInfo"][0]["url"]
-                        video_dict = {
-                            "video_title": video_title,
-                            "video_id": feeds[i].get("videoId", ""),
-                            "play_cnt": feeds[i].get("playCount", 0),
-                            "like_cnt": feeds[i].get("liked_cnt", 0),
-                            "comment_cnt": feeds[i].get("comment_cnt", 0),
-                            "share_cnt": feeds[i].get("shared_cnt", 0),
-                            "duration": feeds[i].get("mediaDuration", 0),
-                            "video_width": feeds[i].get("short_video_info", {}).get("width", 0),
-                            "video_height": feeds[i].get("short_video_info", {}).get("height", 0),
-                            "publish_time_stamp": publish_time_stamp,
-                            "publish_time_str": publish_time_str,
-                            "user_name": feeds[i].get("source", "").strip().replace("\n", ""),
-                            "user_id": feeds[i].get("openid", ""),
-                            "avatar_url": feeds[i].get("bizIcon", ""),
-                            "cover_url": feeds[i].get("thumbUrl", ""),
-                            "video_url": video_url,
-                            "session": session,
-                        }
-                        for k, v in video_dict.items():
-                            Common.logger(log_type, crawler).info(f"{k}:{v}")
-                        Common.logging(log_type, crawler, env, f"video_dict:{video_dict}")
-
-                        if video_dict["video_id"] == "" or video_dict["video_title"] == "" or video_dict["video_url"] == "":
-                            Common.logger(log_type, crawler).info("无效视频\n")
-                            Common.logging(log_type, crawler, env, "无效视频\n")
-                        elif download_rule(log_type=log_type, crawler=crawler, video_dict=video_dict, rule_dict=rule_dict) is False:
-                            Common.logger(log_type, crawler).info("不满足抓取规则\n")
-                            Common.logging(log_type, crawler, env, "不满足抓取规则\n")
-                        elif any(str(word) if str(word) in video_dict["video_title"] else False
-                                 for word in get_config_from_mysql(log_type=log_type,
-                                                                   source=crawler,
-                                                                   env=env,
-                                                                   text="filter",
-                                                                   action="")) is True:
-                            Common.logger(log_type, crawler).info('已中过滤词\n')
-                            Common.logging(log_type, crawler, env, '已中过滤词\n')
-                        elif cls.repeat_video(log_type, crawler, video_dict["video_id"], env) != 0:
-                            Common.logger(log_type, crawler).info('视频已下载\n')
-                            Common.logging(log_type, crawler, env, '视频已下载\n')
-                        else:
-                            # cls.download_publish(log_type=log_type,
-                            #                      crawler=crawler,
-                            #                      our_uid=our_uid,
-                            #                      video_dict=video_dict,
-                            #                      rule_dict=rule_dict,
-                            #                      env=env)
-
-                            video_dict["out_user_id"] = video_dict["user_id"]
-                            video_dict["platform"] = crawler
-                            video_dict["strategy"] = log_type
-                            video_dict["out_video_id"] = video_dict["video_id"]
-                            video_dict["width"] = video_dict["video_width"]
-                            video_dict["height"] = video_dict["video_height"]
-                            video_dict["crawler_rule"] = json.dumps(rule_dict)
-                            video_dict["user_id"] = our_uid
-                            video_dict["publish_time"] = video_dict["publish_time_str"]
-
-                            mq.send_msg(video_dict)
-                    except Exception as e:
-                        Common.logger(log_type, crawler).error(f"抓取单条视频异常:{e}\n")
-                        Common.logging(log_type, crawler, env, f"抓取单条视频异常:{e}\n")
-            except Exception as e:
-                Common.logger(log_type, crawler).error(f"抓取第{page}页时异常:{e}\n")
-                Common.logging(log_type, crawler, env, f"抓取第{page}页时异常:{e}\n")
-
-    @classmethod
-    def download_publish(cls, log_type, crawler, our_uid, video_dict, rule_dict, env):
-        # 下载视频
-        Common.download_method(log_type=log_type, crawler=crawler, text='video', title=video_dict['video_title'], url=video_dict['video_url'])
-        md_title = md5(video_dict['video_title'].encode('utf8')).hexdigest()
-        try:
-            if os.path.getsize(f"./{crawler}/videos/{md_title}/video.mp4") == 0:
-                # 删除视频文件夹
-                shutil.rmtree(f"./{crawler}/videos/{md_title}")
-                Common.logger(log_type, crawler).info("视频size=0,删除成功\n")
-                Common.logging(log_type, crawler, env, "视频size=0,删除成功\n")
-                return
-        except FileNotFoundError:
-            # 删除视频文件夹
-            shutil.rmtree(f"./{crawler}/videos/{md_title}")
-            Common.logger(log_type, crawler).info("视频文件不存在,删除文件夹成功\n")
-            Common.logging(log_type, crawler, env, "视频文件不存在,删除文件夹成功\n")
-            return
-        # 下载封面
-        Common.download_method(log_type=log_type, crawler=crawler, text='cover', title=video_dict['video_title'], url=video_dict['cover_url'])
-        # 保存视频信息至txt
-        Common.save_video_info(log_type=log_type, crawler=crawler, video_dict=video_dict)
-
-        # 上传视频
-        Common.logger(log_type, crawler).info("开始上传视频...")
-        Common.logging(log_type, crawler, env, "开始上传视频...")
-        if env == "dev":
-            oss_endpoint = "out"
-            our_video_id = Publish.upload_and_publish(log_type=log_type,
-                                                      crawler=crawler,
-                                                      strategy="推荐抓取策略",
-                                                      our_uid=our_uid,
-                                                      env=env,
-                                                      oss_endpoint=oss_endpoint)
-            our_video_link = f"https://testadmin.piaoquantv.com/cms/post-detail/{our_video_id}/info"
-        else:
-            our_video_id = Publish.upload_and_publish(log_type=log_type,
-                                                      crawler=crawler,
-                                                      strategy="推荐抓取策略",
-                                                      our_uid=our_uid,
-                                                      env=env,
-                                                      oss_endpoint="out")
-
-            our_video_link = f"https://admin.piaoquantv.com/cms/post-detail/{our_video_id}/info"
-
-        if our_video_id is None:
-            try:
-                # 删除视频文件夹
-                shutil.rmtree(f"./{crawler}/videos/{md_title}")
-                return
-            except FileNotFoundError:
-                return
-
-        # 视频信息保存数据库
-        insert_sql = f""" insert into crawler_video(video_id,
-                                                user_id,
-                                                out_user_id,
-                                                platform,
-                                                strategy,
-                                                out_video_id,
-                                                video_title,
-                                                cover_url,
-                                                video_url,
-                                                duration,
-                                                publish_time,
-                                                play_cnt,
-                                                crawler_rule,
-                                                width,
-                                                height)
-                                                values({our_video_id},
-                                                {our_uid},
-                                                "{video_dict['user_id']}",
-                                                "{cls.platform}",
-                                                "推荐抓取策略",
-                                                "{video_dict['video_id']}",
-                                                "{video_dict['video_title']}",
-                                                "{video_dict['cover_url']}",
-                                                "{video_dict['video_url']}",
-                                                {int(video_dict['duration'])},
-                                                "{video_dict['publish_time_str']}",
-                                                {int(video_dict['play_cnt'])},
-                                                '{json.dumps(rule_dict)}',
-                                                {int(video_dict['video_width'])},
-                                                {int(video_dict['video_height'])}) """
-        Common.logger(log_type, crawler).info(f"insert_sql:{insert_sql}")
-        Common.logging(log_type, crawler, env, f"insert_sql:{insert_sql}")
-        MysqlHelper.update_values(log_type, crawler, insert_sql, env, action="")
-        Common.logger(log_type, crawler).info('视频信息写入数据库成功')
-        Common.logging(log_type, crawler, env, '视频信息写入数据库成功')
-
-        # 保存视频信息到云文档:
-        Feishu.insert_columns(log_type, crawler, "20ce0c", "ROWS", 1, 2)
-        # 看一看+ ,视频ID工作表,首行写入数据
-        values = [[time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(int(time.time()))),
-                   "推荐榜",
-                   str(video_dict["video_id"]),
-                   str(video_dict["video_title"]),
-                   our_video_link,
-                   video_dict["play_cnt"],
-                   video_dict["comment_cnt"],
-                   video_dict["like_cnt"],
-                   video_dict["share_cnt"],
-                   video_dict["duration"],
-                   f'{video_dict["video_width"]}*{video_dict["video_height"]}',
-                   video_dict["publish_time_str"],
-                   video_dict["user_name"],
-                   video_dict["user_id"],
-                   video_dict["avatar_url"],
-                   video_dict["cover_url"],
-                   video_dict["video_url"]]]
-        time.sleep(0.5)
-        Feishu.update_values(log_type, crawler, "20ce0c", "F2:Z2", values)
-        Common.logger(log_type, crawler).info("视频信息保存至云文档成功\n")
-        Common.logging(log_type, crawler, env, "视频信息保存至云文档成功\n")
-
-
-if __name__ == "__main__":
-    print(get_config_from_mysql(log_type="recommend",
-                                source="kanyikan",
-                                env="dev",
-                                text="filter",
-                                action=""))
-    pass

+ 191 - 0
kanyikan/kanyikan_recommend/kyk_recommend.py

@@ -0,0 +1,191 @@
+# -*- coding: utf-8 -*-
+# @Author: wangkun
+# @Time: 2023/6/21
+"""
+Kanyikan recommend
+Crawl mode for the 207 server
+WeChat session is fetched from the crawler platform
+"""
+
+import json
+import os
+import random
+import sys
+import time
+import requests
+import urllib3
+sys.path.append(os.getcwd())
+from common.mq import MQ
+from common.common import Common
+from common.scheduling_db import MysqlHelper
+from common.public import get_config_from_mysql, download_rule
+proxies = {"http": None, "https": None}
+
+
+class KanyikanRecommend:
+    platform = "看一看"
+    strategy = "推荐抓取策略"
+
+    @classmethod
+    def get_session(cls, log_type, crawler, env):
+        session_sql = """ SELECT * FROM crawler_config WHERE `source` in ('kanyikan', '看一看'); """
+        session_response = MysqlHelper.get_values(log_type, crawler, session_sql, env, action="")
+        for config in session_response:
+            if "token" in config["config"] and config["title"] == "看一看推荐":
+                token_str = config["config"]
+                token = json.loads(token_str)["token"]
+                return token
+        return None
+
+    @classmethod
+    def repeat_video(cls, log_type, crawler, video_id, env):
+        sql = f""" select * from crawler_video where platform in ("{crawler}","{cls.platform}") and create_time>='2023-06-26' and out_video_id="{video_id}"; """
+        repeat_video = MysqlHelper.get_values(log_type, crawler, sql, env)
+        return len(repeat_video)
+
+    @classmethod
+    def get_videoList(cls, log_type, crawler, our_uid, rule_dict, env):
+        mq = MQ(topic_name="topic_crawler_etl_" + env)
+        try:
+            Common.logger(log_type, crawler).info(f"正在抓取列表页")
+            Common.logging(log_type, crawler, env, f"正在抓取列表页")
+            session = cls.get_session(log_type, crawler, env)
+            Common.logger(log_type, crawler).info(f"session:{session}")
+            if session is None:
+                Common.logger(log_type, crawler).info("session is None!")
+                time.sleep(1)
+                return cls.get_videoList(log_type, crawler, our_uid, rule_dict, env)
+            url = 'https://search.weixin.qq.com/cgi-bin/recwxa/recwxavideolist?'
+            header = {
+                "Connection": "keep-alive",
+                "content-type": "application/json",
+                "Accept-Encoding": "gzip,compress,br,deflate",
+                "User-Agent": "Mozilla/5.0 (iPhone; CPU iPhone OS 14_7_1 like Mac OS X) "
+                              "AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148 MicroMessenger/8.0.18(0x18001236) "
+                              "NetType/WIFI Language/zh_CN",
+                "Referer": "https://servicewechat.com/wxbb9a805eb4f9533c/234/page-frame.html",
+            }
+            params = {
+                'session': session,
+                "offset": 0,
+                "wxaVersion": "3.9.2",
+                "count": "10",
+                "channelid": "208",
+                "scene": '310',
+                "subscene": '1089',
+                "clientVersion": '8.0.18',
+                "sharesearchid": '0',
+                "nettype": 'wifi',
+                "switchprofile": "0",
+                "switchnewuser": "0",
+            }
+            urllib3.disable_warnings()
+            response = requests.get(url=url, headers=header, params=params, proxies=proxies, verify=False)
+            if "data" not in response.text:
+                Common.logger(log_type, crawler).info("获取视频list时,session过期,随机睡眠 31-50 秒")
+                Common.logging(log_type, crawler, env, "获取视频list时,session过期,随机睡眠 31-50 秒")
+                # 如果返回空信息,则随机睡眠 31-40 秒
+                time.sleep(random.randint(31, 40))
+                cls.get_videoList(log_type, crawler, our_uid, rule_dict, env)
+            elif "items" not in response.json()["data"]:
+                Common.logger(log_type, crawler).info(f"get_feeds:{response.json()},随机睡眠 1-3 分钟")
+                Common.logging(log_type, crawler, env, f"get_feeds:{response.json()},随机睡眠 1-3 分钟")
+                # 如果返回空信息,则随机睡眠 1-3 分钟
+                time.sleep(random.randint(60, 180))
+                cls.get_videoList(log_type, crawler, our_uid, rule_dict, env)
+            feeds = response.json().get("data", {}).get("items", "")
+            if feeds == "":
+                Common.logger(log_type, crawler).info(f"feeds:{feeds}")
+                Common.logging(log_type, crawler, env, f"feeds:{feeds}")
+                return
+            for i in range(len(feeds)):
+                try:
+                    video_title = feeds[i].get("title", "").strip().replace("\n", "") \
+                        .replace("/", "").replace("\\", "").replace("\r", "") \
+                        .replace(":", "").replace("*", "").replace("?", "") \
+                        .replace("?", "").replace('"', "").replace("<", "") \
+                        .replace(">", "").replace("|", "").replace(" ", "") \
+                        .replace("&NBSP", "").replace(".", "。").replace(" ", "") \
+                        .replace("'", "").replace("#", "").replace("Merge", "")
+                    publish_time_stamp = feeds[i].get("date", 0)
+                    publish_time_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(publish_time_stamp))
+                    # resolve the playback URL
+                    if "videoInfo" not in feeds[i]:
+                        video_url = ""
+                    elif "mpInfo" in feeds[i]["videoInfo"]["videoCdnInfo"]:
+                        if len(feeds[i]["videoInfo"]["videoCdnInfo"]["mpInfo"]["urlInfo"]) > 2:
+                            video_url = feeds[i]["videoInfo"]["videoCdnInfo"]["mpInfo"]["urlInfo"][2]["url"]
+                        else:
+                            video_url = feeds[i]["videoInfo"]["videoCdnInfo"]["mpInfo"]["urlInfo"][0]["url"]
+                    elif "ctnInfo" in feeds[i]["videoInfo"]["videoCdnInfo"]:
+                        video_url = feeds[i]["videoInfo"]["videoCdnInfo"]["ctnInfo"]["urlInfo"][0]["url"]
+                    else:
+                        video_url = feeds[i]["videoInfo"]["videoCdnInfo"]["urlInfo"][0]["url"]
+                    video_dict = {
+                        "video_title": video_title,
+                        "video_id": feeds[i].get("videoId", ""),
+                        "play_cnt": feeds[i].get("playCount", 0),
+                        "like_cnt": feeds[i].get("liked_cnt", 0),
+                        "comment_cnt": feeds[i].get("comment_cnt", 0),
+                        "share_cnt": feeds[i].get("shared_cnt", 0),
+                        "duration": feeds[i].get("mediaDuration", 0),
+                        "video_width": feeds[i].get("short_video_info", {}).get("width", 0),
+                        "video_height": feeds[i].get("short_video_info", {}).get("height", 0),
+                        "publish_time_stamp": publish_time_stamp,
+                        "publish_time_str": publish_time_str,
+                        "user_name": feeds[i].get("source", "").strip().replace("\n", ""),
+                        "user_id": feeds[i].get("openid", ""),
+                        "avatar_url": feeds[i].get("bizIcon", ""),
+                        "cover_url": feeds[i].get("thumbUrl", ""),
+                        "video_url": video_url,
+                        "session": session,
+                    }
+                    for k, v in video_dict.items():
+                        Common.logger(log_type, crawler).info(f"{k}:{v}")
+                    Common.logging(log_type, crawler, env, f"video_dict:{video_dict}")
+
+                    if video_dict["video_id"] == "" or video_dict["video_title"] == "" or video_dict["video_url"] == "":
+                        Common.logger(log_type, crawler).info("无效视频\n")
+                        Common.logging(log_type, crawler, env, "无效视频\n")
+                    elif download_rule(log_type=log_type, crawler=crawler, video_dict=video_dict, rule_dict=rule_dict) is False:
+                        Common.logger(log_type, crawler).info("不满足抓取规则\n")
+                        Common.logging(log_type, crawler, env, "不满足抓取规则\n")
+                    elif any(str(word) if str(word) in video_dict["video_title"] else False
+                             for word in get_config_from_mysql(log_type=log_type,
+                                                               source=crawler,
+                                                               env=env,
+                                                               text="filter",
+                                                               action="")) is True:
+                        Common.logger(log_type, crawler).info('已中过滤词\n')
+                        Common.logging(log_type, crawler, env, '已中过滤词\n')
+                    elif cls.repeat_video(log_type, crawler, video_dict["video_id"], env) != 0:
+                        Common.logger(log_type, crawler).info('视频已下载\n')
+                        Common.logging(log_type, crawler, env, '视频已下载\n')
+                    else:
+                        video_dict["out_user_id"] = video_dict["user_id"]
+                        video_dict["platform"] = crawler
+                        video_dict["strategy"] = log_type
+                        video_dict["out_video_id"] = video_dict["video_id"]
+                        video_dict["width"] = video_dict["video_width"]
+                        video_dict["height"] = video_dict["video_height"]
+                        video_dict["crawler_rule"] = json.dumps(rule_dict)
+                        video_dict["user_id"] = our_uid
+                        video_dict["publish_time"] = video_dict["publish_time_str"]
+
+                        mq.send_msg(video_dict)
+                except Exception as e:
+                    Common.logger(log_type, crawler).error(f"抓取单条视频异常:{e}\n")
+                    Common.logging(log_type, crawler, env, f"抓取单条视频异常:{e}\n")
+        except Exception as e:
+            Common.logger(log_type, crawler).error(f"抓取列表页时异常:{e}\n")
+            Common.logging(log_type, crawler, env, f"抓取列表页时异常:{e}\n")
+
+
+if __name__ == "__main__":
+    # print(get_config_from_mysql(log_type="recommend",
+    #                             source="kanyikan",
+    #                             env="dev",
+    #                             text="filter",
+    #                             action=""))
+    KanyikanRecommend.get_session(log_type="recommend", crawler="kanyikan", env="dev")
+    pass
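
`get_session()` scans the `crawler_config` rows for the kanyikan source and returns the `token` field from the JSON `config` column of the row titled 看一看推荐. An illustrative row showing the shape it expects (the token value is fabricated):

```python
# Shape of the crawler_config row that get_session() matches on; the token
# string is made up for illustration.
import json

config_row = {
    "source": "kanyikan",
    "title": "看一看推荐",
    "config": json.dumps({"token": "wxsession-0123456789abcdef"}),
}
# Note: as in the source, "token" is a substring test on the raw JSON string.
if "token" in config_row["config"] and config_row["title"] == "看一看推荐":
    print(json.loads(config_row["config"])["token"])
```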

+ 191 - 0
kanyikan/kanyikan_recommend/kykjk_recommend.py

@@ -0,0 +1,191 @@
+# -*- coding: utf-8 -*-
+# @Author: wangkun
+# @Time: 2023/6/21
+"""
+Kanyikan recommend (health)
+Crawl mode for the 207 server
+WeChat session is fetched from the crawler platform
+"""
+
+import json
+import os
+import random
+import sys
+import time
+import requests
+import urllib3
+sys.path.append(os.getcwd())
+from common.mq import MQ
+from common.common import Common
+from common.scheduling_db import MysqlHelper
+from common.public import get_config_from_mysql, download_rule
+proxies = {"http": None, "https": None}
+
+
+class KanyikanRecommend:
+    platform = "看一看"
+    strategy = "推荐抓取策略"
+
+    @classmethod
+    def get_session(cls, log_type, crawler, env):
+        session_sql = """ SELECT * FROM crawler_config WHERE `source` in ('kanyikan', '看一看'); """
+        session_response = MysqlHelper.get_values(log_type, crawler, session_sql, env, action="")
+        for config in session_response:
+            if "token" in config["config"] and config["title"] == "看一看健康":
+                token_str = config["config"]
+                token = json.loads(token_str)["token"]
+                return token
+        return None
+
+    @classmethod
+    def repeat_video(cls, log_type, crawler, video_id, env):
+        sql = f""" select * from crawler_video where platform in ("{crawler}","{cls.platform}") and create_time>='2023-08-15' and out_video_id="{video_id}"; """
+        repeat_video = MysqlHelper.get_values(log_type, crawler, sql, env)
+        return len(repeat_video)
+
+    @classmethod
+    def get_videoList(cls, log_type, crawler, our_uid, rule_dict, env):
+        mq = MQ(topic_name="topic_crawler_etl_" + env)
+        try:
+            Common.logger(log_type, crawler).info(f"正在抓取列表页")
+            Common.logging(log_type, crawler, env, f"正在抓取列表页")
+            session = cls.get_session(log_type, crawler, env)
+            Common.logger(log_type, crawler).info(f"session:{session}")
+            if session is None:
+                Common.logger(log_type, crawler).info("session is None!")
+                time.sleep(1)
+                return cls.get_videoList(log_type, crawler, our_uid, rule_dict, env)
+            url = 'https://search.weixin.qq.com/cgi-bin/recwxa/recwxavideolist?'
+            header = {
+                "Connection": "keep-alive",
+                "content-type": "application/json",
+                "Accept-Encoding": "gzip,compress,br,deflate",
+                "User-Agent": "Mozilla/5.0 (iPhone; CPU iPhone OS 14_7_1 like Mac OS X) "
+                              "AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148 MicroMessenger/8.0.18(0x18001236) "
+                              "NetType/WIFI Language/zh_CN",
+                "Referer": "https://servicewechat.com/wxbb9a805eb4f9533c/234/page-frame.html",
+            }
+            params = {
+                'session': session,
+                "offset": 0,
+                "wxaVersion": "3.9.2",
+                "count": "10",
+                "channelid": "208",
+                "scene": '310',
+                "subscene": '1089',
+                "clientVersion": '8.0.18',
+                "sharesearchid": '0',
+                "nettype": 'wifi',
+                "switchprofile": "0",
+                "switchnewuser": "0",
+            }
+            urllib3.disable_warnings()
+            response = requests.get(url=url, headers=header, params=params, proxies=proxies, verify=False)
+            if "data" not in response.text:
+                Common.logger(log_type, crawler).info("获取视频list时,session过期,随机睡眠 31-50 秒")
+                Common.logging(log_type, crawler, env, "获取视频list时,session过期,随机睡眠 31-50 秒")
+                # 如果返回空信息,则随机睡眠 31-40 秒
+                time.sleep(random.randint(31, 40))
+                cls.get_videoList(log_type, crawler, our_uid, rule_dict, env)
+            elif "items" not in response.json()["data"]:
+                Common.logger(log_type, crawler).info(f"get_feeds:{response.json()},随机睡眠 1-3 分钟")
+                Common.logging(log_type, crawler, env, f"get_feeds:{response.json()},随机睡眠 1-3 分钟")
+                # 如果返回空信息,则随机睡眠 1-3 分钟
+                time.sleep(random.randint(60, 180))
+                cls.get_videoList(log_type, crawler, our_uid, rule_dict, env)
+            feeds = response.json().get("data", {}).get("items", "")
+            if feeds == "":
+                Common.logger(log_type, crawler).info(f"feeds:{feeds}")
+                Common.logging(log_type, crawler, env, f"feeds:{feeds}")
+                return
+            for i in range(len(feeds)):
+                try:
+                    video_title = feeds[i].get("title", "").strip().replace("\n", "") \
+                        .replace("/", "").replace("\\", "").replace("\r", "") \
+                        .replace(":", "").replace("*", "").replace("?", "") \
+                        .replace("?", "").replace('"', "").replace("<", "") \
+                        .replace(">", "").replace("|", "").replace(" ", "") \
+                        .replace("&NBSP", "").replace(".", "。").replace(" ", "") \
+                        .replace("'", "").replace("#", "").replace("Merge", "")
+                    publish_time_stamp = feeds[i].get("date", 0)
+                    publish_time_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(publish_time_stamp))
+                    # resolve the playback URL
+                    if "videoInfo" not in feeds[i]:
+                        video_url = ""
+                    elif "mpInfo" in feeds[i]["videoInfo"]["videoCdnInfo"]:
+                        if len(feeds[i]["videoInfo"]["videoCdnInfo"]["mpInfo"]["urlInfo"]) > 2:
+                            video_url = feeds[i]["videoInfo"]["videoCdnInfo"]["mpInfo"]["urlInfo"][2]["url"]
+                        else:
+                            video_url = feeds[i]["videoInfo"]["videoCdnInfo"]["mpInfo"]["urlInfo"][0]["url"]
+                    elif "ctnInfo" in feeds[i]["videoInfo"]["videoCdnInfo"]:
+                        video_url = feeds[i]["videoInfo"]["videoCdnInfo"]["ctnInfo"]["urlInfo"][0]["url"]
+                    else:
+                        video_url = feeds[i]["videoInfo"]["videoCdnInfo"]["urlInfo"][0]["url"]
+                    video_dict = {
+                        "video_title": video_title,
+                        "video_id": feeds[i].get("videoId", ""),
+                        "play_cnt": feeds[i].get("playCount", 0),
+                        "like_cnt": feeds[i].get("liked_cnt", 0),
+                        "comment_cnt": feeds[i].get("comment_cnt", 0),
+                        "share_cnt": feeds[i].get("shared_cnt", 0),
+                        "duration": feeds[i].get("mediaDuration", 0),
+                        "video_width": feeds[i].get("short_video_info", {}).get("width", 0),
+                        "video_height": feeds[i].get("short_video_info", {}).get("height", 0),
+                        "publish_time_stamp": publish_time_stamp,
+                        "publish_time_str": publish_time_str,
+                        "user_name": feeds[i].get("source", "").strip().replace("\n", ""),
+                        "user_id": feeds[i].get("openid", ""),
+                        "avatar_url": feeds[i].get("bizIcon", ""),
+                        "cover_url": feeds[i].get("thumbUrl", ""),
+                        "video_url": video_url,
+                        "session": session,
+                    }
+                    for k, v in video_dict.items():
+                        Common.logger(log_type, crawler).info(f"{k}:{v}")
+                    Common.logging(log_type, crawler, env, f"video_dict:{video_dict}")
+
+                    if video_dict["video_id"] == "" or video_dict["video_title"] == "" or video_dict["video_url"] == "":
+                        Common.logger(log_type, crawler).info("无效视频\n")
+                        Common.logging(log_type, crawler, env, "无效视频\n")
+                    elif download_rule(log_type=log_type, crawler=crawler, video_dict=video_dict, rule_dict=rule_dict) is False:
+                        Common.logger(log_type, crawler).info("不满足抓取规则\n")
+                        Common.logging(log_type, crawler, env, "不满足抓取规则\n")
+                    elif any(str(word) if str(word) in video_dict["video_title"] else False
+                             for word in get_config_from_mysql(log_type=log_type,
+                                                               source=crawler,
+                                                               env=env,
+                                                               text="filter",
+                                                               action="")) is True:
+                        Common.logger(log_type, crawler).info('已中过滤词\n')
+                        Common.logging(log_type, crawler, env, '已中过滤词\n')
+                    elif cls.repeat_video(log_type, crawler, video_dict["video_id"], env) != 0:
+                        Common.logger(log_type, crawler).info('视频已下载\n')
+                        Common.logging(log_type, crawler, env, '视频已下载\n')
+                    else:
+                        video_dict["out_user_id"] = video_dict["user_id"]
+                        video_dict["platform"] = crawler
+                        video_dict["strategy"] = log_type
+                        video_dict["out_video_id"] = video_dict["video_id"]
+                        video_dict["width"] = video_dict["video_width"]
+                        video_dict["height"] = video_dict["video_height"]
+                        video_dict["crawler_rule"] = json.dumps(rule_dict)
+                        video_dict["user_id"] = our_uid
+                        video_dict["publish_time"] = video_dict["publish_time_str"]
+
+                        mq.send_msg(video_dict)
+                except Exception as e:
+                    Common.logger(log_type, crawler).error(f"抓取单条视频异常:{e}\n")
+                    Common.logging(log_type, crawler, env, f"抓取单条视频异常:{e}\n")
+        except Exception as e:
+            Common.logger(log_type, crawler).error(f"抓取列表页时异常:{e}\n")
+            Common.logging(log_type, crawler, env, f"抓取列表页时异常:{e}\n")
+
+
+if __name__ == "__main__":
+    # print(get_config_from_mysql(log_type="recommend",
+    #                             source="kanyikan",
+    #                             env="dev",
+    #                             text="filter",
+    #                             action=""))
+    KanyikanRecommend.get_session(log_type="recommend", crawler="kanyikan", env="dev")
+    pass
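
All three recommend modules resolve the playback URL through the same fallback chain over `videoInfo.videoCdnInfo`: prefer the third `mpInfo` URL when available, else the first; else `ctnInfo`; else the top-level `urlInfo`. A sketch against a fabricated feed item:

```python
# Playback-URL fallback from get_videoList(); the feed payload is fabricated.
feed = {"videoInfo": {"videoCdnInfo": {"mpInfo": {"urlInfo": [
    {"url": "u0"}, {"url": "u1"}, {"url": "u2"}]}}}}

if "videoInfo" not in feed:
    video_url = ""
elif "mpInfo" in feed["videoInfo"]["videoCdnInfo"]:
    url_info = feed["videoInfo"]["videoCdnInfo"]["mpInfo"]["urlInfo"]
    video_url = url_info[2]["url"] if len(url_info) > 2 else url_info[0]["url"]
elif "ctnInfo" in feed["videoInfo"]["videoCdnInfo"]:
    video_url = feed["videoInfo"]["videoCdnInfo"]["ctnInfo"]["urlInfo"][0]["url"]
else:
    video_url = feed["videoInfo"]["videoCdnInfo"]["urlInfo"][0]["url"]
print(video_url)  # -> u2
```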

+ 191 - 0
kanyikan/kanyikan_recommend/kykln_recommend.py

@@ -0,0 +1,191 @@
+# -*- coding: utf-8 -*-
+# @Author: wangkun
+# @Time: 2023/6/21
+"""
+Kanyikan recommend (elderly)
+Crawl mode for the 207 server
+WeChat session is fetched from the crawler platform
+"""
+
+import json
+import os
+import random
+import sys
+import time
+import requests
+import urllib3
+sys.path.append(os.getcwd())
+from common.mq import MQ
+from common.common import Common
+from common.scheduling_db import MysqlHelper
+from common.public import get_config_from_mysql, download_rule
+proxies = {"http": None, "https": None}
+
+
+class KanyikanRecommend:
+    platform = "看一看"
+    strategy = "推荐抓取策略"
+
+    @classmethod
+    def get_session(cls, log_type, crawler, env):
+        session_sql = """ SELECT * FROM crawler_config WHERE `source` in ('kanyikan', '看一看'); """
+        session_response = MysqlHelper.get_values(log_type, crawler, session_sql, env, action="")
+        for config in session_response:
+            if "token" in config["config"] and config["title"] == "看一看老年":
+                token_str = config["config"]
+                token = json.loads(token_str)["token"]
+                return token
+        return None
+
+    @classmethod
+    def repeat_video(cls, log_type, crawler, video_id, env):
+        sql = f""" select * from crawler_video where platform in ("{crawler}","{cls.platform}") and create_time>='2023-08-15' and out_video_id="{video_id}"; """
+        repeat_video = MysqlHelper.get_values(log_type, crawler, sql, env)
+        return len(repeat_video)
+
+    @classmethod
+    def get_videoList(cls, log_type, crawler, our_uid, rule_dict, env):
+        mq = MQ(topic_name="topic_crawler_etl_" + env)
+        try:
+            Common.logger(log_type, crawler).info(f"正在抓取列表页")
+            Common.logging(log_type, crawler, env, f"正在抓取列表页")
+            session = cls.get_session(log_type, crawler, env)
+            Common.logger(log_type, crawler).info(f"session:{session}")
+            if session is None:
+                Common.logger(log_type, crawler).info("session is None!")
+                time.sleep(1)
+                return cls.get_videoList(log_type, crawler, our_uid, rule_dict, env)
+            url = 'https://search.weixin.qq.com/cgi-bin/recwxa/recwxavideolist?'
+            header = {
+                "Connection": "keep-alive",
+                "content-type": "application/json",
+                "Accept-Encoding": "gzip,compress,br,deflate",
+                "User-Agent": "Mozilla/5.0 (iPhone; CPU iPhone OS 14_7_1 like Mac OS X) "
+                              "AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148 MicroMessenger/8.0.18(0x18001236) "
+                              "NetType/WIFI Language/zh_CN",
+                "Referer": "https://servicewechat.com/wxbb9a805eb4f9533c/234/page-frame.html",
+            }
+            params = {
+                'session': session,
+                "offset": 0,
+                "wxaVersion": "3.9.2",
+                "count": "10",
+                "channelid": "208",
+                "scene": '310',
+                "subscene": '1089',
+                "clientVersion": '8.0.18',
+                "sharesearchid": '0',
+                "nettype": 'wifi',
+                "switchprofile": "0",
+                "switchnewuser": "0",
+            }
+            urllib3.disable_warnings()
+            response = requests.get(url=url, headers=header, params=params, proxies=proxies, verify=False)
+            if "data" not in response.text:
+                Common.logger(log_type, crawler).info("获取视频list时,session过期,随机睡眠 31-50 秒")
+                Common.logging(log_type, crawler, env, "获取视频list时,session过期,随机睡眠 31-50 秒")
+                # 如果返回空信息,则随机睡眠 31-40 秒
+                time.sleep(random.randint(31, 40))
+                cls.get_videoList(log_type, crawler, our_uid, rule_dict, env)
+            elif "items" not in response.json()["data"]:
+                Common.logger(log_type, crawler).info(f"get_feeds:{response.json()},随机睡眠 1-3 分钟")
+                Common.logging(log_type, crawler, env, f"get_feeds:{response.json()},随机睡眠 1-3 分钟")
+                # 如果返回空信息,则随机睡眠 1-3 分钟
+                time.sleep(random.randint(60, 180))
+                cls.get_videoList(log_type, crawler, our_uid, rule_dict, env)
+            feeds = response.json().get("data", {}).get("items", "")
+            if feeds == "":
+                Common.logger(log_type, crawler).info(f"feeds:{feeds}")
+                Common.logging(log_type, crawler, env, f"feeds:{feeds}")
+                return
+            for i in range(len(feeds)):
+                try:
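+                    # Sanitize the title: strip newlines and characters that are unsafe in filenames or downstream storage.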
+                    video_title = feeds[i].get("title", "").strip().replace("\n", "") \
+                        .replace("/", "").replace("\\", "").replace("\r", "") \
+                        .replace(":", "").replace("*", "").replace("?", "") \
+                        .replace("?", "").replace('"', "").replace("<", "") \
+                        .replace(">", "").replace("|", "").replace(" ", "") \
+                        .replace("&NBSP", "").replace(".", "。").replace(" ", "") \
+                        .replace("'", "").replace("#", "").replace("Merge", "")
+                    publish_time_stamp = feeds[i].get("date", 0)
+                    publish_time_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(publish_time_stamp))
+                    # Extract the playback URL from whichever CDN block is present, preferring the mpInfo entry at index 2.
+                    if "videoInfo" not in feeds[i]:
+                        video_url = ""
+                    elif "mpInfo" in feeds[i]["videoInfo"]["videoCdnInfo"]:
+                        if len(feeds[i]["videoInfo"]["videoCdnInfo"]["mpInfo"]["urlInfo"]) > 2:
+                            video_url = feeds[i]["videoInfo"]["videoCdnInfo"]["mpInfo"]["urlInfo"][2]["url"]
+                        else:
+                            video_url = feeds[i]["videoInfo"]["videoCdnInfo"]["mpInfo"]["urlInfo"][0]["url"]
+                    elif "ctnInfo" in feeds[i]["videoInfo"]["videoCdnInfo"]:
+                        video_url = feeds[i]["videoInfo"]["videoCdnInfo"]["ctnInfo"]["urlInfo"][0]["url"]
+                    else:
+                        video_url = feeds[i]["videoInfo"]["videoCdnInfo"]["urlInfo"][0]["url"]
+                    video_dict = {
+                        "video_title": video_title,
+                        "video_id": feeds[i].get("videoId", ""),
+                        "play_cnt": feeds[i].get("playCount", 0),
+                        "like_cnt": feeds[i].get("liked_cnt", 0),
+                        "comment_cnt": feeds[i].get("comment_cnt", 0),
+                        "share_cnt": feeds[i].get("shared_cnt", 0),
+                        "duration": feeds[i].get("mediaDuration", 0),
+                        "video_width": feeds[i].get("short_video_info", {}).get("width", 0),
+                        "video_height": feeds[i].get("short_video_info", {}).get("height", 0),
+                        "publish_time_stamp": publish_time_stamp,
+                        "publish_time_str": publish_time_str,
+                        "user_name": feeds[i].get("source", "").strip().replace("\n", ""),
+                        "user_id": feeds[i].get("openid", ""),
+                        "avatar_url": feeds[i].get("bizIcon", ""),
+                        "cover_url": feeds[i].get("thumbUrl", ""),
+                        "video_url": video_url,
+                        "session": session,
+                    }
+                    for k, v in video_dict.items():
+                        Common.logger(log_type, crawler).info(f"{k}:{v}")
+                    Common.logging(log_type, crawler, env, f"video_dict:{video_dict}")
+
+                    if video_dict["video_id"] == "" or video_dict["video_title"] == "" or video_dict["video_url"] == "":
+                        Common.logger(log_type, crawler).info("无效视频\n")
+                        Common.logging(log_type, crawler, env, "无效视频\n")
+                    elif download_rule(log_type=log_type, crawler=crawler, video_dict=video_dict, rule_dict=rule_dict) is False:
+                        Common.logger(log_type, crawler).info("不满足抓取规则\n")
+                        Common.logging(log_type, crawler, env, "不满足抓取规则\n")
+                    elif any(str(word) in video_dict["video_title"]
+                             for word in get_config_from_mysql(log_type=log_type,
+                                                               source=crawler,
+                                                               env=env,
+                                                               text="filter",
+                                                               action="")):
+                        Common.logger(log_type, crawler).info('已中过滤词\n')
+                        Common.logging(log_type, crawler, env, '已中过滤词\n')
+                    elif cls.repeat_video(log_type, crawler, video_dict["video_id"], env) != 0:
+                        Common.logger(log_type, crawler).info('视频已下载\n')
+                        Common.logging(log_type, crawler, env, '视频已下载\n')
+                    else:
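+                        # Remap to the ETL schema: out_* keep the source IDs, user_id becomes our internal uid.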
+                        video_dict["out_user_id"] = video_dict["user_id"]
+                        video_dict["platform"] = crawler
+                        video_dict["strategy"] = log_type
+                        video_dict["out_video_id"] = video_dict["video_id"]
+                        video_dict["width"] = video_dict["video_width"]
+                        video_dict["height"] = video_dict["video_height"]
+                        video_dict["crawler_rule"] = json.dumps(rule_dict)
+                        video_dict["user_id"] = our_uid
+                        video_dict["publish_time"] = video_dict["publish_time_str"]
+
+                        mq.send_msg(video_dict)
+                except Exception as e:
+                    Common.logger(log_type, crawler).error(f"抓取单条视频异常:{e}\n")
+                    Common.logging(log_type, crawler, env, f"抓取单条视频异常:{e}\n")
+        except Exception as e:
+            Common.logger(log_type, crawler).error(f"抓取列表页时异常:{e}\n")
+            Common.logging(log_type, crawler, env, f"抓取列表页时异常:{e}\n")
+
+
+if __name__ == "__main__":
+    # print(get_config_from_mysql(log_type="recommend",
+    #                             source="kanyikan",
+    #                             env="dev",
+    #                             text="filter",
+    #                             action=""))
+    KanyikanRecommend.get_session(log_type="recommend", crawler="kanyikan", env="dev")
+    pass

+ 3 - 0
kanyikan/kanyikan_session/__init__.py

@@ -0,0 +1,3 @@
+# -*- coding: utf-8 -*-
+# @Author: wangkun
+# @Time: 2023/9/25

+ 110 - 0
kanyikan/kanyikan_session/kanyikan_session.py

@@ -0,0 +1,110 @@
+# -*- coding: utf-8 -*-
+# @Author: wangkun
+# @Time: 2023/9/25
+import argparse
+import json
+import os
+import sys
+import time
+
+sys.path.append(os.getcwd())
+from common.common import Common
+from common.scheduling_db import MysqlHelper
+
+
+class KanyikanSession:
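+    # Directory where Charles session exports (JSON) are dropped by the capture step.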
+    chls_file_path = "./kanyikan/chlsfiles/"
+
+    @classmethod
+    def get_session(cls, log_type, crawler, env):
+        while True:
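+            # Poll until an export whose name contains "charles" shows up in the capture directory.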
+            all_files = os.listdir(cls.chls_file_path)
+            chls_files = []
+            for chls_file in all_files:
+                if "charles" in os.path.splitext(chls_file)[0]:
+                    chls_files.append(chls_file)
+
+            if len(chls_files) < 1:
+                Common.logger(log_type, crawler).info("未找到chlsfile文件,等待60s重试")
+                Common.logging(log_type, crawler, env, "未找到chlsfile文件,等待60s重试")
+                time.sleep(60)
+                continue
+
+            chls_file = sorted(chls_files)[-1]
+            Common.logger(log_type, crawler).info(f"chls_file:{chls_file}")
+            chls_file_name = os.path.splitext(chls_file)[0]
+            # Rename the extension to .txt before parsing the export
+            os.rename(os.path.join(cls.chls_file_path, chls_file),
+                      os.path.join(cls.chls_file_path, f"{chls_file_name}.txt"))
+
+            with open(os.path.join(cls.chls_file_path, f"{chls_file_name}.txt"), encoding='utf-8-sig',
+                      errors='ignore') as f:
+                contents = json.load(f, strict=False)
+
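+            # Keep only captured requests to the 看一看 backend (search.weixin.qq.com).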
+            kanyikan_request_list = []
+            for content in contents:
+                if "search.weixin.qq.com" in content["host"]:
+                    kanyikan_request_list.append(content)
+            if len(kanyikan_request_list) == 0:
+                Common.logger(log_type, crawler).info("chlsfile文件中未找到:search.weixin.qq.com,等待60s重试")
+                Common.logging(log_type, crawler, env, "chlsfile文件中未找到:search.weixin.qq.com,等待60s重试")
+                time.sleep(60)
+                continue
+
+            for kanyikan_request in kanyikan_request_list:
+                if kanyikan_request["path"] == "/cgi-bin/recwxa/recwxagetunreadmessagecnt":
+                    Common.logger(log_type, crawler).info(f'query:{kanyikan_request["query"]}\n')
+                    # The query looks like "...&session=<token>&vid=...": take everything
+                    # after "session=" and cut at the next "&".
+                    session = kanyikan_request["query"].split("session=")[-1].split("&")[0]
+                    Common.logger(log_type, crawler).info(f"session:{session}\n")
+                    return session
+
+    @classmethod
+    def del_chls_file(cls, log_type, crawler):
+        all_file = sorted(os.listdir(cls.chls_file_path))
+        for file in all_file:
+            os.remove(os.path.join(cls.chls_file_path, file))
+        Common.logger(log_type, crawler).info("删除 charles 缓存文件成功\n")
+
+    @classmethod
+    def save_session(cls, log_type, crawler, env, kanyikan_type):
+        session = cls.get_session(log_type, crawler, env)
+        Common.logger(log_type, crawler).info(session)
+        if kanyikan_type == "kyk":
+            update_sql = f""" UPDATE crawler_config SET config = JSON_SET (config, "$.token", "{session}"), update_time={int(time.time()*1000)} WHERE title="看一看推荐"; """
+            MysqlHelper.update_values(log_type, crawler, update_sql, env, action="")
+        elif kanyikan_type == "kykjk":
+            update_sql = f""" UPDATE crawler_config SET config = JSON_SET (config, "$.token", "{session}"), update_time={int(time.time()*1000)} WHERE title="看一看健康"; """
+            MysqlHelper.update_values(log_type, crawler, update_sql, env, action="")
+        elif kanyikan_type == "kykln":
+            update_sql = f""" UPDATE crawler_config SET config = JSON_SET (config, "$.token", "{session}"), update_time={int(time.time()*1000)} WHERE title="看一看老年"; """
+            MysqlHelper.update_values(log_type, crawler, update_sql, env, action="")
+        Common.logger(log_type, crawler).info("session 更新数据库成功")
+        cls.del_chls_file(log_type, crawler)
+
+    @classmethod
+    def main(cls, log_type, crawler, env, kanyikan_type):
+        Common.logger(log_type, crawler).info(f'开始抓取:看一看 session\n')
+        Common.logging(log_type, crawler, env, f'开始抓取:看一看 session\n')
+        cls.save_session(log_type, crawler, env, kanyikan_type)
+        Common.del_logs(log_type, crawler)
+        Common.logger(log_type, crawler).info('抓取一轮结束\n')
+        Common.logging(log_type, crawler, env, '抓取一轮结束\n')
+
+
+if __name__ == "__main__":
+    parser = argparse.ArgumentParser()  # build the CLI argument parser
+    parser.add_argument('--log_type', type=str)  # declare the argument and its type
+    parser.add_argument('--crawler')
+    parser.add_argument('--kanyikan_type')
+    parser.add_argument('--env')
+    args = parser.parse_args()  # values are supplied from the terminal
+    KanyikanSession.main(log_type=args.log_type,
+                         crawler=args.crawler,
+                         kanyikan_type=args.kanyikan_type,
+                         env=args.env)
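+    # Hypothetical invocation (argument values are illustrative, not canonical):
+    # python kanyikan/kanyikan_session/kanyikan_session.py \
+    #     --log_type="recommend" --crawler="kanyikan" --kanyikan_type="kykln" --env="dev"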