Selaa lähdekoodia

向诊断队列发送消息

gxt_xa_000000 4 vuotta sitten
vanhempi
commit
5ab1548571

+ 4 - 0
pom.xml

@@ -130,6 +130,10 @@
130 130
             <artifactId>camel-servlet</artifactId>
131 131
         </dependency>
132 132
         <dependency>
133
+            <groupId>org.apache.camel.springboot</groupId>
134
+            <artifactId>camel-quartz-starter</artifactId>
135
+        </dependency>
136
+        <dependency>
133 137
             <groupId>org.springframework.boot</groupId>
134 138
             <artifactId>spring-boot-starter-test</artifactId>
135 139
             <scope>test</scope>

+ 63 - 0
src/main/java/com/gct/tools/etlcamelhuge/MQ/DefaultMsgSendFailCallBack.java

@@ -0,0 +1,63 @@
1
+package com.gct.tools.etlcamelhuge.MQ;
2
+
3
+import lombok.extern.slf4j.Slf4j;
4
+
5
+import java.io.File;
6
+import java.io.IOException;
7
+import java.time.LocalDate;
8
+import java.time.format.DateTimeFormatter;
9
+import java.util.logging.FileHandler;
10
+import java.util.logging.Formatter;
11
+import java.util.logging.LogRecord;
12
+import java.util.logging.Logger;
13
+
14
+/**
15
+ * class name: DiagnoseMsgSendFailCallBack
16
+ *
17
+ * @author lloyd
18
+ * @version 1.0
19
+ * @since 2021/7/05 下午7:08
20
+ */
21
+@Slf4j
22
+public class DefaultMsgSendFailCallBack {
23
+
24
+    public static void accept(MessageBody msg, String logFilePath) {
25
+        String msgJson = msg.toJsonBody();
26
+        String fileName = logFilePath + "-" + LocalDate.now().format(DateTimeFormatter.ISO_DATE);
27
+        File file = new File(fileName);
28
+        if (!file.getParentFile().exists())
29
+            if (!file.getParentFile().mkdirs()) {
30
+                log.error("create log file failed,method:mkdirs()");
31
+                return;
32
+            }
33
+        if (!file.exists()) {
34
+            try {
35
+                if (!file.createNewFile()) {
36
+                    log.error("create log file failed,method:createNewFile()");
37
+                    return;
38
+                }
39
+            } catch (IOException ioException) {
40
+                log.error("create log file failed, cause by:{}", ioException.getMessage());
41
+                return;
42
+            }
43
+        }
44
+
45
+        try {
46
+            FileHandler fileHandler = new FileHandler(fileName, true);
47
+            fileHandler.setFormatter(new Formatter() {
48
+                @Override
49
+                public String format(LogRecord logRecord) {
50
+                    return logRecord.getMessage();
51
+                }
52
+            });
53
+            Logger logger = Logger.getLogger(DefaultMsgSendFailCallBack.class.getTypeName());
54
+            logger.setUseParentHandlers(false);
55
+            logger.addHandler(fileHandler);
56
+            logger.info(msgJson + "\n");
57
+        } catch (IOException ioException) {
58
+            log.error("run logger handler failed, cause by:{}", ioException.getMessage());
59
+        }
60
+
61
+
62
+    }
63
+}

+ 23 - 0
src/main/java/com/gct/tools/etlcamelhuge/MQ/DefaultMsgSendSuccessCallBack.java

@@ -0,0 +1,23 @@
1
+package com.gct.tools.etlcamelhuge.MQ;
2
+
3
+import lombok.extern.slf4j.Slf4j;
4
+import org.apache.rocketmq.client.producer.SendResult;
5
+import org.springframework.stereotype.Component;
6
+
7
+/**
8
+ * class name: DiagnoseMsgSendSuccessCallBack
9
+ *
10
+ * @author lloyd
11
+ * @version 1.0
12
+ * @since 2021/6/24 下午7:23
13
+ */
14
+@Slf4j
15
+@Component("defaultMsgSendSuccessCallBack")
16
+public class DefaultMsgSendSuccessCallBack implements SendSuccessCallBack {
17
+    private static int count = 0;
18
+    @Override
19
+    public void accept(SendResult sendResult) {
20
+        System.out.println("send = "+count++);
21
+        log.info("send msg success,msg:{}", sendResult.getMsgId());
22
+    }
23
+}

+ 41 - 0
src/main/java/com/gct/tools/etlcamelhuge/MQ/DiagnoseDelayedMessageProducer.java

@@ -0,0 +1,41 @@
1
+package com.gct.tools.etlcamelhuge.MQ;
2
+
3
+import org.apache.rocketmq.client.exception.MQClientException;
4
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
5
+
6
+/**
7
+ * class name: DiagnoseDelayedMessageProducer
8
+ *
9
+ * @author lloyd
10
+ * @version 1.0
11
+ * @since 2021/6/24 下午7:41
12
+ */
13
+public class DiagnoseDelayedMessageProducer implements MessageProducer {
14
+    DefaultMQProducer producer;
15
+
16
+    @Override
17
+    public void destroy() {
18
+        producer.shutdown();
19
+    }
20
+
21
+    @Override
22
+    public void init() {
23
+        try {
24
+            producer.start();
25
+        } catch (MQClientException e) {
26
+            System.out.println("Error");
27
+        }
28
+    }
29
+
30
+    //@Scheduled(zone = "****")
31
+    public void task() {
32
+        /**
33
+         * TODO:
34
+         * 完善延迟重发策略以及统一补偿策略
35
+         * 间隔一段时间将logfile中的统一生产到mq中
36
+         * 默认7天2G(循环擦除1小时)限制,理论上本地空间无限保留时间无限
37
+         */
38
+        //Executors.newScheduledThreadPool(1).schedule()
39
+        //Quartz
40
+    }
41
+}

+ 60 - 0
src/main/java/com/gct/tools/etlcamelhuge/MQ/DiagnoseMessageProducer.java

@@ -0,0 +1,60 @@
1
+package com.gct.tools.etlcamelhuge.MQ;
2
+
3
+import com.alibaba.fastjson.JSONObject;
4
+import com.gct.tools.etlcamelhuge.entity.DiagnoseMsg;
5
+import lombok.extern.slf4j.Slf4j;
6
+import org.apache.rocketmq.client.exception.MQClientException;
7
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
8
+import org.apache.rocketmq.common.message.Message;
9
+
10
+import java.nio.charset.StandardCharsets;
11
+
12
+/**
13
+ * class name: DiagnoseMessageProducer
14
+ *
15
+ * @author lloyd
16
+ * @version 1.0
17
+ * @since 2021/6/24 下午6:44
18
+ */
19
+@Slf4j
20
+public class DiagnoseMessageProducer implements MessageProducer {
21
+    private final DefaultMQProducer producer;
22
+    private final GCTRocketMQProperties properties;
23
+
24
+    private final SendFailCallBack failCallBack;
25
+    private final SendSuccessCallBack successCallBack;
26
+
27
+    public DiagnoseMessageProducer(DefaultMQProducer producer, GCTRocketMQProperties properties, SendFailCallBack failCallBack, SendSuccessCallBack successCallBack) {
28
+        this.producer = producer;
29
+        this.properties = properties;
30
+        this.failCallBack = failCallBack;
31
+        this.successCallBack = successCallBack;
32
+    }
33
+
34
+
35
+    @Override
36
+    public void init() {
37
+        try {
38
+            producer.start();
39
+        } catch (MQClientException e) {
40
+            log.error("producer can not start ,cause by:{}", e.getErrorMessage());
41
+        }
42
+    }
43
+
44
+    @Override
45
+    public void send(MessageBody msgBody) {
46
+        MessageProducer.super.send(msgBody);
47
+        if (!(msgBody instanceof DiagnoseMsg)) {
48
+            log.error("Not supported type of msg body:{},and only supported 'DiagnoseMsg'", msgBody.getClass().getTypeName());
49
+            return;
50
+        }
51
+        DiagnoseMsg actualMsg = (DiagnoseMsg) msgBody;
52
+        Message msg = new Message(properties.getTopic(), properties.getTags(), actualMsg.getWellId(), JSONObject.toJSONString(actualMsg).getBytes(StandardCharsets.UTF_8));
53
+        sendDefault(producer, msg, (MessageBody) actualMsg, failCallBack, successCallBack);
54
+    }
55
+
56
+    @Override
57
+    public void destroy() {
58
+        producer.shutdown();
59
+    }
60
+}

+ 18 - 0
src/main/java/com/gct/tools/etlcamelhuge/MQ/GCTMutableMQProperties.java

