TranSportClient记录&记忆

自动嗅探设置

Settings esSettings = Settings.builder()

    .put("cluster.name", clusterName) //设置ES实例的名称

    .put("client.transport.sniff", true) //自动嗅探整个集群的状态,把集群中其他ES节点的ip添加到本地的客户端列表中

    .build();

    client = new PreBuiltTransportClient(esSettings);//初始化client较老版本发生了变化,此方法有几个重载方法,初始化插件等。

    //此步骤添加IP,至少一个,其实一个就够了,因为添加了自动嗅探配置

    client.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(ip), esPort));

现在是在代码中配置,如果交给IOC托管,查找一下看看在哪里配置

配置详解:

自动嗅探设置

index API

生成JSON格式文档的方式&使用prepareIndex来进行存储

ES中的id可以由我们自己指定,也可以自己指定 (这是一个重载方法)

—单个文档插入

client.prepareIndex("fendo", "fendodate") 
        .setSource(json)
        .get();//执行

我使用Gson将对象转json,然后添加 XContentType.JSON

client.prepareIndex(indexName, type,id).setSource(data, XContentType.JSON).get();

插入单条数据遇到的问题:

刚开始以为只要是json就可以,但是没有解决

client.prepareIndex(indexName, type,id).setSource(data).get();

但是总是报错,就看了一下源码

if(source.length % 2 != 0) {
            throw new IllegalArgumentException("The number of object passed must be even but was [" + source.length + "]");
        } else if(source.length == 2 && source[0] instanceof BytesReference && source[1] instanceof Boolean) {
            throw new IllegalArgumentException("you are using the removed method for source with bytes and unsafe flag, the unsafe flag was removed, please just use source(BytesReference)");
        }

这个没有看懂(不知道为什么要这么检查),有种硬编码的赶脚

最终还是使用了XContentBuilder来构建

这里面的i++和++i用的很好—记一下

for(int i = 0; i < source.length; ++i) {
     builder.field(source[i++].toString(), source[i]);
    }

Bulk API 批量插入

BulkRequestBuilder bulkRequest = client.prepareBulk();
        bulkRequest.add(client.prepareIndex("index","type","id")
        .setSource("json", XContentType.JSON));
        
BulkResponse bulkResponse = bulkRequest.get();

可以写一个循环,将我们的数据批量插入,一般来说这个操作都是在全量同步数据使用。

但是

这个批量操作并不是原子性的,也没有事务,个人感觉减少了通信次数。

GET API

根据index type id 查询数据

GetResponse response = client.prepareGet("index", "type", "id").get();

但是一般都是根据条件来查询 使用到id精确查询的不多

解析response

Map<String, Object> source = response.getSource();//默认返回的是一个map

Map<String, Object> sourceAsMap = response.getSourceAsMap();// 指定返回格式


String sourceAsString = response.getSourceAsString();//尝试看看能不能转换成Java Bean

boolean sourceEmpty = response.isSourceEmpty();//判空操作

返回map

可以使用这个工具类实现map<String,Object> 和 Java Bean的转换

Java Bean与Map<String,Object>相互转换

Delete API

删除操作 还是利用的 index type id 这种场景真的少,

我代码中的删除操作 是先根据条件查询回来id 然后根据id删除,很不好。

DeleteResponse response = 
client.prepareDelete("twitter", "tweet", "1").get();
        
String s = response.toString();
RestStatus status = response.status();

int status1 = status.getStatus();//查看删除是否成功

Delete By Query API

BulkByScrollResponse response =
                DeleteByQueryAction.INSTANCE.newRequestBuilder(client)
                        .filter(QueryBuilders.matchQuery("gender", "male")) //查询条件
                        .source("persons") //index(索引名)
                        .get();  //执行

获取失败信息

List<ScrollableHitSource.SearchFailure> searchFailures = response.getSearchFailures();
        for (ScrollableHitSource.SearchFailure searchFailure : searchFailures) {
            Throwable reason = searchFailure.getReason();//获取失败的异常信息
            String index = searchFailure.getIndex();
            Integer shardId = searchFailure.getShardId();
            String nodeId = searchFailure.getNodeId();
        }

获取删除的文档数量

long deleted = response.getDeleted();

异步操作,监听结果回调

DeleteByQueryAction.INSTANCE.newRequestBuilder(client)
    .filter(QueryBuilders.matchQuery("gender", "male"))      //查询            
    .source("persons")                //index(索引名)                                    
    .execute(new ActionListener<BulkByScrollResponse>() {     //回调监听     
        @Override
        public void onResponse(BulkByScrollResponse response) {
            long deleted = response.getDeleted();   //删除文档的数量                 
        }
        @Override
        public void onFailure(Exception e) {
            // Handle the exception
        }
    });

利用CountDownLatch实现主线程接收数据

Update API

两种方式

UpdateRequest
prepareUpdate()

使用UpdateRequest

import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;

UpdateRequest updateRequest = new UpdateRequest();
        updateRequest.index("index");
        updateRequest.type("type");
        updateRequest.id("1");
        updateRequest.doc(jsonBuilder()
                .startObject()
                .field("gender", "male")
                .endObject());
        client.update(updateRequest).get();

一般也不是根据id更新,当然如果在数据库表中存放了es中的id也是可以这样操作的。~~~~

原文 

https://segmentfault.com/a/1190000022564820

本站部分文章源于互联网,本着传播知识、有益学习和研究的目的进行的转载,为网友免费提供。如有著作权人或出版方提出异议,本站将立即删除。如果您对文章转载有任何疑问请告之我们,以便我们及时纠正。

PS:推荐一个微信公众号: askHarries 或者qq群:474807195,里面会分享一些资深架构师录制的视频录像:有Spring,MyBatis,Netty源码分析,高并发、高性能、分布式、微服务架构的原理,JVM性能优化这些成为架构师必备的知识体系。还能领取免费的学习资源,目前受益良多

转载请注明原文出处:Harries Blog™ » TranSportClient记录&记忆

赞 (0)
分享到:更多 ()

评论 0

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址