Explorar o código

add words_func & update

liqian %!s(int64=2) %!d(string=hai) anos
pai
achega
c24d03a4fa
Modificáronse 3 ficheiros con 128 adicións e 3 borrados
  1. 19 1
      app.py
  2. 2 2
      config.py
  3. 107 0
      words_func.py

+ 19 - 1
app.py

@@ -1,4 +1,5 @@
 import json
+import time
 import traceback
 from gevent import monkey
 monkey.patch_all()
@@ -6,6 +7,7 @@ monkey.patch_all()
 from flask import Flask, request
 from log import Log
 from config import set_config
+from words_func import get_words, update_wechat_score_data
 
 app = Flask(__name__)
 log_ = Log()
@@ -21,9 +23,20 @@ def health_check():
 @app.route('/hot/word/getAllWords', methods=['GET', 'POST'])
 def get_all_words():
     try:
+        start_time = time.time()
         request_data = json.loads(request.get_data())
         page_num = request_data.get('pageNum', 1)
         page_size = request_data.get('pageSize', 100)
+        words = get_words(page_num=page_num, page_size=page_size)
+        result = {'code': 200, 'message': 'success', 'data': {'words': words}}
+        log_message = {
+            'requestUri': '/hot/word/getAllWords',
+            'logTimestamp': int(time.time() * 1000),
+            'result': result,
+            'executeTime': (time.time() - start_time) * 1000
+        }
+        log_.info(log_message)
+        return json.dumps(result)
     except Exception as e:
         log_.error(traceback.format_exc())
         result = {'code': -1, 'message': 'fail'}
@@ -32,10 +45,15 @@ def get_all_words():
 
 # 更新热点词的微信指数
 @app.route('/hot/word/updateWechatScore', methods=['GET', 'POST'])
-def get_all_words():
+def update_wechat_score():
     try:
+        start_time = time.time()
         request_data = json.loads(request.get_data())
         data = request_data.get('data', None)
+        update_wechat_score_data(data=data)
+        result = {'code': 200, 'message': 'success'}
+        log_.info(f"executeTime = {(time.time() - start_time) * 1000}")
+        return json.dumps(result)
     except Exception as e:
         log_.error(traceback.format_exc())
         result = {'code': -1, 'message': 'fail'}

+ 2 - 2
config.py

@@ -137,8 +137,8 @@ class ProductionConfig(BaseConfig):
 
 def set_config():
     # 获取环境变量 ROV_OFFLINE_ENV
-    # env = os.environ.get('ROV_OFFLINE_ENV')
-    env = 'dev'
+    env = os.environ.get('ROV_OFFLINE_ENV')
+    # env = 'dev'
     if env is None:
         # log_.error('ENV ERROR: is None!')
         return

+ 107 - 0
words_func.py

@@ -0,0 +1,107 @@
+import traceback
+
+from db_helper import MysqlHelper
+from log import Log
+
+mysql_helper = MysqlHelper()
+log_ = Log()
+
+
+def get_words(page_num, page_size):
+    """
+    分页获取所有热点词
+    :param page_num: 页码
+    :param page_size: 每页请求条目数
+    :return: words
+    """
+    try:
+        sql = f"select id, word from word.hot_word order by id limit {(page_num-1)*page_size}, {page_size};"
+        data = mysql_helper.get_data(sql=sql)
+        if data is None:
+            return None
+        words = []
+        for id_, word in data:
+            words.append({'id': id_, 'word': word})
+        return words
+    except Exception as e:
+        log_.error(traceback.format_exc())
+        return None
+
+
+def update_wechat_score_data(data):
+    """
+    根据爬取到的微信指数数据更新数据库
+    :param data:
+    :return:
+    """
+    if data is None or len(data) == 0:
+        log_.info(f"无需要更新的数据!")
+        return
+    # 爬取数据解析
+    log_.info(f"data count = {len(data)}")
+    wechat_score_data_list = []
+    for item in data:
+        word_id = item.get('id')
+        word = item.get('word')
+        wechat_scores = item.get('wechatScores')
+        if wechat_scores is None or len(wechat_scores) == 0:
+            continue
+        for score_data in wechat_scores:
+            score = score_data.get('score')
+            score_date = score_data.get('scoreDate')
+            wechat_score_data_list.append({'word_id': word_id, 'word': word, 'score': score, 'score_date': score_date})
+    log_.info(f"wechat_score_data_list count = {len(wechat_score_data_list)}")
+    # update or insert 数据区分
+    update_data = []
+    insert_data = []
+    for wechat_score_data in wechat_score_data_list:
+        select_sql = f"SELECT id FROM word.word_wechat_score " \
+                     f"WHERE word_id = {wechat_score_data['word_id']} " \
+                     f"AND score_date = '{wechat_score_data['score_date']}';"
+        res = mysql_helper.get_data(sql=select_sql)
+        if res is None:
+            continue
+        if len(res) == 0:
+            insert_data.append(wechat_score_data)
+        else:
+            id_ = res[0][0]
+            wechat_score_data['id'] = id_
+            update_data.append(wechat_score_data)
+    log_.info(f"update_data = {len(update_data)}")
+    log_.info(f"insert_data = {len(insert_data)}")
+
+    # 批量插入
+    if len(insert_data) > 0:
+        for i in range(len(insert_data) // 100 + 1):
+            log_.info(f"insert i = {i}")
+            insert_temp_data = insert_data[i * 100:(i + 1) * 100]
+            if len(insert_temp_data) > 0:
+                insert_sql_values = ', '.join([f"({item['word_id']}, {item['score']}, '{item['score_date']}')"
+                                               for item in insert_temp_data])
+                insert_sql = f"insert into word.word_wechat_score (word_id, wechat_score, score_date) " \
+                             f"values {insert_sql_values};"
+                mysql_helper.add_data(sql=insert_sql)
+        log_.info(f"insert wechat score data finished! insert count = {len(insert_data)}")
+
+    # 批量更新
+    if len(update_data) > 0:
+        for i in range(len(update_data) // 100 + 1):
+            log_.info(f"update i = {i}")
+            update_temp_data = update_data[i * 100:(i + 1) * 100]
+            if len(update_temp_data) > 0:
+                update_id = [item['id'] for item in update_temp_data]
+                update_sql_values = ' '.join([f"when {item['id']} then {item['score']}" for item in update_temp_data])
+                if len(update_id) > 1:
+                    update_sql = f"update word.word_wechat_score set wechat_score = " \
+                                 f"case id {update_sql_values} end where id in {tuple(update_id)};"
+                else:
+                    update_sql = f"update word.word_wechat_score set wechat_score = " \
+                                 f"case id {update_sql_values} end where id in ({update_id[0]});"
+                mysql_helper.add_data(sql=update_sql)
+    log_.info(f"update wechat score data finished! update count = {len(update_data)}")
+
+
+if __name__ == '__main__':
+    get_words(8, 100)
+    # get_words(1, 20)
+    # get_words(2, 10)