""" @author: luojunhui """ import time import json import threading import requests import schedule from tqdm import tqdm from datetime import datetime from config import accountBaseInfo from applications import PQMySQL, WeixinSpider, Functions from applications.decoratorApi import retryOnTimeout class UpdateMsgDaily(object): """ 日常更新文章 """ db_client = PQMySQL() spider = WeixinSpider() functions = Functions() @classmethod def getAccountIdDict(cls): """ 获取全部内部账号的id :return: """ gh_id_dict = {} for key in accountBaseInfo: gh_id = accountBaseInfo[key]["ghId"] name = accountBaseInfo[key]["accountName"] gh_id_dict[gh_id] = name return gh_id_dict @classmethod @retryOnTimeout() def bot(cls, account_list): """ 机器人 """ url = "https://open.feishu.cn/open-apis/bot/v2/hook/b44333f2-16c0-4cb1-af01-d135f8704410" headers = {"Content-Type": "application/json"} payload = { "msg_type": "interactive", "card": { "elements": [ { "tag": "div", "text": { "content": "存在文章更新失败\n", "tag": "lark_md", }, }, { "tag": "div", "text": { "content": json.dumps( account_list, ensure_ascii=False, indent=4 ), "tag": "lark_md", }, }, ], "header": {"title": {"content": "【重点关注】", "tag": "plain_text"}}, }, } requests.request("POST", url=url, headers=headers, data=json.dumps(payload), timeout=10) @classmethod def findAccountLatestUpdateTime(cls, gh_id): """ 获取账号的最近更新id :param gh_id: :return: """ sql = f""" select accountName, updateTime from official_articles_v2 where ghId = '{gh_id}' order by updateTime DESC; """ result = cls.db_client.select(sql) if result: account_name, update_time = result[0] return {"update_time": update_time, "account_type": "history"} else: return { "update_time": int(time.time()) - 30 * 24 * 60 * 60, "account_type": "new", } @classmethod def updateMsgList(cls, gh_id, account_name, msg_list): """ 把消息数据更新到数据库中 :param account_name: :param gh_id: :param msg_list: :return: """ for info in msg_list: baseInfo = info.get("BaseInfo", {}) appMsgId = info.get("AppMsg", {}).get("BaseInfo", {}).get("AppMsgId", None) createTime = ( info.get("AppMsg", {}).get("BaseInfo", {}).get("CreateTime", None) ) updateTime = ( info.get("AppMsg", {}).get("BaseInfo", {}).get("UpdateTime", None) ) Type = info.get("AppMsg", {}).get("BaseInfo", {}).get("Type", None) detail_article_list = info.get("AppMsg", {}).get("DetailInfo", []) if detail_article_list: for article in detail_article_list: title = article.get("Title", None) Digest = article.get("Digest", None) ItemIndex = article.get("ItemIndex", None) ContentUrl = article.get("ContentUrl", None) SourceUrl = article.get("SourceUrl", None) CoverImgUrl = article.get("CoverImgUrl", None) CoverImgUrl_1_1 = article.get("CoverImgUrl_1_1", None) CoverImgUrl_235_1 = article.get("CoverImgUrl_235_1", None) ItemShowType = article.get("ItemShowType", None) IsOriginal = article.get("IsOriginal", None) ShowDesc = article.get("ShowDesc", None) show_stat = cls.functions.show_desc_to_sta(ShowDesc) ori_content = article.get("ori_content", None) show_view_count = show_stat.get("show_view_count", 0) show_like_count = show_stat.get("show_like_count", 0) show_zs_count = show_stat.get("show_zs_count", 0) show_pay_count = show_stat.get("show_pay_count", 0) wx_sn = ( ContentUrl.split("&sn=")[1].split("&")[0] if ContentUrl else None ) info_tuple = ( gh_id, account_name, appMsgId, title, Type, createTime, updateTime, Digest, ItemIndex, ContentUrl, SourceUrl, CoverImgUrl, CoverImgUrl_1_1, CoverImgUrl_235_1, ItemShowType, IsOriginal, ShowDesc, ori_content, show_view_count, show_like_count, show_zs_count, show_pay_count, wx_sn, json.dumps(baseInfo, ensure_ascii=False), ) try: insert_sql = f""" INSERT INTO official_articles_v2 (ghId, accountName, appMsgId, title, Type, createTime, updateTime, Digest, ItemIndex, ContentUrl, SourceUrl, CoverImgUrl, CoverImgUrl_1_1, CoverImgUrl_255_1, ItemShowType, IsOriginal, ShowDesc, ori_content, show_view_count, show_like_count, show_zs_count, show_pay_count, wx_sn, baseInfo) values (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s); """ cls.db_client.update(sql=insert_sql, params=info_tuple) except Exception as e: try: update_sql = f""" UPDATE official_articles_v2 SET show_view_count = %s, show_like_count=%s WHERE wx_sn = %s; """ cls.db_client.update( sql=update_sql, params=(show_view_count, show_like_count, wx_sn), ) except Exception as e: print("失败-{}".format(e)) continue @classmethod def getAccountArticleList(cls, gh_id, last_update_time, cursor=None): """ 输入ghid获取账号的文章list :return: """ try: response = cls.spider.update_msg_list(ghId=gh_id, index=cursor) except Exception as e: response = { "error": str(e), "info": "更新文章接口请求失败", "gh_id": gh_id, "time": datetime.now().__str__() } # 之后可以考虑抛出阿里云日志 print(response) return msg_list = response.get("data", {}).get("data") if msg_list: last_article_in_this_msg = msg_list[-1] last_time_stamp_in_this_msg = last_article_in_this_msg["AppMsg"][ "BaseInfo" ]["UpdateTime"] last_url = last_article_in_this_msg["AppMsg"]["DetailInfo"][0]["ContentUrl"] # 校验是否抓到的是同一个账号 try: resdata = cls.spider.get_account_by_url(last_url) except Exception as e: resdata = { "error": str(e), "info": "通过链接获取账号信息失败", "gh_id": gh_id, "time": datetime.now().__str__() } print(resdata) return check_name = resdata["data"].get("data", {}).get("account_name") check_id = resdata["data"].get("data", {}).get("wx_gh") if check_id == gh_id: cls.updateMsgList(gh_id, check_name, msg_list) if last_time_stamp_in_this_msg > last_update_time: next_cursor = response["data"]["next_cursor"] return cls.getAccountArticleList( gh_id=gh_id, last_update_time=last_update_time, cursor=next_cursor, ) else: response = { "code": 1002, "info": "抓取时候账号校验失败", "error": None, "gh_id": gh_id, "time_stamp": datetime.now().__str__(), } print(response) else: response = { "code": 1003, "info": "账号为抓取到内容", "error": None, "gh_id": gh_id, "time_stamp": datetime.now().__str__(), } print(response) @classmethod def checkEachAccount(cls, gh_id): """ 验证单个账号是否当天有更新 :param gh_id: :return: """ today_str = datetime.today().strftime("%Y-%m-%d") today_date_time = datetime.strptime(today_str, "%Y-%m-%d") today_timestamp = today_date_time.timestamp() sql = f""" select updateTime from official_articles_v2 where ghId = '{gh_id}' order by updateTime desc """ latest_update_time = cls.db_client.select(sql)[0][0] # 判断该账号当天发布的文章是否被收集 if int(latest_update_time) > int(today_timestamp): return True else: return False @classmethod def updateJob(cls): """ 更新文章任务 :return: """ account_list = cls.getAccountIdDict() for account_id in tqdm(account_list): account_info = cls.findAccountLatestUpdateTime(account_id) latest_time = account_info["update_time"] try: cls.getAccountArticleList( gh_id=account_id, last_update_time=latest_time ) except Exception as e: response = { "code": 1001, "info": "单个账号更新失败", "error": str(e), "time_stamp": datetime.now().__str__(), } print(response) @classmethod def checkJob(cls): """ 验证所有账号是否已经有更新数据 :return: todo: 被封禁账号&&服务号需要做区分 """ account_dict = cls.getAccountIdDict() error_account_list = [] for account_id in tqdm(account_dict): if cls.checkEachAccount(account_id): continue else: name = account_dict[account_id] error_account_list.append(name) if error_account_list: try: cls.bot(error_account_list) except Exception as e: print("Timeout Error: {}".format(e)) def job_with_thread(job_func): """ 每个任务放到单个线程中 :param job_func: :return: """ job_thread = threading.Thread(target=job_func) job_thread.start() if __name__ == "__main__": UMD = UpdateMsgDaily() schedule.every().day.at("21:00").do(job_with_thread, UMD.updateJob) schedule.every().day.at("21:30").do(job_with_thread, UMD.checkJob) while True: schedule.run_pending() time.sleep(1)