Przeglądaj źródła

auto publish toutiao articles

luojunhui 3 miesięcy temu
rodzic
commit
8c87375b97

+ 20 - 7
account_cold_start_daily.py

@@ -64,16 +64,18 @@ class AccountColdStartDailyTask(object):
                 }
             )
 
-    def publish_task(self, category_list):
+    def publish_task(self, category_list, article_source):
         """
         将账号文章发布到aigc抓取计划,并且绑定生成计划
-        :param category_list:
+        :param category_list:  文章品类
+        :param article_source: 文章来源(toutiao or weixin)
         :return:
         """
         try:
             weixin_category_publisher = CategoryColdStartTask(db_client=self.db_client)
             weixin_category_publisher.do_job(
-                category_list=category_list
+                category_list=category_list,
+                article_source=article_source
             )
             bot(
                 title="账号冷启任务,发布完成",
@@ -85,7 +87,7 @@ class AccountColdStartDailyTask(object):
             )
         except Exception as e:
             bot(
-                title="账号发布冷启动任务,抓取失败",
+                title="账号发布冷启动任务,发布失败",
                 detail={
                     "error": str(e),
                     "error_msg": traceback.format_exc()
@@ -93,7 +95,7 @@ class AccountColdStartDailyTask(object):
             )
 
 
-def main(category_list=None):
+def main(category_list=None, article_source=None):
     """
     main job, use crontab to do job daily
     todo: 1. 开放一个输入可以输入指定品类  2. 增加对指定账号的抓取&&发布
@@ -101,11 +103,22 @@ def main(category_list=None):
     """
     if not category_list:
         category_list = DEFAULT_CATEGORY_LIST
+    if not article_source:
+        article_source = 'weixin'
     task = AccountColdStartDailyTask()
     if task.init_db():
-        task.crawler_task(category_list=category_list)
-        task.publish_task(category_list=category_list)
+        if article_source == 'weixin':
+            task.crawler_task(category_list=category_list)
+
+        task.publish_task(category_list=category_list, article_source=article_source)
 
 
 if __name__ == '__main__':
+    # 执行微信抓取发布
     main()
+
+    # 执行头条发布
+    main(
+        category_list=['history', 'tech', 'finance', 'entertainment'],
+        article_source='toutiao'
+    )

+ 11 - 2
applications/aiditApi.py

@@ -127,15 +127,23 @@ def get_publish_account_from_aigc():
     return info_list
 
 
-def auto_create_crawler_task(plan_id, plan_name, plan_tag, url_list):
+def auto_create_crawler_task(plan_id, plan_name, plan_tag, url_list, article_source):
     """
     通过 url 自动创建抓取计划
     :param plan_id: 计划 id, 若往已经存在的 plan_id 中加文章则需要传,否则会新生成一个 id
     :param plan_name: 计划名称
     :param plan_tag: 计划标签
     :param url_list: 输入的 url_list
+    :param article_source: 文章来源
     :return:
     """
+    match article_source:
+        case "toutiao":
+            channel = 6
+        case "weixin":
+            channel = 5
+        case _:
+            return
     url = "http://aigc-api.cybertogether.net/aigc/crawler/plan/save"
     payload = json.dumps({
         "params": {
@@ -157,7 +165,7 @@ def auto_create_crawler_task(plan_id, plan_name, plan_tag, url_list):
             "tagPenetrateFlag": 0,
             "id": plan_id,
             "name": plan_name,
-            "channel": 5,
+            "channel": channel,
             "crawlerMode": 5,
             "inputModeValues": url_list,
             "modePublishTimeStart": None,
@@ -187,6 +195,7 @@ def bind_crawler_task_to_generate_task(crawler_task_list, generate_task_id):
     生成计划已经存在
     :crawler_task_list: 要输入的抓取计划List
     :generate_task_id: 目标生成计划 id
+    :article_source: 账号类型
     :return: response
     """
     url = "http://aigc-api.cybertogether.net/aigc/produce/plan/save"

+ 50 - 25
coldStartTasks/publish/publishCategoryArticles.py

@@ -74,7 +74,7 @@ class CategoryColdStartTask(object):
                 }
             )
 
-    def get_articles_from_meta_table(self, category):
+    def get_articles_from_meta_table(self, category, article_source):
         """
         从长文 meta 库中获取冷启文章
         :return:
@@ -85,7 +85,7 @@ class CategoryColdStartTask(object):
         FROM
             crawler_meta_article
         WHERE 
-            category = "{category}";
+            category = "{category}" and platform = "{article_source}" and status = 1;
         """
         article_list = self.db_client.select(sql)
         log(
@@ -153,12 +153,9 @@ class CategoryColdStartTask(object):
                 }
             )
 
-    def publish_filter_articles(self, category, articles_df):
+    def filter_weixin_articles(self, articles_df, category):
         """
-        过滤文章
-        :param category:
-        :param articles_df:
-        :return:
+        微信抓取文章过滤漏斗
         """
         articles_df['average_read'] = articles_df.groupby(['gh_id', 'position'])['read_cnt'].transform('mean')
         articles_df['read_times'] = articles_df['read_cnt'] / articles_df['average_read']
@@ -174,13 +171,13 @@ class CategoryColdStartTask(object):
         # 第二层漏斗通过阅读量过滤
         second_level_funnel_df = first_level_funnel_df[
             first_level_funnel_df['read_cnt'] >= self.READ_THRESHOLD
-        ]
+            ]
         second_level_funnel_length = second_level_funnel_df.shape[0]
 
         # 第三层漏斗通过标题长度过滤
         third_level_funnel_df = second_level_funnel_df[
             second_level_funnel_df['title'].str.len() >= self.LIMIT_TITLE_LENGTH
-        ]
+            ]
         third_level_funnel_length = third_level_funnel_df.shape[0]
 
         # 最后一层通过敏感词过滤
@@ -199,7 +196,6 @@ class CategoryColdStartTask(object):
             & (~third_level_funnel_df['title'].str.contains('中国'))
             ]
         final_length = filter_df.shape[0]
-        url_list = filter_df['link'].values.tolist()
         log(
             task="category_publish_task",
             function="publish_filter_articles",
@@ -213,11 +209,16 @@ class CategoryColdStartTask(object):
             title="冷启任务发布通知",
             detail={
                 "总文章数量": total_length,
-                "通过已经发布状态过滤": "过滤数量: {}    剩余数量: {}".format(total_length - zero_level_funnel_length, zero_level_funnel_length),
-                "通过阅读均值倍数过滤": "过滤数量: {}    剩余数量: {}".format(zero_level_funnel_length - first_level_funnel_length, first_level_funnel_length),
-                "通过阅读量过滤": "过滤数量: {}    剩余数量: {}".format(first_level_funnel_length - second_level_funnel_length, second_level_funnel_length),
-                "通过标题长度过滤": "过滤数量: {}    剩余数量: {}".format(second_level_funnel_length - third_level_funnel_length, third_level_funnel_length),
-                "通过敏感词过滤":  "过滤数量: {}    剩余数量: {}".format(third_level_funnel_length - final_length, final_length),
+                "通过已经发布状态过滤": "过滤数量: {}    剩余数量: {}".format(total_length - zero_level_funnel_length,
+                                                                              zero_level_funnel_length),
+                "通过阅读均值倍数过滤": "过滤数量: {}    剩余数量: {}".format(
+                    zero_level_funnel_length - first_level_funnel_length, first_level_funnel_length),
+                "通过阅读量过滤": "过滤数量: {}    剩余数量: {}".format(
+                    first_level_funnel_length - second_level_funnel_length, second_level_funnel_length),
+                "通过标题长度过滤": "过滤数量: {}    剩余数量: {}".format(
+                    second_level_funnel_length - third_level_funnel_length, third_level_funnel_length),
+                "通过敏感词过滤": "过滤数量: {}    剩余数量: {}".format(third_level_funnel_length - final_length,
+                                                                        final_length),
                 "品类": category,
                 "阅读均值倍数阈值": self.READ_TIMES_THRESHOLD,
                 "阅读量阈值": self.READ_THRESHOLD,
@@ -225,12 +226,34 @@ class CategoryColdStartTask(object):
             },
             mention=False
         )
+        return filter_df
+
+    def publish_filter_articles(self, category, articles_df, article_source):
+        """
+        过滤文章
+        :param category: 文章品类
+        :param articles_df: 该品类下的文章data_frame
+        :param article_source: 文章来源
+        :return:
+        """
+        match article_source:
+            case "weixin":
+                filtered_articles_df = self.filter_weixin_articles(articles_df, category)
+                input_source_channel = 5
+            case "toutiao":
+                filtered_articles_df = articles_df
+                input_source_channel = 6
+            case _:
+                return
+
+        url_list = filtered_articles_df['link'].values.tolist()
         if url_list:
             # create_crawler_plan
             crawler_plan_response = aiditApi.auto_create_crawler_task(
                 plan_id=None,
                 plan_name="自动绑定-{}--{}--{}".format(category, datetime.date.today().__str__(), len(url_list)),
                 plan_tag="品类冷启动",
+                article_source=article_source,
                 url_list=url_list
             )
             log(
@@ -239,7 +262,6 @@ class CategoryColdStartTask(object):
                 message="成功创建抓取计划",
                 data=crawler_plan_response
             )
-
             # save to db
             create_timestamp = int(time.time()) * 1000
             crawler_plan_id = crawler_plan_response['data']['id']
@@ -253,10 +275,10 @@ class CategoryColdStartTask(object):
                     "inputSourceType": 2,
                     "inputSourceSubType": None,
                     "fieldName": None,
-                    "inputSourceValue": crawler_plan_response['data']['id'],
-                    "inputSourceLabel": crawler_plan_response['data']['name'],
+                    "inputSourceValue": crawler_plan_id,
+                    "inputSourceLabel": crawler_plan_name,
                     "inputSourceModal": 3,
-                    "inputSourceChannel": 5
+                    "inputSourceChannel": input_source_channel
                 }
             ]
             generate_plan_response = aiditApi.bind_crawler_task_to_generate_task(
@@ -271,16 +293,17 @@ class CategoryColdStartTask(object):
             )
 
             # change article status
-            article_id_list = filter_df['article_id'].values.tolist()
+            article_id_list = articles_df['article_id'].values.tolist()
             self.change_article_status_while_publishing(article_id_list=article_id_list)
 
-    def do_job(self, category_list=None):
+    def do_job(self, article_source, category_list=None):
         """
         执行任务
         :return:
         """
         if not category_list:
             category_list = self.category_map.keys()
+
         log(
             task="category_publish_task",
             function="do_job",
@@ -291,10 +314,11 @@ class CategoryColdStartTask(object):
         )
         for category in category_list:
             try:
-                category_df = self.get_articles_from_meta_table(category=category)
+                category_df = self.get_articles_from_meta_table(category=category, article_source=article_source)
                 self.publish_filter_articles(
                     category=category,
-                    articles_df=category_df
+                    articles_df=category_df,
+                    article_source=article_source
                 )
             except Exception as e:
                 bot(
@@ -302,6 +326,7 @@ class CategoryColdStartTask(object):
                     detail={
                         "category": category,
                         "error": str(e),
-                        "function": "do_job"
+                        "function": "do_job",
+                        "traceback": traceback.format_exc()
                     }
-                )
+                )