Browse Source

Merge remote-tracking branch 'origin/master'

# Conflicts:
#	kuaishou/kuaishou_author/kuaishou_author_scheduling_new.py
zhangyong 1 year ago
parent
commit
7468b9697d

+ 92 - 0
changsha_bot.py

@@ -0,0 +1,92 @@
+"""
+独立执行,每个人执行一次,单日总量到达 300 即执行一次
+"""
+import json
+import datetime
+import time
+
+import schedule
+import requests
+
+from common.db import RedisClient
+
+
def bot(name):
    """
    Send an alert card to the Feishu group chat.

    The card @-mentions the responsible person (*name*) plus the two
    supervisors ("邓锋", "王雪珂") and states that 300 videos were
    ingested today.

    :param name: Chinese name of the person whose quota was reached;
                 must be a key of ``id_dict``.
    """
    id_dict = {
        "余海涛": "ou_b87d153e200a04de3d82b1b9276e8f90",
        "范军": "ou_fce9cfef186e260e70554b47fee70a34",
        "罗情": "ou_88139cd84c2d105c2e1d699c14ec3375",
        "鲁涛": "ou_7986cccb78e6c981db8d0eef93443d05",
        "王雪珂": "ou_2233fb8e1302314bae166fcfa144151f",
        "邓锋": "ou_379d37645f929e1e6553a75aecda42a2"
    }
    url = "https://open.feishu.cn/open-apis/bot/v2/hook/df47bb77-ecaa-4628-b076-aae776415ae8"
    headers = {"Content-Type": "application/json"}
    payload = {
        "msg_type": "interactive",
        "card": {
            "elements": [
                {
                    "tag": "div",
                    "text": {
                        "content": "抓取数量触发限量通知, <at id={}></at>, <at id={}></at>, <at id={}></at>\n".format(
                            id_dict[name], id_dict["邓锋"], id_dict["王雪珂"]
                        ),
                        "tag": "lark_md",
                    },
                },
                {
                    "tag": "div",
                    "text": {
                        "content": "当天已经入库 300 条视频",
                        "tag": "lark_md",
                    },
                },
            ],
            "header": {"title": {"content": "【 通知 】", "tag": "plain_text"}},
        },
    }
    # BUG FIX: the original call had no timeout, so a hung Feishu endpoint
    # would block the caller (the monitor loop) forever.
    requests.post(url, headers=headers, data=json.dumps(payload), timeout=10)
+
+
def monitor():
    """
    Poll Redis once a minute and fire the alert bot the first time a
    person's daily ingestion count reaches 300.

    Alerts are only sent between 10:00 and 20:30.  Each person is alerted
    at most once per process lifetime (their ``counts_info`` flag flips
    to False after the first alert).  This function never returns.
    """
    counts_info = {
        "罗情": True,
        "余海涛": True,
        "范军": True,
        "鲁涛": True
    }
    # Redis key (uid-tag) -> owner name
    keys = {"352": "余海涛", "353": "罗情", "53": "范军", "51": "鲁涛"}
    start_alert_time = datetime.time(10)
    end_alert_time = datetime.time(20, 30)
    while True:
        # BUG FIX: the current time must be re-sampled on every iteration;
        # the original read it once before the loop, permanently freezing
        # the alert-window check to the process start time.
        now = datetime.datetime.now().time()
        if start_alert_time <= now <= end_alert_time:
            try:
                R = RedisClient()
                if R.connect():
                    for key, name in keys.items():
                        count = R.select(key)
                        if count:
                            daily_total = int(count.decode("utf-8"))
                            if daily_total >= 300 and counts_info[name]:
                                bot(name)
                                counts_info[name] = False
            except Exception as e:
                # best-effort: log and keep polling rather than crash,
                # but never swallow errors silently like the original did
                print("monitor error: {}".format(e))
        # 查询一次之后等待 60 s
        time.sleep(60)
+
+
if __name__ == '__main__':
    # BUG FIX: monitor() loops forever itself (and already gates alerting
    # to the 10:00-20:30 window), so scheduling it with
    # schedule.every().day.at("08:00").do(monitor) would block the
    # scheduler inside its first invocation and never fire again — and it
    # would not run at all until the next 08:00.  Run it directly instead.
    monitor()

+ 79 - 41
common/db.py

@@ -7,44 +7,49 @@
 import redis
 import pymysql
 from common.common import Common
+
 # from common import Common
 
+
 class MysqlHelper:
     @classmethod
     def connect_mysql(cls, env, machine):
