123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308 |
- """
- @author: luojunhui
- """
- import time
- import json
- import requests
- import schedule
- from tqdm import tqdm
- from datetime import datetime
- from config import accountBaseInfo
- from applications import PQMySQL, WeixinSpider, Functions
class UpdateMsgDaily(object):
    """
    Daily article collection for the internal official accounts.

    ``updateJob`` pulls article pages for every account listed in
    ``accountBaseInfo`` through ``WeixinSpider`` and stores them in the
    ``official_articles_v2`` table; ``checkJob`` then verifies that every
    account has a row updated today and posts the names of the ones that do
    not to a Feishu webhook.

    All methods are classmethods that share the three class-level clients
    below, so the class can be used without instantiation.
    """

    # Shared clients, created once when the class body is executed.
    db_client = PQMySQL()
    spider = WeixinSpider()
    functions = Functions()

    # Seconds to wait for the alert webhook before giving up.
    BOT_TIMEOUT_SECONDS = 10
    # How far back to crawl for an account that has no stored article yet.
    NEW_ACCOUNT_LOOKBACK_SECONDS = 30 * 24 * 60 * 60

    @classmethod
    def getAccountIdDict(cls):
        """
        Build a ``{ghId: accountName}`` mapping for every configured account.

        :return: dict keyed by ``ghId`` with the account name as value
        """
        return {
            accountBaseInfo[key]["ghId"]: accountBaseInfo[key]["accountName"]
            for key in accountBaseInfo
        }

    @classmethod
    def bot(cls, account_list):
        """
        Post an alert card listing ``account_list`` to the Feishu webhook.

        The request is bounded by ``BOT_TIMEOUT_SECONDS`` and network errors
        are printed instead of raised, so a failing alert cannot hang or
        terminate the scheduler loop that calls this.

        :param account_list: JSON-serialisable list of account names
        :return: None
        """
        # NOTE(review): the webhook token is hard-coded; consider moving it
        # to configuration.
        url = "https://open.feishu.cn/open-apis/bot/v2/hook/b44333f2-16c0-4cb1-af01-d135f8704410"
        headers = {"Content-Type": "application/json"}
        payload = {
            "msg_type": "interactive",
            "card": {
                "elements": [
                    {
                        "tag": "div",
                        "text": {
                            "content": "存在文章更新失败<at id=all></at>\n",
                            "tag": "lark_md",
                        },
                    },
                    {
                        "tag": "div",
                        "text": {
                            "content": json.dumps(
                                account_list, ensure_ascii=False, indent=4
                            ),
                            "tag": "lark_md",
                        },
                    },
                ],
                "header": {"title": {"content": "【重点关注】", "tag": "plain_text"}},
            },
        }
        try:
            requests.request(
                "POST",
                url=url,
                headers=headers,
                data=json.dumps(payload),
                timeout=cls.BOT_TIMEOUT_SECONDS,
            )
        except requests.RequestException as e:
            print("bot request failed-{}".format(e))

    @classmethod
    def findAccountLatestUpdateTime(cls, gh_id):
        """
        Look up the newest stored ``updateTime`` for an account.

        :param gh_id: account id used in the ``ghId`` column
        :return: ``{"update_time": ..., "account_type": "history"}`` when the
            account already has rows; otherwise a timestamp 30 days in the
            past with ``"account_type": "new"``
        """
        # NOTE(review): gh_id is interpolated into the SQL text. It comes from
        # accountBaseInfo here; switch to bound parameters if
        # db_client.select supports them.
        sql = f"""
            select accountName, updateTime
            from official_articles_v2
            where ghId = '{gh_id}'
            order by updateTime DESC
            limit 1;
        """
        result = cls.db_client.select(sql)
        if result:
            # Only the newest row is needed; column 1 is updateTime.
            return {"update_time": result[0][1], "account_type": "history"}
        return {
            "update_time": int(time.time()) - cls.NEW_ACCOUNT_LOOKBACK_SECONDS,
            "account_type": "new",
        }

    @staticmethod
    def _extract_wx_sn(content_url):
        """
        Return the value of the ``sn`` query parameter of an article URL.

        :param content_url: article URL, may be ``None`` or empty
        :return: text between the first ``&sn=`` and the next ``&``, or
            ``None`` when the URL is missing or has no ``&sn=`` part
        """
        if not content_url:
            return None
        _, marker, tail = content_url.partition("&sn=")
        if not marker:
            return None
        return tail.split("&", 1)[0]

    @classmethod
    def updateMsgList(cls, gh_id, account_name, msg_list):
        """
        Store every article contained in ``msg_list``.

        Each article is inserted into ``official_articles_v2``. When the
        insert raises (presumably because the row already exists), the view
        and like counters of the row with the same ``wx_sn`` are updated
        instead. A failure of that fallback is printed and the loop moves on.

        :param gh_id: account id written to the ``ghId`` column
        :param account_name: account name written to ``accountName``
        :param msg_list: list of message dicts, each with optional
            ``BaseInfo`` and ``AppMsg`` (``BaseInfo`` + ``DetailInfo``) keys
        :return: None
        """
        # NOTE(review): the column is spelled CoverImgUrl_255_1 while the
        # payload key is CoverImgUrl_235_1 — confirm against the table schema.
        insert_sql = """
            INSERT INTO official_articles_v2
            (ghId, accountName, appMsgId, title, Type, createTime, updateTime, Digest, ItemIndex, ContentUrl, SourceUrl, CoverImgUrl, CoverImgUrl_1_1, CoverImgUrl_255_1, ItemShowType, IsOriginal, ShowDesc, ori_content, show_view_count, show_like_count, show_zs_count, show_pay_count, wx_sn, baseInfo)
            values
            (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
        """
        update_sql = """
            UPDATE official_articles_v2
            SET show_view_count = %s, show_like_count=%s
            WHERE wx_sn = %s;
        """
        for info in msg_list:
            base_info_json = json.dumps(info.get("BaseInfo", {}), ensure_ascii=False)
            app_msg = info.get("AppMsg", {})
            app_base = app_msg.get("BaseInfo", {})
            app_msg_id = app_base.get("AppMsgId", None)
            create_time = app_base.get("CreateTime", None)
            update_time = app_base.get("UpdateTime", None)
            msg_type = app_base.get("Type", None)

            # A missing or empty DetailInfo simply yields no articles.
            for article in app_msg.get("DetailInfo", []) or []:
                content_url = article.get("ContentUrl", None)
                show_desc = article.get("ShowDesc", None)
                show_stat = cls.functions.show_desc_to_sta(show_desc)
                show_view_count = show_stat.get("show_view_count", 0)
                show_like_count = show_stat.get("show_like_count", 0)
                wx_sn = cls._extract_wx_sn(content_url)
                # Order must match the column list of insert_sql.
                info_tuple = (
                    gh_id,
                    account_name,
                    app_msg_id,
                    article.get("Title", None),
                    msg_type,
                    create_time,
                    update_time,
                    article.get("Digest", None),
                    article.get("ItemIndex", None),
                    content_url,
                    article.get("SourceUrl", None),
                    article.get("CoverImgUrl", None),
                    article.get("CoverImgUrl_1_1", None),
                    article.get("CoverImgUrl_235_1", None),
                    article.get("ItemShowType", None),
                    article.get("IsOriginal", None),
                    show_desc,
                    article.get("ori_content", None),
                    show_view_count,
                    show_like_count,
                    show_stat.get("show_zs_count", 0),
                    show_stat.get("show_pay_count", 0),
                    wx_sn,
                    base_info_json,
                )
                try:
                    cls.db_client.update(sql=insert_sql, params=info_tuple)
                except Exception:
                    # The driver's exception type is not visible here, so the
                    # broad catch is kept; fall back to refreshing counters.
                    try:
                        cls.db_client.update(
                            sql=update_sql,
                            params=(show_view_count, show_like_count, wx_sn),
                        )
                    except Exception as e:
                        print("失败-{}".format(e))

    @classmethod
    def getAccountArticleList(cls, gh_id, last_update_time, cursor=None):
        """
        Crawl an account's article pages and store them.

        Pages are fetched starting at ``cursor`` and stored with
        ``updateMsgList`` until the last message of a page is not newer than
        ``last_update_time``, a page comes back empty, the account check
        fails, or the spider stops handing out a new cursor.

        :param gh_id: account id passed to the spider
        :param last_update_time: timestamp of the newest article already stored
        :param cursor: page cursor to start from, ``None`` for the first page
        :return: None
        """
        current_cursor = cursor
        while True:
            response = cls.spider.update_msg_list(ghId=gh_id, index=current_cursor)
            msg_list = response.get("data", {}).get("data")
            if not msg_list:
                print(
                    {
                        "code": 1003,
                        "info": "账号为抓取到内容",
                        "error": None,
                        "gh_id": gh_id,
                        "time_stamp": str(datetime.now()),
                    }
                )
                return

            oldest_msg = msg_list[-1]["AppMsg"]
            oldest_timestamp = oldest_msg["BaseInfo"]["UpdateTime"]
            oldest_url = oldest_msg["DetailInfo"][0]["ContentUrl"]

            # Verify that the crawled page really belongs to this account.
            resdata = cls.spider.get_account_by_url(oldest_url)
            account_data = resdata["data"].get("data", {})
            check_name = account_data.get("account_name")
            check_id = account_data.get("wx_gh")
            if check_id != gh_id:
                print(
                    {
                        "code": 1002,
                        "info": "抓取时候账号校验失败",
                        "error": None,
                        "gh_id": gh_id,
                        "time_stamp": str(datetime.now()),
                    }
                )
                return

            cls.updateMsgList(gh_id, check_name, msg_list)
            if not oldest_timestamp > last_update_time:
                return

            next_cursor = response["data"].get("next_cursor")
            # Without a fresh cursor the next request would repeat a page
            # already fetched, so stop instead of looping forever.
            if next_cursor is None or next_cursor == current_cursor:
                return
            current_cursor = next_cursor

    @classmethod
    def checkEachAccount(cls, gh_id):
        """
        Tell whether an account has a stored article updated today.

        :param gh_id: account id used in the ``ghId`` column
        :return: ``True`` when the newest ``updateTime`` is later than local
            midnight of today; ``False`` otherwise, including when the
            account has no rows at all
        """
        midnight_timestamp = (
            datetime.today()
            .replace(hour=0, minute=0, second=0, microsecond=0)
            .timestamp()
        )
        sql = f"""
            select updateTime
            from official_articles_v2
            where ghId = '{gh_id}'
            order by updateTime
            desc
            limit 1
        """
        rows = cls.db_client.select(sql)
        # No stored article means nothing was collected today either.
        if not rows or rows[0][0] is None:
            return False
        return int(rows[0][0]) > int(midnight_timestamp)

    @classmethod
    def updateJob(cls):
        """
        Crawl and store new articles for every configured account.

        A failure for one account is printed as a ``code: 1001`` record and
        does not stop the remaining accounts.

        :return: None
        """
        account_list = cls.getAccountIdDict()
        for account_id in tqdm(account_list):
            try:
                account_info = cls.findAccountLatestUpdateTime(account_id)
                cls.getAccountArticleList(
                    gh_id=account_id,
                    last_update_time=account_info["update_time"],
                )
            except Exception as e:
                print(
                    {
                        "code": 1001,
                        "info": "单个账号更新失败",
                        "error": str(e),
                        "gh_id": account_id,
                        "time_stamp": str(datetime.now()),
                    }
                )

    @classmethod
    def checkJob(cls):
        """
        Alert on every account that has no article stored for today.

        todo: banned accounts and service accounts need to be told apart

        :return: None
        """
        account_dict = cls.getAccountIdDict()
        error_account_list = [
            account_dict[account_id]
            for account_id in tqdm(account_dict)
            if not cls.checkEachAccount(account_id)
        ]
        if error_account_list:
            cls.bot(error_account_list)
if __name__ == "__main__":
    # Register the daily crawl at 21:00 and the completeness check at 21:30,
    # then poll the scheduler once per second forever.
    updater = UpdateMsgDaily()
    daily_jobs = (
        ("21:00", updater.updateJob),
        ("21:30", updater.checkJob),
    )
    for run_at, job in daily_jobs:
        schedule.every().day.at(run_at).do(job)
    while True:
        schedule.run_pending()
        time.sleep(1)
|