123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378 |
- """
- @author: luojunhui
- @description: update daily information into official articles v2
- """
- import time
- import json
- import threading
- import schedule
- from tqdm import tqdm
- from datetime import datetime
- from applications import PQMySQL, WeixinSpider, Functions, log, bot
def getAccounts():
    """
    Load the account configuration and split it by account type.

    Reads ``config/accountInfoV0914.json`` (UTF-8, resolved against the
    current working directory) and partitions its entries on their
    ``type`` field. Entries with any other type are ignored.

    :return: ``(subscription_account, server_account)`` -- two lists of
        account dicts whose type is '订阅号' and '服务号' respectively.
    """
    with open("config/accountInfoV0914.json", encoding="utf-8") as config_file:
        accounts = json.load(config_file)

    subscription_account = []
    server_account = []
    for account in accounts:
        account_type = account['type']
        if account_type == '订阅号':
            subscription_account.append(account)
        elif account_type == '服务号':
            server_account.append(account)
    return subscription_account, server_account
def _extract_wx_sn(content_url):
    """Return the value of the ``&sn=`` query field of *content_url*, or None."""
    if not content_url:
        return None
    _, marker, tail = content_url.partition("&sn=")
    if not marker:
        # No "&sn=" in the URL: report "unknown" instead of raising IndexError.
        return None
    return tail.split("&")[0]


def insertEachMsg(db_client, gh_id, account_name, msg_list):
    """
    Persist every article contained in a list of published messages.

    For each article an INSERT into ``official_articles_v2`` is attempted.
    If the insert raises, the article is assumed to be stored already and
    only its view/like counters are refreshed (matched by ``wx_sn``).
    Failures of that fallback are logged and the next article is processed.

    :param db_client: database client exposing ``update(sql=..., params=...)``
    :param gh_id: gh_id of the official account the messages belong to
    :param account_name: display name stored alongside each article
    :param msg_list: iterable of message dicts; ``None`` or empty is a no-op
    :return: None
    """
    # NOTE(review): the column is spelled CoverImgUrl_255_1 while the payload
    # key is CoverImgUrl_235_1 -- confirm against the table schema.
    insert_sql = """
        INSERT INTO official_articles_v2
        (ghId, accountName, appMsgId, title, Type, createTime, updateTime, Digest, ItemIndex, ContentUrl, SourceUrl, CoverImgUrl, CoverImgUrl_1_1, CoverImgUrl_255_1, ItemShowType, IsOriginal, ShowDesc, ori_content, show_view_count, show_like_count, show_zs_count, show_pay_count, wx_sn, baseInfo, title_md5)
        values
        (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
    """
    update_sql = """
        UPDATE official_articles_v2
        SET show_view_count = %s, show_like_count=%s
        WHERE wx_sn = %s;
    """

    for info in msg_list or []:
        base_info = info.get("BaseInfo", {})
        # "or {}" also covers keys that are present but null.
        app_msg = info.get("AppMsg") or {}
        app_base_info = app_msg.get("BaseInfo") or {}
        app_msg_id = app_base_info.get("AppMsgId", None)
        create_time = app_base_info.get("CreateTime", None)
        update_time = app_base_info.get("UpdateTime", None)
        msg_type = app_base_info.get("Type", None)

        for article in app_msg.get("DetailInfo") or []:
            helper = Functions()  # one helper instance per article
            title = article.get("Title", None)
            content_url = article.get("ContentUrl", None)
            show_desc = article.get("ShowDesc", None)
            show_stat = helper.show_desc_to_sta(show_desc)
            show_view_count = show_stat.get("show_view_count", 0)
            show_like_count = show_stat.get("show_like_count", 0)
            wx_sn = _extract_wx_sn(content_url)

            info_tuple = (
                gh_id,
                account_name,
                app_msg_id,
                title,
                msg_type,
                create_time,
                update_time,
                article.get("Digest", None),
                article.get("ItemIndex", None),
                content_url,
                article.get("SourceUrl", None),
                article.get("CoverImgUrl", None),
                article.get("CoverImgUrl_1_1", None),
                article.get("CoverImgUrl_235_1", None),
                article.get("ItemShowType", None),
                article.get("IsOriginal", None),
                show_desc,
                article.get("ori_content", None),
                show_view_count,
                show_like_count,
                show_stat.get("show_zs_count", 0),
                show_stat.get("show_pay_count", 0),
                wx_sn,
                json.dumps(base_info, ensure_ascii=False),
                helper.str_to_md5(title),
            )

            try:
                db_client.update(sql=insert_sql, params=info_tuple)
                log(
                    task="updatePublishedMsgDaily",
                    function="insertEachMsg",
                    message="插入文章数据成功",
                    data={
                        "info": info_tuple
                    }
                )
            except Exception:
                # Any insert failure is treated as "row already exists":
                # fall back to refreshing the counters of the stored row.
                try:
                    db_client.update(
                        sql=update_sql,
                        params=(show_view_count, show_like_count, wx_sn)
                    )
                    log(
                        task="updatePublishedMsgDaily",
                        function="insertEachMsg",
                        message="更新文章数据成功",
                        data={
                            "wxSn": wx_sn,
                            "likeCount": show_like_count,
                            "viewCount": show_view_count
                        }
                    )
                except Exception as update_error:
                    log(
                        task="updatePublishedMsgDaily",
                        function="insertEachMsg",
                        message="更新文章失败, 报错原因是: {}".format(update_error),
                        status="fail"
                    )
def updateEachAccount(db_client, gh_id, account_name, latest_update_time, cursor=None):
    """
    Fetch an account's published messages page by page and store them.

    Each page is requested through ``WeixinSpider().update_msg_list``. The
    last message of the page is resolved back to an account via
    ``get_account_by_url``; the page is stored with ``insertEachMsg`` only
    when that account's ``wx_gh`` equals ``gh_id``. Paging continues while
    the last message of the page is newer than ``latest_update_time`` and
    the response carries a usable ``next_cursor``.

    :param db_client: database client handed through to ``insertEachMsg``
    :param gh_id: gh_id of the official account
    :param account_name: account name stored with each article
    :param latest_update_time: newest update timestamp already stored
    :param cursor: page cursor to start from; ``None`` requests the first page
    :return: None
    """
    while True:
        response = WeixinSpider().update_msg_list(ghId=gh_id, index=cursor)
        payload = response.get("data") or {}
        msg_list = payload.get("data", {})
        if not msg_list:
            log(
                task="updatePublishedMsgDaily",
                function="updateEachAccount",
                message="账号文章更新失败",
                status="fail",
                data=response
            )
            return

        last_msg = msg_list[-1]
        last_update_time = last_msg['AppMsg']['BaseInfo']['UpdateTime']
        last_url = last_msg['AppMsg']['DetailInfo'][0]['ContentUrl']

        # Verify that the returned articles really belong to this account.
        resdata = WeixinSpider().get_account_by_url(last_url)
        check_id = resdata['data'].get('data', {}).get('wx_gh')
        if check_id != gh_id:
            return

        insertEachMsg(
            db_client=db_client,
            gh_id=gh_id,
            account_name=account_name,
            msg_list=msg_list
        )

        if last_update_time > latest_update_time:
            next_cursor = payload.get('next_cursor')
            # A missing or unchanged cursor would request the same page again
            # forever, so paging only continues when the cursor advances.
            if next_cursor is not None and next_cursor != cursor:
                cursor = next_cursor
                continue

        log(
            task="updatePublishedMsgDaily",
            function="updateEachAccount",
            message="账号文章更新成功",
            data=response
        )
        return
def checkAccountInfo(db_client, gh_id):
    """
    Look up the most recently updated stored article of an account.

    :param db_client: database client exposing ``select(sql)`` that returns
        a sequence of row tuples
    :param gh_id: gh_id of the official account
    :return: dict with ``account_name``, ``update_time`` and ``account_type``.
        ``account_type`` is "history" when a row exists. Otherwise it is
        "new", the name is empty and ``update_time`` is set to 30 days
        before now.
    """
    # Only the newest row is read, so the result set is limited to one row.
    # NOTE(review): gh_id is interpolated straight into the statement; switch
    # to bound parameters if db_client.select supports them.
    sql = f"""
        select accountName, updateTime
        from official_articles_v2
        where ghId = '{gh_id}'
        order by updateTime DESC
        limit 1;
    """
    result = db_client.select(sql)
    if result:
        account_name, update_time = result[0]
        return {
            "account_name": account_name,
            "update_time": update_time,
            "account_type": "history"
        }
    return {
        "account_name": "",
        "update_time": int(time.time()) - 30 * 24 * 60 * 60,
        "account_type": "new"
    }
def updateSingleJob(db_client, gh_id):
    """
    Refresh the stored articles of a single account.

    Reads the account's stored name and latest update time with
    ``checkAccountInfo`` and passes both on to ``updateEachAccount``.

    :param db_client: database client shared by both calls
    :param gh_id: gh_id of the official account
    :return: None
    """
    stored = checkAccountInfo(db_client, gh_id)
    name = stored['account_name']
    newest_known = stored['update_time']
    updateEachAccount(
        db_client=db_client,
        gh_id=gh_id,
        account_name=name,
        latest_update_time=newest_known,
    )
def checkSingleAccount(db_client, account_item):
    """
    Check whether an account's articles have been collected recently enough.

    The newest stored ``updateTime`` of the account is compared with a
    threshold: local midnight of today for '订阅号' accounts, and seven days
    before that midnight for every other account type.

    :param db_client: database client exposing ``select(sql)``
    :param account_item: account dict with at least ``ghId`` and ``type``
    :return: True when the newest stored article is newer than the
        threshold; False otherwise, including when no row exists or the
        lookup raises.
    """
    gh_id = account_item['ghId']
    account_type = account_item['type']
    midnight = datetime.today().replace(hour=0, minute=0, second=0, microsecond=0)
    today_timestamp = int(midnight.timestamp())
    # Only the newest row matters, so the query asks for a single row.
    sql = f"""
        select updateTime
        from official_articles_v2
        where ghId = '{gh_id}'
        order by updateTime desc
        limit 1;
    """
    try:
        latest_update_time = int(db_client.select(sql)[0][0])
    except Exception as e:
        # Covers query errors as well as "no rows stored for this account".
        print("updateTime Error -- {}".format(e))
        return False

    if account_type == "订阅号":
        threshold = today_timestamp
    else:
        threshold = today_timestamp - 7 * 24 * 3600
    return latest_update_time > threshold
def _update_account_batch(db_client, accounts):
    """Run updateSingleJob for every account; return (success, fail) counts."""
    success = 0
    fail = 0
    for item in tqdm(accounts):
        try:
            updateSingleJob(db_client, item['ghId'])
            success += 1
            # Pause between accounts; skipped when the update failed.
            time.sleep(5)
        except Exception as e:
            fail += 1
            log(
                task="updatePublishedMsgDaily",
                function="updateJob",
                message="单个账号文章更新失败, 报错信息是: {}".format(e),
                status="fail",
            )
    return success, fail


def updateJob():
    """
    Update the stored articles of every configured account.

    Subscription accounts are processed first. Their success/failure counts
    are logged, and an alert is sent through ``bot`` when more than 30% of
    them failed. Service accounts are processed afterwards; their failures
    are logged but not counted towards the alert.

    :return: None
    """
    db_client = PQMySQL()
    sub_accounts, server_accounts = getAccounts()

    s_count, f_count = _update_account_batch(db_client, sub_accounts)
    log(
        task="updatePublishedMsgDaily",
        function="updateJob",
        message="订阅号更新完成",
        data={
            "success": s_count,
            "fail": f_count
        }
    )

    total = s_count + f_count
    # With no subscription accounts there is no failure rate to compute.
    if total and f_count / total > 0.3:
        bot(
            title="订阅号超过 30% 的账号更新失败",
            detail={
                "success": s_count,
                "fail": f_count,
                "failRate": f_count / total
            }
        )

    _update_account_batch(db_client, server_accounts)
def checkJob():
    """
    Verify that today's articles were collected and retry where they were not.

    First pass: every subscription account that fails ``checkSingleAccount``
    is updated again with ``updateSingleJob``. Second pass: the accounts are
    checked once more, and those still failing are reported through ``bot``.
    Service accounts are not checked.

    :return: None
    """
    db_client = PQMySQL()
    account_list, _ = getAccounts()

    # Re-run the update for accounts that have not been collected yet.
    for account in tqdm(account_list):
        if checkSingleAccount(db_client, account):
            continue
        try:
            updateSingleJob(db_client, account['ghId'])
        except Exception as e:
            # Keep going: one failing account must not prevent the
            # verification pass and the alert below from running.
            log(
                task="updatePublishedMsgDaily",
                function="checkJob",
                message="单个账号文章更新失败, 报错信息是: {}".format(e),
                status="fail",
            )

    # Verify again and collect the accounts that are still missing.
    fail_list = [
        account for account in tqdm(account_list)
        if not checkSingleAccount(db_client, account)
    ]
    if fail_list:
        try:
            bot(
                title="日常报警, 存在账号更新失败",
                detail=fail_list
            )
        except Exception as e:
            print("Timeout Error: {}".format(e))
def job_with_thread(job_func):
    """
    Run a job in its own thread.

    The thread is started and not joined, so this call returns immediately.

    :param job_func: zero-argument callable to execute
    :return: None
    """
    threading.Thread(target=job_func).start()
if __name__ == '__main__':
    # Daily schedule: (time of day, job). Each job runs in its own thread.
    daily_jobs = (
        ("20:50", updateJob),
        ("21:45", checkJob),
    )
    for at_time, job in daily_jobs:
        schedule.every().day.at(at_time).do(job_with_thread, job)

    # Poll the scheduler once per second, forever.
    while True:
        schedule.run_pending()
        time.sleep(1)
|