toutiao_recommend_crawler.py 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183
  1. """
  2. @author: luojunhui
  3. """
  4. import json
  5. import time
  6. import traceback
  7. from typing import Dict
  8. import requests
  9. from tqdm import tqdm
  10. from pymysql.cursors import DictCursor
  11. from applications import bot
  12. from applications import log
  13. from applications import Functions
  14. from applications.db import DatabaseConnector
  15. from config import long_articles_config
  16. from coldStartTasks.filter import article_crawler_duplicate_filter
# Shared helper instance; supplies proxy configuration for outbound HTTP requests.
functions = Functions()
  18. class ToutiaoRecommendCrawler(object):
  19. """
  20. 今日头条推荐流
  21. """
  22. def __init__(self) -> None:
  23. self.db_client = None
  24. def init_database(self) -> None:
  25. """
  26. 初始化数据库
  27. :return:
  28. """
  29. try:
  30. self.db_client = DatabaseConnector(db_config=long_articles_config)
  31. self.db_client.connect()
  32. except Exception as e:
  33. bot(
  34. title="今日头条推荐流文章抓取任务数据库连接失败",
  35. detail={
  36. "error": str(e),
  37. "error_stack": traceback.format_exc()
  38. }
  39. )
  40. def get_request_params(self, category) -> Dict:
  41. """
  42. 获取请求参数
  43. :return:
  44. """
  45. select_sql = f"""
  46. SELECT request_method, request_url, request_headers, post_data
  47. FROM toutiao_request_params
  48. WHERE category = '{category}' and expire_flag = 0
  49. ORDER BY id
  50. LIMIT 1;
  51. """
  52. result = self.db_client.fetch(
  53. query=select_sql,
  54. cursor_type=DictCursor
  55. )
  56. if not result:
  57. print("cookie没了报警")
  58. return {}
  59. else:
  60. return result[0]
  61. def get_recommendation_article_list(self, category) -> Dict:
  62. """
  63. 获取历史推荐流文章
  64. :return:
  65. """
  66. cookie_obj = self.get_request_params(category)
  67. if not cookie_obj:
  68. return {}
  69. response = requests.request(
  70. method=cookie_obj['request_method'],
  71. url=cookie_obj['request_url'],
  72. headers=json.loads(cookie_obj['request_headers']),
  73. proxies=functions.proxy()
  74. )
  75. if response.text is None:
  76. print("{}: cookie 失效".format(category))
  77. return response.json()
  78. def insert_each_article(self, category, item: Dict) -> None:
  79. """
  80. 提取文章信息
  81. :param item
  82. :param category
  83. :return:
  84. """
  85. title = item['title']
  86. if article_crawler_duplicate_filter(new_article_title=title, db_client=self.db_client):
  87. log(
  88. function='toutiao_recommend_crawler',
  89. task='toutiao_recommend',
  90. message='标题去重'
  91. )
  92. return
  93. item_id = item.get('item_id')
  94. article_url = item['article_url']
  95. like_count = item['like_count']
  96. read_count = item['read_count']
  97. user_info = item['user_info']
  98. user_id = user_info.get('user_id')
  99. abstract = item['Abstract']
  100. publish_time = item['publish_time']
  101. insert_sql = f"""
  102. INSERT IGNORE INTO crawler_meta_article
  103. (platform, mode, category, out_account_id, title, link, read_cnt, like_cnt, status, description, publish_time, crawler_time, unique_index)
  104. VALUES
  105. (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
  106. """
  107. self.db_client.save(
  108. query=insert_sql,
  109. params=(
  110. "toutiao",
  111. "recommend",
  112. category,
  113. user_id,
  114. title,
  115. article_url,
  116. read_count,
  117. like_count,
  118. 1,
  119. abstract,
  120. publish_time,
  121. int(time.time()),
  122. item_id
  123. )
  124. )
  125. def process_recommendation(self, category, recommendation):
  126. """
  127. 处理推荐流文章
  128. :param recommendation
  129. :param category
  130. :return:
  131. """
  132. for item in tqdm(recommendation['data']):
  133. if item.get('article_url'):
  134. video_flag = item.get('has_video')
  135. if not video_flag:
  136. try:
  137. self.insert_each_article(category=category, item=item)
  138. except Exception as e:
  139. error_data = {
  140. "error": str(e),
  141. "error_stack": traceback.format_exc()
  142. }
  143. log(
  144. function='toutiao_recommend_crawler',
  145. task='toutiao_recommend',
  146. message='头条推荐流文章插入失败',
  147. data=error_data,
  148. status='fail'
  149. )
  150. else:
  151. print("视频文章跳过")
  152. else:
  153. print("无链接文章跳过")
  154. def run(self, category) -> None:
  155. """
  156. 主函数
  157. :return:
  158. """
  159. for i in range(10):
  160. try:
  161. article_list = self.get_recommendation_article_list(category=category)
  162. self.process_recommendation(category=category, recommendation=article_list)
  163. time.sleep(3)
  164. except Exception as e:
  165. error_data = {
  166. "error": str(e),
  167. "error_stack": traceback.format_exc()
  168. }
  169. print(error_data)