HlxOneLevelServiceImpl.java 9.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191
  1. package cn.com.taiji.service;
  2. import cn.com.taiji.constants.EsIndexConstants;
  3. import cn.com.taiji.entity.CountDTO;
  4. import cn.com.taiji.entity.FusionNewShipJsonDTO;
  5. import cn.com.taiji.entity.HlxOneLevelShipTrackDTO;
  6. import cn.com.taiji.entity.QueryVo;
  7. import cn.com.taiji.utils.DateUtils;
  8. import cn.com.taiji.vo.ExportVo;
  9. import com.alibaba.fastjson.JSON;
  10. import lombok.extern.slf4j.Slf4j;
  11. import org.apache.http.client.config.RequestConfig;
  12. import org.elasticsearch.action.search.SearchRequest;
  13. import org.elasticsearch.action.search.SearchResponse;
  14. import org.elasticsearch.client.HttpAsyncResponseConsumerFactory;
  15. import org.elasticsearch.client.RequestOptions;
  16. import org.elasticsearch.client.RestHighLevelClient;
  17. import org.elasticsearch.index.query.BoolQueryBuilder;
  18. import org.elasticsearch.index.query.QueryBuilders;
  19. import org.elasticsearch.search.SearchHit;
  20. import org.elasticsearch.search.builder.SearchSourceBuilder;
  21. import org.springframework.beans.factory.annotation.Autowired;
  22. import org.springframework.stereotype.Service;
  23. import java.io.IOException;
  24. import java.util.*;
  25. import java.util.concurrent.ConcurrentHashMap;
  26. /**
  27. * @author chenfangchao
  28. * @title: HlxOneLevelServiceImpl
  29. * @projectName es-track-analysis
  30. * @description: TODO
  31. * @date 2023/2/7 4:53 PM
  32. */
  33. @Slf4j
  34. @Service
  35. public class HlxOneLevelServiceImpl implements HlxOneLevelService{
  36. @Autowired
  37. private RestHighLevelClient restHighLevelClient;
  38. private static final RequestOptions COMMON_OPTIONS;
  39. static {
  40. RequestConfig requestConfig = RequestConfig.custom().setConnectTimeout(5000)
  41. .setSocketTimeout(60000).build();
  42. RequestOptions.Builder builder = RequestOptions.DEFAULT.toBuilder();
  43. builder.setRequestConfig(requestConfig).setHttpAsyncResponseConsumerFactory(
  44. new HttpAsyncResponseConsumerFactory.HeapBufferedResponseConsumerFactory(5 * 1024 * 1024 * 1024)
  45. );
  46. COMMON_OPTIONS = builder.build();
  47. }
  48. @Override
  49. public List<ExportVo> hlx(QueryVo query) {
  50. Map<String,CountDTO> result = new ConcurrentHashMap<>();
  51. List<String> hlxList = new LinkedList<>();
  52. Map<String, CountDTO> idenNewStringMap = new ConcurrentHashMap<>();
  53. String time = DateUtils.format(new Date(), "yyyy-MM-dd");
  54. SearchRequest searchRequest = new SearchRequest(EsIndexConstants.INDEX_SEAT_HLX_FUSION_SHIP + time);
  55. // 构建查询条件
  56. BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
  57. SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
  58. boolQueryBuilder.filter(QueryBuilders.rangeQuery("time").gte(query.getStartTime()).lte(query.getEndTime()));
  59. searchSourceBuilder.size(1000000);
  60. searchSourceBuilder.trackTotalHits(true);
  61. searchSourceBuilder.query(boolQueryBuilder);
  62. searchRequest.source(searchSourceBuilder);
  63. try {
  64. SearchResponse search = restHighLevelClient.search(searchRequest, COMMON_OPTIONS);
  65. SearchHit[] hits = search.getHits().getHits();
  66. log.info("当前查询到的海兰信数据为: " + search.getHits().getTotalHits().value + "条");
  67. for (SearchHit hit : hits) {
  68. String sourceAsString = hit.getSourceAsString();
  69. HlxOneLevelShipTrackDTO object = JSON.parseObject(sourceAsString, HlxOneLevelShipTrackDTO.class);
  70. //存储海兰信
  71. hlxList.add(object.getTargetId());
  72. }
  73. } catch (IOException e) {
  74. e.printStackTrace();
  75. }
  76. //融合
  77. SearchRequest fusionSearchRequest = new SearchRequest((EsIndexConstants.INDEX_SEAT_REALTIMETRAJECTORY + time));
  78. // 构建查询条件
  79. BoolQueryBuilder fusionBoolQueryBuilder = QueryBuilders.boolQuery();
  80. SearchSourceBuilder fusionSearchSourceBuilder = new SearchSourceBuilder();
  81. fusionBoolQueryBuilder.filter(QueryBuilders.rangeQuery("mergeTime").gte(query.getStartTime()).lte(query.getEndTime()));
  82. fusionSearchSourceBuilder.size(1000000);
  83. fusionSearchSourceBuilder.trackTotalHits(true);
  84. fusionSearchSourceBuilder.query(fusionBoolQueryBuilder);
  85. fusionSearchRequest.source(fusionSearchSourceBuilder);
  86. try {
  87. SearchResponse fusionSearch = restHighLevelClient.search(fusionSearchRequest, COMMON_OPTIONS);
  88. SearchHit[] hits = fusionSearch.getHits().getHits();
  89. for (SearchHit hit : hits) {
  90. Map<String, Object> sourceAsMap = hit.getSourceAsMap();
  91. String mergeTarget = (String) sourceAsMap.get("mergeTarget");
  92. String targetSource = (String) sourceAsMap.get("targetSource");
  93. List<FusionNewShipJsonDTO> targetSourceList = JSON.parseArray(targetSource, FusionNewShipJsonDTO.class);
  94. for (FusionNewShipJsonDTO item : targetSourceList) {
  95. if (null != item) {
  96. if (null != item.getType() && "HLX_AIS_RADAR".equals(item.getType())) {
  97. if(idenNewStringMap.get(item.getTrackId()) != null){
  98. CountDTO countDTO = idenNewStringMap.get(item.getTrackId());
  99. countDTO.setCount(countDTO.getCount()+1);
  100. List<String> data = countDTO.getData();
  101. if(!data.contains(mergeTarget)){
  102. data.add(mergeTarget);
  103. countDTO.setData(data);
  104. idenNewStringMap.put(item.getTrackId(),countDTO);
  105. }
  106. }else {
  107. ArrayList<String> list = new ArrayList<>();
  108. list.add(mergeTarget);
  109. idenNewStringMap.put(item.getTrackId(),new CountDTO(1,list));
  110. }
  111. }
  112. }
  113. }
  114. }
  115. log.info("当前查询到的海兰信船舶数据为: " + fusionSearch.getHits().getTotalHits().value + "条");
  116. } catch (IOException e) {
  117. e.printStackTrace();
  118. }
  119. //遍历海兰信数据、找到在一条海兰信数据在融合中有两个不同mergeTarget的融合数据
  120. for (String key : hlxList) {
  121. //计算key在map中的数量,大于1则拿出map中的mergeTarget
  122. CountDTO countDTO = idenNewStringMap.get(key);
  123. if(null != countDTO){
  124. if(null != countDTO.getCount()){
  125. if(countDTO.getCount() > 1){
  126. List<String> data = countDTO.getData();
  127. if(null != data && 0 < data.size()){
  128. for (String item : data) {
  129. //mergeTarget
  130. result.put(item,countDTO);
  131. }
  132. }
  133. }
  134. }
  135. }
  136. }
  137. log.info("海兰信船舶在融合中有多个融合匹配号的有: " + result.size() + "条");
  138. //导出融合数据
  139. SearchRequest exportFusionSearchRequest = new SearchRequest(EsIndexConstants.INDEX_SEAT_REALTIMETRAJECTORY + time);
  140. SearchSourceBuilder exportSearchSourceBuilder = new SearchSourceBuilder();
  141. exportSearchSourceBuilder.size(50000);
  142. exportSearchSourceBuilder.trackTotalHits(true);
  143. exportSearchSourceBuilder.query(QueryBuilders.termsQuery("mergeTarget",result.keySet()));
  144. exportFusionSearchRequest.source(exportSearchSourceBuilder);
  145. List<ExportVo> data = new ArrayList<>();
  146. try {
  147. SearchResponse response = restHighLevelClient.search(exportFusionSearchRequest, COMMON_OPTIONS);
  148. SearchHit[] hits = response.getHits().getHits();
  149. for (SearchHit hit : hits) {
  150. Map<String, Object> sourceAsMap = hit.getSourceAsMap();
  151. String mergeTarget = (String) sourceAsMap.get("mergeTarget");
  152. String targetSource = (String) sourceAsMap.get("targetSource");
  153. String mergeTime = (String) sourceAsMap.get("mergeTime");
  154. CountDTO countDTO = result.get(mergeTarget);
  155. if(null != countDTO.getData() && 1 < countDTO.getData().size()){
  156. ExportVo exportVo = new ExportVo();
  157. exportVo.setMergeTarget(countDTO.getData().toString());
  158. exportVo.setTargetSource(targetSource);
  159. exportVo.setEndTime(mergeTime);
  160. exportVo.setCount(countDTO.getData().size());
  161. List<FusionNewShipJsonDTO> targetSourceList = JSON.parseArray(targetSource, FusionNewShipJsonDTO.class);
  162. for (FusionNewShipJsonDTO item : targetSourceList) {
  163. if (null != item) {
  164. if (null != item.getType() && "HLX_AIS_RADAR".equals(item.getType())) {
  165. exportVo.setStartTime(item.getTime());
  166. }
  167. }
  168. }
  169. data.add(exportVo);
  170. }
  171. }
  172. } catch (IOException e) {
  173. e.printStackTrace();
  174. }
  175. return data;
  176. }
  177. }