Просмотр исходного кода

修改组织机构基础表的导入问题 待测试

Lloyd лет назад: 4
Родитель
Сommit
fdacb8cf33

+ 100 - 43
src/main/java/com/gct/tools/etlcamelhuge/routeconfig/CamelJDBCConfiguration.java

@@ -1,26 +1,15 @@
1 1
 package com.gct.tools.etlcamelhuge.routeconfig;
2 2
 
3
-import com.alibaba.fastjson.JSONObject;
4
-import com.gct.tools.etlcamelhuge.entity.MQDiagnoseMsg;
5 3
 import org.apache.camel.*;
6 4
 import org.apache.camel.builder.RouteBuilder;
7
-import org.apache.rocketmq.client.producer.DefaultMQProducer;
8 5
 //import org.apache.rocketmq.common.message.Message;
9 6
 import org.apache.rocketmq.spring.core.RocketMQTemplate;
10
-import org.slf4j.Logger;
11 7
 import org.springframework.beans.factory.annotation.Autowired;
12 8
 import org.springframework.context.annotation.Bean;
13 9
 import org.springframework.context.annotation.Configuration;
14
-import org.springframework.http.HttpEntity;
15
-import org.springframework.http.HttpHeaders;
16
-import org.springframework.http.MediaType;
17
-import org.springframework.jdbc.support.SQLStateSQLExceptionTranslator;
18 10
 
19
-import java.math.BigDecimal;
20
-import java.nio.charset.StandardCharsets;
21 11
 import java.text.SimpleDateFormat;
22 12
 import java.util.*;
23
-import java.util.concurrent.atomic.AtomicInteger;
24 13
 
25 14
 /**
26 15
  * class name: CamelJDBCConfiguration
@@ -30,7 +19,7 @@ import java.util.concurrent.atomic.AtomicInteger;
30 19
  * @since 2021/4/14 下午3:16
31 20
  */
32 21
 @Configuration
33
-public class CamelJDBCConfiguration /*extends RouteBuilder */{
22
+public class CamelJDBCConfiguration /*extends RouteBuilder */ {
34 23
 
35 24
     @Autowired
36 25
     private RocketMQTemplate rocketMQTemplate;
@@ -255,30 +244,96 @@ public class CamelJDBCConfiguration /*extends RouteBuilder */{
255 244
             };
256 245
         };
257 246
     }*/
