
海豚祝福 (haitunzhufu): add new logging

罗俊辉 1 year ago
parent
commit
f5eb3b16be
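
This commit replaces the file-based Common.logging calls with structured AliyunLogger.logging events (codes 1000/1003/1004 for progress, 2000/3000 for failures) and, in the recommend crawler, routes each video through a PiaoQuanPipeline check before sending it to MQ. A minimal sketch of the call shape introduced here, assuming only what the diff below shows (treating data and trace_id as optional is an assumption):

# Sketch, not part of the commit: keyword arguments mirror the calls added below.
from common import AliyunLogger

AliyunLogger.logging(
    code="1000",               # status code: 1000 info, 2000/3000 failure paths
    platform="haitunzhufu",    # crawler / platform name
    mode="recommend",          # the task's log_type
    env="dev",                 # runtime environment
    message="开始抓取第1页",     # free-form message, same strings as in the diff
    # data={...}, trace_id="...",  # assumed optional; used where a video dict is attached
)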

+ 72 - 27
haitunzhufu/haitunzhufu_main/run_htzf_recommend.py

@@ -13,6 +13,7 @@ from common.common import Common
 from common.public import get_consumer, ack_message, task_fun_mq, get_rule_from_mysql
 from common.scheduling_db import MysqlHelper
 from haitunzhufu.haitunzhufu_recommend import HTZFScheduling
+from common import AliyunLogger
 
 
 class HTZFMain:
@@ -28,10 +29,20 @@ class HTZFMain:
                                               f'WaitSeconds:{wait_seconds}\n'
                                               f'TopicName:{topic_name}\n'
                                               f'MQConsumer:{group_id}')
-        Common.logging(log_type, crawler, env, f'{10 * "="}Consume And Ack Message From Topic{10 * "="}\n'
-                                               f'WaitSeconds:{wait_seconds}\n'
-                                               f'TopicName:{topic_name}\n'
-                                               f'MQConsumer:{group_id}')
+        # Common.logging(log_type, crawler, env, f'{10 * "="}Consume And Ack Message From Topic{10 * "="}\n'
+        #                                        f'WaitSeconds:{wait_seconds}\n'
+        #                                        f'TopicName:{topic_name}\n'
+        #                                        f'MQConsumer:{group_id}')
+        AliyunLogger.logging(
+            code="1000",
+            platform=crawler,
+            mode=log_type,
+            env=env,
+            message=f'{10 * "="}Consume And Ack Message From Topic{10 * "="}\n'
+                    f'WaitSeconds:{wait_seconds}\n'
+                    f'TopicName:{topic_name}\n'
+                    f'MQConsumer:{group_id}'
+        )
         while True:
             try:
                 # 长轮询消费消息。
@@ -47,16 +58,32 @@ class HTZFMain:
                                                           f"NextConsumeTime:{msg.next_consume_time}\n"
                                                           f"ReceiptHandle:{msg.receipt_handle}\n"
                                                           f"Properties:{msg.properties}")
-                    Common.logging(log_type, crawler, env, f"Receive\n"
-                                                           f"MessageId:{msg.message_id}\n"
-                                                           f"MessageBodyMD5:{msg.message_body_md5}\n"
-                                                           f"MessageTag:{msg.message_tag}\n"
-                                                           f"ConsumedTimes:{msg.consumed_times}\n"
-                                                           f"PublishTime:{msg.publish_time}\n"
-                                                           f"Body:{msg.message_body}\n"
-                                                           f"NextConsumeTime:{msg.next_consume_time}\n"
-                                                           f"ReceiptHandle:{msg.receipt_handle}\n"
-                                                           f"Properties:{msg.properties}")
+                    # Common.logging(log_type, crawler, env, f"Receive\n"
+                    #                                        f"MessageId:{msg.message_id}\n"
+                    #                                        f"MessageBodyMD5:{msg.message_body_md5}\n"
+                    #                                        f"MessageTag:{msg.message_tag}\n"
+                    #                                        f"ConsumedTimes:{msg.consumed_times}\n"
+                    #                                        f"PublishTime:{msg.publish_time}\n"
+                    #                                        f"Body:{msg.message_body}\n"
+                    #                                        f"NextConsumeTime:{msg.next_consume_time}\n"
+                    #                                        f"ReceiptHandle:{msg.receipt_handle}\n"
+                    #                                        f"Properties:{msg.properties}")
+                    AliyunLogger.logging(
+                        code="1000",
+                        platform=crawler,
+                        mode=log_type,
+                        env=env,
+                        message=f"Receive\n"
+                                f"MessageId:{msg.message_id}\n"
+                                f"MessageBodyMD5:{msg.message_body_md5}\n"
+                                f"MessageTag:{msg.message_tag}\n"
+                                f"ConsumedTimes:{msg.consumed_times}\n"
+                                f"PublishTime:{msg.publish_time}\n"
+                                f"Body:{msg.message_body}\n"
+                                f"NextConsumeTime:{msg.next_consume_time}\n"
+                                f"ReceiptHandle:{msg.receipt_handle}\n"
+                                f"Properties:{msg.properties}"
+                    )
                     # ack_mq_message
                     ack_message(log_type=log_type, crawler=crawler, recv_msgs=recv_msgs, consumer=consumer)
 
@@ -71,20 +98,34 @@ class HTZFMain:
                         our_uid_list.append(user["uid"])
                     our_uid = random.choice(our_uid_list)
                     Common.logger(log_type, crawler).info(f"调度任务:{task_dict}")
-                    Common.logging(log_type, crawler, env, f"调度任务:{task_dict}")
-                    Common.logger(log_type, crawler).info(f"抓取规则:{rule_dict}")
-                    Common.logging(log_type, crawler, env, f"抓取规则:{rule_dict}")
+                    AliyunLogger.logging(
+                        "1000", crawler, log_type, env, f"调度任务:{task_dict}"
+                    )
+
                     Common.logger(log_type, crawler).info(f"用户列表:{user_list}\n")
-                    Common.logging(log_type, crawler, env, f"用户列表:{user_list}\n")
+                    AliyunLogger.logging(
+                        "1000", crawler, log_type, env, f"用户列表:{user_list}\n"
+                    )
                     Common.logger(log_type, crawler).info(f'开始抓取:{task_dict["taskName"]}\n')
-                    Common.logging(log_type, crawler, env, f'开始抓取:{task_dict["taskName"]}\n')
+                    AliyunLogger.logging(
+                        "1000", crawler, log_type, env, f'开始抓取:{task_dict["taskName"]}\n'
+                    )
                     new_r = get_rule_from_mysql(task_id=task_id, log_type=log_type, crawler=crawler, env=env)
                     r_d = {}
                     for item in new_r:
                         for k, val in item.items():
                             r_d[k] = val
                     Common.logger(log_type, crawler).info(f"抓取规则:{r_d}")
-                    Common.logging(log_type, crawler, env, f"抓取规则:{r_d}")
+                    AliyunLogger.logging(
+                        "1000", crawler, log_type, env, f"抓取规则:{r_d}"
+                    )
+                    AliyunLogger.logging(
+                        code="1003",
+                        platform=crawler,
+                        mode=log_type,
+                        env=env,
+                        message="成功获取信息,启动爬虫,开始一轮抓取",
+                    )
                     # 初始化
                     HTZF = HTZFScheduling(
                         log_type=log_type,
@@ -101,17 +142,21 @@ class HTZFMain:
                             HTZF.get_videoList(page_id=i + 1)
                             time.sleep(60)
                     Common.logger(log_type, crawler).info('抓取一轮结束\n')
-                    Common.logging(log_type, crawler, env, '抓取一轮结束\n')
+                    AliyunLogger.logging(
+                        code="1004",
+                        platform=crawler,
+                        mode=log_type,
+                        env=env,
+                        message="成功抓取完一轮",
+                    )
 
             except MQExceptionBase as err:
                 # Topic中没有消息可消费。
                 if err.type == "MessageNotExist":
                     Common.logger(log_type, crawler).info(f"No new message! RequestId:{err.req_id}\n")
-                    Common.logging(log_type, crawler, env, f"No new message! RequestId:{err.req_id}\n")
                     continue
 
                 Common.logger(log_type, crawler).info(f"Consume Message Fail! Exception:{err}\n")
-                Common.logging(log_type, crawler, env, f"Consume Message Fail! Exception:{err}\n")
                 time.sleep(2)
                 continue
 
@@ -125,7 +170,7 @@ if __name__ == "__main__":
     parser.add_argument('--env')  ## 添加参数
     args = parser.parse_args()  ### 参数赋值,也可以通过终端赋值
     HTZFMain.main(log_type=args.log_type,
-                     crawler=args.crawler,
-                     topic_name=args.topic_name,
-                     group_id=args.group_id,
-                     env=args.env)
+                  crawler=args.crawler,
+                  topic_name=args.topic_name,
+                  group_id=args.group_id,
+                  env=args.env)
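
For reference, the status codes used by the new AliyunLogger calls in this commit break down roughly as follows. The mapping is inferred from the log messages in the diff itself, not from a shared spec, and the dict name is only illustrative:

# Reading aid only; meanings inferred from this commit's log messages.
HTZF_LOG_CODES = {
    "1000": "general progress (consume a message, start a page, decrypt ok)",
    "1001": "a video was scanned on the page",
    "1002": "video dict sent to the ETL MQ topic",
    "1003": "task info fetched, crawler starts a round",
    "1004": "a crawl round finished",
    "2000": "page-level failure (decrypt failed, bad response, no more data)",
    "3000": "per-video exception while processing",
}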

+ 122 - 92
haitunzhufu/haitunzhufu_recommend/haitunzhufu_recommend3.py

@@ -6,6 +6,7 @@ import os
 import random
 import sys
 import time
+import uuid
 from datetime import datetime
 import requests
 from base64 import b64encode, b64decode
@@ -16,27 +17,27 @@ from common.mq import MQ
 
 sys.path.append(os.getcwd())
 from common.common import Common
-from common.scheduling_db import MysqlHelper
-from common.public import get_config_from_mysql, download_rule_v2, clean_title
+from common.public import clean_title
+from common import AliyunLogger, PiaoQuanPipeline
 
 
 class AESCipher:
     def __init__(self, key):
-        self.key = key.encode('utf-8')  # 需要一个bytes类型的key
+        self.key = key.encode("utf-8")  # 需要一个bytes类型的key
         self.iv = self.key  # 在这个例子中,key和iv是相同的
 
     def encrypt(self, data):
         cipher = AES.new(self.key, AES.MODE_CBC, self.iv)
-        ct_bytes = cipher.encrypt(pad(data.encode('utf-8'), AES.block_size))
-        ct = b64encode(ct_bytes).decode('utf-8')
+        ct_bytes = cipher.encrypt(pad(data.encode("utf-8"), AES.block_size))
+        ct = b64encode(ct_bytes).decode("utf-8")
         return ct
 
     def decrypt(self, data):
         try:
-            ct = b64decode(data.encode('utf-8'))
+            ct = b64decode(data.encode("utf-8"))
             cipher = AES.new(self.key, AES.MODE_CBC, self.iv)
             pt = unpad(cipher.decrypt(ct), AES.block_size)
-            return pt.decode('utf-8')
+            return pt.decode("utf-8")
         except Exception as e:
             print("Incorrect decryption")
             return None
@@ -53,73 +54,126 @@ class HTZFScheduling:
         self.mq = MQ(topic_name="topic_crawler_etl_" + self.env)
         self.download_count = 0
 
-    def repeat_video(self, video_id):
-        sql = f""" select * from crawler_video where platform in ("{self.crawler}","{self.platform}") and out_video_id="{video_id}"; """
-        repeat_video = MysqlHelper.get_values(
-            self.log_type, self.crawler, sql, self.env
-        )
-        return len(repeat_video)
+    # def repeat_video(self, video_id):
+    #     sql = f""" select * from crawler_video where platform in ("{self.crawler}","{self.platform}") and out_video_id="{video_id}"; """
+    #     repeat_video = MysqlHelper.get_values(
+    #         self.log_type, self.crawler, sql, self.env
+    #     )
+    #     return len(repeat_video)
 
     # 获取视频id_list
     def get_videoList(self, page_id):
         time.sleep(random.randint(5, 10))
-        url = 'https://haitun.wyapi.cn/videos/api.videos/getItem'
+        url = "https://haitun.wyapi.cn/videos/api.videos/getItem"
         headers = {
-            'xweb_xhr': '1',
-            'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/98.0.4758.102 Safari/537.36 MicroMessenger/6.8.0(0x16080000) NetType/WIFI MiniProgramEnv/Mac MacWechat/WMPF XWEB/30817',
-            'content-type': 'application/json',
-            'accept': '*/*',
-            'sec-fetch-site': 'cross-site',
-            'sec-fetch-mode': 'cors',
-            'sec-fetch-dest': 'empty',
-            'referer': 'https://servicewechat.com/wxcc35cbbc445d331a/2/page-frame.html',
-            'accept-encoding': 'gzip, deflate, br',
-            'accept-language': 'en'
-        }
-        params = {
-            'mark': '',
-            'page': page_id
+            "xweb_xhr": "1",
+            "user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/98.0.4758.102 Safari/537.36 MicroMessenger/6.8.0(0x16080000) NetType/WIFI MiniProgramEnv/Mac MacWechat/WMPF XWEB/30817",
+            "content-type": "application/json",
+            "accept": "*/*",
+            "sec-fetch-site": "cross-site",
+            "sec-fetch-mode": "cors",
+            "sec-fetch-dest": "empty",
+            "referer": "https://servicewechat.com/wxcc35cbbc445d331a/2/page-frame.html",
+            "accept-encoding": "gzip, deflate, br",
+            "accept-language": "en",
         }
-
+        params = {"mark": "", "page": page_id}
         response = requests.get(url, headers=headers, params=params)
         ori_result = response.json()
+        AliyunLogger.logging(
+            code="1000",
+            platform=self.crawler,
+            mode=self.log_type,
+            env=self.env,
+            message="开始抓取第{}页".format(page_id),
+        )
         key = "xlc2ze7qnqg8xi1d"
         cipher = AESCipher(key)
-        decrypted_text = cipher.decrypt(ori_result['data'])
+        try:
+            decrypted_text = cipher.decrypt(ori_result["data"])
+            AliyunLogger.logging(
+                code="1000",
+                platform=self.crawler,
+                mode=self.log_type,
+                env=self.env,
+                message="第{}页, 解密成功".format(page_id),
+            )
+        except:
+            AliyunLogger.logging(
+                code="2000",
+                platform=self.crawler,
+                mode=self.log_type,
+                env=self.env,
+                message="第{}页, 解密失败".format(page_id),
+            )
+            return
         result = json.loads(decrypted_text)
         if "list" not in result or response.status_code != 200:
             Common.logger(self.log_type, self.crawler).info(
                 f"get_videoList:{response.text}\n"
             )
-            Common.logging(
-                self.log_type,
-                self.crawler,
-                self.env,
-                f"get_videoList:{response.text}\n",
+            # Common.logging(
+            #     self.log_type,
+            #     self.crawler,
+            #     self.env,
+            #     f"get_videoList:{response.text}\n",
+            # )
+            AliyunLogger.logging(
+                code="2000",
+                platform=self.crawler,
+                mode=self.log_type,
+                env=self.env,
+                message=f"get_videoList:{response.text}\n",
             )
             return
         elif len(result["list"]) == 0:
             Common.logger(self.log_type, self.crawler).info(f"没有更多数据啦~\n")
-            Common.logging(self.log_type, self.crawler, self.env, f"没有更多数据啦~\n")
+            # Common.logging(self.log_type, self.crawler, self.env, f"没有更多数据啦~\n")
+            AliyunLogger.logging(
+                code="2000",
+                platform=self.crawler,
+                mode=self.log_type,
+                env=self.env,
+                message=f"没有更多数据啦~\n",
+            )
             return
         else:
             data_list = result["list"]
-            for video_obj in data_list:
+            for index, video_obj in enumerate(data_list):
                 try:
+                    AliyunLogger.logging(
+                        code="1001",
+                        platform=self.crawler,
+                        mode=self.log_type,
+                        env=self.env,
+                        data={},
+                        message="成功扫描到一条视频, 该视频位于第{}页{}条".format(page_id, index + 1),
+                    )
                     self.process_video_obj(video_obj)
                 except Exception as e:
                     Common.logger(self.log_type, self.crawler).error(f"抓取单条视频异常:{e}\n")
-                    Common.logging(
-                        self.log_type, self.crawler, self.env, f"抓取单条视频异常:{e}\n"
+                    # Common.logging(
+                    #     self.log_type, self.crawler, self.env, f"抓取单条视频异常:{e}\n"
+                    # )
+                    AliyunLogger.logging(
+                        code="3000",
+                        platform=self.crawler,
+                        mode=self.log_type,
+                        env=self.env,
+                        data=video_obj,
+                        message="抓取单条视频异常, 报错原因是: {}, 该视频位于第{}页{}条".format(
+                             e, page_id, index + 1
+                        ),
                     )
 
     def process_video_obj(self, video_obj):
         video_id = video_obj.get("id", 0)
+        trace_id = self.crawler + str(uuid.uuid1())
         video_title = clean_title(video_obj.get("name", "no title"))
         video_time = 0
         publish_time_str = video_obj.get("create_at", "")
         # 将时间字符串转换为 datetime 对象
-        dt = datetime.strptime(publish_time_str, '%Y-%m-%d %H:%M:%S')
+        dt = datetime.strptime(publish_time_str, "%Y-%m-%d %H:%M:%S")
         # 将 datetime 对象转换为时间戳
         publish_time_stamp = int(datetime.timestamp(dt))
         user_name = ""
@@ -133,81 +187,57 @@ class HTZFScheduling:
             "share_cnt": 0,
             "user_name": user_name,
             "publish_time_stamp": publish_time_stamp,
+            "update_time_stamp": int(time.time()),
             "publish_time_str": publish_time_str,
             "video_width": 0,
             "video_height": 0,
             "profile_id": 0,
             "profile_mid": 0,
             "session": f"haitunzhufu-{int(time.time())}",
+            "out_user_id": video_obj.get("profile_id", 0),
+            "platform": self.crawler,
+            "strategy": self.log_type,
         }
-        for k, v in video_dict.items():
-            Common.logger(self.log_type, self.crawler).info(f"{k}:{v}")
-        Common.logging(
-            self.log_type, self.crawler, self.env, f"{video_dict}"
+        video_dict["out_video_id"] = str(video_dict["video_id"])
+        pipeline = PiaoQuanPipeline(
+            platform=self.crawler,
+            mode=self.log_type,
+            rule_dict=self.rule_dict,
+            env=self.env,
+            item=video_dict,
+            trace_id=trace_id
         )
-        # 过滤无效视频
-        if video_title == "" or video_dict["video_id"] == "":
-            Common.logger(self.log_type, self.crawler).info("无效视频\n")
-            Common.logging(self.log_type, self.crawler, self.env, "无效视频\n")
-            # 抓取基础规则过滤
-        elif (
-                download_rule_v2(
-                    log_type=self.log_type,
-                    crawler=self.crawler,
-                    video_dict=video_dict,
-                    rule_dict=self.rule_dict,
-                )
-                is False
-        ):
-            Common.logger(self.log_type, self.crawler).info("不满足抓取规则\n")
-            Common.logging(
-                self.log_type, self.crawler, self.env, "不满足抓取规则\n"
-            )
-        elif (
-                any(
-                    str(word)
-                    if str(word) in video_dict["video_title"]
-                    else False
-                    for word in get_config_from_mysql(
-                        log_type=self.log_type,
-                        source=self.crawler,
-                        env=self.env,
-                        text="filter",
-                        action="",
-                    )
-                )
-                is True
-        ):
-            Common.logger(self.log_type, self.crawler).info("已中过滤词\n")
-            Common.logging(self.log_type, self.crawler, self.env, "已中过滤词\n")
-        elif self.repeat_video(video_dict["video_id"]) != 0:
-            Common.logger(self.log_type, self.crawler).info("视频已下载\n")
-            Common.logging(self.log_type, self.crawler, self.env, "视频已下载\n")
-        else:
-            video_dict["out_user_id"] = video_obj.get("profile_id", 0)
-            video_dict["platform"] = self.crawler
-            video_dict["strategy"] = self.log_type
-            video_dict["out_video_id"] = str(video_dict["video_id"])
+        flag = pipeline.process_item()
+        if flag:
             video_dict["width"] = video_dict["video_width"]
             video_dict["height"] = video_dict["video_height"]
             video_dict["crawler_rule"] = json.dumps(self.rule_dict)
             video_dict["user_id"] = self.our_uid
             video_dict["publish_time"] = video_dict["publish_time_str"]
-            video_dict["video_url"] = video_obj['cover']
+            video_dict["video_url"] = video_obj["cover"]
             video_dict["avatar_url"] = ""
-            video_dict["cover_url"] = video_obj['cover'] + "&vframe/png/offset/1/w/200"
-            # print(json.dumps(video_dict, ensure_ascii=False, indent=4))
+            video_dict["cover_url"] = video_obj["cover"] + "&vframe/png/offset/1/w/200"
             self.download_count += 1
             self.mq.send_msg(video_dict)
+            # print(video_dict)
+            AliyunLogger.logging(
+                code="1002",
+                platform=self.crawler,
+                mode=self.log_type,
+                env=self.env,
+                data=video_dict,
+                trace_id=trace_id,
+                message="成功发送 MQ 至 ETL"
+            )
 
 
 if __name__ == "__main__":
     ZL = HTZFScheduling(
         log_type="recommend",
-        crawler="htzf",
+        crawler="haitunzhufu",
         rule_dict={},
         our_uid="luojunhuihaoshuai",
-        env="dev"
+        env="dev",
     )
     for i in range(4):
         ZL.get_videoList(page_id=i + 1)
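
Condensed, the new per-video flow in haitunzhufu_recommend3.py is: build video_dict, pass it through PiaoQuanPipeline.process_item(), and only on acceptance send it to MQ and emit a 1002 log. A minimal sketch under those assumptions (handle_video is a hypothetical method name; PiaoQuanPipeline's internals are not shown in this diff, only the constructor arguments and process_item() used above):

# Sketch of the gate added in process_video_obj(); argument names match the diff.
import uuid
from common import AliyunLogger, PiaoQuanPipeline

def handle_video(self, video_obj, video_dict):
    trace_id = self.crawler + str(uuid.uuid1())
    pipeline = PiaoQuanPipeline(
        platform=self.crawler,
        mode=self.log_type,
        rule_dict=self.rule_dict,
        env=self.env,
        item=video_dict,
        trace_id=trace_id,
    )
    if pipeline.process_item():       # replaces the old title / rule / filter-word / duplicate checks
        self.mq.send_msg(video_dict)  # hand the accepted video to the ETL consumer
        AliyunLogger.logging(
            code="1002", platform=self.crawler, mode=self.log_type,
            env=self.env, data=video_dict, trace_id=trace_id,
            message="成功发送 MQ 至 ETL",
        )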