java连接ElasticSearch集群操作
更新时间:2020年09月17日 14:34:25 作者:java界的守门员
这篇文章主要介绍了java连接ElasticSearch集群操作,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
我就废话不多说了,大家还是直接看代码吧~
/* *es配置类 * */ @Configuration public class ElasticSearchDataSourceConfigurer { private static final Logger LOG = LogManager.getLogger(ElasticSearchDataSourceConfigurer.class); @Bean public TransportClient getESClient() { //设置集群名称 Settings settings = Settings.builder().put("cluster.name", "bigData-cluster").put("client.transport.sniff", true).build(); //创建client TransportClient client = null; try { client = new PreBuiltTransportClient(settings) .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(""), 9300));//集群ip LOG.info("ESClient连接建立成功"); } catch (UnknownHostException e) { LOG.info("ESClient连接建立失败"); e.printStackTrace(); } return client; } }
/** * Simple to Introduction * * @Description: [添加类] */ @Repository public class UserDaoImpl implements userDao { private static final String INDEXNAME = "user";//小写 private static final String TYPENAME = "info"; @Resource TransportClient transportClient; @Override public int addUser(User[] user) { IndexResponse indexResponse = null; int successNum = 0; for (int i = 0; i < user.length; i++) { UUID uuid = UUID.randomUUID(); String str = uuid.toString(); String jsonValue = null; try { jsonValue = JsonUtil.object2JsonString(user[i]); if (jsonValue != null) { indexResponse = transportClient.prepareIndex(INDEXNAME, TYPENAME, str).setSource(jsonValue) .execute().actionGet(); successNum++; } } catch (JsonProcessingException e) { e.printStackTrace(); } } return successNum; } }
/** *批量插入 */ public static void bathAddUser(TransportClient client, List<User> users) { BulkRequestBuilder bulkRequest = transportClient.prepareBulk(); for (int i = 0; i < users.size(); i++) { UUID uuid = UUID.randomUUID(); String str = uuid.toString(); String jsonValue = null; try { jsonValue = JsonUtil.object2JsonString(users.get(i)); } catch (JsonProcessingException e) { e.printStackTrace(); } bulkRequest.add(client.prepareIndex("user", "info", str).setSource(jsonValue)); // 一万条插入一次 if (i % 10000 == 0) { bulkRequest.execute().actionGet(); } System.out.println("已经插入第" + i + "多少条"); } }
补充知识:使用java创建ES(ElasticSearch)连接池
1.首先要有一个创建连接的工厂类
package com.aly.util; import org.apache.commons.pool2.PooledObject; import org.apache.commons.pool2.PooledObjectFactory; import org.apache.commons.pool2.impl.DefaultPooledObject; import org.apache.http.HttpHost; import org.elasticsearch.client.RestClient; import org.elasticsearch.client.RestHighLevelClient; /** * EliasticSearch连接池工厂对象 * @author 00000 * */ public class EsClientPoolFactory implements PooledObjectFactory<RestHighLevelClient>{ @Override public void activateObject(PooledObject<RestHighLevelClient> arg0) throws Exception { System.out.println("activateObject"); } /** * 销毁对象 */ @Override public void destroyObject(PooledObject<RestHighLevelClient> pooledObject) throws Exception { RestHighLevelClient highLevelClient = pooledObject.getObject(); highLevelClient.close(); } /** * 生产对象 */ // @SuppressWarnings({ "resource" }) @Override public PooledObject<RestHighLevelClient> makeObject() throws Exception { // Settings settings = Settings.builder().put("cluster.name","elasticsearch").build(); RestHighLevelClient client = null; try { /*client = new PreBuiltTransportClient(settings) .addTransportAddress(new TransportAddress(InetAddress.getByName("localhost"),9300));*/ client = new RestHighLevelClient(RestClient.builder( new HttpHost("192.168.1.121", 9200, "http"), new HttpHost("192.168.1.122", 9200, "http"), new HttpHost("192.168.1.123", 9200, "http"), new HttpHost("192.168.1.125", 9200, "http"), new HttpHost("192.168.1.126", 9200, "http"), new HttpHost("192.168.1.127", 9200, "http"))); } catch (Exception e) { e.printStackTrace(); } return new DefaultPooledObject<RestHighLevelClient>(client); } @Override public void passivateObject(PooledObject<RestHighLevelClient> arg0) throws Exception { System.out.println("passivateObject"); } @Override public boolean validateObject(PooledObject<RestHighLevelClient> arg0) { return true; } }
2.然后再写我们的连接池工具类
package com.aly.util; import org.apache.commons.pool2.impl.GenericObjectPool; import org.apache.commons.pool2.impl.GenericObjectPoolConfig; import org.elasticsearch.client.RestHighLevelClient; /** * ElasticSearch 连接池工具类 * * @author 00000 * */ public class ElasticSearchPoolUtil { // 对象池配置类,不写也可以,采用默认配置 private static GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig(); // 采用默认配置maxTotal是8,池中有8个client static { poolConfig.setMaxTotal(8); } // 要池化的对象的工厂类,这个是我们要实现的类 private static EsClientPoolFactory esClientPoolFactory = new EsClientPoolFactory(); // 利用对象工厂类和配置类生成对象池 private static GenericObjectPool<RestHighLevelClient> clientPool = new GenericObjectPool<>(esClientPoolFactory, poolConfig); /** * 获得对象 * * @return * @throws Exception */ public static RestHighLevelClient getClient() throws Exception { // 从池中取一个对象 RestHighLevelClient client = clientPool.borrowObject(); return client; } /** * 归还对象 * * @param client */ public static void returnClient(RestHighLevelClient client) { // 使用完毕之后,归还对象 clientPool.returnObject(client); } }
以上这篇java连接ElasticSearch集群操作就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持脚本之家。
相关文章
Java Spring-IOC容器与Bean管理之基于注解的方式案例详解
这篇文章主要介绍了Java Spring-IOC容器与Bean管理之基于注解的方式案例详解,本篇文章通过简要的案例,讲解了该项技术的了解与使用,以下就是详细内容,需要的朋友可以参考下2021-08-08Java并发之原子性 有序性 可见性及Happen Before原则
一提到happens-before原则,就让人有点“丈二和尚摸不着头脑”。这个涵盖了整个JMM中可见性原则的规则,究竟如何理解,把我个人一些理解记录下来。下面可以和小编一起学习Java 并发四个原则2021-09-09ConcurrentHashMap线程安全及实现原理实例解析
这篇文章主要介绍了ConcurrentHashMap线程安全及实现原理实例解析,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪2023-11-11详解IDEA使用Maven项目不能加入本地Jar包的解决方法
这篇文章主要介绍了详解IDEA使用Maven项目不能加入本地Jar包的解决方法,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧2020-08-08
最新评论