# crawler_accounts_by_association.py
  1. """
  2. @author: luojunhui
  3. """
  4. from __future__ import annotations
  5. import json
  6. import datetime
  7. import traceback
  8. from pymysql.cursors import DictCursor
  9. from tqdm import tqdm
  10. from applications import log
  11. from applications.db import DatabaseConnector
  12. from applications.pipeline import scrape_account_entities_process
  13. from applications.utils import Item
  14. from applications.utils import insert_into_candidate_account_pool_table
  15. from coldStartTasks.crawler.baidu import haokan_search_videos
  16. from coldStartTasks.crawler.baidu.baidu_spider import baidu_account_video_crawler
  17. from coldStartTasks.crawler.toutiao import get_associated_recommendation
  18. from coldStartTasks.crawler.toutiao import get_toutiao_account_video_list
  19. from coldStartTasks.crawler.channels import search_in_wechat_channel
  20. from coldStartTasks.crawler.channels import get_channel_account_videos
  21. from config import apolloConfig, long_articles_config
# Apollo config supplies per-platform crawler cookies.
config = apolloConfig()
recommend_cookie = config.getConfigValue("toutiao_detail_recommend_cookie")
blogger_cookie = config.getConfigValue("toutiao_blogger_cookie")

# Title-association mapping loaded once at import time:
# seed title -> list of derived/associated search titles (used by get_seed_keys).
# NOTE(review): module import fails if result2.json is missing from the working
# directory — confirm the deployment ships this file alongside the script.
with open("result2.json", encoding="utf-8") as f:
    temp_dict = json.loads(f.read())
  27. class CrawlerAccounts:
  28. def __init__(self):
  29. self.db_client = DatabaseConnector(db_config=long_articles_config)
  30. self.db_client.connect()
  31. def get_seed_keys(self) -> list[dict]:
  32. """
  33. get search keys from database
  34. """
  35. fetch_query = f"""
  36. select distinct title
  37. from `article_pool_promotion_source`
  38. WHERE `level` = 'autoArticlePoolLevel1' and status = 1 and `deleted` = 0 order by `create_timestamp` desc limit 1000;
  39. """
  40. fetch_response = self.db_client.fetch(query=fetch_query, cursor_type=DictCursor)
  41. result = []
  42. for row in fetch_response:
  43. if temp_dict.get(row["title"]):
  44. title_obj_list = [{"title": i} for i in temp_dict[row["title"]]]
  45. result += title_obj_list
  46. return result
  47. def insert_video_into_recommend_table(self, item: dict) -> None:
  48. # whether account exists
  49. final_item = scrape_account_entities_process(item, self.db_client)
  50. if not final_item:
  51. return
  52. else:
  53. # save to db
  54. insert_into_candidate_account_pool_table(
  55. db_client=self.db_client, account_item=final_item
  56. )
  57. def update_account_status(
  58. self, account_id_tuple: tuple, ori_status: int, new_status: int
  59. ) -> int:
  60. update_query = f"""
  61. update video_association
  62. set status = %s
  63. where id in %s and status = %s;
  64. """
  65. affected_rows = self.db_client.save(
  66. query=update_query, params=(new_status, account_id_tuple, ori_status)
  67. )
  68. return affected_rows
  69. class ChannelsAccountCrawler(CrawlerAccounts):
  70. """
  71. crawler channel accounts
  72. strategy:
  73. 1. try to get seed titles from database
  74. 2. try to get hot_points from web
  75. 2. use search api to get accounts
  76. """
  77. def process_channels_account(
  78. self, account_name: str, account_id: str, title_list_str: str
  79. ):
  80. """
  81. process video item and save to database
  82. """
  83. account_item = Item()
  84. account_item.add("account_name", account_name)
  85. account_item.add("account_id", account_id)
  86. account_item.add("title_list", title_list_str)
  87. account_item.add("crawler_date", datetime.datetime.today().strftime("%Y-%m-%d"))
  88. account_item.add("platform", "sph")
  89. # check item
  90. account_item.check(source="candidate_account")
  91. # save to db
  92. self.insert_video_into_recommend_table(account_item.item)
  93. def process_search_response(self, video: dict):
  94. """
  95. 通过搜索视频的账号名称去搜索账号,并且抓取该账号下的第一页视频
  96. """
  97. account_name = video["items"][0]["source"]["title"]
  98. # search account detail
  99. search_account_response = search_in_wechat_channel(
  100. search_key=account_name, search_type=2
  101. )
  102. account_detail = search_account_response["data"]["data"][0]["items"][0]
  103. account_id = account_detail["jumpInfo"]["userName"]
  104. # fetch account video list for the first page
  105. search_video_response = get_channel_account_videos(account_id)
  106. search_response_data = search_video_response["data"]
  107. search_response_data_type = type(search_response_data)
  108. if search_response_data_type == dict:
  109. video_list = search_response_data["object"]
  110. elif search_response_data_type == list:
  111. video_list = search_response_data
  112. else:
  113. raise RuntimeError("search video response type error")
  114. title_list = [i["objectDesc"]["description"] for i in video_list]
  115. title_list_str = json.dumps(title_list, ensure_ascii=False)
  116. self.process_channels_account(account_name, account_id, title_list_str)
  117. def search_video_in_channels(self, title: str) -> None:
  118. """
  119. search
  120. """
  121. search_response = search_in_wechat_channel(search_key=title, search_type=1)
  122. video_list = search_response["data"]["data"][0]["subBoxes"]
  123. for video in tqdm(video_list, desc="crawler each video"):
  124. try:
  125. self.process_search_response(video)
  126. except Exception as e:
  127. log(
  128. task="crawler_channels_account_videos",
  129. function="process_search_response",
  130. message="search by title failed",
  131. data={
  132. "video": video,
  133. "error": str(e),
  134. "traceback": traceback.format_exc(),
  135. },
  136. )
  137. def deal(self):
  138. seed_title_list = self.get_seed_keys()
  139. for item in tqdm(seed_title_list, desc="crawler each title"):
  140. try:
  141. self.search_video_in_channels(title=item["title"])
  142. except Exception as e:
  143. log(
  144. task="crawler_channels_account_videos",
  145. function="search_video_in_channels",
  146. message="search video in channels failed",
  147. data={
  148. "title": item["title"],
  149. "error": str(e),
  150. "traceback": traceback.format_exc(),
  151. },
  152. )
  153. class ToutiaoAccountCrawler(CrawlerAccounts):
  154. def get_seed_videos(self):
  155. fetch_query = f"""
  156. select out_account_name, article_title, url_unique_md5
  157. from publish_single_video_source
  158. where platform = 'toutiao' and video_pool_audit_status = 1 and bad_status = 0
  159. order by score desc limit 100;
  160. """
  161. seed_video_list = self.db_client.fetch(
  162. query=fetch_query, cursor_type=DictCursor
  163. )
  164. return seed_video_list
  165. def get_level_up_videos(self):
  166. fetch_query = f"""
  167. select out_account_name, article_title, url_unique_md5
  168. from publish_single_video_source
  169. where platform = 'toutiao' and (
  170. article_title in (
  171. select distinct(title) from article_pool_promotion_source where status = 1 and deleted = 0
  172. )
  173. or flow_pool_level < 4
  174. );
  175. """
  176. uplevel_video_list = self.db_client.fetch(
  177. query=fetch_query, cursor_type=DictCursor
  178. )
  179. return uplevel_video_list
  180. def process_toutiao_account(self, video):
  181. # process video item and save to database
  182. account_item = Item()
  183. user_info = video["user_info"]
  184. account_item.add("account_name", user_info["name"])
  185. account_item.add("account_id", user_info["user_id"])
  186. account_item.add("platform", "toutiao")
  187. account_item.add("crawler_date", datetime.datetime.today().strftime("%Y-%m-%d"))
  188. # fetch account video first page video list
  189. fetch_response = get_toutiao_account_video_list(
  190. account_id=user_info["user_id"], cookie=blogger_cookie
  191. )
  192. video_list = fetch_response["data"]
  193. title_list = [i["title"] for i in video_list]
  194. title_list_str = json.dumps(title_list, ensure_ascii=False)
  195. account_item.add("title_list", title_list_str)
  196. # check item
  197. account_item.check(source="candidate_account")
  198. # insert into database
  199. self.insert_video_into_recommend_table(account_item.item)
  200. def get_recommend_video_list(self, seed_video: dict):
  201. # get recommend videos for each video
  202. seed_video_id = seed_video["url_unique_md5"]
  203. recommend_response = get_associated_recommendation(
  204. seed_video_id, recommend_cookie
  205. )
  206. recommend_video_list = recommend_response["data"]
  207. for video in tqdm(recommend_video_list):
  208. try:
  209. self.process_toutiao_account(video)
  210. except Exception as e:
  211. log(
  212. task="toutiao account crawler",
  213. function="process_toutiao_video",
  214. message="get recommend video failed",
  215. data={
  216. "video": video,
  217. "error": str(e),
  218. "traceback": traceback.format_exc(),
  219. },
  220. )
  221. def get_category_recommend_list(self):
  222. """
  223. 品类推荐流几乎无视频,暂时不做
  224. """
  225. return NotImplementedError()
  226. def deal(self):
  227. # start
  228. # seed_video_list = self.get_seed_videos()
  229. seed_video_list = self.get_level_up_videos()
  230. for seed_video in tqdm(seed_video_list, desc="get each video recommendation"):
  231. try:
  232. self.get_recommend_video_list(seed_video)
  233. except Exception as e:
  234. log(
  235. task="toutiao_recommendation_crawler",
  236. function="save_each_recommendation",
  237. message="save recommendation failed",
  238. data={
  239. "error": str(e),
  240. "traceback": traceback.format_exc(),
  241. "seed_video": seed_video,
  242. },
  243. )
  244. class HaoKanAccountCrawler(CrawlerAccounts):
  245. def process_haokan_video(self, video: dict) -> None:
  246. """
  247. process_haokan_video
  248. """
  249. account_item = Item()
  250. account_item.add("account_name", video["author"])
  251. account_item.add("account_id", video["author_id"])
  252. account_item.add("platform", "hksp")
  253. account_item.add("crawler_date", datetime.datetime.today().strftime("%Y-%m-%d"))
  254. # fetch account video first page video list
  255. fetch_response = baidu_account_video_crawler(account_id=video["author_id"])
  256. video_list = fetch_response["results"]
  257. title_list = [i["content"]["title"] for i in video_list]
  258. title_list_str = json.dumps(title_list, ensure_ascii=False)
  259. account_item.add("title_list", title_list_str)
  260. # check item
  261. account_item.check(source="candidate_account")
  262. # insert into database
  263. self.insert_video_into_recommend_table(account_item.item)
  264. def search_videos_in_haokan_video(self, title: str) -> None:
  265. """
  266. search_
  267. """
  268. search_response = haokan_search_videos(title)
  269. video_list = search_response["data"]["list"]
  270. for video in tqdm(video_list, desc="search videos"):
  271. try:
  272. self.process_haokan_video(video)
  273. except Exception as e:
  274. log(
  275. task="haokan_search_crawler",
  276. function="process_haokan_video",
  277. message="process haokan video failed",
  278. data={
  279. "video": video,
  280. "error": str(e),
  281. "traceback": traceback.format_exc(),
  282. },
  283. )
  284. def deal(self):
  285. seed_title_list = self.get_seed_keys()
  286. for seed_title in tqdm(seed_title_list, desc="crawler each title"):
  287. try:
  288. self.search_videos_in_haokan_video(seed_title["title"])
  289. except Exception as e:
  290. log(
  291. task="haokan_search_crawler",
  292. function="search_videos_in_haokan_video",
  293. message="search videos in haokan video failed",
  294. data={
  295. "title": seed_title["title"],
  296. "error": str(e),
  297. "traceback": traceback.format_exc(),
  298. },
  299. )