crawler_accounts_by_association.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302
  1. """
  2. @author: luojunhui
  3. """
  4. from __future__ import annotations
  5. import json
  6. import datetime
  7. import traceback
  8. from pymysql.cursors import DictCursor
  9. from tqdm import tqdm
  10. from applications import log
  11. from applications.db import DatabaseConnector
  12. from applications.pipeline import scrape_account_entities_process
  13. from applications.utils import Item
  14. from applications.utils import insert_into_candidate_account_pool_table
  15. from coldStartTasks.crawler.baidu import haokan_search_videos
  16. from coldStartTasks.crawler.baidu.baidu_spider import baidu_account_video_crawler
  17. from coldStartTasks.crawler.toutiao import get_associated_recommendation
  18. from coldStartTasks.crawler.toutiao import get_toutiao_account_video_list
  19. from coldStartTasks.crawler.channels import search_in_wechat_channel
  20. from coldStartTasks.crawler.channels import get_channel_account_videos
  21. from config import apolloConfig, long_articles_config
# Apollo-backed runtime configuration for this crawler task.
config = apolloConfig()
# Cookie for the Toutiao "related recommendation" detail API — presumably needs
# periodic refresh in Apollo; verify with the crawler owners.
recommend_cookie = config.getConfigValue("toutiao_detail_recommend_cookie")
# Cookie for the Toutiao blogger API (per-account video list).
blogger_cookie = config.getConfigValue("toutiao_blogger_cookie")
  25. class CrawlerAccounts:
  26. def __init__(self):
  27. self.db_client = DatabaseConnector(db_config=long_articles_config)
  28. self.db_client.connect()
  29. def get_seed_keys(self)->list[dict]:
  30. """
  31. get search keys from database
  32. """
  33. fetch_query = "select title from article_pool_promotion_source where status = 1 and deleted = 0 order by level limit 100;"
  34. result = self.db_client.fetch(query=fetch_query, cursor_type=DictCursor)
  35. return result
  36. def insert_video_into_recommend_table(self, item: dict) -> None:
  37. # whether account exists
  38. final_item = scrape_account_entities_process(item, self.db_client)
  39. if not final_item:
  40. return
  41. else:
  42. # save to db
  43. insert_into_candidate_account_pool_table(
  44. db_client=self.db_client, account_item=final_item
  45. )
  46. def update_account_status(self, account_id_tuple: tuple, ori_status: int, new_status: int) -> int:
  47. update_query = f"""
  48. update video_association
  49. set status = %s
  50. where id in %s and status = %s;
  51. """
  52. affected_rows = self.db_client.save(query=update_query, params=(new_status, account_id_tuple, ori_status))
  53. return affected_rows
  54. class ChannelsAccountCrawler(CrawlerAccounts):
  55. """
  56. crawler channel accounts
  57. strategy:
  58. 1. try to get seed titles from database
  59. 2. try to get hot_points from web
  60. 2. use search api to get accounts
  61. """
  62. def process_channels_account(self, account_name: str, account_id: str, title_list_str: str):
  63. """
  64. process video item and save to database
  65. """
  66. account_item = Item()
  67. account_item.add("account_name", account_name)
  68. account_item.add("account_id", account_id)
  69. account_item.add("title_list", title_list_str)
  70. account_item.add(
  71. "crawler_date", datetime.datetime.today().strftime("%Y-%m-%d")
  72. )
  73. account_item.add("platform", "sph")
  74. # check item
  75. account_item.check(source="candidate_account")
  76. # save to db
  77. self.insert_video_into_recommend_table(account_item.item)
  78. def process_search_response(self, video: dict):
  79. """
  80. 通过搜索视频的账号名称去搜索账号,并且抓取该账号下的第一页视频
  81. """
  82. account_name = video["items"][0]["source"]["title"]
  83. # search account detail
  84. search_account_response = search_in_wechat_channel(
  85. search_key=account_name, search_type=2
  86. )
  87. account_detail = search_account_response["data"]["data"][0]["items"][0]
  88. account_id = account_detail["jumpInfo"]["userName"]
  89. # fetch account video list for the first page
  90. search_video_response = get_channel_account_videos(account_id)
  91. video_list = search_video_response["data"]["object"]
  92. title_list = [i['objectDesc']['description'] for i in video_list]
  93. title_list_str = json.dumps(title_list, ensure_ascii=False)
  94. self.process_channels_account(account_name, account_id, title_list_str)
  95. def search_video_in_channels(self, title: str) -> None:
  96. """
  97. search
  98. """
  99. search_response = search_in_wechat_channel(search_key=title, search_type=1)
  100. video_list = search_response["data"]["data"][0]["subBoxes"]
  101. for video in tqdm(video_list, desc="crawler each video"):
  102. try:
  103. self.process_search_response(video)
  104. except Exception as e:
  105. log(
  106. task="crawler_channels_account_videos",
  107. function="process_search_response",
  108. message="search by title failed",
  109. data={
  110. "video": video,
  111. "error": str(e),
  112. "traceback": traceback.format_exc()
  113. }
  114. )
  115. def deal(self):
  116. seed_title_list = self.get_seed_keys()
  117. for item in tqdm(seed_title_list, desc="crawler each title"):
  118. try:
  119. self.search_video_in_channels(title=item["title"])
  120. except Exception as e:
  121. log(
  122. task="crawler_channels_account_videos",
  123. function="search_video_in_channels",
  124. message="search video in channels failed",
  125. data={
  126. "title": item["title"],
  127. "error": str(e),
  128. "traceback": traceback.format_exc()
  129. }
  130. )
  131. class ToutiaoAccountCrawler(CrawlerAccounts):
  132. def get_seed_videos(self):
  133. fetch_query = f"""
  134. select out_account_name, article_title, url_unique_md5
  135. from publish_single_video_source
  136. where platform = 'toutiao' and video_pool_audit_status = 1 and bad_status = 0
  137. order by score desc limit 100;
  138. """
  139. seed_video_list = self.db_client.fetch(
  140. query=fetch_query, cursor_type=DictCursor
  141. )
  142. return seed_video_list
  143. def process_toutiao_account(self, video):
  144. # process video item and save to database
  145. account_item = Item()
  146. user_info = video["user_info"]
  147. account_item.add("account_name", user_info["name"])
  148. account_item.add("account_id", user_info["user_id"])
  149. account_item.add("platform", "toutiao")
  150. account_item.add("crawler_date", datetime.datetime.today().strftime("%Y-%m-%d"))
  151. # fetch account video first page video list
  152. fetch_response = get_toutiao_account_video_list(account_id=user_info["user_id"], cookie=blogger_cookie)
  153. video_list = fetch_response["data"]
  154. title_list = [i["title"] for i in video_list]
  155. title_list_str = json.dumps(title_list, ensure_ascii=False)
  156. account_item.add("title_list", title_list_str)
  157. # check item
  158. account_item.check(source="candidate_account")
  159. # insert into database
  160. self.insert_video_into_recommend_table(account_item.item)
  161. def get_recommend_video_list(self, seed_video: dict):
  162. # get recommend videos for each video
  163. seed_video_id = seed_video["url_unique_md5"]
  164. recommend_response = get_associated_recommendation(seed_video_id, recommend_cookie)
  165. recommend_video_list = recommend_response["data"]
  166. for video in tqdm(recommend_video_list):
  167. try:
  168. self.process_toutiao_account(video)
  169. except Exception as e:
  170. log(
  171. task="toutiao account crawler",
  172. function="process_toutiao_video",
  173. message="get recommend video failed",
  174. data={
  175. "video": video,
  176. "error": str(e),
  177. "traceback": traceback.format_exc()
  178. }
  179. )
  180. def get_category_recommend_list(self):
  181. """
  182. 品类推荐流几乎无视频,暂时不做
  183. """
  184. return NotImplementedError()
  185. def deal(self):
  186. # start
  187. seed_video_list = self.get_seed_videos()
  188. for seed_video in tqdm(seed_video_list, desc="get each video recommendation"):
  189. try:
  190. self.get_recommend_video_list(seed_video)
  191. except Exception as e:
  192. log(
  193. task="toutiao_recommendation_crawler",
  194. function="save_each_recommendation",
  195. message="save recommendation failed",
  196. data={
  197. "error": str(e),
  198. "traceback": traceback.format_exc(),
  199. "seed_video": seed_video,
  200. },
  201. )
  202. class HaoKanAccountCrawler(CrawlerAccounts):
  203. def process_haokan_video(self, video: dict) -> None:
  204. """
  205. process_haokan_video
  206. """
  207. account_item = Item()
  208. account_item.add("account_name", video['author'])
  209. account_item.add("account_id", video['author_id'])
  210. account_item.add("platform", "hksp")
  211. account_item.add("crawler_date", datetime.datetime.today().strftime("%Y-%m-%d"))
  212. # fetch account video first page video list
  213. fetch_response = baidu_account_video_crawler(account_id=video['author_id'])
  214. video_list = fetch_response["results"]
  215. title_list = [i["content"]["title"] for i in video_list]
  216. title_list_str = json.dumps(title_list, ensure_ascii=False)
  217. account_item.add("title_list", title_list_str)
  218. # check item
  219. account_item.check(source="candidate_account")
  220. # insert into database
  221. self.insert_video_into_recommend_table(account_item.item)
  222. def search_videos_in_haokan_video(self, title: str) -> None:
  223. """
  224. search_
  225. """
  226. search_response = haokan_search_videos(title)
  227. video_list = search_response["data"]["list"]
  228. for video in tqdm(video_list, desc="search videos"):
  229. try:
  230. self.process_haokan_video(video)
  231. except Exception as e:
  232. log(
  233. task="haokan_search_crawler",
  234. function="process_haokan_video",
  235. message="process haokan video failed",
  236. data={
  237. "video": video,
  238. "error": str(e),
  239. "traceback": traceback.format_exc()
  240. }
  241. )
  242. def deal(self):
  243. seed_title_list = self.get_seed_keys()
  244. for seed_title in tqdm(seed_title_list, desc="crawler each title"):
  245. try:
  246. self.search_videos_in_haokan_video(seed_title["title"])
  247. except Exception as e:
  248. log(
  249. task="haokan_search_crawler",
  250. function="search_videos_in_haokan_video",
  251. message="search videos in haokan video failed",
  252. data={
  253. "title": seed_title["title"],
  254. "error": str(e),
  255. "traceback": traceback.format_exc()
  256. }
  257. )