- """
- @author: luojunhui
- """
- import json
- import time
- import datetime
- import pandas as pd
- import traceback
- from pandas import DataFrame
- from tqdm import tqdm
- from applications import log, aiditApi, bot
- from applications.const import ColdStartTaskConst
- from config import apolloConfig
- const = ColdStartTaskConst()
- config = apolloConfig()
- category_cold_start_threshold = json.loads(
- config.getConfigValue("category_cold_start_threshold")
- )
- READ_TIMES_THRESHOLD = category_cold_start_threshold.get("READ_TIMES_THRESHOLD", 1.3)
- READ_THRESHOLD = category_cold_start_threshold.get("READ_THRESHOLD", 5000)
- LIMIT_TITLE_LENGTH = category_cold_start_threshold.get("LIMIT_TITLE_LENGTH", 15)
- TITLE_LENGTH_MAX = category_cold_start_threshold.get("TITLE_LENGTH_MAX", 50)

def get_article_from_meta_table(db_client, category: str, platform: str) -> DataFrame:
    """
    get articles from the crawler meta table
    :param db_client: database connector
    :param category: article category
    :param platform: article platform
    :return: article dataframe
    """
    sql = f"""
        select article_id, out_account_id, article_index, title, link, read_cnt, status, llm_sensitivity, score
        from crawler_meta_article
        where category = "{category}" and platform = "{platform}" and title_sensitivity = {const.TITLE_NOT_SENSITIVE}
        order by score desc;
    """
    article_list = db_client.fetch(sql)
    log(
        task="category_publish_task",
        function="get_article_from_meta_table",
        message="获取品类文章总数",
        data={"total_articles": len(article_list), "category": category},
    )
    # out_account_id and article_index are exposed as gh_id and position in the dataframe
    article_df = pd.DataFrame(
        article_list,
        columns=[
            "article_id",
            "gh_id",
            "position",
            "title",
            "link",
            "read_cnt",
            "status",
            "llm_sensitivity",
            "score",
        ],
    )
    return article_df

def update_published_articles_status(db_client) -> None:
    """
    mark already published articles in the meta table
    """
    category_map = json.loads(config.getConfigValue("category_cold_start_map"))
    category_list = list(category_map.keys())
    processing_bar = tqdm(category_list, desc="filter_published_articles")
    for category in processing_bar:
        plan_id = category_map.get(category)
        if plan_id:
            article_list = aiditApi.get_generated_article_list(plan_id)
            title_list = [i[1] for i in article_list]
            if title_list:
                update_sql = f"""
                    update crawler_meta_article
                    set status = %s
                    where title in %s and status = %s;
                """
                affected_rows = db_client.save(
                    query=update_sql,
                    params=(
                        const.PUBLISHED_STATUS,
                        tuple(title_list),
                        const.INIT_STATUS,
                    ),
                )
                processing_bar.set_postfix(
                    {"category": category, "affected_rows": affected_rows}
                )
        else:
            # stop as soon as a category has no bound generate plan
            return

def filter_by_read_times(article_df: DataFrame) -> DataFrame:
    """
    filter by read times
    """
    article_df["average_read"] = article_df.groupby(["gh_id", "position"])[
        "read_cnt"
    ].transform("mean")
    article_df["read_times"] = article_df["read_cnt"] / article_df["average_read"]
    filter_df = article_df[article_df["read_times"] >= READ_TIMES_THRESHOLD]
    return filter_df
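
# Worked example for the read_times filter, using the default READ_TIMES_THRESHOLD of 1.3
# and assumed numbers: an article with read_cnt = 6500 whose (gh_id, position) group
# averages 5000 reads has read_times = 6500 / 5000 = 1.3 and is kept, while 6000 reads
# would give read_times = 1.2 and be dropped.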

def filter_by_status(article_df: DataFrame) -> DataFrame:
    """
    filter by status
    """
    filter_df = article_df[article_df["status"] == const.INIT_STATUS]
    return filter_df

def filter_by_read_cnt(article_df: DataFrame) -> DataFrame:
    """
    filter by read cnt
    """
    filter_df = article_df[article_df["read_cnt"] >= READ_THRESHOLD]
    return filter_df

def filter_by_title_length(article_df: DataFrame) -> DataFrame:
    """
    filter by title length
    """
    filter_df = article_df[
        (article_df["title"].str.len() >= LIMIT_TITLE_LENGTH)
        & (article_df["title"].str.len() <= TITLE_LENGTH_MAX)
    ]
    return filter_df

def filter_by_sensitive_words(article_df: DataFrame) -> DataFrame:
    """
    filter by sensitive words
    """
    sensitive_words = [
        "农历", "太极", "节", "早上好", "赖清德", "普京",
        "俄", "南海", "台海", "解放军", "蔡英文", "中国",
    ]
    filter_df = article_df[
        ~article_df["title"].str.contains("|".join(sensitive_words), regex=True)
    ]
    return filter_df

def filter_by_similarity_score(article_df: DataFrame, score) -> DataFrame:
    """
    filter by similarity score
    """
    filter_df = article_df[article_df["score"] >= score]
    return filter_df
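
# A minimal sketch, not part of the original module, of how the filter helpers above can
# be chained for one category. The category name, platform, similarity score and filter
# order below are illustrative assumptions.
def example_filter_pipeline(
    db_client, category: str = "health", platform: str = "weixin"
) -> DataFrame:
    article_df = get_article_from_meta_table(db_client, category=category, platform=platform)
    article_df = filter_by_status(article_df)
    article_df = filter_by_sensitive_words(article_df)
    article_df = filter_by_similarity_score(article_df, score=0.4)
    article_df = filter_by_read_cnt(article_df)
    article_df = filter_by_read_times(article_df)
    article_df = filter_by_title_length(article_df)
    return article_df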

def insert_into_article_crawler_plan(
    db_client, crawler_plan_id, crawler_plan_name, create_timestamp
):
    """
    insert into article crawler plan
    """
    insert_sql = f"""
        insert into article_crawler_plan (crawler_plan_id, name, create_timestamp)
        values (%s, %s, %s);
    """
    try:
        db_client.save(
            query=insert_sql,
            params=(crawler_plan_id, crawler_plan_name, create_timestamp),
        )
    except Exception as e:
        bot(
            title="品类冷启任务,记录抓取计划id失败",
            detail={
                "error": str(e),
                "error_msg": traceback.format_exc(),
                "crawler_plan_id": crawler_plan_id,
                "crawler_plan_name": crawler_plan_name,
            },
        )

def create_crawler_plan(url_list, plan_tag, platform) -> tuple:
    """
    create crawler plan
    """
    crawler_plan_response = aiditApi.auto_create_crawler_task(
        plan_id=None,
        plan_name="自动绑定-文章联想--{}--{}".format(
            str(datetime.date.today()), len(url_list)
        ),
        plan_tag=plan_tag,
        article_source=platform,
        url_list=url_list,
    )
    log(
        task="category_publish_task",
        function="publish_filter_articles",
        message="成功创建抓取计划",
        data=crawler_plan_response,
    )
    # save to db
    create_timestamp = int(time.time()) * 1000
    crawler_plan_id = crawler_plan_response["data"]["id"]
    crawler_plan_name = crawler_plan_response["data"]["name"]
    return crawler_plan_id, crawler_plan_name, create_timestamp

def bind_to_generate_plan(category, crawler_plan_id, crawler_plan_name, platform):
    """
    auto bind to generate plan
    """
    match platform:
        case "weixin":
            input_source_channel = 5
        case "toutiao":
            input_source_channel = 6
        case _:
            input_source_channel = 5
    new_crawler_task_list = [
        {
            "contentType": 1,
            "inputSourceType": 2,
            "inputSourceSubType": None,
            "fieldName": None,
            "inputSourceValue": crawler_plan_id,
            "inputSourceLabel": crawler_plan_name,
            "inputSourceModal": 3,
            "inputSourceChannel": input_source_channel,
        }
    ]
    category_map = json.loads(config.getConfigValue("category_cold_start_map"))
    generate_plan_response = aiditApi.bind_crawler_task_to_generate_task(
        crawler_task_list=new_crawler_task_list,
        generate_task_id=category_map[category],
    )
    log(
        task="category_publish_task",
        function="publish_filter_articles",
        message="成功绑定到生成计划",
        data=generate_plan_response,
    )

def update_article_status_after_publishing(db_client, article_id_list):
    """
    update article status after publishing
    """
    update_sql = f"""
        update crawler_meta_article
        set status = %s
        where article_id in %s and status = %s;
    """
    affect_rows = db_client.save(
        query=update_sql,
        params=(const.PUBLISHED_STATUS, tuple(article_id_list), const.INIT_STATUS),
    )
    if affect_rows != len(article_id_list):
        bot(
            title="品类冷启任务中,出现更新文章状态失败异常",
            detail={"affected_rows": affect_rows, "task_rows": len(article_id_list)},
        )