Parcourir la source

上线——youlegaoxiaoxiaoshipin——代码

罗俊辉 il y a 1 an
Parent
commit
2ec15490cd

+ 7 - 3
common/aliyun_log.py

@@ -25,13 +25,17 @@ class AliyunLogger:
 
 
     # 写入阿里云日志
     # 写入阿里云日志
     @staticmethod
     @staticmethod
-    def logging(code, platform, mode, env, message, data, account=None):
+    def logging(
+        code, platform, mode, env, message, data=None, trace_id=None, account=None
+    ):
         """
         """
         写入阿里云日志
         写入阿里云日志
         测试库: https://sls.console.aliyun.com/lognext/project/crawler-log-dev/logsearch/crawler-log-dev
         测试库: https://sls.console.aliyun.com/lognext/project/crawler-log-dev/logsearch/crawler-log-dev
         正式库: https://sls.console.aliyun.com/lognext/project/crawler-log-prod/logsearch/crawler-log-prod
         正式库: https://sls.console.aliyun.com/lognext/project/crawler-log-prod/logsearch/crawler-log-prod
         """
         """
         # 设置阿里云日志服务的访问信息
         # 设置阿里云日志服务的访问信息
+        if data is None:
+            data = {}
         accessKeyId = "LTAIWYUujJAm7CbH"
         accessKeyId = "LTAIWYUujJAm7CbH"
         accessKey = "RfSjdiWwED1sGFlsjXv0DlfTnZTG1P"
         accessKey = "RfSjdiWwED1sGFlsjXv0DlfTnZTG1P"
         if env == "dev":
         if env == "dev":
@@ -57,11 +61,12 @@ class AliyunLogger:
         """
         """
         message = message.replace("\r", " ").replace("\n", " ")
         message = message.replace("\r", " ").replace("\n", " ")
         contents = [
         contents = [
+            (f"TraceId", str(trace_id)),
             (f"code", str(code)),
             (f"code", str(code)),
             (f"platform", str(platform)),
             (f"platform", str(platform)),
             (f"mode", str(mode)),
             (f"mode", str(mode)),
             (f"message", str(message)),
             (f"message", str(message)),
-            (f"data", json.dumps(data, ensure_ascii=False)),
+            (f"data", json.dumps(data, ensure_ascii=False) if data else ""),
             (f"account", str(account)),
             (f"account", str(account)),
             ("timestamp", str(int(time.time()))),
             ("timestamp", str(int(time.time()))),
         ]
         ]
@@ -78,4 +83,3 @@ class AliyunLogger:
             compress=False,
             compress=False,
         )
         )
         client.put_logs(request)
         client.put_logs(request)
-        print("日志写入成功")

+ 44 - 28
common/pipeline.py

@@ -18,27 +18,28 @@ class PiaoQuanPipeline:
         publish_time_stamp = self.item["publish_time_stamp"]
         publish_time_stamp = self.item["publish_time_stamp"]
         update_time_stamp = self.item["update_time_stamp"]
         update_time_stamp = self.item["update_time_stamp"]
         if (
         if (
-                int(time.time()) - publish_time_stamp
-                > 3600 * 24 * int(self.rule_dict.get("period", {}).get("max", 1000))
+            int(time.time()) - publish_time_stamp
+            > 3600 * 24 * int(self.rule_dict.get("period", {}).get("max", 1000))
         ) and (
         ) and (
-                int(time.time()) - update_time_stamp
-                > 3600 * 24 * int(self.rule_dict.get("period", {}).get("max", 1000))
+            int(time.time()) - update_time_stamp
+            > 3600 * 24 * int(self.rule_dict.get("period", {}).get("max", 1000))
         ):
         ):
             AliyunLogger.logging(
             AliyunLogger.logging(
-                code="2001",
+                code="2004",
                 platform=self.platform,
                 platform=self.platform,
                 mode=self.mode,
                 mode=self.mode,
-                data="",
                 env=self.env,
                 env=self.env,
-                message="发布时间超过{}天".format(int(self.rule_dict.get('period', {}).get('max', 1000)))
+                message="发布时间超过{}天".format(
+                    int(self.rule_dict.get("period", {}).get("max", 1000))
+                ),
             )
             )
             return False
             return False
         return True
         return True
 
 
     # 视频标题是否满足需求
     # 视频标题是否满足需求
     def title_flag(self):
     def title_flag(self):
-        title = self.item['video_title']
-        cleaned_title = re.sub(r'[^\w]', ' ', title)
+        title = self.item["video_title"]
+        cleaned_title = re.sub(r"[^\w]", " ", title)
         # 敏感词
         # 敏感词
         # 获取敏感词列表
         # 获取敏感词列表
         sensitive_words = []
         sensitive_words = []
@@ -49,7 +50,7 @@ class PiaoQuanPipeline:
                 mode=self.mode,
                 mode=self.mode,
                 env=self.env,
                 env=self.env,
                 message="标题中包含敏感词",
                 message="标题中包含敏感词",
-                data=self.item
+                data=self.item,
             )
             )
             return False
             return False
         return True
         return True
@@ -60,34 +61,54 @@ class PiaoQuanPipeline:
         if self.item.get("publish_time_stamp"):
         if self.item.get("publish_time_stamp"):
             self.item["publish_time"] = self.item["publish_time_stamp"] * 1000
             self.item["publish_time"] = self.item["publish_time_stamp"] * 1000
         # 格式化 video_dict:period
         # 格式化 video_dict:period
-        if self.item.get("publish_time") and self.item.get("period", "noperiod") == "noperiod":
-            self.item["period"] = int((int(time.time() * 1000) - self.item["publish_time"]) / (3600 * 24 * 1000))
+        if (
+            self.item.get("publish_time")
+            and self.item.get("period", "noperiod") == "noperiod"
+        ):
+            self.item["period"] = int(
+                (int(time.time() * 1000) - self.item["publish_time"])
+                / (3600 * 24 * 1000)
+            )
         # 格式化 rule_dict 最大值取值为 0 的问题
         # 格式化 rule_dict 最大值取值为 0 的问题
         for key in self.item:
         for key in self.item:
             if self.rule_dict.get(key):
             if self.rule_dict.get(key):
-                max_value = int(self.rule_dict[key]["max"]) if int(self.rule_dict[key]["max"]) > 0 else 999999999999999
+                max_value = (
+                    int(self.rule_dict[key]["max"])
+                    if int(self.rule_dict[key]["max"]) > 0
+                    else 999999999999999
+                )
                 if key == "peroid":
                 if key == "peroid":
                     flag = 0 <= int(self.item[key]) <= max_value
                     flag = 0 <= int(self.item[key]) <= max_value
                     AliyunLogger.logging(
                     AliyunLogger.logging(
-                        code="2003",
+                        code="2004",
                         platform=self.platform,
                         platform=self.platform,
                         mode=self.mode,
                         mode=self.mode,
                         env=self.env,
                         env=self.env,
                         data=self.item,
                         data=self.item,
-                        message='{}: 0 <= {} <= {}, {}'.format(key, self.item[key], max_value, flag)
+                        message="{}: 0 <= {} <= {}, {}".format(
+                            key, self.item[key], max_value, flag
+                        ),
                     )
                     )
 
 
                     if not flag:
                     if not flag:
                         return flag
                         return flag
                 else:
                 else:
-                    flag = int(self.rule_dict[key]["min"]) <= int(self.item[key] <= max_value)
+                    flag = int(self.rule_dict[key]["min"]) <= int(
+                        self.item[key] <= max_value
+                    )
                     AliyunLogger.logging(
                     AliyunLogger.logging(
-                        code="2003",
+                        code="2004",
                         platform=self.platform,
                         platform=self.platform,
                         mode=self.mode,
                         mode=self.mode,
                         env=self.env,
                         env=self.env,
                         data=self.item,
                         data=self.item,
-                        message='{}: {} <= {} <= {}, {}'.format(key, self.rule_dict[key]["min"], self.item[key], max_value, flag)
+                        message="{}: {} <= {} <= {}, {}".format(
+                            key,
+                            self.rule_dict[key]["min"],
+                            self.item[key],
+                            max_value,
+                            flag,
+                        ),
                     )
                     )
                     if not flag:
                     if not flag:
                         return flag
                         return flag
@@ -98,14 +119,10 @@ class PiaoQuanPipeline:
     # 按照某个具体平台来去重
     # 按照某个具体平台来去重
     def repeat_video(self):
     def repeat_video(self):
         # sql = f""" select * from crawler_video where platform="公众号" and out_video_id="{video_id}"; """
         # sql = f""" select * from crawler_video where platform="公众号" and out_video_id="{video_id}"; """
-        out_id = self.item['out_video_id']
+        out_id = self.item["out_video_id"]
         sql = f""" select * from crawler_video where platform = "{self.platform}" and out_video_id="{out_id}"; """
         sql = f""" select * from crawler_video where platform = "{self.platform}" and out_video_id="{out_id}"; """
         repeat_video = MysqlHelper.get_values(
         repeat_video = MysqlHelper.get_values(
-            log_type=self.mode,
-            crawler=self.platform,
-            env=self.env,
-            sql=sql,
-            action=''
+            log_type=self.mode, crawler=self.platform, env=self.env, sql=sql, action=""
         )
         )
         if repeat_video:
         if repeat_video:
             AliyunLogger.logging(
             AliyunLogger.logging(
@@ -114,7 +131,7 @@ class PiaoQuanPipeline:
                 mode=self.mode,
                 mode=self.mode,
                 env=self.env,
                 env=self.env,
                 message="重复的视频",
                 message="重复的视频",
-                data=self.item
+                data=self.item,
             )
             )
             return False
             return False
         return True
         return True
@@ -133,12 +150,11 @@ class PiaoQuanPipeline:
             # 记录相关日志
             # 记录相关日志
             return False
             return False
         AliyunLogger.logging(
         AliyunLogger.logging(
-            code="2000",
+            code="1002",
             platform=self.platform,
             platform=self.platform,
             mode=self.mode,
             mode=self.mode,
             env=self.env,
             env=self.env,
             data=self.item,
             data=self.item,
-            message="该视频符合抓取条件,准备发往 ETL"
+            message="该视频符合抓取条件,准备发往 ETL",
         )
         )
         return True
         return True
-

+ 84 - 37
common/public.py

@@ -22,7 +22,7 @@ from common.scheduling_db import MysqlHelper
 # from scheduling_db import MysqlHelper
 # from scheduling_db import MysqlHelper
 
 
 
 
-def get_user_from_mysql(log_type, crawler, source, env, action=''):
+def get_user_from_mysql(log_type, crawler, source, env, action=""):
     sql = f"select * from crawler_user_v3 where source='{source}' and mode='{log_type}'"
     sql = f"select * from crawler_user_v3 where source='{source}' and mode='{log_type}'"
     results = MysqlHelper.get_values(log_type, crawler, sql, env, action=action)
     results = MysqlHelper.get_values(log_type, crawler, sql, env, action=action)
     if results:
     if results:
@@ -56,7 +56,9 @@ def title_like(log_type, crawler, platform, title, env):
     :param env: 环境
     :param env: 环境
     :return: 相似度>=80%,返回 True;反之,返回 False
     :return: 相似度>=80%,返回 True;反之,返回 False
     """
     """
-    select_sql = f""" select video_title from crawler_video where platform="{platform}" """
+    select_sql = (
+        f""" select video_title from crawler_video where platform="{platform}" """
+    )
     video_list = MysqlHelper.get_values(log_type, crawler, select_sql, env, action="")
     video_list = MysqlHelper.get_values(log_type, crawler, select_sql, env, action="")
     # print(video_list)
     # print(video_list)
     if len(video_list) == 0:
     if len(video_list) == 0:
@@ -71,7 +73,7 @@ def title_like(log_type, crawler, platform, title, env):
     return False
     return False
 
 
 
 
-def get_config_from_mysql(log_type, source, env, text, action=''):
+def get_config_from_mysql(log_type, source, env, text, action=""):
     select_sql = f"""select * from crawler_config where source="{source}" """
     select_sql = f"""select * from crawler_config where source="{source}" """
     contents = MysqlHelper.get_values(log_type, source, select_sql, env, action=action)
     contents = MysqlHelper.get_values(log_type, source, select_sql, env, action=action)
     title_list = []
     title_list = []
@@ -79,7 +81,7 @@ def get_config_from_mysql(log_type, source, env, text, action=''):
     emoji_list = []
     emoji_list = []
     search_word_list = []
     search_word_list = []
     for content in contents:
     for content in contents:
-        config = content['config']
+        config = content["config"]
         config_dict = eval(config)
         config_dict = eval(config)
         for k, v in config_dict.items():
         for k, v in config_dict.items():
             if k == "title":
             if k == "title":
@@ -110,8 +112,10 @@ def get_config_from_mysql(log_type, source, env, text, action=''):
 
 
 def get_rule_from_mysql(task_id, log_type, crawler, env):
 def get_rule_from_mysql(task_id, log_type, crawler, env):
     select_rule_sql = f"""select rule from crawler_task_v3 where id={task_id}"""
     select_rule_sql = f"""select rule from crawler_task_v3 where id={task_id}"""
-    rule_list = MysqlHelper.get_values(log_type, crawler, select_rule_sql, env, action="")
-    return json.loads(rule_list[0]['rule'])
+    rule_list = MysqlHelper.get_values(
+        log_type, crawler, select_rule_sql, env, action=""
+    )
+    return json.loads(rule_list[0]["rule"])
 
 
 
 
 def random_title(log_type, crawler, env, text):
 def random_title(log_type, crawler, env, text):
@@ -120,34 +124,28 @@ def random_title(log_type, crawler, env, text):
 
 
 
 
 def task_fun(task_str):
 def task_fun(task_str):
-    task_str = task_str.replace("'[", '[').replace("]'", ']')
+    task_str = task_str.replace("'[", "[").replace("]'", "]")
     task_dict = dict(eval(task_str))
     task_dict = dict(eval(task_str))
-    rule = task_dict['rule']
-    task_dict['rule'] = dict()
+    rule = task_dict["rule"]
+    task_dict["rule"] = dict()
     for item in rule:
     for item in rule:
         for k, val in item.items():
         for k, val in item.items():
-            task_dict['rule'][k] = val
-    rule_dict = task_dict['rule']
-    task_dict = {
-        "task_dict": task_dict,
-        "rule_dict": rule_dict
-    }
+            task_dict["rule"][k] = val
+    rule_dict = task_dict["rule"]
+    task_dict = {"task_dict": task_dict, "rule_dict": rule_dict}
     return task_dict
     return task_dict
 
 
 
 
 def task_fun_mq(task_str):
 def task_fun_mq(task_str):
-    task_str = task_str.replace('"[', '[').replace(']"', ']').replace('\\', '')
+    task_str = task_str.replace('"[', "[").replace(']"', "]").replace("\\", "")
     task_dict = dict(eval(task_str))
     task_dict = dict(eval(task_str))
-    rule = task_dict['rule']
-    task_dict['rule'] = dict()
+    rule = task_dict["rule"]
+    task_dict["rule"] = dict()
     for item in rule:
     for item in rule:
         for k, val in item.items():
         for k, val in item.items():
