瀏覽代碼

update 小年糕rule

zhangyong 1 年之前
父節點
當前提交
8012821a9c
共有 2 個文件被更改,包括 48 次插入41 次删除
  1. 11 5
      main/process_mq.sh
  2. 37 36
      xiaoniangaoplus/xiaoniangaoplus_main/run_xngrule_recommend.py

+ 11 - 5
main/process_mq.sh

@@ -1,9 +1,9 @@
-#! /bin/bash
+#! /bin/bash -x
 
-crawler=$1  # 哪款爬虫
-path=$2     # 爬虫路径
-log_type=$3 # 爬虫策略
-env=$4      # 环境
+crawler="xngrule"  # 哪款爬虫
+path="xiaoniangaoplus"     # 爬虫路径
+log_type="recommend" # 爬虫策略
+env="prod"      # 环境
 
 if [ ${env} = "dev" ];then
   piaoquan_crawler_dir=/Users/tzld/Desktop/piaoquan_crawler/
@@ -63,6 +63,12 @@ elif [ ${crawler} = "xngplus" ] && [ ${log_type} = "recommend" ];then
   python=python3
   log_path=${piaoquan_crawler_dir}main/main_logs/process-mq-$(date +%Y-%m-%d).log
 
+elif [ ${crawler} = "xngrule" ] && [ ${log_type} = "recommend" ];then
+  piaoquan_crawler_dir=/Users/tzld/Desktop/piaoquan_crawler/
+  profile_path=/.base_profile
+  python=python3
+  log_path=${piaoquan_crawler_dir}main/main_logs/process-mq-$(date +%Y-%m-%d).log
+
 else
   piaoquan_crawler_dir=/root/piaoquan_crawler/
   profile_path=/etc/profile

+ 37 - 36
xiaoniangaoplus/xiaoniangaoplus_main/run_xngrule_recommend.py

@@ -6,10 +6,11 @@ import multiprocessing
 
 from mq_http_sdk.mq_client import *
 from mq_http_sdk.mq_consumer import *
+sys.path.append(os.getcwd())
+
 from mq_http_sdk.mq_exception import MQExceptionBase
 
 
-sys.path.append(os.getcwd())
 from common.common import Common
 from common.public import get_consumer, ack_message, task_fun_mq
 from common.scheduling_db import MysqlHelper
@@ -25,7 +26,7 @@ def run(args1, args2, args3, args4, args5):
                                env=args5)
 
 
-class XngRuleMain:
+class Main:
     @classmethod
     def main(cls, log_type, crawler, topic_name, group_id, env):
         consumer = get_consumer(topic_name, group_id)
@@ -38,10 +39,10 @@ class XngRuleMain:
                                               f'WaitSeconds:{wait_seconds}\n'
                                               f'TopicName:{topic_name}\n'
                                               f'MQConsumer:{group_id}')
-        # Common.logging(log_type, crawler, env, f'{10 * "="}Consume And Ack Message From Topic{10 * "="}\n'
-        #                                        f'WaitSeconds:{wait_seconds}\n'
-        #                                        f'TopicName:{topic_name}\n'
-        #                                        f'MQConsumer:{group_id}')
+        Common.logging(log_type, crawler, env, f'{10 * "="}Consume And Ack Message From Topic{10 * "="}\n'
+                                               f'WaitSeconds:{wait_seconds}\n'
+                                               f'TopicName:{topic_name}\n'
+                                               f'MQConsumer:{group_id}')
         while True:
             try:
                 # 长轮询消费消息。
@@ -57,16 +58,16 @@ class XngRuleMain:
                                                           f"NextConsumeTime:{msg.next_consume_time}\n"
                                                           f"ReceiptHandle:{msg.receipt_handle}\n"
                                                           f"Properties:{msg.properties}")
-                    # Common.logging(log_type, crawler, env, f"Receive\n"
-                    #                                        f"MessageId:{msg.message_id}\n"
-                    #                                        f"MessageBodyMD5:{msg.message_body_md5}\n"
-                    #                                        f"MessageTag:{msg.message_tag}\n"
-                    #                                        f"ConsumedTimes:{msg.consumed_times}\n"
-                    #                                        f"PublishTime:{msg.publish_time}\n"
-                    #                                        f"Body:{msg.message_body}\n"
-                    #                                        f"NextConsumeTime:{msg.next_consume_time}\n"
-                    #                                        f"ReceiptHandle:{msg.receipt_handle}\n"
-                    #                                        f"Properties:{msg.properties}")
+                    Common.logging(log_type, crawler, env, f"Receive\n"
+                                                           f"MessageId:{msg.message_id}\n"
+                                                           f"MessageBodyMD5:{msg.message_body_md5}\n"
+                                                           f"MessageTag:{msg.message_tag}\n"
+                                                           f"ConsumedTimes:{msg.consumed_times}\n"
+                                                           f"PublishTime:{msg.publish_time}\n"
+                                                           f"Body:{msg.message_body}\n"
+                                                           f"NextConsumeTime:{msg.next_consume_time}\n"
+                                                           f"ReceiptHandle:{msg.receipt_handle}\n"
+                                                           f"Properties:{msg.properties}")
                     # ack_mq_message
                     ack_message(log_type=log_type, crawler=crawler, recv_msgs=recv_msgs, consumer=consumer)
 
