|
@@ -2,10 +2,12 @@ package com.gct.tools.etlcamelhuge.routeconfig;
|
2
|
2
|
|
3
|
3
|
import com.alibaba.fastjson.JSONObject;
|
4
|
4
|
import com.gct.common.util.SGTUtil;
|
|
5
|
+import com.gct.tools.etlcamelhuge.MQ.DefaultMsgSendSuccessCallBack;
|
5
|
6
|
import com.gct.tools.etlcamelhuge.MQ.MessageBody;
|
6
|
7
|
import com.gct.tools.etlcamelhuge.MQ.MessageProducer;
|
7
|
8
|
import com.gct.tools.etlcamelhuge.camelconfig.MyDataSourceConfiguration;
|
8
|
9
|
import com.gct.tools.etlcamelhuge.entity.DiagnoseMsg;
|
|
10
|
+import lombok.Data;
|
9
|
11
|
import org.apache.camel.*;
|
10
|
12
|
import org.apache.camel.builder.RouteBuilder;
|
11
|
13
|
//import org.apache.rocketmq.common.message.Message;
|
|
@@ -24,6 +26,13 @@ import java.time.LocalDateTime;
|
24
|
26
|
import java.time.format.DateTimeFormatter;
|
25
|
27
|
import java.util.*;
|
26
|
28
|
|
|
29
|
+@Data
|
|
30
|
+class LogMessage{
|
|
31
|
+ String id;
|
|
32
|
+ LocalDateTime date;
|
|
33
|
+ Object data;
|
|
34
|
+ String msg;
|
|
35
|
+}
|
27
|
36
|
/**
|
28
|
37
|
* class name: CamelJDBCConfiguration
|
29
|
38
|
*
|
|
@@ -55,16 +64,11 @@ public class CamelJDBCConfiguration /*extends RouteBuilder */ {
|
55
|
64
|
return LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd"));
|
56
|
65
|
}
|
57
|
66
|
|
58
|
|
- private JdbcTemplate jdbcTemplate;
|
59
|
|
- @Resource(name = "gtsj")
|
60
|
|
- DataSource dataSource;
|
61
|
67
|
|
62
|
68
|
|
|
69
|
+ private static Integer sendMsgRunTime=0;
|
|
70
|
+
|
63
|
71
|
|
64
|
|
- public List list(){
|
65
|
|
- jdbcTemplate = new JdbcTemplate(dataSource);
|
66
|
|
- return jdbcTemplate.queryForList("select distinct well_name,dyna_create_time,check_date,displacement,disp_load,stroke,frequency,susp_max_load,susp_min_load from public.pc_fd_pumpjack_dyna_dia_t where dyna_create_time > '2021-08-25 17:00:00' offset 0 limit 100");
|
67
|
|
- }
|
68
|
72
|
|
69
|
73
|
@Resource(name = "diagnoseMessageProducer")
|
70
|
74
|
private MessageProducer producer;
|
|
@@ -173,7 +177,6 @@ public class CamelJDBCConfiguration /*extends RouteBuilder */ {
|
173
|
177
|
.setHeader("date", constant(getDate()))
|
174
|
178
|
.setBody(simple("select distinct jh,rq,cyfs,yz,hysx , yysx ,tysx,bs,dym from DBA01 where rq = to_date('${header.date}','yyyy-MM-dd') and qyrq is not null "))
|
175
|
179
|
.to("jdbc:oracle")
|
176
|
|
- .log("${header.date}"+" routeId:oracle-2-> select cb_pc_pro_wellbore_status_daily need data failed")
|
177
|
180
|
.split(body()).process(exchange -> {
|
178
|
181
|
Message in = exchange.getIn();
|
179
|
182
|
HashMap<String, Object> aRow = in.getBody(HashMap.class);
|
|
@@ -199,7 +202,6 @@ public class CamelJDBCConfiguration /*extends RouteBuilder */ {
|
199
|
202
|
//.setBody(simple("SELECT distinct jh,rq,dym FROM DBA01 WHERE rq between to_date(TO_CHAR(ADD_MONTHS(SYSDATE,-3),'yyyy-MM-dd'),'yyyy-MM-dd')and to_date(TO_CHAR(SYSDATE,'yyyy-MM-dd'),'yyyy-MM-dd') and dym is not null;"))
|
200
|
203
|
.setBody(simple("SELECT distinct jh,max(rq),dym FROM DBA01 WHERE dym is not null group by jh,dym"))
|
201
|
204
|
.to("jdbc:oracle")
|
202
|
|
- .log("${header.date}"+" routeId:oracle-5-> select DYM data failed")
|
203
|
205
|
.split(body())
|
204
|
206
|
.doTry()
|
205
|
207
|
.setBody(simple("update centralbase.cb_pc_pro_wellbore_status_daily set start_pump_liq_level = '${body[DYM]}' where well_id = '${body[JH]}' and prod_date::date = '${header.date}' "))
|
|
@@ -214,7 +216,6 @@ public class CamelJDBCConfiguration /*extends RouteBuilder */ {
|
214
|
216
|
//.setBody(simple("SELECT distinct jh,max(rq),dym FROM DBA01 WHERE dym is not null group by jh,dym"))
|
215
|
217
|
.setBody(simple("select well_id,prod_date,start_pump_liq_level,pump_depth from centralbase.cb_pc_pro_wellbore_status_daily where prod_date = '${header.date}' "))
|
216
|
218
|
.to("jdbc:centralbase")
|
217
|
|
- .log("${header.date}"+" routeId:oracle-7-> select centralbase.cb_pc_pro_wellbore_status_daily need data failed")
|
218
|
219
|
.split(body()).process(exchange -> {
|
219
|
220
|
Message in = exchange.getIn();
|
220
|
221
|
HashMap<String, Object> aRow = in.getBody(HashMap.class);
|
|
@@ -238,7 +239,6 @@ public class CamelJDBCConfiguration /*extends RouteBuilder */ {
|
238
|
239
|
.setHeader("date", constant(getDate()))
|
239
|
240
|
.setBody(simple("select distinct jh,rq,scsj, rcyl1,rcyl,rcql,hs, bz from DBA01 where rq = to_date('${header.date}','yyyy-MM-dd') and qyrq is not null "))
|
240
|
241
|
.to("jdbc:oracle")
|
241
|
|
- .log("${header.date}"+" routeId:oracle-3-> select centralbase.cb_pc_pro_wellbore_vol_daily need insert data failed")
|
242
|
242
|
.split(body()).process(exchange -> {
|
243
|
243
|
Message in = exchange.getIn();
|
244
|
244
|
HashMap<String, Object> aRow = in.getBody(HashMap.class);
|
|
@@ -305,7 +305,6 @@ public class CamelJDBCConfiguration /*extends RouteBuilder */ {
|
305
|
305
|
.setHeader("date", constant(getDate()))
|
306
|
306
|
.setBody(simple("select distinct jh,rq,scsj, rcyl1,rcyl,rcql,hs, bz from DBA01 where rq = to_date('${header.date}','yyyy-MM-dd') and qyrq is not null "))
|
307
|
307
|
.to("jdbc:oracle")
|
308
|
|
- .log("${header.date}"+" routeId:oracle-12-> select centralbase.cb_pc_pro_wellbore_vol_daily need update data failed")
|
309
|
308
|
.split(body()).process(exchange -> {
|
310
|
309
|
Message in = exchange.getIn();
|
311
|
310
|
HashMap<String, Object> aRow = in.getBody(HashMap.class);
|
|
@@ -362,7 +361,6 @@ public class CamelJDBCConfiguration /*extends RouteBuilder */ {
|
362
|
361
|
.setHeader("date", constant(getDate()))
|
363
|
362
|
.setBody(simple("select distinct jh,rq,dym,jy,ly,bj,bs,bx,zs,cc,cs,blx,dl from DBA01 where rq = to_date('${header.date}','yyyy-MM-dd') and qyrq is not null "))
|
364
|
363
|
.to("jdbc:oracle")
|
365
|
|
- .log("${header.date}"+" routeId:oracle-4-> select centralbase.cb_temp_well_mech_daily need insert data failed")
|
366
|
364
|
.split(body()).process(exchange -> {
|
367
|
365
|
Message in = exchange.getIn();
|
368
|
366
|
HashMap<String, Object> aRow = in.getBody(HashMap.class);
|
|
@@ -391,7 +389,6 @@ public class CamelJDBCConfiguration /*extends RouteBuilder */ {
|
391
|
389
|
.setHeader("date", constant(getDate() + " 00:00:00"))
|
392
|
390
|
.setBody(simple("SELECT distinct jh,max(rq),bj FROM DBA01 WHERE dym is not null group by jh,bj"))
|
393
|
391
|
.to("jdbc:oracle")
|
394
|
|
- .log("${header.date}"+" routeId:oracle-6-> select bj centralbase.cb_temp_well_mech_runtime need update data failed")
|
395
|
392
|
.split(body()).process(exchange -> {
|
396
|
393
|
Message in = exchange.getIn();
|
397
|
394
|
HashMap<String, Object> aRow = in.getBody(HashMap.class);
|
|
@@ -401,6 +398,7 @@ public class CamelJDBCConfiguration /*extends RouteBuilder */ {
|
401
|
398
|
.to("jdbc:centralbase")
|
402
|
399
|
.log("${header.date}"+" routeId:oracle-6-> centralbase.cb_temp_well_mech_runtime update data failed")
|
403
|
400
|
.end();
|
|
401
|
+
|
404
|
402
|
from("timer:mytimer8?period=3600000")
|
405
|
403
|
.routeId("jdbc-gtsj-?")
|
406
|
404
|
.setBody(simple("select max(prod_date) from centralbase.cb_temp_well_mech_runtime "))
|
|
@@ -456,7 +454,7 @@ public class CamelJDBCConfiguration /*extends RouteBuilder */ {
|
456
|
454
|
"where NOT EXISTS (SELECT * FROM centralbase.cb_temp_well_mech_runtime WHERE well_id = '${body[well_name]}' and prod_date = '${body[dyna_create_time]}' )"))
|
457
|
455
|
.to("jdbc:centralbase")
|
458
|
456
|
.doCatch(Exception.class)
|
459
|
|
- .log("${header.date}"+" routeId:jdbc-gtsj-?-> centralbase.cb_temp_well_mech_runtime insert data failed")
|
|
457
|
+ .log("${header.date}"+" routeId:jdbc-gtsj-?-> centralbase.cb_temp_well_mech_runtime insert data failed ${body}")
|
460
|
458
|
.end();
|
461
|
459
|
|
462
|
460
|
from("timer:mytimer9?period=3600000")
|
|
@@ -465,7 +463,6 @@ public class CamelJDBCConfiguration /*extends RouteBuilder */ {
|
465
|
463
|
.setBody(simple("select distinct rn.well_id,cb.prod_date,rn.pump_diameter from centralbase.cb_temp_well_mech_runtime rn ,centralbase.cb_pc_pro_wellbore_status_daily cb where cb.well_id = rn.well_id\n" +
|
466
|
464
|
"and cb.prod_date = '${header.date}' "))
|
467
|
465
|
.to("jdbc:centralbase")//.log("${body}")
|
468
|
|
- .log("${header.date}"+" routeId:centralbase-2-> select pump_diameter centralbase.cb_pc_pro_wellbore_status_daily data failed")
|
469
|
466
|
.split(body()).process(exchange -> {
|
470
|
467
|
Message in = exchange.getIn();
|
471
|
468
|
HashMap<String, Object> aRow = in.getBody(HashMap.class);
|
|
@@ -481,11 +478,8 @@ public class CamelJDBCConfiguration /*extends RouteBuilder */ {
|
481
|
478
|
from("timer:mytimer10?period=3600000")
|
482
|
479
|
.routeId("centralbase-3")
|
483
|
480
|
.setHeader("date", constant(getDate()))
|
484
|
|
- .doTry()
|
485
|
481
|
.setBody(simple("select well_id,avg(stroke_length) stroke_length ,avg(stroke_frequency) stroke_frequency from centralbase.cb_temp_well_mech_runtime where prod_date::date='${header.date}' group by well_id"))
|
486
|
482
|
.to("jdbc:centralbase")//.log("${body}")
|
487
|
|
- .doCatch(Exception.class)
|
488
|
|
- .log("${header.date}"+" routeId:centralbase-3-> select stroke_length centralbase.cb_temp_well_mech_daily data failed")
|
489
|
483
|
.split(body()).process(exchange -> {
|
490
|
484
|
Message in = exchange.getIn();
|
491
|
485
|
HashMap<String, Object> aRow = in.getBody(HashMap.class);
|
|
@@ -509,8 +503,11 @@ public class CamelJDBCConfiguration /*extends RouteBuilder */ {
|
509
|
503
|
|
510
|
504
|
from("timer:mytimer11?period=3600000")
|
511
|
505
|
.routeId("centralbase-1")
|
512
|
|
- .setBody(simple("select so.well_id,so.well_common_name,so.org_id,ti.prod_date,ti.stroke_frequency,ti.stroke_length,ti.sgt from centralbase.cb_temp_well_mech_runtime ti, centralbase.cb_cd_well_source so where ti.well_id = so.well_id and ti.prod_date =(select max(prod_date) from centralbase.cb_temp_well_mech_runtime) "))
|
513
|
|
- .to("jdbc:centralbase")
|
|
506
|
+ .setBody(simple("select so.well_id,so.well_common_name,so.org_id,ti.prod_date,ti.stroke_frequency,ti.stroke_length,ti.sgt from centralbase.cb_temp_well_mech_runtime ti, centralbase.cb_cd_well_source so where ti.well_id = so.well_id and ti.prod_date = (select max(prod_date) from centralbase.cb_temp_well_mech_runtime) "))
|
|
507
|
+ .to("jdbc:centralbase").process(exchange -> {
|
|
508
|
+ sendMsgRunTime=0;
|
|
509
|
+ DefaultMsgSendSuccessCallBack.count.set(0);
|
|
510
|
+ })
|
514
|
511
|
.split(body())
|
515
|
512
|
.doTry()
|
516
|
513
|
.process(exchange -> {
|
|
@@ -527,12 +524,15 @@ public class CamelJDBCConfiguration /*extends RouteBuilder */ {
|
527
|
524
|
sgt = "0,0";
|
528
|
525
|
}
|
529
|
526
|
DiagnoseMsg diagnoseMsg = new DiagnoseMsg(wellId, wellName, orgId, prodDate, sgt, LocalDateTime.now().toString(), strokeLength, strokeFrequency);
|
530
|
|
- producer.send((MessageBody) diagnoseMsg);
|
|
527
|
+ sendMsgRunTime++;
|
|
528
|
+ System.out.println("sendMsgRunTime = " + sendMsgRunTime);
|
|
529
|
+ producer.send(diagnoseMsg);
|
531
|
530
|
})
|
532
|
531
|
.doCatch(Exception.class)
|
533
|
532
|
.log("${header.date}"+" rocketMQ send data failed")
|
534
|
533
|
.end();
|
535
|
534
|
|
|
535
|
+
|
536
|
536
|
};
|
537
|
537
|
};
|
538
|
538
|
|