ソースを参照

批量插入粉丝

luojunhui 1 ヶ月 前
コミット
3614c5e3e1
1 ファイル変更33 行追加16 行削除
  1. 33 16
      applications/tasks/crawler_tasks/crawler_gzh_fans.py

+ 33 - 16
applications/tasks/crawler_tasks/crawler_gzh_fans.py

@@ -38,7 +38,7 @@ class CrawlerGzhFansBase(CrawlerGzhFansConst):
         query = """
             SELECT access_token, expire_timestamp FROM gzh_cookie_info where gh_id = %s;
         """
-        return await self.pool.async_fetch(query=query, params=(gh_id, ))
+        return await self.pool.async_fetch(query=query, params=(gh_id,))
 
     # 从数据库获取粉丝 && token
     async def get_cookie_token_from_database(self, gh_id):
@@ -95,13 +95,8 @@ class CrawlerGzhFansBase(CrawlerGzhFansConst):
 
     # 批量插入粉丝信息
     async def insert_gzh_fans_batch(self, account_info, user_list):
-        for user in user_list:
-            query = """
-                INSERT IGNORE INTO gzh_fans_info
-                    (gh_id, account_name, user_openid, user_name, user_create_time, user_head_img, user_remark, identity_type, identity_open_id)
-                VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s);
-            """
-            params = (
+        param_list = [
+            (
                 account_info["gh_id"],
                 account_info["account_name"],
                 user["user_openid"],
@@ -112,7 +107,14 @@ class CrawlerGzhFansBase(CrawlerGzhFansConst):
                 user["identity_type"],
                 user["identity_open_id"],
             )
-            await self.pool.async_save(query=query, params=params)
+            for user in user_list
+        ]
+        query = """
+            INSERT IGNORE INTO gzh_fans_info
+                (gh_id, account_name, user_openid, user_name, user_create_time, user_head_img, user_remark, identity_type, identity_open_id)
+            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s);
+        """
+        await self.pool.async_save(query=query, params=param_list, batch=True)
 
     # 插入失败详情
     async def insert_fail_detail(self, gh_id, fail_info):
@@ -210,7 +212,9 @@ class CrawlerGzhFansBase(CrawlerGzhFansConst):
             query=query, params=(cookie, token, self.AVAILABLE_STATUS, gh_id)
         )
 
-    async def set_access_token_for_each_account(self, gh_id, access_token, expire_timestamp):
+    async def set_access_token_for_each_account(
+        self, gh_id, access_token, expire_timestamp
+    ):
         query = """
             UPDATE gzh_cookie_info 
             SET access_token = %s, access_token_status = %s, expire_timestamp = %s 
@@ -218,7 +222,7 @@ class CrawlerGzhFansBase(CrawlerGzhFansConst):
         """
         return await self.pool.async_save(
             query=query,
-            params=(access_token, self.AVAILABLE_STATUS, expire_timestamp, gh_id)
+            params=(access_token, self.AVAILABLE_STATUS, expire_timestamp, gh_id),
         )
 
     async def get_max_cursor_id(self, gh_id):
@@ -260,7 +264,6 @@ class CrawlerGzhFans(CrawlerGzhFansBase):
                     cursor_id=cursor_openid,
                     cursor_timestamp=cursor_timestamp,
                 )
-                print(response)
                 base_resp = response.get("base_resp", {})
                 code = base_resp.get("ret")
                 error_msg = base_resp.get("err_msg")
@@ -283,7 +286,11 @@ class CrawlerGzhFans(CrawlerGzhFansBase):
                         else:
                             cursor_openid = next_cursor_id
                             cursor_timestamp = next_cursor_timestamp
-                            print(datetime.fromtimestamp(cursor_timestamp).strftime("%Y-%m-%d %H:%M:%S"))
+                            print(
+                                datetime.fromtimestamp(cursor_timestamp).strftime(
+                                    "%Y-%m-%d %H:%M:%S"
+                                )
+                            )
 
                     case "00040":
                         print(f"token 非法: {error_msg}")
@@ -359,7 +366,9 @@ class CrawlerGzhFans(CrawlerGzhFansBase):
                         env="cookie_monitor",
                         mention=False,
                     )
-                    await self.update_account_crawl_history_status(account_info["gh_id"], self.INVALID_STATUS)
+                    await self.update_account_crawl_history_status(
+                        account_info["gh_id"], self.INVALID_STATUS
+                    )
 
                 next_cursor_id = user_list[-1].get("user_openid")
                 next_cursor_timestamp = user_list[-1].get("user_create_time")
@@ -451,7 +460,11 @@ class CrawlerGzhFans(CrawlerGzhFansBase):
 
         match task_name:
             case "get_history_fans":
-                crawl_history_accounts = [i for i in account_list if i['crawl_history_status'] == self.AVAILABLE_STATUS]
+                crawl_history_accounts = [
+                    i
+                    for i in account_list
+                    if i["crawl_history_status"] == self.AVAILABLE_STATUS
+                ]
                 return await run_tasks_with_asyncio_task_group(
                     task_list=crawl_history_accounts,
                     handler=self.crawl_history_fans_for_each_account,
@@ -462,7 +475,11 @@ class CrawlerGzhFans(CrawlerGzhFansBase):
                 )
 
             case "get_union_ids":
-                binding_accounts = [i for i in account_list if i['binding_status'] == self.AVAILABLE_STATUS]
+                binding_accounts = [
+                    i
+                    for i in account_list
+                    if i["binding_status"] == self.AVAILABLE_STATUS
+                ]
                 return await run_tasks_with_asyncio_task_group(
                     task_list=binding_accounts,
                     handler=self.get_union_ids_for_each_account,