Procházet zdrojové kódy

数据介入每一小时执行一次

gxt_xa_000000 před 4 roky
rodič
revize
ada0b870ed

+ 30 - 5
src/main/java/com/gct/tools/etlcamelhuge/routeconfig/CamelJDBCConfiguration.java

@@ -386,6 +386,9 @@ public class CamelJDBCConfiguration /*extends RouteBuilder */ {
386 386
                 })
387 387
                         .setBody(simple("insert into centralbase.cb_pc_pro_wellbore_status_daily(well_id,prod_date,oil_prod_method,oil_nozzle,back_pres,tubing_pres,casing_pres,pump_depth)" +
388 388
                                 "values ('${body[JH]}','${body[RQ]}','${body[CYFS]}','${body[YZ]}','${body[HYSX]}','${body[YYSX]}','${body[TYSX]}','${body[BS]}')"))
389
+                        .setBody(simple("insert into centralbase.cb_pc_pro_wellbore_status_daily(well_id,prod_date,oil_prod_method,oil_nozzle,back_pres,tubing_pres,casing_pres,pump_depth) " +
390
+                                "select '${body[JH]}','${body[RQ]}','${body[CYFS]}','${body[YZ]}','${body[HYSX]}','${body[YYSX]}','${body[TYSX]}','${body[BS]}' " +
391
+                                "where NOT EXISTS ( SELECT * FROM centralbase.cb_pc_pro_wellbore_status_daily WHERE well_id = '${body[JH]}' and  prod_date = '${body[RQ]}')"))
389 392
                         .to("jdbc:centralbase")
390 393
                         .log("insert !!!")
391 394
                         .end();
@@ -401,10 +404,11 @@ public class CamelJDBCConfiguration /*extends RouteBuilder */ {
401 404
                         .setBody(simple("SELECT  distinct jh,max(rq),dym FROM zd_zdgs.dba01@A2 WHERE dym is not null group by jh,dym"))
402 405
                         .to("jdbc:oracle")
403 406
                         .split(body())
404
-                        .setBody(simple("update centralbase.cb_pc_pro_wellbore_status_daily set start_pump_liq_level = '${body[DYM]}' where well_id = '${body[JH]}' and prod_date = '${header.date}' "))
407
+                        .setBody(simple("update centralbase.cb_pc_pro_wellbore_status_daily set start_pump_liq_level = '${body[DYM]}' where well_id = '${body[JH]}' "))
405 408
                         .to("jdbc:centralbase")
406 409
                         .log("insert !!!")
407 410
                         .end();
411
+
408 412
                 //单独执行时间30s
409 413
                 from("timer:mytimer3?period=3600000")
410 414
                         .routeId("oracle-3")
@@ -423,6 +427,9 @@ public class CamelJDBCConfiguration /*extends RouteBuilder */ {
423 427
                 })
424 428
                         .setBody(simple("insert into centralbase.cb_pc_pro_wellbore_vol_daily(well_id,prod_date,prod_time,liq_prod_daily,oil_prod_daily,gas_prod_daily,water_cut,remarks) " +
425 429
                                 "values ('${body[JH]}','${body[RQ]}','${body[SCSJ]}','${body[RCYL1]}','${body[RCYL]}','${body[RCQL]}','${body[HS]}','${body[BZ]}')"))
430
+                        .setBody(simple("insert into centralbase.cb_pc_pro_wellbore_vol_daily(well_id,prod_date,prod_time,liq_prod_daily,oil_prod_daily,gas_prod_daily,water_cut,remarks) " +
431
+                                "select '${body[JH]}','${body[RQ]}','${body[SCSJ]}','${body[RCYL1]}','${body[RCYL]}','${body[RCQL]}','${body[HS]}','${body[BZ]}' " +
432
+                                "where NOT EXISTS ( SELECT * FROM centralbase.cb_pc_pro_wellbore_vol_daily WHERE well_id = '${body[JH]}' and  prod_date = '${body[RQ]}' )"))
426 433
                         .to("jdbc:centralbase")
427 434
                         .log("insert !!!")
428 435
                         .end();
@@ -450,14 +457,29 @@ public class CamelJDBCConfiguration /*extends RouteBuilder */ {
450 457
                 })
451 458
                         .setBody(simple("insert into centralbase.cb_temp_well_mech_daily(well_id,prod_date,static_pressure,flow_pres,pump_diameter,pump_depth,pump_efficiency,rotate_frequency,stroke_length,stroke_frequency,pump_type,elec_frequency)" +
452 459
                                 "values ('${body[JH]}','${body[RQ]}','${body[JY]}','${body[LY]}','${body[BJ]}','${body[BS]}','${body[BX]}','${body[ZS]}','${body[CC]}','${body[CS]}','${body[BLX]}','${body[DL]}')"))
460
+                        .setBody(simple("insert into centralbase.cb_temp_well_mech_daily(well_id,prod_date,static_pressure,flow_pres,pump_diameter,pump_depth,pump_efficiency,rotate_frequency,stroke_length,stroke_frequency,pump_type,elec_frequency) " +
461
+                                "select '${body[JH]}','${body[RQ]}','${body[JY]}','${body[LY]}','${body[BJ]}','${body[BS]}','${body[BX]}','${body[ZS]}','${body[CC]}','${body[CS]}','${body[BLX]}','${body[DL]}' " +
462
+                                "where NOT EXISTS ( SELECT * FROM centralbase.cb_temp_well_mech_daily WHERE well_id = '${body[JH]}' and  prod_date = '${body[RQ]}' )"))
453 463
                         .to("jdbc:centralbase")
