crawler_accounts.py 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120
  1. """
  2. @author: luojunhui
  3. """
  4. from __future__ import annotations
  5. import json
  6. import time
  7. import datetime
  8. import traceback
  9. from pymysql.cursors import DictCursor
  10. from tqdm import tqdm
  11. from applications import log
  12. from applications.db import DatabaseConnector
  13. from applications.pipeline import scrape_account_entities_process
  14. from applications.utils import Item
  15. from applications.utils import insert_into_associated_recommendation_table
  16. from coldStartTasks.crawler.toutiao import get_associated_recommendation
  17. from config import apolloConfig, long_articles_config
  18. config = apolloConfig()
  19. cookie = config.getConfigValue("toutiao_detail_recommend_cookie")
  20. class CrawlerAccounts:
  21. def __init__(self):
  22. self.db_client = DatabaseConnector(db_config=long_articles_config)
  23. self.db_client.connect()
  24. class ChannelAccountCrawler(CrawlerAccounts):
  25. """
  26. crawler channel accounts
  27. strategy:
  28. 1. try to get search keys and titles from database
  29. 2. try to get hot_points from web
  30. 2. use search api to get accounts
  31. """
  32. def get_seed_keys(self):
  33. """
  34. get search keys from database
  35. """
  36. sql = "select * from datastat_sort_strategy limit 100;"
  37. result = self.db_client.fetch(sql)
  38. return result
  39. class ToutiaoAccountCrawler(CrawlerAccounts):
  40. def get_seed_videos(self):
  41. fetch_query = f"""
  42. select out_account_name, article_title, url_unique_md5
  43. from publish_single_video_source
  44. where platform = 'toutiao' and video_pool_audit_status = 1 and bad_status = 0
  45. order by score desc limit 100;
  46. """
  47. seed_video_list = self.db_client.fetch(
  48. query=fetch_query, cursor_type=DictCursor
  49. )
  50. return seed_video_list
  51. def process_each_video(self, video, seed_account_name, seed_title):
  52. # process video item and save to database
  53. video_item = Item()
  54. user_info = video["user_info"]
  55. video_item.add("account_name", user_info["name"])
  56. video_item.add("account_id", user_info["user_id"])
  57. video_item.add("platform", "toutiao")
  58. video_item.add("recommend_video_id", video["id"])
  59. video_item.add("title", video["title"])
  60. video_item.add("read_cnt", video["read_count"])
  61. video_item.add("duration", video["video_duration"])
  62. video_item.add("seed_account", seed_account_name)
  63. video_item.add("seed_title", seed_title)
  64. video_item.add("recommend_date", datetime.datetime.today().strftime("%Y-%m-%d"))
  65. # check item
  66. video_item.check(source="association")
  67. # whether account exists
  68. final_item = scrape_account_entities_process(video_item.item, self.db_client)
  69. if not final_item:
  70. return
  71. else:
  72. # save to db
  73. insert_into_associated_recommendation_table(db_client=self.db_client, associated_recommendation_item=final_item)
  74. def get_recommend_video_list(self, seed_video: dict):
  75. # get recommend videos for each video
  76. seed_video_id = seed_video["url_unique_md5"]
  77. seed_account_name = seed_video["out_account_name"]
  78. seed_title = seed_video["article_title"]
  79. recommend_response = get_associated_recommendation(seed_video_id, cookie)
  80. recommend_video_list = recommend_response["data"]
  81. for video in tqdm(recommend_video_list):
  82. self.process_each_video(video, seed_account_name, seed_title)
  83. def deal(self):
  84. # start
  85. seed_video_list = self.get_seed_videos()
  86. for seed_video in tqdm(seed_video_list, desc="get each video recommendation"):
  87. try:
  88. self.get_recommend_video_list(seed_video)
  89. except Exception as e:
  90. log(
  91. task="{}_recommendation_crawler".format(seed_video["platform"]),
  92. function="save_each_recommendation",
  93. message="save recommendation failed",
  94. data={
  95. "error": str(e),
  96. "traceback": traceback.format_exc(),
  97. "seed_video": seed_video,
  98. },
  99. )