wangkun committed 7f8b86d7a0 · 2 years ago

BIN
.DS_Store


+ 36 - 2
README.MD

@@ -1,9 +1,43 @@
-# 爬虫平台
+# 爬虫调度系统
 
 ### 启动
 1. cd ./piaoquan_crawler
-2. sh ./main/main.sh ${crawler_dir} ${log_type} ${crawler} ${strategy} ${oss_endpoint} ${env} ${machine} ${nohup_dir} 
+2. sh ./main/scheduling_main.sh ${crawler_dir} ${log_type} ${crawler} ${env} ${machine} >>${nohup_dir} 2>&1 &
+```
+参数说明
+${crawler_dir}:     爬虫执行路径,如: ./youtube/youtube_main/run_youtube_follow.py
+${log_type}:        日志命名格式,如: follow,则在 youtube/logs/目录下,生成 2023-02-08-follow.log
+${crawler}:         哪款爬虫,如: youtube / kanyikan / weixinzhishu
+${env}:             爬虫运行环境,正式环境: prod / 测试环境: dev
+${machine}:         爬虫运行机器,阿里云服务器: aliyun_hk / aliyun / macpro / macair / local
+${nohup_dir}:       nohup日志存储路径,如: ./youtube/nohup.log
+```
+
+#### 运行命令
+```
+阿里云 102 服务器
+sh ./main/scheduling_main.sh scheduling/scheduling_main/run_write_task.py --log_type="scheduling-write" --crawler="scheduling" --env="prod" --machine="aliyun" scheduling/nohup-write.log
+sh ./main/scheduling_main.sh scheduling/scheduling_main/run_scheduling_task.py --log_type="scheduling-task" --crawler="scheduling" --env="prod" --machine="aliyun" scheduling/nohup-task.log
+
+香港服务器
+sh ./main/scheduling_main.sh scheduling/scheduling_main/run_write_task.py --log_type="scheduling-write" --crawler="scheduling" --env="prod" --machine="aliyun_hk" scheduling/nohup-write.log
+sh ./main/scheduling_main.sh scheduling/scheduling_main/run_scheduling_task.py --log_type="scheduling-task" --crawler="scheduling" --env="prod" --machine="aliyun_hk" scheduling/nohup-task.log
 
+线下调试
+sh ./main/scheduling_main.sh scheduling/scheduling_main/run_write_task.py --log_type="scheduling-write" --crawler="scheduling" --env="dev" --machine="local" ./scheduling/nohup-write.log 
+sh ./main/scheduling_main.sh scheduling/scheduling_main/run_scheduling_task.py --log_type="scheduling-task" --crawler="scheduling" --env="dev" --machine="local" ./scheduling/nohup-task.log 
+
+杀进程
+ps aux | grep scheduling
+ps aux | grep run_xigua
+ps aux | grep scheduling | grep -v grep | awk '{print $2}' | xargs kill -9
+```
+
+
+# 爬虫平台
+### 启动
+1. cd ./piaoquan_crawler
+2. sh ./main/main.sh ${crawler_dir} ${log_type} ${crawler} ${strategy} ${oss_endpoint} ${env} ${machine} ${nohup_dir}
 ```
 参数说明
 ${crawler_dir}:     爬虫执行路径,如: ./youtube/youtube_main/run_youtube_follow.py

+ 25 - 12
common/db.py

@@ -7,6 +7,7 @@
 import redis
 import pymysql
 from common.common import Common
+# from common import Common
 
 class MysqlHelper:
     @classmethod
@@ -95,8 +96,8 @@ class RedisHelper:
     def connect_redis(cls, env, machine):
         if machine == 'aliyun_hk':
             redis_pool = redis.ConnectionPool(
-                host='r-bp154bpw97gptefiqk.redis.rds.aliyuncs.com',  # 内网地址
-                # host='r-bp154bpw97gptefiqkpd.redis.rds.aliyuncs.com',  # 外网地址
+                # host='r-bp154bpw97gptefiqk.redis.rds.aliyuncs.com',  # 内网地址
+                host='r-bp154bpw97gptefiqkpd.redis.rds.aliyuncs.com',  # 外网地址
                 port=6379,
                 db=2,
                 password='Qingqu2019'
@@ -104,8 +105,8 @@ class RedisHelper:
             redis_conn = redis.Redis(connection_pool=redis_pool)
         elif env == 'prod':
             redis_pool = redis.ConnectionPool(
-                host='r-bp154bpw97gptefiqk.redis.rds.aliyuncs.com',  # 内网地址
-                # host='r-bp154bpw97gptefiqkpd.redis.rds.aliyuncs.com',  # 外网地址
+                host='r-bp1mb0v08fqi4hjffu.redis.rds.aliyuncs.com',  # 内网地址
+                # host='r-bp1mb0v08fqi4hjffupd.redis.rds.aliyuncs.com',  # 外网地址
                 port=6379,
                 db=2,
                 password='Qingqu2019'
@@ -113,8 +114,8 @@ class RedisHelper:
             redis_conn = redis.Redis(connection_pool=redis_pool)
         else:
             redis_pool = redis.ConnectionPool(
-                host='r-bp154bpw97gptefiqk.redis.rds.aliyuncs.com',  # 内网地址
-                # host='r-bp154bpw97gptefiqkpd.redis.rds.aliyuncs.com',  # 外网地址
+                # host='r-bp154bpw97gptefiqk.redis.rds.aliyuncs.com',  # 内网地址
+                host='r-bp154bpw97gptefiqkpd.redis.rds.aliyuncs.com',  # 外网地址
                 port=6379,
                 db=2,
                 password='Qingqu2019'
@@ -123,14 +124,19 @@ class RedisHelper:
         return redis_conn
 
     @classmethod
-    def redis_push(cls, env, machine, name, data):
+    def redis_push(cls, env, machine, data):
         redis_conn = cls.connect_redis(env, machine)
-        redis_conn.lpush(name, data)
+        # print("开始写入数据")
+        redis_conn.lpush(machine, data)
+        # print("数据写入完成")
 
     @classmethod
-    def redis_pop(cls, env, machine, name):
+    def redis_pop(cls, env, machine):
         redis_conn = cls.connect_redis(env, machine)
-        redis_conn.rpop(name)
+        if redis_conn.llen(machine) == 0:
+            return None
+        else:
+            return redis_conn.rpop(machine)
 
 
 
@@ -140,6 +146,13 @@ if __name__ == "__main__":
     # edit_data = MysqlHelper.edit_data(sql=sql_statement)
     # print(edit_data)
 
-    get_data = MysqlHelper.get_values("demo", "youtube", "select * from crawler_user", "dev", "local")
-    print(get_data)
+    # get_data = MysqlHelper.get_values("demo", "youtube", "select * from crawler_user", "dev", "local")
+    # print(get_data)
+    print(RedisHelper.connect_redis("prod", "aliyun"))
+    # RedisHelper.redis_push("dev", "local", "test1")
+    # RedisHelper.redis_push("dev", "local", "test2")
+
+    # print(RedisHelper.redis_pop("dev", "local"))
+
+    pass
 

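The reworked `RedisHelper` in `common/db.py` drops the explicit `name` argument and keys the Redis list on `machine`, so push/pop behave as a simple FIFO queue: `lpush` on write, `rpop` on read, `None` when the list is empty. A minimal usage sketch, assuming the repo root is on `sys.path` and the dev/local Redis instance is reachable; the task payload is illustrative only:

```python
# Sketch of the queue semantics implied by the new redis_push / redis_pop
# signatures (list key = machine, lpush on write, rpop on read, None when empty).
import os
import sys

sys.path.append(os.getcwd())
from common.db import RedisHelper

env, machine = "dev", "local"

# Producer side: each task is pushed onto the head of the Redis list named after `machine`.
RedisHelper.redis_push(env, machine, str({"task_id": 1, "spider_name": "run_xigua_follow"}))

# Consumer side: items come back oldest-first (rpop); None signals an empty queue.
item = RedisHelper.redis_pop(env, machine)
if item is None:
    print("queue empty")
else:
    print(eval(item.decode("utf8")))  # rpop returns bytes; the scheduler eval()s it back into a dict
```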
+ 158 - 0
common/scheduling_db.py

@@ -0,0 +1,158 @@
+# -*- coding: utf-8 -*-
+# @Author: wangkun
+# @Time: 2023/2/2
+"""
+数据库连接及操作
+"""
+import redis
+import pymysql
+from common.common import Common
+# from common import Common
+
+class MysqlHelper:
+    @classmethod
+    def connect_mysql(cls, env, machine):
+        if machine == 'aliyun_hk':
+            # 创建一个 Connection 对象,代表了一个数据库连接
+            connection = pymysql.connect(
+                host="rm-j6cz4c6pt96000xi3.mysql.rds.aliyuncs.com",# 数据库IP地址,内网地址
+                # host="rm-j6cz4c6pt96000xi3lo.mysql.rds.aliyuncs.com",# 数据库IP地址,外网地址
+                port=3306,                      # 端口号
+                user="crawler",                 #  mysql用户名
+                passwd="crawler123456@",        # mysql用户登录密码
+                db="piaoquan-crawler" ,         # 数据库名
+                # 如果数据库里面的文本是utf8编码的,charset指定是utf8
+                charset = "utf8")
+        elif env == 'prod':
+            # 创建一个 Connection 对象,代表了一个数据库连接
+            connection = pymysql.connect(
+                host="rm-bp1159bu17li9hi94.mysql.rds.aliyuncs.com",# 数据库IP地址,内网地址
+                # host="rm-bp1159bu17li9hi94ro.mysql.rds.aliyuncs.com",# 数据库IP地址,外网地址
+                port=3306,                      # 端口号
+                user="crawler",                 #  mysql用户名
+                passwd="crawler123456@",        # mysql用户登录密码
+                db="piaoquan-crawler" ,         # 数据库名
+                # 如果数据库里面的文本是utf8编码的,charset指定是utf8
+                charset = "utf8")
+        else:
+            # 创建一个 Connection 对象,代表了一个数据库连接
+            connection = pymysql.connect(
+                host="rm-bp1k5853td1r25g3n690.mysql.rds.aliyuncs.com",# 数据库IP地址,内网地址
+                # host="rm-bp1k5853td1r25g3ndo.mysql.rds.aliyuncs.com",  # 数据库IP地址,外网地址
+                port=3306,  # 端口号
+                user="crawler",  # mysql用户名
+                passwd="crawler123456@",  # mysql用户登录密码
+                db="piaoquan-crawler",  # 数据库名
+                # 如果数据库里面的文本是utf8编码的,charset指定是utf8
+                charset="utf8")
+
+        return connection
+
+    @classmethod
+    def get_values(cls, log_type, crawler, sql, env, machine):
+        try:
+            # 连接数据库
+            connect = cls.connect_mysql(env, machine)
+            # 返回一个 Cursor对象
+            mysql = connect.cursor(cursor=pymysql.cursors.DictCursor)
+
+            # 执行 sql 语句
+            mysql.execute(sql)
+
+            # fetchall方法返回的是一个元组,里面每个元素也是元组,代表一行记录
+            data = mysql.fetchall()
+
+            # 关闭数据库连接
+            connect.close()
+
+            # 返回查询结果,元组
+            return data
+        except Exception as e:
+            Common.logger(log_type, crawler).error(f"get_values异常:{e}\n")
+
+    @classmethod
+    def update_values(cls, log_type, crawler, sql, env, machine):
+        # 连接数据库
+        connect = cls.connect_mysql(env, machine)
+        # 返回一个 Cursor对象
+        mysql = connect.cursor()
+
+        try:
+            # 执行 sql 语句
+            res = mysql.execute(sql)
+            # 注意 一定要commit,否则添加数据不生效
+            connect.commit()
+            return res
+        except Exception as e:
+            Common.logger(log_type, crawler).error(f"update_values异常,进行回滚操作:{e}\n")
+            # 发生错误时回滚
+            connect.rollback()
+
+        # 关闭数据库连接
+        connect.close()
+
+class RedisHelper:
+    @classmethod
+    def connect_redis(cls, env, machine):
+        if machine == 'aliyun_hk':
+            redis_pool = redis.ConnectionPool(
+                # host='r-bp154bpw97gptefiqk.redis.rds.aliyuncs.com',  # 内网地址
+                host='r-bp154bpw97gptefiqkpd.redis.rds.aliyuncs.com',  # 外网地址
+                port=6379,
+                db=2,
+                password='Qingqu2019'
+            )
+            redis_conn = redis.Redis(connection_pool=redis_pool)
+        elif env == 'prod':
+            redis_pool = redis.ConnectionPool(
+                host='r-bp1mb0v08fqi4hjffu.redis.rds.aliyuncs.com',  # 内网地址
+                # host='r-bp1mb0v08fqi4hjffupd.redis.rds.aliyuncs.com',  # 外网地址
+                port=6379,
+                db=2,
+                password='Qingqu2019'
+            )
+            redis_conn = redis.Redis(connection_pool=redis_pool)
+        else:
+            redis_pool = redis.ConnectionPool(
+                # host='r-bp154bpw97gptefiqk.redis.rds.aliyuncs.com',  # 内网地址
+                host='r-bp154bpw97gptefiqkpd.redis.rds.aliyuncs.com',  # 外网地址
+                port=6379,
+                db=2,
+                password='Qingqu2019'
+            )
+            redis_conn = redis.Redis(connection_pool=redis_pool)
+        return redis_conn
+
+    @classmethod
+    def redis_push(cls, env, machine, data):
+        redis_conn = cls.connect_redis(env, machine)
+        # print("开始写入数据")
+        redis_conn.lpush(machine, data)
+        # print("数据写入完成")
+
+    @classmethod
+    def redis_pop(cls, env, machine):
+        redis_conn = cls.connect_redis(env, machine)
+        if redis_conn.llen(machine) == 0:
+            return None
+        else:
+            return redis_conn.rpop(machine)
+
+
+
+if __name__ == "__main__":
+    # sql_statement = f"INSERT INTO crawler_user ( user_id, out_user_id, out_user_name, out_avatar_url, platform, tag) " \
+    #       f"VALUES ('6282398', 'out_uid_003', 'out_user_name', '', 'xiaoniangao', 'xiaoniangao_play')"
+    # edit_data = MysqlHelper.edit_data(sql=sql_statement)
+    # print(edit_data)
+
+    # get_data = MysqlHelper.get_values("demo", "youtube", "select * from crawler_user", "dev", "local")
+    # print(get_data)
+    print(RedisHelper.connect_redis("prod", "aliyun"))
+    # RedisHelper.redis_push("dev", "local", "test1")
+    # RedisHelper.redis_push("dev", "local", "test2")
+
+    # print(RedisHelper.redis_pop("dev", "local"))
+
+    pass
+

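Unlike `common/db.py`, `common/scheduling_db.py` creates its cursor with `pymysql.cursors.DictCursor`, so `get_values` returns each row as a dict keyed by column name; that is what lets the new scheduler read `task["next_time"]` directly instead of unpacking tuples by index. A small sketch, assuming the repo root is on `sys.path`, the dev database is reachable, and `crawler_task` has the columns referenced in the scheduler:

```python
# Sketch: rows from scheduling_db come back as dicts (DictCursor), so fields are
# read by column name rather than by positional index as in the old common/db.py.
import os
import sys
import time

sys.path.append(os.getcwd())
from common.scheduling_db import MysqlHelper

rows = MysqlHelper.get_values(
    log_type="scheduling",
    crawler="scheduling",
    sql="select task_id, task_name, next_time from crawler_task",
    env="dev",
    machine="local",
)
for row in rows:
    # Each row is a dict, e.g. {"task_id": 19, "task_name": "...", "next_time": 1678174642}
    if int(time.time()) >= int(row["next_time"]):
        print(f'task {row["task_id"]} ({row["task_name"]}) is due')
```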
+ 66 - 0
main/scheduling_main.sh

@@ -0,0 +1,66 @@
+#!/bin/bash
+
+crawler_dir=$1  # 爬虫执行路径,如: ./youtube/youtube_main/run_youtube_follow.py
+log_type=$2     # 日志命名格式,如: follow,则在 youtube/logs/目录下,生成 2023-02-08-follow.log
+crawler=$3      # 哪款爬虫,如: youtube / kanyikan / weixinzhishu
+env=$4          # 爬虫运行环境,正式环境: prod / 测试环境: dev
+machine=$5      # 爬虫运行机器,阿里云服务器: aliyun_hk / aliyun / macpro / macair / local
+nohup_dir=$6    # nohup日志存储路径,如: ./youtube/nohup.log
+
+echo "开始"
+echo ${machine}
+if [ ${machine} = "--machine=aliyun_hk" ];then
+  piaoquan_crawler_dir=/root/piaoquan_crawler/
+  profile_path=/etc/profile
+  python=python3
+elif [ ${machine} = "--machine=aliyun" ];then
+  piaoquan_crawler_dir=/data5/piaoquan_crawler/
+  profile_path=/etc/profile
+  python=python
+elif [ ${machine} = "--machine=local" ];then
+  piaoquan_crawler_dir=/Users/wangkun/Desktop/crawler/piaoquan_crawler/
+  profile_path=/etc/profile
+  node_path=/opt/homebrew/bin/node
+  python=python3
+fi
+
+echo "$(date "+%Y-%m-%d %H:%M:%S") 更新环境变量..."
+cd ~ && source ${profile_path}
+echo "$(date "+%Y-%m-%d %H:%M:%S") 更新环境变量完成!"
+
+echo "$(date "+%Y-%m-%d %H:%M:%S") 正在杀进程..."
+grep_str=run_${crawler##*=}
+ps aux | grep ${grep_str} | grep Python | grep -v grep | awk '{print $2}' | xargs kill -9
+echo "$(date "+%Y-%m-%d %H:%M:%S") 进程已杀死!"
+
+if [ ${machine} = "--machine=aliyun_hk" ];then
+  echo "升级yt-dlp"
+  pip3 install yt-dlp -U
+else
+  echo "$(date "+%Y-%m-%d %H:%M:%S") 正在更新代码..."
+  cd ${piaoquan_crawler_dir} && git pull origin master --force && rm -f ${piaoquan_crawler_dir}main/nohup.log && rm -f ${piaoquan_crawler_dir}${nohup_dir}
+  echo "$(date "+%Y-%m-%d %H:%M:%S") 代码更新完成!"
+fi
+
+if [ ${machine} = "--machine=aliyun_hk" ];then
+  echo "无需重启Appium及adb服务"
+elif [ ${machine} = "--machine=aliyun" ];then
+  echo "无需重启Appium及adb服务"
+else
+  echo "$(date "+%Y-%m-%d %H:%M:%S") 正在重启Appium..."
+  ps aux | grep Appium.app | grep -v grep | awk '{print $2}' | xargs kill -9
+  nohup ${node_path} /Applications/Appium.app/Contents/Resources/app/node_modules/appium/build/lib/main.js >>./nohup.log 2>&1 &
+  echo "$(date "+%Y-%m-%d %H:%M:%S") 重启Appium完毕!"
+
+  echo "$(date "+%Y-%m-%d %H:%M:%S") 正在重启adb..."
+  adb kill-server
+  adb start-server
+  echo "$(date "+%Y-%m-%d %H:%M:%S") 重启adb完毕!"
+fi
+
+echo "$(date "+%Y-%m-%d %H:%M:%S") 正在重启服务..."
+cd ${piaoquan_crawler_dir}
+nohup ${python} -u ${crawler_dir} ${log_type} ${crawler} ${env} ${machine} >>${nohup_dir} 2>&1 &
+echo "$(date "+%Y-%m-%d %H:%M:%S") 服务重启完毕!"
+
+exit 0

BIN
scheduling/.DS_Store


+ 0 - 119
scheduling/crawler_scheduling.py

@@ -1,119 +0,0 @@
-# -*- coding: utf-8 -*-
-# @Author: wangkun
-# @Time: 2023/3/2
-import argparse
-import os
-import sys
-import time
-sys.path.append(os.getcwd())
-from common.common import Common
-from common.db import MysqlHelper, RedisHelper
-
-
-class Scheduling:
-    # 任务列表
-    task_list = []
-
-    # 读取任务表
-    @classmethod
-    def get_task(cls, log_type, crawler, env, machine):
-        get_sql = """ select * from crawler_task_1 """
-        all_task_list = MysqlHelper.get_values(log_type=log_type, crawler=crawler, sql=get_sql, env=env, machine=machine)
-        pre_task_list = []
-        for task in all_task_list:
-            task_id = task[0]
-            task_name = task[1]
-            source = task[2]
-            next_time = task[3]
-            interval_piaoquan = task[4]
-            spider_rule = task[5]
-            task_type = task[6]
-            spider_link = task[7]
-            spider_name = task[8]
-            min_publish_time = task[9]
-            min_publish_day = task[10]
-            media_id = task[11]
-            applets_status = task[12]
-            app_status = task[13]
-            user_tag = task[14]
-            user_content_tag = task[15]
-            machine = task[16]
-            insert_time = task[17]
-            update_time = task[18]
-            if int(time.time()) >= next_time:
-                task_dict = {
-                    "task_id": task_id,
-                    "task_name": task_name,
-                    "source": source,
-                    "next_time": next_time,
-                    "interval_piaoquan": interval_piaoquan,
-                    "spider_rule": spider_rule,
-                    "task_type": task_type,
-                    "spider_link": spider_link,
-                    "spider_name": spider_name,
-                    "min_publish_time": min_publish_time,
-                    "min_publish_day": min_publish_day,
-                    "media_id": media_id,
-                    "applets_status": applets_status,
-                    "app_status": app_status,
-                    "user_tag": user_tag,
-                    "user_content_tag": user_content_tag,
-                    "machine": machine,
-                    "insert_time": insert_time,
-                    "update_time": update_time,
-                }
-                pre_task_list.append(task_dict)
-        return pre_task_list
-
-    # 更新下次启动时间,调用时机:调度该 task_id 的任务时
-    @classmethod
-    def update_task(cls, log_type, crawler, task_id, next_time, interval_piaoquan, env, machine):
-        if interval_piaoquan > 0:
-            new_next_time = next_time + interval_piaoquan
-            update_sql = f""" UPDATE crawler_task_1 SET next_time={new_next_time} WHERE task_id={task_id} """
-            MysqlHelper.update_values(log_type, crawler, update_sql, env, machine)
-
-
-    'sh ./main/main.sh' \
-    ' ./xigua/xigua_main/run_xigua_follow.py' \
-    ' --log_type="follow"' \
-    ' --crawler="xigua"' \
-    ' --strategy="定向爬虫策略"' \
-    ' --oss_endpoint="inner"' \
-    ' --env="prod"' \
-    ' --machine="aliyun"' \
-    ' xigua/nohup.log'
-
-    # 资源分配 / 组装
-    @classmethod
-    def write_redis(cls, log_type, crawler, env, machine):
-        pre_task_list = cls.get_task(log_type=log_type, crawler=crawler, env=env, machine=machine)
-        if len(pre_task_list) == 0:
-            Common.logger(log_type, crawler).info("暂无新任务\n")
-        else:
-            for pre_task in pre_task_list:
-                print(type(pre_task))
-                print(pre_task)
-                if machine == "hk":
-                    # 写入 redis
-                    pass
-                elif machine == "aliyun":
-                    # 写入 redis
-                    pass
-                else:
-                    # 写入 redis
-                    RedisHelper.redis_push(env, machine,pre_task['task_id'], str(pre_task))
-
-    @classmethod
-    def main(cls, log_type, crawler):
-        # 当前时间 >= next_time,更新 next_time(调用update_task),然后启动该爬虫
-        pre_task_list = cls.get_task(log_type=log_type, crawler=crawler, env=env, machine=machine)
-        if len(pre_task_list) == 0:
-            Common.logger(log_type, crawler).info("暂无新任务\n")
-        else:
-            for pre_task in pre_task_list:
-                task_list = RedisHelper.redis_pop()
-
-
-if __name__ == "__main__":
-    Scheduling.write_redis("scheduling", "scheduling", "dev", "local")

BIN
scheduling/scheduling_main/.DS_Store


+ 3 - 0
scheduling/scheduling_main/__init__.py

@@ -0,0 +1,3 @@
+# -*- coding: utf-8 -*-
+# @Author: wangkun
+# @Time: 2023/3/8

+ 95 - 0
scheduling/scheduling_main/crawler_scheduling.py

@@ -0,0 +1,95 @@
+# -*- coding: utf-8 -*-
+# @Author: wangkun
+# @Time: 2023/3/2
+import os
+import sys
+import time
+sys.path.append(os.getcwd())
+from common.common import Common
+from common.scheduling_db  import MysqlHelper, RedisHelper
+
+
+class Scheduling:
+    # 读取任务表
+    @classmethod
+    def get_task(cls, log_type, crawler, env, machine):
+        get_sql = """ select * from crawler_task """
+        all_task_list = MysqlHelper.get_values(log_type=log_type, crawler=crawler, sql=get_sql, env=env, machine=machine)
+        pre_task_list = []
+        for task in all_task_list:
+            if int(time.time()) >= task["next_time"]:
+                pre_task_list.append(task)
+        return pre_task_list
+
+    # 更新下次启动时间,调用时机:调度该 task_id 的任务时
+    @classmethod
+    def update_task(cls, log_type, crawler, task_id, next_time, interval_piaoquan, env, machine):
+        if interval_piaoquan > 0:
+            new_next_time = next_time + interval_piaoquan
+            update_sql = f""" UPDATE crawler_task SET next_time={new_next_time} WHERE task_id={task_id} """
+            MysqlHelper.update_values(log_type, crawler, update_sql, env, machine)
+
+    # 资源分配 / 组装
+    @classmethod
+    def write_redis(cls, log_type, crawler, env, machine):
+        pre_task_list = cls.get_task(log_type=log_type, crawler=crawler, env=env, machine=machine)
+        if len(pre_task_list) == 0:
+            Common.logger(log_type, crawler).info("暂无新任务\n")
+        else:
+            for pre_task in pre_task_list:
+                if machine == "hk":
+                    # 写入 redis
+                    pass
+                elif machine == "aliyun":
+                    # 写入 redis
+                    pass
+                else:
+                    # 写入 redis
+                    RedisHelper.redis_push(env, machine,str(pre_task))
+
+    @classmethod
+    def get_redis(cls, log_type, crawler, env, machine):
+        redis_data = RedisHelper.redis_pop(env, machine)
+        if redis_data is None:
+            Common.logger(log_type, crawler).info("Redis为空,等待1秒")
+            time.sleep(1)
+        else:
+            task = eval(str(redis_data, encoding="utf8"))
+            return task
+
+    @classmethod
+    def scheduling_task(cls, log_type, crawler, env, machine):
+        task = cls.get_redis(log_type, crawler, env, machine)
+        Common.logger(log_type, crawler).info(f"已获取调度任务:{task}")
+        task_id = task['task_id']
+        source = task['source']
+        next_time = task['next_time']
+        interval_piaoquan = task['interval_piaoquan']
+        spider_name = task['spider_name']
+        if machine == "aliyun":
+            oss_endpoint = "inner"
+        elif machine == "aliyun_hk":
+            oss_endpoint = "hk"
+        else:
+            oss_endpoint = "out"
+
+        if int(time.time()) >= next_time:
+            cls.update_task(log_type, crawler, task_id, next_time, interval_piaoquan, env, machine)
+            # 正式环境,调度任务
+            Common.logger(log_type, crawler).info(f"开始调度任务:{task}\n")
+            task_str = [('task_id', str(task_id)), ('task_name', str(task['task_name'])), ('source', str(task['source'])), ('next_time', str(task['next_time'])), ('interval_piaoquan', str(task['interval_piaoquan'])), ('play_cnt', eval(task['spider_rule'])['play_cnt']),('video_width', eval(task['spider_rule'])['video_width']),('video_height', eval(task['spider_rule'])['video_height']),('video_like', eval(task['spider_rule'])['video_like']),('share_cnt', eval(task['spider_rule'])['share_cnt']),('duration_min', eval(task['spider_rule'])['duration']['min']),('duration_max', eval(task['spider_rule'])['duration']['max']),('task_type', task['task_type']),('spider_link', eval(task['spider_link'])),('spider_name', str(task['spider_name'])),('min_publish_time', str(task['min_publish_time'])),('min_publish_day', str(task['min_publish_day'])),('media_id', str(task['media_id'])),('applets_status', str(task['applets_status'])),('app_status', str(task['app_status'])),('user_tag', str(task['user_tag'])),('user_content_tag',str(task['user_content_tag'])),('machine', str(task['machine']))]
+            task_str = str(task_str).replace(' ', '')
+            cmd = f"""sh scheduling/scheduling_main/scheduling.sh {source}/{source}_main/{spider_name}_scheduling.py --log_type="{spider_name}" --crawler="{source}" --task="{str(task_str)}" --oss_endpoint="{oss_endpoint}" --env="{env}" --machine="{machine}" {source}/{source}-nohup.log """
+            Common.logger(log_type, crawler).info(f"cmd:{cmd}\n")
+            os.system(cmd)
+
+
+
+
+if __name__ == "__main__":
+    # print(Scheduling.get_task("scheduling", "scheduling", "dev", "local"))
+    # print(Scheduling.get_redis("scheduling", "scheduling", "dev", "local"))
+    # Scheduling.write_redis("scheduling", "scheduling", "dev", "local")
+    Scheduling.scheduling_task("scheduling", "scheduling", "dev", "local")
+
+    pass

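`scheduling_task` flattens the task dict into a list of `(key, value)` tuples, `str()`s it and strips spaces before passing it as `--task`; the spider runner rebuilds the dict with `dict(eval(task))` (see `run_xigua_follow_scheduling.py` and `xigua/xigua_follow/demo.py`). A self-contained round-trip sketch with illustrative values:

```python
# Round-trip sketch of the --task payload: list of (key, value) tuples, stringified
# with spaces removed on the scheduler side, rebuilt into a dict on the spider side.
task_str = [
    ("task_id", "19"),
    ("source", "xigua"),
    ("next_time", "1678174642"),
    ("spider_name", "run_xigua_follow"),
    ("play_cnt", {"min": 1000}),
    ("duration_min", 60),
    ("duration_max", 6000),
]
payload = str(task_str).replace(" ", "")  # what gets passed as --task="..."

# On the spider side (see run_xigua_follow_scheduling.py):
task = dict(eval(payload))
print(task["source"], task["play_cnt"]["min"])  # xigua 1000
```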
+ 35 - 0
scheduling/scheduling_main/run_scheduling_task.py

@@ -0,0 +1,35 @@
+# -*- coding: utf-8 -*-
+# @Author: wangkun
+# @Time: 2023/3/7
+import argparse
+import os
+import sys
+import time
+sys.path.append(os.getcwd())
+from common.common import Common
+from scheduling.scheduling_main.crawler_scheduling import Scheduling
+
+
+class SchedulingTask:
+    @classmethod
+    def scheduling_task(cls, log_type, crawler, env, machine):
+        while True:
+            Common.logger(log_type, crawler).info("开始调度爬虫任务")
+            Scheduling.scheduling_task(log_type, crawler, env, machine)
+            Common.logger(log_type, crawler).info("爬虫任务调度完成")
+            Common.del_logs(log_type, crawler)
+            time.sleep(60)
+
+
+if __name__ == "__main__":
+    parser = argparse.ArgumentParser()  ## 新建参数解释器对象
+    parser.add_argument('--log_type', type=str)  ## 添加参数,注明参数类型
+    parser.add_argument('--crawler')  ## 添加参数
+    parser.add_argument('--env')  ## 添加参数
+    parser.add_argument('--machine')  ## 添加参数
+    args = parser.parse_args()  ### 参数赋值,也可以通过终端赋值
+    # print(args)
+    SchedulingTask.scheduling_task(log_type=args.log_type,
+                                   crawler=args.crawler,
+                                   env=args.env,
+                                   machine=args.machine)

+ 35 - 0
scheduling/scheduling_main/run_write_task.py

@@ -0,0 +1,35 @@
+# -*- coding: utf-8 -*-
+# @Author: wangkun
+# @Time: 2023/3/7
+import argparse
+import os
+import sys
+import time
+sys.path.append(os.getcwd())
+from common.common import Common
+from scheduling.scheduling_main.crawler_scheduling import Scheduling
+
+
+class WriteTask:
+    @classmethod
+    def write_task(cls, log_type, crawler, env, machine):
+        while True:
+            Common.logger(log_type, crawler).info("开始读取爬虫任务,写入Redis")
+            Scheduling.write_redis(log_type=log_type, crawler=crawler, env=env, machine=machine)
+            Common.logger(log_type, crawler).info("写入Redis完成")
+            Common.del_logs(log_type, crawler)
+            time.sleep(60)
+
+
+if __name__ == "__main__":
+    parser = argparse.ArgumentParser()  ## 新建参数解释器对象
+    parser.add_argument('--log_type', type=str)  ## 添加参数,注明参数类型
+    parser.add_argument('--crawler')  ## 添加参数
+    parser.add_argument('--env')  ## 添加参数
+    parser.add_argument('--machine')  ## 添加参数
+    args = parser.parse_args()  ### 参数赋值,也可以通过终端赋值
+    # print(args)
+    WriteTask.write_task(log_type=args.log_type,
+                         crawler=args.crawler,
+                         env=args.env,
+                         machine=args.machine)

+ 10 - 19
main/schedeling_main.sh → scheduling/scheduling_main/scheduling.sh

@@ -1,29 +1,18 @@
 #!/bin/bash
-# 看一看+小程序 朋友圈榜单
-# sh ./weixinzhishu_main/weixinzhishu_main.sh ./kanyikan/kanyikan_main/run_kanyikan_moment.py --log_type="moment" --crawler="kanyikan" --strategy="kanyikan_moment" --our_uid="kanyikan_moment" --oss_endpoint="out" --env="dev" ./kanyikan/nohup.log local
 
 crawler_dir=$1  # 爬虫执行路径,如: ./youtube/youtube_main/run_youtube_follow.py
 log_type=$2     # 日志命名格式,如: follow,则在 youtube/logs/目录下,生成 2023-02-08-follow.log
 crawler=$3      # 哪款爬虫,如: youtube / kanyikan / weixinzhishu
-strategy=$4     # 爬虫策略,如: 定向爬虫策略 / 小时榜爬虫策略 / 热榜爬虫策略
+task=$4         # 爬虫任务
 oss_endpoint=$5 # OSS网关,内网: inner / 外网: out / 香港: hk
 env=$6          # 爬虫运行环境,正式环境: prod / 测试环境: dev
 machine=$7      # 爬虫运行机器,阿里云服务器: aliyun_hk / aliyun / macpro / macair / local
 nohup_dir=$8    # nohup日志存储路径,如: ./youtube/nohup.log
 
 echo "开始"
+#echo "machine:"${machine}
 
-if [ ${machine} = "--machine=macpro" ];then
-  piaoquan_crawler_dir=/Users/lieyunye/Desktop/piaoquan_crawler/
-  profile_path=.bash_profile
-  node_path=/usr/local/bin/node
-  python=python3
-elif [ ${machine} = "--machine=macair" ];then
-  piaoquan_crawler_dir=/Users/piaoquan/Desktop/piaoquan_crawler/
-  profile_path=./base_profile
-  node_path=/usr/local/bin/node
-  python=python3
-elif [ ${machine} = "--machine=aliyun_hk" ];then
+if [ ${machine} = "--machine=aliyun_hk" ];then
   piaoquan_crawler_dir=/root/piaoquan_crawler/
   profile_path=/etc/profile
   python=python3
@@ -36,16 +25,18 @@ elif [ ${machine} = "--machine=local" ];then
   profile_path=/etc/profile
   node_path=/opt/homebrew/bin/node
   python=python3
+else
+  echo ${machine}
 fi
 
 echo "$(date "+%Y-%m-%d %H:%M:%S") 更新环境变量..."
 cd ~ && source ${profile_path}
 echo "$(date "+%Y-%m-%d %H:%M:%S") 更新环境变量完成!"
 
-echo "$(date "+%Y-%m-%d %H:%M:%S") 正在杀进程..."
-grep_str=run_${crawler##*=}
-ps aux | grep ${grep_str} | grep Python | grep -v grep | awk '{print $2}' | xargs kill -9
-echo "$(date "+%Y-%m-%d %H:%M:%S") 进程已杀死!"
+#echo "$(date "+%Y-%m-%d %H:%M:%S") 正在杀进程..."
+#grep_str=run_${crawler##*=}
+#ps aux | grep ${grep_str} | grep Python | grep -v grep | awk '{print $2}' | xargs kill -9
+#echo "$(date "+%Y-%m-%d %H:%M:%S") 进程已杀死!"
 
 if [ ${machine} = "--machine=aliyun_hk" ];then
   echo "升级yt-dlp"
@@ -74,7 +65,7 @@ fi
 
 echo "$(date "+%Y-%m-%d %H:%M:%S") 正在重启服务..."
 cd ${piaoquan_crawler_dir}
-nohup ${python} -u ${crawler_dir} ${log_type} ${crawler} ${strategy} ${oss_endpoint} ${env} ${machine} >>${nohup_dir} 2>&1 &
+nohup ${python} -u ${crawler_dir} ${log_type} ${crawler} ${task} ${oss_endpoint} ${env} ${machine} >>${nohup_dir} 2>&1 &
 echo "$(date "+%Y-%m-%d %H:%M:%S") 服务重启完毕!"
 
 exit 0

BIN
xigua/.DS_Store


+ 38 - 0
xigua/xigua_follow/demo.py

@@ -0,0 +1,38 @@
+# -*- coding: utf-8 -*-
+# @Author: wangkun
+# @Time: 2023/3/7
+import time
+from datetime import date, timedelta
+
+
+class Demo:
+    @classmethod
+    def test_time(cls):
+        min_publish_day = 10
+        min_publish_day = (date.today() + timedelta(days=-min_publish_day)).strftime("%Y-%m-%d")
+        min_publish_day = int(time.mktime(time.strptime(min_publish_day, "%Y-%m-%d")))
+        print(min_publish_day)
+
+
+    @classmethod
+    def test_str(cls):
+        list1 = [('task_id','19'),('task_name','西瓜定向抓取'),('source','xigua'),('next_time','1678174642'),('interval_piaoquan','600'),('play_cnt',{'min':1000}),('video_width',{'min':720}),('video_height',{'min':720}),('video_like',{'min':0}),('share_cnt',{'min':0}),('duration_min',60),('duration_max',6000),('task_type','author'),('spider_link',['https://www.ixigua.com/home/95420624045','https://www.ixigua.com/home/6431477489']),('spider_name','run_xigua_follow'),('min_publish_time','0'),('min_publish_day','10'),('media_id','6267141'),('applets_status','0'),('app_status','3'),('user_tag','西瓜爬虫,定向爬虫策略'),('user_content_tag','搞笑博主'),('local','aliyun')]
+        dict1 = dict(list1)
+        print(type(dict1))
+        print(dict1)
+        for k, v in dict1.items():
+            print(f"{k}:{v},{type(v)}")
+
+
+
+
+        # for tuple1 in str1:
+        #     list1 = list(tuple1)
+        #     dict1 = dict(zip(list1))
+
+
+if __name__ == "__main__":
+    # Demo.test_time()
+    # Demo.test_str()
+    print(int('0'))
+    pass

+ 125 - 236
xigua/xigua_follow/xigua_follow_scheduling.py

@@ -9,63 +9,92 @@ import shutil
 import string
 import sys
 import time
+from datetime import date, timedelta
 import requests
 import urllib3
 from requests.adapters import HTTPAdapter
-
-from selenium.webdriver import DesiredCapabilities
-from selenium.webdriver.chrome.service import Service
-from selenium.webdriver.common.by import By
-from selenium import webdriver
-from lxml import etree
-
 sys.path.append(os.getcwd())
-from common.db import MysqlHelper
-from common.users import Users
+from common.scheduling_db import  MysqlHelper
 from common.common import Common
 from common.feishu import Feishu
 from common.publish import Publish
 
 
-class Follow:
+class SchedulingFollow:
     # 个人主页视频翻页参数
     offset = 0
-
     platform = "西瓜视频"
-    tag = "西瓜视频爬虫,定向爬虫策略"
 
     @classmethod
-    def get_rule(cls, log_type, crawler):
-        try:
-            while True:
-                rule_sheet = Feishu.get_values_batch(log_type, crawler, "4kxd31")
-                if rule_sheet is None:
-                    Common.logger(log_type, crawler).warning("rule_sheet is None! 10秒后重新获取")
-                    time.sleep(10)
-                    continue
-                rule_dict = {
-                    "play_cnt": int(rule_sheet[1][2]),
-                    "comment_cnt": int(rule_sheet[2][2]),
-                    "like_cnt": int(rule_sheet[3][2]),
-                    "duration": int(rule_sheet[4][2]),
-                    "publish_time": int(rule_sheet[5][2]),
-                    "video_width": int(rule_sheet[6][2]),
-                    "video_height": int(rule_sheet[7][2]),
-                }
-                return rule_dict
-        except Exception as e:
-            Common.logger(log_type, crawler).error(f"get_rule:{e}\n")
+    def get_users(cls, log_type, crawler, task, env, machine):
+        link_list = task['spider_link']
+        user_list = []
+        for link in link_list:
+            out_uid = int(link.split("https://www.ixigua.com/home/")[-1].replace("/", "").strip())
+            sql = f""" select * from crawler_author_map where spider_link="{link}" """
+            our_user_info = MysqlHelper.get_values(log_type=log_type, crawler=crawler, sql=sql, env=env, machine=machine)
+            if len(our_user_info) == 0:
+                our_uid = 0
+                Common.logger(log_type, crawler).info(f"没有站内虚拟账号: {link}\n")
+            else:
+                # print(type(our_user_info[0]))
+                # print(our_user_info[0])
+                our_uid = our_user_info[0]["media_id"]
+            user_dict = {
+                "out_uid": out_uid,
+                "our_uid": our_uid
+            }
+            user_list.append(user_dict)
+        Common.logger(log_type, crawler).info(f"user_list:{user_list}")
+        return user_list
 
     # 下载规则
     @classmethod
-    def download_rule(cls, video_info_dict, rule_dict):
-        if video_info_dict['play_cnt'] >= rule_dict['play_cnt']:
-            if video_info_dict['comment_cnt'] >= rule_dict['comment_cnt']:
-                if video_info_dict['like_cnt'] >= rule_dict['like_cnt']:
-                    if video_info_dict['duration'] >= rule_dict['duration']:
-                        if video_info_dict['video_width'] >= rule_dict['video_width'] \
-                                or video_info_dict['video_height'] >= rule_dict['video_height']:
-                            return True
+    def download_rule_scheduling(cls, video_info_dict, task):
+        try:
+            play_cnt_min = int(task['play_cnt']['min'])
+        except:
+            play_cnt_min = 0
+
+        try:
+            video_like_min = int(task['video_like']['min'])
+        except:
+            video_like_min = 0
+
+        try:
+            share_cnt_min = int(task['share_cnt']['min'])
+        except:
+            share_cnt_min = 0
+
+        try:
+            video_width_min = int(task['video_width']['min'])
+        except:
+            video_width_min = 0
+
+        try:
+            video_height_min = task['video_height']['min']
+        except:
+            video_height_min = 0
+
+        try:
+            duration_min = int(task['duration_min'])
+        except:
+            duration_min = 0
+
+        try:
+            duration_max = int(task['duration_max'])
+        except:
+            duration_max = 1000000000
+
+        if int(video_info_dict['play_cnt']) >= play_cnt_min:
+            if int(video_info_dict['like_cnt']) >= video_like_min:
+                if int(video_info_dict['share_cnt']) >= share_cnt_min:
+                    if duration_max >= int(video_info_dict['duration']) >= duration_min:
+                        if int(video_info_dict['video_width']) >= video_width_min:
+                            if int(video_info_dict['video_height']) >= video_height_min:
+                                return True
+                            else:
+                                return False
                         else:
                             return False
                     else:
@@ -97,101 +126,6 @@ class Follow:
         except Exception as e:
             Common.logger(log_type, crawler).error(f'filter_words异常:{e}\n')
 
-    @classmethod
-    def get_out_user_info(cls, log_type, crawler, out_uid):
-        try:
-            headers = {'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/110.0.0.0 Safari/537.36 Edg/110.0.1587.41',
-                       'referer': f'https://www.ixigua.com/home/{out_uid}',
-                       'Cookie': f'ixigua-a-s=1; support_webp=true; support_avif=false; csrf_session_id=a5355d954d3c63ed1ba35faada452b4d; __ac_signature={cls.random_signature()}; MONITOR_WEB_ID=67cb5099-a022-4ec3-bb8e-c4de6ba51dd0; s_v_web_id=verify_lef4i99x_32SosrdH_Qrtk_4LJn_8S7q_fhu16xe3s8ZV; tt_scid=QLJjPuHf6wxVqu6IIq6gHiJXQpVrCwrdhjH2zpm7-E3ZniE1RXBcP6M8b41FJOdo41e1; ttwid=1%7CHHtv2QqpSGuSu8r-zXF1QoWsvjmNi1SJrqOrZzg-UCY%7C1677047013%7C5866a444e5ae10a9df8c11551db75010fb77b657f214ccf84e503fae8d313d09; msToken=PerXJcDdIsZ6zXkGITsftXX4mDaVaW21GuqtzSVdctH46oXXT2GcELIs9f0XW2hunRzP6KVHLZaYElRvNYflLKUXih7lC27XKxs3HjdZiXPK9NQaoKbLfA==; ixigua-a-s=1',}
-            url = f"https://www.ixigua.com/home/{out_uid}"
-            urllib3.disable_warnings()
-            s = requests.session()
-            # max_retries=3 重试3次
-            s.mount('http://', HTTPAdapter(max_retries=3))
-            s.mount('https://', HTTPAdapter(max_retries=3))
-            response = s.get(url=url, headers=headers, proxies=Common.tunnel_proxies(), verify=False, timeout=5).text
-            html = etree.HTML(response)
-            out_follow_str = html.xpath('//div[@class="userDetailV3__header__detail2"]/*[1]/span')[0].text.encode('raw_unicode_escape').decode()
-            out_fans_str = html.xpath('//div[@class="userDetailV3__header__detail2"]/*[2]/span')[0].text.encode('raw_unicode_escape').decode()
-            out_like_str = html.xpath('//div[@class="userDetailV3__header__detail2"]/*[3]/span')[0].text.encode('raw_unicode_escape').decode()
-            out_avatar_url = f"""https:{html.xpath('//span[@class="component-avatar__inner"]//img/@src')[0]}"""
-            if "万" in out_follow_str:
-                out_follow = int(float(out_follow_str.split("万")[0])*10000)
-            else:
-                out_follow = int(out_follow_str.replace(",", ""))
-            if "万" in out_fans_str:
-                out_fans = int(float(out_fans_str.split("万")[0])*10000)
-            else:
-                out_fans = int(out_fans_str.replace(",", ""))
-            if "万" in out_like_str:
-                out_like = int(float(out_like_str.split("万")[0])*10000)
-            else:
-                out_like = int(out_like_str.replace(",", ""))
-            out_user_dict = {
-                "out_follow": out_follow,
-                "out_fans": out_fans,
-                "out_like": out_like,
-                "out_avatar_url": out_avatar_url,
-            }
-            # for k, v in out_user_dict.items():
-            #     print(f"{k}:{v}")
-            return out_user_dict
-        except Exception as e:
-            Common.logger(log_type, crawler).error(f"get_out_user_info:{e}\n")
-
-    # 获取用户信息(字典格式). 注意:部分 user_id 字符类型是 int / str
-    @classmethod
-    def get_user_list(cls, log_type, crawler, sheetid, env, machine):
-        try:
-            while True:
-                user_sheet = Feishu.get_values_batch(log_type, crawler, sheetid)
-                if user_sheet is None:
-                    Common.logger(log_type, crawler).warning(f"user_sheet:{user_sheet} 10秒钟后重试")
-                    continue
-                our_user_list = []
-                for i in range(1, len(user_sheet)):
-                    out_uid = user_sheet[i][2]
-                    user_name = user_sheet[i][3]
-                    our_uid = user_sheet[i][6]
-                    our_user_link = user_sheet[i][7]
-                    if out_uid is None or user_name is None:
-                        Common.logger(log_type, crawler).info("空行\n")
-                    else:
-                        Common.logger(log_type, crawler).info(f"正在更新 {user_name} 用户信息\n")
-                        if our_uid is None:
-                            out_user_info = cls.get_out_user_info(log_type, crawler, out_uid)
-                            out_user_dict = {
-                                "out_uid": out_uid,
-                                "user_name": user_name,
-                                "out_avatar_url": out_user_info["out_avatar_url"],
-                                "out_create_time": '',
-                                "out_tag": '',
-                                "out_play_cnt": 0,
-                                "out_fans": out_user_info["out_fans"],
-                                "out_follow": out_user_info["out_follow"],
-                                "out_friend": 0,
-                                "out_like": out_user_info["out_like"],
-                                "platform": cls.platform,
-                                "tag": cls.tag,
-                            }
-                            our_user_dict = Users.create_user(log_type=log_type, crawler=crawler, out_user_dict=out_user_dict, env=env, machine=machine)
-                            our_uid = our_user_dict['our_uid']
-                            our_user_link = our_user_dict['our_user_link']
-                            Feishu.update_values(log_type, crawler, sheetid, f'G{i + 1}:H{i + 1}', [[our_uid, our_user_link]])
-                            Common.logger(log_type, crawler).info(f'站内用户信息写入飞书成功!\n')
-                            our_user_list.append(our_user_dict)
-                        else:
-                            our_user_dict = {
-                                'out_uid': out_uid,
-                                'user_name': user_name,
-                                'our_uid': our_uid,
-                                'our_user_link': our_user_link,
-                            }
-                            our_user_list.append(our_user_dict)
-                return our_user_list
-        except Exception as e:
-            Common.logger(log_type, crawler).error(f'get_user_id_from_feishu异常:{e}\n')
-
     @classmethod
     def random_signature(cls):
         src_digits = string.digits  # string_数字
@@ -218,39 +152,6 @@ class Follow:
             new_password = new_password_start + 'y' + new_password_end
         return new_password
 
-    @classmethod
-    def get_signature(cls, log_type, crawler, out_uid, machine):
-        try:
-            # 打印请求配置
-            ca = DesiredCapabilities.CHROME
-            ca["goog:loggingPrefs"] = {"performance": "ALL"}
-
-            # 不打开浏览器运行
-            chrome_options = webdriver.ChromeOptions()
-            chrome_options.add_argument("--headless")
-            chrome_options.add_argument('--user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/79.0.3945.79 Safari/537.36')
-            chrome_options.add_argument("--no-sandbox")
-
-            # driver初始化
-            if machine == 'aliyun' or machine == 'aliyun_hk':
-                driver = webdriver.Chrome(desired_capabilities=ca, options=chrome_options)
-            elif machine == 'macpro':
-                driver = webdriver.Chrome(desired_capabilities=ca, options=chrome_options,
-                                          service=Service('/Users/lieyunye/Downloads/chromedriver_v86/chromedriver'))
-            elif machine == 'macair':
-                driver = webdriver.Chrome(desired_capabilities=ca, options=chrome_options,
-                                          service=Service('/Users/piaoquan/Downloads/chromedriver'))
-            else:
-                driver = webdriver.Chrome(desired_capabilities=ca, options=chrome_options, service=Service('/Users/wangkun/Downloads/chromedriver/chromedriver_v110/chromedriver'))
-            driver.implicitly_wait(10)
-            driver.get(f'https://www.ixigua.com/home/{out_uid}/')
-            time.sleep(3)
-            data_src = driver.find_elements(By.XPATH, '//img[@class="tt-img BU-MagicImage tt-img-loaded"]')[1].get_attribute("data-src")
-            signature = data_src.split("x-signature=")[-1]
-            return signature
-        except Exception as e:
-            Common.logger(log_type, crawler).error(f'get_signature异常:{e}\n')
-
     # 获取视频详情
     @classmethod
     def get_video_url(cls, log_type, crawler, gid):
@@ -652,7 +553,7 @@ class Follow:
             Common.logger(log_type, crawler).error(f'get_video_url:{e}\n')
 
     @classmethod
-    def get_videolist(cls, log_type, crawler, strategy, our_uid, out_uid, oss_endpoint, env, machine):
+    def get_videolist(cls, log_type, crawler, task, our_uid, out_uid, oss_endpoint, env, machine):
         try:
             signature = cls.random_signature()
             while True:
@@ -664,26 +565,11 @@ class Follow:
                     'maxBehotTime': '0',
                     'order': 'new',
                     'isHome': '0',
-                    # 'msToken': 'G0eRzNkw189a8TLaXjc6nTHVMQwh9XcxVAqTbGKi7iPJdQcLwS3-XRrJ3MZ7QBfqErpxp3EX1WtvWOIcZ3NIgr41hgcd-v64so_RRj3YCRw1UsKW8mIssNLlIMspsg==',
-                    # 'X-Bogus': 'DFSzswVuEkUANjW9ShFTgR/F6qHt',
                     '_signature': signature,
                 }
                 headers = {
-                    # 'authority': 'www.ixigua.com',
-                    # 'accept': 'application/json, text/plain, */*',
-                    # 'accept-language': 'zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6',
-                    # 'cache-control': 'no-cache',
-                    # 'cookie': f'MONITOR_WEB_ID=7168304743566296612; __ac_signature={signature}; ixigua-a-s=1; support_webp=true; support_avif=false; csrf_session_id=a5355d954d3c63ed1ba35faada452b4d; msToken=G0eRzNkw189a8TLaXjc6nTHVMQwh9XcxVAqTbGKi7iPJdQcLwS3-XRrJ3MZ7QBfqErpxp3EX1WtvWOIcZ3NIgr41hgcd-v64so_RRj3YCRw1UsKW8mIssNLlIMspsg==; tt_scid=o4agqz7u9SKPwfBoPt6S82Cw0q.9KDtqmNe0JHxMqmpxNHQWq1BmrQdgVU6jEoX7ed99; ttwid=1%7CHHtv2QqpSGuSu8r-zXF1QoWsvjmNi1SJrqOrZzg-UCY%7C1676618894%7Cee5ad95378275f282f230a7ffa9947ae7eff40d0829c5a2568672a6dc90a1c96; ixigua-a-s=1',
-                    # 'pragma': 'no-cache',
                     'referer': f'https://www.ixigua.com/home/{out_uid}/video/?preActiveKey=hotsoon&list_entrance=userdetail',
-                    # 'sec-ch-ua': '"Chromium";v="110", "Not A(Brand";v="24", "Microsoft Edge";v="110"',
-                    # 'sec-ch-ua-mobile': '?0',
-                    # 'sec-ch-ua-platform': '"macOS"',
-                    # 'sec-fetch-dest': 'empty',
-                    # 'sec-fetch-mode': 'cors',
-                    # 'sec-fetch-site': 'same-origin',
                     'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/110.0.0.0 Safari/537.36 Edg/110.0.1587.41',
-                    # 'x-secsdk-csrf-token': '00010000000119e3f9454d1dcbb288704cda1960f241e2d19bd21f2fd283520c3615a990ac5a17448bfbb902a249'
                 }
                 urllib3.disable_warnings()
                 s = requests.session()
@@ -804,20 +690,23 @@ class Follow:
                         else:
                             cover_url = videoList[i]['video_detail_info']['detail_video_large_image']['url_list'][0]['url']
 
-                        while True:
-                            rule_dict = cls.get_rule(log_type, crawler)
-                            if rule_dict is None:
-                                Common.logger(log_type, crawler).warning(f"rule_dict:{rule_dict}, 10秒后重试")
-                                time.sleep(10)
-                            else:
-                                break
+                        min_publish_time = int(task["min_publish_time"])
+                        min_publish_day = int(task["min_publish_day"])
+                        min_publish_day = (date.today() + timedelta(days=-min_publish_day)).strftime("%Y-%m-%d")
+                        min_publish_day = int(time.mktime(time.strptime(min_publish_day, "%Y-%m-%d")))
+                        if min_publish_time > 0 and min_publish_day > 0:
+                            publish_time_rule = min_publish_time
+                        elif min_publish_time > 0:
+                            publish_time_rule = min_publish_time
+                        else:
+                            publish_time_rule = min_publish_day
 
                         if gid == 0 or video_id == 0 or cover_url == 0:
                             Common.logger(log_type, crawler).info('无效视频\n')
-                        elif is_top is True and int(time.time()) - int(publish_time) > 3600 * 24 * rule_dict['publish_time']:
-                            Common.logger(log_type, crawler).info(f'置顶视频,且发布时间:{publish_time_str} 超过{rule_dict["publish_time"]}天\n')
-                        elif int(time.time()) - int(publish_time) > 3600 * 24 * rule_dict['publish_time']:
-                            Common.logger(log_type, crawler).info(f'发布时间:{publish_time_str}超过{rule_dict["publish_time"]}天\n')
+                        elif is_top is True and int(publish_time) < publish_time_rule:
+                            Common.logger(log_type, crawler).info(f'置顶视频,且发布时间超过抓取时间\n')
+                        elif int(publish_time) < publish_time_rule:
+                            Common.logger(log_type, crawler).info(f'发布时间超过抓取时间\n')
                             cls.offset = 0
                             return
                         else:
@@ -852,8 +741,8 @@ class Follow:
                             cls.download_publish(log_type=log_type,
                                                  crawler=crawler,
                                                  video_dict=video_dict,
-                                                 rule_dict=rule_dict,
-                                                 strategy=strategy,
+                                                 task=task,
+                                                 strategy=task["task_name"],
                                                  our_uid=our_uid,
                                                  oss_endpoint=oss_endpoint,
                                                  env=env,
@@ -869,22 +758,14 @@ class Follow:
 
     # 下载 / 上传
     @classmethod
-    def download_publish(cls, log_type, crawler, strategy, video_dict, rule_dict, our_uid, oss_endpoint, env, machine):
+    def download_publish(cls, log_type, crawler, strategy, video_dict, task, our_uid, oss_endpoint, env, machine):
         try:
-            if cls.download_rule(video_dict, rule_dict) is False:
+            if cls.download_rule_scheduling(video_dict, task) is False:
                 Common.logger(log_type, crawler).info('不满足抓取规则\n')
             elif any(word if word in video_dict['video_title'] else False for word in cls.filter_words(log_type, crawler)) is True:
                 Common.logger(log_type, crawler).info('标题已中过滤词:{}\n', video_dict['video_title'])
             elif cls.repeat_video(log_type, crawler, video_dict['video_id'], env, machine) != 0:
                 Common.logger(log_type, crawler).info('视频已下载\n')
-            # elif str(video_dict['video_id']) in [x for y in Feishu.get_values_batch(log_type, 'xigua', 'e075e9') for x in y]:
-            #     Common.logger(log_type, crawler).info('视频已下载\n')
-            # elif str(video_dict['video_id']) in [x for y in Feishu.get_values_batch(log_type, 'xigua', '3Ul6wZ') for x in y]:
-            #     Common.logger(log_type, crawler).info('视频已下载\n')
-            # elif str(video_dict['video_id']) in [x for y in Feishu.get_values_batch(log_type, 'xigua', 'QOWqMo') for x in y]:
-            #     Common.logger(log_type, crawler).info('视频已下载\n')
-            # elif str(video_dict['video_id']) in [x for y in Feishu.get_values_batch(log_type, 'xigua', 'wjhpDs') for x in y]:
-            #     Common.logger(log_type, crawler).info('视频已存在\n')
             else:
                 # 下载视频
                 Common.download_method(log_type=log_type, crawler=crawler, text='xigua_video', title=video_dict['video_title'], url=video_dict['video_url'])
@@ -948,6 +829,15 @@ class Follow:
                 Feishu.update_values(log_type, 'xigua', "e075e9", "F2:Z2", values)
                 Common.logger(log_type, crawler).info(f"视频已保存至云文档\n")
 
+                rule_dict = {
+                    "play_cnt": task["play_cnt"],
+                    "video_width": task["video_width"],
+                    "video_height": task["video_height"],
+                    "video_like": task["video_like"],
+                    "share_cnt": task["share_cnt"],
+                    "duration": {"min": task["duration_min"], "max": task["duration_max"]}
+                }
+
                 # 视频信息保存数据库
                 insert_sql = f""" insert into crawler_video(video_id,
                                 user_id,
@@ -986,41 +876,40 @@ class Follow:
             Common.logger(log_type, crawler).error(f'download_publish异常:{e}\n')
 
     @classmethod
-    def get_follow_videos(cls, log_type, crawler, strategy, oss_endpoint, env, machine):
+    def get_follow_videos(cls, log_type, crawler, task, oss_endpoint, env, machine):
         try:
-            user_list = cls.get_user_list(log_type=log_type, crawler=crawler, sheetid="5tlTYB", env=env, machine=machine)
+            user_list = cls.get_users(log_type=log_type,
+                                      crawler=crawler,
+                                      task=task,
+                                      env=env,
+                                      machine=machine)
             for user in user_list:
                 out_uid = user["out_uid"]
-                user_name = user["user_name"]
-                our_uid = user["our_uid"]
-                Common.logger(log_type, crawler).info(f"开始抓取 {user_name} 用户主页视频\n")
-                cls.get_videolist(log_type=log_type,
-                                  crawler=crawler,
-                                  strategy=strategy,
-                                  our_uid=our_uid,
-                                  out_uid=out_uid,
-                                  oss_endpoint=oss_endpoint,
-                                  env=env,
-                                  machine=machine)
-                cls.offset = 0
-                time.sleep(1)
+                our_uid = int(user["our_uid"])
+                if our_uid == 0:
+                    pass
+                else:
+                    Common.logger(log_type, crawler).info(f"开始抓取 {out_uid} 用户主页视频\n")
+                    cls.get_videolist(log_type=log_type,
+                                      crawler=crawler,
+                                      task=task,
+                                      our_uid=our_uid,
+                                      out_uid=out_uid,
+                                      oss_endpoint=oss_endpoint,
+                                      env=env,
+                                      machine=machine)
+                    cls.offset = 0
+                    time.sleep(1)
         except Exception as e:
             Common.logger(log_type, crawler).error(f"get_follow_videos:{e}\n")
 
 
 if __name__ == '__main__':
-    # print(Follow.get_signature("follow", "xigua", "95420624045", "local"))
-    # Follow.get_videolist(log_type="follow",
-    #                      crawler="xigua",
-    #                      strategy="定向爬虫策略",
-    #                      our_uid="6267141",
-    #                      out_uid="95420624045",
-    #                      oss_endpoint="out",
-    #                      env="dev",
-    #                      machine="local")
-    # print(Follow.random_signature())
-    rule = Follow.get_rule("follow", "xigua")
-    print(type(rule))
-    print(type(json.dumps(rule)))
-    print(json.dumps(rule))
+    # SchedulingFollow.get_users(log_type="follow",
+    #                            crawler="xigua",
+    #                            spider_rule="['https://www.ixigua.com/home/95420624045', 'https://www.ixigua.com/home/6431477489']",
+    #                            env="dev",
+    #                            machine="local")
+
+    print(SchedulingFollow.repeat_video("follow", "xigua", "v0201ag10000ce3jcjbc77u8jsplpgrg", "dev", "local"))
     pass

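`download_rule_scheduling` reads each threshold from the task with a `try/except` fallback (0 for the minimums, a very large `duration_max`), then checks play count, likes, shares, duration, width and height in a nested cascade, so a task with missing rule fields lets everything through. A hedged usage sketch with made-up sample values; it assumes the repo root is on `sys.path` and the module's dependencies (requests, the feishu/publish helpers, etc.) are installed:

```python
# Sketch: exercising download_rule_scheduling's defaulting behaviour.
import os
import sys

sys.path.append(os.getcwd())
from xigua.xigua_follow.xigua_follow_scheduling import SchedulingFollow

video_info_dict = {
    "play_cnt": 1500,
    "like_cnt": 20,
    "share_cnt": 3,
    "duration": 120,
    "video_width": 1280,
    "video_height": 720,
}

# Thresholds mirror the spider_rule fields unpacked by the scheduler; values are illustrative.
task = {
    "play_cnt": {"min": 1000},
    "video_like": {"min": 0},
    "share_cnt": {"min": 0},
    "video_width": {"min": 720},
    "video_height": {"min": 720},
    "duration_min": 60,
    "duration_max": 6000,
}

print(SchedulingFollow.download_rule_scheduling(video_info_dict, task))  # True
print(SchedulingFollow.download_rule_scheduling(video_info_dict, {}))    # True: missing fields fall back to permissive defaults
```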
+ 20 - 16
xigua/xigua_main/run_xigua_follow_scheduling.py

@@ -4,25 +4,30 @@
 import argparse
 import os
 import sys
-import time
 
 sys.path.append(os.getcwd())
 from common.common import Common
-from xigua.xigua_follow.xigua_follow_scheduling import Follow
+from xigua.xigua_follow.xigua_follow_scheduling import SchedulingFollow
 from common.feishu import Feishu
 
 
-def main(log_type, crawler, strategy, oss_endpoint, env, machine):
-    while True:
-        try:
-            Common.logger(log_type, crawler).info('开始抓取 西瓜视频 定向榜\n')
-            Follow.get_follow_videos(log_type, crawler, strategy, oss_endpoint, env, machine)
-            Common.del_logs(log_type, crawler)
-            Common.logger(log_type, crawler).info('抓取完一轮,休眠 1 分钟\n')
-            time.sleep(60)
-        except Exception as e:
-            Common.logger(log_type, crawler).info(f"西瓜视频异常,触发报警:{e}\n")
-            Feishu.bot(log_type, crawler, f"{e}")
+def main(log_type, crawler, task, oss_endpoint, env, machine):
+    task = dict(eval(task))
+    Common.logger(log_type, crawler).info(f"{type(task)}\n")
+    Common.logger(log_type, crawler).info(f"{task}\n")
+    try:
+        Common.logger(log_type, crawler).info('开始抓取 西瓜视频 定向榜\n')
+        SchedulingFollow.get_follow_videos(log_type=log_type,
+                                           crawler=crawler,
+                                           task=task,
+                                           oss_endpoint=oss_endpoint,
+                                           env=env,
+                                           machine=machine)
+        Common.del_logs(log_type, crawler)
+        Common.logger(log_type, crawler).info('抓取任务结束\n')
+    except Exception as e:
+        Common.logger(log_type, crawler).info(f"西瓜视频异常,触发报警:{e}\n")
+        # Feishu.bot(log_type, crawler, f"{e}")
 
 
 if __name__ == "__main__":
@@ -30,15 +35,14 @@ if __name__ == "__main__":
     parser.add_argument('--log_type', type=str)  ## 添加参数,注明参数类型
     parser.add_argument('--crawler')  ## 添加参数
     parser.add_argument('--strategy')  ## 添加参数
-    parser.add_argument('--our_uid')  ## 添加参数
+    parser.add_argument('--task')  ## 添加参数
     parser.add_argument('--oss_endpoint')  ## 添加参数
     parser.add_argument('--env')  ## 添加参数
     parser.add_argument('--machine')  ## 添加参数
     args = parser.parse_args()  ### 参数赋值,也可以通过终端赋值
-    # print(args)
     main(log_type=args.log_type,
          crawler=args.crawler,
-         strategy=args.strategy,
+         task=args.task,
          oss_endpoint=args.oss_endpoint,
          env=args.env,
          machine=args.machine)

BIN
youtube/.DS_Store


+ 1 - 1
youtube/youtube_follow/youtube_follow.py

@@ -393,7 +393,7 @@ class Follow:
                 user_name = user_sheet[i][3]
                 browse_id = user_sheet[i][5]
                 our_uid = user_sheet[i][6]
-                if our_uid is not None and user_name is not None:
+                if out_uid is not None and user_name is not None:
                     Common.logger(log_type, crawler).info(f"正在更新 {user_name} 用户信息\n")
                     # 获取站外browse_id,并写入飞书
                     if browse_id is None:

+ 1 - 1
youtube/youtube_follow/youtube_follow_api.py

@@ -463,7 +463,7 @@ class Follow:
                 browse_id = user_sheet[i][5]
                 our_uid = user_sheet[i][6]
                 uer_url = user_sheet[i][4]
-                if our_uid is not None and user_name is not None:
+                if out_uid is not None and user_name is not None:
                     Common.logger(log_type, crawler).info(f"正在更新 {user_name} 用户信息\n")
                     # 获取站外browse_id,并写入飞书
                     # if browse_id is None: