"""
@author: luojunhui
@description: update daily information into official articles v2
"""
import time
import json
import traceback

from tqdm import tqdm
from datetime import datetime

from applications import PQMySQL, WeixinSpider, Functions, log, bot, aiditApi

ARTICLE_TABLE = "official_articles_v2"


def get_accounts_v1():
    """
    Load the account list from the local JSON config file.

    :return: (subscription_accounts, server_accounts), two lists of dicts
             split on the ``type`` field of each entry.
    """
    with open("config/accountInfoV0914.json", encoding="utf-8") as f:
        account_list = json.load(f)
    subscription_account = [i for i in account_list if i['type'] == '订阅号']
    server_account = [i for i in account_list if i['type'] == '服务号']
    return subscription_account, server_account


def get_account_using_status():
    """
    Fetch the gh_ids that are currently flagged ``is_using = 1``.

    :return: set of gh_id values
    """
    sql = "SELECT gh_id FROM long_articles_publishing_accounts WHERE is_using = 1;"
    gh_id_tuple = PQMySQL().select(sql)
    return {row[0] for row in gh_id_tuple}


def get_accounts():
    """
    Fetch the publishing accounts from the aigc API and tag each one with
    ``using_status`` (1 when its ghId is in the "is_using" set, else 0).

    Per the original notes each account dict carries: name, ghId,
    follower_count, account_init_time, account_type, account_auth.

    :return: (subscription_accounts, server_accounts); accounts whose
             ``account_type`` is 0 or 1 go to the first list, 2 to the second.
    """
    using_account_set = get_account_using_status()
    account_list = []
    for item in aiditApi.get_publish_account_from_aigc():
        item['using_status'] = 1 if item['ghId'] in using_account_set else 0
        account_list.append(item)
    subscription_account = [i for i in account_list if i['account_type'] in {0, 1}]
    server_account = [i for i in account_list if i['account_type'] == 2]
    return subscription_account, server_account


def insert_each_msg(db_client, account_info, account_name, msg_list):
    """
    Write every article of every message in ``msg_list`` into ARTICLE_TABLE.

    Each article is inserted first; if the insert raises for any reason, the
    view/like counters of the row with the same ``wx_sn`` are updated instead.

    :param db_client: database client exposing ``update(sql=..., params=...)``
    :param account_info: account dict; ``ghId`` and ``using_status`` are read
    :param account_name: account name stored with every row
    :param msg_list: list of message dicts returned by the spider
    :return: None
    """
    gh_id = account_info['ghId']
    # NOTE(review): the payload key is "CoverImgUrl_235_1" but the column is
    # spelled "CoverImgUrl_255_1" -- left untouched, confirm against the schema.
    insert_sql = f"""
        INSERT INTO {ARTICLE_TABLE}
        (ghId, accountName, appMsgId, title, Type, createTime, updateTime, Digest, ItemIndex, ContentUrl, SourceUrl, CoverImgUrl, CoverImgUrl_1_1, CoverImgUrl_255_1, ItemShowType, IsOriginal, ShowDesc, ori_content, show_view_count, show_like_count, show_zs_count, show_pay_count, wx_sn, baseInfo, title_md5, status)
        values
        (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
    """
    update_sql = f"""
        UPDATE {ARTICLE_TABLE}
        SET show_view_count = %s, show_like_count=%s
        WHERE wx_sn = %s;
    """
    for info in msg_list:
        base_info = info.get("BaseInfo", {})
        app_msg = info.get("AppMsg", {})
        app_msg_base = app_msg.get("BaseInfo", {})
        app_msg_id = app_msg_base.get("AppMsgId", None)
        create_time = app_msg_base.get("CreateTime", None)
        update_time = app_msg_base.get("UpdateTime", None)
        msg_type = app_msg_base.get("Type", None)
        detail_article_list = app_msg.get("DetailInfo", [])
        if not detail_article_list:
            continue
        for article in detail_article_list:
            title = article.get("Title", None)
            digest = article.get("Digest", None)
            item_index = article.get("ItemIndex", None)
            content_url = article.get("ContentUrl", None)
            source_url = article.get("SourceUrl", None)
            cover_img_url = article.get("CoverImgUrl", None)
            cover_img_url_1_1 = article.get("CoverImgUrl_1_1", None)
            cover_img_url_235_1 = article.get("CoverImgUrl_235_1", None)
            item_show_type = article.get("ItemShowType", None)
            is_original = article.get("IsOriginal", None)
            show_desc = article.get("ShowDesc", None)
            show_stat = Functions().show_desc_to_sta(show_desc)
            ori_content = article.get("ori_content", None)
            show_view_count = show_stat.get("show_view_count", 0)
            show_like_count = show_stat.get("show_like_count", 0)
            show_zs_count = show_stat.get("show_zs_count", 0)
            show_pay_count = show_stat.get("show_pay_count", 0)
            # wx_sn is the value of the "sn" query parameter of the article URL
            wx_sn = content_url.split("&sn=")[1].split("&")[0] if content_url else None
            status = account_info['using_status']
            info_tuple = (
                gh_id,
                account_name,
                app_msg_id,
                title,
                msg_type,
                create_time,
                update_time,
                digest,
                item_index,
                content_url,
                source_url,
                cover_img_url,
                cover_img_url_1_1,
                cover_img_url_235_1,
                item_show_type,
                is_original,
                show_desc,
                ori_content,
                show_view_count,
                show_like_count,
                show_zs_count,
                show_pay_count,
                wx_sn,
                json.dumps(base_info, ensure_ascii=False),
                Functions().str_to_md5(title),
                status
            )
            try:
                db_client.update(sql=insert_sql, params=info_tuple)
                log(
                    task="updatePublishedMsgDaily",
                    function="insert_each_msg",
                    message="插入文章数据成功",
                    data={
                        "info": info_tuple
                    }
                )
            except Exception:
                # Any insert failure falls back to refreshing the counters of
                # the existing row (presumably a duplicate-key case -- TODO confirm).
                try:
                    db_client.update(
                        sql=update_sql,
                        params=(show_view_count, show_like_count, wx_sn)
                    )
                    log(
                        task="updatePublishedMsgDaily",
                        function="insert_each_msg",
                        message="更新文章数据成功",
                        data={
                            "wxSn": wx_sn,
                            "likeCount": show_like_count,
                            "viewCount": show_view_count
                        }
                    )
                except Exception as e:
                    log(
                        task="updatePublishedMsgDaily",
                        function="insert_each_msg",
                        message="更新文章失败, 报错原因是: {}".format(e),
                        status="fail"
                    )


