瀏覽代碼

关键词搜索 功能

zhangyong 1 年之前
父節點
當前提交
bf83bdb07a
共有 9 個文件被更改,包括 362 次插入50 次删除
  1. 21 17
      common/common_log.py
  2. 103 4
      common/feishu_form.py
  3. 23 1
      common/redis.py
  4. 1 1
      common/sql_help.py
  5. 109 0
      data_channel/dy_keyword.py
  6. 1 1
      job_dd_sph.py
  7. 21 0
      job_keyword.py
  8. 16 0
      redis_clear_mark.py
  9. 67 26
      video_rewriting/video_processor.py

+ 21 - 17
common/common_log.py

@@ -27,22 +27,26 @@ class Common:
     # 使用 logger 模块生成日志
     @staticmethod
     def logger(log_type):
-        """
-        使用 logger 模块生成日志
-        """
-        # 日志路径
-        log_dir = f"./logs/{log_type}/"
-        log_path = os.getcwd() + os.sep + log_dir
-        if not os.path.isdir(log_path):
-            os.makedirs(log_path)
-        # 日志文件名
-        log_name = f"{log_type}-{datetime.now().date().strftime('%Y-%m-%d')}.log"
-
-        # 日志不打印到控制台
-        logger.remove(handler_id=None)
-        # 初始化日志
-        logger.add(os.path.join(log_dir, log_name), level="INFO", rotation="00:00", retention="10 days", enqueue=True)
-
-        return logger
+        try:
+            """
+            使用 logger 模块生成日志
+            """
+            # 日志路径
+            log_dir = f"./logs/{log_type}/"
+            log_path = os.getcwd() + os.sep + log_dir
+            if not os.path.isdir(log_path):
+                os.makedirs(log_path)
+            # 日志文件名
+            log_name = f"{log_type}-{datetime.now().date().strftime('%Y-%m-%d')}.log"
+
+            # 日志不打印到控制台
+            logger.remove(handler_id=None)
+            # 初始化日志
+            logger.add(os.path.join(log_dir, log_name), level="INFO", rotation="00:00", retention="10 days", enqueue=True)
+
+            return logger
+        except Exception as e:
+            Common.logger("aly-logger").log(f"阿里云日志上报异常{e}")
+            return None
 
 

+ 103 - 4
common/feishu_form.py

@@ -4,7 +4,8 @@ import os
 import random
 import sys
 import datetime
-
+import itertools
+from collections import defaultdict
 from common.sql_help import sqlCollect
 
 sys.path.append(os.getcwd())
@@ -53,6 +54,107 @@ class Material():
             return user_data_list
         except:
             return user_data_list
