123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109 |
- """
- @author: luojunhui
- """
- import time
- import asyncio
- import datetime
- import traceback
- import threading
- from concurrent.futures import ThreadPoolExecutor, as_completed
- from tasks.history_task import historyContentIdTask
- from applications.db import AsyncMySQLClient
- from applications.log import logging
- from applications.feishu import bot
- from applications.config.const import HistoryContentIdTaskConst
- async def do_job(publish_flag):
- """
- main job
- :return:
- """
- async_mysql_pool = AsyncMySQLClient()
- try:
- await async_mysql_pool.init_pool()
- logging(
- code="history0001",
- info="Init MySQL pool successfully",
- alg="historyContentIdTask",
- function="main"
- )
- except Exception as e:
- logging(
- code="history0002",
- info="Init MySQL pool failed",
- alg="historyContentIdTask",
- function="main",
- data={
- "error": str(e),
- "traceback": traceback.format_exc()
- }
- )
- bot(
- title="historyContentIdTask INIT MYSQL FAILS",
- detail={
- "error": str(e),
- "traceback": traceback.format_exc()
- }
- )
- try:
- history_content_id_task = historyContentIdTask(async_mysql_pool, publish_flag)
- except Exception as e:
- logging(
- code="history0003",
- info="Init historyContentIdTask failed",
- alg="historyContentIdTask",
- function="main",
- data={
- "error": str(e),
- "traceback": traceback.format_exc()
- }
- )
- return
- await history_content_id_task.deal()
- def run_thread(publish_flag, sleep_seconds, stop_flag):
- """
- 执行进程
- """
- while not stop_flag.is_set():
- loop = asyncio.new_event_loop()
- asyncio.set_event_loop(loop)
- loop.run_until_complete(do_job(publish_flag=publish_flag))
- loop.close()
- now_str = datetime.datetime.now().__str__()
- logging(
- code="history0004",
- info="History task finished",
- function="run_thread"
- )
- print("Task{}--{} 请求执行完成, 等待{}s".format(publish_flag, now_str, sleep_seconds))
- time.sleep(sleep_seconds)
- def main():
- """
- main function
- """
- const = HistoryContentIdTaskConst()
- stop_event = threading.Event()
- # 启动两个线程,分别执行两个函数
- with ThreadPoolExecutor(max_workers=2) as executor:
- futures = [
- executor.submit(run_thread, const.NEED_PUBLISH, const.NEED_PUBLISH_WAIT_TIME, stop_event),
- executor.submit(run_thread, const.DO_NOT_NEED_PUBLISH, const.DO_NOT_NEED_PUBLISH_WAIT_TIME, stop_event)
- ]
- try:
- for future in as_completed(futures):
- future.result()
- except KeyboardInterrupt:
- print("Stopping all threads...")
- stop_event.set()
- if __name__ == '__main__':
- main()
|