-        if machine == 'aliyun_hk':
+        if machine == "aliyun_hk":
             # 创建一个 Connection 对象,代表了一个数据库连接
             connection = pymysql.connect(
-                host="rm-j6cz4c6pt96000xi3.mysql.rds.aliyuncs.com",# 数据库IP地址,内网地址
+                host="rm-j6cz4c6pt96000xi3.mysql.rds.aliyuncs.com",  # 数据库IP地址,内网地址
                 # host="rm-j6cz4c6pt96000xi3lo.mysql.rds.aliyuncs.com",# 数据库IP地址,外网地址
-                port=3306,                      # 端口号
-                user="crawler",                 #  mysql用户名
-                passwd="crawler123456@",        # mysql用户登录密码
-                db="piaoquan-crawler" ,         # 数据库名
+                port=3306,  # 端口号
+                user="crawler",  #  mysql用户名
+                passwd="crawler123456@",  # mysql用户登录密码
+                db="piaoquan-crawler",  # 数据库名
                 # 如果数据库里面的文本是utf8编码的,charset指定是utf8
-                charset = "utf8")
-        elif env == 'prod':
+                charset="utf8",
+            )
+        elif env == "prod":
             # 创建一个 Connection 对象,代表了一个数据库连接
             connection = pymysql.connect(
-                host="rm-bp1159bu17li9hi94.mysql.rds.aliyuncs.com",# 数据库IP地址,内网地址
+                host="rm-bp1159bu17li9hi94.mysql.rds.aliyuncs.com",  # 数据库IP地址,内网地址
                 # host="rm-bp1159bu17li9hi94ro.mysql.rds.aliyuncs.com",# 数据库IP地址,外网地址
-                port=3306,                      # 端口号
-                user="crawler",                 #  mysql用户名
-                passwd="crawler123456@",        # mysql用户登录密码
-                db="piaoquan-crawler" ,         # 数据库名
+                port=3306,  # 端口号
+                user="crawler",  #  mysql用户名
+                passwd="crawler123456@",  # mysql用户登录密码
+                db="piaoquan-crawler",  # 数据库名
                 # 如果数据库里面的文本是utf8编码的,charset指定是utf8
-                charset = "utf8")
+                charset="utf8",
+            )
         else:
             # 创建一个 Connection 对象,代表了一个数据库连接
             connection = pymysql.connect(
-                host="rm-bp1k5853td1r25g3n690.mysql.rds.aliyuncs.com",# 数据库IP地址,内网地址
+                host="rm-bp1k5853td1r25g3n690.mysql.rds.aliyuncs.com",  # 数据库IP地址,内网地址
                 # host="rm-bp1k5853td1r25g3ndo.mysql.rds.aliyuncs.com",  # 数据库IP地址,外网地址
                 port=3306,  # 端口号
                 user="crawler",  # mysql用户名
                 passwd="crawler123456@",  # mysql用户登录密码
                 db="piaoquan-crawler",  # 数据库名
                 # 如果数据库里面的文本是utf8编码的,charset指定是utf8
-                charset="utf8")
+                charset="utf8",
+            )
 
         return connection
 
@@ -91,34 +96,35 @@ class MysqlHelper:
         # 关闭数据库连接
         connect.close()
 
+
 class RedisHelper:
     @classmethod
     def connect_redis(cls, env, machine):
-        if machine == 'aliyun_hk':
+        if machine == "aliyun_hk":
             redis_pool = redis.ConnectionPool(
                 # host='r-bp154bpw97gptefiqk.redis.rds.aliyuncs.com',  # 内网地址
-                host='r-bp154bpw97gptefiqkpd.redis.rds.aliyuncs.com',  # 外网地址
+                host="r-bp154bpw97gptefiqkpd.redis.rds.aliyuncs.com",  # 外网地址
                 port=6379,
                 db=2,
-                password='Qingqu2019'
+                password="Qingqu2019",
             )
             redis_conn = redis.Redis(connection_pool=redis_pool)
-        elif env == 'prod':
+        elif env == "prod":
             redis_pool = redis.ConnectionPool(
-                host='r-bp1mb0v08fqi4hjffu.redis.rds.aliyuncs.com',  # 内网地址
+                host="r-bp1mb0v08fqi4hjffu.redis.rds.aliyuncs.com",  # 内网地址
                 # host='r-bp1mb0v08fqi4hjffupd.redis.rds.aliyuncs.com',  # 外网地址
                 port=6379,
                 db=2,
-                password='Qingqu2019'
+                password="Qingqu2019",
             )
             redis_conn = redis.Redis(connection_pool=redis_pool)
         else:
             redis_pool = redis.ConnectionPool(
                 # host='r-bp154bpw97gptefiqk.redis.rds.aliyuncs.com',  # 内网地址
-                host='r-bp154bpw97gptefiqkpd.redis.rds.aliyuncs.com',  # 外网地址
+                host="r-bp154bpw97gptefiqkpd.redis.rds.aliyuncs.com",  # 外网地址
                 port=6379,
                 db=2,
-                password='Qingqu2019'
+                password="Qingqu2019",
             )
             redis_conn = redis.Redis(connection_pool=redis_pool)
         return redis_conn
