# crawler_accounts_by_association.py
  1. """
  2. @author: luojunhui
  3. """
  4. from __future__ import annotations
  5. import json
  6. import time
  7. import datetime
  8. import traceback
  9. import numpy as np
  10. from pymysql.cursors import DictCursor
  11. from tqdm import tqdm
  12. from applications import log
  13. from applications.api import similarity_between_title_list
  14. from applications.db import DatabaseConnector
  15. from applications.pipeline import scrape_account_entities_process
  16. from applications.utils import Item
  17. from applications.utils import insert_into_associated_recommendation_table
  18. from coldStartTasks.crawler.toutiao import get_associated_recommendation
  19. from coldStartTasks.crawler.channels import search_in_wechat_channel
  20. from coldStartTasks.crawler.channels import get_channel_account_videos
  21. from config import apolloConfig, long_articles_config
# Apollo-backed configuration; the cookie is required by
# get_associated_recommendation for Toutiao detail-page recommendation requests.
config = apolloConfig()
cookie = config.getConfigValue("toutiao_detail_recommend_cookie")
  24. class CrawlerAccounts:
  25. def __init__(self):
  26. self.db_client = DatabaseConnector(db_config=long_articles_config)
  27. self.db_client.connect()
  28. def get_seed_keys(self):
  29. """
  30. get search keys from database
  31. """
  32. fetch_query = "select title from article_pool_promotion_source where status = 1 and deleted = 0 order by level limit 100;"
  33. result = self.db_client.fetch(query=fetch_query, cursor_type=DictCursor)
  34. return result
  35. def insert_video_into_recommend_table(self, item):
  36. # whether account exists
  37. final_item = scrape_account_entities_process(item, self.db_client)
  38. if not final_item:
  39. return
  40. else:
  41. # save to db
  42. insert_into_associated_recommendation_table(
  43. db_client=self.db_client, associated_recommendation_item=final_item
  44. )
  45. def save_similarity_score_to_table(self, association_list: list[dict]) -> int:
  46. """
  47. calculate similarity between seed_title_list and association_title_list
  48. """
  49. association_id_list = [i["id"] for i in association_list]
  50. association_title_list = [i["title"] for i in association_list]
  51. seed_title_list = [i["seed_title"] for i in association_list]
  52. similarity_score_list = similarity_between_title_list(
  53. seed_title_list, association_title_list
  54. )
  55. similarity_score_array = np.array(similarity_score_list)
  56. # get main diagonal score
  57. score_list = np.diag(similarity_score_array)
  58. batch_update_query = """
  59. update video_association
  60. set score = case id
  61. {}
  62. end
  63. where id in %s and score is null;
  64. """
  65. case_statement = []
  66. params = []
  67. for index, score in enumerate(score_list):
  68. association_id = association_id_list[index]
  69. case_statement.append(f"when %s then %s")
  70. params.extend([association_id, score])
  71. params.append(tuple(association_id_list))
  72. case_statements = "\n".join(case_statement)
  73. formatted_sql = batch_update_query.format(case_statements)
  74. affected_rows = self.db_client.save(formatted_sql, params)
  75. return affected_rows
  76. def get_video_list_without_score(self):
  77. fetch_query = f"""
  78. select id, title, seed_title
  79. from video_association
  80. where score is null;
  81. """
  82. fetch_response = self.db_client.fetch(query=fetch_query, cursor_type=DictCursor)
  83. return fetch_response
  84. class ChannelAccountCrawler(CrawlerAccounts):
  85. """
  86. crawler channel accounts
  87. strategy:
  88. 1. try to get seed titles from database
  89. 2. try to get hot_points from web
  90. 2. use search api to get accounts
  91. """
  92. def process_each_video(self, video: dict, seed_title: str):
  93. """
  94. process video item and save to database
  95. """
  96. account_name = video["items"][0]["source"]["title"]
  97. search_account_response = search_in_wechat_channel(
  98. search_key=account_name, search_type=2
  99. )
  100. account_detail = search_account_response["data"]["data"][0]["items"][0]
  101. account_id = account_detail["jumpInfo"]["userName"]
  102. search_video_response = get_channel_account_videos(account_id)
  103. video_list = search_video_response["data"]["object"]
  104. for video in video_list[:5]:
  105. video_item = Item()
  106. video_item.add("account_name", account_name)
  107. video_item.add("account_id", account_id)
  108. video_item.add("recommend_video_id", video["id"])
  109. video_item.add("title", video["objectDesc"]["description"])
  110. video_item.add("duration", video["objectDesc"]["media"][0]["VideoPlayLen"])
  111. video_item.add("seed_account", "SearchWithOutAccount")
  112. video_item.add("seed_title", seed_title)
  113. video_item.add(
  114. "recommend_date", datetime.datetime.today().strftime("%Y-%m-%d")
  115. )
  116. video_item.add("platform", "sph")
  117. # check item
  118. video_item.check(source="association")
  119. # save to db
  120. self.insert_video_into_recommend_table(video_item.item)
  121. def search_by_title_from_database(self, title: str) -> None:
  122. """
  123. search
  124. """
  125. search_response = search_in_wechat_channel(search_key=title, search_type=1)
  126. # print(search_response)
  127. video_list = search_response["data"]["data"][0]["subBoxes"]
  128. for video in tqdm(video_list, desc="crawler each video"):
  129. try:
  130. self.process_each_video(video, seed_title=title)
  131. except Exception as e:
  132. print(e)
  133. def search_by_title_from_hotpoint(self, title: str) -> None:
  134. return
  135. def deal(self):
  136. seed_title_list = self.get_seed_keys()
  137. for item in tqdm(seed_title_list, desc="crawler each title"):
  138. try:
  139. self.search_by_title_from_database(title=item["title"])
  140. except Exception as e:
  141. print(e)
  142. # cal similarity score
  143. video_list = self.get_video_list_without_score()
  144. affected_rows = self.save_similarity_score_to_table(video_list)
  145. print(affected_rows)
  146. class ToutiaoAccountCrawler(CrawlerAccounts):
  147. def get_seed_videos(self):
  148. fetch_query = f"""
  149. select out_account_name, article_title, url_unique_md5
  150. from publish_single_video_source
  151. where platform = 'toutiao' and video_pool_audit_status = 1 and bad_status = 0
  152. order by score desc limit 100;
  153. """
  154. seed_video_list = self.db_client.fetch(
  155. query=fetch_query, cursor_type=DictCursor
  156. )
  157. return seed_video_list
  158. def process_each_video(self, video, seed_account_name, seed_title):
  159. # process video item and save to database
  160. video_item = Item()
  161. user_info = video["user_info"]
  162. video_item.add("account_name", user_info["name"])
  163. video_item.add("account_id", user_info["user_id"])
  164. video_item.add("platform", "toutiao")
  165. video_item.add("recommend_video_id", video["id"])
  166. video_item.add("title", video["title"])
  167. video_item.add("read_cnt", video.get("read_count"))
  168. video_item.add("duration", video["video_duration"])
  169. video_item.add("seed_account", seed_account_name)
  170. video_item.add("seed_title", seed_title)
  171. video_item.add("recommend_date", datetime.datetime.today().strftime("%Y-%m-%d"))
  172. # check item
  173. video_item.check(source="association")
  174. # insert into database
  175. self.insert_video_into_recommend_table(video_item.item)
  176. def get_recommend_video_list(self, seed_video: dict):
  177. # get recommend videos for each video
  178. seed_video_id = seed_video["url_unique_md5"]
  179. seed_account_name = seed_video["out_account_name"]
  180. seed_title = seed_video["article_title"]
  181. recommend_response = get_associated_recommendation(seed_video_id, cookie)
  182. recommend_video_list = recommend_response["data"]
  183. for video in tqdm(recommend_video_list):
  184. try:
  185. self.process_each_video(video, seed_account_name, seed_title)
  186. except Exception as e:
  187. print(e)
  188. def deal(self):
  189. # start
  190. seed_video_list = self.get_seed_videos()
  191. for seed_video in tqdm(seed_video_list, desc="get each video recommendation"):
  192. try:
  193. self.get_recommend_video_list(seed_video)
  194. except Exception as e:
  195. log(
  196. task="toutiao_recommendation_crawler",
  197. function="save_each_recommendation",
  198. message="save recommendation failed",
  199. data={
  200. "error": str(e),
  201. "traceback": traceback.format_exc(),
  202. "seed_video": seed_video,
  203. },
  204. )
  205. # cal similarity score
  206. video_list = self.get_video_list_without_score()
  207. affected_rows = self.save_similarity_score_to_table(video_list)
  208. print(affected_rows)
class HaoKanAccountCrawler(CrawlerAccounts):
    # Placeholder subclass: the HaoKan crawling strategy is not implemented yet.
    def deal(self):
        # TODO: implement HaoKan account crawling (mirror the other crawlers' deal())
        return