-            task_dict['rule'][k] = val
-    rule_dict = task_dict['rule']
-    task_dict = {
-        "task_dict": task_dict,
-        "rule_dict": rule_dict
-    }
+            task_dict["rule"][k] = val
+    rule_dict = task_dict["rule"]
+    task_dict = {"task_dict": task_dict, "rule_dict": rule_dict}
     return task_dict
     return task_dict
 
 
 
 
@@ -159,7 +157,7 @@ def get_consumer(topic_name, group_id):
         # AccessKey ID,阿里云身份验证标识。获取方式,请参见创建AccessKey。
         # AccessKey ID,阿里云身份验证标识。获取方式,请参见创建AccessKey。
         "LTAI4G7puhXtLyHzHQpD6H7A",
         "LTAI4G7puhXtLyHzHQpD6H7A",
         # AccessKey Secret,阿里云身份验证密钥。获取方式,请参见创建AccessKey。
         # AccessKey Secret,阿里云身份验证密钥。获取方式,请参见创建AccessKey。
-        "nEbq3xWNQd1qLpdy2u71qFweHkZjSG"
+        "nEbq3xWNQd1qLpdy2u71qFweHkZjSG",
     )
     )
     # 消息所属的Topic,在云消息队列 RocketMQ 版控制台创建。
     # 消息所属的Topic,在云消息队列 RocketMQ 版控制台创建。
     # topic_name = "${TOPIC}"
     # topic_name = "${TOPIC}"
@@ -181,7 +179,9 @@ def ack_message(log_type, crawler, recv_msgs, consumer):
     try:
     try:
         receipt_handle_list = [msg.receipt_handle for msg in recv_msgs]
         receipt_handle_list = [msg.receipt_handle for msg in recv_msgs]
         consumer.ack_message(receipt_handle_list)
         consumer.ack_message(receipt_handle_list)
-        Common.logger(log_type, crawler).info(f"Ack {len(receipt_handle_list)} Message Succeed.\n")
+        Common.logger(log_type, crawler).info(
+            f"Ack {len(receipt_handle_list)} Message Succeed.\n"
+        )
     except MQExceptionBase as err:
     except MQExceptionBase as err:
         Common.logger(log_type, crawler).info(f"Ack Message Fail! Exception:{err}\n")
         Common.logger(log_type, crawler).info(f"Ack Message Fail! Exception:{err}\n")
 
 
@@ -200,7 +200,9 @@ def download_rule(log_type, crawler, video_dict, rule_dict):
         video_dict["publish_time"] = video_dict["publish_time_stamp"] * 1000
         video_dict["publish_time"] = video_dict["publish_time_stamp"] * 1000
     # 格式化 video_dict:period
     # 格式化 video_dict:period
     if "period" not in video_dict.keys() and "publish_time" in video_dict.keys():
     if "period" not in video_dict.keys() and "publish_time" in video_dict.keys():
-        video_dict["period"] = int((int(time.time() * 1000) - video_dict["publish_time"]) / (3600 * 24 * 1000))
+        video_dict["period"] = int(
+            (int(time.time() * 1000) - video_dict["publish_time"]) / (3600 * 24 * 1000)
+        )
     # 格式化 rule_dict 最大值取值为 0 的问题
     # 格式化 rule_dict 最大值取值为 0 的问题
     for rule_value in rule_dict.values():
     for rule_value in rule_dict.values():
         if rule_value["max"] == 0:
         if rule_value["max"] == 0:
@@ -215,11 +217,15 @@ def download_rule(log_type, crawler, video_dict, rule_dict):
             if video_key == rule_key == "period":
             if video_key == rule_key == "period":
                 result = 0 <= int(video_value) <= int(rule_value["max"])
                 result = 0 <= int(video_value) <= int(rule_value["max"])
                 Common.logger(log_type, crawler).info(
                 Common.logger(log_type, crawler).info(
-                    f'{video_key}: 0 <= {video_value} <= {rule_value["min"]}, {result}')
+                    f'{video_key}: 0 <= {video_value} <= {rule_value["min"]}, {result}'
+                )
             elif video_key == rule_key:
             elif video_key == rule_key:
-                result = int(rule_value["min"]) <= int(video_value) <= int(rule_value["max"])
+                result = (
+                    int(rule_value["min"]) <= int(video_value) <= int(rule_value["max"])
+                )
                 Common.logger(log_type, crawler).info(
                 Common.logger(log_type, crawler).info(
-                    f'{video_key}: {rule_value["min"]} <= {video_value} <= {rule_value["max"]},{result}')
+                    f'{video_key}: {rule_value["min"]} <= {video_value} <= {rule_value["max"]},{result}'
+                )
             else:
             else:
                 result = True
                 result = True
 
 
@@ -243,20 +249,37 @@ def download_rule_v2(log_type, crawler, video_dict, rule_dict):
     if video_dict.get("publish_time_stamp"):
     if video_dict.get("publish_time_stamp"):
         video_dict["publish_time"] = video_dict["publish_time_stamp"] * 1000
         video_dict["publish_time"] = video_dict["publish_time_stamp"] * 1000
     # 格式化 video_dict:period
     # 格式化 video_dict:period
