Browse Source

mq need config

xusirui 8 months ago
parent
commit
4bde83bbda

+ 2 - 5
src/main/java/com/gct/tools/etlcamelhuge/MQ/MessageProducer.java

@@ -34,9 +34,8 @@ public interface MessageProducer {
34
     default void sendDefault(DefaultMQProducer producer, Message msg, MessageBody actualMsg, SendFailCallBack failCallBack, SendSuccessCallBack successCallBack,int retryCount) {
34
     default void sendDefault(DefaultMQProducer producer, Message msg, MessageBody actualMsg, SendFailCallBack failCallBack, SendSuccessCallBack successCallBack,int retryCount) {
35
         Logger log = LoggerFactory.getLogger(producer.getProducerGroup() + producer.getClass().getTypeName());
35
         Logger log = LoggerFactory.getLogger(producer.getProducerGroup() + producer.getClass().getTypeName());
36
         try {
36
         try {
37
-            producer.send(msg, 5);
38
 
37
 
39
-           /* producer.send(msg, new SendCallback() {
38
+            producer.send(msg, new SendCallback() {
40
                 @Override
39
                 @Override
41
                 public void onSuccess(SendResult sendResult) {
40
                 public void onSuccess(SendResult sendResult) {
42
                     if (Objects.nonNull(successCallBack)) successCallBack.accept(sendResult);
41
                     if (Objects.nonNull(successCallBack)) successCallBack.accept(sendResult);
@@ -51,12 +50,10 @@ public interface MessageProducer {
51
                         if (Objects.nonNull(failCallBack)) failCallBack.accept(throwable, actualMsg);
50
                         if (Objects.nonNull(failCallBack)) failCallBack.accept(throwable, actualMsg);
52
                     }
51
                     }
53
                 }
52
                 }
54
-            },50);*/
53
+            });
55
         } catch (MQClientException | RemotingException | InterruptedException e) {
54
         } catch (MQClientException | RemotingException | InterruptedException e) {
56
             e.printStackTrace();
55
             e.printStackTrace();
57
             log.error("Send msg failed,by cause:{},stack:{}", e.getMessage(), e.getStackTrace());
56
             log.error("Send msg failed,by cause:{},stack:{}", e.getMessage(), e.getStackTrace());
58
-        } catch (MQBrokerException e) {
59
-            throw new RuntimeException(e);
60
         }
57
         }
61
     }
58
     }
62
 
59
 

+ 13 - 1
src/main/java/com/gct/tools/etlcamelhuge/routeconfig/CamelJDBCCofRealTimeConfiguration.java

