""" @author: luojunhui """ import json import time import datetime import pandas as pd import traceback from pandas import DataFrame from tqdm import tqdm from applications import log, aiditApi, bot from applications.const import ColdStartTaskConst from config import apolloConfig const = ColdStartTaskConst() config = apolloConfig() category_cold_start_threshold = json.loads( config.getConfigValue("category_cold_start_threshold") ) READ_TIMES_THRESHOLD = category_cold_start_threshold.get("READ_TIMES_THRESHOLD", 1.3) READ_THRESHOLD = category_cold_start_threshold.get("READ_THRESHOLD", 5000) LIMIT_TITLE_LENGTH = category_cold_start_threshold.get("LIMIT_TITLE_LENGTH", 15) TITLE_LENGTH_MAX = category_cold_start_threshold.get("TITLE_LENGTH_MAX", 50) def get_article_from_meta_table(db_client, category: str, platform: str) -> DataFrame: """ get article from meta data :param db_client: database connector :param category: article category :param platform: article platform :return: article dataframe """ sql = f""" select article_id, out_account_id, article_index, title, link, read_cnt, status, llm_sensitivity, score from crawler_meta_article where category = "{category}" and platform = "{platform}" and title_sensitivity = {const.TITLE_NOT_SENSITIVE} order by score desc; """ article_list = db_client.fetch(sql) log( task="category_publish_task", function="get_articles_from_meta_table", message="获取品类文章总数", data={"total_articles": len(article_list), "category": category}, ) article_df = pd.DataFrame( article_list, columns=[ "article_id", "gh_id", "position", "title", "link", "read_cnt", "status", "llm_sensitivity", "score", ], ) return article_df def update_published_articles_status(db_client) -> None: """ filter published articles """ category_map = json.loads(config.getConfigValue("category_cold_start_map")) category_list = list(category_map.keys()) processing_bar = tqdm(category_list, desc="fileter_published_articles") for category in processing_bar: plan_id = category_map.get(category) if plan_id: article_list = aiditApi.get_generated_article_list(plan_id) title_list = [i[1] for i in article_list] if title_list: update_sql = f""" update crawler_meta_article set status = %s where title in %s and status = %s; """ affected_rows = db_client.save( query=update_sql, params=( const.PUBLISHED_STATUS, tuple(title_list), const.INIT_STATUS, ), ) processing_bar.set_postfix( {"category": category, "affected_rows": affected_rows} ) else: return def filter_by_read_times(article_df: DataFrame) -> DataFrame: """ filter by read times """ article_df["average_read"] = article_df.groupby(["gh_id", "position"])[ "read_cnt" ].transform("mean") article_df["read_times"] = article_df["read_cnt"] / article_df["average_read"] filter_df = article_df[article_df["read_times"] >= READ_TIMES_THRESHOLD] return filter_df def filter_by_status(article_df: DataFrame) -> DataFrame: """ filter by status """ filter_df = article_df[article_df["status"] == const.INIT_STATUS] return filter_df def filter_by_read_cnt(article_df: DataFrame) -> DataFrame: """ filter by read cnt """ filter_df = article_df[article_df["read_cnt"] >= READ_THRESHOLD] return filter_df def filter_by_title_length(article_df: DataFrame) -> DataFrame: """ filter by title length """ filter_df = article_df[ (article_df["title"].str.len() >= LIMIT_TITLE_LENGTH) & (article_df["title"].str.len() <= TITLE_LENGTH_MAX) ] return filter_df def filter_by_sensitive_words(article_df: DataFrame) -> DataFrame: """ filter by sensitive words """ filter_df = article_df[ (~article_df["title"].str.contains("农历")) & (~article_df["title"].str.contains("太极")) & (~article_df["title"].str.contains("节")) & (~article_df["title"].str.contains("早上好")) & (~article_df["title"].str.contains("赖清德")) & (~article_df["title"].str.contains("普京")) & (~article_df["title"].str.contains("俄")) & (~article_df["title"].str.contains("南海")) & (~article_df["title"].str.contains("台海")) & (~article_df["title"].str.contains("解放军")) & (~article_df["title"].str.contains("蔡英文")) & (~article_df["title"].str.contains("中国")) ] return filter_df def filter_by_similarity_score(article_df: DataFrame, score) -> DataFrame: """ filter by similarity score """ filter_df = article_df[article_df["score"] >= score] return filter_df def insert_into_article_crawler_plan( db_client, crawler_plan_id, crawler_plan_name, create_timestamp ): """ insert into article crawler plan """ insert_sql = f""" insert into article_crawler_plan (crawler_plan_id, name, create_timestamp) values (%s, %s, %s); """ try: db_client.save( query=insert_sql, params=(crawler_plan_id, crawler_plan_name, create_timestamp), ) except Exception as e: bot( title="品类冷启任务,记录抓取计划id失败", detail={ "error": str(e), "error_msg": traceback.format_exc(), "crawler_plan_id": crawler_plan_id, "crawler_plan_name": crawler_plan_name, }, ) def create_crawler_plan(url_list, plan_tag, platform) -> tuple: """ create crawler plan """ crawler_plan_response = aiditApi.auto_create_crawler_task( plan_id=None, plan_name="自动绑定-文章联想--{}--{}".format( datetime.date.today().__str__(), len(url_list) ), plan_tag=plan_tag, article_source=platform, url_list=url_list, ) log( task="category_publish_task", function="publish_filter_articles", message="成功创建抓取计划", data=crawler_plan_response, ) # save to db create_timestamp = int(time.time()) * 1000 crawler_plan_id = crawler_plan_response["data"]["id"] crawler_plan_name = crawler_plan_response["data"]["name"] return crawler_plan_id, crawler_plan_name, create_timestamp def bind_to_generate_plan(category, crawler_plan_id, crawler_plan_name, platform): """ auto bind to generate plan """ match platform: case "weixin": input_source_channel = 5 case "toutiao": input_source_channel = 6 case _: input_source_channel = 5 new_crawler_task_list = [ { "contentType": 1, "inputSourceType": 2, "inputSourceSubType": None, "fieldName": None, "inputSourceValue": crawler_plan_id, "inputSourceLabel": crawler_plan_name, "inputSourceModal": 3, "inputSourceChannel": input_source_channel, } ] category_map = json.loads(config.getConfigValue("category_cold_start_map")) generate_plan_response = aiditApi.bind_crawler_task_to_generate_task( crawler_task_list=new_crawler_task_list, generate_task_id=category_map[category] ) log( task="category_publish_task", function="publish_filter_articles", message="成功绑定到生成计划", data=generate_plan_response, ) def update_article_status_after_publishing(db_client, article_id_list): """ update article status after publishing """ update_sql = f""" update crawler_meta_article set status = %s where article_id in %s and status = %s; """ affect_rows = db_client.save( query=update_sql, params=(const.PUBLISHED_STATUS, tuple(article_id_list), const.INIT_STATUS), ) if affect_rows != len(article_id_list): bot( title="品类冷启任务中,出现更新状文章状态失败异常", detail={"affected_rows": affect_rows, "task_rows": len(article_id_list)}, )