Browse Source

[update]0.2版本

xiahailong 2 years ago
parent
commit
5f0726f326

+ 46 - 2
.idea/workspace.xml

@@ -87,6 +87,19 @@
     </key>
   </component>
   <component name="RunManager" selected="Spring Boot.FusionAnalysisApplication">
+    <configuration name="text" type="Application" factoryName="Application" temporary="true" nameIsGenerated="true">
+      <option name="MAIN_CLASS_NAME" value="cn.com.taiji.service.impl.text" />
+      <module name="fusion-analysis" />
+      <extension name="coverage">
+        <pattern>
+          <option name="PATTERN" value="cn.com.taiji.service.impl.*" />
+          <option name="ENABLED" value="true" />
+        </pattern>
+      </extension>
+      <method v="2">
+        <option name="Make" enabled="true" />
+      </method>
+    </configuration>
     <configuration name="BeidouAnalySisApplication" type="SpringBootApplicationConfigurationType" factoryName="Spring Boot">
       <module name="beidou-analysis" />
       <option name="SPRING_BOOT_MAIN_CLASS" value="cn.com.taiji.BeidouAnalySisApplication" />
@@ -137,6 +150,7 @@
     </configuration>
     <recent_temporary>
       <list>
+        <item itemvalue="应用程序.text" />
         <item itemvalue="Spring Boot.EsTrackAnAlySisApplication" />
         <item itemvalue="Spring Boot.EsTrackAnAlySisApplication" />
         <item itemvalue="Spring Boot.EsTrackAnAlySisApplication" />
@@ -157,7 +171,8 @@
       <workItem from="1675666106206" duration="7273000" />
       <workItem from="1675730151616" duration="2482000" />
       <workItem from="1675751880629" duration="13334000" />
-      <workItem from="1675788186214" duration="4218000" />
+      <workItem from="1675788186214" duration="4782000" />
+      <workItem from="1675817188436" duration="6347000" />
     </task>
     <task id="LOCAL-00001" summary="[insert]新增prod环境配置,完善es连接配置">
       <created>1675652041715</created>
@@ -187,7 +202,14 @@
       <option name="project" value="LOCAL" />
       <updated>1675768958870</updated>
     </task>
-    <option name="localTasksCounter" value="5" />
+    <task id="LOCAL-00005" summary="[update]修改pom文件">
+      <created>1675792903860</created>
+      <option name="number" value="00005" />
+      <option name="presentableId" value="LOCAL-00005" />
+      <option name="project" value="LOCAL" />
+      <updated>1675792903860</updated>
+    </task>
+    <option name="localTasksCounter" value="6" />
     <servers />
   </component>
   <component name="TypeScriptGeneratedFilesManager">
@@ -228,6 +250,25 @@
     <option name="LAST_COMMIT_MESSAGE" value="[update]修改pom文件" />
   </component>
   <component name="XDebuggerManager">
+    <breakpoint-manager>
+      <breakpoints>
+        <line-breakpoint enabled="true" type="java-line">
+          <url>file://$PROJECT_DIR$/fusion-analysis/src/main/java/cn/com/taiji/service/impl/FusionAnalysisServiceImpl.java</url>
+          <line>652</line>
+          <option name="timeStamp" value="18" />
+        </line-breakpoint>
+        <line-breakpoint enabled="true" type="java-line">
+          <url>file://$PROJECT_DIR$/fusion-analysis/src/main/java/cn/com/taiji/service/impl/FusionAnalysisServiceImpl.java</url>
+          <line>670</line>
+          <option name="timeStamp" value="19" />
+        </line-breakpoint>
+        <line-breakpoint enabled="true" type="java-line">
+          <url>file://$PROJECT_DIR$/fusion-analysis/src/main/java/cn/com/taiji/service/impl/FusionAnalysisServiceImpl.java</url>
+          <line>691</line>
+          <option name="timeStamp" value="20" />
+        </line-breakpoint>
+      </breakpoints>
+    </breakpoint-manager>
     <pin-to-top-manager>
       <pinned-members>
         <PinnedItemInfo parentTag="org.elasticsearch.search.aggregations.Aggregations" memberName="aggregations" />
