瀏覽代碼

mq need config

xusirui 8 月之前
父節點
當前提交
67fb02d36c

+ 1 - 0
src/main/java/com/gct/tools/etlcamelhuge/MQ/DiagnoseMessageProducer.java

@@ -49,6 +49,7 @@ public class DiagnoseMessageProducer implements MessageProducer {
49 49
             return;
50 50
         }
51 51
         DiagnoseMsg actualMsg = (DiagnoseMsg) msgBody;
52
+        System.out.println(properties);
52 53
         Message msg = new Message(properties.getTopic(), properties.getTags(), actualMsg.getWellId(), JSONObject.toJSONString(actualMsg).getBytes(StandardCharsets.UTF_8));
53 54
         sendDefault(producer, msg, (MessageBody) actualMsg, failCallBack, successCallBack,0);
54 55
     }

+ 9 - 2
src/main/java/com/gct/tools/etlcamelhuge/MQ/MessageProducer.java

@@ -1,6 +1,7 @@
1 1
 package com.gct.tools.etlcamelhuge.MQ;
2 2
 
3 3
 import com.gct.tools.etlcamelhuge.entity.DiagnoseMsg;
4
+import org.apache.rocketmq.client.exception.MQBrokerException;
4 5
 import org.apache.rocketmq.client.exception.MQClientException;
5 6
 import org.apache.rocketmq.client.producer.DefaultMQProducer;
6 7
 import org.apache.rocketmq.client.producer.SendCallback;
@@ -11,6 +12,7 @@ import org.slf4j.Logger;
11 12
 import org.slf4j.LoggerFactory;
12 13
 
13 14
 import java.util.Objects;
15
+import java.util.concurrent.TimeUnit;
14 16
 
15 17
 /**
16 18
  * class name: MessageProducer
@@ -32,7 +34,9 @@ public interface MessageProducer {
32 34
     default void sendDefault(DefaultMQProducer producer, Message msg, MessageBody actualMsg, SendFailCallBack failCallBack, SendSuccessCallBack successCallBack,int retryCount) {
33 35
         Logger log = LoggerFactory.getLogger(producer.getProducerGroup() + producer.getClass().getTypeName());
34 36
         try {
35
-            producer.send(msg, new SendCallback() {
37
+            producer.send(msg, 5);
38
+
39
+           /* producer.send(msg, new SendCallback() {
36 40
                 @Override
37 41
                 public void onSuccess(SendResult sendResult) {
38 42
                     if (Objects.nonNull(successCallBack)) successCallBack.accept(sendResult);
@@ -47,9 +51,12 @@ public interface MessageProducer {
47 51
                         if (Objects.nonNull(failCallBack)) failCallBack.accept(throwable, actualMsg);
48 52
                     }
49 53
                 }
50
-            });
54
+            },50);*/
51 55
         } catch (MQClientException | RemotingException | InterruptedException e) {
56
+            e.printStackTrace();
52 57
             log.error("Send msg failed,by cause:{},stack:{}", e.getMessage(), e.getStackTrace());
58
+        } catch (MQBrokerException e) {
59
+            throw new RuntimeException(e);
53 60
         }
54 61
     }
55 62
 

+ 66 - 42
src/main/java/com/gct/tools/etlcamelhuge/routeconfig/CamelJDBCCofRealTimeConfiguration.java

@@ -10,9 +10,13 @@ import org.springframework.context.annotation.Configuration;
10 10
 import org.springframework.util.StringUtils;
11 11
 
12 12
 import javax.annotation.Resource;
13
+import java.io.IOException;
14
+import java.io.Reader;
13 15
 import java.lang.management.MemoryMXBean;
14 16
 import java.math.BigDecimal;
15 17
 import java.math.RoundingMode;
18
+import java.sql.Clob;
19
+import java.sql.SQLException;
16 20
 import java.time.LocalDateTime;
17 21
 import java.time.format.DateTimeFormatter;
18 22
 import java.util.*;
@@ -63,6 +67,7 @@ public class CamelJDBCCofRealTimeConfiguration {
63 67
     public void sendDataToRocketMQ(String wellName, String wellId, String prodDate, Double stroke_length, Double stroke_frequency, String sgt) {
64 68
         String orgId = "0";
65 69
         DiagnoseMsg diagnoseMsg = new DiagnoseMsg(wellId, wellName, orgId, prodDate, sgt, LocalDateTime.now().toString(), stroke_length, stroke_frequency);
70
+        System.out.println(diagnoseMsg);
66 71
         sendMsgRunTime++;
67 72
         producer.send(diagnoseMsg);
68 73
     }
@@ -70,6 +75,25 @@ public class CamelJDBCCofRealTimeConfiguration {
70 75
     @Resource(name = "diagnoseMessageProducer")
71 76
     private MessageProducer producer;
72 77
 
78
+    private String clobToString(Clob clob){
79
+        StringBuilder sb = new StringBuilder();
80
+        if (clob != null) {
81
+            try (Reader reader = clob.getCharacterStream()) {
82
+                char[] buffer = new char[1024];
83
+                int bytesRead;
84
+                while ((bytesRead = reader.read(buffer)) != -1) {
85
+                    sb.append(buffer, 0, bytesRead);
86
+                }
87
+            } catch (SQLException e) {
88
+                throw new RuntimeException(e);
89
+            } catch (IOException e) {
90
+                throw new RuntimeException(e);
91
+            }
92
+        }
93
+        return sb.toString();
94
+
95
+    }
96
+
73 97
     @Bean
74 98
     public RouteBuilder routeBuilderWithRealTime() {
75 99
         return new RouteBuilder() {
@@ -82,10 +106,10 @@ public class CamelJDBCCofRealTimeConfiguration {
82 106
                         .split(body()).process(exchange -> {
83 107
                             HashMap body = exchange.getIn().getBody(HashMap.class);
84 108
                             exchange.getIn().setHeader("well_id", body.get("well_id"));
85
-                            exchange.getIn().setHeader("sgt_last_time", body.get("sgt_last_time"));
109
+                            exchange.getIn().setHeader("sgt_last_time", body.get("sgt_last_time").toString().substring(0,19));
86 110
                         })
87
-                        .setBody(simple("select distinct well_name,dyna_create_time,check_date,displacement,disp_load,stroke,frequency,susp_max_load,susp_min_load" +
88
-                                " from DEFAULT_GONGTU where well_name='${header.well_id}' and dyna_create_time >= '${header.sgt_last_time}' limit 50 "))
111
+                        .setBody(simple("select WELL_ID,WELL_NAME,DYNA_CREATE_TIME,CHECK_DATE,DISPLACEMENT,DISP_LOAD,STROKE,FREQUENCY,SUSP_MAX_LOAD,SUSP_MIN_LOAD" +
112
+                                " from DEFAULT_GONGTU where well_id='${header.well_id}' and DYNA_CREATE_TIME >= to_date('${header.sgt_last_time}','yyyy-mm-dd hh24:mi:ss') and rownum<50 "))
89 113
                         .to("jdbc:oracle")
90 114
                         .process(exchange -> {
91 115
                             sendMsgRunTime = 0;
@@ -93,14 +117,14 @@ public class CamelJDBCCofRealTimeConfiguration {
93 117
                         .split(body()).process(exchange -> {
94 118
                             Message in = exchange.getIn();
95 119
                             HashMap<String, Object> aRow = in.getBody(HashMap.class);
96
-                            String prod_date = aRow.get("dyna_create_time").toString().split("\\+")[0];
97
-                            aRow.put("dyna_create_time", prod_date);
120
+                            String prod_date = aRow.get("DYNA_CREATE_TIME").toString().split("\\+")[0];
121
+                            aRow.put("DYNA_CREATE_TIME", prod_date);
98 122
                             aRow.put("sgt", "");
99
-                            if (!StringUtils.isEmpty(aRow.get("displacement")) && !StringUtils.isEmpty(aRow.get("disp_load"))) {
100
-                                String[] displacements = aRow.get("displacement").toString().split(";");//10 四舍五入
101
-                                String[] disp_loads = aRow.get("disp_load").toString().split(";");
102
-                                Double susp_max_load = max(disp_loads);
103
-                                Double susp_min_load = min(disp_loads);
123
+                            if (!StringUtils.isEmpty(aRow.get("DISPLACEMENT")) && !StringUtils.isEmpty(aRow.get("DISP_LOAD"))) {
124
+                                String[] displacements =  clobToString((Clob) aRow.get("DISPLACEMENT")).split(",");//10 四舍五入
125
+                                String[] disp_loads = clobToString((Clob) aRow.get("DISP_LOAD")).split(",");
126
+                                Double SUSP_MAX_LOAD = max(disp_loads);
127
+                                Double SUSP_MIN_LOAD = min(disp_loads);
104 128
                                 String sgt = "";
105 129
                                 for (int i = 0; i < displacements.length; i++) {
106 130
                                     sgt = sgt + displacements[i] + "," + disp_loads[i] + ",";
@@ -112,15 +136,15 @@ public class CamelJDBCCofRealTimeConfiguration {
112 136
                                 }
113 137
                                 Double[][] doubles = SGTUtil.encodeToDoubleArray(w);
114 138
                                 aRow.put("sgt", SGTUtil.encodeToString(doubles));
115
-                                aRow.put("susp_max_load", susp_max_load);
116
-                                aRow.put("susp_min_load", susp_min_load);
139
+                                aRow.put("SUSP_MAX_LOAD", SUSP_MAX_LOAD);
140
+                                aRow.put("SUSP_MIN_LOAD", SUSP_MIN_LOAD);
117 141
                             }
118 142
                             //对于位移没有数据,所有数据都在载荷中的特殊数据做特别处理
119
-                            if (StringUtils.isEmpty(aRow.get("displacement")) && !StringUtils.isEmpty(aRow.get("disp_load"))) {
120
-                                String disp_load = aRow.get("disp_load").toString().replaceAll(";", ",");
121
-                                Double[][] doubles = SGTUtil.encodeToDoubleArray(disp_load);
143
+                            if (StringUtils.isEmpty(aRow.get("DISPLACEMENT")) && !StringUtils.isEmpty(aRow.get("DISP_LOAD"))) {
144
+                                String DISP_LOAD = aRow.get("DISP_LOAD").toString().replaceAll(";", ",");
145
+                                Double[][] doubles = SGTUtil.encodeToDoubleArray(DISP_LOAD);
122 146
                                 aRow.put("sgt", SGTUtil.encodeToString(doubles));
123
-                                String[] split = disp_load.split(",");
147
+                                String[] split = DISP_LOAD.split(",");
124 148
                                 List<String> list = new ArrayList<>();
125 149
                                 for (int i = 0; i < split.length; i++) {
126 150
                                     if (i % 2 != 0) {
@@ -129,43 +153,43 @@ public class CamelJDBCCofRealTimeConfiguration {
129 153
                                     }
130 154
                                 }
131 155
                                 String[] loads = list.toArray(new String[0]);
132
-                                Double susp_max_load = null;
133
-                                Double susp_min_load = null;
156
+                                Double SUSP_MAX_LOAD = null;
157
+                                Double SUSP_MIN_LOAD = null;
134 158
                                 if (loads.length > 0) {
135
-                                    susp_max_load = max(loads);
136
-                                    susp_min_load = min(loads);
159
+                                    SUSP_MAX_LOAD = max(loads);
160
+                                    SUSP_MIN_LOAD = min(loads);
137 161
                                 }
138
-                                aRow.put("susp_max_load", susp_max_load);
139
-                                aRow.put("susp_min_load", susp_min_load);
162
+                                aRow.put("SUSP_MAX_LOAD", SUSP_MAX_LOAD);
163
+                                aRow.put("SUSP_MIN_LOAD", SUSP_MIN_LOAD);
140 164
                             }
141
-                            aRow.putIfAbsent("stroke", "0.0");
142
-                            aRow.putIfAbsent("frequency", "0.0");
143
-                            aRow.putIfAbsent("susp_max_load", "0.0");
144
-                            aRow.putIfAbsent("susp_min_load", "0.0");
145
-                            if (!StringUtils.isEmpty(aRow.get("frequency"))) {
146
-                                BigDecimal bd = new BigDecimal(aRow.get("frequency").toString());
147
-                                double frequency = bd.setScale(1, RoundingMode.HALF_UP).doubleValue();
148
-                                aRow.put("frequency", frequency);
165
+                            aRow.putIfAbsent("STROKE", "0.0");
166
+                            aRow.putIfAbsent("FREQUENCY", "0.0");
167
+                            aRow.putIfAbsent("SUSP_MAX_LOAD", "0.0");
168
+                            aRow.putIfAbsent("SUSP_MIN_LOAD", "0.0");
169
+                            if (!StringUtils.isEmpty(aRow.get("FREQUENCY"))) {
170
+                                BigDecimal bd = new BigDecimal(aRow.get("FREQUENCY").toString());
171
+                                double FREQUENCY = bd.setScale(1, RoundingMode.HALF_UP).doubleValue();
172
+                                aRow.put("FREQUENCY", FREQUENCY);
149 173
                             }
150
-                            if (!StringUtils.isEmpty(aRow.get("stroke"))) {
151
-                                double stroke1 = Double.parseDouble(aRow.get("stroke").toString());
174
+                            if (!StringUtils.isEmpty(aRow.get("STROKE"))) {
175
+                                double stroke1 = Double.parseDouble(aRow.get("STROKE").toString());
152 176
                                 BigDecimal bd = new BigDecimal(stroke1);
153
-                                double stroke = bd.setScale(1, RoundingMode.HALF_UP).doubleValue();
154
-                                aRow.put("stroke", stroke);
177
+                                double STROKE = bd.setScale(1, RoundingMode.HALF_UP).doubleValue();
178
+                                aRow.put("STROKE", STROKE);
155 179
                             }
156
-                            String wellName = aRow.get("well_name").toString();
157
-                            String wellId = aRow.get("well_name").toString();
158
-                            String prodDate = aRow.get("dyna_create_time").toString().substring(0, 19);
159
-                            Double strokeLength = Double.valueOf(aRow.get("stroke").toString());
160
-                            Double strokeFrequency = Double.valueOf(aRow.get("frequency").toString());
180
+                            String wellName = aRow.get("WELL_NAME").toString();
181
+                            String wellId = aRow.get("WELL_ID").toString();
182
+                            String prodDate = aRow.get("DYNA_CREATE_TIME").toString().substring(0, 19);
183
+                            Double strokeLength = Double.valueOf(aRow.get("STROKE").toString());
184
+                            Double strokeFrequency = Double.valueOf(aRow.get("FREQUENCY").toString());
161 185
                             String sgt = aRow.get("sgt").toString();
162 186
                             in.setHeader("sgt_last_time", prodDate);
163 187
                             in.setHeader("well_id", wellId);
164 188
                             sendDataToRocketMQ(wellName, wellId, prodDate, strokeLength, strokeFrequency, sgt);
165 189
                         })
166
-                        .setBody(simple("insert into centralbase.cb_temp_well_mech_runtime(well_id,prod_date,susp_max_load,susp_min_load,sgt) " +
167
-                                "select '${body[well_name]}','${body[dyna_create_time]}','${body[susp_max_load]}','${body[susp_min_load]}','${body[sgt]}' " +
168
-                                "where NOT EXISTS (SELECT * FROM centralbase.cb_temp_well_mech_runtime WHERE well_id = '${body[well_name]}' and  prod_date = '${body[dyna_create_time]}' )"))
190
+                        .setBody(simple("insert into centralbase.cb_temp_well_mech_runtime(well_id,prod_date,SUSP_MAX_LOAD,SUSP_MIN_LOAD,sgt,stroke_length,stroke_frequency) " +
191
+                                "select '${body[WELL_ID]}','${body[DYNA_CREATE_TIME]}','${body[SUSP_MAX_LOAD]}','${body[SUSP_MIN_LOAD]}','${body[sgt]}','${body[STROKE]}','${body[FREQUENCY]}' " +
192
+                                "where NOT EXISTS (SELECT * FROM centralbase.cb_temp_well_mech_runtime WHERE well_id = '${body[WELL_ID]}' and  prod_date = '${body[DYNA_CREATE_TIME]}' )"))
169 193
                         .to("jdbc:centralbase")
170 194
                         .setBody(simple("update centralbase.sys_access_well_control set sgt_last_time = '${header.sgt_last_time}' where well_id ='${header.well_id}' "))
171 195
                         .to("jdbc:centralbase")

+ 11 - 5
src/main/java/com/gct/tools/etlcamelhuge/routeconfig/CamelJDBCConfiguration.java

@@ -11,6 +11,7 @@ import org.springframework.context.annotation.Configuration;
11 11
 
12 12
 import java.math.BigDecimal;
13 13
 import java.math.RoundingMode;
14
+import java.time.LocalDate;
14 15
 import java.time.LocalDateTime;
15 16
 import java.time.format.DateTimeFormatter;
16 17
 import java.util.*;
@@ -186,14 +187,15 @@ public class CamelJDBCConfiguration {
186 187
                                 body.put("error_id", 1);
187 188
                                 body.put("updateTime", format);
188 189
                             }
190
+                            body.put("sgtLastTime",format);
189 191
 
190 192
                         })
191 193
                         //之前在数据导入时没有自动填写入最后更新时间,会导致新加入的数据无法自动导入
192 194
                         .setBody(simple("update centralbase.sys_access_well_control set well_common_name='${body[well_common_name]}'," +
193
-                                "org_id='${body[org_id]}',update_time = '${body[updateTime]}'::timestamp,sgt_last_time='${body[updateTime]}', " +
195
+                                "org_id='${body[org_id]}',update_time = '${body[updateTime]}'::timestamp,sgt_last_time='${body[sgtLastTime]}', " +
194 196
                                 "remarks ='${body[remarks]}' ,error_id ='${body[error_id]}' " +
195
-                                "where well_id ='${body[well_id]} and " +
196
-                                "(sgt_last_time is null or sgt_last_time<'${body[yesterday]}'::timestamp)"))
197
+                                "where well_id ='${body[well_id]}' and " +
198
+                                "(sgt_last_time is null or sgt_last_time::timestamp<'${body[yesterday]}'::timestamp)"))
197 199
                         .to("jdbc:centralbase")
198 200
                         .end();
199 201
 
@@ -334,15 +336,17 @@ public class CamelJDBCConfiguration {
334 336
                         .process(exchange -> {
335 337
                             Message in = exchange.getIn();
336 338
                             in.setHeader("date", getDate());
339
+                            in.setHeader("date", LocalDateTime.now().minusDays(1).toLocalDate().format(DateTimeFormatter.ofPattern("yyyy-MM-dd")));
337 340
                         });
338 341
                 setSysControlBody(volDaily)
339 342
                         .setBody(simple("select distinct  WELL_ID,PROD_DATE,PROD_TIME, LIQ_PROD_DAILY, OIL_PROD_DAILY,GAS_PROD_DAILY" +
340
-                                ",WATER_CUT, REMARKS from DBA01 where PROD_DATE  = to_date('${header.date}','yyyy-MM-dd')" +
343
+                                ",WATER_CUT, REMARKS from ZY_DXGW.PC_PRO_COM_DAILY where PROD_DATE  = to_date('${header.date}','yyyy-MM-dd')" +
341 344
                                 " and WELL_ID ='${header.well_id}'"))
342 345
                         .to("jdbc:oracle")
343 346
                         .split(body()).process(exchange -> {
344 347
                             Message in = exchange.getIn();
345 348
                             HashMap<String, Object> aRow = in.getBody(HashMap.class);
349
+
346 350
                             aRow.putIfAbsent("PROD_TIME", "0.0");
347 351
                             aRow.putIfAbsent("LIQ_PROD_DAILY", "0.0");
348 352
                             aRow.putIfAbsent("OIL_PROD_DAILY", "0.0");
@@ -383,6 +387,7 @@ public class CamelJDBCConfiguration {
383 387
                             if (!aRow.containsKey("YMD")) {
384 388
                                 aRow.put("YMD", 0.85);
385 389
                             }
390
+
386 391
                         })
387 392
                         .setBody(simple("insert into centralbase.cb_pc_pro_wellbore_vol_daily(well_id,prod_date,prod_time,liq_prod_daily,oil_prod_daily,gas_prod_daily,water_cut,remarks,gas_oil_ratio,water_prod_daily,water_gas_ratio,surface_crude_water_density,surface_crude_oil_density,stim_content) " +
388 393
                                 "select '${body[WELL_ID]}','${body[PROD_DATE]}','${body[WATER_CUT]}','${body[LIQ_PROD_DAILY]}','${body[OIL_PROD_DAILY]}','${body[GAS_PROD_DAILY]}','${body[WATER_CUT]}','${body[REMARKS]}','${body[QYB]}','${body[RCSL]}','${body[SQB]}','${body[SMD]}','${body[YMD]}' ,'${body[SCCW]}'" +
@@ -407,7 +412,8 @@ public class CamelJDBCConfiguration {
407 412
                             in.setHeader("date", getDate());
408 413
                         });
409 414
                 setSysControlBody(volDailyLiqProdDaily)
410
-                        .setBody(simple("select distinct  WELL_ID,PROD_DATE,WATER_CUT, LIQ_PROD_DAILY,OIL_PROD_DAILY,GAS_PROD_DAILY,WATER_CUT, REMARKS from DBA01 where PROD_DATE  = to_date('${header.date}','yyyy-MM-dd')and WELL_ID ='${header.well_id}' and qyrq is not null "))
415
+                        .setBody(simple("select distinct  WELL_ID,PROD_DATE,WATER_CUT, LIQ_PROD_DAILY,OIL_PROD_DAILY,GAS_PROD_DAILY,WATER_CUT, REMARKS" +
416
+                                " from ZY_DXGW.PC_PRO_COM_DAILY where PROD_DATE  = to_date('${header.date}','yyyy-MM-dd')and WELL_ID ='${header.well_id}'"))
411 417
                         .to("jdbc:oracle")
412 418
                         .split(body()).process(exchange -> {
413 419
                             Message in = exchange.getIn();

+ 2 - 2
src/main/resources/application.yml

@@ -131,7 +131,7 @@ gct:
131 131
     one:
132 132
       topic: diagnose-msg
133 133
       tags: v1
134
-      log-file-path: /home/lloyd/Desktop/log/task.json
134
+      log-file-path: C:\Users\Administrator\Desktop\package\a.json
135 135
       producer:
136 136
         group: diagnose
137 137
         access-key: diagnose-msg-v1
@@ -144,7 +144,7 @@ gct:
144 144
     two:
145 145
       topic: warn-msg
146 146
       tags: v1
147
-      log-file-path: /home/lloyd/Desktop/log/task.json
147
+      log-file-path: C:\Users\Administrator\Desktop\package\a.json
148 148
       producer:
149 149
         group: warn
150 150
         access-key: warn-msg-v1

File diff suppressed because it is too large
+ 32 - 8
src/test/java/com/gct/tools/etlcamelhuge/EtlCamelHugeApplicationTests.java