Przeglądaj źródła

2023-12-19: MySQL和MQ相关代码编写

罗俊辉 1 rok temu
rodzic
commit
e4d40fc73c

+ 0 - 30
application/common/messageQueue/__init__.py

@@ -1,30 +0,0 @@
-import json
-from mq_http_sdk.mq_exception import MQExceptionBase
-from mq_http_sdk.mq_producer import TopicMessage
-from mq_http_sdk.mq_client import MQClient
-
-from application.common.log import Local
-
-
-class MQ:
-    instance_id = "MQ_INST_1894469520484605_BXhXuzkZ"
-
-    def __init__(self, topic_name) -> None:
-        self.mq_client = MQClient("http://1894469520484605.mqrest.cn-qingdao-public.aliyuncs.com",
-                                  "LTAI4G7puhXtLyHzHQpD6H7A",
-                                  "nEbq3xWNQd1qLpdy2u71qFweHkZjSG")
-        self.producer = self.mq_client.get_producer(self.instance_id, topic_name)
-
-    def send_msg(self, video_dict):
-        strategy = video_dict["strategy"]
-        platform = video_dict["platform"]
-        try:
-            msg = TopicMessage(json.dumps(video_dict))
-            message_key = "{}-{}-{}".format(platform, strategy, video_dict['out_video_id'])
-            # msg.set_message_key(platform + "-" + strategy + "-" + video_dict["out_video_id"])
-            msg.set_message_key(message_key)
-            re_msg = self.producer.publish_message(msg)
-            Local.logger(strategy, platform).info("Publish Message Succeed. MessageID:%s, BodyMD5:%s\n" %
-                                                  (re_msg.message_id, re_msg.message_body_md5))
-        except MQExceptionBase as e:
-            Local.logger(strategy, platform).error("Publish Message Fail. Exception:%s\n" % e)

+ 25 - 0
application/common/messageQueue/consumer.py

@@ -0,0 +1,25 @@
+from mq_http_sdk.mq_client import *
+
+
+def get_consumer(topic_name, group_id):
+    # 初始化client。
+    mq_client = MQClient(
+        # 设置HTTP协议客户端接入点,进入云消息队列 RocketMQ 版控制台实例详情页面的接入点区域查看。
+        "http://1894469520484605.mqrest.cn-qingdao-public.aliyuncs.com",
+        # AccessKey ID,阿里云身份验证标识。获取方式,请参见创建AccessKey。
+        "LTAI4G7puhXtLyHzHQpD6H7A",
+        # AccessKey Secret,阿里云身份验证密钥。获取方式,请参见创建AccessKey。
+        "nEbq3xWNQd1qLpdy2u71qFweHkZjSG",
+    )
+    # 消息所属的Topic,在云消息队列 RocketMQ 版控制台创建。
+    # topic_name = "${TOPIC}"
+    topic_name = str(topic_name)
+    # 您在云消息队列 RocketMQ 版控制台创建的Group ID。
+    # group_id = "${GROUP_ID}"
+    group_id = str(group_id)
+    # Topic所属的实例ID,在云消息队列 RocketMQ 版控制台创建。
+    # 若实例有命名空间,则实例ID必须传入;若实例无命名空间,则实例ID传入空字符串。实例的命名空间可以在云消息队列 RocketMQ 版控制台的实例详情页面查看。
+    instance_id = "MQ_INST_1894469520484605_BXhXuzkZ"
+
+    consumer = mq_client.get_consumer(instance_id, topic_name, group_id)
+    return consumer

+ 29 - 0
application/common/messageQueue/mq.py

@@ -0,0 +1,29 @@
+import json
+from mq_http_sdk.mq_exception import MQExceptionBase
+from mq_http_sdk.mq_producer import TopicMessage
+from mq_http_sdk.mq_client import MQClient
+
+from application.common.log import Local
+
+
+class MQ:
+    instance_id = "MQ_INST_1894469520484605_BXhXuzkZ"
+
+    def __init__(self, topic_name) -> None:
+        self.mq_client = MQClient("http://1894469520484605.mqrest.cn-qingdao-public.aliyuncs.com",
+                                  "LTAI4G7puhXtLyHzHQpD6H7A",
+                                  "nEbq3xWNQd1qLpdy2u71qFweHkZjSG")
+        self.producer = self.mq_client.get_producer(self.instance_id, topic_name)
+
+    def send_msg(self, video_dict):
+        strategy = video_dict["strategy"]
+        platform = video_dict["platform"]
+        try:
+            msg = TopicMessage(json.dumps(video_dict))
+            message_key = "{}-{}-{}".format(platform, strategy, video_dict['out_video_id'])
+            msg.set_message_key(message_key)
+            re_msg = self.producer.publish_message(msg)
+            Local.logger(strategy, platform).info("Publish Message Succeed. MessageID:%s, BodyMD5:%s\n" %
+                                                  (re_msg.message_id, re_msg.message_body_md5))
+        except MQExceptionBase as e:
+            Local.logger(strategy, platform).error("Publish Message Fail. Exception:%s\n" % e)

+ 1 - 0
application/common/mysql/__init__.py

@@ -0,0 +1 @@
+from .mysql_helper import MysqlHelper

+ 95 - 0
application/common/mysql/mysql_helper.py

