luojunhui 1 month ago
Commit
47ed36be3b

+ 103 - 57
coldStartTasks/publish/basic.py

@@ -1,6 +1,7 @@
 """
 @author: luojunhui
 """
+
 import json
 import time
 import datetime
@@ -17,7 +18,9 @@ from config import apolloConfig
 const = ColdStartTaskConst()
 config = apolloConfig()
 
-category_cold_start_threshold = json.loads(config.getConfigValue("category_cold_start_threshold"))
+category_cold_start_threshold = json.loads(
+    config.getConfigValue("category_cold_start_threshold")
+)
 READ_TIMES_THRESHOLD = category_cold_start_threshold.get("READ_TIMES_THRESHOLD", 1.3)
 READ_THRESHOLD = category_cold_start_threshold.get("READ_THRESHOLD", 5000)
 LIMIT_TITLE_LENGTH = category_cold_start_threshold.get("LIMIT_TITLE_LENGTH", 15)
@@ -37,21 +40,29 @@ def get_article_from_meta_table(db_client, category: str, platform: str) -> Data
             article_id, out_account_id, article_index, title, link, read_cnt, status, llm_sensitivity, score
         from crawler_meta_article
         where category = "{category}" and platform = "{platform}" and title_sensitivity = {const.TITLE_NOT_SENSITIVE}
-        order score desc;
+        order by score desc;
     """
     article_list = db_client.fetch(sql)
     log(
         task="category_publish_task",
         function="get_articles_from_meta_table",
         message="获取品类文章总数",
-        data={
-            "total_articles": len(article_list),
-            "category": category
-        }
+        data={"total_articles": len(article_list), "category": category},
+    )
+    article_df = pd.DataFrame(
+        article_list,
+        columns=[
+            "article_id",
+            "gh_id",
+            "position",
+            "title",
+            "link",
+            "read_cnt",
+            "status",
+            "llm_sensitivity",
+            "score",
+        ],
     )
-    article_df = pd.DataFrame(article_list,
-                              columns=['article_id', 'gh_id', 'position', 'title', 'link', 'read_cnt', 'status',
-                                       'llm_sensitivity', 'score'])
     return article_df
 
 
@@ -74,10 +85,16 @@ def update_published_articles_status(db_client) -> None:
                         where title in %s and status = %s;
                 """
                 affected_rows = db_client.save(
-                    sql=update_sql,
-                    params=(const.PUBLISHED_STATUS, tuple(title_list), const.INIT_STATUS)
+                    query=update_sql,
+                    params=(
+                        const.PUBLISHED_STATUS,
+                        tuple(title_list),
+                        const.INIT_STATUS,
+                    ),
+                )
+                processing_bar.set_postfix(
+                    {"category": category, "affected_rows": affected_rows}
                 )
-                processing_bar.set_postfix({"category": affected_rows})
         else:
             return
 
@@ -86,9 +103,11 @@ def filter_by_read_times(article_df: DataFrame) -> DataFrame:
     """
     filter by read times
     """
-    article_df['average_read'] = article_df.groupby(['gh_id', 'position'])['read_cnt'].transform('mean')
-    article_df['read_times'] = article_df['read_cnt'] / article_df['average_read']
-    filter_df = article_df[article_df['read_times'] >= READ_TIMES_THRESHOLD]
+    article_df["average_read"] = article_df.groupby(["gh_id", "position"])[
+        "read_cnt"
+    ].transform("mean")
+    article_df["read_times"] = article_df["read_cnt"] / article_df["average_read"]
+    filter_df = article_df[article_df["read_times"] >= READ_TIMES_THRESHOLD]
     return filter_df
 
 
@@ -96,7 +115,7 @@ def filter_by_status(article_df: DataFrame) -> DataFrame:
     """
     filter by status
     """
-    filter_df = article_df[article_df['status'] == const.INIT_STATUS]
+    filter_df = article_df[article_df["status"] == const.INIT_STATUS]
     return filter_df
 
 
@@ -104,7 +123,7 @@ def filter_by_read_cnt(article_df: DataFrame) -> DataFrame:
     """
     filter by read cnt
     """
-    filter_df = article_df[article_df['read_cnt'] >= READ_THRESHOLD]
+    filter_df = article_df[article_df["read_cnt"] >= READ_THRESHOLD]
     return filter_df
 
 
@@ -113,9 +132,9 @@ def filter_by_title_length(article_df: DataFrame) -> DataFrame:
     filter by title length
     """
     filter_df = article_df[
-        (article_df['title'].str.len() >= LIMIT_TITLE_LENGTH)
-        & (article_df['title'].str.len() <= TITLE_LENGTH_MAX)
-        ]
+        (article_df["title"].str.len() >= LIMIT_TITLE_LENGTH)
+        & (article_df["title"].str.len() <= TITLE_LENGTH_MAX)
+    ]
     return filter_df
 
 
@@ -124,19 +143,19 @@ def filter_by_sensitive_words(article_df: DataFrame) -> DataFrame:
     filter by sensitive words
     """
     filter_df = article_df[
-        (~article_df['title'].str.contains('农历'))
-        & (~article_df['title'].str.contains('太极'))
-        & (~article_df['title'].str.contains('节'))
-        & (~article_df['title'].str.contains('早上好'))
-        & (~article_df['title'].str.contains('赖清德'))
-        & (~article_df['title'].str.contains('普京'))
-        & (~article_df['title'].str.contains('俄'))
-        & (~article_df['title'].str.contains('南海'))
-        & (~article_df['title'].str.contains('台海'))
-        & (~article_df['title'].str.contains('解放军'))
-        & (~article_df['title'].str.contains('蔡英文'))
-        & (~article_df['title'].str.contains('中国'))
-        ]
+        (~article_df["title"].str.contains("农历"))
+        & (~article_df["title"].str.contains("太极"))
+        & (~article_df["title"].str.contains("节"))
+        & (~article_df["title"].str.contains("早上好"))
+        & (~article_df["title"].str.contains("赖清德"))
+        & (~article_df["title"].str.contains("普京"))
+        & (~article_df["title"].str.contains("俄"))
+        & (~article_df["title"].str.contains("南海"))
+        & (~article_df["title"].str.contains("台海"))
+        & (~article_df["title"].str.contains("解放军"))
+        & (~article_df["title"].str.contains("蔡英文"))
+        & (~article_df["title"].str.contains("中国"))
+    ]
     return filter_df
 
 
@@ -144,24 +163,24 @@ def filter_by_similarity_score(article_df: DataFrame, score) -> DataFrame:
     """
     filter by similarity score
     """
-    filter_df = article_df[article_df['score'] >= score]
+    filter_df = article_df[article_df["score"] >= score]
     return filter_df
 
 
-def insert_into_article_crawler_plan(db_client, crawler_plan_id, crawler_plan_name, create_timestamp):
+def insert_into_article_crawler_plan(
+    db_client, crawler_plan_id, crawler_plan_name, create_timestamp
+):
     """
     insert into article crawler plan
     """
     insert_sql = f"""
-                INSERT INTO article_crawler_plan
-                (crawler_plan_id, name, create_timestamp)
-                values 
-                (%s, %s, %s)
-            """
+        insert into article_crawler_plan (crawler_plan_id, name, create_timestamp)
+        values (%s, %s, %s);
+    """
     try:
         db_client.save(
             query=insert_sql,
-            params=(crawler_plan_id, crawler_plan_name, create_timestamp)
+            params=(crawler_plan_id, crawler_plan_name, create_timestamp),
         )
     except Exception as e:
         bot(
@@ -170,41 +189,49 @@ def insert_into_article_crawler_plan(db_client, crawler_plan_id, crawler_plan_na
                 "error": str(e),
                 "error_msg": traceback.format_exc(),
                 "crawler_plan_id": crawler_plan_id,
-                "crawler_plan_name": crawler_plan_name
-            }
+                "crawler_plan_name": crawler_plan_name,
+            },
         )
 
 
-def create_crawler_plan(db_client, url_list, plan_tag, platform):
+def create_crawler_plan(url_list, plan_tag, platform) -> tuple:
     """
     create crawler plan
     """
     crawler_plan_response = aiditApi.auto_create_crawler_task(
         plan_id=None,
-        plan_name="自动绑定-文章联想--{}--{}".format(datetime.date.today().__str__(), len(url_list)),
+        plan_name="自动绑定-文章联想--{}--{}".format(
+            datetime.date.today().__str__(), len(url_list)
+        ),
         plan_tag=plan_tag,
         article_source=platform,
-        url_list=url_list
+        url_list=url_list,
     )
     log(
         task="category_publish_task",
         function="publish_filter_articles",
         message="成功创建抓取计划",
-        data=crawler_plan_response
+        data=crawler_plan_response,
     )
     # save to db
     create_timestamp = int(time.time()) * 1000
-    crawler_plan_id = crawler_plan_response['data']['id']
-    crawler_plan_name = crawler_plan_response['data']['name']
-    insert_into_article_crawler_plan(db_client, crawler_plan_id, crawler_plan_name, create_timestamp)
+    crawler_plan_id = crawler_plan_response["data"]["id"]
+    crawler_plan_name = crawler_plan_response["data"]["name"]
+    return crawler_plan_id, crawler_plan_name, create_timestamp
 
-    bind_to_generate_plan(crawler_plan_id, crawler_plan_name, )
 
-
-def bind_to_generate_plan(crawler_plan_id, crawler_plan_name, input_source_channel):
+def bind_to_generate_plan(category, crawler_plan_id, crawler_plan_name, platform):
     """
     auto bind to generate plan
     """
+    match platform:
+        case "weixin":
+            input_source_channel = 5
+        case "toutiao":
+            input_source_channel = 6
+        case _:
+            input_source_channel = 5
+
     new_crawler_task_list = [
         {
             "contentType": 1,
@@ -214,17 +241,36 @@ def bind_to_generate_plan(crawler_plan_id, crawler_plan_name, input_source_chann
             "inputSourceValue": crawler_plan_id,
             "inputSourceLabel": crawler_plan_name,
             "inputSourceModal": 3,
-            "inputSourceChannel": input_source_channel
+            "inputSourceChannel": input_source_channel,
         }
     ]
     category_map = json.loads(config.getConfigValue("category_cold_start_map"))
     generate_plan_response = aiditApi.bind_crawler_task_to_generate_task(
-        crawler_task_list=new_crawler_task_list,
-        generate_task_id=category_map[category]
+        crawler_task_list=new_crawler_task_list, generate_task_id=category_map[category]
     )
     log(
         task="category_publish_task",
         function="publish_filter_articles",
         message="成功绑定到生成计划",
-        data=generate_plan_response
+        data=generate_plan_response,
     )
+
+
+def update_article_status_after_publishing(db_client, article_id_list):
+    """
+    update article status after publishing
+    """
+    update_sql = f"""
+        update crawler_meta_article
+        set status = %s
+        where article_id in %s and status = %s;
+    """
+    affect_rows = db_client.save(
+        query=update_sql,
+        params=(const.PUBLISHED_STATUS, tuple(article_id_list), const.INIT_STATUS),
+    )
+    if affect_rows != len(article_id_list):
+        bot(
+            title="品类冷启任务中,出现更新文章状态失败异常",
+            detail={"affected_rows": affect_rows, "task_rows": len(article_id_list)},
+        )

+ 0 - 75
coldStartTasks/publish/publishArticleAssociationArticles.py

@@ -1,75 +0,0 @@
-"""
-@author: luojunhui
-"""
-import datetime
-from applications import bot
-from applications.db import DatabaseConnector
-from config import long_articles_config
-from coldStartTasks.publish.basic import *
-
-const = ColdStartTaskConst()
-
-
-class ArticleAssociationPublish(object):
-    """
-    publish i2i articles
-    """
-
-    def __init__(self):
-        self.db_client = DatabaseConnector(db_config=long_articles_config)
-        self.db_client.connect()
-
-    def filter_articles_before_create_plan(self, article_df: DataFrame) -> DataFrame:
-        """
-        filter articles before create plan
-        """
-        total_length = article_df.shape[0]
-
-        # filter by status
-        filter_df = filter_by_status(article_df)
-        filter_length0 = filter_df.shape[0]
-
-        # filter by sensitive words
-        filter_df = filter_by_sensitive_words(filter_df)
-        filter_length1 = filter_df.shape[0]
-
-        # filter by title length
-        filter_df = filter_by_title_length(filter_df)
-        filter_length2 = filter_df.shape[0]
-
-        bot(
-            title="文章联想任务,开始创建抓取计划",
-            detail={
-                "文章总数": total_length,
-                "发布状态过滤": "过滤: {}, 剩余: {}".format(total_length - filter_length0, filter_length0),
-                "敏感词过滤": "过滤: {}, 剩余: {}".format(filter_length0 - filter_length1, filter_length1),
-                "标题长度过滤": "过滤: {}, 剩余: {}".format(filter_length1 - filter_length2, filter_length2)
-            },
-            mention=False
-        )
-
-        return filter_df
-
-    def deal(self):
-        """
-        class entrance
-        """
-        # update published articles
-        update_published_articles_status(db_client=self.db_client)
-
-        # get data from meta table
-        article_dataframe = get_article_from_meta_table(db_client=self.db_client, category='article_association',
-                                                        platform='weixin')
-
-        # fileter articles
-        filter_dataframe = self.filter_articles_before_create_plan(article_dataframe)
-
-        # create crawler plan
-        url_list = filter_dataframe['link'].values.tolist()
-        if url_list:
-            # create_crawler_plan
-            create_crawler_plan(db_client=self.db_client, url_list=url_list, plan_tag='article_association', platform='weixin')
-
-            # change article status
-            article_id_list = filtered_articles_df['article_id'].values.tolist()
-            self.change_article_status_while_publishing(article_id_list=article_id_list)

+ 125 - 0
coldStartTasks/publish/publish_article_association_articles.py

@@ -0,0 +1,125 @@
+"""
+@author: luojunhui
+"""
+
+from pandas import DataFrame
+
+from applications import bot
+from applications.const import ColdStartTaskConst
+from applications.db import DatabaseConnector
+from config import long_articles_config
+
+from coldStartTasks.publish.basic import filter_by_status
+from coldStartTasks.publish.basic import filter_by_sensitive_words
+from coldStartTasks.publish.basic import filter_by_title_length
+from coldStartTasks.publish.basic import update_published_articles_status
+from coldStartTasks.publish.basic import get_article_from_meta_table
+from coldStartTasks.publish.basic import update_article_status_after_publishing
+from coldStartTasks.publish.basic import create_crawler_plan
+from coldStartTasks.publish.basic import insert_into_article_crawler_plan
+from coldStartTasks.publish.basic import bind_to_generate_plan
+
+const = ColdStartTaskConst()
+
+
+def filter_articles_before_create_plan(article_df: DataFrame) -> DataFrame:
+    """
+    filter articles before create plan
+    """
+    total_length = article_df.shape[0]
+
+    # filter by status
+    filter_df = filter_by_status(article_df)
+    filter_length0 = filter_df.shape[0]
+
+    # filter by sensitive words
+    filter_df = filter_by_sensitive_words(filter_df)
+    filter_length1 = filter_df.shape[0]
+
+    # filter by title length
+    filter_df = filter_by_title_length(filter_df)
+    filter_length2 = filter_df.shape[0]
+
+    bot(
+        title="文章联想任务,开始创建抓取计划",
+        detail={
+            "文章总数": total_length,
+            "发布状态过滤": "过滤: {}, 剩余: {}".format(
+                total_length - filter_length0, filter_length0
+            ),
+            "敏感词过滤": "过滤: {}, 剩余: {}".format(
+                filter_length0 - filter_length1, filter_length1
+            ),
+            "标题长度过滤": "过滤: {}, 剩余: {}".format(
+                filter_length1 - filter_length2, filter_length2
+            ),
+        },
+        mention=False,
+    )
+
+    return filter_df
+
+
+class ArticleAssociationPublish(object):
+    """
+    publish i2i articles
+    """
+
+    def __init__(self):
+        self.db_client = DatabaseConnector(db_config=long_articles_config)
+        self.db_client.connect()
+
+    def deal(self):
+        """
+        class entrance
+        """
+        # update published articles
+        update_published_articles_status(db_client=self.db_client)
+
+        # get data from meta table
+        article_dataframe = get_article_from_meta_table(
+            db_client=self.db_client, category="article_association", platform="weixin"
+        )
+
+        # filter articles
+        filter_dataframe = filter_articles_before_create_plan(article_dataframe)
+
+        # create crawler plan
+        url_list = filter_dataframe["link"].values.tolist()
+        if url_list:
+            crawler_plan_id, crawler_plan_name, create_timestamp = create_crawler_plan(
+                url_list=url_list, plan_tag="article_association", platform="weixin"
+            )
+
+            # insert crawler plan
+            insert_into_article_crawler_plan(
+                db_client=self.db_client,
+                crawler_plan_id=crawler_plan_id,
+                crawler_plan_name=crawler_plan_name,
+                create_timestamp=create_timestamp,
+            )
+
+            # bind to generate plan
+            bind_to_generate_plan(
+                category="article_association",
+                crawler_plan_id=crawler_plan_id,
+                crawler_plan_name=crawler_plan_name,
+                platform="weixin",
+            )
+
+            # update status
+            article_id_list = filter_dataframe["article_id"].values.tolist()
+            update_article_status_after_publishing(
+                db_client=self.db_client, article_id_list=article_id_list
+            )
+
+            bot(
+                title="文章联想任务,创建抓取计划成功",
+                detail={
+                    "抓取计划id": crawler_plan_id,
+                    "抓取计划名称": crawler_plan_name,
+                    "抓取条数": len(url_list),
+                    "冷启动类型": "article_association",
+                },
+                mention=False,
+            )
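
For reference, a minimal sketch of how the refactored task could be invoked after this commit. The runner script below is hypothetical and not part of the diff; it only uses the class and method introduced in publish_article_association_articles.py.

"""
Hypothetical runner for the article-association publish task (not in this commit).
"""
from coldStartTasks.publish.publish_article_association_articles import (
    ArticleAssociationPublish,
)

if __name__ == "__main__":
    # __init__ connects to the database via long_articles_config; deal() then
    # runs the full flow: update published statuses, fetch meta articles,
    # filter them, create the crawler plan, persist it, bind it to the
    # generate plan, and mark the published articles.
    ArticleAssociationPublish().deal()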