@@ -0,0 +1,18 @@
1
+package com.gct.tools.etlcamelhuge.MQ;
2
+
3
+import lombok.Data;
4
+import org.springframework.boot.context.properties.ConfigurationProperties;
5
+
6
+/**
7
+ * class name: GCTMutiableMQProperties
8
+ *
9
+ * @author lloyd
10
+ * @version 1.0
11
+ * @since 2021/7/2 下午2:37
12
+ */
13
+@ConfigurationProperties(prefix = "gct.rocketmq")
14
+@Data
15
+public class GCTMutableMQProperties {
16
+    private GCTRocketMQProperties one;
17
+    private GCTRocketMQProperties two;
18
+}

+ 74 - 0
src/main/java/com/gct/tools/etlcamelhuge/MQ/GCTRocketMQConfiguration.java

@@ -0,0 +1,74 @@
1
+package com.gct.tools.etlcamelhuge.MQ;
2
+
3
+import org.apache.rocketmq.client.AccessChannel;
4
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
5
+import org.apache.rocketmq.spring.support.RocketMQUtil;
6
+import org.slf4j.Logger;
7
+import org.slf4j.LoggerFactory;
8
+import org.springframework.beans.factory.annotation.Qualifier;
9
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
10
+import org.springframework.context.annotation.Bean;
11
+import org.springframework.context.annotation.Configuration;
12
+import org.springframework.util.Assert;
13
+import org.springframework.util.StringUtils;
14
+
15
+/**
16
+ * class name: GCTRocketMQConfiguration
17
+ *
18
+ * @author lloyd
19
+ * @version 1.0
20
+ * @since 2021/6/24 下午6:30
21
+ */
22
+@Configuration
23
+@EnableConfigurationProperties({GCTMutableMQProperties.class})
24
+public class GCTRocketMQConfiguration {
25
+
26
+    public static DefaultMQProducer buildDefaultMQProducer(GCTRocketMQProperties properties) {
27
+        GCTRocketMQProperties.Producer producerConfig = properties.getProducer();
28
+        String nameServer = properties.getNameServer();
29
+        String groupName = producerConfig.getGroup();
30
+        Assert.hasText(nameServer, "[rocketmq.name-server] must not be null");
31
+        Assert.hasText(groupName, "[rocketmq.producer.group] must not be null");
32
+        String accessChannel = properties.getAccessChannel();
33
+        String ak = properties.getProducer().getAccessKey();
34
+        String sk = properties.getProducer().getSecretKey();
35
+        boolean isEnableMsgTrace = properties.getProducer().isEnableMsgTrace();
36
+        String customizedTraceTopic = properties.getProducer().getCustomizedTraceTopic();
37
+        DefaultMQProducer producer = RocketMQUtil.createDefaultMQProducer(groupName, ak, sk, isEnableMsgTrace, customizedTraceTopic);
38
+        producer.setNamesrvAddr(nameServer);
39
+        if (!StringUtils.isEmpty(accessChannel)) {
40
+            producer.setAccessChannel(AccessChannel.valueOf(accessChannel));
41
+        }
42
+
43
+        producer.setSendMsgTimeout(producerConfig.getSendMessageTimeout());
44
+        producer.setRetryTimesWhenSendFailed(producerConfig.getRetryTimesWhenSendFailed());
45
+        producer.setRetryTimesWhenSendAsyncFailed(producerConfig.getRetryTimesWhenSendAsyncFailed());
46
+        producer.setMaxMessageSize(producerConfig.getMaxMessageSize());
47
+        producer.setCompressMsgBodyOverHowmuch(producerConfig.getCompressMessageBodyThreshold());
48
+        producer.setRetryAnotherBrokerWhenNotStoreOK(producerConfig.isRetryNextServer());
49
+        return producer;
50
+    }
51
+
52
+    @Bean(name = "diagnoseMessageProducer", destroyMethod = "destroy", initMethod = "init")
53
+    public MessageProducer diagnoseMessageProducer(GCTMutableMQProperties properties,
54
+                                                   @Qualifier("defaultMsgSendSuccessCallBack") SendSuccessCallBack successCallBack) {
55
+        GCTRocketMQProperties one = properties.getOne();
56
+        return new DiagnoseMessageProducer(buildDefaultMQProducer(one), one, (e, msg) -> {
57
+            Logger log = LoggerFactory.getLogger(DiagnoseMessageProducer.class);
58
+            log.error("send diagnose message failed  cause by:{},stack:{}", e.getMessage(), e.getStackTrace());
59
+            DefaultMsgSendFailCallBack.accept(msg, one.getLogFilePath());
60
+        }, successCallBack);
61
+    }
62
+
63
+    @Bean(name = "warnMessageProducer", destroyMethod = "destroy", initMethod = "init")
64
+    public MessageProducer warnMessageProducer(GCTMutableMQProperties properties,
65
+                                               @Qualifier("defaultMsgSendSuccessCallBack") SendSuccessCallBack successCallBack) {
66
+        GCTRocketMQProperties two = properties.getTwo();
67
+        return new WarnMessageProducer(buildDefaultMQProducer(two), two, (e, msg) -> {
68
+            Logger log = LoggerFactory.getLogger(WarnMessageProducer.class);
69
+            log.error("send warn message failed  cause by:{},stack:{}", e.getMessage(), e.getStackTrace());
70
+            DefaultMsgSendFailCallBack.accept(msg, two.getLogFilePath());
71
+        }, successCallBack);
72
+    }
73
+
74
+}

