Browse Source

正常使用版本

gxt 3 years ago
parent
commit
cae54b13e1

+ 1 - 1
src/main/java/com/gct/tools/etlcamelhuge/MQ/DefaultMsgSendSuccessCallBack.java

@@ -18,6 +18,6 @@ public class DefaultMsgSendSuccessCallBack implements SendSuccessCallBack {
18 18
     @Override
19 19
     public void accept(SendResult sendResult) {
20 20
         System.out.println("send = "+count++);
21
-        log.info("send msg success,msg:{}", sendResult.getMsgId());
21
+        //log.info("send msg success,msg:{}", sendResult.getMsgId());
22 22
     }
23 23
 }

+ 118 - 30
src/main/java/com/gct/tools/etlcamelhuge/routeconfig/CamelJDBCConfiguration.java

@@ -58,7 +58,7 @@ public class CamelJDBCConfiguration /*extends RouteBuilder */ {
58 58
             private SortedSet<String> organization;
59 59
             private Map<String, Integer> orgIDs;
60 60
             private Integer orgID;
61
-
61
+            private Map<String, Integer> stringIntegerMap;
62 62
             //全部执行完成的大概时间在30-40分钟
63 63
             @Override
64 64
             public void configure() throws Exception {
@@ -67,8 +67,11 @@ public class CamelJDBCConfiguration /*extends RouteBuilder */ {
67 67
                 from("timer:mytimer1?period=604800000")
68 68
                         .routeId("oracle-1")
69 69
                         .setHeader("date", constant(getDate()))
70
+                        .doTry()
70 71
                         .setBody(simple("select  distinct jh,cydmc,zyq,zk,qyrq,sccw,qk,bz from DBA01 where rq  = to_date('${header.date}','yyyy-MM-dd') and qyrq is not null "))
71 72
                         .to("jdbc:oracle")
73
+                        .doCatch(Exception.class)
74
+                        .log("${header.date}"+"routeId:oracle-1->  select cb_cd_well_source need data failed")
72 75
                         .transform()
73 76
                         .body((result) -> {
74 77
                             organization = new TreeSet<>();
@@ -105,7 +108,7 @@ public class CamelJDBCConfiguration /*extends RouteBuilder */ {
105 108
                                 " ON conflict(well_id) DO UPDATE set remarks = '${body[BZ]}' "))
106 109
                         .to("jdbc:centralbase")
107 110
                         .doCatch(Exception.class)
108
-                        .log("${header.date}"+" routeId:oracle-1->  centralbase.cb_cd_well_source 导入数据失败")
111
+                        .log("${header.date}"+" routeId:oracle-1->  centralbase.cb_cd_well_source insert data failed")
109 112
                         .end()
110 113
                         .transform().body((re) -> {
111 114
                     List<Map<String, Object>> rows = new ArrayList<>();
@@ -144,7 +147,7 @@ public class CamelJDBCConfiguration /*extends RouteBuilder */ {
144 147
                                 "ON conflict(org_id_pre) DO UPDATE set org_code = '${body[org_code]}' "))
145 148
                         .to("jdbc:centralbase")
146 149
                         .doCatch(Exception.class)
147
-                        .log("${header.date}"+" routeId:oracle-1->  centralbase.cb_pc_organization insert date faild")
150
+                        .log("${header.date}"+" routeId:oracle-1->  centralbase.cb_pc_organization insert data failed")
148 151
                         .end()
149 152
                         .setBody(simple("select org_id,org_name from centralbase.cb_pc_organization where org_level = '3' "))
150 153
                         .to("jdbc:centralbase")
@@ -153,14 +156,17 @@ public class CamelJDBCConfiguration /*extends RouteBuilder */ {
153 156
                         .setBody(simple("update centralbase.cb_cd_well_source set org_id = '${body[org_id]}' where station_name = '${body[org_name]}'"))
154 157
                         .to("jdbc:centralbase")
155 158
                         .doCatch(Exception.class)
156
-                        .log("${header.date}"+" routeId:oracle-1->  centralbase.cb_pc_organization update date faild")
159
+                        .log("${header.date}"+" routeId:oracle-1->  centralbase.cb_pc_organization update data failed")
157 160
                         .end();
158 161
                 //单独执行时间10s
159 162
                 from("timer:mytimer2?period=3600000")
160 163
                         .routeId("oracle-2")
161 164
                         .setHeader("date", constant(getDate()))
165
+                        .doTry()
162 166
                         .setBody(simple("select  distinct jh,rq,cyfs,yz,hysx , yysx ,tysx,bs,dym from DBA01 where rq  = to_date('${header.date}','yyyy-MM-dd') and qyrq is not null "))
163 167
                         .to("jdbc:oracle")
168
+                        .doCatch(Exception.class)
169
+                        .log("${header.date}"+" routeId:oracle-2->  select cb_pc_pro_wellbore_status_daily  need data failed")
164 170
                         .split(body()).process(exchange -> {
165 171
                     Message in = exchange.getIn();
166 172
                     HashMap<String, Object> aRow = in.getBody(HashMap.class);
@@ -176,7 +182,7 @@ public class CamelJDBCConfiguration /*extends RouteBuilder */ {
176 182
                                 "where NOT EXISTS ( SELECT * FROM centralbase.cb_pc_pro_wellbore_status_daily WHERE well_id = '${body[JH]}' and  prod_date = '${body[RQ]}')"))
177 183
                         .to("jdbc:centralbase")
178 184
                         .doCatch(Exception.class)
179
-                        .log("${header.date}"+" routeId:oracle-2->  centralbase.cb_pc_pro_wellbore_status_daily insert date faild")
185
+                        .log("${header.date}"+" routeId:oracle-2->  centralbase.cb_pc_pro_wellbore_status_daily insert data failed")
180 186
                         .end();
181 187
                 //查询井对应dym不为空的数据 --目前是只要对应井能查到dym不为空的,无论是什么时间的,都放进去
182 188
                 //将查询到的DYM数据更新到cb_pc_pro_wellbore_status_daily中
@@ -186,24 +192,29 @@ public class CamelJDBCConfiguration /*extends RouteBuilder */ {
186 192
                         .routeId("oracle-5")
187 193
                         .setHeader("date", constant(getDate() + " 00:00:00"))
188 194
                         //三个月之内dym不为空的数据
195
+                        .doTry()
189 196
                         //.setBody(simple("SELECT distinct jh,rq,dym FROM DBA01 WHERE  rq between to_date(TO_CHAR(ADD_MONTHS(SYSDATE,-3),'yyyy-MM-dd'),'yyyy-MM-dd')and to_date(TO_CHAR(SYSDATE,'yyyy-MM-dd'),'yyyy-MM-dd') and dym is not null;"))
190 197
                         .setBody(simple("SELECT  distinct jh,max(rq),dym FROM DBA01 WHERE dym is not null group by jh,dym"))
191 198
                         .to("jdbc:oracle")
199
+                        .doCatch(Exception.class)
200
+                        .log("${header.date}"+" routeId:oracle-5->  select DYM data failed")
192 201
                         .split(body())
193 202
                         .doTry()
194 203
                         .setBody(simple("update centralbase.cb_pc_pro_wellbore_status_daily set start_pump_liq_level = '${body[DYM]}' where well_id = '${body[JH]}' and prod_date::date  = '${header.date}' "))
195 204
                         .to("jdbc:centralbase")
196 205
                         .doCatch(Exception.class)
197
-                        .log("${header.date}"+" routeId:oracle-5->  centralbase.cb_pc_pro_wellbore_status_daily update date faild")
206
+                        .log("${header.date}"+" routeId:oracle-5->  centralbase.cb_pc_pro_wellbore_status_daily update data failed")
198 207
                         .end();
199 208
 
200 209
                 from("timer:mytimer7?period=3600000")
201 210
                         .routeId("oracle-7")
202 211
                         .setHeader("date", constant(getDate() + " 00:00:00"))
203
-                        //三个月之内dym不为空的数据
204
-                        //.setBody(simple("SELECT distinct jh,rq,dym FROM DBA01 WHERE  rq between to_date(TO_CHAR(ADD_MONTHS(SYSDATE,-3),'yyyy-MM-dd'),'yyyy-MM-dd')and to_date(TO_CHAR(SYSDATE,'yyyy-MM-dd'),'yyyy-MM-dd') and dym is not null;"))
212
+                        .doTry()
213
+                        //.setBody(simple("SELECT  distinct jh,max(rq),dym FROM DBA01 WHERE dym is not null group by jh,dym"))
205 214
                         .setBody(simple("select well_id,prod_date,start_pump_liq_level,pump_depth from centralbase.cb_pc_pro_wellbore_status_daily where prod_date = '${header.date}' "))
206 215
                         .to("jdbc:centralbase")
216
+                        .doCatch(Exception.class)
217
+                        .log("${header.date}"+" routeId:oracle-7-> select  centralbase.cb_pc_pro_wellbore_status_daily need data failed")
207 218
                         .split(body()).process(exchange -> {
208 219
                             Message in = exchange.getIn();
209 220
                             HashMap<String, Object> aRow = in.getBody(HashMap.class);
@@ -219,14 +230,17 @@ public class CamelJDBCConfiguration /*extends RouteBuilder */ {
219 230
                         .setBody(simple("update centralbase.cb_pc_pro_wellbore_status_daily set submergence_depth = '${body[submergence_depth]}' where well_id = '${body[well_id]}' and prod_date  = '${body[prod_date]}'"))
220 231
                         .to("jdbc:centralbase")
221 232
                         .doCatch(Exception.class)
222
-                        .log("${header.date}"+" routeId:oracle-7->  centralbase.cb_pc_pro_wellbore_status_daily update date faild")
233
+                        .log("${header.date}"+" routeId:oracle-7->  centralbase.cb_pc_pro_wellbore_status_daily update data failed")
223 234
                         .end();
224 235
                 //单独执行时间30s
225 236
                 from("timer:mytimer3?period=3600000")
226 237
                         .routeId("oracle-3")
227 238
                         .setHeader("date", constant(getDate()))
239
+                        .doTry()
228 240
                         .setBody(simple("select distinct  jh,rq,scsj, rcyl1,rcyl,rcql,hs, bz from DBA01 where rq  = to_date('${header.date}','yyyy-MM-dd') and qyrq is not null "))
229 241
                         .to("jdbc:oracle")
242
+                        .doCatch(Exception.class)
243
+                        .log("${header.date}"+" routeId:oracle-3->  select centralbase.cb_pc_pro_wellbore_vol_daily need  insert data failed")
230 244
                         .split(body()).process(exchange -> {
231 245
                     Message in = exchange.getIn();
232 246
                     HashMap<String, Object> aRow = in.getBody(HashMap.class);
@@ -276,26 +290,86 @@ public class CamelJDBCConfiguration /*extends RouteBuilder */ {
276 290
                                 "where NOT EXISTS ( SELECT * FROM centralbase.cb_pc_pro_wellbore_vol_daily WHERE well_id = '${body[JH]}' and  prod_date = '${body[RQ]}' )"))
277 291
                         .to("jdbc:centralbase")
278 292
                         .doCatch(Exception.class)
279
-                        .log("${header.date}"+" routeId:oracle-3->  centralbase.cb_pc_pro_wellbore_status_daily insert date faild")
293
+                        .log("${header.date}"+" routeId:oracle-3->  centralbase.cb_pc_pro_wellbore_vol_daily insert data failed")
280 294
                         .doTry()
281
-                        .setBody(simple("update centralbase.cb_pc_pro_wellbore_vol_daily set  water_prod_daily =null where water_prod_daily = -1 and prod_date = '${header.date}' "))
295
+                        .setBody(simple("routeId:oracle-3->  update centralbase.cb_pc_pro_wellbore_vol_daily set  water_prod_daily =null where water_prod_daily = -1 and prod_date = '${header.date}' "))
282 296
                         .to("jdbc:centralbase")
283
-                        .setBody(simple("update centralbase.cb_pc_pro_wellbore_vol_daily set  gas_oil_ratio =null where gas_oil_ratio = -1 and prod_date = '${header.date}' "))
297
+                        .setBody(simple("routeId:oracle-3->  update centralbase.cb_pc_pro_wellbore_vol_daily set  gas_oil_ratio =null where gas_oil_ratio = -1 and prod_date = '${header.date}' "))
284 298
                         .to("jdbc:centralbase")
285
-                        .setBody(simple("update centralbase.cb_pc_pro_wellbore_vol_daily set  water_gas_ratio =null where water_gas_ratio = -1 and prod_date = '${header.date}' "))
299
+                        .setBody(simple(" routeId:oracle-3->  update centralbase.cb_pc_pro_wellbore_vol_daily set  water_gas_ratio =null where water_gas_ratio = -1 and prod_date = '${header.date}' "))
286 300
                         .to("jdbc:centralbase")
287 301
                         .doCatch(Exception.class)
288
-                        .log("${header.date}"+" routeId:oracle-3->  centralbase.cb_pc_pro_wellbore_vol_daily update date faild")
302
+                        .log("${header.date}"+" routeId:oracle-3->  centralbase.cb_pc_pro_wellbore_vol_daily update data failed")
289 303
                         .end();
290 304
 
291
-
305
+                from("timer:mytimer12?period=3600000")
306
+                        .routeId("oracle-12")
307
+                        .setHeader("date", constant(getDate()))
308
+                        .doTry()
309
+                        .setBody(simple("select distinct  jh,rq,scsj, rcyl1,rcyl,rcql,hs, bz from DBA01 where rq  = to_date('${header.date}','yyyy-MM-dd') and qyrq is not null "))
310
+                        .to("jdbc:oracle")
311
+                        .doCatch(Exception.class)
312
+                        .log("${header.date}"+" routeId:oracle-12-> select  centralbase.cb_pc_pro_wellbore_vol_daily need  update data failed")
313
+                        .split(body()).process(exchange -> {
314
+                            Message in = exchange.getIn();
315
+                            HashMap<String, Object> aRow = in.getBody(HashMap.class);
316
+                            if (aRow.get("SCSJ") == null) aRow.put("SCSJ", "0.0");
317
+                            if (aRow.get("RCYL1") == null) aRow.put("RCYL1", "0.0");
318
+                            if (aRow.get("RCYL") == null) aRow.put("RCYL", "0.0");
319
+                            if (aRow.get("RCQL") == null) aRow.put("RCQL", "0.0");
320
+                            if (aRow.get("HS") == null) aRow.put("HS", "0.0");
321
+                            if (aRow.get("BZ") == null) aRow.put("BZ", "");
322
+                            aRow.put("RCSL",-1);
323
+                            aRow.put("QYB",-1);
324
+                            aRow.put("SQB",-1);
325
+                            if (aRow.get("RCQL")!=null && aRow.get("RCYL")!=null && !aRow.get("RCYL").equals("0.0") && aRow.get("RCYL") != "0.0"){
326
+                                double qyb = Double.valueOf(aRow.get("RCQL").toString()) / Double.valueOf(aRow.get("RCYL").toString());
327
+                                if (!Double.isNaN(qyb) && !Double.isInfinite(qyb)){
328
+                                    BigDecimal bd=new BigDecimal(qyb);
329
+                                    double d1=bd.setScale(1,BigDecimal.ROUND_HALF_UP).doubleValue();
330
+                                    aRow.put("QYB",d1);
331
+                                }
332
+                            }
333
+                            if (aRow.get("RCYL1")!=null && aRow.get("HS") != null && !aRow.get("HS").equals("0.0") && aRow.get("HS") != "0.0"){
334
+                                double rcsl = (Double.valueOf(aRow.get("RCYL1").toString()) * Double.valueOf(aRow.get("HS").toString()))/100;
335
+                                if (!Double.isNaN(rcsl) && !Double.isInfinite(rcsl)) {
336
+                                    BigDecimal bd = new BigDecimal(rcsl);
337
+                                    double d1 = bd.setScale(1, BigDecimal.ROUND_HALF_UP).doubleValue();
338
+                                    aRow.put("RCSL", d1);
339
+                                }
340
+                            }
341
+                            if (aRow.get("RCQL") != null && aRow.get("RCSL") != null && !aRow.get("RCQL").equals("0.0") && aRow.get("RCQL") != "0.0" ){
342
+                                double sqb = Double.valueOf(aRow.get("RCSL").toString()) / Double.valueOf(aRow.get("RCQL").toString());
343
+                                if (!Double.isNaN(sqb) && !Double.isInfinite(sqb)) {
344
+                                    BigDecimal bd = new BigDecimal(sqb);
345
+                                    double d1 = bd.setScale(1, BigDecimal.ROUND_HALF_UP).doubleValue();
346
+                                    aRow.put("SQB", d1);
347
+                                }
348
+                            }
349
+                            if (!aRow.containsKey("SMD")){
350
+                                aRow.put("SMD",1);
351
+                            }
352
+                            if (!aRow.containsKey("YMD")){
353
+                                aRow.put("YMD",0.85);
354
+                            }
355
+                        })
356
+                        .doTry()
357
+                        .setBody(simple("update centralbase.cb_pc_pro_wellbore_vol_daily set prod_time = '${body[SCSJ]}' ,liq_prod_daily='${body[RCYL1]}' ,oil_prod_daily ='${body[RCYL]}' ,gas_prod_daily ='${body[RCQL]}' ,water_cut='${body[HS]}' ,remarks='${body[BZ]}' ,gas_oil_ratio='${body[QYB]}' ,water_prod_daily='${body[RCSL]}' ,water_gas_ratio='${body[SQB]}',surface_crude_water_density='${body[SMD]}',surface_crude_oil_density= '${body[YMD]}' " +
358
+                                "where well_id = '${body[JH]}' and prod_date ='${body[RQ]}' "))
359
+                        .to("jdbc:centralbase")
360
+                        .doCatch(Exception.class)
361
+                        .log("${header.date}"+" routeId:oracle-12->  centralbase.cb_pc_pro_wellbore_vol_daily update data failed")
362
+                        .end();
292 363
                 //0 0 */1 * * ? 每1个小时执行一次
293 364
                 //单独执行一次30s
294 365
                 from("timer:mytimer4?period=3600000")
295 366
                         .routeId("oracle-4")
296 367
                         .setHeader("date", constant(getDate()))
368
+                        .doTry()
297 369
                         .setBody(simple("select distinct jh,rq,dym,jy,ly,bj,bs,bx,zs,cc,cs,blx,dl from DBA01 where rq  = to_date('${header.date}','yyyy-MM-dd') and qyrq is not null  "))
298 370
                         .to("jdbc:oracle")
371
+                        .doCatch(Exception.class)
372
+                        .log("${header.date}"+" routeId:oracle-4-> select  centralbase.cb_temp_well_mech_daily need insert data failed")
299 373
                         .split(body()).process(exchange -> {
300 374
                     Message in = exchange.getIn();
301 375
                     HashMap<String, Object> aRow = in.getBody(HashMap.class);
@@ -316,36 +390,46 @@ public class CamelJDBCConfiguration /*extends RouteBuilder */ {
316 390
                                 "where NOT EXISTS ( SELECT * FROM centralbase.cb_temp_well_mech_daily WHERE well_id = '${body[JH]}' and  prod_date = '${body[RQ]}' )"))
317 391
                         .to("jdbc:centralbase")
318 392
                         .doCatch(Exception.class)
319
-                        .log("${header.date}"+" routeId:oracle-4->  centralbase.cb_temp_well_mech_daily insert date faild")
393
+                        .log("${header.date}"+" routeId:oracle-4->  centralbase.cb_temp_well_mech_daily insert data failed")
320 394
                         .end();
321 395
 
322 396
                 from("timer:mytimer6?period=3600000")
323 397
                         .routeId("oracle-6")
324 398
                         .setHeader("date", constant(getDate() + " 00:00:00"))
399
+                        .doTry()
325 400
                         //五个月之内bj不为空的数据
326
-                        .setBody(simple("SELECT distinct jh,bj FROM DBA01 WHERE  rq between to_date(TO_CHAR(ADD_MONTHS(SYSDATE,-5),'yyyy-MM-dd'),'yyyy-MM-dd')and to_date(TO_CHAR(SYSDATE,'yyyy-MM-dd'),'yyyy-MM-dd') and bj is not null"))
401
+                        //.setBody(simple("SELECT distinct jh,bj FROM DBA01 WHERE  rq between to_date(TO_CHAR(ADD_MONTHS(SYSDATE,-5),'yyyy-MM-dd'),'yyyy-MM-dd')and to_date(TO_CHAR(SYSDATE,'yyyy-MM-dd'),'yyyy-MM-dd') and bj is not null"))
402
+                        .setBody(simple("SELECT  distinct jh,max(rq),bj FROM DBA01 WHERE dym is not null group by jh,bj"))
327 403
                         .to("jdbc:oracle")
404
+                        .doCatch(Exception.class)
405
+                        .log("${header.date}"+" routeId:oracle-6-> select bj  centralbase.cb_temp_well_mech_runtime need update data failed")
328 406
                         .split(body())
329 407
                         .doTry()
330 408
                         .setBody(simple("update centralbase.cb_temp_well_mech_runtime set pump_diameter = '${body[BJ]}' where well_id = '${body[JH]}' "))
331 409
                         .to("jdbc:centralbase")
332 410
                         .doCatch(Exception.class)
333
-                        .log("${header.date}"+" routeId:oracle-6->  centralbase.cb_temp_well_mech_runtime update date faild")
411
+                        .log("${header.date}"+" routeId:oracle-6->  centralbase.cb_temp_well_mech_runtime update data failed")
334 412
                         .end();
335 413
                 //从天安哪里获取的数据
336 414
                 //0 0 */1 * * ? 每1个小时执行一次
337 415
                 //单独执行一小时的数据30s
338
-                from("timer:mytimer8?period=3600000")
416
+               from("timer:mytimer8?period=3600000")
339 417
                         .routeId("jdbc-gtsj-?")
418
+                        .doTry()
340 419
                         .setBody(simple("select max(prod_date) from centralbase.cb_temp_well_mech_runtime "))
341 420
                         .to("jdbc:centralbase")
421
+                        .doCatch(Exception.class)
422
+                        .log("${header.date}"+" routeId:jdbc-gtsj-?->select max(date)  data failed")
342 423
                         .split(body())
343 424
                         .setHeader("date", simple("${body[max]}"))
344 425
                         .setBody(simple("select distinct well_name,dyna_create_time,check_date,displacement,disp_load,stroke,frequency,susp_max_load,susp_min_load from public.pc_fd_pumpjack_dyna_dia_t where   dyna_create_time > '${header.date}' "))
426
+                        //.setBody(simple("select distinct well_name,dyna_create_time,check_date,displacement,disp_load,stroke,frequency,susp_max_load,susp_min_load from public.pc_fd_pumpjack_dyna_dia_t where   dyna_create_time > '2021-08-23 17:00:00' "))
345 427
                         .to("jdbc:gtsj")
346 428
                         .split(body()).process(exchange -> {
429
+
347 430
                     Message in = exchange.getIn();
348 431
                     HashMap<String, Object> aRow = in.getBody(HashMap.class);
432
+                    System.out.println("well_name---"+aRow.get("well_name"));
349 433
                     String prod_date = aRow.get("dyna_create_time").toString().split("\\+")[0];
350 434
                     aRow.put("dyna_create_time", prod_date);
351 435
                     if (aRow.get("displacement") != null && !aRow.get("displacement").equals("") && aRow.get("disp_load") != null && !aRow.get("disp_load").equals("")) {
@@ -364,9 +448,6 @@ public class CamelJDBCConfiguration /*extends RouteBuilder */ {
364 448
                             w += new BigDecimal(Math.round(Double.parseDouble(s[i]) * 100)).stripTrailingZeros().toPlainString() + ",";
365 449
                         }
366 450
                         Double[][] doubles = SGTUtil.encodeToDoubleArray(w);
367
-                      /*  for (int i = 0;i<doubles.length;i++){
368
-                            doubles[i][0] = doubles[i][0] / 10;
369
-                        }*/
370 451
                         aRow.put("sgt", SGTUtil.encodeToString(doubles));
371 452
                         aRow.put("susp_max_load",susp_max_load);
372 453
                         aRow.put("susp_min_load",susp_min_load);
@@ -393,15 +474,18 @@ public class CamelJDBCConfiguration /*extends RouteBuilder */ {
393 474
                                 "where NOT EXISTS (SELECT * FROM centralbase.cb_temp_well_mech_runtime WHERE well_id = '${body[well_name]}' and  prod_date = '${body[dyna_create_time]}' )"))
394 475
                         .to("jdbc:centralbase")
395 476
                         .doCatch(Exception.class)
396
-                        .log("${header.date}"+" routeId:jdbc-gtsj-?->  centralbase.cb_temp_well_mech_runtime insert date faild")
477
+                        .log("${header.date}"+" routeId:jdbc-gtsj-?->  centralbase.cb_temp_well_mech_runtime insert data failed")
397 478
                         .end();
398 479
 
399
-                from("timer:mytimer9?period=3600000")
480
+                  from("timer:mytimer9?period=3600000")
400 481
                         .routeId("centralbase-2")
401 482
                         .setHeader("date", constant(getDate() + " 00:00:00"))
483
+                        .doTry()
402 484
                         .setBody(simple("select distinct rn.well_id,cb.prod_date,rn.pump_diameter  from centralbase.cb_temp_well_mech_runtime rn ,centralbase.cb_pc_pro_wellbore_status_daily cb where cb.well_id = rn.well_id\n" +
403 485
                                 "and cb.prod_date = '${header.date}' "))
404 486
                         .to("jdbc:centralbase")//.log("${body}")
487
+                        .doCatch(Exception.class)
488
+                        .log("${header.date}"+" routeId:centralbase-2-> select  pump_diameter centralbase.cb_pc_pro_wellbore_status_daily  data failed")
405 489
                         .split(body()).process(exchange -> {
406 490
                             Message in = exchange.getIn();
407 491
                             HashMap<String, Object> aRow = in.getBody(HashMap.class);
@@ -411,14 +495,17 @@ public class CamelJDBCConfiguration /*extends RouteBuilder */ {
411 495
                         .setBody(simple("update centralbase.cb_pc_pro_wellbore_status_daily set oil_nozzle = '${body[pump_diameter]}' where well_id ='${body[well_id]}' and prod_date='${body[prod_date]}' "))
412 496
                         .to("jdbc:centralbase")
413 497
                         .doCatch(Exception.class)
414
-                        .log("${header.date}"+" routeId:centralbase-2->  centralbase.cb_pc_pro_wellbore_status_daily update date faild")
498
+                        .log("${header.date}"+" routeId:centralbase-2->  centralbase.cb_pc_pro_wellbore_status_daily update data failed")
415 499
                         .end();
416 500
 
417
-                from("timer:mytimer10?period=3600000")
501
+               from("timer:mytimer10?period=3600000")
418 502
                         .routeId("centralbase-3")
419 503
                         .setHeader("date", constant(getDate()))
504
+                        .doTry()
420 505
                         .setBody(simple("select well_id,avg(stroke_length) stroke_length  ,avg(stroke_frequency) stroke_frequency from centralbase.cb_temp_well_mech_runtime where prod_date::date='${header.date}' group by well_id"))
421 506
                         .to("jdbc:centralbase")//.log("${body}")
507
+                        .doCatch(Exception.class)
508
+                        .log("${header.date}"+" routeId:centralbase-3-> select stroke_length  centralbase.cb_temp_well_mech_daily  data failed")
422 509
                         .split(body()).process(exchange -> {
423 510
                     Message in = exchange.getIn();
424 511
                     HashMap<String, Object> aRow = in.getBody(HashMap.class);
@@ -437,9 +524,8 @@ public class CamelJDBCConfiguration /*extends RouteBuilder */ {
437 524
                         .setBody(simple("update centralbase.cb_temp_well_mech_daily set stroke_length='${body[strokeLength]}' ,stroke_frequency ='${body[strokeFrequency]}' where well_id = '${body[well_id]}' and prod_date::date='${header.date}' "))
438 525
                         .to("jdbc:centralbase")
439 526
                         .doCatch(Exception.class)
440
-                        .log("${header.date}"+" routeId:centralbase-3->  centralbase.cb_temp_well_mech_daily update date faild")
527
+                        .log("${header.date}"+" routeId:centralbase-3->  centralbase.cb_temp_well_mech_daily update data failed")
441 528
                         .end();
442
-
443 529
                 from("timer:mytimer11?period=3600000")
444 530
                         .routeId("centralbase-1")
445 531
                         .setBody(simple("select so.well_id,so.well_common_name,so.org_id,ti.prod_date,ti.stroke_frequency,ti.stroke_length,ti.sgt from centralbase.cb_temp_well_mech_runtime ti, centralbase.cb_cd_well_source so where ti.well_id = so.well_id and ti.prod_date =(select max(prod_date) from centralbase.cb_temp_well_mech_runtime) "))
@@ -456,13 +542,15 @@ public class CamelJDBCConfiguration /*extends RouteBuilder */ {
456 542
                             Double strokeLength = Double.valueOf(aRow.get("stroke_length").toString());
457 543
                             Double strokeFrequency = Double.valueOf(aRow.get("stroke_frequency").toString());
458 544
                             String sgt = aRow.get("sgt").toString();
545
+                            if (sgt == null || sgt.length() ==0){
546
+                                sgt = "0,0";
547
+                            }
459 548
                             DiagnoseMsg diagnoseMsg = new DiagnoseMsg(wellId, wellName, orgId, prodDate, sgt, LocalDateTime.now().toString(), strokeLength, strokeFrequency);
460 549
                             producer.send((MessageBody) diagnoseMsg);
461 550
                         })
462 551
                         .doCatch(Exception.class)
463
-                        .log("${header.date}"+" rocketMQ send data faild")
552
+                        .log("${header.date}"+" rocketMQ send data failed")
464 553
                         .end();
465
-
466 554
             };
467 555
         };
468 556
     }

+ 2 - 1
src/main/java/com/gct/tools/etlcamelhuge/routeconfig/CamelRestConfiguration.java

@@ -12,7 +12,7 @@ import java.util.HashMap;
12 12
 import java.util.concurrent.atomic.AtomicInteger;
13 13
 
14 14
 
15
-@Configuration
15
+//@Configuration
16 16
 public class CamelRestConfiguration extends RouteBuilder {
17 17
     @Override
18 18
     public synchronized void configure() throws Exception {
@@ -37,6 +37,7 @@ public class CamelRestConfiguration extends RouteBuilder {
37 37
                 .setBody(simple(body.toJSONString()))
38 38
                 .to("rest:post:/dibs/0/service/token")
39 39
                 .unmarshal().json(JsonLibrary.Jackson)
40
+                .log("${body}")
40 41
                 .setBody(simple("${body[result]}"))
41 42
                 .setHeader("token", simple("${body[token]}"))
42 43
                 .setHeader("Authorization", simple("Bearer " + "${in.header.token}"))

+ 1 - 1
src/main/resources/application.yml

@@ -111,7 +111,7 @@ management:
111 111
       cors:
112 112
         allow-credentials: off
113 113
 server:
114
-  port: 8080
114
+  port: 9999
115 115
 
116 116
 
117 117
 rocketmq: