浏览代码

其他需要的数据导入

gxt_xa_000000 4 年之前
父节点
当前提交
9091a64bbc
共有 1 个文件被更改,包括 147 次插入58 次删除
  1. 147 58
      src/main/java/com/gct/tools/etlcamelhuge/routeconfig/CamelJDBCConfiguration.java

+ 147 - 58
src/main/java/com/gct/tools/etlcamelhuge/routeconfig/CamelJDBCConfiguration.java

@@ -1,14 +1,26 @@
1 1
 package com.gct.tools.etlcamelhuge.routeconfig;
2 2
 
3
+import com.alibaba.fastjson.JSONObject;
4
+import com.gct.tools.etlcamelhuge.entity.MQDiagnoseMsg;
3 5
 import org.apache.camel.*;
4 6
 import org.apache.camel.builder.RouteBuilder;
7
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
8
+//import org.apache.rocketmq.common.message.Message;
5 9
 import org.apache.rocketmq.spring.core.RocketMQTemplate;
10
+import org.slf4j.Logger;
6 11
 import org.springframework.beans.factory.annotation.Autowired;
7 12
 import org.springframework.context.annotation.Bean;
8 13
 import org.springframework.context.annotation.Configuration;
14
+import org.springframework.http.HttpEntity;
15
+import org.springframework.http.HttpHeaders;
16
+import org.springframework.http.MediaType;
17
+import org.springframework.jdbc.support.SQLStateSQLExceptionTranslator;
9 18
 
19
+import java.math.BigDecimal;
20
+import java.nio.charset.StandardCharsets;
10 21
 import java.text.SimpleDateFormat;
11 22
 import java.util.*;
23
+import java.util.concurrent.atomic.AtomicInteger;
12 24
 
13 25
 /**
14 26
  * class name: CamelJDBCConfiguration
@@ -18,7 +30,7 @@ import java.util.*;
18 30
  * @since 2021/4/14 下午3:16
19 31
  */
20 32
 @Configuration
21
-public class CamelJDBCConfiguration /*extends RouteBuilder */ {
33
+public class CamelJDBCConfiguration /*extends RouteBuilder */{
22 34
 
23 35
     @Autowired
24 36
     private RocketMQTemplate rocketMQTemplate;
@@ -245,9 +257,10 @@ public class CamelJDBCConfiguration /*extends RouteBuilder */ {
245 257
     }*/
246 258
     @Bean
247 259
     public RouteBuilder routeBuilderWithOracle1() {
248
-        SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd");
249
-        Date date = new Date(System.currentTimeMillis());
250
-        String formatDate = formatter.format(date) + " 00:00:00";
260
+         SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd");
261
+         Date date = new Date(System.currentTimeMillis());
262
+         String date1 = formatter.format(date);
263
+         String formatDate = formatter.format(date) + " 00:00:00";
251 264
         return new RouteBuilder() {
252 265
             private SortedSet<String> organization;
253 266
             private Map<String, Integer> orgIDs;
@@ -255,10 +268,10 @@ public class CamelJDBCConfiguration /*extends RouteBuilder */ {
255 268
 
256 269
             @Override
257 270
             public void configure() throws Exception {
258
-
259 271
                 from("timer:mytimer1?period=999999999")
260 272
                 .routeId("oracle-1")
261
-                .setBody(simple("select  jh,cydmc,zyq,zk,qyrq from zd_zdgs.dba01@A2 where rq  = to_date('2021-07-01','yyyy-MM-dd') and qyrq is not null "))
273
+                .setHeader("date",constant(date1))
274
+                .setBody(simple("select  jh,cydmc,zyq,zk,qyrq from zd_zdgs.dba01@A2 where rq  = to_date('${header.date}','yyyy-MM-dd') and qyrq is not null "))
262 275
                 .to("jdbc:oracle")
263 276
                 .transform()
264 277
                 .body((result) -> {
@@ -267,8 +280,8 @@ public class CamelJDBCConfiguration /*extends RouteBuilder */ {
267 280
                     orgIDs = new HashMap<>();
268 281
                     return result;
269 282
                 })
283
+                        .step("1")
270 284
                 .split(body()).process(exchange -> {
271
-
272 285
                     Message in = exchange.getIn();
273 286
                     HashMap<String, Object> aRow = in.getBody(HashMap.class);
274 287
                     String org_level3 = aRow.get("ZYQ") + "@" + aRow.get("CYDMC") + "@" + aRow.get("ZK");
@@ -289,16 +302,12 @@ public class CamelJDBCConfiguration /*extends RouteBuilder */ {
289 302
                         orgID++;
290 303
                         orgIDs.put(org_level3, orgID);
291 304
                     }
292
-
293
-
294 305
                 })
295
-                .log("${body}")
296 306
                 .setBody(simple("insert into centralbase.cb_cd_well_source (well_id,well_common_name,spud_date,station_id,station_name) " +
297 307
                         "values ('${body[JH]}','${body[JH]}','${body[QYRQ]}'::timestamp,'${body[station_id]}','${body[ZK]}')"))
298 308
                 .to("jdbc:centralbase")
309
+                .end()
299 310
 
300
-                .log("insert success")
301
-                /*******notice*********/
302 311
                 .transform().body((re) -> {
303 312
                     List<Map<String, Object>> rows = new ArrayList<>();
304 313
                     int code = 0;
@@ -329,24 +338,134 @@ public class CamelJDBCConfiguration /*extends RouteBuilder */ {
329 338
                         rows.add(row);
330 339
                     }
331 340
                     return rows;
332
-                })
341
+                }).split(body())
333 342
                         .setBody(simple("insert into centralbase.cb_pc_organization(org_id,org_code,org_name,org_level,parent_id,org_id_pre)" +
334
-                                "values('${body[org_id],body[org_code],body[org_name],body[org_level],body[parent_id],body[org_id_pre]}"))
343
+                                "values('${body[org_id]}','${body[org_code]}','${body[org_name]}','${body[org_level]}','${body[org_parent]}','${body[org_id_pre]}')"))
335 344
                         .to("jdbc:centralbase")
336
-                        /*******************************/
345
+                        .end()
346
+                        .setBody(simple("select org_id,org_name from centralbase.cb_pc_organization where org_level = '3' "))
347
+                        .to("jdbc:centralbase")
348
+                        .split(body())
349
+                        .setBody(simple("update centralbase.cb_cd_well_source set org_id = '${body[org_id]}' where station_name = '${body[org_name]}'"))
350
+                        .to("jdbc:centralbase")
351
+                        .log("insert")
337 352
                         .end();
338 353
 
339 354
                 from("timer:mytimer2?period=999999999")
340 355
                         .routeId("oracle-2")
341
-                        .setBody(simple("select distinct station_id,station_name from centralbase.cb_cd_well_source"))
356
+                        .setHeader("date",constant(date1))
357
+                        .setBody(simple("select  jh,rq,cyfs,yz,hysx , yysx ,tysx,bs,dym from zd_zdgs.dba01@A2 where rq  = to_date('${header.date}','yyyy-MM-dd') and qyrq is not null "))
358
+                        .to("jdbc:oracle")
359
+                        .split(body()).process(exchange -> {
360
+                            Message in = exchange.getIn();
361
+                            HashMap<String, Object> aRow = in.getBody(HashMap.class);
362
+                            if (aRow.get("YZ")==null)   aRow.put("YZ","0.0");
363
+                            if (aRow.get("HYSX")==null) aRow.put("HYSX","0.0");
364
+                            if (aRow.get("YYSX")==null) aRow.put("YYSX","0.0");
365
+                            if (aRow.get("TYSX")==null) aRow.put("TYSX","0.0");
366
+                            if (aRow.get("BS")==null)   aRow.put("BS","0.0");
367
+                            if (aRow.get("DYM")==null)  aRow.put("DYM","0.0");
368
+                })
369
+                        .setBody(simple("insert into centralbase.cb_pc_pro_wellbore_status_daily(well_id,prod_date,oil_prod_method,oil_nozzle,back_pres,tubing_pres,casing_pres,pump_depth,start_pump_liq_level)" +
370
+                                "values ('${body[JH]}','${body[RQ]}','${body[CYFS]}','${body[YZ]}','${body[HYSX]}','${body[YYSX]}','${body[TYSX]}','${body[BS]}','${body[DYM]}')"))
342 371
                         .to("jdbc:centralbase")
372
+                        .log("insert !!!")
373
+                        .end();
374
+
375
+                from("timer:mytimer3?period=999999999")
376
+                        .routeId("oracle-3")
377
+                        .setHeader("date",constant(date1))
378
+                        .setBody(simple("select  jh,rq,scsj, rcyl1,rcyl,rcql,hs, bz from zd_zdgs.dba01@A2 where rq  = to_date('${header.date}','yyyy-MM-dd') and qyrq is not null "))
379
+                        .to("jdbc:oracle")
343 380
                         .split(body()).process(exchange -> {
344 381
                     Message in = exchange.getIn();
345 382
                     HashMap<String, Object> aRow = in.getBody(HashMap.class);
346
-                    String parent_id = aRow.get("station_id").toString().substring(0, aRow.get("station_id").toString().lastIndexOf("@"));
347
-                    aRow.put("parent_id", parent_id);
348
-                    String org_id = UUID.randomUUID().toString().replace("-", "").substring(0, 10);
349
-                    aRow.put("org_id", org_id);
383
+                    if (aRow.get("SCSJ")==null)   aRow.put("SCSJ","0.0");
384
+                    if (aRow.get("RCYL1")==null) aRow.put("RCYL1","0.0");
385
+                    if (aRow.get("RCYL")==null) aRow.put("RCYL","0.0");
386
+                    if (aRow.get("RCQL")==null) aRow.put("RCQL","0.0");
387
+                    if (aRow.get("HS")==null)   aRow.put("HS","0.0");
388
+                    if (aRow.get("BZ")==null)  aRow.put("DYM","");
389
+                })
390
+                        .setBody(simple("insert into centralbase.cb_pc_pro_wellbore_vol_daily(well_id,prod_date,prod_time,liq_prod_daily,oil_prod_daily,gas_prod_daily,water_cut,remarks) " +
391
+                                "values ('${body[JH]}','${body[RQ]}','${body[SCSJ]}','${body[RCYL1]}','${body[RCYL]}','${body[RCQL]}','${body[HS]}','${body[BZ]}')"))
392
+                        .to("jdbc:centralbase")
393
+                        .log("insert !!!")
394
+                        .end();
395
+
396
+                from("timer:mytimer4?period=999999999")
397
+                        .routeId("oracle-4")
398
+                        .setHeader("date",constant(date1))
399
+                        .setBody(simple("select  jh,rq,dym,jy,ly,bj,bs,bx,zs,cc,cs,blx,dl from zd_zdgs.dba01@A2 where rq  = to_date('${header.date}','yyyy-MM-dd') and qyrq is not null  "))
400
+                        .to("jdbc:oracle")
401
+                        .split(body()).process(exchange -> {
402
+                    Message in = exchange.getIn();
403
+                    HashMap<String, Object> aRow = in.getBody(HashMap.class);
404
+                    if (aRow.get("DYM")==null)   aRow.put("DYM","0.0");
405
+                    if (aRow.get("JY")==null) aRow.put("JY","0.0");
406
+                    if (aRow.get("LY")==null) aRow.put("LY","0.0");
407
+                    if (aRow.get("BJ")==null) aRow.put("BJ","0.0");
408
+                    if (aRow.get("BS")==null)   aRow.put("BS","0.0");
409
+                    if (aRow.get("BX")==null)  aRow.put("BX","0.0");
410
+                    if (aRow.get("ZS")==null)  aRow.put("ZS","0.0");
411
+                    if (aRow.get("CC")==null)  aRow.put("CC","0.0");
412
+                    if (aRow.get("CS")==null)  aRow.put("CS","0.0");
413
+                    if (aRow.get("BLX")==null)  aRow.put("BLX","");
414
+                    if (aRow.get("DL")==null)  aRow.put("DL","0.0");
415
+                })
416
+                        .setBody(simple("insert into centralbase.cb_temp_well_mech_daily(well_id,prod_date,dynamic_liq_level,static_pressure,flow_pres,pump_diameter,pump_depth,pump_efficiency,rotate_frequency,stroke_length,stroke_frequency,pump_type,elec_frequency)" +
417
+                                "values ('${body[JH]}','${body[RQ]}','${body[DYM]}','${body[JY]}','${body[LY]}','${body[BJ]}','${body[BS]}','${body[BX]}','${body[ZS]}','${body[CC]}','${body[CS]}','${body[BLX]}','${body[DL]}')"))
418
+                        .to("jdbc:centralbase")
419
+                        .log("insert !!!")
420
+                        .end();
421
+
422
+                //从天安哪里获取的数据
423
+                from("timer:mytimer1?period=10000")
424
+                        .routeId("jdbc-gtsj-?")
425
+                        .setHeader("dyna_create_time", constant(formatDate))
426
+                        .setBody(simple("select well_id,dyna_create_time,check_date,displacement,disp_load,stroke,frequency,susp_max_load,susp_min_load from public.pc_fd_pumpjack_dyna_dia_t where   dyna_create_time > '2021-07-01 00:00:00' "))
427
+                        .to("jdbc:gtsj")
428
+                        .split(body()).process(exchange -> {
429
+                    Message in = exchange.getIn();
430
+                    HashMap<String, Object> aRow = in.getBody(HashMap.class);
431
+                    String prod_date = aRow.get("dyna_create_time").toString().split("\\+")[0];
432
+                    aRow.put("dyna_create_time", prod_date);
433
+                    if (aRow.get("displacement") != null && !aRow.get("displacement").equals("") && aRow.get("disp_load") != null && !aRow.get("disp_load").equals("")) {
434
+                        String[] displacements = aRow.get("displacement").toString().split(";");
435
+                        String[] disp_loads = aRow.get("disp_load").toString().split(";");
436
+                        String sgt = "";
437
+                        for (int i = 0; i < displacements.length; i++) {
438
+                            sgt = sgt + displacements[i] + "," + disp_loads[i] + ",";
439
+                        }
440
+                        String[] s = sgt.split(",");
441
+                        String w = "";
442
+                        for (int i = 0; i < s.length; i++) {
443
+                            w += new BigDecimal(Math.round(Double.parseDouble(s[i]) * 100)).stripTrailingZeros().toPlainString() + ",";
444
+                        }
445
+                        aRow.put("sgt", w);
446
+                    }
447
+                    if (aRow.get("stroke") == null) aRow.put("stroke","0.0");
448
+                    if (aRow.get("frequency") == null) aRow.put("frequency","0.0");
449
+                    if (aRow.get("susp_max_load") == null) aRow.put("susp_max_load","0.0");
450
+                    if (aRow.get("susp_min_load") == null) aRow.put("susp_min_load","0.0");
451
+
452
+                })//从这里开始
453
+                        .setBody(simple("insert into centralbase.cb_temp_well_mech_runtime(well_id,prod_date,stroke_length,stroke_frequency,susp_max_load,susp_min_load,sgt) " +
454
+                                "values ('${body[well_id]}','${body[dyna_create_time]}','${body[stroke]}','${body[frequency]}','${body[susp_max_load]}','${body[susp_min_load]}','${body[sgt]}')"))
455
+                        .to("jdbc:centralbase")
456
+                        .log("insert!!!").end();
457
+
458
+           /*     from("timer:mytimer2?period=999999999")
459
+                        .routeId("oracle-2")
460
+                        .setBody(simple("select distinct station_id,station_name from centralbase.cb_cd_well_source"))
461
+                        .to("jdbc:centralbase")
462
+                        .split(body()).process(exchange -> {
463
+                            Message in = exchange.getIn();
464
+                            HashMap<String, Object> aRow = in.getBody(HashMap.class);
465
+                            String parent_id = aRow.get("station_id").toString().substring(0, aRow.get("station_id").toString().lastIndexOf("@"));
466
+                            aRow.put("parent_id",parent_id);
467
+                            String org_id = UUID.randomUUID().toString().replace("-","").substring(0,10);
468
+                            aRow.put("org_id",org_id);
350 469
                 })
351 470
                         .setBody(simple("insert into centralbase.cb_pc_organization(org_id,org_name,org_level,parent_id)" +
352 471
                                 "values('${body[org_id]}','${body[station_name]}','3','${body[parent_id]}')"))
@@ -364,13 +483,13 @@ public class CamelJDBCConfiguration /*extends RouteBuilder */ {
364 483
                     String parent_id = aRow.get("parent_id").toString().split("@")[0];
365 484
                     String org_name = aRow.get("parent_id").toString().split("@")[1];
366 485
                     //if (!aRow.containsValue(org_name)){
367
-                    aRow.put("org_name", org_name);
486
+                    aRow.put("org_name",org_name);
368 487
                     // }
369 488
                     //if (!aRow.containsValue(parent_id)){
370
-                    aRow.put("parent_id", parent_id);
489
+                    aRow.put("parent_id",parent_id);
371 490
                     //}
372
-                    String org_id = UUID.randomUUID().toString().replace("-", "").substring(0, 10);
373
-                    aRow.put("levelTwoOrgId", org_id);
491
+                    String org_id = UUID.randomUUID().toString().replace("-","").substring(0,10);
492
+                    aRow.put("levelTwoOrgId",org_id);
374 493
                 })
375 494
                         .setBody(simple("insert into centralbase.cb_pc_organization(org_id,org_name,org_level,parent_id)" +
376 495
                                 "values('${body[levelTwoOrgId]}','${body[org_name]}','2','${body[parent_id]}')"))
@@ -386,8 +505,8 @@ public class CamelJDBCConfiguration /*extends RouteBuilder */ {
386 505
                     Message in = exchange.getIn();
387 506
                     HashMap<String, Object> aRow = in.getBody(HashMap.class);//沙南作业区@沙采二区
388 507
 
389
-                    String org_id = UUID.randomUUID().toString().replace("-", "").substring(0, 10);
390
-                    aRow.put("levelOneOrgId", org_id);
508
+                    String org_id = UUID.randomUUID().toString().replace("-","").substring(0,10);
509
+                    aRow.put("levelOneOrgId",org_id);
391 510
                 })
392 511
                         .setBody(simple("insert into centralbase.cb_pc_organization(org_id,org_name,org_level,parent_id)" +
393 512
                                 "values('${body[levelOneOrgId]}','${body[parent_id]}','1','0')"))
@@ -420,40 +539,10 @@ public class CamelJDBCConfiguration /*extends RouteBuilder */ {
420 539
                         .setBody(simple("update centralbase.cb_cd_well_source set org_id = '${body[org_id]}' where station_name = '${body[org_name]}'"))
421 540
                         .to("jdbc:centralbase")
422 541
                         .log("insert !!!")
423
-                        .end();
542
+                        .end();*/
424 543
             }
425 544
 
426 545
             ;
427 546
         };