+ 22 - 0
src/main/java/com/gct/tools/etlcamelhuge/MQ/GCTRocketMQProperties.java

@@ -0,0 +1,22 @@
1
+package com.gct.tools.etlcamelhuge.MQ;
2
+
3
+import lombok.Data;
4
+import lombok.EqualsAndHashCode;
5
+import org.apache.rocketmq.spring.autoconfigure.RocketMQProperties;
6
+
7
+/**
8
+ * class name: GCTRocketMQProperties
9
+ *
10
+ * @author lloyd
11
+ * @version 1.0
12
+ * @since 2021/6/24 下午6:26
13
+ */
14
+@Data
15
+@EqualsAndHashCode(callSuper = false)
16
+public class GCTRocketMQProperties extends RocketMQProperties {
17
+
18
+    private String topic;
19
+    private String logFilePath;
20
+    private String tags;
21
+
22
+}

+ 12 - 0
src/main/java/com/gct/tools/etlcamelhuge/MQ/MessageBody.java

@@ -0,0 +1,12 @@
1
+package com.gct.tools.etlcamelhuge.MQ;
2
+
3
+/**
4
+ * interface name: MessageBody
5
+ *
6
+ * @author lloyd
7
+ * @version 1.0
8
+ * @since 2021/6/24 下午7:01
9
+ */
10
+public interface MessageBody {
11
+    String toJsonBody();
12
+}

+ 51 - 0
src/main/java/com/gct/tools/etlcamelhuge/MQ/MessageProducer.java

@@ -0,0 +1,51 @@
1
+package com.gct.tools.etlcamelhuge.MQ;
2
+
3
+import org.apache.rocketmq.client.exception.MQClientException;
4
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
5
+import org.apache.rocketmq.client.producer.SendCallback;
6
+import org.apache.rocketmq.client.producer.SendResult;
7
+import org.apache.rocketmq.common.message.Message;
8
+import org.apache.rocketmq.remoting.exception.RemotingException;
9
+import org.slf4j.Logger;
10
+import org.slf4j.LoggerFactory;
11
+
12
+import java.util.Objects;
13
+
14
+/**
15
+ * class name: MessageProducer
16
+ *
17
+ * @author lloyd
18
+ * @version 1.0
19
+ * @since 2021/6/24 下午6:41
20
+ */
21
+
22
+public interface MessageProducer {
23
+    void init();
24
+
25
+    default void send(MessageBody msgBody) {
26
+        LoggerFactory.getLogger(MessageProducer.class).info("sending msg :==>{}", msgBody.toJsonBody());
27
+    }
28
+
29
+    void destroy() throws Exception;
30
+
31
+    default void sendDefault(DefaultMQProducer producer, Message msg, MessageBody actualMsg, SendFailCallBack failCallBack, SendSuccessCallBack successCallBack) {
32
+        Logger log = LoggerFactory.getLogger(producer.getProducerGroup() + producer.getClass().getTypeName());
33
+        try {
34
+            producer.send(msg, new SendCallback() {
35
+                @Override
36
+                public void onSuccess(SendResult sendResult) {
37
+                    if (Objects.nonNull(successCallBack)) successCallBack.accept(sendResult);
38
+                }
39
+
40
+                @Override
41
+                public void onException(Throwable throwable) {
42
+                    if (Objects.nonNull(failCallBack)) failCallBack.accept(throwable, actualMsg);
43
+                }
44
+            });
45
+        } catch (MQClientException | RemotingException | InterruptedException e) {
46
+            log.error("Send msg failed,by cause:{},stack:{}", e.getMessage(), e.getStackTrace());
47
+        }
48
+    }
49
+
50
+
51
+}

