""" @author: luojunhui 通过抓包 aigc 平台,自动化一些操作 """ import requests import json from applications.decoratorApi import retryOnTimeout from applications.denetMysql import DeNetMysql HEADERS = { 'Accept': 'application/json', 'Accept-Language': 'zh,zh-CN;q=0.9', 'Content-Type': 'application/json', 'Origin': 'http://admin.cybertogether.net', 'Proxy-Connection': 'keep-alive', 'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.0.0 Safari/537.36' } PERSON_COOKIE = { "token": "af54cdc404c3464d896745df389b2dce", "appType": 9, "platform": "pc", "appVersionCode": 1000, "clientTimestamp": 1, "fid": 1, "loginUid": 1, "pageSource": 1, "requestId": 1, "rid": 1, "uid": 1 } def get_generated_article_list(plan_id): """ 自动生成计划 id 获取该生成计划已经生成过的文章列表 :param plan_id: :return: """ db = DeNetMysql() sql = f""" SELECT account.wx_gh, content.title, content.content_link, content.view_count, content.like_count, from_unixtime(cprr.create_timestamp / 1000) AS 抓取时间, from_unixtime(content.publish_timestamp / 1000) AS 发布时间 FROM crawler_plan_result_rel cprr JOIN crawler_plan plan ON cprr.plan_id = plan.id JOIN crawler_content content ON cprr.channel_source_id = content.channel_content_id JOIN crawler_account account ON content.channel_account_id = account.channel_account_id WHERE plan_id IN ( SELECT input_source_value FROM produce_plan_input_source WHERE plan_id = '{plan_id}' ); """ article_list = db.select(sql) return article_list def get_generated_article_title(generate_task_id): """ 生成计划 id 获取该生成计划已经生成过的文章标题 :param generate_task_id: :return: title_set """ db = DeNetMysql() sql = f""" SELECT DISTINCT output.output FROM produce_plan_exe_record planExeRecord JOIN produce_plan_module_output output ON output.plan_exe_id = planExeRecord.plan_exe_id AND output.produce_module_type = 3 WHERE planExeRecord.plan_id = '{generate_task_id}'; """ title_tuple = db.select(sql) title_set = set([i[0] for i in title_tuple]) return title_set def get_publish_account_from_aigc(): """ 从 aigc 系统中获取正在发布的账号 :return: name: 公众号名称 gh_id: 公众号 gh_id follower_count: 粉丝数量 service_type_info: '公众号类型:0-订阅号,1-由历史老账号升级后的订阅号,2-服务号', verify_type_info:'公众号认证类型:-1-未认证,0-微信认证,1-新浪微博认证,3-已资质认证通过但还未通过名称认证,4-已资质认证通过、还未通过名称认证,但通过了新浪微博认证' """ db = DeNetMysql() sql = """ SELECT DISTINCT t3.`name`, t3.gh_id, t3.follower_count, t3.create_timestamp, t4.service_type_info, t4.verify_type_info, t3.id FROM publish_plan t1 JOIN publish_plan_account t2 ON t1.id = t2.plan_id JOIN publish_account t3 ON t2.account_id = t3.id LEFT JOIN publish_account_wx_type t4 on t3.id = t4.account_id WHERE t1.plan_status = 1 AND t3.channel = 5 GROUP BY t3.id ORDER BY t3.create_timestamp DESC """ info_tuple = db.select(sql) info_list = [ { "name": line[0], "ghId": line[1], "follower_count": line[2], "account_init_timestamp": int(line[3]), "account_type": line[4], "account_auth": line[5], "account_id": line[6] } for line in info_tuple ] return info_list def auto_create_crawler_task(plan_id, plan_name, plan_tag, url_list): """ 通过 url 自动创建抓取计划 :param plan_id: 计划 id, 若往已经存在的 plan_id 中加文章则需要传,否则会新生成一个 id :param plan_name: 计划名称 :param plan_tag: 计划标签 :param url_list: 输入的 url_list :return: """ url = "http://aigc-api.cybertogether.net/aigc/crawler/plan/save" payload = json.dumps({ "params": { "contentFilters": [], "accountFilters": [], "filterAccountMatchMode": 1, "filterContentMatchMode": 1, "selectModeValues": [], "searchModeValues": [], "contentModal": 3, "analyze": {}, "crawlerComment": 0, "inputGroup": None, "inputSourceGroups": [], "modePublishTime": [], "planType": 2, "frequencyType": 2, "planTag": plan_tag, "tagPenetrateFlag": 0, "id": plan_id, "name": plan_name, "channel": 5, "crawlerMode": 5, "inputModeValues": url_list, "modePublishTimeStart": None, "modePublishTimeEnd": None, "executeRate": None, "executeDate": None, "executeWindowStart": None, "executeWindowEnd": None, "executeTimeInterval": None, "executeNum": None, "addModal": None, "addChannel": None, "fileUpload": None, "prompt": None, "acelFlag": None, "tasks": [] }, "baseInfo": PERSON_COOKIE }) response = requests.request("POST", url, headers=HEADERS, data=payload) return response.json() def bind_crawler_task_to_generate_task(crawler_task_list, generate_task_id): """ 将抓取计划绑定至生成计划 生成计划已经存在 :crawler_task_list: 要输入的抓取计划List :generate_task_id: 目标生成计划 id :return: response """ url = "http://aigc-api.cybertogether.net/aigc/produce/plan/save" plan_info = get_generate_task_detail(generate_task_id) input_source_groups = plan_info.get("inputSourceGroups") existed_crawler_task = input_source_groups[0].get("inputSources") new_task_list = existed_crawler_task + crawler_task_list input_source_group_0 = input_source_groups[0] input_source_group_0['inputSources'] = new_task_list payload = json.dumps({ "params": { "contentFilters": [], "produceModal": plan_info.get("produceModal"), "inputModal": plan_info.get("inputModal"), "tasks": plan_info.get("tasks", []), "modules": [], "moduleGroups": plan_info.get("moduleGroups"), "inputSourceGroups": [input_source_group_0], "layoutType": plan_info.get("layoutType"), "activeManualReview": plan_info.get("activeManualReview"), "totalProduceNum": plan_info.get("totalProduceNum"), "dailyProduceNum": plan_info.get("dailyProduceNum"), "maxConcurrentNum": plan_info.get("maxConcurrentNum"), "id": generate_task_id, "name": plan_info.get("name"), "planTag": plan_info.get("planTag"), "tagPenetrateFlag": plan_info.get("tagPenetrateFlag"), "inputType": plan_info.get("inputType"), "inputChannel": plan_info.get("inputChannel"), "activeManualReviewCount": plan_info.get("activeManualReviewCount"), "autoComposite": plan_info.get("autoComposite") }, "baseInfo": PERSON_COOKIE }) response = requests.request("POST", url, headers=HEADERS, data=payload) return response.json() @retryOnTimeout() def get_generate_task_detail(generate_task_id): """ 通过生成计划的 id,获取该生成计划已有的抓取计划 list :param generate_task_id: :return: """ url = "http://aigc-api.cybertogether.net/aigc/produce/plan/detail" payload = json.dumps({ "params": { "id": generate_task_id }, "baseInfo": PERSON_COOKIE }) response = requests.request("POST", url, headers=HEADERS, data=payload, timeout=10) result = response.json() if result['msg'] == 'success': return result['data'] else: return {} @retryOnTimeout() def get_publish_task_detail(publish_task_id): """ 通过发布计划的 id,获取该发布计划已有的抓取计划 list :param publish_task_id: :param generate_task_id: :return: """ url = "http://aigc-api.cybertogether.net/aigc/publish/plan/detail" payload = json.dumps({ "params": { "id": publish_task_id }, "baseInfo": PERSON_COOKIE }) response = requests.request("POST", url, headers=HEADERS, data=payload) return response.json() def bind_crawler_task_to_publish_task(target_publish_task_id, crawler_task_name, crawler_task_id): """ 将抓取计划绑定至发布计划 发布计划已经存在 :param crawler_task_id: 抓取计划ID :param crawler_task_name: 抓取计划名称 :param target_publish_task_id: 目标发布计划 id :return: response """ publish_task_detail = get_publish_task_detail(target_publish_task_id) publish_task_detail_data = publish_task_detail.get("data") already_exist_crawler_task_list = publish_task_detail_data.get("inputGroups")[0].get("inputSources") new_crawler_task_list = [ { "sourceCategory": 1, "inputSourceValueType": 1, "inputSourceValue": crawler_task_id, "inputSourceLabel": crawler_task_name } ] new_input_source_group = already_exist_crawler_task_list + new_crawler_task_list if publish_task_detail_data: url = "http://aigc-api.cybertogether.net/aigc/publish/plan/save" payload = json.dumps({ "params": { "accountIds": [i['id'] for i in publish_task_detail_data.get("accountIds")], "inputGroups": [ { "groupId": "e40cd06daeb5345ed26256c8744f7a33", "groupName": None, "channel": None, "contentModal": None, "groupIndex": 1, "filterMatchMode": 2, "inputSources": new_input_source_group, "inputFilters": [], "inputOrders": [], "label": "input1" } ], "inputSources": [], "inputFilters": [], "activeManualReview": publish_task_detail_data.get("activeManualReview"), "channel": publish_task_detail_data.get("channel"), "contentAllocationRules": publish_task_detail_data.get("contentAllocationRules"), "contentModal": publish_task_detail_data.get("contentModal"), "contentSortingRules": publish_task_detail_data.get("contentSortingRules"), "douyinPublishAccoutSetting": publish_task_detail_data.get("douyinPublishAccoutSetting"), "filterMatchMode": 1, "name": publish_task_detail_data.get("name"), "publishAccoutJson": "", "publishBgmType": publish_task_detail_data.get("publishBgmType"), "publishDate": publish_task_detail_data.get("publishDate"), "publishLocation": publish_task_detail_data.get("publishLocation"), "publishNum": publish_task_detail_data.get("publishNum"), "publishPushTime": publish_task_detail_data.get("publishPushTime"), "publishRate": publish_task_detail_data.get("publishRate"), "publishTimeInterval": publish_task_detail_data.get("publishTimeInterval"), "publishWindowEnd": publish_task_detail_data.get("publishWindowEnd"), "publishWindowStart": publish_task_detail_data.get("publishWindowStart"), "wxContentInsert": publish_task_detail_data.get("wxContentInsert"), "wxVideoPublishAccountSetting": publish_task_detail_data.get("wxVideoPublishAccountSetting"), "scoreJudgeFlag": publish_task_detail_data.get("scoreJudgeFlag"), "scoreJudgeTasks": publish_task_detail_data.get("scoreJudgeTasks"), "machineReviewMatchMode": publish_task_detail_data.get("machineReviewMatchMode"), "id": publish_task_detail_data.get("id"), "planType": publish_task_detail_data.get("planType"), "planTag": publish_task_detail_data.get("planTag"), "tagPenetrateFlag": publish_task_detail_data.get("tagPenetrateFlag"), "actionObjects": publish_task_detail_data.get("actionObjects"), "actionContents": publish_task_detail_data.get("actionContents"), "accountFrom": publish_task_detail_data.get("accountFrom"), "actionContentAllocationRule": publish_task_detail_data.get("actionContentAllocationRule"), "publishPerNum": publish_task_detail_data.get("publishPerNum"), "publishPerMinNum": publish_task_detail_data.get("publishPerMinNum"), "pushType": publish_task_detail_data.get("pushType"), "triggerEvent": publish_task_detail_data.get("triggerEvent"), "pushContentSortingRules": publish_task_detail_data.get("pushContentSortingRules"), "biliDistrict": publish_task_detail_data.get("biliDistrict"), "firstItemScoreJudgeTaskId": publish_task_detail_data.get("firstItemScoreJudgeTaskId"), "secondItemScoreJudgeTaskId": publish_task_detail_data.get("secondItemScoreJudgeTaskId"), "otherItemScoreJudgeTaskId": publish_task_detail_data.get("otherItemScoreJudgeTaskId"), "gzhArticleSortFlag": publish_task_detail_data.get("gzhArticleSortFlag"), "gzhArticleSortTask": publish_task_detail_data.get("gzhArticleSortTask"), "miniprogramInsertFlag": publish_task_detail_data.get("miniprogramInsertFlag"), "miniprogramInsertTasks": publish_task_detail_data.get("miniprogramInsertTasks"), "machineReviewConditions": publish_task_detail_data.get("machineReviewConditions"), "gzhTriggerSyncFrequency": publish_task_detail_data.get("gzhTriggerSyncFrequency"), "gzhTriggerSendContentType": publish_task_detail_data.get("gzhTriggerSendContentType"), "longArticleSystemHost": publish_task_detail_data.get("longArticleSystemHost"), }, "baseInfo": PERSON_COOKIE }) response = requests.request("POST", url, headers=HEADERS, data=payload) print(response.json()) else: return def delete_articles(gh_id, title): """ 删除公众号文章 :param gh_id: :param title: :return: """ url = "http://101.37.174.139:80/articleAudit/titleDangerFindDelete" payload = { "ghId": gh_id, 'title': title } headers = { 'Content-Type': 'application/json;charset=UTF-8' } response = requests.request("POST", url, headers=headers, json=payload, timeout=600) return response def get_only_auto_reply_accounts(): """ 获取急转的账号 """ sql = "select publish_account_id from publish_account_remark where remark like '%即转%';" denet = DeNetMysql() result = denet.select(sql) account_id_list = [i[0] for i in result] return set(account_id_list)