data_recycle_pipeline.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263
  1. import json
  2. from typing import List, Dict
  3. from applications.utils import show_desc_to_sta, str_to_md5
# Parameterized INSERT for the outside-account article pool.
# Column order MUST stay in sync with the tuple built in
# insert_outside_article_into_recycle_pool: 25 columns, 25 %s placeholders.
# NOTE(review): column is named cover_img_url_255_1 but the value fed into it
# comes from the "CoverImgUrl_235_1" payload key — confirm against the schema.
insert_outside_article_query = """
INSERT INTO outside_account_articles
(
wx_sn, gh_id, account_name, app_msg_id, title,
title_md5, publish_type, create_time, update_time,
digest, item_index, content_url,
source_url, cover_img_url, cover_img_url_1_1, cover_img_url_255_1,
item_show_type, is_original, show_desc, ori_content,
show_view_count, show_like_count, show_zs_count, show_pay_count,
base_info
)
VALUES
(
%s, %s, %s, %s, %s,
%s, %s, %s, %s, %s,
%s, %s, %s, %s, %s,
%s, %s, %s, %s, %s,
%s, %s, %s, %s, %s
);
"""
  24. async def insert_article_into_recycle_pool(
  25. pool, log_client, msg_list: List[Dict], account_info: Dict
  26. ):
  27. """insert article into recycle pool"""
  28. table_name = "official_articles_v2"
  29. for info in msg_list:
  30. base_info = info.get("BaseInfo", {})
  31. app_msg_id = info.get("AppMsg", {}).get("BaseInfo", {}).get("AppMsgId", None)
  32. create_timestamp = (
  33. info.get("AppMsg", {}).get("BaseInfo", {}).get("CreateTime", None)
  34. )
  35. update_timestamp = (
  36. info.get("AppMsg", {}).get("BaseInfo", {}).get("UpdateTime", None)
  37. )
  38. publish_type = info.get("AppMsg", {}).get("BaseInfo", {}).get("Type", None)
  39. detail_article_list = info.get("AppMsg", {}).get("DetailInfo", [])
  40. if detail_article_list:
  41. for article in detail_article_list:
  42. title = article.get("Title", None)
  43. digest = article.get("Digest", None)
  44. item_index = article.get("ItemIndex", None)
  45. content_url = article.get("ContentUrl", None)
  46. source_url = article.get("SourceUrl", None)
  47. cover_img_url = article.get("CoverImgUrl", None)
  48. cover_img_url_1_1 = article.get("CoverImgUrl_1_1", None)
  49. cover_img_url_235_1 = article.get("CoverImgUrl_235_1", None)
  50. item_show_type = article.get("ItemShowType", None)
  51. is_original = article.get("IsOriginal", None)
  52. show_desc = article.get("ShowDesc", None)
  53. show_stat = show_desc_to_sta(show_desc)
  54. ori_content = article.get("ori_content", None)
  55. show_view_count = show_stat.get("show_view_count", 0)
  56. show_like_count = show_stat.get("show_like_count", 0)
  57. show_zs_count = show_stat.get("show_zs_count", 0)
  58. show_pay_count = show_stat.get("show_pay_count", 0)
  59. wx_sn = (
  60. content_url.split("&sn=")[1].split("&")[0] if content_url else None
  61. )
  62. status = account_info["using_status"]
  63. info_tuple = (
  64. account_info["gh_id"],
  65. account_info["name"],
  66. app_msg_id,
  67. title,
  68. publish_type,
  69. create_timestamp,
  70. update_timestamp,
  71. digest,
  72. item_index,
  73. content_url,
  74. source_url,
  75. cover_img_url,
  76. cover_img_url_1_1,
  77. cover_img_url_235_1,
  78. item_show_type,
  79. is_original,
  80. show_desc,
  81. ori_content,
  82. show_view_count,
  83. show_like_count,
  84. show_zs_count,
  85. show_pay_count,
  86. wx_sn,
  87. json.dumps(base_info, ensure_ascii=False),
  88. str_to_md5(title),
  89. status,
  90. )
  91. try:
  92. insert_query = f"""
  93. insert into {table_name}
  94. (ghId, accountName, appMsgId, title, Type, createTime, updateTime, Digest, ItemIndex, ContentUrl, SourceUrl, CoverImgUrl, CoverImgUrl_1_1, CoverImgUrl_255_1, ItemShowType, IsOriginal, ShowDesc, ori_content, show_view_count, show_like_count, show_zs_count, show_pay_count, wx_sn, baseInfo, title_md5, status)
  95. values
  96. (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
  97. """
  98. await pool.async_save(
  99. query=insert_query,
  100. params=info_tuple,
  101. db_name="piaoquan_crawler",
  102. )
  103. await log_client.log(
  104. contents={
  105. "function": "insert_article_into_recycle_pool",
  106. "status": "success",
  107. "data": info_tuple,
  108. }
  109. )
  110. print("insert_article_into_recycle_pool success")
  111. except Exception as e:
  112. try:
  113. update_sql = f"""update {table_name} set show_view_count = %s, show_like_count=%s where wx_sn = %s;"""
  114. await pool.async_save(
  115. query=update_sql,
  116. params=(show_view_count, show_like_count, wx_sn),
  117. db_name="piaoquan_crawler",
  118. )
  119. print("update_article_into_recycle_pool success")
  120. except Exception as e:
  121. await log_client.log(
  122. contents={
  123. "function": "insert_article_into_recycle_pool",
  124. "status": "fail",
  125. "message": "更新文章失败",
  126. "data": {
  127. "error": str(e),
  128. "content_link": content_url,
  129. "account_name": account_info["name"],
  130. },
  131. }
  132. )
  133. continue
  134. else:
  135. await log_client.log(
  136. contents={
  137. "function": "insert_article_into_recycle_pool",
  138. "status": "fail",
  139. "message": "account has no articles",
  140. "data": {"account_name": account_info["name"]},
  141. }
  142. )
  143. async def insert_outside_article_into_recycle_pool(
  144. pool, log_client, msg_list: List[Dict], account_info: Dict
  145. ):
  146. """insert outside article into recycle pool"""
  147. for info in msg_list:
  148. base_info = info.get("BaseInfo", {})
  149. app_msg_id = info.get("AppMsg", {}).get("BaseInfo", {}).get("AppMsgId", None)
  150. create_timestamp = (
  151. info.get("AppMsg", {}).get("BaseInfo", {}).get("CreateTime", None)
  152. )
  153. update_timestamp = (
  154. info.get("AppMsg", {}).get("BaseInfo", {}).get("UpdateTime", None)
  155. )
  156. publish_type = info.get("AppMsg", {}).get("BaseInfo", {}).get("Type", None)
  157. detail_article_list = info.get("AppMsg", {}).get("DetailInfo", [])
  158. if detail_article_list:
  159. for article in detail_article_list:
  160. title = article.get("Title", None)
  161. digest = article.get("Digest", None)
  162. item_index = article.get("ItemIndex", None)
  163. content_url = article.get("ContentUrl", None)
  164. source_url = article.get("SourceUrl", None)
  165. cover_img_url = article.get("CoverImgUrl", None)
  166. cover_img_url_1_1 = article.get("CoverImgUrl_1_1", None)
  167. cover_img_url_235_1 = article.get("CoverImgUrl_235_1", None)
  168. item_show_type = article.get("ItemShowType", None)
  169. is_original = article.get("IsOriginal", None)
  170. show_desc = article.get("ShowDesc", None)
  171. show_stat = show_desc_to_sta(show_desc)
  172. ori_content = article.get("ori_content", None)
  173. show_view_count = show_stat.get("show_view_count", 0)
  174. show_like_count = show_stat.get("show_like_count", 0)
  175. show_zs_count = show_stat.get("show_zs_count", 0)
  176. show_pay_count = show_stat.get("show_pay_count", 0)
  177. wx_sn = (
  178. content_url.split("&sn=")[1].split("&")[0] if content_url else None
  179. )
  180. info_tuple = (
  181. wx_sn,
  182. account_info["gh_id"],
  183. account_info["name"],
  184. app_msg_id,
  185. title,
  186. str_to_md5(title),
  187. publish_type,
  188. create_timestamp,
  189. update_timestamp,
  190. digest,
  191. item_index,
  192. content_url,
  193. source_url,
  194. cover_img_url,
  195. cover_img_url_1_1,
  196. cover_img_url_235_1,
  197. item_show_type,
  198. is_original,
  199. show_desc,
  200. ori_content,
  201. show_view_count,
  202. show_like_count,
  203. show_zs_count,
  204. show_pay_count,
  205. json.dumps(base_info, ensure_ascii=False),
  206. )
  207. try:
  208. await pool.async_save(
  209. query=insert_outside_article_query,
  210. params=info_tuple,
  211. )
  212. await log_client.log(
  213. contents={
  214. "function": "insert_article_into_recycle_pool",
  215. "status": "success",
  216. "data": info_tuple,
  217. }
  218. )
  219. print("insert_article_into_recycle_pool success")
  220. except Exception as e:
  221. try:
  222. update_sql = """update outside_account_articles set show_view_count = %s, show_like_count=%s where wx_sn = %s;"""
  223. await pool.async_save(
  224. query=update_sql,
  225. params=(show_view_count, show_like_count, wx_sn),
  226. )
  227. print("update_article_into_recycle_pool success")
  228. except Exception as e:
  229. await log_client.log(
  230. contents={
  231. "function": "insert_article_into_recycle_pool",
  232. "status": "fail",
  233. "message": "更新文章失败",
  234. "data": {
  235. "error": str(e),
  236. "content_link": content_url,
  237. "account_name": account_info["name"],
  238. },
  239. }
  240. )
  241. continue
  242. else:
  243. await log_client.log(
  244. contents={
  245. "function": "insert_article_into_recycle_pool",
  246. "status": "fail",
  247. "message": "account has no articles",
  248. "data": {"account_name": account_info["name"]},
  249. }
  250. )