import datetime
import traceback
from threading import Timer

import pandas as pd
from odps import ODPS

from config import set_config
from log import Log
from utils.utils import get_data_from_odps
from words_func import word_cut
from db_helper import MysqlHelper

config_, env = set_config()
log_ = Log()
mysql_helper = MysqlHelper()

# Fields copied from each ODPS record into the title DataFrame.
features = ['title', 'source']


def _escape_sql_str(value):
    """
    Escape a value for embedding inside a single-quoted MySQL string literal.

    Backslashes are doubled first so that the quote escapes added afterwards are not
    themselves re-escaped.
    NOTE(review): assumes the server uses the default sql_mode (NO_BACKSLASH_ESCAPES off);
    prefer parameterized queries if MysqlHelper supports them.
    :param value: value to escape (converted with str())
    :return: escaped string, without surrounding quotes
    """
    return str(value).replace('\\', '\\\\').replace("'", "\\'")


def check_table_partition_exits(date, project, table, connect_timeout=3000, read_timeout=500000,
                                pool_maxsize=1000, pool_connections=1000):
    """
    Check whether partition ``dt=<date>`` exists in the given ODPS table.
    :param date: partition date, type-string '%Y%m%d'
    :param project: ODPS project name, type-string
    :param table: table name, type-string
    :param connect_timeout: connection timeout passed to the ODPS client
    :param read_timeout: read timeout passed to the ODPS client
    :param pool_maxsize: passed through to the ODPS client
    :param pool_connections: passed through to the ODPS client
    :return: result of ``Table.exist_partition`` (truthy when the partition exists)
    """
    odps = ODPS(
        access_id=config_.ODPS_CONFIG['ACCESSID'],
        secret_access_key=config_.ODPS_CONFIG['ACCESSKEY'],
        project=project,
        endpoint=config_.ODPS_CONFIG['ENDPOINT'],
        connect_timeout=connect_timeout,
        read_timeout=read_timeout,
        pool_maxsize=pool_maxsize,
        pool_connections=pool_connections
    )
    t = odps.get_table(name=table)
    return t.exist_partition(partition_spec=f'dt={date}')


def data_check(project, table, now_date):
    """
    Check whether the day's data is ready.
    :param project: ODPS project name, type-string
    :param table: table name, type-string
    :param now_date: datetime whose '%Y%m%d' form selects partition ``dt``
    :return: number of rows in partition dt=<now_date>; 0 if the partition does not exist
             or if the check raised (the error is logged)
    """
    odps = ODPS(
        access_id=config_.ODPS_CONFIG['ACCESSID'],
        secret_access_key=config_.ODPS_CONFIG['ACCESSKEY'],
        project=project,
        endpoint=config_.ODPS_CONFIG['ENDPOINT'],
        connect_timeout=3000,
        read_timeout=500000,
        pool_maxsize=1000,
        pool_connections=1000
    )
    try:
        dt = datetime.datetime.strftime(now_date, '%Y%m%d')
        check_res = check_table_partition_exits(date=dt, project=project, table=table)
        if check_res:
            sql = f'select * from {project}.{table} where dt = {dt}'
            with odps.execute_sql(sql=sql).open_reader() as reader:
                data_count = reader.count
        else:
            data_count = 0
    except Exception as e:
        # Report "not ready" so the caller retries, but do not hide the cause.
        log_.error(f"data check failed, exception: {e}, traceback: {traceback.format_exc()}")
        data_count = 0
    return data_count


def get_title_data(project, table, now_date):
    """
    Fetch video title data (on-site and off-site) for the given day.
    :param project: ODPS project name, type-string
    :param table: table name, type-string
    :param now_date: datetime whose '%Y%m%d' form selects the partition
    :return: DataFrame with the columns in ``features``; empty when no records are returned
    """
    dt = datetime.datetime.strftime(now_date, '%Y%m%d')
    records = get_data_from_odps(date=dt, project=project, table=table)
    feature_data = [{name: record[name] for name in features} for record in records]
    # Explicit columns keep 'title'/'source' present even when there are no records,
    # so downstream df['source'] does not raise KeyError.
    return pd.DataFrame(feature_data, columns=features)


def update_cut_words_result(text, source, words_list):
    """
    Store the word-segmentation result for a text: insert a new row, or update the existing
    row whose ``text`` matches.
    :param text: original text, type-string
    :param source: text source, type-int
    :param words_list: segmentation result, type-list
    :return: None
    """
    # Join the segmented words into one comma-separated string.
    words = ','.join(words_list)
    # Escape values embedded in SQL literals so quotes/backslashes in titles don't break the statement.
    escaped_text = _escape_sql_str(text)
    escaped_words = _escape_sql_str(words)
    # Check whether this original text already exists.
    select_sql = f"SELECT id FROM word.cut_words_result WHERE text = '{escaped_text}';"
    res = mysql_helper.get_data(sql=select_sql)
    if res is None or len(res) == 0:
        # Not present: insert.
        insert_sql = f"insert into word.cut_words_result (text, words, source) " \
                     f"values ('{escaped_text}', '{escaped_words}', {source});"
        log_.info(f"insert_sql = {insert_sql}")
        mysql_helper.add_data(sql=insert_sql)
    else:
        # Present: update the first matching row.
        update_sql = f"""update word.cut_words_result set words = '{escaped_words}', source = {source}, update_time = '{datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")}' where id = {res[0][0]};"""
        log_.info(f"update_sql = {update_sql}")
        mysql_helper.add_data(sql=update_sql)


def update_hot_word(words_list, source):
    """
    Store words in word.hot_word: insert unseen words, and update existing ones.
    An existing word whose stored source differs from ``source`` is rewritten with source 3.
    :param words_list: words, type-list
    :param source: word source, type-int
    :return: None
    """
    for word in words_list:
        if len(word) == 0:
            continue
        escaped_word = _escape_sql_str(word)
        # Check whether the word already exists.
        select_sql = f"SELECT id, source FROM word.hot_word WHERE word = '{escaped_word}';"
        res = mysql_helper.get_data(sql=select_sql)
        if res is None or len(res) == 0:
            # Not present: insert.
            insert_sql = f"insert into word.hot_word (word, source) values ('{escaped_word}', {source});"
            mysql_helper.add_data(sql=insert_sql)
        else:
            # Present: update. Use a per-word local so the `source` argument is not clobbered
            # for the remaining words in the list.
            word_id, existing_source = res[0][0], res[0][1]
            word_source = source if source == existing_source else 3
            update_sql = \
                f"""update word.hot_word set source = {word_source}, update_time = '{datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")}' where id = {word_id};"""
            mysql_helper.add_data(sql=update_sql)


def data_update(project, table, now_date):
    """
    Segment every title of the day (source 1, then source 2) and store the results.
    :param project: ODPS project name, type-string
    :param table: table name, type-string
    :param now_date: datetime selecting the partition
    :return: None
    """
    # Fetch the day's title data.
    df = get_title_data(project=project, table=table, now_date=now_date)
    df['source'] = df['source'].astype(int)
    for source in [1, 2]:
        df_temp = df[df['source'] == source]
        title_list = df_temp['title'].to_list()
        log_.info(f"source = {source}, count = {len(title_list)}")
        for title in title_list:
            log_.info(f"title: {title}")
            # Skip missing/empty titles; `return` here would abort all remaining titles and sources.
            if title is None or len(title) == 0:
                continue
            # 1. Segment the title into words.
            words_list = word_cut(text=title)
            log_.info(f"words_list: {words_list}")
            # 2. Store the segmentation result.
            update_cut_words_result(text=title, source=source, words_list=words_list)
            # 3. Store the individual words.
            update_hot_word(words_list=words_list, source=source)


def timer_check():
    """
    Entry point: run the update once today's partition has data; otherwise retry in 60 seconds.
    Any exception is logged and stops further retries.
    """
    try:
        project = config_.TITLE_DATA['project']
        table = config_.TITLE_DATA['table']
        now_date = datetime.datetime.today()
        log_.info(f"now_date: {datetime.datetime.strftime(now_date, '%Y%m%d')}")
        # Check whether today's data is ready.
        data_count = data_check(project=project, table=table, now_date=now_date)
        if data_count > 0:
            log_.info(f'data_count = {data_count}')
            # Data is ready: run the update.
            data_update(project=project, table=table, now_date=now_date)
            log_.info("data update end!")
        else:
            # Data not ready yet: check again in 1 minute.
            Timer(60, timer_check).start()
    except Exception as e:
        log_.error(f"数据更新失败, exception: {e}, traceback: {traceback.format_exc()}")


if __name__ == '__main__':
    timer_check()