@@ -0,0 +1,95 @@
+# -*- coding: utf-8 -*-
+# @Author: luojunhui
+# @Time: 2023/12/19
+"""
+数据库连接及操作
+"""
+import redis
+import pymysql
+from application.common.log import Local
+from application.config.mysql_config import env_dict
+
+
+class MysqlHelper(object):
+
+    def __init__(self, env, mode, platform, action=''):
+        mysql_config = env_dict[env]
+        self.connection = pymysql.connect(
+            host=mysql_config['host'],  # 数据库IP地址,内网地址
+            port=mysql_config['port'],  # 端口号
+            user=mysql_config['user'],  # mysql用户名
+            passwd=mysql_config['passwd'],  # mysql用户登录密码
+            db=mysql_config['db'],  # 数据库名
+            charset=mysql_config['charset']  # 如果数据库里面的文本是utf8编码的,charset指定是utf8
+        )
+        self.mode = mode
+        self.platform = platform
+        self.action = action
+
+    def select(self, sql):
+        cursor = self.connection.cursor()
+        cursor.execute(sql)
+        data = cursor.fetchall()
+        return data
+
+    def update(self, sql):
+        cursor = self.connection.cursor()
+        try:
+            res = cursor.execute(sql)
+            self.connection.commit()
+            return res
+        except Exception as e:
+            Local.logger(self.mode, self.platform).error(f"update_values异常,进行回滚操作:{e}\n")
+            self.connection.rollback()
+
+    def close(self):
+        self.connection.close()
+
+
+class RedisHelper:
+    @classmethod
+    def connect_redis(cls, env):
+        if env == 'hk':
+            redis_pool = redis.ConnectionPool(
+                # host='r-bp154bpw97gptefiqk.redis.rds.aliyuncs.com',  # 内网地址
+                # host='r-bp154bpw97gptefiqkpd.redis.rds.aliyuncs.com',  # 测试地址
+                host='r-bp1mb0v08fqi4hjffupd.redis.rds.aliyuncs.com',  # 外网地址
+                port=6379,
+                db=2,
+                password='Wqsd@2019'
+            )
+            redis_conn = redis.Redis(connection_pool=redis_pool)
+        elif env == 'prod':
+            redis_pool = redis.ConnectionPool(
+                host='r-bp1mb0v08fqi4hjffu.redis.rds.aliyuncs.com',  # 内网地址
+                # host='r-bp1mb0v08fqi4hjffupd.redis.rds.aliyuncs.com',  # 外网地址
+                port=6379,
+                db=2,
+                password='Wqsd@2019'
+            )
+            redis_conn = redis.Redis(connection_pool=redis_pool)
+        else:
+            redis_pool = redis.ConnectionPool(
+                # host='r-bp154bpw97gptefiqk.redis.rds.aliyuncs.com',  # 内网地址
+                host='r-bp154bpw97gptefiqkpd.redis.rds.aliyuncs.com',  # 外网地址
+                port=6379,
+                db=2,
+                password='Qingqu2019'
+            )
+            redis_conn = redis.Redis(connection_pool=redis_pool)
+        return redis_conn
+
+    @classmethod
+    def redis_push(cls, env, task_key, data):
+        redis_conn = cls.connect_redis(env)
+        # print("开始写入数据")
+        redis_conn.lpush(task_key, data)
+        # print("数据写入完成")
+
+    @classmethod
+    def redis_pop(cls, env, task_key):
+        redis_conn = cls.connect_redis(env)
+        if redis_conn.llen(task_key) == 0:
+            return None
+        else:
+            return redis_conn.rpop(task_key)

+ 2 - 1
application/config/__init__.py

@@ -1 +1,2 @@
-from .ipconfig import ip_config
+from .ipconfig import ip_config
+from .mysql_config import env_dict

+ 36 - 0
application/config/mysql_config.py

@@ -0,0 +1,36 @@
+"""
+MySQL的配置任务
+"""
+
+
+# 香港服务器, 暂时不写
+mysql_hk = {
+    "", ""
+}
+
+# prod环境服务器地址
+mysql_prod = {
+    "host": "rm-bp1159bu17li9hi94.mysql.rds.aliyuncs.com",  # 数据库IP地址,内网地址
+    # host="rm-bp1159bu17li9hi94ro.mysql.rds.aliyuncs.com",# 数据库IP地址,外网地址
+    "port": 3306,  # 端口号
+    "user":"crawler",  # mysql用户名
+    "passwd": "crawler123456@",  # mysql用户登录密码
+    "db": "piaoquan-crawler",  # 数据库名
+    "charset": "utf8mb4"  # 如果数据库里面的文本是utf8编码的,charset指定是utf8
+}
+# 测试环境Mysql服务器地址
+mysql_dev = {
+    "host": "rm-bp1k5853td1r25g3n690.mysql.rds.aliyuncs.com",  # 数据库IP地址,内网地址
+    # host="rm-bp1k5853td1r25g3ndo.mysql.rds.aliyuncs.com",  # 数据库IP地址,外网地址
+    "port": 3306,  # 端口号
+    "user":"crawler",  # mysql用户名
+    "passwd": "crawler123456@",  # mysql用户登录密码
+    "db": "piaoquan-crawler",  # 数据库名
+    "charset": "utf8mb4"  # 如果数据库里面的文本是utf8编码的,charset指定是utf8
+}
+
+env_dict = {
+    "hk": mysql_hk,
+    "prod": mysql_prod,
+    "dev": mysql_dev
+}

+ 0 - 0
application/spider/crawler/__init__.py → application/spider/crawler_offline/__init__.py


+ 0 - 0
application/spider/crawler/xiaoniangao_plus.py → application/spider/crawler_offline/xiaoniangao_plus.py


+ 0 - 0
application/common/aliyun/__init__.py → application/spider/crawler_online/__init__.py


+ 1 - 1
scheduler/spider_scheduler.py

@@ -3,7 +3,7 @@ import sys
 
 sys.path.append(os.getcwd())
 
-from application.spider.crawler import *
+from application.spider.crawler_offline import *
 
 
 class SpiderHome(object):