123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395 |
- from __future__ import annotations
- import time
- import datetime
- import traceback
- from typing import List, Dict
- from pandas import DataFrame
- from applications.api import task_apollo, feishu_robot
- from applications.api import auto_create_crawler_task
- from applications.api import auto_bind_crawler_task_to_generate_task
- from applications.utils import get_titles_from_produce_plan
class ArticlePoolColdStartConst:
    """Constants used by the article-pool cold-start task."""

    # --- row status values (compared with / written to the `status` column) ---
    BAD_STATUS: int = 0
    INIT_STATUS: int = 1
    PUBLISHED_STATUS: int = 2

    # --- title sensitivity flags (compared with `title_sensitivity`) ---
    TITLE_NOT_SENSITIVE: int = 0
    TITLE_SENSITIVE: int = 1

    # --- read-performance thresholds used when selecting candidates ---
    # Minimum ratio of an article's read count to its account's read average.
    READ_TIMES_THRESHOLD: float = 1.3
    # Minimum absolute read count.
    READ_THRESHOLD: int = 5000

    # --- title length window, both bounds inclusive ---
    TITLE_LENGTH_LIMIT: int = 15
    TITLE_LENGTH_MAX: int = 50

    # --- relevance / volume ---
    # Rows must score strictly above this value to be kept.
    SIMILARITY_SCORE_THRESHOLD: float = 0.5
    # Maximum number of rows kept after filtering.
    DAILY_ARTICLE_NUM: int = 1000

    # Crawl methods processed when the caller supplies none.
    DEFAULT_CRAWLER_METHODS: list[str] = ["1030-手动挑号", "account_association"]
