""" @author: luojunhui """ import time import traceback from datetime import datetime import numpy as np from pymysql.cursors import DictCursor from tqdm import tqdm from applications import WeixinSpider, log from applications.api import similarity_between_title_list from applications.const import ColdStartTaskConst from applications.db import DatabaseConnector from applications.functions import Functions from applications.utils import get_inner_account_set from applications.utils import whether_title_sensitive from config import long_articles_config spider = WeixinSpider() functions = Functions() const = ColdStartTaskConst() class ArticleAssociationCrawler(object): """ article association crawler task """ def __init__(self): self.db_client = DatabaseConnector(db_config=long_articles_config) self.db_client.connect() self.inner_account_set = get_inner_account_set() def get_seed_url_list(self, biz_date): """ 获取种子url列表 """ sql = f""" select gh_id, title, link from datastat_sort_strategy where date_str > DATE_FORMAT(DATE_SUB('{biz_date}', INTERVAL 2 DAY), '%Y%m%d') and view_count > {const.READ_COUNT_THRESHOLD} and read_rate > {const.READ_AVG_THRESHOLD} and type = {const.BULK_PUBLISH_TYPE} order by read_rate desc limit {const.SEED_ARTICLE_LIMIT_NUM}; """ seed_article_list = self.db_client.fetch(query=sql, cursor_type=DictCursor) return seed_article_list def get_level_up_title_list(self): """ 获取晋级文章标题列表 status: 1 表示文章已经溯源完成 deleted: 0 表示文章正常 level = 'autoArticlePoolLevel1' 表示头条 """ sql = f""" select distinct title from article_pool_promotion_source where level = 'autoArticlePoolLevel1' and status = 1 and deleted = 0; """ mysql_response = self.db_client.fetch(query=sql) title_list = [i[0] for i in mysql_response] return title_list def get_recommend_url_list_with_depth( self, seed_url, source_title, source_account, base_title_list, depth=1 ): """ @param seed_url: good url from data_sort_strategy @param depth: association depth @param source_title: article title @param source_account: article account """ if depth > const.ARTICLE_ASSOCIATION_MAX_DEPTH: return res = spider.get_recommend_articles(content_link=seed_url) related_articles = res["data"]["data"]["list"] if related_articles: title_list = [i["title"] for i in related_articles] similarity_array = similarity_between_title_list( title_list, base_title_list ) recommend_articles = [] for index, score_list in enumerate(similarity_array): sorted_score_list = sorted(score_list) percent_threshold_score = np.percentile( sorted_score_list, const.PERCENT_THRESHOLD ) if percent_threshold_score < const.CORRELATION_THRESHOLD: continue else: article_obj = related_articles[index] article_obj["score"] = percent_threshold_score recommend_articles.append(article_obj) recommend_process_bar = tqdm( recommend_articles, desc="save recommend articles" ) for article in recommend_process_bar: obj = { "title": article["title"], "url": article["url"], "gh_id": article["username"], "index": article["idx"], "send_time": article["send_time"], "read_cnt": article["read_num"], "depth": depth, "source_article_title": source_title, "source_account": source_account, } self.insert_recommend_article(obj) recommend_process_bar.set_postfix( {"title": article["title"], "depth": depth} ) self.get_recommend_url_list_with_depth( seed_url=obj["url"], source_title=obj["title"], source_account=obj["gh_id"], base_title_list=base_title_list, depth=depth + 1, ) else: return def insert_recommend_article(self, obj): """ insert recommend article """ # whether account inside if obj["gh_id"] in 
self.inner_account_set: return # whether article title exists title = obj["title"] select_sql = "select article_id from crawler_meta_article where title = %s;" res = self.db_client.fetch(query=select_sql, params=(title,)) if res: return # whether title sensitive title_sensitivity = ( const.TITLE_SENSITIVE if whether_title_sensitive(title) else const.TITLE_NOT_SENSITIVE ) # insert this article insert_sql = f""" insert into crawler_meta_article (platform, mode, category, out_account_id, article_index, title, link, read_cnt, publish_time, crawler_time, status, unique_index, source_article_title, source_account, title_sensitivity) values (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s); """ self.db_client.save( query=insert_sql, params=( "weixin", "recommend", "article_association", obj["gh_id"], obj["index"], obj["title"], obj["url"], obj["read_cnt"], obj["send_time"], int(time.time()), const.DEFAULT_ARTICLE_STATUS, functions.generateGzhId(obj["url"]), obj["source_article_title"], obj["source_account"], title_sensitivity, ), ) def deal(self, biz_date=None): """ class entrance :param biz_date: """ if biz_date is None: biz_date = datetime.today().strftime("%Y-%m-%d") seed_article_list = self.get_seed_url_list(biz_date) deal_bar = tqdm(seed_article_list, desc="article association crawler") base_title_list = self.get_level_up_title_list() for article in deal_bar: try: self.get_recommend_url_list_with_depth( seed_url=article["link"], source_title=article["title"], source_account=article["gh_id"], base_title_list=base_title_list, ) deal_bar.set_postfix({"article_title": article["title"]}) except Exception as e: log( task="article_association_crawler", function="deal", message=f"article association crawler error, article title: {article['title']}, error: {e}", data={"article": article, "traceback": traceback.format_exc()}, )
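

# Usage sketch (an assumption: the repo's real entrypoint may be a scheduler
# that imports ArticleAssociationCrawler instead of running this file
# directly). `deal()` defaults biz_date to today's "%Y-%m-%d" when omitted.
if __name__ == "__main__":
    article_association_crawler = ArticleAssociationCrawler()
    article_association_crawler.deal()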