java多线程处理执行solr创建索引示例

 更新时间:2014年02月26日 10:29:57   作者:  
这篇文章主要介绍了java多线程处理执行solr创建索引示例,需要的朋友可以参考下

复制代码 代码如下:

public class SolrIndexer implements Indexer, Searcher, DisposableBean {
 //~ Static fields/initializers =============================================

 static final Logger logger = LoggerFactory.getLogger(SolrIndexer.class);

 private static final long SHUTDOWN_TIMEOUT    = 5 * 60 * 1000L; // long enough

 private static final int  INPUT_QUEUE_LENGTH  = 16384;

 //~ Instance fields ========================================================

 private CommonsHttpSolrServer server;

 private BlockingQueue<Operation> inputQueue;

 private Thread updateThread;
 volatile boolean running = true;
 volatile boolean shuttingDown = false;

 //~ Constructors ===========================================================

 public SolrIndexer(String url) throws MalformedURLException {
  server = new CommonsHttpSolrServer(url);

  inputQueue = new ArrayBlockingQueue<Operation>(INPUT_QUEUE_LENGTH);

  updateThread = new Thread(new UpdateTask());
  updateThread.setName("SolrIndexer");
  updateThread.start();
 }

 //~ Methods ================================================================

 public void setSoTimeout(int timeout) {
  server.setSoTimeout(timeout);
 }

 public void setConnectionTimeout(int timeout) {
  server.setConnectionTimeout(timeout);
 }

 public void setAllowCompression(boolean allowCompression) {
  server.setAllowCompression(allowCompression);
 }


 public void addIndex(Indexable indexable) throws IndexingException {
  if (shuttingDown) {
   throw new IllegalStateException("SolrIndexer is shutting down");
  }
  inputQueue.offer(new Operation(indexable, OperationType.UPDATE));
 }
 

 public void delIndex(Indexable indexable) throws IndexingException {
  if (shuttingDown) {
   throw new IllegalStateException("SolrIndexer is shutting down");
  }
  inputQueue.offer(new Operation(indexable, OperationType.DELETE));
 }

 
 private void updateIndices(String type, List<Indexable> indices) throws IndexingException {
  if (indices == null || indices.size() == 0) {
   return;
  }

  logger.debug("Updating {} indices", indices.size());

  UpdateRequest req = new UpdateRequest("/" + type + "/update");
  req.setAction(UpdateRequest.ACTION.COMMIT, false, false);

  for (Indexable idx : indices) {
   Doc doc = idx.getDoc();

   SolrInputDocument solrDoc = new SolrInputDocument();
   solrDoc.setDocumentBoost(doc.getDocumentBoost());
   for (Iterator<Field> i = doc.iterator(); i.hasNext();) {
    Field field = i.next();
    solrDoc.addField(field.getName(), field.getValue(), field.getBoost());
   }

   req.add(solrDoc);   
  }

  try {
   req.process(server);   
  } catch (SolrServerException e) {
   logger.error("SolrServerException occurred", e);
   throw new IndexingException(e);
  } catch (IOException e) {
   logger.error("IOException occurred", e);
   throw new IndexingException(e);
  }
 }

 
 private void delIndices(String type, List<Indexable> indices) throws IndexingException {
  if (indices == null || indices.size() == 0) {
   return;
  }

  logger.debug("Deleting {} indices", indices.size());

  UpdateRequest req = new UpdateRequest("/" + type + "/update");
  req.setAction(UpdateRequest.ACTION.COMMIT, false, false);
  for (Indexable indexable : indices) {   
   req.deleteById(indexable.getDocId());
  }

  try {
   req.process(server);
  } catch (SolrServerException e) {
   logger.error("SolrServerException occurred", e);
   throw new IndexingException(e);
  } catch (IOException e) {
   logger.error("IOException occurred", e);
   throw new IndexingException(e);
  }
 }

 
 public QueryResult search(Query query) throws IndexingException {
  SolrQuery sq = new SolrQuery();
  sq.setQuery(query.getQuery());
  if (query.getFilter() != null) {
   sq.addFilterQuery(query.getFilter());
  }
  if (query.getOrderField() != null) {
   sq.addSortField(query.getOrderField(), query.getOrder() == Query.Order.DESC ? SolrQuery.ORDER.desc : SolrQuery.ORDER.asc);
  }
  sq.setStart(query.getOffset());
  sq.setRows(query.getLimit());

  QueryRequest req = new QueryRequest(sq);
  req.setPath("/" + query.getType() + "/select");

  try {
   QueryResponse rsp = req.process(server);
   SolrDocumentList docs = rsp.getResults();

   QueryResult result = new QueryResult();
   result.setOffset(docs.getStart());
   result.setTotal(docs.getNumFound());
   result.setSize(sq.getRows());

   List<Doc> resultDocs = new ArrayList<Doc>(result.getSize());
   for (Iterator<SolrDocument> i = docs.iterator(); i.hasNext();) {
    SolrDocument solrDocument = i.next();

    Doc doc = new Doc();
    for (Iterator<Map.Entry<String, Object>> iter = solrDocument.iterator(); iter.hasNext();) {
     Map.Entry<String, Object> field = iter.next();
     doc.addField(field.getKey(), field.getValue());
    }

    resultDocs.add(doc);
   }

   result.setDocs(resultDocs);
   return result;

  } catch (SolrServerException e) {
   logger.error("SolrServerException occurred", e);
   throw new IndexingException(e);
  }
 }
 

