# data_recycle_pipeline.py
  1. import json
  2. from typing import List, Dict
  3. from applications.utils import show_desc_to_sta, str_to_md5
# SQL template for archiving an article from an outside (non-managed) account.
# Executed with a 25-item parameter tuple built in
# insert_outside_article_into_recycle_pool; placeholder order must match the
# column list exactly.
# NOTE(review): the cover_img_url_255_1 column receives the CoverImgUrl_235_1
# field from the crawler payload — looks like legacy column naming; confirm
# against the table schema before renaming either side.
insert_outside_article_query = """
INSERT INTO outside_account_articles
(
wx_sn, gh_id, account_name, app_msg_id, title,
title_md5, publish_type, create_time, update_time,
digest, item_index, content_url,
source_url, cover_img_url, cover_img_url_1_1, cover_img_url_255_1,
item_show_type, is_original, show_desc, ori_content,
show_view_count, show_like_count, show_zs_count, show_pay_count,
base_info
)
VALUES
(
%s, %s, %s, %s, %s,
%s, %s, %s, %s, %s,
%s, %s, %s, %s, %s,
%s, %s, %s, %s, %s,
%s, %s, %s, %s, %s
);
"""
  24. async def insert_article_into_recycle_pool(
  25. pool, log_client, msg_list: List[Dict], account_info: Dict
  26. ):
  27. """insert article into recycle pool"""
  28. table_name = "official_articles_v2"
  29. for info in msg_list:
  30. base_info = info.get("BaseInfo", {})
  31. app_msg_id = info.get("AppMsg", {}).get("BaseInfo", {}).get("AppMsgId", None)
  32. create_timestamp = (
  33. info.get("AppMsg", {}).get("BaseInfo", {}).get("CreateTime", None)
  34. )
  35. update_timestamp = (
  36. info.get("AppMsg", {}).get("BaseInfo", {}).get("UpdateTime", None)
  37. )
  38. publish_type = info.get("AppMsg", {}).get("BaseInfo", {}).get("Type", None)
  39. detail_article_list = info.get("AppMsg", {}).get("DetailInfo", [])
  40. if detail_article_list:
  41. for article in detail_article_list:
  42. title = article.get("Title", None)
  43. digest = article.get("Digest", None)
  44. item_index = article.get("ItemIndex", None)
  45. content_url = article.get("ContentUrl", None)
  46. source_url = article.get("SourceUrl", None)
  47. cover_img_url = article.get("CoverImgUrl", None)
  48. cover_img_url_1_1 = article.get("CoverImgUrl_1_1", None)
  49. cover_img_url_235_1 = article.get("CoverImgUrl_235_1", None)
  50. item_show_type = article.get("ItemShowType", None)
  51. is_original = article.get("IsOriginal", None)
  52. show_desc = article.get("ShowDesc", None)
  53. show_stat = show_desc_to_sta(show_desc)
  54. ori_content = article.get("ori_content", None)
  55. show_view_count = show_stat.get("show_view_count", 0)
  56. show_like_count = show_stat.get("show_like_count", 0)
  57. show_zs_count = show_stat.get("show_zs_count", 0)
  58. show_pay_count = show_stat.get("show_pay_count", 0)
  59. wx_sn = (
  60. content_url.split("&sn=")[1].split("&")[0] if content_url else None
  61. )
  62. status = account_info["using_status"]
  63. send_timestamp = article.get("send_time", 0)
  64. info_tuple = (
  65. account_info["gh_id"],
  66. account_info["name"],
  67. app_msg_id,
  68. title,
  69. publish_type,
  70. create_timestamp,
  71. update_timestamp,
  72. digest,
  73. item_index,
  74. content_url,
  75. source_url,
  76. cover_img_url,
  77. cover_img_url_1_1,
  78. cover_img_url_235_1,
  79. item_show_type,
  80. is_original,
  81. show_desc,
  82. ori_content,
  83. show_view_count,
  84. show_like_count,
  85. show_zs_count,
  86. show_pay_count,
  87. wx_sn,
  88. json.dumps(base_info, ensure_ascii=False),
  89. str_to_md5(title),
  90. status,
  91. send_timestamp,
  92. )
  93. try:
  94. insert_query = f"""
  95. insert into {table_name}
  96. (ghId, accountName, appMsgId, title, Type, createTime, updateTime, Digest, ItemIndex, ContentUrl, SourceUrl, CoverImgUrl, CoverImgUrl_1_1, CoverImgUrl_255_1, ItemShowType, IsOriginal, ShowDesc, ori_content, show_view_count, show_like_count, show_zs_count, show_pay_count, wx_sn, baseInfo, title_md5, status, publish_timestamp)
  97. values
  98. (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
  99. """
  100. await pool.async_save(
  101. query=insert_query,
  102. params=info_tuple,
  103. db_name="piaoquan_crawler",
  104. )
  105. await log_client.log(
  106. contents={
  107. "function": "insert_article_into_recycle_pool",
  108. "status": "success",
  109. "data": info_tuple,
  110. }
  111. )
  112. print("insert_article_into_recycle_pool success")
  113. except Exception as e:
  114. try:
  115. update_sql = f"""update {table_name} set show_view_count = %s, show_like_count=%s where wx_sn = %s;"""
  116. await pool.async_save(
  117. query=update_sql,
  118. params=(show_view_count, show_like_count, wx_sn),
  119. db_name="piaoquan_crawler",
  120. )
  121. print("update_article_into_recycle_pool success")
  122. except Exception as e:
  123. await log_client.log(
  124. contents={
  125. "function": "insert_article_into_recycle_pool",
  126. "status": "fail",
  127. "message": "更新文章失败",
  128. "data": {
  129. "error": str(e),
  130. "content_link": content_url,
  131. "account_name": account_info["name"],
  132. },
  133. }
  134. )
  135. continue
  136. else:
  137. await log_client.log(
  138. contents={
  139. "function": "insert_article_into_recycle_pool",
  140. "status": "fail",
  141. "message": "account has no articles",
  142. "data": {"account_name": account_info["name"]},
  143. }
  144. )
  145. async def insert_outside_article_into_recycle_pool(
  146. pool, log_client, msg_list: List[Dict], account_info: Dict
  147. ):
  148. """insert outside article into recycle pool"""
  149. for info in msg_list:
  150. base_info = info.get("BaseInfo", {})
  151. app_msg_id = info.get("AppMsg", {}).get("BaseInfo", {}).get("AppMsgId", None)
  152. create_timestamp = (
  153. info.get("AppMsg", {}).get("BaseInfo", {}).get("CreateTime", None)
  154. )
  155. update_timestamp = (
  156. info.get("AppMsg", {}).get("BaseInfo", {}).get("UpdateTime", None)
  157. )
  158. publish_type = info.get("AppMsg", {}).get("BaseInfo", {}).get("Type", None)
  159. detail_article_list = info.get("AppMsg", {}).get("DetailInfo", [])
  160. if detail_article_list:
  161. for article in detail_article_list:
  162. title = article.get("Title", None)
  163. title_md5 = str_to_md5(title)
  164. insert_query = """
  165. insert ignore into title_features (title, title_md5, version)
  166. values (%s, %s, %s);
  167. """
  168. await pool.async_save(query=insert_query, params=(title, title_md5, 2))
  169. digest = article.get("Digest", None)
  170. item_index = article.get("ItemIndex", None)
  171. content_url = article.get("ContentUrl", None)
  172. source_url = article.get("SourceUrl", None)
  173. cover_img_url = article.get("CoverImgUrl", None)
  174. cover_img_url_1_1 = article.get("CoverImgUrl_1_1", None)
  175. cover_img_url_235_1 = article.get("CoverImgUrl_235_1", None)
  176. item_show_type = article.get("ItemShowType", None)
  177. is_original = article.get("IsOriginal", None)
  178. show_desc = article.get("ShowDesc", None)
  179. show_stat = show_desc_to_sta(show_desc)
  180. ori_content = article.get("ori_content", None)
  181. show_view_count = show_stat.get("show_view_count", 0)
  182. show_like_count = show_stat.get("show_like_count", 0)
  183. show_zs_count = show_stat.get("show_zs_count", 0)
  184. show_pay_count = show_stat.get("show_pay_count", 0)
  185. wx_sn = (
  186. content_url.split("&sn=")[1].split("&")[0] if content_url else None
  187. )
  188. info_tuple = (
  189. wx_sn,
  190. account_info["gh_id"],
  191. account_info["name"],
  192. app_msg_id,
  193. title,
  194. title_md5,
  195. publish_type,
  196. create_timestamp,
  197. update_timestamp,
  198. digest,
  199. item_index,
  200. content_url,
  201. source_url,
  202. cover_img_url,
  203. cover_img_url_1_1,
  204. cover_img_url_235_1,
  205. item_show_type,
  206. is_original,
  207. show_desc,
  208. ori_content,
  209. show_view_count,
  210. show_like_count,
  211. show_zs_count,
  212. show_pay_count,
  213. json.dumps(base_info, ensure_ascii=False),
  214. )
  215. try:
  216. await pool.async_save(
  217. query=insert_outside_article_query,
  218. params=info_tuple,
  219. )
  220. await log_client.log(
  221. contents={
  222. "function": "insert_article_into_recycle_pool",
  223. "status": "success",
  224. "data": info_tuple,
  225. }
  226. )
  227. print("insert_article_into_recycle_pool success")
  228. except Exception as e:
  229. try:
  230. update_sql = """update outside_account_articles set show_view_count = %s, show_like_count=%s where wx_sn = %s;"""
  231. await pool.async_save(
  232. query=update_sql,
  233. params=(show_view_count, show_like_count, wx_sn),
  234. )
  235. print("update_article_into_recycle_pool success")
  236. except Exception as e:
  237. await log_client.log(
  238. contents={
  239. "function": "insert_article_into_recycle_pool",
  240. "status": "fail",
  241. "message": "更新文章失败",
  242. "data": {
  243. "error": str(e),
  244. "content_link": content_url,
  245. "account_name": account_info["name"],
  246. },
  247. }
  248. )
  249. continue
  250. else:
  251. await log_client.log(
  252. contents={
  253. "function": "insert_article_into_recycle_pool",
  254. "status": "fail",
  255. "message": "account has no articles",
  256. "data": {"account_name": account_info["name"]},
  257. }
  258. )