|
@@ -0,0 +1,181 @@
|
|
|
+import datetime
|
|
|
+import traceback
|
|
|
+import pandas as pd
|
|
|
+from odps import ODPS
|
|
|
+from threading import Timer
|
|
|
+from config import set_config
|
|
|
+from log import Log
|
|
|
+from utils.utils import get_data_from_odps
|
|
|
+from words_func import word_cut
|
|
|
+from db_helper import MysqlHelper
|
|
|
# Load the runtime configuration; `env` is presumably the environment name
# returned alongside it (TODO confirm against config.set_config).
config_, env = set_config()
# Shared logger instance used by every function in this script.
log_ = Log()
# Shared MySQL helper through which the word tables are read and written.
mysql_helper = MysqlHelper()
# Record fields copied from each ODPS row into the title DataFrame.
features = ['title', 'source']
|
|
|
+
|
|
|
+
|
|
|
def check_table_partition_exits(date, project, table, connect_timeout=3000, read_timeout=500000,
                                pool_maxsize=1000, pool_connections=1000):
    """
    Report whether the ``dt=<date>`` partition exists in an ODPS table.

    :param date: partition date, type-string formatted '%Y%m%d'
    :param project: ODPS project name, type-string
    :param table: table name, type-string
    :param connect_timeout: connection timeout handed to the ODPS client
    :param read_timeout: read timeout handed to the ODPS client
    :param pool_maxsize: handed to the ODPS client unchanged
    :param pool_connections: handed to the ODPS client unchanged
    :return: whatever ``exist_partition`` reports for the ``dt=<date>`` spec
    """
    odps_conf = config_.ODPS_CONFIG
    client = ODPS(
        access_id=odps_conf['ACCESSID'],
        secret_access_key=odps_conf['ACCESSKEY'],
        project=project,
        endpoint=odps_conf['ENDPOINT'],
        connect_timeout=connect_timeout,
        read_timeout=read_timeout,
        pool_maxsize=pool_maxsize,
        pool_connections=pool_connections
    )
    partition_spec = f'dt={date}'
    target_table = client.get_table(name=table)
    return target_table.exist_partition(partition_spec=partition_spec)
|
|
|
+
|
|
|
+
|
|
|
def data_check(project, table, now_date):
    """
    Count the records available in the ``dt=<now_date>`` partition of an ODPS table.

    The check is best-effort: any failure while probing the partition or
    counting its rows is logged and reported as 0, so the caller simply treats
    the data as "not ready yet" instead of crashing.

    :param project: ODPS project name, type-string
    :param table: table name, type-string
    :param now_date: datetime whose '%Y%m%d' form selects the partition
    :return: record count of the partition; 0 when the partition does not
             exist or the check failed
    """
    odps = ODPS(
        access_id=config_.ODPS_CONFIG['ACCESSID'],
        secret_access_key=config_.ODPS_CONFIG['ACCESSKEY'],
        project=project,
        endpoint=config_.ODPS_CONFIG['ENDPOINT'],
        connect_timeout=3000,
        read_timeout=500000,
        pool_maxsize=1000,
        pool_connections=1000
    )

    try:
        dt = now_date.strftime('%Y%m%d')
        if not check_table_partition_exits(date=dt, project=project, table=table):
            return 0
        # NOTE(review): dt is interpolated unquoted, as in the original query;
        # confirm this matches the partition column's type.
        sql = f'select * from {project}.{table} where dt = {dt}'
        with odps.execute_sql(sql=sql).open_reader() as reader:
            return reader.count
    except Exception as e:
        # Previously swallowed silently; keep the "return 0" contract but
        # leave a trace so persistent failures are diagnosable.
        log_.error(f"data check failed, exception: {e}, traceback: {traceback.format_exc()}")
        return 0
|
|
|
+
|
|
|
+
|
|
|
def get_title_data(project, table, now_date):
    """
    Load the video-title records for one day into a DataFrame.

    :param project: ODPS project name
    :param table: table name
    :param now_date: datetime whose '%Y%m%d' form selects the data to fetch
    :return: DataFrame with one row per record and exactly the columns listed
             in ``features``; the columns are present even when no records
             were returned
    """
    dt = datetime.datetime.strftime(now_date, '%Y%m%d')
    records = get_data_from_odps(date=dt, project=project, table=table)
    rows = [
        {feature_name: record[feature_name] for feature_name in features}
        for record in records
    ]
    # Passing the column list explicitly keeps the frame well-formed for an
    # empty result; without it, callers indexing df['source'] hit a KeyError.
    return pd.DataFrame(rows, columns=features)
