luojunhui 3 недель назад
Родитель
Commit
460c2dbc3f

+ 4 - 15
account_crawler_task.py

@@ -68,22 +68,11 @@ def deal_each_platform(platform: str) -> None:
     # start process
     crawler.deal()
 
-    # get videos with score to sync to feishu
-    video_list = crawler.get_video_list_with_score(platform=platform)
-    if video_list:
-        insert_data_into_feishu_sheet(platform=platform, data_list=video_list)
-        video_id_list= [i[0] for i in video_list]
-        # update status
-        crawler.update_video_status(
-            video_id_tuple=tuple(video_id_list), ori_status=0, new_status=1
-        )
-
-
-
 if __name__ == "__main__":
-    platform_list = ["sph", "hksp", "toutiao"]
-    for platform in platform_list:
-        deal_each_platform(platform=platform)
+    # platform_list = ["sph", "hksp", "toutiao"]
+    # for platform_id in platform_list:
+    #     deal_each_platform(platform=platform_id)
+    HaoKanAccountCrawler().deal()
 
 
 

+ 1 - 1
applications/utils/__init__.py

@@ -10,7 +10,7 @@ from .download_video import download_toutiao_video
 from .item import Item
 from .save_to_db import insert_into_single_video_source_table
 from .save_to_db import insert_into_video_meta_accounts_table
-from .save_to_db import insert_into_associated_recommendation_table
+from .save_to_db import insert_into_candidate_account_pool_table
 from .upload import upload_to_oss
 from .fetch_info_from_aigc import fetch_account_fans
 from .fetch_info_from_aigc import fetch_publishing_account_list

+ 11 - 16
applications/utils/item.py

@@ -38,17 +38,12 @@ default_account_table_fields = {
 
 }
 