@@ -139,20 +145,52 @@ class RedisHelper:
             return redis_conn.rpop(machine)
 
 
-
-if __name__ == "__main__":
-    # sql_statement = f"INSERT INTO crawler_user ( user_id, out_user_id, out_user_name, out_avatar_url, platform, tag) " \
-    #       f"VALUES ('6282398', 'out_uid_003', 'out_user_name', '', 'xiaoniangao', 'xiaoniangao_play')"
-    # edit_data = MysqlHelper.edit_data(sql=sql_statement)
-    # print(edit_data)
-
-    # get_data = MysqlHelper.get_values("demo", "youtube", "select * from crawler_user", "dev", "local")
-    # print(get_data)
-    print(RedisHelper.connect_redis("prod", "aliyun"))
-    # RedisHelper.redis_push("dev", "local", "test1")
-    # RedisHelper.redis_push("dev", "local", "test2")
-
-    # print(RedisHelper.redis_pop("dev", "local"))
-
-    pass
-
class RedisClient(object):
    """
    Minimal Redis client wrapper.

    Design: callers invoke :meth:`connect` before any other operation and
    skip Redis entirely when it returns False, so a downed Redis server
    degrades gracefully instead of breaking the business flow.
    Todo 如果 Redis 服务挂了,怎么做能够不影响业务
    思路, 每次使用 redis 接口前先判断是否连接成功,如果连接失败则跳过 redis ,不影响全局
    """

    def __init__(self):
        # redis.Redis handle, set by connect(); None until then
        self.pool = None
        self.host = "r-bp1mb0v08fqi4hjffu.redis.rds.aliyuncs.com"
        self.port = 6379
        self.db = 2
        self.password = "Wqsd@2019"

    def connect(self):
        """
        Connect to the Redis server and verify the connection.

        :return: True on success, False when the server is unreachable.
        """
        try:
            self.pool = redis.Redis(
                host=self.host, port=self.port, db=self.db, password=self.password
            )
            # BUG FIX: redis.Redis() is lazy and never touches the network,
            # so the original connect() returned True even when the server
            # was down.  ping() forces a round trip and raises on failure,
            # which is exactly the health check this class promises.
            self.pool.ping()
            return True
        except Exception as e:
            print("connect to redis fail, the reason is {}".format(e))
            return False

    def select(self, key):
        """
        Read the raw value stored at *key* (bytes, or None if absent).
        """
        return self.pool.get(key)

    def insert(self, key, value, expire_time):
        """
        Set *key* to *value* with a TTL of *expire_time* seconds.
        """
        self.pool.set(key, value, expire_time)

    def delete(self, key):
        """
        Delete *key* from Redis.

        :param key: key to remove
        """
        self.pool.delete(key)

+ 93 - 0
common/limit.py

@@ -0,0 +1,93 @@
+import os
+import sys
+from datetime import datetime, timedelta
+
+sys.path.append(os.getcwd())
+
+from common.aliyun_log import AliyunLogger
+from common.scheduling_db import MysqlHelper
+from common.db import RedisClient
+
+
def generate_expire_time():
    """
    Return the number of seconds from now until the next local midnight.

    Used as a Redis TTL so that per-person daily counters expire and
    reset automatically at the end of the day.
    """
    current = datetime.now()
    # 当天晚上12点的时间
    next_midnight = datetime(current.year, current.month, current.day) + timedelta(days=1)
    # 计算当前时间到当天晚上12点的秒数
    return int((next_midnight - current).total_seconds())