- class ArticlePoolColdStart(ArticlePoolColdStartConst):
- def __init__(self, pool, log_client, trace_id):
- self.pool = pool
- self.log_client = log_client
- self.trace_id = trace_id
- async def get_article_from_meta_table(
- self, platform: str, crawl_method: str, strategy: str
- ) -> DataFrame:
- """
- @param platform: 文章抓取平台
- @param crawl_method: 文章抓取模式
- @param strategy: 供给策略
- """
- match platform:
- case "weixin":
- article_list = await self.get_weixin_cold_start_articles(
- crawl_method, strategy
- )
- case "toutiao":
- article_list = await self.get_toutiao_cold_start_articles(
- crawl_method, strategy
- )
- case _:
- raise ValueError("Invalid platform")
- return DataFrame(
- article_list,
- columns=[
- "article_id",
- "title",
- "link",
- "llm_sensitivity",
- "score",
- "category_by_ai",
- ],
- )
- async def get_weixin_cold_start_articles(
- self, crawl_method: str, strategy: str
- ) -> List[Dict]:
- match strategy:
- case "strategy_v1":
- query = f"""
- select
- article_id, title, link, llm_sensitivity, score, category_by_ai
- from crawler_meta_article t1
- join crawler_meta_article_accounts_read_avg t2 on t1.out_account_id = t2.gh_id and t1.article_index = t2.position
- where category = %s
- and platform = %s
- and title_sensitivity = %s
- and t1.status = %s
- and t1.read_cnt / t2.read_avg >= %s
- and t1.read_cnt >= %s
- and t2.status = %s
- order by score desc;
- """
- article_list = await self.pool.async_fetch(
- query=query,
- params=(
- crawl_method,
- "weixin",
- self.TITLE_NOT_SENSITIVE,
- self.INIT_STATUS,
- self.READ_TIMES_THRESHOLD,
- self.READ_THRESHOLD,
- self.INIT_STATUS,
- ),
- )
- return article_list
- case _:
- raise ValueError("Invalid strategy")
- async def get_toutiao_cold_start_articles(
- self, crawl_method: str, strategy: str
- ) -> List[Dict]:
- match strategy:
- case "strategy_v1":
- query = f"""
- select article_id, title, link, llm_sensitivity, score, category_by_ai
- from crawler_meta_article
- where category = %s
- and platform = %s
- and status = %s;
- """
- article_list = await self.pool.async_fetch(
- query=query, params=(crawl_method, "toutiao", self.INIT_STATUS)
- )
- return article_list
- case _:
- raise ValueError("Invalid strategy")
- async def filter_published_titles(self, plan_id):
- """
- 过滤已添加至aigc中的标题
- """
- published_title_tuple = await get_titles_from_produce_plan(self.pool, plan_id)
- update_query = f"""
- update crawler_meta_article set status = %s where title in %s and status = %s;
- """
- changed_rows = await self.pool.async_save(
- query=update_query,
- params=(self.PUBLISHED_STATUS, published_title_tuple, self.INIT_STATUS),
- )
- return changed_rows
- async def filter_weixin_articles(self, dataframe, crawl_method):
- """微信过滤漏斗"""
- total_length: int = dataframe.shape[0]
- # 通过标题长度过滤
- filter_df = dataframe[
- (dataframe["title"].str.len() <= self.TITLE_LENGTH_MAX)
- & (dataframe["title"].str.len() >= self.TITLE_LENGTH_LIMIT)
- ]
- length_level1 = filter_df.shape[0]
- # 通过敏感词过滤
- sensitive_keywords = [
- "农历",
- "太极",
- "节",
- "早上好",
- "赖清德",
- "普京",
- "俄",
- "南海",
- "台海",
- "解放军",
- "蔡英文",
- "中国",
- ]
- # 构建正则表达式,使用 | 连接表示“或”的关系
- pattern = "|".join(sensitive_keywords)
- filter_df = filter_df[~filter_df["title"].str.contains(pattern, na=False)]
- # 获取过滤后的行数
- length_level2 = filter_df.shape[0]
- filter_df = filter_df[~(filter_df["llm_sensitivity"] > 0)]
- length_level3 = filter_df.shape[0]
- # 第4层通过相关性分数过滤
- filter_df = filter_df[filter_df["score"] > self.SIMILARITY_SCORE_THRESHOLD]
- length_level4 = filter_df.shape[0]
- await feishu_robot.bot(
- title="冷启任务发布通知",
- detail={
- "总文章数量": total_length,
- "通过标题长度过滤": "过滤数量: {} 剩余数量: {}".format(
- total_length - length_level1, length_level1
- ),
- "通过敏感词过滤": "过滤数量: {} 剩余数量: {}".format(
- length_level1 - length_level2, length_level2
- ),
- "通过LLM敏感度过滤": "过滤数量: {} 剩余数量: {}".format(
- length_level2 - length_level3, length_level3
- ),
- "通过相关性分数过滤": "过滤数量: {} 剩余数量: {}".format(
- length_level3 - length_level4, length_level4
- ),
- "渠道": crawl_method,
- "阅读均值倍数阈值": self.READ_TIMES_THRESHOLD,
- "阅读量阈值": self.READ_THRESHOLD,
- "标题长度阈值": self.TITLE_LENGTH_LIMIT,
- },
- mention=False,
- )
- return filter_df[: self.DAILY_ARTICLE_NUM]
- async def filter_toutiao_articles(self, dataframe, crawl_method):
- total_length = dataframe.shape[0]
- filter_df = dataframe[dataframe["score"] > self.SIMILARITY_SCORE_THRESHOLD]
- await feishu_robot.bot(
- title="冷启动创建抓取计划",
- detail={
- "渠道": crawl_method,
- "总文章数量": total_length,
- "相关性分数过滤剩余": filter_df.shape[0],
- },
- mention=False,
- )
- return filter_df[: self.DAILY_ARTICLE_NUM]
- async def insert_crawler_plan_into_database(
- self, crawler_plan_id, crawler_plan_name, create_timestamp
- ):
- query = f"""
- insert into article_crawler_plan (crawler_plan_id, name, create_timestamp)
- values (%s, %s, %s)
- """
- try:
- await self.pool.async_save(
- query=query,
- params=(crawler_plan_id, crawler_plan_name, create_timestamp),
- )
- except Exception as e:
- await feishu_robot.bot(
- title="品类冷启任务,记录抓取计划id失败",
- detail={
- "error": str(e),
- "error_msg": traceback.format_exc(),
- "crawler_plan_id": crawler_plan_id,
- "crawler_plan_name": crawler_plan_name,
- },
- )
- async def change_article_status_while_publishing(self, article_id_list):
- """
- :param: article_id_list: 文章的唯一 id
- :return:
- """
- query = f"""
- update crawler_meta_article
- set status = %s
- where article_id in %s and status = %s;
- """
- affect_rows = await self.pool.async_save(
- query=query,
- params=(self.PUBLISHED_STATUS, tuple(article_id_list), self.INIT_STATUS),
- )
- return affect_rows
- async def create_cold_start_plan(
- self, platform, crawl_method, plan_id, strategy="strategy_v1"
- ):
- article_dataframe = await self.get_article_from_meta_table(
- platform, crawl_method, strategy
- )
- await self.log_client.log(
- contents={
- "task": "article_pool_cold_start",
- "platform": platform,
- "crawl_method": crawl_method,
- "status": "success",
- "trace_id": self.trace_id,
- "message": "获取文章成功",
- "data": {"article_length": article_dataframe.shape[0]},
- }
- )
- match platform:
- case "weixin":
- input_source_channel = 5
- filter_article_df = await self.filter_weixin_articles(
- article_dataframe, crawl_method
- )
- case "toutiao":
- input_source_channel = 6
- filter_article_df = await self.filter_toutiao_articles(
- article_dataframe, crawl_method
- )
- case _:
- raise ValueError("Invalid platform")
- # split article into each category
- category_list = await task_apollo.get_config_value(key="category_list")
- for ai_category in category_list:
- filter_category_df = filter_article_df[
- filter_article_df["category_by_ai"] == ai_category
- ]
- url_list = filter_category_df["link"].values.tolist()
- if url_list:
- # create_crawler_plan
- crawler_plan_response = await auto_create_crawler_task(
- plan_id=None,
- plan_name="自动绑定-{}-{}-{}--{}".format(
- crawl_method,
- ai_category,
- datetime.date.today().__str__(),
- len(url_list),
- ),
- plan_tag="品类冷启动",
- platform=platform,
- url_list=url_list,
- )
- # save to db
- create_timestamp = int(time.time()) * 1000
- crawler_plan_id = crawler_plan_response["data"]["id"]
- crawler_plan_name = crawler_plan_response["data"]["name"]
- await self.insert_crawler_plan_into_database(
- crawler_plan_id, crawler_plan_name, create_timestamp
- )
- # auto bind to generate plan
- new_crawler_task_list = [
- {
- "contentType": 1,
- "inputSourceType": 2,
- "inputSourceSubType": None,
- "fieldName": None,
- "inputSourceValue": crawler_plan_id,
- "inputSourceLabel": crawler_plan_name,
- "inputSourceModal": 3,
- "inputSourceChannel": input_source_channel,
- }
- ]
- generate_plan_response = await auto_bind_crawler_task_to_generate_task(
- crawler_task_list=new_crawler_task_list, generate_task_id=plan_id
- )
- await self.log_client.log(
- contents={
- "task": "article_pool_cold_start",
- "platform": platform,
- "crawl_method": crawl_method,
- "status": "success",
- "trace_id": self.trace_id,
- "message": "绑定至生成计划成功",
- "data": generate_plan_response,
- }
- )
- # change article status
- article_id_list = filter_category_df["article_id"].values.tolist()
- await self.change_article_status_while_publishing(
- article_id_list=article_id_list
- )
- async def deal(self, platform: str, crawl_methods: List[str]) -> None:
- if not crawl_methods:
- crawl_methods = self.DEFAULT_CRAWLER_METHODS
- await self.log_client.log(
- contents={
- "task": "article_pool_cold_start",
- "platform": platform,
- "crawl_methods": crawl_methods,
- "status": "success",
- "trace_id": self.trace_id,
- }
- )
- crawl_methods_map = await task_apollo.get_config_value(
- key="category_cold_start_map"
- )
- for crawl_method in crawl_methods:
- try:
- plan_id = crawl_methods_map[crawl_method]
- affected_rows = await self.filter_published_titles(plan_id)
- await self.log_client.log(
- contents={
- "task": "article_pool_cold_start",
- "platform": platform,
- "crawl_method": crawl_method,
- "status": "success",
- "trace_id": self.trace_id,
- "message": "通过已抓取标题修改文章状态",
- "data": {"affected_rows": affected_rows},
- }
- )
- await self.create_cold_start_plan(platform, crawl_method, plan_id)
- except Exception as e:
- await feishu_robot.bot(
- title="文章冷启动异常",
- detail={
- "crawl_method": crawl_method,
- "error": str(e),
- "function": "deal",
- "traceback": traceback.format_exc(),
- },
- )
|