-default_association_table_fields = {
-    "account_name": 'Not NULL',
+default_candidate_account_table_fields = {
+    "platform": 'Not NULL',
     "account_id": 'Not NULL',
-    "recommend_video_id": 'Not NULL',
-    "title": 'Not NULL',
-    "read_cnt": 0,
-    "duration": 0,
-    "seed_account": 'Not NULL',
-    "seed_title": 'Not NULL',
-    "recommend_date": 'Not NULL',
-    "platform": 'Not NULL'
+    "account_name": 'Not NULL',
+    "crawler_date": 'Not NULL',
+    "title_list": "[]"
 }
 
 
@@ -96,18 +91,18 @@ class Item(object):
             else:
                 self.item[key] = default_account_table_fields[key]
 
-    def check_association_item(self):
+    def check_candidate_account_item(self):
         """
         check association item
         """
-        fields = list(default_association_table_fields.keys())
+        fields = list(default_candidate_account_table_fields.keys())
         for field in fields:
             if self.item.get(field, None) is not None:
                 continue
-            elif default_association_table_fields[field] == 'Not NULL':
+            elif default_candidate_account_table_fields[field] == 'Not NULL':
                 raise ValueError(f"{field} is not None, please check your account item")
             else:
-                self.item[field] = default_association_table_fields[field]
+                self.item[field] = default_candidate_account_table_fields[field]
 
     def check(self, source):
         """
@@ -120,5 +115,5 @@ class Item(object):
                 self.check_article_item()
             case "account":
                 self.check_account_item()
-            case "association":
-                self.check_association_item()
+            case "candidate_account":
+                self.check_candidate_account_item()

+ 17 - 23
applications/utils/save_to_db.py

@@ -86,20 +86,19 @@ def insert_into_video_meta_accounts_table(db_client, account_item):
             },
         )
 
-def insert_into_associated_recommendation_table(db_client, associated_recommendation_item):
+def insert_into_candidate_account_pool_table(db_client, account_item):
     """
     insert recommendation into recommendation table
     """
     # check whether duplicate video
     fetch_query = f"""
-        select id from video_association
-        where account_id = %s and platform = %s and recommend_video_id = %s;
+        select id from crawler_candidate_account_pool
+        where account_id = %s and platform = %s;
     """
     duplicate_id = db_client.fetch(
         query=fetch_query, params=(
-            associated_recommendation_item["account_id"],
-            associated_recommendation_item["platform"],
-            associated_recommendation_item["recommend_video_id"]
+            account_item["account_id"],
+            account_item["platform"]
         )
     )
     if duplicate_id:
@@ -107,35 +106,30 @@ def insert_into_associated_recommendation_table(db_client, associated_recommenda
 
     # insert into table
     insert_query = f"""
-        insert into video_association
-            (account_name, account_id, recommend_video_id, title, read_cnt, duration, seed_account, seed_title, recommend_date, platform)
+        insert into crawler_candidate_account_pool
+            (account_name, account_id, title_list, platform, crawler_date)
             values
-            (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
+            (%s, %s, %s, %s, %s)
     """
     try:
         db_client.save(
             query=insert_query,
             params=(
-                associated_recommendation_item["account_name"],
-                associated_recommendation_item["account_id"],
-                associated_recommendation_item["recommend_video_id"],
-                associated_recommendation_item["title"],
-                associated_recommendation_item["read_cnt"],
-                associated_recommendation_item["duration"],
-                associated_recommendation_item["seed_account"],
-                associated_recommendation_item["seed_title"],
-                associated_recommendation_item["recommend_date"],
-                associated_recommendation_item["platform"]
+               account_item["account_name"],
+               account_item["account_id"],
+               account_item["title_list"],
+               account_item["platform"],
+               account_item["crawler_date"]
             )
         )
     except Exception as e:
         log(
-            task="{}_recommendation_crawler".format(associated_recommendation_item["platform"]),
-            function="save_each_recommendation",
-            message="save recommendation failed",
+            task="{}_account_crawler".format(account_item["platform"]),
+            function="save_each_account",
+            message="save account failed",
             data={
                 "error": str(e),
                 "traceback": traceback.format_exc(),
-                "item": associated_recommendation_item
+                "item": account_item
             }
         )

+ 62 - 158
tasks/crawler_accounts_by_association.py

@@ -4,27 +4,29 @@
 
 from __future__ import annotations
 
+import json
 import datetime
 import traceback
-import numpy as np
 
 from pymysql.cursors import DictCursor
 from tqdm import tqdm
 
 from applications import log
-from applications.api import similarity_between_title_list
 from applications.db import DatabaseConnector
 from applications.pipeline import scrape_account_entities_process
 from applications.utils import Item
-from applications.utils import insert_into_associated_recommendation_table
+from applications.utils import insert_into_candidate_account_pool_table
 from coldStartTasks.crawler.baidu import haokan_search_videos
+from coldStartTasks.crawler.baidu.baidu_spider import baidu_account_video_crawler
 from coldStartTasks.crawler.toutiao import get_associated_recommendation
+from coldStartTasks.crawler.toutiao import get_toutiao_account_video_list
 from coldStartTasks.crawler.channels import search_in_wechat_channel
 from coldStartTasks.crawler.channels import get_channel_account_videos
 from config import apolloConfig, long_articles_config
 
 config = apolloConfig()
-cookie = config.getConfigValue("toutiao_detail_recommend_cookie")
+recommend_cookie = config.getConfigValue("toutiao_detail_recommend_cookie")
+blogger_cookie = config.getConfigValue("toutiao_blogger_cookie")
 
 
 class CrawlerAccounts:
@@ -48,85 +50,19 @@ class CrawlerAccounts:
             return
         else:
             # save to db
-            insert_into_associated_recommendation_table(
-                db_client=self.db_client, associated_recommendation_item=final_item
+            insert_into_candidate_account_pool_table(
+                db_client=self.db_client, account_item=final_item
             )
 
-    def save_similarity_score_to_table(self, association_list: list[dict]) -> int:
-        """
-        calculate similarity between seed_title_list and association_title_list
-        """
-        association_id_list = [i["id"] for i in association_list]
-        association_title_list = [i["title"] for i in association_list]
-        seed_title_list = [i["seed_title"] for i in association_list]
-        similarity_score_list = similarity_between_title_list(
-            seed_title_list, association_title_list
-        )
-        similarity_score_array = np.array(similarity_score_list)
-
-        # get main diagonal score
-        score_list = np.diag(similarity_score_array)
-
-        batch_update_query = """
-            update video_association
-            set score = case id
-                {}
-            end
-            where id in %s and score is null;
-        """
-        case_statement = []
-        params = []
-        for index, score in enumerate(score_list):
-            association_id = association_id_list[index]
-            case_statement.append(f"when %s then %s")
-            params.extend([association_id, score])
-
-        params.append(tuple(association_id_list))
-        case_statements = "\n".join(case_statement)
-        formatted_sql = batch_update_query.format(case_statements)
-        affected_rows = self.db_client.save(formatted_sql, params)
-        return affected_rows
-
-    def get_video_list_without_score(self):
-        fetch_query = f"""
-            select id, title, seed_title
-            from video_association
-            where score is null;
-        """
-        fetch_response = self.db_client.fetch(query=fetch_query, cursor_type=DictCursor)
-        return fetch_response
-
-    def get_video_list_with_score(self, platform: str):
-        """
-        find video from video association
-        """
-        fetch_query = f"""
-            select id, account_name, account_id, recommend_video_id, title, read_cnt, duration, seed_account, seed_title
-            from video_association 
-            where score > %s and platform = %s and status = %s
-            order by account_name;
-        """
-        fetch_response = self.db_client.fetch(query=fetch_query, params=(0.5, platform, 0))
-        return fetch_response
-
-    def update_video_status(self, video_id_tuple: tuple, ori_status: int, new_status: int) -> int:
+    def update_account_status(self, account_id_tuple: tuple, ori_status: int, new_status: int) -> int:
         update_query = f"""
             update video_association
             set status = %s
             where id in %s and status = %s;
         """
-        affected_rows = self.db_client.save(query=update_query, params=(new_status, video_id_tuple, ori_status))
+        affected_rows = self.db_client.save(query=update_query, params=(new_status, account_id_tuple, ori_status))
         return affected_rows
 
-    def cal_video_similarity_score(self):
-        # cal similarity score
-        video_list = self.get_video_list_without_score()
-        if video_list:
-            affected_rows = self.save_similarity_score_to_table(video_list)
-            print(affected_rows)
-        else:
-            print("no video recommendation")
-
 
 class ChannelsAccountCrawler(CrawlerAccounts):
     """
@@ -137,30 +73,27 @@ class ChannelsAccountCrawler(CrawlerAccounts):
         2. use search api to get accounts
     """
 
-    def process_channels_video(self, video: dict, seed_title: str, account_name: str, account_id: str):
+    def process_channels_account(self, account_name: str, account_id: str, title_list_str: str):
+
         """
         process video item and save to database
         """
 
-        video_item = Item()
-        video_item.add("account_name", account_name)
-        video_item.add("account_id", account_id)
-        video_item.add("recommend_video_id", video["id"])
-        video_item.add("title", video["objectDesc"]["description"])
-        video_item.add("duration", video["objectDesc"]["media"][0]["VideoPlayLen"])
-        video_item.add("seed_account", "SearchWithOutAccount")
-        video_item.add("seed_title", seed_title)
-        video_item.add(
-            "recommend_date", datetime.datetime.today().strftime("%Y-%m-%d")
+        account_item = Item()
+        account_item.add("account_name", account_name)
+        account_item.add("account_id", account_id)
+        account_item.add("title_list", title_list_str)
+        account_item.add(
+            "crawler_date", datetime.datetime.today().strftime("%Y-%m-%d")
         )
-        video_item.add("platform", "sph")
+        account_item.add("platform", "sph")
         # check item
-        video_item.check(source="association")
+        account_item.check(source="candidate_account")
 
         # save to db
-        self.insert_video_into_recommend_table(video_item.item)
+        self.insert_video_into_recommend_table(account_item.item)
 
-    def process_search_response(self, video: dict, seed_title: str):
+    def process_search_response(self, video: dict):
         """
         通过搜索视频的账号名称去搜索账号,并且抓取该账号下的第一页视频
         """
@@ -172,25 +105,12 @@ class ChannelsAccountCrawler(CrawlerAccounts):
         account_detail = search_account_response["data"]["data"][0]["items"][0]
         account_id = account_detail["jumpInfo"]["userName"]
 
-        # fetch account video list
+        # fetch account video list for the first page
         search_video_response = get_channel_account_videos(account_id)
         video_list = search_video_response["data"]["object"]
-
-        # process and insert each video
-        for video in video_list:
-            try:
-                self.process_channels_video(video, seed_title, account_name, account_id)
-            except Exception as e:
-                log(
-                    task="crawler_channels_account_videos",
-                    function="process_channels_video",
-                    message="process video failed",
-                    data={
-                        "video": video,
-                        "error": str(e),
-                        "traceback": traceback.format_exc()
-                    }
-                )
+        title_list = [i['objectDesc']['description'] for i in video_list]
+        title_list_str = json.dumps(title_list, ensure_ascii=False)
+        self.process_channels_account(account_name, account_id, title_list_str)
 
     def search_video_in_channels(self, title: str) -> None:
         """
@@ -200,7 +120,7 @@ class ChannelsAccountCrawler(CrawlerAccounts):
         video_list = search_response["data"]["data"][0]["subBoxes"]
         for video in tqdm(video_list, desc="crawler each video"):
             try:
-                self.process_search_response(video, seed_title=title)
+                self.process_search_response(video)
             except Exception as e:
                 log(
                     task="crawler_channels_account_videos",
@@ -230,8 +150,6 @@ class ChannelsAccountCrawler(CrawlerAccounts):
                     }
                 )
 
-        self.cal_video_similarity_score()
-
 
 class ToutiaoAccountCrawler(CrawlerAccounts):
 
@@ -247,38 +165,38 @@ class ToutiaoAccountCrawler(CrawlerAccounts):
         )
         return seed_video_list
 
