zhangyong 3 months ago
parent
commit
03a10dc597

+ 19 - 0
Dockerfile

@@ -0,0 +1,19 @@
FROM python:3.11-slim

WORKDIR /app

ENV TZ=Asia/Shanghai

# Copy only the dependency manifest first so the expensive install layer
# stays cached until requirements.txt actually changes.
COPY requirements.txt .

# Use apt-get consistently (the original mixed `apt` and `apt-get`).
RUN apt-get update && apt-get install -y --no-install-recommends wget xz-utils nscd libgl-dev libglib2.0-dev fonts-wqy-zenhei \
    && apt-get clean && rm -rf /var/lib/apt/lists/* \
    && pip install -r requirements.txt --no-cache-dir \
    # The "release" tarball is always the LATEST static build; the original
    # symlinked a hard-coded ffmpeg-7.0.2 directory, which breaks as soon as
    # upstream publishes a newer version. Glob the extracted directory instead.
    && wget -O /tmp/ffmpeg-amd64-static.tar.xz https://johnvansickle.com/ffmpeg/releases/ffmpeg-release-amd64-static.tar.xz \
    && tar -xJf /tmp/ffmpeg-amd64-static.tar.xz -C /usr/local/ \
    && rm /tmp/ffmpeg-amd64-static.tar.xz \
    && ln -s /usr/local/ffmpeg-*-amd64-static/ffprobe /usr/local/bin/ffprobe \
    && ln -s /usr/local/ffmpeg-*-amd64-static/ffmpeg /usr/local/bin/ffmpeg \
    && mkdir -p /app/cache

# Application code copied last so code-only changes rebuild quickly.
COPY . .

# NOTE(review): job_data_redis.py is not part of this commit (only job_data.py
# and job_hour_data_redis.py are) — confirm the entrypoint script name.
ENTRYPOINT ["python", "/app/job_data_redis.py"]

+ 1 - 0
common/__init__.py

@@ -0,0 +1 @@
+from .redis import SyncRedisHelper

+ 69 - 0
common/aliyun_log.py

@@ -0,0 +1,69 @@
+# -*- coding: utf-8 -*-
+"""
+公共方法,包含:生成log / 删除log
+"""
+import json
+from datetime import date, timedelta
+from datetime import datetime
+from typing import Optional
+
+from aliyun.log import PutLogsRequest, LogClient, LogItem
+from loguru import logger
+
+proxies = {"http": None, "https": None}
+
+
class AliyunLogger:
    """Write structured events to Aliyun SLS (Simple Log Service)."""

    # NOTE(review): these date helpers are evaluated once at import time, so
    # they go stale in a long-running process; kept only for backward
    # compatibility with any external callers.
    now = datetime.now()
    yesterday = (date.today() + timedelta(days=-1)).strftime("%Y-%m-%d")
    today = date.today()
    tomorrow = (date.today() + timedelta(days=1)).strftime("%Y-%m-%d")

    @staticmethod
    def logging(channel: Optional[str] = None,
                data: Optional[str] = None,
                description: Optional[str] = None,
                user_id: Optional[str] = None):
        """
        Write one log entry to Aliyun SLS.

        dev:  https://sls.console.aliyun.com/lognext/project/crawler-log-dev/logsearch/crawler-log-dev
        prod: https://sls.console.aliyun.com/lognext/project/crawler-log-prod/logsearch/crawler-log-prod

        Any failure is swallowed (logged locally) so that remote logging can
        never break the calling job.
        """
        # SECURITY(review): credentials are hard-coded in source control;
        # move them to environment variables or a secret store.
        accessKeyId = "LTAIWYUujJAm7CbH"
        accessKey = "RfSjdiWwED1sGFlsjXv0DlfTnZTG1P"

        project = "crawler-log-prod"
        logstore = "top_automatic_info"
        endpoint = "cn-hangzhou.log.aliyuncs.com"
        try:
            # SLS item values must be strings; coerce None to "" so the SDK
            # does not reject the record (the original passed None through).
            contents = [
                ("channel", channel or ""),
                ("description", description or ""),
                ("data", json.dumps(data, ensure_ascii=False) if data else ""),
                ("user_id", user_id or "")
            ]
            client = LogClient(endpoint, accessKeyId, accessKey)
            log_item = LogItem()
            log_item.set_contents(contents)
            request = PutLogsRequest(
                project=project,
                logstore=logstore,
                topic="",
                source="",
                logitems=[log_item],
                compress=False,
            )
            client.put_logs(request)
        except Exception as e:
            logger.error(f"[+] 阿里云日志写入失败{e}")
+

+ 379 - 0
common/feishu_utils.py

@@ -0,0 +1,379 @@
+# -*- coding: utf-8 -*-
+# @Time: 2023/12/26
+"""
+飞书表配置: token 鉴权 / 增删改查 / 机器人报警
+"""
+import json
+import os
+import sys
+import requests
+import urllib3
+from loguru import logger
+
+sys.path.append(os.getcwd())
+
+proxies = {"http": None, "https": None}
+
+
class Feishu:
    """
    Feishu (Lark) spreadsheet access: token auth, cell read/write,
    row/column management, and alert bots.
    """
    # Base URL of the Feishu spreadsheet workspace (unused by the methods below).
    succinct_url = "https://w42nne6hzg.feishu.cn/sheets/"
    # Map a logical crawler name to its spreadsheet token.
    @classmethod
    def spreadsheettoken(cls, crawler):
        # Anything other than "summary" is assumed to already be a token.
        if crawler == "summary":
            return "KsoMsyP2ghleM9tzBfmcEEXBnXg"
        else:
            return crawler



    # Obtain a Feishu OpenAPI tenant access token.
    @classmethod
    def get_token(cls):
        """
        Fetch a tenant_access_token.

        :return: the token string, or None on failure (callers then fail
                 inside their own try blocks when building headers).
        """
        url = "https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal/"
        # SECURITY(review): app credentials are hard-coded in source control.
        post_data = {"app_id": "cli_a13ad2afa438d00b",  # id/secret of the published backend app
                     "app_secret": "4tK9LY9VbiQlY5umhE42dclBFo6t4p5O"}

        try:
            urllib3.disable_warnings()
            response = requests.post(url=url, data=post_data, proxies=proxies, verify=False)
            tenant_access_token = response.json()["tenant_access_token"]
            return tenant_access_token
        except Exception as e:
            logger.error(f"[+] 飞书获取飞书 api token 异常:{e}")


    # Fetch spreadsheet metadata.
    @classmethod
    def get_metainfo(cls, crawler):
        """
        Fetch spreadsheet metadata.

        :param crawler: crawler name / spreadsheet token
        :return: parsed JSON response, or None on failure
        """
        try:
            get_metainfo_url = "https://open.feishu.cn/open-apis/sheets/v2/spreadsheets/" \
                               + cls.spreadsheettoken(crawler) + "/metainfo"

            headers = {
                "Authorization": "Bearer " + cls.get_token(),
                "Content-Type": "application/json; charset=utf-8"
            }
            params = {
                "extFields": "protectedRange",  # extFields=protectedRange also returns protected row/column info
                "user_id_type": "open_id"  # user id type in the response: open_id or union_id
            }
            urllib3.disable_warnings()
            r = requests.get(url=get_metainfo_url, headers=headers, params=params, proxies=proxies, verify=False)
            response = json.loads(r.content.decode("utf8"))
            return response
        except Exception as e:
            logger.error(f"[+] 飞书获取表格元数据异常:{e}")

    # Read all rows of one worksheet.
    @classmethod
    def get_values_batch(cls, crawler, sheetid):
        """
        Read every row of a worksheet.

        :param crawler: crawler name / spreadsheet token
        :param sheetid: worksheet id
        :return: list of rows (list of cell values), or None on failure
        """
        try:
            get_values_batch_url = "https://open.feishu.cn/open-apis/sheets/v2/spreadsheets/" \
                                   + cls.spreadsheettoken(crawler) + "/values_batch_get"
            headers = {
                "Authorization": "Bearer " + cls.get_token(),
                "Content-Type": "application/json; charset=utf-8"
            }
            params = {
                "ranges": sheetid,
                "valueRenderOption": "ToString",
                "dateTimeRenderOption": "",
                "user_id_type": "open_id"
            }
            urllib3.disable_warnings()
            r = requests.get(url=get_values_batch_url, headers=headers, params=params, proxies=proxies, verify=False)
            response = json.loads(r.content.decode("utf8"))
            values = response["data"]["valueRanges"][0]["values"]
            return values
        except Exception as e:
            logger.error(f"[+] 飞书读取工作表所有数据异常:{e}")

    # Insert rows or columns into a worksheet.
    @classmethod
    def insert_columns(cls, crawler, sheetid, majordimension, startindex, endindex):
        """
        Insert rows or columns into a worksheet.

        :param crawler: crawler name / spreadsheet token
        :param sheetid: worksheet id
        :param majordimension: ROWS or COLUMNS
        :param startindex: start position
        :param endindex: end position
        """
        try:
            insert_columns_url = "https://open.feishu.cn/open-apis/sheets/v2/spreadsheets/" \
                                 + cls.spreadsheettoken(crawler) + "/insert_dimension_range"
            headers = {
                "Authorization": "Bearer " + cls.get_token(),
                "Content-Type": "application/json; charset=utf-8"
            }
            body = {
                "dimension": {
                    "sheetId": sheetid,
                    "majorDimension": majordimension,  # defaults to ROWS; ROWS or COLUMNS
                    "startIndex": startindex,  # start position
                    "endIndex": endindex  # end position
                },
                "inheritStyle": "AFTER"  # BEFORE or AFTER; omit to not inherit style
            }

            urllib3.disable_warnings()
            r = requests.post(url=insert_columns_url, headers=headers, json=body, proxies=proxies, verify=False)
        except Exception as e:
            logger.error(f"[+] 飞书插入行或列异常:{e}")

    # Write values into a range.
    @classmethod
    def update_values(cls, crawler, sheetid, ranges, values):
        """
        Write values into a cell range.

        :param crawler: crawler name / spreadsheet token
        :param sheetid: worksheet id
        :param ranges: cell range, e.g. "A2:Z2"
        :param values: values to write, list of rows
        """
        try:
            update_values_url = "https://open.feishu.cn/open-apis/sheets/v2/spreadsheets/" \
                                + cls.spreadsheettoken(crawler) + "/values_batch_update"
            headers = {
                "Authorization": "Bearer " + cls.get_token(),
                "Content-Type": "application/json; charset=utf-8"
            }
            body = {
                "valueRanges": [
                    {
                        "range": sheetid + "!" + ranges,
                        "values": values
                    },
                ],
            }
            urllib3.disable_warnings()
            r = requests.post(url=update_values_url, headers=headers, json=body, proxies=proxies, verify=False)
        except Exception as e:
            logger.error(f"[+] 飞书写入数据异常:{e}")

    # Read a single cell.
    @classmethod
    def get_range_value(cls, crawler, sheetid, cell):
        """
        Read the content of one cell.

        :param crawler: crawler name / spreadsheet token
        :param sheetid: worksheet id
        :param cell: cell address, e.g. "B3"
        :return: first row of the returned value range, or None on failure
        """
        try:
            get_range_value_url = "https://open.feishu.cn/open-apis/sheets/v2/spreadsheets/" \
                                  + cls.spreadsheettoken(crawler) + "/values/" + sheetid + "!" + cell
            headers = {
                "Authorization": "Bearer " + cls.get_token(),
                "Content-Type": "application/json; charset=utf-8"
            }
            params = {
                "valueRenderOption": "FormattedValue",

                # dateTimeRenderOption=FormattedString formats dates/times (not numbers) and returns the formatted string.
                "dateTimeRenderOption": "",

                # user id type in the response: open_id or union_id
                "user_id_type": "open_id"
            }
            urllib3.disable_warnings()
            r = requests.get(url=get_range_value_url, headers=headers, params=params, proxies=proxies, verify=False)
            return r.json()["data"]["valueRange"]["values"][0]
        except Exception as e:
            logger.error(f"[+] 飞书读取单元格数据异常:{e}")
    # Flatten a whole sheet into a list of non-empty cells.
    @classmethod
    def get_sheet_content(cls, crawler, sheet_id):
        """Return every non-None cell of the sheet as a flat list."""
        try:
            sheet = Feishu.get_values_batch(crawler, sheet_id)
            content_list = []
            for x in sheet:
                for y in x:
                    if y is None:
                        pass
                    else:
                        content_list.append(y)
            return content_list
        except Exception as e:
            logger.error(f"[+] 飞书get_sheet_content:{e}")

    # Delete rows or columns (ROWS or COLUMNS).
    @classmethod
    def dimension_range(cls, crawler, sheetid, major_dimension, startindex, endindex):
        """
        Delete rows or columns.

        :param crawler: crawler name / spreadsheet token
        :param sheetid: worksheet id
        :param major_dimension: defaults to ROWS; ROWS or COLUMNS
        :param startindex: start position
        :param endindex: end position
        :return: None
        """
        try:
            dimension_range_url = "https://open.feishu.cn/open-apis/sheets/v2/spreadsheets/" \
                                  + cls.spreadsheettoken(crawler) + "/dimension_range"
            headers = {
                "Authorization": "Bearer " + cls.get_token(),
                "Content-Type": "application/json; charset=utf-8"
            }
            body = {
                "dimension": {
                    "sheetId": sheetid,
                    "majorDimension": major_dimension,
                    "startIndex": startindex,
                    "endIndex": endindex
                }
            }
            urllib3.disable_warnings()
            r = requests.delete(url=dimension_range_url, headers=headers, json=body, proxies=proxies, verify=False)
        except Exception as e:
            logger.error(f"[+] 飞书删除视频数据异常:{e}")

    # Resolve a user's open_id from a known username.
    @classmethod
    def get_userid(cls, username):
        """Map a known username to a phone number, then to a Feishu open_id."""
        try:
            url = "https://open.feishu.cn/open-apis/user/v1/batch_get_id?"
            headers = {
                "Authorization": "Bearer " + cls.get_token(),
                "Content-Type": "application/json; charset=utf-8"
            }
            # Known team members; unknown usernames yield None and fail below.
            name_phone_dict = {
                "xinxin": "15546206651",
                "muxinyi": "13699208058",
                "wangxueke": "13513479926",
                "yuzhuoyi": "18624010360",
                "luojunhui": "18801281360",
                "fanjun": "15200827642",
                "zhangyong": "17600025055",
                'liukunyu': "18810931977"
            }
            username = name_phone_dict.get(username)

            data = {"mobiles": [username]}
            urllib3.disable_warnings()
            r = requests.get(url=url, headers=headers, params=data, verify=False, proxies=proxies)
            open_id = r.json()["data"]["mobile_users"][username][0]["open_id"]

            return open_id
        except Exception as e:
            logger.error(f"[+] 飞书get_userid异常:{e}")

    # Alert bot: posts an interactive card to a webhook, @-mentioning users.
    @classmethod
    def bot(cls, log_type, crawler, text, mark_name):
        """
        Send an alert card to the webhook selected by ``crawler``.

        :param log_type: username (or, for 快手关键词搜索, an iterable of usernames)
        :param crawler: channel name selecting webhook + sheet link
        :param text: card body text
        :param mark_name: display name(s) for the @-mention
        """
        try:

            headers = {'Content-Type': 'application/json'}
            if crawler == "机器自动改造消息通知":
                url = "https://open.feishu.cn/open-apis/bot/v2/hook/e7697dc6-5254-4411-8b59-3cd0742bf703"
                sheet_url = "https://w42nne6hzg.feishu.cn/sheets/KsoMsyP2ghleM9tzBfmcEEXBnXg?sheet=bc154d"
                users = f"<at id=" + str(cls.get_userid(log_type)) + f">{mark_name}</at>"
            elif crawler == "快手关键词搜索":
                url = "https://open.feishu.cn/open-apis/bot/v2/hook/e7697dc6-5254-4411-8b59-3cd0742bf703"
                sheet_url = "https://w42nne6hzg.feishu.cn/sheets/KsoMsyP2ghleM9tzBfmcEEXBnXg?sheet=U1gySe"
                # NOTE(review): the loop variable `type` shadows the builtin;
                # log_type/mark_name are zipped pairwise here.
                users = "".join([f'<at id="{cls.get_userid(type)}">{name}</at>' for type, name in
                                 zip(log_type, mark_name)])
                # users = f"<at id=" + str(cls.get_userid(log_type)) + f">{mark_name}</at>"
            else:
                url = "https://open.feishu.cn/open-apis/bot/v2/hook/7928f182-08c1-4c4d-b2f7-82e10c93ca80"
                sheet_url = "https://w42nne6hzg.feishu.cn/sheets/KsoMsyP2ghleM9tzBfmcEEXBnXg?sheet=bc154d"
                users = f"<at id=" + str(cls.get_userid(log_type)) + f">{mark_name}</at>"
            data = json.dumps({
                "msg_type": "interactive",
                "card": {
                    "config": {
                        "wide_screen_mode": True,
                        "enable_forward": True
                    },
                    "elements": [{
                        "tag": "div",
                        "text": {
                            "content": users + text,
                            "tag": "lark_md"
                        }
                    }, {
                        "actions": [{
                            "tag": "button",
                            "text": {
                                "content": "详情,点击~~~~~",
                                "tag": "lark_md"
                            },
                            "url": sheet_url,
                            "type": "default",
                            "value": {}
                        }],
                        "tag": "action"
                    }],
                    "header": {
                        "title": {
                            "content": "📣消息提醒",
                            "tag": "plain_text"
                        }
                    }
                }
            })
            urllib3.disable_warnings()
            r = requests.post(url, headers=headers, data=data, verify=False, proxies=proxies)
        except Exception as e:
            logger.error(f"[+] 飞书bot异常:{e}")


    # Bot: "remake plan finished" notification to an arbitrary webhook.
    @classmethod
    def finish_bot(cls, text, url, content):
        """
        Post a simple interactive card (no @-mentions).

        :param text: card body text
        :param url: webhook URL to post to
        :param content: card header title
        """
        try:
            headers = {'Content-Type': 'application/json'}
            data = json.dumps({
                "msg_type": "interactive",
                "card": {
                    "config": {
                        "wide_screen_mode": True,
                        "enable_forward": True
                    },
                    "elements": [{
                        "tag": "div",
                        "text": {
                            "content": text,
                            "tag": "lark_md"
                        }
                    }],
                    "header": {
                        "title": {
                            "content": content,
                            "tag": "plain_text"
                        }
                    }
                }
            })
            urllib3.disable_warnings()
            r = requests.post(url, headers=headers, data=data, verify=False, proxies=proxies)
        except Exception as e:
            logger.error(f"[+] 飞书bot异常:{e}")
+
+if __name__ == "__main__":
+    Feishu.bot('recommend', '抖音', '测试: 抖音cookie失效,请及时更换')
+

+ 67 - 0
common/mysql_db.py

@@ -0,0 +1,67 @@
+# -*- coding: utf-8 -*-
+"""
+数据库连接及操作
+"""
+import pymysql
+
class MysqlHelper:
    """Thin helper around pymysql for the piaoquan-crawler database."""

    @classmethod
    def connect_mysql(cls):
        """Open and return a new database connection (caller must close it)."""
        # SECURITY(review): credentials are hard-coded in source control;
        # move them to environment variables or a secret store.
        connection = pymysql.connect(
            # host="rm-bp1159bu17li9hi94.mysql.rds.aliyuncs.com",  # internal address
            host="rm-bp1159bu17li9hi94ro.mysql.rds.aliyuncs.com",  # public address
            port=3306,
            user="crawler",
            passwd="crawler123456@",
            db="piaoquan-crawler",
            # the stored text is utf8-encoded
            charset="utf8")
        return connection

    @classmethod
    def get_values(cls, sql, params=None):
        """
        Run a SELECT and return all rows as a tuple of tuples.

        Returns None on error. The connection is now always closed — the
        original leaked it whenever the query raised.
        """
        connect = None
        try:
            connect = cls.connect_mysql()
            with connect.cursor() as cursor:
                if params:
                    cursor.execute(sql, params)
                else:
                    cursor.execute(sql)
                return cursor.fetchall()
        except Exception as e:
            print(f"get_values异常:{e}\n")
        finally:
            if connect:
                connect.close()

    @classmethod
    def update_values(cls, sql, params=None):
        """
        Run a write statement, committing on success and rolling back on error.

        :param sql: the statement; may contain %s placeholders
        :param params: optional parameter tuple (new, backward-compatible —
                       omitted calls behave exactly as before)
        :return: affected row count, or None on error
        """
        connect = cls.connect_mysql()
        try:
            with connect.cursor() as cursor:
                if params:
                    res = cursor.execute(sql, params)
                else:
                    res = cursor.execute(sql)
            # commit is required, otherwise writes are not persisted
            connect.commit()
            return res
        except Exception:
            # roll back on any failure
            connect.rollback()
        finally:
            connect.close()
+
+
+
+

+ 40 - 0
common/odps_data.py

@@ -0,0 +1,40 @@
+import json
+import datetime
+from odps import ODPS
+
# ODPS (MaxCompute) service configuration.
# SECURITY(review): access credentials are hard-coded in source control;
# move them to environment variables or a credential store.
ODPS_CONFIG = {
    'ENDPOINT': 'http://service.cn.maxcompute.aliyun.com/api',
    'ACCESSID': 'LTAIWYUujJAm7CbH',
    'ACCESSKEY': 'RfSjdiWwED1sGFlsjXv0DlfTnZTG1P',
    'PROJECT': 'loghubods'
}
class OdpsDataCount:
    """Pull the hourly top-100 "return" rows from MaxCompute as JSON strings."""

    @classmethod
    def get_data_count(cls, dt):
        """Query the given hour partition; return each row serialized to JSON."""
        odps = ODPS(
            access_id=ODPS_CONFIG['ACCESSID'],
            secret_access_key=ODPS_CONFIG['ACCESSKEY'],
            project=ODPS_CONFIG['PROJECT'],
            endpoint=ODPS_CONFIG['ENDPOINT']
        )
        rows_json = []
        sql = f'SELECT uid,videoid,return_uv,type,type_owner,channel,channel_owner,title FROM loghubods.all_apptype_top100_return WHERE dt = "{dt}" '
        try:
            with odps.execute_sql(sql).open_reader() as reader:
                for row in reader:
                    record = {
                        "uid": row[0],
                        "videoid": row[1],
                        "return_uv": row[2],
                        "type": row[3],
                        "type_owner": row[4],
                        "channel": row[5],
                        "channel_owner": row[6],
                        "title": row[7],
                        "dt": str(dt),
                    }
                    rows_json.append(json.dumps(record, ensure_ascii=False))
        except Exception as e:
            # Best-effort: report and return whatever was collected so far.
            print(f"An error occurred: {e}")
        return rows_json

    @classmethod
    def main(cls):
        """Fetch the partition for the previous hour and return its rows."""
        dt = (datetime.datetime.now() - datetime.timedelta(hours=1)).strftime('%Y%m%d%H')
        data_count = cls.get_data_count(dt=dt)
        print(len(data_count))
        return data_count
+
+if __name__ == '__main__':
+    OdpsDataCount.main()

+ 57 - 0
common/redis.py

@@ -0,0 +1,57 @@
+import redis
+
+from common.odps_data import OdpsDataCount
+
+
class SyncRedisHelper:
    """Process-wide holder for a single shared Redis connection pool.

    The original assigned ``self._pool`` / ``self._instance`` as INSTANCE
    attributes, so every ``SyncRedisHelper()`` silently built a brand-new
    connection pool; the pool is now cached on the class and shared.
    """

    _pool: redis.ConnectionPool = None
    _instance = None  # kept for backward compatibility with external readers

    def _get_pool(self) -> redis.ConnectionPool:
        # Lazily create the pool once; all instances share it.
        if SyncRedisHelper._pool is None:
            # SECURITY(review): password hard-coded; move to env/secret store.
            SyncRedisHelper._pool = redis.ConnectionPool(
                host="r-bp1mb0v08fqi4hjffupd.redis.rds.aliyuncs.com",  # public address
                # host="r-bp1mb0v08fqi4hjffu.redis.rds.aliyuncs.com",  # internal address
                port=6379,
                db=0,
                password="Wqsd@2019",
                # password="Qingqu2019",
            )
        return SyncRedisHelper._pool

    def get_client(self) -> redis.Redis:
        """Return a Redis client bound to the shared pool."""
        return redis.Redis(connection_pool=self._get_pool())

    def close(self):
        """Disconnect all pooled connections (including in-use ones)."""
        if SyncRedisHelper._pool:
            SyncRedisHelper._pool.disconnect(inuse_connections=True)
+
def insert_job_data(REDIS_NAME):
    """Fetch the hourly top rows and push them onto the given Redis list.

    Returns the number of rows pushed (0 when there is nothing to push).
    """
    rows = OdpsDataCount.main()
    if not rows:
        return 0
    client = SyncRedisHelper().get_client()
    client.rpush(REDIS_NAME, *rows)
    return len(rows)
+
def get_top_data(REDIS_NAME):
    """Pop and return the oldest item of the given Redis list (None if empty)."""
    client = SyncRedisHelper().get_client()
    return client.lpop(REDIS_NAME)
+
def in_job_video_data(REDIS_NAME, ret):
    """Append ``ret`` (stringified) to the tail of the given Redis list."""
    client = SyncRedisHelper().get_client()
    client.rpush(REDIS_NAME, str(ret))
+
+

+ 28 - 0
common/sql_help.py

@@ -0,0 +1,28 @@
+import os
+import sys
+sys.path.append(os.getcwd())
+from common.mysql_db import MysqlHelper
+
+
class sqlCollect():
    """SQL access helpers for the machine_making_data table."""



    @classmethod
    def insert_machine_making_data(cls, name: str, task_mark: str, channel_id: str, url: str, v_id: str, piaoquan_id: str, new_title: str, code: str, formatted_time, old_title: str, oss_object_key: str):
        # SECURITY(review): values are interpolated into the statement with an
        # f-string — this breaks on embedded double quotes and is SQL-injectable.
        # Switch to parameterized execution (as get_channle_id does below) once
        # MysqlHelper.update_values accepts parameters.
        insert_sql = f"""INSERT INTO machine_making_data (name, task_mark, channel, user, v_id, pq_uid, title, pq_vid, data_time, old_title, oss_object_key) values ("{name}", "{task_mark}", "{channel_id}", "{url}", "{v_id}" , "{piaoquan_id}", "{new_title}", "{code}", "{formatted_time}", "{old_title}", "{oss_object_key}")"""
        MysqlHelper.update_values(
            sql=insert_sql
        )

    @classmethod
    def get_channle_id(cls, pq_id):
        """
        Look up (v_id, channel) for a piaoquan video id; (None, None) if absent.
        """
        sql = f"""select v_id,channel from machine_making_data where pq_vid = %s limit 1"""
        data = MysqlHelper.get_values(sql, (pq_id,))
        if data:
            return data[0][0],data[0][1]
        else:
            return None, None

+ 16 - 0
docker-compose.yml

@@ -0,0 +1,16 @@
# Single worker built from the local Dockerfile; restarts automatically
# unless explicitly stopped.
services:
  worker1:
    build:
      context: .
      dockerfile: Dockerfile
    image: top_data
    container_name: top_worker1
    restart: unless-stopped
    environment:
      - ENV=prod
    networks:
      - carry_net
# Named external-style network shared with sibling compose stacks.
networks:
  carry_net:
    name: carry_net
+

+ 58 - 0
job_data.py

@@ -0,0 +1,58 @@
+import json
+import os
+import time
+import uuid
+
+import schedule
+from loguru import logger
+
+from common.aliyun_log import AliyunLogger
+from common.redis import get_top_data, in_job_video_data
+from top_automatic.top_data_processing import Top
+
+
def get_data_task():
    """Drain ``task:top_all_data`` and return the unique tasks worth processing.

    Pops until the Redis list is empty. Entries whose channel is not one of
    the three supported sources are logged to Aliyun and dropped.
    """
    top_tasks = set()  # set de-duplicates identical task payloads
    while True:
        top_task = get_top_data("task:top_all_data")
        if not top_task:
            return list(top_tasks)
        data = json.loads(top_task)
        channel_id = data['channel']
        if channel_id not in ["抖音关键词抓取", "快手关键词抓取", "搬运改造"]:
            # fixed log-message typo: was 「改内容为」, should be 「该内容为」
            logger.info(f"[+] 该内容为:{channel_id},不做处理")
            AliyunLogger.logging(channel_id, data, "不处理", None)
            continue
        top_tasks.add(top_task)
+
def video_task_start():
    """Process every task currently queued in ``task:top_all_data``."""
    logger.info(f"[+] 任务开始获取小时级top数据")
    data_list = get_data_task()
    logger.info(f"[+] 共获取{len(data_list)}条")
    if not data_list:
        return
    for data in data_list:
        try:
            logger.info(f"[+] 任务处理{data}任务")
            Top().main(data)
            logger.info(f"[+] {data}处理成功")
            time.sleep(5)
        except Exception as e:
            # Failed tasks are re-queued for a later retry and reported.
            payload = json.loads(data)
            in_job_video_data("task:top_all_data", json.dumps(payload, ensure_ascii=False, indent=4))
            AliyunLogger.logging(payload['channel'], payload, "处理失败重新处理", None)
            logger.error(f"[+] {payload}处理失败,失败信息{e}")
+
def schedule_tasks():
    # Register the worker to run every 10 minutes; run_pending() below drives it.
    schedule.every(10).minutes.do(video_task_start)

if __name__ == '__main__':
    schedule_tasks()  # register the periodic job
    while True:
        schedule.run_pending()
        time.sleep(1)  # poll the scheduler once per second
    # video_task_start()

+ 0 - 0
job_day_redis.py


+ 28 - 0
job_hour_data_redis.py

@@ -0,0 +1,28 @@
+import time
+
+import schedule
+from loguru import logger
+
+from common.redis import insert_job_data
+
+
def jab_top_recommend():
    """Pull the hourly top-100 rows and queue them in Redis; never raises."""
    try:
        logger.info(f"开始获取每日每小时top前100数据")
        top_count = insert_job_data('task:top_all_data')
        logger.info(f"获取每日每小时top前100数据共{top_count}条")
    except Exception as e:
        logger.error(f"获取每日每小时top前100数据异常,异常信息{e}")
def schedule_tasks():
    # Register the hourly pull at 5 minutes past each hour.
    schedule.every().hour.at(":05").do(jab_top_recommend)

if __name__ == "__main__":
    # NOTE(review): the scheduler loop is commented out, so this script does a
    # single pull and exits — confirm an external cron supplies the hourly cadence.
    jab_top_recommend()
    # schedule_tasks()  # register the periodic job
    # while True:
    #     schedule.run_pending()
    #     time.sleep(1)  # poll once per second

+ 11 - 0
requirements.txt

@@ -0,0 +1,11 @@
aliyun-log-python-sdk==0.9.12
google-generativeai==0.8.3
loguru==0.7.2
mutagen==1.47.0
# NOTE(review): the code does `from odps import ODPS`; the MaxCompute SDK on
# PyPI is usually published as `pyodps` — confirm this pin installs the SDK.
odps==3.5.1
opencv-python==4.10.0.84
oss2==2.19.1
redis==5.1.1
requests==2.32.3
schedule==1.2.2
pymysql==1.0.2

+ 0 - 0
top_automatic/__init__.py


+ 187 - 0
top_automatic/top_data_processing.py

@@ -0,0 +1,187 @@
+from loguru import logger
+from common.aliyun_log import AliyunLogger
+from common.feishu_utils import Feishu
+from common.redis import get_top_data, in_job_video_data
+import html
+import json
+import random
+import re
+import time
+import requests
+from urllib.parse import urlparse, parse_qs
+from loguru import logger
+from common.sql_help import sqlCollect
+
+
class Top:
    """Resolve a top-video's original author id (Douyin/Kuaishou) and route the
    task to the matching downstream Redis queue + Feishu sheet."""

    def get_text_dy_video(self, url):
        """Resolve a Douyin share URL / video id to its channel_account_id.

        Returns the account id string on success, the literal "作品不存在" when
        the post was deleted, or None after 3 failed attempts.
        """
        max_retries = 3
        retry_count = 0
        while retry_count < max_retries:
            try:
                if "http" not in url:
                    # Already a bare video id.
                    video_id = url
                elif "&vid=" in url:
                    parsed_url = urlparse(url)
                    params = parse_qs(parsed_url.query)
                    video_id = params.get('vid', [None])[0]
                elif "?modal_id=" in url:
                    parsed_url = urlparse(url)
                    params = parse_qs(parsed_url.query)
                    video_id = params.get('modal_id', [None])[0]
                else:
                    # Short link: follow the redirect header to extract /video/<id>.
                    headers = {
                        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;'
                                  'q=0.8,application/signed-exchange;v=b3;q=0.7',
                        'Accept-Language': 'zh-CN,zh;q=0.9',
                        'Cache-Control': 'no-cache',
                        'Pragma': 'no-cache',
                        'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) '
                                      'Chrome/127.0.0.0 Safari/537.36',
                    }
                    response = requests.request(url=url, method='GET', headers=headers, allow_redirects=False, timeout=30)
                    location = response.headers.get('Location', None)
                    video_id = re.search(r'/video/(\d+)/?', location.split('?')[0] if location else url).group(1)
                # Reuses the name `url` for the internal detail-API endpoint.
                url = "http://8.217.192.46:8889/crawler/dou_yin/detail"
                if not video_id or not video_id.strip():
                    # NOTE(review): returns a 3-tuple while every other path
                    # returns a scalar; the caller treats any truthy value as an
                    # id, so this tuple would be mis-handled — confirm and unify.
                    return None, None, None
                payload = json.dumps({
                    "content_id": str(video_id)
                })
                headers = {
                    'Content-Type': 'application/json'
                }
                # Randomized delay, presumably to avoid hammering the detail API.
                time.sleep(random.uniform(5, 10))
                response = requests.request("POST", url, headers=headers, data=payload, timeout= 60)
                response = response.json()
                code = response["code"]
                if code == 0:
                    data = response["data"]["data"]
                    channel_account_id = data["channel_account_id"]
                    return channel_account_id
                if code == 22002:
                    if '抖音内容已被删除或无法访问' in response['msg']:
                        return "作品不存在"
            except Exception as e:
                retry_count += 1
                logger.error(f"[+] 抖音{url}获取视频链接失败,失败信息{e}")
                time.sleep(1)
        return None

    def get_text_ks_video(self, url):
        """Resolve a Kuaishou share URL / video id to its channel_account_id.

        Single attempt (no retry loop, unlike the Douyin variant). Returns the
        account id, "作品不存在" for removed/private posts, or None.
        """
        try:
            if "http" not in url:
                # Already a bare video id.
                video_id = url
            else:
                # Follow the short-link redirect and extract the id segment.
                headers = {
                    'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;'
                              'q=0.8,application/signed-exchange;v=b3;q=0.7',
                    'Accept-Language': 'zh-CN,zh;q=0.9',
                    'Cache-Control': 'no-cache',
                    'Pragma': 'no-cache',
                    'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) '
                                  'Chrome/127.0.0.0 Safari/537.36',
                }
                response = requests.request(url=url, method='GET', headers=headers, allow_redirects=False, timeout= 30)
                location = response.headers.get('Location', None)
                video_id = re.search(r'/(f|photo|short-video|long-video)/(.*)/?',
                                     location.split('?')[0] if location else url).group(2)
            # Reuses the name `url` for the internal detail-API endpoint.
            url = "http://8.217.192.46:8889/crawler/kuai_shou/detail"
            if not video_id or not video_id.strip():
                return None
            payload = json.dumps({
                "content_id": str(video_id)
            })
            headers = {
                'Content-Type': 'application/json'
            }
            time.sleep(random.uniform(5, 10))
            response = requests.request("POST", url, headers=headers, data=payload, timeout= 30)
            response = response.json()
            code = response["code"]
            if code == 0:
                data = response["data"]["data"]
                channel_account_id = data['channel_account_id']
                return channel_account_id
            elif code == 27006:
                if "作品不存在" in response['msg'] or "内容不存在" in response['msg'] or "私密作品" in response['msg']:
                    return "作品不存在"
            time.sleep(3)
        except Exception as e:
            logger.error(f"[+] 快手{url}获取视频链接失败,失败信息{e}")
            return None


    def main(self, data):
        """Process one JSON task string: look up the source URL, resolve the
        author id, re-queue the enriched task, and append a row to Feishu."""
        channel_account_id = None
        tag_transport_channel = None
        data = json.loads(data)
        # NOTE(review): no-arg call writes an empty SLS record — confirm intent.
        AliyunLogger.logging()
        channel_id = data['channel']
        url_id, data_channel = sqlCollect.get_channle_id(data['videoid'])
        if not url_id:
            logger.info(f"[+] 任务{data},没有该视频信息")
            AliyunLogger.logging(data['channel'], data, "没有该视频信息",None)
            return
        if "&vid=" in url_id or "?modal_id=" in url_id:
            host = urlparse(url_id).netloc
        else:
            # Stored value may be free text; extract the first URL from it.
            msg = html.unescape(url_id).split('?')[0]
            pattern = re.search(r'https?://[^\s<>"\'\u4e00-\u9fff]+', msg)
            if not pattern:
                # No URL found — push the task back for a later retry.
                in_job_video_data("task:top_all_data", json.dumps(data, ensure_ascii=False, indent=4))
                return
            url_id = pattern.group()
            host = urlparse(url_id).netloc
        # NOTE(review): `data_channel in "抖音"` is a SUBSTRING test, not
        # equality — an empty data_channel matches BOTH branches' fallback;
        # confirm `==` was intended.
        if host in ['v.douyin.com', 'www.douyin.com', 'www.iesdouyin.com'] or data_channel in "抖音":
            tag_transport_channel = "抖音"
            logger.info(f"[+] {url_id}开始获取抖音视频链接")
            channel_account_id = self.get_text_dy_video(url=url_id)
        elif host in ['v.kuaishou.com', 'www.kuaishou.com', 'v.m.chenzhongtech.com', 'creater.eozatvmq.com'] or data_channel in "快手":
            tag_transport_channel = "快手"
            logger.info(f"[+] {url_id}开始获取快手视频链接")
            channel_account_id= self.get_text_ks_video(url=url_id)
        if not channel_account_id:
            AliyunLogger.logging(data['channel'], data, "没有获取到视频用户ID,等待重新获取",None)
            in_job_video_data("task:top_all_data", json.dumps(data, ensure_ascii=False, indent=4))
            return
        data["channel_account_id"] = channel_account_id
        # Route to the keyword-search queue or the follow queue by channel.
        if channel_id in ["抖音关键词抓取", "快手关键词抓取"]:
            data["tag_transport_channel"] = tag_transport_channel
            redis_data = f"task:top_data_{'dy' if channel_id == '抖音关键词抓取' else 'ks'}_gjc"
        else:
            data["tag_transport_channel"] = tag_transport_channel
            redis_data = f"task:top_data_{'ks' if tag_transport_channel == '快手' else 'dy'}_gz"
        AliyunLogger.logging(data['channel'], data, "获取成功等待写入改造任务", channel_account_id)
        in_job_video_data(redis_data, json.dumps(data, ensure_ascii=False, indent=4))
        logger.info(f"[+] 开始写入飞书表格")

        values = [
            [
                data['uid'],
                data['videoid'],
                data['return_uv'],
                data['type'],
                data['type_owner'],
                data['channel'],
                data['channel_owner'],
                data['title'],
                data['dt'],
                channel_account_id,
                tag_transport_channel
            ]
        ]
        # Insert a fresh row 2, then write the values into it.
        Feishu.insert_columns("KUIksoqZkhvZOrtqA1McPwObn7d", "57c076", "ROWS", 1, 2)
        time.sleep(0.5)
        Feishu.update_values("KUIksoqZkhvZOrtqA1McPwObn7d", "57c076", "A2:Z2", values)
        logger.info(f"[+] 成功写入飞书表格")

        return
+
+
if __name__ == '__main__':
    # Ad-hoc smoke test: look up the stored URL/channel for one video id.
    url,channel = sqlCollect.get_channle_id(45843781)
    print(url,channel)

    # top = Top()
    # top.main("task:top_all_data")