Error message here!

Hide Error message here!

忘记密码?

Error message here!

请输入正确邮箱

Hide Error message here!

密码丢失?请输入您的电子邮件地址。您将收到一个重设密码链接。

Error message here!

返回登录

Close

使用Java操作Elasticsearch(Elasticsearch的java api使用)

别先生 2019-10-21 21:19:00 阅读数:109 评论数:0 点赞数:0 收藏数:0

1、Elasticsearch是基于Lucene开发的一个分布式全文检索框架,向Elasticsearch中存储和从Elasticsearch中查询,格式是json。

索引index,相当于数据库中的database。

类型type相当于数据库中的table。

主键id相当于数据库中记录的主键,是唯一的。

向Elasticsearch中存储数据,其实就是向es中的index下面的type中存储json类型的数据。

2、Elasticsearch是RestFul风格的api,通过http的请求形式(注意,参数是url拼接还是请求的json形式哦),发送请求,对Elasticsearch进行操作。
查询,请求方式应该是get。删除,请求方式应该是delete。添加,请求方式应该是put/post。修改,请求方式应该是put/post。
RESTFul接口url的格式:http://ip:port/<index>/<type>/<[id]>。其中index、type是必须提供的。id是可以选择的,不提供es会自动生成,index、type将信息进行分层,利于管理。

3、如何使用java连接Elasticsearch。由于使用的是maven项目,pom.xml的依赖如下所示:

 <project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.</modelVersion>
<groupId>com.bie</groupId>
<artifactId>elasticsearch-hello</artifactId>
<version>0.0.-SNAPSHOT</version>
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<encoding>UTF-</encoding>
</properties>
<dependencies>
<!-- elasticsearch的客户端 -->
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>transport</artifactId>
<version>5.4.</version>
</dependency>
<!-- elasticsearch依赖2.x的log4j -->
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>2.8.</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.8.</version>
</dependency>
<!-- junit单元测试 -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
</dependencies>
</project>

使用查询的方式,先简单测试一下是否连通es集群,和对比查询的数据是否一致。

 package com.bie.elasticsearch;

import java.net.InetAddress;

import org.elasticsearch.action.get.GetResponse;
 import org.elasticsearch.client.transport.TransportClient;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.transport.InetSocketTransportAddress;
 import org.elasticsearch.transport.client.PreBuiltTransportClient;

/**
  *
  * @author biehl
  *
 */
public class HelloElasticsearch {

public static void main(String[] args) {
 try {
 // 设置集群名称biehl01,Settings设置es的集群名称,使用的设计模式,链式设计模式、build设计模式。
Settings settings = Settings.builder().put("cluster.name", "biehl01").build();
 // 读取es集群中的数据,创建client。
@SuppressWarnings("resource")
 TransportClient client = new PreBuiltTransportClient(settings).addTransportAddresses(
 // 用java访问ES用的端口是9300。es的9200是restful的请求端口号
 // 由于我使用的是伪集群,所以就配置了一台机器,如果是集群方式,将竞选主节点的加进来即可。
 // new InetSocketTransportAddress(InetAddress.getByName("192.168.110.133"),
 // 9300),
 // new InetSocketTransportAddress(InetAddress.getByName("192.168.110.133"),
 // 9300),
new InetSocketTransportAddress(InetAddress.getByName("192.168.110.133"), ));
 // 搜索数据(.actionGet()方法是同步的,没有返回就等待)
 // 方式是先去索引里面查询出索引数据,再去文档里面查询出数据。
GetResponse response = client.prepareGet("news", "fulltext", "").execute().actionGet();
 // 输出结果
System.out.println(response);
 // 关闭client
 client.close();
 } catch (Exception e) {
  e.printStackTrace();
  }

 }

}

查询的结果如下所示:

4、如何使用java api创建索引Index、类型Type、以及指定字段,是否创建索引,是否存储,是否即分词,又建立索引(analyzed)、是否建索引不分词(not_analyzed)等等。

 package com.bie.elasticsearch;

import java.io.IOException;
 import java.net.InetAddress;
 import java.util.HashMap;

import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder;
 import org.elasticsearch.client.AdminClient;
 import org.elasticsearch.client.IndicesAdminClient;
 import org.elasticsearch.client.transport.TransportClient;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.transport.InetSocketTransportAddress;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.common.xcontent.XContentFactory;
 import org.elasticsearch.transport.client.PreBuiltTransportClient;
 import org.junit.Before;
 import org.junit.Test;