258
-
259
-
260
-
261
- @Bean
247
+    @Bean
262 248
     public RouteBuilder routeBuilderWithOracle1() {
263
-         SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd");
264
-         Date date = new Date(System.currentTimeMillis());
265
-         String formatDate = formatter.format(date) + " 00:00:00";
249
+        SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd");
250
+        Date date = new Date(System.currentTimeMillis());
251
+        String formatDate = formatter.format(date) + " 00:00:00";
266 252
         return new RouteBuilder() {
253
+            private SortedSet<String> organization;
254
+            private Map<String, Integer> orgIDs;
255
+            private Integer orgID;
256
+
267 257
             @Override
268 258
             public void configure() throws Exception {
259
+
269 260
                 from("timer:mytimer1?period=999999999")
270
-                        .routeId("oracle-1")
271
-                        .setBody(simple("select  jh,cydmc,zyq,zk,qyrq from zd_zdgs.dba01@A2 where rq  = to_date('2021-07-01','yyyy-MM-dd') and qyrq is not null "))
272
-                        .to("jdbc:oracle")
273
-                        .split(body()).process(exchange -> {
274
-                        Message in = exchange.getIn();
275
-                        HashMap<String, Object> aRow = in.getBody(HashMap.class);
276
-                        aRow.put("station_id",aRow.get("ZYQ")+"@"+aRow.get("CYDMC")+"@"+aRow.get("ZK"));
277
-                }).log("${body}")
278
-                        .setBody(simple("insert into centralbase.cb_cd_well_source (well_id,well_common_name,spud_date,station_id,station_name) " +
279
-                                "values ('${body[JH]}','${body[JH]}','${body[QYRQ]}'::timestamp,'${body[station_id]}','${body[ZK]}')"))
261
+                .routeId("oracle-1")
262
+                .setBody(simple("select  jh,cydmc,zyq,zk,qyrq from zd_zdgs.dba01@A2 where rq  = to_date('2021-07-01','yyyy-MM-dd') and qyrq is not null "))
263
+                .to("jdbc:oracle")
264
+                .transform()
265
+                .body((result) -> {
266
+                    organization = new TreeSet<>();
267
+                    orgID = 0;
268
+                    orgIDs = new HashMap<>();
269
+                    return result;
270
+                })
271
+                .split(body()).process(exchange -> {
272
+
273
+                    Message in = exchange.getIn();
274
+                    HashMap<String, Object> aRow = in.getBody(HashMap.class);
275
+                    String org_level3 = aRow.get("ZYQ") + "@" + aRow.get("CYDMC") + "@" + aRow.get("ZK");
276
+                    String org_level2 = aRow.get("ZYQ") + "@" + aRow.get("CYDMC");
277
+                    String org_level1 = aRow.get("ZYQ").toString();
278
+                    aRow.put("station_id", org_level3);
279
+                    //这里是重新生成的orgid,最好先查一下centralbase里已有的
280
+                    //默认一张新表
281
+                    if (organization.add(org_level1)) {
282
+                        orgID++;
283
+                        orgIDs.put(org_level1, orgID);
284
+                    }
285
+                    if (organization.add(org_level2)) {
286
+                        orgID++;
287
+                        orgIDs.put(org_level2, orgID);
288
+                    }
289
+                    if (organization.add(org_level3)) {
290
+                        orgID++;
291
+                        orgIDs.put(org_level3, orgID);
292
+                    }
293
+
294
+
295
+                })
296
+                .log("${body}")
297
+                .setBody(simple("insert into centralbase.cb_cd_well_source (well_id,well_common_name,spud_date,station_id,station_name) " +
298
+                        "values ('${body[JH]}','${body[JH]}','${body[QYRQ]}'::timestamp,'${body[station_id]}','${body[ZK]}')"))
299
+                .to("jdbc:centralbase")
300
+
301
+                .log("insert success")
302
+                /*******notice*********/
303
+                .transform().body((re) -> {
304
+                    List<Map<String, Object>> rows = new ArrayList<>();
305
+                    int code = 0;
306
+                    for (String s : organization) {
307
+                        code++;// code is same as org_id
308
+                        String[] orgs = s.split("@");
309
+                        Map<String, Object> row = new HashMap<>();
310
+                        row.put("org_id_pre", s);
311
+                        row.put("org_code", code);
312
+                        row.put("org_id", "" + code);
313
+                        switch (orgs.length) {
314
+                            case 1:
315
+                                row.put("org_name", orgs[0]);
316
+                                row.put("org_level", 1);
317
+                                row.put("org_parent", "0");
318
+                                break;
319
+                            case 2:
320
+                                row.put("org_name", orgs[1]);
321
+                                row.put("org_level", 2);
322
+                                row.put("org_parent", orgIDs.get(orgs[0]).toString());
323
+                                break;
324
+                            case 3:
325
+                                row.put("org_name", orgs[2]);
326
+                                row.put("org_level", 3);
327
+                                row.put("org_parent", orgIDs.get(orgs[0] + "@" + orgs[1]).toString());
328
+                                break;
329
+                        }
330
+                    }
331
+                    return rows;
332
+                })
333
+                        .setBody(simple("insert into centralbase.cb_pc_organization(org_id,org_code,org_name,org_level,parent_id,org_id_pre)" +
334
+                                "values('${body[org_id],body[org_code],body[org_name],body[org_level],body[parent_id],body[org_id_pre]}"))
280 335
                         .to("jdbc:centralbase")
281
-                        .log("insert success")
336
+                        /*******************************/
282 337
                         .end();
283 338
 
284 339
                 from("timer:mytimer2?period=999999999")
@@ -286,12 +341,12 @@ public class CamelJDBCConfiguration /*extends RouteBuilder */{
286 341
                         .setBody(simple("select distinct station_id,station_name from centralbase.cb_cd_well_source"))
287 342
                         .to("jdbc:centralbase")
288 343
                         .split(body()).process(exchange -> {
289
-                            Message in = exchange.getIn();
290
-                            HashMap<String, Object> aRow = in.getBody(HashMap.class);
291
-                            String parent_id = aRow.get("station_id").toString().substring(0, aRow.get("station_id").toString().lastIndexOf("@"));
292
-                            aRow.put("parent_id",parent_id);
293
-                            String org_id = UUID.randomUUID().toString().replace("-","").substring(0,10);
294
-                            aRow.put("org_id",org_id);
344
+                    Message in = exchange.getIn();
345
+                    HashMap<String, Object> aRow = in.getBody(HashMap.class);
346
+                    String parent_id = aRow.get("station_id").toString().substring(0, aRow.get("station_id").toString().lastIndexOf("@"));
347
+                    aRow.put("parent_id", parent_id);
348
+                    String org_id = UUID.randomUUID().toString().replace("-", "").substring(0, 10);
349
+                    aRow.put("org_id", org_id);
295 350
                 })
296 351
                         .setBody(simple("insert into centralbase.cb_pc_organization(org_id,org_name,org_level,parent_id)" +
297 352
                                 "values('${body[org_id]}','${body[station_name]}','3','${body[parent_id]}')"))
@@ -309,13 +364,13 @@ public class CamelJDBCConfiguration /*extends RouteBuilder */{
309 364
                     String parent_id = aRow.get("parent_id").toString().split("@")[0];
310 365
                     String org_name = aRow.get("parent_id").toString().split("@")[1];
311 366
                     //if (!aRow.containsValue(org_name)){
312
-                    aRow.put("org_name",org_name);
367
+                    aRow.put("org_name", org_name);
313 368
                     // }
314 369
                     //if (!aRow.containsValue(parent_id)){
315
-                    aRow.put("parent_id",parent_id);
370
+                    aRow.put("parent_id", parent_id);
316 371
                     //}
317
-                    String org_id = UUID.randomUUID().toString().replace("-","").substring(0,10);
318
-                    aRow.put("levelTwoOrgId",org_id);
372
+                    String org_id = UUID.randomUUID().toString().replace("-", "").substring(0, 10);
373
+                    aRow.put("levelTwoOrgId", org_id);
319 374
                 })
320 375
                         .setBody(simple("insert into centralbase.cb_pc_organization(org_id,org_name,org_level,parent_id)" +
321 376
                                 "values('${body[levelTwoOrgId]}','${body[org_name]}','2','${body[parent_id]}')"))
@@ -331,8 +386,8 @@ public class CamelJDBCConfiguration /*extends RouteBuilder */{
331 386
                     Message in = exchange.getIn();
332 387
                     HashMap<String, Object> aRow = in.getBody(HashMap.class);//沙南作业区@沙采二区
333 388
 
334
-                    String org_id = UUID.randomUUID().toString().replace("-","").substring(0,10);
335
-                    aRow.put("levelOneOrgId",org_id);
389
+                    String org_id = UUID.randomUUID().toString().replace("-", "").substring(0, 10);
390
+                    aRow.put("levelOneOrgId", org_id);
336 391
                 })
337 392
                         .setBody(simple("insert into centralbase.cb_pc_organization(org_id,org_name,org_level,parent_id)" +
338 393
                                 "values('${body[levelOneOrgId]}','${body[parent_id]}','1','0')"))
@@ -366,7 +421,9 @@ public class CamelJDBCConfiguration /*extends RouteBuilder */{
366 421
                         .to("jdbc:centralbase")
367 422
                         .log("insert !!!")
368 423
                         .end();
369
-            };
424
+            }
425
+
426
+            ;
370 427
         };
371 428
     }
372 429
 }