123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183 |
- """
- @author: luojunhui
- """
- import json
- import time
- import traceback
- from typing import Dict
- import requests
- from tqdm import tqdm
- from pymysql.cursors import DictCursor
- from applications import bot
- from applications import log
- from applications import Functions
- from applications.db import DatabaseConnector
- from config import long_articles_config
- from coldStartTasks.filter import article_crawler_duplicate_filter
- functions = Functions()
class ToutiaoRecommendCrawler(object):
    """
    Crawler for the Toutiao (今日头条) recommendation feed.

    Workflow: ``init_database()`` opens the database connection, then
    ``run(category)`` repeatedly fetches the feed for that category and
    stores every non-video article that has a link and a not-yet-seen title
    into ``crawler_meta_article``.

    ``db_client`` is ``None`` until ``init_database()`` has been called; every
    other method needs it.
    """

    # Number of feed fetches performed by one ``run`` call.
    FETCH_ROUNDS = 10
    # Pause between two feed fetches, in seconds.
    FETCH_INTERVAL_SECONDS = 3
    # Upper bound (seconds) for one feed request, so that a stalled proxy or
    # endpoint cannot block the job forever.
    REQUEST_TIMEOUT_SECONDS = 30

    def __init__(self) -> None:
        self.db_client = None

    def init_database(self) -> None:
        """
        Create the database client and open its connection.

        A failure is reported through ``bot`` (error text plus stack trace)
        and is not re-raised. ``db_client`` is assigned before ``connect()``
        is called, so it stays set even when connecting fails.

        :return: None
        """
        try:
            self.db_client = DatabaseConnector(db_config=long_articles_config)
            self.db_client.connect()
        except Exception as e:
            bot(
                title="今日头条推荐流文章抓取任务数据库连接失败",
                detail={
                    "error": str(e),
                    "error_stack": traceback.format_exc()
                }
            )

    def get_request_params(self, category) -> Dict:
        """
        Load the stored request description for a feed category.

        Picks the row with the lowest id among the non-expired
        (``expire_flag = 0``) rows of ``toutiao_request_params``.

        :param category: value matched against the ``category`` column
        :return: the selected row (fetched with ``DictCursor``), or ``{}``
            when no usable row exists
        """
        # NOTE(review): ``category`` is interpolated straight into the SQL
        # text, so only trusted values may be passed. If
        # DatabaseConnector.fetch supports query parameters, switch to a
        # ``%s`` placeholder - confirm against its signature.
        select_sql = f"""
            SELECT request_method, request_url, request_headers, post_data
            FROM toutiao_request_params
            WHERE category = '{category}' and expire_flag = 0
            ORDER BY id
            LIMIT 1;
        """
        result = self.db_client.fetch(
            query=select_sql,
            cursor_type=DictCursor
        )
        if not result:
            print("cookie没了报警")
            return {}
        return result[0]

    def get_recommendation_article_list(self, category) -> Dict:
        """
        Fetch one page of the recommendation feed for a category.

        :param category: feed category whose stored request is replayed
        :return: the decoded JSON body, or ``{}`` when no request parameters
            are available or the response body is empty
        :raises requests.RequestException: on network failure or timeout
        :raises ValueError: when the response body is not valid JSON
        """
        cookie_obj = self.get_request_params(category)
        if not cookie_obj:
            return {}

        # NOTE(review): ``post_data`` is selected by get_request_params but is
        # not sent here - confirm whether POST-style requests need it.
        response = requests.request(
            method=cookie_obj['request_method'],
            url=cookie_obj['request_url'],
            headers=json.loads(cookie_obj['request_headers']),
            proxies=functions.proxy(),
            timeout=self.REQUEST_TIMEOUT_SECONDS
        )
        # ``response.text`` is always a str, never None, so test for emptiness
        # and stop here instead of letting ``response.json()`` fail on it.
        if not response.text:
            print("{}: cookie 失效".format(category))
            return {}
        return response.json()

    def insert_each_article(self, category, item: Dict) -> None:
        """
        Store one feed item in ``crawler_meta_article``.

        Items whose title is reported as a duplicate by
        ``article_crawler_duplicate_filter`` are logged and skipped. The
        insert uses ``INSERT IGNORE`` with the item id as ``unique_index``.

        :param category: feed category the item was fetched for
        :param item: one feed entry; must contain ``title``, ``article_url``,
            ``like_count``, ``read_count``, ``user_info``, ``Abstract`` and
            ``publish_time``
        :return: None
        :raises KeyError: when a required key is missing from ``item``
        """
        title = item['title']
        if article_crawler_duplicate_filter(new_article_title=title, db_client=self.db_client):
            log(
                function='toutiao_recommend_crawler',
                task='toutiao_recommend',
                message='标题去重'
            )
            return

        item_id = item.get('item_id')
        article_url = item['article_url']
        like_count = item['like_count']
        read_count = item['read_count']
        user_info = item['user_info']
        user_id = user_info.get('user_id')
        abstract = item['Abstract']
        publish_time = item['publish_time']
        insert_sql = """
            INSERT IGNORE INTO crawler_meta_article
            (platform, mode, category, out_account_id, title, link, read_cnt, like_cnt, status, description, publish_time, crawler_time, unique_index)
            VALUES
            (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
        """
        self.db_client.save(
            query=insert_sql,
            params=(
                "toutiao",
                "recommend",
                category,
                user_id,
                title,
                article_url,
                read_count,
                like_count,
                1,
                abstract,
                publish_time,
                int(time.time()),
                item_id
            )
        )

    def process_recommendation(self, category, recommendation):
        """
        Store every eligible article of one feed response.

        An item is eligible when it has an ``article_url`` and no
        ``has_video`` flag. A failure on a single item is logged and does not
        stop the remaining items.

        :param category: feed category the response belongs to
        :param recommendation: decoded feed response; its ``data`` entry holds
            the items. An empty or missing ``data`` is treated as nothing to do.
        :return: None
        """
        # get_recommendation_article_list returns {} when the request could
        # not be made, so a missing 'data' key is an expected case.
        items = recommendation.get('data') if isinstance(recommendation, dict) else None
        if not items:
            return

        for item in tqdm(items):
            if not item.get('article_url'):
                print("无链接文章跳过")
                continue
            if item.get('has_video'):
                print("视频文章跳过")
                continue
            try:
                self.insert_each_article(category=category, item=item)
            except Exception as e:
                error_data = {
                    "error": str(e),
                    "error_stack": traceback.format_exc()
                }
                log(
                    function='toutiao_recommend_crawler',
                    task='toutiao_recommend',
                    message='头条推荐流文章插入失败',
                    data=error_data,
                    status='fail'
                )

    def run(self, category) -> None:
        """
        Entry point: fetch and store the feed ``FETCH_ROUNDS`` times.

        ``init_database()`` must have been called beforehand. A failing round
        is printed and logged, and the loop continues with the next round.

        :param category: feed category to crawl
        :return: None
        """
        for _ in range(self.FETCH_ROUNDS):
            try:
                article_list = self.get_recommendation_article_list(category=category)
                self.process_recommendation(category=category, recommendation=article_list)
            except Exception as e:
                error_data = {
                    "error": str(e),
                    "error_stack": traceback.format_exc()
                }
                print(error_data)
                log(
                    function='toutiao_recommend_crawler',
                    task='toutiao_recommend',
                    message=str(e),
                    data=error_data,
                    status='fail'
                )
            # Pause after every round, including failed ones, so repeated
            # errors do not turn into back-to-back requests.
            time.sleep(self.FETCH_INTERVAL_SECONDS)
|