Elasticsearch in Practice

Elasticsearch is an open-source distributed search and analytics engine built on Apache Lucene. It is written in Java and exposes a simple RESTful API. It scales out easily, handles PB-scale data, and answers queries fast.

Basic Concepts

  • Cluster and node: ES is essentially a distributed database in which multiple servers cooperate, and each server can run more than one ES instance. A single ES instance is called a node, and a group of nodes forms a cluster.
  • Index: a collection of documents that share the same set of fields
  • Type: an index can define one or more types
  • Document: the basic unit of data that can be indexed and searched
  • Shard: each index is split into several shards, and every shard is a self-contained Lucene index
  • Replica: a copy of a shard; copying a shard is all it takes to back it up
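The concepts above connect at write time: Elasticsearch routes each document to one primary shard by hashing its routing value (the document _id by default) modulo the number of primary shards, which is also why the primary shard count cannot be changed after an index is created. A simplified sketch of that rule (real ES uses a Murmur3 hash; String.hashCode below is a stand-in for illustration only):

```java
// Simplified shard routing: shard = hash(routing) % number_of_primary_shards.
// Real Elasticsearch hashes the routing value (default: the _id) with Murmur3;
// String.hashCode() here is only a stand-in to show the modulo rule.
public class ShardRouting {
    public static int shardFor(String routing, int numberOfPrimaryShards) {
        // floorMod keeps the result in [0, numberOfPrimaryShards) even for negative hashes
        return Math.floorMod(routing.hashCode(), numberOfPrimaryShards);
    }

    public static void main(String[] args) {
        // the same document id always lands on the same shard
        System.out.println("doc 42 -> shard " + shardFor("42", 5));
    }
}
```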

Take the figure above, our team's ES cluster plat_prod_poinlp, as an example:

  • 3 nodes
  • 5 indexes
  • 42 shards: apart from the built-in .kibana index, which has a single primary shard, each of the other four indexes has 5 primary shards, giving 21 primaries in total; every primary has 1 replica, for 42 shards overall. In the figure, the shards with a bold border are the primaries and the rest are replicas. Note that a primary and its replica are never allocated to the same node, and every node holds a mix of primaries and replicas.
  • 120+ million documents
  • Java 1.8, Elasticsearch 5.6.3
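The shard count above is just arithmetic: four indexes with 5 primaries each plus one primary for .kibana gives 21 primaries, and one replica per primary doubles that to 42:

```java
// Total shards = primaries * (1 + replicas per primary).
public class ShardCount {
    public static int totalShards(int primaries, int replicasPerPrimary) {
        return primaries * (1 + replicasPerPrimary);
    }

    public static void main(String[] args) {
        int primaries = 4 * 5 + 1; // four indexes with 5 primaries each, plus .kibana
        System.out.println(totalShards(primaries, 1)); // 42
    }
}
```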

Index Structure

  • settings: index-level configuration, e.g. the number of replicas, the number of shards, custom analyzers, etc.
  • mappings: field definitions and their types
    • keyword: keyword type, not analyzed
    • text: text type, analyzed by default
    • numeric (long, integer, short, byte, double, float); integer and float are the most common in our project
    • date: date type
    • boolean: boolean type
  • aliases: index aliases

Two index definitions from our cluster:
{
  "aliases": {},
  "mappings": {
    "article": {
      "properties": {
        "category": {
          "type": "text",
          "fields": {
            "keyword": {
              "type": "keyword",
              "ignore_above": 256
            }
          }
        },
        "cateogry": {
          "type": "keyword"
        },
        "city": {
          "type": "keyword"
        },
        "id": {
          "type": "long"
        },
        "isOriginal": {
          "type": "boolean"
        },
        "keywords": {
          "type": "text",
          "analyzer": "keywords_analyzer"
        },
        "mediaId": {
          "type": "integer"
        },
        "publishtime": {
          "type": "date",
          "format": "yyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"
        },
        "quantity": {
          "type": "integer"
        },
        "score": {
          "type": "double"
        },
        "title": {
          "type": "text",
          "boost": 8,
          "analyzer": "ik_max_word",
          "include_in_all": true
        },
        "updatetime": {
          "type": "date",
          "format": "yyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"
        }
      }
    }
  },
  "settings": {
      "number_of_shards": "5",
      "number_of_replicas": "1",
      "refresh_interval": "1s",
      "analysis": {
        "analyzer": {
          "keywords_analyzer": {
            "lowercase": "true",
            "pattern": ",",
            "type": "pattern"
          }
        }
      }
  }
}
{
  "aliases": {
    "reviewFeature": {}
  },
  "mappings": {
    "review_feature": {
      "properties": {
        "biz_id": {
          "type": "long"
        },
        "city_id": {
          "type": "long"
        },
        "feature_id": {
          "type": "text",
          "fields": {
            "keyword": {
              "type": "keyword",
              "ignore_above": 256
            }
          }
        },
        "shop_id": {
          "type": "long"
        },
        "shop_type": {
          "type": "long"
        }
      }
    }
  },
  "settings": {
      "number_of_shards": "5",
      "number_of_replicas": "1"
  }
}

Creating an Index

  • Java API
  • Hive UDF
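The notes do not spell either route out. As a minimal sketch, over the REST API (which is what both the Java client and a UDF ultimately call) an index is created with a single PUT carrying settings and mappings like those shown above; the index name article_v1 and the reduced field list here are illustrative:

```
PUT /article_v1
{
  "settings": {
    "number_of_shards": 5,
    "number_of_replicas": 1
  },
  "mappings": {
    "article": {
      "properties": {
        "id":    { "type": "long" },
        "title": { "type": "text", "analyzer": "ik_max_word" }
      }
    }
  }
}
```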

Querying

ES REST client

Single IP

@Bean
public RestHighLevelClient restHighLevelClient() {
    HttpHost httpHost = new HttpHost(globalSetting.getEsHostName(), globalSetting.getEsPort(), "http");
    return new RestHighLevelClient(RestClient.builder(httpHost));
}

Multiple IPs

@Bean
public RestHighLevelClient restHighLevelClient() {
    String[] hosts = globalSetting.getEsHostName().split(",");
    HttpHost[] httpHosts = new HttpHost[hosts.length];
    for (int i = 0; i < hosts.length; i++) {
        httpHosts[i] = new HttpHost(hosts[i], globalSetting.getEsPort(), "http");
    }
    return new RestHighLevelClient(RestClient.builder(httpHosts));
}

Query Statements

General steps of a query

// build the query
TermQueryBuilder queryBuilder = QueryBuilders.termQuery("taxonomy_list", taxonomies.get(taxonomies.size() - 1));

// build the source (query plus pagination)
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
sourceBuilder.query(queryBuilder);
sourceBuilder.from((page - 1) * pageSize);
sourceBuilder.size(pageSize);

// build the request
SearchRequest request = new SearchRequest("note");
request.types("item");
request.source(sourceBuilder);

// execute the search and parse the hits
SearchResponse response = restHighLevelClient.search(request);
SearchHits hits = response.getHits();
List<String> noteIds = new ArrayList<>();
for (SearchHit hit : hits.getHits()) {
    Map<String, Object> res = hit.getSourceAsMap();
    noteIds.add((String) res.get("id"));
}

// total number of matching documents
long totalHits = hits.getTotalHits();
Other query builders used in our project:

TermQueryBuilder queryBuilder = QueryBuilders.termQuery("name", "肯德基");
MatchQueryBuilder queryBuilder = QueryBuilders.matchQuery("name", name);
NestedQueryBuilder queryBuilder = QueryBuilders.nestedQuery("tags", QueryBuilders.termsQuery("tags.tag_name", tagNameSet), ScoreMode.Total);
BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery()
        .must(QueryBuilders.matchQuery("name", name))
        .must(QueryBuilders.termQuery("category", target.getCategory()))
        .mustNot(QueryBuilders.termQuery("id", target.getId().toString()))
        .must(QueryBuilders.geoDistanceQuery("latlng_tcnt_amap").point(lat, lng).distance(10, DistanceUnit.KILOMETERS));
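For reference, the bool builder above corresponds roughly to the following query DSL; the concrete values (the category, id, and coordinates) are placeholders:

```
{
  "query": {
    "bool": {
      "must": [
        { "match": { "name": "肯德基" } },
        { "term": { "category": "restaurant" } },
        {
          "geo_distance": {
            "distance": "10km",
            "latlng_tcnt_amap": { "lat": 31.23, "lon": 121.47 }
          }
        }
      ],
      "must_not": [
        { "term": { "id": "123" } }
      ]
    }
  }
}
```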

  • Match All Query
    • matchAllQuery() matches every document, the equivalent of select * from tbl in SQL
  • Full text queries
    • matchQuery
    • multiMatchQuery
    • commonTermsQuery
    • queryStringQuery
    • simpleQueryStringQuery
  • Term level queries
    • termQuery("field", val) 完全包含, 没有分词
    • termsQuery("field", val1, val2, ...) 完全或匹配
    • rangeQuery
    • existsQuery
    • prefixQuery
    • wildcardQuery 通配符查询
    • regexpQuery
    • fuzzyQuery 模糊查询
    • typeQuery
    • idsQuery 通过 id 查询
  • Compound queries
    • constantScoreQuery
    • boolQuery
      • must: every must clause has to match
      • should: at least one should clause has to match
      • must_not: the clause must not match
    • disMaxQuery
    • Function Score Query
    • boostingQuery
  • Joining queries
    • nestedQuery: query on nested objects
    • Has Child Query
    • Has Parent Query
  • Geo queries
    • geoDistanceQuery
  • Specialized queries
  • Span queries

  • match phrase query: the terms must appear adjacently and in order
  • match phrase prefix query: same, with the last term treated as a prefix
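In the DSL these two look like the following (the field name and text are placeholders):

```
{ "query": { "match_phrase":        { "title": "distributed search" } } }
{ "query": { "match_phrase_prefix": { "title": "distributed sea" } } }
```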
