Browse Source

数据导入代码修改

gxt_xa_000000 4 years ago
parent
commit
05b55351e6

+ 42 - 25
src/main/java/com/gct/tools/etlcamelhuge/routeconfig/CamelJDBCConfiguration.java

@@ -268,10 +268,11 @@ public class CamelJDBCConfiguration /*extends RouteBuilder */{
268
 
268
 
269
             @Override
269
             @Override
270
             public void configure() throws Exception {
270
             public void configure() throws Exception {
271
+                //0 0 18 * * ?   每天下午6点执行一次
271
                 from("timer:mytimer1?period=999999999")
272
                 from("timer:mytimer1?period=999999999")
272
                 .routeId("oracle-1")
273
                 .routeId("oracle-1")
273
                 .setHeader("date",constant(date1))
274
                 .setHeader("date",constant(date1))
274
-                .setBody(simple("select  jh,cydmc,zyq,zk,qyrq from zd_zdgs.dba01@A2 where rq  = to_date('${header.date}','yyyy-MM-dd') and qyrq is not null "))
275
+                .setBody(simple("select  distinct jh,cydmc,zyq,zk,qyrq,sccw,qk,bz from zd_zdgs.dba01@A2 where rq  = to_date('${header.date}','yyyy-MM-dd') and qyrq is not null "))
275
                 .to("jdbc:oracle")
276
                 .to("jdbc:oracle")
276
                 .transform()
277
                 .transform()
277
                 .body((result) -> {
278
                 .body((result) -> {
@@ -303,8 +304,8 @@ public class CamelJDBCConfiguration /*extends RouteBuilder */{
303
                         orgIDs.put(org_level3, orgID);
304
                         orgIDs.put(org_level3, orgID);
304
                     }
305
                     }
305
                 })
306
                 })
306
-                .setBody(simple("insert into centralbase.cb_cd_well_source (well_id,well_common_name,spud_date,station_id,station_name) " +
307
-                        "values ('${body[JH]}','${body[JH]}','${body[QYRQ]}'::timestamp,'${body[station_id]}','${body[ZK]}')"))
307
+                .setBody(simple("insert into centralbase.cb_cd_well_source (well_id,well_common_name,spud_date,station_id,station_name,completion_name,PRODUCING_AREA_name,remarks) " +
308
+                        "values ('${body[JH]}','${body[JH]}','${body[QYRQ]}'::timestamp,'${body[station_id]}','${body[ZK]}','${body[SCCW]}','${body[QK]}','${body[BZ]}')"))
308
                 .to("jdbc:centralbase")
309
                 .to("jdbc:centralbase")
309
                 .end()
310
                 .end()
310
 
311
 
@@ -340,7 +341,8 @@ public class CamelJDBCConfiguration /*extends RouteBuilder */{
340
                     return rows;
341
                     return rows;
341
                 }).split(body())
342
                 }).split(body())
342
                         .setBody(simple("insert into centralbase.cb_pc_organization(org_id,org_code,org_name,org_level,parent_id,org_id_pre)" +
343
                         .setBody(simple("insert into centralbase.cb_pc_organization(org_id,org_code,org_name,org_level,parent_id,org_id_pre)" +
343
-                                "values('${body[org_id]}','${body[org_code]}','${body[org_name]}','${body[org_level]}','${body[org_parent]}','${body[org_id_pre]}')"))
344
+                                "values('${body[org_id]}','${body[org_code]}','${body[org_name]}','${body[org_level]}','${body[org_parent]}','${body[org_id_pre]}')" +
345
+                                "ON conflict(org_name) DO UPDATE set org_code = '${body[org_code]}' "))
344
                         .to("jdbc:centralbase")
346
                         .to("jdbc:centralbase")
345
                         .end()
347
                         .end()
346
                         .setBody(simple("select org_id,org_name from centralbase.cb_pc_organization where org_level = '3' "))
348
                         .setBody(simple("select org_id,org_name from centralbase.cb_pc_organization where org_level = '3' "))
@@ -354,7 +356,8 @@ public class CamelJDBCConfiguration /*extends RouteBuilder */{
354
                 from("timer:mytimer2?period=999999999")
356
                 from("timer:mytimer2?period=999999999")
355
                         .routeId("oracle-2")
357
                         .routeId("oracle-2")
356
                         .setHeader("date",constant(date1))
358
                         .setHeader("date",constant(date1))
357
-                        .setBody(simple("select  jh,rq,cyfs,yz,hysx , yysx ,tysx,bs,dym from zd_zdgs.dba01@A2 where rq  = to_date('${header.date}','yyyy-MM-dd') and qyrq is not null "))
359
+                        .setBody(simple("select  distinct jh,rq,cyfs,yz,hysx , yysx ,tysx,bs,dym from zd_zdgs.dba01@A2 where rq  = to_date('${header.date}','yyyy-MM-dd') and qyrq is not null "))
360
+                        //.setBody(simple("select  distinct jh,rq,cyfs,yz,hysx , yysx ,tysx,bs,dym from zd_zdgs.dba01@A2 where rq  = to_date('2021-07-05','yyyy-MM-dd') and qyrq is not null "))
358
                         .to("jdbc:oracle")
361
                         .to("jdbc:oracle")
359
                         .split(body()).process(exchange -> {
362
                         .split(body()).process(exchange -> {
360
                             Message in = exchange.getIn();
363
                             Message in = exchange.getIn();
@@ -364,10 +367,24 @@ public class CamelJDBCConfiguration /*extends RouteBuilder */{
364
                             if (aRow.get("YYSX")==null) aRow.put("YYSX","0.0");
367
                             if (aRow.get("YYSX")==null) aRow.put("YYSX","0.0");
365
                             if (aRow.get("TYSX")==null) aRow.put("TYSX","0.0");
368
                             if (aRow.get("TYSX")==null) aRow.put("TYSX","0.0");
366
                             if (aRow.get("BS")==null)   aRow.put("BS","0.0");
369
                             if (aRow.get("BS")==null)   aRow.put("BS","0.0");
367
-                            if (aRow.get("DYM")==null)  aRow.put("DYM","0.0");
368
                 })
370
                 })
369
-                        .setBody(simple("insert into centralbase.cb_pc_pro_wellbore_status_daily(well_id,prod_date,oil_prod_method,oil_nozzle,back_pres,tubing_pres,casing_pres,pump_depth,start_pump_liq_level)" +
370
-                                "values ('${body[JH]}','${body[RQ]}','${body[CYFS]}','${body[YZ]}','${body[HYSX]}','${body[YYSX]}','${body[TYSX]}','${body[BS]}','${body[DYM]}')"))
371
+                        .setBody(simple("insert into centralbase.cb_pc_pro_wellbore_status_daily(well_id,prod_date,oil_prod_method,oil_nozzle,back_pres,tubing_pres,casing_pres,pump_depth)" +
372
+                                "values ('${body[JH]}','${body[RQ]}','${body[CYFS]}','${body[YZ]}','${body[HYSX]}','${body[YYSX]}','${body[TYSX]}','${body[BS]}')"))
373
+                        .to("jdbc:centralbase")
374
+                        .log("insert !!!")
375
+                        .end();
376
+                //查询井对应dym不为空的数据 --目前是只要对应井能查到dym不为空的,无论是什么时间的,都放进去
377
+               //将查询到的DYM数据更新到cb_pc_pro_wellbore_status_daily中
378
+                //0 0 */1 * * ? 每1个小时执行一次
379
+                from("timer:mytimer5?period=999999999")
380
+                        .routeId("oracle-5")
381
+                        .setHeader("date",constant(date1))
382
+                        //三个月之内dym不为空的数据
383
+                        //.setBody(simple("SELECT distinct jh,rq,dym FROM zd_zdgs.dba01@A2 WHERE  rq between to_date(TO_CHAR(ADD_MONTHS(SYSDATE,-3),'yyyy-MM-dd'),'yyyy-MM-dd')and to_date(TO_CHAR(SYSDATE,'yyyy-MM-dd'),'yyyy-MM-dd') and dym is not null;"))
384
+                        .setBody(simple("SELECT  distinct jh,max(rq),dym FROM zd_zdgs.dba01@A2 WHERE dym is not null group by jh,dym"))
385
+                        .to("jdbc:oracle")
386
+                        .split(body())
387
+                        .setBody(simple("update centralbase.cb_pc_pro_wellbore_status_daily set start_pump_liq_level = '${body[DYM]}' where well_id = '${body[JH]}' and prod_date = '2021-07-05' "))
371
                         .to("jdbc:centralbase")
388
                         .to("jdbc:centralbase")
372
                         .log("insert !!!")
389
                         .log("insert !!!")
373
                         .end();
390
                         .end();
@@ -393,6 +410,7 @@ public class CamelJDBCConfiguration /*extends RouteBuilder */{
393
                         .log("insert !!!")
410
                         .log("insert !!!")
394
                         .end();
411
                         .end();
395
 
412
 
413
+                //0 0 */1 * * ? 每1个小时执行一次
396
                 from("timer:mytimer4?period=999999999")
414
                 from("timer:mytimer4?period=999999999")
397
                         .routeId("oracle-4")
415
                         .routeId("oracle-4")
398
                         .setHeader("date",constant(date1))
416
                         .setHeader("date",constant(date1))
@@ -401,7 +419,6 @@ public class CamelJDBCConfiguration /*extends RouteBuilder */{
401
                         .split(body()).process(exchange -> {
419
                         .split(body()).process(exchange -> {
402
                     Message in = exchange.getIn();
420
                     Message in = exchange.getIn();
403
                     HashMap<String, Object> aRow = in.getBody(HashMap.class);
421
                     HashMap<String, Object> aRow = in.getBody(HashMap.class);
404
-                    if (aRow.get("DYM")==null)   aRow.put("DYM","0.0");
405
                     if (aRow.get("JY")==null) aRow.put("JY","0.0");
422
                     if (aRow.get("JY")==null) aRow.put("JY","0.0");
406
                     if (aRow.get("LY")==null) aRow.put("LY","0.0");
423
                     if (aRow.get("LY")==null) aRow.put("LY","0.0");
407
                     if (aRow.get("BJ")==null) aRow.put("BJ","0.0");
424
                     if (aRow.get("BJ")==null) aRow.put("BJ","0.0");
@@ -413,17 +430,18 @@ public class CamelJDBCConfiguration /*extends RouteBuilder */{
413
                     if (aRow.get("BLX")==null)  aRow.put("BLX","");
430
                     if (aRow.get("BLX")==null)  aRow.put("BLX","");
414
                     if (aRow.get("DL")==null)  aRow.put("DL","0.0");
431
                     if (aRow.get("DL")==null)  aRow.put("DL","0.0");
415
                 })
432
                 })
416
-                        .setBody(simple("insert into centralbase.cb_temp_well_mech_daily(well_id,prod_date,dynamic_liq_level,static_pressure,flow_pres,pump_diameter,pump_depth,pump_efficiency,rotate_frequency,stroke_length,stroke_frequency,pump_type,elec_frequency)" +
417
-                                "values ('${body[JH]}','${body[RQ]}','${body[DYM]}','${body[JY]}','${body[LY]}','${body[BJ]}','${body[BS]}','${body[BX]}','${body[ZS]}','${body[CC]}','${body[CS]}','${body[BLX]}','${body[DL]}')"))
433
+                        .setBody(simple("insert into centralbase.cb_temp_well_mech_daily(well_id,prod_date,static_pressure,flow_pres,pump_diameter,pump_depth,pump_efficiency,rotate_frequency,stroke_length,stroke_frequency,pump_type,elec_frequency)" +
434
+                                "values ('${body[JH]}','${body[RQ]}','${body[JY]}','${body[LY]}','${body[BJ]}','${body[BS]}','${body[BX]}','${body[ZS]}','${body[CC]}','${body[CS]}','${body[BLX]}','${body[DL]}')"))
418
                         .to("jdbc:centralbase")
435
                         .to("jdbc:centralbase")
419
                         .log("insert !!!")
436
                         .log("insert !!!")
420
                         .end();
437
                         .end();
421
 
438
 
422
                 //从天安哪里获取的数据
439
                 //从天安哪里获取的数据
423
-                from("timer:mytimer1?period=10000")
440
+                //0 0 */1 * * ? 每1个小时执行一次
441
+                from("timer:mytimer7?period=10000")
424
                         .routeId("jdbc-gtsj-?")
442
                         .routeId("jdbc-gtsj-?")
425
                         .setHeader("dyna_create_time", constant(formatDate))
443
                         .setHeader("dyna_create_time", constant(formatDate))
426
-                        .setBody(simple("select well_id,dyna_create_time,check_date,displacement,disp_load,stroke,frequency,susp_max_load,susp_min_load from public.pc_fd_pumpjack_dyna_dia_t where   dyna_create_time > (select max(dyna_create_time) from public.pc_fd_pumpjack_dyna_dia_t ) "))
444
+                        .setBody(simple("select well_name,dyna_create_time,check_date,displacement,disp_load,stroke,frequency,susp_max_load,susp_min_load from public.pc_fd_pumpjack_dyna_dia_t where   dyna_create_time > (select max(dyna_create_time) from public.pc_fd_pumpjack_dyna_dia_t ) "))
427
                         .to("jdbc:gtsj")
445
                         .to("jdbc:gtsj")
428
                         .split(body()).process(exchange -> {
446
                         .split(body()).process(exchange -> {
429
                     Message in = exchange.getIn();
447
                     Message in = exchange.getIn();
@@ -449,12 +467,12 @@ public class CamelJDBCConfiguration /*extends RouteBuilder */{
449
                     if (aRow.get("susp_max_load") == null) aRow.put("susp_max_load","0.0");
467
                     if (aRow.get("susp_max_load") == null) aRow.put("susp_max_load","0.0");
450
                     if (aRow.get("susp_min_load") == null) aRow.put("susp_min_load","0.0");
468
                     if (aRow.get("susp_min_load") == null) aRow.put("susp_min_load","0.0");
451
 
469
 
452
-                })//从这里开始
470
+                })
453
                         .setBody(simple("insert into centralbase.cb_temp_well_mech_runtime(well_id,prod_date,stroke_length,stroke_frequency,susp_max_load,susp_min_load,sgt) " +
471
                         .setBody(simple("insert into centralbase.cb_temp_well_mech_runtime(well_id,prod_date,stroke_length,stroke_frequency,susp_max_load,susp_min_load,sgt) " +
454
-                                "values ('${body[well_id]}','${body[dyna_create_time]}','${body[stroke]}','${body[frequency]}','${body[susp_max_load]}','${body[susp_min_load]}','${body[sgt]}')"))
472
+                                "values ('${body[well_name]}','${body[dyna_create_time]}','${body[stroke]}','${body[frequency]}','${body[susp_max_load]}','${body[susp_min_load]}','${body[sgt]}')"))
455
                         .to("jdbc:centralbase")
473
                         .to("jdbc:centralbase")
456
                         .log("insert!!!").end();
474
                         .log("insert!!!").end();
457
-
475
+//-------------------------------------------------------------------------------------------------------------------------------------------------------------------
458
            /*     from("timer:mytimer2?period=999999999")
476
            /*     from("timer:mytimer2?period=999999999")
459
                         .routeId("oracle-2")
477
                         .routeId("oracle-2")
460
                         .setBody(simple("select distinct station_id,station_name from centralbase.cb_cd_well_source"))
478
                         .setBody(simple("select distinct station_id,station_name from centralbase.cb_cd_well_source"))
@@ -530,16 +548,15 @@ public class CamelJDBCConfiguration /*extends RouteBuilder */{
530
                         .to("jdbc:centralbase")
548
                         .to("jdbc:centralbase")
531
                         .log("insert !!!")
549
                         .log("insert !!!")
532
                         .end();
550
                         .end();
551
+*/
552
+               /// from("quartz://name?cron=0 0 */1 * * ?") //0 0 */1 * * ? 每1个小时执行一次
553
+                 //       .routeId("oracle-6")
554
+                  //      .setBody(simple("select org_id,org_name from centralbase.cb_pc_organization where org_level = '3' "))
555
+                  //      .to("jdbc:centralbase")
556
+                  //      .split(body()).log("${body}")
557
+                 //       .end();
558
+
533
 
559
 
534
-                from("timer:mytimer6?period=999999999")
535
-                        .routeId("oracle-6")
536
-                        .setBody(simple("select org_id,org_name from centralbase.cb_pc_organization where org_level = '3' "))
537
-                        .to("jdbc:centralbase")
538
-                        .split(body())
539
-                        .setBody(simple("update centralbase.cb_cd_well_source set org_id = '${body[org_id]}' where station_name = '${body[org_name]}'"))
540
-                        .to("jdbc:centralbase")
541
-                        .log("insert !!!")
542
-                        .end();*/
543
             }
560
             }
544
 
561
 
545
             ;
562
             ;