Explorar el Código

Update alg_growth_3rd_gh_reply_video_v1: change gh_detail source to mysql

StrayWarrior hace 4 meses
padre
commit
052795a424
Se han modificado 2 ficheros con 20 adiciones y 10 borrados
  1. 17 8
      alg_growth_3rd_gh_reply_video_v1.py
  2. 3 2
      db_helper.py

+ 17 - 8
alg_growth_3rd_gh_reply_video_v1.py

@@ -6,6 +6,7 @@ import odps
 from odps import ODPS
 import json
 import time
+from pymysql.cursors import DictCursor
 from datetime import datetime, timedelta
 from db_helper import MysqlHelper
 from my_utils import check_table_partition_exits_v2, get_dataframe_from_odps, \
@@ -41,20 +42,27 @@ STATS_PERIOD_DAYS = 5
 SEND_N = 1
 
 def get_and_update_gh_ids(run_dt):
-    gh = get_odps_df_of_max_partition(ODS_PROJECT, GH_DETAIL, {'dt': run_dt})
-    gh = gh.to_pandas()
+    db = MysqlHelper(CONFIG.MYSQL_GROWTH_INFO)
     gh_type = AutoReplyAccountType.EXTERNAL_GZH.value
-    gh = gh.query(f'type == {gh_type} and is_delete == 0')
+    sqlstr = f"""
+        SELECT gh_id, gh_name, category1, category2, channel,
+               video_ids, strategy_status
+        FROM {GH_DETAIL}
+        WHERE is_delete = 0 AND `type` = {gh_type}
+        """
+    account_data = db.get_data(sqlstr, DictCursor)
+    account_df = pd.DataFrame(account_data)
+
     # default单独处理
-    if 'default' not in gh['gh_id'].values:
+    if 'default' not in account_df['gh_id'].values:
         new_row = pd.DataFrame({'gh_id': ['default'], 'gh_name': ['默认'], 'type': [2], 'category1': ['泛生活']},
                                index=[0])
-        gh = pd.concat([gh, new_row], ignore_index=True)
+        account_df = pd.concat([account_df, new_row], ignore_index=True)
 
-    gh = gh.drop_duplicates(subset=['gh_id'])
+    account_df = account_df.drop_duplicates(subset=['gh_id'])
     global GH_IDS
-    GH_IDS = tuple(gh['gh_id'])
-    return gh
+    GH_IDS = tuple(account_df['gh_id'])
+    return account_df
 
 
 def check_data_partition(project, table, data_dt, data_hr=None):
@@ -356,6 +364,7 @@ def main():
                 time.sleep(60)
     except Exception as e:
         LOGGER.error(f"数据更新失败, exception: {e}, traceback: {traceback.format_exc()}")
+        return
         if CONFIG.ENV_TEXT == '开发环境':
             return
         send_msg_to_feishu(

+ 3 - 2
db_helper.py

@@ -339,7 +339,7 @@ class MysqlHelper(object):
         """
         self.mysql_info = mysql_info
 
-    def get_data(self, sql):
+    def get_data(self, sql, cursor_type=None):
         """
         查询数据
         :param sql: sql语句
@@ -348,13 +348,14 @@ class MysqlHelper(object):
         # 连接数据库
         conn = pymysql.connect(**self.mysql_info)
         # 创建游标
-        cursor = conn.cursor()
+        cursor = conn.cursor(cursor_type)
         try:
             # 执行SQL语句
             cursor.execute(sql)
             # 获取查询的所有记录
             data = cursor.fetchall()
         except Exception as e:
+            print(e)
             return None
         # 关闭游标对象
         cursor.close()