-    if video_dict.get("publish_time") and video_dict.get("period", "noperiod") == "noperiod":
-        video_dict["period"] = int((int(time.time() * 1000) - video_dict["publish_time"]) / (3600 * 24 * 1000))
+    if (
+        video_dict.get("publish_time")
+        and video_dict.get("period", "noperiod") == "noperiod"
+    ):
+        video_dict["period"] = int(
+            (int(time.time() * 1000) - video_dict["publish_time"]) / (3600 * 24 * 1000)
+        )
     # 格式化 rule_dict 最大值取值为 0 的问题
     # 格式化 rule_dict 最大值取值为 0 的问题
     for key in video_dict:
     for key in video_dict:
         if rule_dict.get(key):
         if rule_dict.get(key):
-            max_value = int(rule_dict[key]["max"]) if int(rule_dict[key]["max"]) > 0 else 999999999999999
+            max_value = (
+                int(rule_dict[key]["max"])
+                if int(rule_dict[key]["max"]) > 0
+                else 999999999999999
+            )
             if key == "peroid":
             if key == "peroid":
                 flag = 0 <= int(video_dict[key]) <= max_value
                 flag = 0 <= int(video_dict[key]) <= max_value
-                Common.logger(log_type, crawler).info('{}: 0 <= {} <= {}, {}'.format(key, video_dict[key], max_value, flag))
+                Common.logger(log_type, crawler).info(
+                    "{}: 0 <= {} <= {}, {}".format(
+                        key, video_dict[key], max_value, flag
+                    )
+                )
                 if not flag:
                 if not flag:
                     return flag
                     return flag
             else:
             else:
                 flag = int(rule_dict[key]["min"]) <= int(video_dict[key] <= max_value)
                 flag = int(rule_dict[key]["min"]) <= int(video_dict[key] <= max_value)
-                Common.logger(log_type, crawler).info('{}: {} <= {} <= {}, {}'.format(key, rule_dict[key]["min"],video_dict[key], max_value, flag))
+                Common.logger(log_type, crawler).info(
+                    "{}: {} <= {} <= {}, {}".format(
+                        key, rule_dict[key]["min"], video_dict[key], max_value, flag
+                    )
+                )
                 if not flag:
                 if not flag:
                     return flag
                     return flag
         else:
         else:
@@ -325,7 +348,7 @@ def task_unbind(log_type, crawler, taskid, uids, env):
     params = {
     params = {
         "taskId": taskid,  # 任务 ID
         "taskId": taskid,  # 任务 ID
         "uids": uids,  # 解绑用户uid(多个英文逗号隔开),例如"3222121,213231"
         "uids": uids,  # 解绑用户uid(多个英文逗号隔开),例如"3222121,213231"
-        "operator": ""  # 默认 system
+        "operator": "",  # 默认 system
     }
     }
     Common.logger(log_type, crawler).info(f"url:{url}")
     Common.logger(log_type, crawler).info(f"url:{url}")
     Common.logging(log_type, crawler, env, f"url:{url}")
     Common.logging(log_type, crawler, env, f"url:{url}")
@@ -340,6 +363,30 @@ def task_unbind(log_type, crawler, taskid, uids, env):
         return response.text
         return response.text
 
 
 
 
+def clean_title(strings):
+    return (
+        strings.strip()
+        .replace("\n", "")
+        .replace("/", "")
+        .replace("\r", "")
+        .replace("#", "")
+        .replace(".", "。")
+        .replace("\\", "")
+        .replace("&NBSP", "")
+        .replace(":", "")
+        .replace("*", "")
+        .replace("?", "")
+        .replace("?", "")
+        .replace('"', "")
+        .replace("<", "")
+        .replace(">", "")
+        .replace("|", "")
+        .replace(" ", "")
+        .replace('"', "")
+        .replace("'", "")
+    )
+
+
 if __name__ == "__main__":
 if __name__ == "__main__":
-    print(get_title_score("recommend", "kuaishou", "16QspO", "0usaDk", '像梦一场'))
+    print(get_title_score("recommend", "kuaishou", "16QspO", "0usaDk", "像梦一场"))
     pass
     pass

+ 138 - 0
youlegaoxiaoxiaoshipin/youlegaoxiaoxiaoshipin_main/run_ylgxxsp_recommend.py

