toutiao_recommend_crawler.py 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163
  1. """
  2. @author: luojunhui
  3. """
  4. import json
  5. import time
  6. import traceback
  7. from typing import Dict
  8. import requests
  9. from tqdm import tqdm
  10. from pymysql.cursors import DictCursor
  11. from applications import bot
  12. from applications import log
  13. from applications import Functions
  14. from applications.db import DatabaseConnector
  15. from config import long_articles_config
  16. functions = Functions()
  17. class ToutiaoRecommendCrawler(object):
  18. """
  19. 今日头条推荐流
  20. """
  21. def __init__(self) -> None:
  22. self.db_client = None
  23. def init_database(self) -> None:
  24. """
  25. 初始化数据库
  26. :return:
  27. """
  28. try:
  29. self.db_client = DatabaseConnector(db_config=long_articles_config)
  30. self.db_client.connect()
  31. except Exception as e:
  32. bot(
  33. title="今日头条推荐流文章抓取任务数据库连接失败",
  34. detail={
  35. "error": str(e),
  36. "error_stack": traceback.format_exc()
  37. }
  38. )
  39. def get_history_recommendation(self) -> Dict:
  40. """
  41. 获取历史推荐流文章
  42. :return:
  43. """
  44. select_sql = f"""
  45. SELECT request_method, request_url, request_headers, post_data
  46. FROM toutiao_request_params
  47. WHERE category = 'history' and expire_flag = 0
  48. ORDER BY id
  49. LIMIT 1;
  50. """
  51. result = self.db_client.fetch(
  52. query=select_sql,
  53. cursor_type=DictCursor
  54. )
  55. if not result:
  56. print("cookie没了报警")
  57. return {}
  58. cookie_obj = result[0]
  59. response = requests.request(
  60. method=cookie_obj['request_method'],
  61. url=cookie_obj['request_url'],
  62. headers=json.loads(cookie_obj['request_headers']),
  63. proxies=functions.proxy()
  64. )
  65. return response.json()
  66. def get_tech_recommendation(self) -> Dict:
  67. """
  68. 获取科技推荐流文章
  69. :return:
  70. """
  71. return
  72. def insert_each_article(self, item: Dict) -> Dict:
  73. """
  74. 提取文章信息
  75. :param article_info:
  76. :return:
  77. """
  78. item_id = item.get('item_id')
  79. article_url = item['article_url']
  80. like_count = item['like_count']
  81. read_count = item['read_count']
  82. title = item['title']
  83. user_info = item['user_info']
  84. user_id = user_info.get('user_id')
  85. abstract = item['Abstract']
  86. publish_time = item['publish_time']
  87. insert_sql = f"""
  88. INSERT IGNORE INTO crawler_meta_article
  89. (platform, mode, category, out_account_id, title, link, read_cnt, like_cnt, status, description, publish_time, crawler_time, unique_index)
  90. VALUES
  91. (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
  92. """
  93. self.db_client.save(
  94. query=insert_sql,
  95. params=(
  96. "toutiao",
  97. "recommend",
  98. "history",
  99. user_id,
  100. title,
  101. article_url,
  102. read_count,
  103. like_count,
  104. 1,
  105. abstract,
  106. publish_time,
  107. int(time.time()),
  108. item_id
  109. )
  110. )
  111. def process_recommendation(self, recommendation) -> Dict:
  112. """
  113. 处理推荐流文章
  114. :param recommendation:
  115. :return:
  116. """
  117. for item in tqdm(recommendation['data']):
  118. if item.get('article_url'):
  119. video_flag = item.get('has_video')
  120. if not video_flag:
  121. try:
  122. self.insert_each_article(item)
  123. except Exception as e:
  124. error_data = {
  125. "error": str(e),
  126. "error_stack": traceback.format_exc()
  127. }
  128. log(
  129. task='toutiao_recommend',
  130. message='头条推荐流文章插入失败',
  131. data=error_data,
  132. status='fail'
  133. )
  134. else:
  135. print("视频文章跳过")
  136. else:
  137. print("无链接文章跳过")
  138. def run(self) -> None:
  139. """
  140. 主函数
  141. :return:
  142. """
  143. for i in range(10):
  144. try:
  145. article_list = self.get_history_recommendation()
  146. self.process_recommendation(article_list)
  147. time.sleep(3)
  148. except Exception as e:
  149. error_data = {
  150. "error": str(e),
  151. "error_stack": traceback.format_exc()
  152. }
  153. print(error_data)