# data_recycle_pipeline.py
  1. import json
  2. from typing import List, Dict
  3. from applications.utils import show_desc_to_sta, str_to_md5
# Parameterized INSERT for articles recycled from outside (non-owned) accounts.
# Placeholder order must match the info_tuple built in
# insert_outside_article_into_recycle_pool (25 columns, 25 %s markers).
# NOTE(review): the column is spelled cover_img_url_255_1 while the crawler
# payload key is CoverImgUrl_235_1 — confirm which spelling the schema uses.
insert_outside_article_query = """
INSERT INTO outside_account_articles
(
wx_sn, gh_id, account_name, app_msg_id, title,
title_md5, publish_type, create_time, update_time,
digest, item_index, content_url,
source_url, cover_img_url, cover_img_url_1_1, cover_img_url_255_1,
item_show_type, is_original, show_desc, ori_content,
show_view_count, show_like_count, show_zs_count, show_pay_count,
base_info
)
VALUES
(
%s, %s, %s, %s, %s,
%s, %s, %s, %s, %s,
%s, %s, %s, %s, %s,
%s, %s, %s, %s, %s,
%s, %s, %s, %s, %s
);
"""
  24. async def insert_article_into_recycle_pool(
  25. pool, log_client, msg_list: List[Dict], account_info: Dict
  26. ):
  27. """insert article into recycle pool"""
  28. table_name = "official_articles_v2"
  29. for info in msg_list:
  30. base_info = info.get("BaseInfo", {})
  31. app_msg_id = info.get("AppMsg", {}).get("BaseInfo", {}).get("AppMsgId", None)
  32. create_timestamp = (
  33. info.get("AppMsg", {}).get("BaseInfo", {}).get("CreateTime", None)
  34. )
  35. update_timestamp = (
  36. info.get("AppMsg", {}).get("BaseInfo", {}).get("UpdateTime", None)
  37. )
  38. publish_type = info.get("AppMsg", {}).get("BaseInfo", {}).get("Type", None)
  39. detail_article_list = info.get("AppMsg", {}).get("DetailInfo", [])
  40. if detail_article_list:
  41. for article in detail_article_list:
  42. title = article.get("Title", None)
  43. digest = article.get("Digest", None)
  44. item_index = article.get("ItemIndex", None)
  45. content_url = article.get("ContentUrl", None)
  46. source_url = article.get("SourceUrl", None)
  47. cover_img_url = article.get("CoverImgUrl", None)
  48. cover_img_url_1_1 = article.get("CoverImgUrl_1_1", None)
  49. cover_img_url_235_1 = article.get("CoverImgUrl_235_1", None)
  50. item_show_type = article.get("ItemShowType", None)
  51. is_original = article.get("IsOriginal", None)
  52. show_desc = article.get("ShowDesc", None)
  53. show_stat = show_desc_to_sta(show_desc)
  54. ori_content = article.get("ori_content", None)
  55. show_view_count = show_stat.get("show_view_count", 0)
  56. show_like_count = show_stat.get("show_like_count", 0)
  57. show_zs_count = show_stat.get("show_zs_count", 0)
  58. show_pay_count = show_stat.get("show_pay_count", 0)
  59. wx_sn = (
  60. content_url.split("&sn=")[1].split("&")[0] if content_url else None
  61. )
  62. status = account_info["using_status"]
  63. info_tuple = (
  64. account_info["gh_id"],
  65. account_info["name"],
  66. app_msg_id,
  67. title,
  68. publish_type,
  69. create_timestamp,
  70. update_timestamp,
  71. digest,
  72. item_index,
  73. content_url,
  74. source_url,
  75. cover_img_url,
  76. cover_img_url_1_1,
  77. cover_img_url_235_1,
  78. item_show_type,
  79. is_original,
  80. show_desc,
  81. ori_content,
  82. show_view_count,
  83. show_like_count,
  84. show_zs_count,
  85. show_pay_count,
  86. wx_sn,
  87. json.dumps(base_info, ensure_ascii=False),
  88. str_to_md5(title),
  89. status,
  90. )
  91. try:
  92. insert_query = f"""
  93. insert into {table_name}
  94. (ghId, accountName, appMsgId, title, Type, createTime, updateTime, Digest, ItemIndex, ContentUrl, SourceUrl, CoverImgUrl, CoverImgUrl_1_1, CoverImgUrl_255_1, ItemShowType, IsOriginal, ShowDesc, ori_content, show_view_count, show_like_count, show_zs_count, show_pay_count, wx_sn, baseInfo, title_md5, status)
  95. values
  96. (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
  97. """
  98. await pool.async_save(
  99. query=insert_query,
  100. params=info_tuple,
  101. db_name="piaoquan_crawler",
  102. )
  103. await log_client.log(
  104. contents={
  105. "function": "insert_article_into_recycle_pool",
  106. "status": "success",
  107. "data": info_tuple,
  108. }
  109. )
  110. print("insert_article_into_recycle_pool success")
  111. except Exception as e:
  112. try:
  113. update_sql = f"""update {table_name} set show_view_count = %s, show_like_count=%s where wx_sn = %s;"""
  114. await pool.async_save(
  115. query=update_sql,
  116. params=(show_view_count, show_like_count, wx_sn),
  117. db_name="piaoquan_crawler",
  118. )
  119. print("update_article_into_recycle_pool success")
  120. except Exception as e:
  121. await log_client.log(
  122. contents={
  123. "function": "insert_article_into_recycle_pool",
  124. "status": "fail",
  125. "message": "更新文章失败",
  126. "data": {
  127. "error": str(e),
  128. "content_link": content_url,
  129. "account_name": account_info["name"],
  130. },
  131. }
  132. )
  133. continue
  134. else:
  135. await log_client.log(
  136. contents={
  137. "function": "insert_article_into_recycle_pool",
  138. "status": "fail",
  139. "message": "account has no articles",
  140. "data": {"account_name": account_info["name"]},
  141. }
  142. )
  143. async def insert_outside_article_into_recycle_pool(
  144. pool, log_client, msg_list: List[Dict], account_info: Dict
  145. ):
  146. """insert outside article into recycle pool"""
  147. for info in msg_list:
  148. base_info = info.get("BaseInfo", {})
  149. app_msg_id = info.get("AppMsg", {}).get("BaseInfo", {}).get("AppMsgId", None)
  150. create_timestamp = (
  151. info.get("AppMsg", {}).get("BaseInfo", {}).get("CreateTime", None)
  152. )
  153. update_timestamp = (
  154. info.get("AppMsg", {}).get("BaseInfo", {}).get("UpdateTime", None)
  155. )
  156. publish_type = info.get("AppMsg", {}).get("BaseInfo", {}).get("Type", None)
  157. detail_article_list = info.get("AppMsg", {}).get("DetailInfo", [])
  158. if detail_article_list:
  159. for article in detail_article_list:
  160. title = article.get("Title", None)
  161. title_md5 = str_to_md5(title),
  162. insert_query = """
  163. insert ignore into title_features (title, title_md5, version)
  164. values (%s, %s, %s);
  165. """
  166. await pool.async_save(query=insert_query, params=(title, title_md5, 2))
  167. digest = article.get("Digest", None)
  168. item_index = article.get("ItemIndex", None)
  169. content_url = article.get("ContentUrl", None)
  170. source_url = article.get("SourceUrl", None)
  171. cover_img_url = article.get("CoverImgUrl", None)
  172. cover_img_url_1_1 = article.get("CoverImgUrl_1_1", None)
  173. cover_img_url_235_1 = article.get("CoverImgUrl_235_1", None)
  174. item_show_type = article.get("ItemShowType", None)
  175. is_original = article.get("IsOriginal", None)
  176. show_desc = article.get("ShowDesc", None)
  177. show_stat = show_desc_to_sta(show_desc)
  178. ori_content = article.get("ori_content", None)
  179. show_view_count = show_stat.get("show_view_count", 0)
  180. show_like_count = show_stat.get("show_like_count", 0)
  181. show_zs_count = show_stat.get("show_zs_count", 0)
  182. show_pay_count = show_stat.get("show_pay_count", 0)
  183. wx_sn = (
  184. content_url.split("&sn=")[1].split("&")[0] if content_url else None
  185. )
  186. info_tuple = (
  187. wx_sn,
  188. account_info["gh_id"],
  189. account_info["name"],
  190. app_msg_id,
  191. title,
  192. title_md5,
  193. publish_type,
  194. create_timestamp,
  195. update_timestamp,
  196. digest,
  197. item_index,
  198. content_url,
  199. source_url,
  200. cover_img_url,
  201. cover_img_url_1_1,
  202. cover_img_url_235_1,
  203. item_show_type,
  204. is_original,
  205. show_desc,
  206. ori_content,
  207. show_view_count,
  208. show_like_count,
  209. show_zs_count,
  210. show_pay_count,
  211. json.dumps(base_info, ensure_ascii=False),
  212. )
  213. try:
  214. await pool.async_save(
  215. query=insert_outside_article_query,
  216. params=info_tuple,
  217. )
  218. await log_client.log(
  219. contents={
  220. "function": "insert_article_into_recycle_pool",
  221. "status": "success",
  222. "data": info_tuple,
  223. }
  224. )
  225. print("insert_article_into_recycle_pool success")
  226. except Exception as e:
  227. try:
  228. update_sql = """update outside_account_articles set show_view_count = %s, show_like_count=%s where wx_sn = %s;"""
  229. await pool.async_save(
  230. query=update_sql,
  231. params=(show_view_count, show_like_count, wx_sn),
  232. )
  233. print("update_article_into_recycle_pool success")
  234. except Exception as e:
  235. await log_client.log(
  236. contents={
  237. "function": "insert_article_into_recycle_pool",
  238. "status": "fail",
  239. "message": "更新文章失败",
  240. "data": {
  241. "error": str(e),
  242. "content_link": content_url,
  243. "account_name": account_info["name"],
  244. },
  245. }
  246. )
  247. continue
  248. else:
  249. await log_client.log(
  250. contents={
  251. "function": "insert_article_into_recycle_pool",
  252. "status": "fail",
  253. "message": "account has no articles",
  254. "data": {"account_name": account_info["name"]},
  255. }
  256. )