app.py 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126
  1. import random
  2. import os
  3. import logging
  4. import json
  5. import time
  6. import traceback
  7. import ast
  8. from gevent import monkey
  9. monkey.patch_all()
  10. from flask import Flask, request
  11. from mq_http_sdk.mq_exception import MQExceptionBase
  12. from mq_http_sdk.mq_producer import *
  13. from mq_http_sdk.mq_client import *
  14. from gpt_process import title_generate
  15. from log import Log
  16. from config import set_config
  17. from db_helper import RedisHelper
  18. from gevent.pywsgi import WSGIServer
  19. from multiprocessing import cpu_count, Process
  20. # from werkzeug.middleware.profiler import ProfilerMiddleware
  21. # from geventwebsocket.handler import WebSocketHandler
  22. app = Flask(__name__)
  23. log_ = Log()
  24. config_ = set_config()
  25. def title_generate_main():
  26. # 初始化client
  27. mq_client = MQClient(
  28. # 设置HTTP协议客户端接入点
  29. host=config_.MQ_CONFIG['ENDPOINT'],
  30. # AccessKey ID,阿里云身份验证标识
  31. access_id=config_.MQ_CONFIG['ACCESS_KEY'],
  32. # AccessKey Secret,阿里云身份验证密钥
  33. access_key=config_.MQ_CONFIG['SECRET_KEY']
  34. )
  35. # Topic所属的实例ID,在消息队列RocketMQ版控制台创建。
  36. # 若实例有命名空间,则实例ID必须传入;若实例无命名空间,则实例ID传入空字符串。实例的命名空间可以在消息队列RocketMQ版控制台的实例详情页面查看。
  37. instance_id = config_.MQ_CONFIG['INSTANCE_ID']
  38. # 监听消息所属的Topic
  39. todo_topic_name = config_.MQ_TOPIC_CONFIG['asr_title_todo']['topic_name']
  40. # 您在消息队列RocketMQ版控制台创建的Group ID。
  41. group_id = config_.MQ_TOPIC_CONFIG['asr_title_todo']['group_id']
  42. consumer = mq_client.get_consumer(instance_id, todo_topic_name, group_id)
  43. # 长轮询表示如果Topic没有消息,则客户端请求会在服务端挂起3秒,3秒内如果有消息可以消费则立即返回响应。
  44. # 长轮询时间3秒(最多可设置为30秒)。
  45. wait_seconds = 3
  46. # 一次最多消费1条(最多可设置为16条)。
  47. batch = 1
  48. print(("%sConsume And Ak Message From Topic%s\nTopicName:%s\nMQConsumer:%s\nWaitSeconds:%s\n" \
  49. % (10 * "=", 10 * "=", todo_topic_name, group_id, wait_seconds)))
  50. while True:
  51. receipt_handle_list = []
  52. try:
  53. # 长轮询消费消息。
  54. recv_msgs = consumer.consume_message(batch, wait_seconds)
  55. for msg in recv_msgs:
  56. print(("Receive, MessageId: %s\nMessageBodyMD5: %s \
  57. \nMessageTag: %s\nConsumedTimes: %s \
  58. \nPublishTime: %s\nBody: %s \
  59. \nNextConsumeTime: %s \
  60. \nReceiptHandle: %s \
  61. \nProperties: %s\n" % \
  62. (msg.message_id, msg.message_body_md5,
  63. msg.message_tag, msg.consumed_times,
  64. msg.publish_time, msg.message_body,
  65. msg.next_consume_time, msg.receipt_handle, msg.properties)))
  66. video_id = msg.message_body['videoId']
  67. video_path = msg.message_body['videoPath']
  68. try:
  69. title = title_generate(video_id=video_id, video_path=video_path)
  70. except ConnectionResetError:
  71. # API限流
  72. log_.info(video_id)
  73. # 记录重试次数
  74. key_name = f"{config_.TITLE_GENERATE_RETRY_KEY_NAME_PREFIX}{video_id}"
  75. redis_helper = RedisHelper()
  76. redis_helper.setnx_key(key_name=key_name, value=0, expire_time=2*3600)
  77. redis_helper.incr_key(key_name=key_name, amount=1, expire_time=2*3600)
  78. # 判断已重试次数
  79. retry_count = redis_helper.get_data_from_redis(key_name=key_name)
  80. if retry_count is not None and retry_count == config_.RETRY_MAX_COUNT:
  81. # 确认消息消费成功
  82. receipt_handle_list.append(msg.receipt_handle)
  83. pass
  84. else:
  85. pass
  86. except Exception:
  87. # 确认消息消费成功
  88. receipt_handle_list.append(msg.receipt_handle)
  89. log_.info(traceback.format_exc())
  90. else:
  91. # 1. 发送结果至done消息队列
  92. print(title)
  93. # 2. 确认消息消费成功
  94. receipt_handle_list.append(msg.receipt_handle)
  95. except MQExceptionBase as e:
  96. # Topic中没有消息可消费。
  97. if e.type == "MessageNotExist":
  98. print(("No new message! RequestId: %s" % e.req_id))
  99. continue
  100. print(("Consume Message Fail! Exception:%s\n" % e))
  101. time.sleep(2)
  102. continue
  103. # msg.next_consume_time前若不确认消息消费成功,则消息会被重复消费。
  104. # 消息句柄有时间戳,同一条消息每次消费拿到的都不一样。
  105. try:
  106. consumer.ack_message(receipt_handle_list)
  107. print(("Ak %s Message Succeed.\n\n" % len(receipt_handle_list)))
  108. except MQExceptionBase as e:
  109. print(("\nAk Message Fail! Exception:%s" % e))
  110. # 某些消息的句柄可能超时,会导致消息消费状态确认不成功。
  111. if e.sub_errors:
  112. for sub_error in e.sub_errors:
  113. print(("\tErrorHandle:%s,ErrorCode:%s,ErrorMsg:%s" % \
  114. (sub_error["ReceiptHandle"], sub_error["ErrorCode"], sub_error["ErrorMessage"])))
  115. if __name__ == '__main__':
  116. title_generate_main()