+    """
+    list 重新排序
+    """
+    @classmethod
+    def sort_keyword_data(cls, data):
+        data = [json.loads(item) for item in data]
+        # 根据 keyword_name 进行分组
+        groups = defaultdict(list)
+        for item in data:
+            groups[item['keyword_name']].append(item)
+        # 获取所有唯一的 keyword_name 并创建一个轮流顺序
+        unique_names = list(groups.keys())
+        priority_names = [name for name in unique_names if name in unique_names]
+        remaining_names = [name for name in unique_names if name not in priority_names]
+
+        # 将 priority_names 列表进行轮流排序
+        result = []
+        max_length = max(len(groups[name]) for name in priority_names)
+
+        for i in range(max_length):
+            for name in priority_names:
+                if i < len(groups[name]):
+                    result.append(groups[name][i])
+        # 将未列入优先顺序的其余数据添加到结果中
+        for name in remaining_names:
+            result.extend(groups[name])
+
+        # 将结果转回 JSON 字符串列表
+        sorted_list = [json.dumps(item, ensure_ascii=False) for item in result]
+        return sorted_list
+
+    """
+    获取搜索任务
+    """
+    @classmethod
+    def get_keyword_data(cls, feishu_id, feishu_sheet):
+        data = Feishu.get_values_batch(feishu_id, feishu_sheet)
+        processed_list = []
+        for row in data[1:]:
+            try:
+                channel_id = row[1]
+                channel_url = str(row[2])
+                piaoquan_id = row[3]
+                number = row[4]
+                video_share = row[5]
+                video_ending = row[6]
+                voice = row[7]
+                crop_tool = row[8]
+                gg_duration = row[9]
+                title = row[10]
+                if channel_url == None or channel_url == "" or len(channel_url) == 0:
+                    continue
+                first_category = row[12]  # 一级品类
+                secondary_category = row[13]  # 二级品类
+                def count_items(item, separator):
+                    if item and item not in {'None', ''}:
+                        return len(item.split(separator))
+                    return 0
+                video_id_total = count_items(str(channel_url), ',')
+                title_total = count_items(str(title), '/')
+                video_ending_total = count_items(str(video_ending), ',')
+                values = [channel_id, video_id_total, piaoquan_id, video_share, video_ending_total, crop_tool, gg_duration, title_total, first_category]
+                filtered_values = [str(value) for value in values if value is not None and value != "None"]
+                task_mark = "_".join(map(str, filtered_values))
+                keyword_sort = row[14]  # 排序条件
+                keyword_time = row[15]  # 发布时间
+                keyword_duration = row[16]  # 视频时长
+                keyword_name = row[17]  # 负责人
+                keyword_sort_list = keyword_sort.split(',')
+                keyword_duration_list = keyword_duration.split(',')
+                keyword_time_list = keyword_time.split(',')
+                combinations = list(itertools.product(keyword_sort_list, keyword_time_list, keyword_duration_list))
+                if ',' in channel_url:
+                    channel_url = channel_url.split(',')
+                else:
+                    channel_url = [channel_url]
+                for user in channel_url:
+                    for combo in combinations:
+                        number_dict = {
+                            "task_mark": task_mark,
+                            "channel_id": channel_id,
+                            "channel_url": user,
+                            "piaoquan_id": piaoquan_id,
+                            "number": number,
+                            "title": title,
+                            "video_share": video_share,
+                            "video_ending": video_ending,
+                            "crop_total": crop_tool,
+                            "gg_duration_total": gg_duration,
+                            "voice": voice,
+                            "first_category": first_category,  # 一级品类
+                            "secondary_category": secondary_category,  # 二级品类
+                            "combo": combo,  # 搜索条件
+                            "keyword_name": keyword_name  # 品类负责人
+                        }
+                        processed_list.append(json.dumps(number_dict, ensure_ascii=False))
+                processed_list = cls.sort_keyword_data(processed_list)
+                return processed_list
+            except:
+                processed_list = cls.sort_keyword_data(processed_list)
+                return processed_list
 
     """
     获取对应负责人任务明细
@@ -189,6 +291,3 @@ class Material():
             if channel_mask == channel:
                 return cookie
 
-
-
-

+ 23 - 1
common/redis.py

@@ -44,7 +44,29 @@ def get_data(name, data):
             return None
 
         client.rpush(task, *data)
-    return client.lpop(task)
+    ret = client.lpop(task)
+    if name == 'pl-gjc':
+        client.rpush(task, ret)
+    return ret
+
+"""抖音搜索计数插入"""
+def increment_key():
+    helper = SyncRedisHelper()
+    client = helper.get_client()
+    client.incrby('dyss-count', 1)
+
+"""抖音搜索计数获取"""
+def get_first_value_with_prefix():
+    helper = SyncRedisHelper()
+    client = helper.get_client()
+    value = client.get("dyss-count")
+    return int(value) if value is not None else 1
+
+def del_dyss_redis_key():
+    helper = SyncRedisHelper()
+    client = helper.get_client()
+    client.delete("dyss-count")
+
 
 def get_redis_video_data(video_id):
     lock = f"video_lock:{video_id}"

+ 1 - 1
common/sql_help.py

@@ -182,7 +182,7 @@ class sqlCollect():
         """
         获取视频号单点内容
         """
-        sql = f"""select video_id,title,author_id,author_name,cover_url,video_url,video_duration,from_user_id,from_user_name,from_group_id,from_group_name,source,wx_msg, is_encrypted, decode_key  from dandian_content where from_user_name = %s and has_used = 0"""
+        sql = f"""select video_id,title,author_id,author_name,cover_url,video_url,video_duration,from_user_id,from_user_name,from_group_id,from_group_name,source,wx_msg, is_encrypted, decode_key  from dandian_content where from_user_name = %s and has_used = 0 limit 2"""
         data = AigcMysqlHelper.get_values(sql, (url))
         return data
 

+ 109 - 0
data_channel/dy_keyword.py

@@ -0,0 +1,109 @@
+import requests
+import json
+
+from common import Common, AliyunLogger
+from common.sql_help import sqlCollect
+
+
+class DyKeyword:
+    @classmethod
+    def get_key_word(cls, keyword, task_mark, mark, channel_id, name, task):
+        combo = task['combo']
+        content_type = combo[0]
+        publish_time = combo[1]
+        duration = combo[2]
+        share_count_rule = 0
+        special = 0
+        short_duration_rule = 0
+        url = "http://8.217.190.241:8888/crawler/dou_yin/keyword"
+        list = []
+        payload = json.dumps({
+            "keyword": keyword,
+            "content_type": "视频",
+            "sort_type": content_type,
+            "publish_time": publish_time,
+            "duration": duration,
+            "cursor": ""
+        })
+        headers = {
+            'Content-Type': 'application/json'
+        }
+        if " 不限" == publish_time:
+            share_count_rule = 200
+            special = 0.15
+            short_duration_rule = 30
+        elif "一天内" == publish_time:
+            share_count_rule = 0
+            special = 0.10
+            short_duration_rule = 25
+        elif "一周内" == publish_time:
+            share_count_rule = 100
+            special = 0.15
+            short_duration_rule = 25
+        elif "半年内" == publish_time:
+            share_count_rule = 200
+            special = 0.15
+            short_duration_rule = 25
+        try:
+            response = requests.request("POST", url, headers=headers, data=payload)
+            response = response.json()
+            code = response['code']
+            if code != 0:
+                Common.logger("dy-key-word").info(f"抖音搜索词数据获取失败,接口为/dou_yin/keyword\n")
+                return list
+            data = response['data']['data']
+            for i in range(len(data)):
+
+                video_id = data[i].get('aweme_id')  # 文章id
+                status = sqlCollect.is_used(task_mark, video_id, mark, channel_id)
+                video_uri = data[i].get('video', {}).get('play_addr', {}).get('uri')
+                ratio = f'{data[i].get("video", {}).get("height")}p'
+                video_url = f'https://www.iesdouyin.com/aweme/v1/play/?video_id={video_uri}&ratio={ratio}&line=0'  # 视频链接
+                old_title = data[i].get('desc', "").strip().replace("\n", "") \
+                    .replace("/", "").replace("\\", "").replace("\r", "") \
+                    .replace(":", "").replace("*", "").replace("?", "") \
+                    .replace("?", "").replace('"', "").replace("<", "") \
+                    .replace(">", "").replace("|", "").replace(" ", "") \
+                    .replace("&NBSP", "").replace(".", "。").replace(" ", "") \
+                    .replace("'", "").replace("#", "").replace("Merge", "")
+                digg_count = int(data[i].get('statistics').get('digg_count'))  # 点赞
+                share_count = int(data[i].get('statistics').get('share_count'))  # 转发
+                duration = data[i].get('duration')
+                duration = duration / 1000
+                log_data = f"user:{keyword},,video_id:{video_id},,video_url:{video_url},,original_title:{old_title},,share_count:{share_count},,digg_count:{digg_count},,duration:{duration}"
+                AliyunLogger.logging(channel_id, name, keyword, video_id, "扫描到一条视频", "2001", log_data)
+                Common.logger("dy-key-word").info(
+                    f"扫描:{task_mark},搜索词:{keyword},视频id{video_id} ,分享:{share_count},点赞{digg_count}")
+                if status:
+                    AliyunLogger.logging(channel_id, name, keyword, video_id, "该视频已改造过", "2001", log_data)
+                    continue
+                video_percent = '%.2f' % (int(share_count) / int(digg_count))
+                if int(share_count) < share_count_rule:
+                    AliyunLogger.logging(channel_id, name, keyword, video_id, f"不符合规则:分享小于{share_count_rule}", "2003", log_data)
+                    Common.logger("dy-ls").info(
+                        f"不符合规则:{task_mark},用户主页id:{keyword},视频id{video_id} ,分享:{share_count},点赞{digg_count} ,时长:{int(duration)} ")
+                    continue
+                if float(video_percent) < special:
+                    AliyunLogger.logging(channel_id, name, keyword, video_id, f"不符合规则:分享/点赞小于{special}", "2003", log_data)
+                    Common.logger("dy-ls").info(
+                        f"不符合规则:{task_mark},用户主页id:{keyword},视频id{video_id} ,分享:{share_count},点赞{digg_count} ,时长:{int(duration)} ")
+                    continue
+                if int(duration) < short_duration_rule or int(duration) > 720:
+                    AliyunLogger.logging(channel_id, name, keyword, video_id, f"不符合规则:时长不符合规则大于720秒/小于{short_duration_rule}秒", "2003", log_data)
+                    Common.logger("dy-ls").info(
+                        f"不符合规则:{task_mark},用户主页id:{keyword},视频id{video_id} ,分享:{share_count},点赞{digg_count} ,时长:{int(duration)} ")
+                    continue
+                AliyunLogger.logging(channel_id, name, keyword, video_id, "符合规则等待改造", "2004", log_data)
+                cover_url = data[i].get('video').get('cover').get('url_list')[0]  # 视频封面
+                all_data = {"video_id": video_id, "cover": cover_url, "video_url": video_url, "rule": video_percent,
+                            "old_title": old_title}
+                list.append(all_data)
+            return list
+        except Exception as exc:
+            Common.logger("dy-key-word").info(f"抖音搜索词{keyword}获取失败{exc}\n")
+            return list
+
+
+if __name__ == '__main__':
+
+    DyKeyword.get_key_word('keyword', 'sort_type', 'publish_time', 'duration', 'task_mark', 'mark', 'channel_id', 'name')

+ 1 - 1
job_dd_sph.py

@@ -11,7 +11,7 @@ def video_task_start():
             print("开始执行任务")
             mark = VideoProcessor.main(data)
             print(f"返回用户名: {mark}")
-            time.sleep(360 if mark else 360)  # 根据 mark 是否为空设置延迟
+            time.sleep(120 if mark else 120)  # 根据 mark 是否为空设置延迟
         except Exception as e:
             print("处理任务时出现异常:", e)
             time.sleep(10)

+ 21 - 0
job_keyword.py

@@ -0,0 +1,21 @@
+
+import time
+from common import Material
+
+from video_rewriting.video_processor import VideoProcessor
+def video_task_start():
+    """抖/快视频搜索处理视频任务,返回用户名并根据结果决定延迟时间"""
+    data = Material.feishu_list()[14]
+    while True:
+        try:
+            print("开始执行任务")
+            mark = VideoProcessor.main(data)
+            print(f"返回用户名: {mark}")
+            time.sleep(10 if mark else 120)  # 根据 mark 是否为空设置延迟
+        except Exception as e:
+            print("处理任务时出现异常:", e)
+            time.sleep(10)
+            continue
+if __name__ == '__main__':
+    video_task_start()
+

+ 16 - 0
redis_clear_mark.py

@@ -0,0 +1,16 @@
+import schedule
+import time
+from common.redis import del_dyss_redis_key
+
+
+def redis_bot():
+    print("开始执行")
+    del_dyss_redis_key()
+    print("执行结束")
+
+
+# 每天零点10分
+schedule.every().day.at("00:10").do(redis_bot)
+while True:
+    schedule.run_pending()
+    time.sleep(1)

+ 67 - 26
video_rewriting/video_processor.py

@@ -3,18 +3,16 @@ import json
 import os
 import random
 import re
-import sys
-import threading
 import time
 from datetime import datetime
-import concurrent.futures
 
-from common.redis import get_data, get_redis_video_data
+from common.redis import get_data, get_first_value_with_prefix, increment_key
 from common.tts_help import TTS
 from common import Material, Feishu, Common, Oss, AliyunLogger
 from common.ffmpeg import FFmpeg
 from common.gpt4o_help import GPT4o
 from data_channel.douyin import DY
+from data_channel.dy_keyword import DyKeyword
 from data_channel.dy_ls import DYLS
 from data_channel.ks_ls import KSLS
 from data_channel.kuaishou import KS
@@ -97,7 +95,7 @@ class VideoProcessor:
         video_path_url = cls.create_folders(mark, str(task_mark))
         zm = Material.get_pzsrt_data("summary", "500Oe0", video_share)
         Common.logger(mark).info(f"{name}的{task_mark}下{channel_id}的用户:{url}开始获取视频")
-        data_list = cls.get_data_list(channel_id, task_mark, url, number, mark, feishu_id, cookie_sheet, name)
+        data_list = cls.get_data_list(channel_id, task_mark, url, number, mark, feishu_id, cookie_sheet, name, task)
         if not data_list:
             AliyunLogger.logging(channel_id, name, url, "", "无改造视频", "4000")
             Common.logger(mark).info(f"{name}的{task_mark}下{channel_id}的视频ID{url} 已经改造过了")
@@ -219,32 +217,62 @@ class VideoProcessor:
                     Feishu.finish_bot(text,
                                       "https://open.feishu.cn/open-apis/bot/v2/hook/e7697dc6-5254-4411-8b59-3cd0742bf703",
                                       "【 机器改造通知 】")
-                    log_data = f"user:{url},,video_id:{v_id},,video_url:{video_url},,ai_title:{new_title},,voice:{voice}"
-                    AliyunLogger.logging(channel_id, name, url, v_id, "视频改造成功", "1000", log_data, str(code))
                     if channel_id == "快手历史" or channel_id == "抖音历史" or channel_id == "视频号历史":
                         explain = "历史爆款"
                     else:
                         explain = "新供给"
                     current_time = datetime.now()
                     formatted_time = current_time.strftime("%Y-%m-%d %H:%M:%S")
-                    values = [
-                        [
-                            name,
-                            task_mark,
-                            channel_id,
-                            url,
-                            str(v_id),
-                            piaoquan_id,
-                            old_title,
-                            title if title in ["原标题", "AI标题"] else "",
-                            new_title,
-                            str(code),
-                            formatted_time,
-                            str(rule),
-                            explain,
-                            voice
+                    if name == "品类关键词搜索":
+                        first_category = task["first_category"]
+                        secondary_category = task["secondary_category"]
+                        keyword_principal = task["keyword_name"]
+                        log_data = f"user:{url},,video_id:{v_id},,video_url:{video_url},,ai_title:{new_title},,voice:{voice},,first_category:{first_category},,secondary_category:{secondary_category},,keyword_principal:{keyword_principal}"
+                        AliyunLogger.logging(channel_id, name, url, v_id, "视频改造成功", "1000", log_data, str(code))
+                        values = [
+                            [
+                                name,
+                                task_mark,
+                                channel_id,
+                                url,
+                                str(v_id),
+                                piaoquan_id,
+                                old_title,
+                                title if title in ["原标题", "AI标题"] else "",
+                                new_title,
+                                str(code),
+                                formatted_time,
+                                str(rule),
+                                explain,
+                                voice,
+                                first_category,
+                                secondary_category,
+                                keyword_principal,
+                                pq_url
+                            ]
+                        ]
+                    else:
+                        log_data = f"user:{url},,video_id:{v_id},,video_url:{video_url},,ai_title:{new_title},,voice:{voice}"
+                        AliyunLogger.logging(channel_id, name, url, v_id, "视频改造成功", "1000", log_data, str(code))
+
+                        values = [
+                            [
+                                name,
+                                task_mark,
+                                channel_id,
+                                url,
+                                str(v_id),
+                                piaoquan_id,
+                                old_title,
+                                title if title in ["原标题", "AI标题"] else "",
+                                new_title,
+                                str(code),
+                                formatted_time,
+                                str(rule),
+                                explain,
+                                voice
+                            ]
                         ]
-                    ]
                     if values:
                         if name == "王雪珂":
                             sheet = "vfhHwj"
@@ -274,6 +302,8 @@ class VideoProcessor:
                             sheet = "ptgCXW"
                         elif name == "快手品类账号":
                             sheet = "ibjoMx"
+                        elif name == "品类关键词搜索":
+                            sheet = "Tgpikc"
                         Feishu.insert_columns("ILb4sa0LahddRktnRipcu2vQnLb", sheet, "ROWS", 1, 2)
                         time.sleep(0.5)
                         Feishu.update_values("ILb4sa0LahddRktnRipcu2vQnLb", sheet, "A2:Z2", values)
@@ -284,7 +314,7 @@ class VideoProcessor:
                 continue
 
     @classmethod
-    def get_data_list(cls, channel_id, task_mark, url, number, mark, feishu_id, cookie_sheet, name):
+    def get_data_list(cls, channel_id, task_mark, url, number, mark, feishu_id, cookie_sheet, name, task):
         """
         根据渠道ID获取数据列表
         """
@@ -306,6 +336,8 @@ class VideoProcessor:
             return KSLS.get_ksls_list(task_mark, url, number, mark, channel_id, name)
         elif channel_id == "视频号历史":
             return SPHLS.get_sphls_data(task_mark, url, number, mark, channel_id, name)
+        elif channel_id == '抖音搜索':
+            return DyKeyword.get_key_word(url, task_mark, mark, channel_id, name, task)
 
     @classmethod
     def generate_title(cls, video, title):
@@ -499,13 +531,22 @@ class VideoProcessor:
         feishu_id = data["feishu_id"]
         feishu_sheet = data["feishu_sheet"]
         cookie_sheet = data["cookie_sheet"]
-        task_data = Material.get_task_data(feishu_id, feishu_sheet)
+        if mark == 'pl-gjc':
+            task_data = Material.get_keyword_data(feishu_id, feishu_sheet)
+        else:
+            task_data = Material.get_task_data(feishu_id, feishu_sheet)
         try:
             data = get_data(mark, task_data)
             if not data:
                 Common.logger("redis").error(f"{mark}任务开始新的一轮\n")
                 return
             task = json.loads(data)
+            if mark == 'pl-gjc' and task['channel_id'] == '抖音搜索':
+                count = get_first_value_with_prefix()
+                increment_key()
+                if int(count) == 100:
+                    Common.logger(mark).log(f"抖音搜索接口今日已经上限")
+                    return "抖音搜索上限"
             VideoProcessor.process_task(task, mark, name, feishu_id, cookie_sheet)
             return mark
         except Exception as e: