Bläddra i källkod

功图数据最大最小载荷

gxt_xa_000000 4 år sedan
förälder
incheckning
bff7262b3e

+ 96 - 75
src/main/java/com/gct/tools/etlcamelhuge/routeconfig/CamelJDBCConfiguration.java

@@ -30,7 +30,7 @@ import java.util.concurrent.atomic.AtomicInteger;
30 30
  * @since 2021/4/14 下午3:16
31 31
  */
32 32
 @Configuration
33
-public class CamelJDBCConfiguration /*extends RouteBuilder */{
33
+public class CamelJDBCConfiguration /*extends RouteBuilder */ {
34 34
 
35 35
     @Autowired
36 36
     private RocketMQTemplate rocketMQTemplate;
@@ -255,35 +255,52 @@ public class CamelJDBCConfiguration /*extends RouteBuilder */{
255 255
             };
256 256
         };
257 257
     }*/
258
+    //获取最小载荷
259
+    public Double min(String[] strings){
260
+        double[] doubles = new double[strings.length];
261
+        for (int i = 0; i < strings.length; i++) {
262
+            doubles[i] = Double.parseDouble(strings[i]);
263
+        }
264
+        return Arrays.stream(doubles).min().getAsDouble();
265
+    }
266
+    //获取最大载荷
267
+    public Double max(String[] strings){
268
+        double[] doubles = new double[strings.length];
269
+        for (int i = 0; i < strings.length; i++) {
270
+            doubles[i] = Double.parseDouble(strings[i]);
271
+        }
272
+        return Arrays.stream(doubles).max().getAsDouble();
273
+    }
258 274
     @Bean
259 275
     public RouteBuilder routeBuilderWithOracle1() {
260
-         SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd");
261
-         Date date = new Date(System.currentTimeMillis());
262
-         String date1 = formatter.format(date);
263
-         String formatDate = formatter.format(date) + " 00:00:00";
276
+        SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd");
277
+        Date date = new Date(System.currentTimeMillis());
278
+        String date1 = formatter.format(date);
279
+        String formatDate = formatter.format(date) + " 00:00:00";
264 280
         return new RouteBuilder() {
265 281
             private SortedSet<String> organization;
266 282
             private Map<String, Integer> orgIDs;
267 283
             private Integer orgID;
284
+
268 285
             //全部执行完成的大概时间在30-40分钟
269 286
             @Override
270 287
             public void configure() throws Exception {
271 288
                 //0 0 18 * * ?   每天下午6点执行一次
272 289
                 //单个执行时间30s左右,在之前有数据的情况下
273 290
                 from("timer:mytimer1?period=99999999999")
274
-                .routeId("oracle-1")
275
-                .setHeader("date",constant(date1))
276
-                .setBody(simple("select  distinct jh,cydmc,zyq,zk,qyrq,sccw,qk,bz from zd_zdgs.dba01@A2 where rq  = to_date('${header.date}','yyyy-MM-dd') and qyrq is not null "))
277
-                .to("jdbc:oracle")
278
-                .transform()
279
-                .body((result) -> {
280
-                    organization = new TreeSet<>();
281
-                    orgID = 0;
282
-                    orgIDs = new HashMap<>();
283
-                    return result;
284
-                })
291
+                        .routeId("oracle-1")
292
+                        .setHeader("date", constant(date1))
293
+                        .setBody(simple("select  distinct jh,cydmc,zyq,zk,qyrq,sccw,qk,bz from zd_zdgs.dba01@A2 where rq  = to_date('${header.date}','yyyy-MM-dd') and qyrq is not null "))
294
+                        .to("jdbc:oracle")
295
+                        .transform()
296
+                        .body((result) -> {
297
+                            organization = new TreeSet<>();
298
+                            orgID = 0;
299
+                            orgIDs = new HashMap<>();
300
+                            return result;
301
+                        })
285 302
                         .step("1")
286
-                .split(body()).process(exchange -> {
303
+                        .split(body()).process(exchange -> {
287 304
                     Message in = exchange.getIn();
288 305
                     HashMap<String, Object> aRow = in.getBody(HashMap.class);
289 306
                     String org_level3 = aRow.get("ZYQ") + "@" + aRow.get("CYDMC") + "@" + aRow.get("ZK");
@@ -305,13 +322,13 @@ public class CamelJDBCConfiguration /*extends RouteBuilder */{
305 322
                         orgIDs.put(org_level3, orgID);
306 323
                     }
307 324
                 })
308
-                .setBody(simple("insert into centralbase.cb_cd_well_source (well_id,well_common_name,spud_date,station_id,station_name,completion_name,PRODUCING_AREA_name,remarks) " +
309
-                        "values ('${body[JH]}','${body[JH]}','${body[QYRQ]}'::timestamp,'${body[station_id]}','${body[ZK]}','${body[SCCW]}','${body[QK]}','${body[BZ]}')" +
310
-                        " ON conflict(well_id) DO UPDATE set remarks = '${body[BZ]}' "))
311
-                .to("jdbc:centralbase")
312
-                .end()
325
+                        .setBody(simple("insert into centralbase.cb_cd_well_source (well_id,well_common_name,spud_date,station_id,station_name,completion_name,PRODUCING_AREA_name,remarks) " +
326
+                                "values ('${body[JH]}','${body[JH]}','${body[QYRQ]}'::timestamp,'${body[station_id]}','${body[ZK]}','${body[SCCW]}','${body[QK]}','${body[BZ]}')" +
327
+                                " ON conflict(well_id) DO UPDATE set remarks = '${body[BZ]}' "))
328
+                        .to("jdbc:centralbase")
329
+                        .end()
313 330
 
314
-                .transform().body((re) -> {
331
+                        .transform().body((re) -> {
315 332
                     List<Map<String, Object>> rows = new ArrayList<>();
316 333
                     int code = 0;
317 334
                     for (String s : organization) {
@@ -355,33 +372,33 @@ public class CamelJDBCConfiguration /*extends RouteBuilder */{
355 372
                         .log("insert")
356 373
                         .end();
357 374
                 //单独执行时间10s
358
-               /* from("timer:mytimer2?period=99999999999")
375
+                from("timer:mytimer2?period=99999999999")
359 376
                         .routeId("oracle-2")
360
-                        .setHeader("date",constant(date1))
377
+                        .setHeader("date", constant(date1))
361 378
                         .setBody(simple("select  distinct jh,rq,cyfs,yz,hysx , yysx ,tysx,bs,dym from zd_zdgs.dba01@A2 where rq  = to_date('${header.date}','yyyy-MM-dd') and qyrq is not null "))
362 379
                         //.setBody(simple("select  distinct jh,rq,cyfs,yz,hysx , yysx ,tysx,bs,dym from zd_zdgs.dba01@A2 where rq  = to_date('2021-07-05','yyyy-MM-dd') and qyrq is not null "))
363 380
                         .to("jdbc:oracle")
364 381
                         .split(body()).process(exchange -> {
365
-                            Message in = exchange.getIn();
366
-                            HashMap<String, Object> aRow = in.getBody(HashMap.class);
367
-                            if (aRow.get("YZ")==null)   aRow.put("YZ","0.0");
368
-                            if (aRow.get("HYSX")==null) aRow.put("HYSX","0.0");
369
-                            if (aRow.get("YYSX")==null) aRow.put("YYSX","0.0");
370
-                            if (aRow.get("TYSX")==null) aRow.put("TYSX","0.0");
371
-                            if (aRow.get("BS")==null)   aRow.put("BS","0.0");
382
+                    Message in = exchange.getIn();
383
+                    HashMap<String, Object> aRow = in.getBody(HashMap.class);
384
+                    if (aRow.get("YZ") == null) aRow.put("YZ", "0.0");
385
+                    if (aRow.get("HYSX") == null) aRow.put("HYSX", "0.0");
386
+                    if (aRow.get("YYSX") == null) aRow.put("YYSX", "0.0");
387
+                    if (aRow.get("TYSX") == null) aRow.put("TYSX", "0.0");
388
+                    if (aRow.get("BS") == null) aRow.put("BS", "0.0");
372 389
                 })
373 390
                         .setBody(simple("insert into centralbase.cb_pc_pro_wellbore_status_daily(well_id,prod_date,oil_prod_method,oil_nozzle,back_pres,tubing_pres,casing_pres,pump_depth)" +
374 391
                                 "values ('${body[JH]}','${body[RQ]}','${body[CYFS]}','${body[YZ]}','${body[HYSX]}','${body[YYSX]}','${body[TYSX]}','${body[BS]}')"))
375 392
                         .to("jdbc:centralbase")
376 393
                         .log("insert !!!")
377
-                        .end();*/
394
+                        .end();
378 395
                 //查询井对应dym不为空的数据 --目前是只要对应井能查到dym不为空的,无论是什么时间的,都放进去
379
-               //将查询到的DYM数据更新到cb_pc_pro_wellbore_status_daily中
396
+                //将查询到的DYM数据更新到cb_pc_pro_wellbore_status_daily中
380 397
                 //0 0 */1 * * ? 每1个小时执行一次
381 398
                 //单独执行时间是4m15s 317条数据
382
-             /*   from("timer:mytimer5?period=99999999999")
399
+                from("timer:mytimer5?period=99999999999")
383 400
                         .routeId("oracle-5")
384
-                        .setHeader("date",constant(date1+" 00:00:00"))
401
+                        .setHeader("date", constant(date1 + " 00:00:00"))
385 402
                         //三个月之内dym不为空的数据
386 403
                         //.setBody(simple("SELECT distinct jh,rq,dym FROM zd_zdgs.dba01@A2 WHERE  rq between to_date(TO_CHAR(ADD_MONTHS(SYSDATE,-3),'yyyy-MM-dd'),'yyyy-MM-dd')and to_date(TO_CHAR(SYSDATE,'yyyy-MM-dd'),'yyyy-MM-dd') and dym is not null;"))
387 404
                         .setBody(simple("SELECT  distinct jh,max(rq),dym FROM zd_zdgs.dba01@A2 WHERE dym is not null group by jh,dym"))
@@ -390,67 +407,67 @@ public class CamelJDBCConfiguration /*extends RouteBuilder */{
390 407
                         .setBody(simple("update centralbase.cb_pc_pro_wellbore_status_daily set start_pump_liq_level = '${body[DYM]}' where well_id = '${body[JH]}' and prod_date = '${header.date}' "))
391 408
                         .to("jdbc:centralbase")
392 409
                         .log("insert !!!")
393
-                        .end();*/
410
+                        .end();
394 411
                 //单独执行时间30s
395
-            /*    from("timer:mytimer3?period=99999999999")
412
+                from("timer:mytimer3?period=99999999999")
396 413
                         .routeId("oracle-3")
397
-                        .setHeader("date",constant(date1))
398
-                        .setBody(simple("select  jh,rq,scsj, rcyl1,rcyl,rcql,hs, bz from zd_zdgs.dba01@A2 where rq  = to_date('${header.date}','yyyy-MM-dd') and qyrq is not null "))
414
+                        .setHeader("date", constant(date1))
415
+                        .setBody(simple("select distinct  jh,rq,scsj, rcyl1,rcyl,rcql,hs, bz from zd_zdgs.dba01@A2 where rq  = to_date('${header.date}','yyyy-MM-dd') and qyrq is not null "))
399 416
                         .to("jdbc:oracle")
400 417
                         .split(body()).process(exchange -> {
401 418
                     Message in = exchange.getIn();
402 419
                     HashMap<String, Object> aRow = in.getBody(HashMap.class);
403
-                    if (aRow.get("SCSJ")==null)   aRow.put("SCSJ","0.0");
404
-                    if (aRow.get("RCYL1")==null) aRow.put("RCYL1","0.0");
405
-                    if (aRow.get("RCYL")==null) aRow.put("RCYL","0.0");
406
-                    if (aRow.get("RCQL")==null) aRow.put("RCQL","0.0");
407
-                    if (aRow.get("HS")==null)   aRow.put("HS","0.0");
408
-                    if (aRow.get("BZ")==null)  aRow.put("DYM","");
420
+                    if (aRow.get("SCSJ") == null) aRow.put("SCSJ", "0.0");
421
+                    if (aRow.get("RCYL1") == null) aRow.put("RCYL1", "0.0");
422
+                    if (aRow.get("RCYL") == null) aRow.put("RCYL", "0.0");
423
+                    if (aRow.get("RCQL") == null) aRow.put("RCQL", "0.0");
424
+                    if (aRow.get("HS") == null) aRow.put("HS", "0.0");
425
+                    if (aRow.get("BZ") == null) aRow.put("DYM", "");
409 426
                 })
410 427
                         .setBody(simple("insert into centralbase.cb_pc_pro_wellbore_vol_daily(well_id,prod_date,prod_time,liq_prod_daily,oil_prod_daily,gas_prod_daily,water_cut,remarks) " +
411 428
                                 "values ('${body[JH]}','${body[RQ]}','${body[SCSJ]}','${body[RCYL1]}','${body[RCYL]}','${body[RCQL]}','${body[HS]}','${body[BZ]}')"))
412 429
                         .to("jdbc:centralbase")
413 430
                         .log("insert !!!")
414
-                        .end();*/
431
+                        .end();
415 432
 
416 433
                 //0 0 */1 * * ? 每1个小时执行一次
417 434
                 //单独执行一次30s
418
-             /*   from("timer:mytimer4?period=99999999999")
435
+                from("timer:mytimer4?period=99999999999")
419 436
                         .routeId("oracle-4")
420
-                        .setHeader("date",constant(date1))
421
-                        .setBody(simple("select  jh,rq,dym,jy,ly,bj,bs,bx,zs,cc,cs,blx,dl from zd_zdgs.dba01@A2 where rq  = to_date('${header.date}','yyyy-MM-dd') and qyrq is not null  "))
437
+                        .setHeader("date", constant(date1))
438
+                        .setBody(simple("select distinct jh,rq,dym,jy,ly,bj,bs,bx,zs,cc,cs,blx,dl from zd_zdgs.dba01@A2 where rq  = to_date('${header.date}','yyyy-MM-dd') and qyrq is not null  "))
422 439
                         .to("jdbc:oracle")
423 440
                         .split(body()).process(exchange -> {
424 441
                     Message in = exchange.getIn();
425 442
                     HashMap<String, Object> aRow = in.getBody(HashMap.class);
426
-                    if (aRow.get("JY")==null) aRow.put("JY","0.0");
427
-                    if (aRow.get("LY")==null) aRow.put("LY","0.0");
428
-                    if (aRow.get("BJ")==null) aRow.put("BJ","0.0");
429
-                    if (aRow.get("BS")==null)   aRow.put("BS","0.0");
430
-                    if (aRow.get("BX")==null)  aRow.put("BX","0.0");
431
-                    if (aRow.get("ZS")==null)  aRow.put("ZS","0.0");
432
-                    if (aRow.get("CC")==null)  aRow.put("CC","0.0");
433
-                    if (aRow.get("CS")==null)  aRow.put("CS","0.0");
434
-                    if (aRow.get("BLX")==null)  aRow.put("BLX","");
435
-                    if (aRow.get("DL")==null)  aRow.put("DL","0.0");
443
+                    if (aRow.get("JY") == null) aRow.put("JY", "0.0");
444
+                    if (aRow.get("LY") == null) aRow.put("LY", "0.0");
445
+                    if (aRow.get("BJ") == null) aRow.put("BJ", "0.0");
446
+                    if (aRow.get("BS") == null) aRow.put("BS", "0.0");
447
+                    if (aRow.get("BX") == null) aRow.put("BX", "0.0");
448
+                    if (aRow.get("ZS") == null) aRow.put("ZS", "0.0");
449
+                    if (aRow.get("CC") == null) aRow.put("CC", "0.0");
450
+                    if (aRow.get("CS") == null) aRow.put("CS", "0.0");
451
+                    if (aRow.get("BLX") == null) aRow.put("BLX", "");
452
+                    if (aRow.get("DL") == null) aRow.put("DL", "0.0");
436 453
                 })
437 454
                         .setBody(simple("insert into centralbase.cb_temp_well_mech_daily(well_id,prod_date,static_pressure,flow_pres,pump_diameter,pump_depth,pump_efficiency,rotate_frequency,stroke_length,stroke_frequency,pump_type,elec_frequency)" +
438 455
                                 "values ('${body[JH]}','${body[RQ]}','${body[JY]}','${body[LY]}','${body[BJ]}','${body[BS]}','${body[BX]}','${body[ZS]}','${body[CC]}','${body[CS]}','${body[BLX]}','${body[DL]}')"))
439 456
                         .to("jdbc:centralbase")
440 457
                         .log("insert !!!")
441 458
                         .end();
442
-*/
459
+
443 460
                 //从天安哪里获取的数据
444 461
                 //0 0 */1 * * ? 每1个小时执行一次
445 462
                 //单独执行一小时的数据30s
446
-            /*    from("timer:mytimer7?period=99999999999")
463
+                from("timer:mytimer7?period=99999999999")
447 464
                         .routeId("jdbc-gtsj-?")
448 465
                         .setBody(simple("select max(prod_date) from centralbase.cb_temp_well_mech_runtime "))
449 466
                         .to("jdbc:centralbase")
450 467
                         .split(body())
451
-                        .setHeader("date",simple("${body[max]}"))
468
+                        .setHeader("date", simple("${body[max]}"))
452 469
                         //.setHeader("dyna_create_time", constant(formatDate))
453
-                        .setBody(simple("select well_name,dyna_create_time,check_date,displacement,disp_load,stroke,frequency,susp_max_load,susp_min_load from public.pc_fd_pumpjack_dyna_dia_t where   dyna_create_time > '${header.date}' "))
470
+                        .setBody(simple("select distinct well_name,dyna_create_time,check_date,displacement,disp_load,stroke,frequency,susp_max_load,susp_min_load from public.pc_fd_pumpjack_dyna_dia_t where   dyna_create_time > '${header.date}' "))
454 471
                         .to("jdbc:gtsj")
455 472
                         .split(body()).process(exchange -> {
456 473
                     Message in = exchange.getIn();
@@ -460,6 +477,8 @@ public class CamelJDBCConfiguration /*extends RouteBuilder */{
460 477
                     if (aRow.get("displacement") != null && !aRow.get("displacement").equals("") && aRow.get("disp_load") != null && !aRow.get("disp_load").equals("")) {
461 478
                         String[] displacements = aRow.get("displacement").toString().split(";");
462 479
                         String[] disp_loads = aRow.get("disp_load").toString().split(";");
480
+                        Double susp_max_load = max(disp_loads);
481
+                        Double susp_min_load = min(disp_loads);
463 482
                         String sgt = "";
464 483
                         for (int i = 0; i < displacements.length; i++) {
465 484
                             sgt = sgt + displacements[i] + "," + disp_loads[i] + ",";
@@ -470,17 +489,19 @@ public class CamelJDBCConfiguration /*extends RouteBuilder */{
470 489
                             w += new BigDecimal(Math.round(Double.parseDouble(s[i]) * 100)).stripTrailingZeros().toPlainString() + ",";
471 490
                         }
472 491
                         aRow.put("sgt", w);
492
+                        aRow.put("susp_max_load",susp_max_load);
493
+                        aRow.put("susp_min_load",susp_min_load);
473 494
                     }
474
-                    if (aRow.get("stroke") == null) aRow.put("stroke","0.0");
475
-                    if (aRow.get("frequency") == null) aRow.put("frequency","0.0");
476
-                    if (aRow.get("susp_max_load") == null) aRow.put("susp_max_load","0.0");
477
-                    if (aRow.get("susp_min_load") == null) aRow.put("susp_min_load","0.0");
495
+                    if (aRow.get("stroke") == null) aRow.put("stroke", "0.0");
496
+                    if (aRow.get("frequency") == null) aRow.put("frequency", "0.0");
497
+                    if (aRow.get("susp_max_load") == null) aRow.put("susp_max_load", "0.0");
498
+                    if (aRow.get("susp_min_load") == null) aRow.put("susp_min_load", "0.0");
478 499
 
479 500
                 })
480 501
                         .setBody(simple("insert into centralbase.cb_temp_well_mech_runtime(well_id,prod_date,stroke_length,stroke_frequency,susp_max_load,susp_min_load,sgt) " +
481 502
                                 "values ('${body[well_name]}','${body[dyna_create_time]}','${body[stroke]}','${body[frequency]}','${body[susp_max_load]}','${body[susp_min_load]}','${body[sgt]}')"))
482 503
                         .to("jdbc:centralbase")
483
-                        .log("insert!!!").end();*/
504
+                        .log("insert!!!").end();
484 505
 //-------------------------------------------------------------------------------------------------------------------------------------------------------------------
485 506
            /*     from("timer:mytimer2?period=999999999")
486 507
                         .routeId("oracle-2")
@@ -558,12 +579,12 @@ public class CamelJDBCConfiguration /*extends RouteBuilder */{
558 579
                         .log("insert !!!")
559 580
                         .end();
560 581
 */
561
-               /// from("quartz://name?cron=0 0 */1 * * ?") //0 0 */1 * * ? 每1个小时执行一次
562
-                 //       .routeId("oracle-6")
563
-                  //      .setBody(simple("select org_id,org_name from centralbase.cb_pc_organization where org_level = '3' "))
564
-                  //      .to("jdbc:centralbase")
565
-                  //      .split(body()).log("${body}")
566
-                 //       .end();
582
+                /// from("quartz://name?cron=0 0 */1 * * ?") //0 0 */1 * * ? 每1个小时执行一次
583
+                //       .routeId("oracle-6")
584
+                //      .setBody(simple("select org_id,org_name from centralbase.cb_pc_organization where org_level = '3' "))
585
+                //      .to("jdbc:centralbase")
586
+                //      .split(body()).log("${body}")
587
+                //       .end();
567 588
 
568 589
 
569 590
             }