Bladeren bron

Merge remote-tracking branch 'origin/zygs-1024' into zygs-1024

# Conflicts:
#	src/main/java/com/gct/tools/etlcamelhuge/routeconfig/CamelJDBCConfiguration.java
xsr 1 maand geleden
bovenliggende
commit
8a75bb5d80

+ 1 - 0
src/main/java/com/gct/tools/etlcamelhuge/MQ/DiagnoseMessageProducer.java

@@ -49,6 +49,7 @@ public class DiagnoseMessageProducer implements MessageProducer {
49 49
             return;
50 50
         }
51 51
         DiagnoseMsg actualMsg = (DiagnoseMsg) msgBody;
52
+        System.out.println(properties);
52 53
         Message msg = new Message(properties.getTopic(), properties.getTags(), actualMsg.getWellId(), JSONObject.toJSONString(actualMsg).getBytes(StandardCharsets.UTF_8));
53 54
         sendDefault(producer, msg, (MessageBody) actualMsg, failCallBack, successCallBack,0);
54 55
     }

+ 4 - 0
src/main/java/com/gct/tools/etlcamelhuge/MQ/MessageProducer.java

@@ -1,6 +1,7 @@
1 1
 package com.gct.tools.etlcamelhuge.MQ;
2 2
 
3 3
 import com.gct.tools.etlcamelhuge.entity.DiagnoseMsg;
4
+import org.apache.rocketmq.client.exception.MQBrokerException;
4 5
 import org.apache.rocketmq.client.exception.MQClientException;
5 6
 import org.apache.rocketmq.client.producer.DefaultMQProducer;
6 7
 import org.apache.rocketmq.client.producer.SendCallback;
@@ -11,6 +12,7 @@ import org.slf4j.Logger;
11 12
 import org.slf4j.LoggerFactory;
12 13
 
13 14
 import java.util.Objects;
15
+import java.util.concurrent.TimeUnit;
14 16
 