-    def process_toutiao_video(self, video, seed_account_name, seed_title):
+    def process_toutiao_account(self, video):
 
         # process video item and save to database
-        video_item = Item()
+        account_item = Item()
         user_info = video["user_info"]
-        video_item.add("account_name", user_info["name"])
-        video_item.add("account_id", user_info["user_id"])
-        video_item.add("platform", "toutiao")
-        video_item.add("recommend_video_id", video["id"])
-        video_item.add("title", video["title"])
-        video_item.add("read_cnt", video.get("read_count"))
-        video_item.add("duration", video["video_duration"])
-        video_item.add("seed_account", seed_account_name)
-        video_item.add("seed_title", seed_title)
-        video_item.add("recommend_date", datetime.datetime.today().strftime("%Y-%m-%d"))
+        account_item.add("account_name", user_info["name"])
+        account_item.add("account_id", user_info["user_id"])
+        account_item.add("platform", "toutiao")
+        account_item.add("crawler_date", datetime.datetime.today().strftime("%Y-%m-%d"))
+
+        # fetch account video first page video list
+        fetch_response = get_toutiao_account_video_list(account_id=user_info["user_id"], cookie=blogger_cookie)
+        video_list = fetch_response["data"]
+        title_list = [i["title"] for i in video_list]
+        title_list_str = json.dumps(title_list, ensure_ascii=False)
+        account_item.add("title_list", title_list_str)
+
         # check item
