123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116 |
import datetime
import json
import os
import ssl
import time

import requests
from elasticsearch import Elasticsearch, ApiError
from elasticsearch import helpers
from pymysql.cursors import DictCursor

from applications.db import DatabaseConnector
from config import long_articles_config
from config.es_mappings import index_name, mappings, settings
# MySQL connection used by get_articles(); it is opened as soon as the module loads.
db_client = DatabaseConnector(long_articles_config)
db_client.connect()

# TLS context that trusts the CA file at config/es_certs.crt.
ctx = ssl.create_default_context(cafile="config/es_certs.crt")

# SECURITY: credentials do not belong in source control. Prefer the
# ES_PASSWORD environment variable; the literal below is kept only as a
# backward-compatible fallback and should be rotated and removed.
es_password = os.environ.get("ES_PASSWORD", 'nkvvASQuQ0XUGRq5OLvm')

es = Elasticsearch(
    "https://192.168.205.85:9200",
    # Only needed when username/password authentication is enabled.
    basic_auth=("elastic", es_password),
    ssl_context=ctx,
)
def create_index():
    """Recreate the Elasticsearch index named by ``index_name``.

    Any existing index with that name is deleted first, so every document in
    it is lost. The index is then created with the module-level ``settings``
    and ``mappings``. A failed creation is reported on stdout instead of
    being raised; errors from the exists/delete calls are not caught.
    """
    # Defensive delete of the old index (optional step).
    if es.indices.exists(index=index_name):
        es.indices.delete(index=index_name)

    try:
        es.indices.create(
            index=index_name,
            settings=settings,
            mappings=mappings,
        )
    except ApiError as e:
        # ApiError.meta only carries transport metadata (status, headers, ...);
        # the error payload returned by Elasticsearch is exposed as ApiError.body.
        body = e.body if isinstance(e.body, dict) else {}
        error = body.get("error")
        if isinstance(error, dict):
            print(f"❌ Failed: {error.get('type')} – {error.get('reason')}")
        else:
            # Non-JSON or unexpected payload: fall back to the exception text.
            print(f"❌ Failed: {e}")
    else:
        print(f"✅ Index <{index_name}> created.")
def get_max_article_id():
    """Return the largest ``article_id`` currently stored in the index.

    Returns:
        The ``article_id`` of the first hit when sorting by ``article_id``
        descending, or ``0`` when the search returns no hits (empty index).
    """
    response = es.search(
        index=index_name,
        size=1,
        sort="article_id:desc",
        _source=["article_id"],
    )
    hits = response["hits"]["hits"]
    if not hits:
        # An empty index has no top hit; returning 0 lets a caller that
        # filters on "article_id > max" start from the first row
        # (assumes article ids are positive -- TODO confirm).
        return 0
    return hits[0]["_source"]["article_id"]
def get_articles(id_):
    """Load the next batch of articles after ``id_`` as bulk-index actions.

    Args:
        id_: Only rows whose ``article_id`` is greater than this value and
            whose ``status`` is 1 are selected.

    Returns:
        A list of action dicts (``_index``, ``_id``, ``_source``), one per
        row, in ascending ``article_id`` order and capped at 10000 rows by
        the query's ``limit``.
    """
    fetch_query = """
        select article_id, platform, out_account_id, title
        from crawler_meta_article
        where status = 1 and article_id > %s
        order by article_id limit 10000;
    """
    rows = db_client.fetch(fetch_query, cursor_type=DictCursor, params=(id_,))

    # Columns copied verbatim from each row into the indexed document.
    source_fields = ("article_id", "platform", "out_account_id", "title")

    actions = []
    for row in rows:
        actions.append(
            {
                "_index": index_name,
                # Reuse the article id as the document id.
                "_id": row["article_id"],
                "_source": {field: row[field] for field in source_fields},
            }
        )
    return actions
def search():
    """Run a fixed ``match`` query on ``title`` and print the hits and latency.

    Prints the ``_source`` of every returned hit, followed by the wall-clock
    duration (in seconds) of the search call itself.
    """
    match_clause = {"title": "刘伯承元帅"}
    query = {
        "query": {"match": match_clause},
        "_source": ["article_id", "title"],
    }

    # Time only the round trip to Elasticsearch, not the printing below.
    started = time.time()
    resp = es.search(index=index_name, body=query)
    elapsed = time.time() - started

    for hit in resp["hits"]["hits"]:
        print(hit["_source"])
    print(elapsed)
def get_cluster_docs_stats():
    """Fetch node statistics from the cluster and print them as indented JSON."""
    node_stats = es.nodes.stats()
    # ensure_ascii=False keeps any non-ASCII text readable in the dump.
    rendered = json.dumps(node_stats.body, indent=4, ensure_ascii=False)
    print(rendered)
if __name__ == "__main__":
    # Incrementally copy articles from MySQL into Elasticsearch, resuming
    # from the highest article_id that is already indexed and stopping once
    # that id reaches the hard-coded upper bound below.
    max_id = get_max_article_id()
    i = 0
    while int(max_id) < 27492350:
        articles = get_articles(max_id)
        if not articles:
            # No rows remain above max_id. Without this exit max_id would
            # never change and the loop would spin forever.
            break
        helpers.bulk(es, articles)
        print(es.count(index=index_name))
        # get_articles() orders rows by article_id, so the last action holds
        # the batch maximum. Advancing from it avoids re-querying the index,
        # which may not yet expose the documents that were just written.
        max_id = articles[-1]["_id"]
        i += 1
        print(i)
|