123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143 |
- import datetime
- import time
- import traceback
- from db_helper import MysqlHelper
- from log import Log
- mysql_helper = MysqlHelper()
- log_ = Log()
def get_words(page_num, page_size):
    """
    Fetch one page of hot words from ``word.hot_word``, ordered by id.

    :param page_num: page number; page 1 maps to row offset 0
    :param page_size: number of rows requested per page
    :return: list of ``{'id': ..., 'word': ...}`` dicts, or None when the
        helper returns None or any exception is raised (the traceback is
        logged in that case)
    """
    try:
        offset = (page_num - 1) * page_size
        sql = f"select id, word from word.hot_word order by id limit {offset}, {page_size};"
        rows = mysql_helper.get_data(sql=sql)
        if rows is None:
            return None
        return [{'id': row_id, 'word': text} for row_id, text in rows]
    except Exception:
        log_.error(traceback.format_exc())
        return None
def _flatten_wechat_scores(data):
    """Flatten crawled items into one row per score entry, dropping unusable ones.

    :param data: iterable of crawled items (dicts with 'id' and 'wechatScores')
    :return: ``(rows, skipped)`` where rows is a list of
        ``{'word_id', 'score', 'score_date'}`` dicts and skipped counts the
        score entries that were dropped because a field was missing
    """
    rows = []
    skipped = 0
    for item in data:
        if item is None:
            continue
        word_id = item.get('id')
        for score_data in item.get('wechatScores') or []:
            if score_data is None:
                skipped += 1
                continue
            score = score_data.get('score')
            score_date = score_data.get('scoreDate')
            # A missing field would be rendered as the literal `None` inside the
            # SQL text built later and break the whole statement, so drop it here.
            if word_id is None or score is None or score_date is None:
                skipped += 1
                continue
            rows.append({'word_id': word_id, 'score': score, 'score_date': score_date})
    return rows, skipped


def _split_insert_update(rows):
    """Split rows into those to insert and those to update (existing row id attached)."""
    to_insert = []
    to_update = []
    for row in rows:
        # NOTE(review): one SELECT per row, and values are interpolated directly
        # into the SQL text. If MysqlHelper supports parameterized queries,
        # prefer them here and in the statements below - TODO confirm.
        select_sql = f"SELECT id FROM word.word_wechat_score " \
                     f"WHERE word_id = {row['word_id']} " \
                     f"AND score_date = '{row['score_date']}';"
        res = mysql_helper.get_data(sql=select_sql)
        if res is None:
            # No usable answer from the helper for this row: leave it untouched.
            continue
        if len(res) == 0:
            to_insert.append(row)
        else:
            to_update.append(dict(row, id=res[0][0]))
    return to_insert, to_update


def _insert_scores(rows, batch_size):
    """Insert new score rows with one multi-row INSERT per batch."""
    for batch_no, start in enumerate(range(0, len(rows), batch_size)):
        batch = rows[start:start + batch_size]
        log_.info(f"insert i = {batch_no}")
        insert_sql_values = ', '.join(
            f"({row['word_id']}, {row['score']}, '{row['score_date']}')" for row in batch
        )
        started = time.time()
        insert_sql = f"insert into word.word_wechat_score (word_id, wechat_score, score_date) " \
                     f"values {insert_sql_values};"
        mysql_helper.add_data(sql=insert_sql)
        log_.info(f"insert executeTime: {(time.time() - started) * 1000}")
    log_.info(f"insert wechat score data finished! insert count = {len(rows)}")


def _update_scores(rows, batch_size):
    """Update existing score rows with one CASE-based UPDATE per batch."""
    for batch_no, start in enumerate(range(0, len(rows), batch_size)):
        batch = rows[start:start + batch_size]
        log_.info(f"update i = {batch_no}")
        update_sql_values = ' '.join(f"when {row['id']} then {row['score']}" for row in batch)
        # Joining the ids explicitly yields "(1)" for a single id and "(1, 2)"
        # for several, so no special case for one-element batches is needed.
        id_list = ', '.join(str(row['id']) for row in batch)
        started = time.time()
        now = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        update_sql = f"""update word.word_wechat_score set wechat_score = case id {update_sql_values} end,
            update_time = '{now}'
            where id in ({id_list});"""
        mysql_helper.add_data(sql=update_sql)
        log_.info(f"update executeTime: {(time.time() - started) * 1000}")
    log_.info(f"update wechat score data finished! update count = {len(rows)}")


def update_wechat_score_data(data, batch_size=100):
    """
    Write crawled WeChat index scores into ``word.word_wechat_score``.

    Each crawled item is flattened into one row per score entry. For every row
    the table is queried by (word_id, score_date): rows with no match are
    inserted, rows with a match are updated by id. Writes are sent in batches.

    :param data: list of crawled items, each a dict with 'id' and
        'wechatScores' (a list of dicts with 'score' and 'scoreDate');
        None items and items without scores are ignored
    :param batch_size: maximum number of rows per INSERT/UPDATE statement
    :return: None
    :raises ValueError: if batch_size is smaller than 1
    """
    if batch_size < 1:
        raise ValueError(f"batch_size must be >= 1, got {batch_size}")
    if not data:
        log_.info("无需要更新的数据!")
        return

    # Parse the crawled payload.
    log_.info(f"data count = {len(data)}")
    wechat_score_data_list, skipped = _flatten_wechat_scores(data)
    log_.info(f"wechat_score_data_list count = {len(wechat_score_data_list)}")
    if skipped:
        log_.info(f"skipped incomplete score entries = {skipped}")

    # Decide per row whether it is an insert or an update.
    insert_data, update_data = _split_insert_update(wechat_score_data_list)
    log_.info(f"update_data = {len(update_data)}")
    log_.info(f"insert_data = {len(insert_data)}")

    # Batched writes.
    if insert_data:
        _insert_scores(insert_data, batch_size)
    if update_data:
        _update_scores(update_data, batch_size)
def get_today_words(page_num, page_size):
    """
    Fetch one page of hot words whose ``update_time`` is later than today's
    date (as produced by ``datetime.datetime.today()``), ordered by id.

    :param page_num: page number; page 1 maps to row offset 0
    :param page_size: number of rows requested per page
    :return: list of ``{'id': ..., 'word': ...}`` dicts, or None when the
        helper returns None or any exception is raised (the traceback is
        logged in that case)
    """
    try:
        today = datetime.datetime.today().strftime('%Y-%m-%d')
        offset = (page_num - 1) * page_size
        sql = f"select id, word from word.hot_word where update_time > cast('{today}' as datetime) " \
              f"order by id limit {offset}, {page_size};"
        rows = mysql_helper.get_data(sql=sql)
        if rows is None:
            return None
        return [{'id': row_id, 'word': text} for row_id, text in rows]
    except Exception:
        log_.error(traceback.format_exc())
        return None
if __name__ == '__main__':
    # Manual smoke run: request page 8 with 100 rows per page.
    # The return value is discarded.
    get_words(8, 100)
    # Other page/size combinations kept for ad-hoc runs:
    # get_words(1, 20)
    # get_words(2, 10)
|