|  | @@ -0,0 +1,176 @@
 | 
											
												
													
														|  | 
 |  | +import datetime
 | 
											
												
													
														|  | 
 |  | +import traceback
 | 
											
												
													
														|  | 
 |  | +import pandas as pd
 | 
											
												
													
														|  | 
 |  | +from odps import ODPS
 | 
											
												
													
														|  | 
 |  | +from threading import Timer
 | 
											
												
													
														|  | 
 |  | +from config import set_config
 | 
											
												
													
														|  | 
 |  | +from log import Log
 | 
											
												
													
														|  | 
 |  | +from utils.utils import get_data_from_odps
 | 
											
												
													
														|  | 
 |  | +from words_func import word_cut
 | 
											
												
													
														|  | 
 |  | +from db_helper import MysqlHelper
 | 
											
												
													
														|  | 
 |  | +config_ = set_config()
 | 
											
												
													
														|  | 
 |  | +log_ = Log()
 | 
											
												
													
														|  | 
 |  | +mysql_helper = MysqlHelper()
 | 
											
												
													
														|  | 
 |  | +features = ['title', 'source']
 | 
											
												
													
														|  | 
 |  | +
 | 
											
												
													
														|  | 
 |  | +
 | 
											
												
													
														|  | 
 |  | +def check_table_partition_exits(date, project, table, connect_timeout=3000, read_timeout=500000,
 | 
											
												
													
														|  | 
 |  | +                                pool_maxsize=1000, pool_connections=1000):
 | 
											
												
													
														|  | 
 |  | +    """
 | 
											
												
													
														|  | 
 |  | +    判断表中是否存在这个分区
 | 
											
												
													
														|  | 
 |  | +    :param date: 日期 type-string '%Y%m%d'
 | 
											
												
													
														|  | 
 |  | +    :param project: type-string
 | 
											
												
													
														|  | 
 |  | +    :param table: 表名 type-string
 | 
											
												
													
														|  | 
 |  | +    :param connect_timeout: 连接超时设置
 | 
											
												
													
														|  | 
 |  | +    :param read_timeout: 读取超时设置
 | 
											
												
													
														|  | 
 |  | +    :param pool_maxsize:
 | 
											
												
													
														|  | 
 |  | +    :param pool_connections:
 | 
											
												
													
														|  | 
 |  | +    :return: records
 | 
											
												
													
														|  | 
 |  | +    """
 | 
											
												
													
														|  | 
 |  | +    odps = ODPS(
 | 
											
												
													
														|  | 
 |  | +        access_id=config_.ODPS_CONFIG['ACCESSID'],
 | 
											
												
													
														|  | 
 |  | +        secret_access_key=config_.ODPS_CONFIG['ACCESSKEY'],
 | 
											
												
													
														|  | 
 |  | +        project=project,
 | 
											
												
													
														|  | 
 |  | +        endpoint=config_.ODPS_CONFIG['ENDPOINT'],
 | 
											
												
													
														|  | 
 |  | +        connect_timeout=connect_timeout,
 | 
											
												
													
														|  | 
 |  | +        read_timeout=read_timeout,
 | 
											
												
													
														|  | 
 |  | +        pool_maxsize=pool_maxsize,
 | 
											
												
													
														|  | 
 |  | +        pool_connections=pool_connections
 | 
											
												
													
														|  | 
 |  | +    )
 | 
											
												
													
														|  | 
 |  | +    t = odps.get_table(name=table)
 | 
											
												
													
														|  | 
 |  | +    return t.exist_partition(partition_spec=f'dt={date}')
 | 
											
												
													
														|  | 
 |  | +
 | 
											
												
													
														|  | 
 |  | +
 | 
											
												
													
														|  | 
 |  | +def data_check(project, table, now_date):
 | 
											
												
													
														|  | 
 |  | +    """检查数据是否准备好"""
 | 
											
												
													
														|  | 
 |  | +    odps = ODPS(
 | 
											
												
													
														|  | 
 |  | +        access_id=config_.ODPS_CONFIG['ACCESSID'],
 | 
											
												
													
														|  | 
 |  | +        secret_access_key=config_.ODPS_CONFIG['ACCESSKEY'],
 | 
											
												
													
														|  | 
 |  | +        project=project,
 | 
											
												
													
														|  | 
 |  | +        endpoint=config_.ODPS_CONFIG['ENDPOINT'],
 | 
											
												
													
														|  | 
 |  | +        connect_timeout=3000,
 | 
											
												
													
														|  | 
 |  | +        read_timeout=500000,
 | 
											
												
													
														|  | 
 |  | +        pool_maxsize=1000,
 | 
											
												
													
														|  | 
 |  | +        pool_connections=1000
 | 
											
												
													
														|  | 
 |  | +    )
 | 
											
												
													
														|  | 
 |  | +
 | 
											
												
													
														|  | 
 |  | +    try:
 | 
											
												
													
														|  | 
 |  | +        dt = datetime.datetime.strftime(now_date, '%Y%m%d%H')
 | 
											
												
													
														|  | 
 |  | +        check_res = check_table_partition_exits(date=dt, project=project, table=table)
 | 
											
												
													
														|  | 
 |  | +        if check_res:
 | 
											
												
													
														|  | 
 |  | +            sql = f'select * from {project}.{table} where dt = {dt}'
 | 
											
												
													
														|  | 
 |  | +            with odps.execute_sql(sql=sql).open_reader() as reader:
 | 
											
												
													
														|  | 
 |  | +                data_count = reader.count
 | 
											
												
													
														|  | 
 |  | +        else:
 | 
											
												
													
														|  | 
 |  | +            data_count = 0
 | 
											
												
													
														|  | 
 |  | +    except Exception as e:
 | 
											
												
													
														|  | 
 |  | +        data_count = 0
 | 
											
												
													
														|  | 
 |  | +    return data_count
 | 
											
												
													
														|  | 
 |  | +
 | 
											
												
													
														|  | 
 |  | +
 | 
											
												
													
														|  | 
 |  | +def get_title_data(project, table, now_date):
 | 
											
												
													
														|  | 
 |  | +    """获取站内外视频标题数据"""
 | 
											
												
													
														|  | 
 |  | +    dt = datetime.datetime.strftime(now_date, '%Y%m%d')
 | 
											
												
													
														|  | 
 |  | +    records = get_data_from_odps(date=dt, project=project, table=table)
 | 
											
												
													
														|  | 
 |  | +    feature_data = []
 | 
											
												
													
														|  | 
 |  | +    for record in records:
 | 
											
												
													
														|  | 
 |  | +        item = {}
 | 
											
												
													
														|  | 
 |  | +        for feature_name in features:
 | 
											
												
													
														|  | 
 |  | +            item[feature_name] = record[feature_name]
 | 
											
												
													
														|  | 
 |  | +        feature_data.append(item)
 | 
											
												
													
														|  | 
 |  | +    feature_df = pd.DataFrame(feature_data)
 | 
											
												
													
														|  | 
 |  | +    return feature_df
 | 
											
												
													
														|  | 
 |  | +
 | 
											
												
													
														|  | 
 |  | +
 | 
											
												
													
														|  | 
 |  | +def update_cut_words_result(text, source, words_list):
 | 
											
												
													
														|  | 
 |  | +    """
 | 
											
												
													
														|  | 
 |  | +    分词结果入库
 | 
											
												
													
														|  | 
 |  | +    :param text: 原始文本 type-string
 | 
											
												
													
														|  | 
 |  | +    :param source: 文本来源 type-int
 | 
											
												
													
														|  | 
 |  | +    :param words_list: 分词结果 type-list
 | 
											
												
													
														|  | 
 |  | +    :return:
 | 
											
												
													
														|  | 
 |  | +    """
 | 
											
												
													
														|  | 
 |  | +    # 分词结果拼接成字符串
 | 
											
												
													
														|  | 
 |  | +    words = ','.join(words_list)
 | 
											
												
													
														|  | 
 |  | +    # 判断原始文本是否已存在
 | 
											
												
													
														|  | 
 |  | +    select_sql = f"SELECT id FROM word.cut_words_result WHERE text = '{text}';"
 | 
											
												
													
														|  | 
 |  | +    res = mysql_helper.get_data(sql=select_sql)
 | 
											
												
													
														|  | 
 |  | +    if res is None or len(res) == 0:
 | 
											
												
													
														|  | 
 |  | +        # 不存在,插入
 | 
											
												
													
														|  | 
 |  | +        insert_sql = f"insert into word.cut_words_result (text, words, source) values '{text}', '{words}', {source};"
 | 
											
												
													
														|  | 
 |  | +        mysql_helper.add_data(sql=insert_sql)
 | 
											
												
													
														|  | 
 |  | +    else:
 | 
											
												
													
														|  | 
 |  | +        # 存在,更新
 | 
											
												
													
														|  | 
 |  | +        update_sql = f"""update word.cut_words_result set words = '{words}', source = {source}, 
 | 
											
												
													
														|  | 
 |  | +        update_time = {datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")} where id = {res[0][0]};"""
 | 
											
												
													
														|  | 
 |  | +        mysql_helper.add_data(sql=update_sql)
 | 
											
												
													
														|  | 
 |  | +
 | 
											
												
													
														|  | 
 |  | +
 | 
											
												
													
														|  | 
 |  | +def update_hot_word(words_list, source):
 | 
											
												
													
														|  | 
 |  | +    """
 | 
											
												
													
														|  | 
 |  | +    词入库
 | 
											
												
													
														|  | 
 |  | +    :param words_list: 词列表 type-list
 | 
											
												
													
														|  | 
 |  | +    :param source: 词来源 type-int
 | 
											
												
													
														|  | 
 |  | +    :return:
 | 
											
												
													
														|  | 
 |  | +    """
 | 
											
												
													
														|  | 
 |  | +    for word in words_list:
 | 
											
												
													
														|  | 
 |  | +        if len(word) == 0:
 | 
											
												
													
														|  | 
 |  | +            continue
 | 
											
												
													
														|  | 
 |  | +        # 判断词是否已存在
 | 
											
												
													
														|  | 
 |  | +        select_sql = f"SELECT id, source FROM word.hot_word WHERE word = '{word}';"
 | 
											
												
													
														|  | 
 |  | +        res = mysql_helper.get_data(sql=select_sql)
 | 
											
												
													
														|  | 
 |  | +        if res is None or len(res) == 0:
 | 
											
												
													
														|  | 
 |  | +            # 不存在,插入
 | 
											
												
													
														|  | 
 |  | +            insert_sql = f"insert into word.hot_word (word, source) values '{word}', {source};"
 | 
											
												
													
														|  | 
 |  | +            mysql_helper.add_data(sql=insert_sql)
 | 
											
												
													
														|  | 
 |  | +        else:
 | 
											
												
													
														|  | 
 |  | +            # 存在,更新
 | 
											
												
													
														|  | 
 |  | +            if source != res[0][1]:
 | 
											
												
													
														|  | 
 |  | +                source = 3
 | 
											
												
													
														|  | 
 |  | +            update_sql = \
 | 
											
												
													
														|  | 
 |  | +                f"""update word.hot_word set source = {source}, 
 | 
											
												
													
														|  | 
 |  | +                update_time = {datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")} where id = {res[0][0]};"""
 | 
											
												
													
														|  | 
 |  | +            mysql_helper.add_data(sql=update_sql)
 | 
											
												
													
														|  | 
 |  | +
 | 
											
												
													
														|  | 
 |  | +
 | 
											
												
													
														|  | 
 |  | +def data_update(project, table, now_date):
 | 
											
												
													
														|  | 
 |  | +    """数据更新"""
 | 
											
												
													
														|  | 
 |  | +    # 获取站内外视频标题数据
 | 
											
												
													
														|  | 
 |  | +    df = get_title_data(project=project, table=table, now_date=now_date)
 | 
											
												
													
														|  | 
 |  | +    df = df['tag'].astype(int)
 | 
											
												
													
														|  | 
 |  | +    for source in [1, 2]:
 | 
											
												
													
														|  | 
 |  | +        df_temp = df[df['source'] == source]
 | 
											
												
													
														|  | 
 |  | +        title_list = df_temp['title'].to_list()
 | 
											
												
													
														|  | 
 |  | +        for title in title_list:
 | 
											
												
													
														|  | 
 |  | +            if len(title) == 0:
 | 
											
												
													
														|  | 
 |  | +                return
 | 
											
												
													
														|  | 
 |  | +            # 1. 分词
 | 
											
												
													
														|  | 
 |  | +            words_list = word_cut(text=title)
 | 
											
												
													
														|  | 
 |  | +            # 2. 分词结果入库
 | 
											
												
													
														|  | 
 |  | +            update_cut_words_result(text=title, source=source, words_list=words_list)
 | 
											
												
													
														|  | 
 |  | +            # 3. 词入库
 | 
											
												
													
														|  | 
 |  | +            update_hot_word(words_list=words_list, source=source)
 | 
											
												
													
														|  | 
 |  | +
 | 
											
												
													
														|  | 
 |  | +
 | 
											
												
													
														|  | 
 |  | +def timer_check():
 | 
											
												
													
														|  | 
 |  | +    try:
 | 
											
												
													
														|  | 
 |  | +        project = config_.TITLE_DATA['project']
 | 
											
												
													
														|  | 
 |  | +        table = config_.TITLE_DATA['table']
 | 
											
												
													
														|  | 
 |  | +        now_date = datetime.datetime.today()
 | 
											
												
													
														|  | 
 |  | +        log_.info(f"now_date: {datetime.datetime.strftime(now_date, '%Y%m%d')}")
 | 
											
												
													
														|  | 
 |  | +        # 查看当天更新的数据是否已准备好
 | 
											
												
													
														|  | 
 |  | +        data_count = data_check(project=project, table=table, now_date=now_date)
 | 
											
												
													
														|  | 
 |  | +        if data_count > 0:
 | 
											
												
													
														|  | 
 |  | +            log_.info(f'data_count = {data_count}')
 | 
											
												
													
														|  | 
 |  | +            # 数据准备好,进行更新
 | 
											
												
													
														|  | 
 |  | +            data_update(project=project, table=table, now_date=now_date)
 | 
											
												
													
														|  | 
 |  | +            log_.info(f"data update end!")
 | 
											
												
													
														|  | 
 |  | +        else:
 | 
											
												
													
														|  | 
 |  | +            # 数据没准备好,1分钟后重新检查
 | 
											
												
													
														|  | 
 |  | +            Timer(60, timer_check).start()
 | 
											
												
													
														|  | 
 |  | +    except Exception as e:
 | 
											
												
													
														|  | 
 |  | +        log_.error(f"数据更新失败, exception: {e}, traceback: {traceback.format_exc()}")
 | 
											
												
													
														|  | 
 |  | +
 | 
											
												
													
														|  | 
 |  | +
 | 
											
												
													
														|  | 
 |  | +if __name__ == '__main__':
 | 
											
												
													
														|  | 
 |  | +    timer_check()
 |