123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191 |
- package cn.com.taiji.service;
- import cn.com.taiji.constants.EsIndexConstants;
- import cn.com.taiji.entity.CountDTO;
- import cn.com.taiji.entity.FusionNewShipJsonDTO;
- import cn.com.taiji.entity.HlxOneLevelShipTrackDTO;
- import cn.com.taiji.entity.QueryVo;
- import cn.com.taiji.utils.DateUtils;
- import cn.com.taiji.vo.ExportVo;
- import com.alibaba.fastjson.JSON;
- import lombok.extern.slf4j.Slf4j;
- import org.apache.http.client.config.RequestConfig;
- import org.elasticsearch.action.search.SearchRequest;
- import org.elasticsearch.action.search.SearchResponse;
- import org.elasticsearch.client.HttpAsyncResponseConsumerFactory;
- import org.elasticsearch.client.RequestOptions;
- import org.elasticsearch.client.RestHighLevelClient;
- import org.elasticsearch.index.query.BoolQueryBuilder;
- import org.elasticsearch.index.query.QueryBuilders;
- import org.elasticsearch.search.SearchHit;
- import org.elasticsearch.search.builder.SearchSourceBuilder;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.stereotype.Service;
- import java.io.IOException;
- import java.util.*;
- import java.util.concurrent.ConcurrentHashMap;
- /**
- * @author chenfangchao
- * @title: HlxOneLevelServiceImpl
- * @projectName es-track-analysis
- * @description: TODO
- * @date 2023/2/7 4:53 PM
- */
- @Slf4j
- @Service
- public class HlxOneLevelServiceImpl implements HlxOneLevelService{
- @Autowired
- private RestHighLevelClient restHighLevelClient;
- private static final RequestOptions COMMON_OPTIONS;
- static {
- RequestConfig requestConfig = RequestConfig.custom().setConnectTimeout(5000)
- .setSocketTimeout(60000).build();
- RequestOptions.Builder builder = RequestOptions.DEFAULT.toBuilder();
- builder.setRequestConfig(requestConfig).setHttpAsyncResponseConsumerFactory(
- new HttpAsyncResponseConsumerFactory.HeapBufferedResponseConsumerFactory(5 * 1024 * 1024 * 1024)
- );
- COMMON_OPTIONS = builder.build();
- }
- @Override
- public List<ExportVo> hlx(QueryVo query) {
- Map<String,CountDTO> result = new ConcurrentHashMap<>();
- List<String> hlxList = new LinkedList<>();
- Map<String, CountDTO> idenNewStringMap = new ConcurrentHashMap<>();
- String time = DateUtils.format(new Date(), "yyyy-MM-dd");
- SearchRequest searchRequest = new SearchRequest(EsIndexConstants.INDEX_SEAT_HLX_FUSION_SHIP + time);
- // 构建查询条件
- BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
- SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
- boolQueryBuilder.filter(QueryBuilders.rangeQuery("time").gte(query.getStartTime()).lte(query.getEndTime()));
- searchSourceBuilder.size(1000000);
- searchSourceBuilder.trackTotalHits(true);
- searchSourceBuilder.query(boolQueryBuilder);
- searchRequest.source(searchSourceBuilder);
- try {
- SearchResponse search = restHighLevelClient.search(searchRequest, COMMON_OPTIONS);
- SearchHit[] hits = search.getHits().getHits();
- log.info("当前查询到的海兰信数据为: " + search.getHits().getTotalHits().value + "条");
- for (SearchHit hit : hits) {
- String sourceAsString = hit.getSourceAsString();
- HlxOneLevelShipTrackDTO object = JSON.parseObject(sourceAsString, HlxOneLevelShipTrackDTO.class);
- //存储海兰信
- hlxList.add(object.getTargetId());
- }
- } catch (IOException e) {
- e.printStackTrace();
- }
- //融合
- SearchRequest fusionSearchRequest = new SearchRequest((EsIndexConstants.INDEX_SEAT_REALTIMETRAJECTORY + time));
- // 构建查询条件
- BoolQueryBuilder fusionBoolQueryBuilder = QueryBuilders.boolQuery();
- SearchSourceBuilder fusionSearchSourceBuilder = new SearchSourceBuilder();
- fusionBoolQueryBuilder.filter(QueryBuilders.rangeQuery("mergeTime").gte(query.getStartTime()).lte(query.getEndTime()));
- fusionSearchSourceBuilder.size(1000000);
- fusionSearchSourceBuilder.trackTotalHits(true);
- fusionSearchSourceBuilder.query(fusionBoolQueryBuilder);
- fusionSearchRequest.source(fusionSearchSourceBuilder);
- try {
- SearchResponse fusionSearch = restHighLevelClient.search(fusionSearchRequest, COMMON_OPTIONS);
- SearchHit[] hits = fusionSearch.getHits().getHits();
- for (SearchHit hit : hits) {
- Map<String, Object> sourceAsMap = hit.getSourceAsMap();
- String mergeTarget = (String) sourceAsMap.get("mergeTarget");
- String targetSource = (String) sourceAsMap.get("targetSource");
- List<FusionNewShipJsonDTO> targetSourceList = JSON.parseArray(targetSource, FusionNewShipJsonDTO.class);
- for (FusionNewShipJsonDTO item : targetSourceList) {
- if (null != item) {
- if (null != item.getType() && "HLX_AIS_RADAR".equals(item.getType())) {
- if(idenNewStringMap.get(item.getTrackId()) != null){
- CountDTO countDTO = idenNewStringMap.get(item.getTrackId());
- countDTO.setCount(countDTO.getCount()+1);
- List<String> data = countDTO.getData();
- if(!data.contains(mergeTarget)){
- data.add(mergeTarget);
- countDTO.setData(data);
- idenNewStringMap.put(item.getTrackId(),countDTO);
- }
- }else {
- ArrayList<String> list = new ArrayList<>();
- list.add(mergeTarget);
- idenNewStringMap.put(item.getTrackId(),new CountDTO(1,list));
- }
- }
- }
- }
- }
- log.info("当前查询到的海兰信船舶数据为: " + fusionSearch.getHits().getTotalHits().value + "条");
- } catch (IOException e) {
- e.printStackTrace();
- }
- //遍历海兰信数据、找到在一条海兰信数据在融合中有两个不同mergeTarget的融合数据
- for (String key : hlxList) {
- //计算key在map中的数量,大于1则拿出map中的mergeTarget
- CountDTO countDTO = idenNewStringMap.get(key);
- if(null != countDTO){
- if(null != countDTO.getCount()){
- if(countDTO.getCount() > 1){
- List<String> data = countDTO.getData();
- if(null != data && 0 < data.size()){
- for (String item : data) {
- //mergeTarget
- result.put(item,countDTO);
- }
- }
- }
- }
- }
- }
- log.info("海兰信船舶在融合中有多个融合匹配号的有: " + result.size() + "条");
- //导出融合数据
- SearchRequest exportFusionSearchRequest = new SearchRequest(EsIndexConstants.INDEX_SEAT_REALTIMETRAJECTORY + time);
- SearchSourceBuilder exportSearchSourceBuilder = new SearchSourceBuilder();
- exportSearchSourceBuilder.size(50000);
- exportSearchSourceBuilder.trackTotalHits(true);
- exportSearchSourceBuilder.query(QueryBuilders.termsQuery("mergeTarget",result.keySet()));
- exportFusionSearchRequest.source(exportSearchSourceBuilder);
- List<ExportVo> data = new ArrayList<>();
- try {
- SearchResponse response = restHighLevelClient.search(exportFusionSearchRequest, COMMON_OPTIONS);
- SearchHit[] hits = response.getHits().getHits();
- for (SearchHit hit : hits) {
- Map<String, Object> sourceAsMap = hit.getSourceAsMap();
- String mergeTarget = (String) sourceAsMap.get("mergeTarget");
- String targetSource = (String) sourceAsMap.get("targetSource");
- String mergeTime = (String) sourceAsMap.get("mergeTime");
- CountDTO countDTO = result.get(mergeTarget);
- if(null != countDTO.getData() && 1 < countDTO.getData().size()){
- ExportVo exportVo = new ExportVo();
- exportVo.setMergeTarget(countDTO.getData().toString());
- exportVo.setTargetSource(targetSource);
- exportVo.setEndTime(mergeTime);
- exportVo.setCount(countDTO.getData().size());
- List<FusionNewShipJsonDTO> targetSourceList = JSON.parseArray(targetSource, FusionNewShipJsonDTO.class);
- for (FusionNewShipJsonDTO item : targetSourceList) {
- if (null != item) {
- if (null != item.getType() && "HLX_AIS_RADAR".equals(item.getType())) {
- exportVo.setStartTime(item.getTime());
- }
- }
- }
- data.add(exportVo);
- }
- }
- } catch (IOException e) {
- e.printStackTrace();
- }
- return data;
- }
- }
|