Selaa lähdekoodia

feat:添加监控任务枚举

zhaohaipeng 1 kuukausi sitten
vanhempi
commit
1c90156f37

+ 7 - 6
client/CrawlerClient.py

@@ -4,6 +4,7 @@ from typing import List, Dict, Optional, Any
 import requests
 
 from model.automation_provide_job import DouYinSearchConfig, ChannelSearchAndDetailDTO
+from util.automation_provide_util import logger
 
 
 # ==================== 配置与枚举定义 ====================
@@ -55,7 +56,7 @@ class CrawlerClient:
         sort_type = sortType if sortType else "综合排序"
         publish_time = publishTime if publishTime else "不限"
         duration_val = duration if duration else "不限"
-        account_id = account_id if account_id else 98
+        account_id = account_id if account_id else "98"
 
         # 构建请求参数
         param_json = {
@@ -64,7 +65,7 @@ class CrawlerClient:
             "sort_type": sort_type,
             "publish_time": publish_time,
             "duration": duration_val,
-            "cursor": cursor if cursor else "",
+            "cursor": cursor if cursor else "0",
             "account_id": account_id
         }
 
@@ -85,8 +86,8 @@ class CrawlerClient:
             # 执行关键词搜索
             search_result_json = self.keyword_search(search_config)
         except Exception as e:
+            logger.error(f"关键词 {search_config.search_content} 搜索异常", exc_info=e)
             pass
-            # logger.error(f"关键词 {search_config.search_content} 搜索异常", exc_info=e)
 
         # 解析搜索结果列表
         search_result = search_result_json.get("data", [])
@@ -168,17 +169,17 @@ class CrawlerClient:
         # logger.info(f"invoke crawler api request. url:{url}, request:{params}")
 
         # 发送POST请求
-        response_str = requests.post(url, json.dumps(params)).text
+        response_str = requests.post(url, json.dumps(params, ensure_ascii=False)).text
         response_str = response_str if response_str else "{}"
 
         # 解析响应
         try:
             resp_json = json.loads(response_str)
         except json.JSONDecodeError:
-            # logger.error(f"响应JSON解析失败: {response_str}")
+            logger.error(f"响应JSON解析失败: {response_str}")
             resp_json = {}
 
-        # logger.info(f"invoke crawler api result. respJson: {resp_json}")
+        logger.info(f"invoke crawler api result. respJson: {resp_json}")
 
         # 检查响应码
         if resp_json.get("code") != "0" and resp_json.get("code") != 0:

+ 1 - 0
enums/automation_job.py

@@ -12,6 +12,7 @@ class AutomationJobCronInfo(Enum):
     channel_image_search_video_all_cate_top = ("识图直接供给_全品类(每日Top)", "channel_image_search_video_all_cate", "top", 9)
     video_decode_accurate_text_top = ("视频解构精准文本(每日Top)", "video_decode_accurate_text", "top", 9)
     keywords_top = ("视频解构关键词(每日Top)", "keywords", "top", 9)
+    account_tencent_huxuan = ("腾讯互选平台账号供给", "account_tencent_huxuan", 9)
 
     def __init__(self, task_name, crawler_mode, video_source, task_start_hour):
         self.task_name = task_name

+ 67 - 18
script/dou_yin_keywords_search.py

@@ -1,4 +1,6 @@
 import json
+import os.path
+import time
 from typing import List, Any, Optional
 
 from simpleeval import simple_eval
@@ -9,9 +11,9 @@ from util.automation_provide_util import AutoProvideUtil
 
 crawler_client = CrawlerClient()
 
-preFilterThreshold = 3
+preFilterThreshold = 2
 
-result_txt_file = '/Users/zhao/Desktop/tzld/文档/分析文档/关键词分析_20260204_1.txt'
+result_txt_file = '/Users/zhao/Desktop/tzld/文档/分析文档/关键词供给任务分析/关键词分析_20260226.txt'
 
 
 def write_result_file(content, mode='a+'):
@@ -21,8 +23,7 @@ def write_result_file(content, mode='a+'):
 
 
 def log_info_print_title():
-    write_result_file(
-        "视频ID,品类,关键词,爬取计划,结果,原因,搜索使用的账号ID,排序方式,站外视频ID,站外账号ID,过滤结果,分享量,点赞量,分享量/点赞量,视频时长(秒),观众年龄50+占比,观众年龄50+TGI,过滤规则表达式", 'w')
+    write_result_file("品类,标题,videoid,解析的关键词,抖音搜索首页结果初判是否通过,搜索视频链接,热点宝账号链接,点赞量,分享量,分享比点赞,时长,账号50以上占比,账号50以上TGI", 'w')
 
 
 def log_info_print(log_json: dict[str, Any], account_id: Optional[str] = None):