 public void destroy() throws Exception {
  shutdown(SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS);  
 }

 public boolean shutdown(long timeout, TimeUnit unit) {
  if (shuttingDown) {
   logger.info("Suppressing duplicate attempt to shut down");
   return false;
  }
  shuttingDown = true;
  String baseName = updateThread.getName();
  updateThread.setName(baseName + " - SHUTTING DOWN");
  boolean rv = false;
  try {
   // Conditionally wait
   if (timeout > 0) {
    updateThread.setName(baseName + " - SHUTTING DOWN (waiting)");
    rv = waitForQueue(timeout, unit);
   }
  } finally {
   // But always begin the shutdown sequence
   running = false;
   updateThread.setName(baseName + " - SHUTTING DOWN (informed client)");
  }
  return rv;
 }

 /**
  * @param timeout
  * @param unit
  * @return
  */
 private boolean waitForQueue(long timeout, TimeUnit unit) {
  CountDownLatch latch = new CountDownLatch(1);
  inputQueue.add(new StopOperation(latch));
  try {
   return latch.await(timeout, unit);
  } catch (InterruptedException e) {
   throw new RuntimeException("Interrupted waiting for queues", e);
  }
 }

 

 class UpdateTask implements Runnable {
  public void run() {
   while (running) {
    try {
     syncIndices();
    } catch (Throwable e) {
     if (shuttingDown) {
      logger.warn("Exception occurred during shutdown", e);
     } else {
      logger.error("Problem handling solr indexing updating", e);
     }
    }
   }
   logger.info("Shut down SolrIndexer");
  }
 }

 void syncIndices() throws InterruptedException {
  Operation op = inputQueue.poll(1000L, TimeUnit.MILLISECONDS);

  if (op == null) {
   return;
  }

  if (op instanceof StopOperation) {
   ((StopOperation) op).stop();
   return;
  }

  // wait 1 second
  try {
   Thread.sleep(1000);
  } catch (InterruptedException e) {

  }

  List<Operation> ops = new ArrayList<Operation>(inputQueue.size() + 1);
  ops.add(op);
  inputQueue.drainTo(ops);

  Map<String, List<Indexable>> deleteMap = new HashMap<String, List<Indexable>>(4);
  Map<String, List<Indexable>> updateMap = new HashMap<String, List<Indexable>>(4);

  for (Operation o : ops) {
   if (o instanceof StopOperation) {
    ((StopOperation) o).stop();
   } else {
    Indexable indexable = o.indexable;
    if (o.type == OperationType.DELETE) {
     List<Indexable> docs = deleteMap.get(indexable.getType());
     if (docs == null) {
      docs = new LinkedList<Indexable>();
      deleteMap.put(indexable.getType(), docs);
     }
     docs.add(indexable);
    } else {
     List<Indexable> docs = updateMap.get(indexable.getType());
     if (docs == null) {
      docs = new LinkedList<Indexable>();
      updateMap.put(indexable.getType(), docs);
     }
     docs.add(indexable);
    }
   }
  }

  for (Iterator<Map.Entry<String, List<Indexable>>> i = deleteMap.entrySet().iterator(); i.hasNext();) {
   Map.Entry<String, List<Indexable>> entry = i.next();
   delIndices(entry.getKey(), entry.getValue());
  }

  for (Iterator<Map.Entry<String, List<Indexable>>> i = updateMap.entrySet().iterator(); i.hasNext();) {
   Map.Entry<String, List<Indexable>> entry = i.next();
   updateIndices(entry.getKey(), entry.getValue());
  }
 }

 enum OperationType { DELETE, UPDATE, SHUTDOWN }

 static class Operation {
  OperationType type;
  Indexable indexable;

  Operation() {}

  Operation(Indexable indexable, OperationType type) {
   this.indexable = indexable;
   this.type = type;
  }
 }

 static class StopOperation extends Operation {
  CountDownLatch latch;

  StopOperation(CountDownLatch latch) {
   this.latch = latch;
   this.type = OperationType.SHUTDOWN;
  }

  public void stop() {
   latch.countDown();
  }
 }

 //~ Accessors ===============

}

相关文章

  • Spring Boot日志技术logback原理及配置解析

    Spring Boot日志技术logback原理及配置解析

    这篇文章主要介绍了Spring Boot日志技术logback原理及用法解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2020-07-07
  • spring 声明式事务实现过程解析

    spring 声明式事务实现过程解析

    这篇文章主要介绍了spring 声明式事务实现过程解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2019-10-10
  • JAVA中的基本数据类型

    JAVA中的基本数据类型

    本文主要介绍了JAVA中的基本数据类型。具有很好的参考价值,下面跟着小编一起来看下吧
    2017-02-02
  • SpringBoot中使用MongoDB的连接池配置

    SpringBoot中使用MongoDB的连接池配置

    由于MongoDB的客户端本身就是一个连接池,因此,我们只需要配置客户端即可,文中通过示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2021-10-10
  • SpringBoot 请求参数忽略大小写的实例

    SpringBoot 请求参数忽略大小写的实例

    这篇文章主要介绍了SpringBoot 请求参数忽略大小写的实例,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2021-01-01
  • SpringEvents与异步事件驱动案例详解

    SpringEvents与异步事件驱动案例详解

    本文深入探讨了SpringBoot中的事件驱动架构,特别是通过Spring事件机制实现组件解耦和系统扩展性增强,介绍了事件的发布者、事件本身、事件监听器和事件处理器的概念,感兴趣的朋友跟随小编一起看看吧
    2024-09-09
  • Java使用新浪微博API开发微博应用的基本方法

    Java使用新浪微博API开发微博应用的基本方法

    这篇文章主要介绍了Java使用新浪微博API开发微博应用的基本方法,文中还给出了一个不使用任何SDK实现Oauth授权并实现简单的发布微博功能的实现方法,需要的朋友可以参考下
    2015-11-11
  • SpringBoot数据层测试事务回滚的实现流程

    SpringBoot数据层测试事务回滚的实现流程

    这篇文章主要介绍了SpringBoot数据层测试事务回滚的实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习吧
    2022-10-10
  • IDEA整合SSM框架实现网页上显示数据

    IDEA整合SSM框架实现网页上显示数据

    最近做了个小项目,该项目包在intellij idea中实现了ssm框架的整合以及实现访问,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2021-05-05
  • SpringBoot基于Redis实现生成全局唯一ID的方法

    SpringBoot基于Redis实现生成全局唯一ID的方法

    在项目中生成全局唯一ID有很多好处,生成全局唯一ID有助于提高系统的可用性、数据的完整性和安全性,同时也方便数据的管理和分析,所以本文给大家介绍了SpringBoot基于Redis实现生成全局唯一ID的方法,文中有详细的代码讲解,需要的朋友可以参考下
    2023-12-12

最新评论