浏览代码

Merge branch '2025-05-08-add-category-to-cold-start-tasks' of luojunhui/LongArticlesJob into master

luojunhui 5 月之前
父节点
当前提交
fab1cbde13

+ 2 - 1
applications/api/__init__.py

@@ -8,4 +8,5 @@ from .moon_shot_api import fetch_moon_shot_response
 from .nlp_api import similarity_between_title_list
 from .nlp_api import similarity_between_title_list
 from .gewe_api import WechatChannelAPI
 from .gewe_api import WechatChannelAPI
 from .google_ai_api import GoogleAIAPI
 from .google_ai_api import GoogleAIAPI
-from .piaoquan_api import fetch_piaoquan_video_list_detail
+from .piaoquan_api import fetch_piaoquan_video_list_detail
+from .feishu_api import FeishuBotApi, FeishuSheetApi

+ 137 - 0
applications/api/feishu_api.py

@@ -1,3 +1,4 @@
+import json
 import requests
 import requests
 
 
 
 
@@ -5,6 +6,11 @@ class Feishu:
     def __init__(self):
     def __init__(self):
         self.token = None
         self.token = None
         self.headers = {"Content-Type": "application/json"}
         self.headers = {"Content-Type": "application/json"}
+        self.mention_all = {
+            "content": "<at id=all></at>\n",
+            "tag": "lark_md",
+        }
+        self.not_mention = {}
 
 
     def fetch_token(self):
     def fetch_token(self):
         url = "https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal/"
         url = "https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal/"
@@ -52,3 +58,134 @@ class FeishuSheetApi(Feishu):
             "PUT", url=insert_value_url, headers=headers, json=body
             "PUT", url=insert_value_url, headers=headers, json=body
         )
         )
         print(response.json())
         print(response.json())
+
+
+class FeishuBotApi(Feishu):
+
+    @classmethod
+    def create_feishu_columns_sheet(
+        cls,
+        sheet_type,
+        sheet_name,
+        display_name,
+        width="auto",
+        vertical_align="top",
+        horizontal_align="left",
+        number_format=None,
+    ):
+        match sheet_type:
+            case "plain_text":
+                return {
+                    "name": sheet_name,
+                    "display_name": display_name,
+                    "width": width,
+                    "data_type": "text",
+                    "vertical_align": vertical_align,
+                    "horizontal_align": horizontal_align,
+                }
+
+            case "lark_md":
+                return {
+                    "name": sheet_name,
+                    "display_name": display_name,
+                    "data_type": "lark_md",
+                }
+
+            case "number":
+                return {
+                    "name": sheet_name,
+                    "display_name": display_name,
+                    "data_type": "number",
+                    "format": number_format,
+                    "width": width,
+                }
+
+            case "date":
+                return {
+                    "name": sheet_name,
+                    "display_name": display_name,
+                    "data_type": "date",
+                    "date_format": "YYYY/MM/DD",
+                }
+
+            case "options":
+                return {
+                    "name": sheet_name,
+                    "display_name": display_name,
+                    "data_type": "options",
+                }
+
+            case _:
+                return {
+                    "name": sheet_name,
+                    "display_name": display_name,
+                    "width": width,
+                    "data_type": "text",
+                    "vertical_align": vertical_align,
+                    "horizontal_align": horizontal_align,
+                }
+
+    # 表格形式
+    def create_feishu_table(self, title, columns, rows, mention):
+        table_base = {
+            "header": {
+                "template": "blue",
+                "title": {"content": title, "tag": "plain_text"},
+            },
+            "elements": [
+                self.mention_all if mention else self.not_mention,
+                {
+                    "tag": "table",
+                    "page_size": len(rows) + 1,
+                    "row_height": "low",
+                    "header_style": {
+                        "text_align": "left",
+                        "text_size": "normal",
+                        "background_style": "grey",
+                        "text_color": "default",
+                        "bold": True,
+                        "lines": 1,
+                    },
+                    "columns": columns,
+                    "rows": rows,
+                },
+            ],
+        }
+        return table_base
+
+    # bot
+    def bot(self, title, detail, mention=True, table=False, env="prod"):
+        if env == "prod":
+            url = "https://open.feishu.cn/open-apis/bot/v2/hook/b44333f2-16c0-4cb1-af01-d135f8704410"
+        else:
+            url = "https://open.feishu.cn/open-apis/bot/v2/hook/f32c0456-847f-41f3-97db-33fcc1616bcd"
+        headers = {"Content-Type": "application/json"}
+        if table:
+            card = self.create_feishu_table(
+                title=title,
+                columns=detail["columns"],
+                rows=detail["rows"],
+                mention=mention,
+            )
+        else:
+            card = {
+                "elements": [
+                    {
+                        "tag": "div",
+                        "text": self.mention_all if mention else self.not_mention,
+                    },
+                    {
+                        "tag": "div",
+                        "text": {
+                            "content": json.dumps(detail, ensure_ascii=False, indent=4),
+                            "tag": "lark_md",
+                        },
+                    },
+                ],
+                "header": {"title": {"content": title, "tag": "plain_text"}},
+            }
+        payload = {"msg_type": "interactive", "card": card}
+        res = requests.request(
+            "POST", url=url, headers=headers, data=json.dumps(payload), timeout=10
+        )
+        return res

