crawler_accounts_by_association.py

  1. """
  2. @author: luojunhui
  3. """
  4. from __future__ import annotations
  5. import datetime
  6. import traceback
  7. import numpy as np
  8. from pymysql.cursors import DictCursor
  9. from tqdm import tqdm
  10. from applications import log
  11. from applications.api import similarity_between_title_list
  12. from applications.db import DatabaseConnector
  13. from applications.pipeline import scrape_account_entities_process
  14. from applications.utils import Item
  15. from applications.utils import insert_into_associated_recommendation_table
  16. from coldStartTasks.crawler.baidu import haokan_search_videos
  17. from coldStartTasks.crawler.toutiao import get_associated_recommendation
  18. from coldStartTasks.crawler.channels import search_in_wechat_channel
  19. from coldStartTasks.crawler.channels import get_channel_account_videos
  20. from config import apolloConfig, long_articles_config
  21. config = apolloConfig()
  22. cookie = config.getConfigValue("toutiao_detail_recommend_cookie")


class CrawlerAccounts:

    def __init__(self):
        self.db_client = DatabaseConnector(db_config=long_articles_config)
        self.db_client.connect()

    def get_seed_keys(self) -> list[dict]:
        """
        get search keys from database
        """
        fetch_query = "select title from article_pool_promotion_source where status = 1 and deleted = 0 order by level limit 100;"
        result = self.db_client.fetch(query=fetch_query, cursor_type=DictCursor)
        return result

    def insert_video_into_recommend_table(self, item: dict) -> None:
        # check whether the account already exists (see scrape_account_entities_process);
        # a falsy result means the item was filtered out and should not be saved
        final_item = scrape_account_entities_process(item, self.db_client)
        if not final_item:
            return

        # save to db
        insert_into_associated_recommendation_table(
            db_client=self.db_client, associated_recommendation_item=final_item
        )

    def save_similarity_score_to_table(self, association_list: list[dict]) -> int:
        """
        calculate similarity between seed_title_list and association_title_list
        """
        association_id_list = [i["id"] for i in association_list]
        association_title_list = [i["title"] for i in association_list]
        seed_title_list = [i["seed_title"] for i in association_list]
        similarity_score_list = similarity_between_title_list(
            seed_title_list, association_title_list
        )
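        # similarity_between_title_list is assumed to return a pairwise score matrix
        # (seed titles x association titles); only the diagonal pairs matter here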
        similarity_score_array = np.array(similarity_score_list)
        # get main diagonal score
        score_list = np.diag(similarity_score_array)
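        # write all scores back in a single statement by expanding
        # "set score = case id when %s then %s ... end", instead of one UPDATE per row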
        batch_update_query = """
            update video_association
            set score = case id
                {}
            end
            where id in %s and score is null;
        """
        case_statement = []
        params = []
        for index, score in enumerate(score_list):
            association_id = association_id_list[index]
            case_statement.append("when %s then %s")
            # cast the numpy float to a native float so the DB driver can serialize it
            params.extend([association_id, float(score)])
        params.append(tuple(association_id_list))
        case_statements = "\n".join(case_statement)
        formatted_sql = batch_update_query.format(case_statements)
        affected_rows = self.db_client.save(formatted_sql, params)
        return affected_rows

    def get_video_list_without_score(self):
        fetch_query = """
            select id, title, seed_title
            from video_association
            where score is null;
        """
        fetch_response = self.db_client.fetch(query=fetch_query, cursor_type=DictCursor)
        return fetch_response

    def get_video_list_with_score(self, platform: str):
        """
        find videos in video_association whose score passes the threshold
        """
        fetch_query = """
            select id, account_name, recommend_video_id, title, read_cnt, duration, seed_account, seed_title
            from video_association
            where score > %s and platform = %s and status = %s
            order by account_name;
        """
        fetch_response = self.db_client.fetch(query=fetch_query, params=(0.5, platform, 0))
        return fetch_response

    def update_video_status(self, video_id_tuple: tuple, ori_status: int, new_status: int) -> int:
        update_query = """
            update video_association
            set status = %s
            where id in %s and status = %s;
        """
        affected_rows = self.db_client.save(query=update_query, params=(new_status, video_id_tuple, ori_status))
        return affected_rows


class ChannelsAccountCrawler(CrawlerAccounts):
    """
    crawl WeChat Channels accounts
    strategy:
        1. try to get seed titles from database
        2. try to get hot_points from web
        3. use search api to get accounts
    """

    def process_channels_video(self, video: dict, seed_title: str, account_name: str, account_id: str):
        """
        process video item and save to database
        """
        video_item = Item()
        video_item.add("account_name", account_name)
        video_item.add("account_id", account_id)
        video_item.add("recommend_video_id", video["id"])
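        # title and duration are nested under objectDesc in the channel video payload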
        video_item.add("title", video["objectDesc"]["description"])
        video_item.add("duration", video["objectDesc"]["media"][0]["VideoPlayLen"])
        video_item.add("seed_account", "SearchWithOutAccount")
        video_item.add("seed_title", seed_title)
        video_item.add(
            "recommend_date", datetime.datetime.today().strftime("%Y-%m-%d")
        )
        video_item.add("platform", "sph")
        # check item
        video_item.check(source="association")
        # save to db
        self.insert_video_into_recommend_table(video_item.item)

    def process_search_response(self, video: dict, seed_title: str):
        """
        search for the account behind the matched video by its account name,
        then crawl the first page of videos published by that account
        """
        account_name = video["items"][0]["source"]["title"]
        # search account detail
        search_account_response = search_in_wechat_channel(
            search_key=account_name, search_type=2
        )
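        # the account search response nests results as data -> data -> [box] -> items;
        # take the first matching account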
        account_detail = search_account_response["data"]["data"][0]["items"][0]
        account_id = account_detail["jumpInfo"]["userName"]
        # fetch account video list
        search_video_response = get_channel_account_videos(account_id)
        video_list = search_video_response["data"]["object"]
        # process and insert each video (use a separate loop variable so the
        # original search-result `video` argument is not shadowed)
        for channel_video in video_list:
            try:
                self.process_channels_video(channel_video, seed_title, account_name, account_id)
            except Exception as e:
                log(
                    task="crawler_channels_account_videos",
                    function="process_channels_video",
                    message="process video failed",
                    data={
                        "video": channel_video,
                        "error": str(e),
                        "traceback": traceback.format_exc(),
                    },
                )

    def search_video_in_channels(self, title: str) -> None:
        """
        search videos in WeChat Channels by title
        """
        search_response = search_in_wechat_channel(search_key=title, search_type=1)
        video_list = search_response["data"]["data"][0]["subBoxes"]
        for video in tqdm(video_list, desc="crawler each video"):
            try:
                self.process_search_response(video, seed_title=title)
            except Exception as e:
                log(
                    task="channels account crawler",
                    function="process_search_response",
                    message="search by title failed",
                    data={
                        "video": video,
                        "error": str(e),
                        "traceback": traceback.format_exc(),
                    },
                )

    def deal(self):
        seed_title_list = self.get_seed_keys()
        for item in tqdm(seed_title_list, desc="crawler each title"):
            try:
                self.search_video_in_channels(title=item["title"])
            except Exception as e:
                log(
                    task="channels account crawler",
                    function="search_video_in_channels",
                    message="search video in channels failed",
                    data={
                        "title": item["title"],
                        "error": str(e),
                        "traceback": traceback.format_exc(),
                    },
                )

        # calculate similarity scores for the newly crawled videos
        video_list = self.get_video_list_without_score()
        affected_rows = self.save_similarity_score_to_table(video_list)
        print(affected_rows)


class ToutiaoAccountCrawler(CrawlerAccounts):

    def get_seed_videos(self):
        fetch_query = """
            select out_account_name, article_title, url_unique_md5
            from publish_single_video_source
            where platform = 'toutiao' and video_pool_audit_status = 1 and bad_status = 0
            order by score desc limit 100;
        """
        seed_video_list = self.db_client.fetch(
            query=fetch_query, cursor_type=DictCursor
        )
        return seed_video_list

    def process_toutiao_video(self, video, seed_account_name, seed_title):
        # process video item and save to database
        video_item = Item()
        user_info = video["user_info"]
        video_item.add("account_name", user_info["name"])
        video_item.add("account_id", user_info["user_id"])
        video_item.add("platform", "toutiao")
        video_item.add("recommend_video_id", video["id"])
        video_item.add("title", video["title"])
        video_item.add("read_cnt", video.get("read_count"))
        video_item.add("duration", video["video_duration"])
        video_item.add("seed_account", seed_account_name)
        video_item.add("seed_title", seed_title)
        video_item.add("recommend_date", datetime.datetime.today().strftime("%Y-%m-%d"))
        # check item
        video_item.check(source="association")
        # insert into database
        self.insert_video_into_recommend_table(video_item.item)

    def get_recommend_video_list(self, seed_video: dict):
        # get recommend videos for each video
        seed_video_id = seed_video["url_unique_md5"]
        seed_account_name = seed_video["out_account_name"]
        seed_title = seed_video["article_title"]
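        # the recommendation endpoint needs the Toutiao detail-page cookie that is
        # loaded from Apollo config at module import time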
        recommend_response = get_associated_recommendation(seed_video_id, cookie)
        recommend_video_list = recommend_response["data"]
        for video in tqdm(recommend_video_list):
            try:
                self.process_toutiao_video(video, seed_account_name, seed_title)
            except Exception as e:
                log(
                    task="toutiao account crawler",
                    function="process_toutiao_video",
                    message="get recommend video failed",
                    data={
                        "video": video,
                        "error": str(e),
                        "traceback": traceback.format_exc(),
                    },
                )

    def get_category_recommend_list(self):
        """
        the category recommendation feed has almost no videos, so skip it for now
        """
        raise NotImplementedError()

    def deal(self):
        # start from the highest-scoring seed videos
        seed_video_list = self.get_seed_videos()
        for seed_video in tqdm(seed_video_list, desc="get each video recommendation"):
            try:
                self.get_recommend_video_list(seed_video)
            except Exception as e:
                log(
                    task="toutiao_recommendation_crawler",
                    function="save_each_recommendation",
                    message="save recommendation failed",
                    data={
                        "error": str(e),
                        "traceback": traceback.format_exc(),
                        "seed_video": seed_video,
                    },
                )

        # calculate similarity scores for the newly crawled videos
        video_list = self.get_video_list_without_score()
        affected_rows = self.save_similarity_score_to_table(video_list)
        print(affected_rows)


class HaoKanAccountCrawler(CrawlerAccounts):

    def process_haokan_video(self, video: dict, seed_title: str) -> None:
        """
        process a single haokan search result and save it to the database
        """
        video_item = Item()
        video_item.add("account_name", video["author"])
        video_item.add("account_id", video["author_id"])
        video_item.add("platform", "hksp")
        video_item.add("recommend_video_id", video["vid"])
        video_item.add("title", video["title"])
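        # normalize the Chinese read-count string, e.g. "1.2万次播放" -> 12000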
        read_num_string = video["read_num"].replace("次播放", "")
        if "万" in read_num_string:
            read_num_string = read_num_string.replace("万", "")
            read_num = int(float(read_num_string) * 10000)
        else:
            read_num = int(read_num_string)
        video_item.add("read_cnt", read_num)
        duration_string = video["duration"]
        duration_list = duration_string.split(":")
        if len(duration_list) > 2:
            # video too long
            return
        duration = int(duration_list[0]) * 60 + int(duration_list[1])
        video_item.add("duration", duration)
        video_item.add("seed_account", "SearchWithOutAccount")
        video_item.add("seed_title", seed_title)
        video_item.add("recommend_date", datetime.datetime.today().strftime("%Y-%m-%d"))
        # check item
        video_item.check(source="association")
        # insert into database
        self.insert_video_into_recommend_table(video_item.item)

    def search_videos_in_haokan_video(self, title: str) -> None:
        """
        search videos in haokan by title
        """
        search_response = haokan_search_videos(title)
        video_list = search_response["data"]["list"]
        for video in tqdm(video_list, desc="search videos"):
            try:
                self.process_haokan_video(video, seed_title=title)
            except Exception as e:
                log(
                    task="haokan_search_crawler",
                    function="process_haokan_video",
                    message="process haokan video failed",
                    data={
                        "video": video,
                        "error": str(e),
                        "traceback": traceback.format_exc(),
                    },
                )

    def deal(self):
        seed_title_list = self.get_seed_keys()
        for seed_title in tqdm(seed_title_list, desc="crawler each title"):
            try:
                self.search_videos_in_haokan_video(seed_title["title"])
            except Exception as e:
                log(
                    task="haokan_search_crawler",
                    function="search_videos_in_haokan_video",
                    message="search videos in haokan video failed",
                    data={
                        "title": seed_title["title"],
                        "error": str(e),
                        "traceback": traceback.format_exc(),
                    },
                )

        video_list = self.get_video_list_without_score()
        affected_rows = self.save_similarity_score_to_table(video_list)
        print(affected_rows)
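

if __name__ == "__main__":
    # illustrative entry point (assumption: the original module defines no __main__ block
    # and each crawler is normally launched by an external scheduler or task runner)
    ChannelsAccountCrawler().deal()
    ToutiaoAccountCrawler().deal()
    HaoKanAccountCrawler().deal()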