articleDBServer.py 8.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199
  1. """
  2. @author: luojunhui
  3. """
  4. import json
  5. import time
  6. from applications.wxSpider import ArticleManager
  7. from applications.functions import show_desc_to_sta
  8. class ArticleSpider(object):
  9. """
  10. input: ghId, AccountName
  11. """
  12. def __init__(self, params, mysql_client):
  13. self.endTime = None
  14. self.startTime = None
  15. self.ghId = None
  16. self.params = params
  17. self.mysql_client = mysql_client
  18. self.tools = ArticleManager()
  19. def checkParams(self):
  20. """
  21. 校验参数
  22. :return:
  23. """
  24. try:
  25. self.ghId = self.params['ghId']
  26. # self.startTime = self.params['startTime']
  27. # self.endTime = self.params['endTime']
  28. return None
  29. except Exception as e:
  30. return {
  31. "error": "Params Error",
  32. "msg": str(e),
  33. "params": self.params
  34. }
  35. async def checkAccount(self):
  36. """
  37. 判断账号是否是新账号, 内部账号还是外部账号
  38. :return:
  39. """
  40. sql = f"""
  41. select accountName, updateTime
  42. from official_articles
  43. where ghId = '{self.ghId}'
  44. order by updateTime DESC;"""
  45. result = await self.mysql_client.async_select(sql)
  46. if result:
  47. account_name, update_time = result[0]
  48. return {
  49. "account_name": account_name,
  50. "update_time": update_time,
  51. "account_type": "history"
  52. }
  53. else:
  54. return {
  55. "account_name": "",
  56. "update_time": int(time.time()) - 30 * 24 * 60 * 60,
  57. "account_type": "new"
  58. }
  59. async def process_msg_list(self, gh_id, account_name, msg_list):
  60. """
  61. 把消息数据更新到数据库中
  62. :param account_name:
  63. :param gh_id:
  64. :param msg_list:
  65. :return:
  66. """
  67. for info in msg_list:
  68. baseInfo = info.get("BaseInfo", {})
  69. appMsgId = info.get("AppMsg", {}).get("BaseInfo", {}).get("AppMsgId", None)
  70. createTime = info.get("AppMsg", {}).get("BaseInfo", {}).get("CreateTime", None)
  71. updateTime = info.get("AppMsg", {}).get("BaseInfo", {}).get("UpdateTime", None)
  72. # if int(time.time()) - int(updateTime) <= 20 * 60 * 60:
  73. # continue
  74. Type = info.get("AppMsg", {}).get("BaseInfo", {}).get("Type", None)
  75. detail_article_list = info.get("AppMsg", {}).get("DetailInfo", [])
  76. if detail_article_list:
  77. for article in detail_article_list:
  78. title = article.get("Title", None)
  79. Digest = article.get("Digest", None)
  80. ItemIndex = article.get("ItemIndex", None)
  81. ContentUrl = article.get("ContentUrl", None)
  82. SourceUrl = article.get("SourceUrl", None)
  83. CoverImgUrl = article.get("CoverImgUrl", None)
  84. CoverImgUrl_1_1 = article.get("CoverImgUrl_1_1", None)
  85. CoverImgUrl_235_1 = article.get("CoverImgUrl_235_1", None)
  86. ItemShowType = article.get("ItemShowType", None)
  87. IsOriginal = article.get("IsOriginal", None)
  88. ShowDesc = article.get("ShowDesc", None)
  89. show_stat = show_desc_to_sta(ShowDesc)
  90. ori_content = article.get("ori_content", None)
  91. show_view_count = show_stat.get("show_view_count", 0)
  92. show_like_count = show_stat.get("show_like_count", 0)
  93. show_zs_count = show_stat.get("show_zs_count", 0)
  94. show_pay_count = show_stat.get("show_pay_count", 0)
  95. wx_sn = ContentUrl.split("&sn=")[1].split("&")[0] if ContentUrl else None
  96. info_tuple = (
  97. gh_id,
  98. account_name,
  99. appMsgId,
  100. title,
  101. Type,
  102. createTime,
  103. updateTime,
  104. Digest,
  105. ItemIndex,
  106. ContentUrl,
  107. SourceUrl,
  108. CoverImgUrl,
  109. CoverImgUrl_1_1,
  110. CoverImgUrl_235_1,
  111. ItemShowType,
  112. IsOriginal,
  113. ShowDesc,
  114. ori_content,
  115. show_view_count,
  116. show_like_count,
  117. show_zs_count,
  118. show_pay_count,
  119. wx_sn,
  120. json.dumps(baseInfo, ensure_ascii=False)
  121. )
  122. try:
  123. insert_sql = f"""
  124. INSERT INTO official_articles
  125. (ghId, accountName, appMsgId, title, Type, createTime, updateTime, Digest, ItemIndex, ContentUrl, SourceUrl, CoverImgUrl, CoverImgUrl_1_1, CoverImgUrl_255_1, ItemShowType, IsOriginal, ShowDesc, ori_content, show_view_count, show_like_count, show_zs_count, show_pay_count, wx_sn, baseInfo)
  126. values
  127. (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
  128. """
  129. await self.mysql_client.async_insert(sql=insert_sql, params=info_tuple)
  130. print("插入成功")
  131. except Exception as e:
  132. try:
  133. update_sql = f"""
  134. UPDATE official_articles
  135. SET show_view_count = %s, show_like_count=%s
  136. WHERE wx_sn = %s;
  137. """
  138. await self.mysql_client.async_insert(sql=update_sql, params=(show_view_count, show_like_count, wx_sn))
  139. print("更新成功")
  140. except Exception as e:
  141. print("失败-{}".format(e))
  142. continue
  143. async def getAccountArticleList(self, gh_id, account_name, last_update_time, cursor=None):
  144. """
  145. 输入ghid获取账号的文章list
  146. :return:
  147. """
  148. response = self.tools.update_msg_list(ghId=gh_id, index=cursor)
  149. msg_list = response.get("data", {}).get("data")
  150. if msg_list:
  151. # print(msg_list)
  152. print("获取msg_list成功")
  153. last_article_in_this_msg = msg_list[-1]
  154. last_time_stamp_in_this_msg = last_article_in_this_msg['AppMsg']['BaseInfo']['UpdateTime']
  155. last_url = last_article_in_this_msg['AppMsg']['DetailInfo'][0]['ContentUrl']
  156. resdata = await self.tools.get_account_by_url(last_url)
  157. check_name = resdata['data'].get('data', {}).get('account_name')
  158. check_id = resdata['data'].get('data', {}).get('wx_gh')
  159. print(check_name, check_id, last_url)
  160. if check_id == gh_id:
  161. print("校验成功")
  162. await self.process_msg_list(gh_id, check_name, msg_list)
  163. if last_time_stamp_in_this_msg > last_update_time:
  164. next_cursor = response['data']['next_cursor']
  165. return await self.getAccountArticleList(
  166. gh_id=gh_id,
  167. account_name=check_name,
  168. last_update_time=last_update_time,
  169. cursor=next_cursor
  170. )
  171. else:
  172. print("校验失败")
  173. async def deal(self):
  174. """
  175. deal function
  176. :return:
  177. """
  178. if self.checkParams():
  179. return self.checkParams()
  180. else:
  181. account_info = await self.checkAccount()
  182. account_name = account_info['account_name']
  183. update_time = account_info['update_time']
  184. print("开始执行")
  185. await self.getAccountArticleList(
  186. gh_id=self.ghId,
  187. account_name=account_name,
  188. last_update_time=update_time
  189. )
  190. return {"message": "successful"}