"""
@author: luojunhui
@description: update daily information into official articles v2
"""
import json
import time
import traceback
import urllib.parse
from argparse import ArgumentParser
from datetime import datetime
from typing import Dict, List

from pymysql.cursors import DictCursor
from tqdm import tqdm

from applications import aiditApi
from applications import bot
from applications import create_feishu_columns_sheet
from applications import Functions
from applications import log
from applications import WeixinSpider
from applications.const import updatePublishedMsgTaskConst
from applications.db import DatabaseConnector
from config import denet_config, long_articles_config, piaoquan_crawler_config

ARTICLE_TABLE = "official_articles_v2"
const = updatePublishedMsgTaskConst()
spider = WeixinSpider()
functions = Functions()


def generate_bot_columns():
    """
    Build the Feishu sheet column definitions used by the alert bot table.
    :return: list of column definitions
    """
    columns = [
        create_feishu_columns_sheet(sheet_type="plain_text", sheet_name="name", display_name="公众号名称"),
        create_feishu_columns_sheet(sheet_type="plain_text", sheet_name="ghId", display_name="ghId"),
        create_feishu_columns_sheet(sheet_type="number", sheet_name="follower_count", display_name="粉丝数"),
        create_feishu_columns_sheet(sheet_type="date", sheet_name="account_init_timestamp",
                                    display_name="账号接入系统时间"),
        create_feishu_columns_sheet(sheet_type="plain_text", sheet_name="using_status", display_name="利用状态")
    ]
    return columns


def get_account_status(aigc_db_client: DatabaseConnector) -> Dict:
    """
    Fetch the experiment status of each account.
    :param aigc_db_client: connector for the aigc (denet) database
    :return: {account_id: status}
    """
    sql = """
        SELECT t1.account_id, t2.status
        FROM wx_statistics_group_source_account t1
        JOIN wx_statistics_group_source t2
        ON t1.group_source_name = t2.account_source_name;
        """
    account_status_list = aigc_db_client.fetch(sql, cursor_type=DictCursor)
    account_status_dict = {account['account_id']: account['status'] for account in account_status_list}
    return account_status_dict
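
# Illustrative shape of the mapping returned above (IDs and statuses are
# made-up example values; real rows come from wx_statistics_group_source):
#   {"20001": "实验", "20002": "正式"}
# get_accounts() below marks an account as using_status=0 only when its
# status is "实验" (experiment), and 1 otherwise.
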
def get_accounts(aigc_db_client: DatabaseConnector) -> List[Dict]:
    """
    Get the accounts currently in publishing status from the aigc database.
    :return: list of dicts shaped as
        "name": line[0],
        "ghId": line[1],
        "follower_count": line[2],
        "account_init_time": int(line[3] / 1000),
        "account_type": line[4],  # subscription account or service account
        "account_auth": line[5]
    """
    account_list_without_using_status = aiditApi.get_publish_account_from_aigc()
    account_status_dict = get_account_status(aigc_db_client)
    account_list = [
        {
            **item,
            'using_status': 0 if account_status_dict.get(item['account_id']) == '实验' else 1
        }
        for item in account_list_without_using_status
    ]
    return account_list


def insert_each_msg(db_client, account_info, msg_list):
    """
    Write one page of message data into the database.
    :param db_client: piaoquan_crawler database connector
    :param account_info: account dict returned by get_accounts()
    :param msg_list: message groups returned by the spider
    :return:
    """
    gh_id = account_info['ghId']
    account_name = account_info['name']
    for info in msg_list:
        baseInfo = info.get("BaseInfo", {})
        appMsgId = info.get("AppMsg", {}).get("BaseInfo", {}).get("AppMsgId", None)
        createTime = info.get("AppMsg", {}).get("BaseInfo", {}).get("CreateTime", None)
        updateTime = info.get("AppMsg", {}).get("BaseInfo", {}).get("UpdateTime", None)
        Type = info.get("AppMsg", {}).get("BaseInfo", {}).get("Type", None)
        detail_article_list = info.get("AppMsg", {}).get("DetailInfo", [])
        if detail_article_list:
            for article in detail_article_list:
                title = article.get("Title", None)
                Digest = article.get("Digest", None)
                ItemIndex = article.get("ItemIndex", None)
                ContentUrl = article.get("ContentUrl", None)
                SourceUrl = article.get("SourceUrl", None)
                CoverImgUrl = article.get("CoverImgUrl", None)
                CoverImgUrl_1_1 = article.get("CoverImgUrl_1_1", None)
                CoverImgUrl_235_1 = article.get("CoverImgUrl_235_1", None)
                ItemShowType = article.get("ItemShowType", None)
                IsOriginal = article.get("IsOriginal", None)
                ShowDesc = article.get("ShowDesc", None)
                show_stat = functions.show_desc_to_sta(ShowDesc)
                ori_content = article.get("ori_content", None)
                show_view_count = show_stat.get("show_view_count", 0)
                show_like_count = show_stat.get("show_like_count", 0)
                show_zs_count = show_stat.get("show_zs_count", 0)
                show_pay_count = show_stat.get("show_pay_count", 0)
                wx_sn = ContentUrl.split("&sn=")[1].split("&")[0] if ContentUrl else None
                status = account_info['using_status']
                info_tuple = (
                    gh_id,
                    account_name,
                    appMsgId,
                    title,
                    Type,
                    createTime,
                    updateTime,
                    Digest,
                    ItemIndex,
                    ContentUrl,
                    SourceUrl,
                    CoverImgUrl,
                    CoverImgUrl_1_1,
                    CoverImgUrl_235_1,
                    ItemShowType,
                    IsOriginal,
                    ShowDesc,
                    ori_content,
                    show_view_count,
                    show_like_count,
                    show_zs_count,
                    show_pay_count,
                    wx_sn,
                    json.dumps(baseInfo, ensure_ascii=False),
                    functions.str_to_md5(title),
                    status
                )
                try:
                    insert_sql = f"""
                        INSERT INTO {ARTICLE_TABLE}
                        (ghId, accountName, appMsgId, title, Type, createTime, updateTime, Digest, ItemIndex, ContentUrl, SourceUrl, CoverImgUrl, CoverImgUrl_1_1, CoverImgUrl_235_1, ItemShowType, IsOriginal, ShowDesc, ori_content, show_view_count, show_like_count, show_zs_count, show_pay_count, wx_sn, baseInfo, title_md5, status)
                        values
                        (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
                        """
                    db_client.save(sql=insert_sql, params=info_tuple)
                    log(
                        task="updatePublishedMsgDaily",
                        function="insert_each_msg",
                        message="插入文章数据成功",
                        data={
                            "info": info_tuple
                        }
                    )
                except Exception:
                    # the insert typically fails when the article already
                    # exists (duplicate wx_sn); refresh its counters instead
                    try:
                        update_sql = f"""
                        UPDATE {ARTICLE_TABLE}
                        SET show_view_count = %s, show_like_count=%s
                        WHERE wx_sn = %s;
                        """
                        db_client.save(sql=update_sql,
                                       params=(show_view_count, show_like_count, wx_sn))
                        log(
                            task="updatePublishedMsgDaily",
                            function="insert_each_msg",
                            message="更新文章数据成功",
                            data={
                                "wxSn": wx_sn,
                                "likeCount": show_like_count,
                                "viewCount": show_view_count
                            }
                        )
                    except Exception as e:
                        log(
                            task="updatePublishedMsgDaily",
                            function="insert_each_msg",
                            message="更新文章失败, 报错原因是: {}".format(e),
                            status="fail"
                        )
                        continue
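
# For reference, wx_sn is the "sn" query parameter of a standard article URL.
# A hypothetical ContentUrl such as
#   http://mp.weixin.qq.com/s?__biz=XXX&mid=123&idx=1&sn=abcdef&chksm=...
# yields wx_sn == "abcdef" via ContentUrl.split("&sn=")[1].split("&")[0].
# show_desc_to_sta() is assumed to parse the ShowDesc text into the four
# counters consumed above (show_view_count / show_like_count / show_zs_count /
# show_pay_count), defaulting to 0 when a counter is absent.
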
def update_each_account(db_client: DatabaseConnector, account_info: Dict, latest_update_time: int, cursor=None):
    """
    Update the articles of a single account, paging backwards until the
    articles are older than latest_update_time.
    :param db_client: piaoquan_crawler database connector
    :param account_info: account dict returned by get_accounts()
    :param latest_update_time: most recent publish timestamp already stored
    :param cursor: spider pagination cursor, None for the first page
    :return: None
    """
    gh_id = account_info['ghId']
    response = spider.update_msg_list(ghId=gh_id, index=cursor)
    msg_list = response.get("data", {}).get("data", [])
    if msg_list:
        last_article_in_this_msg = msg_list[-1]
        last_time_stamp_in_this_msg = last_article_in_this_msg['AppMsg']['BaseInfo']['UpdateTime']
        last_url = last_article_in_this_msg['AppMsg']['DetailInfo'][0]['ContentUrl']
        resdata = spider.get_account_by_url(last_url)
        check_id = resdata['data'].get('data', {}).get('wx_gh')
        # only persist the page if the articles really belong to this account;
        # a mismatching wx_gh is silently skipped
        if check_id == gh_id:
            insert_each_msg(
                db_client=db_client,
                account_info=account_info,
                msg_list=msg_list
            )
            if last_time_stamp_in_this_msg > latest_update_time:
                next_cursor = response['data']['next_cursor']
                return update_each_account(
                    db_client=db_client,
                    account_info=account_info,
                    latest_update_time=latest_update_time,
                    cursor=next_cursor
                )
            log(
                task="updatePublishedMsgDaily",
                function="update_each_account",
                message="账号文章更新成功",
                data=response
            )
    else:
        log(
            task="updatePublishedMsgDaily",
            function="update_each_account",
            message="账号文章更新失败",
            status="fail",
            data=response
        )
        return
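
# Response shape assumed by update_each_account (inferred from the accesses
# above, not from WeixinSpider documentation):
#   {
#       "data": {
#           "data": [            # message groups, newest first
#               {"AppMsg": {"BaseInfo": {"UpdateTime": 1700000000, ...},
#                           "DetailInfo": [{"ContentUrl": "..."}, ...]}},
#               ...
#           ],
#           "next_cursor": 25    # opaque pagination index
#       }
#   }
# The recursion stops once the oldest item on a page is no newer than
# latest_update_time.
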
def check_account_info(piaoquan_crawler_db_client: DatabaseConnector, gh_id: str) -> int:
    """
    Query the latest publish timestamp of an account by gh_id.
    :param piaoquan_crawler_db_client:
    :param gh_id:
    :return: unix timestamp in seconds
    """
    sql = f"""
        SELECT MAX(publish_timestamp)
        FROM {ARTICLE_TABLE}
        WHERE ghId = '{gh_id}';
        """
    result = piaoquan_crawler_db_client.fetch(sql)
    if result and result[0][0]:
        return result[0][0]
    else:
        # new account: backfill starting NEW_ACCOUNT_CRAWL_PERIOD seconds
        # (30 days) before the crawl moment
        return int(time.time()) - const.NEW_ACCOUNT_CRAWL_PERIOD


def update_single_account(piaoquan_crawler_db_client: DatabaseConnector, account_info: Dict):
    """
    Update a single account.
    :param piaoquan_crawler_db_client:
    :param account_info:
    :return:
    """
    gh_id = account_info['ghId']
    max_publish_time = check_account_info(piaoquan_crawler_db_client, gh_id)
    update_each_account(
        db_client=piaoquan_crawler_db_client,
        account_info=account_info,
        latest_update_time=max_publish_time
    )


def check_single_account(db_client, account_item):
    """
    Check whether an account has been updated recently enough.
    :param db_client:
    :param account_item:
    :return: True / False
    """
    gh_id = account_item['ghId']
    account_type = account_item['account_type']
    today_str = datetime.today().strftime("%Y-%m-%d")
    today_date_time = datetime.strptime(today_str, "%Y-%m-%d")
    today_timestamp = today_date_time.timestamp()
    sql = f"""
        SELECT max(updateTime)
        FROM {ARTICLE_TABLE}
        WHERE ghId = '{gh_id}';
        """
    try:
        latest_update_time = db_client.fetch(sql)[0][0]
        # has the account's publication for today been collected yet?
        if account_type in const.SUBSCRIBE_TYPE_SET:
            return int(latest_update_time) > int(today_timestamp)
        else:
            return int(latest_update_time) > int(today_timestamp) - 7 * 24 * 3600
    except Exception as e:
        print(e)
        return False
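
# Freshness rule used above, in short: a subscription account (in
# SUBSCRIBE_TYPE_SET) only passes when its newest updateTime falls on the
# current calendar day, while a service account passes with any article in the
# last 7 days. With today's midnight at t0, a subscription account needs
# updateTime > t0 and a service account only updateTime > t0 - 7 * 24 * 3600.
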
def update_job(piaoquan_crawler_db_client, aigc_db_client):
    """
    Daily update task.
    :return:
    """
    account_list = get_accounts(aigc_db_client=aigc_db_client)
    # subscription accounts
    subscription_accounts = [i for i in account_list if i['account_type'] in const.SUBSCRIBE_TYPE_SET]
    success_count = 0
    fail_count = 0
    for sub_item in tqdm(subscription_accounts):
        try:
            update_single_account(piaoquan_crawler_db_client, sub_item)
            success_count += 1
            time.sleep(5)
        except Exception as e:
            fail_count += 1
            log(
                task="updatePublishedMsgDaily",
                function="update_job",
                message="单个账号文章更新失败, 报错信息是: {}".format(e),
                status="fail",
            )
    log(
        task="updatePublishedMsgDaily",
        function="update_job",
        message="订阅号更新完成",
        data={
            "success": success_count,
            "fail": fail_count
        }
    )
    total_count = success_count + fail_count
    if total_count and fail_count / total_count > const.SUBSCRIBE_FAIL_RATE_THRESHOLD:
        bot(
            title="订阅号超过 30% 的账号更新失败",
            detail={
                "success": success_count,
                "fail": fail_count,
                "failRate": fail_count / total_count
            }
        )
    bot(
        title="更新每日发布文章任务完成通知",
        detail={
            "msg": "订阅号更新完成",
            "finish_time": datetime.today().__str__()
        },
        mention=False
    )
    # service accounts
    server_accounts = [i for i in account_list if i['account_type'] == const.SERVICE_TYPE]
    for sub_item in tqdm(server_accounts):
        try:
            update_single_account(piaoquan_crawler_db_client, sub_item)
            time.sleep(5)
        except Exception as e:
            print(e)
    bot(
        title="更新每日发布文章任务完成通知",
        detail={
            "msg": "服务号更新完成",
            "finish_time": datetime.today().__str__()
        },
        mention=False
    )


def check_job(piaoquan_crawler_db_client, aigc_db_client):
    """
    Verification task.
    :return:
    """
    account_list = get_accounts(aigc_db_client=aigc_db_client)
    # subscription accounts
    subscription_accounts = [i for i in account_list if i['account_type'] in const.SUBSCRIBE_TYPE_SET]
    fail_list = []
    # check and rework if fail
    for sub_item in tqdm(subscription_accounts):
        res = check_single_account(piaoquan_crawler_db_client, sub_item)
        if not res:
            update_single_account(piaoquan_crawler_db_client, sub_item)
    # check whether the rework succeeded and alert via bot if not
    for sub_item in tqdm(account_list):
        res = check_single_account(piaoquan_crawler_db_client, sub_item)
        if not res:
            # drop the three fields the alert table does not need
            sub_item.pop('account_type', None)
            sub_item.pop('account_auth', None)
            sub_item.pop('account_id', None)
            fail_list.append(sub_item)
    if fail_list:
        try:
            bot(
                title="更新当天发布文章,存在未更新的账号",
                detail={
                    "columns": generate_bot_columns(),
                    "rows": fail_list
                },
                table=True
            )
        except Exception as e:
            print("Timeout Error: {}".format(e))
    else:
        bot(
            title="更新当天发布文章,所有账号均更新成功",
            mention=False,
            detail={
                "msg": "校验任务完成",
                "finish_time": datetime.today().__str__()
            }
        )


def get_articles(db_client):
    """
    Select articles whose publish_timestamp is still a sentinel value and
    therefore needs to be resolved.
    :return:
    """
    sql = f"""
    SELECT ContentUrl, wx_sn
    FROM {ARTICLE_TABLE}
    WHERE publish_timestamp IN ({const.DEFAULT_STATUS}, {const.REQUEST_FAIL_STATUS});"""
    response = db_client.fetch(sql)
    return response
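
# Sentinel encoding for publish_timestamp, inferred from the retry comment in
# get_article_detail_job ("-1 && 0") and the backfill queries below:
# const.DEFAULT_STATUS appears to be 0 (detail not fetched yet) and
# const.REQUEST_FAIL_STATUS -1 (detail request failed); DELETE_STATUS,
# ILLEGAL_STATUS and UNKNOWN_STATUS are likewise assumed to be negative, since
# the final fallback rewrites every publish_timestamp < 0 with updateTime.
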
def update_publish_timestamp(db_client, row):
    """
    Update the publish timestamp and mini-program info of one article.
    :param db_client:
    :param row: (ContentUrl, wx_sn) tuple from get_articles()
    :return: the row itself if the detail request failed, otherwise None
    """
    url = row[0]
    wx_sn = row[1]
    try:
        response = spider.get_article_text(url)
        response_code = response['code']
        if response_code == const.ARTICLE_DELETE_CODE:
            publish_timestamp_s = const.DELETE_STATUS
            root_source_id_list = []
        elif response_code == const.ARTICLE_ILLEGAL_CODE:
            publish_timestamp_s = const.ILLEGAL_STATUS
            root_source_id_list = []
        elif response_code == const.ARTICLE_SUCCESS_CODE:
            data = response['data']['data']
            publish_timestamp_ms = data['publish_timestamp']
            publish_timestamp_s = int(publish_timestamp_ms / 1000)
            mini_program = data.get('mini_program', [])
            if mini_program:
                root_source_id_list = [
                    urllib.parse.parse_qs(
                        urllib.parse.unquote(i['path'])
                    )['rootSourceId'][0]
                    for i in mini_program
                ]
            else:
                root_source_id_list = []
        else:
            publish_timestamp_s = const.UNKNOWN_STATUS
            root_source_id_list = []
    except Exception as e:
        publish_timestamp_s = const.REQUEST_FAIL_STATUS
        root_source_id_list = None
        error_msg = traceback.format_exc()
        print(e, error_msg)
    update_sql = f"""
        UPDATE {ARTICLE_TABLE}
        SET publish_timestamp = %s, root_source_id_list = %s
        WHERE wx_sn = %s;
        """
    db_client.save(
        sql=update_sql,
        params=(
            publish_timestamp_s,
            json.dumps(root_source_id_list, ensure_ascii=False),
            wx_sn
        )
    )
    if publish_timestamp_s == const.REQUEST_FAIL_STATUS:
        return row
    else:
        return None
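
# Worked example of the rootSourceId extraction above, using a hypothetical
# mini-program path (real paths come from the article detail response):
#   i['path'] = "pages%2FvideoPlay%3Fa%3D1%26rootSourceId%3Dtouch_abc"
#   urllib.parse.unquote(...)  -> "pages/videoPlay?a=1&rootSourceId=touch_abc"
#   urllib.parse.parse_qs(...) -> {"pages/videoPlay?a": ["1"], "rootSourceId": ["touch_abc"]}
#   [...]['rootSourceId'][0]   -> "touch_abc"
# Note that parse_qs treats the whole string as a query string, so the lookup
# only works when rootSourceId appears after an "&" rather than right after "?".
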
def get_article_detail_job(piaoquan_crawler_db_client):
    """
    Resolve the publish details of released articles.
    :return:
    """
    article_tuple = get_articles(piaoquan_crawler_db_client)
    for article in tqdm(article_tuple):
        try:
            update_publish_timestamp(db_client=piaoquan_crawler_db_client, row=article)
        except Exception as e:
            print(e)
            error_msg = traceback.format_exc()
            print(error_msg)
    # retry the articles still left with the request-failed (-1) and default (0) statuses
    process_failed_articles = get_articles(piaoquan_crawler_db_client)
    fail_list = []
    if process_failed_articles:
        for article in tqdm(process_failed_articles):
            try:
                res = update_publish_timestamp(db_client=piaoquan_crawler_db_client, row=article)
                if res:
                    fail_list.append({"wx_sn": res[1], "url": res[0]})
            except Exception as e:
                print(e)
                error_msg = traceback.format_exc()
                print(error_msg)
    # backfill publish_timestamp from sibling articles sharing the same appMsgId
    update_sql = f"""
        UPDATE {ARTICLE_TABLE} oav
        JOIN (
            SELECT appMsgId, MAX(publish_timestamp) AS publish_timestamp
            FROM {ARTICLE_TABLE}
            WHERE publish_timestamp > %s
            GROUP BY appMsgId
            ) vv
            ON oav.appMsgId = vv.appMsgId
        SET oav.publish_timestamp = vv.publish_timestamp
        WHERE oav.publish_timestamp <= %s;
    """
    piaoquan_crawler_db_client.save(
        sql=update_sql,
        params=(0, 0)
    )
    # if publish_timestamp is still unresolved, fall back to updateTime
    update_sql_2 = f"""
        UPDATE {ARTICLE_TABLE}
        SET publish_timestamp = updateTime
        WHERE publish_timestamp < %s;
    """
    piaoquan_crawler_db_client.save(
        sql=update_sql_2,
        params=0
    )
    if fail_list:
        bot(
            title="更新文章任务,请求detail失败",
            detail=fail_list
        )


def whether_title_unsafe(db_client, title):
    """
    Check whether an article title already has a violation record.
    :param db_client:
    :param title:
    :return: True / False
    """
    title_md5 = functions.str_to_md5(title)
    sql = f"""
        SELECT title_md5
        FROM article_unsafe_title
        WHERE title_md5 = '{title_md5}';
    """
    res = db_client.fetch(sql)
    return bool(res)


def monitor(piaoquan_crawler_db_client, long_articles_db_client, run_date):
    """
    Monitoring task: over a 7-day window, check whether published articles have
    been flagged as illegal and raise an alert for each new hit.
    :return:
    """
    if not run_date:
        run_date = datetime.today().strftime("%Y-%m-%d")
    monitor_start_timestamp = int(datetime.strptime(run_date, "%Y-%m-%d").timestamp()) - const.MONITOR_PERIOD
    select_sql = f"""
        SELECT ghId, accountName, title, ContentUrl, wx_sn, from_unixtime(publish_timestamp) AS publish_timestamp
        FROM {ARTICLE_TABLE}
        WHERE publish_timestamp >= {monitor_start_timestamp};
    """
    article_list = piaoquan_crawler_db_client.fetch(select_sql)
    for article in tqdm(article_list, desc="monitor article list"):
        gh_id = article[0]
        account_name = article[1]
        title = article[2]
        # skip titles that already have a violation record
        if whether_title_unsafe(long_articles_db_client, title):
            continue
        url = article[3]
        wx_sn = article[4]
        publish_date = article[5]
        try:
            response = spider.get_article_text(url, is_cache=False)
            response_code = response['code']
            if response_code == const.ARTICLE_ILLEGAL_CODE:
                bot(
                    title="文章违规告警",
                    detail={
                        "ghId": gh_id,
                        "accountName": account_name,
                        "title": title,
                        "wx_sn": str(wx_sn),
                        "publish_date": str(publish_date)
                    },
                    mention=False
                )
                aiditApi.delete_articles(
                    gh_id=gh_id,
                    title=title
                )
        except Exception as e:
            error_msg = traceback.format_exc()
            log(
                task="monitor",
                function="monitor",
                message="请求文章详情失败",
                data={
                    "ghId": gh_id,
                    "accountName": account_name,
                    "title": title,
                    "wx_sn": str(wx_sn),
                    "error": str(e),
                    "msg": error_msg
                }
            )
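
# Example invocations, assuming this module is saved as
# update_published_msg_daily.py (hypothetical filename; the log task name used
# throughout is "updatePublishedMsgDaily"):
#   python update_published_msg_daily.py --run_task update
#   python update_published_msg_daily.py --run_task monitor --run_date 2024-01-01
# Without --run_task, main() runs the update, check and detail jobs in sequence.
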
def main():
    """
    main
    :return:
    """
    parser = ArgumentParser()
    parser.add_argument(
        "--run_task",
        help="update: update_job, check: check_job, detail: get_article_detail_job, monitor: monitor")
    parser.add_argument(
        "--run_date",
        help="--run_date %%Y-%%m-%%d",
    )
    args = parser.parse_args()

    # initialize the database connections
    try:
        piaoquan_crawler_db_client = DatabaseConnector(piaoquan_crawler_config)
        piaoquan_crawler_db_client.connect()
        aigc_db_client = DatabaseConnector(denet_config)
        aigc_db_client.connect()
        long_articles_db_client = DatabaseConnector(long_articles_config)
        long_articles_db_client.connect()
    except Exception as e:
        error_msg = traceback.format_exc()
        bot(
            title="更新文章任务连接数据库失败",
            detail={
                "error": str(e),
                "msg": error_msg
            }
        )
        return

    if args.run_task:
        run_task = args.run_task
        match run_task:
            case "update":
                update_job(piaoquan_crawler_db_client=piaoquan_crawler_db_client, aigc_db_client=aigc_db_client)
            case "check":
                check_job(piaoquan_crawler_db_client=piaoquan_crawler_db_client, aigc_db_client=aigc_db_client)
            case "detail":
                get_article_detail_job(piaoquan_crawler_db_client=piaoquan_crawler_db_client)
            case "monitor":
                if args.run_date:
                    run_date = args.run_date
                else:
                    run_date = None
                monitor(piaoquan_crawler_db_client=piaoquan_crawler_db_client,
                        long_articles_db_client=long_articles_db_client, run_date=run_date)
            case _:
                print("No such task, input update: update_job, check: check_job, "
                      "detail: get_article_detail_job, monitor: monitor")
    else:
        update_job(piaoquan_crawler_db_client=piaoquan_crawler_db_client, aigc_db_client=aigc_db_client)
        check_job(piaoquan_crawler_db_client=piaoquan_crawler_db_client, aigc_db_client=aigc_db_client)
        get_article_detail_job(piaoquan_crawler_db_client=piaoquan_crawler_db_client)


if __name__ == '__main__':
    main()