Browse Source

mq send retry one time

gxt 3 years ago
parent
commit
e5de4d016f

+ 1 - 1
src/main/java/com/gct/tools/etlcamelhuge/MQ/DefaultMsgSendSuccessCallBack.java

@@ -20,6 +20,6 @@ public class DefaultMsgSendSuccessCallBack implements SendSuccessCallBack {
20
     @Override
20
     @Override
21
     public void accept(SendResult sendResult) {
21
     public void accept(SendResult sendResult) {
22
         //System.out.println("send success= "+count.incrementAndGet());
22
         //System.out.println("send success= "+count.incrementAndGet());
23
-        //log.info("send msg success,msg:{}", sendResult.getMsgId());
23
+        log.info("send msg success,msg:{}", sendResult.getMsgId());
24
     }
24
     }
25
 }
25
 }

+ 1 - 1
src/main/java/com/gct/tools/etlcamelhuge/MQ/DiagnoseMessageProducer.java

@@ -50,7 +50,7 @@ public class DiagnoseMessageProducer implements MessageProducer {
50
         }
50
         }
51
         DiagnoseMsg actualMsg = (DiagnoseMsg) msgBody;
51
         DiagnoseMsg actualMsg = (DiagnoseMsg) msgBody;
52
         Message msg = new Message(properties.getTopic(), properties.getTags(), actualMsg.getWellId(), JSONObject.toJSONString(actualMsg).getBytes(StandardCharsets.UTF_8));
52
         Message msg = new Message(properties.getTopic(), properties.getTags(), actualMsg.getWellId(), JSONObject.toJSONString(actualMsg).getBytes(StandardCharsets.UTF_8));
53
-        sendDefault(producer, msg, (MessageBody) actualMsg, failCallBack, successCallBack);
53
+        sendDefault(producer, msg, (MessageBody) actualMsg, failCallBack, successCallBack,0);
54
     }
54
     }
55
 
55
 
56
     @Override
56
     @Override

+ 8 - 3
src/main/java/com/gct/tools/etlcamelhuge/MQ/MessageProducer.java

@@ -24,12 +24,12 @@ public interface MessageProducer {
24
     void init();
24
     void init();
25
 
25
 
26
     default void send(MessageBody msgBody) {
26
     default void send(MessageBody msgBody) {
27
-        LoggerFactory.getLogger(MessageProducer.class).info("sending msg :==>{}",((DiagnoseMsg)msgBody).getWellId()+" "+ ((DiagnoseMsg)msgBody).getProdDate()+" "+((DiagnoseMsg)msgBody).getSgt().substring(0,5));
27
+        LoggerFactory.getLogger(MessageProducer.class).info("sending msg :==>{}", ((DiagnoseMsg) msgBody).getWellId() + " " + ((DiagnoseMsg) msgBody).getProdDate() + " " + ((DiagnoseMsg) msgBody).getSgt().substring(0, 5));
28
     }
28
     }
29
 
29
 
30
     void destroy() throws Exception;
30
     void destroy() throws Exception;
31
 
31
 
32
-    default void sendDefault(DefaultMQProducer producer, Message msg, MessageBody actualMsg, SendFailCallBack failCallBack, SendSuccessCallBack successCallBack) {
32
+    default void sendDefault(DefaultMQProducer producer, Message msg, MessageBody actualMsg, SendFailCallBack failCallBack, SendSuccessCallBack successCallBack,int retryCount) {
33
         Logger log = LoggerFactory.getLogger(producer.getProducerGroup() + producer.getClass().getTypeName());
33
         Logger log = LoggerFactory.getLogger(producer.getProducerGroup() + producer.getClass().getTypeName());
34
         try {
34
         try {
35
             producer.send(msg, new SendCallback() {
35
             producer.send(msg, new SendCallback() {
@@ -40,7 +40,12 @@ public interface MessageProducer {
40
 
40
 
41
                 @Override
41
                 @Override
42
                 public void onException(Throwable throwable) {
42
                 public void onException(Throwable throwable) {
43
-                    if (Objects.nonNull(failCallBack)) failCallBack.accept(throwable, actualMsg);
43
+                    if (retryCount < 2) {
44
+                        log.debug("retry send msg ,last time failed cause by {}",throwable.getMessage());
45
+                        sendDefault(producer, msg, actualMsg, failCallBack, successCallBack, retryCount + 1);
46
+                    } else {
47
+                        if (Objects.nonNull(failCallBack)) failCallBack.accept(throwable, actualMsg);
48
+                    }
44
                 }
49
                 }
45
             });
50
             });
46
         } catch (MQClientException | RemotingException | InterruptedException e) {
51
         } catch (MQClientException | RemotingException | InterruptedException e) {

+ 1 - 1
src/main/java/com/gct/tools/etlcamelhuge/MQ/WarnMessageProducer.java

@@ -47,7 +47,7 @@ public class WarnMessageProducer implements MessageProducer {
47
         }
47
         }
48
         WarnMsg actualMsg = (WarnMsg) msgBody;
48
         WarnMsg actualMsg = (WarnMsg) msgBody;
49
         Message msg = new Message(properties.getTopic(), properties.getTags(), actualMsg.getWellId(), msgBody.toJsonBody().getBytes(StandardCharsets.UTF_8));
49
         Message msg = new Message(properties.getTopic(), properties.getTags(), actualMsg.getWellId(), msgBody.toJsonBody().getBytes(StandardCharsets.UTF_8));
50
-        sendDefault(producer, msg, actualMsg, failCallBack, successCallBack);
50
+        sendDefault(producer, msg, actualMsg, failCallBack, successCallBack,0);
51
     }
51
     }
52
 
52
 
53
     @Override
53
     @Override