# crawler_accounts.py (8.9 KB)
  1. """
  2. @author: luojunhui
  3. """
  4. from __future__ import annotations
  5. import json
  6. import time
  7. import datetime
  8. import traceback
  9. import numpy as np
  10. from pymysql.cursors import DictCursor
  11. from tqdm import tqdm
  12. from applications import log
  13. from applications.api import similarity_between_title_list
  14. from applications.db import DatabaseConnector
  15. from applications.pipeline import scrape_account_entities_process
  16. from applications.utils import Item
  17. from applications.utils import insert_into_associated_recommendation_table
  18. from coldStartTasks.crawler.toutiao import get_associated_recommendation
  19. from coldStartTasks.crawler.channels import search_in_wechat_channel
  20. from coldStartTasks.crawler.channels import get_channel_account_videos
  21. from config import apolloConfig, long_articles_config
  # Configuration client, created once when the module is imported.
  config = apolloConfig()

  # Cookie handed to get_associated_recommendation() when fetching the
  # recommendations of a toutiao seed video.
  cookie = config.getConfigValue(
      "toutiao_detail_recommend_cookie"
  )
  class CrawlerAccounts:
      """
      Base class shared by the account crawlers.

      Owns the database connection and provides the helpers used by every
      crawler: saving a crawled video into the recommendation table and
      back-filling the title-similarity score in ``video_association``.
      """

      def __init__(self):
          """Create the database client and open the connection."""
          self.db_client = DatabaseConnector(db_config=long_articles_config)
          self.db_client.connect()

      def insert_video_into_recommend_table(self, item):
          """
          Run the account-entity pipeline on ``item`` and save the result.

          Nothing is written when ``scrape_account_entities_process`` returns
          a falsy value.
          """
          # whether account exists
          final_item = scrape_account_entities_process(item, self.db_client)
          if not final_item:
              return

          # save to db
          insert_into_associated_recommendation_table(
              db_client=self.db_client,
              associated_recommendation_item=final_item,
          )

      def save_similarity_score_to_table(self, association_list: list[dict]) -> int:
          """
          calculate similarity between seed_title_list and association_title_list

          Each row must carry the keys ``id``, ``title`` and ``seed_title``.
          The score of row ``i`` is the ``i``-th entry of the main diagonal of
          the matrix returned by ``similarity_between_title_list``, i.e. the
          similarity between that row's own seed title and title.

          :param association_list: rows of ``video_association`` to score
          :return: value returned by ``db_client.save`` for the batch update,
              or ``0`` when there is nothing to score
          """
          # An empty batch would render "case id end where id in ()", which is
          # not valid SQL, so skip both the similarity call and the update.
          if not association_list:
              return 0

          association_id_list = [row["id"] for row in association_list]
          association_title_list = [row["title"] for row in association_list]
          seed_title_list = [row["seed_title"] for row in association_list]

          # NOTE(review): a full len(seed) x len(title) matrix is requested
          # although only its diagonal is used -- confirm this is acceptable
          # for large backlogs.
          similarity_score_list = similarity_between_title_list(
              seed_title_list, association_title_list
          )
          # get main diagonal score
          score_list = np.diag(np.array(similarity_score_list))

          batch_update_query = """
              update video_association
              set score = case id
                  {}
              end
              where id in %s and score is null;
          """
          case_statement = []
          params = []
          for association_id, score in zip(association_id_list, score_list):
              case_statement.append("when %s then %s")
              # bind a plain float so the DB driver does not have to know
              # about numpy scalar types
              params.extend([association_id, float(score)])
          params.append(tuple(association_id_list))

          formatted_sql = batch_update_query.format("\n".join(case_statement))
          return self.db_client.save(formatted_sql, params)

      def get_video_list_without_score(self):
          """Fetch ``id``, ``title`` and ``seed_title`` of every row whose score is null."""
          fetch_query = """
              select id, title, seed_title
              from video_association
              where score is null;
          """
          return self.db_client.fetch(query=fetch_query, cursor_type=DictCursor)
  class ChannelAccountCrawler(CrawlerAccounts):
      """
      crawler channel accounts
      strategy:
          1. try to get seed titles from database
          2. try to get hot_points from web
          3. use search api to get accounts
      """

      def get_seed_keys(self):
          """
          get search keys from database

          Returns up to 100 titles from ``article_pool_promotion_source``
          (status = 1, not deleted), ordered by level.
          """
          fetch_query = "select title from article_pool_promotion_source where status = 1 and deleted = 0 order by level limit 100;"
          return self.db_client.fetch(query=fetch_query, cursor_type=DictCursor)

      def process_each_video(self, video: dict, seed_title: str):
          """
          process video item and save to database

          Looks up the account that published ``video``, fetches that
          account's videos and saves the first five of them as
          recommendations for ``seed_title``.

          :param video: one entry of the search result's ``subBoxes`` list
          :param seed_title: title that was used as the search key
          """
          account_name = video['items'][0]['source']['title']
          search_account_response = search_in_wechat_channel(search_key=account_name, search_type=2)
          account_detail = search_account_response['data']['data'][0]['items'][0]
          account_id = account_detail['jumpInfo']['userName']

          search_video_response = get_channel_account_videos(account_id)
          video_list = search_video_response['data']['object']

          # same for every video of this account, so computed once
          recommend_date = datetime.datetime.today().strftime("%Y-%m-%d")

          # distinct loop name: the original reused ``video`` and shadowed the parameter
          for account_video in video_list[:5]:
              description = account_video['objectDesc']
              video_item = Item()
              video_item.add("account_name", account_name)
              video_item.add("account_id", account_id)
              video_item.add("recommend_video_id", account_video['id'])
              video_item.add("title", description['description'])
              video_item.add("duration", description['media'][0]['VideoPlayLen'])
              video_item.add("seed_account", "SearchWithOutAccount")
              video_item.add("seed_title", seed_title)
              video_item.add("recommend_date", recommend_date)
              video_item.add("platform", "sph")
              # check item
              video_item.check(source="association")
              # save to db
              self.insert_video_into_recommend_table(video_item.item)

      def search_by_title_from_database(self, title: str) -> None:
          """
          search

          Searches the channel with ``title`` and processes every video of
          the first result group. A failure on one video is printed and the
          remaining videos are still processed.
          """
          search_response = search_in_wechat_channel(search_key=title, search_type=1)
          print(search_response)
          video_list = search_response['data']['data'][0]['subBoxes']
          for video in tqdm(video_list, desc='crawler each video'):
              try:
                  self.process_each_video(video, seed_title=title)
              except Exception as e:
                  print(e)

      def search_by_title_from_hotpoint(self, title: str) -> None:
          """Placeholder for the hot-point strategy; currently does nothing."""
          return

      def deal(self):
          """
          Crawl accounts for every seed title, then score the new rows.

          A failure on one seed title is printed and the next title is tried.
          """
          seed_title_list = self.get_seed_keys()
          for item in tqdm(seed_title_list, desc='crawler each title'):
              try:
                  self.search_by_title_from_database(title=item['title'])
              except Exception as e:
                  print(e)

          # cal similarity score
          video_list = self.get_video_list_without_score()
          # with nothing to score the batch update would be rendered as
          # "... where id in ()", so skip it and report zero rows
          affected_rows = self.save_similarity_score_to_table(video_list) if video_list else 0
          print(affected_rows)
  class ToutiaoAccountCrawler(CrawlerAccounts):
      """
      Crawl toutiao accounts through the recommendations of seed videos.

      For every seed video the associated recommendations are fetched and
      each recommended video is saved together with its account.
      """

      def get_seed_videos(self):
          """
          Fetch the seed videos from ``publish_single_video_source``.

          Returns up to 100 toutiao rows with ``video_pool_audit_status = 1``
          and ``bad_status = 0``, highest score first.
          """
          fetch_query = """
              select out_account_name, article_title, url_unique_md5
              from publish_single_video_source
              where platform = 'toutiao' and video_pool_audit_status = 1 and bad_status = 0
              order by score desc limit 100;
          """
          seed_video_list = self.db_client.fetch(
              query=fetch_query, cursor_type=DictCursor
          )
          return seed_video_list

      def process_each_video(self, video, seed_account_name, seed_title):
          """
          process video item and save to database

          :param video: one entry of the recommendation response's ``data`` list
          :param seed_account_name: account name of the seed video
          :param seed_title: title of the seed video
          """
          video_item = Item()
          user_info = video["user_info"]
          video_item.add("account_name", user_info["name"])
          video_item.add("account_id", user_info["user_id"])
          video_item.add("platform", "toutiao")
          video_item.add("recommend_video_id", video["id"])
          video_item.add("title", video["title"])
          # read_count is optional in the response; None is stored when absent
          video_item.add("read_cnt", video.get("read_count"))
          video_item.add("duration", video["video_duration"])
          video_item.add("seed_account", seed_account_name)
          video_item.add("seed_title", seed_title)
          video_item.add("recommend_date", datetime.datetime.today().strftime("%Y-%m-%d"))
          # check item
          video_item.check(source="association")
          # insert into database
          self.insert_video_into_recommend_table(video_item.item)

      def get_recommend_video_list(self, seed_video: dict):
          """
          get recommend videos for each video

          A failure on one recommended video is printed and the remaining
          videos are still processed.

          :param seed_video: row with the keys ``url_unique_md5``,
              ``out_account_name`` and ``article_title``
          """
          seed_video_id = seed_video["url_unique_md5"]
          seed_account_name = seed_video["out_account_name"]
          seed_title = seed_video["article_title"]
          recommend_response = get_associated_recommendation(seed_video_id, cookie)
          recommend_video_list = recommend_response["data"]
          for video in tqdm(recommend_video_list):
              try:
                  self.process_each_video(video, seed_account_name, seed_title)
              except Exception as e:
                  print(e)

      def deal(self):
          """
          Crawl recommendations for every seed video, then score the new rows.

          A failure on one seed video is logged and the next seed is tried.
          """
          # start
          seed_video_list = self.get_seed_videos()
          for seed_video in tqdm(seed_video_list, desc="get each video recommendation"):
              try:
                  self.get_recommend_video_list(seed_video)
              except Exception as e:
                  log(
                      task="toutiao_recommendation_crawler",
                      function="save_each_recommendation",
                      message="save recommendation failed",
                      data={
                          "error": str(e),
                          "traceback": traceback.format_exc(),
                          "seed_video": seed_video,
                      },
                  )

          # cal similarity score
          video_list = self.get_video_list_without_score()
          # with nothing to score the batch update would be rendered as
          # "... where id in ()", so skip it and report zero rows
          affected_rows = self.save_similarity_score_to_table(video_list) if video_list else 0
          print(affected_rows)