
Merge branch 'wangkun'

add zhufuquanzi
wangkun 1 year ago
commit 864a3b360c

+ 11 - 2
README.MD

@@ -64,6 +64,9 @@ ps aux | grep run_youtube | grep -v grep | awk '{print $2}' | xargs kill -9
 
 #### 微信指数 (WeChat Index)
 ```commandline
+# WeChat Index, Mac Air
+00 11 * * * cd ~ && source ./base_profile && ps aux | grep weixinzhishu | grep -v grep | awk '{print $2}' | xargs kill -9 && cd /Users/piaoquan/Desktop/piaoquan_crawler && nohup python3 -u weixinzhishu/weixinzhishu_key/search_key_mac.py >> weixinzhishu/logs/nohup-search-key.log 2>&1 &
+
 Fetch off-site titles; crontab scheduled job, runs once daily at 12:00:00
 00 12 * * * cd /data5/piaoquan_crawler/ && /root/anaconda3/bin/python weixinzhishu/weixinzhishu_main/run_weixinzhishu_hot_search.py >>weixinzhishu/logs/nohup-hot-search.log 2>&1 &
 Fetch WeChat Index for off-site hot words; crontab scheduled job, runs once daily at 12:30:00
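
For reference, the five leading fields of these crontab entries are minute, hour, day-of-month, month, and day-of-week; a minimal sketch (the echo is a stand-in for the real command):

```commandline
# min hour dom mon dow  command
# 00  11   *   *   *    → once a day at 11:00
00 11 * * * echo "daily 11:00 job"
```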
@@ -86,12 +89,15 @@ ps aux | grep 微信 | grep -v grep | awk '{print $2}' | xargs kill -9
 
 ```
 
-#### 线下爬虫: 刚刚都传 / 吉祥幸福 / 知青天天看 / 众妙音信 / wechat_search_key / start_appium
+#### Offline crawlers: 刚刚都传 / 吉祥幸福 / 知青天天看 / 众妙音信 / wechat_search_key / start_appium / 祝福圈子
 ```commandline
-MacAir device, crontab scheduled tasks
+# Offline crawler scheduler: checks the offline crawler process status every minute
 * * * * * /bin/sh /Users/piaoquan/Desktop/piaoquan_crawler/main/process_offline.sh "prod"
+# Start Appium and monitor its process status
 * * * * * /bin/sh /Users/piaoquan/Desktop/piaoquan_crawler/main/start_appium.sh "recommend" "jixiangxingfu" "prod"
+
 Offline debugging
+ps aux | grep Appium.app | grep -v grep | awk '{print $2}' | xargs kill -9 && nohup /opt/homebrew/bin/node /Applications/Appium.app/Contents/Resources/app/node_modules/appium/build/lib/main.js >>/Users/wangkun/Desktop/logs/nohup.log 2>&1 &
 sh /Users/wangkun/Desktop/crawler/piaoquan_crawler/main/process_offline.sh "dev"
 cd /Users/piaoquan/Desktop/piaoquan_crawler/ && nohup python3 -u weixinzhishu/weixinzhishu_key/search_key_mac.py >> weixinzhishu/logs/nohup-search-key.log 2>&1 &
 Check processes
@@ -183,6 +189,9 @@ ps aux | grep run_monitor | grep -v grep | awk '{print $2}' | xargs kill -9
 * * * * * /bin/sh /Users/lieyunye/Desktop/crawler/piaoquan_crawler/main/process_mq.sh "sph" "shipinhao" "recommend" "prod"
 # 视频号 search
 * * * * * /bin/sh /Users/piaoquan/Desktop/piaoquan_crawler/main/process_mq.sh "sph" "shipinhao" "search" "prod"
+# 祝福圈子
+* * * * * /bin/sh /Users/piaoquan/Desktop/piaoquan_crawler/main/process_mq.sh "zfqz" "zhufuquanzi" "recommend" "prod"
+
 
 Kill processes
 ps aux | grep suisuiniannianyingfuqi | grep -v grep | awk '{print $2}' | xargs kill -9
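
Throughout these entries the kill step is the classic ps/grep/awk/xargs chain. Assuming a procps-style pkill is available on the machine, an equivalent shorthand is:

```commandline
# kill every process whose full command line matches the pattern
pkill -9 -f weixinzhishu
# equivalent to:
ps aux | grep weixinzhishu | grep -v grep | awk '{print $2}' | xargs kill -9
```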

+ 1 - 1
kanyikan/kanyikan_recommend/kanyikan_recommend0627.py

@@ -291,7 +291,7 @@ class KanyikanRecommend:
 if __name__ == "__main__":
     print(get_config_from_mysql(log_type="recommend",
                                 source="kanyikan",
-                                env="dev",
+                                env="prod",
                                 text="filter",
                                 action=""))
     pass

+ 5 - 0
main/process_mq.sh

@@ -10,6 +10,11 @@ if [ ${env} = "dev" ];then
   profile_path=/etc/profile
   python=python3
   log_path=${piaoquan_crawler_dir}main/main_logs/process-mq-$(date +%Y-%m-%d).log
+elif [ ${crawler} = "zfqz" ];then
+  piaoquan_crawler_dir=/Users/piaoquan/Desktop/piaoquan_crawler/
+  profile_path=./base_profile
+  python=python3
+  log_path=${piaoquan_crawler_dir}main/main_logs/process-mq-$(date +%Y-%m-%d).log
 elif [ ${env} = "hk" ];then
   piaoquan_crawler_dir=/root/piaoquan_crawler/
   profile_path=/etc/profile
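
Per the README crontab and process_offline.sh, the new zfqz branch is exercised by an invocation like the one below; the binding of the positional arguments to ${crawler}, ${log_type} and ${env} happens earlier in the script and is not part of this hunk:

```commandline
/bin/sh /Users/piaoquan/Desktop/piaoquan_crawler/main/process_mq.sh "zfqz" "zhufuquanzi" "recommend" "prod"
```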

+ 29 - 4
main/process_offline.sh

@@ -30,11 +30,12 @@ else
 fi
 
 # 知青天天看
-if [[ "$time" > "00:00:00" ]] && [[ "$time" < "08:59:59" ]]; then
+if [[ "$time" > "00:00:00" ]] && [[ "$time" < "02:59:59" ]]; then
   echo "$(date "+%Y-%m-%d %H:%M:%S") 开始启动 知青天天看 爬虫脚本任务" >> ${log_path}
   ps aux | grep run_zhongmiaoyinxin | grep -v grep | awk '{print $2}' | xargs kill -9
   ps aux | grep run_ganggangdouchuan | grep -v grep | awk '{print $2}' | xargs kill -9
   ps aux | grep run_jixiangxingfu | grep -v grep | awk '{print $2}' | xargs kill -9
+  ps aux | grep run_zfqz | grep -v grep | awk '{print $2}' | xargs kill -9
   ps -ef | grep "run_zhiqingtiantiankan_recommend.py" | grep -v "grep"
   if [ "$?" -eq 1 ];then
     echo "$(date "+%Y-%m-%d %H:%M:%S") 知青天天看小程序爬虫, 异常停止, 正在重启!" >> ${log_path}
@@ -49,11 +50,12 @@ else
 fi
 
 # 刚刚都传
-if [[ "$time" > "09:00:00" ]] && [[ "$time" < "12:59:59" ]]; then
+if [[ "$time" > "03:00:00" ]] && [[ "$time" < "05:59:59" ]]; then
   echo "$(date "+%Y-%m-%d %H:%M:%S") 开始启动 刚刚都传 爬虫脚本任务" >> ${log_path}
   ps aux | grep run_zhongmiaoyinxin | grep -v grep | awk '{print $2}' | xargs kill -9
   ps aux | grep run_zhiqingtiantiankan | grep -v grep | awk '{print $2}' | xargs kill -9
   ps aux | grep run_jixiangxingfu | grep -v grep | awk '{print $2}' | xargs kill -9
+  ps aux | grep run_zfqz | grep -v grep | awk '{print $2}' | xargs kill -9
   ps -ef | grep "run_ganggangdouchuan_recommend.py" | grep -v "grep"
   if [ "$?" -eq 1 ];then
     echo "$(date "+%Y-%m-%d %H:%M:%S") 刚刚都传小程序爬虫, 异常停止, 正在重启!" >> ${log_path}
@@ -68,11 +70,12 @@ else
 fi
 
 # 吉祥幸福
-if [[ "$time" > "13:00:00" ]] && [[ "$time" < "16:59:59" ]]; then
+if [[ "$time" > "06:00:00" ]] && [[ "$time" < "08:59:59" ]]; then
   echo "$(date "+%Y-%m-%d %H:%M:%S") 开始启动 吉祥幸福 爬虫脚本任务" >> ${log_path}
   ps aux | grep run_zhongmiaoyinxin | grep -v grep | awk '{print $2}' | xargs kill -9
   ps aux | grep run_zhiqingtiantiankan | grep -v grep | awk '{print $2}' | xargs kill -9
   ps aux | grep run_ganggangdouchuan | grep -v grep | awk '{print $2}' | xargs kill -9
+  ps aux | grep run_zfqz | grep -v grep | awk '{print $2}' | xargs kill -9
   ps -ef | grep "run_jixiangxingfu_recommend.py" | grep -v "grep"
   if [ "$?" -eq 1 ];then
     echo "$(date "+%Y-%m-%d %H:%M:%S") 吉祥幸福爬虫, 异常停止, 正在重启!" >> ${log_path}
@@ -87,11 +90,12 @@ else
 fi
 
 # 众妙音信
-if [[ "$time" > "17:00:00" ]] && [[ "$time" < "23:59:59" ]]; then
+if [[ "$time" > "09:00:00" ]] && [[ "$time" < "11:59:59" ]]; then
   echo "$(date "+%Y-%m-%d %H:%M:%S") 开始启动 众妙音信 爬虫脚本任务" >> ${log_path}
   ps aux | grep run_ganggangdouchuan | grep -v grep | awk '{print $2}' | xargs kill -9
   ps aux | grep run_zhiqingtiantiankan | grep -v grep | awk '{print $2}' | xargs kill -9
   ps aux | grep run_jixiangxingfu | grep -v grep | awk '{print $2}' | xargs kill -9
+  ps aux | grep run_zfqz | grep -v grep | awk '{print $2}' | xargs kill -9
   ps -ef | grep "run_zhongmiaoyinxin_recommend.py" | grep -v "grep"
   if [ "$?" -eq 1 ];then
     echo "$(date "+%Y-%m-%d %H:%M:%S") 众妙音信小程序爬虫, 异常停止, 正在重启!" >> ${log_path}
@@ -106,6 +110,27 @@ else
   echo "$(date "+%Y-%m-%d %H:%M:%S") 众妙音信 爬虫脚本任务结束" >> ${log_path}
 fi
 
+# 祝福圈子
+if [[ "$time" > "12:00:00" ]] && [[ "$time" < "23:59:59" ]]; then
+  echo "$(date "+%Y-%m-%d %H:%M:%S") 开始启动 祝福圈子 爬虫脚本任务" >> ${log_path}
+  ps aux | grep run_zhongmiaoyinxin | grep -v grep | awk '{print $2}' | xargs kill -9
+  ps aux | grep run_ganggangdouchuan | grep -v grep | awk '{print $2}' | xargs kill -9
+  ps aux | grep run_zhiqingtiantiankan | grep -v grep | awk '{print $2}' | xargs kill -9
+  ps aux | grep run_jixiangxingfu | grep -v grep | awk '{print $2}' | xargs kill -9
+  ps -ef | grep "run_zfqz_recommend.py" | grep -v "grep"
+  if [ "$?" -eq 1 ];then
+    echo "$(date "+%Y-%m-%d %H:%M:%S") 祝福圈子小程序爬虫, 异常停止, 正在重启!" >> ${log_path}
+    cd ${piaoquan_crawler_dir}
+    /bin/sh ${piaoquan_crawler_dir}main/process_mq.sh "zfqz" "zhufuquanzi" "recommend" ${env}
+    echo "$(date "+%Y-%m-%d %H:%M:%S") 重启完成!" >> ${log_path}
+  else
+    echo "$(date "+%Y-%m-%d %H:%M:%S") 祝福圈子小程序爬虫, 进程状态正常" >> ${log_path}
+  fi
+
+else
+  echo "$(date "+%Y-%m-%d %H:%M:%S") 祝福圈子 爬虫脚本任务结束" >> ${log_path}
+fi
+
 # Delete old logs
 echo "$(date "+%Y-%m-%d %H:%M:%S") 开始清理 10 天前的日志文件" >> ${log_path}
 find ${piaoquan_crawler_dir}main/main_logs/ -mtime +10 -name "*.log" -exec rm -rf {} \;
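
The window checks in this script work because bash `[[ ... > ... ]]` / `[[ ... < ... ]]` compare strings lexicographically, and zero-padded HH:MM:SS strings sort in chronological order; a minimal standalone check, assuming the same date format the script uses:

```commandline
time=$(date "+%H:%M:%S")
# lexicographic comparison of zero-padded HH:MM:SS equals chronological comparison
if [[ "$time" > "12:00:00" ]] && [[ "$time" < "23:59:59" ]]; then
  echo "inside the 12:00-24:00 window"
fi
```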

+ 0 - 3
zhiqingtiantiankan/logs/__init__.py

@@ -1,3 +0,0 @@
-# -*- coding: utf-8 -*-
-# @Author: wangkun
-# @Time: 2023/4/18

+ 1 - 1
zhiqingtiantiankan/zhiqingtiantiankan_recommend/zhiqingtiantiankan_recommend.py

@@ -52,7 +52,7 @@ class ZhiqingtiantiankanRecommend:
         try:
             Common.logger(log_type, crawler).info('启动微信')
             if env == "dev":
-                chromedriverExecutable = '/Users/wangkun/Downloads/chromedriver/chromedriver_v107/chromedriver'
+                chromedriverExecutable = '/Users/wangkun/Downloads/chromedriver/chromedriver_v111/chromedriver'
                 # chromedriverExecutable = 'C:\\chromedriver\\chromedriver.exe'  # Aliyun Windows
             else:
                 chromedriverExecutable = '/Users/piaoquan/Downloads/chromedriver'  # Mac crawler machine

+ 0 - 3
zhongmiaoyinxin/logs/__init__.py

@@ -1,3 +0,0 @@
-# -*- coding: utf-8 -*-
-# @Author: wangkun
-# @Time: 2023/4/18

+ 1 - 1
zhongmiaoyinxin/zhongmiaoyinxin_recommend/zhongmiaoyinxin_recommend.py

@@ -51,7 +51,7 @@ class ZhongmiaoyinxinRecommend:
     def start_wechat(cls, log_type, crawler, env):
         try:
             if env == "dev":
-                chromedriverExecutable = "/Users/wangkun/Downloads/chromedriver/chromedriver_v107/chromedriver"
+                chromedriverExecutable = "/Users/wangkun/Downloads/chromedriver/chromedriver_v111/chromedriver"
                 # chromedriverExecutable = 'C:\\chromedriver\\chromedriver.exe'  # Aliyun Windows
             else:
                 chromedriverExecutable = '/Users/piaoquan/Downloads/chromedriver'  # Mac crawler machine

+ 12 - 11
zhufuquanzi/zhufuquanzi_main/run_zfqz_dev.py

@@ -1,31 +1,32 @@
 # -*- coding: utf-8 -*-
-# @Author: wangkun
+# @Author: wang
 # @Time: 2023/9/6
 import os
 import sys
 
 sys.path.append(os.getcwd())
 from common.common import Common
-from zhufuquanzi.zhufuquanzi_recommend.zhufuquanzi_recommend import ZhufuquanziRecommend
+from zhufuquanzi.zhufuquanzi_recommend.zhufuquanzi_recommend import ZFQZRecommend
 
 
-class ZhufuquanziRecommendMain:
+class ZFQZRecommendMain:
     @classmethod
     def zhufuquanzi_recommend_main(cls, log_type, crawler, env):
         Common.logger(log_type, crawler).info('开始抓取"祝福圈子"推荐')
         Common.logging(log_type, crawler, env, '开始抓取"祝福圈子"推荐')
         rule_dict = {"period": {"min": 365, "max": 365},
-                     "duration": {"min": 30, "max": 1800},
+                     "duration": {"min": 40, "max": 2400},
-                     "favorite_cnt": {"min": 5000, "max": 0},
+                     "play_cnt": {"min": 100000, "max": 0},
                      "videos_cnt": {"min": 10, "max": 20},
-                     "share_cnt": {"min": 1000, "max": 0}}
+                     "like_cnt": {"min": 1000, "max": 0}}
-        ZhufuquanziRecommend.start_wechat(log_type=log_type,
+        ZFQZRecommend.start_wechat(log_type=log_type,
-                                          crawler=crawler,
+                                   crawler=crawler,
-                                          rule_dict=rule_dict,
+                                   rule_dict=rule_dict,
-                                          env=env)
+                                   our_uid=6267141,
+                                   env=env)
         Common.logger(log_type, crawler).info("抓取一轮结束\n")
         Common.logging(log_type, crawler, env, "抓取一轮结束\n")
 
 
 if __name__ == "__main__":
-    ZhufuquanziRecommendMain.zhufuquanzi_recommend_main("recommend", "zhufuquanzi", "dev")
+    ZFQZRecommendMain.zhufuquanzi_recommend_main("recommend", "zhufuquanzi", "dev")

+ 116 - 0
zhufuquanzi/zhufuquanzi_main/run_zfqz_recommend.py

@@ -0,0 +1,116 @@
+# -*- coding: utf-8 -*-
+# @Author: wang
+# @Time: 2023/9/7
+import argparse
+# os, sys and time are used below; import them explicitly rather than relying
+# on the mq_http_sdk wildcard imports to re-export them
+import os
+import random
+import sys
+import time
+
+from mq_http_sdk.mq_client import *
+from mq_http_sdk.mq_consumer import *
+from mq_http_sdk.mq_exception import MQExceptionBase
+
+sys.path.append(os.getcwd())
+from common.common import Common
+from common.public import get_consumer, ack_message, task_fun_mq
+from common.scheduling_db import MysqlHelper
+from zhufuquanzi.zhufuquanzi_recommend.zhufuquanzi_recommend import ZFQZRecommend
+
+
+class ZFQZMain:
+    @classmethod
+    def zhufuquanzi_main(cls, log_type, crawler, topic_name, group_id, env):
+        consumer = get_consumer(topic_name, group_id)
+        # Long polling: if the topic has no messages, the request is held on the server for
+        # up to wait_seconds; if a message becomes consumable in that window, the response
+        # returns immediately. Set to 30 seconds here (the maximum allowed).
+        wait_seconds = 30
+        # Consume at most 1 message per request (up to 16 can be configured).
+        batch = 1
+        Common.logger(log_type, crawler).info(f'{10 * "="}Consume And Ack Message From Topic{10 * "="}\n'
+                                              f'WaitSeconds:{wait_seconds}\n'
+                                              f'TopicName:{topic_name}\n'
+                                              f'MQConsumer:{group_id}')
+        Common.logging(log_type, crawler, env, f'{10 * "="}Consume And Ack Message From Topic{10 * "="}\n'
+                                               f'WaitSeconds:{wait_seconds}\n'
+                                               f'TopicName:{topic_name}\n'
+                                               f'MQConsumer:{group_id}')
+        while True:
+            try:
+                # Consume messages via long polling.
+                recv_msgs = consumer.consume_message(batch, wait_seconds)
+                for msg in recv_msgs:
+                    Common.logger(log_type, crawler).info(f"Receive\n"
+                                                          f"MessageId:{msg.message_id}\n"
+                                                          f"MessageBodyMD5:{msg.message_body_md5}\n"
+                                                          f"MessageTag:{msg.message_tag}\n"
+                                                          f"ConsumedTimes:{msg.consumed_times}\n"
+                                                          f"PublishTime:{msg.publish_time}\n"
+                                                          f"Body:{msg.message_body}\n"
+                                                          f"NextConsumeTime:{msg.next_consume_time}\n"
+                                                          f"ReceiptHandle:{msg.receipt_handle}\n"
+                                                          f"Properties:{msg.properties}")
+                    Common.logging(log_type, crawler, env, f"Receive\n"
+                                                           f"MessageId:{msg.message_id}\n"
+                                                           f"MessageBodyMD5:{msg.message_body_md5}\n"
+                                                           f"MessageTag:{msg.message_tag}\n"
+                                                           f"ConsumedTimes:{msg.consumed_times}\n"
+                                                           f"PublishTime:{msg.publish_time}\n"
+                                                           f"Body:{msg.message_body}\n"
+                                                           f"NextConsumeTime:{msg.next_consume_time}\n"
+                                                           f"ReceiptHandle:{msg.receipt_handle}\n"
+                                                           f"Properties:{msg.properties}")
+                    # ack_mq_message
+                    ack_message(log_type=log_type, crawler=crawler, recv_msgs=recv_msgs, consumer=consumer)
+
+                    # Handle the crawler task
+                    task_dict = task_fun_mq(msg.message_body)['task_dict']
+                    rule_dict = task_fun_mq(msg.message_body)['rule_dict']
+                    task_id = task_dict['id']
+                    select_user_sql = f"""select * from crawler_user_v3 where task_id={task_id}"""
+                    user_list = MysqlHelper.get_values(log_type, crawler, select_user_sql, env, action="")
+                    our_uid_list = []
+                    for user in user_list:
+                        our_uid_list.append(user["uid"])
+                    our_uid = random.choice(our_uid_list)
+                    Common.logger(log_type, crawler).info(f"调度任务:{task_dict}")
+                    Common.logging(log_type, crawler, env, f"调度任务:{task_dict}")
+                    Common.logger(log_type, crawler).info(f"抓取规则:{rule_dict}")
+                    Common.logging(log_type, crawler, env, f"抓取规则:{rule_dict}")
+                    Common.logger(log_type, crawler).info(f"用户列表:{user_list}\n")
+                    Common.logging(log_type, crawler, env, f"用户列表:{user_list}\n")
+                    Common.logger(log_type, crawler).info(f'开始抓取:{task_dict["taskName"]}\n')
+                    Common.logging(log_type, crawler, env, f'开始抓取:{task_dict["taskName"]}\n')
+
+                    # Crawl the video list that matches the rules
+                    ZFQZRecommend.start_wechat(log_type=log_type,
+                                               crawler=crawler,
+                                               rule_dict=rule_dict,
+                                               our_uid=our_uid,
+                                               env=env)
+                    Common.logger(log_type, crawler).info('抓取一轮结束\n')
+                    Common.logging(log_type, crawler, env, '抓取一轮结束\n')
+
+            except MQExceptionBase as err:
+                # No messages in the topic to consume.
+                if err.type == "MessageNotExist":
+                    Common.logger(log_type, crawler).info(f"No new message! RequestId:{err.req_id}\n")
+                    Common.logging(log_type, crawler, env, f"No new message! RequestId:{err.req_id}\n")
+                    continue
+
+                Common.logger(log_type, crawler).info(f"Consume Message Fail! Exception:{err}\n")
+                Common.logging(log_type, crawler, env, f"Consume Message Fail! Exception:{err}\n")
+                time.sleep(2)
+                continue
+
+
+if __name__ == "__main__":
+    parser = argparse.ArgumentParser()  # create the argument parser
+    parser.add_argument('--log_type', type=str)  # add an argument with an explicit type
+    parser.add_argument('--crawler')
+    parser.add_argument('--topic_name')
+    parser.add_argument('--group_id')
+    parser.add_argument('--env')
+    args = parser.parse_args()  # parse values passed on the command line
+    ZFQZMain.zhufuquanzi_main(log_type=args.log_type,
+                              crawler=args.crawler,
+                              topic_name=args.topic_name,
+                              group_id=args.group_id,
+                              env=args.env)
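
For local testing, the consumer can be launched directly with the argparse flags defined above; the topic and group values here are placeholders, since in production they come from the process_mq.sh scheduler:

```commandline
python3 -u zhufuquanzi/zhufuquanzi_main/run_zfqz_recommend.py \
  --log_type="recommend" --crawler="zhufuquanzi" \
  --topic_name="<topic_name>" --group_id="<group_id>" --env="dev"
```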

+ 125 - 22
zhufuquanzi/zhufuquanzi_recommend/zhufuquanzi_recommend.py

@@ -1,26 +1,32 @@
 # -*- coding: utf-8 -*-
-# @Author: wangkun
+# @Author: wang
 # @Time: 2023/9/6
+import json
 import os
 import sys
 import time
+from hashlib import md5
 
 from appium import webdriver
+from appium.webdriver.extensions.android.nativekey import AndroidKey
 from appium.webdriver.webdriver import WebDriver
 from selenium.common import NoSuchElementException
 from selenium.webdriver.common.by import By
 
 sys.path.append(os.getcwd())
 from common.common import Common
+from common.mq import MQ
+from common.public import download_rule, get_config_from_mysql
+from common.scheduling_db import MysqlHelper
 
 
-class ZhufuquanziRecommend:
+class ZFQZRecommend:
     platform = "祝福圈子"
     download_cnt = 0
     i = 0
 
     @classmethod
-    def start_wechat(cls, log_type, crawler, env, rule_dict):
+    def start_wechat(cls, log_type, crawler, env, rule_dict, our_uid):
         if env == "dev":
             chromedriverExecutable = "/Users/wangkun/Downloads/chromedriver/chromedriver_v111/chromedriver"
         else:
@@ -36,16 +42,16 @@ class ZhufuquanziRecommend:
             "appPackage": "com.tencent.mm",
             "appPackage": "com.tencent.mm",
             "appActivity": ".ui.LauncherUI",
             "appActivity": ".ui.LauncherUI",
             "autoGrantPermissions": "true",
             "autoGrantPermissions": "true",
-            "unicodekeyboard": True,
-            "resetkeyboard": True,
             "noReset": True,
             "noReset": True,
-            "printPageSourceOnFailure": True,
+            "resetkeyboard": True,
-            "newCommandTimeout": 6000,
+            "unicodekeyboard": True,
-            "aotomationName": "UiAutomator2",
             "showChromedriverLog": True,
             "showChromedriverLog": True,
+            "printPageSourceOnFailure": True,
+            "recreateChromeDriverSessions": True,
             "enableWebviewDetailsCollection": True,
             "enableWebviewDetailsCollection": True,
             "setWebContentsDebuggingEnabled": True,
             "setWebContentsDebuggingEnabled": True,
-            "recreateChromeDriverSessions": True,
+            "newCommandTimeout": 6000,
+            "automationName": "UiAutomator2",
             "chromedriverExecutable": chromedriverExecutable,
             "chromedriverExecutable": chromedriverExecutable,
             "chromeOptions": {"androidProcess": "com.tencent.mm:appbrand0"},
             "chromeOptions": {"androidProcess": "com.tencent.mm:appbrand0"},
         }
         }
@@ -78,7 +84,7 @@ class ZhufuquanziRecommend:
         driver.find_elements(By.XPATH, '//*[@text="祝福圈子"]')[-1].click()
         time.sleep(10)
 
-        cls.get_videoList(log_type, crawler, driver, env, rule_dict)
+        cls.get_videoList(log_type, crawler, driver, env, rule_dict, our_uid)
 
         time.sleep(3)
         driver.quit()
@@ -98,12 +104,12 @@ class ZhufuquanziRecommend:
                 pass
 
     @classmethod
-    def check_to_appplet(cls, log_type, crawler, env, driver: WebDriver, xpath):
+    def check_to_applet(cls, log_type, crawler, env, driver: WebDriver, xpath):
         time.sleep(1)
-        webviews = driver.contexts
+        webViews = driver.contexts
-        Common.logger(log_type, crawler).info(f"webviews:{webviews}")
+        Common.logger(log_type, crawler).info(f"webViews:{webViews}")
-        Common.logging(log_type, crawler, env, f"webviews:{webviews}")
+        Common.logging(log_type, crawler, env, f"webViews:{webViews}")
-        driver.switch_to.context(webviews[1])
+        driver.switch_to.context(webViews[1])
         windowHandles = driver.window_handles
         for handle in windowHandles:
             driver.switch_to.window(handle)
@@ -117,9 +123,16 @@ class ZhufuquanziRecommend:
                 time.sleep(1)
 
     @classmethod
-    def get_videoList(cls, log_type, crawler, driver: WebDriver, env, rule_dict):
+    def repeat_video(cls, log_type, crawler, video_id, env):
+        sql = f""" select * from crawler_video where platform in ("{crawler}","{cls.platform}") and out_video_id="{video_id}"; """
+        repeat_video = MysqlHelper.get_values(log_type, crawler, sql, env)
+        return len(repeat_video)
+
+    @classmethod
+    def get_videoList(cls, log_type, crawler, driver: WebDriver, env, rule_dict, our_uid):
+        mq = MQ(topic_name="topic_crawler_etl_" + env)
         driver.implicitly_wait(20)
-        cls.check_to_appplet(log_type=log_type,
+        cls.check_to_applet(log_type=log_type,
                              crawler=crawler,
                              env=env,
                              driver=driver,
@@ -148,23 +161,113 @@ class ZhufuquanziRecommend:
                 return
 
             for i, video_element in enumerate(video_list):
-                if cls.download_cnt >= int(rule_dict.get("videos_cnt", {}).get("min", 20)):
+                if cls.download_cnt >= int(rule_dict.get("videos_cnt", {}).get("min", 10)):
                     Common.logger(log_type, crawler).info(f"本轮已抓取视频数:{cls.download_cnt}")
                     Common.logging(log_type, crawler, env, f"本轮已抓取视频数:{cls.download_cnt}")
                     return
 
                 if video_element is None:
-                    Common.logger(log_type, crawler).info("没有更多数据啦\n")
+                    Common.logger(log_type, crawler).info("没有更多数据啦~\n")
-                    Common.logging(log_type, crawler, env, "没有更多数据啦\n")
+                    Common.logging(log_type, crawler, env, "没有更多数据啦~\n")
                     return
 
                 cls.i += 1
+                cls.search_elements(driver, '//*[@is="pages/discover/components/bless/dynamic/dynamic"]')
                 Common.logger(log_type, crawler).info(f"拖动第{cls.i}条视频至屏幕中间")
                 Common.logging(log_type, crawler, env, f"拖动第{cls.i}条视频至屏幕中间")
                 time.sleep(3)
                 driver.execute_script("arguments[0].scrollIntoView({block:'center',inline:'center'})",
                                       video_element)
-                cls.download_cnt += 1
+
+                video_title = video_element.find_elements(By.XPATH, '//*[@class="dynamic--title"]')[index+i].text
+                play_cnt_str = video_element.find_elements(By.XPATH, '//*[@class="dynamic--views"]')[index + i].text
+                duration_str = video_element.find_elements(By.XPATH, '//*[@class="dynamic--duration"]')[index + i].text
+                user_name = video_element.find_elements(By.XPATH, '//*[@class="dynamic--nick-top"]')[index + i].text
+                like_cnt_str = video_element.find_elements(By.XPATH, '//*[@class="dynamic--commerce"]/*[1]/*[2]')[index + i].text
+                comment_cnt_str = video_element.find_elements(By.XPATH, '//*[@class="dynamic--commerce"]/*[2]/*[2]')[index + i].text
+                cover_url = video_element.find_elements(By.XPATH, '//*[@class="dynamic--bg-image"]')[index+i].get_attribute('src')
+                avatar_url = video_element.find_elements(By.XPATH, '//*[@class="avatar--avatar"]')[index+i].get_attribute('src')
+
+                play_cnt = int(play_cnt_str.replace("+", "").replace("次播放", ""))
+                duration = int(duration_str.split(":")[0].strip())*60 + int(duration_str.split(":")[-1].strip())
+                if "点赞" in like_cnt_str:
+                    like_cnt = 0
+                elif "万" in like_cnt_str:
+                    like_cnt = int(like_cnt_str.split("万")[0])*10000
+                else:
+                    like_cnt = int(like_cnt_str)
+                if "评论" in comment_cnt_str:
+                    comment_cnt = 0
+                elif "万" in comment_cnt_str:
+                    comment_cnt = int(comment_cnt_str.split("万")[0])*10000
+                else:
+                    comment_cnt = int(comment_cnt_str)
+                out_video_id = md5(video_title.encode('utf8')).hexdigest()
+                out_user_id = md5(user_name.encode('utf8')).hexdigest()
+
+                video_dict = {
+                    "video_title": video_title,
+                    "video_id": out_video_id,
+                    "duration": duration,
+                    "play_cnt": play_cnt,
+                    "like_cnt": like_cnt,
+                    "comment_cnt": comment_cnt,
+                    "share_cnt": 0,
+                    "user_name": user_name,
+                    "user_id": out_user_id,
+                    'publish_time_stamp': int(time.time()),
+                    'publish_time_str': time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(int(time.time()))),
+                    "avatar_url": avatar_url,
+                    "cover_url": cover_url,
+                    "session": f"zhufuquanzi-{int(time.time())}"
+                }
+                for k, v in video_dict.items():
+                    Common.logger(log_type, crawler).info(f"{k}:{v}")
+                Common.logging(log_type, crawler, env, f"video_dict:{video_dict}")
+
+                if video_title is None or cover_url is None:
+                    Common.logger(log_type, crawler).info("无效视频\n")
+                    Common.logging(log_type, crawler, env, '无效视频\n')
+                elif download_rule(log_type=log_type, crawler=crawler, video_dict=video_dict,
+                                   rule_dict=rule_dict) is False:
+                    Common.logger(log_type, crawler).info("不满足抓取规则\n")
+                    Common.logging(log_type, crawler, env, "不满足抓取规则\n")
+                elif any(str(word) if str(word) in video_dict["video_title"] else False
+                         for word in get_config_from_mysql(log_type=log_type,
+                                                           source=crawler,
+                                                           env=env,
+                                                           text="filter",
+                                                           action="")) is True:
+                    Common.logger(log_type, crawler).info('已中过滤词\n')
+                    Common.logging(log_type, crawler, env, '已中过滤词\n')
+                elif cls.repeat_video(log_type, crawler, out_video_id, env) != 0:
+                    Common.logger(log_type, crawler).info('视频已下载\n')
+                    Common.logging(log_type, crawler, env, '视频已下载\n')
+                else:
+                    video_element.click()
+                    time.sleep(3)
+                    video_url_elements = cls.search_elements(driver, '//*[@class="index--video-item index--video"]')
+                    if video_url_elements is None or len(video_url_elements) == 0:
+                        Common.logger(log_type, crawler).info("未获取到视频播放地址\n")
+                        Common.logging(log_type, crawler, env, "未获取到视频播放地址\n")
+                        driver.press_keycode(AndroidKey.BACK)
+                    else:
+                        video_url = video_url_elements[0].get_attribute("src")
+                        video_dict["video_url"] = video_url
+                        Common.logger(log_type, crawler).info(f"video_url:{video_url}")
+
+                        video_dict["platform"] = crawler
+                        video_dict["strategy"] = log_type
+                        video_dict["out_video_id"] = video_dict["video_id"]
+                        video_dict["crawler_rule"] = json.dumps(rule_dict)
+                        video_dict["user_id"] = our_uid
+                        video_dict["publish_time"] = video_dict["publish_time_str"]
+                        mq.send_msg(video_dict)
+                        cls.download_cnt += 1
+                        driver.press_keycode(AndroidKey.BACK)
+                        Common.logger(log_type, crawler).info("符合抓取条件,mq send msg 成功\n")
+                        Common.logging(log_type, crawler, env, "符合抓取条件,ACK MQ 成功\n")
+
 
             Common.logger(log_type, crawler).info("已抓取完一组,休眠 10 秒\n")
             Common.logging(log_type, crawler, env, "已抓取完一组,休眠 10 秒\n")
@@ -178,4 +281,4 @@ if __name__ == "__main__":
                  "favorite_cnt": {"min": 5000, "max": 0},
                  "favorite_cnt": {"min": 5000, "max": 0},
                  "videos_cnt": {"min": 10, "max": 20},
                  "videos_cnt": {"min": 10, "max": 20},
                  "share_cnt": {"min": 1000, "max": 0}}
                  "share_cnt": {"min": 1000, "max": 0}}
-    ZhufuquanziRecommend.start_wechat("recommend", "zhufuquanzi", "dev", rule_dict1)
+    ZFQZRecommend.start_wechat("recommend", "zhufuquanzi", "dev", rule_dict1, 6267141)
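
get_videoList derives out_video_id and out_user_id as md5 hexdigests of the video title and nickname. For spot-checking dedup rows in crawler_video, the same id can be reproduced from a shell, assuming GNU md5sum and a UTF-8 locale:

```commandline
# matches Python's md5(video_title.encode('utf8')).hexdigest()
printf '%s' "某个视频标题" | md5sum | awk '{print $1}'
```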