@@ -239,4 +280,7 @@
     <expand />
     <select />
   </component>
+  <component name="com.intellij.coverage.CoverageDataManagerImpl">
+    <SUITE FILE_PATH="coverage/FusionAnalysis$FusionAnalysisApplication.ic" NAME="FusionAnalysisApplication 覆盖结果" MODIFIED="1675822157470" SOURCE_PROVIDER="com.intellij.coverage.DefaultCoverageFileProvider" RUNNER="idea" COVERAGE_BY_TEST_ENABLED="false" COVERAGE_TRACING_ENABLED="false" />
+  </component>
 </project>

+ 177 - 65
fusion-analysis/src/main/java/cn/com/taiji/service/impl/FusionAnalysisServiceImpl.java

@@ -14,6 +14,7 @@ import com.alibaba.fastjson.JSONArray;
 import com.alibaba.fastjson.JSONObject;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.http.client.config.RequestConfig;
+import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.action.search.SearchRequest;
 import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.client.HttpAsyncResponseConsumerFactory;
@@ -133,8 +134,8 @@ public class FusionAnalysisServiceImpl implements FusionAnalysisService {
                         Date date = null;
                         Date date2 = null;
                         try {
-                            date = DateUtils.parse(job.get("mergeTime").toString(),"yyyy-MM-dd");
-                            date2 = DateUtils.parse(job2.get("mergeTime").toString(),"yyyy-MM-dd");
+                            date = DateUtils.parse(job.get("mergeTime").toString(),"yyyy-MM-dd HH:mm:ss.SSS");
+                            date2 = DateUtils.parse(job2.get("mergeTime").toString(),"yyyy-MM-dd HH:mm:ss.SSS");
                         } catch (ParseException e) {
                             e.printStackTrace();
                         }
@@ -314,14 +315,40 @@ public class FusionAnalysisServiceImpl implements FusionAnalysisService {
                 .timeout(TimeValue.timeValueSeconds(500L));
         request.source(searchSourceBuilder);
         SearchResponse search = client.search(request, COMMON_OPTIONS);
-        Aggregations aggregations = search.getAggregations();
-        ParsedStringTerms aggregation = aggregations.get("ship_field_agg");
-        for (Terms.Bucket bucket : aggregation.getBuckets()) {
-            LeaceOutDto dto = new LeaceOutDto();
-            dto.setBatchNumber(bucket.getKeyAsString());
-            dto.setCount(bucket.getDocCount());
-            dtos.add(dto);
+        SearchHits searchHits = search.getHits();
+        SearchHit[] hits = searchHits.getHits();
+        List<Map<String, Object>> inList = new ArrayList<Map<String, Object>>();
+        List<Map<String, Object>> outList = new ArrayList<Map<String, Object>>();
+        for (SearchHit hit : hits) {
+            Map<String, Object> map = hit.getSourceAsMap();
+            inList.add(map);
         }
+        outList = change(inList, "fusionBatchNum", outList);
+        outList.forEach(map-> {
+                    JSONObject maps = JSONObject.parseObject(JSON.toJSONString(map));
+//            JSONObject maps = new JSONObject();
+//            Iterator it = map.keySet().iterator();
+//            while (it.hasNext()) {
+//                String key = (String) it.next();
+//                maps.put(key, map.get(key));
+//            }
+                    JSONArray jsonArray = JSONArray.parseArray(maps.getString("array"));
+                    if (jsonArray.size() > 0) {
+                        JSONObject job = jsonArray.getJSONObject(0);
+                        LeaceOutDto dto = new LeaceOutDto();
+                        dto.setBatchNumber(job.getString("fusionBatchNum"));
+            dto.setCount((long) jsonArray.size());
+            dtos.add(dto);
+                    }
+                });
+//        Aggregations aggregations = search.getAggregations();
+//        ParsedStringTerms aggregation = aggregations.get("ship_field_agg");
+//        for (Terms.Bucket bucket : aggregation.getBuckets()) {
+//            LeaceOutDto dto = new LeaceOutDto();
+//            dto.setBatchNumber(bucket.getKeyAsString());
+//            dto.setCount(bucket.getDocCount());
+//            dtos.add(dto);
+//        }
         dtos.forEach(item->{
             LeaveOutData data = new LeaveOutData();
             SearchRequest request2 = new SearchRequest(INDEX_SEAT_REALTIMETRAJECTORY+time);
@@ -339,13 +366,13 @@ public class FusionAnalysisServiceImpl implements FusionAnalysisService {
                     .timeout(TimeValue.timeValueSeconds(500L));
             request2.source(searchSourceBuilder2);
             try {
-                SearchResponse search2 = client.search(request, COMMON_OPTIONS);
-                SearchHits searchHits = search2.getHits();
-                Long totalHits = searchHits.getTotalHits().value;
+                SearchResponse search2 = client.search(request2, COMMON_OPTIONS);
+                SearchHits searchHits2 = search2.getHits();
+                Long totalHits = searchHits2.getTotalHits().value;
                 if (totalHits<5) {
-                    SearchHit[] hits = searchHits.getHits();
-                    if (hits.length > 0) {
-                        JSONObject sourceItem = JSONObject.parseObject(hits[0].getSourceAsString());
+                    SearchHit[] hits2 = searchHits2.getHits();
+                    if (hits2.length > 0) {
+                        JSONObject sourceItem = JSONObject.parseObject(hits2[0].getSourceAsString());
                         data.setMergeTarget(sourceItem.getString("mergeTarget"));
                         data.setStartTime(query.getStartTime());
                         data.setEndTime(query.getEndTime());
@@ -384,26 +411,53 @@ public class FusionAnalysisServiceImpl implements FusionAnalysisService {
         SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
         boolQueryBuilder.filter(QueryBuilders.rangeQuery("locationTime").gte(query.getStartTime()).lte(query.getEndTime()));
         searchSourceBuilder.query(boolQueryBuilder);
-        TermsAggregationBuilder aggBuilder = AggregationBuilders.terms("ship_field_agg")
-                .field("devideNo")
-                .size(query.getPageSize());
+//        TermsAggregationBuilder aggBuilder = AggregationBuilders.terms("ship_field_agg")
+//                .field("fusionBatchNum")
+//                .size(query.getPageSize());
         searchSourceBuilder
                 .trackTotalHits(true)
-                .aggregation(aggBuilder)
+//                .aggregation(aggBuilder)
                 .size(query.getPageSize())
+                .sort(SortBuilders.fieldSort("locationTime").order(SortOrder.ASC))
                 .timeout(TimeValue.timeValueHours(1L))
                 .timeout(TimeValue.timeValueMinutes(30L))
                 .timeout(TimeValue.timeValueSeconds(500L));
         request.source(searchSourceBuilder);
         SearchResponse search = client.search(request, COMMON_OPTIONS);
-        Aggregations aggregations = search.getAggregations();
-        ParsedStringTerms aggregation = aggregations.get("ship_field_agg");
-        for (Terms.Bucket bucket : aggregation.getBuckets()) {
-            LeaceOutDto dto = new LeaceOutDto();
-            dto.setBatchNumber(bucket.getKeyAsString());
-            dto.setCount(Long.valueOf(bucket.getDocCount()));
-            dtos.add(dto);
+        SearchHits searchHits = search.getHits();
+        SearchHit[] hits = searchHits.getHits();
+        List<Map<String, Object>> inList = new ArrayList<Map<String, Object>>();
+        List<Map<String, Object>> outList = new ArrayList<Map<String, Object>>();
+        for (SearchHit hit : hits) {
+            Map<String, Object> map = hit.getSourceAsMap();
+            inList.add(map);
         }
+        outList = change(inList, "devideNo", outList);
+        outList.forEach(map-> {
+            JSONObject maps = JSONObject.parseObject(JSON.toJSONString(map));
+//            JSONObject maps = new JSONObject();
+//            Iterator it = map.keySet().iterator();
+//            while (it.hasNext()) {
+//                String key = (String) it.next();
+//                maps.put(key, map.get(key));
+//            }
+            JSONArray jsonArray = JSONArray.parseArray(maps.getString("array"));
+            if (jsonArray.size() > 0) {
+                JSONObject job = jsonArray.getJSONObject(0);
+                LeaceOutDto dto = new LeaceOutDto();
+                dto.setBatchNumber(job.getString("devideNo"));
+                dto.setCount((long) jsonArray.size());
+                dtos.add(dto);
+            }
+        });
+//        Aggregations aggregations = search.getAggregations();
+//        ParsedStringTerms aggregation = aggregations.get("ship_field_agg");
+//        for (Terms.Bucket bucket : aggregation.getBuckets()) {
+//            LeaceOutDto dto = new LeaceOutDto();
+//            dto.setBatchNumber(bucket.getKeyAsString());
+//            dto.setCount(bucket.getDocCount());
+//            dtos.add(dto);
+//        }
         dtos.forEach(item->{
             LeaveOutData data = new LeaveOutData();
             SearchRequest request2 = new SearchRequest(INDEX_SEAT_REALTIMETRAJECTORY+time);
@@ -421,13 +475,13 @@ public class FusionAnalysisServiceImpl implements FusionAnalysisService {
                     .timeout(TimeValue.timeValueSeconds(500L));
             request2.source(searchSourceBuilder2);
             try {
-                SearchResponse search2 = client.search(request, COMMON_OPTIONS);
-                SearchHits searchHits = search2.getHits();
-                Long totalHits = searchHits.getTotalHits().value;
+                SearchResponse search2 = client.search(request2, COMMON_OPTIONS);
+                SearchHits searchHits2 = search2.getHits();
+                Long totalHits = searchHits2.getTotalHits().value;
                 if (totalHits<5) {
-                    SearchHit[] hits = searchHits.getHits();
-                    if (hits.length > 0) {
-                        JSONObject sourceItem = JSONObject.parseObject(hits[0].getSourceAsString());
+                    SearchHit[] hits2 = searchHits2.getHits();
+                    if (hits2.length > 0) {
+                        JSONObject sourceItem = JSONObject.parseObject(hits2[0].getSourceAsString());
                         data.setMergeTarget(sourceItem.getString("mergeTarget"));
                         data.setStartTime(query.getStartTime());
                         data.setEndTime(query.getEndTime());
@@ -466,26 +520,53 @@ public class FusionAnalysisServiceImpl implements FusionAnalysisService {
         SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
         boolQueryBuilder.filter(QueryBuilders.rangeQuery("time").gte(query.getStartTime()).lte(query.getEndTime()));
         searchSourceBuilder.query(boolQueryBuilder);
-        TermsAggregationBuilder aggBuilder = AggregationBuilders.terms("ship_field_agg")
-                .field("targetID")
-                .size(query.getPageSize());
+//        TermsAggregationBuilder aggBuilder = AggregationBuilders.terms("ship_field_agg")
+//                .field("fusionBatchNum")
+//                .size(query.getPageSize());
         searchSourceBuilder
                 .trackTotalHits(true)
-                .aggregation(aggBuilder)
+//                .aggregation(aggBuilder)
                 .size(query.getPageSize())
+                .sort(SortBuilders.fieldSort("time").order(SortOrder.ASC))
                 .timeout(TimeValue.timeValueHours(1L))
                 .timeout(TimeValue.timeValueMinutes(30L))
                 .timeout(TimeValue.timeValueSeconds(500L));
         request.source(searchSourceBuilder);
         SearchResponse search = client.search(request, COMMON_OPTIONS);
-        Aggregations aggregations = search.getAggregations();
-        ParsedStringTerms aggregation = aggregations.get("ship_field_agg");
-        for (Terms.Bucket bucket : aggregation.getBuckets()) {
-            LeaceOutDto dto = new LeaceOutDto();
-            dto.setBatchNumber(bucket.getKeyAsString());
-            dto.setCount(Long.valueOf(bucket.getDocCount()));
-            dtos.add(dto);
+        SearchHits searchHits = search.getHits();
+        SearchHit[] hits = searchHits.getHits();
+        List<Map<String, Object>> inList = new ArrayList<Map<String, Object>>();
+        List<Map<String, Object>> outList = new ArrayList<Map<String, Object>>();
+        for (SearchHit hit : hits) {
+            Map<String, Object> map = hit.getSourceAsMap();
+            inList.add(map);
         }
+        outList = change(inList, "targetID", outList);
+        outList.forEach(map-> {
+            JSONObject maps = JSONObject.parseObject(JSON.toJSONString(map));
+//            JSONObject maps = new JSONObject();
+//            Iterator it = map.keySet().iterator();
+//            while (it.hasNext()) {
+//                String key = (String) it.next();
+//                maps.put(key, map.get(key));
+//            }
+            JSONArray jsonArray = JSONArray.parseArray(maps.getString("array"));
+            if (jsonArray.size() > 0) {
+                JSONObject job = jsonArray.getJSONObject(0);
+                LeaceOutDto dto = new LeaceOutDto();
+                dto.setBatchNumber(job.getString("targetID"));
+                dto.setCount((long) jsonArray.size());
+                dtos.add(dto);
+            }
+        });
+//        Aggregations aggregations = search.getAggregations();
+//        ParsedStringTerms aggregation = aggregations.get("ship_field_agg");
+//        for (Terms.Bucket bucket : aggregation.getBuckets()) {
+//            LeaceOutDto dto = new LeaceOutDto();
+//            dto.setBatchNumber(bucket.getKeyAsString());
+//            dto.setCount(bucket.getDocCount());
+//            dtos.add(dto);
+//        }
         dtos.forEach(item->{
             LeaveOutData data = new LeaveOutData();
             SearchRequest request2 = new SearchRequest(INDEX_SEAT_REALTIMETRAJECTORY+time);
@@ -503,13 +584,13 @@ public class FusionAnalysisServiceImpl implements FusionAnalysisService {
                     .timeout(TimeValue.timeValueSeconds(500L));
             request2.source(searchSourceBuilder2);
             try {
-                SearchResponse search2 = client.search(request, COMMON_OPTIONS);
-                SearchHits searchHits = search2.getHits();
-                Long totalHits = searchHits.getTotalHits().value;
+                SearchResponse search2 = client.search(request2, COMMON_OPTIONS);
+                SearchHits searchHits2 = search2.getHits();
+                Long totalHits = searchHits2.getTotalHits().value;
                 if (totalHits<5) {
-                    SearchHit[] hits = searchHits.getHits();
-                    if (hits.length > 0) {
-                        JSONObject sourceItem = JSONObject.parseObject(hits[0].getSourceAsString());
+                    SearchHit[] hits2 = searchHits2.getHits();
+                    if (hits2.length > 0) {
+                        JSONObject sourceItem = JSONObject.parseObject(hits2[0].getSourceAsString());
                         data.setMergeTarget(sourceItem.getString("mergeTarget"));
                         data.setStartTime(query.getStartTime());
                         data.setEndTime(query.getEndTime());
@@ -549,26 +630,44 @@ public class FusionAnalysisServiceImpl implements FusionAnalysisService {
         boolQueryBuilder.filter(QueryBuilders.rangeQuery("receiveTime").gte(query.getStartTime()).lte(query.getEndTime()));
         boolQueryBuilder.filter(QueryBuilders.termQuery("targetProper","1"));
         searchSourceBuilder.query(boolQueryBuilder);
-        TermsAggregationBuilder aggBuilder = AggregationBuilders.terms("ship_field_agg")
-                .field("fusionBatchNum")
-                .size(query.getPageSize());
+//        TermsAggregationBuilder aggBuilder = AggregationBuilders.terms("ship_field_agg")
+//                .field("fusionBatchNum")
+//                .size(query.getPageSize());
         searchSourceBuilder
                 .trackTotalHits(true)
-                .aggregation(aggBuilder)
+//                .aggregation(aggBuilder)
                 .size(query.getPageSize())
                 .timeout(TimeValue.timeValueHours(1L))
                 .timeout(TimeValue.timeValueMinutes(30L))
                 .timeout(TimeValue.timeValueSeconds(500L));
         request.source(searchSourceBuilder);
         SearchResponse search = client.search(request, COMMON_OPTIONS);
-        Aggregations aggregations = search.getAggregations();
-        ParsedStringTerms aggregation = aggregations.get("ship_field_agg");
-        for (Terms.Bucket bucket : aggregation.getBuckets()) {
-            LeaceOutDto dto = new LeaceOutDto();
-            dto.setBatchNumber(bucket.getKeyAsString());
-            dto.setCount(Long.valueOf(bucket.getDocCount()));
-            dtos.add(dto);
+        SearchHits searchHits = search.getHits();
+        SearchHit[] hits = searchHits.getHits();
+        List<Map<String, Object>> inList = new ArrayList<Map<String, Object>>();
+        List<Map<String, Object>> outList = new ArrayList<Map<String, Object>>();
+        for (SearchHit hit : hits) {
+            Map<String, Object> map = hit.getSourceAsMap();
+            inList.add(map);
         }
+        outList = change(inList, "fusionBatchNum", outList);
+        outList.forEach(map-> {
+            JSONObject maps = JSONObject.parseObject(JSON.toJSONString(map));
+//            JSONObject maps = new JSONObject();
+//            Iterator it = map.keySet().iterator();
+//            while (it.hasNext()) {
+//                String key = (String) it.next();
+//                maps.put(key, map.get(key));
+//            }
+            JSONArray jsonArray = JSONArray.parseArray(maps.getString("array"));
+            if (jsonArray.size() > 0) {
+                JSONObject job = jsonArray.getJSONObject(0);
+                LeaceOutDto dto = new LeaceOutDto();
+                dto.setBatchNumber(job.getString("fusionBatchNum"));
+                dto.setCount((long) jsonArray.size());
+                dtos.add(dto);
+            }
+        });
         dtos.forEach(item->{
             StaticOrDynamicData data = new StaticOrDynamicData();
             SearchRequest request2 = new SearchRequest(INDEX_SEAT_REALTIMETRAJECTORY+time);
@@ -587,11 +686,11 @@ public class FusionAnalysisServiceImpl implements FusionAnalysisService {
                     .timeout(TimeValue.timeValueSeconds(500L));
             request2.source(searchSourceBuilder2);
             try {
-                SearchResponse search2 = client.search(request, COMMON_OPTIONS);
-                SearchHits searchHits = search2.getHits();
-                SearchHit[] hits = searchHits.getHits();
-                if (hits.length > 0) {
-                    JSONObject sourceItem = JSONObject.parseObject(hits[0].getSourceAsString());
+                SearchResponse search2 = client.search(request2, COMMON_OPTIONS);
+                SearchHits searchHits2 = search2.getHits();
+                SearchHit[] hits2 = searchHits2.getHits();
+                if (hits2.length > 0) {
+                    JSONObject sourceItem = JSONObject.parseObject(hits2[0].getSourceAsString());
                     data.setMergeTarget(sourceItem.getString("mergeTarget"));
                     data.setBatchNumber(item.getBatchNumber());
                     list.add(data);
@@ -648,3 +747,16 @@ public class FusionAnalysisServiceImpl implements FusionAnalysisService {
         return outList;
     }
 }
+
+
+class text{
+    @Autowired
+    public RestHighLevelClient client;
+
+    public static void main(String[] args) {
+        String source = "[{\"track_device_no\":\"15013798\",\"track_id\":\"15013798\",\"id\":\"82717282\",\"time\":\"2023 - 02 - 07 00: 59: 30.069\",\"type\":\"BEIDOU\"}]";
+        String json = JSON.toJSONString(source);
+        System.out.println(json);
+
+    }
+}