crawler_accounts_by_association.py

  1. """
  2. @author: luojunhui
  3. """
  4. from __future__ import annotations
  5. import json
  6. import datetime
  7. import traceback
  8. from pymysql.cursors import DictCursor
  9. from tqdm import tqdm
  10. from applications import log
  11. from applications.db import DatabaseConnector
  12. from applications.pipeline import scrape_account_entities_process
  13. from applications.utils import Item
  14. from applications.utils import insert_into_candidate_account_pool_table
  15. from coldStartTasks.crawler.baidu import haokan_search_videos
  16. from coldStartTasks.crawler.baidu.baidu_spider import baidu_account_video_crawler
  17. from coldStartTasks.crawler.channels import search_in_wechat_channel
  18. from coldStartTasks.crawler.channels import get_channel_account_videos
  19. from coldStartTasks.crawler.toutiao import get_associated_recommendation
  20. from coldStartTasks.crawler.toutiao import get_toutiao_account_video_list
  21. from coldStartTasks.crawler.wechat import get_article_detail
  22. from coldStartTasks.crawler.wechat import get_article_list_from_account
  23. from coldStartTasks.crawler.wechat import get_source_account_from_article
  24. from config import apolloConfig, long_articles_config
  25. config = apolloConfig()
  26. recommend_cookie = config.getConfigValue("toutiao_detail_recommend_cookie")
  27. blogger_cookie = config.getConfigValue("toutiao_blogger_cookie")

class CrawlerAccounts:

    def __init__(self):
        self.db_client = DatabaseConnector(db_config=long_articles_config)
        self.db_client.connect()

    def get_seed_keys(self) -> list[str]:
        """
        get search keys from database
        """
        fetch_query = f"""
            select association_title
            from `article_pool_promotion_source`
            where association_status = 2
            order by association_update_timestamp desc
            limit 100;
        """
        fetch_response = self.db_client.fetch(query=fetch_query, cursor_type=DictCursor)
        title_list = []
        for item in fetch_response:
            try:
                title_list += json.loads(item['association_title'])
            except Exception as e:
                print(e)
                continue
        return list(set(title_list))

    def insert_video_into_recommend_table(self, item: dict) -> None:
        # whether account exists
        final_item = scrape_account_entities_process(item, self.db_client)
        if not final_item:
            return
        else:
            # save to db
            insert_into_candidate_account_pool_table(
                db_client=self.db_client, account_item=final_item
            )

    def update_account_status(
        self, account_id_tuple: tuple, ori_status: int, new_status: int
    ) -> int:
        update_query = f"""
            update video_association
            set status = %s
            where id in %s and status = %s;
        """
        affected_rows = self.db_client.save(
            query=update_query, params=(new_status, account_id_tuple, ori_status)
        )
        return affected_rows

class ChannelsAccountCrawler(CrawlerAccounts):
    """
    crawl WeChat Channels accounts
    strategy:
        1. try to get seed titles from the database
        2. try to get hot points from the web
        3. use the search api to get accounts
    """

    def process_channels_account(
        self, account_name: str, account_id: str, title_list_str: str
    ):
        """
        process account item and save to database
        """
        account_item = Item()
        account_item.add("account_name", account_name)
        account_item.add("account_id", account_id)
        account_item.add("title_list", title_list_str)
        account_item.add("crawler_date", datetime.datetime.today().strftime("%Y-%m-%d"))
        account_item.add("platform", "sph")
        # check item
        account_item.check(source="candidate_account")
        # save to db
        self.insert_video_into_recommend_table(account_item.item)

    def process_search_response(self, video: dict):
        """
        search for the account by the account name attached to the video,
        then crawl the first page of videos published by that account
        """
        account_name = video["items"][0]["source"]["title"]
        # search account detail
        search_account_response = search_in_wechat_channel(
            search_key=account_name, search_type=2
        )
        account_detail = search_account_response["data"]["data"][0]["items"][0]
        account_id = account_detail["jumpInfo"]["userName"]
        # fetch the first page of the account's video list
        search_video_response = get_channel_account_videos(account_id)
        search_response_data = search_video_response["data"]
        if isinstance(search_response_data, dict):
            video_list = search_response_data["object"]
        elif isinstance(search_response_data, list):
            video_list = search_response_data
        else:
            raise RuntimeError("search video response type error")
        title_list = [i["objectDesc"]["description"] for i in video_list]
        title_list_str = json.dumps(title_list, ensure_ascii=False)
        self.process_channels_account(account_name, account_id, title_list_str)

    def search_video_in_channels(self, title: str) -> None:
        """
        search the seed title in WeChat Channels and process each returned video
        """
        search_response = search_in_wechat_channel(search_key=title, search_type=1)
        video_list = search_response["data"]["data"][0]["subBoxes"]
        for video in tqdm(video_list, desc="crawler each video"):
            try:
                self.process_search_response(video)
            except Exception as e:
                log(
                    task="crawler_channels_account_videos",
                    function="process_search_response",
                    message="search by title failed",
                    data={
                        "video": video,
                        "error": str(e),
                        "traceback": traceback.format_exc(),
                    },
                )

    def deal(self):
        seed_title_list = self.get_seed_keys()
        for seed_title in tqdm(seed_title_list, desc="crawler each title"):
            try:
                self.search_video_in_channels(title=seed_title)
            except Exception as e:
                log(
                    task="crawler_channels_account_videos",
                    function="search_video_in_channels",
                    message="search video in channels failed",
                    data={
                        "title": seed_title,
                        "error": str(e),
                        "traceback": traceback.format_exc(),
                    },
                )

class ToutiaoAccountCrawler(CrawlerAccounts):

    def get_seed_videos(self):
        fetch_query = f"""
            select out_account_name, article_title, url_unique_md5
            from publish_single_video_source
            where platform = 'toutiao' and video_pool_audit_status = 1 and bad_status = 0
            order by score desc limit 100;
        """
        seed_video_list = self.db_client.fetch(
            query=fetch_query, cursor_type=DictCursor
        )
        return seed_video_list

    def get_level_up_videos(self):
        fetch_query = f"""
            select out_account_name, article_title, url_unique_md5
            from publish_single_video_source
            where platform = 'toutiao' and (
                article_title in (
                    select distinct(title) from article_pool_promotion_source where status = 1 and deleted = 0
                )
                or flow_pool_level < 4
            );
        """
        uplevel_video_list = self.db_client.fetch(
            query=fetch_query, cursor_type=DictCursor
        )
        return uplevel_video_list

    def process_toutiao_account(self, video):
        # process account item and save to database
        account_item = Item()
        user_info = video["user_info"]
        account_item.add("account_name", user_info["name"])
        account_item.add("account_id", user_info["user_id"])
        account_item.add("platform", "toutiao")
        account_item.add("crawler_date", datetime.datetime.today().strftime("%Y-%m-%d"))
        # fetch the first page of the account's video list
        fetch_response = get_toutiao_account_video_list(
            account_id=user_info["user_id"], cookie=blogger_cookie
        )
        video_list = fetch_response["data"]
        title_list = [i["title"] for i in video_list]
        title_list_str = json.dumps(title_list, ensure_ascii=False)
        account_item.add("title_list", title_list_str)
        # check item
        account_item.check(source="candidate_account")
        # insert into database
        self.insert_video_into_recommend_table(account_item.item)

    def get_recommend_video_list(self, seed_video: dict):
        # get recommended videos for each seed video
        seed_video_id = seed_video["url_unique_md5"]
        recommend_response = get_associated_recommendation(
            seed_video_id, recommend_cookie
        )
        recommend_video_list = recommend_response["data"]
        for video in tqdm(recommend_video_list):
            try:
                self.process_toutiao_account(video)
            except Exception as e:
                log(
                    task="toutiao account crawler",
                    function="process_toutiao_video",
                    message="get recommend video failed",
                    data={
                        "video": video,
                        "error": str(e),
                        "traceback": traceback.format_exc(),
                    },
                )

    def get_category_recommend_list(self):
        """
        The category recommendation feed contains almost no videos, so this is not implemented for now.
        """
        raise NotImplementedError()

    def deal(self):
        # start
        # seed_video_list = self.get_seed_videos()
        seed_video_list = self.get_level_up_videos()
        for seed_video in tqdm(seed_video_list, desc="get each video recommendation"):
            try:
                self.get_recommend_video_list(seed_video)
            except Exception as e:
                log(
                    task="toutiao_recommendation_crawler",
                    function="save_each_recommendation",
                    message="save recommendation failed",
                    data={
                        "error": str(e),
                        "traceback": traceback.format_exc(),
                        "seed_video": seed_video,
                    },
                )

class HaoKanAccountCrawler(CrawlerAccounts):

    def process_haokan_video(self, video: dict) -> None:
        """
        extract the account from a Haokan video and save it as a candidate account
        """
        account_item = Item()
        account_item.add("account_name", video["author"])
        account_item.add("account_id", video["author_id"])
        account_item.add("platform", "hksp")
        account_item.add("crawler_date", datetime.datetime.today().strftime("%Y-%m-%d"))
        # fetch the first page of the account's video list
        fetch_response = baidu_account_video_crawler(account_id=video["author_id"])
        video_list = fetch_response["results"]
        title_list = [i["content"]["title"] for i in video_list]
        title_list_str = json.dumps(title_list, ensure_ascii=False)
        account_item.add("title_list", title_list_str)
        # check item
        account_item.check(source="candidate_account")
        # insert into database
        self.insert_video_into_recommend_table(account_item.item)

    def search_videos_in_haokan_video(self, title: str) -> None:
        """
        search the seed title in Haokan and process each returned video
        """
        search_response = haokan_search_videos(title)
        video_list = search_response["data"]["list"]
        for video in tqdm(video_list, desc="search videos"):
            try:
                self.process_haokan_video(video)
            except Exception as e:
                log(
                    task="haokan_search_crawler",
                    function="process_haokan_video",
                    message="process haokan video failed",
                    data={
                        "video": video,
                        "error": str(e),
                        "traceback": traceback.format_exc(),
                    },
                )

    def deal(self):
        seed_title_list = self.get_seed_keys()
        for seed_title in tqdm(seed_title_list, desc="crawler each title"):
            try:
                self.search_videos_in_haokan_video(seed_title)
            except Exception as e:
                log(
                    task="haokan_search_crawler",
                    function="search_videos_in_haokan_video",
                    message="search videos in haokan video failed",
                    data={
                        "title": seed_title,
                        "error": str(e),
                        "traceback": traceback.format_exc(),
                    },
                )

class GzhAccountCrawler(CrawlerAccounts):

    def get_task_list(self):
        fetch_query = f"""
            select id, article_url
            from publish_single_video_source
            where source_account = 1 and platform = 'gzh'
            limit 10;
        """
        task_list = self.db_client.fetch(query=fetch_query, cursor_type=DictCursor)
        return task_list

    def process_official_account(self, account_name, account_id):
        """
        process an official account item and save it to the database
        """
        account_item = Item()
        account_item.add("account_name", account_name)
        account_item.add("account_id", account_id)
        account_item.add("platform", "gzh")
        account_item.add("crawler_date", datetime.datetime.today().strftime("%Y-%m-%d"))
        # fetch the first page of the account's article list
        fetch_response = get_article_list_from_account(account_id=account_id, index=None)
        msg_list = fetch_response["data"]["data"]
        title_list = []
        for msg in msg_list:
            sub_title_list = [i['Title'] for i in msg['AppMsg']['DetailInfo']]
            # stop collecting once enough titles have been gathered
            if len(title_list) > 10:
                continue
            else:
                title_list += sub_title_list
        title_list_str = json.dumps(title_list, ensure_ascii=False)
        account_item.add("title_list", title_list_str)
        # check item
        account_item.check(source="candidate_account")
        # insert into database
        self.insert_video_into_recommend_table(account_item.item)

    def extract_account_from_article_link(self, article_link):
        """
        try to get account info from article link
        """
        # skip original articles; only reposted articles expose a source account
        article_detail = get_article_detail(article_link)
        is_original = article_detail["data"]["data"]["is_original"]
        if is_original:
            return
        # extract source account
        source_account = get_source_account_from_article(article_link)
        if not source_account:
            return
        else:
            account_name = source_account['name']
            gh_id = source_account['gh_id']
            self.process_official_account(account_name, gh_id)

    def update_crawler_article_status(self, article_id_tuple: tuple):
        """
        update crawler article status
        """
        update_query = f"""
            update publish_single_video_source
            set source_account = %s
            where id in %s;
        """
        affected_rows = self.db_client.save(
            query=update_query, params=(0, article_id_tuple)
        )
        return affected_rows

    def deal(self):
        task_list = self.get_task_list()
        task_id_list = []
        for crawler_article_obj in tqdm(task_list, desc="crawler article list"):
            article_url = crawler_article_obj['article_url']
            article_id = crawler_article_obj['id']
            task_id_list.append(int(article_id))
            try:
                self.extract_account_from_article_link(article_url)
            except Exception as e:
                log(
                    task="gzh_account_crawler",
                    function="extract_account_from_article_link",
                    message="extract account from article link failed",
                    data={
                        "article_url": article_url,
                        "error": str(e),
                        "traceback": traceback.format_exc(),
                    },
                )
        if task_id_list:
            article_id_tuple = tuple(task_id_list)
            affected_rows = self.update_crawler_article_status(article_id_tuple)
            print(affected_rows)
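

# Minimal usage sketch (not part of the original file): it assumes each crawler is
# meant to be run as a standalone task through its deal() entry point and that this
# module may be executed directly as a script. In a real deployment the selection and
# scheduling of crawlers would likely come from an external task runner instead.
if __name__ == "__main__":
    for crawler_class in (
        ChannelsAccountCrawler,
        ToutiaoAccountCrawler,
        HaoKanAccountCrawler,
        GzhAccountCrawler,
    ):
        crawler = crawler_class()
        crawler.deal()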