123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191 |
- """
- @author: luojunhui
- """
- import json
- import time
- from applications.wxSpider import ArticleManager
- from applications.functions import show_desc_to_sta
class ArticleSpider(object):
    """Fetch an official account's article history and store it in MySQL.

    ``params`` must contain ``ghId``. (The original notes also list
    ``AccountName`` as an input, but nothing in this class reads it.)
    ``mysql_client`` must provide awaitable ``async_select(sql)`` and
    ``async_insert(sql=..., params=...)`` methods.
    """

    # Look-back window, in seconds, used when the account has no stored rows.
    NEW_ACCOUNT_LOOKBACK_SECONDS = 30 * 24 * 60 * 60

    # Messages updated within this many seconds are not stored.
    # NOTE(review): presumably so read/like counters can settle -- confirm.
    MIN_MESSAGE_AGE_SECONDS = 20 * 60 * 60

    # NOTE(review): the column is spelled ``CoverImgUrl_255_1`` while the
    # payload key read below is ``CoverImgUrl_235_1``. Left untouched because
    # the table schema is not visible here -- confirm which one is intended.
    _INSERT_SQL = """
        INSERT INTO official_articles
        (ghId, accountName, appMsgId, title, Type, createTime, updateTime, Digest, ItemIndex, ContentUrl, SourceUrl, CoverImgUrl, CoverImgUrl_1_1, CoverImgUrl_255_1, ItemShowType, IsOriginal, ShowDesc, ori_content, show_view_count, show_like_count, show_zs_count, show_pay_count, wx_sn, baseInfo)
        values
        (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
    """

    def __init__(self, params, mysql_client):
        """Store the request params and DB client; build the crawler helper."""
        self.endTime = None
        self.startTime = None
        self.ghId = None
        self.params = params
        self.mysql_client = mysql_client
        self.tools = ArticleManager()

    def checkParams(self):
        """Validate the request parameters and populate ``self.ghId``.

        :return: ``None`` when the params are valid, otherwise a dict with
            the keys ``error``, ``msg`` and ``params`` describing the problem.
        """
        try:
            self.ghId = self.params['ghId']
        except (KeyError, TypeError) as e:
            # KeyError: ``ghId`` missing; TypeError: params is not a mapping.
            return {
                "error": "Params Error",
                "msg": str(e),
                "params": self.params
            }
        return None

    async def checkAccount(self):
        """Look up whether articles for ``self.ghId`` are already stored.

        :return: a dict with ``account_name``, ``update_time`` and
            ``account_type``. ``account_type`` is ``"history"`` when rows
            exist (values come from the most recently updated row) and
            ``"new"`` otherwise, in which case ``update_time`` is
            ``NEW_ACCOUNT_LOOKBACK_SECONDS`` before now.
        """
        # NOTE(review): ghId is interpolated straight into the SQL text. If
        # ``async_select`` accepts bound parameters, pass ghId that way
        # instead -- its signature is not visible from this file.
        sql = f"""
            select accountName, updateTime
            from official_articles
            where ghId = '{self.ghId}'
            order by updateTime DESC;"""
        result = await self.mysql_client.async_select(sql)
        if result:
            account_name, update_time = result[0]
            return {
                "account_name": account_name,
                "update_time": update_time,
                "account_type": "history"
            }
        return {
            "account_name": "",
            "update_time": int(time.time()) - self.NEW_ACCOUNT_LOOKBACK_SECONDS,
            "account_type": "new"
        }

    async def process_msg_list(self, gh_id, account_name, msg_list):
        """Insert every article of every message in ``msg_list`` into the DB.

        Messages whose ``UpdateTime`` is missing or non-numeric, or that were
        updated within ``MIN_MESSAGE_AGE_SECONDS``, are skipped. A failure on
        one article is printed and does not stop the remaining articles.

        :param gh_id: account id written to the ``ghId`` column
        :param account_name: account name written to ``accountName``
        :param msg_list: list of message dicts, each with an ``AppMsg`` entry
        :return: None
        """
        for info in msg_list:
            base_info = info.get("BaseInfo", {})
            app_msg = info.get("AppMsg", {})
            app_base_info = app_msg.get("BaseInfo", {})
            app_msg_id = app_base_info.get("AppMsgId", None)
            create_time = app_base_info.get("CreateTime", None)
            update_time = app_base_info.get("UpdateTime", None)

            # Previously a missing UpdateTime raised outside the try block
            # and aborted the whole batch; skip just that message instead.
            try:
                message_age = int(time.time()) - int(update_time)
            except (TypeError, ValueError):
                print("error")
                print(f"invalid UpdateTime: {update_time!r}")
                continue
            if message_age <= self.MIN_MESSAGE_AGE_SECONDS:
                continue

            msg_type = app_base_info.get("Type", None)
            for article in app_msg.get("DetailInfo", []) or []:
                try:
                    content_url = article.get("ContentUrl", None)
                    show_desc = article.get("ShowDesc", None)
                    show_stat = show_desc_to_sta(show_desc)
                    # Raises IndexError when the URL has no "&sn=" part; the
                    # handler below then skips this article (as before).
                    wx_sn = content_url.split("&sn=")[1].split("&")[0] if content_url else None
                    info_tuple = (
                        gh_id,
                        account_name,
                        app_msg_id,
                        article.get("Title", None),
                        msg_type,
                        create_time,
                        update_time,
                        article.get("Digest", None),
                        article.get("ItemIndex", None),
                        content_url,
                        article.get("SourceUrl", None),
                        article.get("CoverImgUrl", None),
                        article.get("CoverImgUrl_1_1", None),
                        article.get("CoverImgUrl_235_1", None),
                        article.get("ItemShowType", None),
                        article.get("IsOriginal", None),
                        show_desc,
                        article.get("ori_content", None),
                        show_stat.get("show_view_count", 0),
                        show_stat.get("show_like_count", 0),
                        show_stat.get("show_zs_count", 0),
                        show_stat.get("show_pay_count", 0),
                        wx_sn,
                        json.dumps(base_info, ensure_ascii=False)
                    )
                    await self.mysql_client.async_insert(sql=self._INSERT_SQL, params=info_tuple)
                    print("更新成功")
                except Exception as e:
                    # Deliberate best effort: one bad article (for example a
                    # failing insert) must not stop the rest of the batch.
                    print("error")
                    print(e)
                    continue

    async def getAccountArticleList(self, gh_id, account_name, last_update_time, cursor=None):
        """Page through the account's message list and store each page.

        For every page, the last message's first article URL is resolved via
        ``self.tools.get_account_by_url`` and the returned ``wx_gh`` must
        equal ``gh_id``; otherwise crawling stops. Paging continues while the
        last message of the page is newer than ``last_update_time`` and the
        response supplies a ``next_cursor``.

        :param gh_id: account id to crawl
        :param account_name: accepted for interface compatibility; the name
            resolved from the article URL is the one that gets stored
        :param last_update_time: timestamp at which paging stops
        :param cursor: page cursor to start from (``None`` for the first page)
        :return: None
        """
        # Iterative rather than recursive so a long history cannot exhaust
        # the recursion limit.
        while True:
            response = self.tools.update_msg_list(ghId=gh_id, index=cursor)
            msg_list = response.get("data", {}).get("data")
            if not msg_list:
                return

            print("获取msg_list成功")
            last_msg = msg_list[-1]
            last_msg_update_time = last_msg['AppMsg']['BaseInfo']['UpdateTime']
            last_url = last_msg['AppMsg']['DetailInfo'][0]['ContentUrl']
            resdata = await self.tools.get_account_by_url(last_url)
            account_data = resdata['data'].get('data', {})
            check_name = account_data.get('account_name')
            check_id = account_data.get('wx_gh')
            print(check_name, check_id, last_url)
            if check_id != gh_id:
                print("校验失败")
                return

            print("校验成功")
            await self.process_msg_list(gh_id, check_name, msg_list)
            if not (last_msg_update_time > last_update_time):
                return

            next_cursor = response['data'].get('next_cursor')
            if next_cursor is None:
                # Without a cursor the next request would restart from the
                # first page and never terminate.
                return
            cursor = next_cursor

    async def deal(self):
        """Validate the params, then crawl and store the account's articles.

        :return: the error dict from ``checkParams`` when validation fails,
            otherwise ``{"message": "successful"}``.
        """
        params_error = self.checkParams()
        if params_error:
            return params_error

        account_info = await self.checkAccount()
        account_name = account_info['account_name']
        update_time = account_info['update_time']
        print("开始执行")
        await self.getAccountArticleList(
            gh_id=self.ghId,
            account_name=account_name,
            last_update_time=update_time
        )
        return {"message": "successful"}
|