瀏覽代碼

调整格式

lijian 3 年之前
父節點
當前提交
4fd20b9c89

+ 327 - 264
src/main/java/com/gct/tools/etlcamelhuge/routeconfig/CamelJDBCConfiguration.java

@@ -46,9 +46,16 @@ public class CamelJDBCConfiguration /*extends RouteBuilder */ {
46 46
         return Arrays.stream(doubles).max().getAsDouble();
47 47
     }
48 48
 
49
+
49 50
     public String getDate(){
50 51
         return  LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd"));
51 52
     }
53
+//    public static String getDate()
54
+//    {
55
+//        SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");//设置日期格式
56
+//        String date = df.format(new java.util.Date());// new Date()为获取当前系统时间
57
+//        return date;
58
+//    }
52 59
 
53 60
     @Resource(name = "diagnoseMessageProducer")
54 61
     private MessageProducer producer;
@@ -62,8 +69,325 @@ public class CamelJDBCConfiguration /*extends RouteBuilder */ {
62 69
             //全部执行完成的大概时间在30-40分钟
63 70
             @Override
64 71
             public void configure() throws Exception {
65
-                //24小时执行一次
66
-                //单个执行时间30s左右,在之前有数据的情况下
72
+
73
+
74
+                //   A2数据
75
+                from("timer:mytimer2?period=3600000")
76
+                        .routeId("oracle-2")
77
+                        .setBody(simple("select max(prod_date) from centralbase.cb_temp_well_mech_runtime "))
78
+                        .to("jdbc:centralbase")
79
+                        .split(body()).process(exchange -> {
80
+                            Message in = exchange.getIn();
81
+                            in.setHeader("date", getDate());
82
+                })
83
+                        .setBody(simple("SELECT DISTINCT well_id wellid FROM centralbase.cb_temp_well_mech_runtime where prod_date > '${header.date}' "))
84
+                        .to("jdbc:centralbase")
85
+                        .split(body())
86
+                        .setBody(simple("select v.WELL_COMMON_NAME jh,v.PROD_DATE rq,v.OIL_PRODUCTION_METHOD cyfs,v.PUMP_DIAMETER yz,v.BACK_PRES hysx,v.MAX_TUBING_PRES yysx,v.MAX_CASING_PRES tysx,v.PUMP_DEPTH bs from V_PC_PRO_COM_DAILY_CYYC_A2 v  where  v.PROD_DATE = to_date('${header.date}','yyyy-MM-dd')  and  v.WELL_COMMON_NAME = '${body[wellid]}' "))
87
+                        .to("jdbc:oracle")
88
+                        .split(body()).process(exchange -> {
89
+                    Message in = exchange.getIn();
90
+                    HashMap<String, Object> aRow = in.getBody(HashMap.class);
91
+                    if (aRow.get("YZ") == null) aRow.put("YZ", "0.0");
92
+                    if (aRow.get("HYSX") == null) aRow.put("HYSX", "0.0");
93
+                    if (aRow.get("YYSX") == null) aRow.put("YYSX", "0.0");
94
+                    if (aRow.get("TYSX") == null) aRow.put("TYSX", "0.0");
95
+                    if (aRow.get("BS") == null) aRow.put("BS", "0.0");
96
+                    if (aRow.get("DYM") == null)aRow.put("DYM","0.0");
97
+                })
98
+                        .log("${header.date}")
99
+                        .log("mytimer2")
100
+
101
+                        .setBody(simple("insert into centralbase.cb_pc_pro_wellbore_status_daily(well_id,prod_date,oil_prod_method,oil_nozzle,back_pres,tubing_pres,casing_pres,pump_depth,start_pump_liq_level) " +
102
+                                "select '${body[JH]}','${body[RQ]}','${body[CYFS]}','${body[YZ]}','${body[HYSX]}','${body[YYSX]}','${body[TYSX]}','${body[BS]}' ,'${body[DYM]}'" +
103
+                                "where NOT EXISTS ( SELECT * FROM centralbase.cb_pc_pro_wellbore_status_daily WHERE well_id = '${body[JH]}' and  prod_date = '${body[RQ]}')"))
104
+                        .doTry()
105
+                        .to("jdbc:centralbase")
106
+                        .doCatch(Exception.class)
107
+                        .log("${header.date}"+" routeId:oracle-2->  centralbase.cb_pc_pro_wellbore_status_daily insert date faild")
108
+                        .end();
109
+
110
+
111
+                from("timer:mytimer3?period=3600000")
112
+                        .routeId("oracle-3")
113
+                        .setBody(simple("select max(prod_date) from centralbase.cb_temp_well_mech_runtime "))
114
+                        .to("jdbc:centralbase")
115
+                        .split(body()).process(exchange -> {
116
+                            Message in = exchange.getIn();
117
+                            in.setHeader("date", getDate());
118
+                })
119
+                        .log("mytimer3b")
120
+                        .setBody(simple("SELECT DISTINCT well_id wellid FROM centralbase.cb_temp_well_mech_runtime where prod_date > '${header.date}' "))
121
+                        .to("jdbc:centralbase")
122
+                        .split(body())
123
+                        .setBody(simple("select  WELL_COMMON_NAME jh, PROD_DATE rq,PROD_TIME scsj, LIQ_PROD_DAILY rcyl1,OIL_PROD_DAILY rcyl,GAS_PROD_DAILY rcql,WATER_CUT hs,REMARKS bz from V_PC_PRO_COM_DAILY_CYYC_A2 where prod_date = to_date('${header.date}','yyyy-MM-dd') and WELL_COMMON_NAME = '${body[wellid]}'"))
124
+                        .to("jdbc:oracle")
125
+                        .split(body()).process(exchange -> {
126
+                    Message in = exchange.getIn();
127
+                    HashMap<String, Object> aRow = in.getBody(HashMap.class);
128
+                    if (aRow.get("SCSJ") == null) aRow.put("SCSJ", "0.0");
129
+                    if (aRow.get("RCYL1") == null) aRow.put("RCYL1", "0.0");
130
+                    if (aRow.get("RCYL") == null) aRow.put("RCYL", "0.0");
131
+                    if (aRow.get("RCQL") == null) aRow.put("RCQL", "0.0");
132
+                    if (aRow.get("HS") == null) aRow.put("HS", "0.0");
133
+                    if (aRow.get("BZ") == null) aRow.put("BZ", "");
134
+                    aRow.put("RCSL",-1);
135
+                    aRow.put("QYB",-1);
136
+                    aRow.put("SQB",-1);
137
+                    if (aRow.get("RCQL")!=null && aRow.get("RCYL")!=null && !aRow.get("RCYL").equals("0.0") && aRow.get("RCYL") != "0.0"){
138
+                        double qyb = Double.valueOf(aRow.get("RCQL").toString()) / Double.valueOf(aRow.get("RCYL").toString());
139
+                        if (!Double.isNaN(qyb) && !Double.isInfinite(qyb)){
140
+                            BigDecimal bd=new BigDecimal(qyb);
141
+                            double d1=bd.setScale(1,BigDecimal.ROUND_HALF_UP).doubleValue();
142
+                            aRow.put("QYB",d1);
143
+                        }
144
+                    }
145
+                    if (aRow.get("RCYL1")!=null && aRow.get("HS") != null && !aRow.get("HS").equals("0.0") && aRow.get("HS") != "0.0"){
146
+                        double rcsl = (Double.valueOf(aRow.get("RCYL1").toString()) * Double.valueOf(aRow.get("HS").toString()))/100;
147
+                        if (!Double.isNaN(rcsl) && !Double.isInfinite(rcsl)) {
148
+                            BigDecimal bd = new BigDecimal(rcsl);
149
+                            double d1 = bd.setScale(1, BigDecimal.ROUND_HALF_UP).doubleValue();
150
+                            aRow.put("RCSL", d1);
151
+                        }
152
+                    }
153
+                    if (aRow.get("RCQL") != null && aRow.get("RCSL") != null && !aRow.get("RCQL").equals("0.0") && aRow.get("RCQL") != "0.0" ){
154
+                        double sqb = Double.valueOf(aRow.get("RCSL").toString()) / Double.valueOf(aRow.get("RCQL").toString());
155
+                        if (!Double.isNaN(sqb) && !Double.isInfinite(sqb)) {
156
+                            BigDecimal bd = new BigDecimal(sqb);
157
+                            double d1 = bd.setScale(1, BigDecimal.ROUND_HALF_UP).doubleValue();
158
+                            aRow.put("SQB", d1);
159
+                        }
160
+                    }
161
+                    if (!aRow.containsKey("SMD")){
162
+                        aRow.put("SMD",1);
163
+                    }
164
+                    if (!aRow.containsKey("YMD")){
165
+                        aRow.put("YMD",0.85);
166
+                    }
167
+                })
168
+                        .log("mytimer3a")
169
+                        .setBody(simple("insert into centralbase.cb_pc_pro_wellbore_vol_daily(well_id,prod_date,prod_time,liq_prod_daily,oil_prod_daily,gas_prod_daily,water_cut,remarks,gas_oil_ratio,water_prod_daily,water_gas_ratio,surface_crude_water_density,surface_crude_oil_density) " +
170
+                                "select '${body[JH]}','${body[RQ]}','${body[SCSJ]}','${body[RCYL1]}','${body[RCYL]}','${body[RCQL]}','${body[HS]}','${body[BZ]}','${body[QYB]}','${body[RCSL]}','${body[SQB]}','${body[SMD]}','${body[YMD]}' " +
171
+                                "where NOT EXISTS ( SELECT * FROM centralbase.cb_pc_pro_wellbore_vol_daily WHERE well_id = '${body[JH]}' and  prod_date = '${body[RQ]}' )"))
172
+                        .doTry()
173
+                        .to("jdbc:centralbase")
174
+                        .doCatch(Exception.class)
175
+                        .log("${header.date}"+" routeId:oracle-3->  centralbase.cb_pc_pro_wellbore_status_daily insert date faild")
176
+                        .endDoTry()
177
+                        .setBody(simple("update centralbase.cb_pc_pro_wellbore_vol_daily set  water_prod_daily =null where water_prod_daily = -1 and prod_date = '${header.date}' "))
178
+                        .to("jdbc:centralbase")
179
+                        .setBody(simple("update centralbase.cb_pc_pro_wellbore_vol_daily set  gas_oil_ratio =null where gas_oil_ratio = -1 and prod_date = '${header.date}' "))
180
+                        .to("jdbc:centralbase")
181
+                        .setBody(simple("update centralbase.cb_pc_pro_wellbore_vol_daily set  water_gas_ratio =null where water_gas_ratio = -1 and prod_date = '${header.date}' "))
182
+                        .to("jdbc:centralbase")
183
+                        .doCatch(Exception.class)
184
+                        .log("${header.date}"+" routeId:oracle-3->  centralbase.cb_pc_pro_wellbore_vol_daily update date faild")
185
+                        .end();
186
+
187
+                from("timer:mytimer4?period=3600000")
188
+                    .routeId("oracle-4")
189
+                    .setBody(simple("select max(prod_date) from centralbase.cb_temp_well_mech_runtime "))
190
+                    .to("jdbc:centralbase")
191
+                    .split(body()).process(exchange -> {
192
+                        Message in = exchange.getIn();
193
+                        in.setHeader("date", getDate());
194
+            })
195
+                    .setBody(simple("SELECT DISTINCT well_id jh FROM centralbase.cb_temp_well_mech_runtime where prod_date > '${header.date}' "))
196
+                    .to("jdbc:centralbase")
197
+                    .split(body()).process(exchange -> {
198
+                        Message in = exchange.getIn();
199
+                        HashMap<String, Object> aRow = in.getBody(HashMap.class);
200
+                        aRow.put("JH", aRow.get("jh").toString());
201
+                        in.setHeader("JH",aRow.get("jh").toString());
202
+                })
203
+                    .setBody(simple("Select t.jh, t.prod_date rq,t.static_pressure jy,t.flow_pres ly,t.pump_diameter bj, t.pump_depth bs,t.pump_efficiency bx,t.rotate_frequency zs,t.stroke_length cc,t.stroke_frequency cs, t.pump_type blx,t.elec_frequency dl,t.dynamic_liq_level dym   FROM V_TEMP_WELL_MECH_ALL t where jh = '${body[JH]}'"))
204
+                    .to("jdbc:oracle")
205
+                    .split(body()).process(exchange -> {
206
+                        Message in = exchange.getIn();
207
+                        HashMap<String, Object> aRow = in.getBody(HashMap.class);
208
+                        if (aRow.get("JY") == null) aRow.put("JY", "0.0");
209
+                        if (aRow.get("LY") == null) aRow.put("LY", "0.0");
210
+                        if (aRow.get("BJ") == null) aRow.put("BJ", "0.0");
211
+                        if (aRow.get("BS") == null) aRow.put("BS", "0.0");
212
+                        if (aRow.get("BX") == null) aRow.put("BX", "0.0");
213
+                        if (aRow.get("ZS") == null) aRow.put("ZS", "0.0");
214
+                        if (aRow.get("CC") == null) aRow.put("CC", "0.0");
215
+                        if (aRow.get("CS") == null) aRow.put("CS", "0.0");
216
+                        if (aRow.get("BLX") == null) aRow.put("BLX", "");
217
+                        if (aRow.get("DL") == null) aRow.put("DL", "0.0");
218
+                        if (aRow.get("DYM") == null) aRow.put("DYM", "0.0");
219
+                })
220
+                        .log("mytimer4")
221
+                        .doTry()
222
+                        .setBody(simple("insert into centralbase.cb_temp_well_mech_daily(well_id,prod_date,static_pressure,flow_pres,pump_diameter,pump_depth,pump_efficiency,rotate_frequency,stroke_length,stroke_frequency,pump_type,elec_frequency,elec_pump_current_b) " +
223
+                                "select '${body[JH]}','${header.date}','${body[JY]}','${body[LY]}','${body[BJ]}','${body[BS]}','${body[BX]}','${body[ZS]}','${body[CC]}','${body[CS]}','${body[BLX]}','${body[DL]}','${body[DYM]}' " +
224
+                                "where NOT EXISTS ( SELECT * FROM centralbase.cb_temp_well_mech_daily WHERE well_id = '${body[JH]}' and  prod_date = '${header.date}' )"))
225
+                        .to("jdbc:centralbase")
226
+                        .doCatch(Exception.class)
227
+                        .log("${header.date}"+" routeId:oracle-4->  centralbase.cb_temp_well_mech_daily insert date faild")
228
+                        .end();
229
+
230
+                from("timer:mytimer5?period=3600000")
231
+                        .routeId("oracle-5")
232
+                        .setBody(simple("select max(prod_date) from centralbase.cb_temp_well_mech_runtime "))
233
+                        .to("jdbc:centralbase")
234
+                        .split(body()).process(exchange -> {
235
+                            Message in = exchange.getIn();
236
+                            in.setHeader("date", getDate()+ " 00:00:00");
237
+                })
238
+                        .setBody(simple("select well_id,prod_date,elec_pump_current_b from centralbase.cb_temp_well_mech_daily where prod_date = '${header.date}' "))
239
+                        .to("jdbc:centralbase")
240
+                        .split(body())
241
+                        .doTry()
242
+                        .setBody(simple("update centralbase.cb_pc_pro_wellbore_status_daily set start_pump_liq_level = '${body[elec_pump_current_b]}' where well_id = '${body[well_id]}' and prod_date  = '${body[prod_date]}'"))
243
+                        .to("jdbc:centralbase")
244
+                        .doCatch(Exception.class)
245
+                        .log("${header.date}"+" routeId:oracle-5->  centralbase.cb_pc_pro_wellbore_status_daily update date faild")
246
+                        .end();
247
+
248
+                from("timer:mytimer7?period=3600000")
249
+                        .routeId("oracle-7")
250
+                        .setBody(simple("select max(prod_date) from centralbase.cb_temp_well_mech_runtime "))
251
+                        .to("jdbc:centralbase")
252
+                        .split(body()).process(exchange -> {
253
+                    Message in = exchange.getIn();
254
+                    in.setHeader("date", getDate()+ " 00:00:00");
255
+                })
256
+                        .setBody(simple("select well_id,prod_date,start_pump_liq_level,pump_depth from centralbase.cb_pc_pro_wellbore_status_daily where prod_date = '${header.date}' "))
257
+                        .to("jdbc:centralbase")
258
+                        .split(body()).process(exchange -> {
259
+                            Message in = exchange.getIn();
260
+                            HashMap<String, Object> aRow = in.getBody(HashMap.class);
261
+                            aRow.put("submergence_depth",null);
262
+                    if (aRow.get("start_pump_liq_level")!=null && aRow.get("pump_depth")!=null){
263
+                        double cmd= Double.valueOf(aRow.get("pump_depth").toString())-Double.valueOf(aRow.get("start_pump_liq_level").toString())/10;
264
+                        BigDecimal bd=new BigDecimal(cmd);
265
+                        double cmd1=bd.setScale(1,BigDecimal.ROUND_HALF_UP).doubleValue();
266
+                        aRow.put("submergence_depth",cmd1);
267
+                    }
268
+                })
269
+                        .doTry()
270
+                        .setBody(simple("update centralbase.cb_pc_pro_wellbore_status_daily set submergence_depth = '${body[submergence_depth]}' where well_id = '${body[well_id]}' and prod_date  = '${body[prod_date]}'"))
271
+                        .to("jdbc:centralbase")
272
+                        .doCatch(Exception.class)
273
+                        .log("${header.date}"+" routeId:oracle-7->  centralbase.cb_pc_pro_wellbore_status_daily update date faild")
274
+                        .end();
275
+
276
+                //检泵周期信息
277
+                from("timer:mytimer8?period=3600000")
278
+                        .routeId("oracle-8")
279
+                        .setBody(simple("select max(prod_date) from centralbase.cb_temp_well_mech_runtime "))
280
+                        .to("jdbc:centralbase")
281
+                        .split(body()).process(exchange -> {
282
+                    Message in = exchange.getIn();
283
+                        in.setHeader("date", getDate());
284
+                })
285
+                        .setBody(simple("select wellname , jbsj , jbyy, jbzq  from JBSJ where SYDATE > date'2021-09-22'"))
286
+                        .to("jdbc:oracle")
287
+                        .split(body()).process(exchange -> {
288
+                            Message in = exchange.getIn();
289
+                            HashMap<String, Object> aRow = in.getBody(HashMap.class);
290
+                            if (aRow.get("JBSJ") != null)
291
+                            aRow.put("JBSJ",aRow.get("JBSJ").toString().substring(0,10));
292
+                })
293
+                        .doTry()
294
+                        .setBody(simple("insert into centralbase.cb_cd_checkpump_source(well_common_name,check_pump_date,reason,period,sys_date) " +
295
+                                        "select '${body[WELLNAME]}','${body[JBSJ]}','${body[JBYY]}','${body[JBZQ]}','${header.date}' " +
296
+                                        "where NOT EXISTS(select * from centralbase.cb_cd_checkpump_source where well_common_name = '${body[WELLNAME]}' and sys_date = '${header.date}')"))
297
+                        .to("jdbc:centralbase")
298
+                        .doCatch(Exception.class)
299
+                        .log("${header.date}"+" routeId:oracle-8->  centralbase.cb_pc_pro_wellbore_status_daily update date faild")
300
+                        .end();
301
+
302
+                //稀稠油信息
303
+                from("timer:mytimer9?period=3600000")
304
+                        .routeId("oracle-9")
305
+                        .setBody(simple("select max(prod_date) from centralbase.cb_temp_well_mech_runtime "))
306
+                        .to("jdbc:centralbase")
307
+                        .split(body()).process(exchange -> {
308
+                            Message in = exchange.getIn();
309
+                            in.setHeader("date", getDate());
310
+                })
311
+                        .setBody(simple("select well_common_name,QK,CW,A_RCYOU,A_RCYE,RCYOU,RCYE,HS,LRQ,CZ_YOU,CZ_YE,CMD,SCXJRQ,YCBZ,UP_DATE from V_Cyyc_YcjData where UP_DATE > to_date('2021-08-01','yyyy-MM-dd')"))
312
+                        .to("jdbc:oracle")
313
+                        .split(body()).process(exchange -> {
314
+                            Message in = exchange.getIn();
315
+                            HashMap<String, Object> aRow = in.getBody(HashMap.class);
316
+                            if (aRow.get("LRQ") != null)
317
+                            aRow.put("LRQ",aRow.get("LRQ").toString().substring(0,10));
318
+                            if (aRow.get("SCXJRQ") != null)
319
+                            aRow.put("SCXJRQ",aRow.get("SCXJRQ").toString().substring(0,10));
320
+                            if (aRow.get("UP_DATE") != null)
321
+                            aRow.put("UP_DATE",aRow.get("UP_DATE").toString().substring(0,10));
322
+                })
323
+                        .setBody(simple("insert into centralbase.cb_pc_abnl_model_dgn_thin_heavy(WELL_COMMON_NAME,QK,CW,A_RCYOU,A_RCYE,RCYOU,RCYE,HS,LRQ,CZ_YOU,CZ_YE,CMD,SCXJRQ,YCBZ,UP_DATE) " +
324
+                                        "select '${body[WELL_COMMON_NAME]}','${body[QK]}','${body[CW]}','${body[A_RCYOU]}','${body[A_RCYE]}','${body[RCYOU]}','${body[RCYE]}','${body[HS]}','${body[LRQ]}'," +
325
+                                      " '${body[CZ_YOU]}','${body[CZ_YE]}','${body[CMD]}','${body[SCXJRQ]}','${body[YCBZ]}','${body[UP_DATE]}' " +
326
+                                        "where NOT EXISTS(select * from centralbase.cb_pc_abnl_model_dgn_thin_heavy where well_common_name = '${body[WELL_COMMON_NAME]}' and UP_DATE = '${body[UP_DATE]}')"))
327
+                        .doTry()
328
+                        .to("jdbc:centralbase")
329
+                        .doCatch(Exception.class)
330
+                        .log("${header.date}"+" routeId:oracle-9->  centralbase.cb_pc_pro_wellbore_status_daily update date faild")
331
+                        .end();
332
+
333
+
334
+                from("timer:mytimer10?period=3600000")
335
+                        .routeId("centralbase-3")
336
+                        .setBody(simple("select max(prod_date) from centralbase.cb_temp_well_mech_runtime "))
337
+                        .to("jdbc:centralbase")
338
+                        .split(body()).process(exchange -> {
339
+                            Message in = exchange.getIn();
340
+                            in.setHeader("date", getDate());
341
+                })
342
+                        .setBody(simple("select well_id,avg(stroke_length) stroke_length  ,avg(stroke_frequency) stroke_frequency from centralbase.cb_temp_well_mech_runtime where prod_date::date='${header.date}' group by well_id"))
343
+                        .to("jdbc:centralbase")//.log("${body}")
344
+                        .split(body()).process(exchange -> {
345
+                            Message in = exchange.getIn();
346
+                            HashMap<String, Object> aRow = in.getBody(HashMap.class);
347
+                            if (aRow.get("stroke_length")!=null && aRow.get("stroke_frequency")!=null){
348
+                            double stroke_length=Double.parseDouble(aRow.get("stroke_length").toString());
349
+                            double stroke_frequency=Double.parseDouble(aRow.get("stroke_frequency").toString());
350
+                            BigDecimal bd=new BigDecimal(stroke_length);
351
+                            double stroke_lengt1=bd.setScale(1,BigDecimal.ROUND_HALF_UP).doubleValue();
352
+                            BigDecimal bd1=new BigDecimal(stroke_frequency);
353
+                            double stroke_frequency1=bd1.setScale(1,BigDecimal.ROUND_HALF_UP).doubleValue();
354
+                            aRow.put("strokeLength",stroke_lengt1);
355
+                            aRow.put("strokeFrequency",stroke_frequency1);
356
+                    }
357
+                })
358
+                        .doTry()
359
+                        .setBody(simple("update centralbase.cb_temp_well_mech_daily set stroke_length='${body[strokeLength]}' ,stroke_frequency ='${body[strokeFrequency]}' where well_id = '${body[well_id]}' and prod_date::date='${header.date}' "))
360
+                        .to("jdbc:centralbase")
361
+                        .doCatch(Exception.class)
362
+                        .log("${header.date}"+" routeId:centralbase-3->  centralbase.cb_temp_well_mech_daily update date faild")
363
+                        .end();
364
+
365
+                from("timer:mytimer11?period=3600000")
366
+                        .routeId("centralbase-1")
367
+                        .setBody(simple("select so.well_id,so.well_common_name,so.org_id,ti.prod_date,ti.stroke_frequency,ti.stroke_length,ti.sgt from centralbase.cb_temp_well_mech_runtime ti, centralbase.cb_cd_well_source so where ti.well_id = so.well_id and ti.prod_date =(select max(prod_date) from centralbase.cb_temp_well_mech_runtime) and ti.stroke_length > 0 "))
368
+                        .to("jdbc:centralbase")
369
+                        .split(body())
370
+                        .doTry()
371
+                        .process(exchange -> {
372
+                            Message in = exchange.getIn();
373
+                            HashMap<String, Object> aRow = in.getBody(HashMap.class);
374
+                            String wellName =aRow.get("well_common_name").toString();
375
+                            String wellId =aRow.get("well_id").toString();
376
+                            String orgId = aRow.get("org_id").toString();
377
+                            String prodDate = aRow.get("prod_date").toString().substring(0,19);
378
+                            Double strokeLength = Double.valueOf(aRow.get("stroke_length").toString());
379
+                            Double strokeFrequency = Double.valueOf(aRow.get("stroke_frequency").toString());
380
+                            String sgt = aRow.get("sgt").toString();
381
+                            DiagnoseMsg diagnoseMsg = new DiagnoseMsg(wellId, wellName, orgId, prodDate, sgt, LocalDateTime.now().toString(), strokeLength, strokeFrequency);
382
+                            System.out.println(diagnoseMsg);
383
+                            producer.send((MessageBody) diagnoseMsg);
384
+                        })
385
+                        .doCatch(Exception.class)
386
+                        .log("${header.date}"+" rocketMQ send data faild")
387
+                        .end();
388
+
389
+
390
+                //手动导入井信息
67 391
 
68 392
 //                //修改org_id
69 393
 //                from("timer:mytimer1?period=604800000")
@@ -116,7 +440,7 @@ public class CamelJDBCConfiguration /*extends RouteBuilder */ {
116 440
 //                        .doCatch(Exception.class)
117 441
 //                        .log("${header.date}"+" routeId:oracle-2->  centralbase.cb_pc_organization insert date faild")
118 442
 //                        .end();
119
-                            //  ,  a.project_name qk , a.station_name zd
443
+                //  ,  a.project_name qk , a.station_name zd
120 444
 //                        .setBody(simple("select distinct a.org_name_cj zyq ,  a.project_name qk from V_CD_WELL_SOURCE_YC a "))
121 445
 //                        .to("jdbc:oracle")
122 446
 //                        .split(body()).process(exchange -> {
@@ -341,267 +665,6 @@ public class CamelJDBCConfiguration /*extends RouteBuilder */ {
341 665
 //                        .log("${header.date}"+" routeId:jdbc-gtsj-?->  centralbase.cb_temp_well_mech_runtime insert date faild")
342 666
 //                        .end();
343 667
 
344
-                /*——————————————————————————————————————————————————————————-*/
345
-
346
-                //单独执行时间10s    A2数据
347
-                from("timer:mytimer2?period=3600000")
348
-                        .routeId("oracle-2")
349
-                        .setHeader("date", constant(getDate()))
350
-                        .setBody(simple("SELECT DISTINCT well_id wellid FROM centralbase.cb_temp_well_mech_runtime where prod_date > '${header.date}' "))
351
-                        .to("jdbc:centralbase")
352
-                        .split(body())
353
-//                        .split(body())
354
-//                       .setBody(simple("select  distinct jh,rq,cyfs,yz,hysx , yysx ,tysx,bs,dym from DBA01 where rq  = to_date('${header.date}','yyyy-MM-dd') and qyrq is not null "))
355
-//                        .setBody(simple("select v.WELL_COMMON_NAME jh,v.PROD_DATE rq,v.OIL_PRODUCTION_METHOD cyfs,v.PUMP_DIAMETER yz,v.BACK_PRES hysx,v.MAX_TUBING_PRES yysx,v.MAX_CASING_PRES tysx,v.PUMP_DEPTH bs , l.DYNAMIC_LIQ_LEVEL dym from V_PC_PRO_COM_DAILY_CYYC v " +
356
-//                                " left join ( select * from V_TEMP_WELL_MECH_ALL where PROD_DATE = to_date('${header.date}','yyyy-MM-dd') ) l on l.well_id = v.well_id  where v.PROD_DATE = to_date('${header.date}','yyyy-MM-dd') "))
357
-                        .setBody(simple("select v.WELL_COMMON_NAME jh,v.PROD_DATE rq,v.OIL_PRODUCTION_METHOD cyfs,v.PUMP_DIAMETER yz,v.BACK_PRES hysx,v.MAX_TUBING_PRES yysx,v.MAX_CASING_PRES tysx,v.PUMP_DEPTH bs from V_PC_PRO_COM_DAILY_CYYC v  where  v.PROD_DATE = to_date('${header.date}','yyyy-MM-dd')  and  v.WELL_COMMON_NAME = '${body[wellid]}' "))
358
-
359
-                        .to("jdbc:oracle")
360
-                        .split(body()).process(exchange -> {
361
-                    Message in = exchange.getIn();
362
-                    HashMap<String, Object> aRow = in.getBody(HashMap.class);
363
-                    if (aRow.get("YZ") == null) aRow.put("YZ", "0.0");
364
-                    if (aRow.get("HYSX") == null) aRow.put("HYSX", "0.0");
365
-                    if (aRow.get("YYSX") == null) aRow.put("YYSX", "0.0");
366
-                    if (aRow.get("TYSX") == null) aRow.put("TYSX", "0.0");
367
-                    if (aRow.get("BS") == null) aRow.put("BS", "0.0");
368
-                    if (aRow.get("DYM") == null)aRow.put("DYM","0.0");
369
-                })
370
-                        .log("mytimer2")
371
-                        .doTry()
372
-                        .setBody(simple("insert into centralbase.cb_pc_pro_wellbore_status_daily(well_id,prod_date,oil_prod_method,oil_nozzle,back_pres,tubing_pres,casing_pres,pump_depth,start_pump_liq_level) " +
373
-                                "select '${body[JH]}','${body[RQ]}','${body[CYFS]}','${body[YZ]}','${body[HYSX]}','${body[YYSX]}','${body[TYSX]}','${body[BS]}' ,'${body[DYM]}'" +
374
-                                "where NOT EXISTS ( SELECT * FROM centralbase.cb_pc_pro_wellbore_status_daily WHERE well_id = '${body[JH]}' and  prod_date = '${body[RQ]}')"))
375
-                        .to("jdbc:centralbase")
376
-                        .doCatch(Exception.class)
377
-                        .log("${header.date}"+" routeId:oracle-2->  centralbase.cb_pc_pro_wellbore_status_daily insert date faild")
378
-                        .end();
379
-
380
-                //单独执行时间30s
381
-                from("timer:mytimer3?period=3600000")
382
-                        .routeId("oracle-3")
383
-                        .setHeader("date", constant(getDate()))
384
-                        .setBody(simple("SELECT DISTINCT well_id wellid FROM centralbase.cb_temp_well_mech_runtime where prod_date > '${header.date}' "))
385
-                        .doTry()
386
-                        .to("jdbc:centralbase")
387
-                        .doCatch(Exception.class)
388
-                        .log("centralbase error")
389
-                        .split(body())
390
-//                        .setBody(simple("select distinct  jh,rq,scsj, rcyl1,rcyl,rcql,hs, bz from DBA01 where rq  = to_date('${header.date}','yyyy-MM-dd') and qyrq is not null "))
391
-                        .setBody(simple("select  WELL_COMMON_NAME jh, PROD_DATE rq,PROD_TIME scsj, LIQ_PROD_DAILY rcyl1,OIL_PROD_DAILY rcyl,GAS_PROD_DAILY rcql,WATER_CUT hs,REMARKS bz from V_PC_PRO_COM_DAILY_CYYC where prod_date = to_date('${header.date}','yyyy-MM-dd') and WELL_COMMON_NAME = '${body[wellid]}'"))
392
-                        .doTry()
393
-                        .to("jdbc:oracle")
394
-                        .doCatch(Exception.class)
395
-                        .log("oracle error")
396
-                        .split(body()).process(exchange -> {
397
-                    Message in = exchange.getIn();
398
-                    HashMap<String, Object> aRow = in.getBody(HashMap.class);
399
-                    if (aRow.get("SCSJ") == null) aRow.put("SCSJ", "0.0");
400
-                    if (aRow.get("RCYL1") == null) aRow.put("RCYL1", "0.0");
401
-                    if (aRow.get("RCYL") == null) aRow.put("RCYL", "0.0");
402
-                    if (aRow.get("RCQL") == null) aRow.put("RCQL", "0.0");
403
-                    if (aRow.get("HS") == null) aRow.put("HS", "0.0");
404
-                    if (aRow.get("BZ") == null) aRow.put("BZ", "");
405
-                    aRow.put("RCSL",-1);
406
-                    aRow.put("QYB",-1);
407
-                    aRow.put("SQB",-1);
408
-                    if (aRow.get("RCQL")!=null && aRow.get("RCYL")!=null && !aRow.get("RCYL").equals("0.0") && aRow.get("RCYL") != "0.0"){
409
-                        double qyb = Double.valueOf(aRow.get("RCQL").toString()) / Double.valueOf(aRow.get("RCYL").toString());
410
-                        if (!Double.isNaN(qyb) && !Double.isInfinite(qyb)){
411
-                            BigDecimal bd=new BigDecimal(qyb);
412
-                            double d1=bd.setScale(1,BigDecimal.ROUND_HALF_UP).doubleValue();
413
-                            aRow.put("QYB",d1);
414
-                        }
415
-                    }
416
-                    if (aRow.get("RCYL1")!=null && aRow.get("HS") != null && !aRow.get("HS").equals("0.0") && aRow.get("HS") != "0.0"){
417
-                        double rcsl = (Double.valueOf(aRow.get("RCYL1").toString()) * Double.valueOf(aRow.get("HS").toString()))/100;
418
-                        if (!Double.isNaN(rcsl) && !Double.isInfinite(rcsl)) {
419
-                            BigDecimal bd = new BigDecimal(rcsl);
420
-                            double d1 = bd.setScale(1, BigDecimal.ROUND_HALF_UP).doubleValue();
421
-                            aRow.put("RCSL", d1);
422
-                        }
423
-                    }
424
-                    if (aRow.get("RCQL") != null && aRow.get("RCSL") != null && !aRow.get("RCQL").equals("0.0") && aRow.get("RCQL") != "0.0" ){
425
-                        double sqb = Double.valueOf(aRow.get("RCSL").toString()) / Double.valueOf(aRow.get("RCQL").toString());
426
-                        if (!Double.isNaN(sqb) && !Double.isInfinite(sqb)) {
427
-                            BigDecimal bd = new BigDecimal(sqb);
428
-                            double d1 = bd.setScale(1, BigDecimal.ROUND_HALF_UP).doubleValue();
429
-                            aRow.put("SQB", d1);
430
-                        }
431
-                    }
432
-                    if (!aRow.containsKey("SMD")){
433
-                        aRow.put("SMD",1);
434
-                    }
435
-                    if (!aRow.containsKey("YMD")){
436
-                        aRow.put("YMD",0.85);
437
-                    }
438
-                })
439
-                        .log("mytimer3")
440
-                        .doTry()
441
-                        .setBody(simple("insert into centralbase.cb_pc_pro_wellbore_vol_daily(well_id,prod_date,prod_time,liq_prod_daily,oil_prod_daily,gas_prod_daily,water_cut,remarks,gas_oil_ratio,water_prod_daily,water_gas_ratio,surface_crude_water_density,surface_crude_oil_density) " +
442
-                                "select '${body[JH]}','${body[RQ]}','${body[SCSJ]}','${body[RCYL1]}','${body[RCYL]}','${body[RCQL]}','${body[HS]}','${body[BZ]}','${body[QYB]}','${body[RCSL]}','${body[SQB]}','${body[SMD]}','${body[YMD]}' " +
443
-                                "where NOT EXISTS ( SELECT * FROM centralbase.cb_pc_pro_wellbore_vol_daily WHERE well_id = '${body[JH]}' and  prod_date = '${body[RQ]}' )"))
444
-                        .to("jdbc:centralbase")
445
-                        .doCatch(Exception.class)
446
-                        .log("${header.date}"+" routeId:oracle-3->  centralbase.cb_pc_pro_wellbore_status_daily insert date faild")
447
-                        .doTry()
448
-                        .setBody(simple("update centralbase.cb_pc_pro_wellbore_vol_daily set  water_prod_daily =null where water_prod_daily = -1 and prod_date = '${header.date}' "))
449
-                        .to("jdbc:centralbase")
450
-                        .setBody(simple("update centralbase.cb_pc_pro_wellbore_vol_daily set  gas_oil_ratio =null where gas_oil_ratio = -1 and prod_date = '${header.date}' "))
451
-                        .to("jdbc:centralbase")
452
-                        .setBody(simple("update centralbase.cb_pc_pro_wellbore_vol_daily set  water_gas_ratio =null where water_gas_ratio = -1 and prod_date = '${header.date}' "))
453
-                        .to("jdbc:centralbase")
454
-                        .doCatch(Exception.class)
455
-                        .log("${header.date}"+" routeId:oracle-3->  centralbase.cb_pc_pro_wellbore_vol_daily update date faild")
456
-                        .end();
457
-
458
-//                //0 0 */1 * * ? 每1个小时执行一次
459
-//                //单独执行一次30s
460
-                from("timer:mytimer4?period=3600000")
461
-                    .routeId("oracle-4")
462
-                    .setHeader("date", constant(getDate()))
463
-//                        .setBody(simple("select distinct jh,rq,dym,jy,ly,bj,bs,bx,zs,cc,cs,blx,dl from DBA01 where rq  = to_date('${header.date}','yyyy-MM-dd') and qyrq is not null  "))
464
-//                        .setBody(simple("select distinct b.well_common_name jh,  a.prod_date rq,a.static_pressure jy,a.flow_pres ly,a.pump_diameter bj,a.pump_depth bs,a.pump_efficiency bx,a.rotate_frequency zs,a.stroke_length cc,a.stroke_frequency cs,a.pump_type blx,a.elec_frequency dl from (select * from V_TEMP_WELL_MECH_ALL a1 where a1.prod_date = to_date('${header.date}', 'yyyy-MM-dd')) a left join  V_CD_WELL_SOURCE_YC    b on a.well_id = b.well_id "))
465
-                    .setBody(simple("SELECT DISTINCT well_id jh FROM centralbase.cb_temp_well_mech_runtime where prod_date > '${header.date}' "))
466
-                    .to("jdbc:centralbase")
467
-                    .split(body()).process(exchange -> {
468
-                        Message in = exchange.getIn();
469
-                        HashMap<String, Object> aRow = in.getBody(HashMap.class);
470
-                        aRow.put("JH", aRow.get("jh").toString());
471
-                        in.setHeader("JH",aRow.get("jh").toString());
472
-                })
473
-                    .setBody(simple("   select well_id wellid from  V_CD_WELL_SOURCE_YC  where well_common_name = '${body[JH]}' "))
474
-                    .to("jdbc:oracle")
475
-                    .split(body()).process(exchange -> {
476
-                        Message in = exchange.getIn();
477
-                        HashMap<String, Object> aRow = in.getBody(HashMap.class);
478
-                        aRow.put("wellid", aRow.get("WELLID").toString());
479
-                })
480
-                    .setBody(simple("  select * from( Select t.prod_date rq,t.static_pressure jy,t.flow_pres ly,t.pump_diameter bj,\n" +
481
-                            "                                      t.pump_depth bs,t.pump_efficiency bx,t.rotate_frequency zs,t.stroke_length cc,t.stroke_frequency cs,\n" +
482
-                            "                                      t.pump_type blx,t.elec_frequency dl, row_number() Over(Partition By t.well_id\n" +
483
-                            "                                Order By t.prod_date desc ) rn   FROM V_TEMP_WELL_MECH_ALL  t where well_id = '${body[wellid]}' and dynamic_liq_level > 0)where rn = 1 "))
484
-                    .to("jdbc:oracle")
485
-                    .split(body()).process(exchange -> {
486
-                        Message in = exchange.getIn();
487
-                        HashMap<String, Object> aRow = in.getBody(HashMap.class);
488
-                        if (aRow.get("JY") == null) aRow.put("JY", "0.0");
489
-                        if (aRow.get("LY") == null) aRow.put("LY", "0.0");
490
-                        if (aRow.get("BJ") == null) aRow.put("BJ", "0.0");
491
-                        if (aRow.get("BS") == null) aRow.put("BS", "0.0");
492
-                        if (aRow.get("BX") == null) aRow.put("BX", "0.0");
493
-                        if (aRow.get("ZS") == null) aRow.put("ZS", "0.0");
494
-                        if (aRow.get("CC") == null) aRow.put("CC", "0.0");
495
-                        if (aRow.get("CS") == null) aRow.put("CS", "0.0");
496
-                        if (aRow.get("BLX") == null) aRow.put("BLX", "");
497
-                        if (aRow.get("DL") == null) aRow.put("DL", "0.0");
498
-                })
499
-                        .log("mytimer4")
500
-                        .doTry()
501
-                        .setBody(simple("insert into centralbase.cb_temp_well_mech_daily(well_id,prod_date,static_pressure,flow_pres,pump_diameter,pump_depth,pump_efficiency,rotate_frequency,stroke_length,stroke_frequency,pump_type,elec_frequency) " +
502
-                                "select '${header.JH}','${body[RQ]}','${body[JY]}','${body[LY]}','${body[BJ]}','${body[BS]}','${body[BX]}','${body[ZS]}','${body[CC]}','${body[CS]}','${body[BLX]}','${body[DL]}' " +
503
-                                "where NOT EXISTS ( SELECT * FROM centralbase.cb_temp_well_mech_daily WHERE well_id = '${body[JH]}' and  prod_date = '${body[RQ]}' )"))
504
-                        .to("jdbc:centralbase")
505
-                        .doCatch(Exception.class)
506
-                        .log("${header.date}"+" routeId:oracle-4->  centralbase.cb_temp_well_mech_daily insert date faild")
507
-                        .end();
508
-
509
-
510
-                from("timer:mytimer7?period=3600000")
511
-                        .routeId("oracle-7")
512
-                        .setHeader("date", constant(getDate() + " 00:00:00"))
513
-                        //三个月之内dym不为空的数据
514
-                        //.setBody(simple("SELECT distinct jh,rq,dym FROM DBA01 WHERE  rq between to_date(TO_CHAR(ADD_MONTHS(SYSDATE,-3),'yyyy-MM-dd'),'yyyy-MM-dd')and to_date(TO_CHAR(SYSDATE,'yyyy-MM-dd'),'yyyy-MM-dd') and dym is not null;"))
515
-                        .setBody(simple("select well_id,prod_date,start_pump_liq_level,pump_depth from centralbase.cb_pc_pro_wellbore_status_daily where prod_date = '${header.date}' "))
516
-                        .to("jdbc:centralbase")
517
-                        .split(body()).process(exchange -> {
518
-                            Message in = exchange.getIn();
519
-                            HashMap<String, Object> aRow = in.getBody(HashMap.class);
520
-                            aRow.put("submergence_depth",null);
521
-                    if (aRow.get("start_pump_liq_level")!=null && aRow.get("pump_depth")!=null){
522
-                        double cmd= Double.valueOf(aRow.get("pump_depth").toString())-Double.valueOf(aRow.get("start_pump_liq_level").toString())/10;
523
-                        BigDecimal bd=new BigDecimal(cmd);
524
-                        double cmd1=bd.setScale(1,BigDecimal.ROUND_HALF_UP).doubleValue();
525
-                        aRow.put("submergence_depth",cmd1);
526
-                    }
527
-                })
528
-                        .doTry()
529
-                        .setBody(simple("update centralbase.cb_pc_pro_wellbore_status_daily set submergence_depth = '${body[submergence_depth]}' where well_id = '${body[well_id]}' and prod_date  = '${body[prod_date]}'"))
530
-                        .to("jdbc:centralbase")
531
-                        .doCatch(Exception.class)
532
-                        .log("${header.date}"+" routeId:oracle-7->  centralbase.cb_pc_pro_wellbore_status_daily update date faild")
533
-                        .end();
534
-
535
-
536
-
537
-
538
-//                from("timer:mytimer9?period=3600000")
539
-//                        .routeId("centralbase-2")
540
-//                        .setHeader("date", constant(getDate() + " 00:00:00"))
541
-//                        .setBody(simple("select distinct rn.well_id,cb.prod_date,rn.pump_diameter  from centralbase.cb_temp_well_mech_runtime rn ,centralbase.cb_pc_pro_wellbore_status_daily cb where cb.well_id = rn.well_id " +
542
-//                                "and cb.prod_date = '${header.date}' "))
543
-//                        .to("jdbc:centralbase")//.log("${body}")
544
-//                        .split(body()).process(exchange -> {
545
-//                    Message in = exchange.getIn();
546
-//                    HashMap<String, Object> aRow = in.getBody(HashMap.class);
547
-//                    aRow.putIfAbsent("pump_diameter", "0.0");
548
-//                })
549
-//                        .doTry()
550
-//                        .setBody(simple("update centralbase.cb_pc_pro_wellbore_status_daily set oil_nozzle = '${body[pump_diameter]}' where well_id ='${body[well_id]}' and prod_date='${body[prod_date]}' "))
551
-//                        .to("jdbc:centralbase")
552
-//                        .doCatch(Exception.class)
553
-//                        .log("${header.date}"+" routeId:centralbase-2->  centralbase.cb_pc_pro_wellbore_status_daily update date faild")
554
-//                        .end();
555
-//
556
-                from("timer:mytimer10?period=3600000")
557
-                        .routeId("centralbase-3")
558
-                        .setHeader("date", constant(getDate()))
559
-                        .setBody(simple("select well_id,avg(stroke_length) stroke_length  ,avg(stroke_frequency) stroke_frequency from centralbase.cb_temp_well_mech_runtime where prod_date::date='${header.date}' group by well_id"))
560
-                        .to("jdbc:centralbase")//.log("${body}")
561
-                        .split(body()).process(exchange -> {
562
-                        Message in = exchange.getIn();
563
-                        HashMap<String, Object> aRow = in.getBody(HashMap.class);
564
-                        if (aRow.get("stroke_length")!=null && aRow.get("stroke_frequency")!=null){
565
-                        double stroke_length=Double.parseDouble(aRow.get("stroke_length").toString());
566
-                        double stroke_frequency=Double.parseDouble(aRow.get("stroke_frequency").toString());
567
-                        BigDecimal bd=new BigDecimal(stroke_length);
568
-                        double stroke_lengt1=bd.setScale(1,BigDecimal.ROUND_HALF_UP).doubleValue();
569
-                        BigDecimal bd1=new BigDecimal(stroke_frequency);
570
-                        double stroke_frequency1=bd1.setScale(1,BigDecimal.ROUND_HALF_UP).doubleValue();
571
-                        aRow.put("strokeLength",stroke_lengt1);
572
-                        aRow.put("strokeFrequency",stroke_frequency1);
573
-                    }
574
-                })
575
-                        .doTry()
576
-                        .setBody(simple("update centralbase.cb_temp_well_mech_daily set stroke_length='${body[strokeLength]}' ,stroke_frequency ='${body[strokeFrequency]}' where well_id = '${body[well_id]}' and prod_date::date='${header.date}' "))
577
-                        .to("jdbc:centralbase")
578
-                        .doCatch(Exception.class)
579
-                        .log("${header.date}"+" routeId:centralbase-3->  centralbase.cb_temp_well_mech_daily update date faild")
580
-                        .end();
581
-
582
-                from("timer:mytimer11?period=3600000")
583
-                        .routeId("centralbase-1")
584
-                        .setBody(simple("select so.well_id,so.well_common_name,so.org_id,ti.prod_date,ti.stroke_frequency,ti.stroke_length,ti.sgt from centralbase.cb_temp_well_mech_runtime ti, centralbase.cb_cd_well_source so where ti.well_id = so.well_id and ti.prod_date =(select max(prod_date) from centralbase.cb_temp_well_mech_runtime) and ti.stroke_length > 0 "))
585
-                        .to("jdbc:centralbase")
586
-                        .split(body())
587
-                        .doTry()
588
-                        .process(exchange -> {
589
-                            Message in = exchange.getIn();
590
-                            HashMap<String, Object> aRow = in.getBody(HashMap.class);
591
-                            String wellName =aRow.get("well_common_name").toString();
592
-                            String wellId =aRow.get("well_id").toString();
593
-                            String orgId = aRow.get("org_id").toString();
594
-                            String prodDate = aRow.get("prod_date").toString().substring(0,19);
595
-                            Double strokeLength = Double.valueOf(aRow.get("stroke_length").toString());
596
-                            Double strokeFrequency = Double.valueOf(aRow.get("stroke_frequency").toString());
597
-                            String sgt = aRow.get("sgt").toString();
598
-                            DiagnoseMsg diagnoseMsg = new DiagnoseMsg(wellId, wellName, orgId, prodDate, sgt, LocalDateTime.now().toString(), strokeLength, strokeFrequency);
599
-                            producer.send((MessageBody) diagnoseMsg);
600
-                        })
601
-                        .doCatch(Exception.class)
602
-                        .log("${header.date}"+" rocketMQ send data faild")
603
-                        .end();
604
-
605 668
             }
606 669
         };
607 670
     }

+ 2 - 3
src/main/java/com/gct/tools/etlcamelhuge/routeconfig/CamelRestConfiguration.java

@@ -174,7 +174,7 @@ public class CamelRestConfiguration extends RouteBuilder {
174 174
 
175 175
         JSONObject body = new JSONObject();
176 176
 
177
-         /* restConfiguration()
177
+          restConfiguration()
178 178
                 .component("servlet")
179 179
                 .host("11.72.128.71");
180 180
         from("timer:mytimer?period=3600000")
@@ -191,7 +191,6 @@ public class CamelRestConfiguration extends RouteBuilder {
191 191
 //                .setBody(simple("${header[data]}"))
192 192
 //                .setBody(simple(body.toJSONString()))
193 193
                 .setBody(simple("${in.header.id}"))
194
-
195 194
                 .log("${header.id}")
196 195
                 .split(body()).process(
197 196
                 exchange -> {
@@ -217,7 +216,7 @@ public class CamelRestConfiguration extends RouteBuilder {
217 216
                         "where NOT EXISTS (SELECT * FROM centralbase.cb_temp_well_mech_runtime WHERE well_id = '${body[well_name]}' and  prod_date = '${in.header.date}' )"))
218 217
                 .to("jdbc:centralbase")
219 218
                 .doCatch(Exception.class)
220
-                .end();*/
219
+                .end();
221 220
 
222 221
       /* JSONArray jsonArray = new JSONArray();
223 222
         JSONObject search = new JSONObject();

+ 23 - 24
src/main/resources/application.yml

@@ -12,20 +12,20 @@ spring:
12 12
       password: 123456
13 13
       driver-class-name: org.postgresql.Driver
14 14
       hikari:
15
-        ## 连接池名字
16
-        pool-name: SystemHikariCP
17
-        ## 最小空闲连接数量
18
-        minimum-idle: 5
19
-        ## 空闲连接存活最大时间,默认600000(10分钟)
20
-        idle-timeout: 600000
21
-        ## 连接池最大连接数,默认是10
22
-        maximum-pool-size: 15
23
-        ## 此属性控制从池返回的连接的默认自动提交行为,默认值:true
24
-        auto-commit: true
25
-        ## 此属性控制池中连接的最长生命周期,值0表示无限生命周期,默认1800000即30分钟
26
-        max-lifetime: 1800000
27
-        ## 数据库连接超时时间,默认30秒,即30000
28
-        connection-timeout: 30000
15
+         ## 连接池名字
16
+         pool-name: SystemHikariCP
17
+         ## 最小空闲连接数量
18
+         minimum-idle: 5
19
+         ## 空闲连接存活最大时间,默认600000(10分钟)
20
+         idle-timeout: 600000
21
+         ## 连接池最大连接数,默认是10
22
+         maximum-pool-size: 150
23
+         ## 此属性控制从池返回的连接的默认自动提交行为,默认值:true
24
+         auto-commit: true
25
+         ## 此属性控制池中连接的最长生命周期,值0表示无限生命周期,默认1800000即30分钟
26
+         max-lifetime: 1800000
27
+         ## 数据库连接超时时间,默认30秒,即30000
28
+         connection-timeout: 30000
29 29
 
30 30
     ds2:
31 31
       ## Hikari连接池的设置 Hikari 时间单位都是毫秒
@@ -42,7 +42,7 @@ spring:
42 42
         ## 空闲连接存活最大时间,默认600000(10分钟)
43 43
         idle-timeout: 600000
44 44
         ## 连接池最大连接数,默认是10
45
-        maximum-pool-size: 15
45
+        maximum-pool-size: 150
46 46
         ## 此属性控制从池返回的连接的默认自动提交行为,默认值:true
47 47
         auto-commit: true
48 48
         ## 此属性控制池中连接的最长生命周期,值0表示无限生命周期,默认1800000即30分钟
@@ -53,13 +53,13 @@ spring:
53 53
     ds3:
54 54
       ## Hikari连接池的设置 Hikari 时间单位都是毫秒
55 55
       type: com.zaxxer.hikari.HikariDataSource
56
-      jdbc-url: jdbc:oracle:thin:@10.72.48.8:1521:ora920
57
-      username: cyyc_a2dms
58
-      password: cyyca2dms2018#
59
-     ## jdbc-url: jdbc:oracle:thin:@11.72.128.73:1521:orcl
60
-     ## username: dhzdly
61
-     ## password: gctgct1QAZ2Wsx
62
-      driver-class-name: oracle.jdbc.driver.OracleDriver
56
+      ##jdbc-url: jdbc:oracle:thin:@10.72.48.8:1521:ora920
57
+      ##username: cyyc_a2dms
58
+      ##password: cyyca2dms2018#
59
+      jdbc-url: jdbc:oracle:thin:@11.72.128.73:1521:orcl
60
+      username: dhzdly
61
+      password: gctgct1QAZ2Wsx
62
+      driver-class-name: oracle.jdbc.OracleDriver
63 63
       hikari:
64 64
         ## 连接池名字
65 65
         pool-name: SystemHikariCP
@@ -68,14 +68,13 @@ spring:
68 68
         ## 空闲连接存活最大时间,默认600000(10分钟)
69 69
         idle-timeout: 600000
70 70
         ## 连接池最大连接数,默认是10
71
-        maximum-pool-size: 15
71
+        maximum-pool-size: 150
72 72
         ## 此属性控制从池返回的连接的默认自动提交行为,默认值:true
73 73
         auto-commit: true
74 74
         ## 此属性控制池中连接的最长生命周期,值0表示无限生命周期,默认1800000即30分钟
75 75
         max-lifetime: 1800000
76 76
         ## 数据库连接超时时间,默认30秒,即30000
77 77
         connection-timeout: 30000
78
-        connection-test-query: SELECT 1 FROM DUAL
79 78
 ##    ds4:
80 79
 ##      ## Hikari连接池的设置 Hikari 时间单位都是毫秒
81 80
 ##      type: com.zaxxer.hikari.HikariDataSource