428 547
     }
429
-}
430
-
431
-
432
-   /* from("timer:mytimer1?period=30000")
433
-                        .setHeader("acqui_time", constant(formatDate))
434
-                                .setBody(simple("select distinct cydmc jlz ,zyq from zd_zdgs.dba01@A2 where rq  > to_date('2021-06-30','yyyy-MM-dd')  group by zyq,cydmc"))
435
-                                .to("jdbc:oracle")
436
-                                .split(body()).process(exchange -> {
437
-                                Message in = exchange.getIn();
438
-                                HashMap<String, Object> aRow = in.getBody(HashMap.class);
439
-        })
440
-        .setBody(simple("insert into centralbase.cb_pc_organization(org_name,org_level,parent_id) " +
441
-        "select '${body[JLZ]}' ,'2' ,org_id  from centralbase.cb_pc_organization  where org_name = '${body[ZYQ]}' " +
442
-        "group by org_id "))
443
-        .to("jdbc:centralbase")
444
-        .log("insert !!!!")
445
-        .end();*/
446
-    /*from("timer:mytimer2?period=30000")
447
-                        .startupOrder(20)
448
-                                .setBody(simple("select distinct cydmc jlz ,zyq,zk zd from zd_zdgs.dba01@A2 where rq  > to_date('2021-06-30','yyyy-MM-dd')"))
449
-                                .to("jdbc:oracle")
450
-                                .split(body()).process(exchange -> {
451
-                                Message in = exchange.getIn();
452
-                                HashMap<String, Object> aRow = in.getBody(HashMap.class);
453
-        })
454
-        .setBody(simple("insert into centralbase.cb_pc_organization(org_name,org_level,parent_id) " +
455
-        "select '${body[ZD]}' ,'3' ,org_id  from centralbase.cb_pc_organization  where org_name = '${body[JLZ]}' " +
456
-        "group by org_id "))
457
-        .to("jdbc:centralbase")
458
-        .log("insert completed")
459
-        .end();*/
548
+}