luojunhui 3 bulan lalu
induk
melakukan
8ac596205f
1 mengubah file dengan 126 tambahan dan 11 penghapusan
  1. 126 11
      tasks/crawler_accounts.py

+ 126 - 11
tasks/crawler_accounts.py

@@ -8,16 +8,20 @@ import json
 import time
 import datetime
 import traceback
+import numpy as np
 
 from pymysql.cursors import DictCursor
 from tqdm import tqdm
 
 from applications import log
+from applications.api import similarity_between_title_list
 from applications.db import DatabaseConnector
 from applications.pipeline import scrape_account_entities_process
 from applications.utils import Item
 from applications.utils import insert_into_associated_recommendation_table
 from coldStartTasks.crawler.toutiao import get_associated_recommendation
+from coldStartTasks.crawler.channels import search_in_wechat_channel
+from coldStartTasks.crawler.channels import get_channel_account_videos
 from config import apolloConfig, long_articles_config
 
 config = apolloConfig()
@@ -30,12 +34,65 @@ class CrawlerAccounts:
         self.db_client = DatabaseConnector(db_config=long_articles_config)
         self.db_client.connect()
 
+    def insert_video_into_recommend_table(self, item):
+        # whether account exists
+        final_item = scrape_account_entities_process(item, self.db_client)
+        if not final_item:
+            return
+        else:
+            # save to db
+            insert_into_associated_recommendation_table(db_client=self.db_client, associated_recommendation_item=final_item)
+
+    def save_similarity_score_to_table(
+            self, association_list:list[dict]
+    ) -> int:
+        """
+        calculate similarity between seed_title_list and association_title_list
+        """
+        association_id_list = [i['id'] for i in association_list]
+        association_title_list = [i['title'] for i in association_list]
+        seed_title_list = [i['seed_title'] for i in association_list]
+        similarity_score_list =  similarity_between_title_list(seed_title_list, association_title_list)
+        similarity_score_array = np.array(similarity_score_list)
+
+        # get main diagonal score
+        score_list = np.diag(similarity_score_array)
+
+        batch_update_query = """
+            update video_association
+            set score = case id
+                {}
+            end
+            where id in %s and score is null;
+        """
+        case_statement = []
+        params = []
+        for index, score in enumerate(score_list):
+            association_id = association_id_list[index]
+            case_statement.append(f"when %s then %s")
+            params.extend([association_id, score])
+
+        params.append(tuple(association_id_list))
+        case_statements = "\n".join(case_statement)
+        formatted_sql =  batch_update_query.format(case_statements)
+        affected_rows = self.db_client.save(formatted_sql, params)
+        return affected_rows
+
+    def get_video_list_without_score(self):
+        fetch_query = f"""
+            select id, title, seed_title
+            from video_association
+            where score is null;
+        """
+        fetch_response = self.db_client.fetch(query=fetch_query, cursor_type=DictCursor)
+        return fetch_response
+
 
 class ChannelAccountCrawler(CrawlerAccounts):
     """
     crawler channel accounts
     strategy:
-        1. try to get search keys and titles from database
+        1. try to get seed titles from database
         2. try to get hot_points from web
         2. use search api to get accounts
     """
@@ -44,10 +101,65 @@ class ChannelAccountCrawler(CrawlerAccounts):
         """
         get search keys from database
         """
-        sql = "select * from datastat_sort_strategy limit 100;"
-        result = self.db_client.fetch(sql)
+        fetch_query = "select title from article_pool_promotion_source where status = 1 and deleted = 0 order by level limit 100;"
+        result = self.db_client.fetch(query=fetch_query, cursor_type=DictCursor)
         return result
 
+    def process_each_video(self, video: dict, seed_title: str):
+        """
+        process video item and save to database
+        """
+        account_name = video['items'][0]['source']['title']
+        search_account_response = search_in_wechat_channel(search_key=account_name, search_type=2)
+        account_detail = search_account_response['data']['data'][0]['items'][0]
+        account_id = account_detail['jumpInfo']['userName']
+        search_video_response = get_channel_account_videos(account_id)
+        video_list = search_video_response['data']['object']
+        for video in video_list[:5]:
+            video_item = Item()
+            video_item.add("account_name", account_name)
+            video_item.add("account_id", account_id)
+            video_item.add("recommend_video_id", video['id'])
+            video_item.add("title", video['objectDesc']['description'])
+            video_item.add("duration", video['objectDesc']['media'][0]['VideoPlayLen'])
+            video_item.add("seed_account", "SearchWithOutAccount")
+            video_item.add("seed_title", seed_title)
+            video_item.add("recommend_date", datetime.datetime.today().strftime("%Y-%m-%d"))
+            video_item.add("platform", "sph")
+            # check item
+            video_item.check(source="association")
+
+            # save to db
+            self.insert_video_into_recommend_table(video_item.item)
+
+    def search_by_title_from_database(self, title: str) -> None:
+        """
+        search
+        """
+        search_response = search_in_wechat_channel(search_key=title, search_type=1)
+        print(search_response)
+        video_list = search_response['data']['data'][0]['subBoxes']
+        for video in tqdm(video_list, desc='crawler each video'):
+            try:
+                self.process_each_video(video, seed_title=title)
+            except Exception as e:
+                print(e)
+
+    def search_by_title_from_hotpoint(self, title: str) -> None:
+        return
+
+    def deal(self):
+        seed_title_list = self.get_seed_keys()
+        for item in tqdm(seed_title_list, desc='crawler each title'):
+            try:
+                self.search_by_title_from_database(title=item['title'])
+            except Exception as e:
+                print(e)
+
+        # cal similarity score
+        video_list = self.get_video_list_without_score()
+        affected_rows = self.save_similarity_score_to_table(video_list)
+        print(affected_rows)
 
 class ToutiaoAccountCrawler(CrawlerAccounts):
 
@@ -81,13 +193,8 @@ class ToutiaoAccountCrawler(CrawlerAccounts):
         # check item
         video_item.check(source="association")
 
-        # whether account exists
-        final_item = scrape_account_entities_process(video_item.item, self.db_client)
-        if not final_item:
-            return
-        else:
-            # save to db
-            insert_into_associated_recommendation_table(db_client=self.db_client, associated_recommendation_item=final_item)
+        # insert into database
+        self.insert_video_into_recommend_table(video_item.item)
 
     def get_recommend_video_list(self, seed_video: dict):
 
@@ -98,7 +205,10 @@ class ToutiaoAccountCrawler(CrawlerAccounts):
         recommend_response = get_associated_recommendation(seed_video_id, cookie)
         recommend_video_list = recommend_response["data"]
         for video in tqdm(recommend_video_list):
-            self.process_each_video(video, seed_account_name, seed_title)
+            try:
+                self.process_each_video(video, seed_account_name, seed_title)
+            except Exception as e:
+                print(e)
 
     def deal(self):
 
@@ -118,3 +228,8 @@ class ToutiaoAccountCrawler(CrawlerAccounts):
                         "seed_video": seed_video,
                     },
                 )
+
+        # cal similarity score
+        video_list = self.get_video_list_without_score()
+        affected_rows = self.save_similarity_score_to_table(video_list)
+        print(affected_rows)