| MySQL | ElasticSearch |
|---|---|
| Database | Index |
| Table | Type |
| Row | Document |
| Column | Field |
| Schema | Mapping |
| Index | Everything indexed by default |
| SQL | Query DSL |
The Index API stores a JSON document and makes it searchable. A document is uniquely identified by index, type, and id; the id can be supplied by the caller, or the Index API can generate one for us.
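Whether the id is supplied or generated comes down to which prepareIndex overload is called; a minimal sketch (json is a JSON string as in the first example below):
// id supplied by the caller
IndexResponse withId = client.prepareIndex("fendo", "fendodate", "1").setSource(json).get();
System.out.println(withId.getId()); // "1"
// no id given: Elasticsearch generates one
IndexResponse autoId = client.prepareIndex("fendo", "fendodate").setSource(json).get();
System.out.println(autoId.getId()); // e.g. "AWRJCXMhro3r8sDxIpir"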
There are four different ways to produce the JSON document:
/**
 * Hand-built JSON string
 * @throws UnknownHostException
 */
@Test
public void JsonDocument() throws UnknownHostException {
    String json = "{" +
            "\"user\":\"deepredapple\"," +
            "\"postDate\":\"2018-01-30\"," +
            "\"message\":\"trying out Elasticsearch\"" +
            "}";
    IndexResponse indexResponse = client.prepareIndex("fendo", "fendodate").setSource(json).get();
    System.out.println(indexResponse.getResult());
}
/**
 * Map-based
 */
@Test
public void MapDocument() {
    Map<String, Object> json = new HashMap<String, Object>();
    json.put("user", "hhh");
    json.put("postDate", "2018-06-28");
    json.put("message", "trying out Elasticsearch");
    IndexResponse response = client.prepareIndex(INDEX, TYPE).setSource(json).get();
    System.out.println(response.getResult());
}
/**
 * Jackson serialization
 */
@Test
public void JACKSONDocument() throws JsonProcessingException {
    Blog blog = new Blog();
    blog.setUser("123");
    blog.setPostDate("2018-06-29");
    blog.setMessage("try out ElasticSearch");
    ObjectMapper mapper = new ObjectMapper();
    byte[] bytes = mapper.writeValueAsBytes(blog);
    IndexResponse response = client.prepareIndex(INDEX, TYPE).setSource(bytes).get();
    System.out.println(response.getResult());
}
/**
 * XContentBuilder helper class
 */
@Test
public void XContentBuilderDocument() throws IOException {
    XContentBuilder builder = XContentFactory.jsonBuilder().startObject()
            .field("user", "xcontentdocument")
            .field("postDate", "2018-06-30")
            .field("message", "this is ElasticSearch").endObject();
    IndexResponse response = client.prepareIndex(INDEX, TYPE).setSource(builder).get();
    System.out.println(response.getResult());
}
The complete test class, including the client setup:
package com.deepredapple.es.document;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.transport.client.PreBuiltTransportClient;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
/**
* @author DeepRedApple
*/
public class TestClient {
    TransportClient client = null;
    public static final String INDEX = "fendo";
    public static final String TYPE = "fendodate";
    @Before
    public void beforeClient() throws UnknownHostException {
        client = new PreBuiltTransportClient(Settings.EMPTY)
                .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("127.0.0.1"), 9300));
    }
    /**
     * Hand-built JSON string
     * @throws UnknownHostException
     */
    @Test
    public void JsonDocument() throws UnknownHostException {
        String json = "{" +
                "\"user\":\"deepredapple\"," +
                "\"postDate\":\"2018-01-30\"," +
                "\"message\":\"trying out Elasticsearch\"" +
                "}";
        IndexResponse indexResponse = client.prepareIndex(INDEX, TYPE).setSource(json).get();
        System.out.println(indexResponse.getResult());
    }
    /**
     * Map-based
     */
    @Test
    public void MapDocument() {
        Map<String, Object> json = new HashMap<String, Object>();
        json.put("user", "hhh");
        json.put("postDate", "2018-06-28");
        json.put("message", "trying out Elasticsearch");
        IndexResponse response = client.prepareIndex(INDEX, TYPE).setSource(json).get();
        System.out.println(response.getResult());
    }
    /**
     * Jackson serialization
     */
    @Test
    public void JACKSONDocument() throws JsonProcessingException {
        Blog blog = new Blog();
        blog.setUser("123");
        blog.setPostDate("2018-06-29");
        blog.setMessage("try out ElasticSearch");
        ObjectMapper mapper = new ObjectMapper();
        byte[] bytes = mapper.writeValueAsBytes(blog);
        IndexResponse response = client.prepareIndex(INDEX, TYPE).setSource(bytes).get();
        System.out.println(response.getResult());
    }
    /**
     * XContentBuilder helper class
     */
    @Test
    public void XContentBuilderDocument() throws IOException {
        XContentBuilder builder = XContentFactory.jsonBuilder().startObject()
                .field("user", "xcontentdocument")
                .field("postDate", "2018-06-30")
                .field("message", "this is ElasticSearch").endObject();
        IndexResponse response = client.prepareIndex(INDEX, TYPE).setSource(builder).get();
        System.out.println(response.getResult());
    }
}
The Get API retrieves a document by id:
GetResponse getResponse = client.prepareGet(INDEX, TYPE, "AWRJCXMhro3r8sDxIpir").get();
The parameters are the index, the type, and the _id.
Setting setOperationThreaded(true) executes the operation on a different thread.
/**
 * Get API
 */
@Test
public void testGetApi() {
    GetResponse getResponse = client.prepareGet(INDEX, TYPE, "AWRJCXMhro3r8sDxIpir").setOperationThreaded(false).get();
    Map<String, Object> map = getResponse.getSource();
    Set<String> keySet = map.keySet();
    for (String str : keySet) {
        Object o = map.get(str);
        System.out.println(o.toString());
    }
}
Delete by id:
DeleteResponse deleteResponse = client.prepareDelete(INDEX, TYPE, "AWRJCXMhro3r8sDxIpir").get();
The parameters are the index, the type, and the _id.
Setting setOperationThreaded(true) executes the operation on a different thread.
/**
 * Delete API
 */
@Test
public void testDeleteAPI() {
    GetResponse getResponse = client.prepareGet(INDEX, TYPE, "AWRJCXMhro3r8sDxIpir").setOperationThreaded(false).get();
    System.out.println(getResponse.getSource());
    DeleteResponse deleteResponse = client.prepareDelete(INDEX, TYPE, "AWRJCXMhro3r8sDxIpir").get();
    System.out.println(deleteResponse.getResult());
}
Delete by query:
/**
 * Delete by query
 */
@Test
public void deleteByQuery() {
    BulkByScrollResponse response = DeleteByQueryAction.INSTANCE.newRequestBuilder(client)
            .filter(QueryBuilders.matchQuery("user", "hhh")) // query condition
            .source(INDEX).get();                            // index name
    long deleted = response.getDeleted();                    // number of documents deleted
    System.out.println(deleted);
}
In QueryBuilders.matchQuery("user", "hhh"), the parameters are the field and the value to match; the source(INDEX) parameter is the index name.
When a delete takes too long to execute, it can be run asynchronously with a callback; the result is obtained inside the callback.
/**
 * Delete with a callback; suited to deleting large volumes of data
 */
@Test
public void DeleteByQueryAsync() {
    for (int i = 1300; i < 3000; i++) {
        DeleteByQueryAction.INSTANCE.newRequestBuilder(client)
                .filter(QueryBuilders.matchQuery("user", "hhh " + i))
                .source(INDEX)
                .execute(new ActionListener<BulkByScrollResponse>() {
                    public void onResponse(BulkByScrollResponse response) {
                        long deleted = response.getDeleted();
                        System.out.println("documents deleted = " + deleted);
                    }
                    public void onFailure(Exception e) {
                        System.out.println("Failure");
                    }
                });
    }
}
The listener is registered through the execute method:
.execute(new ActionListener<BulkByScrollResponse>() { // callback
    public void onResponse(BulkByScrollResponse response) {
        long deleted = response.getDeleted();
        System.out.println("documents deleted = " + deleted);
    }
    public void onFailure(Exception e) {
        System.out.println("Failure");
    }
});
Updating documents
There are two main ways to perform an update.
/**
 * Update with UpdateRequest
 */
@Test
public void testUpdateAPI() throws IOException, ExecutionException, InterruptedException {
    UpdateRequest updateRequest = new UpdateRequest();
    updateRequest.index(INDEX);
    updateRequest.type(TYPE);
    updateRequest.id("AWRFv-yAro3r8sDxIpib");
    updateRequest.doc(jsonBuilder()
            .startObject()
            .field("user", "hhh")
            .endObject());
    client.update(updateRequest).get();
}
/**
 * Update with prepareUpdate
 */
@Test
public void testUpdatePrepareUpdate() throws IOException {
    client.prepareUpdate(INDEX, TYPE, "AWRFvA7k0udstXU4tl60")
            .setScript(new Script("ctx._source.user = \"DeepRedApple\"")).get();
    client.prepareUpdate(INDEX, TYPE, "AWRFvA7k0udstXU4tl60")
            .setDoc(jsonBuilder()
                    .startObject()
                    .field("user", "DeepRedApple")
                    .endObject()).get();
}
The parameters of setScript in client.prepareUpdate vary between versions. Here the script source is passed inline; a script stored in a file can also be referenced, so the update runs the stored script against the document.
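For instance, values can be passed as script parameters instead of being concatenated into the script source; a minimal sketch, assuming the ES 5.x/6.x org.elasticsearch.script.Script constructor and the painless language:
Map<String, Object> params = new HashMap<String, Object>();
params.put("name", "DeepRedApple");
client.prepareUpdate(INDEX, TYPE, "AWRFvA7k0udstXU4tl60")
        .setScript(new Script(ScriptType.INLINE, "painless", "ctx._source.user = params.name", params))
        .get();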
Updating a document with a script:
/**
 * Update by script
 */
@Test
public void testUpdateByScript() throws ExecutionException, InterruptedException {
    UpdateRequest updateRequest = new UpdateRequest(INDEX, TYPE, "AWRFvLSTro3r8sDxIpia")
            .script(new Script("ctx._source.user = \"LZH\""));
    client.update(updateRequest).get();
}
Upsert: update the document if it exists, otherwise insert it.
/**
 * Update if the document exists, otherwise insert
 */
@Test
public void testUpsert() throws IOException, ExecutionException, InterruptedException {
    IndexRequest indexRequest = new IndexRequest(INDEX, TYPE, "AWRFvLSTro3r8sDxIp12")
            .source(jsonBuilder()
                    .startObject()
                    .field("user", "hhh")
                    .field("postDate", "2018-02-14")
                    .field("message", "ElasticSearch")
                    .endObject());
    UpdateRequest updateRequest = new UpdateRequest(INDEX, TYPE, "AWRFvLSTro3r8sDxIp12")
            .doc(jsonBuilder()
                    .startObject()
                    .field("user", "LZH")
                    .endObject())
            .upsert(indexRequest); // if the document does not exist, indexRequest is inserted
    client.update(updateRequest).get();
}
Fetching several documents at once:
/**
 * Multi Get API: fetch several documents in one request
 */
@Test
public void TestMultiGetApi() {
    MultiGetResponse responses = client.prepareMultiGet()
            .add(INDEX, TYPE, "AWRFv-yAro3r8sDxIpib")                         // a single id
            .add(INDEX, TYPE, "AWRFvA7k0udstXU4tl60", "AWRJA72Uro3r8sDxIpip") // several ids
            .add("blog", "blog", "AWG9GKCwhg1e21lmGSLH")                      // from another index
            .get();
    for (MultiGetItemResponse itemResponse : responses) {
        GetResponse response = itemResponse.getResponse();
        if (response.isExists()) {
            String source = response.getSourceAsString(); // _source
            JSONObject jsonObject = JSON.parseObject(source);
            Set<String> sets = jsonObject.keySet();
            for (String str : sets) {
                System.out.println("key -> " + str);
                System.out.println("value -> " + jsonObject.get(str));
                System.out.println("===============");
            }
        }
    }
}
The Bulk API performs batch inserts:
/**
 * Bulk insert
 */
@Test
public void testBulkApi() throws IOException {
    BulkRequestBuilder requestBuilder = client.prepareBulk();
    requestBuilder.add(client.prepareIndex(INDEX, TYPE, "1")
            .setSource(jsonBuilder()
                    .startObject()
                    .field("user", "张三")
                    .field("postDate", "2018-05-01")
                    .field("message", "zhangSan message")
                    .endObject()));
    requestBuilder.add(client.prepareIndex(INDEX, TYPE, "2")
            .setSource(jsonBuilder()
                    .startObject()
                    .field("user", "李四")
                    .field("postDate", "2016-09-10")
                    .field("message", "Lisi message")
                    .endObject()));
    BulkResponse bulkResponse = requestBuilder.get();
    if (bulkResponse.hasFailures()) {
        System.out.println("error");
    }
}
The Bulk Processor offers a simple interface that flushes bulk operations automatically, based on the number or size of queued requests or on a timer.
First, create the BulkProcessor instance:
/**
 * Create a BulkProcessor instance
 */
@Test
public void testCreateBulkProcessor() {
    BulkProcessor bulkProcessor = BulkProcessor.builder(client, new BulkProcessor.Listener() {
        // called before each bulk execution; e.g. request.numberOfActions() gives the action count
        public void beforeBulk(long l, BulkRequest request) {
        }
        // called after each bulk execution; e.g. response.hasFailures() tells whether it failed
        public void afterBulk(long l, BulkRequest request, BulkResponse response) {
        }
        // called when a bulk request fails with a Throwable
        public void afterBulk(long l, BulkRequest bulkRequest, Throwable throwable) {
        }
    }).setBulkActions(10000) // flush every 10000 requests
            .setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB)) // flush every 5MB
            .setFlushInterval(TimeValue.timeValueSeconds(5)) // flush every 5 seconds regardless of request volume
            .setConcurrentRequests(1) // number of concurrent flushes; 0 means a single synchronous request, 1 allows one concurrent request
            .setBackoffPolicy(
                    BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3))
            // custom retry policy: wait 100ms at first, doubling each time, for up to 3 retries; a retry is
            // triggered when a bulk fails with EsRejectedExecutionException (insufficient compute resources);
            // retries can be disabled with BackoffPolicy.noBackoff()
            .build();
}
Once built, the processor accepts requests and is flushed and closed when done:
/**
 * Use the BulkProcessor: add requests, flush, and close
 */
@Test
public void testUseBulkProcessor() throws IOException {
    BulkProcessor bulkProcessor = BulkProcessor.builder(client, new BulkProcessor.Listener() {
        // called before each bulk execution
        public void beforeBulk(long l, BulkRequest request) {
        }
        // called after each bulk execution
        public void afterBulk(long l, BulkRequest request, BulkResponse response) {
        }
        // called when a bulk request fails with a Throwable
        public void afterBulk(long l, BulkRequest bulkRequest, Throwable throwable) {
        }
    }).setBulkActions(10000)
            .setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB))
            .setFlushInterval(TimeValue.timeValueSeconds(5))
            .setConcurrentRequests(1)
            .setBackoffPolicy(
                    BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3))
            .build();
    // add requests
    bulkProcessor.add(new IndexRequest(INDEX, TYPE, "3").source(
            jsonBuilder()
                    .startObject()
                    .field("user", "王五")
                    .field("postDate", "2019-10-05")
                    .field("message", "wangwu message")
                    .endObject()));
    bulkProcessor.add(new DeleteRequest(INDEX, TYPE, "1"));
    bulkProcessor.flush();
    // close the bulkProcessor
    bulkProcessor.close();
    client.admin().indices().prepareRefresh().get();
    client.prepareSearch().get();
}
The Search API executes a search query and returns the matching results. It can search one index/type or several, and the query conditions are built with the Query DSL Java API (covered below).
The Java client also defines the QUERY_AND_FETCH and DFS_QUERY_AND_FETCH search types, but these modes should be chosen by the system rather than specified manually by users.
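If a search type does need to be set explicitly, the request builder accepts it; a minimal sketch using DFS_QUERY_THEN_FETCH, one of the publicly settable types:
SearchResponse response = client.prepareSearch(INDEX)
        .setSearchType(SearchType.DFS_QUERY_THEN_FETCH) // more accurate scoring at the cost of an extra round-trip
        .setQuery(QueryBuilders.matchQuery("user", "hhh"))
        .get();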
An example:
@Test
public void testSearchApi() {
    SearchResponse response = client.prepareSearch(INDEX).setTypes(TYPE)
            .setQuery(QueryBuilders.matchQuery("user", "hhh")).get();
    SearchHit[] hits = response.getHits().getHits();
    for (int i = 0; i < hits.length; i++) {
        String json = hits[i].getSourceAsString();
        JSONObject object = JSON.parseObject(json);
        Set<String> strings = object.keySet();
        for (String str : strings) {
            System.out.println(object.get(str));
        }
    }
}
An ordinary search request returns a single page of data, however large the result set. The Scrolls API lets us retrieve large numbers of documents (even all of them): it performs an initial search and then keeps pulling batches of results from ElasticSearch until none remain. The Scroll API is not designed for real-time user requests but for processing large volumes of data.
/**
 * Scroll query
 * @throws ExecutionException
 * @throws InterruptedException
 */
@Test
public void testScrollApi() throws ExecutionException, InterruptedException {
    MatchQueryBuilder qb = matchQuery("user", "hhh");
    SearchResponse response = client.prepareSearch(INDEX).addSort(FieldSortBuilder.DOC_FIELD_NAME,
            SortOrder.ASC)
            .setScroll(new TimeValue(60000)) // the initial request must set the scroll parameter, telling ElasticSearch how long to keep the search context alive
            .setQuery(qb)
            .setSize(100).get();
    do {
        for (SearchHit hit : response.getHits().getHits()) {
            String json = hit.getSourceAsString();
            JSONObject object = JSON.parseObject(json);
            Set<String> strings = object.keySet();
            for (String str : strings) {
                System.out.println(object.get(str));
            }
        }
        response = client.prepareSearchScroll(response.getScrollId()).setScroll(new TimeValue(60000)).execute().get();
    } while (response.getHits().getHits().length != 0);
}
The search context is cleared automatically once the scroll time expires, but keeping a scroll open is costly, so clear it with the Clear-Scroll API as soon as it is no longer needed:
ClearScrollRequestBuilder clearBuilder = client.prepareClearScroll();
clearBuilder.addScrollId(response.getScrollId());
ClearScrollResponse scrollResponse = clearBuilder.get();
System.out.println("cleared: " + scrollResponse.isSucceeded());
The MultiSearch API executes multiple search requests within a single API call. Its endpoint is _msearch.
@Test
public void testMultiSearchApi() {
    SearchRequestBuilder srb1 = client.prepareSearch().setQuery(QueryBuilders.queryStringQuery("elasticsearch")).setSize(1);
    SearchRequestBuilder srb2 = client.prepareSearch().setQuery(QueryBuilders.matchQuery("user", "hhh")).setSize(1);
    MultiSearchResponse multiSearchResponse = client.prepareMultiSearch().add(srb1).add(srb2).get();
    long nbHits = 0;
    for (MultiSearchResponse.Item item : multiSearchResponse.getResponses()) {
        SearchResponse response = item.getResponse();
        nbHits += response.getHits().getTotalHits();
    }
    System.out.println(nbHits);
}
The aggregations framework provides analytics on top of a search query. It is built from simple building blocks called aggregations, which compose ordered summaries of complex data. An aggregation can be seen as the collective name for the work of extracting analytic information from a set of documents, and defining that document set is part of the aggregation.
@Test
public void testAggregations() {
    SearchResponse searchResponse = client.prepareSearch()
            .setQuery(QueryBuilders.matchAllQuery())
            .addAggregation(AggregationBuilders.terms("LZH").field("user"))
            .addAggregation(AggregationBuilders.dateHistogram("2013-01-30").field("postDate")
                    .dateHistogramInterval(DateHistogramInterval.YEAR)).get();
    Terms lzh = searchResponse.getAggregations().get("LZH");
    Histogram postDate = searchResponse.getAggregations().get("2013-01-30");
}
setTerminateAfter caps the number of documents to collect. When it is set, use isTerminatedEarly() on the SearchResponse to check whether the number of returned documents reached the cap:
@Test
public void TestTerminate() {
    SearchResponse searchResponse = client.prepareSearch(INDEX)
            .setTerminateAfter(2) // terminate the search early once this many documents have been collected
            .get();
    if (searchResponse.isTerminatedEarly()) {
        System.out.println(searchResponse.getHits().getTotalHits());
    }
}
Aggregations. ElasticSearch provides a complete Java API for aggregations: build the aggregation with AggregationBuilders and add it to the search request.
SearchResponse response = client.prepareSearch().setQuery(/* query */).addAggregation(/* aggregation */).execute().actionGet();
Structuring aggregations.
Metric aggregations compute values extracted, in one way or another, from the documents being aggregated.
The main class used here is **AggregationBuilders**, which contains factory methods for all the aggregations below; call them directly.
Min aggregation:
MinAggregationBuilder aggregation = AggregationBuilders.min("agg").field("age");
SearchResponse sr = client.prepareSearch("twitter").addAggregation(aggregation).get();
Min agg = sr.getAggregations().get("agg");
String value = agg.getValueAsString(); // string form, mainly for date fields; see the numeric accessor below
System.out.println("min value:" + value);
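getValueAsString() is mainly useful when the field is a date; for a plain numeric minimum, Min also exposes the raw double, e.g.:
double min = agg.getValue(); // numeric minimum, 10.0 for the sample data shown below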
The toString() of the MinAggregationBuilder built on the first line of the example prints the following:
{
  "error": "JsonGenerationException[Can not write a field name, expecting a value]"
}
SearchResponse sr = client.prepareSearch("twitter").addAggregation(aggregation).get();
The toString() of the SearchResponse contains the following. This is the JSON result of the query; navigating this structure together with the SearchResponse API gives access to each value inside it.
{
  "took": 1,
  "timed_out": false,
  "_shards": {
    "total": 5,
    "successful": 5,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": 4,
    "max_score": 1.0,
    "hits": [
      {
        "_index": "twitter",
        "_type": "tweet",
        "_id": "10",
        "_score": 1.0,
        "_source": {
          "user": "kimchy",
          "postDate": "2018-06-29T09:10:21.396Z",
          "age": 30,
          "gender": "female",
          "message": "trying out Elasticsearch"
        }
      },
      {
        "_index": "twitter",
        "_type": "tweet",
        "_id": "2",
        "_score": 1.0,
        "_source": {
          "user": "kimchy",
          "postDate": "2018-06-29T09:05:33.943Z",
          "age": 20,
          "gender": "female",
          "message": "trying out Elasticsearch"
        }
      },
      {
        "_index": "twitter",
        "_type": "tweet",
        "_id": "1",
        "_score": 1.0,
        "_source": {
          "user": "kimchy",
          "postDate": "2018-06-29T08:59:00.191Z",
          "age": 10,
          "gender": "male",
          "message": "trying out Elasticsearch"
        }
      },
      {
        "_index": "twitter",
        "_type": "tweet",
        "_id": "11",
        "_score": 1.0,
        "_source": {
          "user": "kimchy",
          "postDate": "2018-06-29T09:10:54.386Z",
          "age": 30,
          "gender": "female",
          "message": "trying out Elasticsearch"
        }
      }
    ]
  },
  "aggregations": {
    "agg": {
      "value": 10.0
    }
  }
}
As this output shows, sr.getAggregations().get("agg") retrieves the aggregation statistics; the name "agg" used throughout the code is user-defined.
Max aggregation:
MaxAggregationBuilder aggregation = AggregationBuilders.max("agg").field("readSize");
SearchResponse sr = client.prepareSearch("blog").addAggregation(aggregation).get();
Max agg = sr.getAggregations().get("agg");
String value = agg.getValueAsString();
System.out.println("max value:" + value);
The analysis works just like the Min aggregation, but neither can tell you which document holds the maximum or minimum value.
Sum aggregation:
SumAggregationBuilder aggregation = AggregationBuilders.sum("agg").field("readSize");
SearchResponse sr = client.prepareSearch("blog").addAggregation(aggregation).get();
Sum agg = sr.getAggregations().get("agg");
String value = agg.getValueAsString();
System.out.println("sum value:" + value);
Avg aggregation:
AvgAggregationBuilder aggregation = AggregationBuilders.avg("agg").field("age");
SearchResponse searchResponse = client.prepareSearch("twitter").addAggregation(aggregation).get();
Avg avg = searchResponse.getAggregations().get("agg");
String value = avg.getValueAsString();
System.out.println("avg value: " + value);
Stats aggregation: computes statistics (min, max, sum, count, avg) over some value of the documents. The value can come from a specific numeric field or be computed by a script.
StatsAggregationBuilder aggregation = AggregationBuilders.stats("agg").field("age");
SearchResponse searchResponse = client.prepareSearch("twitter").addAggregation(aggregation).get();
Stats stats = searchResponse.getAggregations().get("agg");
String max = stats.getMaxAsString();
String min = stats.getMinAsString();
String avg = stats.getAvgAsString();
String sum = stats.getSumAsString();
long count = stats.getCount();
System.out.println("max value: "+max);
System.out.println("min value: "+min);
System.out.println("avg value: "+avg);
System.out.println("sum value: "+sum);
System.out.println("count value: "+count);
This aggregation reports all of the common statistics above; use it when you need most of those values at once.
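As the descriptions here note, the aggregated value can also be computed by a script rather than read from a stored field; a minimal sketch (the script expression is illustrative):
StatsAggregationBuilder aggregation = AggregationBuilders.stats("agg")
        .script(new Script("doc['age'].value * 2")); // aggregate over a script-computed value
SearchResponse sr = client.prepareSearch("twitter").addAggregation(aggregation).get();
Stats stats = sr.getAggregations().get("agg");
System.out.println("avg of scripted value: " + stats.getAvgAsString());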
Extended stats aggregation: computes statistics over some value of the documents, adding sum_of_squares, variance, std_deviation, and std_deviation_bounds to what the plain stats aggregation returns. The value can come from a specific numeric field or be computed by a script. The main results are the max, min, variance, standard deviation, and similar statistics.
ExtendedStatsAggregationBuilder aggregation = AggregationBuilders.extendedStats("agg").field("age");
SearchResponse response = client.prepareSearch("twitter").addAggregation(aggregation).get();
ExtendedStats extended = response.getAggregations().get("agg");
String max = extended.getMaxAsString();
String min = extended.getMinAsString();
String avg = extended.getAvgAsString();
String sum = extended.getSumAsString();
long count = extended.getCount();
double stdDeviation = extended.getStdDeviation();
double sumOfSquares = extended.getSumOfSquares();
double variance = extended.getVariance();
System.out.println("max value: " +max);
System.out.println("min value: " +min);
System.out.println("avg value: " +avg);
System.out.println("sum value: " +sum);
System.out.println("count value: " +count);
System.out.println("stdDeviation value: " +stdDeviation);
System.out.println("sumOfSquares value: " +sumOfSquares);
System.out.println("variance value: "+variance);
Value count aggregation: counts the number of values present in the aggregated documents. The value can come from a specific numeric field or be computed by a script. It is generally used together with other single-value aggregations; for example, when computing a field's average you may also want to know how many values that average was computed from, as in the sketch after the code below.
ValueCountAggregationBuilder aggregation = AggregationBuilders.count("agg").field("age");
SearchResponse response = client.prepareSearch("twitter").addAggregation(aggregation).get();
ValueCount count = response.getAggregations().get("agg");
long value = count.getValue();
System.out.println("ValueCount value: " +value);
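For instance, requesting the average together with the number of values it was computed from; a sketch reusing the builders shown above:
SearchResponse response = client.prepareSearch("twitter")
        .addAggregation(AggregationBuilders.avg("avg_age").field("age"))
        .addAggregation(AggregationBuilders.count("age_count").field("age"))
        .get();
Avg avg = response.getAggregations().get("avg_age");
ValueCount count = response.getAggregations().get("age_count");
System.out.println("avg " + avg.getValueAsString() + " over " + count.getValue() + " values");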
Percentiles aggregation: computes percentiles over numeric values extracted from the documents.
PercentilesAggregationBuilder aggregation = AggregationBuilders.percentiles("agg").field("age");
SearchResponse response = client.prepareSearch("twitter").addAggregation(aggregation).get();
Percentiles agg = response.getAggregations().get("agg");
for (Percentile entry : agg) {
double percent = entry.getPercent();
double value = entry.getValue();
System.out.println("percent value: " + percent + " value value: " + value);
}
Cardinality aggregation: the number of distinct values after deduplication.
CardinalityAggregationBuilder aggregation = AggregationBuilders.cardinality("agg").field("age");
SearchResponse response = client.prepareSearch("twitter").addAggregation(aggregation).get();
Cardinality agg = response.getAggregations().get("agg");
long value = agg.getValue();
System.out.println("value value: "+ value);
Terms aggregation with top hits: for each bucket of matching documents, report the document count and the top matching documents.
TermsAggregationBuilder aggregation = AggregationBuilders.terms("agg").field("gender.keyword")
        .subAggregation(AggregationBuilders.topHits("top").explain(true).size(1).from(10));
SearchResponse response = client.prepareSearch("twitter").addAggregation(aggregation).get();
Terms agg = response.getAggregations().get("agg");
for (Terms.Bucket bucket : agg.getBuckets()) {
    String key = (String) bucket.getKey();
    long docCount = bucket.getDocCount();
    System.out.println("key value: " + key + " docCount value: " + docCount);
    TopHits topHits = bucket.getAggregations().get("top");
    for (SearchHit searchHitFields : topHits.getHits().getHits()) {
        System.out.println("id value: " + searchHitFields.getId() + " source value: " + searchHitFields.getSourceAsString());
    }
}
Global aggregation: a document count over the whole index, ignoring the search query.
AggregationBuilder aggregation = AggregationBuilders
        .global("agg")
        .subAggregation(
                AggregationBuilders.terms("users").field("user.keyword")
        );
SearchResponse sr = client.prepareSearch("twitter")
        .addAggregation(aggregation)
        .get();
System.out.println(sr);
Global agg = sr.getAggregations().get("agg");
long count = agg.getDocCount(); // Doc count
System.out.println("global count:" + count);
Filter aggregation: count the documents that match a filter.
AggregationBuilder aggregation = AggregationBuilders.filters("aaa", new FiltersAggregator.KeyedFilter("men", QueryBuilders.termQuery("gender", "male")));
SearchResponse sr = client.prepareSearch("twitter").setTypes("tweet").addAggregation(aggregation).get();
Filters agg = sr.getAggregations().get("aaa");
for (Filters.Bucket entry : agg.getBuckets()) {
    String key = entry.getKeyAsString(); // bucket key
    long docCount = entry.getDocCount(); // Doc count
    System.out.println("global " + key + " count:" + docCount);
}
Filtering on several conditions, returning a count per filter:
AggregationBuilder aggregation = AggregationBuilders.filters("aaa",
        new FiltersAggregator.KeyedFilter("men", QueryBuilders.termQuery("gender", "male")),
        new FiltersAggregator.KeyedFilter("women", QueryBuilders.termQuery("gender", "female")));
SearchResponse sr = client.prepareSearch("twitter").setTypes("tweet").addAggregation(aggregation).get();
Filters agg = sr.getAggregations().get("aaa");
for (Filters.Bucket entry : agg.getBuckets()) {
    String key = entry.getKeyAsString(); // bucket key
    long docCount = entry.getDocCount(); // Doc count
    System.out.println("global " + key + " count:" + docCount);
}
Terms aggregation with buckets ordered by term:
TermsAggregationBuilder fieldAggregation = AggregationBuilders.terms("genders").field("gender.keyword")
        .order(Terms.Order.term(true));
SearchResponse response = client.prepareSearch("twitter").setTypes("tweet").addAggregation(fieldAggregation).get();
Terms terms = response.getAggregations().get("genders");
for (Terms.Bucket bucket : terms.getBuckets()) {
    System.out.println("key value: " + bucket.getKey());
    System.out.println("docCount value: " + bucket.getDocCount());
}
Match all documents:
QueryBuilder qb = matchAllQuery();
Analyzed (fuzzy) matching and phrase queries on a field:
QueryBuilder qb = matchQuery("gender", "female");
Query across multiple fields; any number of fields may be given:
QueryBuilder qb = multiMatchQuery("female","gender", "message");
A more specialized query for rarer, more specialized terms:
QueryBuilder qb = commonTermsQuery("gender","female");
A query that uses the Lucene query syntax, allowing operators such as AND, OR, and NOT:
QueryBuilder qb = queryStringQuery("+male -female");
A simpler, more forgiving query syntax:
QueryBuilder qb = simpleQueryStringQuery("+male -female");
Term query: find documents containing the exact value in the specified field:
QueryBuilder qb = termQuery("gender","male");
Terms query: match any of several exact values within a field:
QueryBuilder qb = termsQuery("age","10", "20");
Range query:
QueryBuilder qb = QueryBuilders.rangeQuery("age").gte(10).includeLower(true).lte(20).includeUpper(true);
Here, includeLower() and includeUpper() control whether the lower and upper bounds are inclusive.
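For exclusive bounds, pass false to the include methods or use gt()/lt() directly; a minimal sketch:
QueryBuilder qb = QueryBuilders.rangeQuery("age").gt(10).lt(20); // 10 < age < 20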
Exists query: match documents in which the specified field exists:
QueryBuilder qb = existsQuery("user");
Prefix query: match an exact prefix on the specified field:
QueryBuilder qb = prefixQuery("gender","m");
Wildcard query, given a field name and a pattern: ? matches a single character and * matches any sequence of characters. Wildcards run against un-analyzed fields:
QueryBuilder qb = wildcardQuery("gender","f?*");
Regexp query, given a field name and a regular expression; it also runs against un-analyzed fields:
QueryBuilder qb = regexpQuery("gender","f.*");
Fuzzy query: an exact field name with a misspelled value:
QueryBuilder qb = fuzzyQuery("gender","mala").fuzziness(Fuzziness.ONE);
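Any of these builders plugs into a search request the same way as in the earlier examples; for instance:
QueryBuilder qb = fuzzyQuery("gender", "mala").fuzziness(Fuzziness.ONE);
SearchResponse response = client.prepareSearch(INDEX)
        .setQuery(qb)
        .get();
System.out.println(response.getHits().getTotalHits());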