-        video_item.check(source="association")
+        account_item.check(source="candidate_account")
 
         # insert into database
-        self.insert_video_into_recommend_table(video_item.item)
+        self.insert_video_into_recommend_table(account_item.item)
 
     def get_recommend_video_list(self, seed_video: dict):
 
         # get recommend videos for each video
         seed_video_id = seed_video["url_unique_md5"]
-        seed_account_name = seed_video["out_account_name"]
-        seed_title = seed_video["article_title"]
-        recommend_response = get_associated_recommendation(seed_video_id, cookie)
+        recommend_response = get_associated_recommendation(seed_video_id, recommend_cookie)
         recommend_video_list = recommend_response["data"]
         for video in tqdm(recommend_video_list):
             try:
-                self.process_toutiao_video(video, seed_account_name, seed_title)
+                self.process_toutiao_account(video)
 
             except Exception as e:
                 log(
@@ -317,44 +235,32 @@ class ToutiaoAccountCrawler(CrawlerAccounts):
                     },
                 )
 
-        self.cal_video_similarity_score()
-
 
 class HaoKanAccountCrawler(CrawlerAccounts):
 
-    def process_haokan_video(self, video: dict, seed_title: str) -> None:
+    def process_haokan_video(self, video: dict) -> None:
         """
         process_haokan_video
         """
 
