Browse Source

update: phase-2 development

lierqiang 2 years ago
parent
commit
e2744cb3a0

+ 100 - 0
common/db/mysql_help_new.py

@@ -0,0 +1,100 @@
+# -*- coding: utf-8 -*-
+# @Author: wangkun
+# @Time: 2023/2/2
+"""
+数据库连接及操作
+"""
+import os
+import logging
+import pymysql
+from dotenv import load_dotenv
+
+load_dotenv(verbose=True)
+env = os.getenv('env')
+
+
+class MysqlHelper(object):
+
+    def __init__(self):
+        if env == 'hk':
+            # Create a Connection object representing a database connection
+            connection = pymysql.connect(
+                host="rm-j6cz4c6pt96000xi3.mysql.rds.aliyuncs.com",  # database host, internal address
+                # host="rm-j6cz4c6pt96000xi3lo.mysql.rds.aliyuncs.com",  # database host, public address
+                port=3306,  # port
+                user="crawler",  # MySQL user
+                passwd="crawler123456@",  # MySQL password
+                db="piaoquan-crawler",  # database name
+                # use charset utf8 if the text stored in the database is UTF-8 encoded
+                charset="utf8")
+        elif env == 'prod':
+            # Create a Connection object representing a database connection
+            connection = pymysql.connect(
+                host="rm-bp1159bu17li9hi94.mysql.rds.aliyuncs.com",  # database host, internal address
+                # host="rm-bp1159bu17li9hi94ro.mysql.rds.aliyuncs.com",  # database host, public address
+                port=3306,  # port
+                user="crawler",  # MySQL user
+                passwd="crawler123456@",  # MySQL password
+                db="piaoquan-crawler",  # database name
+                # use charset utf8 if the text stored in the database is UTF-8 encoded
+                charset="utf8")
+        else:
+            # Create a Connection object representing a database connection
+            connection = pymysql.connect(
+                host="rm-bp1k5853td1r25g3n690.mysql.rds.aliyuncs.com",  # database host, internal address
+                # host="rm-bp1k5853td1r25g3ndo.mysql.rds.aliyuncs.com",  # database host, public address
+                port=3306,  # port
+                user="crawler",  # MySQL user
+                passwd="crawler123456@",  # MySQL password
+                db="piaoquan-crawler",  # database name
+                # use charset utf8 if the text stored in the database is UTF-8 encoded
+                charset="utf8")
+        self.connection = connection
+
+        self.cursor = connection.cursor(cursor=pymysql.cursors.DictCursor)
+
+    def get_values(self, sql):
+        try:
+            self.cursor.execute(sql)
+            # fetchall returns a tuple of rows; with DictCursor each row is a dict
+            data = self.cursor.fetchall()
+            # close the database connection
+            # self.connection.close()
+            # return the query result
+            return data
+        except Exception as e:
+            logging.error(f"get_values异常:{e}\n")
+
+    def insert_values(self, sql, value):
+        try:
+            # the connection was opened in __init__
+            # execute the SQL statement
+            self.cursor.execute(sql, value)
+            task_id = self.connection.insert_id()
+            self.connection.commit()
+            # close the database connection
+            self.connection.close()
+            # return the auto-increment id of the inserted row
+            return task_id
+        except Exception as e:
+            logging.error(f"insert_values异常:{e}\n")
+
+    def update_values(self, sql):
+        try:
+            # execute the SQL statement
+            self.cursor.execute(sql)
+            # remember to commit, otherwise the change does not take effect
+            self.connection.commit()
+            self.connection.close()
+            return True
+        except Exception as e:
+            logging.error(f"update_values异常,进行回滚操作:{e}\n")
+            # roll back on error
+            self.connection.rollback()
+            self.connection.close()
+            return False
+        # close the database connection
+
+
+if __name__ == "__main__":
+    MysqlHelper()
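For reference, a minimal usage sketch of the helper above (not part of this commit): get_values() returns a tuple of dicts because of DictCursor, while insert_values() and update_values() close the connection after committing, so a fresh MysqlHelper is needed for any later statement. The crawler_task columns used here are only illustrative.

from common.db.mysql_help_new import MysqlHelper

helper = MysqlHelper()
rows = helper.get_values("select task_id, task_name from crawler_task limit 10")
for row in rows or ():  # get_values returns None if the query raised
    print(row["task_id"], row["task_name"])

sql = "insert into crawler_task(task_name, source) VALUES(%s, %s)"
task_id = helper.insert_values(sql, ("demo task", "xigua"))
# insert_values()/update_values() close the connection, so build a new
# MysqlHelper() before issuing further statements.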

+ 0 - 0
conf/__init__.py


binary
conf/__pycache__/__init__.cpython-310.pyc


binary
conf/__pycache__/config.cpython-310.pyc


+ 27 - 0
conf/config.py

@@ -0,0 +1,27 @@
+import os, sys
+from dotenv import load_dotenv
+
+load_dotenv(verbose=True)
+env = os.getenv('env')
+
+sys.path.append(os.path.abspath(os.path.join(os.getcwd(), "..")))
+
+
+def get_config():
+    conf = {
+        'dev': {
+            'media_url': 'https://videotest.yishihui.com/longvideoapi/user/virtual/crawler/registerVirtualUser',
+            'our_url': 'https://testadmin.piaoquantv.com/ums/user/{}/post',
+            'select_media_url': 'https://testadmin.piaoquantv.com/manager/user/info',
+            'media_main_url': 'https://testadmin.piaoquantv.com/ums/user/{}/post'
+
+        },
+        'prod': {
+            'media_url': 'http://longvideoapi-internal.piaoquantv.com/longvideoapi/user/virtual/crawler/registerVirtualUser',
+            'our_url': 'https://admin.piaoquantv.com/ums/user/{}/post',
+            'select_media_url': 'https://admin.piaoquantv.com/manager/user/info',
+            'media_main_url': 'https://admin.piaoquantv.com/ums/user/{}/post'
+        },
+
+    }
+    return conf[env]
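A minimal sketch of how this config is picked up (the .env contents are an assumption; only the env variable is read, and values other than 'dev' or 'prod' would raise a KeyError in get_config()):

# assumed .env file at the project root:
#   env=dev
from conf.config import get_config

conf = get_config()                                    # -> conf['dev'] when env=dev
register_url = conf['media_url']                       # virtual-user registration endpoint
profile_url = conf['media_main_url'].format(123456)    # 123456 is a placeholder media_id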

+ 227 - 50
server/conf_task.py

@@ -1,17 +1,89 @@
+import copy
+import logging
 import os
 import sys
 import time
-
 import requests
 from flask import Flask, request
 from flask import jsonify
 
 sys.path.append(os.path.abspath(os.path.join(os.getcwd(), "..")))
-from common.db.mysql_help import MysqlHelper
-from user_spider.user_info import *
+from common.db.mysql_help_new import MysqlHelper
+from conf.config import get_config
 
 app = Flask(__name__)
 app.config['JSON_AS_ASCII'] = False
+# MySQL helper instance (note: this rebinds the imported class name)
+MysqlHelper = MysqlHelper()
+conf = get_config()
+
+
+@app.route("/v1/crawler/task/getcategory", methods=["GET"])
+def getCategory():
+    sql = f'select id, content_category from crawler_content_category'
+    result = MysqlHelper.get_values(sql)
+    return jsonify({'code': 200, 'data': result})
+
+
+@app.route("/v1/crawler/task/getmodename", methods=["GET"])
+def getModeName():
+    sql = f'select id, mode_name from crawler_mode'
+    result = MysqlHelper.get_values(sql)
+    return jsonify({'code': 200, 'data': result})
+
+
+@app.route("/v1/crawler/task/getmodeboard", methods=["GET"])
+def getModeBoard():
+    sql = f'select id, mode_board from crawler_board'
+    result = MysqlHelper.get_values(sql)
+    return jsonify({'code': 200, 'data': result})
+
+
+@app.route("/v1/crawler/user/findmedia", methods=["GET"])
+def getMediaInfo():
+    data = request.args.to_dict()
+    task_id = data['task_id']
+    sql = f'select * from crawler_author_map where task_id={task_id}'
+    result = MysqlHelper.get_values(sql)
+    task_user_info = []
+    for task_info in result:
+        media_id = task_info['media_id']
+
+        media_info = requests.get(url=conf['select_media_url'], params={'uid': media_id}).json()['content']
+        media_name = media_info['longvideoNickName'] if media_info['longvideoNickName'] else media_info['nickName']
+        nick_name = task_info['nick_name']
+        spider_link = task_info['spider_link']
+        create_user_time = task_info['create_user_time']
+        media_data = dict(
+            media_name=media_name,
+            nick_name=nick_name,
+            spider_link=spider_link,
+            media_id={'media_id': media_id, 'media_url': conf['media_main_url'].format(media_id)},
+            create_user_time=create_user_time
+        )
+        task_user_info.append(media_data)
+    return jsonify({'code': 200, 'data': task_user_info})
+
+
+@app.route("/v1/crawler/task/findtask", methods=["GET"])
+def getTaskUserInfo():
+    # look up a task by the given filter conditions
+    data = request.args.to_dict()
+    values = ''
+    for k, v in data.items():
+        if isinstance(v, int):
+            values += f'{k}={v} and '
+        else:
+            values += f'{k}="{v}" and '
+
+    sql = f"select task_id from crawler_author_map where {values[:-4]}"  # [:-1]是为了去掉末尾的逗号
+    res = MysqlHelper.get_values(sql)
+    task_id = res[0]['task_id']
+
+    sql = f'select task_name, source, task_type, create_task_user, insert_time, update_task_user, update_time from crawler_task where task_id={task_id} '
+    task_info = MysqlHelper.get_values(sql)
+
+    return jsonify({'code': 200, 'data': task_info})
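A request sketch for the lookup routes added above; the host and port are assumptions (the app.run() call is not shown in this hunk) and the filter value passed to findtask is a placeholder.

import requests

BASE = "http://127.0.0.1:5000"  # assumed host/port

categories = requests.get(f"{BASE}/v1/crawler/task/getcategory").json()["data"]
media = requests.get(f"{BASE}/v1/crawler/user/findmedia", params={"task_id": 1}).json()
# findtask builds its WHERE clause from arbitrary query parameters on crawler_author_map
task = requests.get(f"{BASE}/v1/crawler/task/findtask",
                    params={"spider_link": "https://example.com/author/1"}).json()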
 
 
 # accepts GET requests only
@@ -50,24 +122,33 @@ def getSource():
     return jsonify({'code': '200', 'result': source_list})
 
 
+@app.route("/v1/crawler/task/checkrepeat", methods=["POST"])
+def get_repeat_list():
+    data = request.json
+    # extract fields from the request body
+    spider_links = data.get('spider_link')
+    repeat_list = list()
+    # check whether each link is already in the crawl list
+    for spider_link in spider_links:
+        if isinstance(spider_link, int):
+            s_sql = f"""select spider_link from crawler_author_map where spider_link={spider_link}"""
+        else:
+            s_sql = f"""select spider_link from crawler_author_map where spider_link='{spider_link}'"""
+        result = MysqlHelper.get_values(s_sql)
+        if result:
+            repeat_list.append(spider_link)
+    if repeat_list:
+        return jsonify({'code': 400, 'message': '名单重复', 'repeat_list': repeat_list})
+    else:
+
+        return jsonify({'code': 200, 'message': '抓取名单校验通过', 'repeat_list': repeat_list})
+
+
 @app.route("/v1/crawler/task/insert", methods=["POST"])
 def insertTask():
     try:
         data = request.json
-        outer_info = data.get('spider_link')
-        source = data.get('source')
-        exist_outer_info = list()
-        s_sql = f"""select spider_link from crawler_task where source='{source}'"""
-        result = MysqlHelper.get_values(s_sql)
-        if result:
-            for outer_link in outer_info:
-                for link_tuple in result:
-                    if outer_link in link_tuple[0]:
-                        exist_outer_info.append(outer_link)
-            if exist_outer_info:
-                return jsonify({'code': 200, 'message': '名单重复', 'repeat_list': exist_outer_info})
-
-        # field conversion
+        user_data = copy.deepcopy(data)
 
         tag_name_list = []
         content_tag_list = []
@@ -90,6 +171,9 @@ def insertTask():
         data['spider_link'] = str(data['spider_link'])
         data['spider_rule'] = str(data['spider_rule'])
 
+        data['user_tag_info'] = str(user_tag)
+        data['content_tag_info'] = str(user_content_tag)
+
         data['user_tag'] = ','.join(str(i) for i in tag_name_list)
         data['user_content_tag'] = ','.join(str(i) for i in content_tag_list)
         # data['crawler_interval'] = data.pop('interval')
@@ -97,11 +181,13 @@ def insertTask():
         keys = ','.join(data.keys())
         values = ','.join(['%s'] * len(data))
         sql = 'insert into {table}({keys}) VALUES({values})'.format(table='crawler_task', keys=keys, values=values)
-        MysqlHelper.insert_values(sql, tuple(data.values()))
+        task_id = MysqlHelper.insert_values(sql, tuple(data.values()))
+        if task_id:
+            success_list, fail_list = create_uid(user_data, task_id)
+            return jsonify(
+                {'code': 200, 'message': 'task create success', 'success_list': success_list, 'fail_list': fail_list})
     except Exception as e:
-        return jsonify({'code': 400, 'message': '任务写入失败'})
-
-    return jsonify({'code': 200, 'message': 'task create success'})
+        return jsonify({'code': 500, 'message': f'任务写入失败,原因:{e}'})
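A sketch of the intended flow for the two POST routes above, assuming the same local host/port as before; the spider_link value is a placeholder and the full crawler_task column set accepted by insert is not shown in this hunk.

import requests

BASE = "http://127.0.0.1:5000"  # assumed host/port

# 1) duplicate check before creating the task
check = requests.post(f"{BASE}/v1/crawler/task/checkrepeat",
                      json={"spider_link": ["https://example.com/author/1"]}).json()

# 2) if the list is clean, submit the task body (same spider_link plus the tag,
#    mode and category fields consumed by insertTask and create_uid)
if check["code"] == 200:
    ...  # requests.post(f"{BASE}/v1/crawler/task/insert", json=task_body)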
 
 
 @app.route("/v1/crawler/task/gettask", methods=["GET"])
@@ -112,26 +198,30 @@ def getAllTask():
         offset = int(get_data.get('offset', 10))
         start_count = (page * offset) - offset
         end_count = page * offset
-        sql = f"""select task_id, task_name, insert_time from crawler_task order by update_time desc limit {start_count}, {end_count} """
+        sql = f"""select task_name, source, task_type, create_task_user, insert_time, update_task_user, update_time  from crawler_task order by update_time desc limit {start_count}, {end_count} """
         result = MysqlHelper.get_values(sql)
         if not result:
             return jsonify({'code': '200', 'result': [], 'message': '没有更多任务'})
-        source_list = list()
-        for task_id, task_name, insert_time in result:
-            data = dict(
-                task_id=task_id,
-                task_name=task_name,
-                insert_time=insert_time * 1000
+        task_list = list()
+        for task_info in result:
+            task_data = dict(
+                task_name=task_info['task_name'],
+                source=task_info['source'],
+                task_type=task_info['task_type'],
+                create_task_user=task_info['create_task_user'],
+                insert_time=task_info['insert_time'] * 1000,
+                update_task_user=task_info['update_task_user'],
+                update_time=task_info['update_time']
             )
-            source_list.append(data)
+            task_list.append(task_data)
 
         t_sql = f"""select count(*) from crawler_task"""
         t_res = MysqlHelper.get_values(t_sql)
-        total = t_res[0][0]
+        total = t_res[0]['count(*)']
     except Exception as e:
         return jsonify({"code": "400", 'message': "任务列表获取失败"})
 
-    return jsonify({'code': '200', 'result': source_list, 'total': total})
+    return jsonify({'code': '200', 'result': task_list, 'total': total})
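A quick worked example of the paging arithmetic in getAllTask: for page=2 and offset=10 the handler computes start_count=10 and end_count=20, so the generated query ends with "limit 10, 20", i.e. MySQL's LIMIT offset, row_count form with end_count acting as the row count rather than an end index.

page, offset = 2, 10
start_count = (page * offset) - offset   # 10 rows skipped
end_count = page * offset                # 20 rows returned
# -> "... order by update_time desc limit 10, 20"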
 
 
 @app.route("/v1/crawler/task/getone", methods=["GET"])
@@ -139,27 +229,32 @@ def getOneTask():
     try:
         get_data = request.args.to_dict()
         task_id = get_data['task_id']
-        sql = f'select task_id, spider_link from crawler_task where task_id={task_id}'
+        sql = f'select * from crawler_task where task_id={task_id}'
         result = MysqlHelper.get_values(sql)
+
         if not result:
-            return jsonify({'code': '200', 'result': [], 'message': 'no data'})
-        for task_id, spider_link in result:
-            data = dict(
-                task_id=task_id,
-                spider_link=eval(spider_link),
-            )
+            return jsonify({'code': '400', 'result': [], 'message': 'no data'})
+
     except Exception as e:
-        return jsonify({'code': '400', "message": "获取任务信息失败"})
+        return jsonify({'code': '500', "message": "获取任务信息失败"})
 
-    return jsonify({'code': '200', 'result': data})
+    return jsonify({'code': '200', 'result': result})
 
 
 @app.route("/v1/crawler/task/update", methods=["POST"])
 def updateTask():
     try:
-        task_id = request.json.get('task_id')
-        spider_link = request.json.get('spider_link')
-        sql = f"""UPDATE crawler_task SET spider_link="{spider_link}", update_time={int(time.time())}  where task_id = {task_id}"""
+        data = request.json
+        task_id = data.get('task_id')
+        task_info = data.get('task_info')
+        values = ''
+        for k, v in task_info.items():
+            if isinstance(v, int):
+                values += f'{k}={v},'
+            else:
+                values += f'{k}="{v}",'
+
+        sql = f'update crawler_task set {values[:-1]} where task_id={task_id}'
         result = MysqlHelper.update_values(sql)
         if result:
             return jsonify({'code': 200, 'message': 'task update success'})
@@ -170,16 +265,98 @@ def updateTask():
         return jsonify({'code': 400, 'message': '任务更新失败'})
 
 
-# def get_user_info(source):
-#     source_spider = {
-#         'xigua': xigua_user_info
-#     }
-#     return source_spider.get(source)
+def create_uid(task, task_id):
+    spider_link = task.get('spider_link')
+    source = task.get('source')
+    task_type = task.get('task_type')
+    applets_status = task.get('applets_status')
+    app_status = task.get('app_status')
+    user_tag = task.get('user_tag')
+    user_content_tag = task.get('user_content_tag')
+
+    mode_name_id = task['mode_name_id']
+    mode_board_id = task['mode_board_id']
+    content_category_id = task['content_category_id']
+
+    mn_sql = f'select * from crawler_mode_name where id={mode_name_id}'
+    mode_name_list = MysqlHelper.get_values(mn_sql)
+    mb_sql = f'select * from crawler_mode_board where id={mode_board_id}'
+    mode_board_list = MysqlHelper.get_values(mb_sql)
+    cc_sql = f'select * from crawler_content_category where id={content_category_id}'
+    content_category_list = MysqlHelper.get_values(cc_sql)
+    task['mode_name_str'] = mode_name_list[0]['mode_name']
+    task['mode_board_str'] = mode_board_list[0]['mode_board']
+    task['content_category_str'] = content_category_list[0]['content_category']
+    mode_value_str = f"{task['mode_name_str']},{task['mode_board_str']}"
+    content_category_str = f"{task['content_category_str']}"
+
+    success_list = list()
+    fail_list = list()
+
+    tag_name_list = list()
+    content_tag_list = list()
+
+    for tag in user_tag:
+        tag_name_list.append(tag['tagName'])
+    for tag in user_content_tag:
+        content_tag_list.append(tag['tagName'])
+    user_tags = ','.join(str(i) for i in tag_name_list)
+    user_content_tags = ','.join(str(i) for i in content_tag_list)
+    for author_url in spider_link:
+        now_time = int(time.time())
+        time_array = time.localtime(now_time)
+        str_time = time.strftime("%Y-%m-%d", time_array)
+        post_data = {
+            # 'count': 1,     # (required) number of accounts: pass 1
+            # 'accountType': 4,   # (required) account type: pass 4 for an app virtual account
+            'pwd': '',  # password, defaults to 12346
+            'nickName': '',  # nickname, defaults to vuser......
+            'avatarUrl': '',
+            # avatar URL, defaults to http://weapppiccdn.yishihui.com/resources/images/pic_normal.png
+            'tagName': f"{'spider'},{user_tags},{mode_value_str},{content_category_str},{str_time}",  # multiple values separated by commas
+        }
+        try:
+            response = requests.post(url=conf['media_url'], params=post_data)
+            media_id = response.json()['data']
+        except Exception as e:
+            logging.warning(f'创建账户:{spider_link},失败,原因:{e}')
+            fail_list.append(author_url)
+            continue
+
+        data = dict(
+            spider_link=author_url,
+            media_id=media_id,
+            source=source,
+            task_type=task_type,
+            applets_status=applets_status,
+            app_status=app_status,
+            user_tag=user_tags,
+            user_content_tag=user_content_tags,
+            insert_time=int(time.time()),
+            update_time=int(time.time()),
+            create_user_time=now_time,
+            mode_name_str=task['mode_name_str'],
+            mode_board_str=task['mode_board_str'],
+            content_category_str=task['content_category_str'],
+            mode_value_str=mode_value_str,
+            task_id=task_id
+        )
+        keys = ','.join(data.keys())
+        values = ','.join(['%s'] * len(data))
+        table = 'crawler_author_map'
+        sql = f"""insert into {table}({keys}) VALUES({values})"""
+        MysqlHelper.insert_values(sql, tuple(data.values()))
+        uer_info = dict(
+            outer_id=author_url,
+            uid=media_id
+        )
+        success_list.append(uer_info)
+
+    return success_list, fail_list
 
 
 @app.route("/v1/crawler/author/create", methods=["POST"])
 def createUser():
-    get_media_url = 'http://longvideoapi-internal.piaoquantv.com/longvideoapi/user/virtual/crawler/registerVirtualUser'
     spider_link = request.json.get('spider_link')
     source = request.json.get('source')
     task_type = request.json.get('task_type')
@@ -214,7 +391,7 @@ def createUser():
                    # avatar URL, defaults to http://weapppiccdn.yishihui.com/resources/images/pic_normal.png
                    'tagName': user_tags,  # multiple values separated by commas
                 }
-                response = requests.post(url=get_media_url, params=post_data)
+                response = requests.post(url=conf['media_url'], params=post_data)
                 media_id = response.json()['data']
 
                 data = dict(