|
|
|
+
|
|
|
+
|
|
|
def update_cut_words_result(text, source, words_list):
    """
    Store the word-segmentation result of a text, inserting or updating as needed.

    :param text: original text, type-string
    :param source: origin of the text, type-int
    :param words_list: segmentation result, type-list of strings
    :return: None
    """
    def _sql_text(value):
        # Escape a value for embedding inside a single-quoted SQL string
        # literal: titles routinely contain quotes/backslashes, which would
        # otherwise break (or inject into) the statement.
        # NOTE(review): backslash doubling assumes the server does not run
        # with NO_BACKSLASH_ESCAPES - confirm, or switch to parameterized
        # queries if MysqlHelper supports them.
        return str(value).replace('\\', '\\\\').replace("'", "''")

    # Join the segmented words into one comma-separated string.
    safe_words = _sql_text(','.join(words_list))
    safe_text = _sql_text(text)

    # Look up whether this text has already been stored.
    select_sql = f"SELECT id FROM word.cut_words_result WHERE text = '{safe_text}';"
    res = mysql_helper.get_data(sql=select_sql)
    if res is None or len(res) == 0:
        # Not stored yet: insert a new row.
        insert_sql = f"insert into word.cut_words_result (text, words, source) values ('{safe_text}', '{safe_words}', {source});"
        log_.info(f"insert_sql = {insert_sql}")
        mysql_helper.add_data(sql=insert_sql)
    else:
        # Already stored: refresh words, source and the update timestamp of
        # the first matching row.
        update_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        update_sql = f"""update word.cut_words_result set words = '{safe_words}', source = {source},
        update_time = '{update_time}' where id = {res[0][0]};"""
        log_.info(f"update_sql = {update_sql}")
        mysql_helper.add_data(sql=update_sql)
|
|
|
+
|
|
|
+
|
|
|
def update_hot_word(words_list, source):
    """
    Store each word of a list in the hot-word table, inserting or updating as needed.

    :param words_list: list of words, type-list of strings
    :param source: origin of the words, type-int
    :return: None
    """
    for word in words_list:
        if not word:
            continue
        # Escape the word for use inside a single-quoted SQL string literal so
        # quotes/backslashes cannot break or inject into the statement.
        # NOTE(review): backslash doubling assumes the server does not run
        # with NO_BACKSLASH_ESCAPES - confirm.
        safe_word = str(word).replace('\\', '\\\\').replace("'", "''")

        # Look up whether this word has already been stored.
        select_sql = f"SELECT id, source FROM word.hot_word WHERE word = '{safe_word}';"
        res = mysql_helper.get_data(sql=select_sql)
        if res is None or len(res) == 0:
            # Not stored yet: insert a new row.
            insert_sql = f"insert into word.hot_word (word, source) values ('{safe_word}', {source});"
            mysql_helper.add_data(sql=insert_sql)
            continue

        # Already stored: when the stored source differs from the incoming
        # one, the row is marked with source 3 (presumably "seen in both
        # sources" - TODO confirm). A per-word local is used so that this
        # decision does not leak into the remaining words of the list, which
        # is what happened when the `source` parameter itself was rebound.
        word_id, stored_source = res[0][0], res[0][1]
        merged_source = source if source == stored_source else 3
        update_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        update_sql = \
            f"""update word.hot_word set source = {merged_source},
            update_time = '{update_time}' where id = {word_id};"""
        mysql_helper.add_data(sql=update_sql)
|
|
|
+
|
|
|
+
|
|
|
def data_update(project, table, now_date):
    """
    Segment every video title of the day and persist the results.

    For each source (1 and 2) the titles are cut into words, the per-title
    segmentation is stored, and every word is stored in the hot-word table.

    :param project: ODPS project name
    :param table: table name
    :param now_date: datetime selecting which day's data to process
    :return: None
    """
    # Fetch the title data for the day.
    df = get_title_data(project=project, table=table, now_date=now_date)
    df['source'] = df['source'].astype(int)
    for source in [1, 2]:
        df_temp = df[df['source'] == source]
        title_list = df_temp['title'].to_list()
        log_.info(f"source = {source}, count = {len(title_list)}")
        for title in title_list:
            log_.info(f"title: {title}")
            # Skip unusable titles (empty or missing) and keep going; the
            # previous `return` here aborted the entire update, including
            # every remaining title and the second source.
            if not isinstance(title, str) or not title:
                continue
            # 1. Segment the title into words.
            words_list = word_cut(text=title)
            log_.info(f"words_list: {words_list}")
            # 2. Persist the segmentation result.
            update_cut_words_result(text=title, source=source, words_list=words_list)
            # 3. Persist the individual words.
            update_hot_word(words_list=words_list, source=source)
|
|
|
+
|
|
|
+
|
|
|
def timer_check():
    """
    Run the daily update once today's data is available.

    When the data check reports no rows, a 60-second Timer is started that
    calls this function again; otherwise the update runs immediately. Any
    exception raised along the way is logged and not re-raised.
    """
    try:
        title_conf = config_.TITLE_DATA
        project = title_conf['project']
        table = title_conf['table']
        now_date = datetime.datetime.today()
        log_.info(f"now_date: {datetime.datetime.strftime(now_date, '%Y%m%d')}")

        # Is today's data ready yet?
        data_count = data_check(project=project, table=table, now_date=now_date)
        if not data_count > 0:
            # Not ready: look again in one minute.
            Timer(60, timer_check).start()
            return

        log_.info(f'data_count = {data_count}')
        # Ready: perform the update.
        data_update(project=project, table=table, now_date=now_date)
        log_.info(f"data update end!")
    except Exception as e:
        log_.error(f"数据更新失败, exception: {e}, traceback: {traceback.format_exc()}")
|
|
|
+
|
|
|
+
|
|
|
# Script entry point: run the first readiness check right away; timer_check
# schedules its own retries while the day's data is not available.
if __name__ == '__main__':
    timer_check()
|