Parcourir la source

重新上线 limit.py

上线 manage_accounts.py, 用于处理长沙同学账号
罗俊辉 il y a 1 an
Parent
commit
6816e040bb
3 fichiers modifiés avec 152 ajouts et 83 suppressions
  1. 49 52
      common/limit.py
  2. 102 0
      manage_accounts.py
  3. 1 31
      test.py

+ 49 - 52
common/limit.py

@@ -31,67 +31,64 @@ class AuthorLimit(object):
     def __init__(self, mode, platform):
         self.mode = mode
         self.platform = platform
-        self.limit_tag_dict = {"352": "余海涛", "353": "罗情", "53": "范军", "51": "鲁涛", "131": False}
+        self.limit_tag_dict = {
+            "余海涛": "352",
+            "罗情": "353",
+            "范军": "53",
+            "鲁涛": "51"
+        }
 
     def find_tag(self, uid):
         """
-        通过 uid 去找符合标准的 tag
+        判断 uid 是否存在changsha_user_accounts中
         """
-        sql = f"""select tag from crawler_user_v3 where uid={uid};"""
+        sql = f"""select user_name from changsha_user_accounts where piaoquan_account_id = {uid};"""
         result = MysqlHelper.get_values(
             log_type=self.mode, crawler=self.platform, env="prod", sql=sql
         )
-        tags = result[0]["tag"]
-        if tags:
-            tags = tags.split(",")
-            if "131" in tags:
-                return None
-            else:
-                for tag in tags:
-                    if self.limit_tag_dict.get(tag):
-                        return tag
-        return None
+        return result
 
     def author_limitation(self, user_id):
         """
         限制账号, 服务长沙四名同学
         """
-        return True
-        # if self.mode == "author":
-        #     tag = self.find_tag(user_id)
-        #     if tag:
-        #         AliyunLogger.logging(
-        #             code="8807",
-        #             platform=self.platform,
-        #             mode=self.mode,
-        #             env="prod",
-        #             message="找到个人账号,{}".format(tag)
-        #         )
-        #         R = RedisClient()
-        #         if R.connect():
-        #             tag_count = R.select(tag)
-        #             if tag_count:
-        #                 tag_count = int(tag_count.decode("utf-8"))
-        #                 if tag_count <= 300:
-        #                     tag_count += 1
-        #                     expire_seconds = generate_expire_time()
-        #                     R.insert(
-        #                         key=tag, value=tag_count, expire_time=expire_seconds
-        #                     )
-        #                     return True
-        #                 else:
-        #                     # 报警
-        #                     return False
-        #             else:
-        #                 tag_count = 1
-        #                 expire_seconds = generate_expire_time()
-        #                 R.insert(
-        #                     key=tag, value=tag_count, expire_time=expire_seconds
-        #                 )
-        #                 return True
-        #         else:
-        #             return True
-        #     else:
-        #         return True
-        # else:
-        #     return True
+        if self.mode == "author":
+            result = self.find_tag(user_id)
+            if result:
+                user_name = result[0]['user_name']
+                AliyunLogger.logging(
+                    code="8807",
+                    platform=self.platform,
+                    mode=self.mode,
+                    env="prod",
+                    message="找到个人账号,{}".format(user_name)
+                )
+                R = RedisClient()
+                if R.connect():
+                    tag = self.limit_tag_dict[user_name]
+                    tag_count = R.select(tag)
+                    if tag_count:
+                        tag_count = int(tag_count.decode("utf-8"))
+                        if tag_count <= 300:
+                            tag_count += 1
+                            expire_seconds = generate_expire_time()
+                            R.insert(
+                                key=tag, value=tag_count, expire_time=expire_seconds
+                            )
+                            return True
+                        else:
+                            # 报警
+                            return False
+                    else:
+                        tag_count = 1
+                        expire_seconds = generate_expire_time()
+                        R.insert(
+                            key=tag, value=tag_count, expire_time=expire_seconds
+                        )
+                        return True
+                else:
+                    return True
+            else:
+                return True
+        else:
+            return True

+ 102 - 0
manage_accounts.py

@@ -0,0 +1,102 @@
+import time
+import schedule
+import multiprocessing
+from common.scheduling_db import MysqlHelper
+
+
+def read_accounts_from_mysql():
+    """
+    Read accounts from mysql database
+    """
+    sql = f"""select tag, uid from crawler_user_v3 order by create_time desc;"""
+    result = MysqlHelper.get_values(
+        log_type="author", crawler="changsha", env="prod", sql=sql
+    )
+    limit_tag_dict = {
+        "352": "余海涛",
+        "353": "罗情",
+        "53": "范军",
+        "51": "鲁涛",
+        "131": "王雪珂",
+        "6682": "公众新号",
+        "469": "小年糕",
+        "464": "快手",
+        "5662": "快手账号爬虫",
+        "459": "spider",
+        "85": "快手爬虫",
+        "454": "账号",
+        "467": "视频号",
+        "106": "⭐️小年糕爬虫",
+        "120": "西瓜新爬虫",
+        "499": "抖音",
+        "2235": "抖音爬虫"
+    }
+    p_dict = {}
+    for item in result:
+        tag_list = item['tag'].split(",")
+        tag_set = set(tag_list)
+        require_set = {'454', '459'}
+        forbidden_set = {'131', '465', '1379', '160'}
+        if len(tag_set) >= 5:
+            if require_set.issubset(tag_set) and forbidden_set.isdisjoint(tag_set):
+                w = [limit_tag_dict.get(tag, None) for tag in tag_list]
+                p_dict[item['uid']] = w
+    return p_dict
+
+
+def insert_accounts(account_dict):
+    """
+    把长沙同学账号插入到 changsha_accounts 中
+    """
+    for key in account_dict:
+        select_sql = f"""select id from changsha_user_accounts where piaoquan_account_id = {key};"""
+        result = MysqlHelper.get_values(
+            log_type="author", crawler="changsha", env="prod", sql=select_sql
+        )
+        if result:
+            continue
+        tags = set(account_dict[key])
+        name_set = {'鲁涛', '罗情', '余海涛', '范军'}
+        platform_set = {'西瓜新爬虫', '快手账号爬虫', '公众新号', '⭐️小年糕爬虫', '抖音爬虫', '视频号'}
+        name = tags & name_set
+        platform = tags & platform_set
+        if name and platform:
+            user_name = list(name)[0]
+            platform_name = list(platform)[0]
+            sql = f"""INSERT INTO changsha_user_accounts (piaoquan_account_id, user_name, platform) VALUES ('{key}', '{user_name}', '{platform_name}');"""
+            MysqlHelper.update_values(log_type="author", crawler="changsha", sql=sql, env="prod")
+
+
+def protect_(function):
+    """
+    守护进程,在程序启动后的某一个时段内守护爬虫进程
+    :param function: 被守护的函数
+    """
+    process = multiprocessing.Process(target=function)
+    process.start()
+    while True:
+        if not process.is_alive():
+            process.terminate()
+            time.sleep(60)
+            process = multiprocessing.Process(target=function)
+            process.start()
+        time.sleep(60)
+
+
+def process_acc():
+    """
+    执行函数
+    """
+    dd_dict = read_accounts_from_mysql()
+    insert_accounts(dd_dict)
+
+
+def main():
+    """
+    定时执行任务, 每天晚上更新账号
+    """
+    schedule.every().day.at("23:40").do(process_acc)
+
+
+if __name__ == '__main__':
+    protect_(main())

Fichier diff supprimé car celui-ci est trop grand
+ 1 - 31
test.py


Certains fichiers n'ont pas été affichés car il y a eu trop de fichiers modifiés dans ce diff