app.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336
  1. import random
  2. import os
  3. import logging
  4. import json
  5. import threading
  6. import time
  7. import traceback
  8. import ast
  9. from gevent import monkey
  10. monkey.patch_all()
  11. from flask import Flask, request
  12. from mq_http_sdk.mq_exception import MQExceptionBase
  13. from mq_http_sdk.mq_producer import *
  14. from mq_http_sdk.mq_client import *
  15. from gpt_process import title_generate
  16. from log import Log
  17. from config import set_config
  18. from db_helper import RedisHelper
  19. from gevent.pywsgi import WSGIServer
  20. from multiprocessing import cpu_count, Process
  21. log_ = Log()
  22. config_ = set_config()
  23. def title_generate_main0():
  24. # log_.info("debug: title_generate_main")
  25. # 初始化client
  26. mq_client = MQClient(
  27. # 设置HTTP协议客户端接入点
  28. host=config_.MQ_CONFIG['ENDPOINT'],
  29. # AccessKey ID,阿里云身份验证标识
  30. access_id=config_.MQ_CONFIG['ACCESS_KEY'],
  31. # AccessKey Secret,阿里云身份验证密钥
  32. access_key=config_.MQ_CONFIG['SECRET_KEY']
  33. )
  34. # Topic所属的实例ID,在消息队列RocketMQ版控制台创建。
  35. # 若实例有命名空间,则实例ID必须传入;若实例无命名空间,则实例ID传入空字符串。实例的命名空间可以在消息队列RocketMQ版控制台的实例详情页面查看。
  36. instance_id = config_.MQ_CONFIG['INSTANCE_ID']
  37. # 监听消息所属的Topic
  38. consumer_topic_name = config_.MQ_TOPIC_CONFIG['asr_title_todo']['topic_name']
  39. # 您在消息队列RocketMQ版控制台创建的Group ID。
  40. group_id = config_.MQ_TOPIC_CONFIG['asr_title_todo']['group_id']
  41. consumer = mq_client.get_consumer(instance_id, consumer_topic_name, group_id)
  42. # 长轮询表示如果Topic没有消息,则客户端请求会在服务端挂起3秒,3秒内如果有消息可以消费则立即返回响应。
  43. # 长轮询时间3秒(最多可设置为30秒)。
  44. wait_seconds = 3
  45. # 一次最多消费1条(最多可设置为16条)。
  46. batch = 1
  47. print(("%sConsume And Ak Message From Topic%s\nTopicName:%s\nMQConsumer:%s\nWaitSeconds:%s\n" \
  48. % (10 * "=", 10 * "=", consumer_topic_name, group_id, wait_seconds)))
  49. while True:
  50. receipt_handle_list = []
  51. try:
  52. # 长轮询消费消息。
  53. start_time0 = time.time()
  54. recv_msgs = consumer.consume_message(batch, wait_seconds)
  55. for msg in recv_msgs:
  56. start_time = time.time()
  57. receive_info = {
  58. 'messageId': msg.message_id,
  59. 'messageBodyMD5': msg.message_body_md5,
  60. 'messageTag': msg.message_tag,
  61. 'consumedTimes': msg.consumed_times,
  62. 'publishTime': msg.publish_time,
  63. 'body': msg.message_body,
  64. 'nextConsumeTime': msg.next_consume_time,
  65. 'receiptHandle': msg.receipt_handle,
  66. 'properties': msg.properties
  67. }
  68. log_.info(receive_info)
  69. # log_.info(f"debug: {receive_info}")
  70. message_body = json.loads(msg.message_body)
  71. video_id = message_body['videoId']
  72. video_path = message_body['videoPath']
  73. try:
  74. # ppp = {'videoId': video_id, 'videoPath': video_path}
  75. # log_.info(f"debug: {ppp}")
  76. title = title_generate(video_id=video_id, video_path=video_path)
  77. if title[0] in ['"', "'"] and title[-1] in ['"', "'"]:
  78. title = title[1:-1]
  79. log_.info({'titleGenerate': {'videoId': video_id, 'title': title}})
  80. except ConnectionResetError:
  81. # API限流
  82. # 记录重试次数
  83. key_name = f"{config_.TITLE_GENERATE_RETRY_KEY_NAME_PREFIX}{video_id}"
  84. redis_helper = RedisHelper()
  85. redis_helper.setnx_key(key_name=key_name, value=0, expire_time=2*3600)
  86. redis_helper.incr_key(key_name=key_name, amount=1, expire_time=2*3600)
  87. # 判断已重试次数
  88. retry_count = redis_helper.get_data_from_redis(key_name=key_name)
  89. log_.error({'errorType': 'ConnectionResetError', 'videoId': video_id, 'retryCount': retry_count})
  90. if retry_count is not None and retry_count == config_.RETRY_MAX_COUNT:
  91. # 添加消息handle到ack_message列表
  92. receipt_handle_list.append(msg.receipt_handle)
  93. pass
  94. except Exception:
  95. # 添加消息handle到ack_message列表
  96. receipt_handle_list.append(msg.receipt_handle)
  97. log_.error({'videoId': video_id, 'traceback': traceback.format_exc()})
  98. else:
  99. # 1. 发送结果至done消息队列
  100. try:
  101. msg_content = {
  102. 'title': title,
  103. 'videoId': video_id
  104. }
  105. message_body = json.dumps(msg_content)
  106. producer_topic_name = config_.MQ_TOPIC_CONFIG['asr_title_done']['topic_name']
  107. producer = mq_client.get_producer(instance_id=instance_id, topic_name=producer_topic_name)
  108. produce_msg = TopicMessage(message_body=message_body)
  109. # 设置消息的Key。
  110. produce_msg.set_message_key(f"videoId{video_id}")
  111. re_msg = producer.publish_message(produce_msg)
  112. log_.info({
  113. 'publish': {
  114. 'status': 'success',
  115. 'messageID': re_msg.message_id,
  116. 'bodyMD5': re_msg.message_body_md5,
  117. 'messageContent': msg_content,
  118. 'executeTime': (time.time() - start_time) * 1000
  119. }
  120. })
  121. except MQExceptionBase as publish_e:
  122. log_.error({
  123. 'errorType': 'publish',
  124. 'status': 'fail',
  125. 'videoId': video_id,
  126. 'exception': publish_e,
  127. 'traceback': traceback.format_exc()
  128. })
  129. if publish_e.type == "TopicNotExist":
  130. sys.exit(1)
  131. # 2. 添加消息handle到ack_message列表
  132. receipt_handle_list.append(msg.receipt_handle)
  133. # msg.next_consume_time前若不确认消息消费成功,则消息会被重复消费。
  134. # 消息句柄有时间戳,同一条消息每次消费拿到的都不一样。
  135. # log_.info(f"receipt_handle_list: {receipt_handle_list}")
  136. consumer.ack_message(receipt_handle_list)
  137. log_.info({
  138. 'ackMessage': {
  139. 'status': 'success',
  140. 'receiptHandleList': receipt_handle_list,
  141. 'executeTime': (time.time() - start_time0) * 1000
  142. }
  143. })
  144. # except MQExceptionBase as consume_e:
  145. # log_.error({
  146. # 'errorType': 'consume',
  147. # 'status': 'fail',
  148. # 'exception': consume_e,
  149. # 'traceback': traceback.format_exc()
  150. # })
  151. # # Topic中没有消息可消费。
  152. # if consume_e.type == "MessageNotExist":
  153. # print(("No new message! RequestId: %s" % consume_e.req_id))
  154. # continue
  155. # # 某些消息的句柄可能超时,会导致消息消费状态确认不成功。
  156. # elif consume_e.sub_errors:
  157. # for sub_error in consume_e.sub_errors:
  158. # print(("\tErrorHandle:%s,ErrorCode:%s,ErrorMsg:%s" % \
  159. # (sub_error["ReceiptHandle"], sub_error["ErrorCode"], sub_error["ErrorMessage"])))
  160. #
  161. # log_.error("Consume Message Fail! Exception:%s\n" % consume_e)
  162. # time.sleep(30)
  163. # continue
  164. except Exception as consume_e:
  165. log_.error({
  166. 'errorType': 'consume',
  167. 'status': 'fail',
  168. 'exception': consume_e,
  169. 'traceback': traceback.format_exc()
  170. })
  171. # log_.error("Consume Message Fail! Exception:%s\n" % e)
  172. time.sleep(30)
  173. continue
  174. def title_generate_main():
  175. # log_.info("debug: title_generate_main")
  176. # 初始化client
  177. mq_client = MQClient(
  178. # 设置HTTP协议客户端接入点
  179. host=config_.MQ_CONFIG['ENDPOINT'],
  180. # AccessKey ID,阿里云身份验证标识
  181. access_id=config_.MQ_CONFIG['ACCESS_KEY'],
  182. # AccessKey Secret,阿里云身份验证密钥
  183. access_key=config_.MQ_CONFIG['SECRET_KEY']
  184. )
  185. # Topic所属的实例ID,在消息队列RocketMQ版控制台创建。
  186. # 若实例有命名空间,则实例ID必须传入;若实例无命名空间,则实例ID传入空字符串。实例的命名空间可以在消息队列RocketMQ版控制台的实例详情页面查看。
  187. instance_id = config_.MQ_CONFIG['INSTANCE_ID']
  188. # 监听消息所属的Topic
  189. consumer_topic_name = config_.MQ_TOPIC_CONFIG['asr_title_todo']['topic_name']
  190. # 您在消息队列RocketMQ版控制台创建的Group ID。
  191. group_id = config_.MQ_TOPIC_CONFIG['asr_title_todo']['group_id']
  192. consumer = mq_client.get_consumer(instance_id, consumer_topic_name, group_id)
  193. # 长轮询表示如果Topic没有消息,则客户端请求会在服务端挂起3秒,3秒内如果有消息可以消费则立即返回响应。
  194. # 长轮询时间3秒(最多可设置为30秒)。
  195. wait_seconds = 3
  196. # 一次最多消费1条(最多可设置为16条)。
  197. batch = 1
  198. print(("%sConsume And Ak Message From Topic%s\nTopicName:%s\nMQConsumer:%s\nWaitSeconds:%s\n" \
  199. % (10 * "=", 10 * "=", consumer_topic_name, group_id, wait_seconds)))
  200. while True:
  201. receipt_handle_list = []
  202. try:
  203. # 长轮询消费消息。
  204. start_time0 = time.time()
  205. recv_msgs = consumer.consume_message(batch, wait_seconds)
  206. for msg in recv_msgs:
  207. start_time = time.time()
  208. receive_info = {
  209. 'messageId': msg.message_id,
  210. 'messageBodyMD5': msg.message_body_md5,
  211. 'messageTag': msg.message_tag,
  212. 'consumedTimes': msg.consumed_times,
  213. 'publishTime': msg.publish_time,
  214. 'body': msg.message_body,
  215. 'nextConsumeTime': msg.next_consume_time,
  216. 'receiptHandle': msg.receipt_handle,
  217. 'properties': msg.properties
  218. }
  219. log_.info(receive_info)
  220. message_body = json.loads(msg.message_body)
  221. video_id = message_body['videoId']
  222. video_path = message_body['videoPath']
  223. # 1. 添加消息handle到ack_message列表
  224. receipt_handle_list.append(msg.receipt_handle)
  225. # msg.next_consume_time前若不确认消息消费成功,则消息会被重复消费。
  226. # 消息句柄有时间戳,同一条消息每次消费拿到的都不一样。
  227. consumer.ack_message(receipt_handle_list)
  228. log_.info({
  229. 'ackMessage': {
  230. 'status': 'success',
  231. 'receiptHandleList': receipt_handle_list,
  232. 'executeTime': (time.time() - start_time0) * 1000
  233. }
  234. })
  235. # 2. 获取标题
  236. title, generate_filepath = title_generate(video_id=video_id, video_path=video_path)
  237. log_.info({'titleGenerateInitial': {'videoId': video_id, 'title': title}})
  238. # 删除相关文件
  239. for _, filepath in generate_filepath.items():
  240. try:
  241. os.remove(filepath)
  242. except:
  243. continue
  244. if title is None:
  245. continue
  246. elif title[0] in ['"', "'", "′", "“"] and title[-1] in ['"', "'", "′", "”"]:
  247. title = title[1:-1]
  248. log_.info({'titleGenerate': {'videoId': video_id, 'title': title}})
  249. # 3. 发送结果至done消息队列
  250. try:
  251. msg_content = {
  252. 'title': title,
  253. 'videoId': video_id
  254. }
  255. message_body = json.dumps(msg_content)
  256. producer_topic_name = config_.MQ_TOPIC_CONFIG['asr_title_done']['topic_name']
  257. producer = mq_client.get_producer(instance_id=instance_id, topic_name=producer_topic_name)
  258. produce_msg = TopicMessage(message_body=message_body)
  259. # 设置消息的Key。
  260. produce_msg.set_message_key(f"videoId-{video_id}")
  261. re_msg = producer.publish_message(produce_msg)
  262. log_.info({
  263. 'publish': {
  264. 'status': 'success',
  265. 'messageID': re_msg.message_id,
  266. 'bodyMD5': re_msg.message_body_md5,
  267. 'messageContent': msg_content,
  268. 'executeTime': (time.time() - start_time) * 1000
  269. }
  270. })
  271. except Exception as publish_e:
  272. log_.error({
  273. 'errorType': 'publish',
  274. 'status': 'fail',
  275. 'videoId': video_id,
  276. 'exception': publish_e,
  277. 'traceback': traceback.format_exc()
  278. })
  279. # if publish_e.type == "TopicNotExist":
  280. # sys.exit(1)
  281. except Exception as consume_e:
  282. log_.error({
  283. 'errorType': 'consume',
  284. 'status': 'fail',
  285. 'exception': consume_e,
  286. 'traceback': traceback.format_exc()
  287. })
  288. # log_.error("Consume Message Fail! Exception:%s\n" % e)
  289. time.sleep(30)
  290. continue
  291. class FlaskApp(Flask):
  292. def __init__(self, *args, **kwargs):
  293. super(FlaskApp, self).__init__(*args, **kwargs)
  294. self._activate_background_job()
  295. def _activate_background_job(self):
  296. # log_.info("debug: _activate_background_job")
  297. def run_job():
  298. # log_.info("debug: run_job")
  299. title_generate_main()
  300. t1 = threading.Thread(target=run_job)
  301. t1.start()
  302. app = FlaskApp(__name__)
  303. @app.route('/healthcheck')
  304. def health_check():
  305. # log_.info('debug: ok!')
  306. return 'ok!'
  307. if __name__ == '__main__':
  308. # title_generate_main()
  309. app.run()