@@ -0,0 +1,138 @@
+# -*- coding: utf-8 -*-
+# @Author: luojunhui
+# @Time: 2023/10/23
+import argparse
+import random
+from mq_http_sdk.mq_client import *
+from mq_http_sdk.mq_consumer import *
+from mq_http_sdk.mq_exception import MQExceptionBase
+
+sys.path.append(os.getcwd())
+from common.public import task_fun_mq, get_consumer, ack_message
+from common.scheduling_db import MysqlHelper
+from common.aliyun_log import AliyunLogger
+from youlegaoxiaoxiaoshipin.youlegaoxiaoxiaoshipin_recommend import YLGXXSPScheduling
+
+
+def main(my_platform, mode, topic_name, group_id, env):
+    consumer = get_consumer(topic_name, group_id)
+    # 长轮询表示如果Topic没有消息,则客户端请求会在服务端挂起3秒,3秒内如果有消息可以消费则立即返回响应。
+    # 长轮询时间3秒(最多可设置为30秒)。
+    wait_seconds = 30
+    # 一次最多消费3条(最多可设置为16条)。
+    batch = 1
+    AliyunLogger.logging(
+        code="1000",
+        platform=my_platform,
+        mode=mode,
+        env=env,
+        message=f'{10 * "="}Consume And Ack Message From Topic{10 * "="}\n'
+        f"WaitSeconds:{wait_seconds}\n"
+        f"TopicName:{topic_name}\n"
+        f"MQConsumer:{group_id}",
+    )
+    while True:
+        try:
+            # 长轮询消费消息。
+            recv_msgs = consumer.consume_message(batch, wait_seconds)
+            for msg in recv_msgs:
+                AliyunLogger.logging(
+                    code="1000",
+                    platform=my_platform,
+                    mode=mode,
+                    env=env,
+                    message=f"Receive\n"
+                    f"MessageId:{msg.message_id}\n"
+                    f"MessageBodyMD5:{msg.message_body_md5}\n"
+                    f"MessageTag:{msg.message_tag}\n"
+                    f"ConsumedTimes:{msg.consumed_times}\n"
+                    f"PublishTime:{msg.publish_time}\n"
+                    f"Body:{msg.message_body}\n"
+                    f"NextConsumeTime:{msg.next_consume_time}\n"
+                    f"ReceiptHandle:{msg.receipt_handle}\n"
+                    f"Properties:{msg.properties}",
+                )
+                # ack_mq_message
+                ack_message(
+                    log_type=mode,
+                    crawler=my_platform,
+                    recv_msgs=recv_msgs,
+                    consumer=consumer,
+                )
+                # 解析 task_dict
+                task_dict = task_fun_mq(msg.message_body)["task_dict"]
+                AliyunLogger.logging(
+                    "1000", my_platform, mode, env, f"调度任务:{task_dict}"
+                )
+                # 解析 rule_dict
+                rule_dict = task_fun_mq(msg.message_body)["rule_dict"]
+                AliyunLogger.logging(
+                    "1000", my_platform, mode, env, f"抓取规则:{rule_dict}\n"
+                )
+                # 解析 user_list
+                task_id = task_dict["id"]
+                select_user_sql = (
+                    f"""select * from crawler_user_v3 where task_id={task_id}"""
+                )
+                user_list = MysqlHelper.get_values(
+                    mode, my_platform, select_user_sql, env, action=""
+                )
+                our_uid_list = []
+                for user in user_list:
+                    our_uid_list.append(user["uid"])
+                our_uid = random.choice(our_uid_list)
+                YLGXXSP = YLGXXSPScheduling(
+                    platform=my_platform,
+                    mode=mode,
+                    env=env,
+                    rule_dict=rule_dict,
+                    our_uid=our_uid,
+                )
+                for i in range(10):
+                    YLGXXSP.get_videoList(page_id=i + 1)
+
+                AliyunLogger.logging(
+                    code="1000",
+                    platform=my_platform,
+                    mode=mode,
+                    env=env,
+                    message="成功抓取完一轮",
+                )
+
+        except MQExceptionBase as err:
+            # Topic中没有消息可消费。
+            if err.type == "MessageNotExist":
+                AliyunLogger.logging(
+                    code="1000",
+                    platform=my_platform,
+                    mode=mode,
+                    env=env,
+                    message=f"No new message! RequestId:{err.req_id}\n",
+                )
+                continue
+            AliyunLogger.logging(
+                code="1000",
+                platform=my_platform,
+                mode=mode,
+                env=env,
+                message=f"Consume Message Fail! Exception:{err}\n",
+            )
+            time.sleep(2)
+            continue
+
+
+if __name__ == "__main__":
+    parser = argparse.ArgumentParser()  ## 新建参数解释器对象
+    parser.add_argument("--mode", type=str)  ## 添加参数,注明参数类型
+    parser.add_argument("--platform")  ## 添加参数
+    parser.add_argument("--topic_name")  ## 添加参数
+    parser.add_argument("--group_id")  ## 添加参数
+    parser.add_argument("--env")  ## 添加参数
+    args = parser.parse_args()  ### 参数赋值,也可以通过终端赋值
+    main(
+        my_platform=args.platform,
+        mode=args.mode,
+        topic_name=args.topic_name,
+        group_id=args.group_id,
+        env=args.env,
+    )

+ 1 - 0
youlegaoxiaoxiaoshipin/youlegaoxiaoxiaoshipin_recommend/__init__.py

@@ -0,0 +1 @@
+from .youlegaoxiaoxiaoshipin_scheduling import YLGXXSPScheduling

+ 82 - 90
youlegaoxiaoxiaoshipin/youlegaoxiaoxiaoshipin_recommend/youlegaoxiaoxiaoshipin_scheduling.py

@@ -3,48 +3,23 @@
 # @Time: 2023/10/23
 # @Time: 2023/10/23
 import json
 import json
 import os
 import os
-import random
 import sys
 import sys
 import time
 import time
+import random
 import requests
 import requests
 
 
-
 sys.path.append(os.getcwd())
 sys.path.append(os.getcwd())
 
 
 from common.mq import MQ
 from common.mq import MQ
 from common.aliyun_log import AliyunLogger
 from common.aliyun_log import AliyunLogger
 from common.pipeline import PiaoQuanPipeline
 from common.pipeline import PiaoQuanPipeline