def update_each_account(db_client, account_info, account_name, latest_update_time, cursor=None):
    """
    Pull one page of messages for an account, store it, and recurse onto the
    next page while the oldest message of the page is newer than
    ``latest_update_time``.

    The page is stored only when the account resolved from the last article's
    URL matches the expected ghId.

    :param db_client: database client
    :param account_info: account dict; ``ghId`` is read here
    :param account_name: account name stored with every row
    :param latest_update_time: newest update timestamp already stored
    :param cursor: page cursor passed to the spider (None for the first page)
    :return: None
    """
    gh_id = account_info['ghId']
    response = WeixinSpider().update_msg_list(ghId=gh_id, index=cursor)
    msg_list = response.get("data", {}).get("data", {})
    if not msg_list:
        log(
            task="updatePublishedMsgDaily",
            function="update_each_account",
            message="账号文章更新失败",
            status="fail",
            data=response
        )
        return

    last_article_in_this_msg = msg_list[-1]
    last_time_stamp_in_this_msg = last_article_in_this_msg['AppMsg']['BaseInfo']['UpdateTime']
    last_url = last_article_in_this_msg['AppMsg']['DetailInfo'][0]['ContentUrl']
    # Resolve the account behind the last article and compare it with the
    # expected ghId before storing anything from this page.
    resdata = WeixinSpider().get_account_by_url(last_url)
    check_id = resdata['data'].get('data', {}).get('wx_gh')
    if check_id == gh_id:
        insert_each_msg(
            db_client=db_client,
            account_info=account_info,
            account_name=account_name,
            msg_list=msg_list
        )
        if last_time_stamp_in_this_msg > latest_update_time:
            next_cursor = response['data']['next_cursor']
            return update_each_account(
                db_client=db_client,
                account_info=account_info,
                account_name=account_name,
                latest_update_time=latest_update_time,
                cursor=next_cursor
            )
        log(
            task="updatePublishedMsgDaily",
            function="update_each_account",
            message="账号文章更新成功",
            data=response
        )


def check_account_info(db_client, gh_id, account_name):
    """
    Look up the most recently updated stored article of an account.

    :param db_client: database client exposing ``select(sql)``
    :param gh_id: account gh_id
    :param account_name: fallback name used when nothing is stored yet
    :return: dict with ``account_name``, ``update_time`` and ``account_type``
             ("history" when a row exists, otherwise "new" with an
             ``update_time`` of 30 days before now)
    """
    # NOTE(review): gh_id is interpolated straight into the SQL text; switch to
    # a parameterized query if ``select`` supports params.
    sql = f"""
        SELECT accountName, updateTime
        FROM {ARTICLE_TABLE}
        WHERE ghId = '{gh_id}'
        ORDER BY updateTime DESC LIMIT 1;
    """
    result = db_client.select(sql)
    if result:
        old_account_name, update_time = result[0]
        return {
            "account_name": old_account_name,
            "update_time": update_time,
            "account_type": "history"
        }
    return {
        "account_name": account_name,
        "update_time": int(time.time()) - 30 * 24 * 60 * 60,
        "account_type": "new"
    }


def update_single_account(db_client, account_info):
    """
    Update the stored articles of a single account.

    :param db_client: database client
    :param account_info: account dict; ``ghId`` and ``name`` are read
    :return: None
    """
    gh_id = account_info['ghId']
    account_detail = check_account_info(db_client, gh_id, account_info['name'])
    update_each_account(
        db_client=db_client,
        account_info=account_info,
        account_name=account_detail['account_name'],
        latest_update_time=account_detail['update_time']
    )