/**
  *
  * @author biehl
  *
 */
public class AdminAPI {

private TransportClient client = null;

// 在所有的测试方法之前执行
@SuppressWarnings("resource")
  @Before
 public void init() throws Exception {
 // 设置集群名称biehl01
Settings settings = Settings.builder().put("cluster.name", "biehl01")
 // 自动感知的功能(可以通过当前指定的节点获取所有es节点的信息)
.put("client.transport.sniff", true).build();
 // 创建client
client = new PreBuiltTransportClient(settings).addTransportAddresses(
 // new InetSocketTransportAddress(InetAddress.getByName("192.168.110.133"),
 // 9300),
 // new InetSocketTransportAddress(InetAddress.getByName("192.168.110.133"),
 // 9300),
 // 建议指定2个及其以上的节点。
new InetSocketTransportAddress(InetAddress.getByName("192.168.110.133"), ));
  }

/**
  *
  * AdminClient创建索引,并配置一些参数,用来指定一些映射关系等等
  *
  * 这里创建一个索引Index,并且指定分区、副本的数量
  *
 */
 @Test
 public void createIndexWithSettings() {
 // 获取Admin的API
AdminClient admin = client.admin();
 // 使用Admin API对索引进行操作
IndicesAdminClient indices = admin.indices();
 // 准备创建索引
indices.prepareCreate("food")
 // 配置索引参数
 .setSettings(
 // 参数配置器
Settings.builder()// 指定索引分区的数量。shards分区
.put("index.number_of_shards", )
 // 指定索引副本的数量(注意:不包括本身,如果设置数据存储副本为1,实际上数据存储了2份)
 // replicas副本
.put("index.number_of_replicas", ))
 // 真正执行
.get();
  }

/**
  * 你可以通过dynamic设置来控制这一行为,它能够接受以下的选项: true:默认值。
  *
  * 动态添加字段 false:忽略新字段
  *
  * strict:如果碰到陌生字段,抛出异常
  *
  * 给索引添加mapping信息(给表添加schema信息)
  *
  * @throws IOException
 */
 @Test
 public void elasticsearchSettingsMappings() throws IOException {
 // 1:settings
HashMap<String, Object> settings_map = new HashMap<String, Object>();
 // shards分区的数量4
settings_map.put("number_of_shards", );
 // 副本的数量1
settings_map.put("number_of_replicas", );

// 2:mappings(映射、schema)
 // field("dynamic", "true")含义是动态字段
XContentBuilder builder = XContentFactory.jsonBuilder().startObject().field("dynamic", "true")
 // 设置type中的属性
.startObject("properties")
 // id属性
.startObject("id")
 // 类型是integer
.field("type", "integer")
 // 不分词,但是建索引
.field("index", "not_analyzed")
 // 在文档中存储
.field("store", "yes").endObject()
 // name属性
.startObject("name")
 // string类型
.field("type", "string")
 // 在文档中存储
.field("store", "yes")
 // 建立索引
.field("index", "analyzed")
 // 使用ik_smart进行分词
.field("analyzer", "ik_smart").endObject().endObject().endObject();

CreateIndexRequestBuilder prepareCreate = client.admin().indices().prepareCreate("computer");
 // 管理索引(user_info)然后关联type(user)
prepareCreate.setSettings(settings_map).addMapping("xiaomi", builder).get();
  }

/**
  * index这个属性,no代表不建索引
  *
  * not_analyzed,建索引不分词
  *
  * analyzed 即分词,又建立索引
  *
  * expected [no],[not_analyzed] or [analyzed]。即可以选择三者任意一个值
  *
  * @throws IOException
 */
 @Test
 public void elasticsearchSettingsPlayerMappings() throws IOException {
 // 1:settings
HashMap<String, Object> settings_map = new HashMap<String, Object>();
 // 分区的数量4
settings_map.put("number_of_shards", );
 // 副本的数量1
settings_map.put("number_of_replicas", );

// 2:mappings
XContentBuilder builder = XContentFactory.jsonBuilder().startObject()//
 .field("dynamic", "true").startObject("properties")
 // 在文档中存储、
.startObject("id").field("type", "integer").field("store", "yes").endObject()
 // 不分词,但是建索引、
.startObject("name").field("type", "string").field("index", "not_analyzed").endObject()
 //
 .startObject("age").field("type", "integer").endObject()
 //
 .startObject("salary").field("type", "integer").endObject()
 // 不分词,但是建索引、
.startObject("team").field("type", "string").field("index", "not_analyzed").endObject()
 // 不分词,但是建索引、
.startObject("position").field("type", "string").field("index", "not_analyzed").endObject()
 // 即分词,又建立索引、
.startObject("description").field("type", "string").field("store", "no").field("index", "analyzed")
 .field("analyzer", "ik_smart").endObject()
 // 即分词,又建立索引、在文档中存储、
.startObject("addr").field("type", "string").field("store", "yes").field("index", "analyzed")
 .field("analyzer", "ik_smart").endObject()

 .endObject()

 .endObject();

CreateIndexRequestBuilder prepareCreate = client.admin().indices().prepareCreate("player");
 prepareCreate.setSettings(settings_map).addMapping("basketball", builder).get();
  }
 }

5、使用java api操作Elasticsearch的增删改查以及复杂查询(聚合查询,可以进行分组统计数量,分组统计最大值,分组统计平均值,等等统计)。

 package com.bie.elasticsearch;

import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
 import static org.elasticsearch.index.query.QueryBuilders.rangeQuery;

import java.io.IOException;
 import java.net.InetAddress;
 import java.util.Date;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;

import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.bulk.byscroll.BulkByScrollResponse;
 import org.elasticsearch.action.delete.DeleteResponse;
 import org.elasticsearch.action.get.GetResponse;
 import org.elasticsearch.action.get.MultiGetItemResponse;
 import org.elasticsearch.action.get.MultiGetResponse;
 import org.elasticsearch.action.index.IndexResponse;
 import org.elasticsearch.action.search.SearchRequestBuilder;
 import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.action.update.UpdateRequest;
 import org.elasticsearch.action.update.UpdateResponse;
 import org.elasticsearch.client.transport.TransportClient;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.transport.InetSocketTransportAddress;
 import org.elasticsearch.index.query.QueryBuilder;
 import org.elasticsearch.index.query.QueryBuilders;
 import org.elasticsearch.index.reindex.DeleteByQueryAction;
 import org.elasticsearch.search.aggregations.Aggregation;
 import org.elasticsearch.search.aggregations.AggregationBuilders;
 import org.elasticsearch.search.aggregations.bucket.terms.StringTerms;
 import org.elasticsearch.search.aggregations.bucket.terms.Terms;
 import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
 import org.elasticsearch.search.aggregations.metrics.avg.AvgAggregationBuilder;
 import org.elasticsearch.search.aggregations.metrics.avg.InternalAvg;
 import org.elasticsearch.search.aggregations.metrics.max.InternalMax;
 import org.elasticsearch.search.aggregations.metrics.max.MaxAggregationBuilder;
 import org.elasticsearch.search.aggregations.metrics.sum.InternalSum;
 import org.elasticsearch.search.aggregations.metrics.sum.SumAggregationBuilder;
 import org.elasticsearch.transport.client.PreBuiltTransportClient;
 import org.junit.Before;
 import org.junit.Test;

/**
  *
  * @author biehl
  *
 */
