"""
@author: luojunhui
"""

import json
import time
from datetime import datetime

import requests
import schedule
from tqdm import tqdm

from config import accountBaseInfo
from applications import PQMySQL, WeixinSpider, Functions


class UpdateMsgDaily(object):
    """
    Daily job: crawl newly published articles for every internal account,
    store them in `official_articles_v2`, and alert via a Feishu bot when an
    account has no article stored for today.
    """

    # Shared clients, created once when the class body is executed.
    db_client = PQMySQL()
    spider = WeixinSpider()
    functions = Functions()

    # NOTE(review): webhook token is hard-coded; consider moving it to config.
    BOT_WEBHOOK_URL = "https://open.feishu.cn/open-apis/bot/v2/hook/b44333f2-16c0-4cb1-af01-d135f8704410"
    # Seconds to wait for the Feishu webhook before giving up.
    BOT_TIMEOUT_SECONDS = 10
    # How far back to crawl for an account that has no stored articles yet.
    NEW_ACCOUNT_LOOKBACK_SECONDS = 30 * 24 * 60 * 60

    @classmethod
    def getAccountIdDict(cls):
        """
        Map every internal account's gh_id to its account name.

        :return: dict of {ghId: accountName} built from `accountBaseInfo`.
        """
        return {
            info["ghId"]: info["accountName"] for info in accountBaseInfo.values()
        }

    @classmethod
    def bot(cls, account_list):
        """
        Post a Feishu interactive-card alert listing accounts whose articles
        failed to update.

        Network errors are printed rather than raised, so a failed alert does
        not crash the scheduler loop.

        :param account_list: JSON-serialisable list shown in the card body.
        """
        headers = {"Content-Type": "application/json"}
        payload = {
            "msg_type": "interactive",
            "card": {
                "elements": [
                    {
                        "tag": "div",
                        "text": {
                            "content": "存在文章更新失败\n",
                            "tag": "lark_md",
                        },
                    },
                    {
                        "tag": "div",
                        "text": {
                            "content": json.dumps(
                                account_list, ensure_ascii=False, indent=4
                            ),
                            "tag": "lark_md",
                        },
                    },
                ],
                "header": {"title": {"content": "【重点关注】", "tag": "plain_text"}},
            },
        }
        try:
            requests.post(
                url=cls.BOT_WEBHOOK_URL,
                headers=headers,
                data=json.dumps(payload),
                timeout=cls.BOT_TIMEOUT_SECONDS,
            )
        except requests.RequestException as e:
            print("Feishu bot alert failed: {}".format(e))

    @classmethod
    def findAccountLatestUpdateTime(cls, gh_id):
        """
        Find the newest stored updateTime for an account.

        :param gh_id: account gh_id.
        :return: {"update_time": <latest updateTime>, "account_type": "history"}
            when the account already has stored articles; otherwise
            {"update_time": now - NEW_ACCOUNT_LOOKBACK_SECONDS,
            "account_type": "new"}.
        """
        # gh_id comes from the internal account config, not from user input.
        sql = f"""
            select updateTime
            from official_articles_v2
            where ghId = '{gh_id}'
            order by updateTime DESC
            limit 1;
        """
        result = cls.db_client.select(sql)
        if result:
            return {"update_time": result[0][0], "account_type": "history"}
        return {
            "update_time": int(time.time()) - cls.NEW_ACCOUNT_LOOKBACK_SECONDS,
            "account_type": "new",
        }

    @staticmethod
    def _extract_wx_sn(content_url):
        """Return the `sn` value following "&sn=" in an article URL, or None if absent."""
        if not content_url or "&sn=" not in content_url:
            return None
        return content_url.split("&sn=", 1)[1].split("&")[0]

    @classmethod
    def updateMsgList(cls, gh_id, account_name, msg_list):
        """
        Write one page of crawled messages into `official_articles_v2`.

        Every article of every message is inserted. If the insert raises
        (presumably because the row already exists — verify against the table's
        unique keys), the view/like counters are updated by wx_sn instead.
        Failures of that fallback update are printed and skipped.

        :param gh_id: account gh_id.
        :param account_name: account name stored with each row.
        :param msg_list: message dicts as returned by the spider
            ("BaseInfo", "AppMsg" -> "BaseInfo"/"DetailInfo").
        """
        # NOTE(review): the column below is CoverImgUrl_255_1 while the source
        # field is CoverImgUrl_235_1 — confirm the real column name.
        insert_sql = """
            INSERT INTO official_articles_v2
            (ghId, accountName, appMsgId, title, Type, createTime, updateTime, Digest, ItemIndex, ContentUrl, SourceUrl, CoverImgUrl, CoverImgUrl_1_1, CoverImgUrl_255_1, ItemShowType, IsOriginal, ShowDesc, ori_content, show_view_count, show_like_count, show_zs_count, show_pay_count, wx_sn, baseInfo)
            values
            (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
        """
        update_sql = """
            UPDATE official_articles_v2
            SET show_view_count = %s, show_like_count=%s
            WHERE wx_sn = %s;
        """
        for info in msg_list:
            base_info = info.get("BaseInfo", {})
            app_msg = info.get("AppMsg", {})
            app_msg_base = app_msg.get("BaseInfo", {})
            app_msg_id = app_msg_base.get("AppMsgId", None)
            create_time = app_msg_base.get("CreateTime", None)
            update_time = app_msg_base.get("UpdateTime", None)
            msg_type = app_msg_base.get("Type", None)
            # `or []` also covers an explicit None DetailInfo.
            for article in app_msg.get("DetailInfo") or []:
                show_desc = article.get("ShowDesc", None)
                show_stat = cls.functions.show_desc_to_sta(show_desc)
                show_view_count = show_stat.get("show_view_count", 0)
                show_like_count = show_stat.get("show_like_count", 0)
                content_url = article.get("ContentUrl", None)
                wx_sn = cls._extract_wx_sn(content_url)
                info_tuple = (
                    gh_id,
                    account_name,
                    app_msg_id,
                    article.get("Title", None),
                    msg_type,
                    create_time,
                    update_time,
                    article.get("Digest", None),
                    article.get("ItemIndex", None),
                    content_url,
                    article.get("SourceUrl", None),
                    article.get("CoverImgUrl", None),
                    article.get("CoverImgUrl_1_1", None),
                    article.get("CoverImgUrl_235_1", None),
                    article.get("ItemShowType", None),
                    article.get("IsOriginal", None),
                    show_desc,
                    article.get("ori_content", None),
                    show_view_count,
                    show_like_count,
                    show_stat.get("show_zs_count", 0),
                    show_stat.get("show_pay_count", 0),
                    wx_sn,
                    json.dumps(base_info, ensure_ascii=False),
                )
                try:
                    cls.db_client.update(sql=insert_sql, params=info_tuple)
                except Exception:
                    # Insert failed; fall back to refreshing the counters.
                    try:
                        cls.db_client.update(
                            sql=update_sql,
                            params=(show_view_count, show_like_count, wx_sn),
                        )
                    except Exception as e:
                        print("失败-{}".format(e))

    @classmethod
    def getAccountArticleList(cls, gh_id, last_update_time, cursor=None):
        """
        Page through an account's message list and store every page.

        Paging continues while the last message on the current page is newer
        than `last_update_time`. Before a page is stored, the account resolved
        from its last article's URL must match `gh_id`. On a mismatch or an
        empty page, an error dict is printed and paging stops.

        :param gh_id: account gh_id to crawl.
        :param last_update_time: paging stops once a page reaches this timestamp.
        :param cursor: page cursor to start from (None = first page).
        :return: None
        """
        while True:
            response = cls.spider.update_msg_list(ghId=gh_id, index=cursor)
            msg_list = response.get("data", {}).get("data")
            if not msg_list:
                print(
                    {
                        "code": 1003,
                        "info": "账号未抓取到内容",
                        "error": None,
                        "gh_id": gh_id,
                        "time_stamp": datetime.now().__str__(),
                    }
                )
                return

            last_msg = msg_list[-1]
            last_time_stamp = last_msg["AppMsg"]["BaseInfo"]["UpdateTime"]
            last_url = last_msg["AppMsg"]["DetailInfo"][0]["ContentUrl"]

            # Verify the crawled page really belongs to this account.
            resdata = cls.spider.get_account_by_url(last_url)
            account_data = resdata["data"].get("data", {})
            check_name = account_data.get("account_name")
            check_id = account_data.get("wx_gh")
            if check_id != gh_id:
                print(
                    {
                        "code": 1002,
                        "info": "抓取时候账号校验失败",
                        "error": None,
                        "gh_id": gh_id,
                        "time_stamp": datetime.now().__str__(),
                    }
                )
                return

            cls.updateMsgList(gh_id, check_name, msg_list)
            if last_time_stamp <= last_update_time:
                # This page already reaches stored history; stop paging.
                return

            next_cursor = response["data"]["next_cursor"]
            # A missing or non-advancing cursor would re-request the same page forever.
            if next_cursor is None or next_cursor == cursor:
                return
            cursor = next_cursor

    @classmethod
    def checkEachAccount(cls, gh_id):
        """
        Check whether an article published today has been stored for an account.

        :param gh_id: account gh_id.
        :return: True if the newest stored updateTime is after local midnight
            today; False otherwise, including when the account has no stored
            articles at all.
        """
        today_timestamp = (
            datetime.now()
            .replace(hour=0, minute=0, second=0, microsecond=0)
            .timestamp()
        )
        # gh_id comes from the internal account config, not from user input.
        sql = f"""
            select updateTime
            from official_articles_v2
            where ghId = '{gh_id}'
            order by updateTime desc
            limit 1;
        """
        result = cls.db_client.select(sql)
        if not result:
            return False
        return int(result[0][0]) > int(today_timestamp)

    @classmethod
    def updateJob(cls):
        """
        Crawl and store new articles for every internal account.

        A failure for one account is printed and does not stop the others.
        """
        account_dict = cls.getAccountIdDict()
        for gh_id in tqdm(account_dict):
            try:
                latest_time = cls.findAccountLatestUpdateTime(gh_id)["update_time"]
                cls.getAccountArticleList(gh_id=gh_id, last_update_time=latest_time)
            except Exception as e:
                print(
                    {
                        "code": 1001,
                        "info": "单个账号更新失败",
                        "error": str(e),
                        "gh_id": gh_id,
                        "time_stamp": datetime.now().__str__(),
                    }
                )

    @classmethod
    def checkJob(cls):
        """
        Alert via the Feishu bot listing accounts with no article stored for today.

        TODO: distinguish banned accounts and service accounts.
        """
        account_dict = cls.getAccountIdDict()
        error_account_list = [
            account_dict[gh_id]
            for gh_id in tqdm(account_dict)
            if not cls.checkEachAccount(gh_id)
        ]
        if error_account_list:
            cls.bot(error_account_list)


if __name__ == "__main__":
    UMD = UpdateMsgDaily()
    schedule.every().day.at("21:00").do(UMD.updateJob)
    schedule.every().day.at("21:30").do(UMD.checkJob)
    while True:
        schedule.run_pending()
        time.sleep(1)