luojunhui 8 months ago
parent
commit
2aae07ff7a

+ 230 - 0
coldStartTasks/publish/basic.py

@@ -0,0 +1,230 @@
+"""
+@author: luojunhui
+"""
+import json
+import time
+import datetime
+import pandas as pd
+import traceback
+
+from pandas import DataFrame
+from tqdm import tqdm
+
+from applications import log, aiditApi, bot
+from applications.const import ColdStartTaskConst
+from config import apolloConfig
+
+const = ColdStartTaskConst()
+config = apolloConfig()
+
+category_cold_start_threshold = json.loads(config.getConfigValue("category_cold_start_threshold"))
+READ_TIMES_THRESHOLD = category_cold_start_threshold.get("READ_TIMES_THRESHOLD", 1.3)
+READ_THRESHOLD = category_cold_start_threshold.get("READ_THRESHOLD", 5000)
+LIMIT_TITLE_LENGTH = category_cold_start_threshold.get("LIMIT_TITLE_LENGTH", 15)
+TITLE_LENGTH_MAX = category_cold_start_threshold.get("TITLE_LENGTH_MAX", 50)
+
+
+def get_article_from_meta_table(db_client, category: str, platform: str) -> DataFrame:
+    """
+    get article from meta data
+    :param db_client: database connector
+    :param category: article category
+    :param platform: article platform
+    :return: article dataframe
+    """
+    sql = f"""
+        select 
+            article_id, out_account_id, article_index, title, link, read_cnt, status, llm_sensitivity, score
+        from crawler_meta_article
+        where category = "{category}" and platform = "{platform}" and title_sensitivity = {const.TITLE_NOT_SENSITIVE}
+        order score desc;
+    """
+    article_list = db_client.fetch(sql)
+    log(
+        task="category_publish_task",
+        function="get_articles_from_meta_table",
+        message="获取品类文章总数",
+        data={
+            "total_articles": len(article_list),
+            "category": category
+        }
+    )
+    article_df = pd.DataFrame(article_list,
+                              columns=['article_id', 'gh_id', 'position', 'title', 'link', 'read_cnt', 'status',
+                                       'llm_sensitivity', 'score'])
+    return article_df
+
+
+def update_published_articles_status(db_client) -> None:
+    """
+    filter published articles
+    """
+    category_map = json.loads(config.getConfigValue("category_cold_start_map"))
+    category_list = list(category_map.keys())
+    processing_bar = tqdm(category_list, desc="fileter_published_articles")
+    for category in processing_bar:
+        plan_id = category_map.get(category)
+        if plan_id:
+            article_list = aiditApi.get_generated_article_list(plan_id)
+            title_list = [i[1] for i in article_list]
+            if title_list:
+                update_sql = f"""
+                        update crawler_meta_article
+                        set status = %s 
+                        where title in %s and status = %s;
+                """
+                affected_rows = db_client.save(
+                    sql=update_sql,
+                    params=(const.PUBLISHED_STATUS, tuple(title_list), const.INIT_STATUS)
+                )
+                processing_bar.set_postfix({"category": affected_rows})
+        else:
+            return
+
+
+def filter_by_read_times(article_df: DataFrame) -> DataFrame:
+    """
+    filter by read times
+    """
+    article_df['average_read'] = article_df.groupby(['gh_id', 'position'])['read_cnt'].transform('mean')
+    article_df['read_times'] = article_df['read_cnt'] / article_df['average_read']
+    filter_df = article_df[article_df['read_times'] >= READ_TIMES_THRESHOLD]
+    return filter_df
+
+
+def filter_by_status(article_df: DataFrame) -> DataFrame:
+    """
+    filter by status
+    """
+    filter_df = article_df[article_df['status'] == const.INIT_STATUS]
+    return filter_df
+
+
+def filter_by_read_cnt(article_df: DataFrame) -> DataFrame:
+    """
+    filter by read cnt
+    """
+    filter_df = article_df[article_df['read_cnt'] >= READ_THRESHOLD]
+    return filter_df
+
+
+def filter_by_title_length(article_df: DataFrame) -> DataFrame:
+    """
+    filter by title length
+    """
+    filter_df = article_df[
+        (article_df['title'].str.len() >= LIMIT_TITLE_LENGTH)
+        & (article_df['title'].str.len() <= TITLE_LENGTH_MAX)
+        ]
+    return filter_df
+
+
+def filter_by_sensitive_words(article_df: DataFrame) -> DataFrame:
+    """
+    filter by sensitive words
+    """
+    filter_df = article_df[
+        (~article_df['title'].str.contains('农历'))
+        & (~article_df['title'].str.contains('太极'))
+        & (~article_df['title'].str.contains('节'))
+        & (~article_df['title'].str.contains('早上好'))
+        & (~article_df['title'].str.contains('赖清德'))
+        & (~article_df['title'].str.contains('普京'))
+        & (~article_df['title'].str.contains('俄'))
+        & (~article_df['title'].str.contains('南海'))
+        & (~article_df['title'].str.contains('台海'))
+        & (~article_df['title'].str.contains('解放军'))
+        & (~article_df['title'].str.contains('蔡英文'))
+        & (~article_df['title'].str.contains('中国'))
+        ]
+    return filter_df
+
+
+def filter_by_similarity_score(article_df: DataFrame, score) -> DataFrame:
+    """
+    filter by similarity score
+    """
+    filter_df = article_df[article_df['score'] >= score]
+    return filter_df
+
+
+def insert_into_article_crawler_plan(db_client, crawler_plan_id, crawler_plan_name, create_timestamp):
+    """
+    insert into article crawler plan
+    """
+    insert_sql = f"""
+                INSERT INTO article_crawler_plan
+                (crawler_plan_id, name, create_timestamp)
+                values 
+                (%s, %s, %s)
+            """
+    try:
+        db_client.save(
+            query=insert_sql,
+            params=(crawler_plan_id, crawler_plan_name, create_timestamp)
+        )
+    except Exception as e:
+        bot(
+            title="品类冷启任务,记录抓取计划id失败",
+            detail={
+                "error": str(e),
+                "error_msg": traceback.format_exc(),
+                "crawler_plan_id": crawler_plan_id,
+                "crawler_plan_name": crawler_plan_name
+            }
+        )
+
+
+def create_crawler_plan(db_client, url_list, plan_tag, platform):
+    """
+    create crawler plan
+    """
+    crawler_plan_response = aiditApi.auto_create_crawler_task(
+        plan_id=None,
+        plan_name="自动绑定-文章联想--{}--{}".format(datetime.date.today().__str__(), len(url_list)),
+        plan_tag=plan_tag,
+        article_source=platform,
+        url_list=url_list
+    )
+    log(
+        task="category_publish_task",
+        function="publish_filter_articles",
+        message="成功创建抓取计划",
+        data=crawler_plan_response
+    )
+    # save to db
+    create_timestamp = int(time.time()) * 1000
+    crawler_plan_id = crawler_plan_response['data']['id']
+    crawler_plan_name = crawler_plan_response['data']['name']
+    insert_into_article_crawler_plan(db_client, crawler_plan_id, crawler_plan_name, create_timestamp)
+
+    bind_to_generate_plan(crawler_plan_id, crawler_plan_name, )
+
+
+def bind_to_generate_plan(crawler_plan_id, crawler_plan_name, input_source_channel):
+    """
+    auto bind to generate plan
+    """
+    new_crawler_task_list = [
+        {
+            "contentType": 1,
+            "inputSourceType": 2,
+            "inputSourceSubType": None,
+            "fieldName": None,
+            "inputSourceValue": crawler_plan_id,
+            "inputSourceLabel": crawler_plan_name,
+            "inputSourceModal": 3,
+            "inputSourceChannel": input_source_channel
+        }
+    ]
+    category_map = json.loads(config.getConfigValue("category_cold_start_map"))
+    generate_plan_response = aiditApi.bind_crawler_task_to_generate_task(
+        crawler_task_list=new_crawler_task_list,
+        generate_task_id=category_map[category]
+    )
+    log(
+        task="category_publish_task",
+        function="publish_filter_articles",
+        message="成功绑定到生成计划",
+        data=generate_plan_response
+    )