@@ -106,10 +106,14 @@ public class CamelJDBCCofRealTimeConfiguration {
106
                         .split(body()).process(exchange -> {
106
                         .split(body()).process(exchange -> {
107
                             HashMap body = exchange.getIn().getBody(HashMap.class);
107
                             HashMap body = exchange.getIn().getBody(HashMap.class);
108
                             exchange.getIn().setHeader("well_id", body.get("well_id"));
108
                             exchange.getIn().setHeader("well_id", body.get("well_id"));
109
+//                            System.out.println(body.get("well_id"));
110
+                            if (Objects.isNull(body.get("sgt_last_time")))body.put("sgt_last_time"
111
+                                    ,LocalDateTime.now().minusHours(4).format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
109
                             exchange.getIn().setHeader("sgt_last_time", body.get("sgt_last_time").toString().substring(0,19));
112
                             exchange.getIn().setHeader("sgt_last_time", body.get("sgt_last_time").toString().substring(0,19));
110
                         })
113
                         })
111
                         .setBody(simple("select WELL_ID,WELL_NAME,DYNA_CREATE_TIME,CHECK_DATE,DISPLACEMENT,DISP_LOAD,STROKE,FREQUENCY,SUSP_MAX_LOAD,SUSP_MIN_LOAD" +
114
                         .setBody(simple("select WELL_ID,WELL_NAME,DYNA_CREATE_TIME,CHECK_DATE,DISPLACEMENT,DISP_LOAD,STROKE,FREQUENCY,SUSP_MAX_LOAD,SUSP_MIN_LOAD" +
112
-                                " from DEFAULT_GONGTU where well_id='${header.well_id}' and DYNA_CREATE_TIME >= to_date('${header.sgt_last_time}','yyyy-mm-dd hh24:mi:ss') and rownum<50 "))
115
+                                " from DEFAULT_GONGTU where CHECK_DATE >= to_date('${header.sgt_last_time}','yyyy-mm-dd hh24:mi:ss') " +
116
+                                "and well_id='${header.well_id}' and  rownum<50 "))
113
                         .to("jdbc:oracle")
117
                         .to("jdbc:oracle")
114
                         .process(exchange -> {
118
                         .process(exchange -> {
115
                             sendMsgRunTime = 0;
119
                             sendMsgRunTime = 0;
@@ -120,8 +124,15 @@ public class CamelJDBCCofRealTimeConfiguration {
120
                             String prod_date = aRow.get("DYNA_CREATE_TIME").toString().split("\\+")[0];
124
                             String prod_date = aRow.get("DYNA_CREATE_TIME").toString().split("\\+")[0];
121
                             aRow.put("DYNA_CREATE_TIME", prod_date);
125
                             aRow.put("DYNA_CREATE_TIME", prod_date);
122
                             aRow.put("sgt", "");
126
                             aRow.put("sgt", "");
127
+                            //swap table error design column
128
+                            Object tempObj = aRow.get("STROKE");
129
+                            aRow.put("STROKE",aRow.get("FREQUENCY"));
130
+                            aRow.put("FREQUENCY",tempObj);
123
                             if (!StringUtils.isEmpty(aRow.get("DISPLACEMENT")) && !StringUtils.isEmpty(aRow.get("DISP_LOAD"))) {
131
                             if (!StringUtils.isEmpty(aRow.get("DISPLACEMENT")) && !StringUtils.isEmpty(aRow.get("DISP_LOAD"))) {
124
                                 String[] displacements =  clobToString((Clob) aRow.get("DISPLACEMENT")).split(",");//10 四舍五入
132
                                 String[] displacements =  clobToString((Clob) aRow.get("DISPLACEMENT")).split(",");//10 四舍五入
133
+                                for (int i = 0; i < displacements.length; i++) {
134
+                                    displacements[i] = Double.toString(Double.parseDouble(displacements[i])/10);
135
+                                }
125
                                 String[] disp_loads = clobToString((Clob) aRow.get("DISP_LOAD")).split(",");
136
                                 String[] disp_loads = clobToString((Clob) aRow.get("DISP_LOAD")).split(",");
126
                                 Double SUSP_MAX_LOAD = max(disp_loads);
137
                                 Double SUSP_MAX_LOAD = max(disp_loads);
127
                                 Double SUSP_MIN_LOAD = min(disp_loads);
138
                                 Double SUSP_MIN_LOAD = min(disp_loads);
@@ -183,6 +194,7 @@ public class CamelJDBCCofRealTimeConfiguration {
183
                             Double strokeLength = Double.valueOf(aRow.get("STROKE").toString());
194
                             Double strokeLength = Double.valueOf(aRow.get("STROKE").toString());
184
                             Double strokeFrequency = Double.valueOf(aRow.get("FREQUENCY").toString());
195
                             Double strokeFrequency = Double.valueOf(aRow.get("FREQUENCY").toString());
185
                             String sgt = aRow.get("sgt").toString();
196
                             String sgt = aRow.get("sgt").toString();
197
+//                            System.out.println(sgt);
186
                             in.setHeader("sgt_last_time", prodDate);
198
                             in.setHeader("sgt_last_time", prodDate);
187
                             in.setHeader("well_id", wellId);
199
                             in.setHeader("well_id", wellId);
188
                             sendDataToRocketMQ(wellName, wellId, prodDate, strokeLength, strokeFrequency, sgt);
200
                             sendDataToRocketMQ(wellName, wellId, prodDate, strokeLength, strokeFrequency, sgt);