@@ -81,13 +82,13 @@ class XngRuleMain:
                         our_uid_list.append(user["uid"])
                     our_uid = random.choice(our_uid_list)
                     Common.logger(log_type, crawler).info(f"调度任务:{task_dict}")
-                    # Common.logging(log_type, crawler, env, f"调度任务:{task_dict}")
+                    Common.logging(log_type, crawler, env, f"调度任务:{task_dict}")
                     Common.logger(log_type, crawler).info(f"抓取规则:{rule_dict}")
-                    # Common.logging(log_type, crawler, env, f"抓取规则:{rule_dict}")
+                    Common.logging(log_type, crawler, env, f"抓取规则:{rule_dict}")
                     Common.logger(log_type, crawler).info(f"用户列表:{user_list}\n")
-                    # Common.logging(log_type, crawler, env, f"用户列表:{user_list}\n")
+                    Common.logging(log_type, crawler, env, f"用户列表:{user_list}\n")
                     Common.logger(log_type, crawler).info(f'开始抓取:{task_dict["taskName"]}\n')
-                    # Common.logging(log_type, crawler, env, f'开始抓取:{task_dict["taskName"]}\n')
+                    Common.logging(log_type, crawler, env, f'开始抓取:{task_dict["taskName"]}\n')
 
                     process = multiprocessing.Process(
                         target=run,
@@ -112,31 +113,31 @@ class XngRuleMain:
 
 
                     Common.logger(log_type, crawler).info('抓取一轮结束\n')
-                    # Common.logging(log_type, crawler, env, '抓取一轮结束\n')
+                    Common.logging(log_type, crawler, env, '抓取一轮结束\n')
 
             except MQExceptionBase as err:
                 # Topic中没有消息可消费。
                 if err.type == "MessageNotExist":
                     Common.logger(log_type, crawler).info(f"No new message! RequestId:{err.req_id}\n")
-                    # Common.logging(log_type, crawler, env, f"No new message! RequestId:{err.req_id}\n")
+                    Common.logging(log_type, crawler, env, f"No new message! RequestId:{err.req_id}\n")
                     continue
 
                 Common.logger(log_type, crawler).info(f"Consume Message Fail! Exception:{err}\n")
-                # Common.logging(log_type, crawler, env, f"Consume Message Fail! Exception:{err}\n")
+                Common.logging(log_type, crawler, env, f"Consume Message Fail! Exception:{err}\n")
                 time.sleep(2)
                 continue
 
 
-# if __name__ == "__main__":
-#     parser = argparse.ArgumentParser()  ## 新建参数解释器对象
-#     parser.add_argument('--log_type', type=str)  ## 添加参数,注明参数类型
-#     parser.add_argument('--crawler')  ## 添加参数
-#     parser.add_argument('--topic_name')  ## 添加参数
-#     parser.add_argument('--group_id')  ## 添加参数
-#     parser.add_argument('--env')  ## 添加参数
-#     args = parser.parse_args()  ### 参数赋值,也可以通过终端赋值
-#     XngRuleMain.main(log_type=args.log_type,
-#                               crawler=args.crawler,
-#                               topic_name=args.topic_name,
-#                               group_id=args.group_id,
-#                               env=args.env)
+if __name__ == "__main__":
+    parser = argparse.ArgumentParser()  ## 新建参数解释器对象
+    parser.add_argument('--log_type', type=str)  ## 添加参数,注明参数类型
+    parser.add_argument('--crawler')  ## 添加参数
+    parser.add_argument('--topic_name')  ## 添加参数
+    parser.add_argument('--group_id')  ## 添加参数
+    parser.add_argument('--env')  ## 添加参数
+    args = parser.parse_args()  ### 参数赋值,也可以通过终端赋值
+    Main.main(log_type=args.log_type,
+                              crawler=args.crawler,
+                              topic_name=args.topic_name,
+                              group_id=args.group_id,
+                              env=args.env)