import json
import os
import random
import sys
import time
import uuid
from datetime import datetime

import requests

# The current working directory must be on sys.path *before* the first
# ``application`` import, otherwise the append cannot help that import.
sys.path.append(os.getcwd())

from application.common import Feishu
from application.items import VideoItem
from application.pipeline import PiaoQuanPipeline
from application.common.messageQueue import MQ
from application.common.proxies import tunnel_proxies
from application.common.log import AliyunLogger
from application.common.mysql import MysqlHelper

# Upper bound (seconds) for every outbound HTTP call so that a stalled
# server cannot hang the crawler indefinitely.
REQUEST_TIMEOUT = 60


class JXXFRecommend(object):
    """
    Recommend-feed crawler for the "jixiangxingfu" WeChat mini program.

    The crawler logs in through a mini-program login code, pages through the
    video list API, and forwards every video that passes the pipeline rules
    to the ETL message queue.
    """

    def __init__(self, platform, mode, rule_dict, user_list, env="prod"):
        """
        :param platform: platform identifier, used for logging, trace ids and
            as the ``source`` filter in ``crawler_config``.
        :param mode: crawl strategy name (stored on every item as "strategy").
        :param rule_dict: filtering rules; ``videos_cnt.min`` caps the number
            of videos sent per run (defaults to 200 when absent).
        :param user_list: list of dicts with ``uid`` and ``nick_name``; one is
            picked at random for each video.
        :param env: environment suffix of the ETL topic name.
        """
        self.limit_flag = False
        self.platform = platform
        self.mode = mode
        self.rule_dict = rule_dict
        self.user_list = user_list
        self.env = env
        self.download_cnt = 0
        self.mq = MQ(topic_name="topic_crawler_etl_" + self.env)
        self.expire_flag = False
        self.aliyun_log = AliyunLogger(mode=self.mode, platform=self.platform)
        # Pass the platform name, not the crawler instance itself.
        self.mysql = MysqlHelper(mode=self.mode, platform=self.platform)

    def get_cookie(self):
        """
        Read a stored token from the ``crawler_config`` table.

        :return: the ``token`` value of the first matching config row, or
            ``None`` when no row matches.
        """
        sql = f""" select * from crawler_config where source="{self.platform}" """
        configs = self.mysql.select(sql=sql)
        # ``or ()`` guards against a helper that returns None on failure.
        for config in configs or ():
            # NOTE(review): this tests whether any column of the row equals
            # "token" exactly; column 3 is then parsed as JSON -- confirm
            # against the crawler_config schema.
            if "token" in config:
                token_element = config[3]
                data_json = json.loads(token_element)
                return data_json.get("token")
        return None

    def logic(self):
        """
        Obtain an API token, retrying the login handshake up to 10 times.

        :return: the token string, or ``None`` if every attempt failed.
        """
        app_id = 'wx6692a24ad2a88bfb'
        for _ in range(10):
            js_code = self.get_js_code(app_id)
            if not js_code:
                # Without a login code the token request cannot succeed.
                continue
            token = self.get_search_params(app_id, js_code)
            if token:
                return token
        return None

    def get_js_code(self, app_id: str) -> str:
        """
        Request a mini-program login code for ``app_id``.

        :param app_id: mini-program app id.
        :return: the ``GetMiniAppCode`` field of the response, or ``''`` on
            any failure (best effort; the failure is logged).
        """
        url = 'http://61.48.133.26:30001/GetMiniAppCode'
        data = {
            "appid": app_id
        }
        try:
            response = requests.request(
                method='POST', url=url, json=data, timeout=REQUEST_TIMEOUT
            )
            body = response.content.decode()
            res_data = json.loads(body)
            return res_data['GetMiniAppCode']
        except Exception as e:
            self.aliyun_log.logging(
                code="3000",
                message="failed to fetch mini-program login code: {}".format(e),
            )
            return ''

    def get_search_params(self, app_id: str, js_code: str) -> str:
        """
        Exchange a login code for an API token.

        :param app_id: mini-program app id.
        :param js_code: login code returned by :meth:`get_js_code`.
        :return: ``data.token`` from the login response, or ``''`` on any
            failure.
        """
        try:
            url = f"https://api.huanqiwl.top/index.php?s=mobile/Login/loginToken&code={js_code}&appid={app_id}"
            headers = {
                'Connection': 'keep-alive',
                'content-type': 'application/json',
                'User-Agent': 'Mozilla/5.0 (iPhone; CPU iPhone OS 16_5_1 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148 MicroMessenger/8.0.48(0x18003030) NetType/WIFI Language/zh_CN',
                'Referer': 'https://servicewechat.com/wx6692a24ad2a88bfb/5/page-frame.html'
            }
            response = requests.request(
                method='GET', headers=headers, url=url, data={},
                timeout=REQUEST_TIMEOUT,
            )
            body = response.content.decode()
            body_json = json.loads(body)
            data = body_json.get("data")
            return data['token']
        except Exception:
            # Best effort: the caller (logic) retries on an empty token.
            return ''

    def get_recommend_list(self):
        """
        Crawl the recommend feed.

        Walks pages 0-13 and, for each page, category ids 0 and 1. Every
        listed video is handed to :meth:`process_video_obj`; a failure on a
        single video is logged and does not stop the crawl. The crawl stops
        early when no token can be obtained, when the API reports the token
        as not logged in, or when ``limit_flag`` is set.
        """
        token = self.logic()
        if not token:
            self.aliyun_log.logging(
                code="3000",
                message="failed to obtain a login token, crawl aborted",
            )
            return
        headers = {
            'Host': 'api.huanqiwl.top',
            'Content-Type': 'application/json',
            'Accept-Language': 'zh-cn',
            'token': token,
            'Accept': '*/*',
            'User-Agent': 'Mozilla/5.0 (iPhone; CPU iPhone OS 11_3 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E217 MicroMessenger/6.8.0(0x16080000) NetType/WIFI Language/en Branch/Br_trunk MiniProgramEnv/Mac',
            'Referer': 'https://servicewechat.com/wx6692a24ad2a88bfb/3/page-frame.html'
        }
        for page in range(14):
            time.sleep(random.randint(1, 10))
            for cid in range(2):
                url = f"https://api.huanqiwl.top/index.php?s=mobile/Video/getList&cid={cid}&page={page}&api_version=4&appid=wx6692a24ad2a88bfb&version=1.9.5&env_version=release&scene=1053"
                payload = {}
                response = requests.request(
                    "GET", url, headers=headers, data=payload,
                    timeout=REQUEST_TIMEOUT,
                )
                if "未登录" in response.text:
                    self.aliyun_log.logging(
                        code="3000",
                        message="抓取单条视频失败, token 失效"
                    )
                    # The token is fetched once per run, so every further
                    # request would fail the same way: stop the whole crawl
                    # instead of only leaving the inner loop.
                    return
                for index, video_obj in enumerate(response.json()['data']['list'], 1):
                    try:
                        self.aliyun_log.logging(
                            code="1001", message="扫描到一条视频", data=video_obj
                        )
                        self.process_video_obj(video_obj)
                    except Exception as e:
                        self.aliyun_log.logging(
                            code="3000",
                            message="抓取单条视频失败, 该视频位于第{}页第{}条报错原因是{}".format(
                                page, index, e
                            ),
                        )
                if self.limit_flag:
                    return
                time.sleep(random.randint(5, 10))

    def process_video_obj(self, video_obj):
        """
        Build an item from one API video record and push it downstream.

        The record is written to a Feishu sheet, run through
        ``PiaoQuanPipeline`` and, if accepted, sent to the ETL queue. Once
        ``download_cnt`` reaches ``rule_dict["videos_cnt"]["min"]`` (200 when
        absent) ``limit_flag`` is set so the crawl loop stops.

        :param video_obj: dict with at least ``id``, ``title``, ``images``
            and ``video_url``.
        """
        time.sleep(random.randint(3, 8))
        trace_id = self.platform + str(uuid.uuid1())
        our_user = random.choice(self.user_list)
        item = VideoItem()
        item.add_video_info("video_id", video_obj["id"])
        item.add_video_info("video_title", video_obj["title"])
        item.add_video_info("play_cnt", 0)
        item.add_video_info("publish_time_stamp", int(time.time()))
        # NOTE(review): the video id is used as the author id as well --
        # presumably the API exposes no author id; confirm.
        item.add_video_info("out_user_id", video_obj["id"])
        item.add_video_info("cover_url", video_obj["images"])
        item.add_video_info("like_cnt", 0)
        item.add_video_info("video_url", video_obj["video_url"])
        item.add_video_info("out_video_id", video_obj["id"])
        item.add_video_info("platform", self.platform)
        item.add_video_info("strategy", self.mode)
        item.add_video_info("session", "{}-{}".format(self.platform, int(time.time())))
        item.add_video_info("user_id", our_user["uid"])
        item.add_video_info("user_name", our_user["nick_name"])

        # Record the video in the Feishu sheet together with the crawl time.
        current_time = datetime.now()
        formatted_time = current_time.strftime("%Y-%m-%d %H:%M:%S")
        values = [[
            video_obj["id"],
            formatted_time,
            video_obj["title"],
            video_obj["images"],
            video_obj["video_url"]
        ]]
        # Presumably inserts an empty row and then fills it -- verify against
        # the Feishu helper.
        Feishu.insert_columns(self.platform, 'jixiangxingfu', "L0KXHh", "ROWS", 1, 2)
        time.sleep(0.5)
        Feishu.update_values(self.platform, 'jixiangxingfu', "L0KXHh", "A2:Z2", values)

        mq_obj = item.produce_item()
        pipeline = PiaoQuanPipeline(
            platform=self.platform,
            mode=self.mode,
            rule_dict=self.rule_dict,
            env=self.env,
            item=mq_obj,
            trace_id=trace_id,
        )
        if pipeline.process_item():
            self.download_cnt += 1
            self.mq.send_msg(mq_obj)
            self.aliyun_log.logging(code="1002", message="成功发送至 ETL", data=mq_obj)
            if self.download_cnt >= int(
                    self.rule_dict.get("videos_cnt", {}).get("min", 200)
            ):
                self.limit_flag = True

    def run(self):
        """Entry point: crawl the recommend feed once."""
        self.get_recommend_list()


if __name__ == '__main__':
    J = JXXFRecommend(
        platform="jixiangxingfu",
        mode="recommend",
        rule_dict={},
        user_list=[{'uid': "123456", 'nick_name': "xiaoxiao"}],
    )
    J.get_recommend_list()