@@ -32,17 +33,18 @@ def log_info_print(log_json: dict[str, Any], account_id: Optional[str] = None):
     if 'modelValueConfig' in log_json and isinstance(log_json['modelValueConfig'], dict):
         log_json['modelValueConfig'] = json.dumps(log_json['modelValueConfig'], ensure_ascii=False)
 
-    video_id = log_json["videoId"]
-    keywords = log_json['keywords']
+    video_id = log_json.get("videoId", "-1")
+    keywords = log_json.get('keywords', "")
+    pq_title = log_json.get("pqTitle", "")
     crawler_plan_id = log_json.get("crawlerPlanId", "")
     result = log_json.get("result", False)
     reason = log_json.get("reason", "")
-    merge_cate2 = log_json['mergeSecondLevelCate']
+    merge_cate2 = log_json.get('mergeSecondLevelCate')
     sort_type = json.loads(log_json.get("modelValueConfig", "{}")).get("sortType")
     ext_json = json.loads(log_json.get("ext", "{}"))
     account_id = account_id if account_id else 0
     if not ext_json:
-        write_result_file(f"{video_id},{merge_cate2},{keywords},'{crawler_plan_id},'{result},{reason},{account_id},{sort_type}")
+        write_result_file(f"{merge_cate2},{pq_title},{video_id}.{keywords},False")
         return
     for channel_content_id in ext_json:
         if channel_content_id in ['mergeCate2Map']:
@@ -64,8 +66,10 @@ def log_info_print(log_json: dict[str, Any], account_id: Optional[str] = None):
         elif "fanPortrait" in channel_ext_info:
             channel_account_id = channel_ext_info["fanPortrait"].get("channelAccountId")
 
-        write_result_file(f"{video_id},{merge_cate2},{keywords},'{crawler_plan_id},'{result},{reason},{account_id},{sort_type},'{channel_content_id},{channel_account_id},{filter_result},"
-                          f"{share_cnt},{like_cnt},{share_div_link},{video_duration_s},{audience_age_50_rate},{audience_age_50_tgi},{rule_str}")
+        write_result_file(
+            f"{merge_cate2},{pq_title},{video_id},{keywords},{filter_result},{channel_content_id},{channel_account_id},"
+            f"{like_cnt},{share_cnt},{share_div_link},{video_duration_s},{audience_age_50_rate},{audience_age_50_tgi}"
+        )
 
 
 def keywords_search(keywords: str, sort_type: str, account: Optional[str] = None) -> List[ChannelSearchAndDetailDTO]:
@@ -83,7 +87,7 @@ def eval_expr(expr: str, context: dict) -> bool:
 
 
 def keywords_search_and_filter(keywords: str, sort_type: str, account_id: str, log_json: dict[str, Any], filters: List[SearchFilterConfigItem]) -> dict[str, Any]:
-    need_copy_keys = ["videoId", "accountFilters", "contentFilters", "mergeSecondLevelCate", "keywords"]
+    need_copy_keys = ["keywords"]
     result_json = {}
     for key in need_copy_keys:
         result_json[key] = log_json.get(key)
