"""
@author: luojunhui
"""
import json
import time

from applications.wxSpider import ArticleManager
from applications.functions import show_desc_to_sta


class ArticleSpider(object):
    """
    Crawl the article list of one official account and store it in MySQL.

    input: ghId, AccountName

    The spider reads ``ghId`` from ``params``, looks up the newest stored
    ``updateTime`` for that account in ``official_articles`` and then pages
    through the message list returned by ``ArticleManager``, inserting one
    row per article.
    """

    # Messages whose UpdateTime is at most this many seconds old are skipped.
    MIN_ARTICLE_AGE_SECONDS = 20 * 60 * 60

    # For an account with no stored rows, crawl back this many seconds.
    NEW_ACCOUNT_LOOKBACK_SECONDS = 30 * 24 * 60 * 60

    # NOTE(review): the column is spelled ``CoverImgUrl_255_1`` while the value
    # bound to it is read from the payload key ``CoverImgUrl_235_1``.  The
    # column name is kept exactly as it was; confirm it against the table
    # schema before renaming either side.
    _INSERT_ARTICLE_SQL = """
        INSERT INTO official_articles
        (ghId, accountName, appMsgId, title, Type, createTime, updateTime, Digest, ItemIndex, ContentUrl, SourceUrl, CoverImgUrl, CoverImgUrl_1_1, CoverImgUrl_255_1, ItemShowType, IsOriginal, ShowDesc, ori_content, show_view_count, show_like_count, show_zs_count, show_pay_count, wx_sn, baseInfo)
        values
        (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
        """

    def __init__(self, params, mysql_client):
        """
        :param params: request parameters; must provide the key ``ghId``
        :param mysql_client: client exposing ``async_select(sql)`` and
            ``async_insert(sql=..., params=...)``
        """
        self.endTime = None
        self.startTime = None
        self.ghId = None
        self.params = params
        self.mysql_client = mysql_client
        self.tools = ArticleManager()

    def checkParams(self):
        """
        Validate the request parameters and cache ``ghId`` on the instance.

        :return: ``None`` when the parameters are valid, otherwise a dict
            with the keys ``error``, ``msg`` and ``params``
        """
        try:
            self.ghId = self.params['ghId']
        except (KeyError, TypeError) as e:
            # KeyError: ghId missing; TypeError: params is None / not a mapping.
            return {
                "error": "Params Error",
                "msg": str(e),
                "params": self.params
            }
        return None

    async def checkAccount(self):
        """
        Decide whether the account already has stored articles.

        :return: dict with ``account_name``, ``update_time`` and
            ``account_type`` (``"history"`` when rows exist for this ghId,
            ``"new"`` otherwise).  For a new account ``update_time`` is the
            current time minus ``NEW_ACCOUNT_LOOKBACK_SECONDS``.
        """
        # NOTE(review): ghId is interpolated straight into the statement.  If
        # async_select accepts bound parameters, switch to a placeholder so a
        # crafted ghId cannot alter the query.
        sql = f"""
            select accountName, updateTime
            from official_articles
            where ghId = '{self.ghId}'
            order by updateTime DESC;"""
        result = await self.mysql_client.async_select(sql)
        if result:
            # Rows are ordered newest first, so the first row is the latest.
            account_name, update_time = result[0]
            return {
                "account_name": account_name,
                "update_time": update_time,
                "account_type": "history"
            }
        return {
            "account_name": "",
            "update_time": int(time.time()) - self.NEW_ACCOUNT_LOOKBACK_SECONDS,
            "account_type": "new"
        }

    @staticmethod
    def _build_article_row(gh_id, account_name, msg_base_info, base_info, article):
        """Build the parameter tuple for one ``official_articles`` insert."""
        content_url = article.get("ContentUrl", None)
        show_desc = article.get("ShowDesc", None)
        show_stat = show_desc_to_sta(show_desc)
        # The ``sn`` query parameter of the article URL; raises IndexError when
        # the URL has no "&sn=" segment, which the caller treats as a failed
        # article.
        wx_sn = content_url.split("&sn=")[1].split("&")[0] if content_url else None
        return (
            gh_id,
            account_name,
            msg_base_info.get("AppMsgId", None),
            article.get("Title", None),
            msg_base_info.get("Type", None),
            msg_base_info.get("CreateTime", None),
            msg_base_info.get("UpdateTime", None),
            article.get("Digest", None),
            article.get("ItemIndex", None),
            content_url,
            article.get("SourceUrl", None),
            article.get("CoverImgUrl", None),
            article.get("CoverImgUrl_1_1", None),
            article.get("CoverImgUrl_235_1", None),
            article.get("ItemShowType", None),
            article.get("IsOriginal", None),
            show_desc,
            article.get("ori_content", None),
            show_stat.get("show_view_count", 0),
            show_stat.get("show_like_count", 0),
            show_stat.get("show_zs_count", 0),
            show_stat.get("show_pay_count", 0),
            wx_sn,
            json.dumps(base_info, ensure_ascii=False)
        )

    async def process_msg_list(self, gh_id, account_name, msg_list):
        """
        Insert every article of every message in ``msg_list`` into the database.

        Messages updated within the last ``MIN_ARTICLE_AGE_SECONDS`` are
        skipped, as are messages without a usable ``UpdateTime``.  A failure
        on a single article is printed and does not stop the remaining ones.

        :param gh_id: account id stored in the ``ghId`` column
        :param account_name: account name stored in the ``accountName`` column
        :param msg_list: list of message dicts, each with ``BaseInfo`` and
            ``AppMsg`` (``BaseInfo`` + ``DetailInfo``) entries
        :return: None
        """
        for info in msg_list:
            base_info = info.get("BaseInfo", {})
            app_msg = info.get("AppMsg", {})
            msg_base_info = app_msg.get("BaseInfo", {})

            try:
                update_time = int(msg_base_info.get("UpdateTime", None))
            except (TypeError, ValueError):
                # Without a timestamp the age check below cannot be made;
                # skip this message instead of aborting the whole batch.
                continue
            if int(time.time()) - update_time <= self.MIN_ARTICLE_AGE_SECONDS:
                continue

            for article in app_msg.get("DetailInfo", []) or []:
                try:
                    info_tuple = self._build_article_row(
                        gh_id, account_name, msg_base_info, base_info, article
                    )
                    await self.mysql_client.async_insert(
                        sql=self._INSERT_ARTICLE_SQL, params=info_tuple
                    )
                    print("更新成功")
                except Exception as e:
                    # Best effort: report the failed article and carry on.
                    print("error")
                    print(e)
                    continue

    async def getAccountArticleList(self, gh_id, account_name, last_update_time, cursor=None):
        """
        Fetch the account's message list page by page and store the articles.

        For each page, the URL of the first article in the last message is
        resolved back to an account; the page is stored only when that
        account's ``wx_gh`` equals ``gh_id``.  Paging continues while the last
        message of the page is newer than ``last_update_time`` and the
        response supplies a new ``next_cursor``.

        :param gh_id: account id to crawl
        :param account_name: unused; kept for interface compatibility (the
            name resolved from the article URL is stored instead)
        :param last_update_time: timestamp of the newest stored article
        :param cursor: page cursor to start from; ``None`` for the first page
        :return: None
        """
        while True:
            response = self.tools.update_msg_list(ghId=gh_id, index=cursor)
            msg_list = response.get("data", {}).get("data")
            if not msg_list:
                return
            print("获取msg_list成功")

            last_msg = msg_list[-1]
            last_timestamp = last_msg['AppMsg']['BaseInfo']['UpdateTime']
            last_url = last_msg['AppMsg']['DetailInfo'][0]['ContentUrl']

            resdata = await self.tools.get_account_by_url(last_url)
            account_data = resdata['data'].get('data', {})
            check_name = account_data.get('account_name')
            check_id = account_data.get('wx_gh')
            print(check_name, check_id, last_url)

            if check_id != gh_id:
                print("校验失败")
                return
            print("校验成功")
            await self.process_msg_list(gh_id, check_name, msg_list)

            if last_timestamp <= last_update_time:
                # Reached articles that are already stored.
                return

            next_cursor = response['data'].get('next_cursor')
            if next_cursor is None or next_cursor == cursor:
                # No further page (or the cursor did not advance): stop rather
                # than refetch the same page forever.
                return
            cursor = next_cursor

    async def deal(self):
        """
        Entry point: validate the parameters, then crawl the account.

        :return: the error dict from ``checkParams`` when validation fails,
            otherwise ``{"message": "successful"}``
        """
        params_error = self.checkParams()
        if params_error:
            return params_error

        account_info = await self.checkAccount()
        account_name = account_info['account_name']
        update_time = account_info['update_time']
        print("开始执行")
        await self.getAccountArticleList(
            gh_id=self.ghId,
            account_name=account_name,
            last_update_time=update_time
        )
        return {"message": "successful"}