Browse Source

[insert]新增海康抓拍转储ES算法

xiahailong 2 years ago
parent
commit
4b5bcf4fe2
16 changed files with 950 additions and 278 deletions
  1. 261 0
      src/main/java/cn/com/taiji/esshipservice/listener/hlx/fusion/DynamicCaptureCarTrackConsumer.java
  2. 237 0
      src/main/java/cn/com/taiji/esshipservice/listener/hlx/fusion/DynamicCaptureFaceTrackConsumer.java
  3. 167 0
      src/main/java/cn/com/taiji/esshipservice/listener/hlx/fusion/DynamicCaptureShipTrackConsumer.java
  4. 256 256
      src/main/java/cn/com/taiji/esshipservice/listener/hlx/fusion/EsHkImagesDumpS3.java
  5. 6 4
      src/main/java/cn/com/taiji/esshipservice/utils/HttpsDownloadUtils.java
  6. BIN
      target/EsHkImagesDumpS3.jar
  7. BIN
      target/EsHkImagesDumpS3.jar.original
  8. BIN
      target/classes/cn/com/taiji/esshipservice/listener/hlx/fusion/DynamicCaptureCarTrackConsumer.class
  9. BIN
      target/classes/cn/com/taiji/esshipservice/listener/hlx/fusion/DynamicCaptureFaceTrackConsumer.class
  10. BIN
      target/classes/cn/com/taiji/esshipservice/listener/hlx/fusion/DynamicCaptureShipTrackConsumer.class
  11. BIN
      target/classes/cn/com/taiji/esshipservice/listener/hlx/fusion/EsHkImagesDumpS3.class
  12. BIN
      target/classes/cn/com/taiji/esshipservice/utils/HttpsDownloadUtils$1.class
  13. BIN
      target/classes/cn/com/taiji/esshipservice/utils/HttpsDownloadUtils$X509TrustUtiil.class
  14. BIN
      target/classes/cn/com/taiji/esshipservice/utils/HttpsDownloadUtils.class
  15. 20 18
      target/maven-status/maven-compiler-plugin/compile/default-compile/createdFiles.lst
  16. 3 0
      target/maven-status/maven-compiler-plugin/compile/default-compile/inputFiles.lst

+ 261 - 0
src/main/java/cn/com/taiji/esshipservice/listener/hlx/fusion/DynamicCaptureCarTrackConsumer.java

