
Merge branch 'master' into 2025-04-07-luojunhui-video-best-frame
merge master

luojunhui 3 weeks ago
parent
commit
5f17820603

+ 24 - 2
account_cold_start_daily.py

@@ -8,12 +8,30 @@ from argparse import ArgumentParser
 
 from applications import longArticlesMySQL, bot
 from coldStartTasks.crawler.weixinCategoryCrawler import weixinCategory
+from coldStartTasks.publish.publish_single_video_pool_videos import PublishSingleVideoPoolVideos
 from coldStartTasks.publish.publishCategoryArticles import CategoryColdStartTask
 from coldStartTasks.filter.title_similarity_task import ColdStartTitleSimilarityTask
 
 DEFAULT_CATEGORY_LIST = ['1030-手动挑号', 'account_association']
 
 
+def publish_single_video_task():
+    """
+    Publish videos from the single-video content pool.
+    """
+    try:
+        publish_single_video_pool_videos = PublishSingleVideoPoolVideos()
+        publish_single_video_pool_videos.deal()
+    except Exception as e:
+        bot(
+            title="视频内容池任务创建失败",
+            detail={
+                "error": str(e),
+                "error_msg": traceback.format_exc()
+            }
+        )
+
+
 class AccountColdStartDailyTask(object):
     """
     Account cold-start code
@@ -73,7 +91,7 @@ class AccountColdStartDailyTask(object):
                 }
             )
 
-    def publish_task(self, category_list, article_source):
+    def publish_article_task(self, category_list, article_source):
         """
         Publish account articles into aigc crawler plans and bind them to generation plans
         :param category_list: article categories
@@ -109,6 +127,10 @@ def main(date_str, category_list=None, article_source=None):
     main job, use crontab to do job daily
     :return:
     """
+    # first, publish the video content pool
+    publish_single_video_task()
+
+    # then process the article content pool
     if not category_list:
         category_list = DEFAULT_CATEGORY_LIST
     if not article_source:
@@ -118,7 +140,7 @@ def main(date_str, category_list=None, article_source=None):
         if article_source == 'weixin':
             task.crawler_task(category_list=category_list, date_str=date_str)
 
-        task.publish_task(category_list=category_list, article_source=article_source)
+        task.publish_article_task(category_list=category_list, article_source=article_source)
 
 
 if __name__ == '__main__':
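
For reference, a minimal invocation of the updated entry point; the argparse wiring below the `if __name__` guard is not shown in this diff, so the date handling here is an assumption:

    from datetime import datetime, timedelta

    # run yesterday's cold start: the video pool is published first, then articles
    yesterday = (datetime.today() - timedelta(days=1)).strftime('%Y-%m-%d')
    main(date_str=yesterday, category_list=None, article_source='weixin')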

+ 34 - 1
applications/aiditApi.py

@@ -404,4 +404,37 @@ def get_only_auto_reply_accounts():
     denet = DeNetMysql()
     result = denet.select(sql)
     account_id_list = [i[0] for i in result]
-    return set(account_id_list)
+    return set(account_id_list)
+
+
+def auto_create_single_video_crawler_task(plan_name, plan_tag, video_id_list):
+    """
+    create a single-video crawler plan in the aigc system and return the response json
+    """
+    url = "http://aigc-api.cybertogether.net/aigc/crawler/plan/save"
+    payload = json.dumps({
+        "params": {
+            "contentFilters": [],
+            "accountFilters": [],
+            "filterAccountMatchMode": 1,
+            "filterContentMatchMode": 1,
+            "selectModeValues": [],
+            "searchModeValues": [],
+            "contentModal": 4,
+            "analyze": {},
+            "crawlerComment": 0,
+            "inputGroup": [],
+            "inputSourceGroups": [],
+            "modePublishTime": [],
+            "name": plan_name,
+            "frequencyType": 2,
+            "channel": 10,
+            "crawlerMode": 5,
+            "planTag": plan_tag,
+            "voiceExtractFlag": 1,
+            "srtExtractFlag": 1,
+            "videoKeyFrameType": 1,
+            "inputModeValues": video_id_list,
+            "planType": 2
+        },
+        "baseInfo": PERSON_COOKIE
+    })
+    response = requests.request("POST", url, headers=HEADERS, data=payload)
+    return response.json()
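
The helper is called by the new publish task with a descriptive plan name; a hedged usage sketch (the name and ids below are illustrative, not real plan data):

    response = auto_create_single_video_crawler_task(
        plan_name="票圈视频-2025-04-07-视频数量: 2",  # illustrative name
        plan_tag="单视频供给冷启动",
        video_id_list=["1001", "1002"],  # hypothetical pq video ids
    )
    # downstream code reads the created plan's id and name from the response
    crawler_plan_id = response["data"]["id"]
    crawler_plan_name = response["data"]["name"]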

+ 1 - 0
applications/api/__init__.py

@@ -1,6 +1,7 @@
 """
 @author: luojunhui
 """
+from .aigc_system_api import AigcSystemApi
 from .deep_seek_api_by_volcanoengine import fetch_deepseek_response
 from .moon_shot_api import fetch_moon_shot_response
 from .nlp_api import similarity_between_title_list

+ 45 - 0
applications/api/aigc_system_api.py

@@ -0,0 +1,45 @@
+"""
+@author: luojunhui
+"""
+
+from tenacity import retry
+from requests.exceptions import RequestException
+import requests
+import json
+from typing import Optional, Dict, List, TypedDict
+
+from applications.utils import request_retry
+
+retry_desc = request_retry(retry_times=3, min_retry_delay=2, max_retry_delay=30)
+headers = {
+    "Accept": "application/json",
+    "Accept-Language": "zh-CN,zh;q=0.9",
+    "Content-Type": "application/json",
+    "Proxy-Connection": "keep-alive",
+    "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.0.0 Safari/537.36",
+}
+
+
+class RelationDict(TypedDict):
+    videoPoolTraceId: str
+    channelContentId: str
+    platform: str
+
+
+class AigcSystemApi:
+    """
+    thin http client for the aigc system's crawler-relation endpoint
+    """
+
+    @retry(**retry_desc)
+    def insert_crawler_relation_to_aigc_system(
+        self, relation_list: List[RelationDict]
+    ) -> Optional[Dict]:
+        url = "http://aigc-api.cybertogether.net/aigc/crawler/content/videoPoolCrawlerRelation"
+        payload = json.dumps({"params": {"relations": relation_list}})
+        try:
+            response = requests.post(url, headers=headers, data=payload, timeout=60)
+            response.raise_for_status()
+            return response.json()
+        except RequestException as e:
+            print(f"API请求失败: {e}")
+        except json.JSONDecodeError as e:
+            print(f"响应解析失败: {e}")
+        return None
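
Callers pass a list of RelationDict entries and must handle the None returned on failure; a minimal sketch with placeholder ids:

    aigc = AigcSystemApi()
    result = aigc.insert_crawler_relation_to_aigc_system(
        relation_list=[
            {
                "videoPoolTraceId": "trace-xxx",  # placeholder trace id
                "channelContentId": "123456",     # pq video id, sent as a string
                "platform": "gzh",
            }
        ]
    )
    if result is None:
        pass  # the request or JSON decoding failed; the error was already printed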

+ 10 - 0
applications/const/__init__.py

@@ -368,6 +368,16 @@ class ToutiaoVideoCrawlerConst:
     SLEEP_SECOND = 3
 
 
+class SingleVideoPoolPublishTaskConst:
+    """
+    const for single video pool publish task
+    """
+    TRANSFORM_INIT_STATUS = 0
+    TRANSFORM_SUCCESS_STATUS = 1
+    TRANSFORM_FAIL_STATUS = 99
+
+
 
 
 
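These constants drive the transform queue's state machine: the publish task flips rows from init to success once a crawler plan is created, with TRANSFORM_FAIL_STATUS (99) apparently reserved for failures. An illustrative transition, assuming a connected PublishSingleVideoPoolVideos instance named task:

    const = SingleVideoPoolPublishTaskConst()
    task.update_tasks_status(
        task_id_tuple=(1, 2, 3),  # illustrative queue row ids
        ori_status=const.TRANSFORM_INIT_STATUS,     # 0
        new_status=const.TRANSFORM_SUCCESS_STATUS,  # 1
    )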

+ 117 - 0
coldStartTasks/publish/publish_single_video_pool_videos.py

@@ -0,0 +1,117 @@
+import json
+import datetime
+import traceback
+
+from pymysql.cursors import DictCursor
+from tqdm import tqdm
+
+from applications import bot, aiditApi
+from applications.const import SingleVideoPoolPublishTaskConst
+from applications.db import DatabaseConnector
+from config import long_articles_config, apolloConfig
+
+config = apolloConfig()
+const = SingleVideoPoolPublishTaskConst()
+
+video_pool_config = json.loads(config.getConfigValue(key="video_pool_publish_config"))
+
+
+class PublishSingleVideoPoolVideos:
+    def __init__(self):
+        self.db_client = DatabaseConnector(db_config=long_articles_config)
+        self.db_client.connect()
+
+    def get_task_list(self, platform: str) -> list[dict]:
+        """
+        fetch up to the platform's daily limit of pending tasks, highest score first
+        """
+        daily_limit = video_pool_config[platform]['process_num_each_day']
+        fetch_query = f"""
+            select id, content_trace_id, pq_vid
+            from single_video_transform_queue
+            where status = {const.TRANSFORM_INIT_STATUS} and platform = '{platform}'
+            order by score desc
+            limit {daily_limit};
+        """
+        fetch_response = self.db_client.fetch(query=fetch_query, cursor_type=DictCursor)
+        return fetch_response
+
+    def update_tasks_status(
+        self,
+        task_id_tuple: tuple,
+        ori_status: int,
+        new_status: int,
+    ) -> int:
+        """
+        flip queue rows from ori_status to new_status, returning affected rows
+        """
+        update_query = """
+            update single_video_transform_queue
+            set status = %s
+            where id in %s and status = %s;
+        """
+        affected_rows = self.db_client.save(
+            query=update_query,
+            params=(new_status, task_id_tuple, ori_status)
+        )
+        return affected_rows
+
+    def deal(self):
+        """
+        entrance of this class
+        """
+        platform_list = ["sph", "gzh", "toutiao", "hksp"]
+        for platform in tqdm(platform_list, desc='process each platform'):
+            task_list = self.get_task_list(platform)
+            task_id_tuple = tuple(task['id'] for task in task_list)
+            vid_list = [task['pq_vid'] for task in task_list]
+            if vid_list:
+                try:
+                    # create video crawler plan
+                    plan_name = f"{video_pool_config[platform]['nick_name']}-{datetime.datetime.today().strftime('%Y-%m-%d')}-视频数量: {len(vid_list)}"
+                    crawler_plan_response = aiditApi.auto_create_single_video_crawler_task(
+                        plan_name=plan_name,
+                        plan_tag="单视频供给冷启动",
+                        video_id_list=vid_list,
+                    )
+                    crawler_plan_id = crawler_plan_response["data"]["id"]
+                    crawler_plan_name = crawler_plan_response["data"]["name"]
+
+                    # bind crawler plan to generate plan
+                    crawler_task_list = [
+                        {
+                            "contentType": 1,
+                            "inputSourceModal": 4,
+                            "inputSourceChannel": 10,
+                            "inputSourceType": 2,
+                            "inputSourceValue": crawler_plan_id,
+                            "inputSourceSubType": None,
+                            "fieldName": None,
+                            "inputSourceLabel": "原始帖子-视频-票圈小程序-内容添加计划-{}".format(crawler_plan_name),
+                        }
+                    ]
+                    generate_plan_id = video_pool_config[platform]['generate_plan_id']
+                    aiditApi.bind_crawler_task_to_generate_task(
+                        crawler_task_list=crawler_task_list,
+                        generate_task_id=generate_plan_id,
+                    )
+
+                    # update status
+                    self.update_tasks_status(
+                        task_id_tuple=task_id_tuple,
+                        ori_status=const.TRANSFORM_INIT_STATUS,
+                        new_status=const.TRANSFORM_SUCCESS_STATUS
+                    )
+                except Exception as e:
+                    bot(
+                        title='视频内容池发布任务',
+                        detail={
+                            'platform': platform,
+                            'date': datetime.datetime.today().strftime('%Y-%m-%d'),
+                            'msg': '发布视频内容池失败,原因:{}'.format(str(e)),
+                            'detail': traceback.format_exc(),
+                        },
+                        mention=False
+                    )
+            else:
+                bot(
+                    title='视频内容池发布任务',
+                    detail={
+                        'platform': platform,
+                        'date': datetime.datetime.today().strftime('%Y-%m-%d'),
+                        'msg': '该平台无待发布视频,请关注供给的抓取'
+                    },
+                    mention=False
+                )
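
The task reads per-platform settings from the video_pool_publish_config Apollo key. Its exact schema is not shown in this diff, but the lookups above imply at least the following shape (values illustrative):

    # assumed structure of json.loads(config.getConfigValue("video_pool_publish_config"))
    video_pool_config = {
        "gzh": {
            "nick_name": "公众号",           # used to build the crawler plan name
            "process_num_each_day": 10,      # daily fetch limit per platform
            "generate_plan_id": "plan-xxx",  # hypothetical generate plan to bind to
        },
        # "sph", "toutiao" and "hksp" follow the same shape
    }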

+ 42 - 5
coldStartTasks/publish/publish_video_to_pq_for_audit.py

@@ -11,13 +11,15 @@ from pymysql.cursors import DictCursor
 
 from applications import log
 from applications import PQAPI
-from applications.const import WeixinVideoCrawlerConst
+from applications.api import AigcSystemApi
 from applications.api import fetch_moon_shot_response
+from applications.const import WeixinVideoCrawlerConst
 from applications.db import DatabaseConnector
 from config import long_articles_config
 
 const = WeixinVideoCrawlerConst()
 pq_functions = PQAPI()
+aigc = AigcSystemApi()
 
 
 class PublishVideosForAudit(object):
@@ -142,7 +144,11 @@ class PublishVideosForAudit(object):
         Get the list of videos pending an audit-status check
         :return:
         """
-        sql = f"""SELECT audit_video_id FROM publish_single_video_source WHERE audit_status = {const.VIDEO_AUDIT_PROCESSING_STATUS};"""
+        sql = f"""
+            select content_trace_id, audit_video_id, score, platform 
+            from publish_single_video_source
+            where audit_status = {const.VIDEO_AUDIT_PROCESSING_STATUS};
+        """
         response = self.db_client.fetch(sql, cursor_type=DictCursor)
         return response
 
@@ -197,12 +203,30 @@ class PublishVideosForAudit(object):
             )
             return False
 
-    def check_video_status(self, video_id: int) -> Dict:
+    def insert_into_task_queue(self, video: dict) -> int:
+        """
+        enqueue an audited video into the single-video transform queue
+        """
+        insert_query = """
+            insert into single_video_transform_queue
+            (content_trace_id, pq_vid, score, platform)
+            values (%s, %s, %s, %s);
+        """
+        affected_rows = self.db_client.save(
+            query=insert_query,
+            params=(
+                video['content_trace_id'], video['audit_video_id'], video['score'], video['platform']
+            )
+        )
+        return affected_rows
+
+    def check_video_status(self, video_obj: dict) -> Dict:
         """
         Check the video's audit status; update the record once review passes or fails
-        :param video_id:
+        :param video_obj:
         :return:
         """
+        video_id = video_obj['audit_video_id']
         response = pq_functions.getPQVideoListDetail([video_id])
         audit_status = response.get("data")[0].get("auditStatus")
         # 请求成功
@@ -216,6 +240,19 @@ class PublishVideosForAudit(object):
                     ori_audit_status=const.VIDEO_AUDIT_PROCESSING_STATUS,
                     new_audit_status=const.VIDEO_AUDIT_SUCCESS_STATUS
                 )
+                # enqueue the video into the transform queue
+                self.insert_into_task_queue(video_obj)
+
+                # register the video's crawler relation in the aigc system
+                aigc.insert_crawler_relation_to_aigc_system(
+                    relation_list=[
+                        {
+                            "videoPoolTraceId": video_obj['content_trace_id'],
+                            "channelContentId": str(video_id),
+                            "platform": video_obj['platform'],
+                        }
+                    ]
+                )
             else:
                 # failed to update the mini-program title; set audit status to 4
                 affected_rows = self.update_audit_status(
@@ -295,7 +332,7 @@ class PublishVideosForAudit(object):
         for video_obj in tqdm(video_list, desc="视频检查"):
             video_id = video_obj.get("audit_video_id")
             try:
-                response = self.check_video_status(video_id)
+                response = self.check_video_status(video_obj)
                 if response.get("affected_rows"):
                     continue
                 else:

+ 11 - 3
tasks/crawler_channel_account_videos.py

@@ -158,9 +158,17 @@ class CrawlerChannelAccountVideos:
                 break
 
             response_data = response["data"]
-            current_last_buffer = response_data["lastBuffer"]  # 更新分页游标
-            has_more = response_data["continueFlag"]  # 是否还有下一页
-            video_list = response_data["object"]
+            if isinstance(response_data, dict):
+                current_last_buffer = response_data["lastBuffer"]  # update the pagination cursor
+                has_more = response_data["continueFlag"]  # whether another page exists
+                video_list = response_data["object"]
+            elif isinstance(response_data, list):
+                # some responses return a bare list with no pagination metadata
+                has_more = False
+                video_list = response_data
+            else:
+                return
 
             if not video_list:
                 break
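
The branch exists because the upstream endpoint apparently returns either a paginated dict or a bare list; the two shapes the loop now tolerates, with illustrative values:

    # paginated shape: cursor, continuation flag, and one page of videos
    paginated = {"lastBuffer": "b64-cursor", "continueFlag": True, "object": [{"id": 1}]}
    # bare-list shape: a single page with no pagination metadata
    bare_list = [{"id": 1}, {"id": 2}]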

+ 57 - 25
tasks/update_published_articles_minigram_detail.py

@@ -1,6 +1,7 @@
 """
 @author: luojunhui
 """
+import json
 import traceback
 
 from datetime import datetime, timedelta
@@ -49,24 +50,6 @@ def extract_path(path: str) -> Dict:
         return EMPTY_DICT
 
 
-def get_article_mini_program_info(content_url: str) -> List[Dict]:
-    """
-    获取文章的小程序信息
-    :return:
-    """
-    try:
-        article_detail = spider.get_article_text(content_url)
-    except Exception as e:
-        raise SpiderError(error=e, spider="detail", url=content_url)
-
-    response_code = article_detail['code']
-    if response_code == const.ARTICLE_SUCCESS_CODE:
-        mini_info = article_detail['data']['data']['mini_program']
-        return mini_info
-    else:
-        return EMPTY_LIST
-
-
 class UpdatePublishedArticlesMinigramDetail(object):
     """
     更新已发布文章数据
@@ -136,7 +119,7 @@ class UpdatePublishedArticlesMinigramDetail(object):
         :return:
         """
         sql = f"""
-             SELECT ContentUrl, wx_sn, publish_timestamp, accountName, title
+             SELECT ContentUrl, wx_sn, publish_timestamp, accountName, title, root_source_id_list
              FROM official_articles_v2
              WHERE FROM_UNIXTIME(publish_timestamp)
              BETWEEN DATE_SUB('{biz_date}', INTERVAL 1 DAY) AND DATE_SUB('{biz_date}', INTERVAL 1 SECOND);
@@ -181,8 +164,9 @@ class UpdatePublishedArticlesMinigramDetail(object):
         url = article_info['ContentUrl']
         publish_timestamp = article_info['publish_timestamp']
         wx_sn = article_info['wx_sn'].decode()
+        root_source_id_list = json.loads(article_info['root_source_id_list']) if article_info['root_source_id_list'] else EMPTY_LIST
 
-        article_mini_program_detail = get_article_mini_program_info(url)
+        article_mini_program_detail = self.get_article_mini_program_info(url, root_source_id_list)
         if article_mini_program_detail:
             log(
                 task=TASK_NAME,
@@ -208,9 +192,13 @@ class UpdatePublishedArticlesMinigramDetail(object):
                         image_url = mini_item['image_url']
                         nick_name = mini_item['nike_name']
                         # extract video id and root_source_id
-                        id_info = extract_path(mini_item['path'])
-                        root_source_id = id_info['root_source_id']
-                        video_id = id_info['video_id']
+                        if mini_item.get("root_source_id") and mini_item.get("video_id"):
+                            root_source_id = mini_item['root_source_id']
+                            video_id = mini_item['video_id']
+                        else:
+                            id_info = extract_path(mini_item['path'])
+                            root_source_id = id_info['root_source_id']
+                            video_id = id_info['video_id']
                         kimi_title = mini_item['title']
                         self.insert_each_root_source_id(
                             wx_sn=wx_sn,
@@ -237,6 +225,52 @@ class UpdatePublishedArticlesMinigramDetail(object):
         else:
             return EMPTY_DICT
 
+    def get_article_mini_program_info(self, content_url: str, root_source_id_list: list) -> List[Dict]:
+        """
+        Get the article's mini-program info, preferring stored root_source_ids over a fresh crawl
+        :return:
+        """
+        if root_source_id_list:
+            # root_source_ids were already captured for this article
+            fetch_sql = """
+                select video_id, root_source_id from long_articles_root_source_id where root_source_id in %s;
+            """
+            fetch_response = self.long_articles_db_client.fetch(
+                query=fetch_sql,
+                params=(tuple(root_source_id_list),),
+                cursor_type=DictCursor
+            )
+            mini_info = []
+            if fetch_response:
+                # build entries in the mini_info format
+                for item in fetch_response:
+                    mini_info.append(
+                        {
+                            "app_id": "wx89e7eb06478361d7",
+                            "avatar": "https://rescdn.yishihui.com/0temp/logo.png",
+                            "image_url": "",
+                            "nike_name": "票圈 l 3亿人喜欢的视频平台",
+                            "root_source_id": item['root_source_id'],
+                            "video_id": item['video_id'],
+                            "service_type": "0",
+                            "title": "",
+                            "type": "card"
+                        }
+                    )
+                return mini_info
+
+        try:
+            article_detail = spider.get_article_text(content_url)
+        except Exception as e:
+            raise SpiderError(error=e, spider="detail", url=content_url)
+
+        response_code = article_detail['code']
+        if response_code == const.ARTICLE_SUCCESS_CODE:
+            mini_info = article_detail['data']['data']['mini_program']
+            return mini_info
+        else:
+            return EMPTY_LIST
+
     def get_root_source_id_for_three_days(self, biz_date: str) -> List[Dict]:
         """
         Get root_source_ids whose publish_dt falls within the three days before biz_date
@@ -375,5 +409,3 @@ class UpdatePublishedArticlesMinigramDetail(object):
             )
 
 
-
-
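
The new fast path assumes official_articles_v2.root_source_id_list stores a JSON-encoded array of ids; a sketch of the round trip with placeholder values:

    import json

    row = {"root_source_id_list": '["rsi-aaa", "rsi-bbb"]'}  # hypothetical DB value
    ids = json.loads(row["root_source_id_list"]) if row["root_source_id_list"] else []
    # ids feed the `where root_source_id in %s` lookup instead of re-crawling the article page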