package cn.com.taiji.service;

import cn.com.taiji.constants.EsIndexConstants;
import cn.com.taiji.entity.CountDTO;
import cn.com.taiji.entity.FusionNewShipJsonDTO;
import cn.com.taiji.entity.HlxOneLevelShipTrackDTO;
import cn.com.taiji.entity.QueryVo;
import cn.com.taiji.utils.DateUtils;
import cn.com.taiji.vo.ExportVo;
import com.alibaba.fastjson.JSON;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.io.IOException;
import java.util.*;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * Cross-checks HLX (Hailanxin) ship tracks against the fused real-time trajectory index and
 * exports the fused records whose HLX track id was seen more than once in the fusion data.
 *
 * <p>All three searches run against today's daily indices (index prefix + {@code yyyy-MM-dd}).
 *
 * <p>NOTE(review): none of the searches sets an explicit size, so each one returns only the
 * default first page of hits from Elasticsearch — confirm whether a larger size or scrolling
 * is required for this analysis.
 *
 * @author chenfangchao
 * @date 2023/2/7 4:53 PM
 */
@Service
public class HlxOneLevelServiceImpl implements HlxOneLevelService {

    private static final Logger LOG = Logger.getLogger(HlxOneLevelServiceImpl.class.getName());

    /** Source type marking an entry of {@code targetSource} as an HLX track. */
    private static final String HLX_TYPE = "HLX_AIS_RADAR";
    private static final String FIELD_MERGE_TARGET = "merge_target";
    private static final String FIELD_TARGET_SOURCE = "targetSource";

    @Autowired
    private RestHighLevelClient restHighLevelClient;

    /**
     * Runs the HLX / fusion comparison for the given time window.
     *
     * <p>A failed search is logged and treated as "no hits", so the method still returns
     * (possibly empty) results instead of propagating the I/O error.
     *
     * @param query supplies the start and end time used as range bounds on both indices
     * @return one {@link ExportVo} per fused record matching the selected merge targets;
     *         never {@code null}
     */
    @Override
    public List<ExportVo> hlx(QueryVo query) {
        String day = DateUtils.format(new Date(), "yyyy-MM-dd");
        String hlxIndex = EsIndexConstants.INDEX_SEAT_HLX_FUSION_SHIP + day;
        String fusionIndex = EsIndexConstants.INDEX_SEAT_REALTIMETRAJECTORY + day;

        Set<String> hlxTargetIds = loadHlxTargetIds(hlxIndex, query);
        Map<String, CountDTO> fusionByTrackId = loadFusionCounts(fusionIndex, query);

        // Walk the HLX targets and collect the merge targets of every track id that occurs
        // more than once in the fusion data.
        // NOTE(review): count is the number of occurrences, not the number of distinct merge
        // targets — confirm that "count > 1" (rather than "data.size() > 1") is intended.
        Set<String> mergeTargets = new LinkedHashSet<>();
        for (String targetId : hlxTargetIds) {
            CountDTO counted = fusionByTrackId.get(targetId);
            // An HLX target without any fusion entry is simply skipped.
            if (counted != null && counted.getCount() > 1) {
                mergeTargets.addAll(counted.getData());
            }
        }
        // Fused documents without a merge_target contribute null; it cannot be queried.
        mergeTargets.remove(null);

        return exportFusion(fusionIndex, mergeTargets);
    }

    /** Collects the distinct target ids of the HLX tracks inside the query's time window. */
    private Set<String> loadHlxTargetIds(String index, QueryVo query) {
        Set<String> targetIds = new LinkedHashSet<>();
        try {
            for (SearchHit hit : searchTimeRange(index, "time", query)) {
                HlxOneLevelShipTrackDTO track =
                        JSON.parseObject(hit.getSourceAsString(), HlxOneLevelShipTrackDTO.class);
                if (track != null) {
                    targetIds.add(track.getTargetId());
                }
            }
        } catch (IOException e) {
            LOG.log(Level.SEVERE, "HLX search failed on index " + index, e);
        }
        return targetIds;
    }

