import os import time import datetime import requests from applications.db import DatabaseConnector def get_status_field_by_process(process: str) -> tuple[str, str]: match process: case "upload": status = "upload_status" update_timestamp = "upload_status_ts" case "understanding": status = "understanding_status" update_timestamp = "understanding_status_ts" case "summary": status = "summary_status" update_timestamp = "summary_status_ts" case "rewrite": status = "rewrite_status" update_timestamp = "rewrite_status_ts" case _: raise ValueError(f"Unexpected task: {process}") return status, update_timestamp def roll_back_lock_tasks( db_client: DatabaseConnector, process: str, max_process_time: int, init_status: int, processing_status: int ) -> int: """ rollback tasks which have been locked for a long time """ status, update_timestamp = get_status_field_by_process(process) now_timestamp = int(time.time()) timestamp_threshold = now_timestamp - max_process_time update_query = f""" update video_content_understanding set {status} = %s where {status} = %s and {update_timestamp} < %s; """ rollback_rows = db_client.save( query=update_query, params=(init_status, processing_status, timestamp_threshold) ) return rollback_rows def download_file(task_id, oss_path): """ 下载视频文件 """ video_url = "https://rescdn.yishihui.com/" + oss_path file_name = "static/{}.mp4".format(task_id) if os.path.exists(file_name): return file_name proxies = {"http": None, "https": None} with open(file_name, "wb") as f: response = requests.get(video_url, proxies=proxies) f.write(response.content) return file_name def generate_summary_prompt(text): prompt = f""" 你是1个优秀的公众号文章写作大师,我对你有以下要求 视频总结:{text} 第一个要求:请仔细阅读以上视频总结,挑选其中最吸引人的情节或话题,总结为100字左右文章精彩总结(字数计算包括标点符号),这部分内容为段落1。 句子段落之间以悬念承接,可以吸引读者往下读第二句。 第二个要求:在这100字内容的结尾处,增加1-2句话的引导,引导大家去观看上面的视频了解详情,可以加一些emoji表情。注意是点击上面的视频,不是下面的视频。这部分内容为段落2。 你最终输出一段总结内容,将第一段和第二段之间空格一行。不用加标题或者主题,也不用写第几段、多少字这样的话。整体的语言风格要口语化、直接点,要让60岁以上的老年人能看懂、能共情。人的名字尽量用全名,不用简称。 """ return prompt def update_task_queue_status( db_client: DatabaseConnector, task_id: int, process: str, ori_status: int, new_status: int) -> int: # update task queue status status, update_timestamp = get_status_field_by_process(process) update_query = f""" update video_content_understanding set {status} = %s, {update_timestamp} = %s where {status} = %s and id = %s; """ roll_back_rows = db_client.save( query=update_query, params=( new_status, datetime.datetime.now(), ori_status, task_id, ), ) return roll_back_rows def update_video_pool_status( db_client: DatabaseConnector, content_trace_id: str, ori_status: int, new_status: int) -> int: # update publish_single_source_status update_query = f""" update publish_single_video_source set status = %s where content_trace_id = %s and status = %s """ affected_rows = db_client.save( query=update_query, params=(new_status, content_trace_id, ori_status) ) return affected_rows