@@ -0,0 +1,261 @@
+package cn.com.taiji.esshipservice.listener.hlx.fusion;
+
+import cn.com.taiji.esshipservice.utils.HttpsDownloadUtils;
+import com.alibaba.fastjson.JSONArray;
+import com.alibaba.fastjson.JSONObject;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.client.RequestOptions;
+import org.elasticsearch.client.RestHighLevelClient;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.kafka.annotation.TopicPartition;
+import org.springframework.kafka.support.Acknowledgment;
+import org.springframework.stereotype.Component;
+
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.*;
+
+import static cn.com.taiji.esshipservice.constant.ShipHisTrackIndexConstants.INDEX_SEAT_HIK_CAPTURE_CAR_TRACKS;
+
+
+//海康
+
+/**
+ * @author xhl
+ */
+@Component
+@Slf4j
+public class DynamicCaptureCarTrackConsumer {
+
+    @Autowired
+    private RestHighLevelClient client;
+
+
+    @Autowired
+    private HttpsDownloadUtils httpsDownloadUtils;
+
+    ExecutorService threadPool = new ThreadPoolExecutor( 10,25,10L, TimeUnit.SECONDS, new LinkedBlockingDeque<>(10), Executors.defaultThreadFactory(), new ThreadPoolExecutor.DiscardPolicy());
+
+    /**
+     * 海康车辆抓拍数据-「分区」消费
+     * es场景
+     */
+
+    @KafkaListener(
+            groupId = "${taiji.kafka.consumer.hik-capture-car.group}",
+            topicPartitions = {
+                    @TopicPartition(
+                            topic = "${taiji.kafka.consumer.hik-capture-car.topic}",
+                            partitions = {"#{'${taiji.kafka.consumer.hik-capture-car.partitions0}'.split(',')}"}
+                    )
+            }
+    )
+    public void dynamicCaptureFaceTrack0(List<ConsumerRecord<?, ?>> records, Acknowledgment ack) {
+        threadPool.execute(()-> {
+            insertEs(records);
+        });
+        ack.acknowledge();
+    }
+
+
+    @KafkaListener(
+            groupId = "${taiji.kafka.consumer.hik-capture-car.group}",
+            topicPartitions = {
+                    @TopicPartition(
+                            topic = "${taiji.kafka.consumer.hik-capture-car.topic}",
+                            partitions = {"#{'${taiji.kafka.consumer.hik-capture-car.partitions1}'.split(',')}"}
+                    )
+            }
+    )
+    public void dynamicCaptureFaceTrack1(List<ConsumerRecord<?, ?>> records, Acknowledgment ack) {
+        threadPool.execute(()-> {
+            insertEs(records);
+        });
+        ack.acknowledge();
+
+    }
+
+
+    @KafkaListener(
+            groupId = "${taiji.kafka.consumer.hik-capture-car.group}",
+            topicPartitions = {
+                    @TopicPartition(
+                            topic = "${taiji.kafka.consumer.hik-capture-car.topic}",
+                            partitions = {"#{'${taiji.kafka.consumer.hik-capture-car.partitions2}'.split(',')}"}
+                    )
+            }
+    )
+    public void dynamicCaptureFaceTrack2(List<ConsumerRecord<?, ?>> records, Acknowledgment ack) {
+        threadPool.execute(()-> {
+            insertEs(records);
+        });
+        ack.acknowledge();
+    }
+
+
+    @KafkaListener(
+            groupId = "${taiji.kafka.consumer.hik-capture-car.group}",
+            topicPartitions = {
+                    @TopicPartition(
+                            topic = "${taiji.kafka.consumer.hik-capture-car.topic}",
+                            partitions = {"#{'${taiji.kafka.consumer.hik-capture-car.partitions3}'.split(',')}"}
+                    )
+            }
+    )
+    public void dynamicCaptureFaceTrack3(List<ConsumerRecord<?, ?>> records, Acknowledgment ack) {
+        threadPool.execute(()-> {
+            insertEs(records);
+        });
+        ack.acknowledge();
+    }
+
+
+    @KafkaListener(
+            groupId = "${taiji.kafka.consumer.hik-capture-car.group}",
+            topicPartitions = {
+                    @TopicPartition(
+                            topic = "${taiji.kafka.consumer.hik-capture-car.topic}",
+                            partitions = {"#{'${taiji.kafka.consumer.hik-capture-car.partitions4}'.split(',')}"}
+                    )
+            }
+    )
+    public void dynamicCaptureFaceTrack4(List<ConsumerRecord<?, ?>> records, Acknowledgment ack) {
+        threadPool.execute(()-> {
+            insertEs(records);
+        });
+        ack.acknowledge();
+
+    }
+
+    public void insertEs(List<ConsumerRecord<?, ?>> records) {
+        BulkRequest bulkRequest = new BulkRequest();
+        records.forEach(item -> {
+            Optional<?> message = Optional.ofNullable(item.value());
+            if (message.isPresent()) {
+                Object msg = message.get();
+                JSONObject map = JSONObject.parseObject(msg.toString());
+                JSONArray jsonArray = JSONObject.parseObject(map.getString("vehiclePassRecogList"),JSONArray.class);
+                if (jsonArray.size()>0){
+                    for (int i = 0; i < jsonArray.size(); i++) {
+                        Map<String, Object> maps = Collections.synchronizedMap(new HashMap<>());
+                        JSONObject job = jsonArray.getJSONObject(i);
+                        maps.put("cameraIndexCode", job.getString("cameraIndexCode"));
+                        maps.put("captureTime", job.getString("captureTime"));
+                        maps.put("card", job.getString("card"));
+                        maps.put("cardName", job.getString("cardName"));
+                        maps.put("cardNum", job.getString("cardNum"));
+                        maps.put("cardType", job.getString("cardType"));
+                        maps.put("cardTypeName", job.getString("cardTypeName"));
+                        maps.put("copilot", job.getString("copilot"));
+                        maps.put("copilotName", job.getString("copilotName"));
+                        maps.put("coverPlate", job.getString("coverPlate"));
+                        maps.put("coverPlateName", job.getString("coverPlateName"));
+                        maps.put("crossingIndexCode", job.getString("crossingIndexCode"));
+                        maps.put("dangMark", job.getString("dangMark"));
+                        maps.put("dangMarkName", job.getString("dangMarkName"));
+                        maps.put("decoration", job.getString("decoration"));
+                        maps.put("decorationName", job.getString("decorationName"));
+                        maps.put("directionIndex", job.getString("directionIndex"));
+                        maps.put("directionName", job.getString("directionName"));
+                        maps.put("envproSign", job.getString("envproSign"));
+                        maps.put("envproSignName", job.getString("envproSignName"));
+                        maps.put("frontChild", job.getString("frontChild"));
+                        maps.put("frontChildName", job.getString("frontChildName"));
+                        maps.put("id", job.getString("id"));
+                        maps.put("label", job.getString("label"));
+                        maps.put("labelName", job.getString("labelName"));
+                        maps.put("labelNum", job.getString("labelNum"));
+                        maps.put("laneNo", job.getString("laneNo"));
+                        maps.put("linkWifiVehicleId", job.getString("linkWifiVehicleId"));
+                        maps.put("linkFaceVehicleId", job.getString("linkFaceVehicleId"));
+                        maps.put("luggageRack", job.getString("luggageRack"));
+                        maps.put("luggageRackName", job.getString("luggageRackName"));
+                        maps.put("mobileDeviceLatitude", job.getString("mobileDeviceLatitude"));
+                        maps.put("mobileDeviceLongitude", job.getString("mobileDeviceLongitude"));
+                        maps.put("muckTruck", job.getString("muckTruck"));
+                        maps.put("muckTruckName", job.getString("muckTruckName"));
+                        maps.put("pdvs", job.getString("pdvs"));
+                        maps.put("pdvsName", job.getString("pdvsName"));
+                        maps.put("pendant", job.getString("pendant"));
+                        maps.put("pendantName", job.getString("pendantName"));
+                        maps.put("picUrlNum", job.getString("picUrlNum"));
+                        maps.put("pilotSafebelt", job.getString("pilotSafebelt"));
+                        maps.put("pliotSafebeltName", job.getString("pliotSafebeltName"));
+                        maps.put("pilotSunvisor", job.getString("pilotSunvisor"));
+                        maps.put("pilotSunvisorName", job.getString("pilotSunvisorName"));
+                        maps.put("plateColor", job.getString("plateColor"));
+                        maps.put("plateColorName", job.getString("plateColorName"));
+                        String plateImagePath = job.getString("plateImagePath");
+                        if (null != plateImagePath   && !plateImagePath.equals("")){
+                            maps.put("plateImagePath", httpsDownloadUtils.downloadFileHttps(plateImagePath));
+                        }
+                        maps.put("plateNo", job.getString("plateNo"));
+                        maps.put("plateState", job.getString("plateState"));
+                        maps.put("plateStateName", job.getString("plateStateName"));
+                        maps.put("plateType", job.getString("plateType"));
+                        maps.put("plateTypeName", job.getString("plateTypeName"));
+                        maps.put("rect", job.getString("rect"));
+                        maps.put("spareTire", job.getString("spareTire"));
+                        maps.put("spareTireName", job.getString("spareTireName"));
+                        maps.put("sunroof", job.getString("sunroof"));
+                        maps.put("sunroofName", job.getString("sunroofName"));
+                        String targetPicUrl = job.getString("targetPicUrl");
+                        if (null != targetPicUrl  && !targetPicUrl.equals("")){
+                            maps.put("targetPicUrl", httpsDownloadUtils.downloadFileHttps(targetPicUrl));
+                        }
+                        maps.put("tempPlateNo", job.getString("tempPlateNo"));
+                        maps.put("tempPlateNoName", job.getString("tempPlateNoName"));
+                        maps.put("tissueBox", job.getString("tissueBox"));
+                        maps.put("tissueBoxName", job.getString("tissueBoxName"));
+                        maps.put("tricycleCanopy", job.getString("tricycleCanopy"));
+                        maps.put("tricycleCanopyName", job.getString("tricycleCanopyName"));
+                        maps.put("uphone", job.getString("uphone"));
+                        maps.put("ecolabel", job.getString("ecolabel"));
+                        maps.put("ecolabelName", job.getString("ecolabelName"));
+                        maps.put("usePhoneName", job.getString("usePhoneName"));
+                        maps.put("vehicleColor", job.getString("vehicleColor"));
+                        maps.put("vehicleColorName", job.getString("vehicleColorName"));
+                        maps.put("vehicleColorDepth", job.getString("vehicleColorDepth"));
+                        maps.put("vehicleColorDepthName", job.getString("vehicleColorDepthName"));
+                        maps.put("vehicleHead", job.getString("vehicleHead"));
+                        maps.put("vehicleHeadName", job.getString("vehicleHeadName"));
+                        maps.put("vehicleLogo", job.getString("vehicleLogo"));
+                        maps.put("vehicleModel", job.getString("vehicleModel"));
+                        maps.put("vehicleLamp", job.getString("vehicleLamp"));
+                        maps.put("vehicleLampName", job.getString("vehicleLampName"));
+                        maps.put("vehicleSpeed", job.getString("vehicleSpeed"));
+                        maps.put("vehicleSprayPainted", job.getString("vehicleSprayPainted"));
+                        maps.put("vehicleSprayPaintedName", job.getString("vehicleSprayPaintedName"));
+                        maps.put("vehicleSubLogo", job.getString("vehicleSubLogo"));
+                        maps.put("vehicleType", job.getString("vehicleType"));
+                        maps.put("vehicleTypeName", job.getString("vehicleTypeName"));
+                        maps.put("vicePilotSafebelt", job.getString("vicePilotSafebelt"));
+                        maps.put("vicePilotSafebeltName", job.getString("vicePilotSafebeltName"));
+                        maps.put("vicePilotSunvisor", job.getString("vicePilotSunvisor"));
+                        maps.put("vicePilotSunvisorName", job.getString("vicePilotSunvisorName"));
+                        maps.put("imagePath1", job.getString("imagePath1"));
+                        maps.put("imagePath2", job.getString("imagePath2"));
+                        maps.put("imagePath3", job.getString("imagePath3"));
+                        maps.put("imagePath4", job.getString("imagePath4"));
+                        maps.put("imagePath5", job.getString("imagePath5"));
+                        maps.put("imagePath6", job.getString("imagePath6"));
+                        maps.put("direction", job.getString("direction"));
+                        bulkRequest.add(new IndexRequest(INDEX_SEAT_HIK_CAPTURE_CAR_TRACKS).source(maps));
+                    }
+                }
+            }
+        });
+        try {
+            BulkResponse bulk = client.bulk(bulkRequest, RequestOptions.DEFAULT);
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+
+    }
+
+}

+ 237 - 0
src/main/java/cn/com/taiji/esshipservice/listener/hlx/fusion/DynamicCaptureFaceTrackConsumer.java

@@ -0,0 +1,237 @@
+package cn.com.taiji.esshipservice.listener.hlx.fusion;
+
+import cn.com.taiji.esshipservice.utils.HttpsDownloadUtils;
+import com.alibaba.fastjson.JSONArray;
+import com.alibaba.fastjson.JSONObject;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.client.RequestOptions;
+import org.elasticsearch.client.RestHighLevelClient;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.kafka.annotation.TopicPartition;
+import org.springframework.kafka.support.Acknowledgment;
+import org.springframework.stereotype.Component;
+
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.*;
+
+import static cn.com.taiji.esshipservice.constant.ShipHisTrackIndexConstants.INDEX_SEAT_HIK_CAPTURE_FACE_TRACKS;
+
+
+//海康
+
+/**
+ * @author xhl
+ */
+@Component
+@Slf4j
+public class DynamicCaptureFaceTrackConsumer {
+
+    @Autowired
+    private RestHighLevelClient client;
+
+    @Autowired
+    private HttpsDownloadUtils httpsDownloadUtils;
+
+    ExecutorService threadPool = new ThreadPoolExecutor( 10,25,10L, TimeUnit.SECONDS, new LinkedBlockingDeque<>(10), Executors.defaultThreadFactory(), new ThreadPoolExecutor.DiscardPolicy());
+
+    /**
+     * 海康人员抓拍数据-「分区」消费
+     * es场景
+     */
+
+    @KafkaListener(
+            groupId = "${taiji.kafka.consumer.hik-capture-face.group}",
+            topicPartitions = {
+                    @TopicPartition(
+                            topic = "${taiji.kafka.consumer.hik-capture-face.topic}",
+                            partitions = {"#{'${taiji.kafka.consumer.hik-capture-face.partitions0}'.split(',')}"}
+                    )
+            }
+    )
+    public void dynamicCaptureFaceTrack0(List<ConsumerRecord<?, ?>> records, Acknowledgment ack) {
+        threadPool.execute(()-> {
+            insertEs(records);
+        });
+        ack.acknowledge();
+    }
+
+
+    @KafkaListener(
+            groupId = "${taiji.kafka.consumer.hik-capture-face.group}",
+            topicPartitions = {
+                    @TopicPartition(
+                            topic = "${taiji.kafka.consumer.hik-capture-face.topic}",
+                            partitions = {"#{'${taiji.kafka.consumer.hik-capture-face.partitions1}'.split(',')}"}
+                    )
+            }
+    )
+    public void dynamicCaptureFaceTrack1(List<ConsumerRecord<?, ?>> records, Acknowledgment ack) {
+        threadPool.execute(()-> {
+            insertEs(records);
+        });
+        ack.acknowledge();
+
+    }
+
+
+    @KafkaListener(
+            groupId = "${taiji.kafka.consumer.hik-capture-face.group}",
+            topicPartitions = {
+                    @TopicPartition(
+                            topic = "${taiji.kafka.consumer.hik-capture-face.topic}",
+                            partitions = {"#{'${taiji.kafka.consumer.hik-capture-face.partitions2}'.split(',')}"}
+                    )
+            }
+    )
+    public void dynamicCaptureFaceTrack2(List<ConsumerRecord<?, ?>> records, Acknowledgment ack) {
+        threadPool.execute(()-> {
+            insertEs(records);
+        });
+        ack.acknowledge();
+    }
+
+
+    @KafkaListener(
+            groupId = "${taiji.kafka.consumer.hik-capture-face.group}",
+            topicPartitions = {
+                    @TopicPartition(
+                            topic = "${taiji.kafka.consumer.hik-capture-face.topic}",
+                            partitions = {"#{'${taiji.kafka.consumer.hik-capture-face.partitions3}'.split(',')}"}
+                    )
+            }
+    )
+    public void dynamicCaptureFaceTrack3(List<ConsumerRecord<?, ?>> records, Acknowledgment ack) {
+        threadPool.execute(()-> {
+            insertEs(records);
+        });
+        ack.acknowledge();
+    }
+
+
+    @KafkaListener(
+            groupId = "${taiji.kafka.consumer.hik-capture-face.group}",
+            topicPartitions = {
+                    @TopicPartition(
+                            topic = "${taiji.kafka.consumer.hik-capture-face.topic}",
+                            partitions = {"#{'${taiji.kafka.consumer.hik-capture-face.partitions4}'.split(',')}"}
+                    )
+            }
+    )
+    public void dynamicCaptureFaceTrack4(List<ConsumerRecord<?, ?>> records, Acknowledgment ack) {
+        threadPool.execute(()-> {
+            insertEs(records);
+        });
+        ack.acknowledge();
+
+    }
+
+//    @Scheduled(cron = "0 30 18 * * ?")
+//    public void creatIndex(){
+//        threadPool.execute(()-> {
+//            try {
+//                TrackDaily();
+//            } catch (IOException e) {
+//                e.printStackTrace();
+//            }
+//        });
+//    }
+//
+//    public void TrackDaily() throws IOException {
+//        //获取明日零时时间
+//        Calendar cal = Calendar.getInstance();
+//        cal.add(Calendar.DATE,1);
+//        cal.set(cal.get(Calendar.YEAR),cal.get(Calendar.MONTH),cal.get(Calendar.DAY_OF_MONTH),0,0,0);
+//        Date d = cal.getTime();
+//        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
+//        String name = INDEX_SEAT_HIK_CAPTURE_FACE_TRACKS + sdf.format(d);
+//        if (isExistsIndex(name)) {
+//            return;
+//        }
+//        CreateIndexRequest request = new CreateIndexRequest(name);
+//        // 2.准备请求参数
+//        request.source(MAPPING_HIK_CAPTURE_FACE, XContentType.JSON);
+//        client.indices().create(request, RequestOptions.DEFAULT);
+//    }
+//
+//    /**
+//     * 判断指定的索引名是否存在
+//     *
+//     * @param indexName 索引名
+//     * @return 存在:true; 不存在:false;
+//     */
+//    public boolean isExistsIndex(String indexName) throws IOException {
+//
+//        GetIndexRequest getIndexRequest = new GetIndexRequest(indexName);
+//        boolean flag = client.indices().exists(getIndexRequest, RequestOptions.DEFAULT);
+//        return flag;
+//    }
+
+    public void insertEs(List<ConsumerRecord<?, ?>> records) {
+        BulkRequest bulkRequest = new BulkRequest();
+        records.forEach(item -> {
+            Optional<?> message = Optional.ofNullable(item.value());
+            if (message.isPresent()) {
+                Object msg = message.get();
+                JSONObject map = JSONObject.parseObject(msg.toString());
+                JSONArray jsonArray = JSONObject.parseObject(map.getString("faceLibRecogList"),JSONArray.class);
+                if (jsonArray.size()>0){
+                    for (int i = 0; i < jsonArray.size(); i++) {
+                        JSONObject job = jsonArray.getJSONObject(i);
+                        Map<String, Object> maps = Collections.synchronizedMap(new HashMap<>());
+                        String id = job.getString("traceUuid")+job.getString("traceIdx");
+                        maps.put("traceUuid", job.getString("traceUuid"));
+                        maps.put("traceIdx", job.getString("traceIdx"));
+                        maps.put("cameraIndexCode", job.getString("cameraIndexCode"));
+                        maps.put("captureTime", job.getString("captureTime"));
+                        maps.put("deviceIndexCode", job.getString("deviceIndexCode"));
+                        maps.put("deviceName", job.getString("deviceName"));
+                        String bkgUrl = job.getString("bkgUrl");
+                        if (null != bkgUrl && !bkgUrl.equals("")){
+                            maps.put("bkgUrl", httpsDownloadUtils.downloadFileHttps(bkgUrl));
+                        }
+                        maps.put("bornTime", job.getString("bornTime"));
+                        maps.put("name", job.getString("name"));
+                        maps.put("certificateNumber", job.getString("certificateNumber"));
+                        String facePicUrl = job.getString("facePicUrl");
+                        if (null != facePicUrl && !facePicUrl.equals("")){
+                            maps.put("facePicUrl", httpsDownloadUtils.downloadFileHttps(facePicUrl));
+                        }
+                        maps.put("faceRect", job.getString("faceRect"));
+                        maps.put("linkFaceBodyId", job.getString("linkFaceBodyId"));
+                        maps.put("gender", job.getString("gender"));
+                        maps.put("genderName", job.getString("genderName"));
+                        maps.put("glass", job.getString("glass"));
+                        maps.put("glassName", job.getString("glassName"));
+                        maps.put("latitude", job.getString("latitude"));
+                        maps.put("longitude", job.getString("longitude"));
+                        maps.put("linkFaceVehicleId", job.getString("linkFaceVehicleId"));
+                        maps.put("modelData", job.getString("modelData"));
+                        maps.put("plateNo", job.getString("plateNo"));
+                        maps.put("humanId", job.getString("humanId"));
+                        maps.put("smile", job.getString("smile"));
+                        maps.put("smileName", job.getString("smileName"));
+                        maps.put("age", job.getString("age"));
+                        maps.put("ageGroup", job.getString("ageGroup"));
+                        maps.put("ageGroupName", job.getString("ageGroupName"));
+                        maps.put("rect", job.getString("rect"));
+                        maps.put("address", job.getString("address"));
+                        bulkRequest.add(new IndexRequest(INDEX_SEAT_HIK_CAPTURE_FACE_TRACKS).source(maps).id(id));
+                    }
+                }
+            }
+        });
+        try {
+            BulkResponse bulk = client.bulk(bulkRequest, RequestOptions.DEFAULT);
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+
+    }
+
+}

+ 167 - 0
src/main/java/cn/com/taiji/esshipservice/listener/hlx/fusion/DynamicCaptureShipTrackConsumer.java

@@ -0,0 +1,167 @@
+package cn.com.taiji.esshipservice.listener.hlx.fusion;
+
+import cn.com.taiji.esshipservice.utils.HttpsDownloadUtils;
+import com.alibaba.fastjson.JSONObject;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.client.RequestOptions;
+import org.elasticsearch.client.RestHighLevelClient;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.kafka.annotation.TopicPartition;
+import org.springframework.kafka.support.Acknowledgment;
+import org.springframework.stereotype.Component;
+
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.*;
+
+import static cn.com.taiji.esshipservice.constant.ShipHisTrackIndexConstants.INDEX_SEAT_HIK_CAPTURE_SHIP_TRACKS;
+
+
+//海康
+
+/**
+ * @author xhl
+ */
+@Component
+@Slf4j
+public class DynamicCaptureShipTrackConsumer {
+
+    @Autowired
+    private RestHighLevelClient client;
+
+    @Autowired
+    private HttpsDownloadUtils httpsDownloadUtils;
+
+    ExecutorService threadPool = new ThreadPoolExecutor( 10,25,10L, TimeUnit.SECONDS, new LinkedBlockingDeque<>(10), Executors.defaultThreadFactory(), new ThreadPoolExecutor.DiscardPolicy());
+
+    /**
+     * 海康船舶抓拍数据-「分区」消费
+     * es场景
+     */
+
+    @KafkaListener(
+            groupId = "${taiji.kafka.consumer.hik-capture-ship.group}",
+            topicPartitions = {
+                    @TopicPartition(
+                            topic = "${taiji.kafka.consumer.hik-capture-ship.topic}",
+                            partitions = {"#{'${taiji.kafka.consumer.hik-capture-ship.partitions0}'.split(',')}"}
+                    )
+            }
+    )
+    public void dynamicCaptureFaceTrack0(List<ConsumerRecord<?, ?>> records, Acknowledgment ack) {
+        threadPool.execute(()-> {
+            insertEs(records);
+        });
+        ack.acknowledge();
+    }
+
+
+    @KafkaListener(
+            groupId = "${taiji.kafka.consumer.hik-capture-ship.group}",
+            topicPartitions = {
+                    @TopicPartition(
+                            topic = "${taiji.kafka.consumer.hik-capture-ship.topic}",
+                            partitions = {"#{'${taiji.kafka.consumer.hik-capture-ship.partitions1}'.split(',')}"}
+                    )
+            }
+    )
+    public void dynamicCaptureFaceTrack1(List<ConsumerRecord<?, ?>> records, Acknowledgment ack) {
+        threadPool.execute(()-> {
+            insertEs(records);
+        });
+        ack.acknowledge();
+
+    }
+
+
+    @KafkaListener(
+            groupId = "${taiji.kafka.consumer.hik-capture-ship.group}",
+            topicPartitions = {
+                    @TopicPartition(
+                            topic = "${taiji.kafka.consumer.hik-capture-ship.topic}",
+                            partitions = {"#{'${taiji.kafka.consumer.hik-capture-ship.partitions2}'.split(',')}"}
+                    )
+            }
+    )
+    public void dynamicCaptureFaceTrack2(List<ConsumerRecord<?, ?>> records, Acknowledgment ack) {
+        threadPool.execute(()-> {
+            insertEs(records);
+        });
+        ack.acknowledge();
+    }
+
+
+    @KafkaListener(
+            groupId = "${taiji.kafka.consumer.hik-capture-ship.group}",
+            topicPartitions = {
+                    @TopicPartition(
+                            topic = "${taiji.kafka.consumer.hik-capture-ship.topic}",
+                            partitions = {"#{'${taiji.kafka.consumer.hik-capture-ship.partitions3}'.split(',')}"}
+                    )
+            }
+    )
+    public void dynamicCaptureFaceTrack3(List<ConsumerRecord<?, ?>> records, Acknowledgment ack) {
+        threadPool.execute(()-> {
+            insertEs(records);
+        });
+        ack.acknowledge();
+    }
+
+
+    @KafkaListener(
+            groupId = "${taiji.kafka.consumer.hik-capture-ship.group}",
+            topicPartitions = {
+                    @TopicPartition(
+                            topic = "${taiji.kafka.consumer.hik-capture-ship.topic}",
+                            partitions = {"#{'${taiji.kafka.consumer.hik-capture-ship.partitions4}'.split(',')}"}
+                    )
+            }
+    )
+    public void dynamicCaptureFaceTrack4(List<ConsumerRecord<?, ?>> records, Acknowledgment ack) {
+        threadPool.execute(()-> {
+            insertEs(records);
+        });
+        ack.acknowledge();
+
+    }
+
+    public void insertEs(List<ConsumerRecord<?, ?>> records) {
+        BulkRequest bulkRequest = new BulkRequest();
+        records.forEach(item -> {
+            Optional<?> message = Optional.ofNullable(item.value());
+            if (message.isPresent()) {
+                Object msg = message.get();
+                JSONObject map = JSONObject.parseObject(msg.toString());
+                Map<String, Object> maps = Collections.synchronizedMap(new HashMap<>());
+                maps.put("sourceType", map.getString("sourceType"));
+                maps.put("indexCode", map.getString("indexCode"));
+                maps.put("captureTime", map.getString("captureTime"));
+                maps.put("targetName", map.getString("targetName"));
+                maps.put("targetShipClassify", map.getString("targetShipClassify"));
+                maps.put("targetShipType", map.getString("targetShipType"));
+                maps.put("targetImageUrl", map.getString("targetImageUrl"));
+                maps.put("targetConfidence", map.getString("targetConfidence"));
+                maps.put("targetLength", map.getString("targetLength"));
+                maps.put("targetWidth", map.getString("targetWidth"));
+                maps.put("targetSpeed", map.getString("targetSpeed"));
+                maps.put("targetPicRect", map.getString("targetPicRect"));
+                maps.put("targetDirection", map.getString("targetDirection"));
+                maps.put("targetImages", map.getString("targetImages"));
+                maps.put("hik", map.getString("hik"));
+                bulkRequest.add(new IndexRequest(INDEX_SEAT_HIK_CAPTURE_SHIP_TRACKS).source(maps));
+            }
+        });
+        try {
+            BulkResponse bulk = client.bulk(bulkRequest, RequestOptions.DEFAULT);
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+
+    }
+
+}

+ 256 - 256
src/main/java/cn/com/taiji/esshipservice/listener/hlx/fusion/EsHkImagesDumpS3.java

@@ -1,258 +1,258 @@
-package cn.com.taiji.esshipservice.listener.hlx.fusion;
-
-import com.alibaba.fastjson.JSON;
-import com.alibaba.fastjson.JSONArray;
-import cn.com.taiji.esshipservice.utils.HttpsDownloadUtils;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.http.client.config.RequestConfig;
-import org.elasticsearch.action.DocWriteResponse;
-import org.elasticsearch.action.bulk.BulkItemResponse;
-import org.elasticsearch.action.bulk.BulkRequest;
-import org.elasticsearch.action.bulk.BulkResponse;
-import org.elasticsearch.action.delete.DeleteResponse;
-import org.elasticsearch.action.index.IndexResponse;
-import org.elasticsearch.action.search.SearchRequest;
-import org.elasticsearch.action.search.SearchResponse;
-import org.elasticsearch.action.update.UpdateRequest;
-import org.elasticsearch.action.update.UpdateResponse;
-import org.elasticsearch.client.HttpAsyncResponseConsumerFactory;
-import org.elasticsearch.client.RequestOptions;
-import org.elasticsearch.client.RestHighLevelClient;
-import org.elasticsearch.core.TimeValue;
-import org.elasticsearch.index.query.BoolQueryBuilder;
-import org.elasticsearch.index.query.QueryBuilders;
-import org.elasticsearch.search.SearchHit;
-import org.elasticsearch.search.SearchHits;
-import org.elasticsearch.search.builder.SearchSourceBuilder;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.scheduling.annotation.Scheduled;
-import org.springframework.stereotype.Component;
-
-import java.io.IOException;
-import java.text.SimpleDateFormat;
-import java.util.*;
-import java.util.concurrent.*;
-
-import static cn.com.taiji.esshipservice.constant.ShipHisTrackIndexConstants.*;
-
-/**
- * @author xhl
- * @date 2023/2/24
- */
-@Component
-@Slf4j
-public class EsHkImagesDumpS3 {
-
-    @Autowired
-    private RestHighLevelClient client;
-
-    private static final RequestOptions COMMON_OPTIONS;
-
-    @Autowired
-    private HttpsDownloadUtils httpsDownloadUtils;
-
-    static {
-        RequestConfig requestConfig = RequestConfig.custom()
-                .setConnectTimeout(5000)
-                .setSocketTimeout(6000)
-                .build();
-        RequestOptions.Builder builder = RequestOptions.DEFAULT.toBuilder();
-        builder.setRequestConfig(requestConfig).setHttpAsyncResponseConsumerFactory(
-                // 设置查询内容大小限制,默认100 * 1024 * 1024
-                new HttpAsyncResponseConsumerFactory.HeapBufferedResponseConsumerFactory(5*1024 * 1024 * 1024)
-        );
-        COMMON_OPTIONS = builder.build();
-    }
-
-//    ExecutorService threadPool = new ThreadPoolExecutor( 15,35,10L, TimeUnit.SECONDS, new LinkedBlockingDeque<>(10), Executors.defaultThreadFactory(), new ThreadPoolExecutor.DiscardPolicy());
-
-    @Scheduled(fixedRate=60*60*1000)
-    public void scheduled() throws IOException {
-        Calendar calendar = Calendar.getInstance();
-        calendar.set(Calendar.HOUR_OF_DAY, calendar.get(Calendar.HOUR_OF_DAY) -2);
-        Date time1 = calendar.getTime();
-        Calendar calendar2 = Calendar.getInstance();
-        calendar2.set(Calendar.HOUR_OF_DAY, calendar2.get(Calendar.HOUR_OF_DAY) -1);
-        Date time2 = calendar2.getTime();
-        SimpleDateFormat sdf1 = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX");
-        String startTime = sdf1.format(time1);
-        String endTime = sdf1.format(time2);
-        imagesDumpS3Car(startTime,endTime);
-        imagesDumpS3Face(startTime,endTime);
-    }
-
-    public void imagesDumpS3Car(String startTime,String endTime) throws IOException {
-        //设置查询索引
-        SearchRequest searchRequest = new SearchRequest(INDEX_SEAT_HIK_CAPTURE_CAR_TRACKS);
-        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
-        BoolQueryBuilder boolQueryBuilder= QueryBuilders.boolQuery();
-        boolQueryBuilder.filter(QueryBuilders.rangeQuery("captureTime").gte(startTime).lte(endTime));
-        searchSourceBuilder.query(boolQueryBuilder);
-        searchSourceBuilder
-                .trackTotalHits(true)
-                .size(1000000);
-//                .timeout(TimeValue.timeValueHours(1L))
-//                .timeout(TimeValue.timeValueMinutes(30L))
-//                .timeout(TimeValue.timeValueSeconds(500L));
-        searchRequest.source(searchSourceBuilder);
-        SearchResponse search = null;
-        try {
-            search = client.search(searchRequest, COMMON_OPTIONS);
-        } catch (IOException e) {
-            e.printStackTrace();
-        }
-        SearchHits searchHits = search.getHits();
-        SearchHit[] hits = searchHits.getHits();
-        BulkRequest bulkRequest = new BulkRequest();
-        Integer num = 0;
-        for (SearchHit hit : hits) {
-            Map<String, Object> map = hit.getSourceAsMap();
-            Map<String, Object> maps = new HashMap<>();
-            Object plateImagePath = map.get("plateImagePath");
-            String s3plateImagePath = null;
-            if (null != plateImagePath   && !plateImagePath.equals("")){
-//                Boolean num = false;
-//                StringTokenizer tokenizer = new StringTokenizer(plateImagePath.toString(),"/");
-//                /** 判断枚举对象中是否还有数据 **/
-//                while (tokenizer.hasMoreElements()){
-//                    /** 返回从当前位置到下一个分隔符的字符串 **/
-//                    if (tokenizer.nextToken().equals("74.10.28.62:81")){
-//                        num =true;
-//                        return;
-//                    }
+//package cn.com.taiji.esshipservice.listener.hlx.fusion;
+//
+//import com.alibaba.fastjson.JSON;
+//import com.alibaba.fastjson.JSONArray;
+//import cn.com.taiji.esshipservice.utils.HttpsDownloadUtils;
+//import lombok.extern.slf4j.Slf4j;
+//import org.apache.http.client.config.RequestConfig;
+//import org.elasticsearch.action.DocWriteResponse;
+//import org.elasticsearch.action.bulk.BulkItemResponse;
+//import org.elasticsearch.action.bulk.BulkRequest;
+//import org.elasticsearch.action.bulk.BulkResponse;
+//import org.elasticsearch.action.delete.DeleteResponse;
+//import org.elasticsearch.action.index.IndexResponse;
+//import org.elasticsearch.action.search.SearchRequest;
+//import org.elasticsearch.action.search.SearchResponse;
+//import org.elasticsearch.action.update.UpdateRequest;
+//import org.elasticsearch.action.update.UpdateResponse;
+//import org.elasticsearch.client.HttpAsyncResponseConsumerFactory;
+//import org.elasticsearch.client.RequestOptions;
+//import org.elasticsearch.client.RestHighLevelClient;
+//import org.elasticsearch.core.TimeValue;
+//import org.elasticsearch.index.query.BoolQueryBuilder;
+//import org.elasticsearch.index.query.QueryBuilders;
+//import org.elasticsearch.search.SearchHit;
+//import org.elasticsearch.search.SearchHits;
+//import org.elasticsearch.search.builder.SearchSourceBuilder;
+//import org.springframework.beans.factory.annotation.Autowired;
+//import org.springframework.scheduling.annotation.Scheduled;
+//import org.springframework.stereotype.Component;
+//
+//import java.io.IOException;
+//import java.text.SimpleDateFormat;
+//import java.util.*;
+//import java.util.concurrent.*;
+//
+//import static cn.com.taiji.esshipservice.constant.ShipHisTrackIndexConstants.*;
+//
+///**
+// * @author xhl
+// * @date 2023/2/24
+// */
+//@Component
+//@Slf4j
+//public class EsHkImagesDumpS3 {
+//
+//    @Autowired
+//    private RestHighLevelClient client;
+//
+//    private static final RequestOptions COMMON_OPTIONS;
+//
+//    @Autowired
+//    private HttpsDownloadUtils httpsDownloadUtils;
+//
+//    static {
+//        RequestConfig requestConfig = RequestConfig.custom()
+//                .setConnectTimeout(5000)
+//                .setSocketTimeout(6000)
+//                .build();
+//        RequestOptions.Builder builder = RequestOptions.DEFAULT.toBuilder();
+//        builder.setRequestConfig(requestConfig).setHttpAsyncResponseConsumerFactory(
+//                // 设置查询内容大小限制,默认100 * 1024 * 1024
+//                new HttpAsyncResponseConsumerFactory.HeapBufferedResponseConsumerFactory(5*1024 * 1024 * 1024)
+//        );
+//        COMMON_OPTIONS = builder.build();
+//    }
+//
+////    ExecutorService threadPool = new ThreadPoolExecutor( 15,35,10L, TimeUnit.SECONDS, new LinkedBlockingDeque<>(10), Executors.defaultThreadFactory(), new ThreadPoolExecutor.DiscardPolicy());
+//
+////    @Scheduled(fixedRate=60*60*1000)
+//    public void scheduled() throws IOException {
+//        Calendar calendar = Calendar.getInstance();
+//        calendar.set(Calendar.HOUR_OF_DAY, calendar.get(Calendar.HOUR_OF_DAY) -2);
+//        Date time1 = calendar.getTime();
+//        Calendar calendar2 = Calendar.getInstance();
+//        calendar2.set(Calendar.HOUR_OF_DAY, calendar2.get(Calendar.HOUR_OF_DAY) -1);
+//        Date time2 = calendar2.getTime();
+//        SimpleDateFormat sdf1 = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX");
+//        String startTime = sdf1.format(time1);
+//        String endTime = sdf1.format(time2);
+//        imagesDumpS3Car(startTime,endTime);
+//        imagesDumpS3Face(startTime,endTime);
+//    }
+//
+//    public void imagesDumpS3Car(String startTime,String endTime) throws IOException {
+//        //设置查询索引
+//        SearchRequest searchRequest = new SearchRequest(INDEX_SEAT_HIK_CAPTURE_CAR_TRACKS);
+//        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
+//        BoolQueryBuilder boolQueryBuilder= QueryBuilders.boolQuery();
+//        boolQueryBuilder.filter(QueryBuilders.rangeQuery("captureTime").gte(startTime).lte(endTime));
+//        searchSourceBuilder.query(boolQueryBuilder);
+//        searchSourceBuilder
+//                .trackTotalHits(true)
+//                .size(1000000);
+////                .timeout(TimeValue.timeValueHours(1L))
+////                .timeout(TimeValue.timeValueMinutes(30L))
+////                .timeout(TimeValue.timeValueSeconds(500L));
+//        searchRequest.source(searchSourceBuilder);
+//        SearchResponse search = null;
+//        try {
+//            search = client.search(searchRequest, COMMON_OPTIONS);
+//        } catch (IOException e) {
+//            e.printStackTrace();
+//        }
+//        SearchHits searchHits = search.getHits();
+//        SearchHit[] hits = searchHits.getHits();
+//        BulkRequest bulkRequest = new BulkRequest();
+//        Integer num = 0;
+//        for (SearchHit hit : hits) {
+//            Map<String, Object> map = hit.getSourceAsMap();
+//            Map<String, Object> maps = new HashMap<>();
+//            Object plateImagePath = map.get("plateImagePath");
+//            String s3plateImagePath = null;
+//            if (null != plateImagePath   && !plateImagePath.equals("")){
+////                Boolean num = false;
+////                StringTokenizer tokenizer = new StringTokenizer(plateImagePath.toString(),"/");
+////                /** 判断枚举对象中是否还有数据 **/
+////                while (tokenizer.hasMoreElements()){
+////                    /** 返回从当前位置到下一个分隔符的字符串 **/
+////                    if (tokenizer.nextToken().equals("74.10.28.62:81")){
+////                        num =true;
+////                        return;
+////                    }
+////                }
+////                if (num == false) {
+//                s3plateImagePath = httpsDownloadUtils.downloadFileHttps(plateImagePath.toString());
+//                maps.put("plateImagePath", s3plateImagePath);
+////                }
+//            }
+//            String s3targetPicUrl = null;
+//            Object targetPicUrl = map.get("targetPicUrl");
+//            if (null != targetPicUrl  && !targetPicUrl.equals("")){
+////                Boolean num = false;
+////                StringTokenizer tokenizer = new StringTokenizer(targetPicUrl.toString(),"/");
+////                /** 判断枚举对象中是否还有数据 **/
+////                while (tokenizer.hasMoreElements()){
+////                    /** 返回从当前位置到下一个分隔符的字符串 **/
+////                    if (tokenizer.nextToken().equals("74.10.28.62:81")){
+////                        num =true;
+////                        return;
+////                    }
+////                }
+////                if (num == false) {
+//                s3targetPicUrl = httpsDownloadUtils.downloadFileHttps(targetPicUrl.toString());
+//                maps.put("targetPicUrl", s3targetPicUrl);
+////                }
+//
+//            }
+//            if (!maps.isEmpty()) {
+//                bulkRequest.add(new UpdateRequest(INDEX_SEAT_HIK_CAPTURE_CAR_TRACKS, hit.getId()).doc(maps));
+//                num++;
+//            }
+//            if (num == 100){
+//                try {
+//                    BulkResponse bulk = client.bulk(bulkRequest, RequestOptions.DEFAULT);
+//                    num = 0;
+//                    bulkRequest.requests().clear();
+//                } catch (IOException e) {
+//                    e.printStackTrace();
 //                }
-//                if (num == false) {
-                s3plateImagePath = httpsDownloadUtils.downloadFileHttps(plateImagePath.toString());
-                maps.put("plateImagePath", s3plateImagePath);
+//            }
+//        }
+//        try {
+//            if (bulkRequest.requests().size()>0){
+//                BulkResponse bulk = client.bulk(bulkRequest, RequestOptions.DEFAULT);
+//            }
+//        } catch (IOException e) {
+//            e.printStackTrace();
+//        }
+//    }
+//
+//    public void imagesDumpS3Face(String startTime,String endTime) throws IOException {
+//        //设置查询索引
+//        SearchRequest searchRequest = new SearchRequest(INDEX_SEAT_HIK_CAPTURE_FACE_TRACKS);
+//        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
+//        BoolQueryBuilder boolQueryBuilder= QueryBuilders.boolQuery();
+//        boolQueryBuilder.filter(QueryBuilders.rangeQuery("captureTime").gte(startTime).lte(endTime));
+//        searchSourceBuilder.query(boolQueryBuilder);
+//        searchSourceBuilder
+//                .trackTotalHits(true)
+//                .size(1000000);
+////                .timeout(TimeValue.timeValueHours(1L))
+////                .timeout(TimeValue.timeValueMinutes(30L))
+////                .timeout(TimeValue.timeValueSeconds(500L));
+//        searchRequest.source(searchSourceBuilder);
+//        SearchResponse search = null;
+//        try {
+//            search = client.search(searchRequest, COMMON_OPTIONS);
+//        } catch (IOException e) {
+//            e.printStackTrace();
+//        }
+//        SearchHits searchHits = search.getHits();
+//        SearchHit[] hits = searchHits.getHits();
+//        BulkRequest bulkRequest = new BulkRequest();
+//        Integer num = 0;
+//        for (SearchHit hit : hits) {
+//            Map<String, Object> map = hit.getSourceAsMap();
+//            Map<String, Object> maps = new HashMap<>();
+//            String s3bkgUrl = null;
+//            Object bkgUrl = map.get("bkgUrl");
+//            if (null != bkgUrl && !bkgUrl.equals("")){
+////                Boolean num = false;
+////                StringTokenizer tokenizer = new StringTokenizer(bkgUrl.toString(),"/");
+////                /** 判断枚举对象中是否还有数据 **/
+////                while (tokenizer.hasMoreElements()){
+////                    /** 返回从当前位置到下一个分隔符的字符串 **/
+////                    if (tokenizer.nextToken().equals("74.10.28.62:81")){
+////                        num =true;
+////                        return;
+////                    }
+////                }
+////                if (num == false){
+//                s3bkgUrl = httpsDownloadUtils.downloadFileHttps(bkgUrl.toString());
+//                maps.put("bkgUrl", s3bkgUrl);
+////                }
+//            }
+//            String s3facePicUrl = null;
+//            Object facePicUrl = map.get("facePicUrl");
+//            if (null != facePicUrl && !facePicUrl.equals("")){
+////                Boolean num = false;
+////                StringTokenizer tokenizer = new StringTokenizer(facePicUrl.toString(),"/");
+////                /** 判断枚举对象中是否还有数据 **/
+////                while (tokenizer.hasMoreElements()){
+////                    /** 返回从当前位置到下一个分隔符的字符串 **/
+////                    if (tokenizer.nextToken().equals("74.10.28.62:81")){
+////                        num =true;
+////                        return;
+////                    }
+////                }
+////                if (num == false){
+//                s3facePicUrl = httpsDownloadUtils.downloadFileHttps(facePicUrl.toString());
+//                maps.put("facePicUrl", s3facePicUrl);
+////                }
+//            }
+//            if (!maps.isEmpty()) {
+//                bulkRequest.add(new UpdateRequest(INDEX_SEAT_HIK_CAPTURE_FACE_TRACKS, hit.getId()).doc(maps));
+//                num++;
+//            }
+//            if (num == 100){
+//                try {
+//                    BulkResponse bulk = client.bulk(bulkRequest, RequestOptions.DEFAULT);
+//                    num = 0;
+//                    bulkRequest.requests().clear();
+//                } catch (IOException e) {
+//                    e.printStackTrace();
 //                }
-            }
-            String s3targetPicUrl = null;
-            Object targetPicUrl = map.get("targetPicUrl");
-            if (null != targetPicUrl  && !targetPicUrl.equals("")){
-//                Boolean num = false;
-//                StringTokenizer tokenizer = new StringTokenizer(targetPicUrl.toString(),"/");
-//                /** 判断枚举对象中是否还有数据 **/
-//                while (tokenizer.hasMoreElements()){
-//                    /** 返回从当前位置到下一个分隔符的字符串 **/
-//                    if (tokenizer.nextToken().equals("74.10.28.62:81")){
-//                        num =true;
-//                        return;
-//                    }
-//                }
-//                if (num == false) {
-                s3targetPicUrl = httpsDownloadUtils.downloadFileHttps(targetPicUrl.toString());
-                maps.put("targetPicUrl", s3targetPicUrl);
-//                }
-
-            }
-            if (!maps.isEmpty()) {
-                bulkRequest.add(new UpdateRequest(INDEX_SEAT_HIK_CAPTURE_CAR_TRACKS, hit.getId()).doc(maps));
-                num++;
-            }
-            if (num == 100){
-                try {
-                    BulkResponse bulk = client.bulk(bulkRequest, RequestOptions.DEFAULT);
-                    num = 0;
-                    bulkRequest.requests().clear();
-                } catch (IOException e) {
-                    e.printStackTrace();
-                }
-            }
-        }
-        try {
-            if (bulkRequest.requests().size()>0){
-                BulkResponse bulk = client.bulk(bulkRequest, RequestOptions.DEFAULT);
-            }
-        } catch (IOException e) {
-            e.printStackTrace();
-        }
-    }
-
-    public void imagesDumpS3Face(String startTime,String endTime) throws IOException {
-        //设置查询索引
-        SearchRequest searchRequest = new SearchRequest(INDEX_SEAT_HIK_CAPTURE_FACE_TRACKS);
-        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
-        BoolQueryBuilder boolQueryBuilder= QueryBuilders.boolQuery();
-        boolQueryBuilder.filter(QueryBuilders.rangeQuery("captureTime").gte(startTime).lte(endTime));
-        searchSourceBuilder.query(boolQueryBuilder);
-        searchSourceBuilder
-                .trackTotalHits(true)
-                .size(1000000);
-//                .timeout(TimeValue.timeValueHours(1L))
-//                .timeout(TimeValue.timeValueMinutes(30L))
-//                .timeout(TimeValue.timeValueSeconds(500L));
-        searchRequest.source(searchSourceBuilder);
-        SearchResponse search = null;
-        try {
-            search = client.search(searchRequest, COMMON_OPTIONS);
-        } catch (IOException e) {
-            e.printStackTrace();
-        }
-        SearchHits searchHits = search.getHits();
-        SearchHit[] hits = searchHits.getHits();
-        BulkRequest bulkRequest = new BulkRequest();
-        Integer num = 0;
-        for (SearchHit hit : hits) {
-            Map<String, Object> map = hit.getSourceAsMap();
-            Map<String, Object> maps = new HashMap<>();
-            String s3bkgUrl = null;
-            Object bkgUrl = map.get("bkgUrl");
-            if (null != bkgUrl && !bkgUrl.equals("")){
-//                Boolean num = false;
-//                StringTokenizer tokenizer = new StringTokenizer(bkgUrl.toString(),"/");
-//                /** 判断枚举对象中是否还有数据 **/
-//                while (tokenizer.hasMoreElements()){
-//                    /** 返回从当前位置到下一个分隔符的字符串 **/
-//                    if (tokenizer.nextToken().equals("74.10.28.62:81")){
-//                        num =true;
-//                        return;
-//                    }
-//                }
-//                if (num == false){
-                s3bkgUrl = httpsDownloadUtils.downloadFileHttps(bkgUrl.toString());
-                maps.put("bkgUrl", s3bkgUrl);
-//                }
-            }
-            String s3facePicUrl = null;
-            Object facePicUrl = map.get("facePicUrl");
-            if (null != facePicUrl && !facePicUrl.equals("")){
-//                Boolean num = false;
-//                StringTokenizer tokenizer = new StringTokenizer(facePicUrl.toString(),"/");
-//                /** 判断枚举对象中是否还有数据 **/
-//                while (tokenizer.hasMoreElements()){
-//                    /** 返回从当前位置到下一个分隔符的字符串 **/
-//                    if (tokenizer.nextToken().equals("74.10.28.62:81")){
-//                        num =true;
-//                        return;
-//                    }
-//                }
-//                if (num == false){
-                s3facePicUrl = httpsDownloadUtils.downloadFileHttps(facePicUrl.toString());
-                maps.put("facePicUrl", s3facePicUrl);
-//                }
-            }
-            if (!maps.isEmpty()) {
-                bulkRequest.add(new UpdateRequest(INDEX_SEAT_HIK_CAPTURE_FACE_TRACKS, hit.getId()).doc(maps));
-                num++;
-            }
-            if (num == 100){
-                try {
-                    BulkResponse bulk = client.bulk(bulkRequest, RequestOptions.DEFAULT);
-                    num = 0;
-                    bulkRequest.requests().clear();
-                } catch (IOException e) {
-                    e.printStackTrace();
-                }
-            }
-        }
-        try {
-            if (bulkRequest.requests().size()>0){
-                BulkResponse bulk = client.bulk(bulkRequest, RequestOptions.DEFAULT);
-            }
-        } catch (IOException e) {
-            e.printStackTrace();
-        }
-    }
-
-}
-
+//            }
+//        }
+//        try {
+//            if (bulkRequest.requests().size()>0){
+//                BulkResponse bulk = client.bulk(bulkRequest, RequestOptions.DEFAULT);
+//            }
+//        } catch (IOException e) {
+//            e.printStackTrace();
+//        }
+//    }
+//
+//}
+//

+ 6 - 4
src/main/java/cn/com/taiji/esshipservice/utils/HttpsDownloadUtils.java

@@ -4,6 +4,7 @@ import com.bingocloud.services.s3.AmazonS3Client;
 import com.bingocloud.services.s3.model.*;
 import io.daas.bingocloud.client.DaasioClient;
 import io.daas.bingocloud.store.MinioFileSystem;
+import lombok.Synchronized;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.stereotype.Component;
 
@@ -36,7 +37,7 @@ public class HttpsDownloadUtils extends MinioFileSystem{
      * @param fileUrl   https 远程路径
      * @throws Exception
      */
-    public String downloadFileHttps(String fileUrl) {
+    public synchronized String downloadFileHttps(String fileUrl) {
         String key = "";
         String data = "";
         AmazonS3Client amazonS3Client = null;
@@ -58,14 +59,15 @@ public class HttpsDownloadUtils extends MinioFileSystem{
                 PutObjectResult putObjectResult = amazonS3Client.putObject(new PutObjectRequest(buckets.get(2).getName(), preFix + key + postFix, inputStream, new ObjectMetadata()));
                 copyAclFromParent(buckets.get(2).getName(),preFix + key + postFix);
                 data = httpUrl + preFix + key +postFix;
+                amazonS3Client.shutdown();
                 return data;
             }else {
-                throw new RuntimeException("文件读取失败");
+//                throw new RuntimeException("文件读取失败");
+                log.info("文件读取失败");
+                return null;
             }
         }catch (Exception e){
             e.printStackTrace();
-        }finally {
-            amazonS3Client.shutdown();
         }
         return null;
     }

BIN
target/EsHkImagesDumpS3.jar


BIN
target/EsHkImagesDumpS3.jar.original


BIN
target/classes/cn/com/taiji/esshipservice/listener/hlx/fusion/DynamicCaptureCarTrackConsumer.class


BIN
target/classes/cn/com/taiji/esshipservice/listener/hlx/fusion/DynamicCaptureFaceTrackConsumer.class


BIN
target/classes/cn/com/taiji/esshipservice/listener/hlx/fusion/DynamicCaptureShipTrackConsumer.class


BIN
target/classes/cn/com/taiji/esshipservice/listener/hlx/fusion/EsHkImagesDumpS3.class


BIN
target/classes/cn/com/taiji/esshipservice/utils/HttpsDownloadUtils$1.class


BIN
target/classes/cn/com/taiji/esshipservice/utils/HttpsDownloadUtils$X509TrustUtiil.class


BIN
target/classes/cn/com/taiji/esshipservice/utils/HttpsDownloadUtils.class


+ 20 - 18
target/maven-status/maven-compiler-plugin/compile/default-compile/createdFiles.lst

@@ -1,43 +1,45 @@
-cn\com\taiji\esshipservice\domain\HainanAreaOut.class
-cn\com\taiji\esshipservice\domain\dto\TrackQueryPolygonInputDTO.class
+cn\com\taiji\esshipservice\listener\hlx\fusion\DynamicCaptureShipTrackConsumer.class
 cn\com\taiji\esshipservice\utils\JsonParser.class
 cn\com\taiji\esshipservice\domain\SmallRadar.class
 cn\com\taiji\esshipservice\enums\HisQueryCmdEnum.class
 cn\com\taiji\esshipservice\utils\DateUtils.class
-cn\com\taiji\esshipservice\domain\CbgkShip$CbgkShipBuilder.class
 cn\com\taiji\esshipservice\constant\ShipHisTrackIndexConstants.class
-cn\com\taiji\esshipservice\constant\BeiDouIdConstants.class
-cn\com\taiji\esshipservice\domain\dto\FuseShipTrack.class
+cn\com\taiji\esshipservice\listener\hlx\fusion\DynamicCaptureFaceTrackConsumer.class
 cn\com\taiji\esshipservice\domain\dto\HisTrackQueryDto.class
 cn\com\taiji\esshipservice\config\BingoFileConfig.class
-cn\com\taiji\esshipservice\utils\HttpsDownloadUtils$X509TrustUtiil.class
 cn\com\taiji\esshipservice\domain\dto\TempTrack.class
-cn\com\taiji\esshipservice\domain\CbgkShip.class
 cn\com\taiji\esshipservice\domain\BeiDouDTO.class
-cn\com\taiji\esshipservice\AxShipTrackEsServiceApplication.class
 cn\com\taiji\esshipservice\domain\dto\HistoryTrack.class
-cn\com\taiji\esshipservice\domain\ShipTrackHistoryDTO.class
-cn\com\taiji\esshipservice\domain\dto\TrackQueryInputDTO.class
 cn\com\taiji\esshipservice\domain\YzPortShip.class
-cn\com\taiji\esshipservice\domain\TestQuery.class
 cn\com\taiji\esshipservice\config\DassIoClientConfig.class
 cn\com\taiji\esshipservice\domain\HlxRadar.class
-cn\com\taiji\esshipservice\utils\HttpsDownloadUtils.class
-cn\com\taiji\esshipservice\domain\dto\CommonHistoryTrack.class
-cn\com\taiji\esshipservice\listener\hlx\fusion\EsHkImagesDumpS3.class
-cn\com\taiji\esshipservice\domain\ImportShipTable.class
 cn\com\taiji\esshipservice\config\EsClientConfiguration.class
 cn\com\taiji\esshipservice\domain\ImportShipTable2.class
-cn\com\taiji\esshipservice\utils\HttpsDownloadUtils$1.class
 cn\com\taiji\esshipservice\domain\dto\BaseTrackQueryInputDTO.class
 cn\com\taiji\esshipservice\domain\dto\EsHistoryTrack.class
 cn\com\taiji\esshipservice\exception\CustomException.class
 cn\com\taiji\esshipservice\domain\dto\TrackQueryCircleInputDTO.class
 cn\com\taiji\esshipservice\domain\ANotOnline.class
-cn\com\taiji\esshipservice\utils\MapPoint.class
 cn\com\taiji\esshipservice\domain\dto\TrackQueryShipInputDTO.class
+cn\com\taiji\esshipservice\domain\Beidou.class
+cn\com\taiji\esshipservice\domain\HainanAreaOut.class
+cn\com\taiji\esshipservice\domain\dto\TrackQueryPolygonInputDTO.class
+cn\com\taiji\esshipservice\domain\CbgkShip$CbgkShipBuilder.class
+cn\com\taiji\esshipservice\listener\hlx\fusion\DynamicCaptureCarTrackConsumer.class
+cn\com\taiji\esshipservice\constant\BeiDouIdConstants.class
+cn\com\taiji\esshipservice\domain\dto\FuseShipTrack.class
+cn\com\taiji\esshipservice\utils\HttpsDownloadUtils$X509TrustUtiil.class
+cn\com\taiji\esshipservice\domain\CbgkShip.class
+cn\com\taiji\esshipservice\AxShipTrackEsServiceApplication.class
+cn\com\taiji\esshipservice\domain\ShipTrackHistoryDTO.class
+cn\com\taiji\esshipservice\domain\dto\TrackQueryInputDTO.class
+cn\com\taiji\esshipservice\domain\TestQuery.class
+cn\com\taiji\esshipservice\utils\HttpsDownloadUtils.class
+cn\com\taiji\esshipservice\domain\dto\CommonHistoryTrack.class
+cn\com\taiji\esshipservice\domain\ImportShipTable.class
+cn\com\taiji\esshipservice\utils\HttpsDownloadUtils$1.class
+cn\com\taiji\esshipservice\utils\MapPoint.class
 cn\com\taiji\esshipservice\domain\EsLocationDict.class
 cn\com\taiji\esshipservice\enums\HisTrackSrcTypeEnum.class
-cn\com\taiji\esshipservice\domain\Beidou.class
 cn\com\taiji\esshipservice\utils\GraphUtils.class
 cn\com\taiji\esshipservice\domain\ApiResult.class

+ 3 - 0
target/maven-status/maven-compiler-plugin/compile/default-compile/inputFiles.lst

@@ -2,6 +2,7 @@ D:\EsHkImagesDumpS3\src\main\java\cn\com\taiji\esshipservice\config\EsClientConf
 D:\EsHkImagesDumpS3\src\main\java\cn\com\taiji\esshipservice\enums\HisTrackSrcTypeEnum.java
 D:\EsHkImagesDumpS3\src\main\java\cn\com\taiji\esshipservice\AxShipTrackEsServiceApplication.java
 D:\EsHkImagesDumpS3\src\main\java\cn\com\taiji\esshipservice\domain\HainanAreaOut.java
+D:\EsHkImagesDumpS3\src\main\java\cn\com\taiji\esshipservice\listener\hlx\fusion\DynamicCaptureFaceTrackConsumer.java
 D:\EsHkImagesDumpS3\src\main\java\cn\com\taiji\esshipservice\listener\hlx\fusion\EsHkImagesDumpS3.java
 D:\EsHkImagesDumpS3\src\main\java\cn\com\taiji\esshipservice\config\BingoFileConfig.java
 D:\EsHkImagesDumpS3\src\main\java\cn\com\taiji\esshipservice\domain\dto\HistoryTrack.java
@@ -19,6 +20,7 @@ D:\EsHkImagesDumpS3\src\main\java\cn\com\taiji\esshipservice\utils\GraphUtils.ja
 D:\EsHkImagesDumpS3\src\main\java\cn\com\taiji\esshipservice\domain\dto\FuseShipTrack.java
 D:\EsHkImagesDumpS3\src\main\java\cn\com\taiji\esshipservice\domain\HlxRadar.java
 D:\EsHkImagesDumpS3\src\main\java\cn\com\taiji\esshipservice\domain\YzPortShip.java
+D:\EsHkImagesDumpS3\src\main\java\cn\com\taiji\esshipservice\listener\hlx\fusion\DynamicCaptureShipTrackConsumer.java
 D:\EsHkImagesDumpS3\src\main\java\cn\com\taiji\esshipservice\constant\BeiDouIdConstants.java
 D:\EsHkImagesDumpS3\src\main\java\cn\com\taiji\esshipservice\domain\dto\TrackQueryInputDTO.java
 D:\EsHkImagesDumpS3\src\main\java\cn\com\taiji\esshipservice\domain\CbgkShip.java
@@ -35,6 +37,7 @@ D:\EsHkImagesDumpS3\src\main\java\cn\com\taiji\esshipservice\domain\TestQuery.ja
 D:\EsHkImagesDumpS3\src\main\java\cn\com\taiji\esshipservice\domain\ApiResult.java
 D:\EsHkImagesDumpS3\src\main\java\cn\com\taiji\esshipservice\exception\CustomException.java
 D:\EsHkImagesDumpS3\src\main\java\cn\com\taiji\esshipservice\domain\BeiDouDTO.java
+D:\EsHkImagesDumpS3\src\main\java\cn\com\taiji\esshipservice\listener\hlx\fusion\DynamicCaptureCarTrackConsumer.java
 D:\EsHkImagesDumpS3\src\main\java\cn\com\taiji\esshipservice\domain\SmallRadar.java
 D:\EsHkImagesDumpS3\src\main\java\cn\com\taiji\esshipservice\domain\dto\CommonHistoryTrack.java
 D:\EsHkImagesDumpS3\src\main\java\cn\com\taiji\esshipservice\utils\HttpsDownloadUtils.java