|
@@ -0,0 +1,288 @@
|
|
|
+package cn.com.taiji.cql.service.impl;
|
|
|
+
|
|
|
+import cn.com.taiji.common.domain.PageResult;
|
|
|
+import cn.com.taiji.common.enums.CommonConstant;
|
|
|
+import cn.com.taiji.common.model.EsDataSource;
|
|
|
+import cn.com.taiji.cql.service.IElasticSearchService;
|
|
|
+import com.alibaba.fastjson.JSONObject;
|
|
|
+import lombok.extern.slf4j.Slf4j;
|
|
|
+import org.apache.commons.lang3.StringUtils;
|
|
|
+import org.apache.http.HttpHost;
|
|
|
+import org.apache.http.auth.AuthScope;
|
|
|
+import org.apache.http.auth.UsernamePasswordCredentials;
|
|
|
+import org.apache.http.client.CredentialsProvider;
|
|
|
+import org.apache.http.impl.client.BasicCredentialsProvider;
|
|
|
+import org.elasticsearch.client.RestClient;
|
|
|
+import org.geotools.api.data.DataStore;
|
|
|
+import org.geotools.api.data.Query;
|
|
|
+import org.geotools.api.data.SimpleFeatureSource;
|
|
|
+import org.geotools.api.feature.simple.SimpleFeature;
|
|
|
+import org.geotools.api.feature.type.AttributeDescriptor;
|
|
|
+import org.geotools.api.filter.Filter;
|
|
|
+import org.geotools.api.filter.sort.SortBy;
|
|
|
+import org.geotools.api.filter.sort.SortOrder;
|
|
|
+import org.geotools.data.elasticsearch.ElasticDataStore;
|
|
|
+import org.geotools.data.simple.SimpleFeatureCollection;
|
|
|
+import org.geotools.data.simple.SimpleFeatureIterator;
|
|
|
+import org.geotools.filter.FilterFactoryImpl;
|
|
|
+import org.geotools.filter.SortByImpl;
|
|
|
+import org.geotools.filter.text.ecql.ECQL;
|
|
|
+import org.springframework.stereotype.Service;
|
|
|
+
|
|
|
+import java.util.ArrayList;
|
|
|
+import java.util.HashMap;
|
|
|
+import java.util.List;
|
|
|
+import java.util.Map;
|
|
|
+
|
|
|
+@Slf4j
|
|
|
+@Service
|
|
|
+public class ElasticSearchServiceImpl implements IElasticSearchService {
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public PageResult esCqlQuery(EsDataSource esDataSource, Map<String, Object> paramMaps) {
|
|
|
+
|
|
|
+ List<Map<String, Object>> dataList = new ArrayList<Map<String, Object>>();
|
|
|
+
|
|
|
+ DataStore dataStore = null;
|
|
|
+
|
|
|
+ RestClient restClient = null;
|
|
|
+
|
|
|
+ String host = esDataSource.getUris();
|
|
|
+
|
|
|
+ String indexName = paramMaps.get("indexName").toString();
|
|
|
+
|
|
|
+ //查询记录总数
|
|
|
+ Long count =0l;
|
|
|
+
|
|
|
+ //当前页数
|
|
|
+ Long pageNumber= CommonConstant.PAGE_NUMBER;
|
|
|
+
|
|
|
+ //一页显示多少条记录
|
|
|
+ Long pageSize=CommonConstant.PAGE_SIZE;
|
|
|
+
|
|
|
+ //总页数
|
|
|
+ Long pageCount =0l;
|
|
|
+
|
|
|
+ try {
|
|
|
+
|
|
|
+ restClient = createRestClient(esDataSource);
|
|
|
+
|
|
|
+ dataStore = new ElasticDataStore(restClient, indexName);
|
|
|
+
|
|
|
+ if (dataStore != null) {
|
|
|
+
|
|
|
+ log.info("系统连接到位于:" + host + "的空间数据库" + indexName + "成功!");
|
|
|
+
|
|
|
+ //根据表名获取source
|
|
|
+ SimpleFeatureSource fSource=dataStore.getFeatureSource(indexName);
|
|
|
+
|
|
|
+
|
|
|
+ FilterFactoryImpl filterFactory = new FilterFactoryImpl();
|
|
|
+
|
|
|
+ //组转cql查询条件
|
|
|
+ Query query = new Query();
|
|
|
+
|
|
|
+ if(paramMaps.containsKey("cql")) {
|
|
|
+
|
|
|
+ String cql = paramMaps.get("cql").toString();
|
|
|
+
|
|
|
+ if(cql !=null && !"".equals(cql)){
|
|
|
+
|
|
|
+ Filter filter = ECQL.toFilter(cql);
|
|
|
+
|
|
|
+ query.setFilter(filter);
|
|
|
+
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ //组装排序参数
|
|
|
+ if(paramMaps.containsKey("sort")){
|
|
|
+
|
|
|
+ List<SortByImpl> sortByList = new ArrayList<SortByImpl>();
|
|
|
+
|
|
|
+ JSONObject sortJSONObject = JSONObject.parseObject(paramMaps.get("sort").toString());
|
|
|
+
|
|
|
+ for(String sortKey : sortJSONObject.keySet()) {
|
|
|
+
|
|
|
+ Object sortValue = sortJSONObject.get(sortKey);
|
|
|
+
|
|
|
+ if("asc".equals(sortValue.toString().toLowerCase())){
|
|
|
+
|
|
|
+ SortByImpl sb = new SortByImpl(filterFactory.property(sortKey),SortOrder.ASCENDING);
|
|
|
+
|
|
|
+ sortByList.add(sb);
|
|
|
+ }else{
|
|
|
+
|
|
|
+ SortByImpl sb = new SortByImpl(filterFactory.property(sortKey),SortOrder.DESCENDING);
|
|
|
+
|
|
|
+ sortByList.add(sb);
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ query.setSortBy(sortByList.toArray(new SortBy[sortByList.size()]));
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ //组装分页参数
|
|
|
+ if(paramMaps.containsKey("pageSize") && paramMaps.containsKey("pageNumber")){
|
|
|
+
|
|
|
+ //查询记录总数
|
|
|
+ count = Long.valueOf(fSource.getCount(query));
|
|
|
+
|
|
|
+ pageNumber = Long.valueOf(paramMaps.get("pageNumber").toString());
|
|
|
+
|
|
|
+ pageSize = Long.valueOf(paramMaps.get("pageSize").toString());
|
|
|
+
|
|
|
+ int startIndex = pageNumber.intValue()-1 <=0?0:pageNumber.intValue()-1;
|
|
|
+
|
|
|
+ //设置分页查询参数
|
|
|
+ query.setStartIndex(startIndex*pageSize.intValue());
|
|
|
+
|
|
|
+ query.setMaxFeatures(pageSize.intValue());
|
|
|
+
|
|
|
+ //计算总页数
|
|
|
+ if (count%pageSize ==0){
|
|
|
+
|
|
|
+ pageCount = count/pageSize;
|
|
|
+
|
|
|
+ }else{
|
|
|
+
|
|
|
+ pageCount = count/pageSize +1;
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+ SimpleFeatureCollection simpleFeatureCollection =fSource.getFeatures(query);
|
|
|
+
|
|
|
+ if(simpleFeatureCollection !=null){
|
|
|
+
|
|
|
+ // 获取当前矢量数据有哪些属性字段值
|
|
|
+ List<AttributeDescriptor> attributeList = simpleFeatureCollection.getSchema().getAttributeDescriptors();
|
|
|
+
|
|
|
+
|
|
|
+ SimpleFeatureIterator simpleFeatureIterator = simpleFeatureCollection.features();
|
|
|
+
|
|
|
+ while (simpleFeatureIterator.hasNext()){
|
|
|
+ //获取每一个要素
|
|
|
+ SimpleFeature simpleFeature = simpleFeatureIterator.next();
|
|
|
+
|
|
|
+ Map<String, Object> dataMap = new HashMap<String, Object>();
|
|
|
+
|
|
|
+ String id = simpleFeature.getID();
|
|
|
+ id = id.substring(id.lastIndexOf(".")+1);
|
|
|
+
|
|
|
+ dataMap.put("id",id);
|
|
|
+
|
|
|
+ for(AttributeDescriptor attribute:attributeList){
|
|
|
+
|
|
|
+// System.out.println(attribute.getLocalName() + ":" + simpleFeature.getAttribute(attribute.getLocalName()));
|
|
|
+
|
|
|
+ dataMap.put(attribute.getLocalName(),simpleFeature.getAttribute(attribute.getLocalName()));
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ dataList.add(dataMap);
|
|
|
+
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ simpleFeatureIterator.close();
|
|
|
+
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ if(!paramMaps.containsKey("pageSize") && !paramMaps.containsKey("pageNumber")){
|
|
|
+
|
|
|
+ count = Long.valueOf(dataList.size());
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ } else {
|
|
|
+
|
|
|
+ log.info("系统连接到位于:" + host + "的空间数据库" + indexName + "失败!请检查相关参数");
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+ }catch (Exception e) {
|
|
|
+
|
|
|
+ e.printStackTrace();
|
|
|
+ log.error("系统连接到位于:" + esDataSource.getUris()+ "的空间数据库失败!请检查相关参数");
|
|
|
+
|
|
|
+ }finally {
|
|
|
+
|
|
|
+ if(dataStore !=null){
|
|
|
+
|
|
|
+ dataStore.dispose();
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ if(restClient !=null){
|
|
|
+
|
|
|
+ try{
|
|
|
+
|
|
|
+ restClient.close();
|
|
|
+
|
|
|
+ }catch (Exception exception){
|
|
|
+
|
|
|
+ exception.printStackTrace();
|
|
|
+
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ int respCode = 0;
|
|
|
+
|
|
|
+ String respMsg = "操作成功";
|
|
|
+
|
|
|
+ if(count == 0){
|
|
|
+
|
|
|
+ respCode = 1;
|
|
|
+ respMsg = "操作成功,但没有查找到记录!";
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ return PageResult.<List<Map<String, Object>>>builder().datas(dataList).resp_code(respCode).resp_msg(respMsg).pageSize(pageSize).pageNumber(pageNumber).pageCount(pageCount).recordCount(count).build();
|
|
|
+ }
|
|
|
+
|
|
|
+ private RestClient createRestClient(EsDataSource esDataSource) throws Exception {
|
|
|
+
|
|
|
+ String[] esUris=esDataSource.getUris().split(",");
|
|
|
+
|
|
|
+ HttpHost[] httpHosts = new HttpHost[esUris.length];
|
|
|
+ //将地址转换为http主机数组,未配置端口则采用默认9200端口,配置了端口则用配置的端口
|
|
|
+ for (int i = 0; i < httpHosts.length; i++) {
|
|
|
+ if (!StringUtils.isEmpty(esUris[i])) {
|
|
|
+ if (esUris[i].contains(":")) {
|
|
|
+ String[] uris = esUris[i].split(":");
|
|
|
+ httpHosts[i] = new HttpHost(uris[0], Integer.parseInt(uris[1]), "http");
|
|
|
+ } else {
|
|
|
+ httpHosts[i] = new HttpHost(esUris[i], 9200, "http");
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
|
|
|
+ credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(esDataSource.getUsername(), esDataSource.getPassword()));
|
|
|
+
|
|
|
+ return RestClient.builder(httpHosts)
|
|
|
+ .setRequestConfigCallback(
|
|
|
+ requestConfigBuilder -> {
|
|
|
+ requestConfigBuilder.setConnectTimeout(esDataSource.getConnectionTimeout());
|
|
|
+ requestConfigBuilder.setSocketTimeout(esDataSource.getSocketTimeout());
|
|
|
+ requestConfigBuilder.setConnectionRequestTimeout(esDataSource.getConnectionRequestTimeout());
|
|
|
+ return requestConfigBuilder;
|
|
|
+ }
|
|
|
+ ).setHttpClientConfigCallback(
|
|
|
+ httpAsyncClientBuilder -> httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider).setMaxConnTotal(30).setMaxConnPerRoute(30)
|
|
|
+ ).build();
|
|
|
+
|
|
|
+ }
|
|
|
+}
|