crawler_accounts_by_association.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281
  1. """
  2. @author: luojunhui
  3. """
  4. from __future__ import annotations
  5. import json
  6. import time
  7. import datetime
  8. import traceback
  9. import numpy as np
  10. from pymysql.cursors import DictCursor
  11. from tqdm import tqdm
  12. from applications import log
  13. from applications.api import similarity_between_title_list
  14. from applications.db import DatabaseConnector
  15. from applications.pipeline import scrape_account_entities_process
  16. from applications.utils import Item
  17. from applications.utils import insert_into_associated_recommendation_table
  18. from coldStartTasks.crawler.toutiao import get_associated_recommendation
  19. from coldStartTasks.crawler.channels import search_in_wechat_channel
  20. from coldStartTasks.crawler.channels import get_channel_account_videos
  21. from config import apolloConfig, long_articles_config
  22. config = apolloConfig()
  23. cookie = config.getConfigValue("toutiao_detail_recommend_cookie")
  24. class CrawlerAccounts:
  25. def __init__(self):
  26. self.db_client = DatabaseConnector(db_config=long_articles_config)
  27. self.db_client.connect()
  28. def get_seed_keys(self)->list[dict]:
  29. """
  30. get search keys from database
  31. """
  32. fetch_query = "select title from article_pool_promotion_source where status = 1 and deleted = 0 order by level limit 100;"
  33. result = self.db_client.fetch(query=fetch_query, cursor_type=DictCursor)
  34. return result
  35. def insert_video_into_recommend_table(self, item: dict) -> None:
  36. # whether account exists
  37. final_item = scrape_account_entities_process(item, self.db_client)
  38. if not final_item:
  39. return
  40. else:
  41. # save to db
  42. insert_into_associated_recommendation_table(
  43. db_client=self.db_client, associated_recommendation_item=final_item
  44. )
  45. def save_similarity_score_to_table(self, association_list: list[dict]) -> int:
  46. """
  47. calculate similarity between seed_title_list and association_title_list
  48. """
  49. association_id_list = [i["id"] for i in association_list]
  50. association_title_list = [i["title"] for i in association_list]
  51. seed_title_list = [i["seed_title"] for i in association_list]
  52. similarity_score_list = similarity_between_title_list(
  53. seed_title_list, association_title_list
  54. )
  55. similarity_score_array = np.array(similarity_score_list)
  56. # get main diagonal score
  57. score_list = np.diag(similarity_score_array)
  58. batch_update_query = """
  59. update video_association
  60. set score = case id
  61. {}
  62. end
  63. where id in %s and score is null;
  64. """
  65. case_statement = []
  66. params = []
  67. for index, score in enumerate(score_list):
  68. association_id = association_id_list[index]
  69. case_statement.append(f"when %s then %s")
  70. params.extend([association_id, score])
  71. params.append(tuple(association_id_list))
  72. case_statements = "\n".join(case_statement)
  73. formatted_sql = batch_update_query.format(case_statements)
  74. affected_rows = self.db_client.save(formatted_sql, params)
  75. return affected_rows
  76. def get_video_list_without_score(self):
  77. fetch_query = f"""
  78. select id, title, seed_title
  79. from video_association
  80. where score is null;
  81. """
  82. fetch_response = self.db_client.fetch(query=fetch_query, cursor_type=DictCursor)
  83. return fetch_response
  84. def get_video_list_with_score(self, platform: str):
  85. """
  86. find video from video association
  87. """
  88. fetch_query = f"""
  89. select id, account_name, recommend_video_id, title, read_cnt, duration, seed_account, seed_title
  90. from video_association
  91. where score > 0.5 and platform = '{platform}' and status = 0
  92. order by account_name;
  93. """
  94. fetch_response = self.db_client.fetch(query=fetch_query)
  95. return fetch_response
  96. def update_video_status(self, video_id_tuple: tuple, ori_status: int, new_status: int) -> int:
  97. update_query = f"""
  98. update video_association
  99. set status = %s
  100. where id in %s and status = %s;
  101. """
  102. affected_rows = self.db_client.save(query=update_query, params=(new_status, video_id_tuple, ori_status))
  103. return affected_rows
  104. class ChannelsAccountCrawler(CrawlerAccounts):
  105. """
  106. crawler channel accounts
  107. strategy:
  108. 1. try to get seed titles from database
  109. 2. try to get hot_points from web
  110. 2. use search api to get accounts
  111. """
  112. def process_each_video(self, video: dict, seed_title: str):
  113. """
  114. process video item and save to database
  115. """
  116. account_name = video["items"][0]["source"]["title"]
  117. search_account_response = search_in_wechat_channel(
  118. search_key=account_name, search_type=2
  119. )
  120. account_detail = search_account_response["data"]["data"][0]["items"][0]
  121. account_id = account_detail["jumpInfo"]["userName"]
  122. search_video_response = get_channel_account_videos(account_id)
  123. video_list = search_video_response["data"]["object"]
  124. for video in video_list[:5]:
  125. try:
  126. video_item = Item()
  127. video_item.add("account_name", account_name)
  128. video_item.add("account_id", account_id)
  129. video_item.add("recommend_video_id", video["id"])
  130. video_item.add("title", video["objectDesc"]["description"])
  131. video_item.add("duration", video["objectDesc"]["media"][0]["VideoPlayLen"])
  132. video_item.add("seed_account", "SearchWithOutAccount")
  133. video_item.add("seed_title", seed_title)
  134. video_item.add(
  135. "recommend_date", datetime.datetime.today().strftime("%Y-%m-%d")
  136. )
  137. video_item.add("platform", "sph")
  138. # check item
  139. video_item.check(source="association")
  140. # save to db
  141. self.insert_video_into_recommend_table(video_item.item)
  142. except Exception as e:
  143. log(
  144. task="channel account crawler",
  145. function="process_each_video",
  146. message="create item and save to db failed",
  147. data={
  148. "video": video,
  149. "error": str(e),
  150. "traceback": traceback.format_exc()
  151. }
  152. )
  153. def search_by_title_from_database(self, title: str) -> None:
  154. """
  155. search
  156. """
  157. search_response = search_in_wechat_channel(search_key=title, search_type=1)
  158. # print(search_response)
  159. video_list = search_response["data"]["data"][0]["subBoxes"]
  160. for video in tqdm(video_list, desc="crawler each video"):
  161. try:
  162. self.process_each_video(video, seed_title=title)
  163. except Exception as e:
  164. print(e)
  165. def search_by_title_from_hotpoint(self, title: str) -> None:
  166. return
  167. def deal(self):
  168. seed_title_list = self.get_seed_keys()
  169. for item in tqdm(seed_title_list, desc="crawler each title"):
  170. try:
  171. self.search_by_title_from_database(title=item["title"])
  172. except Exception as e:
  173. print(e)
  174. # cal similarity score
  175. video_list = self.get_video_list_without_score()
  176. affected_rows = self.save_similarity_score_to_table(video_list)
  177. print(affected_rows)
  178. class ToutiaoAccountCrawler(CrawlerAccounts):
  179. def get_seed_videos(self):
  180. fetch_query = f"""
  181. select out_account_name, article_title, url_unique_md5
  182. from publish_single_video_source
  183. where platform = 'toutiao' and video_pool_audit_status = 1 and bad_status = 0
  184. order by score desc limit 100;
  185. """
  186. seed_video_list = self.db_client.fetch(
  187. query=fetch_query, cursor_type=DictCursor
  188. )
  189. return seed_video_list
  190. def process_each_video(self, video, seed_account_name, seed_title):
  191. # process video item and save to database
  192. video_item = Item()
  193. user_info = video["user_info"]
  194. video_item.add("account_name", user_info["name"])
  195. video_item.add("account_id", user_info["user_id"])
  196. video_item.add("platform", "toutiao")
  197. video_item.add("recommend_video_id", video["id"])
  198. video_item.add("title", video["title"])
  199. video_item.add("read_cnt", video.get("read_count"))
  200. video_item.add("duration", video["video_duration"])
  201. video_item.add("seed_account", seed_account_name)
  202. video_item.add("seed_title", seed_title)
  203. video_item.add("recommend_date", datetime.datetime.today().strftime("%Y-%m-%d"))
  204. # check item
  205. video_item.check(source="association")
  206. # insert into database
  207. self.insert_video_into_recommend_table(video_item.item)
  208. def get_recommend_video_list(self, seed_video: dict):
  209. # get recommend videos for each video
  210. seed_video_id = seed_video["url_unique_md5"]
  211. seed_account_name = seed_video["out_account_name"]
  212. seed_title = seed_video["article_title"]
  213. recommend_response = get_associated_recommendation(seed_video_id, cookie)
  214. recommend_video_list = recommend_response["data"]
  215. for video in tqdm(recommend_video_list):
  216. try:
  217. self.process_each_video(video, seed_account_name, seed_title)
  218. except Exception as e:
  219. print(e)
  220. def deal(self):
  221. # start
  222. seed_video_list = self.get_seed_videos()
  223. for seed_video in tqdm(seed_video_list, desc="get each video recommendation"):
  224. try:
  225. self.get_recommend_video_list(seed_video)
  226. except Exception as e:
  227. log(
  228. task="toutiao_recommendation_crawler",
  229. function="save_each_recommendation",
  230. message="save recommendation failed",
  231. data={
  232. "error": str(e),
  233. "traceback": traceback.format_exc(),
  234. "seed_video": seed_video,
  235. },
  236. )
  237. # cal similarity score
  238. video_list = self.get_video_list_without_score()
  239. affected_rows = self.save_similarity_score_to_table(video_list)
  240. print(affected_rows)
  241. class HaoKanAccountCrawler(CrawlerAccounts):
  242. def deal(self):
  243. raise NotImplementedError()