+ 3 - 0
applications/const/__init__.py

@@ -431,6 +431,9 @@ class SingleVideoPoolPublishTaskConst:
     TRANSFORM_SUCCESS_STATUS = 1
     TRANSFORM_SUCCESS_STATUS = 1
     TRANSFORM_FAIL_STATUS = 99
     TRANSFORM_FAIL_STATUS = 99
 
 
+    SUCCESS_STATUS = 2
+
+
 
 
 class GoogleVideoUnderstandTaskConst:
 class GoogleVideoUnderstandTaskConst:
     # task batch size
     # task batch size

+ 142 - 72
cold_start/publish/publish_single_video_pool_videos.py

@@ -5,15 +5,23 @@ import traceback
 from pymysql.cursors import DictCursor
 from pymysql.cursors import DictCursor
 from tqdm import tqdm
 from tqdm import tqdm
 
 
-from applications import bot, aiditApi
+from applications import aiditApi
+from applications.api import ApolloApi, FeishuBotApi
 from applications.const import SingleVideoPoolPublishTaskConst
 from applications.const import SingleVideoPoolPublishTaskConst
 from applications.db import DatabaseConnector
 from applications.db import DatabaseConnector
-from config import long_articles_config, apolloConfig
+from config import long_articles_config
 
 
-config = apolloConfig()
+# init
+apollo_api = ApolloApi(env="prod")
+feishu_bot_api = FeishuBotApi()
 const = SingleVideoPoolPublishTaskConst()
 const = SingleVideoPoolPublishTaskConst()
 
 
-video_pool_config = json.loads(config.getConfigValue(key="video_pool_publish_config"))
+# get config information from apollo
+video_pool_config = json.loads(
+    apollo_api.get_config_value(key="video_pool_publish_config")
+)
+video_category_list = json.loads(apollo_api.get_config_value(key="category_list"))
+platform_list = list(video_pool_config.keys())
 
 
 
 
 class PublishSingleVideoPoolVideos:
 class PublishSingleVideoPoolVideos:
@@ -21,97 +29,159 @@ class PublishSingleVideoPoolVideos:
         self.db_client = DatabaseConnector(db_config=long_articles_config)
         self.db_client = DatabaseConnector(db_config=long_articles_config)
         self.db_client.connect()
         self.db_client.connect()
 
 
-    def get_task_list(self, platform:str) -> list[dict]:
-        daily_limit = video_pool_config[platform]['process_num_each_day']
+    def get_task_list(self, platform: str) -> list[dict]:
+        daily_limit = video_pool_config[platform]["process_num_each_day"]
         fetch_query = f"""
         fetch_query = f"""
-            select id, content_trace_id, pq_vid
-            from single_video_transform_queue
-            where status = {const.TRANSFORM_INIT_STATUS} and platform = '{platform}'
+            select t1.id, t1.content_trace_id, t1.pq_vid, t1.score, t2.category
+            from single_video_transform_queue t1
+            join publish_single_video_source t2 on t1.content_trace_id = t2.content_trace_id
+            where 
+                t1.status = {const.TRANSFORM_INIT_STATUS} 
+                and t1.platform = '{platform}' 
+                and t2.category_status = {const.SUCCESS_STATUS}
             order by score desc
             order by score desc
             limit {daily_limit};
             limit {daily_limit};
         """
         """
         fetch_response = self.db_client.fetch(query=fetch_query, cursor_type=DictCursor)
         fetch_response = self.db_client.fetch(query=fetch_query, cursor_type=DictCursor)
         return fetch_response
         return fetch_response
 
 
