cut_words.py 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181
  1. import datetime
  2. import traceback
  3. import pandas as pd
  4. from odps import ODPS
  5. from threading import Timer
  6. from config import set_config
  7. from log import Log
  8. from utils.utils import get_data_from_odps
  9. from words_func import word_cut
  10. from db_helper import MysqlHelper
# Module-level singletons shared by the functions in this file.
# config_ supplies ODPS_CONFIG and TITLE_DATA below; env is presumably the
# environment name returned by set_config — it is not used in this file's visible code.
config_, env = set_config()
log_ = Log()
# Shared MySQL helper used for every read/write of the word.* tables.
mysql_helper = MysqlHelper()
# ODPS record fields copied into the title DataFrame, in this column order.
features = ['title', 'source']
  15. def check_table_partition_exits(date, project, table, connect_timeout=3000, read_timeout=500000,
  16. pool_maxsize=1000, pool_connections=1000):
  17. """
  18. 判断表中是否存在这个分区
  19. :param date: 日期 type-string '%Y%m%d'
  20. :param project: type-string
  21. :param table: 表名 type-string
  22. :param connect_timeout: 连接超时设置
  23. :param read_timeout: 读取超时设置
  24. :param pool_maxsize:
  25. :param pool_connections:
  26. :return: records
  27. """
  28. odps = ODPS(
  29. access_id=config_.ODPS_CONFIG['ACCESSID'],
  30. secret_access_key=config_.ODPS_CONFIG['ACCESSKEY'],
  31. project=project,
  32. endpoint=config_.ODPS_CONFIG['ENDPOINT'],
  33. connect_timeout=connect_timeout,
  34. read_timeout=read_timeout,
  35. pool_maxsize=pool_maxsize,
  36. pool_connections=pool_connections
  37. )
  38. t = odps.get_table(name=table)
  39. return t.exist_partition(partition_spec=f'dt={date}')
  40. def data_check(project, table, now_date):
  41. """检查数据是否准备好"""
  42. odps = ODPS(
  43. access_id=config_.ODPS_CONFIG['ACCESSID'],
  44. secret_access_key=config_.ODPS_CONFIG['ACCESSKEY'],
  45. project=project,
  46. endpoint=config_.ODPS_CONFIG['ENDPOINT'],
  47. connect_timeout=3000,
  48. read_timeout=500000,
  49. pool_maxsize=1000,
  50. pool_connections=1000
  51. )
  52. try:
  53. dt = datetime.datetime.strftime(now_date, '%Y%m%d')
  54. check_res = check_table_partition_exits(date=dt, project=project, table=table)
  55. if check_res:
  56. sql = f'select * from {project}.{table} where dt = {dt}'
  57. with odps.execute_sql(sql=sql).open_reader() as reader:
  58. data_count = reader.count
  59. else:
  60. data_count = 0
  61. except Exception as e:
  62. data_count = 0
  63. return data_count
  64. def get_title_data(project, table, now_date):
  65. """获取站内外视频标题数据"""
  66. dt = datetime.datetime.strftime(now_date, '%Y%m%d')
  67. records = get_data_from_odps(date=dt, project=project, table=table)
  68. feature_data = []
  69. for record in records:
  70. item = {}
  71. for feature_name in features:
  72. item[feature_name] = record[feature_name]
  73. feature_data.append(item)
  74. feature_df = pd.DataFrame(feature_data)
  75. return feature_df
  76. def update_cut_words_result(text, source, words_list):
  77. """
  78. 分词结果入库
  79. :param text: 原始文本 type-string
  80. :param source: 文本来源 type-int
  81. :param words_list: 分词结果 type-list
  82. :return:
  83. """
  84. # 分词结果拼接成字符串
  85. words = ','.join(words_list)
  86. # 判断原始文本是否已存在
  87. select_sql = f"SELECT id FROM word.cut_words_result WHERE text = '{text}';"
  88. res = mysql_helper.get_data(sql=select_sql)
  89. if res is None or len(res) == 0:
  90. # 不存在,插入
  91. insert_sql = f"insert into word.cut_words_result (text, words, source) values ('{text}', '{words}', {source});"
  92. log_.info(f"insert_sql = {insert_sql}")
  93. mysql_helper.add_data(sql=insert_sql)
  94. else:
  95. # 存在,更新
  96. update_sql = f"""update word.cut_words_result set words = '{words}', source = {source},
  97. update_time = '{datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")}' where id = {res[0][0]};"""
  98. log_.info(f"update_sql = {update_sql}")
  99. mysql_helper.add_data(sql=update_sql)
  100. def update_hot_word(words_list, source):
  101. """
  102. 词入库
  103. :param words_list: 词列表 type-list
  104. :param source: 词来源 type-int
  105. :return:
  106. """
  107. for word in words_list:
  108. if len(word) == 0:
  109. continue
  110. # 判断词是否已存在
  111. select_sql = f"SELECT id, source FROM word.hot_word WHERE word = '{word}';"
  112. res = mysql_helper.get_data(sql=select_sql)
  113. if res is None or len(res) == 0:
  114. # 不存在,插入
  115. insert_sql = f"insert into word.hot_word (word, source) values ('{word}', {source});"
  116. mysql_helper.add_data(sql=insert_sql)
  117. else:
  118. # 存在,更新
  119. if source != res[0][1]:
  120. source = 3
  121. update_sql = \
  122. f"""update word.hot_word set source = {source},
  123. update_time = '{datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")}' where id = {res[0][0]};"""
  124. mysql_helper.add_data(sql=update_sql)
  125. def data_update(project, table, now_date):
  126. """数据更新"""
  127. # 获取站内外视频标题数据
  128. df = get_title_data(project=project, table=table, now_date=now_date)
  129. df['source'] = df['source'].astype(int)
  130. for source in [1, 2]:
  131. df_temp = df[df['source'] == source]
  132. title_list = df_temp['title'].to_list()
  133. log_.info(f"source = {source}, count = {len(title_list)}")
  134. for title in title_list:
  135. log_.info(f"title: {title}")
  136. if len(title) == 0:
  137. return
  138. # 1. 分词
  139. words_list = word_cut(text=title)
  140. log_.info(f"words_list: {words_list}")
  141. # 2. 分词结果入库
  142. update_cut_words_result(text=title, source=source, words_list=words_list)
  143. # 3. 词入库
  144. update_hot_word(words_list=words_list, source=source)
  145. def timer_check():
  146. try:
  147. project = config_.TITLE_DATA['project']
  148. table = config_.TITLE_DATA['table']
  149. now_date = datetime.datetime.today()
  150. log_.info(f"now_date: {datetime.datetime.strftime(now_date, '%Y%m%d')}")
  151. # 查看当天更新的数据是否已准备好
  152. data_count = data_check(project=project, table=table, now_date=now_date)
  153. if data_count > 0:
  154. log_.info(f'data_count = {data_count}')
  155. # 数据准备好,进行更新
  156. data_update(project=project, table=table, now_date=now_date)
  157. log_.info(f"data update end!")
  158. else:
  159. # 数据没准备好,1分钟后重新检查
  160. Timer(60, timer_check).start()
  161. except Exception as e:
  162. log_.error(f"数据更新失败, exception: {e}, traceback: {traceback.format_exc()}")
  163. if __name__ == '__main__':
  164. timer_check()