Browse Source

正常使用版本

gxt 3 years ago
parent
commit
a1168c8b16

+ 2 - 1
src/main/java/com/gct/tools/etlcamelhuge/MQ/MessageProducer.java

@@ -1,5 +1,6 @@
1 1
 package com.gct.tools.etlcamelhuge.MQ;
2 2
 
3
+import com.gct.tools.etlcamelhuge.entity.DiagnoseMsg;
3 4
 import org.apache.rocketmq.client.exception.MQClientException;
4 5
 import org.apache.rocketmq.client.producer.DefaultMQProducer;
5 6
 import org.apache.rocketmq.client.producer.SendCallback;
@@ -23,7 +24,7 @@ public interface MessageProducer {
23 24
     void init();
24 25
 
25 26
     default void send(MessageBody msgBody) {
26
-        LoggerFactory.getLogger(MessageProducer.class).info("sending msg :==>{}", msgBody.toJsonBody());
27
+        LoggerFactory.getLogger(MessageProducer.class).info("sending msg :==>{}",((DiagnoseMsg)msgBody).getWellId()+" "+ ((DiagnoseMsg)msgBody).getProdDate()+" "+((DiagnoseMsg)msgBody).getSgt().substring(0,5));
27 28
     }
28 29
 
29 30
     void destroy() throws Exception;

+ 33 - 50
src/main/java/com/gct/tools/etlcamelhuge/routeconfig/CamelJDBCConfiguration.java

@@ -1,8 +1,10 @@
1 1
 package com.gct.tools.etlcamelhuge.routeconfig;
2 2
 
3
+import com.alibaba.fastjson.JSONObject;
3 4
 import com.gct.common.util.SGTUtil;
4 5
 import com.gct.tools.etlcamelhuge.MQ.MessageBody;
5 6
 import com.gct.tools.etlcamelhuge.MQ.MessageProducer;
7
+import com.gct.tools.etlcamelhuge.camelconfig.MyDataSourceConfiguration;
6 8
 import com.gct.tools.etlcamelhuge.entity.DiagnoseMsg;
7 9
 import org.apache.camel.*;
8 10
 import org.apache.camel.builder.RouteBuilder;
@@ -11,8 +13,10 @@ import org.apache.rocketmq.spring.core.RocketMQTemplate;
11 13
 import org.springframework.beans.factory.annotation.Autowired;
12 14
 import org.springframework.context.annotation.Bean;
13 15
 import org.springframework.context.annotation.Configuration;
16
+import org.springframework.jdbc.core.JdbcTemplate;
14 17
 
15 18
 import javax.annotation.Resource;
19
+import javax.sql.DataSource;
16 20
 import java.math.BigDecimal;
17 21
 import java.text.DecimalFormat;
18 22
 import java.text.SimpleDateFormat;
@@ -46,10 +50,22 @@ public class CamelJDBCConfiguration /*extends RouteBuilder */ {
46 50
         return Arrays.stream(doubles).max().getAsDouble();
47 51
     }
48 52
 
53
+
49 54
     public String getDate(){
50 55
         return  LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd"));
51 56
     }
52 57
 
58
+    private JdbcTemplate jdbcTemplate;
59
+    @Resource(name = "gtsj")
60
+    DataSource dataSource;
61
+
62
+
63
+
64
+    public  List list(){
65
+                jdbcTemplate = new JdbcTemplate(dataSource);
66
+     return    jdbcTemplate.queryForList("select distinct well_name,dyna_create_time,check_date,displacement,disp_load,stroke,frequency,susp_max_load,susp_min_load from public.pc_fd_pumpjack_dyna_dia_t where   dyna_create_time > '2021-08-25 17:00:00' offset  0 limit 100");
67
+    }
68
+
53 69
     @Resource(name = "diagnoseMessageProducer")
54 70
     private MessageProducer producer;
55 71
     @Bean
@@ -62,15 +78,11 @@ public class CamelJDBCConfiguration /*extends RouteBuilder */ {
62 78
             //全部执行完成的大概时间在30-40分钟
63 79
             @Override
64 80
             public void configure() throws Exception {
65
-                //24小时执行一次
66
-                //单个执行时间30s左右,在之前有数据的情况下
67
-                from("timer:mytimer1?period=604800000")
81
+                /*from("timer:mytimer1?period=604800000")
68 82
                         .routeId("oracle-1")
69 83
                         .setHeader("date", constant(getDate()))
70
-                        .doTry()
71 84
                         .setBody(simple("select  distinct jh,cydmc,zyq,zk,qyrq,sccw,qk,bz from DBA01 where rq  = to_date('${header.date}','yyyy-MM-dd') and qyrq is not null "))
72 85
                         .to("jdbc:oracle")
73
-                        .doCatch(Exception.class)
74 86
                         .log("${header.date}"+"routeId:oracle-1->  select cb_cd_well_source need data failed")
75 87
                         .transform()
76 88
                         .body((result) -> {
@@ -87,8 +99,6 @@ public class CamelJDBCConfiguration /*extends RouteBuilder */ {
87 99
                     String org_level2 = aRow.get("ZYQ") + "@" + aRow.get("CYDMC");
88 100
                     String org_level1 = aRow.get("ZYQ").toString();
89 101
                     aRow.put("station_id", org_level3);
90
-                    //这里是重新生成的orgid,最好先查一下centralbase里已有的
91
-                    //默认一张新表
92 102
                     if (organization.add(org_level1)) {
93 103
                         orgID++;
94 104
                         orgIDs.put(org_level1, orgID);
@@ -102,12 +112,10 @@ public class CamelJDBCConfiguration /*extends RouteBuilder */ {
102 112
                         orgIDs.put(org_level3, orgID);
103 113
                     }
104 114
                 })
105
-                        .doTry()
106 115
                         .setBody(simple("insert into centralbase.cb_cd_well_source (well_id,well_common_name,spud_date,station_id,station_name,completion_name,PRODUCING_AREA_name,remarks) " +
107 116
                                 "values ('${body[JH]}','${body[JH]}','${body[QYRQ]}'::timestamp,'${body[station_id]}','${body[ZK]}','${body[SCCW]}','${body[QK]}','${body[BZ]}')" +
108 117
                                 " ON conflict(well_id) DO UPDATE set remarks = '${body[BZ]}' "))
109 118
                         .to("jdbc:centralbase")
110
-                        .doCatch(Exception.class)
111 119
                         .log("${header.date}"+" routeId:oracle-1->  centralbase.cb_cd_well_source insert data failed")
112 120
                         .end()
113 121
                         .transform().body((re) -> {
@@ -157,15 +165,14 @@ public class CamelJDBCConfiguration /*extends RouteBuilder */ {
157 165
                         .to("jdbc:centralbase")
158 166
                         .doCatch(Exception.class)
159 167
                         .log("${header.date}"+" routeId:oracle-1->  centralbase.cb_pc_organization update data failed")
160
-                        .end();
161
-                //单独执行时间10s
168
+                        .end();*/
169
+
170
+
162 171
                 from("timer:mytimer2?period=3600000")
163 172
                         .routeId("oracle-2")
164 173
                         .setHeader("date", constant(getDate()))
165
-                        .doTry()
166 174
                         .setBody(simple("select  distinct jh,rq,cyfs,yz,hysx , yysx ,tysx,bs,dym from DBA01 where rq  = to_date('${header.date}','yyyy-MM-dd') and qyrq is not null "))
167 175
                         .to("jdbc:oracle")
168
-                        .doCatch(Exception.class)
169 176
                         .log("${header.date}"+" routeId:oracle-2->  select cb_pc_pro_wellbore_status_daily  need data failed")
170 177
                         .split(body()).process(exchange -> {
171 178
                     Message in = exchange.getIn();
@@ -184,19 +191,14 @@ public class CamelJDBCConfiguration /*extends RouteBuilder */ {
184 191
                         .doCatch(Exception.class)
185 192
                         .log("${header.date}"+" routeId:oracle-2->  centralbase.cb_pc_pro_wellbore_status_daily insert data failed")
186 193
                         .end();
187
-                //查询井对应dym不为空的数据 --目前是只要对应井能查到dym不为空的,无论是什么时间的,都放进去
188
-                //将查询到的DYM数据更新到cb_pc_pro_wellbore_status_daily中
189
-                //0 0 */1 * * ? 每1个小时执行一次
190
-                //单独执行时间是4m15s 317条数据
194
+
191 195
                 from("timer:mytimer5?period=3600000")
192 196
                         .routeId("oracle-5")
193 197
                         .setHeader("date", constant(getDate() + " 00:00:00"))
194 198
                         //三个月之内dym不为空的数据
195
-                        .doTry()
196 199
                         //.setBody(simple("SELECT distinct jh,rq,dym FROM DBA01 WHERE  rq between to_date(TO_CHAR(ADD_MONTHS(SYSDATE,-3),'yyyy-MM-dd'),'yyyy-MM-dd')and to_date(TO_CHAR(SYSDATE,'yyyy-MM-dd'),'yyyy-MM-dd') and dym is not null;"))
197 200
                         .setBody(simple("SELECT  distinct jh,max(rq),dym FROM DBA01 WHERE dym is not null group by jh,dym"))
198 201
                         .to("jdbc:oracle")
199
-                        .doCatch(Exception.class)
200 202
                         .log("${header.date}"+" routeId:oracle-5->  select DYM data failed")
201 203
                         .split(body())
202 204
                         .doTry()
@@ -209,11 +211,9 @@ public class CamelJDBCConfiguration /*extends RouteBuilder */ {
209 211
                 from("timer:mytimer7?period=3600000")
210 212
                         .routeId("oracle-7")
211 213
                         .setHeader("date", constant(getDate() + " 00:00:00"))
212
-                        .doTry()
213 214
                         //.setBody(simple("SELECT  distinct jh,max(rq),dym FROM DBA01 WHERE dym is not null group by jh,dym"))
214 215
                         .setBody(simple("select well_id,prod_date,start_pump_liq_level,pump_depth from centralbase.cb_pc_pro_wellbore_status_daily where prod_date = '${header.date}' "))
215 216
                         .to("jdbc:centralbase")
216
-                        .doCatch(Exception.class)
217 217
                         .log("${header.date}"+" routeId:oracle-7-> select  centralbase.cb_pc_pro_wellbore_status_daily need data failed")
218 218
                         .split(body()).process(exchange -> {
219 219
                             Message in = exchange.getIn();
@@ -232,14 +232,12 @@ public class CamelJDBCConfiguration /*extends RouteBuilder */ {
232 232
                         .doCatch(Exception.class)
233 233
                         .log("${header.date}"+" routeId:oracle-7->  centralbase.cb_pc_pro_wellbore_status_daily update data failed")
234 234
                         .end();
235
-                //单独执行时间30s
235
+
236 236
                 from("timer:mytimer3?period=3600000")
237 237
                         .routeId("oracle-3")
238 238
                         .setHeader("date", constant(getDate()))
239
-                        .doTry()
240 239
                         .setBody(simple("select distinct  jh,rq,scsj, rcyl1,rcyl,rcql,hs, bz from DBA01 where rq  = to_date('${header.date}','yyyy-MM-dd') and qyrq is not null "))
241 240
                         .to("jdbc:oracle")
242
-                        .doCatch(Exception.class)
243 241
                         .log("${header.date}"+" routeId:oracle-3->  select centralbase.cb_pc_pro_wellbore_vol_daily need  insert data failed")
244 242
                         .split(body()).process(exchange -> {
245 243
                     Message in = exchange.getIn();
@@ -305,10 +303,8 @@ public class CamelJDBCConfiguration /*extends RouteBuilder */ {
305 303
                 from("timer:mytimer12?period=3600000")
306 304
                         .routeId("oracle-12")
307 305
                         .setHeader("date", constant(getDate()))
308
-                        .doTry()
309 306
                         .setBody(simple("select distinct  jh,rq,scsj, rcyl1,rcyl,rcql,hs, bz from DBA01 where rq  = to_date('${header.date}','yyyy-MM-dd') and qyrq is not null "))
310 307
                         .to("jdbc:oracle")
311
-                        .doCatch(Exception.class)
312 308
                         .log("${header.date}"+" routeId:oracle-12-> select  centralbase.cb_pc_pro_wellbore_vol_daily need  update data failed")
313 309
                         .split(body()).process(exchange -> {
314 310
                             Message in = exchange.getIn();
@@ -360,15 +356,12 @@ public class CamelJDBCConfiguration /*extends RouteBuilder */ {
360 356
                         .doCatch(Exception.class)
361 357
                         .log("${header.date}"+" routeId:oracle-12->  centralbase.cb_pc_pro_wellbore_vol_daily update data failed")
362 358
                         .end();
363
-                //0 0 */1 * * ? 每1个小时执行一次
364
-                //单独执行一次30s
359
+
365 360
                 from("timer:mytimer4?period=3600000")
366 361
                         .routeId("oracle-4")
367 362
                         .setHeader("date", constant(getDate()))
368
-                        .doTry()
369 363
                         .setBody(simple("select distinct jh,rq,dym,jy,ly,bj,bs,bx,zs,cc,cs,blx,dl from DBA01 where rq  = to_date('${header.date}','yyyy-MM-dd') and qyrq is not null  "))
370 364
                         .to("jdbc:oracle")
371
-                        .doCatch(Exception.class)
372 365
                         .log("${header.date}"+" routeId:oracle-4-> select  centralbase.cb_temp_well_mech_daily need insert data failed")
373 366
                         .split(body()).process(exchange -> {
374 367
                     Message in = exchange.getIn();
@@ -396,40 +389,29 @@ public class CamelJDBCConfiguration /*extends RouteBuilder */ {
396 389
                 from("timer:mytimer6?period=3600000")
397 390
                         .routeId("oracle-6")
398 391
                         .setHeader("date", constant(getDate() + " 00:00:00"))
399
-                        .doTry()
400
-                        //五个月之内bj不为空的数据
401
-                        //.setBody(simple("SELECT distinct jh,bj FROM DBA01 WHERE  rq between to_date(TO_CHAR(ADD_MONTHS(SYSDATE,-5),'yyyy-MM-dd'),'yyyy-MM-dd')and to_date(TO_CHAR(SYSDATE,'yyyy-MM-dd'),'yyyy-MM-dd') and bj is not null"))
402 392
                         .setBody(simple("SELECT  distinct jh,max(rq),bj FROM DBA01 WHERE dym is not null group by jh,bj"))
403 393
                         .to("jdbc:oracle")
404
-                        .doCatch(Exception.class)
405 394
                         .log("${header.date}"+" routeId:oracle-6-> select bj  centralbase.cb_temp_well_mech_runtime need update data failed")
406
-                        .split(body())
407
-                        .doTry()
395
+                        .split(body()).process(exchange -> {
396
+                            Message in = exchange.getIn();
397
+                            HashMap<String, Object> aRow = in.getBody(HashMap.class);
398
+                            if (aRow.get("BJ") ==null || aRow.get("BJ").equals("")) aRow.put("BJ","0.0");
399
+                        })
408 400
                         .setBody(simple("update centralbase.cb_temp_well_mech_runtime set pump_diameter = '${body[BJ]}' where well_id = '${body[JH]}' "))
409 401
                         .to("jdbc:centralbase")
410
-                        .doCatch(Exception.class)
411 402
                         .log("${header.date}"+" routeId:oracle-6->  centralbase.cb_temp_well_mech_runtime update data failed")
412 403
                         .end();
413
-                //从天安哪里获取的数据
414
-                //0 0 */1 * * ? 每1个小时执行一次
415
-                //单独执行一小时的数据30s
416
-               from("timer:mytimer8?period=3600000")
404
+                from("timer:mytimer8?period=3600000")
417 405
                         .routeId("jdbc-gtsj-?")
418
-                        .doTry()
419 406
                         .setBody(simple("select max(prod_date) from centralbase.cb_temp_well_mech_runtime "))
420 407
                         .to("jdbc:centralbase")
421
-                        .doCatch(Exception.class)
422
-                        .log("${header.date}"+" routeId:jdbc-gtsj-?->select max(date)  data failed")
423 408
                         .split(body())
424 409
                         .setHeader("date", simple("${body[max]}"))
425 410
                         .setBody(simple("select distinct well_name,dyna_create_time,check_date,displacement,disp_load,stroke,frequency,susp_max_load,susp_min_load from public.pc_fd_pumpjack_dyna_dia_t where   dyna_create_time > '${header.date}' "))
426
-                        //.setBody(simple("select distinct well_name,dyna_create_time,check_date,displacement,disp_load,stroke,frequency,susp_max_load,susp_min_load from public.pc_fd_pumpjack_dyna_dia_t where   dyna_create_time > '2021-08-23 17:00:00' "))
427 411
                         .to("jdbc:gtsj")
428 412
                         .split(body()).process(exchange -> {
429
-
430 413
                     Message in = exchange.getIn();
431 414
                     HashMap<String, Object> aRow = in.getBody(HashMap.class);
432
-                    System.out.println("well_name---"+aRow.get("well_name"));
433 415
                     String prod_date = aRow.get("dyna_create_time").toString().split("\\+")[0];
434 416
                     aRow.put("dyna_create_time", prod_date);
435 417
                     if (aRow.get("displacement") != null && !aRow.get("displacement").equals("") && aRow.get("disp_load") != null && !aRow.get("disp_load").equals("")) {
@@ -480,11 +462,9 @@ public class CamelJDBCConfiguration /*extends RouteBuilder */ {
480 462
                   from("timer:mytimer9?period=3600000")
481 463
                         .routeId("centralbase-2")
482 464
                         .setHeader("date", constant(getDate() + " 00:00:00"))
483
-                        .doTry()
484 465
                         .setBody(simple("select distinct rn.well_id,cb.prod_date,rn.pump_diameter  from centralbase.cb_temp_well_mech_runtime rn ,centralbase.cb_pc_pro_wellbore_status_daily cb where cb.well_id = rn.well_id\n" +
485 466
                                 "and cb.prod_date = '${header.date}' "))
486 467
                         .to("jdbc:centralbase")//.log("${body}")
487
-                        .doCatch(Exception.class)
488 468
                         .log("${header.date}"+" routeId:centralbase-2-> select  pump_diameter centralbase.cb_pc_pro_wellbore_status_daily  data failed")
489 469
                         .split(body()).process(exchange -> {
490 470
                             Message in = exchange.getIn();
@@ -526,6 +506,7 @@ public class CamelJDBCConfiguration /*extends RouteBuilder */ {
526 506
                         .doCatch(Exception.class)
527 507
                         .log("${header.date}"+" routeId:centralbase-3->  centralbase.cb_temp_well_mech_daily update data failed")
528 508
                         .end();
509
+
529 510
                 from("timer:mytimer11?period=3600000")
530 511
                         .routeId("centralbase-1")
531 512
                         .setBody(simple("select so.well_id,so.well_common_name,so.org_id,ti.prod_date,ti.stroke_frequency,ti.stroke_length,ti.sgt from centralbase.cb_temp_well_mech_runtime ti, centralbase.cb_cd_well_source so where ti.well_id = so.well_id and ti.prod_date =(select max(prod_date) from centralbase.cb_temp_well_mech_runtime) "))
@@ -551,7 +532,9 @@ public class CamelJDBCConfiguration /*extends RouteBuilder */ {
551 532
                         .doCatch(Exception.class)
552 533
                         .log("${header.date}"+" rocketMQ send data failed")
553 534
                         .end();
535
+
554 536
             };
555 537
         };
538
+
556 539
     }
557
-}
540
+}

+ 3 - 13
src/test/java/com/gct/tools/etlcamelhuge/EtlCamelHugeApplicationTests.java

@@ -165,21 +165,11 @@ class EtlCamelHugeApplicationTests {
165 165
 
166 166
     }
167 167
 
168
+    @Autowired
169
+    CamelJDBCConfiguration camelJDBCConfiguration;
168 170
     @Test
169 171
     public void test10() {
170
-        JSONArray jsonArray = new JSONArray();
171
-        JSONObject search = new JSONObject();
172
-        JSONObject body = new JSONObject();
173
-        body.put("userno", "zdcycgtlyxt"); // 分配的系统用户账号
174
-        body.put("password", "Zdcycgtlyxt");
175
-        body.put("pageNo", 1); // 页码,从1开始
176
-        body.put("pageSize", 10);
177
-        search.put("field","父ID");
178
-        search.put("rule","=");
179
-        search.put("val","kvAfwAVEzv");
180
-        jsonArray.add(search);
181
-        body.put("search",jsonArray);
182
-        System.out.println(body);
172
+
183 173
     }
184 174
 
185 175
     public static void main(String args[]) throws Exception {