from __future__ import annotations

import time
import datetime
import traceback
from typing import List

from pandas import DataFrame
from tqdm.asyncio import tqdm

from applications.api import task_apollo, feishu_robot
from applications.api import auto_create_crawler_task
from applications.api import auto_bind_crawler_task_to_generate_task
from applications.config import category_config, input_source_map
from applications.utils import get_titles_from_produce_plan
from applications.tasks.cold_start_tasks.article_pool import (
    ArticlePoolColdStartStrategy,
    ArticlePoolFilterStrategy,
)
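
# The two strategy mixins are assumed to provide everything referenced below:
# ArticlePoolColdStartStrategy the platform fetchers
# (get_weixin_cold_start_articles / get_toutiao_cold_start_articles), and
# ArticlePoolFilterStrategy the article_pool_filter step plus cold_start_records,
# DEFAULT_CRAWLER_METHODS and the INIT_STATUS / PUBLISHED_STATUS constants.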

class ArticlePoolColdStart(ArticlePoolColdStartStrategy, ArticlePoolFilterStrategy):
    def __init__(self, pool, log_client, trace_id):
        super().__init__(pool, log_client, trace_id)

    async def get_article_from_meta_table(
        self, platform: str, crawl_method: str, strategy: str, category: str | None
    ) -> DataFrame:
        """
        @param platform: platform the articles were crawled from
        @param crawl_method: crawl mode used to fetch the articles
        @param strategy: supply strategy
        @param category: article category (None when the strategy does not use one)
        """
        match platform:
            case "weixin":
                article_list = await self.get_weixin_cold_start_articles(
                    crawl_method, strategy, category
                )
            case "toutiao":
                article_list = await self.get_toutiao_cold_start_articles(
                    crawl_method, strategy, category
                )
            case _:
                raise ValueError(f"Invalid platform: {platform}")
        return DataFrame(
            article_list,
            columns=[
                "article_id",
                "title",
                "link",
                "llm_sensitivity",
                "score",
                "category_by_ai",
            ],
        )

    async def filter_published_titles(self, plan_id):
        """
        Filter out titles that have already been added to the aigc produce plan.
        """
        published_title_tuple = await get_titles_from_produce_plan(self.pool, plan_id)
        update_query = """
            update crawler_meta_article set status = %s where title in %s and status = %s;
        """
        changed_rows = await self.pool.async_save(
            query=update_query,
            params=(self.PUBLISHED_STATUS, published_title_tuple, self.INIT_STATUS),
        )
        return changed_rows

    async def insert_crawler_plan_into_database(
        self, crawler_plan_id, crawler_plan_name, create_timestamp
    ):
        query = """
            insert into article_crawler_plan (crawler_plan_id, name, create_timestamp)
            values (%s, %s, %s)
        """
        try:
            await self.pool.async_save(
                query=query,
                params=(crawler_plan_id, crawler_plan_name, create_timestamp),
            )
        except Exception as e:
            await feishu_robot.bot(
                title="品类冷启任务,记录抓取计划id失败",
                detail={
                    "error": str(e),
                    "error_msg": traceback.format_exc(),
                    "crawler_plan_id": crawler_plan_id,
                    "crawler_plan_name": crawler_plan_name,
                },
            )

    async def change_article_status_while_publishing(self, article_id_list):
        """
        :param article_id_list: unique ids of the articles being published
        :return: number of rows whose status changed
        """
        query = """
            update crawler_meta_article
            set status = %s
            where article_id in %s and status = %s;
        """
        affect_rows = await self.pool.async_save(
            query=query,
            params=(self.PUBLISHED_STATUS, tuple(article_id_list), self.INIT_STATUS),
        )
        return affect_rows

    async def create_crawler_plan_and_bind_to_produce_plan(
        self,
        strategy: str,
        crawl_method: str,
        category: str,
        platform: str,
        url_list: List[str],
        plan_id: str,
    ):
        # create crawler plan
        crawler_plan_response = await auto_create_crawler_task(
            plan_id=None,
            plan_name=f"冷启动-{strategy}-{category}-{datetime.date.today()}-{len(url_list)}",
            plan_tag="品类冷启动",
            platform=platform,
            url_list=url_list,
        )
        # save to db (create_timestamp is a millisecond epoch timestamp)
        create_timestamp = int(time.time()) * 1000
        crawler_plan_id = crawler_plan_response["data"]["id"]
        crawler_plan_name = crawler_plan_response["data"]["name"]
        await self.insert_crawler_plan_into_database(
            crawler_plan_id, crawler_plan_name, create_timestamp
        )
        # auto bind to generate plan
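        # Payload for the generate-task binding API; contentType, inputSourceType
        # and inputSourceModal are opaque constants expected by that API, and
        # inputSourceChannel maps the platform through input_source_map.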
        new_crawler_task_list = [
            {
                "contentType": 1,
                "inputSourceType": 2,
                "inputSourceSubType": None,
                "fieldName": None,
                "inputSourceValue": crawler_plan_id,
                "inputSourceLabel": crawler_plan_name,
                "inputSourceModal": 3,
                "inputSourceChannel": input_source_map[platform],
            }
        ]
        generate_plan_response = await auto_bind_crawler_task_to_generate_task(
            crawler_task_list=new_crawler_task_list, generate_task_id=plan_id
        )
        await self.log_client.log(
            contents={
                "task": "article_pool_cold_start",
                "platform": platform,
                "crawl_method": crawl_method,
                "status": "success",
                "trace_id": self.trace_id,
                "message": "绑定至生成计划成功",
                "data": generate_plan_response,
            }
        )

    async def create_cold_start_plan(
        self,
        platform,
        plan_id,
        strategy="strategy_v1",
        category=None,
        crawl_method=None,
    ):
        # get the article DataFrame from the meta table
        article_dataframe = await self.get_article_from_meta_table(
            platform, crawl_method, strategy, category
        )
        await self.log_client.log(
            contents={
                "task": "article_pool_cold_start",
                "platform": platform,
                "crawl_method": crawl_method,
                "status": "success",
                "trace_id": self.trace_id,
                "message": "获取文章成功",
                "data": {"article_length": article_dataframe.shape[0]},
            }
        )
        filter_article_df = await self.article_pool_filter(
            strategy, platform, article_dataframe, crawl_method, category
        )
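        # strategy_v1 fans the filtered articles out into one crawler plan per
        # AI-labelled category; strategy_v2 creates a single plan for the given
        # category and binds it to that category's produce plan.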
        match strategy:
            case "strategy_v1":
                # split articles by AI-labelled category
                category_list = await task_apollo.get_config_value(key="category_list")
                for ai_category in category_list:
                    filter_category_df = filter_article_df[
                        filter_article_df["category_by_ai"] == ai_category
                    ]
                    url_list = filter_category_df["link"].values.tolist()
                    if url_list:
                        await self.create_crawler_plan_and_bind_to_produce_plan(
                            strategy, crawl_method, ai_category, platform, url_list, plan_id
                        )
                        # change article status
                        article_id_list = filter_category_df["article_id"].values.tolist()
                        await self.change_article_status_while_publishing(
                            article_id_list=article_id_list
                        )
            case "strategy_v2":
                url_list = filter_article_df["link"].values.tolist()
                await self.create_crawler_plan_and_bind_to_produce_plan(
                    strategy, crawl_method, category, platform, url_list, plan_id
                )
                # change article status
                article_id_list = filter_article_df["article_id"].values.tolist()
                await self.change_article_status_while_publishing(
                    article_id_list=article_id_list
                )

    async def deal(
        self,
        platform: str,
        strategy="strategy_v1",
        crawl_methods=None,
        category_list=None,
    ) -> None:
        """Execute the cold start task under the given strategy."""
        match strategy:
            case "strategy_v1":
                if not crawl_methods:
                    crawl_methods = self.DEFAULT_CRAWLER_METHODS
                await self.log_client.log(
                    contents={
                        "task": "article_pool_cold_start",
                        "platform": platform,
                        "crawl_methods": crawl_methods,
                        "status": "success",
                        "trace_id": self.trace_id,
                    }
                )
                crawl_methods_map = await task_apollo.get_config_value(
                    key="category_cold_start_map"
                )
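                # crawl_methods_map (from Apollo config) maps each crawl_method
                # to the id of the aigc produce plan it feeds.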
                for crawl_method in crawl_methods:
                    try:
                        plan_id = crawl_methods_map[crawl_method]
                        affected_rows = await self.filter_published_titles(plan_id)
                        await self.log_client.log(
                            contents={
                                "task": "article_pool_cold_start",
                                "platform": platform,
                                "crawl_method": crawl_method,
                                "status": "success",
                                "trace_id": self.trace_id,
                                "message": "通过已抓取标题修改文章状态",
                                "data": {"affected_rows": affected_rows},
                            }
                        )
                        await self.create_cold_start_plan(
                            platform=platform,
                            plan_id=plan_id,
                            crawl_method=crawl_method,
                        )
                    except Exception as e:
                        await feishu_robot.bot(
                            title="文章冷启动异常",
                            detail={
                                "crawl_method": crawl_method,
                                "error": str(e),
                                "function": "deal",
                                "traceback": traceback.format_exc(),
                            },
                        )
- case "strategy_v2":
- if not category_list:
- category_list = list(category_config.keys())
- for category in tqdm(category_list):
- try:
- plan_id = category_config[category]
- affected_rows = await self.filter_published_titles(plan_id)
- await self.log_client.log(
- contents={
- "task": "article_pool_cold_start",
- "platform": platform,
- "category": category,
- "status": "success",
- "trace_id": self.trace_id,
- "message": "通过已抓取标题修改文章状态",
- "data": {"affected_rows": affected_rows},
- }
- )
- await self.create_cold_start_plan(
- platform=platform,
- strategy=strategy,
- plan_id=plan_id,
- category=category,
- )
- # todo add bot notify
- except Exception as e:
- await feishu_robot.bot(
- title="文章冷启动异常",
- detail={
- "category": category,
- "strategy": strategy,
- "error": str(e),
- "function": "deal",
- "traceback": traceback.format_exc(),
- },
- )
- if self.cold_start_records:
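                    # Build the Feishu table schema: one column per filter stage
                    # and threshold recorded in cold_start_records during this run.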
                    columns = [
                        feishu_robot.create_feishu_columns_sheet(
                            sheet_type="plain_text",
                            sheet_name="category",
                            display_name="文章品类",
                        ),
                        feishu_robot.create_feishu_columns_sheet(
                            sheet_type="number",
                            sheet_name="cold_start_num",
                            display_name="本次冷启数量",
                        ),
                        feishu_robot.create_feishu_columns_sheet(
                            sheet_type="number",
                            sheet_name="total_length",
                            display_name="总文章剩余数量",
                        ),
                        feishu_robot.create_feishu_columns_sheet(
                            sheet_type="number",
                            sheet_name="filter_by_title_length",
                            display_name="标题长度过滤",
                        ),
                        feishu_robot.create_feishu_columns_sheet(
                            sheet_type="number",
                            sheet_name="filter_by_sensitivity",
                            display_name="敏感词过滤",
                        ),
                        feishu_robot.create_feishu_columns_sheet(
                            sheet_type="number",
                            sheet_name="filter_by_llm_sensitity",
                            display_name="经过大模型判断敏感过滤",
                        ),
                        feishu_robot.create_feishu_columns_sheet(
                            sheet_type="number",
                            sheet_name="filter_by_score",
                            display_name="经过相关性分过滤",
                        ),
                        feishu_robot.create_feishu_columns_sheet(
                            sheet_type="number",
                            sheet_name="read_avg_threshold",
                            display_name="阅读均值倍数阈值",
                        ),
                        feishu_robot.create_feishu_columns_sheet(
                            sheet_type="number",
                            sheet_name="read_threshold",
                            display_name="阅读量阈值",
                        ),
                        feishu_robot.create_feishu_columns_sheet(
                            sheet_type="number",
                            sheet_name="title_length_threshold",
                            display_name="标题长度阈值",
                        ),
                    ]
                    await feishu_robot.bot(
                        title="长文文章路冷启动发布",
                        detail={
                            "columns": columns,
                            "rows": self.cold_start_records,
                        },
                        table=True,
                        mention=False,
                    )
            case _:
                raise ValueError(f"Unknown strategy: {strategy}")
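

# A minimal usage sketch (assumptions: `pool`, `log_client` and `trace_id` are
# supplied by the surrounding application; none of them are constructed here):
#
#     cold_start = ArticlePoolColdStart(pool, log_client, trace_id)
#     await cold_start.deal(platform="weixin")                        # strategy_v1
#     await cold_start.deal(platform="toutiao", strategy="strategy_v2")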