Browse Source

article cold start task bug fix

luojunhui 5 months ago
parent
commit
4a38e4639c

+ 0 - 44
account_association_task.py

@@ -1,44 +0,0 @@
-"""
-@author: luojunhui
-账号联想---账号搜索任务
-"""
-import traceback
-
-from argparse import ArgumentParser
-from datetime import datetime
-
-from applications import bot
-from cold_start.crawler.weixin_account_association_crawler import AccountAssociationCrawler
-
-
-def main():
-    """
-    账号联想---账号搜索任务
-    :return:
-    """
-    parser = ArgumentParser()
-    parser.add_argument("--run-date",
-                        help="Run only once for date in format of %Y-%m-%d. \
-                                If no specified, run as daily jobs.")
-    args = parser.parse_args()
-
-    if args.run_date:
-        biz_date_str = args.run_date
-    else:
-        biz_date_str = datetime.today().strftime('%Y-%m-%d')
-    try:
-        biz_date = datetime.strptime(biz_date_str, "%Y-%m-%d")
-        account_association_crawler = AccountAssociationCrawler()
-        account_association_crawler.run_account_association(biz_date=biz_date)
-    except Exception as e:
-        bot(
-            title="账号联想任务失败",
-            detail={
-                "error": str(e),
-                "error_stack": traceback.format_exc()
-            }
-        )
-
-
-if __name__ == '__main__':
-    main()

+ 0 - 147
account_cold_start_daily.py

@@ -1,147 +0,0 @@
-"""
-@author: luojunhui
-"""
-import datetime
-import traceback
-
-from argparse import ArgumentParser
-
-from applications import longArticlesMySQL, bot
-from tasks.crawler_tasks.crawler_articles import CrawlerDailyScrapeAccountArticles
-from tasks.crawler_tasks.crawler_articles import CrawlerAssociationAccountArticles
-from cold_start.publish.publishCategoryArticles import CategoryColdStartTask
-from cold_start.filter.title_similarity_task import ColdStartTitleSimilarityTask
-
-DEFAULT_METHOD_LIST = ['1030-手动挑号', 'account_association']
-
-
-def crawler_task(method_list, date_str):
-    """
-    :return:
-    """
-    # 初始化category抓取类
-    try:
-        daily_scrape_tasks = CrawlerDailyScrapeAccountArticles()
-        daily_scrape_tasks.deal(method_list=method_list)
-
-        association_scrape_tasks = CrawlerAssociationAccountArticles()
-        association_scrape_tasks.deal(date_str=date_str)
-
-        # 抓取完成之后,给抓取到的标题进行相似度打分
-        cold_start_title_similarity_task = ColdStartTitleSimilarityTask()
-        cold_start_title_similarity_task.init_database()
-        cold_start_title_similarity_task.run(meta_source='article')
-
-        bot(
-            title="账号冷启动任务,抓取完成",
-            detail={
-                "finish_time": datetime.datetime.today().strftime('%Y-%m-%d %H:%M:%S'),
-                "method": method_list
-            },
-            mention=False
-        )
-    except Exception as e:
-        bot(
-            title="账号抓取冷启动任务,抓取失败",
-            detail={
-                "error": str(e),
-                "error_msg": traceback.format_exc()
-            }
-        )
-
-
-class AccountColdStartDailyTask(object):
-    """
-    账号冷启动代码
-    """
-
-    def __init__(self):
-        """
-        """
-        self.db_client = None
-
-    def init_db(self):
-        """
-        初始化数据库
-        :return:
-        """
-        try:
-            self.db_client = longArticlesMySQL()
-            return True
-        except Exception as e:
-            bot(
-                title='账号抓取任务, 冷启动数据库连接失败',
-                detail={
-                    "error": str(e),
-                    "error_msg": traceback.format_exc()
-                }
-            )
-            return False
-
-    def publish_article_task(self, category_list, article_source):
-        """
-        将账号文章发布到aigc抓取计划,并且绑定生成计划
-        :param category_list:  文章品类
-        :param article_source: 文章来源(toutiao or weixin)
-        :return:
-        """
-        try:
-            weixin_category_publisher = CategoryColdStartTask(db_client=self.db_client)
-            weixin_category_publisher.do_job(
-                category_list=category_list,
-                article_source=article_source
-            )
-            bot(
-                title="账号冷启任务,发布完成",
-                detail={
-                    "finish_time": datetime.datetime.today().strftime('%Y-%m-%d %H:%M:%S'),
-                    "category": category_list
-                },
-                mention=False
-            )
-        except Exception as e:
-            bot(
-                title="账号发布冷启动任务,发布失败",
-                detail={
-                    "error": str(e),
-                    "error_msg": traceback.format_exc()
-                }
-            )
-
-
-def main(method_list=None, article_source=None):
-    """
-    main job, use crontab to do job daily
-    :return:
-    """
-    if not method_list:
-        method_list = DEFAULT_METHOD_LIST
-    if not article_source:
-        article_source = 'weixin'
-    task = AccountColdStartDailyTask()
-    if task.init_db():
-        task.publish_article_task(category_list=method_list, article_source=article_source)
-
-
-if __name__ == '__main__':
-    parser = ArgumentParser()
-    parser.add_argument("--run_date", help="--run_date format: %Y-%m-%d")
-    args = parser.parse_args()
-
-    if args.run_date:
-        run_date = args.run_date
-    else:
-        run_date = datetime.date.today().isoformat()
-
-    # 执行头条发布
-    main(
-        method_list=['history', 'tech', 'finance', 'entertainment'],
-        article_source='toutiao'
-    )
-    # 执行微信抓取发布
-    main()
-
-    # 执行抓取
-    crawler_task(
-        method_list=DEFAULT_METHOD_LIST, date_str=run_date)
-

+ 0 - 40
account_explore_task.py

@@ -1,40 +0,0 @@
-"""
-@author: luojunhui
-@description: try to get some more accounts
-"""
-
-from tasks.crawler_tasks.crawler_account.crawler_accounts_by_association import ChannelsAccountCrawler
-from tasks.crawler_tasks.crawler_account.crawler_accounts_by_association import ToutiaoAccountCrawler
-from tasks.crawler_tasks.crawler_account.crawler_accounts_by_association import HaoKanAccountCrawler
-from tasks.crawler_tasks.crawler_account.crawler_accounts_by_association import GzhAccountCrawler
-from tasks.ai_tasks.generate_search_keys import get_association_title_list_in_multi_threads
-
-
-def deal_each_platform(platform: str) -> None:
-    """
-    deal each platform
-    :param platform: str, channels or toutiao
-    """
-    match platform:
-        case "toutiao":
-            crawler = ToutiaoAccountCrawler()
-        case "sph":
-            crawler = ChannelsAccountCrawler()
-        case "hksp":
-            crawler = HaoKanAccountCrawler()
-        case "gzh":
-            crawler = GzhAccountCrawler()
-        case _:
-            raise RuntimeError("platform error")
-
-    # start process
-    crawler.deal()
-
-
-if __name__ == "__main__":
-    get_association_title_list_in_multi_threads()
-
-    # get each platform
-    platform_list = ["sph", "hksp", "toutiao", "gzh"]
-    for platform_id in platform_list:
-        deal_each_platform(platform=platform_id)

+ 0 - 13
account_quality_analysis.py

@@ -1,13 +0,0 @@
-from tasks.ai_tasks.account_recognize_by_llm import AccountRecognizer
-
-
-def main():
-    """
-    main function
-    """
-    account_recognizer = AccountRecognizer()
-    account_recognizer.deal()
-
-
-if __name__ == "__main__":
-    main()

+ 1 - 0
applications/api/__init__.py

@@ -9,4 +9,5 @@ from .nlp_api import similarity_between_title_list
 from .gewe_api import WechatChannelAPI
 from .google_ai_api import GoogleAIAPI
 from .piaoquan_api import fetch_piaoquan_video_list_detail
+from .piaoquan_api import fetch_piaoquan_account_video_list
 from .feishu_api import FeishuBotApi, FeishuSheetApi

+ 34 - 0
applications/api/piaoquan_api.py

@@ -30,3 +30,37 @@ def fetch_piaoquan_video_list_detail(video_id_list: List[int]) -> Optional[Dict]
     except json.JSONDecodeError as e:
         print(f"响应解析失败: {e}")
     return None
+
+
+@retry(**retry_desc)
+def fetch_piaoquan_account_video_list(account_id: str, page_id: int, page_size: int) -> Optional[Dict]:
+    """
+    获取票圈账号视频信息
+    :param: account_id: 账号id
+    :return: account videos
+    """
+    import requests
+
+    url = f"https://admin.piaoquantv.com/manager/video/page?uid={account_id}&pageNum={page_id}&pageSize={page_size}&categoryId=55&muid=7"
+    headers = {
+        'accept': 'application/json, text/plain, */*',
+        'accept-language': 'zh,zh-CN;q=0.9',
+        'priority': 'u=1, i',
+        'sec-ch-ua': '"Chromium";v="136", "Google Chrome";v="136", "Not.A/Brand";v="99"',
+        'sec-ch-ua-mobile': '?0',
+        'sec-ch-ua-platform': '"macOS"',
+        'sec-fetch-dest': 'empty',
+        'sec-fetch-mode': 'cors',
+        'sec-fetch-site': 'same-origin',
+        'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/136.0.0.0 Safari/537.36',
+        'Cookie': 'SESSION=NGMyMzhkNzQtZmUzMS00ZTAwLTkzMTEtZTYwZThiN2JhNWE3'
+    }
+    try:
+        response = requests.get(url, headers=headers, timeout=60)
+        response.raise_for_status()
+        return response.json()
+    except RequestException as e:
+        print(f"API请求失败: {e}")
+    except json.JSONDecodeError as e:
+        print(f"响应解析失败: {e}")
+    return None

+ 29 - 0
applications/db/model.py

@@ -0,0 +1,29 @@
+from pydantic import BaseModel, Field
+from typing import Optional
+
+
+class CrawlerMetaArticle(BaseModel):
+    article_id: Optional[int] = Field(description="unique id")
+    platform: Optional[str] = Field(None, max_length=64, description="抓取平台")
+    mode: Optional[str] = Field(None, max_length=16, description="抓取模式")
+    category: Optional[str] = Field(None, max_length=32, description="品类,搜索类型")
+    out_account_id: Optional[str] = Field(None, max_length=256, description="外站账号id")
+    article_index: Optional[int] = Field(None, description="文章位置")
+    title: Optional[str] = Field(None, max_length=128, description="文章标题")
+    link: Optional[str] = Field(None, description="文章链接")
+    read_cnt: Optional[int] = Field(None, description="阅读量")
+    like_cnt: Optional[int] = Field(None, description="点赞量")
+    description: Optional[str] = Field(None, description="文章描述")
+    publish_time: Optional[int] = Field(None, description="文章发布时间")
+    crawler_time: Optional[int] = Field(None, description="文章抓取时间")
+    score: Optional[float] = Field(None, description="相关性分数")
+    status: Optional[int] = Field(None, description="状态, 1表示正常,2表示已经进入待发布层, 0表示相关性低文章")
+    channel_content_id: Optional[str] = Field(None, max_length=128, description="channel_content_id by pw")
+    unique_index: Optional[str] = Field(None, max_length=64, description="通过公众号信息生成的唯一字符id")
+    source_article_title: Optional[str] = Field(None, max_length=255, description="来源文章标题")
+    source_account: Optional[str] = Field(None, max_length=64, description="来源账号")
+    llm_sensitivity: Optional[int] = Field(None, description="大模型判断标题敏感性: 0-不敏感, 1-敏感")
+    title_sensitivity: int = Field(0, description="匹配敏感词库: 0敏感, 1:不敏感")
+    category_by_ai: Optional[str] = Field(None, max_length=32, description="通过大模型生成的品类")
+    category_status: int = Field(0, description="0: 初始,1:处理中 2: 成功 99:失败")
+    category_status_update_ts: Optional[int] = Field(None, description="品类状态修改时间戳")

+ 0 - 323
cal_account_read_rate_avg_daily.py

@@ -1,323 +0,0 @@
-"""
-@author: luojunhui
-cal each account && position reading rate
-"""
-import json
-from tqdm import tqdm
-from pandas import DataFrame
-from argparse import ArgumentParser
-from datetime import datetime
-from pymysql.cursors import DictCursor
-
-from applications import bot, Functions, log
-from applications import create_feishu_columns_sheet
-from applications.db import DatabaseConnector
-from applications.const import UpdateAccountReadRateTaskConst
-from applications.utils import fetch_publishing_account_list
-from applications.utils import fetch_account_fans
-from config import apolloConfig, long_articles_config, piaoquan_crawler_config, denet_config
-
-
-const = UpdateAccountReadRateTaskConst()
-config = apolloConfig()
-unauthorized_account = json.loads(config.getConfigValue("unauthorized_gh_id_fans"))
-backup_account_fans = json.loads(config.getConfigValue("backup_account_fans"))
-functions = Functions()
-read_rate_table = "long_articles_read_rate"
-
-
-def filter_outlier_data(group, key='show_view_count'):
-    """
-
-    :param group:
-    :param key:
-    :return:
-    """
-    mean = group[key].mean()
-    std = group[key].std()
-    # 过滤二倍标准差的数据
-    filtered_group = group[(group[key] > mean - 2 * std) & (group[key] < mean + 2 * std)]
-    # 过滤均值倍数大于5的数据
-    new_mean = filtered_group[key].mean()
-    # print("阅读均值", new_mean)
-    filtered_group = filtered_group[filtered_group[key] < new_mean * 5]
-    return filtered_group
-
-
-def get_account_articles_detail(db_client, gh_id_tuple, min_publish_timestamp) -> list[dict]:
-    """
-    get articles details
-    :return:
-    """
-    sql = f"""
-            SELECT 
-                ghId, accountName, ItemIndex, show_view_count, publish_timestamp
-            FROM 
-                official_articles_v2
-            WHERE 
-                ghId IN {gh_id_tuple} and Type = '{const.BULK_PUBLISH_TYPE}' and publish_timestamp >= {min_publish_timestamp};
-            """
-    response_list = db_client.fetch(query=sql, cursor_type=DictCursor)
-    return response_list
-
-
-def cal_account_read_rate(article_list, fans_dict) -> DataFrame:
-    """
-    计算账号位置的阅读率
-    :return:
-    """
-    response = []
-    for line in article_list:
-        gh_id = line['ghId']
-        dt = functions.timestamp_to_str(timestamp=line['publish_timestamp'], string_format='%Y-%m-%d')
-        fans = fans_dict.get(gh_id, {}).get(dt, const.DEFAULT_FANS)
-        if not fans:
-            fans = int(unauthorized_account.get(gh_id, const.DEFAULT_FANS))
-        if not fans:
-            fans = int(backup_account_fans.get(gh_id, const.DEFAULT_FANS))
-            log(
-                task='cal_read_rate_avg_task',
-                function='cal_account_read_rate',
-                message='未获取到粉丝,使用备份粉丝表',
-                data=line
-            )
-        line['fans'] = fans
-        if fans > const.MIN_FANS:
-            line['readRate'] = line['show_view_count'] / fans if fans else 0
-            response.append(line)
-    return DataFrame(response, columns=['ghId', 'accountName', 'ItemIndex', 'show_view_count', 'publish_timestamp', 'readRate'])
-
-
-def cal_avg_account_read_rate(df, gh_id, index, dt) -> dict:
-    """
-    计算账号的阅读率均值
-    :return:
-    """
-    max_time = functions.str_to_timestamp(date_string=dt)
-    min_time = max_time - const.STATISTICS_PERIOD
-
-    # 通过
-    filter_dataframe = df[
-        (df["ghId"] == gh_id)
-        & (min_time <= df["publish_timestamp"])
-        & (df["publish_timestamp"] <= max_time)
-        & (df['ItemIndex'] == index)
-        ]
-
-    # 用二倍标准差过滤
-    final_dataframe = filter_outlier_data(filter_dataframe)
-
-    return {
-        "read_rate_avg": final_dataframe['readRate'].mean(),
-        "max_publish_time": final_dataframe['publish_timestamp'].max(),
-        "min_publish_time": final_dataframe['publish_timestamp'].min(),
-        "records": len(final_dataframe)
-    }
-
-
-def check_each_position(db_client, gh_id, index, dt, avg_rate) -> dict:
-    """
-    检验某个具体账号的具体文章的阅读率均值和前段日子的比较
-    :param avg_rate: 当天计算出的阅读率均值
-    :param db_client: 数据库连接
-    :param gh_id: 账号 id
-    :param index: 账号  index
-    :param dt:
-    :return:
-    """
-
-    dt = int(dt.replace("-", ""))
-    select_sql = f"""
-        SELECT account_name, read_rate_avg
-        FROM {read_rate_table}
-        WHERE gh_id = '{gh_id}' and position = {index} and dt_version < {dt}
-        ORDER BY dt_version DESC limit 1;
-    """
-    result = db_client.fetch(select_sql)
-    if result:
-        account_name = result[0][0]
-        previous_read_rate_avg = result[0][1]
-        relative_value = (avg_rate - previous_read_rate_avg) / previous_read_rate_avg
-        if -const.RELATIVE_VALUE_THRESHOLD <= relative_value <= const.RELATIVE_VALUE_THRESHOLD:
-            return {}
-        else:
-            response = {
-                "account_name": account_name,
-                "position": index,
-                "read_rate_avg_yesterday": Functions().float_to_percentage(avg_rate),
-                "read_rate_avg_the_day_before_yesterday": Functions().float_to_percentage(previous_read_rate_avg),
-                "relative_change_rate": [
-                    {
-                        "text": Functions().float_to_percentage(relative_value),
-                        "color": "red" if relative_value < 0 else "green"
-                    }
-                ]
-            }
-            return response
-
-
-def update_single_day(dt, account_list, article_df, lam):
-    """
-    更新单天数据
-    :param article_df:
-    :param lam:
-    :param account_list:
-    :param dt:
-    :return:
-    """
-    error_list = []
-    insert_error_list = []
-    update_timestamp = functions.str_to_timestamp(date_string=dt)
-
-    # 因为计算均值的时候是第二天,所以需要把时间前移一天
-    avg_date = functions.timestamp_to_str(
-        timestamp=update_timestamp - const.ONE_DAY_IN_SECONDS,
-        string_format='%Y-%m-%d'
-    )
-
-    # processed_account_set
-    processed_account_set = set()
-
-    for account in tqdm(account_list, desc=dt):
-        for index in const.ARTICLE_INDEX_LIST:
-            read_rate_detail = cal_avg_account_read_rate(
-                df=article_df,
-                gh_id=account['gh_id'],
-                index=index,
-                dt=dt
-            )
-            read_rate_avg = read_rate_detail['read_rate_avg']
-            max_publish_time = read_rate_detail['max_publish_time']
-            min_publish_time = read_rate_detail['min_publish_time']
-            articles_count = read_rate_detail['records']
-            if articles_count:
-                processed_account_set.add(account['gh_id'])
-                # check read rate in position 1 and 2
-                if index in [1, 2]:
-                    error_obj = check_each_position(
-                        db_client=lam,
-                        gh_id=account['gh_id'],
-                        index=index,
-                        dt=dt,
-                        avg_rate=read_rate_avg
-                    )
-                    if error_obj:
-                        error_list.append(error_obj)
-                # insert into database
-                try:
-                    if not read_rate_avg:
-                        continue
-                    insert_sql = f"""
-                        INSERT INTO {read_rate_table}
-                        (account_name, gh_id, position, read_rate_avg, remark, articles_count, earliest_publish_time, latest_publish_time, dt_version, is_delete)
-                        values
-                        (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
-                    """
-                    lam.save(
-                        query=insert_sql,
-                        params=(
-                            account['account_name'],
-                            account['gh_id'],
-                            index,
-                            read_rate_avg,
-                            "从 {} 开始往前计算 31  天".format(dt),
-                            articles_count,
-                            functions.timestamp_to_str(timestamp=min_publish_time, string_format='%Y-%m-%d'),
-                            functions.timestamp_to_str(timestamp=max_publish_time, string_format='%Y-%m-%d'),
-                            avg_date.replace("-", ""),
-                            0
-                        )
-                    )
-                except Exception as e:
-                    print(e)
-                    insert_error_list.append(str(e))
-
-    # bot sql error
-    if insert_error_list:
-        bot(
-            title="更新阅读率均值,存在sql 插入失败",
-            detail=insert_error_list
-        )
-
-    # bot outliers
-    if error_list:
-        columns = [
-            create_feishu_columns_sheet(sheet_type="plain_text", sheet_name="account_name", display_name="账号名称"),
-            create_feishu_columns_sheet(sheet_type="plain_text", sheet_name="position", display_name="文章位置"),
-            create_feishu_columns_sheet(sheet_type="plain_text", sheet_name="read_rate_avg_yesterday",
-                                        display_name="昨日阅读率均值"),
-            create_feishu_columns_sheet(sheet_type="plain_text", sheet_name="read_rate_avg_the_day_before_yesterday",
-                                        display_name="前天阅读率均值"),
-            create_feishu_columns_sheet(sheet_type="options", sheet_name="relative_change_rate",
-                                        display_name="相对变化率")
-        ]
-        bot(
-            title="阅读率均值表异常信息, 总共处理{}个账号".format(len(processed_account_set)),
-            detail={
-                "columns": columns,
-                "rows": error_list
-            },
-            table=True,
-            mention=False
-        )
-
-    # if no error, send success info
-    if not error_list and not insert_error_list:
-        bot(
-            title="阅读率均值表更新成功, 总共处理{}个账号".format(len(processed_account_set)),
-            detail={
-                "日期": dt
-            },
-            mention=False
-        )
-
-
-def main() -> None:
-    """
-    main function
-    :return:
-    """
-    parser = ArgumentParser()
-    parser.add_argument("--run-date",
-                        help="Run only once for date in format of %Y-%m-%d. \
-                                    If no specified, run as daily jobs.")
-    args = parser.parse_args()
-    if args.run_date:
-        dt = args.run_date
-    else:
-        dt = datetime.today().strftime('%Y-%m-%d')
-
-    # init stat period
-    max_time = functions.str_to_timestamp(date_string=dt)
-    min_time = max_time - const.STATISTICS_PERIOD
-    min_stat_date = functions.timestamp_to_str(timestamp=min_time, string_format='%Y-%m-%d')
-
-    # init database connector
-    long_articles_db_client = DatabaseConnector(db_config=long_articles_config)
-    long_articles_db_client.connect()
-
-    piaoquan_crawler_db_client = DatabaseConnector(db_config=piaoquan_crawler_config)
-    piaoquan_crawler_db_client.connect()
-
-    denet_db_client = DatabaseConnector(db_config=denet_config)
-    denet_db_client.connect()
-
-    # get account list
-    account_list = fetch_publishing_account_list(db_client=denet_db_client)
-
-    # get fans dict
-    fans_dict = fetch_account_fans(db_client=denet_db_client, start_date=min_stat_date)
-
-    # get data frame from official_articles_v2
-    gh_id_tuple = tuple([i['gh_id'] for i in account_list])
-    article_list = get_account_articles_detail(db_client=piaoquan_crawler_db_client, gh_id_tuple=gh_id_tuple, min_publish_timestamp=min_time)
-
-    # cal account read rate and make a dataframe
-    read_rate_dataframe = cal_account_read_rate(article_list, fans_dict)
-
-    # update each day's data
-    update_single_day(dt, account_list, read_rate_dataframe, long_articles_db_client)
-
-
-if __name__ == '__main__':
-    main()

+ 0 - 142
checkVideoStatusDaily.py

@@ -1,142 +0,0 @@
-"""
-@author: luojunhui
-@description: 校验视频状态,若视频状态为不通过,则修改视频状态
-"""
-import traceback
-
-from tqdm import tqdm
-
-from applications import PQAPI, PQMySQL, bot
-
-
-class VideoStatusManager(object):
-    """
-    视频状态校验 and 修改
-    """
-    def __init__(self):
-        self.db_client = None
-        self.pq_api = None
-
-    def base_init(self):
-        """
-        初始化数据库连接和 pq 方法类
-        """
-        try:
-            self.db_client = PQMySQL()
-        except Exception as e:
-            error_msg = traceback.format_exc()
-            bot(
-                title="视频状态校验任务,每 20 分钟执行一次,数据库初始化异常",
-                detail={
-                    "error": str(e),
-                    "error_msg": error_msg
-                }
-            )
-            return False
-        try:
-            self.pq_api = PQAPI()
-        except Exception as e:
-            error_msg = traceback.format_exc()
-            bot(
-                title="视频状态校验任务,每 20 分钟执行一次,apollo连接异常",
-                detail={
-                    "error": str(e),
-                    "error_msg": error_msg
-                }
-            )
-            return False
-        return True
-
-    def get_video_list_status(self, videoList):
-        """
-        获取视频 list 的状态,并且返回状态为不通过的视频 list
-        :param videoList: 视频 id_list, 最长为 20
-        :return: bad video list
-        """
-        response = self.pq_api.getPQVideoListDetail(video_list=videoList)
-        detail_list = response.get('data', [])
-        if detail_list:
-            bad_video_list = [i for i in detail_list if i['auditStatus'] != 5]
-            bad_id_list = [i['id'] for i in bad_video_list]
-        else:
-            bad_id_list = []
-        return bad_id_list
-
-    def get_published_video_ids_daily(self):
-        """
-        获取每日发布的视频 id 的状态
-        :return:
-        publish_time > {today_time_stamp} and
-        """
-        select_sql = f"""
-        SELECT video_id
-        FROM get_off_videos
-        WHERE check_status = 0 and video_status = 1;
-        """
-        video_id_tuple = self.db_client.select(select_sql)
-        video_id_list = [i[0] for i in video_id_tuple]
-        return video_id_list
-
-    def update_check_status(self, vid_list):
-        """
-
-        :param vid_list:
-        :return:
-        """
-        sql = f"""
-        UPDATE get_off_videos
-        SET check_status = %s
-        where video_id in %s;
-        """
-        self.db_client.update(
-            sql=sql,
-            params=(1, tuple(vid_list))
-        )
-        print("更新 check_status 成功")
-
-    def deal(self):
-        """
-        Deal Function
-        :return:
-        """
-        def chunk_iterator(arr, chunk_size):
-            """生成器函数,将数组分组成指定大小的chunks"""
-            for i in range(0, len(arr), chunk_size):
-                yield arr[i:i + chunk_size]
-
-        video_id_list = self.get_published_video_ids_daily()
-        video_chunks = chunk_iterator(video_id_list, 10)
-
-        bad_count = 0
-        for video_temp in video_chunks:
-            bad_id_list = self.get_video_list_status(video_temp)
-            fail_list = []
-            if bad_id_list:
-                bad_count += len(bad_id_list)
-                for bad_id in tqdm(bad_id_list):
-                    response = self.pq_api.changeVideoStatus(bad_id)
-                    if not response:
-                        fail_list.append(bad_id)
-            if fail_list:
-                bot(
-                    title="修改视频状态失败",
-                    detail=fail_list
-                )
-            self.update_check_status(video_temp)
-        print("total", len(video_id_list))
-        print("bad_total", bad_count)
-
-
-def main():
-    """
-    task
-    :return:
-    """
-
-    VM = VideoStatusManager()
-    if VM.base_init():
-        VM.deal()
-
-
-if __name__ == '__main__':
-    main()

+ 0 - 6
cold_start_publish_to_aigc.py

@@ -1,6 +0,0 @@
-from tasks.publish_tasks.cold_start_publish_daily import ColdStartPublishDailyTask
-
-
-if __name__ == '__main__':
-    cold_start_publish_daily_task = ColdStartPublishDailyTask()
-    cold_start_publish_daily_task.publish_articles_from_video_pool()

+ 0 - 9
crawler_sph_video.py

@@ -1,9 +0,0 @@
-"""
-@author: luojunhui
-"""
-
-from tasks.crawler_tasks.crawler_video.crawler_sph_videos import CrawlerChannelAccountVideos
-
-if __name__ == "__main__":
-    crawler_channel_account_videos = CrawlerChannelAccountVideos()
-    crawler_channel_account_videos.deal()

+ 0 - 211
getOffVideosDaily.py

@@ -1,211 +0,0 @@
-"""
-@author: luojunhui
-@description: GetOffVideos Daily
-"""
-import time
-import datetime
-import traceback
-
-from tqdm import tqdm
-
-from applications import PQMySQL, PQAPI, log, bot
-
-EXPIRE_TIME = 3 * 24 * 60 * 60
-
-
-class AutoGetOffVideos(object):
-    """
-    自动下架视频
-    """
-
-    def __init__(self):
-        self.db_client = None
-        self.pq_api = None
-
-    def base_init(self):
-        """
-        初始化数据库和票圈公共方法
-        :return:
-        """
-        try:
-            self.db_client = PQMySQL()
-        except Exception as e:
-            error_msg = traceback.format_exc()
-            bot(
-                title="自动下架视频任务,数据库初始化失败",
-                detail={
-                    "error": str(e),
-                    "error_msg": error_msg
-                }
-            )
-            return False
-
-        try:
-            self.pq_api = PQAPI()
-        except Exception as e:
-            error_msg = traceback.format_exc()
-            bot(
-                title="自动下架视频任务,pq公共方法连接失败",
-                detail={
-                    "error": str(e),
-                    "error_msg": error_msg
-                }
-            )
-            return False
-        return True
-
-    def get_long_articles_videos(self, timestamp):
-        """
-        获取待下架的视频
-        :return:
-        """
-        select_sql = f"""
-            SELECT video_id
-            FROM get_off_videos
-            WHERE video_status = 1 and publish_time < {timestamp};
-        """
-        result = self.db_client.select(sql=select_sql)
-        log(
-            task="getOffVideosDaily",
-            function="get_long_articles_videos",
-            message="查找到视频 id_list,一共{}条视频".format(len(result))
-        )
-        return result
-
-    def update_video_id_status(self, video_id):
-        """
-        修改数据库内视频状态
-        :param video_id:
-        :return:
-        """
-        timestamp = int(time.time())
-        select_sql = f"""
-                UPDATE get_off_videos
-                SET video_status = 0, get_off_time = {timestamp}
-                WHERE video_id = %s;
-                """
-        try:
-            self.db_client.update(
-                sql=select_sql,
-                params=video_id
-            )
-            log(
-                task="getOffVideosDaily",
-                function="update_video_id_status",
-                message="成功修改视频状态",
-                data={"video_id": video_id}
-            )
-        except Exception as e:
-            log(
-                task="getOffVideosDaily",
-                function="update_video_id_status",
-                message="修改视频状态失败--- 推测 sql 问题,报错信息:{}".format(e),
-                status="fail",
-                data={"video_id": video_id}
-            )
-
-    def change_video_id_status(self, video_id):
-        """
-        修改视频id状态
-        :return:
-        """
-        response = self.pq_api.changeVideoStatus(video_id, 2)
-        if response:
-            self.update_video_id_status(video_id=video_id)
-        else:
-            log(
-                task="getOffVideosDaily",
-                function="change_video_id_status",
-                status="fail",
-                message="请求票圈修改状态异常: ---video_id = {}".format(video_id),
-            )
-            bot(
-                title="get_off_videos 下架视频异常",
-                detail={
-                    "video_id": video_id
-                },
-                mention=False
-            )
-
-    def get_off_job(self):
-        """
-        已经请求超过3天的视频全部下架
-        :return:
-        """
-        now_stamp = int(time.time())
-        three_days_before = now_stamp - EXPIRE_TIME
-        video_tuple = self.get_long_articles_videos(timestamp=three_days_before)
-        vid_list = [i[0] for i in video_tuple]
-        for video_id in tqdm(vid_list):
-            try:
-                self.change_video_id_status(video_id=video_id)
-            except Exception as e:
-                log(
-                    task="getOffVideosDaily",
-                    function="get_off_job",
-                    status="fail",
-                    message="get_off_job下架单个视频失败,video_id={}, 报错信息={}".format(video_id, e),
-                )
-
-    def check_job(self):
-        """
-        校验 3 天前发布的视频是否已经下架
-        :return:
-        """
-        three_days_ago = int(time.time()) - EXPIRE_TIME
-        sql = f"""
-            SELECT video_id
-            FROM get_off_videos
-            WHERE publish_time < {three_days_ago}
-            AND video_status = 1;
-        """
-        vid_tuple = self.db_client.select(sql)
-        if vid_tuple:
-            vid_list = [i[0] for i in vid_tuple]
-            for vid in vid_list:
-                try:
-                    self.change_video_id_status(video_id=vid)
-                except Exception as e:
-                    log(
-                        task="getOffVideosDaily",
-                        function="check_job",
-                        status="fail",
-                        message="task2下架单个视频失败,video_id={}, 报错信息={}".format(vid, e),
-                    )
-            time.sleep(10)
-            vid_tuple2 = self.db_client.select(sql)
-            if vid_tuple2:
-                vid_list2 = [i[0] for i in vid_tuple2]
-                bot(
-                    title="getOffVideosDaily_check_job",
-                    detail={
-                        "video_list": vid_list2
-                    }
-                )
-            else:
-                return
-        else:
-            return
-
-
-def main():
-    """
-    main function
-    :return:
-    """
-    auto_get_off_job = AutoGetOffVideos()
-    if auto_get_off_job.base_init():
-        auto_get_off_job.get_off_job()
-        time.sleep(60)
-        auto_get_off_job.check_job()
-        bot(
-            title="get_off_jobs任务执行完成通知",
-            mention=False,
-            detail={
-                "finish_time": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
-            }
-        )
-
-
-if __name__ == '__main__':
-    main()

+ 0 - 56
kimi_balance_monitor.py

@@ -1,56 +0,0 @@
-"""
-@author: luojunhui
-"""
-import requests
-import traceback
-
-from applications import bot
-from applications.decoratorApi import retryOnTimeout
-
-BALANCE_LIMIT_THRESHOLD = 200.0
-
-
-@retryOnTimeout(retries=5, delay=5)
-def check_kimi_balance():
-    """
-    校验kimi余额
-    :return:
-    """
-    url = "https://api.moonshot.cn/v1/users/me/balance"
-
-    payload = {}
-    headers = {
-        'Authorization': 'Bearer sk-5DqYCa88kche6nwIWjLE1p4oMm8nXrR9kQMKbBolNAWERu7q'
-    }
-    response = requests.request("GET", url, headers=headers, data=payload, timeout=10)
-    if response.status_code == 200:
-        response_json = response.json()
-        try:
-            balance = response_json['data']['available_balance']
-            if balance < BALANCE_LIMIT_THRESHOLD:
-                bot(
-                    title="kimi余额小于 {} 块".format(BALANCE_LIMIT_THRESHOLD),
-                    detail={
-                        "balance": balance
-                    }
-                )
-        except Exception as e:
-            error_stack = traceback.format_exc()
-            bot(
-                title="kimi余额接口处理失败,数据结构异常",
-                detail={
-                    "error": str(e),
-                    "error_msg": error_stack
-                }
-            )
-    else:
-        bot(
-            title="kimi余额接口调用失败",
-            detail={
-                "response": response.text
-            }
-        )
-
-
-if __name__ == '__main__':
-    check_kimi_balance()

+ 0 - 31
outside_server_accounts_monitor.py

@@ -1,31 +0,0 @@
-from argparse import ArgumentParser
-
-from tasks.monitor_tasks.outside_gzh_articles_monitor import OutsideGzhArticlesCollector
-from tasks.monitor_tasks.outside_gzh_articles_monitor import OutsideGzhArticlesMonitor
-
-
-if __name__ == "__main__":
-    parser = ArgumentParser()
-    parser.add_argument("--task", help="input monitor or collector")
-    args = parser.parse_args()
-    if args.task:
-        task = args.task
-        match task:
-            case "monitor":
-                monitor = OutsideGzhArticlesMonitor()
-                monitor.deal()
-            case "collector":
-                collector = OutsideGzhArticlesCollector()
-                collector.deal()
-            case _:
-                print("task is not support")
-    else:
-        # first collect data
-        collector = OutsideGzhArticlesCollector()
-        collector.deal()
-
-        # then monitor each article
-        monitor = OutsideGzhArticlesMonitor()
-        monitor.deal()
-
-

+ 0 - 113
published_articles_monitor.py

@@ -1,113 +0,0 @@
-"""
-监测已发布文章
-"""
-
-from datetime import datetime
-from argparse import ArgumentParser
-from concurrent.futures import ThreadPoolExecutor
-
-from tqdm import tqdm
-from applications import bot
-from applications import aiditApi
-from applications.db import DatabaseConnector
-from applications.const import updatePublishedMsgTaskConst
-from applications import WeixinSpider
-from config import piaoquan_crawler_config, long_articles_config
-
-const = updatePublishedMsgTaskConst()
-spider = WeixinSpider()
-
-
-def monitor_article(article):
-    """
-    校验单篇文章是否
-    """
-    gh_id, account_name, title, url, wx_sn, publish_date = article
-    try:
-        response = spider.get_article_text(url, is_cache=False)
-        response_code = response["code"]
-        if response_code == const.ARTICLE_ILLEGAL_CODE:
-            error_detail = response.get("msg")
-            insert_sql = f"""
-                INSERT IGNORE INTO illegal_articles 
-                (gh_id, account_name, title, wx_sn, publish_date, illegal_reason)
-                VALUES 
-                (%s, %s, %s, %s, %s, %s);
-                """
-            affected_rows = long_articles_db_client.save(
-                query=insert_sql,
-                params=(gh_id, account_name, title, wx_sn, publish_date, error_detail),
-            )
-            if affected_rows:
-                bot(
-                    title="文章违规告警",
-                    detail={
-                        "account_name": account_name,
-                        "gh_id": gh_id,
-                        "title": title,
-                        "wx_sn": wx_sn.decode("utf-8"),
-                        "publish_date": str(publish_date),
-                        "error_detail": error_detail,
-                    },
-                    mention=False
-                )
-                aiditApi.delete_articles(
-                    gh_id=gh_id,
-                    title=title
-                )
-    except Exception as e:
-        print(e)
-
-
-def get_article_list(run_date):
-    """
-    监控任务, 监测周期为7天,监测文章是否被违规,若监测到违规文章,则进行告警
-    :return:
-    """
-    if not run_date:
-        run_date = datetime.today().strftime("%Y-%m-%d")
-
-    monitor_start_timestamp = (
-            int(datetime.strptime(run_date, "%Y-%m-%d").timestamp()) - const.MONITOR_PERIOD
-    )
-    select_sql = f"""
-        SELECT ghId, accountName, title, ContentUrl, wx_sn, from_unixtime(publish_timestamp) AS publish_timestamp
-        FROM official_articles_v2
-        WHERE publish_timestamp >= {monitor_start_timestamp}
-        ORDER BY publish_timestamp DESC;
-    """
-    article_list = piaoquan_crawler_db_client.fetch(select_sql)
-    return article_list
-
-
-if __name__ == "__main__":
-    parser = ArgumentParser()
-
-    parser.add_argument(
-        "--run_date",
-        help="--run_date %Y-%m-%d",
-    )
-    args = parser.parse_args()
-    if args.run_date:
-        run_date = args.run_date
-    else:
-        run_date = None
-
-    piaoquan_crawler_db_client = DatabaseConnector(db_config=piaoquan_crawler_config)
-    piaoquan_crawler_db_client.connect()
-    long_articles_db_client = DatabaseConnector(db_config=long_articles_config)
-    long_articles_db_client.connect()
-
-    # Number of concurrent threads
-    MAX_WORKERS = 4
-
-    article_list = get_article_list(run_date=None)
-
-    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
-        list(
-            tqdm(
-                executor.map(monitor_article, article_list),
-                total=len(article_list),
-                desc="Monitor Article List",
-            )
-        )

+ 0 - 8
run_article_title_exit_v1.py

@@ -1,8 +0,0 @@
-"""
-@author: luojunhui
-"""
-from tasks.flow_pool_tasks.exit_article_with_title import main
-
-
-if __name__ == '__main__':
-    main()

+ 0 - 8
run_baidu_video_crawler.py

@@ -1,8 +0,0 @@
-"""
-@author: luojunhui
-"""
-from cold_start.crawler.baidu import BaiduVideoCrawler
-
-if __name__ == '__main__':
-    task = BaiduVideoCrawler()
-    task.deal()

+ 0 - 13
run_toutiao_recommend.py

@@ -1,13 +0,0 @@
-"""
-@author: luojunhui
-@description: 今日头条推荐流文章抓取任务
-"""
-from cold_start.crawler.toutiao_recommend_crawler import ToutiaoRecommendCrawler
-
-
-if __name__ == "__main__":
-    toutiao_recommend_crawler = ToutiaoRecommendCrawler()
-    toutiao_recommend_crawler.init_database()
-    category_list = ['finance', 'tech', 'history', 'entertainment']
-    for category in category_list:
-        toutiao_recommend_crawler.run(category=category)

+ 0 - 9
run_update_articles_from_aigc_system.py

@@ -1,9 +0,0 @@
-"""
-@author: luojunhui
-"""
-from tasks.update_article_info_from_aigc import UpdateArticleInfoFromAIGC
-
-
-if __name__ == "__main__":
-    update_article_info_from_aigc = UpdateArticleInfoFromAIGC()
-    update_article_info_from_aigc.deal()

+ 0 - 72
run_video_account_crawler.py

@@ -1,72 +0,0 @@
-"""
-@author: luojunhui
-执行视频&&账号的抓取
-"""
-import traceback
-
-from datetime import datetime
-from argparse import ArgumentParser
-
-from applications import bot
-from cold_start.crawler import WeixinAccountCrawler, WeixinVideoCrawler
-
-account_crawler = WeixinAccountCrawler()
-video_crawler = WeixinVideoCrawler()
-
-
-def main():
-    """
-    主函数
-    :return:
-    """
-    parser = ArgumentParser()
-    parser.add_argument("--run-date",
-                        help="Run only once for date in format of %Y%m%d. \
-                            If no specified, run as daily jobs.")
-    args = parser.parse_args()
-
-    if args.run_date:
-        run_date = datetime.strptime(args.run_date, "%Y-%m-%d")
-        print("Run in manual mode. Date: {}".format(args.run_date))
-    else:
-        run_date = datetime.today()
-    # # 先执行账号抓取
-    # try:
-    #     account_crawler.run(run_date)
-    # except Exception as e:
-    #     error_msg = traceback.format_exc()
-    #     bot(
-    #         title='账号抓取v1执行失败',
-    #         detail={
-    #             "error": str(e),
-    #             "traceback": error_msg
-    #         }
-    #     )
-    # 再执行文章抓取
-    try:
-        video_crawler.run()
-    except Exception as e:
-        error_msg = traceback.format_exc()
-        bot(
-            title='视频抓取执行失败',
-            detail={
-                "error": str(e),
-                "traceback": error_msg
-            }
-        )
-    # 再执行账号抓取v2
-    try:
-        account_crawler.run_v2()
-    except Exception as e:
-        error_msg = traceback.format_exc()
-        bot(
-            title='账号抓取V2执行失败',
-            detail={
-                "error": str(e),
-                "traceback": error_msg
-            }
-        )
-
-
-if __name__ == '__main__':
-    main()

+ 0 - 34
run_video_publish_and_audit.py

@@ -1,34 +0,0 @@
-"""
-@author: luojunhui
-"""
-from argparse import ArgumentParser
-
-from cold_start.publish import PublishVideosForAudit
-
-pub = PublishVideosForAudit()
-
-
-def main():
-    """
-    主函数
-    :return:
-    """
-    parser = ArgumentParser()
-    parser.add_argument("--run_task", type=str, help="run task, input publish or check")
-    args = parser.parse_args()
-
-    if args.run_task:
-        task = args.run_task
-        if task == "publish":
-            pub.publish_job()
-        elif task == "check":
-            pub.check_job()
-        else:
-            print("run_task input ERROR,please input publish or check")
-    else:
-        pub.publish_job()
-        pub.check_job()
-
-
-if __name__ == '__main__':
-    main()

+ 0 - 57
run_video_understanding_with_google.py

@@ -1,57 +0,0 @@
-"""
-@author: luojunhui
-"""
-
-import datetime
-import multiprocessing
-
-from applications import log
-from cold_start.ai_pipeline import ExtractVideoBestFrame
-
-PROCESS_EXIT_TIMEOUT = 10 * 60
-
-
-def start_task():
-    task = ExtractVideoBestFrame()
-
-    # 查询有多少任务正在处理中
-    processing_tasks = task.get_processing_task_pool_size()
-
-    if processing_tasks:
-        print(
-            f"{datetime.datetime.now()} 当前有 {processing_tasks} 个任务正在等待 google 处理..."
-        )
-        task.extract_best_frame_with_gemini_ai()
-    else:
-        print(f"{datetime.datetime.now()} 没有任务正在处理中...")
-        # upload video to google ai
-        task.upload_video_to_gemini_ai()
-        log(
-            task="video_understanding_with_google",
-            function="main",
-            message="upload_video_to_google_ai_task",
-        )
-        task.extract_best_frame_with_gemini_ai()
-
-    # 调用接口,使用 ffmpeg 获取视频的最佳帧作为封面
-    task.get_cover_with_best_frame()
-
-
-def main():
-    # create sub process
-    process = multiprocessing.Process(target=start_task)
-    process.start()
-
-    # wait for sub process to finish
-    process.join(PROCESS_EXIT_TIMEOUT)
-
-    if process.is_alive():
-        print(
-            f"Process {process.pid} did not finish within {PROCESS_EXIT_TIMEOUT} seconds. Terminating..."
-        )
-        process.terminate()
-        process.join()
-
-
-if __name__ == "__main__":
-    main()

+ 51 - 19
schedule_app.py

@@ -1,26 +1,58 @@
-# from celery import Celery
-from tasks.crawler_tasks.crawler_video.crawler_piaoquan_videos import CrawlerPiaoQuanVideos
-from tasks.crawler_tasks.crawler_video.crawler_sohu_videos import CrawlerSohuHotVideos
-from tasks.crawler_tasks.crawler_video.crawler_sohu_videos import CrawlerSohuRecommendVideos
+from tqdm import tqdm
 
+from pymysql.cursors import DictCursor
+from applications.api import fetch_piaoquan_video_list_detail
+from applications.db import DatabaseConnector
+from config import denet_config, long_articles_config
 
-# app = Celery('tasks', broker='redis://localhost:6379/0')
+denet_client = DatabaseConnector(denet_config)
+denet_client.connect()
 
-# @app.task
-def run_piaoquan_video_crawler():
-    crawler = CrawlerPiaoQuanVideos()
-    crawler.deal()
+lam_client = DatabaseConnector(long_articles_config)
+lam_client.connect()
 
-def run_sohu_video_crawler():
-    # step1, crawl sohu hot videos
-    crawler_sohu_hot_videos = CrawlerSohuHotVideos()
-    crawler_sohu_hot_videos.deal()
+# step 1, get publish_content_id var publish_content_plan
 
-    # step2, crawl sohu recommend videos
-    crawler_sohu_recommend_videos = CrawlerSohuRecommendVideos()
-    crawler_sohu_recommend_videos.deal()
+sql = f"""
+    select id, crawler_channel_content_id, source_id, publish_stage_id, t2.channel_content_id
+    from publish_content t1
+    join produce_plan_exe_record t2 on t1.source_id = t2.plan_exe_id
+    where t1.plan_id = '20250520112612679208238' and t1.status = 2;
+"""
+success_task_list = denet_client.fetch(sql, cursor_type=DictCursor)
 
-if __name__ == "__main__":
-    run_piaoquan_video_crawler()
-    run_sohu_video_crawler()
+ori_publish_id_list = []
+ori_publish_content_id_publish_stage_id_dict = {}
+for task in tqdm(success_task_list):
+    # step2, get channel_channel_content_id var source_id
+    channel_content_id = task['channel_content_id'].replace(task['crawler_channel_content_id'], '').replace('#', '')
+    # step3, get ori_source_id as content_id
+    video_id = task['publish_stage_id']
+    if video_id:
+        # continue
+        try:
+            pq_video_detail = fetch_piaoquan_video_list_detail([video_id])
+            video_oss_path = pq_video_detail['data'][0]['ossVideoPath']
+            title = pq_video_detail['data'][0]['title']
+            update_query = f"""
+                update ai_video_ab_test
+                set title = %s, pq_video_id = %s, video_oss_path = %s, status = %s
+                where new_channel_content_id = %s and status = %s;
+            """
+            affected_rows = lam_client.save(update_query, (title, video_id, video_oss_path, 1, channel_content_id, 0))
+            print(affected_rows)
+        except Exception as e:
+            print(e)
+            print(video_id)
+    else:
+        print(task)
 
+
+#
+# L = {}
+# for task in response:
+#     publish_content_id = task['id']
+#     source_id = task['source_id']
+#     L[source_id] = ori_publish_content_id_publish_stage_id_dict[publish_content_id]
+#
+# print(json.dumps(L, indent=4, ensure_ascii=False))

+ 25 - 1
tasks/crawler_tasks/crawler_video/crawler_piaoquan_videos.py

@@ -10,6 +10,7 @@ from tqdm import tqdm
 from applications import log
 from applications.api import ApolloApi
 from applications.api import fetch_piaoquan_video_list_detail
+from applications.api import fetch_piaoquan_account_video_list
 from applications.const.crawler_video_const import CrawlerPiaoQuanVideosConst
 from applications.db import DatabaseConnector
 from applications.pipeline import scrape_video_entities_process
@@ -29,6 +30,8 @@ class CrawlerPiaoQuanVideos:
         self.db_client = DatabaseConnector(long_articles_config)
         self.db_client.connect()
 
+
+class CrawlerPiaoQuanTopVideos(CrawlerPiaoQuanVideos):
     def get_piaoquan_top_video_list(self) -> list[dict]:
         fetch_query = f"""
             select id, video_id, title, category
@@ -174,4 +177,25 @@ class CrawlerPiaoQuanVideos:
                         "error": str(e),
                         "traceback": traceback.format_exc(),
                     }
-                )
+                )
+
+
+class CrawlerPiaoQuanAccountVideos(CrawlerPiaoQuanVideos):
+
+    def get_piaoquan_account_video_list(self) -> list[dict]:
+        account_id = "81584998"
+        has_next_page = True
+        page_id = 1
+        page_size = 10
+        while has_next_page:
+            response = fetch_piaoquan_account_video_list(
+                account_id=account_id,
+                page_id=page_id,
+                page_size=page_size
+            )
+            video_list = response["content"]["objs"]
+            self.insert_video_list(video_list)
+
+    def insert_video_list(self, video_list: list[dict]) -> None:
+        pass
+

+ 0 - 21
title_process_task.py

@@ -1,21 +0,0 @@
-"""
-@author: luojunhui
-"""
-from tasks.ai_tasks.category_generation_task import ArticlePoolCategoryGenerationTask
-from tasks.ai_tasks.category_generation_task import VideoPoolCategoryGenerationTask
-from tasks.ai_tasks.title_rewrite_task import TitleRewriteTask
-
-
-if __name__ == '__main__':
-    # 1. 标题重写
-    title_rewrite_task = TitleRewriteTask()
-    title_rewrite_task.deal()
-
-    # 2. 视频内容池标题分类
-    video_pool_category_generation_task = VideoPoolCategoryGenerationTask()
-    video_pool_category_generation_task.deal()
-
-    # 3. 文章内容池标题分类
-    article_pool_category_generation_task = ArticlePoolCategoryGenerationTask()
-    article_pool_category_generation_task.deal()
-

+ 0 - 36
title_similarity_score_task.py

@@ -1,36 +0,0 @@
-"""
-@author: luojunhui
-"""
-import traceback
-from applications import bot
-from cold_start.filter.title_similarity_task import ColdStartTitleSimilarityTask
-
-
-if __name__ == '__main__':
-    batch_size = 3000
-    task = ColdStartTitleSimilarityTask()
-    task.init_database()
-    # process video
-    try:
-        task.run(meta_source="video")
-    except Exception as e:
-        bot(
-            title="视频冷启池nlp任务异常",
-            mention=False,
-            detail={
-                "traceback": traceback.format_exc(),
-                "error": f"{e}"
-            }
-        )
-    # process article
-    try:
-        task.run(meta_source="article")
-    except Exception as e:
-        bot(
-            title="文章冷启池nlp任务异常",
-            mention=False,
-            detail={
-                "traceback": traceback.format_exc(),
-                "error": f"{e}"
-            }
-        )

+ 0 - 10
toutiao_video_crawler.py

@@ -1,10 +0,0 @@
-"""
-@author: luojunhui
-"""
-
-from tasks.crawler_tasks.crawler_video.crawler_toutiao_videos import CrawlerToutiaoAccountVideos
-
-
-if __name__ == '__main__':
-    crawler = CrawlerToutiaoAccountVideos()
-    crawler.deal()

+ 0 - 33
updateAccountV3.py

@@ -1,33 +0,0 @@
-import time
-
-from datetime import datetime, timedelta
-from argparse import ArgumentParser
-
-from tasks.data_tasks.account_position_read_avg_task import AccountPositionReadAvgTask
-
-
-def main():
-    """
-    main job
-    :return:
-    """
-    parser = ArgumentParser()
-    parser.add_argument(
-        "--run-date",
-        help="Run only once for date in format of %Y-%m-%d. \
-                                If no specified, run as daily jobs.",
-    )
-    args = parser.parse_args()
-    update_account_read_avg_task = AccountPositionReadAvgTask()
-    if args.run_date:
-        update_account_read_avg_task.do_task_list(dt=args.run_date)
-    else:
-        dt_object = datetime.fromtimestamp(int(time.time()))
-        one_day = timedelta(days=1)
-        yesterday = dt_object - one_day
-        yesterday_str = yesterday.strftime("%Y-%m-%d")
-        update_account_read_avg_task.do_task_list(dt=yesterday_str)
-
-
-if __name__ == "__main__":
-    main()

+ 0 - 733
updatePublishedMsgDaily.py

@@ -1,733 +0,0 @@
-"""
-@author: luojunhui
-@description: update daily information into official articles v2
-"""
-import json
-import time
-import traceback
-import urllib.parse
-from argparse import ArgumentParser
-from datetime import datetime
-from typing import Dict, List, Tuple
-
-from pymysql.cursors import DictCursor
-from tqdm import tqdm
-
-from applications import aiditApi
-from applications import bot
-from applications import create_feishu_columns_sheet
-from applications import Functions
-from applications import log
-from applications import WeixinSpider
-from applications.const import updatePublishedMsgTaskConst
-from applications.db import DatabaseConnector
-from config import denet_config, long_articles_config, piaoquan_crawler_config
-
-ARTICLE_TABLE = "official_articles_v2"
-const = updatePublishedMsgTaskConst()
-spider = WeixinSpider()
-functions = Functions()
-
-
-def generate_bot_columns():
-    """
-    生成列
-    :return:
-    """
-    columns = [
-        create_feishu_columns_sheet(sheet_type="plain_text", sheet_name="name", display_name="公众号名称"),
-        create_feishu_columns_sheet(sheet_type="plain_text", sheet_name="ghId", display_name="ghId"),
-        create_feishu_columns_sheet(sheet_type="number", sheet_name="follower_count", display_name="粉丝数"),
-        create_feishu_columns_sheet(sheet_type="date", sheet_name="account_init_timestamp",
-                                    display_name="账号接入系统时间"),
-        create_feishu_columns_sheet(sheet_type="plain_text", sheet_name="using_status", display_name="利用状态")
-    ]
-    return columns
-
-
-def get_account_status(db_client: DatabaseConnector) -> Dict:
-    """
-    获取账号的实验状态
-    :return:
-    """
-    sql = f"""  
-            SELECT t1.account_id, t2.status
-            FROM wx_statistics_group_source_account t1
-            JOIN wx_statistics_group_source t2
-            ON t1.group_source_name = t2.account_source_name;
-            """
-    account_status_list = db_client.fetch(sql, cursor_type=DictCursor)
-    account_status_dict = {account['account_id']: account['status'] for account in account_status_list}
-    return account_status_dict
-
-
-def get_accounts(db_client: DatabaseConnector) -> List[Dict]:
-    """
-    从 aigc 数据库中获取目前处于发布状态的账号
-    :return:
-    "name": line[0],
-    "ghId": line[1],
-    "follower_count": line[2],
-    "account_init_time": int(line[3] / 1000),
-    "account_type": line[4], # 订阅号 or 服务号
-    "account_auth": line[5]
-    """
-    illegal_accounts = [
-            'gh_4c058673c07e',
-            'gh_de9f9ebc976b',
-            'gh_7b4a5f86d68c',
-            'gh_f902cea89e48',
-            'gh_789a40fe7935',
-            'gh_cd041ed721e6',
-            'gh_62d7f423f382',
-            'gh_043223059726',
-            'gh_5bb79339a1f4'
-    ]
-    account_list_with_out_using_status = aiditApi.get_publish_account_from_aigc()
-    account_status_dict = get_account_status(db_client)
-    account_list = [
-        {
-            **item,
-            'using_status': 0 if account_status_dict.get(item['account_id']) == '实验' else 1
-        }
-        for item in account_list_with_out_using_status
-    ]
-    account_list = [account for account in account_list if account['ghId'] not in illegal_accounts]
-    return account_list
-
-
-def insert_each_msg(db_client: DatabaseConnector, account_info: Dict, msg_list: List[Dict]) -> None:
-    """
-    把消息数据更新到数据库中
-    :param account_info:
-    :param db_client:
-    :param msg_list:
-    :return:
-    """
-    gh_id = account_info['ghId']
-    account_name = account_info['name']
-    for info in msg_list:
-        baseInfo = info.get("BaseInfo", {})
-        appMsgId = info.get("AppMsg", {}).get("BaseInfo", {}).get("AppMsgId", None)
-        createTime = info.get("AppMsg", {}).get("BaseInfo", {}).get("CreateTime", None)
-        updateTime = info.get("AppMsg", {}).get("BaseInfo", {}).get("UpdateTime", None)
-        Type = info.get("AppMsg", {}).get("BaseInfo", {}).get("Type", None)
-        detail_article_list = info.get("AppMsg", {}).get("DetailInfo", [])
-        if detail_article_list:
-            for article in detail_article_list:
-                title = article.get("Title", None)
-                Digest = article.get("Digest", None)
-                ItemIndex = article.get("ItemIndex", None)
-                ContentUrl = article.get("ContentUrl", None)
-                SourceUrl = article.get("SourceUrl", None)
-                CoverImgUrl = article.get("CoverImgUrl", None)
-                CoverImgUrl_1_1 = article.get("CoverImgUrl_1_1", None)
-                CoverImgUrl_235_1 = article.get("CoverImgUrl_235_1", None)
-                ItemShowType = article.get("ItemShowType", None)
-                IsOriginal = article.get("IsOriginal", None)
-                ShowDesc = article.get("ShowDesc", None)
-                show_stat = functions.show_desc_to_sta(ShowDesc)
-                ori_content = article.get("ori_content", None)
-                show_view_count = show_stat.get("show_view_count", 0)
-                show_like_count = show_stat.get("show_like_count", 0)
-                show_zs_count = show_stat.get("show_zs_count", 0)
-                show_pay_count = show_stat.get("show_pay_count", 0)
-                wx_sn = ContentUrl.split("&sn=")[1].split("&")[0] if ContentUrl else None
-                status = account_info['using_status']
-                info_tuple = (
-                    gh_id,
-                    account_name,
-                    appMsgId,
-                    title,
-                    Type,
-                    createTime,
-                    updateTime,
-                    Digest,
-                    ItemIndex,
-                    ContentUrl,
-                    SourceUrl,
-                    CoverImgUrl,
-                    CoverImgUrl_1_1,
-                    CoverImgUrl_235_1,
-                    ItemShowType,
-                    IsOriginal,
-                    ShowDesc,
-                    ori_content,
-                    show_view_count,
-                    show_like_count,
-                    show_zs_count,
-                    show_pay_count,
-                    wx_sn,
-                    json.dumps(baseInfo, ensure_ascii=False),
-                    functions.str_to_md5(title),
-                    status
-                )
-                try:
-                    insert_sql = f"""
-                        INSERT INTO {ARTICLE_TABLE}
-                        (ghId, accountName, appMsgId, title, Type, createTime, updateTime, Digest, ItemIndex, ContentUrl, SourceUrl, CoverImgUrl, CoverImgUrl_1_1, CoverImgUrl_255_1, ItemShowType, IsOriginal, ShowDesc, ori_content, show_view_count, show_like_count, show_zs_count, show_pay_count, wx_sn, baseInfo, title_md5, status)
-                        values
-                        (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
-                        """
-                    db_client.save(query=insert_sql, params=info_tuple)
-                    log(
-                        task="updatePublishedMsgDaily",
-                        function="insert_each_msg",
-                        message="插入文章数据成功",
-                        data={
-                            "info": info_tuple
-                        }
-
-                    )
-                except Exception as e:
-                    try:
-                        update_sql = f"""
-                        UPDATE {ARTICLE_TABLE}
-                        SET show_view_count = %s, show_like_count=%s
-                        WHERE wx_sn = %s;
-                        """
-                        db_client.save(query=update_sql,
-                                       params=(show_view_count, show_like_count, wx_sn))
-                        log(
-                            task="updatePublishedMsgDaily",
-                            function="insert_each_msg",
-                            message="更新文章数据成功",
-                            data={
-                                "wxSn": wx_sn,
-                                "likeCount": show_like_count,
-                                "viewCount": show_view_count
-                            }
-
-                        )
-                    except Exception as e:
-                        log(
-                            task="updatePublishedMsgDaily",
-                            function="insert_each_msg",
-                            message="更新文章失败, 报错原因是: {}".format(e),
-                            status="fail"
-                        )
-                        continue
-
-
-def update_each_account(db_client: DatabaseConnector, account_info: Dict, latest_update_time: int, cursor=None):
-    """
-    更新每一个账号信息
-    :param account_info:
-    :param cursor:
-    :param latest_update_time: 最新更新时间
-    :param db_client: 数据库连接信息
-    :return: None
-    """
-    gh_id = account_info['ghId']
-    response = spider.update_msg_list(ghId=gh_id, index=cursor)
-    msg_list = response.get("data", {}).get("data", [])
-    if msg_list:
-        # do
-        last_article_in_this_msg = msg_list[-1]
-        last_time_stamp_in_this_msg = last_article_in_this_msg['AppMsg']['BaseInfo']['UpdateTime']
-        # last_url = last_article_in_this_msg['AppMsg']['DetailInfo'][0]['ContentUrl']
-        # resdata = spider.get_account_by_url(last_url)
-        # check_id = resdata['data'].get('data', {}).get('wx_gh')
-        # if check_id == gh_id:
-        insert_each_msg(
-            db_client=db_client,
-            account_info=account_info,
-            msg_list=msg_list
-        )
-        if last_time_stamp_in_this_msg > latest_update_time:
-            next_cursor = response['data']['next_cursor']
-            return update_each_account(
-                db_client=db_client,
-                account_info=account_info,
-                latest_update_time=latest_update_time,
-                cursor=next_cursor
-            )
-        log(
-            task="updatePublishedMsgDaily",
-            function="update_each_account",
-            message="账号文章更新成功",
-            data=response
-        )
-        return None
-    else:
-        log(
-            task="updatePublishedMsgDaily",
-            function="update_each_account",
-            message="账号文章更新失败",
-            status="fail",
-            data=response
-        )
-        return None
-
-
-def check_account_info(db_client: DatabaseConnector, gh_id: str) -> int:
-    """
-    通过 gh_id查询账号信息的最新发布时间
-    :param db_client:
-    :param gh_id:
-    :return:
-    """
-    sql = f"""
-        SELECT MAX(publish_timestamp)
-        FROM {ARTICLE_TABLE}
-        WHERE ghId = '{gh_id}';
-        """
-    result = db_client.fetch(sql)
-    if result:
-        return result[0][0]
-    else:
-        # 新号,抓取周期定位抓取时刻往前推30天
-        return int(time.time()) - const.NEW_ACCOUNT_CRAWL_PERIOD
-
-
-def update_single_account(db_client: DatabaseConnector, account_info: Dict):
-    """
-    更新单个账号
-    :param db_client:
-    :param account_info:
-    :return:
-    """
-    gh_id = account_info['ghId']
-    max_publish_time = check_account_info(db_client, gh_id)
-    update_each_account(
-        db_client=db_client,
-        account_info=account_info,
-        latest_update_time=max_publish_time
-    )
-
-
-def check_single_account(db_client: DatabaseConnector, account_item: Dict) -> bool:
-    """
-    校验每个账号是否更新
-    :param db_client:
-    :param account_item:
-    :return: True / False
-    """
-    gh_id = account_item['ghId']
-    account_type = account_item['account_type']
-    today_str = datetime.today().strftime("%Y-%m-%d")
-    today_date_time = datetime.strptime(today_str, "%Y-%m-%d")
-    today_timestamp = today_date_time.timestamp()
-    sql = f"""
-            SELECT max(updateTime)
-            FROM {ARTICLE_TABLE}
-            WHERE ghId = '{gh_id}';
-            """
-    try:
-        latest_update_time = db_client.fetch(sql)[0][0]
-        # 判断该账号当天发布的文章是否被收集
-        if account_type in const.SUBSCRIBE_TYPE_SET:
-            if int(latest_update_time) > int(today_timestamp):
-                return True
-            else:
-                return False
-        else:
-            if int(latest_update_time) > int(today_timestamp) - 7 * 24 * 3600:
-                return True
-            else:
-                return False
-    except Exception as e:
-        print(e)
-        return False
-
-
-def get_articles(db_client: DatabaseConnector):
-    """
-
-    :return:
-    """
-    sql = f"""
-    SELECT ContentUrl, wx_sn 
-    FROM {ARTICLE_TABLE}
-    WHERE publish_timestamp in {(const.DEFAULT_STATUS, const.REQUEST_FAIL_STATUS)};"""
-    response = db_client.fetch(sql)
-    return response
-
-
-def update_publish_timestamp(db_client: DatabaseConnector, row: Tuple):
-    """
-    更新发布时间戳 && minigram 信息
-    :param db_client:
-    :param row:
-    :return:
-    """
-    url = row[0]
-    wx_sn = row[1]
-    try:
-        response = spider.get_article_text(url)
-        response_code = response['code']
-
-        if response_code == const.ARTICLE_DELETE_CODE:
-            publish_timestamp_s = const.DELETE_STATUS
-            root_source_id_list = []
-        elif response_code == const.ARTICLE_ILLEGAL_CODE:
-            publish_timestamp_s = const.ILLEGAL_STATUS
-            root_source_id_list = []
-        elif response_code == const.ARTICLE_SUCCESS_CODE:
-            data = response['data']['data']
-            publish_timestamp_ms = data['publish_timestamp']
-            publish_timestamp_s = int(publish_timestamp_ms / 1000)
-            mini_program = data.get('mini_program', [])
-            if mini_program:
-                root_source_id_list = [
-                    urllib.parse.parse_qs(
-                        urllib.parse.unquote(i['path'])
-                    )['rootSourceId'][0]
-                    for i in mini_program
-                ]
-            else:
-                root_source_id_list = []
-        else:
-            publish_timestamp_s = const.UNKNOWN_STATUS
-            root_source_id_list = []
-    except Exception as e:
-        publish_timestamp_s = const.REQUEST_FAIL_STATUS
-        root_source_id_list = None
-        error_msg = traceback.format_exc()
-        print(e, error_msg)
-
-    update_sql = f"""
-            UPDATE {ARTICLE_TABLE}
-            SET publish_timestamp = %s, root_source_id_list = %s
-            WHERE wx_sn = %s;
-        """
-    db_client.save(
-        query=update_sql,
-        params=(
-            publish_timestamp_s,
-            json.dumps(root_source_id_list, ensure_ascii=False),
-            wx_sn
-        ))
-    if publish_timestamp_s == const.REQUEST_FAIL_STATUS:
-        return row
-    else:
-        return None
-
-
-def get_article_detail_job(db_client: DatabaseConnector):
-    """
-    获取发布文章详情
-    :return:
-    """
-
-    article_tuple = get_articles(db_client)
-    for article in tqdm(article_tuple):
-        try:
-            update_publish_timestamp(db_client=db_client, row=article)
-        except Exception as e:
-            print(e)
-            error_msg = traceback.format_exc()
-            print(error_msg)
-    # check 一遍存在请求失败-1 && 0 的文章
-    process_failed_articles = get_articles(db_client)
-    fail_list = []
-    if process_failed_articles:
-        for article in tqdm(process_failed_articles):
-            try:
-                res = update_publish_timestamp(db_client=db_client, row=article)
-                fail_list.append({"wx_sn": res[1], "url": res[0]})
-            except Exception as e:
-                print(e)
-                error_msg = traceback.format_exc()
-                print(error_msg)
-
-    # 通过msgId 来修改publish_timestamp
-    update_sql = f"""
-        UPDATE {ARTICLE_TABLE} oav 
-        JOIN (
-            SELECT ghId, appMsgId, MAX(publish_timestamp) AS publish_timestamp 
-            FROM {ARTICLE_TABLE} 
-            WHERE publish_timestamp > %s 
-            GROUP BY ghId, appMsgId
-            ) vv 
-            ON oav.appMsgId = vv.appMsgId AND oav.ghId = vv.ghId
-        SET oav.publish_timestamp = vv.publish_timestamp
-        WHERE oav.publish_timestamp <= %s;
-    """
-    db_client.save(
-        query=update_sql,
-        params=(0, 0)
-    )
-
-    # 若还是无 publish_timestamp,用update_time当作 publish_timestamp
-    update_sql_2 = f"""
-        UPDATE {ARTICLE_TABLE}
-        SET publish_timestamp = updateTime
-        WHERE publish_timestamp < %s;
-    """
-    db_client.save(
-        query=update_sql_2,
-        params=0
-    )
-    if fail_list:
-        bot(
-            title="更新文章任务,请求detail失败",
-            detail=fail_list
-        )
-
-
-def whether_title_unsafe(db_client: DatabaseConnector, title: str):
-    """
-    检查文章标题是否已经存在违规记录
-    :param db_client:
-    :param title:
-    :return:
-    """
-    title_md5 = functions.str_to_md5(title)
-    sql = f"""
-        SELECT title_md5
-        FROM article_unsafe_title
-        WHERE title_md5 = '{title_md5}';
-    """
-    res = db_client.fetch(sql)
-    if res:
-        return True
-    else:
-        return False
-
-
-def update_job(piaoquan_crawler_db_client, aigc_db_client):
-    """
-    更新任务
-    :return:
-    """
-    account_list = get_accounts(db_client=aigc_db_client)
-    # 订阅号
-    subscription_accounts = [i for i in account_list if i['account_type'] in const.SUBSCRIBE_TYPE_SET]
-    success_count = 0
-    fail_count = 0
-    for sub_item in tqdm(subscription_accounts):
-        try:
-            update_single_account(piaoquan_crawler_db_client, sub_item)
-            success_count += 1
-            # time.sleep(5)
-        except Exception as e:
-            fail_count += 1
-            log(
-                task="updatePublishedMsgDaily",
-                function="update_job",
-                message="单个账号文章更新失败, 报错信息是: {}".format(e),
-                status="fail",
-                data={
-                    "account": sub_item,
-                    "error": str(e),
-                    "traceback": traceback.format_exc()
-                }
-            )
-    log(
-        task="updatePublishedMsgDaily",
-        function="update_job",
-        message="订阅号更新完成",
-        data={
-            "success": success_count,
-            "fail": fail_count
-        }
-    )
-
-    if fail_count / (success_count + fail_count) > const.SUBSCRIBE_FAIL_RATE_THRESHOLD:
-        bot(
-            title="订阅号超过 {}% 的账号更新失败".format(int(const.SUBSCRIBE_FAIL_RATE_THRESHOLD * 100)),
-            detail={
-                "success": success_count,
-                "fail": fail_count,
-                "failRate": fail_count / (success_count + fail_count)
-            }
-        )
-    bot(
-        title="更新每日发布文章任务完成通知",
-        detail={
-            "msg": "订阅号更新完成",
-            "finish_time": datetime.today().__str__()
-        },
-        mention=False
-    )
-    # 服务号
-    server_accounts = [i for i in account_list if i['account_type'] == const.SERVICE_TYPE]
-    for sub_item in tqdm(server_accounts):
-        try:
-            update_single_account(piaoquan_crawler_db_client, sub_item)
-            time.sleep(5)
-        except Exception as e:
-            print(e)
-    bot(
-        title="更新每日发布文章任务完成通知",
-        detail={
-            "msg": "服务号更新完成",
-            "finish_time": datetime.today().__str__()
-        },
-        mention=False
-    )
-
-
-def check_job(piaoquan_crawler_db_client, aigc_db_client):
-    """
-    校验任务
-    :return:
-    """
-    account_list = get_accounts(db_client=aigc_db_client)
-    # 订阅号
-    subscription_accounts = [i for i in account_list if i['account_type'] in const.SUBSCRIBE_TYPE_SET]
-    fail_list = []
-    # check and rework if fail
-    for sub_item in tqdm(subscription_accounts):
-        res = check_single_account(piaoquan_crawler_db_client, sub_item)
-        if not res:
-            try:
-                update_single_account(piaoquan_crawler_db_client, sub_item)
-            except Exception as e:
-                print(e)
-                print(sub_item)
-                # fail_list.append(sub_item)
-    # check whether success and bot if fails
-    for sub_item in tqdm(subscription_accounts):
-        res = check_single_account(piaoquan_crawler_db_client, sub_item)
-        if not res:
-            # 去掉三个不需要查看的字段
-            sub_item.pop('account_type', None)
-            sub_item.pop('account_auth', None)
-            sub_item.pop('account_id', None)
-            fail_list.append(sub_item)
-    if fail_list:
-        try:
-            bot(
-                title="更新当天发布文章,存在未更新的账号",
-                detail={
-                    "columns": generate_bot_columns(),
-                    "rows": fail_list
-                },
-                table=True
-            )
-        except Exception as e:
-            print("Timeout Error: {}".format(e))
-    else:
-        bot(
-            title="更新当天发布文章,所有账号均更新成功",
-            mention=False,
-            detail={
-                "msg": "校验任务完成",
-                "finish_time": datetime.today().__str__()
-            }
-        )
-
-
-def monitor(piaoquan_crawler_db_client, long_articles_db_client, run_date):
-    """
-    监控任务, 监测周期为7天,监测文章是否被违规,若监测到违规文章,则进行告警
-    :return:
-    """
-    if not run_date:
-        run_date = datetime.today().strftime("%Y-%m-%d")
-
-    monitor_start_timestamp = int(datetime.strptime(run_date, "%Y-%m-%d").timestamp()) - const.MONITOR_PERIOD
-    select_sql = f"""
-        SELECT ghId, accountName, title, ContentUrl, wx_sn, from_unixtime(publish_timestamp) AS publish_timestamp
-        FROM {ARTICLE_TABLE}
-        WHERE publish_timestamp >= {monitor_start_timestamp};
-    """
-    article_list = piaoquan_crawler_db_client.fetch(select_sql)
-    for article in tqdm(article_list, desc="monitor article list"):
-        gh_id = article[0]
-        account_name = article[1]
-        title = article[2]
-        # 判断标题是否存在违规记录
-        if whether_title_unsafe(long_articles_db_client, title):
-            continue
-        url = article[3]
-        wx_sn = article[4]
-        publish_date = article[5]
-        try:
-            response = spider.get_article_text(url, is_cache=False)
-            response_code = response['code']
-            if response_code == const.ARTICLE_ILLEGAL_CODE:
-                bot(
-                    title="文章违规告警",
-                    detail={
-                        "ghId": gh_id,
-                        "accountName": account_name,
-                        "title": title,
-                        "wx_sn": str(wx_sn),
-                        "publish_date": str(publish_date)
-                    },
-                    mention=False
-                )
-                aiditApi.delete_articles(
-                    gh_id=gh_id,
-                    title=title
-                )
-        except Exception as e:
-            error_msg = traceback.format_exc()
-            log(
-                task="monitor",
-                function="monitor",
-                message="请求文章详情失败",
-                data={
-                    "ghId": gh_id,
-                    "accountName": account_name,
-                    "title": title,
-                    "wx_sn": str(wx_sn),
-                    "error": str(e),
-                    "msg": error_msg
-                }
-            )
-
-
-def main():
-    """
-    main
-    :return:
-    """
-    parser = ArgumentParser()
-    parser.add_argument(
-        "--run_task",
-        help="update: update_job, check: check_job, detail: get_article_detail_job, monitor: monitor")
-    parser.add_argument(
-        "--run_date",
-        help="--run_date %Y-%m-%d",
-    )
-    args = parser.parse_args()
-
-    # 初始化数据库连接
-    try:
-        piaoquan_crawler_db_client = DatabaseConnector(piaoquan_crawler_config)
-        piaoquan_crawler_db_client.connect()
-        aigc_db_client = DatabaseConnector(denet_config)
-        aigc_db_client.connect()
-        long_articles_db_client = DatabaseConnector(long_articles_config)
-    except Exception as e:
-        error_msg = traceback.format_exc()
-        bot(
-            title="更新文章任务连接数据库失败",
-            detail={
-                "error": e,
-                "msg": error_msg
-            }
-        )
-        return
-
-    if args.run_task:
-        run_task = args.run_task
-        match run_task:
-            case "update":
-                update_job(piaoquan_crawler_db_client=piaoquan_crawler_db_client, aigc_db_client=aigc_db_client)
-                get_article_detail_job(db_client=piaoquan_crawler_db_client)
-            case "check":
-                check_job(piaoquan_crawler_db_client=piaoquan_crawler_db_client, aigc_db_client=aigc_db_client)
-            case "detail":
-                get_article_detail_job(db_client=piaoquan_crawler_db_client)
-            case "monitor":
-                if args.run_date:
-                    run_date = args.run_date
-                else:
-                    run_date = None
-                monitor(piaoquan_crawler_db_client=piaoquan_crawler_db_client,
-                        long_articles_db_client=long_articles_db_client, run_date=run_date)
-            case _:
-                print("No such task, input update: update_job, check: check_job, detail: get_article_detail_job")
-    else:
-        update_job(piaoquan_crawler_db_client=piaoquan_crawler_db_client, aigc_db_client=aigc_db_client)
-        check_job(piaoquan_crawler_db_client=piaoquan_crawler_db_client, aigc_db_client=aigc_db_client)
-        get_article_detail_job(db_client=piaoquan_crawler_db_client)
-
-
-if __name__ == '__main__':
-    main()

+ 0 - 33
update_mini_info_v2.py

@@ -1,33 +0,0 @@
-"""
-@author: luojunhui
-"""
-
-from argparse import ArgumentParser
-
-from tasks.data_tasks.update_published_articles_minigram_detail import UpdatePublishedArticlesMinigramDetail
-
-
-def main():
-    """
-    update mini program detail main
-    :return:
-    """
-    parser = ArgumentParser()
-    parser.add_argument("--run-date",
-                        help="Run only once for date in format of %Y-%m-%d. \
-                            If no specified, run as daily jobs.")
-    args = parser.parse_args()
-
-    update_minigram_detail_task = UpdatePublishedArticlesMinigramDetail()
-    update_minigram_detail_task.init_database()
-
-    if args.run_date:
-        update_minigram_detail_task.update_published_articles_job(biz_date=args.run_date)
-        update_minigram_detail_task.update_mini_program_detail_job(biz_date=args.run_date)
-    else:
-        update_minigram_detail_task.update_published_articles_job()
-        update_minigram_detail_task.update_mini_program_detail_job()
-
-
-if __name__ == '__main__':
-    main()