+ 13 - 0
src/main/java/com/gct/tools/etlcamelhuge/MQ/SendFailCallBack.java

@@ -0,0 +1,13 @@
1
+package com.gct.tools.etlcamelhuge.MQ;
2
+
3
+/**
4
+ * interface name: SendFailCallBack
5
+ *
6
+ * @author lloyd
7
+ * @version 1.0
8
+ * @since 2021/6/24 下午6:58
9
+ */
10
+@FunctionalInterface
11
+public interface SendFailCallBack {
12
+    void accept(Throwable e, MessageBody msg);
13
+}

+ 15 - 0
src/main/java/com/gct/tools/etlcamelhuge/MQ/SendSuccessCallBack.java

@@ -0,0 +1,15 @@
1
+package com.gct.tools.etlcamelhuge.MQ;
2
+
3
+import org.apache.rocketmq.client.producer.SendResult;
4
+
5
+/**
6
+ * interface name: SendSuccessCallBack
7
+ *
8
+ * @author lloyd
9
+ * @version 1.0
10
+ * @since 2021/6/24 下午7:03
11
+ */
12
+@FunctionalInterface
13
+public interface SendSuccessCallBack {
14
+    void accept(SendResult sendResult);
15
+}

+ 57 - 0
src/main/java/com/gct/tools/etlcamelhuge/MQ/WarnMessageProducer.java

@@ -0,0 +1,57 @@
1
+package com.gct.tools.etlcamelhuge.MQ;
2
+
3
+import lombok.extern.slf4j.Slf4j;
4
+import org.apache.rocketmq.client.exception.MQClientException;
5
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
6
+import org.apache.rocketmq.common.message.Message;
7
+
8
+import java.nio.charset.StandardCharsets;
9
+
10
+/**
11
+ * class name: DiagnoseMessageProducer
12
+ *
13
+ * @author lloyd
14
+ * @version 1.0
15
+ * @since 2021/6/24 下午6:44
16
+ */
17
+@Slf4j
18
+public class WarnMessageProducer implements MessageProducer {
19
+    private final DefaultMQProducer producer;
20
+    private final GCTRocketMQProperties properties;
21
+
22
+    private final SendFailCallBack failCallBack;
23
+    private final SendSuccessCallBack successCallBack;
24
+
25
+    public WarnMessageProducer(DefaultMQProducer producer, GCTRocketMQProperties properties, SendFailCallBack failCallBack, SendSuccessCallBack successCallBack) {
26
+        this.producer = producer;
27
+        this.properties = properties;
28
+        this.failCallBack = failCallBack;
29
+        this.successCallBack = successCallBack;
30
+    }
31
+
32
+    @Override
33
+    public void init() {
34
+        try {
35
+            producer.start();
36
+        } catch (MQClientException e) {
37
+            log.error("producer can not start ,cause by:{}", e.getErrorMessage());
38
+        }
39
+    }
40
+
41
+    @Override
42
+    public void send(MessageBody msgBody) {
43
+        MessageProducer.super.send(msgBody);
44
+        if (!(msgBody instanceof WarnMsg)) {
45
+            log.error("Not supported type of msg body:{},and only supported 'WarnMsg'", msgBody.getClass().getTypeName());
46
+            return;
47
+        }
48
+        WarnMsg actualMsg = (WarnMsg) msgBody;
49
+        Message msg = new Message(properties.getTopic(), properties.getTags(), actualMsg.getWellId(), msgBody.toJsonBody().getBytes(StandardCharsets.UTF_8));
50
+        sendDefault(producer, msg, actualMsg, failCallBack, successCallBack);
51
+    }
52
+
53
+    @Override
54
+    public void destroy() {
55
+        producer.shutdown();
56
+    }
57
+}

+ 39 - 0
src/main/java/com/gct/tools/etlcamelhuge/MQ/WarnMsg.java

