import os import time import datetime import requests def get_status_field_by_process(process): match process: case "upload": status = "upload_status" update_timestamp = "upload_status_ts" case "understanding": status = "understanding_status" update_timestamp = "understanding_status_ts" case "summary": status = "summary_status" update_timestamp = "summary_status_ts" case "rewrite": status = "rewrite_status" update_timestamp = "rewrite_status_ts" case _: raise ValueError(f"Unexpected task: {process}") return status, update_timestamp def roll_back_lock_tasks( db_client, process, max_process_time, init_status, processing_status ) -> int: """ rollback tasks which have been locked for a long time """ status, update_timestamp = get_status_field_by_process(process) now_timestamp = int(time.time()) timestamp_threshold = now_timestamp - max_process_time update_query = f""" update video_content_understanding set {status} = %s where {status} = %s and {update_timestamp} < %s; """ rollback_rows = db_client.save( query=update_query, params=(init_status, processing_status, timestamp_threshold) ) return rollback_rows def download_file(task_id, oss_path): """ 下载视频文件 """ video_url = "https://rescdn.yishihui.com/" + oss_path file_name = "static/{}.mp4".format(task_id) if os.path.exists(file_name): return file_name proxies = {"http": None, "https": None} with open(file_name, "wb") as f: response = requests.get(video_url, proxies=proxies) f.write(response.content) return file_name def generate_summary_prompt(text): prompt = f""" 你是1个优秀的公众号文章写作大师,我对你有以下要求 视频总结:{text} 第一个要求:请仔细阅读以上视频总结,挑选其中最吸引人的情节或话题,总结为100字左右文章精彩总结(字数计算包括标点符号),这部分内容为段落1。 句子段落之间以悬念承接,可以吸引读者往下读第二句。 第二个要求:在这100字内容的结尾处,增加1-2句话的引导,引导大家去观看上面的视频了解详情,可以加一些emoji表情。注意是点击上面的视频,不是下面的视频。这部分内容为段落2。 你最终输出一段总结内容,将第一段和第二段之间空格一行。不用加标题或者主题,也不用写第几段、多少字这样的话。整体的语言风格要口语化、直接点,要让60岁以上的老年人能看懂、能共情。人的名字尽量用全名,不用简称。 """ return prompt def update_task_queue_status(db_client, task_id, process, ori_status, new_status): """ 回滚长时间处于处理中的任务 """ status, update_timestamp = get_status_field_by_process(process) update_query = f""" update video_content_understanding set {status} = %s, {update_timestamp} = %s where {status} = %s and id = %s; """ roll_back_rows = db_client.save( query=update_query, params=( new_status, datetime.datetime.now(), ori_status, task_id, ), ) return roll_back_rows def update_video_pool_status(db_client, content_trace_id, ori_status, new_status): """ 回滚长时间处于处理中的任务 """ update_sql = f""" update publish_single_video_source set status = %s where content_trace_id = %s and status = %s; """ # update publish_single_source_status update_query = f""" update publish_single_video_source set status = %s where content_trace_id = %s and status = %s """ affected_rows = db_client.save( query=update_query, params=(new_status, content_trace_id, ori_status) ) return affected_rows