    /**
     * Counts, per HLX track id, how often it appears in the fused records of the time window
     * and remembers the distinct merge targets it appeared under.
     */
    private Map<String, CountDTO> loadFusionCounts(String index, QueryVo query) {
        // Keyed by value equality: track ids parsed from different documents are distinct
        // String instances, so an identity-based map would never find an existing entry.
        Map<String, CountDTO> counts = new HashMap<>();
        try {
            for (SearchHit hit : searchTimeRange(index, "mergeTime", query)) {
                Map<String, Object> source = hit.getSourceAsMap();
                if (source == null) {
                    continue;
                }
                String mergeTarget = (String) source.get(FIELD_MERGE_TARGET);
                String targetSource = (String) source.get(FIELD_TARGET_SOURCE);
                List<FusionNewShipJsonDTO> items =
                        JSON.parseArray(targetSource, FusionNewShipJsonDTO.class);
                if (items == null) {
                    // parseArray yields null when targetSource is missing.
                    continue;
                }
                for (FusionNewShipJsonDTO item : items) {
                    if (item == null || !HLX_TYPE.equals(item.getType())) {
                        continue;
                    }
                    CountDTO counted = counts.get(item.getTrackId());
                    if (counted == null) {
                        ArrayList<String> targets = new ArrayList<>();
                        targets.add(mergeTarget);
                        counts.put(item.getTrackId(), new CountDTO(1, targets));
                    } else {
                        counted.setCount(counted.getCount() + 1);
                        List<String> targets = counted.getData();
                        if (!targets.contains(mergeTarget)) {
                            targets.add(mergeTarget);
                            counted.setData(targets);
                        }
                    }
                }
            }
        } catch (IOException e) {
            LOG.log(Level.SEVERE, "Fusion search failed on index " + index, e);
        }
        return counts;
    }

    /** Fetches the fused records for the given merge targets and maps them to export rows. */
    private List<ExportVo> exportFusion(String index, Collection<String> mergeTargets) {
        List<ExportVo> rows = new ArrayList<>();
        if (mergeTargets.isEmpty()) {
            // Nothing selected: a terms query over no values cannot match anything.
            return rows;
        }

        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
        // The collected values come from the merge_target field, so that is the field to match.
        sourceBuilder.query(QueryBuilders.termsQuery(FIELD_MERGE_TARGET, mergeTargets));
        SearchRequest request = new SearchRequest(index);
        request.source(sourceBuilder);

        try {
            SearchResponse response = restHighLevelClient.search(request, RequestOptions.DEFAULT);
            for (SearchHit hit : response.getHits().getHits()) {
                Map<String, Object> source = hit.getSourceAsMap();
                if (source == null) {
                    continue;
                }
                ExportVo row = new ExportVo();
                row.setMergeTarget((String) source.get(FIELD_MERGE_TARGET));
                // NOTE(review): the raw targetSource JSON is stored in startTime, exactly as
                // before — confirm this is the intended ExportVo field.
                row.setStartTime((String) source.get(FIELD_TARGET_SOURCE));
                rows.add(row);
            }
        } catch (IOException e) {
            LOG.log(Level.SEVERE, "Fusion export search failed on index " + index, e);
        }
        return rows;
    }

    /** Searches {@code index} for documents whose {@code timeField} lies in the query window. */
    private SearchHit[] searchTimeRange(String index, String timeField, QueryVo query)
            throws IOException {
        BoolQueryBuilder filter = QueryBuilders.boolQuery();
        filter.filter(QueryBuilders.rangeQuery(timeField)
                .gte(query.getStartTime())
                .lte(query.getEndTime()));

        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
        sourceBuilder.query(filter);

        SearchRequest request = new SearchRequest(index);
        request.source(sourceBuilder);

        SearchResponse response = restHighLevelClient.search(request, RequestOptions.DEFAULT);
        return response.getHits().getHits();
    }
}