Browse Source

修改数据源

liangjianf 2 years ago
parent
commit
c4f14f495f

+ 4 - 1
beidou-track-consumer/pom.xml

@@ -9,7 +9,6 @@
     </parent>
     <groupId>cn.com.taiji</groupId>
     <artifactId>beidou-track-consumer</artifactId>
-    <version>0.0.1-SNAPSHOT</version>
     <name>beidou-track-consumer</name>
     <description>beidou-track-consumer</description>
     <properties>
@@ -21,6 +20,10 @@
             <artifactId>spring-boot-starter</artifactId>
         </dependency>
         <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-web</artifactId>
+        </dependency>
+        <dependency>
             <groupId>org.projectlombok</groupId>
             <artifactId>lombok</artifactId>
             <optional>true</optional>

+ 45 - 0
beidou-track-consumer/src/main/java/cn/com/taiji/beidou/track/consumer/BeidouShipTrackConsumer.java

@@ -0,0 +1,45 @@
+package cn.com.taiji.beidou.track.consumer;
+
+import cn.com.taiji.beidou.track.entity.BeidouTrackEntity;
+import com.alibaba.fastjson.JSONObject;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.support.Acknowledgment;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.Resource;
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * @author liangjf
+ */
+@Component
+@Slf4j
+@ConditionalOnProperty(name = "taiji.kafka.consumer.beidou.enable",havingValue = "true")
+public class BeidouShipTrackConsumer {
+
+    @Resource
+    private KafkaTemplate kafkaTemplate;
+    @Value("${taiji.kafka.productor.beidou.topic}")
+    private String kafkaTopic;
+
+    @KafkaListener(
+            containerFactory = "kafkaContainerFactory",
+            topics = {"${taiji.kafka.consumer.beidou.topic}"}
+    )
+    public void dynamicTrack(List<ConsumerRecord<?, ?>> records, Acknowledgment ack) {
+        for (ConsumerRecord<?, ?> record : records) {
+            Optional message = Optional.ofNullable(record.value());
+            Object msg = message.get();
+            BeidouTrackEntity entity = JSONObject.parseObject(msg.toString(), BeidouTrackEntity.class);
+            String data = JSONObject.toJSONString(entity);
+            kafkaTemplate.send(kafkaTopic, entity.getDeviceId(),data);
+        }
+        ack.acknowledge();
+    }
+}

+ 28 - 0
beidou-track-consumer/src/main/java/cn/com/taiji/beidou/track/entity/BeidouTrackEntity.java

@@ -0,0 +1,28 @@
+package cn.com.taiji.beidou.track.entity;
+
+import lombok.Data;
+
+/**
+ * @author wudskq
+ */
+@Data
+public class BeidouTrackEntity {
+
+    private Long trackId;
+    private String deviceId;
+    private String shipType;
+    private String workType;
+    private String workWay;
+    private String sendTime;
+    private String locationTime;
+    private String online;
+    private String shipLength;
+    private String shipWidth;
+    private String texture;
+    private String longitude;
+    private String latitude;
+    private String direction;
+    private String speed;
+    private String kwh;
+    private String shipName;
+}

+ 17 - 2
beidou-track-consumer/src/main/resources/application-prod.yml

@@ -1,6 +1,8 @@
+server:
+  port: 8085
 spring:
   kafka:
-    bootstrap-servers: 172.28.19.20:29092
+    bootstrap-servers: 10.112.89.239:9092
     producer:
       # 发生错误后,消息重发的次数。
       retries: 0
@@ -33,4 +35,17 @@ spring:
       #listner负责ack,每调用一次,就立即commit
       ack-mode: manual_immediate
       missing-topics-fatal: false
-      type: batch
+      type: batch
+taiji:
+  geomesa.store.redis.config: redis.url===redis://10.112.89.239:6379###redis.catalog===geomesa###redis.connection.pool.size===100###redis.connection.pool.validate===true###geomesa.stats.enable===false###geomesa.stats.generate===false###geomesa.query.caching===false
+  kafka:
+    productor:
+      beidou:
+        topic: taiji_ax_beidou_dynamic_ship
+    consumer:
+      bootstrap-servers: 172.28.19.20:29092
+      beidou:
+        enable: true
+        expiry: 3600
+        topic: sgAxBeidouTrack
+        group: sgAxBeidouTrack—${random.uuid}