|
@@ -8,6 +8,7 @@ import com.gct.tools.etlcamelhuge.MQ.MessageBody;
|
8
|
8
|
import com.gct.tools.etlcamelhuge.MQ.MessageProducer;
|
9
|
9
|
import com.gct.tools.etlcamelhuge.entity.DiagnoseMsg;
|
10
|
10
|
import com.gct.tools.etlcamelhuge.entity.ZDJG_WELL;
|
|
11
|
+import lombok.extern.slf4j.Slf4j;
|
11
|
12
|
import org.apache.camel.*;
|
12
|
13
|
import org.apache.camel.builder.RouteBuilder;
|
13
|
14
|
//import org.apache.rocketmq.common.message.Message;
|
|
@@ -18,9 +19,11 @@ import org.apache.rocketmq.spring.core.RocketMQTemplate;
|
18
|
19
|
import org.springframework.beans.factory.annotation.Autowired;
|
19
|
20
|
import org.springframework.context.annotation.Bean;
|
20
|
21
|
import org.springframework.context.annotation.Configuration;
|
|
22
|
+import org.springframework.jdbc.core.JdbcTemplate;
|
21
|
23
|
import org.springframework.util.StringUtils;
|
22
|
24
|
|
23
|
25
|
import javax.annotation.Resource;
|
|
26
|
+import javax.sql.DataSource;
|
24
|
27
|
import java.io.*;
|
25
|
28
|
import java.math.BigDecimal;
|
26
|
29
|
import java.net.HttpURLConnection;
|
|
@@ -28,7 +31,9 @@ import java.net.URL;
|
28
|
31
|
import java.net.URLEncoder;
|
29
|
32
|
import java.text.DecimalFormat;
|
30
|
33
|
import java.text.SimpleDateFormat;
|
|
34
|
+import java.time.LocalDate;
|
31
|
35
|
import java.time.LocalDateTime;
|
|
36
|
+import java.time.Period;
|
32
|
37
|
import java.time.format.DateTimeFormatter;
|
33
|
38
|
import java.util.*;
|
34
|
39
|
|
|
@@ -39,9 +44,49 @@ import java.util.*;
|
39
|
44
|
* @version 1.0
|
40
|
45
|
* @since 2021/4/14 下午3:16
|
41
|
46
|
*/
|
|
47
|
+@Slf4j
|
42
|
48
|
@Configuration
|
43
|
49
|
public class CamelJDBCConfiguration /*extends RouteBuilder */ {
|
44
|
50
|
|
|
51
|
+ private JdbcTemplate jdbcTemplate;
|
|
52
|
+
|
|
53
|
+ @Resource(name = "oracle_A2")
|
|
54
|
+ DataSource baseDataSource;
|
|
55
|
+
|
|
56
|
+
|
|
57
|
+ public Map<String, Object> getJbzq(String date,String wellId){
|
|
58
|
+ List<Map<String, Object>> list = new ArrayList<>();
|
|
59
|
+ String zq = "";
|
|
60
|
+
|
|
61
|
+ try{
|
|
62
|
+ jdbcTemplate = new JdbcTemplate(baseDataSource);
|
|
63
|
+ String sql = " Select jh,scjbrq,jbyy From ( Select t.jh,t.wgrq scjbrq,t.csmc jbyy, row_number() Over(Partition By t.jh Order By t.wgrq Desc) rn FROM Ddcc03_v t Where t.jh= '"+wellId+"' And t.wgrq<=date'"+date+"') Where rn < 3 ";
|
|
64
|
+ list = jdbcTemplate.queryForList(sql);
|
|
65
|
+ if (list.size()>0){
|
|
66
|
+ String scjbrq = list.get(0).get("SCJBRQ").toString().substring(0,10);
|
|
67
|
+ list.get(0).put("ZQ","");
|
|
68
|
+ if (list.size()>1){
|
|
69
|
+ String scjbrq_two = list.get(1).get("SCJBRQ").toString().substring(0,10);
|
|
70
|
+ DateTimeFormatter fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd");
|
|
71
|
+ LocalDate date3 = LocalDate.parse(scjbrq, fmt);
|
|
72
|
+ LocalDate date4 = LocalDate.parse(scjbrq_two, fmt);
|
|
73
|
+
|
|
74
|
+ Period p = Period.between(date4, date3);
|
|
75
|
+ if (p.getDays() > 0) {
|
|
76
|
+ zq +=(( p.getYears()*12 + p.getMonths())*30+ p.getDays()) + "天";
|
|
77
|
+ }
|
|
78
|
+ list.get(0).put("ZQ",zq);
|
|
79
|
+ }}else {
|
|
80
|
+ return null;
|
|
81
|
+ }
|
|
82
|
+ }catch (Exception e){
|
|
83
|
+ log.info("检泵周期中的{}",list.get(0).get("JH"));
|
|
84
|
+ }
|
|
85
|
+ return list.get(0);
|
|
86
|
+
|
|
87
|
+ }
|
|
88
|
+
|
|
89
|
+
|
45
|
90
|
public ArrayList<ZDJG_WELL> loadKey() throws java.lang.Exception {
|
46
|
91
|
String url = "http://11.72.128.71/api-service/device/query?appKey=SR5vY4bED7";
|
47
|
92
|
URL restURL = new URL(url);
|
|
@@ -525,8 +570,8 @@ public class CamelJDBCConfiguration /*extends RouteBuilder */ {
|
525
|
570
|
})
|
526
|
571
|
// .setBody(simple("Select t.jh, t.prod_date rq,t.static_pressure jy,t.flow_pres ly,t.pump_diameter bj, t.pump_depth bs,t.pump_efficiency bx,t.rotate_frequency zs,
|
527
|
572
|
// t.stroke_length cc,t.stroke_frequency cs, t.pump_type blx,t.elec_frequency dl,t.dynamic_liq_level dym FROM V_TEMP_WELL_MECH_ALL t where jh = '${body[JH]}'"))
|
528
|
|
- .setBody(simple("select * from ( Select v.WELL_COMMON_NAME, t.prod_date rq,t.static_pressure jy,t.flow_pres ly,t.pump_diameter bj, t.pump_depth bs,t.pump_efficiency bx, t.rotate_frequency zs,t.stroke_length cc,t.stroke_frequency cs, t.pump_type blx,t.elec_frequency dl, t.dynamic_liq_level dym " +
|
529
|
|
- " FROM V_TEMP_WELL_MECH_ALL t inner join V_CD_WELL_SOURCE_YC v on t.well_id = v.well_id and v.WELL_COMMON_NAME = '${body[JH]}' " +
|
|
573
|
+ .setBody(simple("select * from ( Select v.WELL_COMMON_NAME jh, t.prod_date rq,t.static_pressure jy,t.flow_pres ly,t.pump_diameter bj, t.pump_depth bs,t.pump_efficiency bx, t.rotate_frequency zs,t.stroke_length cc,t.stroke_frequency cs, t.pump_type blx,t.elec_frequency dl, t.dynamic_liq_level dym " +
|
|
574
|
+ " FROM V_TEMP_WELL_MECH_ALL t inner join V_CD_WELL_SOURCE_YC v on t.well_id = v.well_id and v.WELL_COMMON_NAME = '${body[JH]}' and t.dynamic_liq_level > 0 " +
|
530
|
575
|
" order by rq desc) where rownum = 1"))
|
531
|
576
|
.to("jdbc:oracle_A2")
|
532
|
577
|
.split(body()).process(exchange -> {
|
|
@@ -545,6 +590,7 @@ public class CamelJDBCConfiguration /*extends RouteBuilder */ {
|
545
|
590
|
if (aRow.get("DYM") == null) aRow.put("DYM", "0.0");
|
546
|
591
|
})
|
547
|
592
|
// .log("mytimer4")
|
|
593
|
+
|
548
|
594
|
.doTry()
|
549
|
595
|
.setBody(simple("insert into centralbase.cb_temp_well_mech_daily(well_id,prod_date,static_pressure,flow_pres,pump_diameter,pump_depth,pump_efficiency,rotate_frequency,stroke_length,stroke_frequency,pump_type,elec_frequency,elec_pump_current_b) " +
|
550
|
596
|
"select '${body[JH]}','${header.date}','${body[JY]}','${body[LY]}','${body[BJ]}','${body[BS]}','${body[BX]}','${body[ZS]}','${body[CC]}','${body[CS]}','${body[BLX]}','${body[DL]}','${body[DYM]}' " +
|
|
@@ -572,8 +618,25 @@ public class CamelJDBCConfiguration /*extends RouteBuilder */ {
|
572
|
618
|
.log("${header.date}"+" routeId:oracle-5-> centralbase.cb_pc_pro_wellbore_status_daily update date faild")
|
573
|
619
|
.end();
|
574
|
620
|
|
|
621
|
+
|
|
622
|
+ from("timer:mytimerDelete?period=3600000")
|
|
623
|
+ .routeId("oracle-Delete")
|
|
624
|
+ .setBody(simple("select max(prod_date) from centralbase.cb_temp_well_mech_runtime "))
|
|
625
|
+ .to("jdbc:centralbase")
|
|
626
|
+ .split(body()).process(exchange -> {
|
|
627
|
+ Message in = exchange.getIn();
|
|
628
|
+ in.setHeader("date", getDate()+ " 00:00:00");
|
|
629
|
+ })
|
|
630
|
+ .doTry()
|
|
631
|
+ .setBody(simple("delete from centralbase.cb_temp_well_mech_runtime where prod_date > '${header.date}'::TIMESTAMP + '-1 day' and sgt ='' "))
|
|
632
|
+ .to("jdbc:centralbase")
|
|
633
|
+ .doCatch(Exception.class)
|
|
634
|
+ .log("${header.date}"+" routeId:oracle-Delete-> centralbase.cb_temp_well_mech_runtime Delete date faild")
|
|
635
|
+ .end();
|
|
636
|
+
|
|
637
|
+
|
575
|
638
|
//计量间产液量
|
576
|
|
- from("timer:mytimer6?period=3600000")
|
|
639
|
+ from("timer:mytimer6?period=3600000")
|
577
|
640
|
.routeId("oracle-6")
|
578
|
641
|
.setBody(simple("select max(prod_date) from centralbase.cb_temp_well_mech_runtime "))
|
579
|
642
|
.to("jdbc:centralbase")
|
|
@@ -590,9 +653,6 @@ public class CamelJDBCConfiguration /*extends RouteBuilder */ {
|
590
|
653
|
in.setHeader("wellname",body.getWell_id());
|
591
|
654
|
|
592
|
655
|
})
|
593
|
|
-// .log("${in.header.date}")
|
594
|
|
-// .log("${in.header.jljcyl}")
|
595
|
|
-// .log("${in.header.wellname}")
|
596
|
656
|
.doTry()
|
597
|
657
|
.setBody(simple("insert into centralbase.cb_pc_pro_well_meter_team(well_id,start_prod_date,test_liqu_vol_daily) " +
|
598
|
658
|
" select '${in.header.wellname}','${in.header.date}','${in.header.jljcyl}'" +
|
|
@@ -639,55 +699,40 @@ public class CamelJDBCConfiguration /*extends RouteBuilder */ {
|
639
|
699
|
Message in = exchange.getIn();
|
640
|
700
|
in.setHeader("date", getDate());
|
641
|
701
|
})
|
642
|
|
- .setBody(simple("select wellname , jbsj , jbyy, jbzq from JBSJ where SYDATE >= date'${header.date}'"))
|
643
|
|
- .to("jdbc:oracle")
|
|
702
|
+ .setBody(simple("SELECT well_id wellid FROM centralbase.sys_access_well_control where access_status = '1' and error_id = '1' "))
|
|
703
|
+ .to("jdbc:centralbase")
|
644
|
704
|
.split(body()).process(exchange -> {
|
645
|
|
- Message in = exchange.getIn();
|
646
|
|
- HashMap<String, Object> aRow = in.getBody(HashMap.class);
|
647
|
|
- if (aRow.get("JBSJ") != null)
|
648
|
|
- aRow.put("JBSJ",aRow.get("JBSJ").toString().substring(0,10));
|
649
|
|
- })
|
|
705
|
+ Message in = exchange.getIn();
|
|
706
|
+ HashMap<String, Object> aRow = in.getBody(HashMap.class);
|
|
707
|
+// aRow.put("JH", aRow.get("wellid").toString());
|
|
708
|
+ Map<String, Object> map = getJbzq(getDate(), aRow.get("wellid").toString());
|
|
709
|
+ if (map !=null ){
|
|
710
|
+ aRow.put("JH",map.get("JH"));
|
|
711
|
+ aRow.put("SCJBRQ",map.get("SCJBRQ"));
|
|
712
|
+ aRow.put("JBYY", map.get("JBYY"));
|
|
713
|
+ aRow.put("ZQ",!map.get("ZQ").equals("") && map.get("ZQ") !=null ? map.get("ZQ") :"只检泵一次");
|
|
714
|
+
|
|
715
|
+ }
|
|
716
|
+ })
|
|
717
|
+// .setBody(simple("select wellname , jbsj , jbyy, jbzq from JBSJ where SYDATE >= date'${header.date}'"))
|
|
718
|
+// .setBody(simple(" Select jh,scjbrq,jbyy From ( Select t.jh,t.wgrq scjbrq,t.csmc jbyy, row_number() Over(Partition By t.jh Order By t.wgrq Desc) rn FROM Ddcc03_v t Where t.jh= '${header.JH}' And t.wgrq<=date'${header.date}') Where rn < 3"))
|
|
719
|
+//
|
|
720
|
+// .to("jdbc:oracle_A2")
|
|
721
|
+// .split(body()).process(exchange -> {
|
|
722
|
+// Message in = exchange.getIn();
|
|
723
|
+// HashMap<String, Object> aRow = in.getBody(HashMap.class);
|
|
724
|
+// if (aRow.get("JBSJ") != null)
|
|
725
|
+// aRow.put("JBSJ",aRow.get("JBSJ").toString().substring(0,10));
|
|
726
|
+// })
|
650
|
727
|
.doTry()
|
651
|
728
|
.setBody(simple("insert into centralbase.cb_cd_checkpump_source(well_common_name,check_pump_date,reason,period,sys_date) " +
|
652
|
|
- "select '${body[WELLNAME]}','${body[JBSJ]}','${body[JBYY]}','${body[JBZQ]}','${header.date}' " +
|
|
729
|
+ "select '${body[JH]}','${body[SCJBRQ]}','${body[JBYY]}','${body[ZQ]}','${header.date}' " +
|
653
|
730
|
"where NOT EXISTS(select * from centralbase.cb_cd_checkpump_source where well_common_name = '${body[WELLNAME]}' and sys_date = '${header.date}')"))
|
654
|
731
|
.to("jdbc:centralbase")
|
655
|
732
|
.doCatch(Exception.class)
|
656
|
733
|
.log("${header.date}"+" routeId:oracle-8-> centralbase.cb_pc_pro_wellbore_status_daily update date faild")
|
657
|
734
|
.end();
|
658
|
735
|
|
659
|
|
- //稀稠油信息
|
660
|
|
- /* from("timer:mytimer9?period=3600000")
|
661
|
|
- .routeId("oracle-9")
|
662
|
|
- .setBody(simple("select max(prod_date) from centralbase.cb_temp_well_mech_runtime "))
|
663
|
|
- .to("jdbc:centralbase")
|
664
|
|
- .split(body()).process(exchange -> {
|
665
|
|
- Message in = exchange.getIn();
|
666
|
|
- in.setHeader("date", getDate());
|
667
|
|
- })
|
668
|
|
- .setBody(simple("select well_common_name,QK,CW,A_RCYOU,A_RCYE,RCYOU,RCYE,HS,LRQ,CZ_YOU,CZ_YE,CMD,SCXJRQ,YCBZ,UP_DATE from V_Cyyc_YcjData where UP_DATE >= to_date('${header.date}','yyyy-MM-dd')"))
|
669
|
|
- .to("jdbc:oracle")
|
670
|
|
- .split(body()).process(exchange -> {
|
671
|
|
- Message in = exchange.getIn();
|
672
|
|
- HashMap<String, Object> aRow = in.getBody(HashMap.class);
|
673
|
|
- if (aRow.get("LRQ") != null)
|
674
|
|
- aRow.put("LRQ",aRow.get("LRQ").toString().substring(0,10));
|
675
|
|
- if (aRow.get("SCXJRQ") != null)
|
676
|
|
- aRow.put("SCXJRQ",aRow.get("SCXJRQ").toString().substring(0,10));
|
677
|
|
- if (aRow.get("UP_DATE") != null)
|
678
|
|
- aRow.put("UP_DATE",aRow.get("UP_DATE").toString().substring(0,10));
|
679
|
|
- })
|
680
|
|
- .setBody(simple("insert into centralbase.cb_pc_abnl_model_dgn_thin_heavy(WELL_COMMON_NAME,QK,CW,A_RCYOU,A_RCYE,RCYOU,RCYE,HS,LRQ,CZ_YOU,CZ_YE,CMD,SCXJRQ,YCBZ,UP_DATE) " +
|
681
|
|
- "select '${body[WELL_COMMON_NAME]}','${body[QK]}','${body[CW]}','${body[A_RCYOU]}','${body[A_RCYE]}','${body[RCYOU]}','${body[RCYE]}','${body[HS]}','${body[LRQ]}'," +
|
682
|
|
- " '${body[CZ_YOU]}','${body[CZ_YE]}','${body[CMD]}','${body[SCXJRQ]}','${body[YCBZ]}','${body[UP_DATE]}' " +
|
683
|
|
- "where NOT EXISTS(select * from centralbase.cb_pc_abnl_model_dgn_thin_heavy where well_common_name = '${body[WELL_COMMON_NAME]}' and UP_DATE = '${body[UP_DATE]}')"))
|
684
|
|
- .doTry()
|
685
|
|
- .to("jdbc:centralbase")
|
686
|
|
- .doCatch(Exception.class)
|
687
|
|
- .log("${header.date}"+" routeId:oracle-9-> centralbase.cb_pc_pro_wellbore_status_daily update date faild")
|
688
|
|
- .end();*/
|
689
|
|
-
|
690
|
|
-
|
691
|
736
|
from("timer:mytimer10?period=3600000")
|
692
|
737
|
.routeId("centralbase-3")
|
693
|
738
|
.setBody(simple("select max(prod_date) from centralbase.cb_temp_well_mech_runtime "))
|
|
@@ -736,13 +781,45 @@ public class CamelJDBCConfiguration /*extends RouteBuilder */ {
|
736
|
781
|
Double strokeFrequency = Double.valueOf(aRow.get("stroke_frequency").toString());
|
737
|
782
|
String sgt = aRow.get("sgt").toString();
|
738
|
783
|
DiagnoseMsg diagnoseMsg = new DiagnoseMsg(wellId, wellName, orgId, prodDate, sgt, LocalDateTime.now().toString(), strokeLength, strokeFrequency);
|
739
|
|
- System.out.println(diagnoseMsg);
|
740
|
784
|
producer.send((MessageBody) diagnoseMsg);
|
741
|
785
|
})
|
742
|
786
|
.doCatch(Exception.class)
|
743
|
787
|
.log("${header.date}"+" rocketMQ send data faild")
|
744
|
788
|
.end();
|
745
|
789
|
|
|
790
|
+ //稀稠油信息
|
|
791
|
+ from("timer:mytimer9?period=3600000")
|
|
792
|
+ .routeId("oracle-9")
|
|
793
|
+ .setBody(simple("select max(prod_date) from centralbase.cb_temp_well_mech_runtime "))
|
|
794
|
+ .to("jdbc:centralbase")
|
|
795
|
+ .split(body()).process(exchange -> {
|
|
796
|
+ Message in = exchange.getIn();
|
|
797
|
+ in.setHeader("date", getDate());
|
|
798
|
+ })
|
|
799
|
+ .setBody(simple("select well_common_name,QK,CW,A_RCYOU,A_RCYE,RCYOU,RCYE,HS,LRQ,CZ_YOU,CZ_YE,CMD,SCXJRQ,YCBZ,UP_DATE from V_Cyyc_YcjData where UP_DATE >= to_date('${header.date}','yyyy-MM-dd')"))
|
|
800
|
+ .to("jdbc:oracle")
|
|
801
|
+ .split(body()).process(exchange -> {
|
|
802
|
+ Message in = exchange.getIn();
|
|
803
|
+ HashMap<String, Object> aRow = in.getBody(HashMap.class);
|
|
804
|
+ if (aRow.get("LRQ") != null)
|
|
805
|
+ aRow.put("LRQ",aRow.get("LRQ").toString().substring(0,10));
|
|
806
|
+ if (aRow.get("SCXJRQ") != null)
|
|
807
|
+ aRow.put("SCXJRQ",aRow.get("SCXJRQ").toString().substring(0,10));
|
|
808
|
+ if (aRow.get("UP_DATE") != null)
|
|
809
|
+ aRow.put("UP_DATE",aRow.get("UP_DATE").toString().substring(0,10));
|
|
810
|
+ })
|
|
811
|
+ .setBody(simple("insert into centralbase.cb_pc_abnl_model_dgn_thin_heavy(WELL_COMMON_NAME,QK,CW,A_RCYOU,A_RCYE,RCYOU,RCYE,HS,LRQ,CZ_YOU,CZ_YE,CMD,SCXJRQ,YCBZ,UP_DATE) " +
|
|
812
|
+ "select '${body[WELL_COMMON_NAME]}','${body[QK]}','${body[CW]}','${body[A_RCYOU]}','${body[A_RCYE]}','${body[RCYOU]}','${body[RCYE]}','${body[HS]}','${body[LRQ]}'," +
|
|
813
|
+ " '${body[CZ_YOU]}','${body[CZ_YE]}','${body[CMD]}','${body[SCXJRQ]}','${body[YCBZ]}','${body[UP_DATE]}' " +
|
|
814
|
+ "where NOT EXISTS(select * from centralbase.cb_pc_abnl_model_dgn_thin_heavy where well_common_name = '${body[WELL_COMMON_NAME]}' and UP_DATE = '${body[UP_DATE]}')"))
|
|
815
|
+ .doTry()
|
|
816
|
+ .to("jdbc:centralbase")
|
|
817
|
+ .doCatch(Exception.class)
|
|
818
|
+ .log("${header.date}"+" routeId:oracle-9-> centralbase.cb_pc_pro_wellbore_status_daily update date faild")
|
|
819
|
+ .end();
|
|
820
|
+
|
|
821
|
+
|
|
822
|
+
|
746
|
823
|
|
747
|
824
|
//手动导入井信息
|
748
|
825
|
|