data_recycle_pipeline.py 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128
  1. import json
  2. from typing import List, Dict
  3. from applications.utils import show_desc_to_sta, str_to_md5
  4. async def insert_article_into_recycle_pool(
  5. pool, log_client, msg_list: List[Dict], account_info: Dict
  6. ):
  7. """insert article into recycle pool"""
  8. table_name = "official_articles_v2"
  9. for info in msg_list:
  10. base_info = info.get("BaseInfo", {})
  11. app_msg_id = info.get("AppMsg", {}).get("BaseInfo", {}).get("AppMsgId", None)
  12. create_timestamp = (
  13. info.get("AppMsg", {}).get("BaseInfo", {}).get("CreateTime", None)
  14. )
  15. update_timestamp = (
  16. info.get("AppMsg", {}).get("BaseInfo", {}).get("UpdateTime", None)
  17. )
  18. publish_type = info.get("AppMsg", {}).get("BaseInfo", {}).get("Type", None)
  19. detail_article_list = info.get("AppMsg", {}).get("DetailInfo", [])
  20. if detail_article_list:
  21. for article in detail_article_list:
  22. title = article.get("Title", None)
  23. digest = article.get("Digest", None)
  24. item_index = article.get("ItemIndex", None)
  25. content_url = article.get("ContentUrl", None)
  26. source_url = article.get("SourceUrl", None)
  27. cover_img_url = article.get("CoverImgUrl", None)
  28. cover_img_url_1_1 = article.get("CoverImgUrl_1_1", None)
  29. cover_img_url_235_1 = article.get("CoverImgUrl_235_1", None)
  30. item_show_type = article.get("ItemShowType", None)
  31. is_original = article.get("IsOriginal", None)
  32. show_desc = article.get("ShowDesc", None)
  33. show_stat = show_desc_to_sta(show_desc)
  34. ori_content = article.get("ori_content", None)
  35. show_view_count = show_stat.get("show_view_count", 0)
  36. show_like_count = show_stat.get("show_like_count", 0)
  37. show_zs_count = show_stat.get("show_zs_count", 0)
  38. show_pay_count = show_stat.get("show_pay_count", 0)
  39. wx_sn = (
  40. content_url.split("&sn=")[1].split("&")[0] if content_url else None
  41. )
  42. status = account_info["using_status"]
  43. info_tuple = (
  44. account_info["gh_id"],
  45. account_info["name"],
  46. app_msg_id,
  47. title,
  48. publish_type,
  49. create_timestamp,
  50. update_timestamp,
  51. digest,
  52. item_index,
  53. content_url,
  54. source_url,
  55. cover_img_url,
  56. cover_img_url_1_1,
  57. cover_img_url_235_1,
  58. item_show_type,
  59. is_original,
  60. show_desc,
  61. ori_content,
  62. show_view_count,
  63. show_like_count,
  64. show_zs_count,
  65. show_pay_count,
  66. wx_sn,
  67. json.dumps(base_info, ensure_ascii=False),
  68. str_to_md5(title),
  69. status,
  70. )
  71. try:
  72. insert_query = f"""
  73. insert into {table_name}
  74. (ghId, accountName, appMsgId, title, Type, createTime, updateTime, Digest, ItemIndex, ContentUrl, SourceUrl, CoverImgUrl, CoverImgUrl_1_1, CoverImgUrl_255_1, ItemShowType, IsOriginal, ShowDesc, ori_content, show_view_count, show_like_count, show_zs_count, show_pay_count, wx_sn, baseInfo, title_md5, status)
  75. values
  76. (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
  77. """
  78. await pool.async_save(
  79. query=insert_query,
  80. params=info_tuple,
  81. db_name="piaoquan_crawler",
  82. )
  83. await log_client.log(
  84. contents={
  85. "function": "insert_article_into_recycle_pool",
  86. "status": "success",
  87. "data": info_tuple,
  88. }
  89. )
  90. print("insert_article_into_recycle_pool success")
  91. except Exception as e:
  92. try:
  93. update_sql = f"""update {table_name} set show_view_count = %s, show_like_count=%s where wx_sn = %s;"""
  94. await pool.async_save(
  95. query=update_sql,
  96. params=(show_view_count, show_like_count, wx_sn),
  97. db_name="piaoquan_crawler",
  98. )
  99. print("update_article_into_recycle_pool success")
  100. except Exception as e:
  101. await log_client.log(
  102. contents={
  103. "function": "insert_article_into_recycle_pool",
  104. "status": "fail",
  105. "message": "更新文章失败",
  106. "data": {
  107. "error": str(e),
  108. "content_link": content_url,
  109. "account_name": account_info["name"],
  110. },
  111. }
  112. )
  113. continue
  114. else:
  115. await log_client.log(
  116. contents={
  117. "function": "insert_article_into_recycle_pool",
  118. "status": "fail",
  119. "message": "account has no articles",
  120. "data": {"account_name": account_info["name"]},
  121. }
  122. )