# conf_task.py

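"""Flask service for crawler task configuration.

Exposes HTTP endpoints for creating, querying and updating crawler tasks
(table crawler_task) and their author/media mappings (table
crawler_author_map), and for creating virtual media accounts through the
media service configured in conf.config.
"""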
import copy
import logging
import os
import sys
import time

import requests
from flask import Flask, request
from flask import jsonify
from dotenv import load_dotenv

sys.path.append(os.path.abspath(os.path.join(os.getcwd(), "..")))
from conf.config import get_config
from common.db.mysql_help import MysqlHelper

load_dotenv(verbose=True)
env = os.getenv('env')

app = Flask(__name__)
app.config['JSON_AS_ASCII'] = False

# MySQL helper instance shared by all handlers
mysql_con = MysqlHelper()
conf = get_config()


@app.route("/v1/crawler/task/dellink", methods=["POST"])
def delSpiderLink():
    data = request.json
    spider_links = data['spider_link']
    del_link = []
    for link in spider_links:
        up_sql = f'update crawler_author_map set is_del=0 where spider_link="{link}"'
        mysql_con.update_values(up_sql)
        del_link.append(link)
    return jsonify({'code': 200, 'message': '抓取名单删除成功', 'del_link': del_link})


@app.route("/v1/crawler/task/getcategory", methods=["GET"])
def getCategory():
    sql = 'select id, content_category from crawler_content_category'
    result = mysql_con.get_values(sql)
    return jsonify({'code': 200, 'data': result})


@app.route("/v1/crawler/task/getmodename", methods=["GET"])
def getModeName():
    sql = 'select id, mode_name from crawler_mode'
    result = mysql_con.get_values(sql)
    return jsonify({'code': 200, 'data': result})


@app.route("/v1/crawler/task/getmodeboard", methods=["GET"])
def getModeBoard():
    sql = 'select id, mode_board from crawler_board'
    result = mysql_con.get_values(sql)
    return jsonify({'code': 200, 'data': result})


@app.route("/v1/crawler/user/findmedia", methods=["GET"])
def getMediaInfo():
    data = request.args.to_dict()
    task_id = data['task_id']
    sql = f'select * from crawler_author_map where task_id={task_id}'
    result = mysql_con.get_values(sql)
    task_user_info = []
    for task_info in result:
        media_id = task_info['media_id']
        media_info = requests.get(url=conf['select_media_url'], params={'uid': media_id}).json()['content']
        media_name = media_info['longvideoNickName'] if media_info['longvideoNickName'] else media_info['nickName']
        nick_name = task_info['nick_name']
        spider_link = task_info['spider_link']
        create_user_time = task_info['create_user_time']
        media_data = dict(
            media_name=media_name,
            nick_name=nick_name,
            spider_link=spider_link,
            media_id={'media_id': media_id, 'media_url': conf['media_main_url'].format(media_id)},
            create_user_time=create_user_time
        )
        task_user_info.append(media_data)
    return jsonify({'code': 200, 'data': task_user_info})


@app.route("/v1/crawler/task/findtask", methods=["GET"])
def getTaskUserInfo():
    # Look up the task that matches the given query conditions
    data = request.args.to_dict()
    values = ''
    for k, v in data.items():
        if isinstance(v, int):
            values += f'{k}={v} and '
        else:
            values += f'{k}="{v}" and '
    sql = f"select task_id from crawler_author_map where {values[:-4]}"  # [:-4] strips the trailing 'and '
    res = mysql_con.get_values(sql)
    task_id = res[0]['task_id']  # get_values returns a list of row dicts
    sql = f'select task_name, source, task_type, create_task_user, insert_time, update_task_user, update_time from crawler_task where task_id={task_id} '
    task_info = mysql_con.get_values(sql)
    return jsonify({'code': 200, 'data': task_info})


# Only GET requests are accepted
@app.route("/v1/crawler/source/getall", methods=["GET"])
def getSource():
    try:
        # Read the incoming query parameters
        get_data = request.args.to_dict()
        sql = 'select source, task_type, spider_name, machine, source_desc, task_type_desc, spider_name_desc from crawler_source'
        result = mysql_con.get_values(sql)
        if not result:
            return jsonify({'code': '200', 'result': [], 'message': '没有更多数据'})
        source_list = list()
        for source_info in result:
            source_dict = {
                'task_type': [
                    {
                        'type': source_info['task_type'],
                        'description': source_info['task_type_desc'],
                        'spider': {
                            'spider_name': source_info['spider_name'],
                            'description': source_info['spider_name_desc']
                        }
                    }
                ],
                'description': source_info['source_desc'],
                'source': source_info['source'],
                'machine': source_info['machine']
            }
            source_list.append(source_dict)
    except Exception as e:
        return jsonify({'code': '400', 'message': '获取数据源信息失败'})
    return jsonify({'code': '200', 'result': source_list})


@app.route("/v1/crawler/task/checkrepeat", methods=["POST"])
def get_repeat_list():
    data = request.json
    # Field conversion
    spider_links = data.get('spider_link')
    repeat_list = list()
    # Check whether each link is already in the crawl list
    for spider_link in spider_links:
        if isinstance(spider_link, int):
            s_sql = f"""select spider_link from crawler_author_map where spider_link={spider_link}"""
        else:
            s_sql = f"""select spider_link from crawler_author_map where spider_link='{spider_link}'"""
        result = mysql_con.get_values(s_sql)
        if result:
            repeat_list.append(spider_link)
    if repeat_list:
        return jsonify({'code': 400, 'message': '名单重复', 'repeat_list': repeat_list})
    else:
        return jsonify({'code': 200, 'message': '抓取名单校验通过', 'repeat_list': repeat_list})


@app.route("/v1/crawler/task/insert", methods=["POST"])
def insertTask():
    try:
        data = request.json
        user_data = copy.deepcopy(data)
        tag_name_list = []
        content_tag_list = []
        user_tag = data['user_tag']
        user_content_tag = data['user_content_tag']
        for tag in user_tag:
            tag_name_list.append(tag['tagName'])
        for tag in user_content_tag:
            content_tag_list.append(tag['tagName'])
        # Timestamps arrive in milliseconds and are stored in seconds
        if data['min_publish_time']:
            data['min_publish_time'] = int(data['min_publish_time'] / 1000)
        else:
            data['min_publish_time'] = 0
        # if data['min_publish_day']:
        #     data['min_publish_day'] = 0
        data['next_time'] = int(data['next_time'] / 1000)
        data['insert_time'] = int(time.time())
        data['update_time'] = int(time.time())
        data['spider_link'] = str(data['spider_link'])
        data['spider_rule'] = str(data['spider_rule'])
        data['user_tag_info'] = str(user_tag)
        data['content_tag_info'] = str(user_content_tag)
        data['user_tag'] = ','.join(str(i) for i in tag_name_list)
        data['user_content_tag'] = ','.join(str(i) for i in content_tag_list)
        # data['crawler_interval'] = data.pop('interval')
        # Build a comma-separated column list and a matching list of %s placeholders
        keys = ','.join(data.keys())
        values = ','.join(['%s'] * len(data))
        sql = 'insert into {table}({keys}) VALUES({values})'.format(table='crawler_task', keys=keys, values=values)
        task_id = mysql_con.insert_values(sql, tuple(data.values()))
        if task_id:
            success_list, fail_list = create_uid(user_data, task_id)
            return jsonify(
                {'code': 200, 'message': 'task create success', 'success_list': success_list, 'fail_list': fail_list})
    except Exception as e:
        return jsonify({'code': 500, 'message': f'任务写入失败,原因:{e}'})


@app.route("/v1/crawler/task/gettask", methods=["GET"])
def getAllTask():
    try:
        get_data = request.args.to_dict()
        page = int(get_data.get('page', 1))
        offset = int(get_data.get('offset', 10))
        # MySQL LIMIT takes (offset, row_count): skip the previous pages, return one page
        start_count = (page * offset) - offset
        sql = f"""select task_id, task_name, source, task_type, create_task_user, insert_time, update_task_user, update_time from crawler_task order by update_time desc limit {start_count}, {offset} """
        result = mysql_con.get_values(sql)
        if not result:
            return jsonify({'code': '200', 'result': [], 'message': '没有更多任务'})
        task_list = list()
        for task_info in result:
            task_data = dict(
                task_id=task_info['task_id'],
                task_name=task_info['task_name'],
                source=task_info['source'],
                task_type=task_info['task_type'],
                create_task_user=task_info['create_task_user'],
                insert_time=task_info['insert_time'] * 1000,
                update_task_user=task_info['update_task_user'],
                update_time=task_info['update_time'] * 1000
            )
            task_list.append(task_data)
        t_sql = """select count(*) from crawler_task"""
        t_res = mysql_con.get_values(t_sql)
        total = t_res[0]['count(*)']
    except Exception as e:
        return jsonify({"code": "400", 'message': "任务列表获取失败"})
    return jsonify({'code': '200', 'result': task_list, 'total': total})


@app.route("/v1/crawler/task/getone", methods=["GET"])
def getOneTask():
    try:
        get_data = request.args.to_dict()
        task_id = get_data['task_id']
        sql = f'select * from crawler_task where task_id={task_id}'
        result = mysql_con.get_values(sql)
        if not result:
            return jsonify({'code': '400', 'result': [], 'message': 'no data'})
        data = result[0]
        # Convert stored seconds back to milliseconds for the frontend
        if data['min_publish_time']:
            data['min_publish_time'] = data['min_publish_time'] * 1000
        else:
            data['min_publish_time'] = 0
        data['next_time'] = data['next_time'] * 1000
        # These columns were stored with str(); eval() restores the original Python objects
        data['spider_link'] = eval(data['spider_link'])
        data['spider_rule'] = eval(data['spider_rule'])
        data['user_tag_info'] = eval(data['user_tag_info'])
        data['content_tag_info'] = eval(data['content_tag_info'])
    except Exception as e:
        return jsonify({'code': '500', "message": "获取任务信息失败"})
    return jsonify({'code': '200', 'result': result})


@app.route("/v1/crawler/task/update", methods=["POST"])
def updateTask():
    try:
        data = request.json
        task_id = data.get('task_id')
        task_info = data.get('task_info')
        values = ''
        task_info['min_publish_time'] = int(task_info['min_publish_time'] / 1000)
        task_info['next_time'] = int(task_info['next_time'] / 1000)
        for k, v in task_info.items():
            if isinstance(v, int):
                values += f'{k}={v},'
            else:
                values += f'{k}="{v}",'
        sql = f'update crawler_task set {values[:-1]} where task_id={task_id}'  # [:-1] strips the trailing comma
        result = mysql_con.update_values(sql)
        if result:
            return jsonify({'code': 200, 'message': 'task update success'})
        else:
            return jsonify({'code': 400, 'message': 'task update failed'})
    except Exception as e:
        return jsonify({'code': 400, 'message': '任务更新失败'})
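

# Helper used by insertTask: for every spider_link of a newly created task,
# create a virtual media account through the media service and record the
# author-to-media mapping in crawler_author_map.
# Returns (success_list, fail_list).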
def create_uid(task, task_id):
    spider_link = task.get('spider_link')
    source = task.get('source')
    task_type = task.get('task_type')
    applets_status = task.get('applets_status')
    app_status = task.get('app_status')
    user_tag = task.get('user_tag')
    user_content_tag = task.get('user_content_tag')
    mode_name_id = task.get('mode_name_id', 0)
    mode_board_id = task.get('mode_board_id', 0)
    content_category_id = task.get('content_category_id', 0)
    mn_sql = f'select * from crawler_mode_name where id={mode_name_id}'
    mode_name_list = mysql_con.get_values(mn_sql)
    mb_sql = f'select * from crawler_mode_board where id={mode_board_id}'
    mode_board_list = mysql_con.get_values(mb_sql)
    cc_sql = f'select * from crawler_content_category where id={content_category_id}'
    content_category_list = mysql_con.get_values(cc_sql)
    if mode_name_list:
        task['mode_name_str'] = mode_name_list[0]['mode_name']
    else:
        task['mode_name_str'] = ''
    if mode_board_list:
        task['mode_board_str'] = mode_board_list[0]['mode_board']
    else:
        task['mode_board_str'] = ''
    if content_category_list:
        task['content_category_str'] = content_category_list[0]['content_category']
    else:
        task['content_category_str'] = ''
    success_list = list()
    fail_list = list()
    tag_name_list = list()
    content_tag_list = list()
    for tag in user_tag:
        tag_name_list.append(tag['tagName'])
    for tag in user_content_tag:
        content_tag_list.append(tag['tagName'])
    user_tags = ','.join(str(i) for i in tag_name_list)
    user_content_tags = ','.join(str(i) for i in content_tag_list)
    for author_url in spider_link:
        now_time = int(time.time())
        time_array = time.localtime(now_time)
        str_time = time.strftime("%Y-%m-%d", time_array)
        # Build the tag string for the user that is about to be created
        tags = ""
        tags_list = ['spider', user_tags, task['mode_name_str'], task['mode_board_str'],
                     task['content_category_str'], str_time]
        for v in tags_list:
            if v:
                tags += str(v) + ','
        post_data = {
            # 'count': 1,  # (required) number of accounts: pass 1
            # 'accountType': 4,  # (required) account type: pass 4 for app virtual accounts
            'pwd': '',  # password, defaults to 12346
            'nickName': '',  # nickname, defaults to vuser......
            'avatarUrl': '',
            # avatar URL, defaults to http://weapppiccdn.yishihui.com/resources/images/pic_normal.png
            'tagName': tags[:-1],  # multiple values are comma-separated
        }
        try:
            response = requests.post(url=conf['media_url'], params=post_data)
            media_id = response.json()['data']
        except Exception as e:
            logging.warning(f'创建账户:{author_url},失败,原因:{e}')
            fail_list.append(author_url)
            continue
        data = dict(
            spider_link=author_url,
            media_id=media_id,
            source=source,
            task_type=task_type,
            applets_status=applets_status,
            app_status=app_status,
            user_tag=user_tags,
            user_content_tag=user_content_tags,
            insert_time=int(time.time()),
            update_time=int(time.time()),
            create_user_time=now_time,
            mode_name_str=task['mode_name_str'],
            mode_board_str=task['mode_board_str'],
            content_category_str=task['content_category_str'],
            # mode_value_str=mode_value_str,
            task_id=task_id,
            media_main_url=conf['media_main_url'].format(media_id)
        )
        keys = ','.join(data.keys())
        values = ','.join(['%s'] * len(data))
        table = 'crawler_author_map'
        sql = f"""insert into {table}({keys}) VALUES({values})"""
        mysql_con.insert_values(sql, tuple(data.values()))
        user_info = dict(
            outer_id=author_url,
            uid=media_id
        )
        success_list.append(user_info)
    return success_list, fail_list


@app.route("/v1/crawler/author/create", methods=["POST"])
def createUser():
    spider_link = request.json.get('spider_link')
    source = request.json.get('source')
    task_type = request.json.get('task_type')
    applets_status = request.json.get('applets_status')
    app_status = request.json.get('app_status')
    user_tag = request.json.get('user_tag')
    user_content_tag = request.json.get('user_content_tag')
    success_list = list()
    fail_list = list()
    for author_url in spider_link:
        try:
            # Skip links that already have an author/media mapping
            f_sql = f"""select spider_link from crawler_author_map where spider_link="{author_url}" """
            result = mysql_con.get_values(f_sql)
            if result:
                success_list.append(author_url)
                continue
            else:
                tag_name_list = []
                content_tag_list = []
                for tag in user_tag:
                    tag_name_list.append(tag['tagName'])
                for tag in user_content_tag:
                    content_tag_list.append(tag['tagName'])
                user_tags = ','.join(str(i) for i in tag_name_list)
                user_content_tags = ','.join(str(i) for i in content_tag_list)
                post_data = {
                    # 'count': 1,  # (required) number of accounts: pass 1
                    # 'accountType': 4,  # (required) account type: pass 4 for app virtual accounts
                    'pwd': '',  # password, defaults to 12346
                    'nickName': '',  # nickname, defaults to vuser......
                    'avatarUrl': '',
                    # avatar URL, defaults to http://weapppiccdn.yishihui.com/resources/images/pic_normal.png
                    'tagName': user_tags,  # multiple values are comma-separated
                }
                response = requests.post(url=conf['media_url'], params=post_data)
                media_id = response.json()['data']
                data = dict(
                    spider_link=author_url,
                    media_id=media_id,
                    source=source,
                    task_type=task_type,
                    applets_status=applets_status,
                    app_status=app_status,
                    user_tag=user_tags,
                    user_content_tag=user_content_tags,
                    insert_time=int(time.time()),
                    update_time=int(time.time())
                )
                keys = ','.join(data.keys())
                values = ','.join(['%s'] * len(data))
                table = 'crawler_author_map'
                sql = f"""insert into {table}({keys}) VALUES({values})"""
                result = mysql_con.insert_values(sql, tuple(data.values()))
                if not result:
                    fail_list.append(author_url)
                else:
                    success_list.append(author_url)
        except Exception as e:
            fail_list.append(author_url)
            continue
    return jsonify({'code': 200, 'result': {'success': success_list, 'fail': fail_list}})


if __name__ == "__main__":
    app.run(debug=True, port=5050)
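
# A minimal client-side sketch (not part of the service) of how the
# /v1/crawler/task/checkrepeat endpoint above might be called, assuming the app
# is running locally on the port configured in app.run(); the example
# spider_link value is hypothetical:
#
#   import requests
#   resp = requests.post(
#       'http://127.0.0.1:5050/v1/crawler/task/checkrepeat',
#       json={'spider_link': ['https://example.com/author/123']},
#   )
#   print(resp.json())  # expected keys: 'code', 'message', 'repeat_list'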