15 17
 /**
16 18
  * class name: MessageProducer
@@ -32,6 +34,7 @@ public interface MessageProducer {
32 34
     default void sendDefault(DefaultMQProducer producer, Message msg, MessageBody actualMsg, SendFailCallBack failCallBack, SendSuccessCallBack successCallBack,int retryCount) {
33 35
         Logger log = LoggerFactory.getLogger(producer.getProducerGroup() + producer.getClass().getTypeName());
34 36
         try {
37
+
35 38
             producer.send(msg, new SendCallback() {
36 39
                 @Override
37 40
                 public void onSuccess(SendResult sendResult) {
@@ -49,6 +52,7 @@ public interface MessageProducer {
49 52
                 }
50 53
             });
51 54
         } catch (MQClientException | RemotingException | InterruptedException e) {
55
+            e.printStackTrace();
52 56
             log.error("Send msg failed,by cause:{},stack:{}", e.getMessage(), e.getStackTrace());
53 57
         }
54 58
     }

+ 79 - 42
src/main/java/com/gct/tools/etlcamelhuge/routeconfig/CamelJDBCCofRealTimeConfiguration.java

@@ -10,9 +10,13 @@ import org.springframework.context.annotation.Configuration;
10 10
 import org.springframework.util.StringUtils;
11 11
 
12 12
 import javax.annotation.Resource;
13
+import java.io.IOException;
14
+import java.io.Reader;
13 15
 import java.lang.management.MemoryMXBean;
14 16
 import java.math.BigDecimal;
15 17
 import java.math.RoundingMode;
18
+import java.sql.Clob;
19
+import java.sql.SQLException;
16 20
 import java.time.LocalDateTime;
17 21
 import java.time.format.DateTimeFormatter;
18 22
 import java.util.*;
@@ -63,6 +67,7 @@ public class CamelJDBCCofRealTimeConfiguration {
63 67
     public void sendDataToRocketMQ(String wellName, String wellId, String prodDate, Double stroke_length, Double stroke_frequency, String sgt) {
64 68
         String orgId = "0";
65 69
         DiagnoseMsg diagnoseMsg = new DiagnoseMsg(wellId, wellName, orgId, prodDate, sgt, LocalDateTime.now().toString(), stroke_length, stroke_frequency);
70
+        System.out.println(diagnoseMsg);
66 71
         sendMsgRunTime++;
67 72
         producer.send(diagnoseMsg);
68 73
     }
@@ -70,6 +75,25 @@ public class CamelJDBCCofRealTimeConfiguration {
70 75
     @Resource(name = "diagnoseMessageProducer")
71 76
     private MessageProducer producer;
72 77
 
78
+    private String clobToString(Clob clob){
79
+        StringBuilder sb = new StringBuilder();
80
+        if (clob != null) {
81
+            try (Reader reader = clob.getCharacterStream()) {
82
+                char[] buffer = new char[1024];
83
+                int bytesRead;
84
+                while ((bytesRead = reader.read(buffer)) != -1) {
85
+                    sb.append(buffer, 0, bytesRead);
86
+                }
87
+            } catch (SQLException e) {
88
+                throw new RuntimeException(e);
89
+            } catch (IOException e) {
90
+                throw new RuntimeException(e);
91
+            }
92
+        }
93
+        return sb.toString();
94
+
95
+    }
96
+
73 97
     @Bean
74 98
     public RouteBuilder routeBuilderWithRealTime() {
75 99
         return new RouteBuilder() {
@@ -82,10 +106,14 @@ public class CamelJDBCCofRealTimeConfiguration {
82 106
                         .split(body()).process(exchange -> {
83 107
                             HashMap body = exchange.getIn().getBody(HashMap.class);
84 108
                             exchange.getIn().setHeader("well_id", body.get("well_id"));
85
-                            exchange.getIn().setHeader("sgt_last_time", body.get("sgt_last_time"));
109
+//                            System.out.println(body.get("well_id"));
110
+                            if (Objects.isNull(body.get("sgt_last_time")))body.put("sgt_last_time"
111
+                                    ,LocalDateTime.now().minusHours(4).format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
112
+                            exchange.getIn().setHeader("sgt_last_time", body.get("sgt_last_time").toString().substring(0,19));
86 113
                         })
87
-                        .setBody(simple("select distinct well_name,dyna_create_time,check_date,displacement,disp_load,stroke,frequency,susp_max_load,susp_min_load" +
88
-                                " from DEFAULT_GONGTU where well_name='${header.well_id}' and dyna_create_time >= '${header.sgt_last_time}' limit 50 "))
114
+                        .setBody(simple("select WELL_ID,WELL_NAME,DYNA_CREATE_TIME,CHECK_DATE,DISPLACEMENT,DISP_LOAD,STROKE,FREQUENCY,SUSP_MAX_LOAD,SUSP_MIN_LOAD" +
115
+                                " from DEFAULT_GONGTU where CHECK_DATE >= to_date('${header.sgt_last_time}','yyyy-mm-dd hh24:mi:ss') " +
116
+                                "and well_id='${header.well_id}' and  rownum<50 "))
89 117
                         .to("jdbc:oracle")
90 118
                         .process(exchange -> {
91 119
                             sendMsgRunTime = 0;
@@ -93,14 +121,22 @@ public class CamelJDBCCofRealTimeConfiguration {
93 121
                         .split(body()).process(exchange -> {
94 122
                             Message in = exchange.getIn();
95 123
                             HashMap<String, Object> aRow = in.getBody(HashMap.class);
96
-                            String prod_date = aRow.get("dyna_create_time").toString().split("\\+")[0];
97
-                            aRow.put("dyna_create_time", prod_date);
124
+                            String prod_date = aRow.get("DYNA_CREATE_TIME").toString().split("\\+")[0];
125
+                            aRow.put("DYNA_CREATE_TIME", prod_date);
98 126
                             aRow.put("sgt", "");
99
-                            if (!StringUtils.isEmpty(aRow.get("displacement")) && !StringUtils.isEmpty(aRow.get("disp_load"))) {
100
-                                String[] displacements = aRow.get("displacement").toString().split(";");//10 四舍五入
101
-                                String[] disp_loads = aRow.get("disp_load").toString().split(";");
102
-                                Double susp_max_load = max(disp_loads);
103
-                                Double susp_min_load = min(disp_loads);
127
+                            //swap table error design column
128
+                            Object tempObj = aRow.get("STROKE");
129
+                            aRow.put("STROKE",aRow.get("FREQUENCY"));
130
+                            aRow.put("FREQUENCY",tempObj);
131
+                            if (!StringUtils.isEmpty(aRow.get("DISPLACEMENT")) && !StringUtils.isEmpty(aRow.get("DISP_LOAD"))) {
132
+                                String[] displacements =  clobToString((Clob) aRow.get("DISPLACEMENT")).split(",");//10 四舍五入
133
+                                for (int i = 0; i < displacements.length; i++) {
134
+                                    displacements[i] = Double.toString(Double.parseDouble(displacements[i])/10);
135
+                                }
136
+//                                System.out.println("disp="+Arrays.deepToString(displacements));
137
+                                String[] disp_loads = clobToString((Clob) aRow.get("DISP_LOAD")).split(",");
138
+                                Double SUSP_MAX_LOAD = max(disp_loads);
139
+                                Double SUSP_MIN_LOAD = min(disp_loads);
104 140
                                 String sgt = "";
105 141
                                 for (int i = 0; i < displacements.length; i++) {
106 142
                                     sgt = sgt + displacements[i] + "," + disp_loads[i] + ",";
@@ -112,15 +148,15 @@ public class CamelJDBCCofRealTimeConfiguration {
112 148
                                 }
113 149
                                 Double[][] doubles = SGTUtil.encodeToDoubleArray(w);
114 150
                                 aRow.put("sgt", SGTUtil.encodeToString(doubles));
115
-                                aRow.put("susp_max_load", susp_max_load);
116
-                                aRow.put("susp_min_load", susp_min_load);
151
+                                aRow.put("SUSP_MAX_LOAD", SUSP_MAX_LOAD);
152
+                                aRow.put("SUSP_MIN_LOAD", SUSP_MIN_LOAD);
117 153
                             }
118 154
                             //对于位移没有数据,所有数据都在载荷中的特殊数据做特别处理
119
-                            if (StringUtils.isEmpty(aRow.get("displacement")) && !StringUtils.isEmpty(aRow.get("disp_load"))) {
120
-                                String disp_load = aRow.get("disp_load").toString().replaceAll(";", ",");
121
-                                Double[][] doubles = SGTUtil.encodeToDoubleArray(disp_load);
155
+                            if (StringUtils.isEmpty(aRow.get("DISPLACEMENT")) && !StringUtils.isEmpty(aRow.get("DISP_LOAD"))) {
156
+                                String DISP_LOAD = aRow.get("DISP_LOAD").toString().replaceAll(";", ",");
157
+                                Double[][] doubles = SGTUtil.encodeToDoubleArray(DISP_LOAD);
122 158
                                 aRow.put("sgt", SGTUtil.encodeToString(doubles));
123
-                                String[] split = disp_load.split(",");
159
+                                String[] split = DISP_LOAD.split(",");
124 160
                                 List<String> list = new ArrayList<>();
125 161
                                 for (int i = 0; i < split.length; i++) {
126 162
                                     if (i % 2 != 0) {
@@ -129,43 +165,44 @@ public class CamelJDBCCofRealTimeConfiguration {
129 165
                                     }
130 166
                                 }
131 167
                                 String[] loads = list.toArray(new String[0]);
132
-                                Double susp_max_load = null;
133
-                                Double susp_min_load = null;
168
+                                Double SUSP_MAX_LOAD = null;
169
+                                Double SUSP_MIN_LOAD = null;
134 170
                                 if (loads.length > 0) {
135
-                                    susp_max_load = max(loads);
136
-                                    susp_min_load = min(loads);
171
+                                    SUSP_MAX_LOAD = max(loads);
172
+                                    SUSP_MIN_LOAD = min(loads);
137 173
                                 }
138
-                                aRow.put("susp_max_load", susp_max_load);
139
-                                aRow.put("susp_min_load", susp_min_load);
174
+                                aRow.put("SUSP_MAX_LOAD", SUSP_MAX_LOAD);
175
+                                aRow.put("SUSP_MIN_LOAD", SUSP_MIN_LOAD);
140 176
                             }
141
-                            aRow.putIfAbsent("stroke", "0.0");
142
-                            aRow.putIfAbsent("frequency", "0.0");
143
-                            aRow.putIfAbsent("susp_max_load", "0.0");
144
-                            aRow.putIfAbsent("susp_min_load", "0.0");
145
-                            if (!StringUtils.isEmpty(aRow.get("frequency"))) {
146
-                                BigDecimal bd = new BigDecimal(aRow.get("frequency").toString());
147
-                                double frequency = bd.setScale(1, RoundingMode.HALF_UP).doubleValue();
148
-                                aRow.put("frequency", frequency);
177
+                            aRow.putIfAbsent("STROKE", "0.0");
178
+                            aRow.putIfAbsent("FREQUENCY", "0.0");
179
+                            aRow.putIfAbsent("SUSP_MAX_LOAD", "0.0");
180
+                            aRow.putIfAbsent("SUSP_MIN_LOAD", "0.0");
181
+                            if (!StringUtils.isEmpty(aRow.get("FREQUENCY"))) {
182
+                                BigDecimal bd = new BigDecimal(aRow.get("FREQUENCY").toString());
183
+                                double FREQUENCY = bd.setScale(1, RoundingMode.HALF_UP).doubleValue();
184
+                                aRow.put("FREQUENCY", FREQUENCY);
149 185
                             }
150
-                            if (!StringUtils.isEmpty(aRow.get("stroke"))) {
151
-                                double stroke1 = Double.parseDouble(aRow.get("stroke").toString());
186
+                            if (!StringUtils.isEmpty(aRow.get("STROKE"))) {
187
+                                double stroke1 = Double.parseDouble(aRow.get("STROKE").toString());
152 188
                                 BigDecimal bd = new BigDecimal(stroke1);
153
-                                double stroke = bd.setScale(1, RoundingMode.HALF_UP).doubleValue();
154
-                                aRow.put("stroke", stroke);
189
+                                double STROKE = bd.setScale(1, RoundingMode.HALF_UP).doubleValue();
190
+                                aRow.put("STROKE", STROKE);
155 191
                             }
156
-                            String wellName = aRow.get("well_name").toString();
157
-                            String wellId = aRow.get("well_name").toString();
158
-                            String prodDate = aRow.get("dyna_create_time").toString().substring(0, 19);
159
-                            Double strokeLength = Double.valueOf(aRow.get("stroke").toString());
160
-                            Double strokeFrequency = Double.valueOf(aRow.get("frequency").toString());
192
+                            String wellName = aRow.get("WELL_NAME").toString();
193
+                            String wellId = aRow.get("WELL_ID").toString();
194
+                            String prodDate = aRow.get("DYNA_CREATE_TIME").toString().substring(0, 19);
195
+                            Double strokeLength = Double.valueOf(aRow.get("STROKE").toString());
196
+                            Double strokeFrequency = Double.valueOf(aRow.get("FREQUENCY").toString());
161 197
                             String sgt = aRow.get("sgt").toString();
198
+//                            System.out.println(sgt);
162 199
                             in.setHeader("sgt_last_time", prodDate);
163 200
                             in.setHeader("well_id", wellId);
164 201
                             sendDataToRocketMQ(wellName, wellId, prodDate, strokeLength, strokeFrequency, sgt);
165 202
                         })
166
-                        .setBody(simple("insert into centralbase.cb_temp_well_mech_runtime(well_id,prod_date,susp_max_load,susp_min_load,sgt) " +
167
-                                "select '${body[well_name]}','${body[dyna_create_time]}','${body[susp_max_load]}','${body[susp_min_load]}','${body[sgt]}' " +
168
-                                "where NOT EXISTS (SELECT * FROM centralbase.cb_temp_well_mech_runtime WHERE well_id = '${body[well_name]}' and  prod_date = '${body[dyna_create_time]}' )"))
203
+                        .setBody(simple("insert into centralbase.cb_temp_well_mech_runtime(well_id,prod_date,SUSP_MAX_LOAD,SUSP_MIN_LOAD,sgt,stroke_length,stroke_frequency) " +
204
+                                "select '${body[WELL_ID]}','${body[DYNA_CREATE_TIME]}','${body[SUSP_MAX_LOAD]}','${body[SUSP_MIN_LOAD]}','${body[sgt]}','${body[STROKE]}','${body[FREQUENCY]}' " +
205
+                                "where NOT EXISTS (SELECT * FROM centralbase.cb_temp_well_mech_runtime WHERE well_id = '${body[WELL_ID]}' and  prod_date = '${body[DYNA_CREATE_TIME]}' )"))
169 206
                         .to("jdbc:centralbase")
170 207
                         .setBody(simple("update centralbase.sys_access_well_control set sgt_last_time = '${header.sgt_last_time}' where well_id ='${header.well_id}' "))
171 208
                         .to("jdbc:centralbase")

+ 28 - 19
src/main/java/com/gct/tools/etlcamelhuge/routeconfig/CamelJDBCConfiguration.java

@@ -11,6 +11,7 @@ import org.springframework.context.annotation.Configuration;
11 11
 
12 12
 import java.math.BigDecimal;
13 13
 import java.math.RoundingMode;
14
+import java.time.LocalDate;
14 15
 import java.time.LocalDateTime;
15 16
 import java.time.format.DateTimeFormatter;
16 17
 import java.util.*;
@@ -79,14 +80,15 @@ public class CamelJDBCConfiguration {
79 80
                                 body.put("error_id", 1);
80 81
                                 body.put("updateTime", format);
81 82
                             }
83
+                            body.put("sgtLastTime",format);
82 84
 
83 85
                         })
84 86
                         //之前在数据导入时没有自动填写入最后更新时间,会导致新加入的数据无法自动导入
85 87
                         .setBody(simple("update centralbase.sys_access_well_control set well_common_name='${body[well_common_name]}'," +
86
-                                "org_id='${body[org_id]}',update_time = '${body[updateTime]}'::timestamp,sgt_last_time='${body[updateTime]}', " +
88
+                                "org_id='${body[org_id]}',update_time = '${body[updateTime]}'::timestamp,sgt_last_time='${body[sgtLastTime]}', " +
87 89
                                 "remarks ='${body[remarks]}' ,error_id ='${body[error_id]}' " +
88
-                                "where well_id ='${body[well_id]} and " +
89
-                                "(sgt_last_time is null or sgt_last_time<'${body[yesterday]}'::timestamp)"))
90
+                                "where well_id ='${body[well_id]}' and " +
91
+                                "(sgt_last_time is null or sgt_last_time::timestamp<'${body[yesterday]}'::timestamp)"))
90 92
                         .to("jdbc:centralbase")
91 93
                         .end();
92 94
 
@@ -135,10 +137,11 @@ public class CamelJDBCConfiguration {
135 137
                         });
136 138
                 setSysControlBody(statusDailyDYM)
137 139
                         .setBody(simple("select distinct WELL_ID , PROD_DATE , DYNAMIC_LIQ_LEVEL " +
138
-                                "from ZY_DXGW.PC_PRO_COM_DAILY " +
139
-                                "where  (WELL_ID,PROD_DATE) in" +
140
-                                " (SELECT  WELL_ID,max(PROD_DATE) PROD_DATE  FROM ZY_DXGW.PC_PRO_COM_DAILY" +
141
-                                "    WHERE DYNAMIC_LIQ_LEVEL is not null and WELL_ID='${header.well_id}' group by WELL_ID)"))
140
+                                "                                from ZY_DXGW.PC_PRO_COM_DAILY " +
141
+                                "                                where (WELL_ID,PROD_DATE) in " +
142
+                                "                                 (SELECT WELL_ID, max(PROD_DATE) PROD_DATE  FROM ZY_DXGW.PC_PRO_COM_DAILY " +
143
+                                "                                  where   PROD_DATE > TO_DATE('2024-01-01','yyyy-MM-dd')  " +
144
+                                "and WELL_ID='${header.well_id}' and DYNAMIC_LIQ_LEVEL is not null  group by WELL_ID)"))
142 145
                         .to("jdbc:oracle")
143 146
                         .split(body())
144 147
                         .setBody(simple("update centralbase.cb_pc_pro_wellbore_status_daily set start_pump_liq_level = '${body[DYNAMIC_LIQ_LEVEL]}'  " +
@@ -157,10 +160,11 @@ public class CamelJDBCConfiguration {
157 160
                         });
158 161
                 setSysControlBody(statusDailyBS)
159 162
                         .setBody(simple("select distinct WELL_ID , PROD_DATE , PUMP_DEPTH " +
160
-                                "from ZY_DXGW.PC_PRO_COM_DAILY " +
161
-                                "where  (WELL_ID,PROD_DATE) in" +
162
-                                " (SELECT  WELL_ID,max(PROD_DATE) PROD_DATE  FROM ZY_DXGW.PC_PRO_COM_DAILY" +
163
-                                "    WHERE PUMP_DEPTH is not null and WELL_ID='${header.well_id}' group by WELL_ID)"))
163
+                                "                                from ZY_DXGW.PC_PRO_COM_DAILY " +
164
+                                "                                where (WELL_ID,PROD_DATE) in " +
165
+                                "                                 (SELECT WELL_ID, max(PROD_DATE) PROD_DATE  FROM ZY_DXGW.PC_PRO_COM_DAILY " +
166
+                                "                                  where   PROD_DATE > TO_DATE('2024-01-01','yyyy-MM-dd')  " +
167
+                                "and WELL_ID='${header.well_id}' and PUMP_DEPTH is not null  group by WELL_ID)"))
164 168
                         .to("jdbc:oracle")
165 169
                         .split(body())
166 170
                         .setBody(simple("update centralbase.cb_pc_pro_wellbore_status_daily set pump_depth = '${body[PUMP_DEPTH]}'  " +
@@ -179,10 +183,11 @@ public class CamelJDBCConfiguration {
179 183
                         });
180 184
                 setSysControlBody(statusDailyBJ)
181 185
                         .setBody(simple("select distinct WELL_ID , PROD_DATE , PUMP_DIAMETER " +
182
-                                "from ZY_DXGW.PC_PRO_COM_DAILY " +
183
-                                "where  (WELL_ID,PROD_DATE) in" +
184
-                                " (SELECT  WELL_ID,max(PROD_DATE) PROD_DATE  FROM ZY_DXGW.PC_PRO_COM_DAILY" +
185
-                                "    WHERE PUMP_DIAMETER is not null and WELL_ID='${header.well_id}' group by WELL_ID)"))
186
+                                "                                from ZY_DXGW.PC_PRO_COM_DAILY " +
187
+                                "                                where (WELL_ID,PROD_DATE) in " +
188
+                                "                                 (SELECT WELL_ID, max(PROD_DATE) PROD_DATE  FROM ZY_DXGW.PC_PRO_COM_DAILY " +
189
+                                "                                  where   PROD_DATE > TO_DATE('2024-01-01','yyyy-MM-dd')  " +
190
+                                "and WELL_ID='${header.well_id}' and PUMP_DIAMETER is not null  group by WELL_ID)"))
186 191
                         .to("jdbc:oracle")
187 192
                         .split(body())
188 193
                         .setBody(simple("update centralbase.cb_pc_pro_wellbore_status_daily set oil_nozzle = '${body[PUMP_DIAMETER]}'  " +
@@ -205,7 +210,7 @@ public class CamelJDBCConfiguration {
205 210
                         .split(body()).process(exchange -> {
206 211
                             Message in = exchange.getIn();
207 212
                             HashMap<String, Object> aRow = in.getBody(HashMap.class);
208
-                            aRow.put("submergence_depth", null);
213
+                            aRow.put("submergence_depth", 0);
209 214
                             if (aRow.get("start_pump_liq_level") != null && aRow.get("pump_depth") != null) {
210 215
                                 double cmd = Double.parseDouble(aRow.get("pump_depth").toString()) - Double.parseDouble(aRow.get("start_pump_liq_level").toString()) / 10;
211 216
                                 BigDecimal bd = new BigDecimal(cmd);
@@ -217,7 +222,7 @@ public class CamelJDBCConfiguration {
217 222
                         .doTry()
218 223
                         .to("jdbc:centralbase")
219 224
                         .doCatch(Exception.class)
220
-                        .log("${header.date}" + " routeId:update-statusDaily-submergenceDepth ->  centralbase.cb_pc_pro_wellbore_status_daily update data failed")
225
+//                        .log("${header.date}" + " routeId:update-statusDaily-submergenceDepth ->  centralbase.cb_pc_pro_wellbore_status_daily update data failed")
221 226
                         .end();
222 227
 
223 228
 
@@ -227,15 +232,17 @@ public class CamelJDBCConfiguration {
227 232
                         .process(exchange -> {
228 233
                             Message in = exchange.getIn();
229 234
                             in.setHeader("date", getDate());
235
+                            in.setHeader("date", LocalDateTime.now().minusDays(1).toLocalDate().format(DateTimeFormatter.ofPattern("yyyy-MM-dd")));
230 236
                         });
231 237
                 setSysControlBody(volDaily)
232 238
                         .setBody(simple("select distinct  WELL_ID,PROD_DATE,PROD_TIME, LIQ_PROD_DAILY, OIL_PROD_DAILY,GAS_PROD_DAILY" +
233
-                                ",WATER_CUT, REMARKS from DBA01 where PROD_DATE  = to_date('${header.date}','yyyy-MM-dd')" +
239
+                                ",WATER_CUT, REMARKS from ZY_DXGW.PC_PRO_COM_DAILY where PROD_DATE  = to_date('${header.date}','yyyy-MM-dd')" +
234 240
                                 " and WELL_ID ='${header.well_id}'"))
235 241
                         .to("jdbc:oracle")
236 242
                         .split(body()).process(exchange -> {
237 243
                             Message in = exchange.getIn();
238 244
                             HashMap<String, Object> aRow = in.getBody(HashMap.class);
245
+
239 246
                             aRow.putIfAbsent("PROD_TIME", "0.0");
240 247
                             aRow.putIfAbsent("LIQ_PROD_DAILY", "0.0");
241 248
                             aRow.putIfAbsent("OIL_PROD_DAILY", "0.0");
@@ -276,6 +283,7 @@ public class CamelJDBCConfiguration {
276 283
                             if (!aRow.containsKey("YMD")) {
277 284
                                 aRow.put("YMD", 0.85);
278 285
                             }
286
+
279 287
                         })
280 288
                         .setBody(simple("insert into centralbase.cb_pc_pro_wellbore_vol_daily(well_id,prod_date,prod_time,liq_prod_daily,oil_prod_daily,gas_prod_daily,water_cut,remarks,gas_oil_ratio,water_prod_daily,water_gas_ratio,surface_crude_water_density,surface_crude_oil_density,stim_content) " +
281 289
                                 "select '${body[WELL_ID]}','${body[PROD_DATE]}','${body[WATER_CUT]}','${body[LIQ_PROD_DAILY]}','${body[OIL_PROD_DAILY]}','${body[GAS_PROD_DAILY]}','${body[WATER_CUT]}','${body[REMARKS]}','${body[QYB]}','${body[RCSL]}','${body[SQB]}','${body[SMD]}','${body[YMD]}' ,'${body[SCCW]}'" +
@@ -300,7 +308,8 @@ public class CamelJDBCConfiguration {
300 308
                             in.setHeader("date", getDate());
301 309
                         });
302 310
                 setSysControlBody(volDailyLiqProdDaily)
303
-                        .setBody(simple("select distinct  WELL_ID,PROD_DATE,WATER_CUT, LIQ_PROD_DAILY,OIL_PROD_DAILY,GAS_PROD_DAILY,WATER_CUT, REMARKS from DBA01 where PROD_DATE  = to_date('${header.date}','yyyy-MM-dd')and WELL_ID ='${header.well_id}' and qyrq is not null "))
311
+                        .setBody(simple("select distinct  WELL_ID,PROD_DATE,WATER_CUT, LIQ_PROD_DAILY,OIL_PROD_DAILY,GAS_PROD_DAILY,WATER_CUT, REMARKS" +
312
+                                " from ZY_DXGW.PC_PRO_COM_DAILY where PROD_DATE  = to_date('${header.date}','yyyy-MM-dd')and WELL_ID ='${header.well_id}'"))
304 313
                         .to("jdbc:oracle")
305 314
                         .split(body()).process(exchange -> {
306 315
                             Message in = exchange.getIn();

+ 2 - 2
src/main/resources/application.yml

@@ -131,7 +131,7 @@ gct:
131 131
     one:
132 132
       topic: diagnose-msg
133 133
       tags: v1
134
-      log-file-path: /home/lloyd/Desktop/log/task.json
134
+      log-file-path: C:\Users\Administrator\Desktop\package\a.json
135 135
       producer:
136 136
         group: diagnose
137 137
         access-key: diagnose-msg-v1
@@ -144,7 +144,7 @@ gct:
144 144
     two:
145 145
       topic: warn-msg
146 146
       tags: v1
147
-      log-file-path: /home/lloyd/Desktop/log/task.json
147
+      log-file-path: C:\Users\Administrator\Desktop\package\a.json
148 148
       producer:
149 149
         group: warn
150 150
         access-key: warn-msg-v1

File diff suppressed because it is too large
+ 32 - 8
src/test/java/com/gct/tools/etlcamelhuge/EtlCamelHugeApplicationTests.java