
1. Added the video_item code
2. Added new logging for the 中老年娱乐 (zhonglaonianyule) crawler

罗俊辉 · 1 year ago · commit 76cb89ee77

common/video_item.py (+45 -0)

@@ -0,0 +1,45 @@
+from common.public import clean_title
+from .aliyun_log import AliyunLogger
+
+
+class VideoItem:
+    """
+    function: when a video is scanned in, normalize its basic info so that the video_dict sent to the pipeline and ETL is correct
+    __init__: initialize an empty dict to store the video info
+    add_video_info: store one field of video info on the item
+    check_item: validate and normalize each field of the item
+    """
+    def __init__(self):
+        self.item = {}
+
+    def add_video_info(self, key, value):
+        self.item[key] = value
+
+    # Validate video fields and apply compatibility defaults
+    def check_item(self):
+        # video_title
+        if self.item.get("video_title"):
+            self.item['video_title'] = clean_title(self.item['video_title'])
+        else:
+            self.item['video_title'] = "No title"
+        # video_id
+
+        # video_time, publish_time_str, publish_time_stamp, update_time_stamp
+
+        # play_cnt, like_cnt, comment_cnt, share_cnt
+
+        # width, height, video_width, video_height
+
+        # user_name, user_id, out_user_name, out_user_id
+
+        # profile_id, profile_mid
+
+        # session
+
+        # video_url
+
+        # cover_url
+
+    def return_item(self):
+        self.check_item()
+        return self.item
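For context, a minimal usage sketch of the new class with hypothetical values (it assumes common.public.clean_title behaves like the copy removed from zhonglaonianyule_recommend_scheduling.py further below, i.e. it strips spaces, '#', and similar characters):

    # Usage sketch with hypothetical values, not taken from the crawlers.
    from common.video_item import VideoItem

    item = VideoItem()
    item.add_video_info("video_title", "示例标题 #1")
    item.add_video_info("video_id", "abc123")
    item.add_video_info("play_cnt", 0)
    video_dict = item.return_item()  # return_item() runs check_item() first
    # Under the removed clean_title's rules, spaces and '#' are stripped,
    # so the stored title becomes "示例标题1".
    print(video_dict["video_title"])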

zhonglaonianyule/zhonglaonianyule_main/run_zlnyljcrd_recommend.py (+80 -6)

@@ -12,6 +12,7 @@ sys.path.append(os.getcwd())
 from common.public import get_consumer, ack_message, task_fun_mq
 from common.common import Common
 from common.scheduling_db import MysqlHelper
+from common import AliyunLogger
 from zhonglaonianyule.zhonglaonianyule_recommend import ZLNYLScheduling
 
 
@@ -37,6 +38,16 @@ def main(log_type, crawler, topic_name, group_id, env):
         f"TopicName:{topic_name}\n"
         f"MQConsumer:{group_id}",
     )
+    AliyunLogger.logging(
+        code="1000",
+        platform=crawler,
+        mode=log_type,
+        env=env,
+        message=f'{10 * "="}Consume And Ack Message From Topic{10 * "="}\n'
+        f"WaitSeconds:{wait_seconds}\n"
+        f"TopicName:{topic_name}\n"
+        f"MQConsumer:{group_id}",
+    )
     while True:
         try:
             # 长轮询消费消息。
@@ -70,6 +81,22 @@ def main(log_type, crawler, topic_name, group_id, env):
                     f"ReceiptHandle:{msg.receipt_handle}\n"
                     f"Properties:{msg.properties}",
                 )
+                AliyunLogger.logging(
+                    code="1000",
+                    platform=crawler,
+                    mode=log_type,
+                    env=env,
+                    message=f"Receive\n"
+                    f"MessageId:{msg.message_id}\n"
+                    f"MessageBodyMD5:{msg.message_body_md5}\n"
+                    f"MessageTag:{msg.message_tag}\n"
+                    f"ConsumedTimes:{msg.consumed_times}\n"
+                    f"PublishTime:{msg.publish_time}\n"
+                    f"Body:{msg.message_body}\n"
+                    f"NextConsumeTime:{msg.next_consume_time}\n"
+                    f"ReceiptHandle:{msg.receipt_handle}\n"
+                    f"Properties:{msg.properties}",
+                )
                 # ack_mq_message
                 ack_message(
                     log_type=log_type,
@@ -88,20 +115,47 @@ def main(log_type, crawler, topic_name, group_id, env):
                 user_list = MysqlHelper.get_values(
                     log_type, crawler, select_user_sql, env, action=""
                 )
-                Common.logger(log_type, crawler).info(f"调度任务:\n{task_dict}")
+                Common.logger(log_type, crawler).info(f"调度任务:{task_dict}")
                 Common.logging(log_type, crawler, env, f"调度任务:{task_dict}")
-                Common.logger(log_type, crawler).info(f"抓取规则:\n{rule_dict}")
+                AliyunLogger.logging(
+                    code="1000",
+                    platform=crawler,
+                    mode=log_type,
+                    env=env,
+                    message=f"调度任务:{task_dict}",
+                )
+                Common.logger(log_type, crawler).info(f"抓取规则:{rule_dict}")
                 Common.logging(log_type, crawler, env, f"抓取规则:{rule_dict}")
-                Common.logger(log_type, crawler).info(f"用户列表:\n{user_list}")
-                Common.logging(log_type, crawler, env, f"用户列表:\n{user_list}")
+                AliyunLogger.logging(
+                    code="1000",
+                    platform=crawler,
+                    mode=log_type,
+                    env=env,
+                    message=f"抓取规则:{rule_dict}",
+                )
+                Common.logger(log_type, crawler).info(f"用户列表:{user_list}\n")
+                AliyunLogger.logging(
+                    code="1000",
+                    platform=crawler,
+                    mode=log_type,
+                    env=env,
+                    message=f"用户列表:{user_list}\n",
+                )
                 Common.logger(log_type, crawler).info(f'开始抓取:{task_dict["taskName"]}\n')
                 Common.logging(
                     log_type, crawler, env, f'开始抓取:{task_dict["taskName"]}\n'
                 )
+                AliyunLogger.logging(
+                    code="1000",
+                    platform=crawler,
+                    mode=log_type,
+                    env=env,
+                    message=f'开始抓取:{task_dict["taskName"]}\n',
+                )
                 ZL = ZLNYLScheduling(
                     log_type=log_type,
                     crawler=crawler,
-                    category=3615,
+                    category=3613,
                     rule_dict=rule_dict,
                     env=env,
                 )
@@ -110,6 +164,13 @@ def main(log_type, crawler, topic_name, group_id, env):
                     time.sleep(60)
                 Common.logger(log_type, crawler).info("抓取一轮结束\n")
                 Common.logging(log_type, crawler, env, "抓取一轮结束\n")
+                AliyunLogger.logging(
+                    code="1000",
+                    platform=crawler,
+                    mode=log_type,
+                    env=env,
+                    message="抓取一轮结束\n",
+                )
                 xng_author_end_time = int(time.time())
                 xng_author_duration = xng_author_end_time - xng_author_start_time
                 Common.logger(log_type, crawler).info(f"duration {xng_author_duration}")
@@ -126,14 +187,27 @@ def main(log_type, crawler, topic_name, group_id, env):
                 Common.logging(
                     log_type, crawler, env, f"No new message! RequestId:{err.req_id}\n"
                 )
+                AliyunLogger.logging(
+                    code="2000",
+                    platform=crawler,
+                    mode=log_type,
+                    env=env,
+                    message=f"No new message! RequestId:{err.req_id}\n",
+                )
                 continue
-
             Common.logger(log_type, crawler).info(
                 f"Consume Message Fail! Exception:{err}\n"
             )
             Common.logging(
                 log_type, crawler, env, f"Consume Message Fail! Exception:{err}\n"
             )
+            AliyunLogger.logging(
+                code="2000",
+                platform=crawler,
+                mode=log_type,
+                env=env,
+                message=f"Consume Message Fail! Exception:{err}\n",
+            )
             time.sleep(2)
             continue
 
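All of the AliyunLogger.logging calls added in this commit share one call shape. As a reference, a signature sketch reconstructed from the call sites only — an assumption, since the real definition lives in common/aliyun_log.py, which this commit does not show:

    # Hypothetical signature inferred from the call sites in this commit;
    # not the actual common/aliyun_log.py source.
    class AliyunLogger:
        @staticmethod
        def logging(code, platform, mode, env, message, trace_id=None, data=None):
            # code "1000": info; "1001": video scanned; "1002": sent to ETL;
            # "2000"/"2005": recoverable warnings; "3000": per-video exception
            ...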

zhonglaonianyule/zhonglaonianyule_main/run_zlnyljkys_recommend.py (+82 -18)

@@ -3,7 +3,6 @@
 # @Time: 2023/10/10
 import argparse
 import time
-import random
 
 from mq_http_sdk.mq_client import *
 from mq_http_sdk.mq_consumer import *
@@ -13,6 +12,7 @@ sys.path.append(os.getcwd())
 from common.public import get_consumer, ack_message, task_fun_mq
 from common.common import Common
 from common.scheduling_db import MysqlHelper
+from common import AliyunLogger
 from zhonglaonianyule.zhonglaonianyule_recommend import ZLNYLScheduling
 
 
@@ -38,6 +38,16 @@ def main(log_type, crawler, topic_name, group_id, env):
         f"TopicName:{topic_name}\n"
         f"MQConsumer:{group_id}",
     )
+    AliyunLogger.logging(
+        code="1000",
+        platform=crawler,
+        mode=log_type,
+        env=env,
+        message=f'{10 * "="}Consume And Ack Message From Topic{10 * "="}\n'
+        f"WaitSeconds:{wait_seconds}\n"
+        f"TopicName:{topic_name}\n"
+        f"MQConsumer:{group_id}",
+    )
     while True:
         try:
             # 长轮询消费消息。
@@ -71,6 +81,22 @@ def main(log_type, crawler, topic_name, group_id, env):
                     f"ReceiptHandle:{msg.receipt_handle}\n"
                     f"Properties:{msg.properties}",
                 )
+                AliyunLogger.logging(
+                    code="1000",
+                    platform=crawler,
+                    mode=log_type,
+                    env=env,
+                    message=f"Receive\n"
+                    f"MessageId:{msg.message_id}\n"
+                    f"MessageBodyMD5:{msg.message_body_md5}\n"
+                    f"MessageTag:{msg.message_tag}\n"
+                    f"ConsumedTimes:{msg.consumed_times}\n"
+                    f"PublishTime:{msg.publish_time}\n"
+                    f"Body:{msg.message_body}\n"
+                    f"NextConsumeTime:{msg.next_consume_time}\n"
+                    f"ReceiptHandle:{msg.receipt_handle}\n"
+                    f"Properties:{msg.properties}",
+                )
                 # ack_mq_message
                 ack_message(
                     log_type=log_type,
@@ -89,37 +115,62 @@ def main(log_type, crawler, topic_name, group_id, env):
                 user_list = MysqlHelper.get_values(
                     log_type, crawler, select_user_sql, env, action=""
                 )
-                Common.logger(log_type, crawler).info(f"调度任务:\n{task_dict}")
+                Common.logger(log_type, crawler).info(f"调度任务:{task_dict}")
                 Common.logging(log_type, crawler, env, f"调度任务:{task_dict}")
-                Common.logger(log_type, crawler).info(f"抓取规则:\n{rule_dict}")
+                AliyunLogger.logging(
+                    code="1000",
+                    platform=crawler,
+                    mode=log_type,
+                    env=env,
+                    message=f"调度任务:{task_dict}",
+                )
+                Common.logger(log_type, crawler).info(f"抓取规则:{rule_dict}")
                 Common.logging(log_type, crawler, env, f"抓取规则:{rule_dict}")
-                Common.logger(log_type, crawler).info(f"用户列表:\n{user_list}")
-                Common.logging(log_type, crawler, env, f"用户列表:\n{user_list}")
+                AliyunLogger.logging(
+                    code="1000",
+                    platform=crawler,
+                    mode=log_type,
+                    env=env,
+                    message=f"抓取规则:{rule_dict}",
+                )
+                Common.logger(log_type, crawler).info(f"用户列表:{user_list}\n")
+                AliyunLogger.logging(
+                    code="1000",
+                    platform=crawler,
+                    mode=log_type,
+                    env=env,
+                    message=f"用户列表:{user_list}\n",
+                )
                 Common.logger(log_type, crawler).info(f'开始抓取:{task_dict["taskName"]}\n')
                 Common.logging(
                     log_type, crawler, env, f'开始抓取:{task_dict["taskName"]}\n'
                 )
-                our_uid_list = []
-                for user in user_list:
-                    our_uid_list.append(user["uid"])
-                our_uid = random.choice(our_uid_list)
+                AliyunLogger.logging(
+                    code="1000",
+                    platform=crawler,
+                    mode=log_type,
+                    env=env,
+                    message=f'开始抓取:{task_dict["taskName"]}\n',
+                )
                 ZL = ZLNYLScheduling(
                     log_type=log_type,
                     crawler=crawler,
-                    category=3615,
+                    category=3613,
                     rule_dict=rule_dict,
                     env=env,
-                    our_uid=our_uid
                 )
                 for i in range(100):
-                    if ZL.download_count >= int(rule_dict.get("videos_cnt", {}).get("min", 10)):
-                        ZL.download_count = 0
-                        break
-                    else:
-                        ZL.get_videoList(i + 1)
-                        time.sleep(60)
+                    ZL.get_videoList(i + 1)
+                    time.sleep(60)
                 Common.logger(log_type, crawler).info("抓取一轮结束\n")
                 Common.logging(log_type, crawler, env, "抓取一轮结束\n")
+                AliyunLogger.logging(
+                    code="1000",
+                    platform=crawler,
+                    mode=log_type,
+                    env=env,
+                    message="抓取一轮结束\n",
+                )
                 xng_author_end_time = int(time.time())
                 xng_author_duration = xng_author_end_time - xng_author_start_time
                 Common.logger(log_type, crawler).info(f"duration {xng_author_duration}")
@@ -136,14 +187,27 @@ def main(log_type, crawler, topic_name, group_id, env):
                 Common.logging(
                     log_type, crawler, env, f"No new message! RequestId:{err.req_id}\n"
                 )
+                AliyunLogger.logging(
+                    code="2000",
+                    platform=crawler,
+                    mode=log_type,
+                    env=env,
+                    message=f"No new message! RequestId:{err.req_id}\n",
+                )
                 continue
-
             Common.logger(log_type, crawler).info(
                 f"Consume Message Fail! Exception:{err}\n"
             )
             Common.logging(
                 log_type, crawler, env, f"Consume Message Fail! Exception:{err}\n"
             )
+            AliyunLogger.logging(
+                code="2000",
+                platform=crawler,
+                mode=log_type,
+                env=env,
+                message=f"Consume Message Fail! Exception:{err}\n",
+            )
             time.sleep(2)
             continue
 
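Note that this file removes the random our_uid selection while ZLNYLScheduling still accepts an our_uid parameter (defaulted in the constructor shown in the next file's diff). If a publishing uid is still needed downstream, the removed selection can stay at the call site; a sketch, assuming each user_list row still carries a "uid" key as in the deleted lines:

    import random

    # Assumption: user_list rows still expose "uid", as in the removed code.
    our_uid = random.choice([user["uid"] for user in user_list]) if user_list else None
    ZL = ZLNYLScheduling(
        log_type=log_type,
        crawler=crawler,
        category=3613,
        rule_dict=rule_dict,
        env=env,
        our_uid=our_uid,
    )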

zhonglaonianyule/zhonglaonianyule_recommend/zhonglaonianyule_recommend_scheduling.py (+73 -84)

@@ -5,44 +5,20 @@ import json
 import os
 import random
 import sys
+import uuid
 import time
 import requests
-from hashlib import md5
 
 from common.mq import MQ
 
 sys.path.append(os.getcwd())
 from common.common import Common
-from common.scheduling_db import MysqlHelper
-from common.public import get_config_from_mysql, download_rule
+from common import PiaoQuanPipeline, AliyunLogger
+from common.public import clean_title
 
 proxies = {"http": None, "https": None}
 
 
-def clean_title(strings):
-    return (
-        strings.strip()
-        .replace("\n", "")
-        .replace("/", "")
-        .replace("\r", "")
-        .replace("#", "")
-        .replace(".", "。")
-        .replace("\\", "")
-        .replace("&NBSP", "")
-        .replace(":", "")
-        .replace("*", "")
-        .replace("?", "")
-        .replace("?", "")
-        .replace('"', "")
-        .replace("<", "")
-        .replace(">", "")
-        .replace("|", "")
-        .replace(" ", "")
-        .replace('"', "")
-        .replace("'", "")
-    )
-
-
 class ZLNYLScheduling:
     def __init__(self, log_type, crawler, category, rule_dict, env, our_uid=None):
         self.platform = "中老年娱乐"
@@ -55,13 +31,6 @@ class ZLNYLScheduling:
         self.mq = MQ(topic_name="topic_crawler_etl_" + self.env)
         self.download_count = 0
 
-    def repeat_video(self, video_id):
-        sql = f""" select * from crawler_video where platform in ("{self.crawler}","{self.platform}") and out_video_id="{video_id}"; """
-        repeat_video = MysqlHelper.get_values(
-            self.log_type, self.crawler, sql, self.env
-        )
-        return len(repeat_video)
-
     # 获取视频id_list
     def get_videoList(self, page_id):
         url = "https://kkj.xinhuachuanmeijs.com/app/index.php?i=299&t=0&m=jyt_txvideo&v=1.0&from=wxapp&c=entry&a=wxapp&do=videolist&"
@@ -88,15 +57,38 @@ class ZLNYLScheduling:
                 self.env,
                 f"get_videoList:{response.text}\n",
             )
+            AliyunLogger.logging(
+                code="2000",
+                platform=self.crawler,
+                mode=self.log_type,
+                env=self.env,
+                message=f"get_videoList:{response.text}\n",
+            )
             return
         elif len(response.json()["data"]) == 0:
             Common.logger(self.log_type, self.crawler).info(f"没有更多数据啦~\n")
             Common.logging(self.log_type, self.crawler, self.env, f"没有更多数据啦~\n")
+            AliyunLogger.logging(
+                code="2000",
+                platform=self.crawler,
+                mode=self.log_type,
+                env=self.env,
+                message=f"没有更多数据啦~\n"
+            )
             return
         else:
             data_list = response.json()["data"]
             for video_obj in data_list:
                 try:
+                    trace_id = self.crawler + str(uuid.uuid1())
+                    AliyunLogger.logging(
+                        code="1001",
+                        platform=self.crawler,
+                        mode=self.log_type,
+                        env=self.env,
+                        trace_id=trace_id,
+                        message="扫描到一条视频"
+                    )
                     video_id = video_obj.get("vid", 0)
                     video_title = clean_title(video_obj.get("vtitle", 0))
                     video_time = video_obj.get("v_time", 0)
@@ -116,6 +108,7 @@ class ZLNYLScheduling:
                         "user_name": user_name,
                         "publish_time_stamp": publish_time_stamp,
                         "publish_time_str": publish_time_str,
+                        "update_time_stamp": int(time.time()),
                         "video_width": 0,
                         "video_height": 0,
                         "profile_id": 0,
@@ -128,67 +121,63 @@ class ZLNYLScheduling:
                     Common.logging(
                         self.log_type, self.crawler, self.env, f"{video_dict}"
                     )
+                    video_dict["out_user_id"] = video_dict["profile_id"]
+                    video_dict["platform"] = self.crawler
+                    video_dict["strategy"] = self.log_type
+                    video_dict["out_video_id"] = video_dict["video_id"]
+                    video_dict["width"] = video_dict["video_width"]
+                    video_dict["height"] = video_dict["video_height"]
+                    video_dict["crawler_rule"] = json.dumps(self.rule_dict)
+                    video_dict["user_id"] = self.our_uid
+                    video_dict["publish_time"] = video_dict["publish_time_str"]
+                    d_obj = self.find_video_url(video_id)
+                    video_dict["video_url"] = d_obj["url"]
+                    video_dict["avatar_url"] = d_obj["cover"]
+                    video_dict["cover_url"] = d_obj["cover"]
                     # 过滤无效视频
                     if video_title == "" or video_dict["video_id"] == "":
                         Common.logger(self.log_type, self.crawler).info("无效视频\n")
                         Common.logging(self.log_type, self.crawler, self.env, "无效视频\n")
-                        # 抓取基础规则过滤
-                    elif (
-                        download_rule(
-                            log_type=self.log_type,
-                            crawler=self.crawler,
-                            video_dict=video_dict,
-                            rule_dict=self.rule_dict,
-                        )
-                        is False
-                    ):
-                        Common.logger(self.log_type, self.crawler).info("不满足抓取规则\n")
-                        Common.logging(
-                            self.log_type, self.crawler, self.env, "不满足抓取规则\n"
-                        )
-                    elif (
-                        any(
-                            str(word)
-                            if str(word) in video_dict["video_title"]
-                            else False
-                            for word in get_config_from_mysql(
-                                log_type=self.log_type,
-                                source=self.crawler,
-                                env=self.env,
-                                text="filter",
-                                action="",
-                            )
+                        AliyunLogger.logging(
+                            code="2005",
+                            platform=self.crawler,
+                            mode=self.log_type,
+                            env=self.env,
+                            trace_id=trace_id,
+                            message="无效视频"
                         )
-                        is True
-                    ):
-                        Common.logger(self.log_type, self.crawler).info("已中过滤词\n")
-                        Common.logging(self.log_type, self.crawler, self.env, "已中过滤词\n")
-                    elif self.repeat_video(video_dict["video_id"]) != 0:
-                        Common.logger(self.log_type, self.crawler).info("视频已下载\n")
-                        Common.logging(self.log_type, self.crawler, self.env, "视频已下载\n")
-                    else:
-                        # out_video_id = md5(video_title.encode('utf8')).hexdigest()
-                        # out_user_id = md5(user_name.encode('utf8')).hexdigest()
-                        video_dict["out_user_id"] = video_dict["profile_id"]
-                        video_dict["platform"] = self.crawler
-                        video_dict["strategy"] = self.log_type
-                        video_dict["out_video_id"] = video_dict["video_id"]
-                        video_dict["width"] = video_dict["video_width"]
-                        video_dict["height"] = video_dict["video_height"]
-                        video_dict["crawler_rule"] = json.dumps(self.rule_dict)
-                        video_dict["user_id"] = self.our_uid
-                        video_dict["publish_time"] = video_dict["publish_time_str"]
-                        d_obj = self.find_video_url(video_id)
-                        video_dict["video_url"] = d_obj["url"]
-                        video_dict["avatar_url"] = d_obj["cover"]
-                        video_dict["cover_url"] = d_obj["cover"]
-                        # print(json.dumps(video_dict, ensure_ascii=False, indent=4))
+                        continue
+                    pipeline = PiaoQuanPipeline(
+                        platform=self.crawler,
+                        mode=self.log_type,
+                        env=self.env,
+                        rule_dict=self.rule_dict,
+                        item=video_dict,
+                        trace_id=trace_id
+                    )
+                    if pipeline.process_item():
                         self.mq.send_msg(video_dict)
+                        AliyunLogger.logging(
+                            code="1002",
+                            platform=self.crawler,
+                            mode=self.log_type,
+                            env=self.env,
+                            data=video_dict,
+                            trace_id=trace_id,
+                            message="成功发送至 ETL"
+                        )
                 except Exception as e:
                     Common.logger(self.log_type, self.crawler).error(f"抓取单条视频异常:{e}\n")
                     Common.logging(
                         self.log_type, self.crawler, self.env, f"抓取单条视频异常:{e}\n"
                     )
+                    AliyunLogger.logging(
+                        code="3000",
+                        platform=self.crawler,
+                        mode=self.log_type,
+                        env=self.env,
+                        message=f"抓取单条视频异常:{e}\n"
+                    )
 
     def find_video_url(self, video_id):
         url = "https://kkj.xinhuachuanmeijs.com/app/index.php?i=299&t=0&m=jyt_txvideo&v=1.0&from=wxapp&c=entry&a=wxapp&do=videoinfo&state=we7sid-f0008a08276fc324921185dc74427c56&sign=fa36387242169f01aa747a80d49c8670&vid={}&version=1.0.3".format(