# migrate_articles2.py

  1. """
  2. @author: luojunhui
  3. """
  4. import os
  5. import json
  6. import pymysql
  7. from tqdm import tqdm
  8. from concurrent.futures.thread import ThreadPoolExecutor


def insert_into_mysql(path):
    """
    Parse one crawled article-message JSON file and insert each of its
    articles into MySQL.
    :param path: file path of the JSON message file
    :return: None
    """
    with open(path, encoding="utf-8") as f:
        info = json.load(f)
    # The directory two levels above the file is named "<ghId>_<accountName>",
    # i.e. the expected layout is account/<ghId>_<accountName>/msg/<file>.json
    gzh_info = path.split("/")[-3]
    accountName = gzh_info.split("_")[-1]
    ghId = gzh_info.replace("_" + accountName, "")
    baseInfo = info.get("BaseInfo", {})
    app_msg_base_info = info.get("AppMsg", {}).get("BaseInfo", {})
    appMsgId = app_msg_base_info.get("AppMsgId", None)
    createTime = app_msg_base_info.get("CreateTime", None)
    updateTime = app_msg_base_info.get("UpdateTime", None)
    Type = app_msg_base_info.get("Type", None)
    detail_article_list = info.get("AppMsg", {}).get("DetailInfo", [])
    if detail_article_list:
        for article in detail_article_list:
            title = article.get("Title", None)
            Digest = article.get("Digest", None)
            ItemIndex = article.get("ItemIndex", None)
            ContentUrl = article.get("ContentUrl", None)
            SourceUrl = article.get("SourceUrl", None)
            CoverImgUrl = article.get("CoverImgUrl", None)
            CoverImgUrl_1_1 = article.get("CoverImgUrl_1_1", None)
            CoverImgUrl_235_1 = article.get("CoverImgUrl_235_1", None)
            ItemShowType = article.get("ItemShowType", None)
            IsOriginal = article.get("IsOriginal", None)
            ShowDesc = article.get("ShowDesc", None)
            ori_content = article.get("ori_content", None)
            show_view_count = article.get("show_view_count", 0)
            show_like_count = article.get("show_like_count", 0)
            show_zs_count = article.get("show_zs_count", 0)
            show_pay_count = article.get("show_pay_count", 0)
            # Guard against URLs that carry no "&sn=" query parameter,
            # which would otherwise raise an IndexError
            wx_sn = (
                ContentUrl.split("&sn=")[1].split("&")[0]
                if ContentUrl and "&sn=" in ContentUrl
                else None
            )
            info_tuple = (
                ghId,
                accountName,
                appMsgId,
                title,
                Type,
                createTime,
                updateTime,
                Digest,
                ItemIndex,
                ContentUrl,
                SourceUrl,
                CoverImgUrl,
                CoverImgUrl_1_1,
                CoverImgUrl_235_1,
                ItemShowType,
                IsOriginal,
                ShowDesc,
                ori_content,
                show_view_count,
                show_like_count,
                show_zs_count,
                show_pay_count,
                wx_sn,
                json.dumps(baseInfo, ensure_ascii=False),
            )
            # Open a short-lived connection per article; it is closed right
            # after the row is committed below.
            connection = pymysql.connect(
                host='rm-bp1159bu17li9hi94.mysql.rds.aliyuncs.com',
                port=3306,
                user='crawler',
                password='crawler123456@',
                db='piaoquan-crawler',
                charset='utf8mb4'
            )
            insert_sql = """
                INSERT INTO official_articles
                (ghId, accountName, appMsgId, title, Type, createTime, updateTime,
                 Digest, ItemIndex, ContentUrl, SourceUrl, CoverImgUrl,
                 CoverImgUrl_1_1, CoverImgUrl_235_1, ItemShowType, IsOriginal,
                 ShowDesc, ori_content, show_view_count, show_like_count,
                 show_zs_count, show_pay_count, wx_sn, baseInfo)
                VALUES
                (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s,
                 %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
            """
            cursor = connection.cursor()
            cursor.execute(insert_sql, info_tuple)
            connection.commit()
            connection.close()
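

# Example call (hypothetical path), matching the layout the parser expects
# (account/<ghId>_<accountName>/msg/<file>.json):
#   insert_into_mysql("account/gh_abcdef123_SomeAccount/msg/1000000001.json")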


def get_file_list():
    """
    Collect the crawled message JSON files under account/*/msg/.
    :return: a list of file-path lists, one per account directory
    """
    path = 'account'
    dirs = os.listdir(path)
    sub_dirs = [os.path.join(path, i, "msg") for i in dirs]
    L = []
    for sub_dir in sub_dirs:
        try:
            file_list = os.listdir(sub_dir)
            file_path_list = [os.path.join(sub_dir, i) for i in file_list]
            L.append(file_path_list)
        except OSError:
            # Skip entries that are not directories or have no msg/ folder
            continue
    return L
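

# get_file_list() returns one list of paths per account directory, e.g.
# (hypothetical names):
#   [["account/gh_a_Foo/msg/1.json", "account/gh_a_Foo/msg/2.json"],
#    ["account/gh_b_Bar/msg/1.json"]]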


if __name__ == '__main__':
    file_list = get_file_list()
    L_files = []
    c = 0
    for files in tqdm(file_list):
        c += len(files)
        print(c)
        # with ThreadPoolExecutor(max_workers=10) as pool:
        #     pool.map(insert_into_mysql, files)
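
# A minimal sketch of enabling the commented-out thread pool above, assuming
# the MySQL server tolerates ~10 concurrent connections and that duplicate
# rows from re-runs are acceptable (no de-duplication on wx_sn is done here):
#
#     if __name__ == '__main__':
#         for files in tqdm(get_file_list()):
#             with ThreadPoolExecutor(max_workers=10) as pool:
#                 pool.map(insert_into_mysql, files)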