""" @author: luojunhui """ import time import asyncio import datetime import traceback import threading from concurrent.futures import ThreadPoolExecutor, as_completed from tasks.history_task import historyContentIdTask from applications.db import AsyncMySQLClient from applications.log import logging from applications.feishu import bot from applications.const import HistoryContentIdTaskConst async def do_job(publish_flag): """ main job :return: """ async_mysql_pool = AsyncMySQLClient() try: await async_mysql_pool.init_pool() logging( code="history0001", info="Init MySQL pool successfully", alg="historyContentIdTask", function="main" ) except Exception as e: logging( code="history0002", info="Init MySQL pool failed", alg="historyContentIdTask", function="main", data={ "error": str(e), "traceback": traceback.format_exc() } ) bot( title="historyContentIdTask INIT MYSQL FAILS", detail={ "error": str(e), "traceback": traceback.format_exc() } ) try: history_content_id_task = historyContentIdTask(async_mysql_pool, publish_flag) except Exception as e: logging( code="history0003", info="Init historyContentIdTask failed", alg="historyContentIdTask", function="main", data={ "error": str(e), "traceback": traceback.format_exc() } ) return await history_content_id_task.deal() def run_thread(publish_flag, sleep_seconds, stop_flag): """ 执行进程 """ while not stop_flag.is_set(): loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) loop.run_until_complete(do_job(publish_flag=publish_flag)) loop.close() now_str = datetime.datetime.now().__str__() logging( code="history0004", info="History task finished", function="run_thread" ) print("Task{}--{} 请求执行完成, 等待{}s".format(publish_flag, now_str, sleep_seconds)) time.sleep(sleep_seconds) def main(): """ main function """ const = HistoryContentIdTaskConst() stop_event = threading.Event() # 启动两个线程,分别执行两个函数 with ThreadPoolExecutor(max_workers=2) as executor: futures = [ executor.submit(run_thread, const.NEED_PUBLISH, const.NEED_PUBLISH_WAIT_TIME, stop_event), executor.submit(run_thread, const.DO_NOT_NEED_PUBLISH, const.DO_NOT_NEED_PUBLISH_WAIT_TIME, stop_event) ] try: for future in as_completed(futures): future.result() except KeyboardInterrupt: print("Stopping all threads...") stop_event.set() if __name__ == '__main__': main()