public class ElasticsearchCRUD {

private TransportClient client = null;

@SuppressWarnings("resource")
  @Before
 public void init() throws Exception {
 // 设置集群名称biehl01
Settings settings = Settings.builder().put("cluster.name", "biehl01")
 // 自动感知的功能(可以通过当前指定的节点获取所有es节点的信息)
.put("client.transport.sniff", true).build();
 // 创建client
client = new PreBuiltTransportClient(settings).addTransportAddresses(
 // new InetSocketTransportAddress(InetAddress.getByName("192.168.110.133"),
 // 9300),
 // new InetSocketTransportAddress(InetAddress.getByName("192.168.110.133"),
 // 9300),
 // 建议指定2个及其以上的节点。
new InetSocketTransportAddress(InetAddress.getByName("192.168.110.133"), ));
  }

/**
  * 创建一个Index索引、Type类型、以及id。
  *
  * 然后插入类型里面的数据。
  *
  * @throws IOException
 */
 @Test
 public void elasticsearchCreate() throws IOException {
 IndexResponse response = client.prepareIndex("people", "student", "")
 .setSource(jsonBuilder().startObject().field("username", "王五五").field("sex", "")
 .field("birthday", new Date()).field("age", ).field("message", "trying out Elasticsearch")
  .endObject())
 .get();
 System.out.println(response.toString());
  }

/**
  * 查找一条索引Index里面的类型Type里面的id的所有信息
  *
  * @throws IOException
 */
 @Test
 public void elasticsearchGet() throws IOException {
 GetResponse response = client.prepareGet("people", "student", "").get();
 System.out.println(response.getSourceAsString());
  }

/**
  * 查找多条
  *
  * 索引Index里面的类型Type里面的多个id的所有信息
  *
  * @throws IOException
 */
 @Test
 public void elasticsearchMultiGet() throws IOException {
 // 查询出多个索引Index多个类型Type的多个id的所有信息
MultiGetResponse multiGetItemResponses = client.prepareMultiGet().add("people", "student", "")
 .add("people", "student", "", "").add("people", "teacher", "").add("news", "fulltext", "").get();
 // 将查询出的结果遍历输出
for (MultiGetItemResponse itemResponse : multiGetItemResponses) {
 // 将每一个查询出的结果遍历输出
GetResponse response = itemResponse.getResponse();
 // 判断如果存在就进行遍历输出
if (response.isExists()) {
 String json = response.getSourceAsString();
 System.out.println(json);
  }
  }
  }

/**
  * 修改指定的索引Index里面的类型Type的id的信息
  *
  * @throws Exception
 */
 @Test
 public void elasticsearchUpdate() throws Exception {
 // 创建一个更新的请求对象
UpdateRequest updateRequest = new UpdateRequest();
 // 指定索引Index
updateRequest.index("people");
 // 指定类型Type
updateRequest.type("student");
 // 指定id的值
updateRequest.id("");
 // 设置修改的字段信息
updateRequest.doc(jsonBuilder().startObject().field("username", "王五五").endObject());
 // 开始进行修改,并且返回响应信息
UpdateResponse updateResponse = client.update(updateRequest).get();
 // 打印输出响应的信息
System.out.println(updateResponse.toString());
  }

/**
  * 删除指定的索引Index里面的类型Type的id的信息
 */
 @Test
 public void elasticsearchDelete() {
 // 指定删除的id信息,并且给出响应结果
 // prepareDelete(String index, String type, String id);
DeleteResponse response = client.prepareDelete("people", "student", "").get();
 // 打印输出的响应信息
System.out.println(response);
  }

/**
  * 根据查询条件进行删除数据
  *
  *
 */
 @Test
 public void elasticsearchDeleteByQuery() {
 BulkByScrollResponse response = DeleteByQueryAction.INSTANCE.newRequestBuilder(client)
 // 指定查询条件,matchQuery是name的值text里面包括了这个内容就进行删除。默认使用标准分词器。
.filter(QueryBuilders.matchQuery("username", "王五五"))
 // 指定索引名称
.source("people").get();
 // 获取到删除的个数
long deleted = response.getDeleted();
 // 打印输出删除的个数
System.out.println(deleted);
  }

/**
  * 异步删除
  *
  * 监听,如果真正删除以后进行回调,打印输出删除确认的消息。
 */
 @Test
 public void elasticsearchDeleteByQueryAsync() {
 DeleteByQueryAction.INSTANCE.newRequestBuilder(client).filter(QueryBuilders.matchQuery("sex", ""))
 .source("people").execute(new ActionListener<BulkByScrollResponse>() {

// 删除以后的方法回调
 @Override
 public void onResponse(BulkByScrollResponse response) {
 // 返回删除的个数
long deleted = response.getDeleted();
 System.out.println("数据删除完毕!");
 // 打印删除的个数
System.out.println("数据删除的个数: " + deleted);
  }

 @Override
 public void onFailure(Exception e) {
 // 失败打印异常信息
 e.printStackTrace();
  }
  });

// 先打印输出,正常执行完毕。再执行异步监听删除数据。
try {
 System.out.println("异步删除操作!");
 // 休眠10秒钟,避免主线程里面结束,子线程无法进行结果输出
Thread.sleep();
 } catch (Exception e) {
  e.printStackTrace();
  }
  }

/**
  *
  * 按照范围进行查找。
  *
 */
 @Test
 public void elasticsearchRange() {
 // includeLower(true).includeUpper(false)含义是包含前面,不包含后面的
 // [21, 24)
QueryBuilder qb = rangeQuery("age").from().to().includeLower(true).includeUpper(false);
 // 将查询条件传递进去,并将查询结果进行返回。
SearchResponse response = client.prepareSearch("people").setQuery(qb).get();
 System.out.println(response);
  }

/**
  *
  * 向指定索引index里面的类型Type的id的信息
  *
  * @throws IOException
 */
 @Test
 public void elasticsearchAddPlayer() throws IOException {
 //
 IndexResponse response = client.prepareIndex("player", "basketball", "")

 .setSource(jsonBuilder().startObject()

.field("name", "安其拉")

.field("age", )

.field("salary", )

.field("team", "啦啦队 team")

.field("position", "打中锋")

.field("description", "跪族蓝孩")

 .endObject())
 .get();

System.out.println(response);
  }

/**
  * 
 *
  * select team, count(*) as team_count from player group by team;
  *
  * team_counts是别名称。
 */
 @Test
 public void elasticsearchAgg1() {
 // 指定索引和type
SearchRequestBuilder builder = client.prepareSearch("player").setTypes("basketball");
 // 按team分组然后聚合,但是并没有指定聚合函数。
 // team_count是别名称
TermsAggregationBuilder teamAgg = AggregationBuilders.terms("team_count").field("team");
 // 添加聚合器
 builder.addAggregation(teamAgg);
 // 触发
SearchResponse response = builder.execute().actionGet();
 // System.out.println(response);
 // 将返回的结果放入到一个map中
Map<String, Aggregation> aggMap = response.getAggregations().getAsMap();
 // 遍历打印输出
Set<String> keys = aggMap.keySet();
 for (String key : keys) {
 System.out.println("key: " + key);
  }

System.out.println("");

// //取出聚合属性
StringTerms terms = (StringTerms) aggMap.get("team_count");

// //依次迭代出分组聚合数据
for (Terms.Bucket bucket : terms.getBuckets()) {
 // 分组的名字
String team = (String) bucket.getKey();
 // count,分组后一个组有多少数据
long count = bucket.getDocCount();
 System.out.println(team + ": " + count);
  }

System.out.println("");

// 使用Iterator进行遍历迭代
Iterator<Terms.Bucket> teamBucketIt = terms.getBuckets().iterator();
 while (teamBucketIt.hasNext()) {
 Terms.Bucket bucket = teamBucketIt.next();
 // 获取到分组后每组的组名称
String team = (String) bucket.getKey();
 // 获取到分组后的每组数量
long count = bucket.getDocCount();
 // 打印输出
System.out.println(team + ": " + count);
  }
  }

/**
  *
  * select
  *
  * team, position, count(*) as pos_count
  *
  * from
  *
  * player
  *
  * group by
  *
  * team,position;
  *
  *
 */
 @Test
 public void elasticsearchAgg2() {
 SearchRequestBuilder builder = client.prepareSearch("player").setTypes("basketball");
 // 指定别名和分组的字段
TermsAggregationBuilder teamAgg = AggregationBuilders.terms("team_name").field("team");
 TermsAggregationBuilder posAgg = AggregationBuilders.terms("pos_count").field("position");
 // 添加两个聚合构建器。先按照team分组,再按照position分组。
 builder.addAggregation(teamAgg.subAggregation(posAgg));
 // 执行查询
SearchResponse response = builder.execute().actionGet();
 // 将查询结果放入map中
Map<String, Aggregation> aggMap = response.getAggregations().getAsMap();
 // 根据属性名到map中查找
StringTerms teams = (StringTerms) aggMap.get("team_name");
 // 循环查找结果
for (Terms.Bucket teamBucket : teams.getBuckets()) {
 // 先按球队进行分组
String team = (String) teamBucket.getKey();
 Map<String, Aggregation> subAggMap = teamBucket.getAggregations().getAsMap();
 StringTerms positions = (StringTerms) subAggMap.get("pos_count");
 // 因为一个球队有很多位置,那么还要依次拿出位置信息
for (Terms.Bucket posBucket : positions.getBuckets()) {
 // 拿到位置的名字
String pos = (String) posBucket.getKey();
 // 拿出该位置的数量
long docCount = posBucket.getDocCount();
 // 打印球队,位置,人数
System.out.println(team + ": " + pos + ": " + docCount);
  }
  }

 }

/**
  * select team, max(age) as max_age from player group by team;
 */
 @Test
 public void elasticsearchAgg3() {
 SearchRequestBuilder builder = client.prepareSearch("player").setTypes("basketball");
 // 指定安球队进行分组
TermsAggregationBuilder teamAgg = AggregationBuilders.terms("team_name").field("team");
 // 指定分组求最大值
MaxAggregationBuilder maxAgg = AggregationBuilders.max("max_age").field("age");
 // 分组后求最大值
 builder.addAggregation(teamAgg.subAggregation(maxAgg));
 // 查询
SearchResponse response = builder.execute().actionGet();
 Map<String, Aggregation> aggMap = response.getAggregations().getAsMap();
 // 根据team属性,获取map中的内容
StringTerms teams = (StringTerms) aggMap.get("team_name");
 for (Terms.Bucket teamBucket : teams.getBuckets()) {
 // 分组的属性名
String team = (String) teamBucket.getKey();
 // 在将聚合后取最大值的内容取出来放到map中
Map<String, Aggregation> subAggMap = teamBucket.getAggregations().getAsMap();
 // 取分组后的最大值
InternalMax ages = (InternalMax) subAggMap.get("max_age");
 // 获取到年龄的值
double max = ages.getValue();
 // 打印输出值
System.out.println(team + ": " + max);
  }
  }

/**
  * select team, avg(age) as avg_age, sum(salary) as total_salary from player
  * group by team;
 */
 @Test
 public void elasticsearchAgg4() {
 SearchRequestBuilder builder = client.prepareSearch("player").setTypes("basketball");
 // 指定分组字段
TermsAggregationBuilder termsAgg = AggregationBuilders.terms("team_name").field("team");
 // 指定聚合函数是求平均数据
AvgAggregationBuilder avgAgg = AggregationBuilders.avg("avg_age").field("age");
 // 指定另外一个聚合函数是求和
SumAggregationBuilder sumAgg = AggregationBuilders.sum("total_salary").field("salary");
 // 分组的聚合器关联了两个聚合函数
 builder.addAggregation(termsAgg.subAggregation(avgAgg).subAggregation(sumAgg));
 // 查询
SearchResponse response = builder.execute().actionGet();
 Map<String, Aggregation> aggMap = response.getAggregations().getAsMap();
 // 按分组的名字取出数据
StringTerms teams = (StringTerms) aggMap.get("team_name");
 for (Terms.Bucket teamBucket : teams.getBuckets()) {
 // 获取球队名字
String team = (String) teamBucket.getKey();
 Map<String, Aggregation> subAggMap = teamBucket.getAggregations().getAsMap();
 // 根据别名取出平均年龄
InternalAvg avgAge = (InternalAvg) subAggMap.get("avg_age");
 // 根据别名取出薪水总和
InternalSum totalSalary = (InternalSum) subAggMap.get("total_salary");
 double avgAgeValue = avgAge.getValue();
 double totalSalaryValue = totalSalary.getValue();
 System.out.println(team + ": " + avgAgeValue + ": " + totalSalaryValue);
  }
  }

/**
  * select team, sum(salary) as total_salary from player group by team order by
  * total_salary desc;
 */
 @Test
 public void elasticsearchAgg5() {
 SearchRequestBuilder builder = client.prepareSearch("player").setTypes("basketball");
 // 按team进行分组,然后指定排序规则
TermsAggregationBuilder termsAgg = AggregationBuilders.terms("team_name").field("team")
 .order(Terms.Order.aggregation("total_salary ", true));
 // 指定一个聚合函数是求和
SumAggregationBuilder sumAgg = AggregationBuilders.sum("total_salary").field("salary");
 // 添加两个聚合构建器。先按照team分组,再按照salary求和。
 builder.addAggregation(termsAgg.subAggregation(sumAgg));
 // 查询
SearchResponse response = builder.execute().actionGet();
 // 将查询结果放入map中
Map<String, Aggregation> aggMap = response.getAggregations().getAsMap();
 // 从查询结果中获取到team_name的信息
StringTerms teams = (StringTerms) aggMap.get("team_name");
 // 开始遍历获取到的信息
for (Terms.Bucket teamBucket : teams.getBuckets()) {
 // 获取到key的值
String team = (String) teamBucket.getKey();
 // 获取到求和的值
Map<String, Aggregation> subAggMap = teamBucket.getAggregations().getAsMap();
 // 获取到求和的值的信息
InternalSum totalSalary = (InternalSum) subAggMap.get("total_salary");
 // 获取到求和的值
double totalSalaryValue = totalSalary.getValue();
 // 打印输出信息
System.out.println(team + " " + totalSalaryValue);
  }
  }

}

执行效果,自己可以分别进行测试。由于测试都写了说明,这里就不一一进行测试打印效果了。请自行联系使用即可。

作者:别先生

博客园:https://www.cnblogs.com/biehongli/

如果您想及时得到个人撰写文章以及著作的消息推送,可以扫描上方二维码,关注个人公众号哦。

 

版权声明
本文为[别先生]所创,转载请带上原文链接,感谢
https://www.cnblogs.com/biehongli/p/11710704.html