-        video_item = Item()
-        video_item.add("account_name", video['author'])
-        video_item.add("account_id", video['author_id'])
-        video_item.add("platform", "hksp")
-        video_item.add("recommend_video_id", video['vid'])
-        video_item.add("title", video['title'])
-        read_num_string = video['read_num'].replace("次播放", "")
-        if "万" in read_num_string:
-            read_num_string = read_num_string.replace("万", "")
-            read_num = int(float(read_num_string) * 10000)
-        else:
-            read_num = int(read_num_string)
-        video_item.add("read_cnt", int(read_num))
-        duration_string = video['duration']
-        duration_list = duration_string.split(":")
-        if len(duration_list) > 2:
-            # video too long
-            return
-        duration = int(duration_list[0]) * 60 + int(duration_list[1])
-        video_item.add("duration", duration)
-        video_item.add("seed_account", "SearchWithOutAccount")
-        video_item.add("seed_title", seed_title)
-        video_item.add("recommend_date", datetime.datetime.today().strftime("%Y-%m-%d"))
+        account_item = Item()
+        account_item.add("account_name", video['author'])
+        account_item.add("account_id", video['author_id'])
+        account_item.add("platform", "hksp")
+        account_item.add("crawler_date", datetime.datetime.today().strftime("%Y-%m-%d"))
+
+        # fetch account video first page video list
+        fetch_response = baidu_account_video_crawler(account_id=video['author_id'])
+        video_list = fetch_response["results"]
+        title_list = [i["content"]["title"] for i in video_list]
+        title_list_str = json.dumps(title_list, ensure_ascii=False)
+        account_item.add("title_list", title_list_str)
+
         # check item
-        video_item.check(source="association")
+        account_item.check(source="candidate_account")
 
         # insert into database
-        self.insert_video_into_recommend_table(video_item.item)
+        self.insert_video_into_recommend_table(account_item.item)
 
     def search_videos_in_haokan_video(self, title: str) -> None:
         """
@@ -364,7 +270,7 @@ class HaoKanAccountCrawler(CrawlerAccounts):
         video_list = search_response["data"]["list"]
         for video in tqdm(video_list, desc="search videos"):
             try:
-                self.process_haokan_video(video, seed_title=title)
+                self.process_haokan_video(video)
             except Exception as e:
                 log(
                     task="haokan_search_crawler",
@@ -394,5 +300,3 @@ class HaoKanAccountCrawler(CrawlerAccounts):
                         "traceback": traceback.format_exc()
                     }
                 )
-
-        self.cal_video_similarity_score()