package cn.com.taiji.service; import cn.com.taiji.constants.EsIndexConstants; import cn.com.taiji.entity.CountDTO; import cn.com.taiji.entity.FusionNewShipJsonDTO; import cn.com.taiji.entity.HlxOneLevelShipTrackDTO; import cn.com.taiji.entity.QueryVo; import cn.com.taiji.utils.DateUtils; import cn.com.taiji.vo.ExportVo; import com.alibaba.fastjson.JSON; import lombok.extern.slf4j.Slf4j; import org.apache.http.client.config.RequestConfig; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.client.HttpAsyncResponseConsumerFactory; import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.RestHighLevelClient; import org.elasticsearch.index.query.BoolQueryBuilder; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import java.io.IOException; import java.util.*; import java.util.concurrent.ConcurrentHashMap; /** * @author chenfangchao * @title: HlxOneLevelServiceImpl * @projectName es-track-analysis * @description: TODO * @date 2023/2/7 4:53 PM */ @Slf4j @Service public class HlxOneLevelServiceImpl implements HlxOneLevelService{ @Autowired private RestHighLevelClient restHighLevelClient; private static final RequestOptions COMMON_OPTIONS; static { RequestConfig requestConfig = RequestConfig.custom().setConnectTimeout(5000) .setSocketTimeout(60000).build(); RequestOptions.Builder builder = RequestOptions.DEFAULT.toBuilder(); builder.setRequestConfig(requestConfig).setHttpAsyncResponseConsumerFactory( new HttpAsyncResponseConsumerFactory.HeapBufferedResponseConsumerFactory(5 * 1024 * 1024 * 1024) ); COMMON_OPTIONS = builder.build(); } @Override public List hlx(QueryVo query) { Map result = new ConcurrentHashMap<>(); List hlxList = new LinkedList<>(); Map idenNewStringMap = new ConcurrentHashMap<>(); String time = DateUtils.format(new Date(), "yyyy-MM-dd"); SearchRequest searchRequest = new SearchRequest(EsIndexConstants.INDEX_SEAT_HLX_FUSION_SHIP + time); // 构建查询条件 BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery(); SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); boolQueryBuilder.filter(QueryBuilders.rangeQuery("time").gte(query.getStartTime()).lte(query.getEndTime())); searchSourceBuilder.size(1000000); searchSourceBuilder.trackTotalHits(true); searchSourceBuilder.query(boolQueryBuilder); searchRequest.source(searchSourceBuilder); try { SearchResponse search = restHighLevelClient.search(searchRequest, COMMON_OPTIONS); SearchHit[] hits = search.getHits().getHits(); log.info("当前查询到的海兰信数据为: " + search.getHits().getTotalHits().value + "条"); for (SearchHit hit : hits) { String sourceAsString = hit.getSourceAsString(); HlxOneLevelShipTrackDTO object = JSON.parseObject(sourceAsString, HlxOneLevelShipTrackDTO.class); //存储海兰信 hlxList.add(object.getTargetId()); } } catch (IOException e) { e.printStackTrace(); } //融合 SearchRequest fusionSearchRequest = new SearchRequest((EsIndexConstants.INDEX_SEAT_REALTIMETRAJECTORY + time)); // 构建查询条件 BoolQueryBuilder fusionBoolQueryBuilder = QueryBuilders.boolQuery(); SearchSourceBuilder fusionSearchSourceBuilder = new SearchSourceBuilder(); fusionBoolQueryBuilder.filter(QueryBuilders.rangeQuery("mergeTime").gte(query.getStartTime()).lte(query.getEndTime())); fusionSearchSourceBuilder.size(1000000); fusionSearchSourceBuilder.trackTotalHits(true); fusionSearchSourceBuilder.query(fusionBoolQueryBuilder); fusionSearchRequest.source(fusionSearchSourceBuilder); try { SearchResponse fusionSearch = restHighLevelClient.search(fusionSearchRequest, COMMON_OPTIONS); SearchHit[] hits = fusionSearch.getHits().getHits(); for (SearchHit hit : hits) { Map sourceAsMap = hit.getSourceAsMap(); String mergeTarget = (String) sourceAsMap.get("mergeTarget"); String targetSource = (String) sourceAsMap.get("targetSource"); List targetSourceList = JSON.parseArray(targetSource, FusionNewShipJsonDTO.class); for (FusionNewShipJsonDTO item : targetSourceList) { if (null != item) { if (null != item.getType() && "HLX_AIS_RADAR".equals(item.getType())) { if(idenNewStringMap.get(item.getTrackId()) != null){ CountDTO countDTO = idenNewStringMap.get(item.getTrackId()); countDTO.setCount(countDTO.getCount()+1); List data = countDTO.getData(); if(!data.contains(mergeTarget)){ data.add(mergeTarget); countDTO.setData(data); idenNewStringMap.put(item.getTrackId(),countDTO); } }else { ArrayList list = new ArrayList<>(); list.add(mergeTarget); idenNewStringMap.put(item.getTrackId(),new CountDTO(1,list)); } } } } } log.info("当前查询到的海兰信船舶数据为: " + fusionSearch.getHits().getTotalHits().value + "条"); } catch (IOException e) { e.printStackTrace(); } //遍历海兰信数据、找到在一条海兰信数据在融合中有两个不同mergeTarget的融合数据 for (String key : hlxList) { //计算key在map中的数量,大于1则拿出map中的mergeTarget CountDTO countDTO = idenNewStringMap.get(key); if(null != countDTO){ if(null != countDTO.getCount()){ if(countDTO.getCount() > 1){ List data = countDTO.getData(); if(null != data && 0 < data.size()){ for (String item : data) { //mergeTarget result.put(item,countDTO.getCount()); } } } } } } log.info("海兰信船舶在融合中有多个融合匹配号的有: " + result.size() + "条"); //导出融合数据 SearchRequest exportFusionSearchRequest = new SearchRequest(EsIndexConstants.INDEX_SEAT_REALTIMETRAJECTORY + time); SearchSourceBuilder exportSearchSourceBuilder = new SearchSourceBuilder(); exportSearchSourceBuilder.size(1000000); exportSearchSourceBuilder.trackTotalHits(true); exportSearchSourceBuilder.query(QueryBuilders.termsQuery("mergeTarget",result.keySet())); exportFusionSearchRequest.source(exportSearchSourceBuilder); List data = new ArrayList<>(); try { SearchResponse response = restHighLevelClient.search(exportFusionSearchRequest, COMMON_OPTIONS); SearchHit[] hits = response.getHits().getHits(); for (SearchHit hit : hits) { Map sourceAsMap = hit.getSourceAsMap(); String mergeTarget = (String) sourceAsMap.get("mergeTarget"); String targetSource = (String) sourceAsMap.get("targetSource"); String mergeTime = (String) sourceAsMap.get("mergeTime"); Integer count = result.get(mergeTarget); ExportVo exportVo = new ExportVo(); exportVo.setMergeTarget(mergeTarget); exportVo.setTargetSource(targetSource); exportVo.setEndTime(mergeTime); exportVo.setCount(count); List targetSourceList = JSON.parseArray(targetSource, FusionNewShipJsonDTO.class); for (FusionNewShipJsonDTO item : targetSourceList) { if (null != item) { if (null != item.getType() && "HLX_AIS_RADAR".equals(item.getType())) { exportVo.setStartTime(item.getSourceTime()); } } } data.add(exportVo); } } catch (IOException e) { e.printStackTrace(); } return data; } }