+ 72 - 1
coldStartTasks/publish/publishArticleAssociationArticles.py

@@ -1,4 +1,75 @@
 """
 """
 @author: luojunhui
 @author: luojunhui
-发布i2i文章
 """
 """
+import datetime
+from applications import bot
+from applications.db import DatabaseConnector
+from config import long_articles_config
+from coldStartTasks.publish.basic import *
+
+const = ColdStartTaskConst()
+
+
+class ArticleAssociationPublish(object):
+    """
+    publish i2i articles
+    """
+
+    def __init__(self):
+        self.db_client = DatabaseConnector(db_config=long_articles_config)
+        self.db_client.connect()
+
+    def filter_articles_before_create_plan(self, article_df: DataFrame) -> DataFrame:
+        """
+        filter articles before create plan
+        """
+        total_length = article_df.shape[0]
+
+        # filter by status
+        filter_df = filter_by_status(article_df)
+        filter_length0 = filter_df.shape[0]
+
+        # filter by sensitive words
+        filter_df = filter_by_sensitive_words(filter_df)
+        filter_length1 = filter_df.shape[0]
+
+        # filter by title length
+        filter_df = filter_by_title_length(filter_df)
+        filter_length2 = filter_df.shape[0]
+
+        bot(
+            title="文章联想任务,开始创建抓取计划",
+            detail={
+                "文章总数": total_length,
+                "发布状态过滤": "过滤: {}, 剩余: {}".format(total_length - filter_length0, filter_length0),
+                "敏感词过滤": "过滤: {}, 剩余: {}".format(filter_length0 - filter_length1, filter_length1),
+                "标题长度过滤": "过滤: {}, 剩余: {}".format(filter_length1 - filter_length2, filter_length2)
+            },
+            mention=False
+        )
+
+        return filter_df
+
+    def deal(self):
+        """
+        class entrance
+        """
+        # update published articles
+        update_published_articles_status(db_client=self.db_client)
+
+        # get data from meta table
+        article_dataframe = get_article_from_meta_table(db_client=self.db_client, category='article_association',
+                                                        platform='weixin')
+
+        # fileter articles
+        filter_dataframe = self.filter_articles_before_create_plan(article_dataframe)
+
+        # create crawler plan
+        url_list = filter_dataframe['link'].values.tolist()
+        if url_list:
+            # create_crawler_plan
+            create_crawler_plan(db_client=self.db_client, url_list=url_list, plan_tag='article_association', platform='weixin')
+
+            # change article status
+            article_id_list = filtered_articles_df['article_id'].values.tolist()
+            self.change_article_status_while_publishing(article_id_list=article_id_list)