-    def update_tasks_status(self,
-                           task_id_tuple: tuple,
-                           ori_status: int,
-                           new_status: int)-> int:
+    def update_tasks_status(
+        self, task_id_tuple: tuple, ori_status: int, new_status: int
+    ) -> int:
         update_query = f"""
         update_query = f"""
             update single_video_transform_queue
             update single_video_transform_queue
             set status = %s
             set status = %s
             where id in %s and status = %s;
             where id in %s and status = %s;
         """
         """
         affected_rows = self.db_client.save(
         affected_rows = self.db_client.save(
-            query=update_query,
-            params=(new_status, task_id_tuple, ori_status)
+            query=update_query, params=(new_status, task_id_tuple, ori_status)
         )
         )
         return affected_rows
         return affected_rows
 
 
+    def create_crawler_plan(
+        self, vid_list: list, platform: str, task_id_tuple: tuple, category: str
+    ) -> None:
+        try:
+            # create video crawler plan
+            plan_name = f"{video_pool_config[platform]['nick_name']}-{category}-{datetime.datetime.today().strftime('%Y-%m-%d')}-视频数量: {len(vid_list)}"
+            crawler_plan_response = aiditApi.auto_create_single_video_crawler_task(
+                plan_name=plan_name,
+                plan_tag="单视频供给冷启动",
+                video_id_list=vid_list,
+            )
+            crawler_plan_id = crawler_plan_response["data"]["id"]
+            crawler_plan_name = crawler_plan_response["data"]["name"]
+
+            # bind crawler plan to generate plan
+            crawler_task_list = [
+                {
+                    "contentType": 1,
+                    "inputSourceModal": 4,
+                    "inputSourceChannel": 10,
+                    "inputSourceType": 2,
+                    "inputSourceValue": crawler_plan_id,
+                    "inputSourceSubType": None,
+                    "fieldName": None,
+                    "inputSourceLabel": "原始帖子-视频-票圈小程序-内容添加计划-{}".format(
+                        crawler_plan_name
+                    ),
+                }
+            ]
+            generate_plan_id = video_pool_config[platform]["generate_plan_id"]
+            aiditApi.bind_crawler_task_to_generate_task(
+                crawler_task_list=crawler_task_list,
+                generate_task_id=generate_plan_id,
+            )
+
+            # update status
+            self.update_tasks_status(
+                task_id_tuple=task_id_tuple,
+                ori_status=const.TRANSFORM_INIT_STATUS,
+                new_status=const.TRANSFORM_SUCCESS_STATUS,
+            )
+        except Exception as e:
+            feishu_bot_api.bot(
+                title="视频内容池发布任务",
+                detail={
+                    "platform": platform,
+                    "date": datetime.datetime.today().strftime("%Y-%m-%d"),
+                    "msg": "发布视频内容池失败,原因:{}".format(str(e)),
+                    "detail": traceback.format_exc(),
+                },
+                mention=False,
+            )
+
     def deal(self):
     def deal(self):
         """
         """
         entrance of this class
         entrance of this class
         """
         """