+
+
class AuthorLimit(object):
    """
    Per-person daily quota for account ("author") crawlers.

    Each monitored person owns a uid-tag in ``limit_tag_dict``; a Redis
    counter per tag (expiring at midnight) tracks how many videos were
    ingested today.  Past 300 the crawler stops ingesting for that person.
    """

    def __init__(self, mode, platform):
        self.mode = mode
        self.platform = platform
        # uid-tag -> owner name; only these tags are subject to the limit
        self.limit_tag_dict = {"352": "余海涛", "353": "罗情", "53": "范军", "51": "鲁涛"}

    def find_tag(self, uid):
        """
        Return the first limited tag attached to *uid*, or None.
        """
        # NOTE(review): uid is interpolated into SQL; assumed to come from
        # internal config, not user input — confirm before wider use.
        sql = f"""select tag from crawler_user_v3 where uid={uid};"""
        result = MysqlHelper.get_values(
            log_type=self.mode, crawler=self.platform, env="prod", sql=sql
        )
        # BUG FIX: guard against an account missing from crawler_user_v3;
        # the original indexed result[0] unconditionally and raised
        # IndexError on an empty result set.
        if not result:
            return None
        tags = result[0]["tag"]
        if tags:
            for tag in tags.split(","):
                if self.limit_tag_dict.get(tag):
                    return tag
        return None

    def author_limitation(self, user_id):
        """
        Decide whether a video from *user_id* may be ingested.

        限制账号, 服务长沙四名同学.  Returns True (ingest) in every case
        except when the account belongs to a monitored person whose daily
        counter already exceeds 300.
        """
        # Only the "author" crawl mode is limited.
        if self.mode != "author":
            return True
        tag = self.find_tag(user_id)
        if tag is None:
            return True
        AliyunLogger.logging(
            code="8807",
            platform=self.platform,
            mode=self.mode,
            env="prod",
            message="找到个人账号,{}".format(tag)
        )
        R = RedisClient()
        if not R.connect():
            # Redis unreachable: fail open so crawling is not blocked.
            return True
        tag_count = R.select(tag)
        if tag_count:
            count = int(tag_count.decode("utf-8"))
            if count > 300:
                # Quota reached; alerting is handled by the standalone bot.
                return False
            R.insert(key=tag, value=count + 1, expire_time=generate_expire_time())
            return True
        # First video of the day for this tag: start the counter at 1,
        # expiring at midnight so it resets daily.
        R.insert(key=tag, value=1, expire_time=generate_expire_time())
        return True

+ 38 - 22
common/pipeline.py

@@ -5,6 +5,10 @@ from common.scheduling_db import MysqlHelper
 
 
 class PiaoQuanPipeline:
+    """
+    pipeline code
+    """
+
     def __init__(self, platform, mode, rule_dict, env, item, trace_id):
         self.platform = platform
         self.mode = mode
@@ -13,21 +17,18 @@ class PiaoQuanPipeline:
         self.env = env
         self.trace_id = trace_id
 
-    # 视频的发布时间限制, 属于是规则过滤
     def publish_time_flag(self):
-        # 判断发布时间
+        """
+        发布时间判断
+        """
         publish_time_stamp = self.item["publish_time_stamp"]
         update_time_stamp = self.item["update_time_stamp"]
         max_d = self.rule_dict.get("period", {}).get("max", 1000)
         min_d = self.rule_dict.get("period", {}).get("min", 1000)
         days = max_d if max_d > min_d else min_d
         if self.platform == "gongzhonghao" or self.platform == "gongzhongxinhao":
