Переглянути джерело

Merge branch '2025-03-20-bind-video-pool-to-aigc' of luojunhui/LongArticlesJob into master

luojunhui 1 місяць тому
батько
коміт
5839b8a6dd

+ 34 - 1
applications/aiditApi.py

@@ -404,4 +404,37 @@ def get_only_auto_reply_accounts():
     denet = DeNetMysql()
     result = denet.select(sql)
     account_id_list = [i[0] for i in result]
-    return set(account_id_list)
+    return set(account_id_list)
+
+
+def auto_create_single_video_crawler_task(plan_name, plan_tag, video_id_list):
+    url = "http://aigc-api.cybertogether.net/aigc/crawler/plan/save"
+    payload = json.dumps({
+        "params": {
+            "contentFilters": [],
+            "accountFilters": [],
+            "filterAccountMatchMode": 1,
+            "filterContentMatchMode": 1,
+            "selectModeValues": [],
+            "searchModeValues": [],
+            "contentModal": 4,
+            "analyze": {},
+            "crawlerComment": 0,
+            "inputGroup": [],
+            "inputSourceGroups": [],
+            "modePublishTime": [],
+            "name": plan_name,
+            "frequencyType": 2,
+            "channel": 10,
+            "crawlerMode": 5,
+            "planTag": plan_tag,
+            "voiceExtractFlag": 1,
+            "srtExtractFlag": 1,
+            "videoKeyFrameType": 1,
+            "inputModeValues": video_id_list,
+            "planType": 2
+        },
+        "baseInfo": PERSON_COOKIE
+    })
+    response = requests.request("POST", url, headers=HEADERS, data=payload)
+    return response.json()

+ 1 - 0
applications/api/__init__.py

@@ -1,6 +1,7 @@
 """
 @author: luojunhui
 """
+from .aigc_system_api import AigcSystemApi
 from .deep_seek_api_by_volcanoengine import fetch_deepseek_response
 from .moon_shot_api import fetch_moon_shot_response
 from .nlp_api import similarity_between_title_list

+ 44 - 0
applications/api/aigc_system_api.py

@@ -0,0 +1,44 @@
+"""
+@author: luojunhui
+"""
+
+from tenacity import retry
+from requests.exceptions import RequestException
+import requests
+import json
+from typing import Optional, Dict, List, TypedDict
+
+from applications.utils import request_retry
+
+retry_desc = request_retry(retry_times=3, min_retry_delay=2, max_retry_delay=30)
+headers = {
+    "Accept": "application/json",
+    "Accept-Language": "zh-CN,zh;q=0.9",
+    "Content-Type": "application/json",
+    "Proxy-Connection": "keep-alive",
+    "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.0.0 Safari/537.36",
+}
+
+
+class RelationDict(TypedDict):
+    videoPoolTraceId: str
+    channelContentId: str
+
+
+class AigcSystemApi:
+
+    @retry(**retry_desc)
+    def insert_crawler_relation_to_aigc_system(
+        self, relation_list: List[RelationDict]
+    ) -> Optional[Dict]:
+        url = "http://aigc-api.cybertogether.net/aigc/crawler/content/videoPoolCrawlerRelation"
+        payload = json.dumps({"params": {"relations": relation_list}})
+        try:
+            response = requests.post(url, headers=headers, data=payload, timeout=60)
+            response.raise_for_status()
+            return response.json()
+        except RequestException as e:
+            print(f"API请求失败: {e}")
+        except json.JSONDecodeError as e:
+            print(f"响应解析失败: {e}")
+        return None

+ 10 - 0
applications/const/__init__.py

@@ -368,6 +368,16 @@ class ToutiaoVideoCrawlerConst:
     SLEEP_SECOND = 3
 
 
+class SingleVideoPoolPublishTaskConst:
+    """
+    const for single video pool publish task
+    """
+    TRANSFORM_INIT_STATUS = 0
+    TRANSFORM_SUCCESS_STATUS = 1
+    TRANSFORM_FAIL_STATUS = 99
+
+
+
 
 
 