-
-
-def clean_title(strings):
-    return (
-        strings.strip()
-        .replace("\n", "")
-        .replace("/", "")
-        .replace("\r", "")
-        .replace("#", "")
-        .replace(".", "。")
-        .replace("\\", "")
-        .replace("&NBSP", "")
-        .replace(":", "")
-        .replace("*", "")
-        .replace("?", "")
-        .replace("?", "")
-        .replace('"', "")
-        .replace("<", "")
-        .replace(">", "")
-        .replace("|", "")
-        .replace(" ", "")
-        .replace('"', "")
-        .replace("'", "")
-    )
+from common.public import clean_title
 
 
 
 
 class YLGXXSPScheduling:
 class YLGXXSPScheduling:
-    def __init__(self, log_type, crawler, rule_dict, env, our_uid):
-        self.platform = "youlegaoxiaoxiaoshipin"
-        self.log_type = log_type
-        self.crawler = crawler
+    def __init__(self, platform, mode, rule_dict, env, our_uid):
+        self.platform = platform
+        self.mode = mode
         self.rule_dict = rule_dict
         self.rule_dict = rule_dict
         self.env = env
         self.env = env
         self.our_uid = our_uid
         self.our_uid = our_uid
@@ -53,7 +28,15 @@ class YLGXXSPScheduling:
 
 
     # 获取视频id_list
     # 获取视频id_list
     def get_videoList(self, page_id):
     def get_videoList(self, page_id):
-        # time.sleep(random.randint(5, 10))
+        time.sleep(random.randint(5, 10))
+        AliyunLogger.logging(
+            code="1000",
+            platform=self.platform,
+            mode=self.mode,
+            env=self.env,
+            data={},
+            message="开始抓取第{}页".format(page_id),
+        )
         headers = {
         headers = {
             "Host": "cpu.baidu.com",
             "Host": "cpu.baidu.com",
             "xweb_xhr": "1",
             "xweb_xhr": "1",
@@ -72,67 +55,76 @@ class YLGXXSPScheduling:
             "pageSize": "10",
             "pageSize": "10",
         }
         }
         response = requests.post(
         response = requests.post(
-            "https://cpu.baidu.com/1033/a16a67fe",
-            headers=headers,
-            data=data
+            "https://cpu.baidu.com/1033/a16a67fe", headers=headers, data=data
         )
         )
         result = response.json()
         result = response.json()
         if "data" not in result or response.status_code != 200:
         if "data" not in result or response.status_code != 200:
-            # Common.logger(self.log_type, self.crawler).info(
-            #     f"get_videoList:{response.text}\n"
-            # )
-            # Common.logging(
-            #     self.log_type,
-            #     self.crawler,
-            #     self.env,
-            #     f"get_videoList:{response.text}\n",
-            # )
+            AliyunLogger.logging(
+                code="2000",
+                platform=self.platform,
+                mode=self.mode,
+                env=self.env,
+                data={},
+                message="抓取第{}页失败,无数据".format(page_id),
+            )
             return
             return
-        elif len(result["data"]['result']) == 0:
-            # Common.logger(self.log_type, self.crawler).info(f"没有更多数据啦~\n")
-            # Common.logging(self.log_type, self.crawler, self.env, f"没有更多数据啦~\n")
+        elif len(result["data"]["result"]) == 0:
+            AliyunLogger.logging(
+                code="2001",
+                platform=self.platform,
+                mode=self.mode,
+                env=self.env,
+                data={},
+                message="抓取d到第{}页, 没有更多数据了".format(page_id),
+            )
             return
             return
         else:
         else:
-            data_list = result['data']["result"]
-            for video_obj in data_list:
-                print(1)
-                AliyunLogger.logging(
-                    code="1001",
-                    platform=self.crawler,
-                    mode=self.log_type,
-                    env=self.env,
-                    data={},
-                    message="成功扫描到一条视频"
-                )
-                self.process_video_obj(video_obj)
-                # try:
-                #     AliyunLogger.logging(
-                #         code="1001",
-                #         platform=self.crawler,
-                #         mode=self.log_type,
-                #         env=self.env,
-                #         data="",
-                #         message="成功扫描到一条视频"
-                #     )
-                #     self.process_video_obj(video_obj)
-                # except Exception as e:
-                #     Common.logger(self.log_type, self.crawler).error(f"抓取单条视频异常:{e}\n")
-                #     Common.logging(
-                #         self.log_type, self.crawler, self.env, f"抓取单条视频异常:{e}\n"
-                #     )
+            data_list = result["data"]["result"]
+            for index, video_obj in enumerate(data_list):
+                try:
+                    AliyunLogger.logging(
+                        code="1001",
+                        platform=self.platform,
+                        mode=self.mode,
+                        env=self.env,
+                        data={},
+                        message="成功扫描到一条视频, 该视频位于第{}页{}条".format(page_id, index + 1),
+                    )
+                    self.process_video_obj(video_obj)
+                except Exception as e:
+                    AliyunLogger.logging(
+                        code="3000",
+                        platform=self.platform,
+                        mode=self.mode,
+                        env=self.env,
+                        data=video_obj,
+                        message="抓取单条视频异常, 报错原因是: {}, 该视频位于第{}页{}条".format(
+                            e, page_id, index + 1
+                        ),
+                    )
+            AliyunLogger.logging(
+                code="1000",
+                platform=self.platform,
+                mode=self.mode,
+                env=self.env,
+                data={},
+                message="完成抓取第{}页".format(page_id),
+            )
 
 
     def process_video_obj(self, video_obj):
     def process_video_obj(self, video_obj):
         video_id = video_obj.get("data", {}).get("id", 0)
         video_id = video_obj.get("data", {}).get("id", 0)
         video_title = clean_title(video_obj.get("data", {}).get("title", "no title"))
         video_title = clean_title(video_obj.get("data", {}).get("title", "no title"))
