| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183 | 
							- """
 
- @author: luojunhui
 
- """
 
- import json
 
- import time
 
- import traceback
 
- from typing import Dict
 
- import requests
 
- from tqdm import tqdm
 
- from pymysql.cursors import DictCursor
 
- from applications import bot
 
- from applications import log
 
- from applications import Functions
 
- from applications.db import DatabaseConnector
 
- from config import long_articles_config
 
- from coldStartTasks.filter import article_crawler_duplicate_filter
 
- functions = Functions()  # shared project helper instance; the crawler below calls functions.proxy() to obtain the proxies passed to requests
 
class ToutiaoRecommendCrawler(object):
    """
    Crawler for the Toutiao (今日头条) recommendation feed.

    Typical usage: create an instance, call ``init_database()`` once, then
    call ``run(category)``. Each round of ``run`` loads the stored request
    parameters for the category, fetches one page of the feed and inserts the
    non-video articles that carry a link into ``crawler_meta_article``.
    """

    # Upper bound (seconds) for one feed request. Without a timeout,
    # requests waits forever on a stalled proxy or endpoint.
    REQUEST_TIMEOUT = 30

    def __init__(self) -> None:
        # Set by init_database(); stays None until that method is called.
        self.db_client = None

    def init_database(self) -> None:
        """
        Create the database connector and open the connection.

        Any exception raised while constructing or connecting is reported
        through ``bot`` and is NOT re-raised. After a failure ``db_client``
        is either still ``None`` (constructor failed) or an unconnected
        connector (``connect()`` failed).

        :return: None
        """
        try:
            self.db_client = DatabaseConnector(db_config=long_articles_config)
            self.db_client.connect()
        except Exception as e:
            bot(
                title="今日头条推荐流文章抓取任务数据库连接失败",
                detail={
                    "error": str(e),
                    "error_stack": traceback.format_exc()
                }
            )

    def get_request_params(self, category) -> Dict:
        """
        Load the stored request parameters for a category.

        Selects the row with the lowest ``id`` from ``toutiao_request_params``
        whose ``category`` matches and whose ``expire_flag`` is 0.

        :param category: category value matched against the ``category`` column
        :return: the row as a dict (request_method, request_url,
                 request_headers, post_data), or ``{}`` when no row matches
        """
        # NOTE(review): `category` is interpolated straight into the SQL text.
        # Whether `db_client.fetch` accepts bound parameters is not visible
        # from this file -- confirm, and switch to a parameterized query if it
        # does. Until then callers must only pass trusted category values.
        select_sql = f"""
                    SELECT request_method, request_url, request_headers, post_data
                    FROM toutiao_request_params
                    WHERE category = '{category}' and expire_flag = 0
                    ORDER BY id
                    LIMIT 1;
                """
        result = self.db_client.fetch(
            query=select_sql,
            cursor_type=DictCursor
        )
        if not result:
            print("cookie没了报警")
            return {}
        return result[0]

    def get_recommendation_article_list(self, category) -> Dict:
        """
        Fetch one page of the recommendation feed for a category.

        :param category: category whose stored request parameters are used
        :return: the decoded JSON response, or ``{}`` when no request
                 parameters are available or the response body is empty
        :raises Exception: request errors (including timeouts) and JSON
                 decoding errors propagate to the caller
        """
        cookie_obj = self.get_request_params(category)
        if not cookie_obj:
            return {}

        # NOTE(review): `post_data` is selected by get_request_params but is
        # not sent with the request -- confirm whether any stored method
        # needs a request body.
        response = requests.request(
            method=cookie_obj['request_method'],
            url=cookie_obj['request_url'],
            headers=json.loads(cookie_obj['request_headers']),
            proxies=functions.proxy(),
            timeout=self.REQUEST_TIMEOUT
        )
        # `response.text` is always a str, never None, so test for an empty
        # body; return early because json() would raise on empty content.
        if not response.text:
            print("{}: cookie  失效".format(category))
            return {}
        return response.json()

    def insert_each_article(self, category, item: Dict) -> None:
        """
        Insert a single feed item into ``crawler_meta_article``.

        The item is skipped (and a log entry written) when
        ``article_crawler_duplicate_filter`` reports its title as a duplicate.

        :param category: category stored alongside the article
        :param item: feed item; must contain ``title``, ``article_url``,
                     ``like_count``, ``read_count``, ``user_info``,
                     ``Abstract`` and ``publish_time``; ``item_id`` is optional
        :return: None
        :raises KeyError: when one of the required keys is missing
        """
        title = item['title']
        if article_crawler_duplicate_filter(new_article_title=title, db_client=self.db_client):
            log(
                function='toutiao_recommend_crawler',
                task='toutiao_recommend',
                message='标题去重'
            )
            return

        item_id = item.get('item_id')
        article_url = item['article_url']
        like_count = item['like_count']
        read_count = item['read_count']
        user_info = item['user_info']
        user_id = user_info.get('user_id')
        abstract = item['Abstract']
        publish_time = item['publish_time']

        # Plain string: the only placeholders are the driver-level %s markers,
        # the values themselves are passed separately via `params`.
        insert_sql = """
            INSERT IGNORE INTO crawler_meta_article
            (platform, mode, category, out_account_id, title, link, read_cnt, like_cnt, status, description, publish_time, crawler_time, unique_index)
            VALUES
            (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
        """
        self.db_client.save(
            query=insert_sql,
            params=(
                "toutiao",
                "recommend",
                category,
                user_id,
                title,
                article_url,
                read_count,
                like_count,
                1,
                abstract,
                publish_time,
                int(time.time()),  # crawler_time: insertion time as a Unix timestamp
                item_id
            )
        )

    def process_recommendation(self, category, recommendation):
        """
        Insert every eligible article of one feed response.

        Items without ``article_url`` and items flagged ``has_video`` are
        skipped. A failure while inserting one item is logged and does not
        stop the remaining items.

        :param category: category stored alongside each article
        :param recommendation: decoded feed response; articles are read from
                               its ``data`` key
        :return: None
        """
        # get_recommendation_article_list returns {} when the request could
        # not be made, so tolerate a missing or empty `data` list without raising.
        articles = recommendation.get('data') if isinstance(recommendation, dict) else None
        if not articles:
            print("{}: recommendation response contains no article data, skipped".format(category))
            return

        for item in tqdm(articles):
            if not item.get('article_url'):
                print("无链接文章跳过")
                continue
            if item.get('has_video'):
                print("视频文章跳过")
                continue
            try:
                self.insert_each_article(category=category, item=item)
            except Exception as e:
                error_data = {
                    "error": str(e),
                    "error_stack": traceback.format_exc()
                }
                log(
                    function='toutiao_recommend_crawler',
                    task='toutiao_recommend',
                    message='头条推荐流文章插入失败',
                    data=error_data,
                    status='fail'
                )

    def run(self, category) -> None:
        """
        Entry point: fetch and store the feed for a category, 10 rounds.

        Each successful round is followed by a 3 second pause. An exception
        in a round is printed and the loop moves on to the next round.

        :param category: category to crawl
        :return: None
        """
        for _ in range(10):
            try:
                article_list = self.get_recommendation_article_list(category=category)
                self.process_recommendation(category=category, recommendation=article_list)
                time.sleep(3)
            except Exception as e:
                error_data = {
                    "error": str(e),
                    "error_stack": traceback.format_exc()
                }
                print(error_data)
 
 
  |