
Merge branch 'master' into add-celery-app
merge master

luojunhui 2 months ago
parent
commit
5ce44acaee

+ 2 - 1
applications/api/__init__.py

@@ -8,4 +8,5 @@ from .moon_shot_api import fetch_moon_shot_response
 from .nlp_api import similarity_between_title_list
 from .gewe_api import WechatChannelAPI
 from .google_ai_api import GoogleAIAPI
-from .piaoquan_api import fetch_piaoquan_video_list_detail
+from .piaoquan_api import fetch_piaoquan_video_list_detail
+from .feishu_api import FeishuBotApi, FeishuSheetApi

+ 137 - 0
applications/api/feishu_api.py

@@ -1,3 +1,4 @@
+import json
 import requests
 
 
@@ -5,6 +6,11 @@ class Feishu:
     def __init__(self):
         self.token = None
         self.headers = {"Content-Type": "application/json"}
+        self.mention_all = {
+            "content": "<at id=all></at>\n",
+            "tag": "lark_md",
+        }
+        self.not_mention = {}
 
     def fetch_token(self):
         url = "https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal/"
@@ -52,3 +58,134 @@ class FeishuSheetApi(Feishu):
             "PUT", url=insert_value_url, headers=headers, json=body
         )
         print(response.json())
+
+
+class FeishuBotApi(Feishu):
+
+    @classmethod
+    def create_feishu_columns_sheet(
+        cls,
+        sheet_type,
+        sheet_name,
+        display_name,
+        width="auto",
+        vertical_align="top",
+        horizontal_align="left",
+        number_format=None,
+    ):
+        match sheet_type:
+            case "plain_text":
+                return {
+                    "name": sheet_name,
+                    "display_name": display_name,
+                    "width": width,
+                    "data_type": "text",
+                    "vertical_align": vertical_align,
+                    "horizontal_align": horizontal_align,
+                }
+
+            case "lark_md":
+                return {
+                    "name": sheet_name,
+                    "display_name": display_name,
+                    "data_type": "lark_md",
+                }
+
+            case "number":
+                return {
+                    "name": sheet_name,
+                    "display_name": display_name,
+                    "data_type": "number",
+                    "format": number_format,
+                    "width": width,
+                }
+
+            case "date":
+                return {
+                    "name": sheet_name,
+                    "display_name": display_name,
+                    "data_type": "date",
+                    "date_format": "YYYY/MM/DD",
+                }
+
+            case "options":
+                return {
+                    "name": sheet_name,
+                    "display_name": display_name,
+                    "data_type": "options",
+                }
+
+            case _:
+                return {
+                    "name": sheet_name,
+                    "display_name": display_name,
+                    "width": width,
+                    "data_type": "text",
+                    "vertical_align": vertical_align,
+                    "horizontal_align": horizontal_align,
+                }
+
+    # 表格形式
+    def create_feishu_table(self, title, columns, rows, mention):
+        table_base = {
+            "header": {
+                "template": "blue",
+                "title": {"content": title, "tag": "plain_text"},
+            },
+            "elements": [
+                self.mention_all if mention else self.not_mention,
+                {
+                    "tag": "table",
+                    "page_size": len(rows) + 1,
+                    "row_height": "low",
+                    "header_style": {
+                        "text_align": "left",
+                        "text_size": "normal",
+                        "background_style": "grey",
+                        "text_color": "default",
+                        "bold": True,
+                        "lines": 1,
+                    },
+                    "columns": columns,
+                    "rows": rows,
+                },
+            ],
+        }
+        return table_base
+
+    # bot
+    def bot(self, title, detail, mention=True, table=False, env="prod"):
+        if env == "prod":
+            url = "https://open.feishu.cn/open-apis/bot/v2/hook/b44333f2-16c0-4cb1-af01-d135f8704410"
+        else:
+            url = "https://open.feishu.cn/open-apis/bot/v2/hook/f32c0456-847f-41f3-97db-33fcc1616bcd"
+        headers = {"Content-Type": "application/json"}
+        if table:
+            card = self.create_feishu_table(
+                title=title,
+                columns=detail["columns"],
+                rows=detail["rows"],
+                mention=mention,
+            )
+        else:
+            card = {
+                "elements": [
+                    {
+                        "tag": "div",
+                        "text": self.mention_all if mention else self.not_mention,
+                    },
+                    {
+                        "tag": "div",
+                        "text": {
+                            "content": json.dumps(detail, ensure_ascii=False, indent=4),
+                            "tag": "lark_md",
+                        },
+                    },
+                ],
+                "header": {"title": {"content": title, "tag": "plain_text"}},
+            }
+        payload = {"msg_type": "interactive", "card": card}
+        res = requests.request(
+            "POST", url=url, headers=headers, data=json.dumps(payload), timeout=10
+        )
+        return res
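
A minimal usage sketch of the new FeishuBotApi table card (illustrative only; sample values are hypothetical and env="dev" targets the non-prod webhook hard-coded above):

from applications.api.feishu_api import FeishuBotApi

feishu_bot_api = FeishuBotApi()

# Column schema for the table card: one text column and one number column.
columns = [
    feishu_bot_api.create_feishu_columns_sheet(
        sheet_type="plain_text", sheet_name="category", display_name="category"
    ),
    feishu_bot_api.create_feishu_columns_sheet(
        sheet_type="number", sheet_name="total", display_name="total"
    ),
]
# Hypothetical rows; keys must match the sheet_name values above.
rows = [{"category": "health", "total": 12}, {"category": "history", "total": 7}]

# Send an interactive table card without @all.
res = feishu_bot_api.bot(
    title="demo table card",
    detail={"columns": columns, "rows": rows},
    mention=False,
    table=True,
    env="dev",
)
print(res.status_code)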

+ 3 - 0
applications/const/__init__.py

@@ -431,6 +431,9 @@ class SingleVideoPoolPublishTaskConst:
     TRANSFORM_SUCCESS_STATUS = 1
     TRANSFORM_FAIL_STATUS = 99
 
+    SUCCESS_STATUS = 2
+
+
 
 class GoogleVideoUnderstandTaskConst:
     # task batch size

+ 0 - 48
cold_start/publish/publishAccountAssociationArticles.py

@@ -1,48 +0,0 @@
-"""
-@author: luojunhui
-发布i2u2i文章
-"""
-import pandas as pd
-from applications import DeNetMysql
-
-
-class I2U2I(object):
-    """
-    发布账号联想文章
-    """
-    db = DeNetMysql()
-
-    @classmethod
-    def getAccountPositionArticles(cls, gh_id, position):
-        """
-        获取联想账号的某个位置的所有文章
-        :return:
-        """
-        sql = f"""
-            select title, read_cnt, link 
-            from crawler_meta_article
-            where out_account_id = '{gh_id}' and article_index = {position};
-        """
-        article_list = cls.db.select(sql)
-        # df = pd.DataFrame(article_list, columns=['title', 'read_cnt', 'link'])
-        # read_mean = df['read_cnt'].mean()
-        # filter_response = df[
-        #     (df['read_cnt'] > read_mean * 1.3)
-        #     & (df['read_cnt'] > 5000)
-        #     ]
-        # return filter_response
-        return article_list
-
-    @classmethod
-    def filter(cls):
-        """
-        :return:
-        """
-        return
-
-if __name__ == '__main__':
-    job = I2U2I()
-    article_list = job.getAccountPositionArticles(gh_id='gh_e6be5a12e83c', position=1)
-    for article in article_list:
-        print(article)
-

+ 54 - 50
cold_start/publish/publishCategoryArticles.py

@@ -34,6 +34,7 @@ class CategoryColdStartTask(object):
         self.db_client = db_client
         self.category_map = json.loads(apollo.getConfigValue("category_cold_start_map"))
         self.category_cold_start_threshold = json.loads(apollo.getConfigValue("category_cold_start_threshold"))
+        self.article_category_list = json.loads(apollo.getConfigValue("category_list"))
         self.READ_THRESHOLD = self.category_cold_start_threshold.get("READ_THRESHOLD", 5000)
         self.READ_TIMES_THRESHOLD = self.category_cold_start_threshold.get("READ_TIMES_THRESHOLD", 1.3)
         self.LIMIT_TITLE_LENGTH = self.category_cold_start_threshold.get("LIMIT_TITLE_LENGTH", 15)
@@ -86,7 +87,7 @@ class CategoryColdStartTask(object):
         """
         sql = f"""
         SELECT 
-            article_id, out_account_id, article_index, title, link, read_cnt, status, llm_sensitivity, score
+            article_id, out_account_id, article_index, title, link, read_cnt, status, llm_sensitivity, score, category_by_ai
         FROM
             crawler_meta_article
         WHERE 
@@ -105,7 +106,7 @@ class CategoryColdStartTask(object):
         )
         article_df = DataFrame(article_list,
                                columns=['article_id', 'gh_id', 'position', 'title', 'link', 'read_cnt', 'status',
-                                        'llm_sensitivity', 'score'])
+                                        'llm_sensitivity', 'score', 'category_by_ai'])
         return article_df
 
     def filter_each_category(self, category):
@@ -341,55 +342,58 @@ class CategoryColdStartTask(object):
             except Exception as e:
                 print("failed to update sensitive status: {}".format(e))
 
-        url_list = filtered_articles_df['link'].values.tolist()
-        if url_list:
-            # create_crawler_plan
-            crawler_plan_response = aiditApi.auto_create_crawler_task(
-                plan_id=None,
-                plan_name="自动绑定-{}--{}--{}".format(category, datetime.date.today().__str__(), len(url_list)),
-                plan_tag="品类冷启动",
-                article_source=article_source,
-                url_list=url_list
-            )
-            log(
-                task="category_publish_task",
-                function="publish_filter_articles",
-                message="成功创建抓取计划",
-                data=crawler_plan_response
-            )
-            # save to db
-            create_timestamp = int(time.time()) * 1000
-            crawler_plan_id = crawler_plan_response['data']['id']
-            crawler_plan_name = crawler_plan_response['data']['name']
-            self.insert_into_db(crawler_plan_id, crawler_plan_name, create_timestamp)
-
-            # auto bind to generate plan
-            new_crawler_task_list = [
-                {
-                    "contentType": 1,
-                    "inputSourceType": 2,
-                    "inputSourceSubType": None,
-                    "fieldName": None,
-                    "inputSourceValue": crawler_plan_id,
-                    "inputSourceLabel": crawler_plan_name,
-                    "inputSourceModal": 3,
-                    "inputSourceChannel": input_source_channel
-                }
-            ]
-            generate_plan_response = aiditApi.bind_crawler_task_to_generate_task(
-                crawler_task_list=new_crawler_task_list,
-                generate_task_id=self.category_map[category]
-            )
-            log(
-                task="category_publish_task",
-                function="publish_filter_articles",
-                message="成功绑定到生成计划",
-                data=generate_plan_response
-            )
+        # split into different category
+        for ai_category in self.article_category_list:
+            filter_category_df = filtered_articles_df[filtered_articles_df['category_by_ai'] == ai_category]
+            url_list = filter_category_df['link'].values.tolist()
+            if url_list:
+                # create_crawler_plan
+                crawler_plan_response = aiditApi.auto_create_crawler_task(
+                    plan_id=None,
+                    plan_name="自动绑定-{}-{}-{}--{}".format(category, ai_category,datetime.date.today().__str__(), len(url_list)),
+                    plan_tag="品类冷启动",
+                    article_source=article_source,
+                    url_list=url_list
+                )
+                log(
+                    task="category_publish_task",
+                    function="publish_filter_articles",
+                    message="成功创建抓取计划",
+                    data=crawler_plan_response
+                )
+                # save to db
+                create_timestamp = int(time.time()) * 1000
+                crawler_plan_id = crawler_plan_response['data']['id']
+                crawler_plan_name = crawler_plan_response['data']['name']
+                self.insert_into_db(crawler_plan_id, crawler_plan_name, create_timestamp)
+
+                # auto bind to generate plan
+                new_crawler_task_list = [
+                    {
+                        "contentType": 1,
+                        "inputSourceType": 2,
+                        "inputSourceSubType": None,
+                        "fieldName": None,
+                        "inputSourceValue": crawler_plan_id,
+                        "inputSourceLabel": crawler_plan_name,
+                        "inputSourceModal": 3,
+                        "inputSourceChannel": input_source_channel
+                    }
+                ]
+                generate_plan_response = aiditApi.bind_crawler_task_to_generate_task(
+                    crawler_task_list=new_crawler_task_list,
+                    generate_task_id=self.category_map[category]
+                )
+                log(
+                    task="category_publish_task",
+                    function="publish_filter_articles",
+                    message="成功绑定到生成计划",
+                    data=generate_plan_response
+                )
 
-            # change article status
-            article_id_list = filtered_articles_df['article_id'].values.tolist()
-            self.change_article_status_while_publishing(article_id_list=article_id_list)
+                # change article status
+                article_id_list = filter_category_df['article_id'].values.tolist()
+                self.change_article_status_while_publishing(article_id_list=article_id_list)
 
     def do_job(self, article_source, category_list=None):
         """

+ 19 - 1
cold_start/publish/publish_article_pool_articles.py

@@ -10,7 +10,7 @@ from applications.db import DatabaseConnector
 from config import long_articles_config
 
 
-class CategoryColdStartTask:
+class PublishArticlePoolArticles:
     def __init__(self):
         self.db_client = DatabaseConnector(long_articles_config)
         self.db_client.connect()
@@ -35,3 +35,21 @@ class CategoryColdStartTask:
                 }
             )
 
+
+class PublishGzhArticles(PublishArticlePoolArticles):
+
+    def get_articles_by_crawler_method(self, crawler_method):
+        fetch_query = f"""
+            select
+                article_id, out_account_id, article_index, title, link, read_cnt, status, llm_sensitivity, score, category_by_ai
+            from crawler_meta_article
+            where category = '{crawler_method}' and platform = 'weixin' and title_sensitivity = 0;
+        """
+        fetch_response = self.db_client.fetch(fetch_query)
+        article_data_frame = DataFrame(
+            fetch_response,
+            columns=['article_id', 'gh_id', 'position', 'title', 'link', 'read_cnt', 'status',
+                                        'llm_sensitivity', 'score', 'category_by_ai']
+        )
+        return article_data_frame
+
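
A hypothetical call pattern for the new PublishGzhArticles helper; the module path is inferred from the file location, the database configured in long_articles_config is assumed reachable, and the crawler_method value is illustrative:

from cold_start.publish.publish_article_pool_articles import PublishGzhArticles

publisher = PublishGzhArticles()  # connects via DatabaseConnector(long_articles_config)
article_df = publisher.get_articles_by_crawler_method(
    crawler_method="account_association"  # illustrative value
)
print(article_df[["article_id", "title", "category_by_ai"]].head())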

+ 142 - 72
cold_start/publish/publish_single_video_pool_videos.py

@@ -5,15 +5,23 @@ import traceback
 from pymysql.cursors import DictCursor
 from tqdm import tqdm
 
-from applications import bot, aiditApi
+from applications import aiditApi
+from applications.api import ApolloApi, FeishuBotApi
 from applications.const import SingleVideoPoolPublishTaskConst
 from applications.db import DatabaseConnector
-from config import long_articles_config, apolloConfig
+from config import long_articles_config
 
-config = apolloConfig()
+# init
+apollo_api = ApolloApi(env="prod")
+feishu_bot_api = FeishuBotApi()
 const = SingleVideoPoolPublishTaskConst()
 
-video_pool_config = json.loads(config.getConfigValue(key="video_pool_publish_config"))
+# get config information from apollo
+video_pool_config = json.loads(
+    apollo_api.get_config_value(key="video_pool_publish_config")
+)
+video_category_list = json.loads(apollo_api.get_config_value(key="category_list"))
+platform_list = list(video_pool_config.keys())
 
 
 class PublishSingleVideoPoolVideos:
@@ -21,97 +29,159 @@ class PublishSingleVideoPoolVideos:
         self.db_client = DatabaseConnector(db_config=long_articles_config)
         self.db_client.connect()
 
-    def get_task_list(self, platform:str) -> list[dict]:
-        daily_limit = video_pool_config[platform]['process_num_each_day']
+    def get_task_list(self, platform: str) -> list[dict]:
+        daily_limit = video_pool_config[platform]["process_num_each_day"]
         fetch_query = f"""
-            select id, content_trace_id, pq_vid
-            from single_video_transform_queue
-            where status = {const.TRANSFORM_INIT_STATUS} and platform = '{platform}'
+            select t1.id, t1.content_trace_id, t1.pq_vid, t1.score, t2.category
+            from single_video_transform_queue t1
+            join publish_single_video_source t2 on t1.content_trace_id = t2.content_trace_id
+            where 
+                t1.status = {const.TRANSFORM_INIT_STATUS} 
+                and t1.platform = '{platform}' 
+                and t2.category_status = {const.SUCCESS_STATUS}
             order by score desc
             limit {daily_limit};
         """
         fetch_response = self.db_client.fetch(query=fetch_query, cursor_type=DictCursor)
         return fetch_response
 
-    def update_tasks_status(self,
-                           task_id_tuple: tuple,
-                           ori_status: int,
-                           new_status: int)-> int:
+    def update_tasks_status(
+        self, task_id_tuple: tuple, ori_status: int, new_status: int
+    ) -> int:
         update_query = f"""
             update single_video_transform_queue
             set status = %s
             where id in %s and status = %s;
         """
         affected_rows = self.db_client.save(
-            query=update_query,
-            params=(new_status, task_id_tuple, ori_status)
+            query=update_query, params=(new_status, task_id_tuple, ori_status)
         )
         return affected_rows
 
+    def create_crawler_plan(
+        self, vid_list: list, platform: str, task_id_tuple: tuple, category: str
+    ) -> None:
+        try:
+            # create video crawler plan
+            plan_name = f"{video_pool_config[platform]['nick_name']}-{category}-{datetime.datetime.today().strftime('%Y-%m-%d')}-视频数量: {len(vid_list)}"
+            crawler_plan_response = aiditApi.auto_create_single_video_crawler_task(
+                plan_name=plan_name,
+                plan_tag="单视频供给冷启动",
+                video_id_list=vid_list,
+            )
+            crawler_plan_id = crawler_plan_response["data"]["id"]
+            crawler_plan_name = crawler_plan_response["data"]["name"]
+
+            # bind crawler plan to generate plan
+            crawler_task_list = [
+                {
+                    "contentType": 1,
+                    "inputSourceModal": 4,
+                    "inputSourceChannel": 10,
+                    "inputSourceType": 2,
+                    "inputSourceValue": crawler_plan_id,
+                    "inputSourceSubType": None,
+                    "fieldName": None,
+                    "inputSourceLabel": "原始帖子-视频-票圈小程序-内容添加计划-{}".format(
+                        crawler_plan_name
+                    ),
+                }
+            ]
+            generate_plan_id = video_pool_config[platform]["generate_plan_id"]
+            aiditApi.bind_crawler_task_to_generate_task(
+                crawler_task_list=crawler_task_list,
+                generate_task_id=generate_plan_id,
+            )
+
+            # update status
+            self.update_tasks_status(
+                task_id_tuple=task_id_tuple,
+                ori_status=const.TRANSFORM_INIT_STATUS,
+                new_status=const.TRANSFORM_SUCCESS_STATUS,
+            )
+        except Exception as e:
+            feishu_bot_api.bot(
+                title="视频内容池发布任务",
+                detail={
+                    "platform": platform,
+                    "date": datetime.datetime.today().strftime("%Y-%m-%d"),
+                    "msg": "发布视频内容池失败,原因:{}".format(str(e)),
+                    "detail": traceback.format_exc(),
+                },
+                mention=False,
+            )
+
     def deal(self):
         """
         entrance of this class
         """
-        platform_list = ["sph", "gzh", "toutiao", "hksp", "sohu", "piaoquan"]
-        for platform in tqdm(platform_list, desc='process each platform'):
-            task_list = self.get_task_list(platform)
-            task_id_tuple = tuple([task['id'] for task in task_list])
-            vid_list = [task['pq_vid'] for task in task_list]
-            if vid_list:
-                try:
-                    # create video crawler plan
-                    plan_name = f"{video_pool_config[platform]['nick_name']}-{datetime.datetime.today().strftime('%Y-%m-%d')}-视频数量: {len(vid_list)}"
-                    crawler_plan_response = aiditApi.auto_create_single_video_crawler_task(
-                        plan_name=plan_name,
-                        plan_tag="单视频供给冷启动",
-                        video_id_list=vid_list,
-                    )
-                    crawler_plan_id = crawler_plan_response["data"]["id"]
-                    crawler_plan_name = crawler_plan_response["data"]["name"]
+        platform_map = [
+            (key, video_pool_config[key]["nick_name"]) for key in video_pool_config
+        ]
+        columns = [
+            feishu_bot_api.create_feishu_columns_sheet(
+                sheet_type="plain_text", sheet_name="category", display_name="品类"
+            ),
+            feishu_bot_api.create_feishu_columns_sheet(
+                sheet_type="number", sheet_name="total", display_name="品类视频总量"
+            ),
+            *[
+                feishu_bot_api.create_feishu_columns_sheet(
+                    sheet_type="number", sheet_name=platform, display_name=display_name
+                )
+                for platform, display_name in platform_map
+            ],
+        ]
+        publish_detail_table = {}
 
-                    # bind crawler plan to generate plan
-                    crawler_task_list = [
-                        {
-                            "contentType": 1,
-                            "inputSourceModal": 4,
-                            "inputSourceChannel": 10,
-                            "inputSourceType": 2,
-                            "inputSourceValue": crawler_plan_id,
-                            "inputSourceSubType": None,
-                            "fieldName": None,
-                            "inputSourceLabel": "原始帖子-视频-票圈小程序-内容添加计划-{}".format(crawler_plan_name),
-                        }
+        for platform in tqdm(platform_list, desc="process each platform"):
+            task_list = self.get_task_list(platform)
+            if task_list:
+                # split task list into each category
+                for category in video_category_list:
+                    task_list_with_category = [
+                        task for task in task_list if task["category"] == category
                     ]
-                    generate_plan_id = video_pool_config[platform]['generate_plan_id']
-                    aiditApi.bind_crawler_task_to_generate_task(
-                        crawler_task_list=crawler_task_list,
-                        generate_task_id=generate_plan_id,
-                    )
-
-                    # update status
-                    self.update_tasks_status(
-                        task_id_tuple=task_id_tuple,
-                        ori_status=const.TRANSFORM_INIT_STATUS,
-                        new_status=const.TRANSFORM_SUCCESS_STATUS
-                    )
-                except Exception as e:
-                    bot(
-                        title='视频内容池发布任务',
-                        detail={
-                            'platform': platform,
-                            'date': datetime.datetime.today().strftime('%Y-%m-%d'),
-                            'msg': '发布视频内容池失败,原因:{}'.format(str(e)),
-                            'detail': traceback.format_exc(),
-                        },
-                        mention=False
+                    task_id_tuple = tuple(
+                        [task["id"] for task in task_list_with_category]
                     )
+                    vid_list = [task["pq_vid"] for task in task_list_with_category]
+                    if vid_list:
+                        self.create_crawler_plan(
+                            vid_list, platform, task_id_tuple, category
+                        )
+                        if publish_detail_table.get(platform):
+                            publish_detail_table[platform][category] = len(vid_list)
+                        else:
+                            publish_detail_table[platform] = {category: len(vid_list)}
+                    else:
+                        continue
             else:
-                bot(
-                    title='视频内容池发布任务',
+                feishu_bot_api.bot(
+                    title="视频内容池发布任务",
                     detail={
-                        'platform': platform,
-                        'date': datetime.datetime.today().strftime('%Y-%m-%d'),
-                        'msg': '该平台无待发布视频,请关注供给的抓取'
+                        "platform": platform,
+                        "date": datetime.datetime.today().strftime("%Y-%m-%d"),
+                        "msg": "该渠道今日无供给,注意关注供给详情",
                     },
-                    mention=False
-                )
+                    mention=False,
+                )
+        detail_rows = []
+        for category in video_category_list:
+            platform_counts = {
+                platform: publish_detail_table.get(platform, {}).get(category, 0)
+                for platform in platform_list
+            }
+            total = sum(platform_counts.values())
+            detail_rows.append(
+                {"category": category, **platform_counts, "total": total}
+            )
+        feishu_bot_api.bot(
+            title="视频内容池冷启动发布任务",
+            detail={
+                "columns": columns,
+                "rows": detail_rows,
+            },
+            table=True,
+            mention=False,
+        )
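
The summary card assembled at the end of deal() folds publish_detail_table into one row per category. A self-contained sketch of that aggregation with hypothetical counts:

# Hypothetical counts keyed as publish_detail_table[platform][category].
publish_detail_table = {
    "gzh": {"health": 5, "history": 2},
    "toutiao": {"health": 3},
}
platform_list = ["gzh", "toutiao"]
video_category_list = ["health", "history"]

detail_rows = []
for category in video_category_list:
    platform_counts = {
        platform: publish_detail_table.get(platform, {}).get(category, 0)
        for platform in platform_list
    }
    total = sum(platform_counts.values())
    detail_rows.append({"category": category, **platform_counts, "total": total})

# detail_rows ->
# [{'category': 'health', 'gzh': 5, 'toutiao': 3, 'total': 8},
#  {'category': 'history', 'gzh': 2, 'toutiao': 0, 'total': 2}]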