@@ -0,0 +1,39 @@
1
+package com.gct.tools.etlcamelhuge.MQ;
2
+
3
+import com.alibaba.fastjson.JSONObject;
4
+import lombok.AllArgsConstructor;
5
+import lombok.Data;
6
+
7
+/**
8
+ * class name: MQDiagnoseMsg
9
+ * <p>
10
+ * rocketMQ exchange message entity:
11
+ * for diagnose process
12
+ * <p>
13
+ * Notice:
14
+ * 1.the message key:well_id
15
+ * 2.the formatter of sgt eg:0,123,1,253...
16
+ * 3.use the secret "DiagnoseMsg_v1.0" to send message
17
+ * 4.in order to send message sequence orderly,please do that :
18
+ * sendOneWayOrderly(./././)
19
+ * </p>
20
+ *
21
+ * @author lloyd
22
+ * @version 1.0
23
+ * @since 2021/6/24 下午6:48
24
+ */
25
+@Data
26
+@AllArgsConstructor
27
+public class WarnMsg implements MessageBody {
28
+    private String wellId;
29
+    private String context;
30
+    private String wnDate;
31
+    private String resultId;
32
+    private String category;
33
+    private String rank;
34
+
35
+    @Override
36
+    public String toJsonBody() {
37
+        return JSONObject.toJSONString(this);
38
+    }
39
+}

+ 29 - 0
src/main/java/com/gct/tools/etlcamelhuge/entity/DiagnoseMsg.java

@@ -0,0 +1,29 @@
1
+package com.gct.tools.etlcamelhuge.entity;
2
+
3
+import com.alibaba.fastjson.JSONObject;
4
+import com.fasterxml.jackson.annotation.JsonFormat;
5
+import com.gct.tools.etlcamelhuge.MQ.MessageBody;
6
+import lombok.AllArgsConstructor;
7
+import lombok.Data;
8
+
9
+@Data
10
+@AllArgsConstructor
11
+public class DiagnoseMsg implements MessageBody {
12
+
13
+    private String wellId;
14
+    private String wellName;
15
+    private String orgId;
16
+
17
+    @JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd HH:mm:ss" , timezone = "GMT+8")
18
+    private String prodDate;
19
+    private String sgt;
20
+    private String rcvDate;
21
+    private double s;
22
+    private double n;
23
+
24
+
25
+    @Override
26
+    public String toJsonBody() {
27
+        return JSONObject.toJSONString(this);
28
+    }
29
+}

+ 0 - 14
src/main/java/com/gct/tools/etlcamelhuge/entity/MQDiagnoseMsg.java

@@ -1,14 +0,0 @@
1
-package com.gct.tools.etlcamelhuge.entity;
2
-
3
-import lombok.Data;
4
-
5
-@Data
6
-public class MQDiagnoseMsg {
7
-
8
-    private String well_id ;
9
-    private String well_name;
10
-    private String org_id;
11
-    private String pord_date;//创建时间
12
-    private String sgt;
13
-    private String rcv_date;//采集时间
14
-}

+ 36 - 19
src/main/java/com/gct/tools/etlcamelhuge/routeconfig/CamelJDBCConfiguration.java

@@ -1,26 +1,21 @@
1 1
 package com.gct.tools.etlcamelhuge.routeconfig;
2 2
 
3
-import com.alibaba.fastjson.JSONObject;
4
-import com.gct.tools.etlcamelhuge.entity.MQDiagnoseMsg;
3
+import com.gct.tools.etlcamelhuge.MQ.MessageBody;
4
+import com.gct.tools.etlcamelhuge.MQ.MessageProducer;
5
+import com.gct.tools.etlcamelhuge.entity.DiagnoseMsg;
5 6
 import org.apache.camel.*;
6 7
 import org.apache.camel.builder.RouteBuilder;
7
-import org.apache.rocketmq.client.producer.DefaultMQProducer;
8 8
 //import org.apache.rocketmq.common.message.Message;
9 9
 import org.apache.rocketmq.spring.core.RocketMQTemplate;
10
-import org.slf4j.Logger;
11 10
 import org.springframework.beans.factory.annotation.Autowired;
12 11
 import org.springframework.context.annotation.Bean;
13 12
 import org.springframework.context.annotation.Configuration;
14
-import org.springframework.http.HttpEntity;
15
-import org.springframework.http.HttpHeaders;
16
-import org.springframework.http.MediaType;
17
-import org.springframework.jdbc.support.SQLStateSQLExceptionTranslator;
18 13
 
14
+import javax.annotation.Resource;
19 15
 import java.math.BigDecimal;
20
-import java.nio.charset.StandardCharsets;
21 16
 import java.text.SimpleDateFormat;
17
+import java.time.LocalDateTime;
22 18
 import java.util.*;
23
-import java.util.concurrent.atomic.AtomicInteger;
24 19
 
