Prechádzať zdrojové kódy

Merge branch 'master' into 2025-03-17-account-crawler-pipeline

merge master
luojunhui 7 mesiacov pred
rodič
commit
8c314e5160

+ 34 - 1
applications/aiditApi.py

@@ -404,4 +404,37 @@ def get_only_auto_reply_accounts():
     denet = DeNetMysql()
     result = denet.select(sql)
     account_id_list = [i[0] for i in result]
-    return set(account_id_list)
+    return set(account_id_list)
+
+
+def auto_create_single_video_crawler_task(plan_name, plan_tag, video_id_list):
+    url = "http://aigc-api.cybertogether.net/aigc/crawler/plan/save"
+    payload = json.dumps({
+        "params": {
+            "contentFilters": [],
+            "accountFilters": [],
+            "filterAccountMatchMode": 1,
+            "filterContentMatchMode": 1,
+            "selectModeValues": [],
+            "searchModeValues": [],
+            "contentModal": 4,
+            "analyze": {},
+            "crawlerComment": 0,
+            "inputGroup": [],
+            "inputSourceGroups": [],
+            "modePublishTime": [],
+            "name": plan_name,
+            "frequencyType": 2,
+            "channel": 10,
+            "crawlerMode": 5,
+            "planTag": plan_tag,
+            "voiceExtractFlag": 1,
+            "srtExtractFlag": 1,
+            "videoKeyFrameType": 1,
+            "inputModeValues": video_id_list,
+            "planType": 2
+        },
+        "baseInfo": PERSON_COOKIE
+    })
+    response = requests.request("POST", url, headers=HEADERS, data=payload)
+    return response.json()

+ 1 - 0
applications/api/__init__.py

@@ -1,6 +1,7 @@
 """
 @author: luojunhui
 """
+from .aigc_system_api import AigcSystemApi
 from .deep_seek_api_by_volcanoengine import fetch_deepseek_response
 from .moon_shot_api import fetch_moon_shot_response
 from .nlp_api import similarity_between_title_list

+ 45 - 0
applications/api/aigc_system_api.py

@@ -0,0 +1,45 @@
+"""
+@author: luojunhui
+"""
+
+from tenacity import retry
+from requests.exceptions import RequestException
+import requests
+import json
+from typing import Optional, Dict, List, TypedDict
+
+from applications.utils import request_retry
+
+retry_desc = request_retry(retry_times=3, min_retry_delay=2, max_retry_delay=30)
+headers = {
+    "Accept": "application/json",
+    "Accept-Language": "zh-CN,zh;q=0.9",
+    "Content-Type": "application/json",
+    "Proxy-Connection": "keep-alive",
+    "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.0.0 Safari/537.36",
+}
+
+
+class RelationDict(TypedDict):
+    videoPoolTraceId: str
+    channelContentId: str
+    platform: str
+
+
+class AigcSystemApi:
+
+    @retry(**retry_desc)
+    def insert_crawler_relation_to_aigc_system(
+        self, relation_list: List[RelationDict]
+    ) -> Optional[Dict]:
+        url = "http://aigc-api.cybertogether.net/aigc/crawler/content/videoPoolCrawlerRelation"
+        payload = json.dumps({"params": {"relations": relation_list}})
+        try:
+            response = requests.post(url, headers=headers, data=payload, timeout=60)
+            response.raise_for_status()
+            return response.json()
+        except RequestException as e:
+            print(f"API请求失败: {e}")
+        except json.JSONDecodeError as e:
+            print(f"响应解析失败: {e}")
+        return None

+ 34 - 2
applications/const/__init__.py

@@ -88,7 +88,7 @@ class updatePublishedMsgTaskConst:
     SUBSCRIBE_FAIL_RATE_THRESHOLD = 0.3
 
 
-class updateAccountReadRateTaskConst:
+class UpdateAccountReadRateTaskConst:
     """
     更新账号阅读率常量配置
     """
@@ -104,8 +104,14 @@ class updateAccountReadRateTaskConst:
     # 文章位置
     ARTICLE_INDEX_LIST = [1, 2, 3, 4, 5, 6, 7, 8]
 
+    # 默认粉丝
+    DEFAULT_FANS = 0
 
-class updateAccountReadAvgTaskConst:
+    # 最低粉丝量
+    MIN_FANS = 1000
+
+
+class UpdateAccountReadAvgTaskConst:
     """
     更新账号阅读均值常量配置
     """
@@ -124,6 +130,19 @@ class updateAccountReadAvgTaskConst:
     ARTICLES_DAILY = 1
     TOULIU = 2
 
+    # 默认粉丝
+    DEFAULT_FANS = 0
+
+    # index list
+    ARTICLE_INDEX_LIST = [1, 2, 3, 4, 5, 6, 7, 8]
+
+    # 默认点赞
+    DEFAULT_LIKE = 0
+
+    # 状态
+    USING_STATUS = 1
+    NOT_USING_STATUS = 0
+
 
 class WeixinVideoCrawlerConst:
     """
@@ -179,6 +198,9 @@ class WeixinVideoCrawlerConst:
     # 每天发送的审核视频数量
     MAX_VIDEO_NUM = 1000
 
+    # 单次发布视频审核量
+    MAX_VIDEO_NUM_PER_PUBLISH = 350
+
     # 标题状态
     TITLE_DEFAULT_STATUS = 0
     TITLE_EXIT_STATUS = 1
@@ -346,6 +368,16 @@ class ToutiaoVideoCrawlerConst:
     SLEEP_SECOND = 3
 
 
+class SingleVideoPoolPublishTaskConst:
+    """
+    const for single video pool publish task
+    """
+    TRANSFORM_INIT_STATUS = 0
+    TRANSFORM_SUCCESS_STATUS = 1
+    TRANSFORM_FAIL_STATUS = 99
+
+
+
 
 
 

+ 2 - 0
applications/utils/__init__.py

@@ -12,3 +12,5 @@ from .save_to_db import insert_into_single_video_source_table
 from .save_to_db import insert_into_video_meta_accounts_table
 from .save_to_db import insert_into_associated_recommendation_table
 from .upload import upload_to_oss
+from .fetch_info_from_aigc import fetch_account_fans
+from .fetch_info_from_aigc import fetch_publishing_account_list

+ 58 - 0
applications/utils/fetch_info_from_aigc.py

@@ -0,0 +1,58 @@
+"""
+fetch info from aigc database system
+"""
+from collections import defaultdict
+from typing import List, Dict
+
+from pymysql.cursors import DictCursor
+
+
+def fetch_publishing_account_list(db_client) -> List[Dict]:
+    """
+    fetch account_list from aigc database
+    """
+    fetch_sql = f"""
+        SELECT DISTINCT
+            t3.`name` as account_name,
+            t3.gh_id as gh_id,
+            t3.follower_count as fans,
+            t6.account_source_name as account_source,
+            t6.mode_type as mode_type,
+            t6.account_type as account_type,
+            t6.`status` as status
+        FROM
+            publish_plan t1
+            JOIN publish_plan_account t2 ON t1.id = t2.plan_id
+            JOIN publish_account t3 ON t2.account_id = t3.id
+            LEFT JOIN publish_account_wx_type t4 on t3.id = t4.account_id
+            LEFT JOIN wx_statistics_group_source_account t5 on t3.id = t5.account_id
+            LEFT JOIN wx_statistics_group_source t6 on t5.group_source_name = t6.account_source_name
+        WHERE
+            t1.plan_status = 1
+            AND t3.channel = 5
+            GROUP BY t3.id;
+    """
+    account_list = db_client.fetch(
+        query=fetch_sql,
+        cursor_type=DictCursor
+    )
+    return account_list
+
+def fetch_account_fans(db_client, start_date: str) -> Dict:
+    """
+    fetch account fans from aigc database
+    """
+    sql = f"""
+        SELECT t1.date_str, t1.fans_count, t2.gh_id
+        FROM datastat_wx t1 JOIN publish_account t2 ON t1.account_id = t2.id
+        WHERE t2.channel = 5
+            AND t2.status = 1 
+            AND t1.date_str >= '{start_date}' 
+        ORDER BY t1.date_str;
+        """
+    result = db_client.fetch(sql)
+    fans_dict = defaultdict(dict)
+    for dt, fans, gh_id in result:
+        fans_dict.setdefault(gh_id, {})[dt] = fans
+    return fans_dict
+

+ 81 - 117
cal_account_read_rate_avg_daily.py

@@ -7,14 +7,21 @@ from tqdm import tqdm
 from pandas import DataFrame
 from argparse import ArgumentParser
 from datetime import datetime
+from pymysql.cursors import DictCursor
 
-from applications import DeNetMysql, PQMySQL, longArticlesMySQL, bot, Functions, create_feishu_columns_sheet
-from applications.const import updateAccountReadRateTaskConst
-from config import apolloConfig
+from applications import bot, Functions, log
+from applications import create_feishu_columns_sheet
+from applications.db import DatabaseConnector
+from applications.const import UpdateAccountReadRateTaskConst
+from applications.utils import fetch_publishing_account_list
+from applications.utils import fetch_account_fans
+from config import apolloConfig, long_articles_config, piaoquan_crawler_config, denet_config
 
-const = updateAccountReadRateTaskConst()
+
+const = UpdateAccountReadRateTaskConst()
 config = apolloConfig()
 unauthorized_account = json.loads(config.getConfigValue("unauthorized_gh_id_fans"))
+backup_account_fans = json.loads(config.getConfigValue("backup_account_fans"))
 functions = Functions()
 read_rate_table = "long_articles_read_rate"
 
@@ -37,75 +44,7 @@ def filter_outlier_data(group, key='show_view_count'):
     return filtered_group
 
 
-def get_account_fans_by_dt(db_client) -> dict:
-    """
-    获取每个账号发粉丝,通过日期来区分
-    :return:
-    """
-    sql = f"""
-        SELECT 
-            t1.date_str, 
-            t1.fans_count, 
-            t2.gh_id
-        FROM datastat_wx t1
-        JOIN publish_account t2 ON t1.account_id = t2.id
-        WHERE 
-            t2.channel = 5 
-        AND t2.status = 1 
-        AND t1.date_str >= '2024-07-01' 
-        ORDER BY t1.date_str;
-    """
-    result = db_client.select(sql)
-    D = {}
-    for line in result:
-        dt = line[0]
-        fans = line[1]
-        gh_id = line[2]
-        if D.get(gh_id):
-            D[gh_id][dt] = fans
-        else:
-            D[gh_id] = {dt: fans}
-    return D
-
-
-def get_publishing_accounts(db_client) -> list[dict]:
-    """
-    获取每日正在发布的账号
-    :return:
-    """
-    sql = f"""
-    SELECT DISTINCT
-        t3.`name`,
-        t3.gh_id,
-        t3.follower_count,
-        t6.account_source_name,
-        t6.mode_type,
-        t6.account_type,
-        t6.`status`
-    FROM
-        publish_plan t1
-        JOIN publish_plan_account t2 ON t1.id = t2.plan_id
-        JOIN publish_account t3 ON t2.account_id = t3.id
-        LEFT JOIN publish_account_wx_type t4 on t3.id = t4.account_id
-        LEFT JOIN wx_statistics_group_source_account t5 on t3.id = t5.account_id
-        LEFT JOIN wx_statistics_group_source t6 on t5.group_source_name = t6.account_source_name
-    WHERE
-        t1.plan_status = 1
-        AND t3.channel = 5
-        -- AND t3.follower_count > 0
-        GROUP BY t3.id;
-    """
-    account_list = db_client.select(sql)
-    result_list = [
-        {
-            "account_name": i[0],
-            "gh_id": i[1]
-        } for i in account_list
-    ]
-    return result_list
-
-
-def get_account_articles_detail(db_client, gh_id_tuple) -> list[dict]:
+def get_account_articles_detail(db_client, gh_id_tuple, min_publish_timestamp) -> list[dict]:
     """
     get articles details
     :return:
@@ -116,47 +55,37 @@ def get_account_articles_detail(db_client, gh_id_tuple) -> list[dict]:
             FROM 
                 official_articles_v2
             WHERE 
-                ghId IN {gh_id_tuple} and Type = '{const.BULK_PUBLISH_TYPE}';
+                ghId IN {gh_id_tuple} and Type = '{const.BULK_PUBLISH_TYPE}' and publish_timestamp >= {min_publish_timestamp};
             """
-    result = db_client.select(sql)
-    response_list = [
-        {
-            "ghId": i[0],
-            "accountName": i[1],
-            "ItemIndex": i[2],
-            "show_view_count": i[3],
-            "publish_timestamp": i[4]
-        }
-        for i in result
-    ]
+    response_list = db_client.fetch(query=sql, cursor_type=DictCursor)
     return response_list
 
 
-def cal_account_read_rate(gh_id_tuple) -> DataFrame:
+def cal_account_read_rate(article_list, fans_dict) -> DataFrame:
     """
     计算账号位置的阅读率
     :return:
     """
-    pq_db = PQMySQL()
-    de_db = DeNetMysql()
     response = []
-    fans_dict_each_day = get_account_fans_by_dt(db_client=de_db)
-    account_article_detail = get_account_articles_detail(
-        db_client=pq_db,
-        gh_id_tuple=gh_id_tuple
-    )
-    for line in account_article_detail:
+    for line in article_list:
         gh_id = line['ghId']
         dt = functions.timestamp_to_str(timestamp=line['publish_timestamp'], string_format='%Y-%m-%d')
-        fans = fans_dict_each_day.get(gh_id, {}).get(dt, 0)
+        fans = fans_dict.get(gh_id, {}).get(dt, const.DEFAULT_FANS)
+        if not fans:
+            fans = int(unauthorized_account.get(gh_id, const.DEFAULT_FANS))
         if not fans:
-            fans = int(unauthorized_account.get(gh_id, 0))
+            fans = int(backup_account_fans.get(gh_id, const.DEFAULT_FANS))
+            log(
+                task='cal_read_rate_avg_task',
+                function='cal_account_read_rate',
+                message='未获取到粉丝,使用备份粉丝表',
+                data=line
+            )
         line['fans'] = fans
-        if fans > 1000:
+        if fans > const.MIN_FANS:
             line['readRate'] = line['show_view_count'] / fans if fans else 0
             response.append(line)
-    return DataFrame(response,
-                     columns=['ghId', 'accountName', 'ItemIndex', 'show_view_count', 'publish_timestamp', 'readRate'])
+    return DataFrame(response, columns=['ghId', 'accountName', 'ItemIndex', 'show_view_count', 'publish_timestamp', 'readRate'])
 
 
 def cal_avg_account_read_rate(df, gh_id, index, dt) -> dict:
@@ -168,7 +97,7 @@ def cal_avg_account_read_rate(df, gh_id, index, dt) -> dict:
     min_time = max_time - const.STATISTICS_PERIOD
 
     # 通过
-    filterDataFrame = df[
+    filter_dataframe = df[
         (df["ghId"] == gh_id)
         & (min_time <= df["publish_timestamp"])
         & (df["publish_timestamp"] <= max_time)
@@ -176,13 +105,13 @@ def cal_avg_account_read_rate(df, gh_id, index, dt) -> dict:
         ]
 
     # 用二倍标准差过滤
-    finalDF = filter_outlier_data(filterDataFrame)
+    final_dataframe = filter_outlier_data(filter_dataframe)
 
     return {
-        "read_rate_avg": finalDF['readRate'].mean(),
-        "max_publish_time": finalDF['publish_timestamp'].max(),
-        "min_publish_time": finalDF['publish_timestamp'].min(),
-        "records": len(finalDF)
+        "read_rate_avg": final_dataframe['readRate'].mean(),
+        "max_publish_time": final_dataframe['publish_timestamp'].max(),
+        "min_publish_time": final_dataframe['publish_timestamp'].min(),
+        "records": len(final_dataframe)
     }
 
 
@@ -204,7 +133,7 @@ def check_each_position(db_client, gh_id, index, dt, avg_rate) -> dict:
         WHERE gh_id = '{gh_id}' and position = {index} and dt_version < {dt}
         ORDER BY dt_version DESC limit 1;
     """
-    result = db_client.select(select_sql)
+    result = db_client.fetch(select_sql)
     if result:
         account_name = result[0][0]
         previous_read_rate_avg = result[0][1]
@@ -246,6 +175,9 @@ def update_single_day(dt, account_list, article_df, lam):
         string_format='%Y-%m-%d'
     )
 
+    # processed_account_set
+    processed_account_set = set()
+
     for account in tqdm(account_list, desc=dt):
         for index in const.ARTICLE_INDEX_LIST:
             read_rate_detail = cal_avg_account_read_rate(
@@ -259,7 +191,9 @@ def update_single_day(dt, account_list, article_df, lam):
             min_publish_time = read_rate_detail['min_publish_time']
             articles_count = read_rate_detail['records']
             if articles_count:
-                if index in {1, 2}:
+                processed_account_set.add(account['gh_id'])
+                # check read rate in position 1 and 2
+                if index in [1, 2]:
                     error_obj = check_each_position(
                         db_client=lam,
                         gh_id=account['gh_id'],
@@ -269,6 +203,7 @@ def update_single_day(dt, account_list, article_df, lam):
                     )
                     if error_obj:
                         error_list.append(error_obj)
+                # insert into database
                 try:
                     if not read_rate_avg:
                         continue
@@ -278,8 +213,8 @@ def update_single_day(dt, account_list, article_df, lam):
                         values
                         (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
                     """
-                    lam.update(
-                        sql=insert_sql,
+                    lam.save(
+                        query=insert_sql,
                         params=(
                             account['account_name'],
                             account['gh_id'],
@@ -294,14 +229,17 @@ def update_single_day(dt, account_list, article_df, lam):
                         )
                     )
                 except Exception as e:
+                    print(e)
                     insert_error_list.append(str(e))
 
+    # bot sql error
     if insert_error_list:
         bot(
             title="更新阅读率均值,存在sql 插入失败",
             detail=insert_error_list
         )
 
+    # bot outliers
     if error_list:
         columns = [
             create_feishu_columns_sheet(sheet_type="plain_text", sheet_name="account_name", display_name="账号名称"),
@@ -314,7 +252,7 @@ def update_single_day(dt, account_list, article_df, lam):
                                         display_name="相对变化率")
         ]
         bot(
-            title="更新阅读率均值,头次出现异常值通知",
+            title="阅读率均值表异常信息, 总共处理{}个账号".format(len(processed_account_set)),
             detail={
                 "columns": columns,
                 "rows": error_list
@@ -323,12 +261,14 @@ def update_single_day(dt, account_list, article_df, lam):
             mention=False
         )
 
+    # if no error, send success info
     if not error_list and not insert_error_list:
         bot(
-            title="阅读率均值表更新成功",
+            title="阅读率均值表更新成功, 总共处理{}个账号".format(len(processed_account_set)),
             detail={
                 "日期": dt
-            }
+            },
+            mention=False
         )
 
 
@@ -347,12 +287,36 @@ def main() -> None:
     else:
         dt = datetime.today().strftime('%Y-%m-%d')
 
-    lam = longArticlesMySQL()
-    de = DeNetMysql()
-    account_list = get_publishing_accounts(db_client=de)
-    df = cal_account_read_rate(tuple([i['gh_id'] for i in account_list]))
+    # init stat period
+    max_time = functions.str_to_timestamp(date_string=dt)
+    min_time = max_time - const.STATISTICS_PERIOD
+    min_stat_date = functions.timestamp_to_str(timestamp=min_time, string_format='%Y-%m-%d')
+
+    # init database connector
+    long_articles_db_client = DatabaseConnector(db_config=long_articles_config)
+    long_articles_db_client.connect()
+
+    piaoquan_crawler_db_client = DatabaseConnector(db_config=piaoquan_crawler_config)
+    piaoquan_crawler_db_client.connect()
+
+    denet_db_client = DatabaseConnector(db_config=denet_config)
+    denet_db_client.connect()
+
+    # get account list
+    account_list = fetch_publishing_account_list(db_client=denet_db_client)
+
+    # get fans dict
+    fans_dict = fetch_account_fans(db_client=denet_db_client, start_date=min_stat_date)
+
+    # get data frame from official_articles_v2
+    gh_id_tuple = tuple([i['gh_id'] for i in account_list])
+    article_list = get_account_articles_detail(db_client=piaoquan_crawler_db_client, gh_id_tuple=gh_id_tuple, min_publish_timestamp=min_time)
+
+    # cal account read rate and make a dataframe
+    read_rate_dataframe = cal_account_read_rate(article_list, fans_dict)
 
-    update_single_day(dt, account_list, df, lam)
+    # update each day's data
+    update_single_day(dt, account_list, read_rate_dataframe, long_articles_db_client)
 
 
 if __name__ == '__main__':

+ 117 - 0
coldStartTasks/publish/publish_single_video_pool_videos.py

@@ -0,0 +1,117 @@
+import json
+import datetime
+import traceback
+
+from pymysql.cursors import DictCursor
+from tqdm import tqdm
+
+from applications import bot, aiditApi
+from applications.const import SingleVideoPoolPublishTaskConst
+from applications.db import DatabaseConnector
+from config import long_articles_config, apolloConfig
+
+config = apolloConfig()
+const = SingleVideoPoolPublishTaskConst()
+
+video_pool_config = json.loads(config.getConfigValue(key="video_pool_config"))
+
+
+class PublishSingleVideoPoolVideos:
+    def __init__(self):
+        self.db_client = DatabaseConnector(db_config=long_articles_config)
+        self.db_client.connect()
+
+    def get_task_list(self, platform:str) -> list[dict]:
+        daily_limit = video_pool_config[platform]['process_num_each_day']
+        fetch_query = f"""
+            select id, content_trace_id, pq_vid
+            from single_video_transform_queue
+            where status = {const.TRANSFORM_INIT_STATUS} and platform = '{platform}'
+            order by score desc
+            limit {daily_limit};
+        """
+        fetch_response = self.db_client.fetch(query=fetch_query, cursor_type=DictCursor)
+        return fetch_response
+
+    def update_tasks_status(self,
+                           task_id_tuple: tuple,
+                           ori_status: int,
+                           new_status: int)-> int:
+        update_query = f"""
+            update single_video_transform_queue
+            set status = %s
+            where id in %s and status = %s;
+        """
+        affected_rows = self.db_client.save(
+            query=update_query,
+            params=(new_status, task_id_tuple, ori_status)
+        )
+        return affected_rows
+
+    def deal(self):
+        """
+        entrance of this class
+        """
+        platform_list = ["sph", "gzh", "toutiao", "hksp"]
+        for platform in tqdm(platform_list, desc='process each platform'):
+            task_list = self.get_task_list(platform)
+            task_id_tuple = tuple([task['id'] for task in task_list])
+            vid_list = [task['pq_vid'] for task in task_list]
+            if vid_list:
+                try:
+                    # create video crawler plan
+                    plan_name = f"{video_pool_config[platform]['nick_name']}-{datetime.datetime.today().strftime('%Y-%m-%d')}-视频数量: {len(vid_list)}"
+                    crawler_plan_response = aiditApi.auto_create_single_video_crawler_task(
+                        plan_name=plan_name,
+                        plan_tag="单视频供给冷启动",
+                        video_id_list=vid_list,
+                    )
+                    crawler_plan_id = crawler_plan_response["data"]["id"]
+                    crawler_plan_name = crawler_plan_response["data"]["name"]
+
+                    # bind crawler plan to generate plan
+                    crawler_task_list = [
+                        {
+                            "contentType": 1,
+                            "inputSourceModal": 4,
+                            "inputSourceChannel": 10,
+                            "inputSourceType": 2,
+                            "inputSourceValue": crawler_plan_id,
+                            "inputSourceSubType": None,
+                            "fieldName": None,
+                            "inputSourceLabel": "原始帖子-视频-票圈小程序-内容添加计划-{}".format(crawler_plan_name),
+                        }
+                    ]
+                    generate_plan_id = video_pool_config[platform]['generate_plan_id']
+                    aiditApi.bind_crawler_task_to_generate_task(
+                        crawler_task_list=crawler_task_list,
+                        generate_task_id=generate_plan_id,
+                    )
+
+                    # update status
+                    self.update_tasks_status(
+                        task_id_tuple=task_id_tuple,
+                        ori_status=const.TRANSFORM_INIT_STATUS,
+                        new_status=const.TRANSFORM_SUCCESS_STATUS
+                    )
+                except Exception as e:
+                    bot(
+                        title='视频内容池发布任务',
+                        detail={
+                            'platform': platform,
+                            'date': datetime.datetime.today().strftime('%Y-%m-%d'),
+                            'msg': '发布视频内容池失败,原因:{}'.format(str(e)),
+                            'detail': traceback.format_exc(),
+                        },
+                        mention=False
+                    )
+            else:
+                bot(
+                    title='视频内容池发布任务',
+                    detail={
+                        'platform': platform,
+                        'date': datetime.datetime.today().strftime('%Y-%m-%d'),
+                        'msg': '该平台无待发布视频,请关注供给的抓取'
+                    },
+                    mention=False
+                )

+ 45 - 6
coldStartTasks/publish/publish_video_to_pq_for_audit.py

@@ -11,13 +11,15 @@ from pymysql.cursors import DictCursor
 
 from applications import log
 from applications import PQAPI
-from applications.const import WeixinVideoCrawlerConst
+from applications.api import AigcSystemApi
 from applications.api import fetch_moon_shot_response
+from applications.const import WeixinVideoCrawlerConst
 from applications.db import DatabaseConnector
 from config import long_articles_config
 
 const = WeixinVideoCrawlerConst()
 pq_functions = PQAPI()
+aigc = AigcSystemApi()
 
 
 class PublishVideosForAudit(object):
@@ -36,12 +38,14 @@ class PublishVideosForAudit(object):
         """
         already_published_count = self.get_published_articles_today()
         rest_count = const.MAX_VIDEO_NUM - already_published_count
+
+        limit_count = min(rest_count, const.MAX_VIDEO_NUM_PER_PUBLISH)
         sql = f"""
             SELECT id, article_title, video_oss_path 
             FROM publish_single_video_source 
             WHERE audit_status = {const.VIDEO_AUDIT_INIT_STATUS} and bad_status = {const.TITLE_DEFAULT_STATUS}
             ORDER BY score DESC
-            LIMIT {rest_count};
+            LIMIT {limit_count};
             """
         response = self.db_client.fetch(sql, cursor_type=DictCursor)
         return response
@@ -140,7 +144,11 @@ class PublishVideosForAudit(object):
         获取需要检查的视频列表
         :return:
         """
-        sql = f"""SELECT audit_video_id FROM publish_single_video_source WHERE audit_status = {const.VIDEO_AUDIT_PROCESSING_STATUS};"""
+        sql = f"""
+            select content_trace_id, audit_video_id, score, platform 
+            from publish_single_video_source
+            where audit_status = {const.VIDEO_AUDIT_PROCESSING_STATUS};
+        """
         response = self.db_client.fetch(sql, cursor_type=DictCursor)
         return response
 
@@ -195,12 +203,30 @@ class PublishVideosForAudit(object):
             )
             return False
 
-    def check_video_status(self, video_id: int) -> Dict:
+    def insert_into_task_queue(self, video) -> int:
+        """
+        enqueue
+        """
+        insert_query = f"""
+            insert into single_video_transform_queue
+            (content_trace_id, pq_vid, score, platform)
+            values (%s, %s, %s, %s);
+        """
+        affected_rows = self.db_client.save(
+            query=insert_query,
+            params=(
+                video['content_trace_id'], video['audit_video_id'], video['score'], video['platform']
+            )
+        )
+        return affected_rows
+
+    def check_video_status(self, video_obj: dict) -> Dict:
         """
         检查视频的状态,若视频审核通过or不通过,修改记录状态
-        :param video_id:
+        :param video_obj:
         :return:
         """
+        video_id = video_obj['audit_video_id']
         response = pq_functions.getPQVideoListDetail([video_id])
         audit_status = response.get("data")[0].get("auditStatus")
         # 请求成功
@@ -214,6 +240,19 @@ class PublishVideosForAudit(object):
                     ori_audit_status=const.VIDEO_AUDIT_PROCESSING_STATUS,
                     new_audit_status=const.VIDEO_AUDIT_SUCCESS_STATUS
                 )
+                # 将视频存储到任务队列
+                self.insert_into_task_queue(video_obj)
+
+                # 将视频存储到 aigc 表
+                aigc.insert_crawler_relation_to_aigc_system(
+                    relation_list=[
+                        {
+                            "videoPoolTraceId": video_obj['content_trace_id'],
+                            "channelContentId": str(video_id),
+                            "platform": video_obj['platform'],
+                        }
+                    ]
+                )
             else:
                 # 修改小程序标题失败,修改审核状态为4
                 affected_rows = self.update_audit_status(
@@ -293,7 +332,7 @@ class PublishVideosForAudit(object):
         for video_obj in tqdm(video_list, desc="视频检查"):
             video_id = video_obj.get("audit_video_id")
             try:
-                response = self.check_video_status(video_id)
+                response = self.check_video_status(video_obj)
                 if response.get("affected_rows"):
                     continue
                 else:

+ 26 - 0
sh/run_gzh_video_crawler.sh

@@ -0,0 +1,26 @@
+#!/bin/bash
+
+# 获取当前日期,格式为 YYYY-MM-DD
+CURRENT_DATE=$(date +%F)
+
+# 日志文件路径,包含日期
+LOG_FILE="/root/luojunhui/logs/gzh_video_crawler_log_$CURRENT_DATE.txt"
+
+# 重定向整个脚本的输出到带日期的日志文件
+exec >> "$LOG_FILE" 2>&1
+if pgrep -f "python3 run_video_account_crawler.py" > /dev/null
+then
+    echo "$(date '+%Y-%m-%d %H:%M:%S') - run_video_account_crawler.py is running"
+else
+    echo "$(date '+%Y-%m-%d %H:%M:%S') - trying to restart run_video_account_crawler.py"
+    # 切换到指定目录
+    cd /root/luojunhui/LongArticlesJob
+
+    # 激活 Conda 环境
+    source /root/miniconda3/etc/profile.d/conda.sh
+    conda activate tasks
+
+    # 在后台运行 Python 脚本并重定向日志输出
+    nohup python3 run_video_account_crawler.py >> "${LOG_FILE}" 2>&1 &
+    echo "$(date '+%Y-%m-%d %H:%M:%S') - successfully restarted run_video_account_crawler.py"
+fi

+ 0 - 2
sh/run_video_publish_and_audit.sh

@@ -21,8 +21,6 @@ else
     conda activate tasks
 
     # 在后台运行 Python 脚本并重定向日志输出
-    nohup python3 run_video_account_crawler.py >> "${LOG_FILE}" 2>&1 &
-    sleep 180
     nohup python3 run_video_publish_and_audit.py >> "${LOG_FILE}" 2>&1 &
     echo "$(date '+%Y-%m-%d %H:%M:%S') - successfully restarted run_video_publish_and_audit.py"
 fi

+ 58 - 26
tasks/update_published_articles_minigram_detail.py

@@ -1,6 +1,7 @@
 """
 @author: luojunhui
 """
+import json
 import traceback
 
 from datetime import datetime, timedelta
@@ -49,24 +50,6 @@ def extract_path(path: str) -> Dict:
         return EMPTY_DICT
 
 
-def get_article_mini_program_info(content_url: str) -> List[Dict]:
-    """
-    获取文章的小程序信息
-    :return:
-    """
-    try:
-        article_detail = spider.get_article_text(content_url)
-    except Exception as e:
-        raise SpiderError(error=e, spider="detail", url=content_url)
-
-    response_code = article_detail['code']
-    if response_code == const.ARTICLE_SUCCESS_CODE:
-        mini_info = article_detail['data']['data']['mini_program']
-        return mini_info
-    else:
-        return EMPTY_LIST
-
-
 class UpdatePublishedArticlesMinigramDetail(object):
     """
     更新已发布文章数据
@@ -136,7 +119,7 @@ class UpdatePublishedArticlesMinigramDetail(object):
         :return:
         """
         sql = f"""
-             SELECT ContentUrl, wx_sn, publish_timestamp, accountName, title
+             SELECT ContentUrl, wx_sn, publish_timestamp, accountName, title, root_source_id_list
              FROM official_articles_v2
              WHERE FROM_UNIXTIME(publish_timestamp)
              BETWEEN DATE_SUB('{biz_date}', INTERVAL 1 DAY) AND DATE_SUB('{biz_date}', INTERVAL 1 SECOND);
@@ -181,8 +164,9 @@ class UpdatePublishedArticlesMinigramDetail(object):
         url = article_info['ContentUrl']
         publish_timestamp = article_info['publish_timestamp']
         wx_sn = article_info['wx_sn'].decode()
+        root_source_id_list = json.loads(article_info['root_source_id_list'] if article_info['root_source_id_list'] else EMPTY_LIST)
 
-        article_mini_program_detail = get_article_mini_program_info(url)
+        article_mini_program_detail = self.get_article_mini_program_info(url, root_source_id_list)
         if article_mini_program_detail:
             log(
                 task=TASK_NAME,
@@ -208,9 +192,13 @@ class UpdatePublishedArticlesMinigramDetail(object):
                         image_url = mini_item['image_url']
                         nick_name = mini_item['nike_name']
                         # extract video id and root_source_id
-                        id_info = extract_path(mini_item['path'])
-                        root_source_id = id_info['root_source_id']
-                        video_id = id_info['video_id']
+                        if mini_item.get("root_source_id") and mini_item.get("video_id"):
+                            root_source_id = mini_item['root_source_id']
+                            video_id = mini_item['video_id']
+                        else:
+                            id_info = extract_path(mini_item['path'])
+                            root_source_id = id_info['root_source_id']
+                            video_id = id_info['video_id']
                         kimi_title = mini_item['title']
                         self.insert_each_root_source_id(
                             wx_sn=wx_sn,
@@ -235,7 +223,53 @@ class UpdatePublishedArticlesMinigramDetail(object):
                 return article_info
 
         else:
-            return article_info
+            return EMPTY_DICT
+
+    def get_article_mini_program_info(self, content_url: str, root_source_id_list: list) -> List[Dict]:
+        """
+        获取文章的小程序信息
+        :return:
+        """
+        if root_source_id_list:
+            # 说明已经获取到 root_source_id了
+            fetch_sql = f"""
+                select video_id, root_source_id from long_articles_root_source_id where root_source_id in %s;
+            """
+            fetch_response = self.long_articles_db_client.fetch(
+                query=fetch_sql,
+                params=(tuple(root_source_id_list),),
+                cursor_type=DictCursor
+            )
+            mini_info = []
+            if fetch_response:
+                # 构造 mini_info 的格式
+                for item in fetch_response:
+                    mini_info.append(
+                        {
+                            "app_id": "wx89e7eb06478361d7",
+                            "avatar": "https://rescdn.yishihui.com/0temp/logo.png",
+                            "image_url": "",
+                            "nike_name": "票圈 l 3亿人喜欢的视频平台",
+                            "root_source_id": item['root_source_id'],
+                            "video_id": item['video_id'],
+                            "service_type": "0",
+                            "title": "",
+                            "type": "card"
+                        }
+                    )
+                return mini_info
+
+        try:
+            article_detail = spider.get_article_text(content_url)
+        except Exception as e:
+            raise SpiderError(error=e, spider="detail", url=content_url)
+
+        response_code = article_detail['code']
+        if response_code == const.ARTICLE_SUCCESS_CODE:
+            mini_info = article_detail['data']['data']['mini_program']
+            return mini_info
+        else:
+            return EMPTY_LIST
 
     def get_root_source_id_for_three_days(self, biz_date: str) -> List[Dict]:
         """
@@ -375,5 +409,3 @@ class UpdatePublishedArticlesMinigramDetail(object):
             )
 
 
-
-

+ 83 - 131
updateAccountV3.py

@@ -7,151 +7,104 @@ import time
 from tqdm import tqdm
 from datetime import datetime, timedelta
 from argparse import ArgumentParser
+from pymysql.cursors import DictCursor
 
-from applications import PQMySQL, DeNetMysql, longArticlesMySQL
-from applications.const import updateAccountReadAvgTaskConst
+from applications.const import UpdateAccountReadAvgTaskConst
+from applications.db import DatabaseConnector
+from applications.utils import fetch_account_fans
+from applications.utils import fetch_publishing_account_list
 from config import apolloConfig
+from config import long_articles_config, denet_config, piaoquan_crawler_config
 
+read_rate_table = "long_articles_read_rate"
+read_avg_table = "account_avg_info_v3"
 config = apolloConfig()
+const = UpdateAccountReadAvgTaskConst()
 unauthorized_account = json.loads(config.getConfigValue("unauthorized_gh_id_fans"))
 touliu_accounts = set(json.loads(config.getConfigValue("touliu_gh_id_list")))
-
-
-def get_account_fans_by_dt(db_client) -> dict:
-    """
-    获取每个账号发粉丝,通过日期来区分
-    :return:
-    """
-    sql = f"""
-        SELECT 
-            t1.date_str, 
-            t1.fans_count, 
-            t2.gh_id
-        FROM datastat_wx t1
-        JOIN publish_account t2 ON t1.account_id = t2.id
-        WHERE 
-            t2.channel = 5 
-        AND t2.status = 1 
-        AND t1.date_str >= '2024-09-01' 
-        ORDER BY t1.date_str;
-    """
-    result = db_client.select(sql)
-    D = {}
-    for line in result:
-        dt = line[0]
-        fans = line[1]
-        gh_id = line[2]
-        if D.get(gh_id):
-            D[gh_id][dt] = fans
-        else:
-            D[gh_id] = {dt: fans}
-    return D
-
+backup_account_fans = json.loads(config.getConfigValue("backup_account_fans"))
 
 class UpdateAccountInfoVersion3(object):
     """
-    更新账号信息 v3
+    更新账号的平均阅读率
     """
 
     def __init__(self):
-        self.const = updateAccountReadAvgTaskConst()
-        self.pq = PQMySQL()
-        self.de = DeNetMysql()
-        self.lam = longArticlesMySQL()
+        # init piaoquan crawler db client
+        self.piaoquan_crawler_db_client = DatabaseConnector(piaoquan_crawler_config)
+        self.piaoquan_crawler_db_client.connect()
+
+        # init long articles db client
+        self.long_articles_db_client = DatabaseConnector(long_articles_config)
+        self.long_articles_db_client.connect()
+
+        #  init aigc db client
+        self.denet_db_client = DatabaseConnector(denet_config)
+        self.denet_db_client.connect()
 
-    def get_account_position_read_rate(self, dt):
+    def fetch_read_rate_avg_for_each_account(self, dt):
         """
         从长文数据库获取账号阅读均值
         :return:
         """
         dt = int(dt.replace("-", ""))
         sql = f"""
-            SELECT 
-                gh_id, position, read_rate_avg
-            FROM
-                long_articles_read_rate
-            WHERE dt_version = {dt};
+            select gh_id, position, read_rate_avg
+            from {read_rate_table}
+            where dt_version = {dt};
         """
-
-        result = self.lam.select(sql)
+        fetch_response_list = self.long_articles_db_client.fetch(query=sql, cursor_type=DictCursor)
         account_read_rate_dict = {}
-        for item in result:
-            gh_id = item[0]
-            position = item[1]
-            rate = item[2]
-            key = "{}_{}".format(gh_id, position)
-            account_read_rate_dict[key] = rate
+        for item in fetch_response_list:
+            key = "{}_{}".format(item['gh_id'], item['position'])
+            account_read_rate_dict[key] = item['read_rate_avg']
         return account_read_rate_dict
 
-    def get_publishing_accounts(self):
-        """
-        获取每日正在发布的账号
-        :return:
-        """
-        sql = f"""
-        SELECT DISTINCT
-            t3.`name`,
-            t3.gh_id,
-            t3.follower_count,
-            t6.account_source_name,
-            t6.mode_type,
-            t6.account_type,
-            t6.`status`
-        FROM
-            publish_plan t1
-            JOIN publish_plan_account t2 ON t1.id = t2.plan_id
-            JOIN publish_account t3 ON t2.account_id = t3.id
-            LEFT JOIN publish_account_wx_type t4 on t3.id = t4.account_id
-            LEFT JOIN wx_statistics_group_source_account t5 on t3.id = t5.account_id
-            LEFT JOIN wx_statistics_group_source t6 on t5.group_source_name = t6.account_source_name
-        WHERE
-            t1.plan_status = 1
-            AND t3.channel = 5
-            GROUP BY t3.id;
-        """
-        account_list = self.de.select(sql)
-        result_list = [
-            {
-                "account_name": i[0],
-                "gh_id": i[1],
-                "fans": i[2],
-                "account_source_name": i[3],
-                "mode_type": i[4],
-                "account_type": i[5],
-                "status": i[6]
-            } for i in account_list
-        ]
-        return result_list
-
     def do_task_list(self, dt):
         """
         do it
         """
-        fans_dict = get_account_fans_by_dt(db_client=self.de)
-        account_list = self.get_publishing_accounts()
-        rate_dict = self.get_account_position_read_rate(dt)
+        # get fans dict from aigc
+        fans_dict = fetch_account_fans(self.denet_db_client, dt)
+
+        # get publishing account list from aigc
+        account_list = fetch_publishing_account_list(self.denet_db_client)
+
+        # fetch each account's read avg for each position
+        read_rate_avg_dict = self.fetch_read_rate_avg_for_each_account(dt)
+
         for account in tqdm(account_list, desc=dt):
             gh_id = account["gh_id"]
-            business_type = self.const.TOULIU if gh_id in touliu_accounts else self.const.ARTICLES_DAILY
-            fans = fans_dict.get(gh_id, {}).get(dt, 0)
+            business_type = const.TOULIU if gh_id in touliu_accounts else const.ARTICLES_DAILY
+            fans = fans_dict.get(gh_id, {}).get(dt, const.DEFAULT_FANS)
+
+            # use unauthorized account's fans if not found in aigc
+            if not fans:
+                fans = int(unauthorized_account.get(gh_id, const.DEFAULT_FANS))
+
+            # use backup account's fans if not found in aigc
             if not fans:
-                fans = int(unauthorized_account.get(gh_id, 0))
+                fans = int(backup_account_fans.get(gh_id, const.DEFAULT_FANS))
+
             if fans:
-                for index in range(1, 9):
+                for index in const.ARTICLE_INDEX_LIST:
                     gh_id_position = "{}_{}".format(gh_id, index)
-                    if rate_dict.get(gh_id_position):
-                        rate = rate_dict[gh_id_position]
-                        read_avg = fans * rate
-                        print(rate, read_avg)
+                    if read_rate_avg_dict.get(gh_id_position):
+                        # fetch read rate avg
+                        read_rate_avg = read_rate_avg_dict[gh_id_position]
+                        # cal read avg
+                        read_avg = fans * read_rate_avg
+
+                        # insert into database
                         insert_sql = f"""
-                        INSERT INTO account_avg_info_v3
-                        (gh_id, position, update_time, account_name, fans, read_avg, like_avg, status, account_type, account_mode, account_source, account_status, business_type, read_rate_avg)
-                        values
-                        (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
+                            insert into {read_avg_table}
+                            (gh_id, position, update_time, account_name, fans, read_avg, like_avg, status, account_type, account_mode, account_source, account_status, business_type, read_rate_avg)
+                            values
+                            (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
                         """
                         try:
-                            self.pq.update(
-                                sql=insert_sql,
+                            self.piaoquan_crawler_db_client.save(
+                                query=insert_sql,
                                 params=(
                                     gh_id,
                                     index,
@@ -159,29 +112,29 @@ class UpdateAccountInfoVersion3(object):
                                     account['account_name'],
                                     fans,
                                     read_avg,
-                                    0,
-                                    1,
+                                    const.DEFAULT_LIKE,
+                                    const.USING_STATUS,
                                     account['account_type'],
                                     account['mode_type'],
-                                    account['account_source_name'],
+                                    account['account_source'],
                                     account['status'],
                                     business_type,
-                                    rate
+                                    read_rate_avg
                                 )
                             )
                         except Exception as e:
-                            updateSQL = f"""
-                            UPDATE account_avg_info_v3
-                            set fans = %s, read_avg = %s, read_rate_avg = %s
-                            where gh_id = %s and position = %s and update_time = %s
+                            update_sql = f"""
+                                update {read_avg_table}
+                                set fans = %s, read_avg = %s, read_rate_avg = %s
+                                where gh_id = %s and position = %s and update_time = %s
                             """
                             try:
-                                affected_rows = self.pq.update(
-                                    sql=updateSQL,
+                                self.piaoquan_crawler_db_client.save(
+                                    query=update_sql,
                                     params=(
                                         fans,
                                         read_avg,
-                                        rate,
+                                        read_rate_avg,
                                         account['gh_id'],
                                         index,
                                         dt
@@ -192,17 +145,16 @@ class UpdateAccountInfoVersion3(object):
 
                         # 修改前一天的状态为 0
                         update_status_sql = f"""
-                        UPDATE account_avg_info_v3
-                        SET status = %s
-                        where update_time != %s and gh_id = %s and position = %s;
+                            update {read_avg_table}
+                            set status = %s
+                            where update_time != %s and gh_id = %s and position = %s;
                         """
-                        rows_affected = self.pq.update(
-                            sql=update_status_sql,
+                        self.piaoquan_crawler_db_client.save(
+                            query=update_status_sql,
                             params=(
-                                0, dt, account['gh_id'], index
+                                const.NOT_USING_STATUS, dt, account['gh_id'], index
                             )
                         )
-                        print("修改成功")
 
 
 def main():
@@ -215,15 +167,15 @@ def main():
                         help="Run only once for date in format of %Y-%m-%d. \
                                 If no specified, run as daily jobs.")
     args = parser.parse_args()
-    Up = UpdateAccountInfoVersion3()
+    update_account_read_avg_task = UpdateAccountInfoVersion3()
     if args.run_date:
-        Up.do_task_list(dt=args.run_date)
+        update_account_read_avg_task.do_task_list(dt=args.run_date)
     else:
         dt_object = datetime.fromtimestamp(int(time.time()))
         one_day = timedelta(days=1)
         yesterday = dt_object - one_day
         yesterday_str = yesterday.strftime('%Y-%m-%d')
-        Up.do_task_list(dt=yesterday_str)
+        update_account_read_avg_task.do_task_list(dt=yesterday_str)
 
 
 if __name__ == '__main__':