-            if (
-                int(time.time()) - publish_time_stamp
-                > 3600 * 24 * days
-            ) and (
-                int(time.time()) - update_time_stamp
-                > 3600 * 24 * days
+            if (int(time.time()) - publish_time_stamp > 3600 * 24 * days) and (
+                int(time.time()) - update_time_stamp > 3600 * 24 * days
             ):
                 AliyunLogger.logging(
                     code="2004",
@@ -40,10 +41,7 @@ class PiaoQuanPipeline:
                 )
                 return False
         else:
-            if (
-                int(time.time()) - publish_time_stamp
-                > 3600 * 24 * days
-            ):
+            if int(time.time()) - publish_time_stamp > 3600 * 24 * days:
                 AliyunLogger.logging(
                     code="2004",
                     trace_id=self.trace_id,
@@ -56,8 +54,10 @@ class PiaoQuanPipeline:
                 return False
         return True
 
-    # 视频标题是否满足需求
     def title_flag(self):
+        """
+        视频标题是否满足需求
+        """
         title = self.item["video_title"]
         cleaned_title = re.sub(r"[^\w]", " ", title)
         # 敏感词
@@ -76,8 +76,10 @@ class PiaoQuanPipeline:
             return False
         return True
 
-    # 视频基础下载规则
     def download_rule_flag(self):
+        """
+        视频基础下载规则
+        """
         for key in self.item:
             if self.rule_dict.get(key):
                 max_value = (
@@ -85,10 +87,14 @@ class PiaoQuanPipeline:
                     if int(self.rule_dict[key]["max"]) > 0
                     else 999999999999999
                 )
-                if key == "peroid": # peroid是抓取周期天数
+                if key == "peroid":  # peroid是抓取周期天数
                     continue
                 else:
-                    flag = int(self.rule_dict[key]["min"]) <= int(self.item[key]) <= max_value
+                    flag = (
+                        int(self.rule_dict[key]["min"])
+                        <= int(self.item[key])
+                        <= max_value
+                    )
                     if not flag:
                         AliyunLogger.logging(
                             code="2004",
@@ -110,9 +116,13 @@ class PiaoQuanPipeline:
                 continue
         return True
 
-    # 按照某个具体平台来去重
     def repeat_video(self):
+        """
+        按照某个具体平台来去重
+        """
         # sql = f""" select * from crawler_video where platform="公众号" and out_video_id="{video_id}"; """
+        if self.platform == "jingdianfuqiwang" and self.mode == "recommend":
+            return True
         out_id = self.item["out_video_id"]
         sql = f""" select * from crawler_video where platform = "{self.platform}" and out_video_id="{out_id}"; """
         repeat_video = MysqlHelper.get_values(
@@ -132,6 +142,9 @@ class PiaoQuanPipeline:
         return True
 
     def process_item(self):
+        """
+        判断是否符合规则
+        """
         if not self.publish_time_flag():
             # 记录相关日志
             return False
@@ -175,9 +188,8 @@ class PiaoQuanPipelineTest:
                 print(message)
                 return False
         else:
-            if (
-                int(time.time()) - publish_time_stamp
-                > 3600 * 24 * int(self.rule_dict.get("period", {}).get("max", 1000))
+            if int(time.time()) - publish_time_stamp > 3600 * 24 * int(
+                self.rule_dict.get("period", {}).get("max", 1000)
             ):
                 message = "发布时间超过{}天".format(
                     int(self.rule_dict.get("period", {}).get("max", 1000))
@@ -208,10 +220,14 @@ class PiaoQuanPipelineTest:
                     if int(self.rule_dict[key]["max"]) > 0
                     else 999999999999999
                 )
-                if key == "peroid": # peroid是抓取周期天数
+                if key == "peroid":  # peroid是抓取周期天数
                     continue
                 else:
-                    flag = int(self.rule_dict[key]["min"]) <= int(self.item[key]) <= max_value
+                    flag = (
+                        int(self.rule_dict[key]["min"])
+                        <= int(self.item[key])
+                        <= max_value
+                    )
                     if not flag:
                         message = "{}: {} <= {} <= {}, {}".format(
                             key,

+ 7 - 3
douyin/douyin_author/douyin_author_scheduling_new.py

@@ -19,12 +19,14 @@ from requests.adapters import HTTPAdapter
 from common.scheduling_db import MysqlHelper
 from common.public import get_config_from_mysql, download_rule
 from douyin.douyin_author.douyin_author_scheduling_help import DouYinHelper
-
+from common.limit import AuthorLimit
 
 
 class DouyinauthorScheduling:
     platform = "抖音"
     download_cnt = 0
+    limiter = AuthorLimit(platform="douyin", mode="author")
+
 
     @classmethod
     def videos_cnt(cls, rule_dict):
@@ -266,8 +268,10 @@ class DouyinauthorScheduling:
                             video_dict["user_id"] = user_dict["uid"]
                             video_dict["publish_time"] = video_dict["publish_time_str"]
                             video_dict["strategy_type"] = log_type
-                            mq.send_msg(video_dict)
-                            cls.download_cnt += 1
+                            limit_flag = cls.limiter.author_limitation(user_id=video_dict['user_id'])
+                            if limit_flag:
+                                mq.send_msg(video_dict)
+                                cls.download_cnt += 1
 
                 except Exception as e:
                     Common.logger(log_type, crawler).warning(f"抓取单条视频异常:{e}\n")

+ 46 - 18
gongzhongxinhao/gongzhongxinhao/gongzhongxinhao_author.py

@@ -17,6 +17,7 @@ from selenium import webdriver
 
 from common.mq import MQ
 from common import AliyunLogger, PiaoQuanPipeline
+from common.limit import AuthorLimit
 from datetime import datetime
 
 class GZXHAuthor:
@@ -31,6 +32,7 @@ class GZXHAuthor:
         self.env = env
         self.download_cnt = 0
         self.mq = MQ(topic_name="topic_crawler_etl_" + self.env)
+        self.limiter = AuthorLimit(platform=self.platform, mode=self.mode)
 
     def get_account_videos(self):
             AliyunLogger.logging(
@@ -61,12 +63,36 @@ class GZXHAuthor:
             # 获取腾讯视频下载链接
 
     def get_tencent_video_url(self, video_id):
-        url = 'https://vv.video.qq.com/getinfo?vids=' + str(video_id) + '&platform=101001&charge=0&otype=json'
-        response = requests.get(url=url).text.replace('QZOutputJson=', '').replace('"};', '"}')
-        response = json.loads(response)
-        url = response['vl']['vi'][0]['ul']['ui'][0]['url']
-        fvkey = response['vl']['vi'][0]['fvkey']
-        video_url = url + str(video_id) + '.mp4?vkey=' + fvkey
+        # url = 'https://vv.video.qq.com/getinfo?vids=' + str(video_id) + '&platform=101001&charge=0&otype=json'
+        # response = requests.get(url=url).text.replace('QZOutputJson=', '').replace('"};', '"}')
+        # response = json.loads(response)
+        # url = response['vl']['vi'][0]['ul']['ui'][0]['url']
+        # fvkey = response['vl']['vi'][0]['fvkey']
+        # video_url = url + str(video_id) + '.mp4?vkey=' + fvkey
+        # return video_url
+        url = "https://h5vv.video.qq.com/getinfo?vid={}&platform=101001&charge=0&otype=json&defn=shd".format(
+            video_id
+        )
+        headers = {
+            "Host": "h5vv.video.qq.com",
+            "xweb_xhr": "1",
+            "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/98.0.4758.102 Safari/537.36 MicroMessenger/6.8.0(0x16080000) NetType/WIFI MiniProgramEnv/Mac MacWechat/WMPF XWEB/30817",
+            "Content-Type": "application/x-www-form-urlencoded",
+            "Accept": "*/*",
+            "Sec-Fetch-Site": "cross-site",
+            "Sec-Fetch-Mode": "cors",
+            "Sec-Fetch-Dest": "empty",
+            "Referer": "https://servicewechat.com/wx5fcd817f3f80aece/3/page-frame.html",
+            "Accept-Language": "en",
+        }
+        response = requests.get(url, headers=headers)
+        result = json.loads(response.text.replace("QZOutputJson=", "")[:-1])
+        vl = result["vl"]["vi"][0]
+        key = vl["fvkey"]
+        name = vl["fn"]
+        folder = vl["ul"]["ui"][0]["url"]
+        video_url = folder + name + "?vkey=" + key
+        time.sleep(random.randint(1, 5))
         return video_url
 
     def get_video_url(self, article_url):
@@ -296,18 +322,20 @@ class GZXHAuthor:
             time.sleep(0.5)
             Feishu.update_values('gongzhonghao', 'gongzhonghao', "9QU7wE", "A2:Z2", values)
             video_dict["publish_time"] = video_dict["publish_time_str"]
-            self.mq.send_msg(video_dict)
-            self.download_cnt += 1
-            AliyunLogger.logging(
-                code="1002",
-                platform=self.platform,
-                mode=self.mode,
-                env=self.env,
-                data=video_dict,
-                trace_id=trace_id,
-                message="成功发送 MQ 至 ETL",
-            )
-            time.sleep(5)
+            limit_flag = self.limiter.author_limitation(user_id=video_dict['user_id'])
+            if limit_flag:
+                self.mq.send_msg(video_dict)
+                self.download_cnt += 1
+                AliyunLogger.logging(
+                    code="1002",
+                    platform=self.platform,
+                    mode=self.mode,
+                    env=self.env,
+                    data=video_dict,
+                    trace_id=trace_id,
+                    message="成功发送 MQ 至 ETL",
+                )
+                time.sleep(5)
         return True
 
 

+ 3 - 3
jingdianfuqiwang/jingdianfuqiwang_recommend/jingdianfuqiwang_recommend_scheduling.py

@@ -62,8 +62,7 @@ class TFuQiWangRecommend(object):
                     response = requests.get(
                         url=base_url,
                         headers=headers,
-                        params=params,
-                        proxies=tunnel_proxies(),
+                        params=params
                     )
                     video_list = response.json()['data']['list']
                     if video_list:
@@ -104,7 +103,7 @@ class TFuQiWangRecommend(object):
                     platform=self.platform,
                     mode=self.mode,
                     env=self.env,
-                    message="抓取第{}页时候出现错误, 报错信息是{}".format(i + 1, e),
+                    message="抓取第{}页时候出现错误, 报错信息是{}".format(page_index + 1, e),
                 )
 
     def process_video_obj(self, video_obj):
@@ -134,6 +133,7 @@ class TFuQiWangRecommend(object):
         if pipeline.process_item():
             self.download_cnt += 1
             self.mq.send_msg(mq_obj)
+            time.sleep(60 * random.randint(1, 5))
             AliyunLogger.logging(
                 code="1002",
                 platform=self.platform,

+ 2 - 0
kuaishou/kuaishou_author/kuaishou_author_scheduling_new.py

@@ -19,11 +19,13 @@ from common.mq import MQ
 from requests.adapters import HTTPAdapter
 from common.scheduling_db import MysqlHelper
 from common.public import random_title, get_config_from_mysql, download_rule
+from common.limit import AuthorLimit
 
 
 class KuaishouauthorScheduling:
     platform = "快手"
     download_cnt = 0
+    limiter = AuthorLimit(platform="kuaishou", mode="author")
 
     @classmethod
     def videos_cnt(cls, rule_dict):

+ 16 - 12
shipinhao/shipinhao_author/shipinhao_author.py

@@ -16,6 +16,7 @@ from common import PiaoQuanPipeline, AliyunLogger
 from common.db import MysqlHelper
 from common.mq import MQ
 from common.public import clean_title
+from common.limit import AuthorLimit
 
 
 def find_target_user(name, user_list):
@@ -43,6 +44,7 @@ class ShiPinHaoAuthor(object):
         self.env = env
         self.download_cnt = 0
         self.mq = MQ(topic_name="topic_crawler_etl_" + self.env)
+        self.limiter = AuthorLimit(platform=self.platform, mode=self.mode)
 
     def get_history_id(self):
         """
@@ -352,18 +354,20 @@ class ShiPinHaoAuthor(object):
             time.sleep(0.5)
             Feishu.update_values(self.platform, 'shipinhao', "Vq7NeH", "A2:Z2", values)
             video_dict["publish_time"] = video_dict["publish_time_str"]
-            self.mq.send_msg(video_dict)
-            self.download_cnt += 1
-            AliyunLogger.logging(
-                code="1002",
-                platform=self.platform,
-                mode=self.mode,
-                env=self.env,
-                data=video_dict,
-                trace_id=trace_id,
-                message="成功发送 MQ 至 ETL",
-            )
-            time.sleep(5)
+            limit_flag = self.limiter.author_limitation(user_id=video_dict['user_id'])
+            if limit_flag:
+                self.mq.send_msg(video_dict)
+                self.download_cnt += 1
+                AliyunLogger.logging(
+                    code="1002",
+                    platform=self.platform,
+                    mode=self.mode,
+                    env=self.env,
+                    data=video_dict,
+                    trace_id=trace_id,
+                    message="成功发送 MQ 至 ETL",
+                )
+                time.sleep(5)
         return True
 
     def video_duration(self, filename):

+ 0 - 0
xiaoniangao/xiaoniangao_account_scan.py


+ 32 - 18
xiaoniangao/xiaoniangao_author/xiaoniangao_author_v2.py

@@ -13,6 +13,7 @@ sys.path.append(os.getcwd())
 from common.common import Common
 from common import AliyunLogger, PiaoQuanPipeline
 from common.public import get_config_from_mysql, clean_title
+from common.limit import AuthorLimit
 
 
 def tunnel_proxies():
@@ -43,6 +44,7 @@ class XiaoNianGaoAuthor:
         self.download_count = 0
         self.test_account = [58528285, 58527674, 58528085, 58527582, 58527601, 58527612, 58528281, 58528095, 58527323,
                              58528071, 58527278]
+        self.limiter = AuthorLimit(platform=self.platform, mode=self.mode)
 
     def get_author_list(self):
         # 每轮只抓取定量的数据,到达数量后自己退出
@@ -254,7 +256,7 @@ class XiaoNianGaoAuthor:
             trace_id=trace_id,
         )
         # account_level = user_dict['account_level']
-        if user_dict['uid'] in self.test_account:
+        if user_dict['link'] in self.test_account:
             if (
                     int(time.time()) - publish_time_stamp
                     > 3600 * 24
@@ -295,21 +297,33 @@ class XiaoNianGaoAuthor:
             video_dict["user_id"] = user_dict["uid"]
             video_dict["publish_time"] = video_dict["publish_time_str"]
             # print(video_dict)
-            self.mq.send_msg(video_dict)
-            self.download_count += 1
-            AliyunLogger.logging(
-                code="1002",
-                platform=self.platform,
-                mode=self.mode,
-                env=self.env,
-                data=video_dict,
-                trace_id=trace_id,
-                message="成功发送 MQ 至 ETL",
-            )
-            Common.logging(
-                log_type=self.mode,
-                crawler=self.platform,
-                env=self.env,
-                message="成功发送 MQ 至 ETL",
-            )
+            limit_flag = self.limiter.author_limitation(user_id=video_dict['user_id'])
+            if limit_flag:
+                self.mq.send_msg(video_dict)
+                self.download_count += 1
+                AliyunLogger.logging(
+                    code="1002",
+                    platform=self.platform,
+                    mode=self.mode,
+                    env=self.env,
+                    data=video_dict,
+                    trace_id=trace_id,
+                    message="成功发送 MQ 至 ETL",
+                )
+                Common.logging(
+                    log_type=self.mode,
+                    crawler=self.platform,
+                    env=self.env,
+                    message="成功发送 MQ 至 ETL",
+                )
+            else:
+                AliyunLogger.logging(
+                    code="8808",
+                    platform=self.platform,
+                    mode=self.mode,
+                    env=self.env,
+                    trace_id=trace_id,
+                    account=video_dict['user_id'],
+                    message="监测到个人账号数量超过 300,停止抓取该账号"
+                )
         return True

+ 37 - 33
xigua/xigua_author/xigua_author.py

@@ -14,6 +14,7 @@ from common.mq import MQ
 sys.path.append(os.getcwd())
 
 from common import AliyunLogger, PiaoQuanPipeline, tunnel_proxies
+from common.limit import AuthorLimit
 
 
 def random_signature():
@@ -647,6 +648,7 @@ class XiGuaAuthor:
         self.user_list = user_list
         self.mq = MQ(topic_name="topic_crawler_etl_" + self.env)
         self.download_count = 0
+        self.limiter = AuthorLimit(platform=self.platform, mode=self.mode)
 
     def rule_maker(self, account):
         """
@@ -844,12 +846,39 @@ class XiGuaAuthor:
             item=video_dict,
             trace_id=trace_id,
         )
-        title_flag = pipeline.title_flag()
-        repeat_flag = pipeline.repeat_video()
-        if title_flag and repeat_flag:
-            if new_rule.get("special"):
-                if int(video_dict['play_cnt']) >= int(new_rule.get("play_cnt", {}).get("min", 100000)):
-                    if float(video_dict['like_cnt']) / float(video_dict['play_cnt']) >= new_rule['special']:
+        limit_flag = self.limiter.author_limitation(user_id=video_dict['user_id'])
+        if limit_flag:
+            title_flag = pipeline.title_flag()
+            repeat_flag = pipeline.repeat_video()
+            if title_flag and repeat_flag:
+                if new_rule.get("special"):
+                    if int(video_dict['play_cnt']) >= int(new_rule.get("play_cnt", {}).get("min", 100000)):
+                        if float(video_dict['like_cnt']) / float(video_dict['play_cnt']) >= new_rule['special']:
+                            self.mq.send_msg(video_dict)
+                            self.download_count += 1
+                            AliyunLogger.logging(
+                                code="1002",
+                                account=user_dict['uid'],
+                                platform=self.platform,
+                                mode=self.mode,
+                                env=self.env,
+                                data=video_dict,
+                                trace_id=trace_id,
+                                message="成功发送 MQ 至 ETL",
+                            )
+                            return True
+                        else:
+                            AliyunLogger.logging(
+                                code="2008",
+                                account=user_dict['uid'],
+                                platform=self.platform,
+                                mode=self.mode,
+                                env=self.env,
+                                message="不满足特殊规则, 点赞量/播放量",
+                                data=video_dict
+                            )
+                else:
+                    if int(video_dict['play_cnt']) >= int(new_rule.get("play_cnt", {}).get("min", 100000)):
                         self.mq.send_msg(video_dict)
                         self.download_count += 1
                         AliyunLogger.logging(
@@ -870,35 +899,10 @@ class XiGuaAuthor:
                             platform=self.platform,
                             mode=self.mode,
                             env=self.env,
-                            message="不满足特殊规则, 点赞量/播放量",
+                            message="不满足特殊规则, 播放量",
                             data=video_dict
                         )
-            else:
-                if int(video_dict['play_cnt']) >= int(new_rule.get("play_cnt", {}).get("min", 100000)):
-                    self.mq.send_msg(video_dict)
-                    self.download_count += 1
-                    AliyunLogger.logging(
-                        code="1002",
-                        account=user_dict['uid'],
-                        platform=self.platform,
-                        mode=self.mode,
-                        env=self.env,
-                        data=video_dict,
-                        trace_id=trace_id,
-                        message="成功发送 MQ 至 ETL",
-                    )
-                    return True
-                else:
-                    AliyunLogger.logging(
-                        code="2008",
-                        account=user_dict['uid'],
-                        platform=self.platform,
-                        mode=self.mode,
-                        env=self.env,
-                        message="不满足特殊规则, 播放量",
-                        data=video_dict
-                    )
-        return True
+            return True
 
     def get_video_info(self, item_id, trace_id):
         url = "https://www.ixigua.com/api/mixVideo/information?"