|
@@ -1,261 +1,264 @@
|
|
|
-package cn.com.taiji.esshipservice.listener.hlx.fusion;
|
|
|
-
|
|
|
-import cn.com.taiji.esshipservice.utils.HttpsDownloadUtils;
|
|
|
-import com.alibaba.fastjson.JSONArray;
|
|
|
-import com.alibaba.fastjson.JSONObject;
|
|
|
-import lombok.extern.slf4j.Slf4j;
|
|
|
-import org.apache.kafka.clients.consumer.ConsumerRecord;
|
|
|
-import org.elasticsearch.action.bulk.BulkRequest;
|
|
|
-import org.elasticsearch.action.bulk.BulkResponse;
|
|
|
-import org.elasticsearch.action.index.IndexRequest;
|
|
|
-import org.elasticsearch.client.RequestOptions;
|
|
|
-import org.elasticsearch.client.RestHighLevelClient;
|
|
|
-import org.springframework.beans.factory.annotation.Autowired;
|
|
|
-import org.springframework.kafka.annotation.KafkaListener;
|
|
|
-import org.springframework.kafka.annotation.TopicPartition;
|
|
|
-import org.springframework.kafka.support.Acknowledgment;
|
|
|
-import org.springframework.stereotype.Component;
|
|
|
-
|
|
|
-import java.io.IOException;
|
|
|
-import java.util.*;
|
|
|
-import java.util.concurrent.*;
|
|
|
-
|
|
|
-import static cn.com.taiji.esshipservice.constant.ShipHisTrackIndexConstants.INDEX_SEAT_HIK_CAPTURE_CAR_TRACKS;
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
- * @author xhl
|
|
|
- */
|
|
|
-@Component
|
|
|
-@Slf4j
|
|
|
-public class DynamicCaptureCarTrackConsumer {
|
|
|
-
|
|
|
- @Autowired
|
|
|
- private RestHighLevelClient client;
|
|
|
-
|
|
|
-
|
|
|
- @Autowired
|
|
|
- private HttpsDownloadUtils httpsDownloadUtils;
|
|
|
-
|
|
|
- ExecutorService threadPool = new ThreadPoolExecutor( 10,25,10L, TimeUnit.SECONDS, new LinkedBlockingDeque<>(10), Executors.defaultThreadFactory(), new ThreadPoolExecutor.DiscardPolicy());
|
|
|
-
|
|
|
-
|
|
|
- * 海康车辆抓拍数据-「分区」消费
|
|
|
- * es场景
|
|
|
- */
|
|
|
-
|
|
|
- @KafkaListener(
|
|
|
- groupId = "${taiji.kafka.consumer.hik-capture-car.group}",
|
|
|
- topicPartitions = {
|
|
|
- @TopicPartition(
|
|
|
- topic = "${taiji.kafka.consumer.hik-capture-car.topic}",
|
|
|
- partitions = {"#{'${taiji.kafka.consumer.hik-capture-car.partitions0}'.split(',')}"}
|
|
|
- )
|
|
|
- }
|
|
|
- )
|
|
|
- public void dynamicCaptureFaceTrack0(List<ConsumerRecord<?, ?>> records, Acknowledgment ack) {
|
|
|
- threadPool.execute(()-> {
|
|
|
- insertEs(records);
|
|
|
- });
|
|
|
- ack.acknowledge();
|
|
|
- }
|
|
|
-
|
|
|
-
|
|
|
- @KafkaListener(
|
|
|
- groupId = "${taiji.kafka.consumer.hik-capture-car.group}",
|
|
|
- topicPartitions = {
|
|
|
- @TopicPartition(
|
|
|
- topic = "${taiji.kafka.consumer.hik-capture-car.topic}",
|
|
|
- partitions = {"#{'${taiji.kafka.consumer.hik-capture-car.partitions1}'.split(',')}"}
|
|
|
- )
|
|
|
- }
|
|
|
- )
|
|
|
- public void dynamicCaptureFaceTrack1(List<ConsumerRecord<?, ?>> records, Acknowledgment ack) {
|
|
|
- threadPool.execute(()-> {
|
|
|
- insertEs(records);
|
|
|
- });
|
|
|
- ack.acknowledge();
|
|
|
-
|
|
|
- }
|
|
|
-
|
|
|
-
|
|
|
- @KafkaListener(
|
|
|
- groupId = "${taiji.kafka.consumer.hik-capture-car.group}",
|
|
|
- topicPartitions = {
|
|
|
- @TopicPartition(
|
|
|
- topic = "${taiji.kafka.consumer.hik-capture-car.topic}",
|
|
|
- partitions = {"#{'${taiji.kafka.consumer.hik-capture-car.partitions2}'.split(',')}"}
|
|
|
- )
|
|
|
- }
|
|
|
- )
|
|
|
- public void dynamicCaptureFaceTrack2(List<ConsumerRecord<?, ?>> records, Acknowledgment ack) {
|
|
|
- threadPool.execute(()-> {
|
|
|
- insertEs(records);
|
|
|
- });
|
|
|
- ack.acknowledge();
|
|
|
- }
|
|
|
-
|
|
|
-
|
|
|
- @KafkaListener(
|
|
|
- groupId = "${taiji.kafka.consumer.hik-capture-car.group}",
|
|
|
- topicPartitions = {
|
|
|
- @TopicPartition(
|
|
|
- topic = "${taiji.kafka.consumer.hik-capture-car.topic}",
|
|
|
- partitions = {"#{'${taiji.kafka.consumer.hik-capture-car.partitions3}'.split(',')}"}
|
|
|
- )
|
|
|
- }
|
|
|
- )
|
|
|
- public void dynamicCaptureFaceTrack3(List<ConsumerRecord<?, ?>> records, Acknowledgment ack) {
|
|
|
- threadPool.execute(()-> {
|
|
|
- insertEs(records);
|
|
|
- });
|
|
|
- ack.acknowledge();
|
|
|
- }
|
|
|
-
|
|
|
-
|
|
|
- @KafkaListener(
|
|
|
- groupId = "${taiji.kafka.consumer.hik-capture-car.group}",
|
|
|
- topicPartitions = {
|
|
|
- @TopicPartition(
|
|
|
- topic = "${taiji.kafka.consumer.hik-capture-car.topic}",
|
|
|
- partitions = {"#{'${taiji.kafka.consumer.hik-capture-car.partitions4}'.split(',')}"}
|
|
|
- )
|
|
|
- }
|
|
|
- )
|
|
|
- public void dynamicCaptureFaceTrack4(List<ConsumerRecord<?, ?>> records, Acknowledgment ack) {
|
|
|
- threadPool.execute(()-> {
|
|
|
- insertEs(records);
|
|
|
- });
|
|
|
- ack.acknowledge();
|
|
|
-
|
|
|
- }
|
|
|
-
|
|
|
- public void insertEs(List<ConsumerRecord<?, ?>> records) {
|
|
|
- BulkRequest bulkRequest = new BulkRequest();
|
|
|
- records.forEach(item -> {
|
|
|
- Optional<?> message = Optional.ofNullable(item.value());
|
|
|
- if (message.isPresent()) {
|
|
|
- Object msg = message.get();
|
|
|
- JSONObject map = JSONObject.parseObject(msg.toString());
|
|
|
- JSONArray jsonArray = JSONObject.parseObject(map.getString("vehiclePassRecogList"),JSONArray.class);
|
|
|
- if (jsonArray.size()>0){
|
|
|
- for (int i = 0; i < jsonArray.size(); i++) {
|
|
|
- Map<String, Object> maps = Collections.synchronizedMap(new HashMap<>());
|
|
|
- JSONObject job = jsonArray.getJSONObject(i);
|
|
|
- maps.put("cameraIndexCode", job.getString("cameraIndexCode"));
|
|
|
- maps.put("captureTime", job.getString("captureTime"));
|
|
|
- maps.put("card", job.getString("card"));
|
|
|
- maps.put("cardName", job.getString("cardName"));
|
|
|
- maps.put("cardNum", job.getString("cardNum"));
|
|
|
- maps.put("cardType", job.getString("cardType"));
|
|
|
- maps.put("cardTypeName", job.getString("cardTypeName"));
|
|
|
- maps.put("copilot", job.getString("copilot"));
|
|
|
- maps.put("copilotName", job.getString("copilotName"));
|
|
|
- maps.put("coverPlate", job.getString("coverPlate"));
|
|
|
- maps.put("coverPlateName", job.getString("coverPlateName"));
|
|
|
- maps.put("crossingIndexCode", job.getString("crossingIndexCode"));
|
|
|
- maps.put("dangMark", job.getString("dangMark"));
|
|
|
- maps.put("dangMarkName", job.getString("dangMarkName"));
|
|
|
- maps.put("decoration", job.getString("decoration"));
|
|
|
- maps.put("decorationName", job.getString("decorationName"));
|
|
|
- maps.put("directionIndex", job.getString("directionIndex"));
|
|
|
- maps.put("directionName", job.getString("directionName"));
|
|
|
- maps.put("envproSign", job.getString("envproSign"));
|
|
|
- maps.put("envproSignName", job.getString("envproSignName"));
|
|
|
- maps.put("frontChild", job.getString("frontChild"));
|
|
|
- maps.put("frontChildName", job.getString("frontChildName"));
|
|
|
- maps.put("id", job.getString("id"));
|
|
|
- maps.put("label", job.getString("label"));
|
|
|
- maps.put("labelName", job.getString("labelName"));
|
|
|
- maps.put("labelNum", job.getString("labelNum"));
|
|
|
- maps.put("laneNo", job.getString("laneNo"));
|
|
|
- maps.put("linkWifiVehicleId", job.getString("linkWifiVehicleId"));
|
|
|
- maps.put("linkFaceVehicleId", job.getString("linkFaceVehicleId"));
|
|
|
- maps.put("luggageRack", job.getString("luggageRack"));
|
|
|
- maps.put("luggageRackName", job.getString("luggageRackName"));
|
|
|
- maps.put("mobileDeviceLatitude", job.getString("mobileDeviceLatitude"));
|
|
|
- maps.put("mobileDeviceLongitude", job.getString("mobileDeviceLongitude"));
|
|
|
- maps.put("muckTruck", job.getString("muckTruck"));
|
|
|
- maps.put("muckTruckName", job.getString("muckTruckName"));
|
|
|
- maps.put("pdvs", job.getString("pdvs"));
|
|
|
- maps.put("pdvsName", job.getString("pdvsName"));
|
|
|
- maps.put("pendant", job.getString("pendant"));
|
|
|
- maps.put("pendantName", job.getString("pendantName"));
|
|
|
- maps.put("picUrlNum", job.getString("picUrlNum"));
|
|
|
- maps.put("pilotSafebelt", job.getString("pilotSafebelt"));
|
|
|
- maps.put("pliotSafebeltName", job.getString("pliotSafebeltName"));
|
|
|
- maps.put("pilotSunvisor", job.getString("pilotSunvisor"));
|
|
|
- maps.put("pilotSunvisorName", job.getString("pilotSunvisorName"));
|
|
|
- maps.put("plateColor", job.getString("plateColor"));
|
|
|
- maps.put("plateColorName", job.getString("plateColorName"));
|
|
|
- String plateImagePath = job.getString("plateImagePath");
|
|
|
- if (null != plateImagePath && !plateImagePath.equals("")){
|
|
|
- maps.put("plateImagePath", httpsDownloadUtils.downloadFileHttps(plateImagePath));
|
|
|
- }
|
|
|
- maps.put("plateNo", job.getString("plateNo"));
|
|
|
- maps.put("plateState", job.getString("plateState"));
|
|
|
- maps.put("plateStateName", job.getString("plateStateName"));
|
|
|
- maps.put("plateType", job.getString("plateType"));
|
|
|
- maps.put("plateTypeName", job.getString("plateTypeName"));
|
|
|
- maps.put("rect", job.getString("rect"));
|
|
|
- maps.put("spareTire", job.getString("spareTire"));
|
|
|
- maps.put("spareTireName", job.getString("spareTireName"));
|
|
|
- maps.put("sunroof", job.getString("sunroof"));
|
|
|
- maps.put("sunroofName", job.getString("sunroofName"));
|
|
|
- String targetPicUrl = job.getString("targetPicUrl");
|
|
|
- if (null != targetPicUrl && !targetPicUrl.equals("")){
|
|
|
- maps.put("targetPicUrl", httpsDownloadUtils.downloadFileHttps(targetPicUrl));
|
|
|
- }
|
|
|
- maps.put("tempPlateNo", job.getString("tempPlateNo"));
|
|
|
- maps.put("tempPlateNoName", job.getString("tempPlateNoName"));
|
|
|
- maps.put("tissueBox", job.getString("tissueBox"));
|
|
|
- maps.put("tissueBoxName", job.getString("tissueBoxName"));
|
|
|
- maps.put("tricycleCanopy", job.getString("tricycleCanopy"));
|
|
|
- maps.put("tricycleCanopyName", job.getString("tricycleCanopyName"));
|
|
|
- maps.put("uphone", job.getString("uphone"));
|
|
|
- maps.put("ecolabel", job.getString("ecolabel"));
|
|
|
- maps.put("ecolabelName", job.getString("ecolabelName"));
|
|
|
- maps.put("usePhoneName", job.getString("usePhoneName"));
|
|
|
- maps.put("vehicleColor", job.getString("vehicleColor"));
|
|
|
- maps.put("vehicleColorName", job.getString("vehicleColorName"));
|
|
|
- maps.put("vehicleColorDepth", job.getString("vehicleColorDepth"));
|
|
|
- maps.put("vehicleColorDepthName", job.getString("vehicleColorDepthName"));
|
|
|
- maps.put("vehicleHead", job.getString("vehicleHead"));
|
|
|
- maps.put("vehicleHeadName", job.getString("vehicleHeadName"));
|
|
|
- maps.put("vehicleLogo", job.getString("vehicleLogo"));
|
|
|
- maps.put("vehicleModel", job.getString("vehicleModel"));
|
|
|
- maps.put("vehicleLamp", job.getString("vehicleLamp"));
|
|
|
- maps.put("vehicleLampName", job.getString("vehicleLampName"));
|
|
|
- maps.put("vehicleSpeed", job.getString("vehicleSpeed"));
|
|
|
- maps.put("vehicleSprayPainted", job.getString("vehicleSprayPainted"));
|
|
|
- maps.put("vehicleSprayPaintedName", job.getString("vehicleSprayPaintedName"));
|
|
|
- maps.put("vehicleSubLogo", job.getString("vehicleSubLogo"));
|
|
|
- maps.put("vehicleType", job.getString("vehicleType"));
|
|
|
- maps.put("vehicleTypeName", job.getString("vehicleTypeName"));
|
|
|
- maps.put("vicePilotSafebelt", job.getString("vicePilotSafebelt"));
|
|
|
- maps.put("vicePilotSafebeltName", job.getString("vicePilotSafebeltName"));
|
|
|
- maps.put("vicePilotSunvisor", job.getString("vicePilotSunvisor"));
|
|
|
- maps.put("vicePilotSunvisorName", job.getString("vicePilotSunvisorName"));
|
|
|
- maps.put("imagePath1", job.getString("imagePath1"));
|
|
|
- maps.put("imagePath2", job.getString("imagePath2"));
|
|
|
- maps.put("imagePath3", job.getString("imagePath3"));
|
|
|
- maps.put("imagePath4", job.getString("imagePath4"));
|
|
|
- maps.put("imagePath5", job.getString("imagePath5"));
|
|
|
- maps.put("imagePath6", job.getString("imagePath6"));
|
|
|
- maps.put("direction", job.getString("direction"));
|
|
|
- bulkRequest.add(new IndexRequest(INDEX_SEAT_HIK_CAPTURE_CAR_TRACKS).source(maps));
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- });
|
|
|
- try {
|
|
|
- BulkResponse bulk = client.bulk(bulkRequest, RequestOptions.DEFAULT);
|
|
|
- } catch (IOException e) {
|
|
|
- e.printStackTrace();
|
|
|
- }
|
|
|
-
|
|
|
- }
|
|
|
-
|
|
|
-}
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|