| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361 | import osimport randomimport sysimport timeimport uuidimport jsonfrom datetime import datetimefrom typing import Dict, Anyimport cv2import requestsfrom application.common.feishu import FsDatafrom application.common.feishu.feishu_utils import FeishuUtilsfrom application.common.gpt import GPT4oMinifrom application.common.mysql.sql import Sqlfrom application.common.redis.xng_redis import xng_in_video_datafrom application.config.config import xiaoniangao_view_api,xiaoniangao_history_api,xiaoniangao_log_upload_apisys.path.append(os.getcwd())from application.items import VideoItemfrom application.pipeline import PiaoQuanPipelinefrom application.common.messageQueue import MQfrom application.common.log import AliyunLoggerfrom application.common.mysql import MysqlHelperdef video_view(content_id, account_id):    headers = {        "Content-Type": "application/json"    }    payload = {        "content_id": str(content_id),        "account_id": str(account_id)    }    try:        # 发送 POST 请求        response = requests.post(            xiaoniangao_view_api,            headers=headers,            json=payload  # 自动将字典转换为 JSON        )        # 检查 HTTP 状态码        if response.status_code == 200:            # 解析 JSON 响应            result = response.json()            # 提取关键字段            code = result.get("code")            msg = result.get("msg")            # 业务逻辑处理(示例)            if code == 0:                print("请求成功")            else:                print(f"{xiaoniangao_view_api}请求失败,错误码: {code}, 消息: {msg}")        else:            print(f"{xiaoniangao_view_api}HTTP 请求失败,状态码: {response.status_code}")    except requests.exceptions.RequestException as e:        print(f"{xiaoniangao_view_api}请求异常: {e}")    except json.JSONDecodeError:        print(f"{xiaoniangao_view_api}响应不是有效的 JSON 格式")def video_history(video_view_lists):    headers = {        "Content-Type": "application/json"    }    payload = {        "content_ids": video_view_lists    }    try:        # 发送 POST 请求        response = requests.post(            xiaoniangao_history_api,            headers=headers,            json=payload  # 自动将字典转换为 JSON        )        # 检查 HTTP 状态码        if response.status_code == 200:            # 解析 JSON 响应            result = response.json()            # 提取关键字段            code = result.get("code")            msg = result.get("msg")            # 业务逻辑处理(示例)            if code == 0:                print("请求成功")            else:                print(f"{xiaoniangao_history_api}请求失败{payload},错误码: {code}, 消息: {msg}")        else:            print(f"{xiaoniangao_history_api}HTTP 请求失败{payload},状态码: {response.status_code}")    except requests.exceptions.RequestException as e:        print(f"{xiaoniangao_history_api}请求异常: {e}")    except json.JSONDecodeError:        print(f"{xiaoniangao_history_api}响应不是有效的 JSON 格式")def log_upload(video_objs):    headers = {        "Content-Type": "application/json"    }    payload = {        "e": video_objs    }    try:        # 发送 POST 请求        response = requests.post(            xiaoniangao_log_upload_api,            headers=headers,            json=payload  # 自动将字典转换为 JSON        )        # 检查 HTTP 状态码        if response.status_code == 200:            # 解析 JSON 响应            result = response.json()            # 提取关键字段            code = result.get("code")            msg = result.get("msg")            # 业务逻辑处理(示例)            if code == 0:                print(f"{xiaoniangao_log_upload_api}请求成功")            else:                print(f"{xiaoniangao_log_upload_api}请求失败,错误码: {code}, 消息: {msg}")        else:            print(f"{xiaoniangao_log_upload_api} 请求失败,状态码: {response.status_code}")    except requests.exceptions.RequestException as e:        print(f"{xiaoniangao_log_upload_api}请求异常: {e}")    except json.JSONDecodeError:        print(f"{xiaoniangao_log_upload_api}响应不是有效的 JSON 格式")class XNGTJLRecommend(object):    """    小年糕推荐流    """    def __init__(self, platform, mode, rule_dict, user_list, env="prod"):        self.limit_flag = False        self.platform = platform        self.mode = mode        self.rule_dict = rule_dict        self.user_list = user_list        self.env = env        self.download_cnt = 0        self.mq = MQ(topic_name="topic_crawler_etl_" + self.env)        self.expire_flag = False        self.aliyun_log = AliyunLogger(mode=self.mode, platform=self.platform)        self.mysql = MysqlHelper(mode=self.mode, platform=self)    def get_video_duration(self, video_link: str) -> int:        cap = cv2.VideoCapture(video_link)        if cap.isOpened():            rate = cap.get(5)            frame_num = cap.get(7)            duration = int(frame_num / rate)            return duration        return 0    def get_recommend_list(self):        print("小年糕推荐流开始")        """        获取推荐页视频        """        headers = {            'Content-Type': 'application/json'        }        data_rule = FsData()        title_rule = data_rule.get_title_rule()        # for i in range(3):        for i in range(11):            url = "http://8.217.192.46:8889/crawler/xiao_nian_gao_plus/recommend"            payload = json.dumps({})            response = requests.request("POST", url, headers=headers, data=payload)            response = response.json()            if response['code'] != 0:                self.aliyun_log.logging(                    code="3000",                    message="抓取单条视频失败,请求失败"                ),                continue            video_view_lists = []            video_log_uploads = []            if len(response['data']['data']) == 0:                self.aliyun_log.logging(                    code="3000",                    message="抓取视频失败,无数据返回",                    data=response                )                continue            for index, video_obj in enumerate(response['data']['data'], 1):                try:                    self.aliyun_log.logging(                        code="1001", message="扫描到一条视频", data=video_obj                    )                    vid = video_obj['id']                    video_view_lists.append(str(vid))                    video_log_uploads.append(self.build_video_log(video_obj, index))                    self.process_video_obj(video_obj, title_rule)                except Exception as e:                    self.aliyun_log.logging(                        code="3000",                        message="抓取单条视频失败, 该视频位于第{}页第{}条报错原因是{}".format(                            1, index, e                        ),                    )                if self.limit_flag:                    return            video_history(video_view_lists)            log_upload(video_log_uploads)            time.sleep(random.randint(5, 10))    def process_video_obj(self, video_obj, title_rule):        """        处理视频        :param video_obj:        """        # time.sleep(random.randint(3, 8))        trace_id = self.platform + str(uuid.uuid1())        our_user = random.choice(self.user_list)        item = VideoItem()        vid = video_obj['id']        mid = int(video_obj['user']['mid'])        print(f"vid={vid},mid={mid}")        try:            mid = int(video_obj['user']['mid'])            print(f"id:{mid}")            user_name = video_obj['user']['nick']            avatar_url = video_obj['user']['hurl']            sql = Sql()            max_id = sql.select_id(mid)            if max_id:                sql.update_name_url(mid, avatar_url, user_name)            else:                time.sleep(1)                link = sql.select_id_status(mid)                if link:                    sql.insert_name_url(mid, avatar_url, user_name)                    print(f"开始写入{mid}")                    xng_in_video_data(json.dumps({"mid": mid}))        except Exception as e:            print(f"写入异常{e}")            pass        url = video_obj["v_url"]        duration = self.get_video_duration(url)        item.add_video_info("video_id", video_obj["id"])        item.add_video_info("video_title", video_obj["title"])        item.add_video_info("play_cnt", int(video_obj["play_pv"]))        item.add_video_info("publish_time_stamp", int(int(video_obj["t"]) / 1000))        item.add_video_info("out_user_id", video_obj["id"])        item.add_video_info("cover_url", video_obj["url"])        item.add_video_info("like_cnt", 0)        item.add_video_info("share_cnt", int(video_obj["share"]))        item.add_video_info("comment_cnt", int(video_obj["comment_count"]))        item.add_video_info("video_url", video_obj["v_url"])        item.add_video_info("out_video_id", video_obj["id"])        item.add_video_info("duration", int(duration))        item.add_video_info("platform", self.platform)        item.add_video_info("strategy", self.mode)        item.add_video_info("session", "{}-{}".format(self.platform, int(time.time())))        item.add_video_info("user_id", our_user["uid"])        item.add_video_info("user_name", our_user["nick_name"])        mq_obj = item.produce_item()        pipeline = PiaoQuanPipeline(            platform=self.platform,            mode=self.mode,            rule_dict=self.rule_dict,            env=self.env,            item=mq_obj,            trace_id=trace_id,        )        if pipeline.process_item():            title_list = title_rule.split(",")            title = video_obj["title"]            contains_keyword = any(keyword in title for keyword in title_list)            if contains_keyword:                new_title = GPT4oMini.get_ai_mini_title(title)                if new_title:                    item.add_video_info("video_title", new_title)                    current_time = datetime.now()                    formatted_time = current_time.strftime("%Y-%m-%d %H:%M:%S")                    values = [                        [                            video_obj["v_url"],                            video_obj["url"],                            title,                            new_title,                            formatted_time,                        ]                    ]                    FeishuUtils.insert_columns("U5dXsSlPOhiNNCtEfgqcm1iYnpf", "D1nVxQ", "ROWS", 1, 2)                    time.sleep(0.5)                    FeishuUtils.update_values("U5dXsSlPOhiNNCtEfgqcm1iYnpf", "D1nVxQ", "A2:Z2", values)            self.mq.send_msg(mq_obj)            video_view(vid, mid)            self.aliyun_log.logging(code="1002", message="成功发送至 ETL", data=mq_obj)            if self.download_cnt >= int(                    self.rule_dict.get("videos_cnt", {}).get("min", 200)            ):                self.limit_flag = True    """    查询用户id是否存在    """    def select_id(self, uid):        sql = f""" select uid from xng_uid where uid = "{uid}"; """        db = MysqlHelper()        repeat_video = db.select(sql=sql)        if repeat_video:            return True        return False    """    查询用户id是否之前已添加过    """    def select_id_status(self, uid):        sql = f""" select uid from crawler_user_v3 where link = "{uid}"; """        db = MysqlHelper()        repeat_video = db.select(sql=sql)        if repeat_video:            return False        return True    def build_video_log(self, video_obj: Dict[str, Any], index: int) -> Dict[str, Any]:        """构建视频日志对象"""        return {            "ac": "show",            "md_ver": "2.0",            "data": {                "page": "discoverIndexPage",                "topic": "recommend",                "tpl_id": str(video_obj['tpl_id']),                "profile_ct": str(video_obj['p_ct']),                "flv": "0",                "sign": video_obj['sign'],                "serial_id": video_obj['serial_id'],                "type": "post",                "name": "post",                "src_page": "discoverIndexPage/recommend",                "aid": str(video_obj['album_id']),                "cid": str(video_obj['id']),                "feed_idx": index - 1,                "cmid": str(video_obj['user']['mid']),                "user_ct": "1638346045009"            },            "t": int(time.time() * 1000),            "ab": {}        }    def run(self):        self.get_recommend_list()if __name__ == '__main__':    J = XNGTJLRecommend(        platform="xiaonianggaotuijianliu",        mode="recommend",        rule_dict={},        user_list=[{'uid': "123456", 'nick_name': "xiaoxiao"}],    )    J.get_recommend_list()    # J.logic()
 |