@@ -95,7 +99,14 @@ def keywords_search_and_filter(keywords: str, sort_type: str, account_id: str, l
 
     rule_str = AutoProvideUtil.parse_filter_config_to_rule_str(filters)
 
-    channel_search_and_detail_dtos = keywords_search(keywords, sort_type, account_id)
+    channel_search_and_detail_dtos = []
+
+    for i in range(5):
+        channel_search_and_detail_dtos = keywords_search(keywords, sort_type, account_id)
+        if channel_search_and_detail_dtos:
+            break
+        time.sleep(5)
+
     if not channel_search_and_detail_dtos:
         result_json["result"] = False
         result_json['reason'] = '关键词搜索结果为空'
@@ -134,11 +145,16 @@ def keywords_search_and_filter(keywords: str, sort_type: str, account_id: str, l
             cnt += 1
             continue
 
-        result = eval_expr(expr=rule_str, context=rule_context)
+        result = False
+        try:
+            result = eval_expr(expr=rule_str, context=rule_context)
+        except Exception as e:
+            print(rule_str, rule_context)
+
         ext_json['result'] = result
         if result:
             cnt += 1
-    if cnt <= preFilterThreshold:
+    if cnt < preFilterThreshold:
         result_json["result"] = False
         result_json['reason'] = '该关键词首页满足条件的视频数不足'
 
@@ -174,7 +190,7 @@ def keywords_login_like_sort(keywords: str, log_json: dict[str, Any], filters: L
 
 def handle_log_json(log_json: dict[str, Any]):
     # 登录,综合排序
-    # log_info_print(log_json)
+    log_info_print(log_json)
 
     keywords = log_json['keywords']
     account_filters = json.loads(log_json.get("accountFilters", "[]"))
@@ -185,11 +201,14 @@ def handle_log_json(log_json: dict[str, Any]):
 
     # keywords_not_login_comprehensive_sort(keywords, log_json, search_filter_config_tems)
     # keywords_login_comprehensive_sort(keywords, log_json, search_filter_config_tems)
-    keywords_login_like_sort(keywords, log_json, search_filter_config_tems)
+    # keywords_login_like_sort(keywords, log_json, search_filter_config_tems)
 
 
 def main():
-    file_path = '/Users/zhao/Desktop/keywords.json'
+    if os.path.exists(result_txt_file):
+        os.remove(result_txt_file)
+
+    file_path = '/Users/zhao/Desktop/20260226_keywords_job.json'
     log_list = []
     with open(file_path, "r", encoding="utf-8") as f:
         line = f.readline()
@@ -199,9 +218,39 @@ def main():
 
     log_info_print_title()
     for log in log_list:
-        if "“揭秘开国 领导人" == log['keywords']:
+        if "历史名人" == log.get("mergeSecondLevelCate", ""):
             handle_log_json(log)
 
+    search_filter_config_tems = []
+    account_filters = json.loads(log_list[0].get("accountFilters", "[]"))
+    content_filters = json.loads(log_list[0].get("contentFilters", '[]'))
+    for filter_item in account_filters + content_filters:
+        search_filter_config_tems.append(SearchFilterConfigItem(**filter_item))
+
+    keywords_list = [
+        "大字报式",
+        "情感化",
+        "事迹",
+        "伟人",
+        "毛主席",
+        "历史人物",
+        "民族自豪感",
+        "轶事",
+        "爱国主义",
+        "800万",
+        "物证",
+        "刘永坦院士",
+        "生活细节",
+        "历史人物评价",
+        "先驱先烈",
+        "科研国士",
+        "开国将帅"
+    ]
+    account_id = '771431206'
+    for keyword in keywords_list:
+        log_json = keywords_search_and_filter(keywords=keyword, sort_type="综合排序", account_id=account_id, log_json={"keywords": keyword}, filters=search_filter_config_tems)
+        log_info_print(log_json, account_id=account_id)
+
 
 if __name__ == '__main__':
     main()

+ 5 - 35
util/automation_provide_util.py

@@ -204,6 +204,11 @@ class AutoProvideUtil:
 
     @classmethod
     def extract_fans_portrait_feature(cls, context: Dict[str, float], fans_portrait: Dict[str, Any]) -> None:
+
+        # 设置默认值
+        context["audienceAge50Rate"] = 0
+        context["audienceAge50TGI"] = 0
+
         """提取粉丝画像特征"""
         if not fans_portrait:
             return
@@ -233,38 +238,3 @@ class AutoProvideUtil:
             return float(numerator) / float(denominator)
         except ZeroDivisionError:
             return 0.0
-
-
-# ==================== 使用示例 ====================
-if __name__ == "__main__":
-    # 1. 测试解析Apollo配置
-    test_config = [
-        {"key": "点赞量", "operator": "大于", "value": "1000"},
-        {"key": "视频时长(秒)", "operator": "小于", "value": "60"}
-    ]
-    params = AutoProvideUtil.parse_apollo_config(test_config)
-    print("解析后的过滤参数:")
-    for param in params:
-        print(f"类型: {param.condition_type}, 操作符: {param.operator}, 值: {param.data}")
-
-    # 2. 测试转换为规则字符串
-    rule_str = AutoProvideUtil.parse_filter_map_to_rule_str(test_config)
-    print(f"\n规则字符串: {rule_str}")
-
-    # 3. 测试提取特征
-    test_content_detail = {
-        "data": {
-            "like_count": 2000.0,
-            "share_count": 500.0,
-            "video_url_list": [{"video_duration": 45.0}]
-        }
-    }
-    test_fans_portrait = {
-        "data": {
-            "年龄": {"50-": {"percentage": "25%", "preference": "120%"}}
-        }
-    }
-    features = AutoProvideUtil.extract_content_rule_feature(test_content_detail, test_fans_portrait)
-    print("\n提取的特征:")
-    for k, v in features.items():
-        print(f"{k}: {v}")