+ 117 - 0
coldStartTasks/publish/publish_single_video_pool_videos.py

@@ -0,0 +1,117 @@
+import json
+import datetime
+import traceback
+
+from pymysql.cursors import DictCursor
+from tqdm import tqdm
+
+from applications import bot, aiditApi
+from applications.const import SingleVideoPoolPublishTaskConst
+from applications.db import DatabaseConnector
+from config import long_articles_config, apolloConfig
+
+config = apolloConfig()
+const = SingleVideoPoolPublishTaskConst()
+
+video_pool_config = json.loads(config.getConfigValue(key="video_pool_config"))
+
+
+class PublishSingleVideoPoolVideos:
+    def __init__(self):
+        self.db_client = DatabaseConnector(db_config=long_articles_config)
+        self.db_client.connect()
+
+    def get_task_list(self, platform:str) -> list[dict]:
+        daily_limit = video_pool_config[platform]['process_num_each_day']
+        fetch_query = f"""
+            select id, content_trace_id, pq_vid
+            from single_video_transform_queue
+            where status = {const.TRANSFORM_INIT_STATUS} and platform = '{platform}'
+            order by score desc
+            limit {daily_limit};
+        """
+        fetch_response = self.db_client.fetch(query=fetch_query, cursor_type=DictCursor)
+        return fetch_response
+
+    def update_tasks_status(self,
+                           task_id_tuple: tuple,
+                           ori_status: int,
+                           new_status: int)-> int:
+        update_query = f"""
+            update single_video_transform_queue
+            set status = %s
+            where id in %s and status = %s;
+        """
+        affected_rows = self.db_client.save(
+            query=update_query,
+            params=(new_status, task_id_tuple, ori_status)
+        )
+        return affected_rows
+
+    def deal(self):
+        """
+        entrance of this class
+        """
+        platform_list = ["sph", "gzh", "toutiao", "hksp"]
+        for platform in tqdm(platform_list, desc='process each platform'):
+            task_list = self.get_task_list(platform)
+            task_id_tuple = tuple([task['id'] for task in task_list])
+            vid_list = [task['pq_vid'] for task in task_list]
+            if vid_list:
+                try:
+                    # create video crawler plan
+                    plan_name = f"{video_pool_config[platform]['nick_name']}-{datetime.datetime.today().strftime('%Y-%m-%d')}-视频数量: {len(vid_list)}"
+                    crawler_plan_response = aiditApi.auto_create_single_video_crawler_task(
+                        plan_name=plan_name,
+                        plan_tag="单视频供给冷启动",
+                        video_id_list=vid_list,
+                    )
+                    crawler_plan_id = crawler_plan_response["data"]["id"]
+                    crawler_plan_name = crawler_plan_response["data"]["name"]
+
+                    # bind crawler plan to generate plan
+                    crawler_task_list = [
+                        {
+                            "contentType": 1,
+                            "inputSourceModal": 4,
+                            "inputSourceChannel": 10,
+                            "inputSourceType": 2,
+                            "inputSourceValue": crawler_plan_id,
+                            "inputSourceSubType": None,
+                            "fieldName": None,
+                            "inputSourceLabel": "原始帖子-视频-票圈小程序-内容添加计划-{}".format(crawler_plan_name),
+                        }
+                    ]
+                    generate_plan_id = video_pool_config[platform]['generate_plan_id']
+                    aiditApi.bind_crawler_task_to_generate_task(
+                        crawler_task_list=crawler_task_list,
+                        generate_task_id=generate_plan_id,
+                    )
+
+                    # update status
+                    self.update_tasks_status(
+                        task_id_tuple=task_id_tuple,
+                        ori_status=const.TRANSFORM_INIT_STATUS,
+                        new_status=const.TRANSFORM_SUCCESS_STATUS
+                    )
+                except Exception as e:
+                    bot(
+                        title='视频内容池发布任务',
+                        detail={
+                            'platform': platform,
+                            'date': datetime.datetime.today().strftime('%Y-%m-%d'),
+                            'msg': '发布视频内容池失败,原因:{}'.format(str(e)),
+                            'detail': traceback.format_exc(),
+                        },
+                        mention=False
+                    )
+            else:
+                bot(
+                    title='视频内容池发布任务',
+                    detail={
+                        'platform': platform,
+                        'date': datetime.datetime.today().strftime('%Y-%m-%d'),
+                        'msg': '该平台无待发布视频,请关注供给的抓取'
+                    },
+                    mention=False
+                )

+ 41 - 5
coldStartTasks/publish/publish_video_to_pq_for_audit.py

@@ -11,13 +11,15 @@ from pymysql.cursors import DictCursor
 
 from applications import log
 from applications import PQAPI
-from applications.const import WeixinVideoCrawlerConst
+from applications.api import AigcSystemApi
 from applications.api import fetch_moon_shot_response
+from applications.const import WeixinVideoCrawlerConst
 from applications.db import DatabaseConnector
 from config import long_articles_config
 
 const = WeixinVideoCrawlerConst()
 pq_functions = PQAPI()
+aigc = AigcSystemApi()
 
 
 class PublishVideosForAudit(object):
@@ -142,7 +144,11 @@ class PublishVideosForAudit(object):
         获取需要检查的视频列表
         :return:
         """
-        sql = f"""SELECT audit_video_id FROM publish_single_video_source WHERE audit_status = {const.VIDEO_AUDIT_PROCESSING_STATUS};"""
+        sql = f"""
+            select content_trace_id, audit_video_id, score, platform 
+            from publish_single_video_source
+            where audit_status = {const.VIDEO_AUDIT_PROCESSING_STATUS};
+        """
         response = self.db_client.fetch(sql, cursor_type=DictCursor)
         return response
 
@@ -197,12 +203,30 @@ class PublishVideosForAudit(object):
             )
             return False
 
-    def check_video_status(self, video_id: int) -> Dict:
+    def insert_into_task_queue(self, video) -> int:
+        """
+        enqueue
+        """
+        insert_query = f"""
+            insert into single_video_transform_queue
+            (content_trace_id, pq_vid, score, platform)
+            values (%s, %s, %s, %s);
+        """
+        affected_rows = self.db_client.save(
+            query=insert_query,
+            params=(
+                video['content_trace_id'], video['audit_video_id'], video['score'], video['platform']
+            )
+        )
+        return affected_rows
+
+    def check_video_status(self, video_obj: dict) -> Dict:
         """
         检查视频的状态,若视频审核通过or不通过,修改记录状态
-        :param video_id:
+        :param video_obj:
         :return:
         """
+        video_id = video_obj['audit_video_id']
         response = pq_functions.getPQVideoListDetail([video_id])
         audit_status = response.get("data")[0].get("auditStatus")
         # 请求成功
@@ -216,6 +240,18 @@ class PublishVideosForAudit(object):
                     ori_audit_status=const.VIDEO_AUDIT_PROCESSING_STATUS,
                     new_audit_status=const.VIDEO_AUDIT_SUCCESS_STATUS
                 )
+                # 将视频存储到任务队列
+                self.insert_into_task_queue(video_obj)
+
+                # 将视频存储到 aigc 表
+                aigc.insert_crawler_relation_to_aigc_system(
+                    relation_list=[
+                        {
+                            "videoPoolTraceId": video_obj['content_trace_id'],
+                            "channelContentId": str(video_id)
+                        }
+                    ]
+                )
             else:
                 # 修改小程序标题失败,修改审核状态为4
                 affected_rows = self.update_audit_status(
@@ -295,7 +331,7 @@ class PublishVideosForAudit(object):
         for video_obj in tqdm(video_list, desc="视频检查"):
             video_id = video_obj.get("audit_video_id")
             try:
-                response = self.check_video_status(video_id)
+                response = self.check_video_status(video_obj)
                 if response.get("affected_rows"):
                     continue
                 else: