Release the mini-program experiment

luojunhui 2 months ago
parent commit 341bd2935b

+ 0 - 44
account_association_task.py

@@ -1,44 +0,0 @@
-"""
-@author: luojunhui
-Account association --- account search task
-"""
-import traceback
-
-from argparse import ArgumentParser
-from datetime import datetime
-
-from applications import bot
-from cold_start.crawler.weixin_account_association_crawler import AccountAssociationCrawler
-
-
-def main():
-    """
-    Account association --- account search task
-    :return:
-    """
-    parser = ArgumentParser()
-    parser.add_argument("--run-date",
-                        help="Run only once for date in format of %Y-%m-%d. \
-                                If not specified, run as daily jobs.")
-    args = parser.parse_args()
-
-    if args.run_date:
-        biz_date_str = args.run_date
-    else:
-        biz_date_str = datetime.today().strftime('%Y-%m-%d')
-    try:
-        biz_date = datetime.strptime(biz_date_str, "%Y-%m-%d")
-        account_association_crawler = AccountAssociationCrawler()
-        account_association_crawler.run_account_association(biz_date=biz_date)
-    except Exception as e:
-        bot(
-            title="账号联想任务失败",
-            detail={
-                "error": str(e),
-                "error_stack": traceback.format_exc()
-            }
-        )
-
-
-if __name__ == '__main__':
-    main()

+ 0 - 147
account_cold_start_daily.py

@@ -1,147 +0,0 @@
-"""
-@author: luojunhui
-"""
-import datetime
-import traceback
-
-from argparse import ArgumentParser
-
-from applications import longArticlesMySQL, bot
-from tasks.crawler_tasks.crawler_articles import CrawlerDailyScrapeAccountArticles
-from tasks.crawler_tasks.crawler_articles import CrawlerAssociationAccountArticles
-from cold_start.publish.publishCategoryArticles import CategoryColdStartTask
-from cold_start.filter.title_similarity_task import ColdStartTitleSimilarityTask
-
-DEFAULT_METHOD_LIST = ['1030-手动挑号', 'account_association']
-
-
-def crawler_task(method_list, date_str):
-    """
-    :return:
-    """
-    # 初始化category抓取类
-    try:
-        daily_scrape_tasks = CrawlerDailyScrapeAccountArticles()
-        daily_scrape_tasks.deal(method_list=method_list)
-
-        association_scrape_tasks = CrawlerAssociationAccountArticles()
-        association_scrape_tasks.deal(date_str=date_str)
-
-        # after crawling, score the crawled titles for similarity
-        cold_start_title_similarity_task = ColdStartTitleSimilarityTask()
-        cold_start_title_similarity_task.init_database()
-        cold_start_title_similarity_task.run(meta_source='article')
-
-        bot(
-            title="账号冷启动任务,抓取完成",
-            detail={
-                "finish_time": datetime.datetime.today().strftime('%Y-%m-%d %H:%M:%S'),
-                "method": method_list
-            },
-            mention=False
-        )
-    except Exception as e:
-        bot(
-            title="账号抓取冷启动任务,抓取失败",
-            detail={
-                "error": str(e),
-                "error_msg": traceback.format_exc()
-            }
-        )
-
-
-class AccountColdStartDailyTask(object):
-    """
-    Account cold-start task
-    """
-
-    def __init__(self):
-        """
-        """
-        self.db_client = None
-
-    def init_db(self):
-        """
-        Initialize the database
-        :return:
-        """
-        try:
-            self.db_client = longArticlesMySQL()
-            return True
-        except Exception as e:
-            bot(
-                title='账号抓取任务, 冷启动数据库连接失败',
-                detail={
-                    "error": str(e),
-                    "error_msg": traceback.format_exc()
-                }
-            )
-            return False
-
-    def publish_article_task(self, category_list, article_source):
-        """
-        Publish account articles to the aigc crawl plan and bind the generation plan
-        :param category_list: article categories
-        :param article_source: article source (toutiao or weixin)
-        :return:
-        """
-        try:
-            weixin_category_publisher = CategoryColdStartTask(db_client=self.db_client)
-            weixin_category_publisher.do_job(
-                category_list=category_list,
-                article_source=article_source
-            )
-            bot(
-                title="账号冷启任务,发布完成",
-                detail={
-                    "finish_time": datetime.datetime.today().strftime('%Y-%m-%d %H:%M:%S'),
-                    "category": category_list
-                },
-                mention=False
-            )
-        except Exception as e:
-            bot(
-                title="账号发布冷启动任务,发布失败",
-                detail={
-                    "error": str(e),
-                    "error_msg": traceback.format_exc()
-                }
-            )
-
-
-def main(method_list=None, article_source=None):
-    """
-    main job, use crontab to do job daily
-    :return:
-    """
-    if not method_list:
-        method_list = DEFAULT_METHOD_LIST
-    if not article_source:
-        article_source = 'weixin'
-    task = AccountColdStartDailyTask()
-    if task.init_db():
-        task.publish_article_task(category_list=method_list, article_source=article_source)
-
-
-if __name__ == '__main__':
-    parser = ArgumentParser()
-    parser.add_argument("--run_date", help="--run_date format: %Y-%m-%d")
-    args = parser.parse_args()
-
-    if args.run_date:
-        run_date = args.run_date
-    else:
-        run_date = datetime.date.today().isoformat()
-
-    # run Toutiao publish
-    main(
-        method_list=['history', 'tech', 'finance', 'entertainment'],
-        article_source='toutiao'
-    )
-    # run WeChat crawl & publish
-    main()
-
-    # run the crawl
-    crawler_task(
-        method_list=DEFAULT_METHOD_LIST, date_str=run_date)
-

+ 0 - 40
account_explore_task.py

@@ -1,40 +0,0 @@
-"""
-@author: luojunhui
-@description: try to get some more accounts
-"""
-
-from tasks.crawler_tasks.crawler_account.crawler_accounts_by_association import ChannelsAccountCrawler
-from tasks.crawler_tasks.crawler_account.crawler_accounts_by_association import ToutiaoAccountCrawler
-from tasks.crawler_tasks.crawler_account.crawler_accounts_by_association import HaoKanAccountCrawler
-from tasks.crawler_tasks.crawler_account.crawler_accounts_by_association import GzhAccountCrawler
-from tasks.ai_tasks.generate_search_keys import get_association_title_list_in_multi_threads
-
-
-def deal_each_platform(platform: str) -> None:
-    """
-    deal each platform
-    :param platform: str, one of toutiao / sph / hksp / gzh
-    """
-    match platform:
-        case "toutiao":
-            crawler = ToutiaoAccountCrawler()
-        case "sph":
-            crawler = ChannelsAccountCrawler()
-        case "hksp":
-            crawler = HaoKanAccountCrawler()
-        case "gzh":
-            crawler = GzhAccountCrawler()
-        case _:
-            raise RuntimeError("platform error")
-
-    # start process
-    crawler.deal()
-
-
-if __name__ == "__main__":
-    get_association_title_list_in_multi_threads()
-
-    # get each platform
-    platform_list = ["sph", "hksp", "toutiao", "gzh"]
-    for platform_id in platform_list:
-        deal_each_platform(platform=platform_id)

+ 0 - 17
account_quality_analysis.py

@@ -1,17 +0,0 @@
-from tasks.ai_tasks.account_recognize_by_llm import CandidateAccountCategoryRecognizer
-from tasks.ai_tasks.account_recognize_by_llm import CandidateAccountQualityScoreRecognizer
-
-
-def main():
-    """
-    main function
-    """
-    account_category_task = CandidateAccountCategoryRecognizer()
-    account_category_task.deal()
-
-    account_quality_task = CandidateAccountQualityScoreRecognizer()
-    account_quality_task.deal()
-
-
-if __name__ == "__main__":
-    main()

+ 86 - 0
applications/api/odps_api.py

@@ -0,0 +1,86 @@
+from typing import Optional
+
+import pandas as pd
+from odps import ODPS
+
+
+class ODPSApi:
+    """ODPS操作工具类,封装常用的ODPS操作"""
+
+    # default configuration
+    DEFAULT_ACCESS_ID = "LTAIWYUujJAm7CbH"
+    DEFAULT_ACCESS_KEY = "RfSjdiWwED1sGFlsjXv0DlfTnZTG1P"
+    DEFAULT_PROJECT = "loghubods"
+    DEFAULT_ENDPOINT = "http://service.cn.maxcompute.aliyun.com/api"
+
+    def __init__(
+        self,
+        access_id=DEFAULT_ACCESS_ID,
+        access_key=DEFAULT_ACCESS_KEY,
+        project=DEFAULT_PROJECT,
+        endpoint=DEFAULT_ENDPOINT,
+    ):
+        """
+        Initialize the ODPS connection.
+
+        Args:
+            access_id: ODPS access ID
+            access_key: ODPS access key
+            project: ODPS project name
+            endpoint: ODPS service endpoint
+        """
+        self.access_id = access_id
+        self.access_key = access_key
+        self.project = project
+        self.endpoint = endpoint
+
+        # establish the ODPS connection
+        self.odps = None
+        self.connect()
+
+    def connect(self):
+        """Establish the ODPS connection; return True on success."""
+        try:
+            self.odps = ODPS(
+                self.access_id,
+                self.access_key,
+                project=self.project,
+                endpoint=self.endpoint,
+            )
+            return True
+        except Exception:
+            # connection errors are swallowed; callers must check the return value
+            self.odps = None
+            return False
+
+    def execute_sql(self, sql, tunnel=True) -> Optional[pd.DataFrame]:
+        """
+        Execute a SQL query and return the result.
+
+        Args:
+            sql: SQL query string
+            tunnel: whether to download results via Tunnel (default True)
+
+        Returns:
+            pandas DataFrame with the query result, an empty DataFrame if the
+            query matched no rows, or None if the connection or query failed.
+        """
+        if not self.odps:
+            return None
+
+        try:
+            with self.odps.execute_sql(sql).open_reader(tunnel=tunnel) as reader:
+                # convert the result rows into a DataFrame
+                records = [dict(record) for record in reader]
+                return pd.DataFrame(records)
+        except Exception:
+            # query errors are swallowed; callers must check for None
+            return None
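
Reviewer note: a minimal usage sketch for the new ODPSApi wrapper (the query string is illustrative, not from this commit). execute_sql returns None when the connection or query fails and an empty DataFrame when the query matches no rows, so callers should check both cases:

    from applications.api.odps_api import ODPSApi

    api = ODPSApi()  # falls back to the built-in loghubods defaults
    df = api.execute_sql("SELECT 1 AS probe;")  # illustrative query
    if df is None:
        print("connection or query failed")
    elif df.empty:
        print("query returned no rows")
    else:
        print(df.head())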

+ 0 - 323
cal_account_read_rate_avg_daily.py

@@ -1,323 +0,0 @@
-"""
-@author: luojunhui
-calculate per-account && per-position reading rate
-"""
-import json
-from tqdm import tqdm
-from pandas import DataFrame
-from argparse import ArgumentParser
-from datetime import datetime
-from pymysql.cursors import DictCursor
-
-from applications import bot, Functions, log
-from applications import create_feishu_columns_sheet
-from applications.db import DatabaseConnector
-from applications.const import UpdateAccountReadRateTaskConst
-from applications.utils import fetch_publishing_account_list
-from applications.utils import fetch_account_fans
-from config import apolloConfig, long_articles_config, piaoquan_crawler_config, denet_config
-
-
-const = UpdateAccountReadRateTaskConst()
-config = apolloConfig()
-unauthorized_account = json.loads(config.getConfigValue("unauthorized_gh_id_fans"))
-backup_account_fans = json.loads(config.getConfigValue("backup_account_fans"))
-functions = Functions()
-read_rate_table = "long_articles_read_rate"
-
-
-def filter_outlier_data(group, key='show_view_count'):
-    """
-
-    :param group:
-    :param key:
-    :return:
-    """
-    mean = group[key].mean()
-    std = group[key].std()
-    # filter out data beyond two standard deviations of the mean
-    filtered_group = group[(group[key] > mean - 2 * std) & (group[key] < mean + 2 * std)]
-    # filter out data above five times the filtered mean
-    new_mean = filtered_group[key].mean()
-    filtered_group = filtered_group[filtered_group[key] < new_mean * 5]
-    return filtered_group
-
-
-def get_account_articles_detail(db_client, gh_id_tuple, min_publish_timestamp) -> list[dict]:
-    """
-    get articles details
-    :return:
-    """
-    sql = f"""
-            SELECT 
-                ghId, accountName, ItemIndex, show_view_count, publish_timestamp
-            FROM 
-                official_articles_v2
-            WHERE 
-                ghId IN {gh_id_tuple} and Type = '{const.BULK_PUBLISH_TYPE}' and publish_timestamp >= {min_publish_timestamp};
-            """
-    response_list = db_client.fetch(query=sql, cursor_type=DictCursor)
-    return response_list
-
-
-def cal_account_read_rate(article_list, fans_dict) -> DataFrame:
-    """
-    Compute per-position read rates for each account
-    :return:
-    """
-    response = []
-    for line in article_list:
-        gh_id = line['ghId']
-        dt = functions.timestamp_to_str(timestamp=line['publish_timestamp'], string_format='%Y-%m-%d')
-        fans = fans_dict.get(gh_id, {}).get(dt, const.DEFAULT_FANS)
-        if not fans:
-            fans = int(unauthorized_account.get(gh_id, const.DEFAULT_FANS))
-        if not fans:
-            fans = int(backup_account_fans.get(gh_id, const.DEFAULT_FANS))
-            log(
-                task='cal_read_rate_avg_task',
-                function='cal_account_read_rate',
-                message='未获取到粉丝,使用备份粉丝表',
-                data=line
-            )
-        line['fans'] = fans
-        if fans > const.MIN_FANS:
-            line['readRate'] = line['show_view_count'] / fans if fans else 0
-            response.append(line)
-    return DataFrame(response, columns=['ghId', 'accountName', 'ItemIndex', 'show_view_count', 'publish_timestamp', 'readRate'])
-
-
-def cal_avg_account_read_rate(df, gh_id, index, dt) -> dict:
-    """
-    Compute the account's average read rate
-    :return:
-    """
-    max_time = functions.str_to_timestamp(date_string=dt)
-    min_time = max_time - const.STATISTICS_PERIOD
-
-    # filter by account, publish window and position
-    filter_dataframe = df[
-        (df["ghId"] == gh_id)
-        & (min_time <= df["publish_timestamp"])
-        & (df["publish_timestamp"] <= max_time)
-        & (df['ItemIndex'] == index)
-        ]
-
-    # filter outliers with two standard deviations
-    final_dataframe = filter_outlier_data(filter_dataframe)
-
-    return {
-        "read_rate_avg": final_dataframe['readRate'].mean(),
-        "max_publish_time": final_dataframe['publish_timestamp'].max(),
-        "min_publish_time": final_dataframe['publish_timestamp'].min(),
-        "records": len(final_dataframe)
-    }
-
-
-def check_each_position(db_client, gh_id, index, dt, avg_rate) -> dict:
-    """
-    Compare an account position's average read rate for the day with its previous record
-    :param avg_rate: average read rate computed for the day
-    :param db_client: database connection
-    :param gh_id: account id
-    :param index: article position index
-    :param dt:
-    :return:
-    """
-
-    dt = int(dt.replace("-", ""))
-    select_sql = f"""
-        SELECT account_name, read_rate_avg
-        FROM {read_rate_table}
-        WHERE gh_id = '{gh_id}' and position = {index} and dt_version < {dt}
-        ORDER BY dt_version DESC limit 1;
-    """
-    result = db_client.fetch(select_sql)
-    if result:
-        account_name = result[0][0]
-        previous_read_rate_avg = result[0][1]
-        relative_value = (avg_rate - previous_read_rate_avg) / previous_read_rate_avg
-        if -const.RELATIVE_VALUE_THRESHOLD <= relative_value <= const.RELATIVE_VALUE_THRESHOLD:
-            return {}
-        else:
-            response = {
-                "account_name": account_name,
-                "position": index,
-                "read_rate_avg_yesterday": Functions().float_to_percentage(avg_rate),
-                "read_rate_avg_the_day_before_yesterday": Functions().float_to_percentage(previous_read_rate_avg),
-                "relative_change_rate": [
-                    {
-                        "text": Functions().float_to_percentage(relative_value),
-                        "color": "red" if relative_value < 0 else "green"
-                    }
-                ]
-            }
-            return response
-
-
-def update_single_day(dt, account_list, article_df, lam):
-    """
-    Update a single day's data
-    :param article_df:
-    :param lam:
-    :param account_list:
-    :param dt:
-    :return:
-    """
-    error_list = []
-    insert_error_list = []
-    update_timestamp = functions.str_to_timestamp(date_string=dt)
-
-    # averages are computed on the following day, so shift the date back by one day
-    avg_date = functions.timestamp_to_str(
-        timestamp=update_timestamp - const.ONE_DAY_IN_SECONDS,
-        string_format='%Y-%m-%d'
-    )
-
-    # processed_account_set
-    processed_account_set = set()
-
-    for account in tqdm(account_list, desc=dt):
-        for index in const.ARTICLE_INDEX_LIST:
-            read_rate_detail = cal_avg_account_read_rate(
-                df=article_df,
-                gh_id=account['gh_id'],
-                index=index,
-                dt=dt
-            )
-            read_rate_avg = read_rate_detail['read_rate_avg']
-            max_publish_time = read_rate_detail['max_publish_time']
-            min_publish_time = read_rate_detail['min_publish_time']
-            articles_count = read_rate_detail['records']
-            if articles_count:
-                processed_account_set.add(account['gh_id'])
-                # check read rate in position 1 and 2
-                if index in [1, 2]:
-                    error_obj = check_each_position(
-                        db_client=lam,
-                        gh_id=account['gh_id'],
-                        index=index,
-                        dt=dt,
-                        avg_rate=read_rate_avg
-                    )
-                    if error_obj:
-                        error_list.append(error_obj)
-                # insert into database
-                try:
-                    if not read_rate_avg:
-                        continue
-                    insert_sql = f"""
-                        INSERT INTO {read_rate_table}
-                        (account_name, gh_id, position, read_rate_avg, remark, articles_count, earliest_publish_time, latest_publish_time, dt_version, is_delete)
-                        values
-                        (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
-                    """
-                    lam.save(
-                        query=insert_sql,
-                        params=(
-                            account['account_name'],
-                            account['gh_id'],
-                            index,
-                            read_rate_avg,
-                            "从 {} 开始往前计算 31  天".format(dt),
-                            articles_count,
-                            functions.timestamp_to_str(timestamp=min_publish_time, string_format='%Y-%m-%d'),
-                            functions.timestamp_to_str(timestamp=max_publish_time, string_format='%Y-%m-%d'),
-                            avg_date.replace("-", ""),
-                            0
-                        )
-                    )
-                except Exception as e:
-                    print(e)
-                    insert_error_list.append(str(e))
-
-    # bot sql error
-    if insert_error_list:
-        bot(
-            title="更新阅读率均值,存在sql 插入失败",
-            detail=insert_error_list
-        )
-
-    # bot outliers
-    if error_list:
-        columns = [
-            create_feishu_columns_sheet(sheet_type="plain_text", sheet_name="account_name", display_name="账号名称"),
-            create_feishu_columns_sheet(sheet_type="plain_text", sheet_name="position", display_name="文章位置"),
-            create_feishu_columns_sheet(sheet_type="plain_text", sheet_name="read_rate_avg_yesterday",
-                                        display_name="昨日阅读率均值"),
-            create_feishu_columns_sheet(sheet_type="plain_text", sheet_name="read_rate_avg_the_day_before_yesterday",
-                                        display_name="前天阅读率均值"),
-            create_feishu_columns_sheet(sheet_type="options", sheet_name="relative_change_rate",
-                                        display_name="相对变化率")
-        ]
-        bot(
-            title="阅读率均值表异常信息, 总共处理{}个账号".format(len(processed_account_set)),
-            detail={
-                "columns": columns,
-                "rows": error_list
-            },
-            table=True,
-            mention=False
-        )
-
-    # if no error, send success info
-    if not error_list and not insert_error_list:
-        bot(
-            title="阅读率均值表更新成功, 总共处理{}个账号".format(len(processed_account_set)),
-            detail={
-                "日期": dt
-            },
-            mention=False
-        )
-
-
-def main() -> None:
-    """
-    main function
-    :return:
-    """
-    parser = ArgumentParser()
-    parser.add_argument("--run-date",
-                        help="Run only once for date in format of %Y-%m-%d. \
-                                    If not specified, run as daily jobs.")
-    args = parser.parse_args()
-    if args.run_date:
-        dt = args.run_date
-    else:
-        dt = datetime.today().strftime('%Y-%m-%d')
-
-    # init stat period
-    max_time = functions.str_to_timestamp(date_string=dt)
-    min_time = max_time - const.STATISTICS_PERIOD
-    min_stat_date = functions.timestamp_to_str(timestamp=min_time, string_format='%Y-%m-%d')
-
-    # init database connector
-    long_articles_db_client = DatabaseConnector(db_config=long_articles_config)
-    long_articles_db_client.connect()
-
-    piaoquan_crawler_db_client = DatabaseConnector(db_config=piaoquan_crawler_config)
-    piaoquan_crawler_db_client.connect()
-
-    denet_db_client = DatabaseConnector(db_config=denet_config)
-    denet_db_client.connect()
-
-    # get account list
-    account_list = fetch_publishing_account_list(db_client=denet_db_client)
-
-    # get fans dict
-    fans_dict = fetch_account_fans(db_client=denet_db_client, start_date=min_stat_date)
-
-    # get data frame from official_articles_v2
-    gh_id_tuple = tuple([i['gh_id'] for i in account_list])
-    article_list = get_account_articles_detail(db_client=piaoquan_crawler_db_client, gh_id_tuple=gh_id_tuple, min_publish_timestamp=min_time)
-
-    # cal account read rate and make a dataframe
-    read_rate_dataframe = cal_account_read_rate(article_list, fans_dict)
-
-    # update each day's data
-    update_single_day(dt, account_list, read_rate_dataframe, long_articles_db_client)
-
-
-if __name__ == '__main__':
-    main()

+ 0 - 142
checkVideoStatusDaily.py

@@ -1,142 +0,0 @@
-"""
-@author: luojunhui
-@description: verify video status; if a video fails review, update its status
-"""
-import traceback
-
-from tqdm import tqdm
-
-from applications import PQAPI, PQMySQL, bot
-
-
-class VideoStatusManager(object):
-    """
-    Video status verification and update
-    """
-    def __init__(self):
-        self.db_client = None
-        self.pq_api = None
-
-    def base_init(self):
-        """
-        Initialize the database connection and the pq helper class
-        """
-        try:
-            self.db_client = PQMySQL()
-        except Exception as e:
-            error_msg = traceback.format_exc()
-            bot(
-                title="视频状态校验任务,每 20 分钟执行一次,数据库初始化异常",
-                detail={
-                    "error": str(e),
-                    "error_msg": error_msg
-                }
-            )
-            return False
-        try:
-            self.pq_api = PQAPI()
-        except Exception as e:
-            error_msg = traceback.format_exc()
-            bot(
-                title="视频状态校验任务,每 20 分钟执行一次,apollo连接异常",
-                detail={
-                    "error": str(e),
-                    "error_msg": error_msg
-                }
-            )
-            return False
-        return True
-
-    def get_video_list_status(self, videoList):
-        """
-        Get the status of a list of videos and return the ids that failed review
-        :param videoList: list of video ids, at most 20
-        :return: bad video list
-        """
-        response = self.pq_api.getPQVideoListDetail(video_list=videoList)
-        detail_list = response.get('data', [])
-        if detail_list:
-            bad_video_list = [i for i in detail_list if i['auditStatus'] != 5]
-            bad_id_list = [i['id'] for i in bad_video_list]
-        else:
-            bad_id_list = []
-        return bad_id_list
-
-    def get_published_video_ids_daily(self):
-        """
-        Get the ids of daily-published videos that still need checking
-        :return:
-        """
-        select_sql = f"""
-        SELECT video_id
-        FROM get_off_videos
-        WHERE check_status = 0 and video_status = 1;
-        """
-        video_id_tuple = self.db_client.select(select_sql)
-        video_id_list = [i[0] for i in video_id_tuple]
-        return video_id_list
-
-    def update_check_status(self, vid_list):
-        """
-
-        :param vid_list:
-        :return:
-        """
-        sql = f"""
-        UPDATE get_off_videos
-        SET check_status = %s
-        where video_id in %s;
-        """
-        self.db_client.update(
-            sql=sql,
-            params=(1, tuple(vid_list))
-        )
-        print("更新 check_status 成功")
-
-    def deal(self):
-        """
-        Deal Function
-        :return:
-        """
-        def chunk_iterator(arr, chunk_size):
-            """生成器函数,将数组分组成指定大小的chunks"""
-            for i in range(0, len(arr), chunk_size):
-                yield arr[i:i + chunk_size]
-
-        video_id_list = self.get_published_video_ids_daily()
-        video_chunks = chunk_iterator(video_id_list, 10)
-
-        bad_count = 0
-        for video_temp in video_chunks:
-            bad_id_list = self.get_video_list_status(video_temp)
-            fail_list = []
-            if bad_id_list:
-                bad_count += len(bad_id_list)
-                for bad_id in tqdm(bad_id_list):
-                    response = self.pq_api.changeVideoStatus(bad_id)
-                    if not response:
-                        fail_list.append(bad_id)
-            if fail_list:
-                bot(
-                    title="修改视频状态失败",
-                    detail=fail_list
-                )
-            self.update_check_status(video_temp)
-        print("total", len(video_id_list))
-        print("bad_total", bad_count)
-
-
-def main():
-    """
-    task
-    :return:
-    """
-
-    VM = VideoStatusManager()
-    if VM.base_init():
-        VM.deal()
-
-
-if __name__ == '__main__':
-    main()

+ 0 - 6
cold_start_publish_to_aigc.py

@@ -1,6 +0,0 @@
-from tasks.publish_tasks.cold_start_publish_daily import ColdStartPublishDailyTask
-
-
-if __name__ == '__main__':
-    cold_start_publish_daily_task = ColdStartPublishDailyTask()
-    cold_start_publish_daily_task.publish_articles_from_video_pool()

+ 11 - 0
config/__init__.py

@@ -49,6 +49,17 @@ class apolloConfig(object):
         return response
 
 
+# growth database connection
+growth_config = {
+    "host": "rm-bp17q95335a99272b.mysql.rds.aliyuncs.com",
+    "port": 3306,
+    "user": "crawler",
+    "password": "crawler123456@",
+    "db": "growth",
+    "charset": "utf8mb4"
+}
+
+
 # aigc backend database connection config
 denet_config = {
     'host': 'rm-t4na9qj85v7790tf84o.mysql.singapore.rds.aliyuncs.com',
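
Reviewer note: the new growth_config is consumed the same way as the existing configs, through DatabaseConnector; a minimal sketch mirroring the usage in tasks/experiment/expirment_tasks.py below:

    from applications.db import DatabaseConnector
    from config import growth_config

    growth_client = DatabaseConnector(growth_config)
    growth_client.connect()
    rows = growth_client.fetch("SELECT 1;")  # illustrative query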

+ 0 - 9
crawler_sph_video.py

@@ -1,9 +0,0 @@
-"""
-@author: luojunhui
-"""
-
-from tasks.crawler_tasks.crawler_video.crawler_sph_videos import CrawlerChannelAccountVideos
-
-if __name__ == "__main__":
-    crawler_channel_account_videos = CrawlerChannelAccountVideos()
-    crawler_channel_account_videos.deal()

+ 0 - 12
fwh_data_manager.py

@@ -1,12 +0,0 @@
-from tasks.data_tasks.fwh_data_recycle import FwhGroupPublishRecordManager
-from tasks.data_tasks.fwh_data_recycle import SaveFwhDataToDatabase
-
-
-if __name__ == '__main__':
-    # 1. fetch data from aigc
-    fwh_group_publish_record_manager = FwhGroupPublishRecordManager()
-    fwh_group_publish_record_manager.deal()
-
-    # 2. save the data to the database
-    save_fwh_data_to_database = SaveFwhDataToDatabase()
-    save_fwh_data_to_database.deal()

+ 0 - 211
getOffVideosDaily.py

@@ -1,211 +0,0 @@
-"""
-@author: luojunhui
-@description: GetOffVideos Daily
-"""
-import time
-import datetime
-import traceback
-
-from tqdm import tqdm
-
-from applications import PQMySQL, PQAPI, log, bot
-
-EXPIRE_TIME = 3 * 24 * 60 * 60
-
-
-class AutoGetOffVideos(object):
-    """
-    Automatically take videos offline
-    """
-
-    def __init__(self):
-        self.db_client = None
-        self.pq_api = None
-
-    def base_init(self):
-        """
-        Initialize the database and the Piaoquan helper
-        :return:
-        """
-        try:
-            self.db_client = PQMySQL()
-        except Exception as e:
-            error_msg = traceback.format_exc()
-            bot(
-                title="自动下架视频任务,数据库初始化失败",
-                detail={
-                    "error": str(e),
-                    "error_msg": error_msg
-                }
-            )
-            return False
-
-        try:
-            self.pq_api = PQAPI()
-        except Exception as e:
-            error_msg = traceback.format_exc()
-            bot(
-                title="自动下架视频任务,pq公共方法连接失败",
-                detail={
-                    "error": str(e),
-                    "error_msg": error_msg
-                }
-            )
-            return False
-        return True
-
-    def get_long_articles_videos(self, timestamp):
-        """
-        Get videos pending take-down
-        :return:
-        """
-        select_sql = f"""
-            SELECT video_id
-            FROM get_off_videos
-            WHERE video_status = 1 and publish_time < {timestamp};
-        """
-        result = self.db_client.select(sql=select_sql)
-        log(
-            task="getOffVideosDaily",
-            function="get_long_articles_videos",
-            message="查找到视频 id_list,一共{}条视频".format(len(result))
-        )
-        return result
-
-    def update_video_id_status(self, video_id):
-        """
-        Update the video's status in the database
-        :param video_id:
-        :return:
-        """
-        timestamp = int(time.time())
-        select_sql = f"""
-                UPDATE get_off_videos
-                SET video_status = 0, get_off_time = {timestamp}
-                WHERE video_id = %s;
-                """
-        try:
-            self.db_client.update(
-                sql=select_sql,
-                params=video_id
-            )
-            log(
-                task="getOffVideosDaily",
-                function="update_video_id_status",
-                message="成功修改视频状态",
-                data={"video_id": video_id}
-            )
-        except Exception as e:
-            log(
-                task="getOffVideosDaily",
-                function="update_video_id_status",
-                message="修改视频状态失败--- 推测 sql 问题,报错信息:{}".format(e),
-                status="fail",
-                data={"video_id": video_id}
-            )
-
-    def change_video_id_status(self, video_id):
-        """
-        Change the status of a video id via the Piaoquan API
-        :return:
-        """
-        response = self.pq_api.changeVideoStatus(video_id, 2)
-        if response:
-            self.update_video_id_status(video_id=video_id)
-        else:
-            log(
-                task="getOffVideosDaily",
-                function="change_video_id_status",
-                status="fail",
-                message="请求票圈修改状态异常: ---video_id = {}".format(video_id),
-            )
-            bot(
-                title="get_off_videos 下架视频异常",
-                detail={
-                    "video_id": video_id
-                },
-                mention=False
-            )
-
-    def get_off_job(self):
-        """
-        Take down all videos published more than 3 days ago
-        :return:
-        """
-        now_stamp = int(time.time())
-        three_days_before = now_stamp - EXPIRE_TIME
-        video_tuple = self.get_long_articles_videos(timestamp=three_days_before)
-        vid_list = [i[0] for i in video_tuple]
-        for video_id in tqdm(vid_list):
-            try:
-                self.change_video_id_status(video_id=video_id)
-            except Exception as e:
-                log(
-                    task="getOffVideosDaily",
-                    function="get_off_job",
-                    status="fail",
-                    message="get_off_job下架单个视频失败,video_id={}, 报错信息={}".format(video_id, e),
-                )
-
-    def check_job(self):
-        """
-        Verify that videos published 3 days ago have been taken down
-        :return:
-        """
-        three_days_ago = int(time.time()) - EXPIRE_TIME
-        sql = f"""
-            SELECT video_id
-            FROM get_off_videos
-            WHERE publish_time < {three_days_ago}
-            AND video_status = 1;
-        """
-        vid_tuple = self.db_client.select(sql)
-        if vid_tuple:
-            vid_list = [i[0] for i in vid_tuple]
-            for vid in vid_list:
-                try:
-                    self.change_video_id_status(video_id=vid)
-                except Exception as e:
-                    log(
-                        task="getOffVideosDaily",
-                        function="check_job",
-                        status="fail",
-                        message="task2下架单个视频失败,video_id={}, 报错信息={}".format(vid, e),
-                    )
-            time.sleep(10)
-            vid_tuple2 = self.db_client.select(sql)
-            if vid_tuple2:
-                vid_list2 = [i[0] for i in vid_tuple2]
-                bot(
-                    title="getOffVideosDaily_check_job",
-                    detail={
-                        "video_list": vid_list2
-                    }
-                )
-            else:
-                return
-        else:
-            return
-
-
-def main():
-    """
-    main function
-    :return:
-    """
-    auto_get_off_job = AutoGetOffVideos()
-    if auto_get_off_job.base_init():
-        auto_get_off_job.get_off_job()
-        time.sleep(60)
-        auto_get_off_job.check_job()
-        bot(
-            title="get_off_jobs任务执行完成通知",
-            mention=False,
-            detail={
-                "finish_time": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
-            }
-        )
-
-
-if __name__ == '__main__':
-    main()

+ 0 - 56
kimi_balance_monitor.py

@@ -1,56 +0,0 @@
-"""
-@author: luojunhui
-"""
-import requests
-import traceback
-
-from applications import bot
-from applications.decoratorApi import retryOnTimeout
-
-BALANCE_LIMIT_THRESHOLD = 200.0
-
-
-@retryOnTimeout(retries=5, delay=5)
-def check_kimi_balance():
-    """
-    Check the Kimi account balance
-    :return:
-    """
-    url = "https://api.moonshot.cn/v1/users/me/balance"
-
-    payload = {}
-    headers = {
-        'Authorization': 'Bearer sk-5DqYCa88kche6nwIWjLE1p4oMm8nXrR9kQMKbBolNAWERu7q'
-    }
-    response = requests.request("GET", url, headers=headers, data=payload, timeout=10)
-    if response.status_code == 200:
-        response_json = response.json()
-        try:
-            balance = response_json['data']['available_balance']
-            if balance < BALANCE_LIMIT_THRESHOLD:
-                bot(
-                    title="kimi余额小于 {} 块".format(BALANCE_LIMIT_THRESHOLD),
-                    detail={
-                        "balance": balance
-                    }
-                )
-        except Exception as e:
-            error_stack = traceback.format_exc()
-            bot(
-                title="kimi余额接口处理失败,数据结构异常",
-                detail={
-                    "error": str(e),
-                    "error_msg": error_stack
-                }
-            )
-    else:
-        bot(
-            title="kimi余额接口调用失败",
-            detail={
-                "response": response.text
-            }
-        )
-
-
-if __name__ == '__main__':
-    check_kimi_balance()

+ 0 - 31
outside_server_accounts_monitor.py

@@ -1,31 +0,0 @@
-from argparse import ArgumentParser
-
-from tasks.monitor_tasks.outside_gzh_articles_monitor import OutsideGzhArticlesCollector
-from tasks.monitor_tasks.outside_gzh_articles_monitor import OutsideGzhArticlesMonitor
-
-
-if __name__ == "__main__":
-    parser = ArgumentParser()
-    parser.add_argument("--task", help="input monitor or collector")
-    args = parser.parse_args()
-    if args.task:
-        task = args.task
-        match task:
-            case "monitor":
-                monitor = OutsideGzhArticlesMonitor()
-                monitor.deal()
-            case "collector":
-                collector = OutsideGzhArticlesCollector()
-                collector.deal()
-            case _:
-                print("task is not support")
-    else:
-        # first collect data
-        collector = OutsideGzhArticlesCollector()
-        collector.deal()
-
-        # then monitor each article
-        monitor = OutsideGzhArticlesMonitor()
-        monitor.deal()
-
-

+ 0 - 113
published_articles_monitor.py

@@ -1,113 +0,0 @@
-"""
-Monitor published articles
-"""
-
-from datetime import datetime
-from argparse import ArgumentParser
-from concurrent.futures import ThreadPoolExecutor
-
-from tqdm import tqdm
-from applications import bot
-from applications import aiditApi
-from applications.db import DatabaseConnector
-from applications.const import updatePublishedMsgTaskConst
-from applications import WeixinSpider
-from config import piaoquan_crawler_config, long_articles_config
-
-const = updatePublishedMsgTaskConst()
-spider = WeixinSpider()
-
-
-def monitor_article(article):
-    """
-    Check whether a single article has been flagged as violating
-    """
-    gh_id, account_name, title, url, wx_sn, publish_date = article
-    try:
-        response = spider.get_article_text(url, is_cache=False)
-        response_code = response["code"]
-        if response_code == const.ARTICLE_ILLEGAL_CODE:
-            error_detail = response.get("msg")
-            insert_sql = f"""
-                INSERT IGNORE INTO illegal_articles 
-                (gh_id, account_name, title, wx_sn, publish_date, illegal_reason)
-                VALUES 
-                (%s, %s, %s, %s, %s, %s);
-                """
-            affected_rows = long_articles_db_client.save(
-                query=insert_sql,
-                params=(gh_id, account_name, title, wx_sn, publish_date, error_detail),
-            )
-            if affected_rows:
-                bot(
-                    title="文章违规告警",
-                    detail={
-                        "account_name": account_name,
-                        "gh_id": gh_id,
-                        "title": title,
-                        "wx_sn": wx_sn.decode("utf-8"),
-                        "publish_date": str(publish_date),
-                        "error_detail": error_detail,
-                    },
-                    mention=False
-                )
-                aiditApi.delete_articles(
-                    gh_id=gh_id,
-                    title=title
-                )
-    except Exception as e:
-        print(e)
-
-
-def get_article_list(run_date):
-    """
-    Monitoring task with a 7-day window: check whether published articles were flagged as violating, and alert if so
-    :return:
-    """
-    if not run_date:
-        run_date = datetime.today().strftime("%Y-%m-%d")
-
-    monitor_start_timestamp = (
-            int(datetime.strptime(run_date, "%Y-%m-%d").timestamp()) - const.MONITOR_PERIOD
-    )
-    select_sql = f"""
-        SELECT ghId, accountName, title, ContentUrl, wx_sn, from_unixtime(publish_timestamp) AS publish_timestamp
-        FROM official_articles_v2
-        WHERE publish_timestamp >= {monitor_start_timestamp}
-        ORDER BY publish_timestamp DESC;
-    """
-    article_list = piaoquan_crawler_db_client.fetch(select_sql)
-    return article_list
-
-
-if __name__ == "__main__":
-    parser = ArgumentParser()
-
-    parser.add_argument(
-        "--run_date",
-        help="--run_date %Y-%m-%d",
-    )
-    args = parser.parse_args()
-    if args.run_date:
-        run_date = args.run_date
-    else:
-        run_date = None
-
-    piaoquan_crawler_db_client = DatabaseConnector(db_config=piaoquan_crawler_config)
-    piaoquan_crawler_db_client.connect()
-    long_articles_db_client = DatabaseConnector(db_config=long_articles_config)
-    long_articles_db_client.connect()
-
-    # Number of concurrent threads
-    MAX_WORKERS = 4
-
-    article_list = get_article_list(run_date=run_date)
-
-    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
-        list(
-            tqdm(
-                executor.map(monitor_article, article_list),
-                total=len(article_list),
-                desc="Monitor Article List",
-            )
-        )

+ 0 - 8
run_article_title_exit_v1.py

@@ -1,8 +0,0 @@
-"""
-@author: luojunhui
-"""
-from tasks.flow_pool_tasks.exit_article_with_title import main
-
-
-if __name__ == '__main__':
-    main()

+ 0 - 8
run_baidu_video_crawler.py

@@ -1,8 +0,0 @@
-"""
-@author: luojunhui
-"""
-from cold_start.crawler.baidu import BaiduVideoCrawler
-
-if __name__ == '__main__':
-    task = BaiduVideoCrawler()
-    task.deal()

+ 0 - 13
run_toutiao_recommend.py

@@ -1,13 +0,0 @@
-"""
-@author: luojunhui
-@description: Toutiao recommendation-feed article crawl task
-"""
-from cold_start.crawler.toutiao_recommend_crawler import ToutiaoRecommendCrawler
-
-
-if __name__ == "__main__":
-    toutiao_recommend_crawler = ToutiaoRecommendCrawler()
-    toutiao_recommend_crawler.init_database()
-    category_list = ['finance', 'tech', 'history', 'entertainment']
-    for category in category_list:
-        toutiao_recommend_crawler.run(category=category)

+ 0 - 9
run_update_articles_from_aigc_system.py

@@ -1,9 +0,0 @@
-"""
-@author: luojunhui
-"""
-from tasks.update_article_info_from_aigc import UpdateArticleInfoFromAIGC
-
-
-if __name__ == "__main__":
-    update_article_info_from_aigc = UpdateArticleInfoFromAIGC()
-    update_article_info_from_aigc.deal()

+ 0 - 72
run_video_account_crawler.py

@@ -1,72 +0,0 @@
-"""
-@author: luojunhui
-run video && account crawling
-"""
-import traceback
-
-from datetime import datetime
-from argparse import ArgumentParser
-
-from applications import bot
-from cold_start.crawler import WeixinAccountCrawler, WeixinVideoCrawler
-
-account_crawler = WeixinAccountCrawler()
-video_crawler = WeixinVideoCrawler()
-
-
-def main():
-    """
-    Main function
-    :return:
-    """
-    parser = ArgumentParser()
-    parser.add_argument("--run-date",
-                        help="Run only once for date in format of %Y%m%d. \
-                            If no specified, run as daily jobs.")
-    args = parser.parse_args()
-
-    if args.run_date:
-        run_date = datetime.strptime(args.run_date, "%Y-%m-%d")
-        print("Run in manual mode. Date: {}".format(args.run_date))
-    else:
-        run_date = datetime.today()
-    # # run the account crawl first (disabled)
-    # try:
-    #     account_crawler.run(run_date)
-    # except Exception as e:
-    #     error_msg = traceback.format_exc()
-    #     bot(
-    #         title='账号抓取v1执行失败',
-    #         detail={
-    #             "error": str(e),
-    #             "traceback": error_msg
-    #         }
-    #     )
-    # then run the video crawl
-    try:
-        video_crawler.run()
-    except Exception as e:
-        error_msg = traceback.format_exc()
-        bot(
-            title='视频抓取执行失败',
-            detail={
-                "error": str(e),
-                "traceback": error_msg
-            }
-        )
-    # then run account crawl v2
-    try:
-        account_crawler.run_v2()
-    except Exception as e:
-        error_msg = traceback.format_exc()
-        bot(
-            title='账号抓取V2执行失败',
-            detail={
-                "error": str(e),
-                "traceback": error_msg
-            }
-        )
-
-
-if __name__ == '__main__':
-    main()

+ 0 - 34
run_video_publish_and_audit.py

@@ -1,34 +0,0 @@
-"""
-@author: luojunhui
-"""
-from argparse import ArgumentParser
-
-from cold_start.publish import PublishVideosForAudit
-
-pub = PublishVideosForAudit()
-
-
-def main():
-    """
-    Main function
-    :return:
-    """
-    parser = ArgumentParser()
-    parser.add_argument("--run_task", type=str, help="run task, input publish or check")
-    args = parser.parse_args()
-
-    if args.run_task:
-        task = args.run_task
-        if task == "publish":
-            pub.publish_job()
-        elif task == "check":
-            pub.check_job()
-        else:
-            print("run_task input ERROR,please input publish or check")
-    else:
-        pub.publish_job()
-        pub.check_job()
-
-
-if __name__ == '__main__':
-    main()

+ 0 - 57
run_video_understanding_with_google.py

@@ -1,57 +0,0 @@
-"""
-@author: luojunhui
-"""
-
-import datetime
-import multiprocessing
-
-from applications import log
-from cold_start.ai_pipeline import ExtractVideoBestFrame
-
-PROCESS_EXIT_TIMEOUT = 10 * 60
-
-
-def start_task():
-    task = ExtractVideoBestFrame()
-
-    # check how many tasks are currently being processed
-    processing_tasks = task.get_processing_task_pool_size()
-
-    if processing_tasks:
-        print(
-            f"{datetime.datetime.now()} 当前有 {processing_tasks} 个任务正在等待 google 处理..."
-        )
-        task.extract_best_frame_with_gemini_ai()
-    else:
-        print(f"{datetime.datetime.now()} 没有任务正在处理中...")
-        # upload video to google ai
-        task.upload_video_to_gemini_ai()
-        log(
-            task="video_understanding_with_google",
-            function="main",
-            message="upload_video_to_google_ai_task",
-        )
-        task.extract_best_frame_with_gemini_ai()
-
-    # call the API and use ffmpeg to extract the video's best frame as the cover
-    task.get_cover_with_best_frame()
-
-
-def main():
-    # create sub process
-    process = multiprocessing.Process(target=start_task)
-    process.start()
-
-    # wait for sub process to finish
-    process.join(PROCESS_EXIT_TIMEOUT)
-
-    if process.is_alive():
-        print(
-            f"Process {process.pid} did not finish within {PROCESS_EXIT_TIMEOUT} seconds. Terminating..."
-        )
-        process.terminate()
-        process.join()
-
-
-if __name__ == "__main__":
-    main()

+ 17 - 19
schedule_app.py

@@ -1,26 +1,24 @@
-# from celery import Celery
-from tasks.crawler_tasks.crawler_video.crawler_piaoquan_videos import CrawlerPiaoQuanVideos
-from tasks.crawler_tasks.crawler_video.crawler_sohu_videos import CrawlerSohuHotVideos
-from tasks.crawler_tasks.crawler_video.crawler_sohu_videos import CrawlerSohuRecommendVideos
+import time
+import schedule
+from tasks.experiment.expirment_tasks import deal
 
 
-# app = Celery('tasks', broker='redis://localhost:6379/0')
+def do_job():
+    """<UNK>"""
+    print("开始执行任务...")
+    deal()
+    print("任务执行完成!")
 
-# @app.task
-def run_piaoquan_video_crawler():
-    crawler = CrawlerPiaoQuanVideos()
-    crawler.deal()
+def run_scheduler():
+    """调度器运行函数"""
+    # 每天5:00执行
+    schedule.every().day.at("15:41").do(do_job)
 
-def run_sohu_video_crawler():
-    # step1, crawl sohu hot videos
-    crawler_sohu_hot_videos = CrawlerSohuHotVideos()
-    crawler_sohu_hot_videos.deal()
+    print("调度器已启动,等待执行时间...")
+    while True:
+        schedule.run_pending()
+        time.sleep(60)  # poll once per minute
 
-    # step2, crawl sohu recommend videos
-    crawler_sohu_recommend_videos = CrawlerSohuRecommendVideos()
-    crawler_sohu_recommend_videos.deal()
 
 if __name__ == "__main__":
-    run_piaoquan_video_crawler()
-    run_sohu_video_crawler()
-
+    run_scheduler()
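
Reviewer note: schedule.run_pending() propagates any exception raised by the job, so a single failure of deal() kills this long-running loop. A minimal hardening sketch, reusing the bot alert helper that the deleted daily scripts in this commit rely on (assuming it is importable here as well):

    import traceback

    from applications import bot  # alert helper used across this repo

    def do_job_safe():
        """Run the experiment task; alert on failure instead of crashing the loop."""
        try:
            deal()
        except Exception as e:
            bot(
                title="experiment task failed",
                detail={"error": str(e), "error_stack": traceback.format_exc()},
            )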

+ 106 - 0
tasks/experiment/expirment_tasks.py

@@ -0,0 +1,106 @@
+import json
+import datetime
+from typing import Dict, List
+
+from pymysql.cursors import DictCursor
+
+from applications.api.odps_api import ODPSApi
+from applications.db import DatabaseConnector
+from config import growth_config, long_articles_config
+
+
+def format_video_values(value_list: List[List]) -> Dict:
+    keyword_map = {}
+    for line in value_list:
+        video_id = line[0]
+        video_data = json.loads(line[1])
+        video_keywords = video_data["video_keywords"]
+        if video_keywords:
+            try:
+                keywords = json.loads(video_keywords)
+            except (json.JSONDecodeError, TypeError):
+                # not JSON: fall back to splitting on full-width or ASCII commas
+                if "," in video_keywords:
+                    keywords = video_keywords.split(",")
+                elif "," in video_keywords:
+                    keywords = video_keywords.split(",")
+                else:
+                    continue
+            for keyword in keywords:
+                keyword = keyword.strip()
+                keyword_map.setdefault(keyword, []).append(video_id)
+
+    return keyword_map
+
+
+def get_candidate_videos(db_client):
+    """
+    Fetch candidate videos to process
+    """
+    sql = f"""
+        SELECT video_id
+        FROM content_platform_video
+        WHERE dt > DATE_SUB(CURRENT_DATE, INTERVAL 5 DAY)
+        group by video_id;
+    """
+    response = db_client.fetch(sql, cursor_type=DictCursor)
+    return response
+
+
+def get_video_detail(video_tuple: tuple, odps_server: ODPSApi, yesterday: str):
+    """
+    获取视频详情
+    """
+    fetch_query = f"""
+        select id, `data` from loghubods.videoid_feature_aitags 
+        where dt = '{yesterday}'
+        and id in {video_tuple};
+    """
+    res = odps_server.execute_sql(fetch_query)
+    if res is None:
+        # the ODPS query failed; treat as no rows
+        return []
+    return res.values.tolist()
+
+
+def save_to_database(date_str, keyword_map, db_client):
+    """
+    Persist the keyword map and deactivate earlier versions
+    """
+    sql = f"""
+        insert into video_keywords_map (date, keywords_map, status)
+        values (%s, %s, %s);
+    """
+    affected_rows = db_client.save(sql, (date_str, keyword_map, 1))
+    if affected_rows:
+        update_query = f"""
+            update video_keywords_map
+            set status = %s where date != %s;
+        """
+        return db_client.save(update_query, (0, date_str))
+    else:
+        return 0
+
+
+def deal():
+    """Daily job: build the keyword -> video_id map and persist it."""
+    yesterday = datetime.date.today() - datetime.timedelta(days=1)
+    yesterday_str = yesterday.strftime("%Y%m%d")
+
+    growth_client = DatabaseConnector(growth_config)
+    growth_client.connect()
+
+    long_articles_client = DatabaseConnector(long_articles_config)
+    long_articles_client.connect()
+
+    odps_server = ODPSApi()
+
+    video_ids = get_candidate_videos(growth_client)
+    video_id_tuple = tuple([i["video_id"] for i in video_ids])
+
+    video_details = get_video_detail(video_id_tuple, odps_server, yesterday_str)
+
+    keyword_map = format_video_values(video_details)
+
+    save_to_database(
+        yesterday_str, json.dumps(keyword_map, ensure_ascii=False), long_articles_client
+    )
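
Reviewer note: save_to_database inserts the new day's row with status = 1 and only then flips every other date's row to status = 0, so readers always see a single active map. For reference, a worked example of the expected format_video_values behavior (ids and keywords are illustrative); each input row is [video_id, json_payload], where the payload's video_keywords field may be a JSON-encoded list or a comma-separated string:

    import json

    rows = [
        [101, json.dumps({"video_keywords": json.dumps(["健康", "养生"], ensure_ascii=False)}, ensure_ascii=False)],
        [102, json.dumps({"video_keywords": "健康,饮食"}, ensure_ascii=False)],
    ]
    print(format_video_values(rows))
    # expected: {"健康": [101, 102], "养生": [101], "饮食": [102]}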

+ 0 - 21
title_process_task.py

@@ -1,21 +0,0 @@
-"""
-@author: luojunhui
-"""
-from tasks.ai_tasks.category_generation_task import ArticlePoolCategoryGenerationTask
-from tasks.ai_tasks.category_generation_task import VideoPoolCategoryGenerationTask
-from tasks.ai_tasks.title_rewrite_task import TitleRewriteTask
-
-
-if __name__ == '__main__':
-    # 1. title rewriting
-    title_rewrite_task = TitleRewriteTask()
-    title_rewrite_task.deal()
-
-    # 2. categorize video content-pool titles
-    video_pool_category_generation_task = VideoPoolCategoryGenerationTask()
-    video_pool_category_generation_task.deal()
-
-    # 3. categorize article content-pool titles
-    article_pool_category_generation_task = ArticlePoolCategoryGenerationTask()
-    article_pool_category_generation_task.deal()
-

+ 0 - 36
title_similarity_score_task.py

@@ -1,36 +0,0 @@
-"""
-@author: luojunhui
-"""
-import traceback
-from applications import bot
-from cold_start.filter.title_similarity_task import ColdStartTitleSimilarityTask
-
-
-if __name__ == '__main__':
-    batch_size = 3000
-    task = ColdStartTitleSimilarityTask()
-    task.init_database()
-    # process video
-    try:
-        task.run(meta_source="video")
-    except Exception as e:
-        bot(
-            title="视频冷启池nlp任务异常",
-            mention=False,
-            detail={
-                "traceback": traceback.format_exc(),
-                "error": f"{e}"
-            }
-        )
-    # process article
-    try:
-        task.run(meta_source="article")
-    except Exception as e:
-        bot(
-            title="文章冷启池nlp任务异常",
-            mention=False,
-            detail={
-                "traceback": traceback.format_exc(),
-                "error": f"{e}"
-            }
-        )

+ 0 - 10
toutiao_video_crawler.py

@@ -1,10 +0,0 @@
-"""
-@author: luojunhui
-"""
-
-from tasks.crawler_tasks.crawler_video.crawler_toutiao_videos import CrawlerToutiaoAccountVideos
-
-
-if __name__ == '__main__':
-    crawler = CrawlerToutiaoAccountVideos()
-    crawler.deal()

+ 0 - 38
updateAccountV3.py

@@ -1,38 +0,0 @@
-import time
-
-from datetime import datetime, timedelta
-from argparse import ArgumentParser
-
-from tasks.data_tasks.account_position_read_avg_task import (
-    AccountPositionReadAvgTask,
-    AccountOpenRateAvgTask,
-)
-
-
-def main():
-    """
-    main job
-    :return:
-    """
-    parser = ArgumentParser()
-    parser.add_argument(
-        "--run-date",
-        help="Run only once for date in format of %Y-%m-%d. \
-                                If not specified, run as daily jobs.",
-    )
-    args = parser.parse_args()
-    update_account_read_avg_task = AccountPositionReadAvgTask()
-    update_account_open_rate_avg_task = AccountOpenRateAvgTask()
-    if args.run_date:
-        update_account_read_avg_task.do_task_list(dt=args.run_date)
-    else:
-        dt_object = datetime.fromtimestamp(int(time.time()))
-        one_day = timedelta(days=1)
-        yesterday = dt_object - one_day
-        yesterday_str = yesterday.strftime("%Y-%m-%d")
-        update_account_read_avg_task.do_task_list(dt=yesterday_str)
-        update_account_open_rate_avg_task.do_task_list(date_string=yesterday_str)
-
-
-if __name__ == "__main__":
-    main()

+ 0 - 733
updatePublishedMsgDaily.py

@@ -1,733 +0,0 @@
-"""
-@author: luojunhui
-@description: update daily information into official articles v2
-"""
-import json
-import time
-import traceback
-import urllib.parse
-from argparse import ArgumentParser
-from datetime import datetime
-from typing import Dict, List, Tuple
-
-from pymysql.cursors import DictCursor
-from tqdm import tqdm
-
-from applications import aiditApi
-from applications import bot
-from applications import create_feishu_columns_sheet
-from applications import Functions
-from applications import log
-from applications import WeixinSpider
-from applications.const import updatePublishedMsgTaskConst
-from applications.db import DatabaseConnector
-from config import denet_config, long_articles_config, piaoquan_crawler_config
-
-ARTICLE_TABLE = "official_articles_v2"
-const = updatePublishedMsgTaskConst()
-spider = WeixinSpider()
-functions = Functions()
-
-
-def generate_bot_columns():
-    """
-    Generate the Feishu sheet columns
-    :return:
-    """
-    columns = [
-        create_feishu_columns_sheet(sheet_type="plain_text", sheet_name="name", display_name="公众号名称"),
-        create_feishu_columns_sheet(sheet_type="plain_text", sheet_name="ghId", display_name="ghId"),
-        create_feishu_columns_sheet(sheet_type="number", sheet_name="follower_count", display_name="粉丝数"),
-        create_feishu_columns_sheet(sheet_type="date", sheet_name="account_init_timestamp",
-                                    display_name="账号接入系统时间"),
-        create_feishu_columns_sheet(sheet_type="plain_text", sheet_name="using_status", display_name="利用状态")
-    ]
-    return columns
-
-
-def get_account_status(db_client: DatabaseConnector) -> Dict:
-    """
-    Get each account's experiment status
-    :return:
-    """
-    sql = f"""  
-            SELECT t1.account_id, t2.status
-            FROM wx_statistics_group_source_account t1
-            JOIN wx_statistics_group_source t2
-            ON t1.group_source_name = t2.account_source_name;
-            """
-    account_status_list = db_client.fetch(sql, cursor_type=DictCursor)
-    account_status_dict = {account['account_id']: account['status'] for account in account_status_list}
-    return account_status_dict
-
-
-def get_accounts(db_client: DatabaseConnector) -> List[Dict]:
-    """
-    Get accounts currently in publish status from the aigc database
-    :return:
-    "name": line[0],
-    "ghId": line[1],
-    "follower_count": line[2],
-    "account_init_time": int(line[3] / 1000),
-    "account_type": line[4], # 订阅号 or 服务号
-    "account_auth": line[5]
-    """
-    illegal_accounts = [
-            'gh_4c058673c07e',
-            'gh_de9f9ebc976b',
-            'gh_7b4a5f86d68c',
-            'gh_f902cea89e48',
-            'gh_789a40fe7935',
-            'gh_cd041ed721e6',
-            'gh_62d7f423f382',
-            'gh_043223059726',
-            'gh_5bb79339a1f4'
-    ]
-    account_list_with_out_using_status = aiditApi.get_publish_account_from_aigc()
-    account_status_dict = get_account_status(db_client)
-    account_list = [
-        {
-            **item,
-            'using_status': 0 if account_status_dict.get(item['account_id']) == '实验' else 1
-        }
-        for item in account_list_with_out_using_status
-    ]
-    account_list = [account for account in account_list if account['ghId'] not in illegal_accounts]
-    return account_list
-
-
-def insert_each_msg(db_client: DatabaseConnector, account_info: Dict, msg_list: List[Dict]) -> None:
-    """
-    Write the message data into the database
-    :param account_info:
-    :param db_client:
-    :param msg_list:
-    :return:
-    """
-    gh_id = account_info['ghId']
-    account_name = account_info['name']
-    for info in msg_list:
-        baseInfo = info.get("BaseInfo", {})
-        appMsgId = info.get("AppMsg", {}).get("BaseInfo", {}).get("AppMsgId", None)
-        createTime = info.get("AppMsg", {}).get("BaseInfo", {}).get("CreateTime", None)
-        updateTime = info.get("AppMsg", {}).get("BaseInfo", {}).get("UpdateTime", None)
-        Type = info.get("AppMsg", {}).get("BaseInfo", {}).get("Type", None)
-        detail_article_list = info.get("AppMsg", {}).get("DetailInfo", [])
-        if detail_article_list:
-            for article in detail_article_list:
-                title = article.get("Title", None)
-                Digest = article.get("Digest", None)
-                ItemIndex = article.get("ItemIndex", None)
-                ContentUrl = article.get("ContentUrl", None)
-                SourceUrl = article.get("SourceUrl", None)
-                CoverImgUrl = article.get("CoverImgUrl", None)
-                CoverImgUrl_1_1 = article.get("CoverImgUrl_1_1", None)
-                CoverImgUrl_235_1 = article.get("CoverImgUrl_235_1", None)
-                ItemShowType = article.get("ItemShowType", None)
-                IsOriginal = article.get("IsOriginal", None)
-                ShowDesc = article.get("ShowDesc", None)
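-                # parse the ShowDesc display string into the numeric show_* counters below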
-                show_stat = functions.show_desc_to_sta(ShowDesc)
-                ori_content = article.get("ori_content", None)
-                show_view_count = show_stat.get("show_view_count", 0)
-                show_like_count = show_stat.get("show_like_count", 0)
-                show_zs_count = show_stat.get("show_zs_count", 0)
-                show_pay_count = show_stat.get("show_pay_count", 0)
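-                # wx_sn (the article's unique id) lives in the "sn" query parameter of ContentUrl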
-                wx_sn = ContentUrl.split("&sn=")[1].split("&")[0] if ContentUrl else None
-                status = account_info['using_status']
-                info_tuple = (
-                    gh_id,
-                    account_name,
-                    appMsgId,
-                    title,
-                    Type,
-                    createTime,
-                    updateTime,
-                    Digest,
-                    ItemIndex,
-                    ContentUrl,
-                    SourceUrl,
-                    CoverImgUrl,
-                    CoverImgUrl_1_1,
-                    CoverImgUrl_235_1,
-                    ItemShowType,
-                    IsOriginal,
-                    ShowDesc,
-                    ori_content,
-                    show_view_count,
-                    show_like_count,
-                    show_zs_count,
-                    show_pay_count,
-                    wx_sn,
-                    json.dumps(baseInfo, ensure_ascii=False),
-                    functions.str_to_md5(title),
-                    status
-                )
-                try:
-                    insert_sql = f"""
-                        INSERT INTO {ARTICLE_TABLE}
-                        (ghId, accountName, appMsgId, title, Type, createTime, updateTime, Digest, ItemIndex, ContentUrl, SourceUrl, CoverImgUrl, CoverImgUrl_1_1, CoverImgUrl_255_1, ItemShowType, IsOriginal, ShowDesc, ori_content, show_view_count, show_like_count, show_zs_count, show_pay_count, wx_sn, baseInfo, title_md5, status)
-                        values
-                        (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
-                        """
-                    db_client.save(query=insert_sql, params=info_tuple)
-                    log(
-                        task="updatePublishedMsgDaily",
-                        function="insert_each_msg",
-                        message="插入文章数据成功",
-                        data={
-                            "info": info_tuple
-                        }
-                    )
-                except Exception as e:
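-                    # insert failed (typically a duplicate wx_sn): fall back to refreshing the engagement counters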
-                    try:
-                        update_sql = f"""
-                        UPDATE {ARTICLE_TABLE}
-                        SET show_view_count = %s, show_like_count=%s
-                        WHERE wx_sn = %s;
-                        """
-                        db_client.save(query=update_sql,
-                                       params=(show_view_count, show_like_count, wx_sn))
-                        log(
-                            task="updatePublishedMsgDaily",
-                            function="insert_each_msg",
-                            message="更新文章数据成功",
-                            data={
-                                "wxSn": wx_sn,
-                                "likeCount": show_like_count,
-                                "viewCount": show_view_count
-                            }
-                        )
-                    except Exception as e:
-                        log(
-                            task="updatePublishedMsgDaily",
-                            function="insert_each_msg",
-                            message="更新文章失败, 报错原因是: {}".format(e),
-                            status="fail"
-                        )
-                        continue
-
-
-def update_each_account(db_client: DatabaseConnector, account_info: Dict, latest_update_time: int, cursor=None):
-    """
-    Page through one account's message list and persist every page
-    :param account_info:
-    :param cursor: pagination cursor for the spider API
-    :param latest_update_time: newest update timestamp already stored for this account
-    :param db_client: database connection
-    :return: None
-    """
-    gh_id = account_info['ghId']
-    response = spider.update_msg_list(ghId=gh_id, index=cursor)
-    msg_list = response.get("data", {}).get("data", [])
-    if msg_list:
-        last_article_in_this_msg = msg_list[-1]
-        last_time_stamp_in_this_msg = last_article_in_this_msg['AppMsg']['BaseInfo']['UpdateTime']
-        # last_url = last_article_in_this_msg['AppMsg']['DetailInfo'][0]['ContentUrl']
-        # resdata = spider.get_account_by_url(last_url)
-        # check_id = resdata['data'].get('data', {}).get('wx_gh')
-        # if check_id == gh_id:
-        insert_each_msg(
-            db_client=db_client,
-            account_info=account_info,
-            msg_list=msg_list
-        )
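-        # recurse to the next page until the oldest update in this page predates what is already stored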
-        if last_time_stamp_in_this_msg > latest_update_time:
-            next_cursor = response['data']['next_cursor']
-            return update_each_account(
-                db_client=db_client,
-                account_info=account_info,
-                latest_update_time=latest_update_time,
-                cursor=next_cursor
-            )
-        log(
-            task="updatePublishedMsgDaily",
-            function="update_each_account",
-            message="账号文章更新成功",
-            data=response
-        )
-        return None
-    else:
-        log(
-            task="updatePublishedMsgDaily",
-            function="update_each_account",
-            message="账号文章更新失败",
-            status="fail",
-            data=response
-        )
-        return None
-
-
-def check_account_info(db_client: DatabaseConnector, gh_id: str) -> int:
-    """
-    Look up the newest publish_timestamp stored for the account with this gh_id
-    :param db_client:
-    :param gh_id:
-    :return:
-    """
-    sql = f"""
-        SELECT MAX(publish_timestamp)
-        FROM {ARTICLE_TABLE}
-        WHERE ghId = '{gh_id}';
-        """
-    result = db_client.fetch(sql)
-    if result and result[0][0]:
-        return result[0][0]
-    else:
-        # new account (or MAX() returned NULL): start the crawl window 30 days back from now
-        return int(time.time()) - const.NEW_ACCOUNT_CRAWL_PERIOD
-
-
-def update_single_account(db_client: DatabaseConnector, account_info: Dict):
-    """
-    Update one account's published articles
-    :param db_client:
-    :param account_info:
-    :return:
-    """
-    gh_id = account_info['ghId']
-    max_publish_time = check_account_info(db_client, gh_id)
-    update_each_account(
-        db_client=db_client,
-        account_info=account_info,
-        latest_update_time=max_publish_time
-    )
-
-
-def check_single_account(db_client: DatabaseConnector, account_item: Dict) -> bool:
-    """
-    Check whether this account's articles were collected recently
-    :param db_client:
-    :param account_item:
-    :return: True / False
-    """
-    gh_id = account_item['ghId']
-    account_type = account_item['account_type']
-    today_str = datetime.today().strftime("%Y-%m-%d")
-    today_date_time = datetime.strptime(today_str, "%Y-%m-%d")
-    today_timestamp = today_date_time.timestamp()
-    sql = f"""
-            SELECT max(updateTime)
-            FROM {ARTICLE_TABLE}
-            WHERE ghId = '{gh_id}';
-            """
-    try:
-        latest_update_time = db_client.fetch(sql)[0][0]
-        # subscription accounts must have an article collected today;
-        # service accounts are allowed a seven-day window
-        if account_type in const.SUBSCRIBE_TYPE_SET:
-            return int(latest_update_time) > int(today_timestamp)
-        else:
-            return int(latest_update_time) > int(today_timestamp) - 7 * 24 * 3600
-    except Exception as e:
-        print(e)
-        return False
-
-
-def get_articles(db_client: DatabaseConnector):
-    """
-
-    :return:
-    """
-    sql = f"""
-    SELECT ContentUrl, wx_sn 
-    FROM {ARTICLE_TABLE}
-    WHERE publish_timestamp in {(const.DEFAULT_STATUS, const.REQUEST_FAIL_STATUS)};"""
-    response = db_client.fetch(sql)
-    return response
-
-
-def update_publish_timestamp(db_client: DatabaseConnector, row: Tuple):
-    """
-    Refresh the publish timestamp and mini-program info for one article row
-    :param db_client:
-    :param row:
-    :return:
-    """
-    url = row[0]
-    wx_sn = row[1]
-    try:
-        response = spider.get_article_text(url)
-        response_code = response['code']
-
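-        # map the spider response code onto sentinel publish_timestamp values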
-        if response_code == const.ARTICLE_DELETE_CODE:
-            publish_timestamp_s = const.DELETE_STATUS
-            root_source_id_list = []
-        elif response_code == const.ARTICLE_ILLEGAL_CODE:
-            publish_timestamp_s = const.ILLEGAL_STATUS
-            root_source_id_list = []
-        elif response_code == const.ARTICLE_SUCCESS_CODE:
-            data = response['data']['data']
-            publish_timestamp_ms = data['publish_timestamp']
-            publish_timestamp_s = int(publish_timestamp_ms / 1000)
-            mini_program = data.get('mini_program', [])
-            if mini_program:
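-                # each mini-program card carries a URL-encoded "path"; pull its rootSourceId query parameter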
-                root_source_id_list = [
-                    urllib.parse.parse_qs(
-                        urllib.parse.unquote(i['path'])
-                    )['rootSourceId'][0]
-                    for i in mini_program
-                ]
-            else:
-                root_source_id_list = []
-        else:
-            publish_timestamp_s = const.UNKNOWN_STATUS
-            root_source_id_list = []
-    except Exception as e:
-        publish_timestamp_s = const.REQUEST_FAIL_STATUS
-        root_source_id_list = None
-        error_msg = traceback.format_exc()
-        print(e, error_msg)
-
-    update_sql = f"""
-            UPDATE {ARTICLE_TABLE}
-            SET publish_timestamp = %s, root_source_id_list = %s
-            WHERE wx_sn = %s;
-        """
-    db_client.save(
-        query=update_sql,
-        params=(
-            publish_timestamp_s,
-            json.dumps(root_source_id_list, ensure_ascii=False),
-            wx_sn
-        ))
-    if publish_timestamp_s == const.REQUEST_FAIL_STATUS:
-        return row
-    else:
-        return None
-
-
-def get_article_detail_job(db_client: DatabaseConnector):
-    """
-    Resolve publish details for articles still missing a publish_timestamp
-    :return:
-    """
-
-    article_tuple = get_articles(db_client)
-    for article in tqdm(article_tuple):
-        try:
-            update_publish_timestamp(db_client=db_client, row=article)
-        except Exception as e:
-            print(e)
-            error_msg = traceback.format_exc()
-            print(error_msg)
-    # retry the articles still flagged as request-failed (-1) or default (0)
-    process_failed_articles = get_articles(db_client)
-    fail_list = []
-    if process_failed_articles:
-        for article in tqdm(process_failed_articles):
-            try:
-                res = update_publish_timestamp(db_client=db_client, row=article)
-                # update_publish_timestamp returns the row only when the request failed again
-                if res:
-                    fail_list.append({"wx_sn": res[1], "url": res[0]})
-            except Exception as e:
-                print(e)
-                error_msg = traceback.format_exc()
-                print(error_msg)
-
-    # backfill publish_timestamp from sibling rows sharing the same ghId + appMsgId
-    update_sql = f"""
-        UPDATE {ARTICLE_TABLE} oav 
-        JOIN (
-            SELECT ghId, appMsgId, MAX(publish_timestamp) AS publish_timestamp 
-            FROM {ARTICLE_TABLE} 
-            WHERE publish_timestamp > %s 
-            GROUP BY ghId, appMsgId
-            ) vv 
-            ON oav.appMsgId = vv.appMsgId AND oav.ghId = vv.ghId
-        SET oav.publish_timestamp = vv.publish_timestamp
-        WHERE oav.publish_timestamp <= %s;
-    """
-    db_client.save(
-        query=update_sql,
-        params=(0, 0)
-    )
-
-    # if publish_timestamp is still unset, fall back to updateTime
-    update_sql_2 = f"""
-        UPDATE {ARTICLE_TABLE}
-        SET publish_timestamp = updateTime
-        WHERE publish_timestamp < %s;
-    """
-    db_client.save(
-        query=update_sql_2,
-        params=(0,)
-    )
-    if fail_list:
-        bot(
-            title="更新文章任务,请求detail失败",
-            detail=fail_list
-        )
-
-
-def whether_title_unsafe(db_client: DatabaseConnector, title: str) -> bool:
-    """
-    Check whether this title already has a violation record
-    :param db_client:
-    :param title:
-    :return:
-    """
-    title_md5 = functions.str_to_md5(title)
-    sql = f"""
-        SELECT title_md5
-        FROM article_unsafe_title
-        WHERE title_md5 = '{title_md5}';
-    """
-    res = db_client.fetch(sql)
-    return bool(res)
-
-
-def update_job(piaoquan_crawler_db_client, aigc_db_client):
-    """
-    Daily update job: crawl the latest articles for every active account
-    :return:
-    """
-    account_list = get_accounts(db_client=aigc_db_client)
-    # subscription accounts
-    subscription_accounts = [i for i in account_list if i['account_type'] in const.SUBSCRIBE_TYPE_SET]
-    success_count = 0
-    fail_count = 0
-    for sub_item in tqdm(subscription_accounts):
-        try:
-            update_single_account(piaoquan_crawler_db_client, sub_item)
-            success_count += 1
-            # time.sleep(5)
-        except Exception as e:
-            fail_count += 1
-            log(
-                task="updatePublishedMsgDaily",
-                function="update_job",
-                message="单个账号文章更新失败, 报错信息是: {}".format(e),
-                status="fail",
-                data={
-                    "account": sub_item,
-                    "error": str(e),
-                    "traceback": traceback.format_exc()
-                }
-            )
-    log(
-        task="updatePublishedMsgDaily",
-        function="update_job",
-        message="订阅号更新完成",
-        data={
-            "success": success_count,
-            "fail": fail_count
-        }
-    )
-
-    total_count = success_count + fail_count
-    if total_count and fail_count / total_count > const.SUBSCRIBE_FAIL_RATE_THRESHOLD:
-        bot(
-            title="订阅号超过 {}% 的账号更新失败".format(int(const.SUBSCRIBE_FAIL_RATE_THRESHOLD * 100)),
-            detail={
-                "success": success_count,
-                "fail": fail_count,
-                "failRate": fail_count / total_count
-            }
-        )
-    bot(
-        title="更新每日发布文章任务完成通知",
-        detail={
-            "msg": "订阅号更新完成",
-            "finish_time": datetime.today().__str__()
-        },
-        mention=False
-    )
-    # service accounts
-    server_accounts = [i for i in account_list if i['account_type'] == const.SERVICE_TYPE]
-    for sub_item in tqdm(server_accounts):
-        try:
-            update_single_account(piaoquan_crawler_db_client, sub_item)
-            time.sleep(5)
-        except Exception as e:
-            print(e)
-    bot(
-        title="更新每日发布文章任务完成通知",
-        detail={
-            "msg": "服务号更新完成",
-            "finish_time": datetime.today().__str__()
-        },
-        mention=False
-    )
-
-
-def check_job(piaoquan_crawler_db_client, aigc_db_client):
-    """
-    Verification job: re-check every subscription account and re-crawl the ones that missed today's update
-    :return:
-    """
-    account_list = get_accounts(db_client=aigc_db_client)
-    # subscription accounts
-    subscription_accounts = [i for i in account_list if i['account_type'] in const.SUBSCRIBE_TYPE_SET]
-    fail_list = []
-    # first pass: re-crawl any account that failed the check
-    for sub_item in tqdm(subscription_accounts):
-        res = check_single_account(piaoquan_crawler_db_client, sub_item)
-        if not res:
-            try:
-                update_single_account(piaoquan_crawler_db_client, sub_item)
-            except Exception as e:
-                print(e)
-                print(sub_item)
-                # fail_list.append(sub_item)
-    # second pass: alert via bot for accounts that still failed
-    for sub_item in tqdm(subscription_accounts):
-        res = check_single_account(piaoquan_crawler_db_client, sub_item)
-        if not res:
-            # drop three fields the alert table does not need
-            sub_item.pop('account_type', None)
-            sub_item.pop('account_auth', None)
-            sub_item.pop('account_id', None)
-            fail_list.append(sub_item)
-    if fail_list:
-        try:
-            bot(
-                title="更新当天发布文章,存在未更新的账号",
-                detail={
-                    "columns": generate_bot_columns(),
-                    "rows": fail_list
-                },
-                table=True
-            )
-        except Exception as e:
-            print("Timeout Error: {}".format(e))
-    else:
-        bot(
-            title="更新当天发布文章,所有账号均更新成功",
-            mention=False,
-            detail={
-                "msg": "校验任务完成",
-                "finish_time": datetime.today().__str__()
-            }
-        )
-
-
-def monitor(piaoquan_crawler_db_client, long_articles_db_client, run_date):
-    """
-    Monitoring job: over a 7-day window, check whether published articles have been flagged as violating and alert if so
-    :return:
-    """
-    if not run_date:
-        run_date = datetime.today().strftime("%Y-%m-%d")
-
-    monitor_start_timestamp = int(datetime.strptime(run_date, "%Y-%m-%d").timestamp()) - const.MONITOR_PERIOD
-    select_sql = f"""
-        SELECT ghId, accountName, title, ContentUrl, wx_sn, from_unixtime(publish_timestamp) AS publish_timestamp
-        FROM {ARTICLE_TABLE}
-        WHERE publish_timestamp >= {monitor_start_timestamp};
-    """
-    article_list = piaoquan_crawler_db_client.fetch(select_sql)
-    for article in tqdm(article_list, desc="monitor article list"):
-        gh_id = article[0]
-        account_name = article[1]
-        title = article[2]
-        # skip titles that already have a violation record
-        if whether_title_unsafe(long_articles_db_client, title):
-            continue
-        url = article[3]
-        wx_sn = article[4]
-        publish_date = article[5]
-        try:
-            response = spider.get_article_text(url, is_cache=False)
-            response_code = response['code']
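-            # an article flagged as illegal triggers a bot alert and deletion via aiditApi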
-            if response_code == const.ARTICLE_ILLEGAL_CODE:
-                bot(
-                    title="文章违规告警",
-                    detail={
-                        "ghId": gh_id,
-                        "accountName": account_name,
-                        "title": title,
-                        "wx_sn": str(wx_sn),
-                        "publish_date": str(publish_date)
-                    },
-                    mention=False
-                )
-                aiditApi.delete_articles(
-                    gh_id=gh_id,
-                    title=title
-                )
-        except Exception as e:
-            error_msg = traceback.format_exc()
-            log(
-                task="monitor",
-                function="monitor",
-                message="请求文章详情失败",
-                data={
-                    "ghId": gh_id,
-                    "accountName": account_name,
-                    "title": title,
-                    "wx_sn": str(wx_sn),
-                    "error": str(e),
-                    "msg": error_msg
-                }
-            )
-
-
-def main():
-    """
-    main
-    :return:
-    """
-    parser = ArgumentParser()
-    parser.add_argument(
-        "--run_task",
-        help="update: update_job, check: check_job, detail: get_article_detail_job, monitor: monitor")
-    parser.add_argument(
-        "--run_date",
-        help="--run_date %Y-%m-%d",
-    )
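-    # example (script name hypothetical): python update_published_msg_daily.py --run_task monitor --run_date 2025-01-01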
-    args = parser.parse_args()
-
-    # initialise the database connections
-    try:
-        piaoquan_crawler_db_client = DatabaseConnector(piaoquan_crawler_config)
-        piaoquan_crawler_db_client.connect()
-        aigc_db_client = DatabaseConnector(denet_config)
-        aigc_db_client.connect()
-        long_articles_db_client = DatabaseConnector(long_articles_config)
-        long_articles_db_client.connect()
-    except Exception as e:
-        error_msg = traceback.format_exc()
-        bot(
-            title="更新文章任务连接数据库失败",
-            detail={
-                "error": e,
-                "msg": error_msg
-            }
-        )
-        return
-
-    if args.run_task:
-        run_task = args.run_task
-        match run_task:
-            case "update":
-                update_job(piaoquan_crawler_db_client=piaoquan_crawler_db_client, aigc_db_client=aigc_db_client)
-                get_article_detail_job(db_client=piaoquan_crawler_db_client)
-            case "check":
-                check_job(piaoquan_crawler_db_client=piaoquan_crawler_db_client, aigc_db_client=aigc_db_client)
-            case "detail":
-                get_article_detail_job(db_client=piaoquan_crawler_db_client)
-            case "monitor":
-                if args.run_date:
-                    run_date = args.run_date
-                else:
-                    run_date = None
-                monitor(piaoquan_crawler_db_client=piaoquan_crawler_db_client,
-                        long_articles_db_client=long_articles_db_client, run_date=run_date)
-            case _:
-                print("No such task, input update: update_job, check: check_job, detail: get_article_detail_job")
-    else:
-        update_job(piaoquan_crawler_db_client=piaoquan_crawler_db_client, aigc_db_client=aigc_db_client)
-        check_job(piaoquan_crawler_db_client=piaoquan_crawler_db_client, aigc_db_client=aigc_db_client)
-        get_article_detail_job(db_client=piaoquan_crawler_db_client)
-
-
-if __name__ == '__main__':
-    main()

+ 0 - 33
update_mini_info_v2.py

@@ -1,33 +0,0 @@
-"""
-@author: luojunhui
-"""
-
-from argparse import ArgumentParser
-
-from tasks.data_tasks.update_published_articles_minigram_detail import UpdatePublishedArticlesMinigramDetail
-
-
-def main():
-    """
-    Entry point for updating the mini-program detail of published articles
-    :return:
-    """
-    parser = ArgumentParser()
-    parser.add_argument("--run-date",
-                        help="Run only once for the date given in %%Y-%%m-%%d format. \
-                            If not specified, run as a daily job.")
-    args = parser.parse_args()
-
-    update_minigram_detail_task = UpdatePublishedArticlesMinigramDetail()
-    update_minigram_detail_task.init_database()
-
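-    # with --run-date, backfill that single day; otherwise run the regular daily jobs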
-    if args.run_date:
-        update_minigram_detail_task.update_published_articles_job(biz_date=args.run_date)
-        update_minigram_detail_task.update_mini_program_detail_job(biz_date=args.run_date)
-    else:
-        update_minigram_detail_task.update_published_articles_job()
-        update_minigram_detail_task.update_mini_program_detail_job()
-
-
-if __name__ == '__main__':
-    main()