wangkun, 2 years ago
Commit: c6ab37a24f

Binary file .DS_Store changed


+ 80 - 0
main/schedeling_main.sh

@@ -0,0 +1,80 @@
+#!/bin/bash
+# Kanyikan (看一看+) mini-program: Moments ranking crawler
+# Usage example: sh ./main/schedeling_main.sh ./kanyikan/kanyikan_main/run_kanyikan_moment.py --log_type="moment" --crawler="kanyikan" --strategy="kanyikan_moment" --oss_endpoint="out" --env="dev" --machine="local" ./kanyikan/nohup.log
+
+crawler_dir=$1  # crawler entry script, e.g. ./youtube/youtube_main/run_youtube_follow.py
+log_type=$2     # log name prefix, e.g. follow -> youtube/logs/2023-02-08-follow.log
+crawler=$3      # which crawler, e.g. youtube / kanyikan / weixinzhishu
+strategy=$4     # crawl strategy, e.g. 定向爬虫策略 (targeted) / 小时榜爬虫策略 (hourly chart) / 热榜爬虫策略 (trending chart)
+oss_endpoint=$5 # OSS endpoint; internal: inner / public: out / Hong Kong: hk
+env=$6          # runtime environment; production: prod / test: dev
+machine=$7      # machine the crawler runs on; Aliyun servers: aliyun_hk / aliyun, or macpro / macair / local
+nohup_dir=$8    # nohup log path, e.g. ./youtube/nohup.log
+
+echo "Start"
+
+if [ "${machine}" = "--machine=macpro" ]; then
+  piaoquan_crawler_dir=/Users/lieyunye/Desktop/piaoquan_crawler/
+  profile_path=.bash_profile
+  node_path=/usr/local/bin/node
+  python=python3
+elif [ "${machine}" = "--machine=macair" ]; then
+  piaoquan_crawler_dir=/Users/piaoquan/Desktop/piaoquan_crawler/
+  profile_path=./base_profile
+  node_path=/usr/local/bin/node
+  python=python3
+elif [ "${machine}" = "--machine=aliyun_hk" ]; then
+  piaoquan_crawler_dir=/root/piaoquan_crawler/
+  profile_path=/etc/profile
+  python=python3
+elif [ "${machine}" = "--machine=aliyun" ]; then
+  piaoquan_crawler_dir=/data5/piaoquan_crawler/
+  profile_path=/etc/profile
+  python=python
+elif [ "${machine}" = "--machine=local" ]; then
+  piaoquan_crawler_dir=/Users/wangkun/Desktop/crawler/piaoquan_crawler/
+  profile_path=/etc/profile
+  node_path=/opt/homebrew/bin/node
+  python=python3
+fi
+
+echo "$(date "+%Y-%m-%d %H:%M:%S") Refreshing environment variables..."
+cd ~ && source "${profile_path}"
+echo "$(date "+%Y-%m-%d %H:%M:%S") Environment variables refreshed."
+
+echo "$(date "+%Y-%m-%d %H:%M:%S") Killing the old crawler process..."
+grep_str=run_${crawler##*=}  # e.g. --crawler=youtube -> run_youtube
+ps aux | grep "${grep_str}" | grep Python | grep -v grep | awk '{print $2}' | xargs kill -9
+echo "$(date "+%Y-%m-%d %H:%M:%S") Old process killed."
+
+if [ "${machine}" = "--machine=aliyun_hk" ]; then
+  echo "Upgrading yt-dlp"
+  pip3 install yt-dlp -U
+else
+  echo "$(date "+%Y-%m-%d %H:%M:%S") Updating code..."
+  cd "${piaoquan_crawler_dir}" && git pull origin master --force && rm -f "${piaoquan_crawler_dir}"main/nohup.log && rm -f "${piaoquan_crawler_dir}${nohup_dir}"
+  echo "$(date "+%Y-%m-%d %H:%M:%S") Code update finished."
+fi
+
+if [ "${machine}" = "--machine=aliyun_hk" ]; then
+  echo "No need to restart Appium or adb"
+elif [ "${machine}" = "--machine=aliyun" ]; then
+  echo "No need to restart Appium or adb"
+else
+  echo "$(date "+%Y-%m-%d %H:%M:%S") Restarting Appium..."
+  ps aux | grep Appium.app | grep -v grep | awk '{print $2}' | xargs kill -9
+  nohup "${node_path}" /Applications/Appium.app/Contents/Resources/app/node_modules/appium/build/lib/main.js >>./nohup.log 2>&1 &
+  echo "$(date "+%Y-%m-%d %H:%M:%S") Appium restarted!"
+
+  echo "$(date "+%Y-%m-%d %H:%M:%S") Restarting adb..."
+  adb kill-server
+  adb start-server
+  echo "$(date "+%Y-%m-%d %H:%M:%S") adb restarted!"
+fi
+
+echo "$(date "+%Y-%m-%d %H:%M:%S") Restarting the crawler service..."
+cd "${piaoquan_crawler_dir}"
+nohup ${python} -u "${crawler_dir}" "${log_type}" "${crawler}" "${strategy}" "${oss_endpoint}" "${env}" "${machine}" >>"${nohup_dir}" 2>&1 &
+echo "$(date "+%Y-%m-%d %H:%M:%S") Service restarted!"
+
+exit 0
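
Once the positional arguments are substituted, the final restart line above expands to something like the following (all values illustrative):

    nohup python -u ./xigua/xigua_main/run_xigua_follow.py --log_type="follow" --crawler="xigua" --strategy="定向爬虫策略" --oss_endpoint="inner" --env="prod" --machine="aliyun" >>./xigua/nohup.log 2>&1 &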

+ 33 - 23
scheduling/crawler_scheduling.py

@@ -1,6 +1,7 @@
 # -*- coding: utf-8 -*-
 # @Author: wangkun
 # @Time: 2023/3/2
+import argparse
 import os
 import sys
 import time
@@ -13,7 +14,7 @@ class Scheduling:
     # task list
     task_list = []
 
-    # Read the task table
+    # Read / update the task table
     @classmethod
     def get_task(cls, log_type, crawler, env, machine):
         get_sql = """ select * from crawler_task_1 """
@@ -62,13 +63,26 @@ class Scheduling:
                     "update_time": update_time,
                 }
                 pre_task_list.append(task_dict)
+            if interval_piaoquan > 0:
+                new_next_time = next_time + interval_piaoquan
+                update_sql = f""" UPDATE crawler_task_1 SET next_time={new_next_time} WHERE task_id={task_id} """
+                MysqlHelper.update_values(log_type, crawler, update_sql, env, machine)
 
         return pre_task_list
 
+    # Example of the shell command this scheduler ultimately dispatches:
+    # sh ./main/main.sh ./xigua/xigua_main/run_xigua_follow.py --log_type="follow" --crawler="xigua" --strategy="定向爬虫策略" --oss_endpoint="inner" --env="prod" --machine="aliyun" xigua/nohup.log
 
-    # Assemble tasks
+    # Allocate resources / assemble / dispatch tasks
     @classmethod
-    def update_task(cls, log_type, crawler, env, machine):
+    def main(cls, log_type, crawler, strategy, oss_endpoint, env, machine):
         pre_task_list = cls.get_task(log_type=log_type, crawler=crawler, env=env, machine=machine)
         if len(pre_task_list) == 0:
             Common.logger(log_type, crawler).info("No new tasks\n")
@@ -79,26 +93,22 @@ class Scheduling:
                 next_time = pre_task_list[i]["next_time"]
                 interval_piaoquan = pre_task_list[i]["interval_piaoquan"]
                 spider_rule = pre_task_list[i]["spider_rule"]
-                print(f"task_id:{task_id}")
-                print(f"task_name:{task_name}")
-                print(f"next_time:{next_time}")
-                print(f"interval_piaoquan:{interval_piaoquan}")
-                print(f"spider_rule:{spider_rule}\n")
-
-    # Resource allocation
-    @classmethod
-    def resource_allocation(cls, log_type, crawler, env, machine):
-        'sh ./main/main.sh ./xigua/xigua_main/run_xigua_follow.py --log_type="follow" --crawler="xigua" --strategy="定向爬虫策略" --oss_endpoint="inner" --env="prod" --machine="aliyun" xigua/nohup.log'
-        pass
-
-    # Write to the task queue
-    @classmethod
-    def write_to_queue(cls):
-        pass
 
 
 if __name__ == "__main__":
-    # task_list = Scheduling.get_task("Scheduling", "scheduling", "dev", "local")
-    # print(task_list)
-    Scheduling.update_task("Scheduling", "scheduling", "dev", "local")
-    pass
+    parser = argparse.ArgumentParser()  # build the command-line parser
+    parser.add_argument('--log_type', type=str)  # declare the flags (argparse defaults to str)
+    parser.add_argument('--crawler')
+    parser.add_argument('--strategy')
+    parser.add_argument('--our_uid')  # accepted on the CLI but not passed to Scheduling.main
+    parser.add_argument('--oss_endpoint')
+    parser.add_argument('--env')
+    parser.add_argument('--machine')
+    args = parser.parse_args()  # values are supplied on the command line
+    # print(args)
+    Scheduling.main(log_type=args.log_type,
+                    crawler=args.crawler,
+                    strategy=args.strategy,
+                    oss_endpoint=args.oss_endpoint,
+                    env=args.env,
+                    machine=args.machine)
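
Given the argparse flags above, a development run of the scheduler would look something like this (all values illustrative):

    python3 ./scheduling/crawler_scheduling.py --log_type="scheduling" --crawler="scheduling" --strategy="定向爬虫策略" --oss_endpoint="inner" --env="dev" --machine="local"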

+ 1 - 1
weixinzhishu/weixinzhishu_main/get_weixinzhishu.py

@@ -240,6 +240,6 @@ class Weixinzhishu:
 
 
 if __name__ == "__main__":
-    Weixinzhishu.get_score_test('weixin', 'weixinzhishu', 1 , "必要")
+    Weixinzhishu.get_score_test('weixin', 'weixinzhishu', 1, "根本")
 
     pass

+ 147 - 0
weixinzhishu/weixinzhishu_main/weixinzhishu_inner_long.py

@@ -0,0 +1,147 @@
+# -*- coding: utf-8 -*-
+# @Author: wangkun
+# @Time: 2023/2/28
+import json
+import os
+import sys
+import time
+from datetime import date, timedelta
+import requests
+sys.path.append(os.getcwd())
+from common.common import Common
+from common.feishu import Feishu
+
+
+class Test:
+    # Get the WeChat search_key / openid
+    @classmethod
+    def get_wechat_key(cls, log_type, crawler):
+        """
+        Get the WeChat search_key / openid from the Feishu sheet:
+        https://w42nne6hzg.feishu.cn/sheets/shtcnqhMRUGunIfGnGXMOBYiy4K?sheet=sVL74k
+        :param log_type: log name
+        :param crawler: which crawler; here: weixinzhishu
+        :return: search_key, openid
+        """
+        try:
+            sheet = Feishu.get_values_batch(log_type, crawler, 'sVL74k')
+            if sheet is None:
+                return None
+            # the credentials live in the second row of the sheet
+            search_key = sheet[1][1]
+            openid = sheet[1][2]
+            return search_key, openid
+        except Exception as e:
+            Common.logger(log_type, crawler).error(f"wechat_key:{e}\n")
+
+    # Read the hot-word list from the Feishu sheet
+    @classmethod
+    def get_words(cls, log_type, crawler):
+        try:
+            while True:
+                sheet = Feishu.get_values_batch(log_type, crawler, 'VoqDih')
+                if sheet is None:
+                    Common.logger(log_type, crawler).warning(f"Failed to fetch the hot-word sheet: {sheet}, retrying in 10 seconds")
+                    time.sleep(10)
+                else:
+                    break
+            # flatten the sheet into a list, skipping empty cells
+            word_list = []
+            for x in sheet:
+                for y in x:
+                    if y is not None:
+                        word_list.append(y)
+            return word_list
+        except Exception as e:
+            Common.logger(log_type, crawler).error(f"get_words:{e}\n")
+
+    @classmethod
+    def get_score_test(cls, log_type, crawler):
+
+        start_ymd = (date.today() + timedelta(days=-7)).strftime("%Y%m%d")
+        end_ymd = (date.today() + timedelta(days=0)).strftime("%Y%m%d")
+
+        word_list = cls.get_words(log_type, crawler)
+        for i in range(len(word_list)):
+            Common.logger(log_type, crawler).info(f"Hot word: {word_list[i]}")
+            while True:
+                wechat_key = cls.get_wechat_key(log_type, crawler)
+                if wechat_key is None:
+                    Common.logger(log_type, crawler).info(
+                        f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(int(time.time())))} auth expired; sleeping 10 seconds before fetching a new key")
+                    time.sleep(10)
+                    continue
+
+                search_key = wechat_key[0]
+                openid = wechat_key[-1]
+                url = "https://search.weixin.qq.com/cgi-bin/wxaweb/wxindex"
+                payload = json.dumps({
+                    "openid": openid,
+                    "search_key": search_key,
+                    "cgi_name": "GetDefaultIndex",
+                    "start_ymd": start_ymd,
+                    "end_ymd": end_ymd,
+                    "query": word_list[i]
+                })
+                headers = {
+                    'Host': 'search.weixin.qq.com',
+                    'content-type': 'application/json',
+                    'User-Agent': 'Mozilla/5.0 (iPhone; CPU iPhone OS 13_3_1 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148 MicroMessenger/8.0.32(0x1800202a) NetType/WIFI Language/zh_CN',
+                    'Referer': 'https://servicewechat.com/wxc026e7662ec26a3a/42/page-frame.html'
+                }
+                response = requests.request("POST", url, headers=headers, data=payload)
+                if response.json()['code'] == -10000:
+                    Common.logger(log_type, crawler).info(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(int(time.time())))} response:{response.json()['msg']} sleeping 10 seconds before retrying")
+                    time.sleep(10)
+                    continue
+
+                wechat_score_list = []
+                word_wechat_score_dict = {
+                    "id": i+1,
+                    "word": word_list[i],
+                    "wechatScores": wechat_score_list,
+                }
+                if response.json()['code'] == -10002:
+                    Common.logger(log_type, crawler).info("This word is not indexed yet")
+                    # record it in the Feishu sheet
+                    if word_list[i] in [x for y in Feishu.get_values_batch(log_type, crawler, "zBXOUq") for x in y]:
+                        Common.logger(log_type, crawler).info("This word is already recorded")
+                        break  # `continue` here would re-request the same word forever
+                    Feishu.insert_columns(log_type, crawler, "zBXOUq", "ROWS", 1, 2)
+                    time.sleep(0.5)
+                    Feishu.update_values(log_type, crawler, "zBXOUq", "F2:Z2",
+                                         [[time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(int(time.time()))),
+                                           word_list[i],
+                                           "",
+                                           "该词暂未收录"]])
+                    Common.logger(log_type, crawler).info("Wrote to Feishu\n")
+                elif response.json()['code'] != 0:
+                    Common.logger(log_type, crawler).warning(f"{word_wechat_score_dict}")
+                    continue
+                else:
+                    time_index = response.json()['content']['resp_list'][0]['indexes'][0]['time_indexes']
+                    for x in range(len(time_index)):
+                        Common.logger(log_type, crawler).info(f"Updating {word_list[i]}")
+                        score_time = time_index[x]['time']
+                        score_time_str = f"{str(score_time)[:4]}-{str(score_time)[4:6]}-{str(score_time)[6:]}"
+                        score = time_index[x]['score']
+                        wechat_score_dict = {"score": score, "scoreDate": score_time_str}
+                        wechat_score_list.append(wechat_score_dict)
+                        Common.logger(log_type, crawler).info(f"wechat_score_dict:{wechat_score_dict}")
+                        Feishu.insert_columns(log_type, crawler, "zBXOUq", "ROWS", 1, 2)
+                        time.sleep(0.5)
+                        Feishu.update_values(log_type, crawler, "zBXOUq", "F2:Z2", [[time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(int(time.time()))),
+                                                                               word_list[i],
+                                                                               score_time_str,
+                                                                               score]])
+                        Common.logger(log_type, crawler).info("Wrote to Feishu\n")
+                break
+
+
+if __name__ == "__main__":
+    Test.get_score_test("inner-long", "weixinzhishu")
+    pass
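
The entries in time_indexes carry dates as 8-digit integers, which the string slicing above converts to ISO form. In isolation (hypothetical value):

    score_time = 20230301               # as found in time_indexes[x]['time']
    s = str(score_time)
    print(f"{s[:4]}-{s[4:6]}-{s[6:]}")  # -> 2023-03-01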

+ 147 - 0
weixinzhishu/weixinzhishu_main/weixinzhishu_inner_sort.py

@@ -0,0 +1,147 @@
+# -*- coding: utf-8 -*-
+# @Author: wangkun
+# @Time: 2023/2/28
+import json
+import os
+import sys
+import time
+from datetime import date, timedelta
+import requests
+sys.path.append(os.getcwd())
+from common.common import Common
+from common.feishu import Feishu
+
+
+class Test:
+    # Get the WeChat search_key / openid
+    @classmethod
+    def get_wechat_key(cls, log_type, crawler):
+        """
+        Get the WeChat search_key / openid from the Feishu sheet:
+        https://w42nne6hzg.feishu.cn/sheets/shtcnqhMRUGunIfGnGXMOBYiy4K?sheet=sVL74k
+        :param log_type: log name
+        :param crawler: which crawler; here: weixinzhishu
+        :return: search_key, openid
+        """
+        try:
+            sheet = Feishu.get_values_batch(log_type, crawler, 'sVL74k')
+            if sheet is None:
+                return None
+            # the credentials live in the second row of the sheet
+            search_key = sheet[1][1]
+            openid = sheet[1][2]
+            return search_key, openid
+        except Exception as e:
+            Common.logger(log_type, crawler).error(f"wechat_key:{e}\n")
+
+    # Read the hot-word list from the Feishu sheet
+    @classmethod
+    def get_words(cls, log_type, crawler):
+        try:
+            while True:
+                sheet = Feishu.get_values_batch(log_type, crawler, 'SuhTA6')
+                if sheet is None:
+                    Common.logger(log_type, crawler).warning(f"Failed to fetch the hot-word sheet: {sheet}, retrying in 10 seconds")
+                    time.sleep(10)
+                else:
+                    break
+            # flatten the sheet into a list, skipping empty cells
+            word_list = []
+            for x in sheet:
+                for y in x:
+                    if y is not None:
+                        word_list.append(y)
+            return word_list
+        except Exception as e:
+            Common.logger(log_type, crawler).error(f"get_words:{e}\n")
+
+    @classmethod
+    def get_score_test(cls, log_type, crawler):
+
+        start_ymd = (date.today() + timedelta(days=-7)).strftime("%Y%m%d")
+        end_ymd = (date.today() + timedelta(days=0)).strftime("%Y%m%d")
+
+        word_list = cls.get_words(log_type, crawler)
+        for i in range(len(word_list)):
+            Common.logger(log_type, crawler).info(f"Hot word: {word_list[i]}")
+            while True:
+                wechat_key = cls.get_wechat_key(log_type, crawler)
+                if wechat_key is None:
+                    Common.logger(log_type, crawler).info(
+                        f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(int(time.time())))} auth expired; sleeping 10 seconds before fetching a new key")
+                    time.sleep(10)
+                    continue
+
+                search_key = wechat_key[0]
+                openid = wechat_key[-1]
+                url = "https://search.weixin.qq.com/cgi-bin/wxaweb/wxindex"
+                payload = json.dumps({
+                    "openid": openid,
+                    "search_key": search_key,
+                    "cgi_name": "GetDefaultIndex",
+                    "start_ymd": start_ymd,
+                    "end_ymd": end_ymd,
+                    "query": word_list[i]
+                })
+                headers = {
+                    'Host': 'search.weixin.qq.com',
+                    'content-type': 'application/json',
+                    'User-Agent': 'Mozilla/5.0 (iPhone; CPU iPhone OS 13_3_1 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148 MicroMessenger/8.0.32(0x1800202a) NetType/WIFI Language/zh_CN',
+                    'Referer': 'https://servicewechat.com/wxc026e7662ec26a3a/42/page-frame.html'
+                }
+                response = requests.request("POST", url, headers=headers, data=payload)
+                if response.json()['code'] == -10000:
+                    Common.logger(log_type, crawler).info(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(int(time.time())))} response:{response.json()['msg']} sleeping 10 seconds before retrying")
+                    time.sleep(10)
+                    continue
+
+                wechat_score_list = []
+                word_wechat_score_dict = {
+                    "id": i+1,
+                    "word": word_list[i],
+                    "wechatScores": wechat_score_list,
+                }
+                if response.json()['code'] == -10002:
+                    Common.logger(log_type, crawler).info("This word is not indexed yet")
+                    # record it in the Feishu sheet
+                    if word_list[i] in [x for y in Feishu.get_values_batch(log_type, crawler, "2fP99U") for x in y]:
+                        Common.logger(log_type, crawler).info("This word is already recorded")
+                        break  # `continue` here would re-request the same word forever
+                    Feishu.insert_columns(log_type, crawler, "2fP99U", "ROWS", 1, 2)
+                    time.sleep(0.5)
+                    Feishu.update_values(log_type, crawler, "2fP99U", "F2:Z2",
+                                         [[time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(int(time.time()))),
+                                           word_list[i],
+                                           "",
+                                           "该词暂未收录"]])
+                    Common.logger(log_type, crawler).info("Wrote to Feishu\n")
+                elif response.json()['code'] != 0:
+                    Common.logger(log_type, crawler).warning(f"{word_wechat_score_dict}")
+                    continue
+                else:
+                    time_index = response.json()['content']['resp_list'][0]['indexes'][0]['time_indexes']
+                    for x in range(len(time_index)):
+                        Common.logger(log_type, crawler).info(f"Updating {word_list[i]}")
+                        score_time = time_index[x]['time']
+                        score_time_str = f"{str(score_time)[:4]}-{str(score_time)[4:6]}-{str(score_time)[6:]}"
+                        score = time_index[x]['score']
+                        wechat_score_dict = {"score": score, "scoreDate": score_time_str}
+                        wechat_score_list.append(wechat_score_dict)
+                        Common.logger(log_type, crawler).info(f"wechat_score_dict:{wechat_score_dict}")
+                        Feishu.insert_columns(log_type, crawler, "2fP99U", "ROWS", 1, 2)
+                        time.sleep(0.5)
+                        Feishu.update_values(log_type, crawler, "2fP99U", "F2:Z2", [[time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(int(time.time()))),
+                                                                               word_list[i],
+                                                                               score_time_str,
+                                                                               score]])
+                        Common.logger(log_type, crawler).info("Wrote to Feishu\n")
+                break
+
+
+if __name__ == "__main__":
+    Test.get_score_test("inner-sort", "weixinzhishu")
+    pass

+ 1 - 1
weixinzhishu/weixinzhishu_main/weixinzhishu_test.py → weixinzhishu/weixinzhishu_main/weixinzhishu_out.py

@@ -143,5 +143,5 @@ class Test:
 
 
 if __name__ == "__main__":
-    Test.get_score_test("test", "weixinzhishu")
+    Test.get_score_test("out", "weixinzhishu")
     pass