|
@@ -9,19 +9,24 @@ import cn.com.taiji.service.TianaoService;
|
|
|
import cn.com.taiji.utils.DateUtils;
|
|
|
import cn.com.taiji.vo.ExportVo;
|
|
|
import com.alibaba.fastjson.JSON;
|
|
|
+import lombok.extern.slf4j.Slf4j;
|
|
|
+import org.apache.http.client.config.RequestConfig;
|
|
|
import org.elasticsearch.action.search.SearchRequest;
|
|
|
import org.elasticsearch.action.search.SearchResponse;
|
|
|
+import org.elasticsearch.client.HttpAsyncResponseConsumerFactory;
|
|
|
import org.elasticsearch.client.RequestOptions;
|
|
|
import org.elasticsearch.client.RestHighLevelClient;
|
|
|
import org.elasticsearch.index.query.BoolQueryBuilder;
|
|
|
import org.elasticsearch.index.query.QueryBuilders;
|
|
|
import org.elasticsearch.search.SearchHit;
|
|
|
import org.elasticsearch.search.builder.SearchSourceBuilder;
|
|
|
+import org.elasticsearch.search.sort.SortOrder;
|
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
|
import org.springframework.stereotype.Service;
|
|
|
|
|
|
import java.io.IOException;
|
|
|
import java.util.*;
|
|
|
+import java.util.concurrent.ConcurrentHashMap;
|
|
|
|
|
|
/**
|
|
|
* @author chenfangchao
|
|
@@ -30,36 +35,56 @@ import java.util.*;
|
|
|
* @description: TODO
|
|
|
* @date 2023/2/6 5:53 PM
|
|
|
*/
|
|
|
+@Slf4j
|
|
|
@Service
|
|
|
public class TianaoServiceImpl implements TianaoService {
|
|
|
|
|
|
@Autowired
|
|
|
private RestHighLevelClient restHighLevelClient;
|
|
|
|
|
|
+ private static final RequestOptions COMMON_OPTIONS;
|
|
|
+
|
|
|
+ static {
|
|
|
+ RequestConfig requestConfig = RequestConfig.custom().setConnectTimeout(5000)
|
|
|
+ .setSocketTimeout(60000).build();
|
|
|
+ RequestOptions.Builder builder = RequestOptions.DEFAULT.toBuilder();
|
|
|
+ builder.setRequestConfig(requestConfig).setHttpAsyncResponseConsumerFactory(
|
|
|
+ new HttpAsyncResponseConsumerFactory.HeapBufferedResponseConsumerFactory(5 * 1024 * 1024 * 1024)
|
|
|
+ );
|
|
|
+
|
|
|
+ COMMON_OPTIONS = builder.build();
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
|
|
|
@Override
|
|
|
public List<ExportVo> tianaRader(QueryVo query) {
|
|
|
- ArrayList<String> result = new ArrayList<>();
|
|
|
- List<String> tianaoRadarList = new ArrayList<>();
|
|
|
- IdentityHashMap<String, CountDTO> idenNewStringMap = new IdentityHashMap<>();
|
|
|
+ Set<String> result = new HashSet<>();
|
|
|
+ List<String> tianaoRadarList = new LinkedList<>();
|
|
|
+ Map<String, CountDTO> idenNewStringMap = new ConcurrentHashMap<>();
|
|
|
String time = DateUtils.format(new Date(), "yyyy-MM-dd");
|
|
|
+
|
|
|
SearchRequest searchRequest = new SearchRequest(EsIndexConstants.INDEX_SEAT_TRACK_RADAR + time);
|
|
|
// 构建查询条件
|
|
|
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
|
|
|
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
|
|
|
- boolQueryBuilder.filter(QueryBuilders.rangeQuery("fusionTime").gte(query.getStartTime()).lte(query.getEndTime()));
|
|
|
+ boolQueryBuilder.filter(QueryBuilders.rangeQuery("collectTime").gte(query.getStartTime()).lte(query.getEndTime()));
|
|
|
+ searchSourceBuilder.sort("collectTime", SortOrder.ASC);
|
|
|
searchSourceBuilder.size(1000000);
|
|
|
+ searchSourceBuilder.trackTotalHits(true);
|
|
|
searchSourceBuilder.query(boolQueryBuilder);
|
|
|
searchRequest.source(searchSourceBuilder);
|
|
|
try {
|
|
|
- SearchResponse search = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
|
|
|
+ SearchResponse search = restHighLevelClient.search(searchRequest, COMMON_OPTIONS);
|
|
|
SearchHit[] hits = search.getHits().getHits();
|
|
|
for (SearchHit hit : hits) {
|
|
|
- String sourceAsString = hit.getSourceAsString();
|
|
|
- TianaoRadar object = JSON.parseObject(sourceAsString, TianaoRadar.class);
|
|
|
+ Map<String, Object> sourceAsMap = hit.getSourceAsMap();
|
|
|
+ String fusionBatchNum = (String) sourceAsMap.get("fusionBatchNum");
|
|
|
//存储天奥
|
|
|
- tianaoRadarList.add(object.getFusionBatchNum());
|
|
|
+ tianaoRadarList.add(fusionBatchNum);
|
|
|
}
|
|
|
+ log.info("当前查询到的天奥数据为: " + search.getHits().getTotalHits().value + "条" + tianaoRadarList.size());
|
|
|
} catch (IOException e) {
|
|
|
e.printStackTrace();
|
|
|
}
|
|
@@ -69,16 +94,19 @@ public class TianaoServiceImpl implements TianaoService {
|
|
|
// 构建查询条件
|
|
|
BoolQueryBuilder fusionBoolQueryBuilder = QueryBuilders.boolQuery();
|
|
|
SearchSourceBuilder fusionSearchSourceBuilder = new SearchSourceBuilder();
|
|
|
- boolQueryBuilder.filter(QueryBuilders.rangeQuery("mergeTime").gte(query.getStartTime()).lte(query.getEndTime()));
|
|
|
+ fusionBoolQueryBuilder.filter(QueryBuilders.termsQuery("mergeType","RADAR"));
|
|
|
+ fusionBoolQueryBuilder.filter(QueryBuilders.rangeQuery("mergeTime").gte(query.getStartTime()).lte(query.getEndTime()));
|
|
|
+ fusionSearchSourceBuilder.sort("mergeTime",SortOrder.ASC);
|
|
|
fusionSearchSourceBuilder.size(1000000);
|
|
|
+ fusionSearchSourceBuilder.trackTotalHits(true);
|
|
|
fusionSearchSourceBuilder.query(fusionBoolQueryBuilder);
|
|
|
fusionSearchRequest.source(fusionSearchSourceBuilder);
|
|
|
try {
|
|
|
- SearchResponse fusionSearch = restHighLevelClient.search(fusionSearchRequest, RequestOptions.DEFAULT);
|
|
|
+ SearchResponse fusionSearch = restHighLevelClient.search(fusionSearchRequest, COMMON_OPTIONS);
|
|
|
SearchHit[] hits = fusionSearch.getHits().getHits();
|
|
|
for (SearchHit hit : hits) {
|
|
|
Map<String, Object> sourceAsMap = hit.getSourceAsMap();
|
|
|
- String mergeTarget = (String) sourceAsMap.get("merge_target");
|
|
|
+ String mergeTarget = (String) sourceAsMap.get("mergeTarget");
|
|
|
String targetSource = (String) sourceAsMap.get("targetSource");
|
|
|
List<FusionNewShipJsonDTO> targetSourceList = JSON.parseArray(targetSource, FusionNewShipJsonDTO.class);
|
|
|
for (FusionNewShipJsonDTO item : targetSourceList) {
|
|
@@ -91,8 +119,8 @@ public class TianaoServiceImpl implements TianaoService {
|
|
|
if(!data.contains(mergeTarget)){
|
|
|
data.add(mergeTarget);
|
|
|
countDTO.setData(data);
|
|
|
+ idenNewStringMap.put(item.getTrackId(),countDTO);
|
|
|
}
|
|
|
- idenNewStringMap.put(item.getTrackId(),countDTO);
|
|
|
}else {
|
|
|
ArrayList<String> list = new ArrayList<>();
|
|
|
list.add(mergeTarget);
|
|
@@ -102,31 +130,44 @@ public class TianaoServiceImpl implements TianaoService {
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
+ log.info("当前查询到的融合船舶数据为: " + fusionSearch.getHits().getTotalHits().value + "条");
|
|
|
} catch (IOException e) {
|
|
|
e.printStackTrace();
|
|
|
}
|
|
|
+
|
|
|
//遍历天奥数据、找到在一条天奥数据在融合中有两个不同mergeTarget的融合数据
|
|
|
for (String key : tianaoRadarList) {
|
|
|
//计算key在map中的数量,大于1则拿出map中的mergeTarget
|
|
|
CountDTO countDTO = idenNewStringMap.get(key);
|
|
|
- if(countDTO.getCount() > 1){
|
|
|
- List<String> data = countDTO.getData();
|
|
|
- result.addAll(data);
|
|
|
+ if(null != countDTO) {
|
|
|
+ if (null != countDTO.getCount()) {
|
|
|
+ if(countDTO.getCount() > 1){
|
|
|
+ List<String> data = countDTO.getData();
|
|
|
+ if(null != data && 0 < data.size()){
|
|
|
+ for (String datum : data) {
|
|
|
+ result.add(datum);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
+ log.info("天奥船舶在融合中有多个融合匹配号的有: " + result.size() + "条");
|
|
|
+
|
|
|
//导出融合数据
|
|
|
SearchRequest exportFusionSearchRequest = new SearchRequest(EsIndexConstants.INDEX_SEAT_REALTIMETRAJECTORY + time);
|
|
|
SearchSourceBuilder exportSearchSourceBuilder = new SearchSourceBuilder();
|
|
|
exportSearchSourceBuilder.size(1000000);
|
|
|
- exportSearchSourceBuilder.query(QueryBuilders.termsQuery("mergeTime",result));
|
|
|
+ exportSearchSourceBuilder.trackTotalHits(true);
|
|
|
+ exportSearchSourceBuilder.query(QueryBuilders.termsQuery("mergeTarget",result));
|
|
|
exportFusionSearchRequest.source(exportSearchSourceBuilder);
|
|
|
List<ExportVo> data = new ArrayList<>();
|
|
|
try {
|
|
|
- SearchResponse response = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
|
|
|
+ SearchResponse response = restHighLevelClient.search(exportFusionSearchRequest,COMMON_OPTIONS);
|
|
|
SearchHit[] hits = response.getHits().getHits();
|
|
|
for (SearchHit hit : hits) {
|
|
|
Map<String, Object> sourceAsMap = hit.getSourceAsMap();
|
|
|
- String mergeTarget = (String) sourceAsMap.get("merge_target");
|
|
|
+ String mergeTarget = (String) sourceAsMap.get("mergeTarget");
|
|
|
String targetSource = (String) sourceAsMap.get("targetSource");
|
|
|
ExportVo tianaoVo = new ExportVo();
|
|
|
tianaoVo.setMergeTarget(mergeTarget);
|