def check_single_account(db_client, account_item):
    """
    Check whether an account's articles have been collected recently enough.

    Accounts with ``account_type`` 0 or 1 must have a row newer than today's
    local midnight; all other accounts get a 7-day window before that.

    :param db_client: database client exposing ``select(sql)``
    :param account_item: account dict; ``ghId`` and ``account_type`` are read
    :return: True / False (False also when the lookup fails)
    """
    gh_id = account_item['ghId']
    account_type = account_item['account_type']
    today_timestamp = datetime.today().replace(
        hour=0, minute=0, second=0, microsecond=0
    ).timestamp()
    # NOTE(review): gh_id is interpolated straight into the SQL text.
    sql = f"""
        SELECT updateTime
        FROM {ARTICLE_TABLE}
        WHERE ghId = '{gh_id}'
        ORDER BY updateTime DESC
        LIMIT 1;
    """
    try:
        latest_update_time = db_client.select(sql)[0][0]
        # Decide whether the account's latest articles were collected
        if account_type in {0, 1}:
            return int(latest_update_time) > int(today_timestamp)
        return int(latest_update_time) > int(today_timestamp) - 7 * 24 * 3600
    except Exception as e:
        print("updateTime Error -- {}".format(e))
        return False


def update_job():
    """
    Update job: refresh every subscription account, report the failure rate,
    then refresh every server account.

    :return: None
    """
    try:
        db_client = PQMySQL()
    except Exception as e:
        error_msg = traceback.format_exc()
        bot(
            title="更新文章任务连接数据库失败",
            detail={
                # str() keeps the payload a plain string rather than an
                # exception object
                "error": str(e),
                "msg": error_msg
            }
        )
        return
    sub_accounts, server_accounts = get_accounts()
    s_count = 0
    f_count = 0
    for sub_item in tqdm(sub_accounts):
        try:
            update_single_account(db_client, sub_item)
            s_count += 1
            time.sleep(5)
        except Exception as e:
            f_count += 1
            log(
                task="updatePublishedMsgDaily",
                function="update_job",
                message="单个账号文章更新失败, 报错信息是: {}".format(e),
                status="fail",
            )
    log(
        task="updatePublishedMsgDaily",
        function="update_job",
        message="订阅号更新完成",
        data={
            "success": s_count,
            "fail": f_count
        }
    )

    # Guard against an empty subscription list: nothing processed means
    # nothing failed, and avoids a ZeroDivisionError.
    total_count = s_count + f_count
    fail_rate = f_count / total_count if total_count else 0
    if fail_rate > 0.3:
        bot(
            title="订阅号超过 30% 的账号更新失败",
            detail={
                "success": s_count,
                "fail": f_count,
                "failRate": fail_rate
            }
        )
    bot(
        title="更新每日发布文章任务完成通知",
        detail={
            "msg": "订阅号更新完成",
            "finish_time": str(datetime.today())
        },
        mention=False
    )
    for sub_item in tqdm(server_accounts):
        try:
            update_single_account(db_client, sub_item)
            time.sleep(5)
        except Exception as e:
            print(e)
    bot(
        title="更新每日发布文章任务完成通知",
        detail={
            "msg": "服务号更新完成",
            "finish_time": str(datetime.today())
        },
        mention=False
    )


def check_job():
    """
    Check job: re-run the update for every subscription account that fails
    the freshness check, then alert on the accounts that still fail.

    :return: None
    """
    try:
        db_client = PQMySQL()
    except Exception as e:
        error_msg = traceback.format_exc()
        bot(
            title="校验更新文章任务连接数据库失败",
            detail={
                "job": "check_job",
                "error": str(e),
                "msg": error_msg
            }
        )
        return
    sub_accounts, server_accounts = get_accounts()
    # account_list = sub_accounts + server_accounts
    account_list = sub_accounts
    # check and rework if fail
    for sub_item in tqdm(account_list):
        if check_single_account(db_client, sub_item):
            continue
        try:
            update_single_account(db_client, sub_item)
        except Exception as e:
            # One failing account must not abort the whole check; the second
            # pass below still reports it.
            log(
                task="updatePublishedMsgDaily",
                function="check_job",
                message="单个账号文章更新失败, 报错信息是: {}".format(e),
                status="fail",
            )
    # check whether success and bot if fails
    fail_list = [
        sub_item for sub_item in tqdm(account_list)
        if not check_single_account(db_client, sub_item)
    ]
    if fail_list:
        try:
            bot(
                title="日常报警, 存在账号更新失败",
                detail=fail_list
            )
        except Exception as e:
            print("Timeout Error: {}".format(e))
    else:
        bot(
            title="校验完成通知",
            mention=False,
            detail={
                "msg": "校验任务完成",
                "finish_time": str(datetime.today())
            }
        )


def main():
    """
    Run the update job, wait a minute, then run the check job.

    :return: None
    """
    update_job()
    time.sleep(60)
    check_job()


if __name__ == '__main__':
    main()