454 464
                         .log("insert !!!")
455 465
                         .end();
456 466
 
467
+                from("timer:mytimer6?period=3600000")
468
+                        .routeId("oracle-6")
469
+                        .setHeader("date", constant(date1 + " 00:00:00"))
470
+                        //五个月之内bj不为空的数据
471
+                        //.setBody(simple("SELECT distinct jh,rq,dym FROM zd_zdgs.dba01@A2 WHERE  rq between to_date(TO_CHAR(ADD_MONTHS(SYSDATE,-3),'yyyy-MM-dd'),'yyyy-MM-dd')and to_date(TO_CHAR(SYSDATE,'yyyy-MM-dd'),'yyyy-MM-dd') and dym is not null;"))
472
+                        .setBody(simple("SELECT distinct jh,bj FROM zd_zdgs.dba01@A2 WHERE  rq between to_date(TO_CHAR(ADD_MONTHS(SYSDATE,-5),'yyyy-MM-dd'),'yyyy-MM-dd')and to_date(TO_CHAR(SYSDATE,'yyyy-MM-dd'),'yyyy-MM-dd') and bj is not null"))
473
+                        .to("jdbc:oracle")
474
+                        .split(body())
475
+                        .setBody(simple("update centralbase.cb_temp_well_mech_runtime set pump_diameter = '${body[BJ]}' where well_id = '${body[JH]}' "))
476
+                        .to("jdbc:centralbase")
477
+                        .log("insert !!!")
478
+                        .end();
457 479
                 //从天安哪里获取的数据
458 480
                 //0 0 */1 * * ? 每1个小时执行一次
459 481
                 //单独执行一小时的数据30s
460
-               /* from("timer:mytimer7?period=3600000")
482
+                from("timer:mytimer7?period=3600000")
461 483
                         .routeId("jdbc-gtsj-?")
462 484
                         .setBody(simple("select max(prod_date) from centralbase.cb_temp_well_mech_runtime "))
463 485
                         .to("jdbc:centralbase")
@@ -497,14 +519,18 @@ public class CamelJDBCConfiguration /*extends RouteBuilder */ {
497 519
                 })
498 520
                         .setBody(simple("insert into centralbase.cb_temp_well_mech_runtime(well_id,prod_date,stroke_length,stroke_frequency,susp_max_load,susp_min_load,sgt) " +
499 521
                                 "values ('${body[well_name]}','${body[dyna_create_time]}','${body[stroke]}','${body[frequency]}','${body[susp_max_load]}','${body[susp_min_load]}','${body[sgt]}')"))
522
+                        .setBody(simple("insert into centralbase.cb_temp_well_mech_runtime(well_id,prod_date,stroke_length,stroke_frequency,susp_max_load,susp_min_load,sgt) " +
523
+                                "select '${body[well_name]}','${body[dyna_create_time]}','${body[stroke]}','${body[frequency]}','${body[susp_max_load]}','${body[susp_min_load]}','${body[sgt]}' " +
524
+                                "where NOT EXISTS (SELECT * FROM centralbase.cb_temp_well_mech_runtime WHERE well_id = '${body[well_name]}' and  prod_date = '${body[dyna_create_time]}' )"))
500 525
                         .to("jdbc:centralbase")
501
-                        .log("insert!!!").end();*/
526
+                        .log("insert!!!").end();
502 527
 
503 528
                 from("timer:mytimer1?period=3600000")
504 529
                         .routeId("centralbase-1")
505 530
                         .setBody(simple("select so.well_id,so.well_common_name,so.org_id,ti.prod_date,ti.stroke_frequency,ti.stroke_length,ti.sgt from centralbase.cb_temp_well_mech_runtime ti, centralbase.cb_cd_well_source so where ti.well_id = so.well_id and ti.prod_date =(select max(prod_date) from centralbase.cb_temp_well_mech_runtime) "))
531
+                        //.setBody(simple("select so.well_id,so.well_common_name,so.org_id,ti.prod_date,ti.stroke_frequency,ti.stroke_length,ti.sgt from centralbase.cb_temp_well_mech_runtime ti, centralbase.cb_cd_well_source so where ti.well_id = so.well_id and ti.prod_date ='2021-07-12 10:25:00' and ti.well_id = '台24' limit 1 "))
506 532
                         .to("jdbc:centralbase")
507
-                        .split(body()).log("log{body}")
533
+                        .split(body())
508 534
                         .process(exchange -> {
509 535
                             Message in = exchange.getIn();
510 536
                             HashMap<String, Object> aRow = in.getBody(HashMap.class);
@@ -514,7 +540,6 @@ public class CamelJDBCConfiguration /*extends RouteBuilder */ {
514 540
                             String prodDate = aRow.get("prod_date").toString().substring(0,19);
515 541
                             Double strokeLength = Double.valueOf(aRow.get("stroke_length").toString());
516 542
                             Double strokeFrequency = Double.valueOf(aRow.get("stroke_frequency").toString());
517
-                            System.out.println("strokeFrequency"+strokeFrequency);
518 543
                             String sgt = aRow.get("sgt").toString();
519 544
                             DiagnoseMsg diagnoseMsg = new DiagnoseMsg(wellId, wellName, orgId, prodDate, sgt, LocalDateTime.now().toString(), strokeLength, strokeFrequency);
520 545
                             producer.send((MessageBody) diagnoseMsg);