|
@@ -5,7 +5,7 @@ import traceback
|
|
|
import odps
|
|
|
from odps import ODPS
|
|
|
import json
|
|
|
-from threading import Timer
|
|
|
+import time
|
|
|
from datetime import datetime, timedelta
|
|
|
from db_helper import MysqlHelper
|
|
|
from my_utils import check_table_partition_exits_v2, get_dataframe_from_odps, \
|
|
@@ -321,7 +321,7 @@ def build_and_transfer_data(run_dt, run_hour, project, **kwargs):
|
|
|
rows = mysql.execute(sql)
|
|
|
|
|
|
|
|
|
-def main_loop():
|
|
|
+def main():
|
|
|
argparser = ArgumentParser()
|
|
|
argparser.add_argument('-n', '--dry-run', action='store_true')
|
|
|
argparser.add_argument('--run-at',help='assume to run at date and hour, yyyyMMddHH')
|
|
@@ -340,19 +340,20 @@ def main_loop():
|
|
|
last_dt = last_date.strftime("%Y%m%d")
|
|
|
# 查看当前天级更新的数据是否已准备好
|
|
|
# 当前上游统计表为天级更新,但字段设计为兼容小时级
|
|
|
- h_data_count = check_data_partition(ODS_PROJECT, GH_REPLY_STATS_TABLE, last_dt, '00')
|
|
|
- if h_data_count > 0:
|
|
|
- LOGGER.info('上游数据表查询数据条数={},开始计算'.format(h_data_count))
|
|
|
- run_dt = run_date.strftime("%Y%m%d")
|
|
|
- run_hour = run_date.strftime("%H")
|
|
|
- LOGGER.info(f'run_dt: {run_dt}, run_hour: {run_hour}')
|
|
|
- build_and_transfer_data(run_dt, run_hour, ODS_PROJECT,
|
|
|
- dry_run=args.dry_run)
|
|
|
- LOGGER.info('数据更新完成')
|
|
|
- else:
|
|
|
- LOGGER.info("上游数据未就绪,等待60s")
|
|
|
- Timer(60, main_loop).start()
|
|
|
- return
|
|
|
+ while True:
|
|
|
+ h_data_count = check_data_partition(ODS_PROJECT, GH_REPLY_STATS_TABLE, last_dt, '00')
|
|
|
+ if h_data_count > 0:
|
|
|
+ LOGGER.info('上游数据表查询数据条数={},开始计算'.format(h_data_count))
|
|
|
+ run_dt = run_date.strftime("%Y%m%d")
|
|
|
+ run_hour = run_date.strftime("%H")
|
|
|
+ LOGGER.info(f'run_dt: {run_dt}, run_hour: {run_hour}')
|
|
|
+ build_and_transfer_data(run_dt, run_hour, ODS_PROJECT,
|
|
|
+ dry_run=args.dry_run)
|
|
|
+ LOGGER.info('数据更新完成')
|
|
|
+ return
|
|
|
+ else:
|
|
|
+ LOGGER.info("上游数据未就绪,等待60s")
|
|
|
+ time.sleep(60)
|
|
|
except Exception as e:
|
|
|
LOGGER.error(f"数据更新失败, exception: {e}, traceback: {traceback.format_exc()}")
|
|
|
if CONFIG.ENV_TEXT == '开发环境':
|
|
@@ -369,4 +370,4 @@ def main_loop():
|
|
|
if __name__ == '__main__':
|
|
|
LOGGER.info("%s 开始执行" % os.path.basename(__file__))
|
|
|
LOGGER.info(f"environment: {CONFIG.ENV_TEXT}")
|
|
|
- main_loop()
|
|
|
+ main()
|