"""Helpers for hot-word storage, WeChat-index score upserts and word segmentation."""
import datetime
import re
import time
import traceback

import jieba
import jieba.posseg as pseg

from db_helper import MysqlHelper
from log import Log

mysql_helper = MysqlHelper()
log_ = Log()

# Number of rows sent to the database per batched insert/update statement.
_BATCH_SIZE = 100

# jieba part-of-speech flags dropped by word_cut:
# numerals (m), numeral-quantifiers (mq), quantifiers (q), pronouns (r).
_EXCLUDED_POS_FLAGS = frozenset({'m', 'mq', 'q', 'r'})

# Character ranges stripped by filter_emoji:
#   U+1F300-U+1F64F  symbols, pictographs and emoticons
#   U+1F680-U+1F6FF  transport and map symbols
#   U+2600-U+2B55    miscellaneous symbols
#   U+10000-U+10FFFF everything outside the Basic Multilingual Plane
# NOTE(review): the class also contains a literal space (between U+2B55 and
# U+10000), so ASCII spaces are removed together with the emoji. That looks
# accidental, but it is preserved here because removing it would change the
# tokens word_cut produces -- confirm before changing.
_EMOJI_PATTERN = re.compile(
    '['
    '\U0001F300-\U0001F64F'
    '\U0001F680-\U0001F6FF'
    '\u2600-\u2B55 \U00010000-\U0010ffff'
    ']+'
)


def _fetch_words(sql):
    """Run *sql* and map each ``(id, word)`` row to ``{'id': ..., 'word': ...}``.

    Returns ``None`` when the database helper returns ``None``.
    """
    data = mysql_helper.get_data(sql=sql)
    if data is None:
        return None
    return [{'id': id_, 'word': word} for id_, word in data]


def _page_limits(page_num, page_size):
    """Return ``(offset, size)`` as ints for a 1-based page number.

    Coercing to ``int`` guarantees that only integers are interpolated into
    the SQL ``limit`` clause.
    """
    size = int(page_size)
    return (int(page_num) - 1) * size, size


def _chunks(items, size):
    """Yield consecutive slices of *items*, each at most *size* long."""
    for start in range(0, len(items), size):
        yield items[start:start + size]


def get_words(page_num, page_size):
    """
    Fetch one page of all hot words, ordered by id.

    :param page_num: 1-based page number
    :param page_size: number of rows per page
    :return: list of ``{'id': ..., 'word': ...}`` dicts, or ``None`` when the
             query returns ``None`` or any exception is raised (the traceback
             is logged)
    """
    try:
        offset, size = _page_limits(page_num, page_size)
        sql = f"select id, word from word.hot_word order by id limit {offset}, {size};"
        return _fetch_words(sql)
    except Exception:
        log_.error(traceback.format_exc())
        return None


def _flatten_wechat_scores(data):
    """Expand crawled items into one record per (word, score date).

    ``None`` items and items without ``wechatScores`` are skipped.
    """
    records = []
    for item in data:
        if item is None:
            continue
        wechat_scores = item.get('wechatScores')
        if not wechat_scores:
            continue
        word_id = item.get('id')
        word = item.get('word')
        for score_data in wechat_scores:
            records.append({
                'word_id': word_id,
                'word': word,
                'score': score_data.get('score'),
                'score_date': score_data.get('scoreDate'),
            })
    return records


