xsr il y a 8 mois
Parent
commit
3da2e8bf4a

+ 111 - 106
src/main/java/com/gct/tools/etlcamelhuge/routeconfig/CamelJDBCCofRealTimeConfiguration.java

@@ -19,26 +19,28 @@ import java.util.*;
19 19
 
20 20
 /**
21 21
  * class name: CamelJDBCCofRealTimeConfiguration.java
22
- *  实时导数据类
22
+ * 实时导数据类
23
+ *
23 24
  * @author lloyd
24 25
  * @version 1.0
25 26
  * @since 2021/4/14 下午3:16
26 27
  */
27
-//@Configuration
28
-public class CamelJDBCCofRealTimeConfiguration  {
28
+@Configuration
29
+public class CamelJDBCCofRealTimeConfiguration {
29 30
 
30
-    private static long sendMsgRunTime=0;
31
+    private static long sendMsgRunTime = 0;
31 32
 
32
-    public Double min(String[] strings){
33
+    public Double min(String[] strings) {
33 34
         double[] doubles = new double[strings.length];
34 35
         for (int i = 0; i < strings.length; i++) {
35 36
             doubles[i] = Double.parseDouble(strings[i]);
36 37
         }
37 38
         OptionalDouble min = Arrays.stream(doubles).min();
38
-        return min.isPresent()? min.getAsDouble() : null;
39
+        return min.isPresent() ? min.getAsDouble() : null;
39 40
     }
41
+
40 42
     //获取最大载荷
41
-    public Double max(String[] strings){
43
+    public Double max(String[] strings) {
42 44
         double[] doubles = new double[strings.length];
43 45
         for (int i = 0; i < strings.length; i++) {
44 46
             doubles[i] = Double.parseDouble(strings[i]);
@@ -48,16 +50,16 @@ public class CamelJDBCCofRealTimeConfiguration  {
48 50
     }
49 51
 
50 52
 
51
-    public String getDate(){
52
-        return  LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd"));
53
+    public String getDate() {
54
+        return LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd"));
53 55
     }
54 56
 
55 57
 
56 58
     /***
57 59
      * 异步发送 RunTime 消息到MQ中
58 60
      * */
59
-   // @Async
60
-    public void sendDataToRocketMQ(String wellName, String wellId, String prodDate, Double stroke_length , Double stroke_frequency, String sgt){
61
+    // @Async
62
+    public void sendDataToRocketMQ(String wellName, String wellId, String prodDate, Double stroke_length, Double stroke_frequency, String sgt) {
61 63
         String orgId = "0";
62 64
         DiagnoseMsg diagnoseMsg = new DiagnoseMsg(wellId, wellName, orgId, prodDate, sgt, LocalDateTime.now().toString(), stroke_length, stroke_frequency);
63 65
         sendMsgRunTime++;
@@ -66,6 +68,7 @@ public class CamelJDBCCofRealTimeConfiguration  {
66 68
 
67 69
     @Resource(name = "diagnoseMessageProducer")
68 70
     private MessageProducer producer;
71
+
69 72
     @Bean
70 73
     public RouteBuilder routeBuilderWithRealTime() {
71 74
         return new RouteBuilder() {
@@ -77,86 +80,87 @@ public class CamelJDBCCofRealTimeConfiguration  {
77 80
                         .to("jdbc:centralbase")
78 81
                         .split(body()).process(exchange -> {
79 82
                             HashMap body = exchange.getIn().getBody(HashMap.class);
80
-                            exchange.getIn().setHeader("well_id",body.get("well_id"));
81
-                            exchange.getIn().setHeader("sgt_last_time",body.get("sgt_last_time"));
83
+                            exchange.getIn().setHeader("well_id", body.get("well_id"));
84
+                            exchange.getIn().setHeader("sgt_last_time", body.get("sgt_last_time"));
82 85
                         })
83
-                        .setBody(simple("select distinct well_name,dyna_create_time,check_date,displacement,disp_load,stroke,frequency,susp_max_load,susp_min_load from public.pc_fd_pumpjack_dyna_dia_t where   well_name='${header.well_id}' and  dyna_create_time >= '${header.sgt_last_time}' limit 50 "))
86
+                        .setBody(simple("select distinct well_name,dyna_create_time,check_date,displacement,disp_load,stroke,frequency,susp_max_load,susp_min_load" +
87
+                                " from DEFAULT_GONGTU where well_name='${header.well_id}' and dyna_create_time >= '${header.sgt_last_time}' limit 50 "))
84 88
                         .to("jdbc:oracle")
85 89
                         .process(exchange -> {
86 90
                             sendMsgRunTime = 0;
87 91
                         })
88 92
                         .split(body()).process(exchange -> {
89
-                    Message in = exchange.getIn();
90
-                    HashMap<String, Object> aRow = in.getBody(HashMap.class);
91
-                    String prod_date = aRow.get("dyna_create_time").toString().split("\\+")[0];
92
-                    aRow.put("dyna_create_time", prod_date);
93
-                    aRow.put("sgt", "");
94
-                    if (!StringUtils.isEmpty(aRow.get("displacement")) && !StringUtils.isEmpty(aRow.get("disp_load"))) {
95
-                        String[] displacements = aRow.get("displacement").toString().split(";");//10 四舍五入
96
-                        String[] disp_loads = aRow.get("disp_load").toString().split(";");
97
-                        Double susp_max_load = max(disp_loads);
98
-                        Double susp_min_load = min(disp_loads);
99
-                        String sgt = "";
100
-                        for (int i = 0; i < displacements.length; i++) {
101
-                            sgt = sgt + displacements[i] + "," + disp_loads[i] + ",";
102
-                        }
103
-                        String[] s = sgt.split(",");
104
-                        String w = "";
105
-                        for (int i = 0; i < s.length; i++) {
106
-                            w += new BigDecimal(Math.round(Double.parseDouble(s[i]) * 100)).stripTrailingZeros().toPlainString() + ",";
107
-                        }
108
-                        Double[][] doubles = SGTUtil.encodeToDoubleArray(w);
109
-                        aRow.put("sgt", SGTUtil.encodeToString(doubles));
110
-                        aRow.put("susp_max_load",susp_max_load);
111
-                        aRow.put("susp_min_load",susp_min_load);
112
-                    }
113
-                    //对于位移没有数据,所有数据都在载荷中的特殊数据做特别处理
114
-                     if (StringUtils.isEmpty(aRow.get("displacement")) && !StringUtils.isEmpty(aRow.get("disp_load"))){
115
-                        String disp_load = aRow.get("disp_load").toString().replaceAll(";", ",");
116
-                        Double[][] doubles = SGTUtil.encodeToDoubleArray(disp_load);
117
-                        aRow.put("sgt", SGTUtil.encodeToString(doubles));
118
-                        String[] split = disp_load.split(",");
119
-                        List<String> list = new ArrayList<>();
120
-                        for (int i = 0; i < split.length; i++) {
121
-                            if (i%2 != 0){
122
-                                BigDecimal bigDecimal = new  BigDecimal(split[i]).divide(new BigDecimal("100.0"));
123
-                                list.add(bigDecimal.toString());
93
+                            Message in = exchange.getIn();
94
+                            HashMap<String, Object> aRow = in.getBody(HashMap.class);
95
+                            String prod_date = aRow.get("dyna_create_time").toString().split("\\+")[0];
96
+                            aRow.put("dyna_create_time", prod_date);
97
+                            aRow.put("sgt", "");
98
+                            if (!StringUtils.isEmpty(aRow.get("displacement")) && !StringUtils.isEmpty(aRow.get("disp_load"))) {
99
+                                String[] displacements = aRow.get("displacement").toString().split(";");//10 四舍五入
100
+                                String[] disp_loads = aRow.get("disp_load").toString().split(";");
101
+                                Double susp_max_load = max(disp_loads);
102
+                                Double susp_min_load = min(disp_loads);
103
+                                String sgt = "";
104
+                                for (int i = 0; i < displacements.length; i++) {
105
+                                    sgt = sgt + displacements[i] + "," + disp_loads[i] + ",";
106
+                                }
107
+                                String[] s = sgt.split(",");
108
+                                String w = "";
109
+                                for (int i = 0; i < s.length; i++) {
110
+                                    w += new BigDecimal(Math.round(Double.parseDouble(s[i]) * 100)).stripTrailingZeros().toPlainString() + ",";
111
+                                }
112
+                                Double[][] doubles = SGTUtil.encodeToDoubleArray(w);
113
+                                aRow.put("sgt", SGTUtil.encodeToString(doubles));
114
+                                aRow.put("susp_max_load", susp_max_load);
115
+                                aRow.put("susp_min_load", susp_min_load);
124 116
                             }
125
-                        }
126
-                        String[] loads = list.toArray(new String[0]);
127
-                         Double susp_max_load =null;
128
-                         Double susp_min_load =null;
129
-                        if (loads.length>0  ) {
130
-                            susp_max_load = max(loads);
131
-                            susp_min_load = min(loads);
132
-                        }
133
-                        aRow.put("susp_max_load",susp_max_load);
134
-                        aRow.put("susp_min_load",susp_min_load);
135
-                    }
136
-                    aRow.putIfAbsent("stroke", "0.0");
137
-                    aRow.putIfAbsent("frequency", "0.0");
138
-                    aRow.putIfAbsent("susp_max_load", "0.0");
139
-                    aRow.putIfAbsent("susp_min_load", "0.0");
140
-                    if (!StringUtils.isEmpty(aRow.get("frequency"))){
141
-                        BigDecimal bd=new BigDecimal(aRow.get("frequency").toString());
142
-                        double frequency=bd.setScale(1, RoundingMode.HALF_UP).doubleValue();
143
-                        aRow.put("frequency",frequency);
144
-                    }
145
-                    if (!StringUtils.isEmpty(aRow.get("stroke"))){
146
-                        double stroke1 = Double.parseDouble(aRow.get("stroke").toString());
147
-                        BigDecimal bd=new BigDecimal(stroke1);
148
-                        double stroke=bd.setScale(1, RoundingMode.HALF_UP).doubleValue();
149
-                        aRow.put("stroke",stroke);
150
-                    }
151
-                            String wellName =aRow.get("well_name").toString();
152
-                            String wellId =aRow.get("well_name").toString();
153
-                            String prodDate = aRow.get("dyna_create_time").toString().substring(0,19);
117
+                            //对于位移没有数据,所有数据都在载荷中的特殊数据做特别处理
118
+                            if (StringUtils.isEmpty(aRow.get("displacement")) && !StringUtils.isEmpty(aRow.get("disp_load"))) {
119
+                                String disp_load = aRow.get("disp_load").toString().replaceAll(";", ",");
120
+                                Double[][] doubles = SGTUtil.encodeToDoubleArray(disp_load);
121
+                                aRow.put("sgt", SGTUtil.encodeToString(doubles));
122
+                                String[] split = disp_load.split(",");
123
+                                List<String> list = new ArrayList<>();
124
+                                for (int i = 0; i < split.length; i++) {
125
+                                    if (i % 2 != 0) {
126
+                                        BigDecimal bigDecimal = new BigDecimal(split[i]).divide(new BigDecimal("100.0"));
127
+                                        list.add(bigDecimal.toString());
128
+                                    }
129
+                                }
130
+                                String[] loads = list.toArray(new String[0]);
131
+                                Double susp_max_load = null;
132
+                                Double susp_min_load = null;
133
+                                if (loads.length > 0) {
134
+                                    susp_max_load = max(loads);
135
+                                    susp_min_load = min(loads);
136
+                                }
137
+                                aRow.put("susp_max_load", susp_max_load);
138
+                                aRow.put("susp_min_load", susp_min_load);
139
+                            }
140
+                            aRow.putIfAbsent("stroke", "0.0");
141
+                            aRow.putIfAbsent("frequency", "0.0");
142
+                            aRow.putIfAbsent("susp_max_load", "0.0");
143
+                            aRow.putIfAbsent("susp_min_load", "0.0");
144
+                            if (!StringUtils.isEmpty(aRow.get("frequency"))) {
145
+                                BigDecimal bd = new BigDecimal(aRow.get("frequency").toString());
146
+                                double frequency = bd.setScale(1, RoundingMode.HALF_UP).doubleValue();
147
+                                aRow.put("frequency", frequency);
148
+                            }
149
+                            if (!StringUtils.isEmpty(aRow.get("stroke"))) {
150
+                                double stroke1 = Double.parseDouble(aRow.get("stroke").toString());
151
+                                BigDecimal bd = new BigDecimal(stroke1);
152
+                                double stroke = bd.setScale(1, RoundingMode.HALF_UP).doubleValue();
153
+                                aRow.put("stroke", stroke);
154
+                            }
155
+                            String wellName = aRow.get("well_name").toString();
156
+                            String wellId = aRow.get("well_name").toString();
157
+                            String prodDate = aRow.get("dyna_create_time").toString().substring(0, 19);
154 158
                             Double strokeLength = Double.valueOf(aRow.get("stroke").toString());
155 159
                             Double strokeFrequency = Double.valueOf(aRow.get("frequency").toString());
156 160
                             String sgt = aRow.get("sgt").toString();
157
-                            in.setHeader("sgt_last_time",prodDate);
158
-                            in.setHeader("well_id",wellId);
159
-                            sendDataToRocketMQ(wellName,wellId,prodDate,strokeLength,strokeFrequency,sgt);
161
+                            in.setHeader("sgt_last_time", prodDate);
162
+                            in.setHeader("well_id", wellId);
163
+                            sendDataToRocketMQ(wellName, wellId, prodDate, strokeLength, strokeFrequency, sgt);
160 164
                         })
161 165
                         .setBody(simple("insert into centralbase.cb_temp_well_mech_runtime(well_id,prod_date,susp_max_load,susp_min_load,sgt) " +
162 166
                                 "select '${body[well_name]}','${body[dyna_create_time]}','${body[susp_max_load]}','${body[susp_min_load]}','${body[sgt]}' " +
@@ -166,46 +170,46 @@ public class CamelJDBCCofRealTimeConfiguration  {
166 170
                         .to("jdbc:centralbase")
167 171
                         .end();
168 172
 
169
-               from("timer:mytimer-update-avg-mech_daily?period=10800000")
173
+                from("timer:mytimer-update-avg-mech_daily?period=10800000")
170 174
                         .routeId("update-avg-mech_daily")
171
-                       .process(exchange -> {
172
-                           Message in = exchange.getIn();
173
-                           in.setHeader("date",getDate());
174
-                       })
175
+                        .process(exchange -> {
176
+                            Message in = exchange.getIn();
177
+                            in.setHeader("date", getDate());
178
+                        })
175 179
                         .setBody(simple("select well_id,avg(stroke_length) stroke_length  ,avg(stroke_frequency) stroke_frequency from centralbase.cb_temp_well_mech_runtime where prod_date::date='${header.date}' group by well_id"))
176 180
                         .to("jdbc:centralbase")
177 181
                         .split(body()).process(exchange -> {
178
-                    Message in = exchange.getIn();
179
-                    HashMap<String, Object> aRow = in.getBody(HashMap.class);
180
-                    if (aRow.get("stroke_length")!=null && aRow.get("stroke_frequency")!=null){
181
-                        double stroke_length=Double.parseDouble(aRow.get("stroke_length").toString());
182
-                        double stroke_frequency=Double.parseDouble(aRow.get("stroke_frequency").toString());
183
-                        BigDecimal bd=new BigDecimal(stroke_length);
184
-                        double stroke_lengt1=bd.setScale(1,BigDecimal.ROUND_HALF_UP).doubleValue();
185
-                        BigDecimal bd1=new BigDecimal(stroke_frequency);
186
-                        double stroke_frequency1=bd1.setScale(1,BigDecimal.ROUND_HALF_UP).doubleValue();
187
-                        aRow.put("strokeLength",stroke_lengt1);
188
-                        aRow.put("strokeFrequency",stroke_frequency1);
189
-                    }
190
-                })
182
+                            Message in = exchange.getIn();
183
+                            HashMap<String, Object> aRow = in.getBody(HashMap.class);
184
+                            if (aRow.get("stroke_length") != null && aRow.get("stroke_frequency") != null) {
185
+                                double stroke_length = Double.parseDouble(aRow.get("stroke_length").toString());
186
+                                double stroke_frequency = Double.parseDouble(aRow.get("stroke_frequency").toString());
187
+                                BigDecimal bd = new BigDecimal(stroke_length);
188
+                                double stroke_lengt1 = bd.setScale(1, BigDecimal.ROUND_HALF_UP).doubleValue();
189
+                                BigDecimal bd1 = new BigDecimal(stroke_frequency);
190
+                                double stroke_frequency1 = bd1.setScale(1, BigDecimal.ROUND_HALF_UP).doubleValue();
191
+                                aRow.put("strokeLength", stroke_lengt1);
192
+                                aRow.put("strokeFrequency", stroke_frequency1);
193
+                            }
194
+                        })
191 195
                         .setBody(simple("update centralbase.cb_temp_well_mech_daily set stroke_length='${body[strokeLength]}' ,stroke_frequency ='${body[strokeFrequency]}' where well_id = '${body[well_id]}' and prod_date::date='${header.date}' "))
192 196
                         .doTry()
193 197
                         .to("jdbc:centralbase")
194 198
                         .doCatch(Exception.class)
195
-                        .log("${header.date}"+" routeId:update-avg-mech_daily ->  centralbase.cb_temp_well_mech_daily update data failed")
199
+                        .log("${header.date}" + " routeId:update-avg-mech_daily ->  centralbase.cb_temp_well_mech_daily update data failed")
196 200
                         .end();
197 201
 
198
-                from("timer:mytimer-insert-mechDaily?period=3600000")
202
+               /* from("timer:mytimer-insert-mechDaily?period=3600000")
199 203
                         .routeId("insert-mech_daily")
200 204
                         .process(exchange -> {
201 205
                             Message in = exchange.getIn();
202
-                            in.setHeader("date",getDate());
206
+                            in.setHeader("date", getDate());
203 207
                         })
204 208
                         .setBody(simple("select well_id from centralbase.sys_access_well_control  where  access_status='1'  "))
205 209
                         .to("jdbc:centralbase")
206 210
                         .split(body()).process(exchange -> {
207 211
                             HashMap body = exchange.getIn().getBody(HashMap.class);
208
-                            exchange.getIn().setHeader("well_id",body.get("well_id"));
212
+                            exchange.getIn().setHeader("well_id", body.get("well_id"));
209 213
                         })
210 214
                         .setBody(simple("select distinct jh,rq,dym,jy,ly,bj,bs,bx,zs,cc,cs,blx,dl from DBA01 where rq  = to_date('${header.date}','yyyy-MM-dd') and jh='${header.well_id}' and qyrq is not null  "))
211 215
                         .to("jdbc:oracle")
@@ -229,9 +233,10 @@ public class CamelJDBCCofRealTimeConfiguration  {
229 233
                         .doTry()
230 234
                         .to("jdbc:centralbase")
231 235
                         .doCatch(Exception.class)
232
-                        .log("${header.date}"+" routeId:insert-mech_daily ->  centralbase.cb_temp_well_mech_daily insert data failed")
233
-                        .end();
234
-            };
236
+                        .log("${header.date}" + " routeId:insert-mech_daily ->  centralbase.cb_temp_well_mech_daily insert data failed")
237
+                        .end();*/
238
+            }
239
+
235 240
         };
236 241
     }
237 242
 }

+ 47 - 41
src/main/java/com/gct/tools/etlcamelhuge/routeconfig/CamelJDBCConfiguration.java

@@ -7,6 +7,7 @@ import org.apache.camel.model.ExpressionNode;
7 7
 import org.apache.camel.model.ProcessorDefinition;
8 8
 import org.apache.camel.model.RouteDefinition;
9 9
 import org.springframework.context.annotation.Bean;
10
+import org.springframework.context.annotation.Configuration;
10 11
 
11 12
 import java.math.BigDecimal;
12 13
 import java.math.RoundingMode;
@@ -21,7 +22,7 @@ import java.util.*;
21 22
  * @version 1.0
22 23
  * @since 2021/4/14 下午3:16
23 24
  */
24
-//@Configuration
25
+@Configuration
25 26
 public class CamelJDBCConfiguration {
26 27
 
27 28
 
@@ -72,7 +73,7 @@ public class CamelJDBCConfiguration {
72 73
                             orgIdPreList.put(body.get("org_id_pre"),body.get("org_id"));
73 74
                         }).end();
74 75
                         setMyBody(OrgAndWellSource)
75
-                       .setBody(simple("select  distinct jh,cydmc,zyq,zk,qyrq,sccw,qk,bz from DBA01 where rq=to_date('${header.date}','yyyy-mm-dd hh24:mi:ss') and jh ='${header.well_id}'"))
76
+                       .setBody(simple("select  distinct WELL_ID,cydmc,zyq,zk,qyrq,sccw,qk,REMARKS from DBA01 where PROD_DATE=to_date('${header.date}','yyyy-mm-dd hh24:mi:ss') and WELL_ID ='${header.well_id}'"))
76 77
                        .to("jdbc:oracle")
77 78
                        .transform()
78 79
                        .body((result) -> {
@@ -86,7 +87,7 @@ public class CamelJDBCConfiguration {
86 87
                       if (StringUtils.isEmpty(aRow.get("qyrq"))){
87 88
                           aRow.put("QYRQ","2021-01-01 00:00:00");
88 89
                       }
89
-                      if (!aRow.containsKey("JM")) aRow.put("JM",aRow.get("JH"));
90
+                      if (!aRow.containsKey("JM")) aRow.put("JM",aRow.get("WELL_ID"));
90 91
                       String org_level3 = aRow.get("ZYQ") + "@" + aRow.get("CYDMC") + "@" + aRow.get("ZK");
91 92
                       String org_level2 = aRow.get("ZYQ") + "@" + aRow.get("CYDMC");
92 93
                       String org_level1 = aRow.get("ZYQ").toString();
@@ -112,8 +113,8 @@ public class CamelJDBCConfiguration {
112 113
                       }
113 114
                   })
114 115
                   .setBody(simple("insert into centralbase.cb_cd_well_source (well_id,well_common_name,spud_date,org_id,station_id,station_name,completion_name,PRODUCING_AREA_name,remarks) " +
115
-                                  "select '${body[JH]}','${body[JM]}','${body[QYRQ]}'::timestamp,'${body[org_id]}','${body[station_id]}','${body[ZK]}','${body[SCCW]}','${body[QK]}','${body[BZ]}' " +
116
-                                  "where NOT EXISTS ( SELECT * FROM centralbase.cb_cd_well_source WHERE well_id = '${body[JH]}' )"))
116
+                                  "select '${body[WELL_ID]}','${body[JM]}','${body[QYRQ]}'::timestamp,'${body[org_id]}','${body[station_id]}','${body[ZK]}','${body[SCCW]}','${body[QK]}','${body[REMARKS]}' " +
117
+                                  "where NOT EXISTS ( SELECT * FROM centralbase.cb_cd_well_source WHERE well_id = '${body[WELL_ID]}' )"))
117 118
                   .to("jdbc:centralbase")
118 119
                   .end()
119 120
                   .transform().body((re) -> {
@@ -335,39 +336,41 @@ public class CamelJDBCConfiguration {
335 336
                             in.setHeader("date", getDate());
336 337
                         });
337 338
                 setSysControlBody(volDaily)
338
-                        .setBody(simple("select distinct  jh,rq,scsj, rcyl1,rcyl,rcql,hs, bz,sccw from DBA01 where rq  = to_date('${header.date}','yyyy-MM-dd') and jh ='${header.well_id}' and qyrq is not null "))
339
+                        .setBody(simple("select distinct  WELL_ID,PROD_DATE,PROD_TIME, LIQ_PROD_DAILY, OIL_PROD_DAILY,GAS_PROD_DAILY" +
340
+                                ",WATER_CUT, REMARKS from DBA01 where PROD_DATE  = to_date('${header.date}','yyyy-MM-dd')" +
341
+                                " and WELL_ID ='${header.well_id}'"))
339 342
                         .to("jdbc:oracle")
340 343
                         .split(body()).process(exchange -> {
341 344
                             Message in = exchange.getIn();
342 345
                             HashMap<String, Object> aRow = in.getBody(HashMap.class);
343
-                            if (aRow.get("SCSJ") == null) aRow.put("SCSJ", "0.0");
344
-                            if (aRow.get("RCYL1") == null) aRow.put("RCYL1", "0.0");
345
-                            if (aRow.get("RCYL") == null) aRow.put("RCYL", "0.0");
346
-                            if (aRow.get("RCQL") == null) aRow.put("RCQL", "0.0");
347
-                            if (aRow.get("HS") == null) aRow.put("HS", "0.0");
348
-                            if (aRow.get("BZ") == null) aRow.put("BZ", "");
349
-                            if (aRow.get("SCCW") == null) aRow.put("SCCW", "");
346
+                            aRow.putIfAbsent("PROD_TIME", "0.0");
347
+                            aRow.putIfAbsent("LIQ_PROD_DAILY", "0.0");
348
+                            aRow.putIfAbsent("OIL_PROD_DAILY", "0.0");
349
+                            aRow.putIfAbsent("GAS_PROD_DAILY", "0.0");
350
+                            aRow.putIfAbsent("WATER_CUT", "0.0");
351
+                            aRow.putIfAbsent("REMARKS", "");
352
+                            aRow.put("SCCW","");
350 353
                             aRow.put("RCSL", -1);
351 354
                             aRow.put("QYB", -1);
352 355
                             aRow.put("SQB", -1);
353
-                            if (aRow.get("RCQL") != null && aRow.get("RCYL") != null && !aRow.get("RCYL").equals("0.0") && aRow.get("RCYL") != "0.0") {
354
-                                double qyb = Double.valueOf(aRow.get("RCQL").toString()) / Double.valueOf(aRow.get("RCYL").toString());
356
+                            if (aRow.get("GAS_PROD_DAILY") != null && aRow.get("OIL_PROD_DAILY") != null && !aRow.get("OIL_PROD_DAILY").equals("0.0") && aRow.get("OIL_PROD_DAILY") != "0.0") {
357
+                                double qyb = Double.valueOf(aRow.get("GAS_PROD_DAILY").toString()) / Double.valueOf(aRow.get("OIL_PROD_DAILY").toString());
355 358
                                 if (!Double.isNaN(qyb) && !Double.isInfinite(qyb)) {
356 359
                                     BigDecimal bd = new BigDecimal(qyb);
357 360
                                     double d1 = bd.setScale(1, BigDecimal.ROUND_HALF_UP).doubleValue();
358 361
                                     aRow.put("QYB", d1);
359 362
                                 }
360 363
                             }
361
-                            if (aRow.get("RCYL1") != null && aRow.get("HS") != null && !aRow.get("HS").equals("0.0") && aRow.get("HS") != "0.0") {
362
-                                double rcsl = (Double.valueOf(aRow.get("RCYL1").toString()) * Double.valueOf(aRow.get("HS").toString())) / 100;
364
+                            if (aRow.get("LIQ_PROD_DAILY") != null && aRow.get("WATER_CUT") != null && !aRow.get("WATER_CUT").equals("0.0") && aRow.get("WATER_CUT") != "0.0") {
365
+                                double rcsl = (Double.valueOf(aRow.get("LIQ_PROD_DAILY").toString()) * Double.valueOf(aRow.get("WATER_CUT").toString())) / 100;
363 366
                                 if (!Double.isNaN(rcsl) && !Double.isInfinite(rcsl)) {
364 367
                                     BigDecimal bd = new BigDecimal(rcsl);
365 368
                                     double d1 = bd.setScale(1, BigDecimal.ROUND_HALF_UP).doubleValue();
366 369
                                     aRow.put("RCSL", d1);
367 370
                                 }
368 371
                             }
369
-                            if (aRow.get("RCQL") != null && aRow.get("RCSL") != null && !aRow.get("RCQL").equals("0.0") && aRow.get("RCQL") != "0.0") {
370
-                                double sqb = Double.valueOf(aRow.get("RCSL").toString()) / Double.valueOf(aRow.get("RCQL").toString());
372
+                            if (aRow.get("GAS_PROD_DAILY") != null && aRow.get("RCSL") != null && !aRow.get("GAS_PROD_DAILY").equals("0.0") && aRow.get("GAS_PROD_DAILY") != "0.0") {
373
+                                double sqb = Double.valueOf(aRow.get("RCSL").toString()) / Double.valueOf(aRow.get("GAS_PROD_DAILY").toString());
371 374
                                 if (!Double.isNaN(sqb) && !Double.isInfinite(sqb)) {
372 375
                                     BigDecimal bd = new BigDecimal(sqb);
373 376
                                     double d1 = bd.setScale(1, BigDecimal.ROUND_HALF_UP).doubleValue();
@@ -382,8 +385,8 @@ public class CamelJDBCConfiguration {
382 385
                             }
383 386
                         })
384 387
                         .setBody(simple("insert into centralbase.cb_pc_pro_wellbore_vol_daily(well_id,prod_date,prod_time,liq_prod_daily,oil_prod_daily,gas_prod_daily,water_cut,remarks,gas_oil_ratio,water_prod_daily,water_gas_ratio,surface_crude_water_density,surface_crude_oil_density,stim_content) " +
385
-                                "select '${body[JH]}','${body[RQ]}','${body[SCSJ]}','${body[RCYL1]}','${body[RCYL]}','${body[RCQL]}','${body[HS]}','${body[BZ]}','${body[QYB]}','${body[RCSL]}','${body[SQB]}','${body[SMD]}','${body[YMD]}' ,'${body[SCCW]}'" +
386
-                                "where NOT EXISTS ( SELECT * FROM centralbase.cb_pc_pro_wellbore_vol_daily WHERE well_id = '${body[JH]}' and  prod_date = '${body[RQ]}' )"))
388
+                                "select '${body[WELL_ID]}','${body[PROD_DATE]}','${body[WATER_CUT]}','${body[LIQ_PROD_DAILY]}','${body[OIL_PROD_DAILY]}','${body[GAS_PROD_DAILY]}','${body[WATER_CUT]}','${body[REMARKS]}','${body[QYB]}','${body[RCSL]}','${body[SQB]}','${body[SMD]}','${body[YMD]}' ,'${body[SCCW]}'" +
389
+                                "where NOT EXISTS ( SELECT * FROM centralbase.cb_pc_pro_wellbore_vol_daily WHERE well_id = '${body[WELL_ID]}' and  prod_date = '${body[PROD_DATE]}' )"))
387 390
                         .doTry()
388 391
                         .to("jdbc:centralbase")
389 392
                         .doCatch(Exception.class)
@@ -404,38 +407,38 @@ public class CamelJDBCConfiguration {
404 407
                             in.setHeader("date", getDate());
405 408
                         });
406 409
                 setSysControlBody(volDailyLiqProdDaily)
407
-                        .setBody(simple("select distinct  jh,rq,scsj, rcyl1,rcyl,rcql,hs, bz from DBA01 where rq  = to_date('${header.date}','yyyy-MM-dd')and jh ='${header.well_id}' and qyrq is not null "))
410
+                        .setBody(simple("select distinct  WELL_ID,PROD_DATE,WATER_CUT, LIQ_PROD_DAILY,OIL_PROD_DAILY,GAS_PROD_DAILY,WATER_CUT, REMARKS from DBA01 where PROD_DATE  = to_date('${header.date}','yyyy-MM-dd')and WELL_ID ='${header.well_id}' and qyrq is not null "))
408 411
                         .to("jdbc:oracle")
409 412
                         .split(body()).process(exchange -> {
410 413
                             Message in = exchange.getIn();
411 414
                             HashMap<String, Object> aRow = in.getBody(HashMap.class);
412
-                            if (aRow.get("SCSJ") == null) aRow.put("SCSJ", "0.0");
413
-                            if (aRow.get("RCYL1") == null) aRow.put("RCYL1", "0.0");
414
-                            if (aRow.get("RCYL") == null) aRow.put("RCYL", "0.0");
415
-                            if (aRow.get("RCQL") == null) aRow.put("RCQL", "0.0");
416
-                            if (aRow.get("HS") == null) aRow.put("HS", "0.0");
417
-                            if (aRow.get("BZ") == null) aRow.put("BZ", "");
415
+                            if (aRow.get("WATER_CUT") == null) aRow.put("WATER_CUT", "0.0");
416
+                            if (aRow.get("LIQ_PROD_DAILY") == null) aRow.put("LIQ_PROD_DAILY", "0.0");
417
+                            if (aRow.get("OIL_PROD_DAILY") == null) aRow.put("OIL_PROD_DAILY", "0.0");
418
+                            if (aRow.get("GAS_PROD_DAILY") == null) aRow.put("GAS_PROD_DAILY", "0.0");
419
+                            if (aRow.get("WATER_CUT") == null) aRow.put("WATER_CUT", "0.0");
420
+                            if (aRow.get("REMARKS") == null) aRow.put("REMARKS", "");
418 421
                             aRow.put("RCSL", -1);
419 422
                             aRow.put("QYB", -1);
420 423
                             aRow.put("SQB", -1);
421
-                            if (aRow.get("RCQL") != null && aRow.get("RCYL") != null && !aRow.get("RCYL").equals("0.0") && aRow.get("RCYL") != "0.0") {
422
-                                double qyb = Double.valueOf(aRow.get("RCQL").toString()) / Double.valueOf(aRow.get("RCYL").toString());
424
+                            if (aRow.get("GAS_PROD_DAILY") != null && aRow.get("OIL_PROD_DAILY") != null && !aRow.get("OIL_PROD_DAILY").equals("0.0") && aRow.get("OIL_PROD_DAILY") != "0.0") {
425
+                                double qyb = Double.valueOf(aRow.get("GAS_PROD_DAILY").toString()) / Double.valueOf(aRow.get("OIL_PROD_DAILY").toString());
423 426
                                 if (!Double.isNaN(qyb) && !Double.isInfinite(qyb)) {
424 427
                                     BigDecimal bd = new BigDecimal(qyb);
425 428
                                     double d1 = bd.setScale(1, BigDecimal.ROUND_HALF_UP).doubleValue();
426 429
                                     aRow.put("QYB", d1);
427 430
                                 }
428 431
                             }
429
-                            if (aRow.get("RCYL1") != null && aRow.get("HS") != null && !aRow.get("HS").equals("0.0") && aRow.get("HS") != "0.0") {
430
-                                double rcsl = (Double.valueOf(aRow.get("RCYL1").toString()) * Double.valueOf(aRow.get("HS").toString())) / 100;
432
+                            if (aRow.get("LIQ_PROD_DAILY") != null && aRow.get("WATER_CUT") != null && !aRow.get("WATER_CUT").equals("0.0") && aRow.get("WATER_CUT") != "0.0") {
433
+                                double rcsl = (Double.valueOf(aRow.get("LIQ_PROD_DAILY").toString()) * Double.valueOf(aRow.get("WATER_CUT").toString())) / 100;
431 434
                                 if (!Double.isNaN(rcsl) && !Double.isInfinite(rcsl)) {
432 435
                                     BigDecimal bd = new BigDecimal(rcsl);
433 436
                                     double d1 = bd.setScale(1, BigDecimal.ROUND_HALF_UP).doubleValue();
434 437
                                     aRow.put("RCSL", d1);
435 438
                                 }
436 439
                             }
437
-                            if (aRow.get("RCQL") != null && aRow.get("RCSL") != null && !aRow.get("RCQL").equals("0.0") && aRow.get("RCQL") != "0.0") {
438
-                                double sqb = Double.valueOf(aRow.get("RCSL").toString()) / Double.valueOf(aRow.get("RCQL").toString());
440
+                            if (aRow.get("GAS_PROD_DAILY") != null && aRow.get("RCSL") != null && !aRow.get("GAS_PROD_DAILY").equals("0.0") && aRow.get("GAS_PROD_DAILY") != "0.0") {
441
+                                double sqb = Double.valueOf(aRow.get("RCSL").toString()) / Double.valueOf(aRow.get("GAS_PROD_DAILY").toString());
439 442
                                 if (!Double.isNaN(sqb) && !Double.isInfinite(sqb)) {
440 443
                                     BigDecimal bd = new BigDecimal(sqb);
441 444
                                     double d1 = bd.setScale(1, BigDecimal.ROUND_HALF_UP).doubleValue();
@@ -449,32 +452,35 @@ public class CamelJDBCConfiguration {
449 452
                                 aRow.put("YMD", 0.85);
450 453
                             }
451 454
                         })
452
-                        .setBody(simple("update centralbase.cb_pc_pro_wellbore_vol_daily set prod_time = '${body[SCSJ]}' ,liq_prod_daily='${body[RCYL1]}' ,oil_prod_daily ='${body[RCYL]}' ,gas_prod_daily ='${body[RCQL]}' ,water_cut='${body[HS]}' ,remarks='${body[BZ]}' ,gas_oil_ratio='${body[QYB]}' ,water_prod_daily='${body[RCSL]}' ,water_gas_ratio='${body[SQB]}',surface_crude_water_density='${body[SMD]}',surface_crude_oil_density= '${body[YMD]}' " +
453
-                                "where well_id = '${body[JH]}' and prod_date ='${body[RQ]}' "))
455
+                        .setBody(simple("update centralbase.cb_pc_pro_wellbore_vol_daily set prod_time = '${body[WATER_CUT]}' ,liq_prod_daily='${body[LIQ_PROD_DAILY]}' ,oil_prod_daily ='${body[OIL_PROD_DAILY]}' ,gas_prod_daily ='${body[GAS_PROD_DAILY]}' " +
456
+                                ",water_cut='${body[WATER_CUT]}' ,remarks='${body[REMARKS]}' ,gas_oil_ratio='${body[QYB]}' " +
457
+                                ",water_prod_daily='${body[RCSL]}' ,water_gas_ratio='${body[SQB]}',surface_crude_water_density='${body[SMD]}'" +
458
+                                ",surface_crude_oil_density= '${body[YMD]}' " +
459
+                                "where well_id = '${body[WELL_ID]}' and prod_date ='${body[PROD_DATE]}' "))
454 460
                         .doTry()
455 461
                         .to("jdbc:centralbase")
456 462
                         .doCatch(Exception.class)
457 463
                         .log("${header.date}" + " routeId:update-volDaily-liq_prod_daily ->  centralbase.cb_pc_pro_wellbore_vol_daily update data failed")
458 464
                         .end();
459 465
 
460
-                RouteDefinition runTime = from("timer:mytimer-update-runTime-strokeAndFrequency?period=10800000")
466
+               /* RouteDefinition runTime = from("timer:mytimer-update-runTime-strokeAndFrequency?period=10800000")
461 467
                         .routeId("update-runTime-strokeAndFrequency")
462 468
                         .process(exchange -> {
463 469
                             Message in = exchange.getIn();
464 470
                             in.setHeader("date", getDate());
465 471
                         });
466 472
                 setSysControlBody(runTime)
467
-                        .setBody(simple("select distinct  jh,rq,cc,cs from DBA01 where rq  = to_date('${header.date}','yyyy-MM-dd')and jh ='${header.well_id}' and qyrq is not null "))
473
+                        .setBody(simple("select distinct  WELL_ID,DYNA_CREATE_TIME,STROKE,FREQUENCY from DBA01 where PROD_DATE  = to_date('${header.date}','yyyy-MM-dd') and WELL_ID ='${header.well_id}'"))
468 474
                         .to("jdbc:oracle")
469 475
                         .split(body())
470 476
                         .setBody(simple("insert into centralbase.cb_temp_well_mech_runtime(well_id,prod_date,stroke_length,stroke_frequency)" +
471
-                                "select '${body[JH]}','${body[RQ]}','${body[CC]}','${body[CS]}'" +
472
-                                "where NOT EXISTS (SELECT * FROM centralbase.cb_temp_well_mech_runtime WHERE well_id = '${body[JH]}' and  prod_date = '${body[RQ]}' )"))
477
+                                "select '${body[WELL_ID]}','${body[PROD_DATE]}','${body[STROKE]}','${body[FREQUENCY]}'" +
478
+                                "where NOT EXISTS (SELECT * FROM centralbase.cb_temp_well_mech_runtime WHERE well_id = '${body[WELL_ID]}' and  prod_date = '${body[PROD_DATE]}' )"))
473 479
                         .doTry()
474 480
                         .to("jdbc:centralbase")
475 481
                         .doCatch(Exception.class)
476 482
                         .log("${header.date}" + " routeId:update-runTime-strokeAndFrequency ->  centralbase.cb_temp_well_mech_runtime update data failed")
477
-                        .end();
483
+                        .end();*/
478 484
             }
479 485
 
480 486
             ;

+ 0 - 84
src/main/java/com/gct/tools/etlcamelhuge/routeconfig/CamelJMSConfiguration.java

@@ -1,84 +0,0 @@
1
-package com.gct.tools.etlcamelhuge.routeconfig;
2
-
3
-import lombok.extern.slf4j.Slf4j;
4
-import org.apache.camel.Message;
5
-import org.apache.camel.builder.RouteBuilder;
6
-import org.apache.camel.component.file.GenericFile;
7
-import org.springframework.context.annotation.Bean;
8
-import org.springframework.context.annotation.Configuration;
9
-
10
-import java.io.BufferedReader;
11
-import java.io.File;
12
-import java.io.FileInputStream;
13
-import java.io.InputStreamReader;
14
-
15
-/**
16
- * class name: CamelJMSConfiguration
17
- *
18
- * @author lloyd
19
- * @version 1.0
20
- * @since 2021/4/14 下午3:17
21
- */
22
-@Slf4j
23
-//@Configuration
24
-public class CamelJMSConfiguration {
25
-    @Bean
26
-    RouteBuilder routeBuilderJms() {
27
-        return new RouteBuilder() {
28
-            @Override
29
-            public void configure() throws Exception {
30
-                from("file:{{user.home}}/Desktop/cameltest/out")
31
-                        .routeId("file-to-jms")
32
-                        .transform()
33
-                        .body(GenericFile.class, gf -> {
34
-                            final BufferedReader in;
35
-                            try {
36
-                                in = new BufferedReader(new InputStreamReader(new FileInputStream((File) gf.getFile())));
37
-                                StringBuilder sb = new StringBuilder();
38
-                                while (true) {
39
-                                    String s = in.readLine();
40
-                                    if (s == null || s.isEmpty()) break;
41
-                                    sb.append(s);
42
-                                }
43
-                                return sb.toString();
44
-                            } catch (Exception e) {
45
-                                throw new RuntimeException("read fault!");
46
-                            }
47
-                        })
48
-                        .process()
49
-                        .exchange(exchange -> {
50
-                            final Message in = exchange.getIn();
51
-                            String body = in.getBody(String.class);
52
-                            log.info("exchange is :" + body);
53
-                        })
54
-                        .to("jms:queue:testfiles").tracing();
55
-
56
-                from("jms:queue:testfiles")
57
-                        .routeId("jms-to-file")
58
-                        .transform()
59
-                        .body(String.class, s -> s + "amazing\n")
60
-                        .process()
61
-                        .exchange(exchange -> {
62
-                            final Message in = exchange.getIn();
63
-                            log.info("from jms exchange is :" + in.getBody(String.class));
64
-                        })
65
-                        .to("file:{{user.home}}/Desktop/cameltest/jmsout").tracing();
66
-            }
67
-        };
68
-    }
69
-
70
-    /* @Component
71
-    static class DefaultJmsComponent implements ComponentCustomizer<JmsComponent> {
72
-
73
-        private final ConnectionFactory connectionFactory;
74
-
75
-        DefaultJmsComponent(ConnectionFactory connectionFactory) {
76
-            this.connectionFactory = connectionFactory;
77
-        }
78
-
79
-        @Override
80
-        public void customize(JmsComponent component) {
81
-            component.setConnectionFactory(connectionFactory);
82
-        }
83
-    }*/
84
-}

Fichier diff supprimé car celui-ci est trop grand
+ 0 - 544
src/main/java/com/gct/tools/etlcamelhuge/routeconfig/CamelRestConfiguration.java


+ 0 - 12
src/main/java/com/gct/tools/etlcamelhuge/routeconfig/CamelRocketMQConfiguration.java

@@ -1,12 +0,0 @@
1
-package com.gct.tools.etlcamelhuge.routeconfig;
2
-
3
-/**
4
- * class name: CamelRocketMQConfiguration
5
- *
6
- * @author lloyd
7
- * @version 1.0
8
- * @since 2021/5/17 下午4:14
9
- */
10
-public class CamelRocketMQConfiguration {
11
-
12
-}