-        video_time = video_obj['data']['duration']
-        publish_time_stamp = int(video_obj['data']['clusterTime'])
-        publish_time_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(publish_time_stamp))
-        user_name = video_obj['data']['source']
+        video_time = video_obj["data"]["duration"]
+        publish_time_stamp = int(video_obj["data"]["clusterTime"])
+        publish_time_str = time.strftime(
+            "%Y-%m-%d %H:%M:%S", time.localtime(publish_time_stamp)
+        )
+        user_name = video_obj["data"]["source"]
         video_dict = {
         video_dict = {
             "video_title": video_title,
             "video_title": video_title,
             "video_id": video_id,
             "video_id": video_id,
             "duration": video_time,
             "duration": video_time,
-            "play_cnt": int(video_obj['data'].get("playbackCount", 0)),
+            "play_cnt": int(video_obj["data"].get("playbackCount", 0)),
             "like_cnt": int(video_obj.get("likeCount", 0)),
             "like_cnt": int(video_obj.get("likeCount", 0)),
             "comment_cnt": int(video_obj.get("commentCounts", 0)),
             "comment_cnt": int(video_obj.get("commentCounts", 0)),
             "share_cnt": 0,
             "share_cnt": 0,
@@ -146,25 +138,25 @@ class YLGXXSPScheduling:
             "session": f"youlegaoxiaoxiaoshipin-{int(time.time())}",
             "session": f"youlegaoxiaoxiaoshipin-{int(time.time())}",
         }
         }
         flag = PiaoQuanPipeline(
         flag = PiaoQuanPipeline(
-            platform=self.crawler,
-            mode=self.log_type,
+            platform=self.platform,
+            mode=self.mode,
             rule_dict=self.rule_dict,
             rule_dict=self.rule_dict,
             env=self.env,
             env=self.env,
-            item=video_dict
+            item=video_dict,
         )
         )
         if flag:
         if flag:
-            video_dict["out_user_id"] = video_obj['data'].get("ownerId", 0)
-            video_dict["platform"] = self.crawler
-            video_dict["strategy"] = self.log_type
+            video_dict["out_user_id"] = video_obj["data"].get("ownerId", 0)
+            video_dict["platform"] = self.platform
+            video_dict["strategy"] = self.mode
             video_dict["out_video_id"] = str(video_dict["video_id"])
             video_dict["out_video_id"] = str(video_dict["video_id"])
             video_dict["width"] = video_dict["video_width"]
             video_dict["width"] = video_dict["video_width"]
             video_dict["height"] = video_dict["video_height"]
             video_dict["height"] = video_dict["video_height"]
             video_dict["crawler_rule"] = json.dumps(self.rule_dict)
             video_dict["crawler_rule"] = json.dumps(self.rule_dict)
             video_dict["user_id"] = self.our_uid
             video_dict["user_id"] = self.our_uid
             video_dict["publish_time"] = video_dict["publish_time_str"]
             video_dict["publish_time"] = video_dict["publish_time_str"]
-            video_dict["video_url"] = "http:" + video_obj['data']['url']
-            video_dict["avatar_url"] = "http:" + video_obj['data']['avatar']
-            video_dict["cover_url"] = "http:" + video_obj['data']['thumbUrl']
+            video_dict["video_url"] = video_obj["data"]["url"]
+            video_dict["avatar_url"] = "http:" + video_obj["data"]["avatar"]
+            video_dict["cover_url"] = "http:" + video_obj["data"]["thumbUrl"]
             print(json.dumps(video_dict, ensure_ascii=False, indent=4))
             print(json.dumps(video_dict, ensure_ascii=False, indent=4))
             self.download_count += 1
             self.download_count += 1
             # self.mq.send_msg(video_dict)
             # self.mq.send_msg(video_dict)
@@ -172,11 +164,11 @@ class YLGXXSPScheduling:
 
 
 if __name__ == "__main__":
 if __name__ == "__main__":
     ZL = YLGXXSPScheduling(
     ZL = YLGXXSPScheduling(
-        log_type="recommend",
-        crawler="ylgxxsp",
+        platform="ylgxxsp",
+        mode="recommend",
         rule_dict={},
         rule_dict={},
         our_uid="luojunhuihaoshuai",
         our_uid="luojunhuihaoshuai",
-        env="prod"
+        env="prod",
     )
     )
     for i in range(5):
     for i in range(5):
         ZL.get_videoList(page_id=i + 1)
         ZL.get_videoList(page_id=i + 1)