def update_wechat_score_data(data):
    """
    Insert or update WeChat-index scores in ``word.word_wechat_score``.

    Each record is looked up by ``(word_id, score_date)``: existing rows get
    their ``wechat_score`` updated, missing rows are inserted. Both writes are
    sent in batches of ``_BATCH_SIZE`` rows.

    NOTE(review): the statements below are built by string interpolation from
    crawled values. The parameter-binding API of ``MysqlHelper`` is not visible
    from this file, so the queries are left as-is; switch to bound parameters
    if the helper supports them.

    :param data: iterable of dicts with keys ``id``, ``word`` and
                 ``wechatScores`` (a list of dicts with ``score`` and
                 ``scoreDate``); ``None`` or empty means nothing to do
    :return: None
    """
    if not data:
        log_.info("无需要更新的数据!")
        return

    # Parse the crawled data.
    log_.info(f"data count = {len(data)}")
    wechat_score_data_list = _flatten_wechat_scores(data)
    log_.info(f"wechat_score_data_list count = {len(wechat_score_data_list)}")

    # Split the records into rows to update and rows to insert.
    update_data = []
    insert_data = []
    for wechat_score_data in wechat_score_data_list:
        select_sql = f"SELECT id FROM word.word_wechat_score " \
                     f"WHERE word_id = {wechat_score_data['word_id']} " \
                     f"AND score_date = '{wechat_score_data['score_date']}';"
        res = mysql_helper.get_data(sql=select_sql)
        if res is None:
            # Lookup returned None: skip this record.
            continue
        if len(res) == 0:
            insert_data.append(wechat_score_data)
        else:
            wechat_score_data['id'] = res[0][0]
            update_data.append(wechat_score_data)
    log_.info(f"update_data = {len(update_data)}")
    log_.info(f"insert_data = {len(insert_data)}")

    # Batched insert.
    if insert_data:
        for i, batch in enumerate(_chunks(insert_data, _BATCH_SIZE)):
            log_.info(f"insert i = {i}")
            insert_sql_values = ', '.join(
                f"({item['word_id']}, {item['score']}, '{item['score_date']}')"
                for item in batch
            )
            insert_start_time = time.time()
            insert_sql = f"insert into word.word_wechat_score (word_id, wechat_score, score_date) " \
                         f"values {insert_sql_values};"
            mysql_helper.add_data(sql=insert_sql)
            log_.info(f"insert executeTime: {(time.time() - insert_start_time) * 1000}")
        log_.info(f"insert wechat score data finished! insert count = {len(insert_data)}")

    # Batched update: a single statement per batch using CASE on the row id.
    if update_data:
        for i, batch in enumerate(_chunks(update_data, _BATCH_SIZE)):
            log_.info(f"update i = {i}")
            update_ids = ', '.join(str(item['id']) for item in batch)
            update_sql_values = ' '.join(
                f"when {item['id']} then {item['score']}" for item in batch
            )
            update_start_time = time.time()
            update_sql = f"update word.word_wechat_score set wechat_score = " \
                         f"case id {update_sql_values} end where id in ({update_ids});"
            mysql_helper.add_data(sql=update_sql)
            log_.info(f"update executeTime: {(time.time() - update_start_time) * 1000}")
        log_.info(f"update wechat score data finished! update count = {len(update_data)}")


def get_stop_words():
    """Load the stop-word list from ``hit_stopwords.txt`` (one word per line).

    :return: list of the file's lines, split on ``"\\n"``
    """
    # Read-only access is enough, and the context manager closes the handle.
    with open('hit_stopwords.txt', 'r', encoding='utf-8') as stop:
        return stop.read().split("\n")


def filter_emoji(text):
    """Remove every character matched by ``_EMOJI_PATTERN`` from *text*.

    :param text: input string
    :return: *text* with all matching runs replaced by the empty string
    """
    return _EMOJI_PATTERN.sub('', text)


def word_cut(text):
    """Segment *text* with jieba and drop uninformative tokens.

    Steps: strip surrounding whitespace, remove emoji, cut in accurate mode,
    then (only when more than one segment was produced) re-tag every segment
    and drop numerals, quantifiers and pronouns. Finally drop stop words,
    tokens containing a space, all-digit tokens and single-character tokens.

    :param text: input string
    :return: list of remaining tokens, in order
    """
    # A set gives O(1) membership tests in the filter below.
    stop_words = set(get_stop_words())

    text = filter_emoji(text.strip())

    # Accurate-mode segmentation.
    seg_list = list(jieba.cut(text, cut_all=False))

    # Part-of-speech filtering is applied only to multi-segment results.
    if len(seg_list) > 1:
        seg_list = [
            word
            for seg in seg_list
            for word, flag in pseg.cut(seg)
            if flag not in _EXCLUDED_POS_FLAGS
        ]

    return [
        seg for seg in seg_list
        if seg not in stop_words
        and ' ' not in seg
        and not seg.isdigit()
        and len(seg) > 1
    ]


def get_today_words(page_num, page_size):
    """
    Fetch one page of the hot words updated today, ordered by id.

    "Today" means ``update_time`` later than midnight of the current local
    date.

    :param page_num: 1-based page number
    :param page_size: number of rows per page
    :return: list of ``{'id': ..., 'word': ...}`` dicts, or ``None`` when the
             query returns ``None`` or any exception is raised (the traceback
             is logged)
    """
    try:
        offset, size = _page_limits(page_num, page_size)
        dt = datetime.datetime.today().strftime('%Y-%m-%d')
        sql = f"select id, word from word.hot_word where update_time > cast('{dt}' as datetime) " \
              f"order by id limit {offset}, {size};"
        return _fetch_words(sql)
    except Exception:
        log_.error(traceback.format_exc())
        return None


if __name__ == '__main__':
    get_words(8, 100)