-        platform_list = ["sph", "gzh", "toutiao", "hksp", "sohu", "piaoquan"]
-        for platform in tqdm(platform_list, desc='process each platform'):
-            task_list = self.get_task_list(platform)
-            task_id_tuple = tuple([task['id'] for task in task_list])
-            vid_list = [task['pq_vid'] for task in task_list]
-            if vid_list:
-                try:
-                    # create video crawler plan
-                    plan_name = f"{video_pool_config[platform]['nick_name']}-{datetime.datetime.today().strftime('%Y-%m-%d')}-视频数量: {len(vid_list)}"
-                    crawler_plan_response = aiditApi.auto_create_single_video_crawler_task(
-                        plan_name=plan_name,
-                        plan_tag="单视频供给冷启动",
-                        video_id_list=vid_list,
-                    )
-                    crawler_plan_id = crawler_plan_response["data"]["id"]
-                    crawler_plan_name = crawler_plan_response["data"]["name"]
+        platform_map = [
+            (key, video_pool_config[key]["nick_name"]) for key in video_pool_config
+        ]
+        columns = [
+            feishu_bot_api.create_feishu_columns_sheet(
+                sheet_type="plain_text", sheet_name="category", display_name="品类"
+            ),
+            feishu_bot_api.create_feishu_columns_sheet(
+                sheet_type="number", sheet_name="total", display_name="品类视频总量"
+            ),
+            *[
+                feishu_bot_api.create_feishu_columns_sheet(
+                    sheet_type="number", sheet_name=platform, display_name=display_name
+                )
+                for platform, display_name in platform_map
+            ],
+        ]
+        publish_detail_table = {}
 
 
-                    # bind crawler plan to generate plan
-                    crawler_task_list = [
-                        {
-                            "contentType": 1,
-                            "inputSourceModal": 4,
-                            "inputSourceChannel": 10,
-                            "inputSourceType": 2,
-                            "inputSourceValue": crawler_plan_id,
-                            "inputSourceSubType": None,
-                            "fieldName": None,
-                            "inputSourceLabel": "原始帖子-视频-票圈小程序-内容添加计划-{}".format(crawler_plan_name),
-                        }
+        for platform in tqdm(platform_list, desc="process each platform"):
+            task_list = self.get_task_list(platform)
+            if task_list:
+                # split task list into each category
+                for category in video_category_list:
+                    task_list_with_category = [
+                        task for task in task_list if task["category"] == category
                     ]
                     ]
-                    generate_plan_id = video_pool_config[platform]['generate_plan_id']
-                    aiditApi.bind_crawler_task_to_generate_task(
-                        crawler_task_list=crawler_task_list,
-                        generate_task_id=generate_plan_id,
-                    )
-
-                    # update status
-                    self.update_tasks_status(
-                        task_id_tuple=task_id_tuple,
-                        ori_status=const.TRANSFORM_INIT_STATUS,
-                        new_status=const.TRANSFORM_SUCCESS_STATUS
-                    )
-                except Exception as e:
-                    bot(
-                        title='视频内容池发布任务',
-                        detail={
-                            'platform': platform,
-                            'date': datetime.datetime.today().strftime('%Y-%m-%d'),
-                            'msg': '发布视频内容池失败,原因:{}'.format(str(e)),
-                            'detail': traceback.format_exc(),
-                        },
-                        mention=False
+                    task_id_tuple = tuple(
+                        [task["id"] for task in task_list_with_category]
                     )
                     )
+                    vid_list = [task["pq_vid"] for task in task_list_with_category]
+                    if vid_list:
+                        self.create_crawler_plan(
+                            vid_list, platform, task_id_tuple, category
+                        )
+                        if publish_detail_table.get(platform):
+                            publish_detail_table[platform][category] = len(vid_list)
+                        else:
+                            publish_detail_table[platform] = {category: len(vid_list)}
+                    else:
+                        continue
             else:
             else:
-                bot(
-                    title='视频内容池发布任务',
+                feishu_bot_api.bot(
+                    title="视频内容池发布任务",
                     detail={
                     detail={
-                        'platform': platform,
-                        'date': datetime.datetime.today().strftime('%Y-%m-%d'),
-                        'msg': '该平台无待发布视频,请关注供给的抓取'
+                        "platform": platform,
+                        "date": datetime.datetime.today().strftime("%Y-%m-%d"),
+                        "msg": "该渠道今日无供给,注意关注供给详情",
                     },
                     },
-                    mention=False
-                )
+                    mention=False,
+                )
+        detail_rows = []
+        for category in video_category_list:
+            platform_counts = {
+                platform: publish_detail_table.get(platform, {}).get(category, 0)
+                for platform in platform_list
+            }
+            total = sum(platform_counts.values())
+            detail_rows.append(
+                {"category": category, **platform_counts, "total": total}
+            )
+        feishu_bot_api.bot(
+            title="视频内容池冷启动发布任务",
+            detail={
+                "columns": columns,
+                "rows": detail_rows,
+            },
+            table=True,
+            mention=False,
+        )