Update WeChat official account (gongzhonghao) logging

罗俊辉 1 year ago
parent commit
55a01e9521

+ 2 - 1
common/__init__.py

@@ -1,2 +1,3 @@
 from .aliyun_log import AliyunLogger
-from .redirect_url import get_redirect_url
+from .redirect_url import get_redirect_url
+from .pipeline import PiaoQuanPipeline
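
All of the `AliyunLogger.logging` calls added in this commit go through the export above. The implementation lives in common/aliyun_log.py, which this diff does not touch, so the following is only a sketch of the signature the call sites imply; the parameter handling and the event-code conventions (1xxx progress, 2xxx filtered/warning, 3xxx exception) are inferred from usage, not confirmed.

```python
# Hypothetical sketch of the AliyunLogger.logging signature implied by the
# call sites in this commit; the real implementation (common/aliyun_log.py)
# ships events to Aliyun SLS rather than printing them.
import json
import time


class AliyunLogger:
    @classmethod
    def logging(cls, code, platform, mode, env, message,
                trace_id=None, data=None):
        event = {
            "code": code,          # e.g. "1000" info, "2000" warning, "3000" exception
            "platform": platform,  # the `crawler` argument at call sites
            "mode": mode,          # the `log_type` argument at call sites
            "env": env,            # runtime environment, e.g. "prod"
            "message": message,
            "timestamp": int(time.time()),
        }
        if trace_id is not None:
            event["trace_id"] = trace_id  # per-video correlation id
        if data is not None:
            event["data"] = data          # payload such as video_dict
        print(json.dumps(event, ensure_ascii=False))
```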

+ 167 - 4
gongzhonghao/gongzhonghao_author/gongzhonghao_author.py

@@ -4,6 +4,7 @@ import os
 import random
 import sys
 import time
+import uuid
 import requests
 import urllib3
 from selenium.webdriver import DesiredCapabilities
@@ -16,6 +17,7 @@ from common.common import Common
 from common.feishu import Feishu
 from common.scheduling_db import MysqlHelper
 from common.public import get_config_from_mysql, download_rule, title_like, task_unbind
+from common import AliyunLogger, PiaoQuanPipeline
 
 
 class GongzhonghaoAuthor:
@@ -45,6 +47,13 @@ class GongzhonghaoAuthor:
     def get_user_info(cls, log_type, crawler, task_dict, user_dict, token_index, env):
         Common.logger(log_type, crawler).info(f"获取站外用户信息:{user_dict['link']}")
         Common.logging(log_type, crawler, env, f"获取站外用户信息:{user_dict['link']}")
+        AliyunLogger.logging(
+            code="1000",
+            platform=crawler,
+            mode=log_type,
+            env=env,
+            message=f"获取站外用户信息:{user_dict['link']}"
+        )
         while True:
             token_dict = cls.get_token(log_type, crawler, token_index, env)
             url = "https://mp.weixin.qq.com/cgi-bin/searchbiz?"
@@ -82,6 +91,13 @@ class GongzhonghaoAuthor:
             if r.json()["base_resp"]["err_msg"] == "invalid session":
                 Common.logger(log_type, crawler).warning(f"status_code:{r.status_code}, get_fakeid:{r.text}\n")
                 Common.logging(log_type, crawler, env, f"status_code:{r.status_code}, get_fakeid:{r.text}\n")
+                AliyunLogger.logging(
+                    code="2000",
+                    platform=crawler,
+                    mode=log_type,
+                    env=env,
+                    message=f"status_code:{r.status_code}, get_fakeid:{r.text}\n"
+                )
                 if 20 >= datetime.datetime.now().hour >= 10:
                     Feishu.bot(log_type, crawler, f"{token_dict['title']}\n操作人:{token_dict['operator']}\n更换日期:{token_dict['update_time']} \n过期啦,请扫码更换token\nhttps://mp.weixin.qq.com/")
                 time.sleep(60 * 15)
@@ -89,6 +105,13 @@ class GongzhonghaoAuthor:
             if r.json()["base_resp"]["err_msg"] == "freq control":
                 Common.logger(log_type, crawler).warning(f"status_code:{r.status_code}, get_fakeid:{r.text}\n")
                 Common.logging(log_type, crawler, env, f"status_code:{r.status_code}, get_fakeid:{r.text}\n")
+                AliyunLogger.logging(
+                    code="2000",
+                    platform=crawler,
+                    mode=log_type,
+                    env=env,
+                    message=f"status_code:{r.status_code}, get_fakeid:{r.text}\n"
+                )
                 if 20 >= datetime.datetime.now().hour >= 10:
                     Feishu.bot(log_type, crawler, f"{token_dict['title']}\n操作人:{token_dict['operator']}\n更换日期:{token_dict['update_time']} \n频控啦,请扫码更换其他公众号token\nhttps://mp.weixin.qq.com/")
                 time.sleep(60 * 15)
@@ -96,14 +119,35 @@ class GongzhonghaoAuthor:
             if r.json()["base_resp"]["err_msg"] == "ok" and len(r.json()["list"]) == 0:
                 Common.logger(log_type, crawler).warning(f"status_code:{r.status_code}, get_fakeid:{r.text}\n")
                 Common.logging(log_type, crawler, env, f"status_code:{r.status_code}, get_fakeid:{r.text}\n")
+                AliyunLogger.logging(
+                    code="2000",
+                    platform=crawler,
+                    mode=log_type,
+                    env=env,
+                    message=f"status_code:{r.status_code}, get_fakeid:{r.text}\n"
+                )
                 unbind_msg = task_unbind(log_type=log_type, crawler=crawler, taskid=task_dict['id'], uids=str(user_dict["uid"]), env=env)
                 if unbind_msg == "success":
                     if 20 >= datetime.datetime.now().hour >= 10:
                         Feishu.bot(log_type, crawler, f"公众号:{user_dict['link']}, 站内昵称:{user_dict['nick_name']}\n抓取异常, 已取消抓取该公众号\n")
                     Common.logging(log_type, crawler, env, f"公众号:{user_dict['link']}, 站内昵称:{user_dict['nick_name']}\n抓取异常, 已取消抓取该公众号\n")
+                    AliyunLogger.logging(
+                        code="2000",
+                        platform=crawler,
+                        mode=log_type,
+                        env=env,
+                        message=f"公众号:{user_dict['link']}, 站内昵称:{user_dict['nick_name']}\n抓取异常, 已取消抓取该公众号\n"
+                    )
                 else:
                     Common.logger(log_type, crawler).warning(f"unbind_msg:{unbind_msg}")
                     Common.logging(log_type, crawler, env, f"unbind_msg:{unbind_msg}")
+                    AliyunLogger.logging(
+                        code="2000",
+                        platform=crawler,
+                        mode=log_type,
+                        env=env,
+                        message=f"unbind_msg: {unbind_msg}"
+                    )
                 return None
             user_info_dict = {'user_name': r.json()["list"][0]["nickname"],
                               'user_id': r.json()["list"][0]["fakeid"],
@@ -228,6 +272,13 @@ class GongzhonghaoAuthor:
             if r.json()["base_resp"]["err_msg"] == "invalid session":
                 Common.logger(log_type, crawler).warning(f"status_code:{r.status_code}, get_videoList:{r.text}\n")
                 Common.logging(log_type, crawler, env, f"status_code:{r.status_code}, get_videoList:{r.text}\n")
+                AliyunLogger.logging(
+                    code="2000",
+                    platform=crawler,
+                    mode=log_type,
+                    env=env,
+                    message=f"status_code:{r.status_code}, get_videoList:{r.text}\n"
+                )
                 if 20 >= datetime.datetime.now().hour >= 10:
                     Feishu.bot(log_type, crawler, f"{token_dict['title']}\n操作人:{token_dict['operator']}\n更换日期:{token_dict['update_time']}\n过期啦,请扫码更换token\nhttps://mp.weixin.qq.com/")
                 time.sleep(60 * 15)
@@ -235,6 +286,13 @@ class GongzhonghaoAuthor:
             if r.json()["base_resp"]["err_msg"] == "freq control":
                 Common.logger(log_type, crawler).warning(f"status_code:{r.status_code}, get_videoList:{r.text}\n")
                 Common.logging(log_type, crawler, env, f"status_code:{r.status_code}, get_videoList:{r.text}\n")
+                AliyunLogger.logging(
+                    code="2000",
+                    platform=crawler,
+                    mode=log_type,
+                    env=env,
+                    message=f"status_code:{r.status_code}, get_videoList:{r.text}\n"
+                )
                 if 20 >= datetime.datetime.now().hour >= 10:
                     Feishu.bot(log_type, crawler,f"{token_dict['title']}\n操作人:{token_dict['operator']}\n更换日期:{token_dict['update_time']} \n频控啦,请扫码更换其他公众号token\nhttps://mp.weixin.qq.com/")
                 time.sleep(60 * 15)
@@ -242,6 +300,13 @@ class GongzhonghaoAuthor:
             if r.json()["base_resp"]["err_msg"] == "invalid args" and r.json()["base_resp"]["ret"] == 200002:
                 Common.logger(log_type, crawler).warning(f"status_code:{r.status_code}, get_videoList:{r.text}\n")
                 Common.logging(log_type, crawler, env, f"status_code:{r.status_code}, get_videoList:{r.text}\n")
+                AliyunLogger.logging(
+                    code="2000",
+                    platform=crawler,
+                    mode=log_type,
+                    env=env,
+                    message=f"status_code:{r.status_code}, get_videoList:{r.text}\n"
+                )
                 task_unbind(log_type=log_type, crawler=crawler, taskid=task_dict['id'], uids=str(user_dict["uid"]), env=env)
                 if 20 >= datetime.datetime.now().hour >= 10:
                     Feishu.bot(log_type, crawler,f"公众号:{user_dict['link']}, 站内昵称:{user_dict['nick_name']}\n抓取异常, 已取消抓取该公众号\n")
@@ -249,6 +314,13 @@ class GongzhonghaoAuthor:
             if 'app_msg_list' not in r.json():
                 Common.logger(log_type, crawler).warning(f"status_code:{r.status_code}, get_videoList:{r.text}\n")
                 Common.logging(log_type, crawler, env, f"status_code:{r.status_code}, get_videoList:{r.text}\n")
+                AliyunLogger.logging(
+                    code="2000",
+                    platform=crawler,
+                    mode=log_type,
+                    env=env,
+                    message=f"status_code:{r.status_code}, get_videoList:{r.text}\n"
+                )
                 if 20 >= datetime.datetime.now().hour >= 10:
                     Feishu.bot(log_type, crawler, f"{token_dict['title']}\n操作人:{token_dict['operator']}\n更换日期:{token_dict['update_time']}\n频控啦,请扫码更换其他公众号token\nhttps://mp.weixin.qq.com/")
                 time.sleep(60 * 15)
@@ -256,12 +328,20 @@ class GongzhonghaoAuthor:
             if len(r.json()['app_msg_list']) == 0:
                 Common.logger(log_type, crawler).info('没有更多视频了\n')
                 Common.logging(log_type, crawler, env, '没有更多视频了\n')
+                AliyunLogger.logging(
+                    code="2000",
+                    platform=crawler,
+                    mode=log_type,
+                    env=env,
+                    message="没有更多视频了\n"
+                )
                 return
             else:
                 begin += 5
                 app_msg_list = r.json()['app_msg_list']
                 for article in app_msg_list:
                     try:
+                        trace_id = crawler + str(uuid.uuid1())
                         create_time = article.get('create_time', 0)
                         update_time = article.get('update_time', 0)
                         publish_time_stamp = int(create_time)
@@ -288,16 +368,44 @@ class GongzhonghaoAuthor:
                         for k, v in video_dict.items():
                             Common.logger(log_type, crawler).info(f"{k}:{v}")
                         Common.logging(log_type, crawler, env, f'video_dict:{video_dict}')
-
+                        AliyunLogger.logging(
+                            code="1001",
+                            trace_id=trace_id,
+                            platform=crawler,
+                            mode=log_type,
+                            env=env,
+                            message="扫描到一条视频",
+                            data=video_dict
+                        )
                         if (int(time.time()) - publish_time_stamp > 3600 * 24 * int(rule_dict.get('period', {}).get('max', 1000)))\
                                 and (int(time.time()) - update_time_stamp > 3600 * 24 * int(rule_dict.get('period', {}).get('max', 1000))):
                             Common.logger(log_type, crawler).info(f"发布时间超过{int(rule_dict.get('period', {}).get('max', 1000))}天\n")
                             Common.logging(log_type, crawler, env, f"发布时间超过{int(rule_dict.get('period', {}).get('max', 1000))}天\n")
+                            AliyunLogger.logging(
+                                code="2004",
+                                trace_id=trace_id,
+                                platform=crawler,
+                                mode=log_type,
+                                env=env,
+                                data=video_dict,
+                                message="发布时间超过{}天".format(
+                                    int(rule_dict.get("period", {}).get("max", 1000))
+                                ),
+                            )
                             return
 
                         if video_dict['article_url'] == 0 or video_dict['video_url'] == 0:
                             Common.logger(log_type, crawler).info("文章涉嫌违反相关法律法规和政策\n")
                             Common.logging(log_type, crawler, env, "文章涉嫌违反相关法律法规和政策\n")
+                            AliyunLogger.logging(
+                                code="2005",
+                                trace_id=trace_id,
+                                platform=crawler,
+                                mode=log_type,
+                                env=env,
+                                data=video_dict,
+                                message="无效文章或视频"
+                            )
                         # 标题敏感词过滤
                         elif any(str(word) if str(word) in video_dict['video_title'] else False
                                  for word in get_config_from_mysql(log_type=log_type,
@@ -307,14 +415,32 @@ class GongzhonghaoAuthor:
                                                                    action="")) is True:
                             Common.logger(log_type, crawler).info("标题已中过滤词\n")
                             Common.logging(log_type, crawler, env, "标题已中过滤词\n")
+                            AliyunLogger.logging(
+                                code="2003",
+                                trace_id=trace_id,
+                                platform=crawler,
+                                mode=log_type,
+                                env=env,
+                                data=video_dict,
+                                message="标题已中过滤词\n"
+                            )
                         # 已下载判断
                         elif cls.repeat_video(log_type, crawler, video_dict['video_id'], env) != 0:
                             Common.logger(log_type, crawler).info("视频已下载\n")
                             Common.logging(log_type, crawler, env, "视频已下载\n")
+                            AliyunLogger.logging(
+                                code="2002",
+                                trace_id=trace_id,
+                                platform=crawler,
+                                mode=log_type,
+                                env=env,
+                                data=video_dict,
+                                message="视频已下载"
+                            )
                         # 标题相似度
-                        elif title_like(log_type, crawler, video_dict['video_title'], cls.platform, env) is True:
-                            Common.logger(log_type, crawler).info(f'标题相似度>=80%:{video_dict["video_title"]}\n')
-                            Common.logging(log_type, crawler, env, f'标题相似度>=80%:{video_dict["video_title"]}\n')
+                        # elif title_like(log_type, crawler, video_dict['video_title'], cls.platform, env) is True:
+                        #     Common.logger(log_type, crawler).info(f'标题相似度>=80%:{video_dict["video_title"]}\n')
+                        #     Common.logging(log_type, crawler, env, f'标题相似度>=80%:{video_dict["video_title"]}\n')
                         else:
                             video_dict["out_user_id"] = video_dict["user_id"]
                             video_dict["platform"] = crawler
@@ -326,10 +452,26 @@ class GongzhonghaoAuthor:
                             video_dict["user_id"] = user_dict["uid"]  # 站内 UID?爬虫获取不到了(随机发布到原 5 个账号中)
                             video_dict["publish_time"] = video_dict["publish_time_str"]
                             mq.send_msg(video_dict)
+                            AliyunLogger.logging(
+                                code="1002",
+                                trace_id=trace_id,
+                                platform=crawler,
+                                mode=log_type,
+                                env=env,
+                                data=video_dict,
+                                message="成功发送 MQ 至 ETL"
+                            )
                             time.sleep(random.randint(1, 8))
                     except Exception as e:
                         Common.logger(log_type, crawler).error(f"抓取单条视频异常:{e}\n")
                         Common.logging(log_type, crawler, env, f"抓取单条视频异常:{e}\n")
+                        AliyunLogger.logging(
+                            code="3000",
+                            platform=crawler,
+                            mode=log_type,
+                            env=env,
+                            message=f"抓取单条视频异常:{e}\n"
+                        )
                 Common.logger(log_type, crawler).info('休眠 60 秒\n')
                 Common.logging(log_type, crawler, env, '休眠 60 秒\n')
                 time.sleep(60)
@@ -346,6 +488,13 @@ class GongzhonghaoAuthor:
         for user_dict in user_list:
             Common.logger(log_type, crawler).info(f'抓取公众号:{user_dict["nick_name"]}\n')
             Common.logging(log_type, crawler, env, f'抓取公众号:{user_dict["nick_name"]}\n')
+            AliyunLogger.logging(
+                code="1003",
+                platform=crawler,
+                mode=log_type,
+                env=env,
+                message="开始抓取公众号: {}".format(user_dict['nick_name'])
+            )
             try:
                 cls.get_videoList(log_type=log_type,
                                   crawler=crawler,
@@ -360,6 +509,20 @@ class GongzhonghaoAuthor:
             except Exception as e:
                 Common.logger(log_type, crawler).info(f'抓取公众号:{user_dict["nick_name"]}时异常:{e}\n')
                 Common.logging(log_type, crawler, env, f'抓取公众号:{user_dict["nick_name"]}时异常:{e}\n')
+                AliyunLogger.logging(
+                    code="3000",
+                    platform=crawler,
+                    mode=log_type,
+                    env=env,
+                    message="抓取公众号: {} 时异常".format(user_dict['nick_name'])
+                )
+            AliyunLogger.logging(
+                code="1004",
+                platform=crawler,
+                mode=log_type,
+                env=env,
+                message="完成抓取公众号: {}".format(user_dict['nick_name'])
+            )
 
 
 if __name__ == "__main__":
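
One detail worth noting in the hunks above: each article gets `trace_id = crawler + str(uuid.uuid1())`, and that id is attached to the scan event (1001), every filter outcome (2002-2005), and the final MQ send (1002), so a single video's path through the pipeline can be reassembled from the logs. A minimal, self-contained illustration of the pattern, where the `log` helper merely stands in for `AliyunLogger.logging`:

```python
# Minimal sketch of the per-video trace pattern; `log` is a stand-in for
# AliyunLogger.logging, and the codes match the ones used in the diff.
import uuid


def log(**event):
    print(event)  # a real logger would ship this to Aliyun SLS


def trace_one_video(crawler, video_dict, already_downloaded):
    trace_id = crawler + str(uuid.uuid1())  # same scheme as the diff
    log(code="1001", trace_id=trace_id, data=video_dict, message="扫描到一条视频")
    if already_downloaded:
        log(code="2002", trace_id=trace_id, data=video_dict, message="视频已下载")
        return
    log(code="1002", trace_id=trace_id, data=video_dict, message="成功发送 MQ 至 ETL")


trace_one_video("gongzhonghao", {"video_id": "demo"}, already_downloaded=False)
```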

+ 67 - 3
gongzhonghao/gongzhonghao_main/run_gzh_author.py

@@ -6,10 +6,12 @@ from multiprocessing import Process
 from mq_http_sdk.mq_client import *
 from mq_http_sdk.mq_consumer import *
 from mq_http_sdk.mq_exception import MQExceptionBase
+
 sys.path.append(os.getcwd())
 from common.public import task_fun_mq, get_consumer, ack_message
 from common.common import Common
 from common.scheduling_db import MysqlHelper
+from common import AliyunLogger
 from gongzhonghao.gongzhonghao_author.gongzhonghao_author import GongzhonghaoAuthor
 
 
@@ -45,6 +47,16 @@ def main(log_type, crawler, topic_name, group_id, env):
                                            f'WaitSeconds:{wait_seconds}\n'
                                            f'TopicName:{topic_name}\n'
                                            f'MQConsumer:{group_id}')
+    AliyunLogger.logging(
+        code="1000",
+        platform=crawler,
+        mode=log_type,
+        env=env,
+        message=f'{10 * "="}Consume And Ack Message From Topic{10 * "="}\n'
+                f'WaitSeconds:{wait_seconds}\n'
+                f'TopicName:{topic_name}\n'
+                f'MQConsumer:{group_id}'
+    )
     while True:
         try:
             # 长轮询消费消息。
@@ -70,6 +82,23 @@ def main(log_type, crawler, topic_name, group_id, env):
                                                        f"NextConsumeTime:{msg.next_consume_time}\n"
                                                        f"ReceiptHandle:{msg.receipt_handle}\n"
                                                        f"Properties:{msg.properties}")
+                AliyunLogger.logging(
+                    code="1000",
+                    platform=crawler,
+                    mode=log_type,
+                    env=env,
+                    message=f"Receive\n"
+                            f"MessageId:{msg.message_id}\n"
+                            f"MessageBodyMD5:{msg.message_body_md5}\n"
+                            f"MessageTag:{msg.message_tag}\n"
+                            f"ConsumedTimes:{msg.consumed_times}\n"
+                            f"PublishTime:{msg.publish_time}\n"
+                            f"Body:{msg.message_body}\n"
+                            f"NextConsumeTime:{msg.next_consume_time}\n"
+                            f"ReceiptHandle:{msg.receipt_handle}\n"
+                            f"Properties:{msg.properties}"
+                )
+
                 # ack_mq_message
                 ack_message(log_type=log_type, crawler=crawler, recv_msgs=recv_msgs, consumer=consumer)
 
@@ -77,11 +106,24 @@ def main(log_type, crawler, topic_name, group_id, env):
                 task_dict = task_fun_mq(msg.message_body)['task_dict']
                 Common.logger(log_type, crawler).info(f"调度任务:{task_dict}")
                 Common.logging(log_type, crawler, env, f"调度任务:{task_dict}")
-
+                AliyunLogger.logging(
+                    code="1000",
+                    platform=crawler,
+                    mode=log_type,
+                    env=env,
+                    message=f"调度任务:{task_dict}"
+                )
                 # 解析 rule_dict
                 rule_dict = task_fun_mq(msg.message_body)['rule_dict']
                 Common.logger(log_type, crawler).info(f"抓取规则:{rule_dict}\n")
                 Common.logging(log_type, crawler, env, f"抓取规则:{rule_dict}\n")
+                AliyunLogger.logging(
+                    code="1000",
+                    platform=crawler,
+                    mode=log_type,
+                    env=env,
+                    message=f"抓取规则:{rule_dict}\n"
+                )
 
                 # 解析 user_list
                 task_id = task_dict['id']
@@ -96,14 +138,21 @@ def main(log_type, crawler, topic_name, group_id, env):
                     crawler_num += 1
                 Common.logger(log_type, crawler).info(f"共{user_num}个公众号,需要启动{crawler_num}个脚本任务")
                 Common.logging(log_type, crawler, env, f"共{user_num}个公众号,需要启动{crawler_num}个脚本任务")
+                AliyunLogger.logging(
+                    code="1000",
+                    platform=crawler,
+                    mode=log_type,
+                    env=env,
+                    message=f"共{user_num}个公众号,需要启动{crawler_num}个脚本任务"
+                )
                 # 多进程并行抓取
                 processes = []
                 for i in range(crawler_num):
                     start = i * chunk_size
                     end = min((i + 1) * chunk_size, user_num + 1)
                     process = Process(target=get_author_videos, args=(
-                    f"{log_type}{i + 1}", crawler, i + 1, task_dict, rule_dict, user_list[start:end], env))
+                        f"{log_type}{i + 1}", crawler, i + 1, task_dict, rule_dict, user_list[start:end], env))
                     process.start()
                     processes.append(process)
 
@@ -115,10 +164,25 @@ def main(log_type, crawler, topic_name, group_id, env):
             if err.type == "MessageNotExist":
                 Common.logger(log_type, crawler).info(f"No new message! RequestId:{err.req_id}\n")
                 Common.logging(log_type, crawler, env, f"No new message! RequestId:{err.req_id}\n")
+                AliyunLogger.logging(
+                    code="2000",
+                    platform=crawler,
+                    mode=log_type,
+                    env=env,
+                    message=f"No new message! RequestId:{err.req_id}\n"
+                )
                 continue
 
             Common.logger(log_type, crawler).info(f"Consume Message Fail! Exception:{err}\n")
             Common.logging(log_type, crawler, env, f"Consume Message Fail! Exception:{err}\n")
+            AliyunLogger.logging(
+                code="2000",
+                platform=crawler,
+                mode=log_type,
+                env=env,
+                message=f"Consume Message Fail! Exception:{err}\n"
+            )
+
             time.sleep(2)
             continue
 
@@ -135,4 +199,4 @@ if __name__ == "__main__":
          crawler=args.crawler,
          topic_name=args.topic_name,
          group_id=args.group_id,
-         env=args.env)
+         env=args.env)
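
For context on the hunk that re-indents the `Process(...)` call: run_gzh_author.py fans the bound accounts out across worker processes in fixed-size chunks. Below is a self-contained sketch of that fan-out logic, with placeholder names for the worker function and chunk size. The original's `user_num + 1` slice bound is harmless since Python slicing clamps to the list length; `user_num` is the tighter bound used here.

```python
# Sketch of the chunked multiprocessing fan-out in run_gzh_author.py.
# `worker` and chunk_size are placeholders for the real script's
# get_author_videos and its configured chunk size.
from multiprocessing import Process


def worker(name, users):
    print(f"{name} handles {len(users)} accounts")


def fan_out(user_list, chunk_size=100):
    user_num = len(user_list)
    crawler_num = (user_num + chunk_size - 1) // chunk_size  # ceiling division
    processes = []
    for i in range(crawler_num):
        start = i * chunk_size
        end = min((i + 1) * chunk_size, user_num)  # slicing clamps anyway
        p = Process(target=worker,
                    args=(f"author{i + 1}", user_list[start:end]))
        p.start()
        processes.append(p)
    for p in processes:
        p.join()


if __name__ == "__main__":
    fan_out([f"gzh_{n}" for n in range(230)])
```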