# words_func.py (4.5 KB)
  1. import time
  2. import traceback
  3. from db_helper import MysqlHelper
  4. from log import Log
# Module-wide MysqlHelper instance; the functions in this file call its
# get_data(sql=...) and add_data(sql=...) methods.
mysql_helper = MysqlHelper()
# Module-wide Log instance used for the info / error messages in this file.
log_ = Log()
  7. def get_words(page_num, page_size):
  8. """
  9. 分页获取所有热点词
  10. :param page_num: 页码
  11. :param page_size: 每页请求条目数
  12. :return: words
  13. """
  14. try:
  15. sql = f"select id, word from word.hot_word order by id limit {(page_num-1)*page_size}, {page_size};"
  16. data = mysql_helper.get_data(sql=sql)
  17. if data is None:
  18. return None
  19. words = []
  20. for id_, word in data:
  21. words.append({'id': id_, 'word': word})
  22. return words
  23. except Exception as e:
  24. log_.error(traceback.format_exc())
  25. return None
  26. def update_wechat_score_data(data):
  27. """
  28. 根据爬取到的微信指数数据更新数据库
  29. :param data:
  30. :return:
  31. """
  32. if data is None or len(data) == 0:
  33. log_.info(f"无需要更新的数据!")
  34. return
  35. # 爬取数据解析
  36. log_.info(f"data count = {len(data)}")
  37. wechat_score_data_list = []
  38. for item in data:
  39. if item is None:
  40. continue
  41. word_id = item.get('id')
  42. word = item.get('word')
  43. wechat_scores = item.get('wechatScores')
  44. if wechat_scores is None or len(wechat_scores) == 0:
  45. continue
  46. for score_data in wechat_scores:
  47. score = score_data.get('score')
  48. score_date = score_data.get('scoreDate')
  49. wechat_score_data_list.append({'word_id': word_id, 'word': word, 'score': score, 'score_date': score_date})
  50. log_.info(f"wechat_score_data_list count = {len(wechat_score_data_list)}")
  51. # update or insert 数据区分
  52. update_data = []
  53. insert_data = []
  54. for wechat_score_data in wechat_score_data_list:
  55. select_sql = f"SELECT id FROM word.word_wechat_score " \
  56. f"WHERE word_id = {wechat_score_data['word_id']} " \
  57. f"AND score_date = '{wechat_score_data['score_date']}';"
  58. res = mysql_helper.get_data(sql=select_sql)
  59. if res is None:
  60. continue
  61. if len(res) == 0:
  62. insert_data.append(wechat_score_data)
  63. else:
  64. id_ = res[0][0]
  65. wechat_score_data['id'] = id_
  66. update_data.append(wechat_score_data)
  67. log_.info(f"update_data = {len(update_data)}")
  68. log_.info(f"insert_data = {len(insert_data)}")
  69. # 批量插入
  70. if len(insert_data) > 0:
  71. for i in range(len(insert_data) // 100 + 1):
  72. log_.info(f"insert i = {i}")
  73. insert_temp_data = insert_data[i * 100:(i + 1) * 100]
  74. if len(insert_temp_data) > 0:
  75. insert_sql_values = ', '.join([f"({item['word_id']}, {item['score']}, '{item['score_date']}')"
  76. for item in insert_temp_data])
  77. insert_start_time = time.time()
  78. insert_sql = f"insert into word.word_wechat_score (word_id, wechat_score, score_date) " \
  79. f"values {insert_sql_values};"
  80. mysql_helper.add_data(sql=insert_sql)
  81. log_.info(f"insert executeTime: {(time.time() - insert_start_time) * 1000}")
  82. log_.info(f"insert wechat score data finished! insert count = {len(insert_data)}")
  83. # 批量更新
  84. if len(update_data) > 0:
  85. for i in range(len(update_data) // 100 + 1):
  86. log_.info(f"update i = {i}")
  87. update_temp_data = update_data[i * 100:(i + 1) * 100]
  88. if len(update_temp_data) > 0:
  89. update_id = [item['id'] for item in update_temp_data]
  90. update_sql_values = ' '.join([f"when {item['id']} then {item['score']}" for item in update_temp_data])
  91. update_start_time = time.time()
  92. if len(update_id) > 1:
  93. update_sql = f"update word.word_wechat_score set wechat_score = " \
  94. f"case id {update_sql_values} end where id in {tuple(update_id)};"
  95. else:
  96. update_sql = f"update word.word_wechat_score set wechat_score = " \
  97. f"case id {update_sql_values} end where id in ({update_id[0]});"
  98. mysql_helper.add_data(sql=update_sql)
  99. log_.info(f"update executeTime: {(time.time() - update_start_time) * 1000}")
  100. log_.info(f"update wechat score data finished! update count = {len(update_data)}")
  101. if __name__ == '__main__':
  102. get_words(8, 100)
  103. # get_words(1, 20)
  104. # get_words(2, 10)