25 20
 /**
26 21
  * class name: CamelJDBCConfiguration
@@ -271,6 +266,8 @@ public class CamelJDBCConfiguration /*extends RouteBuilder */ {
271 266
         }
272 267
         return Arrays.stream(doubles).max().getAsDouble();
273 268
     }
269
+    @Resource(name = "diagnoseMessageProducer")
270
+    private MessageProducer producer;
274 271
     @Bean
275 272
     public RouteBuilder routeBuilderWithOracle1() {
276 273
         SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd");
@@ -285,9 +282,9 @@ public class CamelJDBCConfiguration /*extends RouteBuilder */ {
285 282
             //全部执行完成的大概时间在30-40分钟
286 283
             @Override
287 284
             public void configure() throws Exception {
288
-                //0 0 18 * * ?   每天下午6点执行一次
285
+                //24小时执行一次
289 286
                 //单个执行时间30s左右,在之前有数据的情况下
290
-                from("timer:mytimer1?period=99999999999")
287
+                from("timer:mytimer1?period=604800000")
291 288
                         .routeId("oracle-1")
292 289
                         .setHeader("date", constant(date1))
293 290
                         .setBody(simple("select  distinct jh,cydmc,zyq,zk,qyrq,sccw,qk,bz from zd_zdgs.dba01@A2 where rq  = to_date('${header.date}','yyyy-MM-dd') and qyrq is not null "))
@@ -372,7 +369,7 @@ public class CamelJDBCConfiguration /*extends RouteBuilder */ {
372 369
                         .log("insert")
373 370
                         .end();
374 371
                 //单独执行时间10s
375
-                from("timer:mytimer2?period=99999999999")
372
+                from("timer:mytimer2?period=3600000")
376 373
                         .routeId("oracle-2")
377 374
                         .setHeader("date", constant(date1))
378 375
                         .setBody(simple("select  distinct jh,rq,cyfs,yz,hysx , yysx ,tysx,bs,dym from zd_zdgs.dba01@A2 where rq  = to_date('${header.date}','yyyy-MM-dd') and qyrq is not null "))
@@ -396,7 +393,7 @@ public class CamelJDBCConfiguration /*extends RouteBuilder */ {
396 393
                 //将查询到的DYM数据更新到cb_pc_pro_wellbore_status_daily中
397 394
                 //0 0 */1 * * ? 每1个小时执行一次
398 395
                 //单独执行时间是4m15s 317条数据
399
-                from("timer:mytimer5?period=99999999999")
396
+                from("timer:mytimer5?period=3600000")
400 397
                         .routeId("oracle-5")
401 398
                         .setHeader("date", constant(date1 + " 00:00:00"))
402 399
                         //三个月之内dym不为空的数据
@@ -409,7 +406,7 @@ public class CamelJDBCConfiguration /*extends RouteBuilder */ {
409 406
                         .log("insert !!!")
410 407
                         .end();
411 408
                 //单独执行时间30s
412
-                from("timer:mytimer3?period=99999999999")
409
+                from("timer:mytimer3?period=3600000")
413 410
                         .routeId("oracle-3")
414 411
                         .setHeader("date", constant(date1))
415 412
                         .setBody(simple("select distinct  jh,rq,scsj, rcyl1,rcyl,rcql,hs, bz from zd_zdgs.dba01@A2 where rq  = to_date('${header.date}','yyyy-MM-dd') and qyrq is not null "))
@@ -432,7 +429,7 @@ public class CamelJDBCConfiguration /*extends RouteBuilder */ {
432 429
 
433 430
                 //0 0 */1 * * ? 每1个小时执行一次
434 431
                 //单独执行一次30s
435
-                from("timer:mytimer4?period=99999999999")
432
+                from("timer:mytimer4?period=3600000")
436 433
                         .routeId("oracle-4")
437 434
                         .setHeader("date", constant(date1))
438 435
                         .setBody(simple("select distinct jh,rq,dym,jy,ly,bj,bs,bx,zs,cc,cs,blx,dl from zd_zdgs.dba01@A2 where rq  = to_date('${header.date}','yyyy-MM-dd') and qyrq is not null  "))
@@ -460,7 +457,7 @@ public class CamelJDBCConfiguration /*extends RouteBuilder */ {
460 457
                 //从天安哪里获取的数据
461 458
                 //0 0 */1 * * ? 每1个小时执行一次
462 459
                 //单独执行一小时的数据30s
463
-                from("timer:mytimer7?period=99999999999")
460
+               /* from("timer:mytimer7?period=3600000")
464 461
                         .routeId("jdbc-gtsj-?")
465 462
                         .setBody(simple("select max(prod_date) from centralbase.cb_temp_well_mech_runtime "))
466 463
                         .to("jdbc:centralbase")
@@ -501,9 +498,29 @@ public class CamelJDBCConfiguration /*extends RouteBuilder */ {
501 498
                         .setBody(simple("insert into centralbase.cb_temp_well_mech_runtime(well_id,prod_date,stroke_length,stroke_frequency,susp_max_load,susp_min_load,sgt) " +
502 499
                                 "values ('${body[well_name]}','${body[dyna_create_time]}','${body[stroke]}','${body[frequency]}','${body[susp_max_load]}','${body[susp_min_load]}','${body[sgt]}')"))
503 500
                         .to("jdbc:centralbase")
504
-                        .log("insert!!!").end();
501
+                        .log("insert!!!").end();*/
502
+
503
+                from("timer:mytimer1?period=3600000")
504
+                        .routeId("centralbase-1")
505
+                        .setBody(simple("select so.well_id,so.well_common_name,so.org_id,ti.prod_date,ti.stroke_frequency,ti.stroke_length,ti.sgt from centralbase.cb_temp_well_mech_runtime ti, centralbase.cb_cd_well_source so where ti.well_id = so.well_id and ti.prod_date =(select max(prod_date) from centralbase.cb_temp_well_mech_runtime) "))
506
+                        .to("jdbc:centralbase")
507
+                        .split(body()).log("log{body}")
508
+                        .process(exchange -> {
509
+                            Message in = exchange.getIn();
510
+                            HashMap<String, Object> aRow = in.getBody(HashMap.class);
511
+                            String wellName =aRow.get("well_common_name").toString();
512
+                            String wellId =aRow.get("well_id").toString();
513
+                            String orgId = aRow.get("org_id").toString();
514
+                            String prodDate = aRow.get("prod_date").toString().substring(0,19);
515
+                            Double strokeLength = Double.valueOf(aRow.get("stroke_length").toString());
516
+                            Double strokeFrequency = Double.valueOf(aRow.get("stroke_frequency").toString());
517
+                            System.out.println("strokeFrequency"+strokeFrequency);
518
+                            String sgt = aRow.get("sgt").toString();
519
+                            DiagnoseMsg diagnoseMsg = new DiagnoseMsg(wellId, wellName, orgId, prodDate, sgt, LocalDateTime.now().toString(), strokeLength, strokeFrequency);
520
+                            producer.send((MessageBody) diagnoseMsg);
521
+                        }).log("send success").end();
505 522
 //-------------------------------------------------------------------------------------------------------------------------------------------------------------------
506
-           /*     from("timer:mytimer2?period=999999999")
523
+           /*     from("timer:mytimer2?period=3600000")
507 524
                         .routeId("oracle-2")
508 525
                         .setBody(simple("select distinct station_id,station_name from centralbase.cb_cd_well_source"))
509 526
                         .to("jdbc:centralbase")

+ 28 - 7
src/main/resources/application.yml

@@ -124,10 +124,31 @@ rocketmq:
124 124
     sendMessageTimeout: 300000
125 125
     access-key: 123456
126 126
     secret-key: 123456
127
-logging:
128
-  file:
129
-    path: /home/gxt/IdeaProjects/etl-camel-huge/logs
130
-    max-history: 30
131
-    max-size: 10
132
-    name: log
133
-
127
+gct:
128
+  rocketmq:
129
+    one:
130
+      topic: diagnose-msg
131
+      tags: v1
132
+      log-file-path: /home/lloyd/Desktop/log/task.json
133
+      producer:
134
+        group: diagnose
135
+        access-key: diagnose-msg-v1
136
+        secret-key: diagnose-msg-v1
137
+      name-server: 10.72.143.2:9876
138
+      consumer:
139
+        group: diagnose
140
+        access-key: diagnose-msg-v1
141
+        secret-key: diagnose-msg-v1
142
+    two:
143
+      topic: warn-msg
144
+      tags: v1
145
+      log-file-path: /home/lloyd/Desktop/log/task.json
146
+      producer:
147
+        group: warn
148
+        access-key: warn-msg-v1
149
+        secret-key: warn-msg-v1
150
+      name-server: 10.72.143.2:9876
151
+      consumer:
152
+        group: warn
153
+        access-key: warn-msg-v1
154
+        secret-key: warn-msg-v1