Browse Source

增加statusDaily想关接口

gxt 3 years ago
parent
commit
f44a1b063e

+ 2 - 0
src/main/java/com/gct/tools/etlcamelhuge/EtlCamelHugeApplication.java

@@ -3,10 +3,12 @@ package com.gct.tools.etlcamelhuge;
3 3
 
4 4
 import org.springframework.boot.SpringApplication;
5 5
 import org.springframework.boot.autoconfigure.SpringBootApplication;
6
+import org.springframework.scheduling.annotation.EnableAsync;
6 7
 import springfox.documentation.swagger2.annotations.EnableSwagger2;
7 8
 
8 9
 @SpringBootApplication
9 10
 @EnableSwagger2
11
+@EnableAsync
10 12
 public class EtlCamelHugeApplication {
11 13
     public static void main(String[] args) {
12 14
         SpringApplication.run(EtlCamelHugeApplication.class, args);

+ 50 - 0
src/main/java/com/gct/tools/etlcamelhuge/camelconfig/AsyncConfiguration.java

@@ -0,0 +1,50 @@
1
+package com.gct.tools.etlcamelhuge.camelconfig;
2
+
3
+import org.slf4j.Logger;
4
+import org.slf4j.LoggerFactory;
5
+import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
6
+import org.springframework.context.annotation.Bean;
7
+import org.springframework.context.annotation.Configuration;
8
+import org.springframework.core.task.AsyncTaskExecutor;
9
+import org.springframework.scheduling.annotation.AsyncConfigurer;
10
+import org.springframework.scheduling.annotation.EnableAsync;
11
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
12
+
13
+import java.lang.reflect.Method;
14
+
15
+/**
16
+ * class name: AsyncConfiguration
17
+ *
18
+ * @author lloyd
19
+ * @version 1.0
20
+ * @since 2021/6/25 下午1:45
21
+ */
22
+@Configuration
23
+@EnableAsync
24
+public class AsyncConfiguration implements AsyncConfigurer {
25
+
26
+    @Bean
27
+    public AsyncTaskExecutor asyncTaskExecutor() {
28
+        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
29
+        executor.setCorePoolSize(8);
30
+        executor.setMaxPoolSize(8);
31
+        return executor;
32
+    }
33
+    @Override
34
+    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler(){
35
+        return new LogAsyncUncaughtExceptionHandler();
36
+    }
37
+    class LogAsyncUncaughtExceptionHandler implements  AsyncUncaughtExceptionHandler{
38
+
39
+        @Override
40
+        public void handleUncaughtException(Throwable ex, Method method, Object... params) {
41
+            String paramLog="";
42
+            int i=0;
43
+            for (Object param : params) {
44
+                paramLog+="param:"+i+"=>"+param.toString()+" ";
45
+            }
46
+            Logger log = LoggerFactory.getLogger(this.getClass());
47
+            log.error("Method:{} run error,Exception Message:{},StackTrace:{},input params:{}",method.getName(),ex.getMessage(),ex.getStackTrace(),paramLog);
48
+        }
49
+    }
50
+}

+ 118 - 0
src/main/java/com/gct/tools/etlcamelhuge/controller/BaseDataController.java

@@ -0,0 +1,118 @@
1
+package com.gct.tools.etlcamelhuge.controller;
2
+
3
+import com.alibaba.fastjson.JSONObject;
4
+import com.gct.tools.etlcamelhuge.entity.BaseDataBody;
5
+import io.swagger.annotations.Api;
6
+import io.swagger.annotations.ApiOperation;
7
+import org.omg.CORBA.IRObject;
8
+import org.springframework.jdbc.core.JdbcTemplate;
9
+import org.springframework.web.bind.annotation.PostMapping;
10
+import org.springframework.web.bind.annotation.RequestBody;
11
+import org.springframework.web.bind.annotation.RequestMapping;
12
+import org.springframework.web.bind.annotation.RestController;
13
+
14
+import javax.annotation.Resource;
15
+import javax.sql.DataSource;
16
+import java.math.BigDecimal;
17
+import java.util.List;
18
+import java.util.Map;
19
+
20
+/**
21
+ * class name: BaseDataController
22
+ *
23
+ * @author gxt
24
+ * @version 1.0
25
+ * @since 2021/8/30 下午3:51 周一
26
+ */
27
+@RestController
28
+@RequestMapping("/BaseData")
29
+@Api(value = "BaseDataController",description = "基础数据操作controller")
30
+public class BaseDataController {
31
+
32
+    private JdbcTemplate jdbcTemplate;
33
+    @Resource(name = "oracle")
34
+    DataSource oracleDataSource;
35
+
36
+    @Resource(name = "centralbase")
37
+    DataSource baseDataSource;
38
+
39
+    @PostMapping("/StatusDaily")
40
+    @ApiOperation("从 A2 获取 数据插入到 centrlBase 中")
41
+    public JSONObject saveDataToStatusDaily(@RequestBody BaseDataBody baseDataBody){
42
+        JSONObject jsonObject = new JSONObject();
43
+        long insertCount = 0;
44
+        try {
45
+            jdbcTemplate = new JdbcTemplate(oracleDataSource);
46
+            String date = baseDataBody.getDate();
47
+            String sql = "select  distinct jh,rq,cyfs,yz,hysx , yysx ,tysx,bs,dym from DBA01 where rq  = to_date('"+date+"','yyyy-MM-dd') and qyrq is not null ";
48
+            List<Map<String, Object>> list = jdbcTemplate.queryForList(sql);
49
+            for (Map<String, Object> map : list) {
50
+                if (map.get("YZ") == null) map.put("YZ", "0.0");
51
+                if (map.get("HYSX") == null) map.put("HYSX", "0.0");
52
+                if (map.get("YYSX") == null) map.put("YYSX", "0.0");
53
+                if (map.get("TYSX") == null) map.put("TYSX", "0.0");
54
+                if (map.get("BS") == null) map.put("BS", "0.0");
55
+                sql = "insert into centralbase.cb_pc_pro_wellbore_status_daily(well_id,prod_date,oil_prod_method,oil_nozzle,back_pres,tubing_pres,casing_pres,pump_depth) " +
56
+                        "value (?,?,?,?,?,?,?,?)";
57
+                int count = insertBaseDataSourceOfStatusDaily(sql, map);
58
+                List<Map<String, Object>> dymIsNotNUllList = selectA2DataOfNotIsNullDYM();
59
+                for (Map<String, Object> stringObjectMap : dymIsNotNUllList) {
60
+                    sql = "update centralbase.cb_pc_pro_wellbore_status_daily set start_pump_liq_level = '"+stringObjectMap.get("DYM")+"' where well_id = '"+stringObjectMap.get("JH")+"' and prod_date::date  = '"+stringObjectMap.get("RQ")+"' ";
61
+                    int dymIsNotNullCount = updateBaseDataSourceOfStatusDaily(sql);
62
+                    insertCount = insertCount + dymIsNotNullCount;
63
+                }
64
+                sql = "select well_id,prod_date,start_pump_liq_level,pump_depth from centralbase.cb_pc_pro_wellbore_status_daily where prod_date = '"+map.get("RQ")+"' ";
65
+                List<Map<String, Object>> CMDDataList = selectBaseDataSourceStatusDaily(sql);
66
+                for (Map<String, Object> stringObjectMap : CMDDataList) {
67
+                    stringObjectMap.put("submergence_depth",null);
68
+                    if (stringObjectMap.get("start_pump_liq_level")!=null && stringObjectMap.get("pump_depth")!=null){
69
+                        double cmd= Double.valueOf(stringObjectMap.get("pump_depth").toString())-Double.valueOf(stringObjectMap.get("start_pump_liq_level").toString())/10;
70
+                        BigDecimal bd=new BigDecimal(cmd);
71
+                        double cmd1=bd.setScale(1,BigDecimal.ROUND_HALF_UP).doubleValue();
72
+                        stringObjectMap.put("submergence_depth",cmd1);
73
+                    }
74
+                    sql = " update centralbase.cb_pc_pro_wellbore_status_daily set submergence_depth = '"+stringObjectMap.get("submergence_depth")+"' where well_id = '"+stringObjectMap.get("well_id")+"' and prod_date  = '"+stringObjectMap.get("prod_date")+"' ";
75
+                    int CMDUpdateCount = updateBaseDataSourceOfStatusDaily(sql);
76
+                    insertCount = insertCount + CMDUpdateCount;
77
+                }
78
+                sql = " select distinct rn.well_id,cb.prod_date,rn.pump_diameter  from centralbase.cb_temp_well_mech_runtime rn ,centralbase.cb_pc_pro_wellbore_status_daily cb where cb.well_id = rn.well_id " +
79
+                        " and cb.prod_date = '"+date+"'  ";
80
+                List<Map<String, Object>> oilNozzleList = selectBaseDataSourceStatusDaily(sql);
81
+                for (Map<String, Object> stringObjectMap : oilNozzleList) {
82
+                    sql = " update centralbase.cb_pc_pro_wellbore_status_daily set oil_nozzle = '"+stringObjectMap.get("pump_diameter")+"' where well_id ='"+stringObjectMap.get("well_id")+"' and prod_date='"+stringObjectMap.get("prod_date")+"' ";
83
+                    int oliNozzle = updateBaseDataSourceOfStatusDaily(sql);
84
+                    insertCount = insertCount + oliNozzle;
85
+                }
86
+            }
87
+        }catch (Exception e){
88
+            e.printStackTrace();
89
+            jsonObject.put("error",e.getMessage());
90
+        }finally {
91
+            jsonObject.put("插入条数",insertCount);
92
+        }
93
+        return jsonObject;
94
+    }
95
+
96
+
97
+    public int insertBaseDataSourceOfStatusDaily(String sql,Map<String,Object> map){
98
+        jdbcTemplate = new JdbcTemplate(baseDataSource);
99
+        if (map.isEmpty()) return 0;
100
+        return  jdbcTemplate.update(sql, map.get("JH"), map.get("RQ"), map.get("CYFS"), map.get("YZ"), map.get("HYSX"), map.get("YYSX"), map.get("BS"), map.get("DYM"));
101
+    }
102
+
103
+    public List<Map<String, Object>> selectA2DataOfNotIsNullDYM(){
104
+        jdbcTemplate = new JdbcTemplate(oracleDataSource);
105
+        String sql = "SELECT  distinct jh,max(rq),dym FROM DBA01 WHERE dym is not null group by jh,dym";
106
+        return  jdbcTemplate.queryForList(sql);
107
+    }
108
+
109
+    public List<Map<String, Object>> selectBaseDataSourceStatusDaily(String sql){
110
+        jdbcTemplate = new JdbcTemplate(baseDataSource);
111
+         return  jdbcTemplate.queryForList(sql);
112
+    }
113
+    public int updateBaseDataSourceOfStatusDaily(String sql){
114
+        jdbcTemplate = new JdbcTemplate(baseDataSource);
115
+        return  jdbcTemplate.update(sql);
116
+    }
117
+
118
+}

+ 16 - 13
src/main/java/com/gct/tools/etlcamelhuge/controller/GtController.java

@@ -30,7 +30,7 @@ import java.util.stream.Collectors;
30 30
  * @since 2021/8/26 下午2:50 周四
31 31
  */
32 32
 @RestController
33
-@RequestMapping("/")
33
+@RequestMapping("/GTData")
34 34
 @Api(value = "GtController",description = "功图数据操作controller")
35 35
 public class GtController {
36 36
 
@@ -38,6 +38,9 @@ public class GtController {
38 38
     @Resource(name = "centralbase")
39 39
     DataSource dataSource;
40 40
 
41
+    @Resource(name = "gtsj")
42
+    DataSource dataSourceOfGTSJ;
43
+
41 44
     @Resource(name = "diagnoseMessageProducer")
42 45
     private MessageProducer producer;
43 46
 
@@ -49,7 +52,7 @@ public class GtController {
49 52
         List<String> wellList = gtBody.getWellList().stream().map(x->x.toString()).collect(Collectors.toList());
50 53
         JSONObject jsonObject = new JSONObject();
51 54
         jdbcTemplate = new JdbcTemplate(dataSource);
52
-        int sumData = 0;
55
+        long sumData = 0;
53 56
         try {
54 57
                 if (wellList.isEmpty() || wellList.size()==0 ){
55 58
                     String sql = String.format("select so.well_id,so.well_common_name,so.org_id,ti.prod_date,ti.stroke_frequency,ti.stroke_length,ti.sgt from centralbase.cb_temp_well_mech_runtime ti, centralbase.cb_cd_well_source so " +
@@ -71,7 +74,7 @@ public class GtController {
71 74
         return  jsonObject;
72 75
     }
73 76
 
74
-    public int sendDataToMQ(String sql){
77
+    public long sendDataToMQ(String sql){
75 78
         int sumData = 0;
76 79
         List<Map<String, Object>> list = jdbcTemplate.queryForList(sql);
77 80
         for (Map<String, Object> map : list) {
@@ -92,15 +95,15 @@ public class GtController {
92 95
         return sumData;
93 96
     }
94 97
 
95
-   /* @GetMapping("/getGTSJ")
98
+    @GetMapping("/getGTSJ")
96 99
     @ApiOperation(value = "从实时的机采数据生产的表中获取数据放入到 Runtime 表中")
97 100
     public JSONObject getGTSJ(@RequestBody GTBody gtBody){
98 101
 
99 102
         String startDate = gtBody.getStartDate();
100 103
         String endDate = gtBody.getEndDate();
101
-        List<String> wellList = gtBody.getWellList();
104
+        JSONArray wellList = gtBody.getWellList();
102 105
         JSONObject jsonObject = new JSONObject();
103
-        jdbcTemplate = new JdbcTemplate(dataSource);
106
+        jdbcTemplate = new JdbcTemplate(dataSourceOfGTSJ);
104 107
         int curPage = 0;
105 108
         int pageSize = 5000;
106 109
         int sumData = 0;
@@ -117,13 +120,13 @@ public class GtController {
117 120
                         Double susp_max_load = max(disp_loads);
118 121
                         Double susp_min_load = min(disp_loads);
119 122
                         String sgt = "";
120
-                        for (int i = 0; i < displacements.length; i++) {
121
-                            sgt = sgt + displacements[i] + "," + disp_loads[i] + ",";
123
+                        for (int y = 0; y < displacements.length; y++) {
124
+                            sgt = sgt + displacements[y] + "," + disp_loads[y] + ",";
122 125
                         }
123 126
                         String[] s = sgt.split(",");
124 127
                         String w = "";
125
-                        for (int i = 0; i < s.length; i++) {
126
-                            w += new BigDecimal(Math.round(Double.parseDouble(s[i]) * 100)).stripTrailingZeros().toPlainString() + ",";
128
+                        for (int x = 0; x < s.length; x++) {
129
+                            w += new BigDecimal(Math.round(Double.parseDouble(s[x]) * 100)).stripTrailingZeros().toPlainString() + ",";
127 130
                         }
128 131
                         Double[][] doubles = SGTUtil.encodeToDoubleArray(w);
129 132
                         map.put("sgt", SGTUtil.encodeToString(doubles));
@@ -148,11 +151,11 @@ public class GtController {
148 151
                     jdbcTemplate.update("insert into centralbase.cb_temp_well_mech_runtime(well_id,prod_date,stroke_length,stroke_frequency,susp_max_load,susp_min_load,sgt) " +
149 152
                             "value (?,?,?,?,?,?,?)",map.get("well_name").toString(),map.get("dyna_create_time"),map.get("stroke"),map.get("frequency"),map.get("susp_max_load"),map.get("susp_min_load"),map.get("sgt"));
150 153
                 }
151
-               *//* sumData += list.size();
154
+                sumData += list.size();
152 155
                 if (list.size()< pageSize){
153 156
                     break;
154 157
                 }
155
-                curPage ++ ;*//*
158
+                curPage ++ ;
156 159
             }
157 160
         }catch (Exception e){
158 161
             jsonObject.put("error",e.getMessage());
@@ -160,7 +163,7 @@ public class GtController {
160 163
             jsonObject.put("sumData",sumData);
161 164
         }
162 165
         return    jsonObject;
163
-         }*/
166
+         }
164 167
 
165 168
 
166 169
     public Double min(String[] strings){

+ 25 - 0
src/main/java/com/gct/tools/etlcamelhuge/entity/BaseDataBody.java

@@ -0,0 +1,25 @@
1
+package com.gct.tools.etlcamelhuge.entity;
2
+
3
+import com.alibaba.fastjson.JSONArray;
4
+import io.swagger.annotations.ApiModel;
5
+import io.swagger.annotations.ApiModelProperty;
6
+import lombok.Data;
7
+
8
+/**
9
+ * class name: GTBody
10
+ *
11
+ * @author gxt
12
+ * @version 1.0
13
+ * @since 2021/8/27 下午2:34 周五
14
+ */
15
+@Data
16
+@ApiModel(value = "BaseDataBody", description = "")
17
+public class BaseDataBody {
18
+
19
+    @ApiModelProperty(value = "时间")
20
+    public String date;
21
+    /*@ApiModelProperty(value = "结束时间")
22
+    public String endDate;*/
23
+    @ApiModelProperty(value = "井号集合")
24
+    public JSONArray wellList;
25
+}

+ 220 - 0
src/main/java/com/gct/tools/etlcamelhuge/routeconfig/CamelJDBCCofRealTimeConfiguration.java

@@ -0,0 +1,220 @@
1
+package com.gct.tools.etlcamelhuge.routeconfig;
2
+
3
+import com.gct.common.util.SGTUtil;
4
+import com.gct.tools.etlcamelhuge.MQ.MessageProducer;
5
+import com.gct.tools.etlcamelhuge.entity.DiagnoseMsg;
6
+import lombok.Data;
7
+import org.apache.camel.Message;
8
+import org.apache.camel.builder.RouteBuilder;
9
+import org.springframework.context.annotation.Bean;
10
+import org.springframework.context.annotation.Configuration;
11
+import org.springframework.scheduling.annotation.Async;
12
+
13
+import javax.annotation.Resource;
14
+import java.math.BigDecimal;
15
+import java.time.LocalDateTime;
16
+import java.time.format.DateTimeFormatter;
17
+import java.util.Arrays;
18
+import java.util.HashMap;
19
+import java.util.Map;
20
+import java.util.SortedSet;
21
+
22
+/**
23
+ * class name: CamelJDBCCofRealTimeConfiguration.java
24
+ *  实时导数据类
25
+ * @author lloyd
26
+ * @version 1.0
27
+ * @since 2021/4/14 下午3:16
28
+ */
29
+@Configuration
30
+public class CamelJDBCCofRealTimeConfiguration  {
31
+
32
+    private static long sendMsgRunTime=0;
33
+
34
+    public Double min(String[] strings){
35
+        double[] doubles = new double[strings.length];
36
+        for (int i = 0; i < strings.length; i++) {
37
+            doubles[i] = Double.parseDouble(strings[i]);
38
+        }
39
+        return Arrays.stream(doubles).min().getAsDouble();
40
+    }
41
+    //获取最大载荷
42
+    public Double max(String[] strings){
43
+        double[] doubles = new double[strings.length];
44
+        for (int i = 0; i < strings.length; i++) {
45
+            doubles[i] = Double.parseDouble(strings[i]);
46
+        }
47
+        return Arrays.stream(doubles).max().getAsDouble();
48
+    }
49
+
50
+
51
+    public String getDate(){
52
+        return  LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd"));
53
+    }
54
+
55
+
56
+    /***
57
+     * 异步发送 RunTime 消息到MQ中
58
+     * */
59
+    @Async
60
+    public long sendDataToRocketMQ(String wellName,String wellId,String prodDate,String stroke_length ,String stroke_frequency,String sgt){
61
+        String orgId = "0";
62
+        prodDate = prodDate.substring(0,19);
63
+        Double strokeLength = Double.valueOf(stroke_length);
64
+        Double strokeFrequency = Double.valueOf(stroke_frequency);
65
+        if (sgt == null || sgt.length() ==0){
66
+            sgt = "0,0";
67
+        }
68
+        DiagnoseMsg diagnoseMsg = new DiagnoseMsg(wellId, wellName, orgId, prodDate, sgt, LocalDateTime.now().toString(), strokeLength, strokeFrequency);
69
+        sendMsgRunTime++;
70
+        producer.send(diagnoseMsg);
71
+        return sendMsgRunTime;
72
+    }
73
+
74
+    @Resource(name = "diagnoseMessageProducer")
75
+    private MessageProducer producer;
76
+    @Bean
77
+    public RouteBuilder routeBuilderWithRealTime() {
78
+        return new RouteBuilder() {
79
+            @Override
80
+            public void configure() throws Exception {
81
+                from("timer:mytimer-insert-mechDaily?period=3600000")
82
+                        .routeId("insert-mech_daily")
83
+                        .setHeader("date", constant(getDate()))
84
+                        .setBody(simple("select distinct jh,rq,dym,jy,ly,bj,bs,bx,zs,cc,cs,blx,dl from DBA01 where rq  = to_date('${header.date}','yyyy-MM-dd') and qyrq is not null  "))
85
+                        .to("jdbc:oracle")
86
+                        .split(body()).process(exchange -> {
87
+                    Message in = exchange.getIn();
88
+                    HashMap<String, Object> aRow = in.getBody(HashMap.class);
89
+                    if (aRow.get("JY") == null) aRow.put("JY", "0.0");
90
+                    if (aRow.get("LY") == null) aRow.put("LY", "0.0");
91
+                    if (aRow.get("BJ") == null) aRow.put("BJ", "0.0");
92
+                    if (aRow.get("BS") == null) aRow.put("BS", "0.0");
93
+                    if (aRow.get("BX") == null) aRow.put("BX", "0.0");
94
+                    if (aRow.get("ZS") == null) aRow.put("ZS", "0.0");
95
+                    if (aRow.get("CC") == null) aRow.put("CC", "0.0");
96
+                    if (aRow.get("CS") == null) aRow.put("CS", "0.0");
97
+                    if (aRow.get("BLX") == null) aRow.put("BLX", "");
98
+                    if (aRow.get("DL") == null) aRow.put("DL", "0.0");
99
+                })
100
+                        .setBody(simple("insert into centralbase.cb_temp_well_mech_daily(well_id,prod_date,static_pressure,flow_pres,pump_diameter,pump_depth,pump_efficiency,rotate_frequency,stroke_length,stroke_frequency,pump_type,elec_frequency) " +
101
+                                "select '${body[JH]}','${body[RQ]}','${body[JY]}','${body[LY]}','${body[BJ]}','${body[BS]}','${body[BX]}','${body[ZS]}','${body[CC]}','${body[CS]}','${body[BLX]}','${body[DL]}' " +
102
+                                "where NOT EXISTS ( SELECT * FROM centralbase.cb_temp_well_mech_daily WHERE well_id = '${body[JH]}' and  prod_date = '${body[RQ]}' )"))
103
+                        .doTry()
104
+                        .to("jdbc:centralbase")
105
+                        .doCatch(Exception.class)
106
+                        .log("${header.date}"+" routeId:insert-mech_daily ->  centralbase.cb_temp_well_mech_daily insert data failed")
107
+                        .end();
108
+
109
+                from("timer:mytimer-update-runTime?period=3600000")
110
+                        .routeId("update-runTime")
111
+                        .setHeader("date", constant(getDate() + " 00:00:00"))
112
+                        .setBody(simple("SELECT  distinct jh,max(rq),bj FROM DBA01 WHERE dym is not null group by jh,bj"))
113
+                        .to("jdbc:oracle")
114
+                        .split(body()).process(exchange -> {
115
+                            Message in = exchange.getIn();
116
+                            HashMap<String, Object> aRow = in.getBody(HashMap.class);
117
+                            if (aRow.get("BJ") ==null || aRow.get("BJ").equals("")) aRow.put("BJ","0.0");
118
+                        })
119
+                        .setBody(simple("update centralbase.cb_temp_well_mech_runtime set pump_diameter = '${body[BJ]}' where well_id = '${body[JH]}' "))
120
+                        .doTry()
121
+                        .to("jdbc:centralbase")
122
+                        .doCatch(Exception.class)
123
+                        .log("${header.date}"+" routeId:update-runTime -> centralbase.cb_temp_well_mech_runtime update data failed")
124
+                        .end();
125
+
126
+                from("timer:mytimer-insert-runtimeAndSendToMQ?period=3600000")
127
+                        .routeId("insert-runtimeAndSendToMQ")
128
+                        .setBody(simple("select max(prod_date) from centralbase.cb_temp_well_mech_runtime "))
129
+                        .to("jdbc:centralbase")
130
+                        .split(body())
131
+                        .setHeader("date", simple("${body[max]}"))
132
+                        .setBody(simple("select distinct well_name,dyna_create_time,check_date,displacement,disp_load,stroke,frequency,susp_max_load,susp_min_load from public.pc_fd_pumpjack_dyna_dia_t where   dyna_create_time > '${header.date}' "))
133
+                        .doTry()
134
+                        .to("jdbc:gtsj")
135
+                        .doCatch(Exception.class)
136
+                        .log("${deader.date}" + " routeId:insert-runtimeAndSendToMQ -> select runTime data failed")
137
+                        .process(exchange -> {
138
+                            sendMsgRunTime = 0;
139
+                        })
140
+                        .split(body()).process(exchange -> {
141
+                    Message in = exchange.getIn();
142
+                    HashMap<String, Object> aRow = in.getBody(HashMap.class);
143
+                    String prod_date = aRow.get("dyna_create_time").toString().split("\\+")[0];
144
+                    aRow.put("dyna_create_time", prod_date);
145
+                    if (aRow.get("displacement") != null && !aRow.get("displacement").equals("") && aRow.get("disp_load") != null && !aRow.get("disp_load").equals("")) {
146
+                        String[] displacements = aRow.get("displacement").toString().split(";");//10 四舍五入
147
+                        String[] disp_loads = aRow.get("disp_load").toString().split(";");
148
+                        Double susp_max_load = max(disp_loads);
149
+                        Double susp_min_load = min(disp_loads);
150
+                        String sgt = "";
151
+                        for (int i = 0; i < displacements.length; i++) {
152
+                            sgt = sgt + displacements[i] + "," + disp_loads[i] + ",";
153
+                        }
154
+                        String[] s = sgt.split(",");
155
+                        String w = "";
156
+                        for (int i = 0; i < s.length; i++) {
157
+                            w += new BigDecimal(Math.round(Double.parseDouble(s[i]) * 100)).stripTrailingZeros().toPlainString() + ",";
158
+                        }
159
+                        Double[][] doubles = SGTUtil.encodeToDoubleArray(w);
160
+                        aRow.put("sgt", SGTUtil.encodeToString(doubles));
161
+                        aRow.put("susp_max_load",susp_max_load);
162
+                        aRow.put("susp_min_load",susp_min_load);
163
+                    }
164
+                    if (aRow.get("stroke") == null) aRow.put("stroke", "0.0");
165
+                    if (aRow.get("frequency") == null) aRow.put("frequency", "0.0");
166
+                    if (aRow.get("susp_max_load") == null) aRow.put("susp_max_load", "0.0");
167
+                    if (aRow.get("susp_min_load") == null) aRow.put("susp_min_load", "0.0");
168
+                    if (aRow.get("frequency") != null){
169
+                        BigDecimal bd=new BigDecimal(aRow.get("frequency").toString());
170
+                        double frequency=bd.setScale(1,BigDecimal.ROUND_HALF_UP).doubleValue();
171
+                        aRow.put("frequency",frequency);
172
+                    }
173
+                    if (aRow.get("stroke") != null){
174
+                        double stroke1 = Double.parseDouble(aRow.get("stroke").toString());
175
+                        BigDecimal bd=new BigDecimal(stroke1);
176
+                        double stroke=bd.setScale(1,BigDecimal.ROUND_HALF_UP).doubleValue();
177
+                        aRow.put("stroke",stroke);
178
+                    }
179
+                            long count = sendDataToRocketMQ(aRow.get("well_name").toString(), aRow.get("well_name").toString(), aRow.get("dyna_create_time").toString(), aRow.get("stroke").toString(), aRow.get("frequency").toString(), aRow.get("sgt").toString());
180
+                            System.out.println("消息发送时间为:   "+LocalDateTime.now()+"   发送的消息数量为: "+count + "   数据时间为:" + aRow.get("dyna_create_time"));
181
+
182
+                        })
183
+                        .setBody(simple("insert into centralbase.cb_temp_well_mech_runtime(well_id,prod_date,stroke_length,stroke_frequency,susp_max_load,susp_min_load,sgt) " +
184
+                                "select '${body[well_name]}','${body[dyna_create_time]}','${body[stroke]}','${body[frequency]}','${body[susp_max_load]}','${body[susp_min_load]}','${body[sgt]}' " +
185
+                                "where NOT EXISTS (SELECT * FROM centralbase.cb_temp_well_mech_runtime WHERE well_id = '${body[well_name]}' and  prod_date = '${body[dyna_create_time]}' )"))
186
+                        .doTry()
187
+                        .to("jdbc:centralbase")
188
+                        .doCatch(Exception.class)
189
+                        .log("${header.date}"+" routeId:insert-runtimeAndSendToMQ ->  centralbase.cb_temp_well_mech_runtime insert data failed ${body}")
190
+                        .end();
191
+
192
+               from("timer:mytimer-update-avg-mech_daily?period=3600000")
193
+                        .routeId("update-avg-mech_daily")
194
+                        .setHeader("date", constant(getDate()))
195
+                        .setBody(simple("select well_id,avg(stroke_length) stroke_length  ,avg(stroke_frequency) stroke_frequency from centralbase.cb_temp_well_mech_runtime where prod_date::date='${header.date}' group by well_id"))
196
+                        .to("jdbc:centralbase")
197
+                        .split(body()).process(exchange -> {
198
+                    Message in = exchange.getIn();
199
+                    HashMap<String, Object> aRow = in.getBody(HashMap.class);
200
+                    if (aRow.get("stroke_length")!=null && aRow.get("stroke_frequency")!=null){
201
+                        double stroke_length=Double.parseDouble(aRow.get("stroke_length").toString());
202
+                        double stroke_frequency=Double.parseDouble(aRow.get("stroke_frequency").toString());
203
+                        BigDecimal bd=new BigDecimal(stroke_length);
204
+                        double stroke_lengt1=bd.setScale(1,BigDecimal.ROUND_HALF_UP).doubleValue();
205
+                        BigDecimal bd1=new BigDecimal(stroke_frequency);
206
+                        double stroke_frequency1=bd1.setScale(1,BigDecimal.ROUND_HALF_UP).doubleValue();
207
+                        aRow.put("strokeLength",stroke_lengt1);
208
+                        aRow.put("strokeFrequency",stroke_frequency1);
209
+                    }
210
+                })
211
+                        .setBody(simple("update centralbase.cb_temp_well_mech_daily set stroke_length='${body[strokeLength]}' ,stroke_frequency ='${body[strokeFrequency]}' where well_id = '${body[well_id]}' and prod_date::date='${header.date}' "))
212
+                        .doTry()
213
+                        .to("jdbc:centralbase")
214
+                        .doCatch(Exception.class)
215
+                        .log("${header.date}"+" routeId:update-avg-mech_daily ->  centralbase.cb_temp_well_mech_daily update data failed")
216
+                        .end();
217
+            };
218
+        };
219
+    }
220
+}

+ 48 - 231
src/main/java/com/gct/tools/etlcamelhuge/routeconfig/CamelJDBCConfiguration.java

@@ -16,6 +16,7 @@ import org.springframework.beans.factory.annotation.Autowired;
16 16
 import org.springframework.context.annotation.Bean;
17 17
 import org.springframework.context.annotation.Configuration;
18 18
 import org.springframework.jdbc.core.JdbcTemplate;
19
+import org.springframework.scheduling.annotation.Async;
19 20
 
20 21
 import javax.annotation.Resource;
21 22
 import javax.sql.DataSource;
@@ -41,37 +42,13 @@ class LogMessage{
41 42
  * @since 2021/4/14 下午3:16
42 43
  */
43 44
 @Configuration
44
-public class CamelJDBCConfiguration /*extends RouteBuilder */ {
45
-
46
-    public Double min(String[] strings){
47
-        double[] doubles = new double[strings.length];
48
-        for (int i = 0; i < strings.length; i++) {
49
-            doubles[i] = Double.parseDouble(strings[i]);
50
-        }
51
-        return Arrays.stream(doubles).min().getAsDouble();
52
-    }
53
-    //获取最大载荷
54
-    public Double max(String[] strings){
55
-        double[] doubles = new double[strings.length];
56
-        for (int i = 0; i < strings.length; i++) {
57
-            doubles[i] = Double.parseDouble(strings[i]);
58
-        }
59
-        return Arrays.stream(doubles).max().getAsDouble();
60
-    }
45
+public class CamelJDBCConfiguration {
61 46
 
62 47
 
63 48
     public String getDate(){
64 49
         return  LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd"));
65 50
     }
66 51
 
67
-
68
-
69
-    private static Integer sendMsgRunTime=0;
70
-
71
-
72
-
73
-    @Resource(name = "diagnoseMessageProducer")
74
-    private MessageProducer producer;
75 52
     @Bean
76 53
     public RouteBuilder routeBuilderWithOracle1() {
77 54
         return new RouteBuilder() {
@@ -171,9 +148,8 @@ public class CamelJDBCConfiguration /*extends RouteBuilder */ {
171 148
                         .log("${header.date}"+" routeId:oracle-1->  centralbase.cb_pc_organization update data failed")
172 149
                         .end();*/
173 150
 
174
-
175
-                from("timer:mytimer2?period=3600000")
176
-                        .routeId("oracle-2")
151
+                from("timer:mytimer-insert-statusDaily?period=3600000")
152
+                        .routeId("insert-statusDaily")
177 153
                         .setHeader("date", constant(getDate()))
178 154
                         .setBody(simple("select  distinct jh,rq,cyfs,yz,hysx , yysx ,tysx,bs,dym from DBA01 where rq  = to_date('${header.date}','yyyy-MM-dd') and qyrq is not null "))
179 155
                         .to("jdbc:oracle")
@@ -186,34 +162,32 @@ public class CamelJDBCConfiguration /*extends RouteBuilder */ {
186 162
                     if (aRow.get("TYSX") == null) aRow.put("TYSX", "0.0");
187 163
                     if (aRow.get("BS") == null) aRow.put("BS", "0.0");
188 164
                 })
189
-                        .doTry()
190 165
                         .setBody(simple("insert into centralbase.cb_pc_pro_wellbore_status_daily(well_id,prod_date,oil_prod_method,oil_nozzle,back_pres,tubing_pres,casing_pres,pump_depth) " +
191 166
                                 "select '${body[JH]}','${body[RQ]}','${body[CYFS]}','${body[YZ]}','${body[HYSX]}','${body[YYSX]}','${body[TYSX]}','${body[BS]}' " +
192 167
                                 "where NOT EXISTS ( SELECT * FROM centralbase.cb_pc_pro_wellbore_status_daily WHERE well_id = '${body[JH]}' and  prod_date = '${body[RQ]}')"))
168
+                        .doTry()
193 169
                         .to("jdbc:centralbase")
194 170
                         .doCatch(Exception.class)
195
-                        .log("${header.date}"+" routeId:oracle-2->  centralbase.cb_pc_pro_wellbore_status_daily insert data failed")
171
+                        .log("${header.date}"+" routeId:insert-statusDaily ->  centralbase.cb_pc_pro_wellbore_status_daily insert data failed")
196 172
                         .end();
197 173
 
198
-                from("timer:mytimer5?period=3600000")
199
-                        .routeId("oracle-5")
174
+
175
+                from("timer:mytimer-update-statusDaily-DYM?period=3600000")
176
+                        .routeId("update-statusDaily-DYM")
200 177
                         .setHeader("date", constant(getDate() + " 00:00:00"))
201
-                        //三个月之内dym不为空的数据
202
-                        //.setBody(simple("SELECT distinct jh,rq,dym FROM DBA01 WHERE  rq between to_date(TO_CHAR(ADD_MONTHS(SYSDATE,-3),'yyyy-MM-dd'),'yyyy-MM-dd')and to_date(TO_CHAR(SYSDATE,'yyyy-MM-dd'),'yyyy-MM-dd') and dym is not null;"))
203 178
                         .setBody(simple("SELECT  distinct jh,max(rq),dym FROM DBA01 WHERE dym is not null group by jh,dym"))
204 179
                         .to("jdbc:oracle")
205 180
                         .split(body())
206
-                        .doTry()
207 181
                         .setBody(simple("update centralbase.cb_pc_pro_wellbore_status_daily set start_pump_liq_level = '${body[DYM]}' where well_id = '${body[JH]}' and prod_date::date  = '${header.date}' "))
182
+                        .doTry()
208 183
                         .to("jdbc:centralbase")
209 184
                         .doCatch(Exception.class)
210
-                        .log("${header.date}"+" routeId:oracle-5->  centralbase.cb_pc_pro_wellbore_status_daily update data failed")
185
+                        .log("${header.date}"+" routeId:update-statusDaily-DYM ->  centralbase.cb_pc_pro_wellbore_status_daily update data failed")
211 186
                         .end();
212 187
 
213
-                from("timer:mytimer7?period=3600000")
214
-                        .routeId("oracle-7")
188
+                from("timer:mytimer-update-statusDaily-submergenceDepth?period=3600000")
189
+                        .routeId("update-statusDaily-submergenceDepth")
215 190
                         .setHeader("date", constant(getDate() + " 00:00:00"))
216
-                        //.setBody(simple("SELECT  distinct jh,max(rq),dym FROM DBA01 WHERE dym is not null group by jh,dym"))
217 191
                         .setBody(simple("select well_id,prod_date,start_pump_liq_level,pump_depth from centralbase.cb_pc_pro_wellbore_status_daily where prod_date = '${header.date}' "))
218 192
                         .to("jdbc:centralbase")
219 193
                         .split(body()).process(exchange -> {
@@ -227,15 +201,36 @@ public class CamelJDBCConfiguration /*extends RouteBuilder */ {
227 201
                                 aRow.put("submergence_depth",cmd1);
228 202
                             }
229 203
                 })
230
-                        .doTry()
231 204
                         .setBody(simple("update centralbase.cb_pc_pro_wellbore_status_daily set submergence_depth = '${body[submergence_depth]}' where well_id = '${body[well_id]}' and prod_date  = '${body[prod_date]}'"))
205
+                        .doTry()
232 206
                         .to("jdbc:centralbase")
233 207
                         .doCatch(Exception.class)
234
-                        .log("${header.date}"+" routeId:oracle-7->  centralbase.cb_pc_pro_wellbore_status_daily update data failed")
208
+                        .log("${header.date}"+" routeId:update-statusDaily-submergenceDepth ->  centralbase.cb_pc_pro_wellbore_status_daily update data failed")
235 209
                         .end();
236 210
 
237
-                from("timer:mytimer3?period=3600000")
238
-                        .routeId("oracle-3")
211
+
212
+                from("timer:mytimer-update-statusDaily-oil_nozzle?period=3600000")
213
+                        .routeId("update-statusDaily-oil_nozzle")
214
+                        .setHeader("date", constant(getDate() + " 00:00:00"))
215
+                        .setBody(simple("select distinct rn.well_id,cb.prod_date,rn.pump_diameter  from centralbase.cb_temp_well_mech_runtime rn ,centralbase.cb_pc_pro_wellbore_status_daily cb where cb.well_id = rn.well_id " +
216
+                                "and cb.prod_date = '${header.date}' "))
217
+                        .to("jdbc:centralbase")//.log("${body}")
218
+                        .split(body()).process(exchange -> {
219
+                            Message in = exchange.getIn();
220
+                            HashMap<String, Object> aRow = in.getBody(HashMap.class);
221
+                            aRow.putIfAbsent("pump_diameter", "0.0");
222
+                        })
223
+                        .setBody(simple("update centralbase.cb_pc_pro_wellbore_status_daily set oil_nozzle = '${body[pump_diameter]}' where well_id ='${body[well_id]}' and prod_date='${body[prod_date]}' "))
224
+                        .doTry()
225
+                        .to("jdbc:centralbase")
226
+                        .doCatch(Exception.class)
227
+                        .log("${header.date}"+" routeId:update-statusDaily-oil_nozzle ->  centralbase.cb_pc_pro_wellbore_status_daily update data failed")
228
+                        .end();
229
+
230
+
231
+
232
+                from("timer:mytimer-insert-volDaily?period=3600000")
233
+                        .routeId("insert-volDaily")
239 234
                         .setHeader("date", constant(getDate()))
240 235
                         .setBody(simple("select distinct  jh,rq,scsj, rcyl1,rcyl,rcql,hs, bz from DBA01 where rq  = to_date('${header.date}','yyyy-MM-dd') and qyrq is not null "))
241 236
                         .to("jdbc:oracle")
@@ -282,26 +277,23 @@ public class CamelJDBCConfiguration /*extends RouteBuilder */ {
282 277
                         aRow.put("YMD",0.85);
283 278
                     }
284 279
                 })
285
-                        .doTry()
286 280
                         .setBody(simple("insert into centralbase.cb_pc_pro_wellbore_vol_daily(well_id,prod_date,prod_time,liq_prod_daily,oil_prod_daily,gas_prod_daily,water_cut,remarks,gas_oil_ratio,water_prod_daily,water_gas_ratio,surface_crude_water_density,surface_crude_oil_density) " +
287 281
                                 "select '${body[JH]}','${body[RQ]}','${body[SCSJ]}','${body[RCYL1]}','${body[RCYL]}','${body[RCQL]}','${body[HS]}','${body[BZ]}','${body[QYB]}','${body[RCSL]}','${body[SQB]}','${body[SMD]}','${body[YMD]}' " +
288 282
                                 "where NOT EXISTS ( SELECT * FROM centralbase.cb_pc_pro_wellbore_vol_daily WHERE well_id = '${body[JH]}' and  prod_date = '${body[RQ]}' )"))
283
+                        .doTry()
289 284
                         .to("jdbc:centralbase")
290 285
                         .doCatch(Exception.class)
291
-                        .log("${header.date}"+" routeId:oracle-3->  centralbase.cb_pc_pro_wellbore_vol_daily insert data failed")
292
-                        .doTry()
293
-                        .setBody(simple("routeId:oracle-3->  update centralbase.cb_pc_pro_wellbore_vol_daily set  water_prod_daily =null where water_prod_daily = -1 and prod_date = '${header.date}' "))
286
+                        .log("${header.date}"+" routeId:insert-volDaily ->  centralbase.cb_pc_pro_wellbore_vol_daily insert data failed")
287
+                        .setBody(simple("update centralbase.cb_pc_pro_wellbore_vol_daily set  water_prod_daily =null where water_prod_daily = -1 and prod_date = '${header.date}' "))
294 288
                         .to("jdbc:centralbase")
295
-                        .setBody(simple("routeId:oracle-3->  update centralbase.cb_pc_pro_wellbore_vol_daily set  gas_oil_ratio =null where gas_oil_ratio = -1 and prod_date = '${header.date}' "))
289
+                        .setBody(simple("update centralbase.cb_pc_pro_wellbore_vol_daily set  gas_oil_ratio =null where gas_oil_ratio = -1 and prod_date = '${header.date}' "))
296 290
                         .to("jdbc:centralbase")
297
-                        .setBody(simple(" routeId:oracle-3->  update centralbase.cb_pc_pro_wellbore_vol_daily set  water_gas_ratio =null where water_gas_ratio = -1 and prod_date = '${header.date}' "))
291
+                        .setBody(simple("update centralbase.cb_pc_pro_wellbore_vol_daily set  water_gas_ratio =null where water_gas_ratio = -1 and prod_date = '${header.date}' "))
298 292
                         .to("jdbc:centralbase")
299
-                        .doCatch(Exception.class)
300
-                        .log("${header.date}"+" routeId:oracle-3->  centralbase.cb_pc_pro_wellbore_vol_daily update data failed")
301 293
                         .end();
302 294
 
303
-                from("timer:mytimer12?period=3600000")
304
-                        .routeId("oracle-12")
295
+                from("timer:mytimer-update-volDaily-liq_prod_daily?period=3600000")
296
+                        .routeId("update-volDaily-liq_prod_daily")
305 297
                         .setHeader("date", constant(getDate()))
306 298
                         .setBody(simple("select distinct  jh,rq,scsj, rcyl1,rcyl,rcql,hs, bz from DBA01 where rq  = to_date('${header.date}','yyyy-MM-dd') and qyrq is not null "))
307 299
                         .to("jdbc:oracle")
@@ -348,193 +340,18 @@ public class CamelJDBCConfiguration /*extends RouteBuilder */ {
348 340
                                 aRow.put("YMD",0.85);
349 341
                             }
350 342
                         })
351
-                        .doTry()
352 343
                         .setBody(simple("update centralbase.cb_pc_pro_wellbore_vol_daily set prod_time = '${body[SCSJ]}' ,liq_prod_daily='${body[RCYL1]}' ,oil_prod_daily ='${body[RCYL]}' ,gas_prod_daily ='${body[RCQL]}' ,water_cut='${body[HS]}' ,remarks='${body[BZ]}' ,gas_oil_ratio='${body[QYB]}' ,water_prod_daily='${body[RCSL]}' ,water_gas_ratio='${body[SQB]}',surface_crude_water_density='${body[SMD]}',surface_crude_oil_density= '${body[YMD]}' " +
353 344
                                 "where well_id = '${body[JH]}' and prod_date ='${body[RQ]}' "))
354
-                        .to("jdbc:centralbase")
355
-                        .doCatch(Exception.class)
356
-                        .log("${header.date}"+" routeId:oracle-12->  centralbase.cb_pc_pro_wellbore_vol_daily update data failed")
357
-                        .end();
358
-
359
-                from("timer:mytimer4?period=3600000")
360
-                        .routeId("oracle-4")
361
-                        .setHeader("date", constant(getDate()))
362
-                        .setBody(simple("select distinct jh,rq,dym,jy,ly,bj,bs,bx,zs,cc,cs,blx,dl from DBA01 where rq  = to_date('${header.date}','yyyy-MM-dd') and qyrq is not null  "))
363
-                        .to("jdbc:oracle")
364
-                        .split(body()).process(exchange -> {
365
-                    Message in = exchange.getIn();
366
-                    HashMap<String, Object> aRow = in.getBody(HashMap.class);
367
-                    if (aRow.get("JY") == null) aRow.put("JY", "0.0");
368
-                    if (aRow.get("LY") == null) aRow.put("LY", "0.0");
369
-                    if (aRow.get("BJ") == null) aRow.put("BJ", "0.0");
370
-                    if (aRow.get("BS") == null) aRow.put("BS", "0.0");
371
-                    if (aRow.get("BX") == null) aRow.put("BX", "0.0");
372
-                    if (aRow.get("ZS") == null) aRow.put("ZS", "0.0");
373
-                    if (aRow.get("CC") == null) aRow.put("CC", "0.0");
374
-                    if (aRow.get("CS") == null) aRow.put("CS", "0.0");
375
-                    if (aRow.get("BLX") == null) aRow.put("BLX", "");
376
-                    if (aRow.get("DL") == null) aRow.put("DL", "0.0");
377
-                })
378 345
                         .doTry()
379
-                        .setBody(simple("insert into centralbase.cb_temp_well_mech_daily(well_id,prod_date,static_pressure,flow_pres,pump_diameter,pump_depth,pump_efficiency,rotate_frequency,stroke_length,stroke_frequency,pump_type,elec_frequency) " +
380
-                                "select '${body[JH]}','${body[RQ]}','${body[JY]}','${body[LY]}','${body[BJ]}','${body[BS]}','${body[BX]}','${body[ZS]}','${body[CC]}','${body[CS]}','${body[BLX]}','${body[DL]}' " +
381
-                                "where NOT EXISTS ( SELECT * FROM centralbase.cb_temp_well_mech_daily WHERE well_id = '${body[JH]}' and  prod_date = '${body[RQ]}' )"))
382 346
                         .to("jdbc:centralbase")
383 347
                         .doCatch(Exception.class)
384
-                        .log("${header.date}"+" routeId:oracle-4->  centralbase.cb_temp_well_mech_daily insert data failed")
385
-                        .end();
386
-
387
-                from("timer:mytimer6?period=3600000")
388
-                        .routeId("oracle-6")
389
-                        .setHeader("date", constant(getDate() + " 00:00:00"))
390
-                        .setBody(simple("SELECT  distinct jh,max(rq),bj FROM DBA01 WHERE dym is not null group by jh,bj"))
391
-                        .to("jdbc:oracle")
392
-                        .split(body()).process(exchange -> {
393
-                            Message in = exchange.getIn();
394
-                            HashMap<String, Object> aRow = in.getBody(HashMap.class);
395
-                            if (aRow.get("BJ") ==null || aRow.get("BJ").equals("")) aRow.put("BJ","0.0");
396
-                        })
397
-                        .setBody(simple("update centralbase.cb_temp_well_mech_runtime set pump_diameter = '${body[BJ]}' where well_id = '${body[JH]}' "))
398
-                        .to("jdbc:centralbase")
399
-                        .log("${header.date}"+" routeId:oracle-6->  centralbase.cb_temp_well_mech_runtime update data failed")
400
-                        .end();
401
-
402
-                from("timer:mytimer8?period=3600000")
403
-                        .routeId("jdbc-gtsj-?")
404
-                        .setBody(simple("select max(prod_date) from centralbase.cb_temp_well_mech_runtime "))
405
-                        .to("jdbc:centralbase")
406
-                        .split(body())
407
-                        .setHeader("date", simple("${body[max]}"))
408
-                        .setBody(simple("select distinct well_name,dyna_create_time,check_date,displacement,disp_load,stroke,frequency,susp_max_load,susp_min_load from public.pc_fd_pumpjack_dyna_dia_t where   dyna_create_time > '${header.date}' "))
409
-                        .to("jdbc:gtsj")
410
-                        .split(body()).process(exchange -> {
411
-                    Message in = exchange.getIn();
412
-                    HashMap<String, Object> aRow = in.getBody(HashMap.class);
413
-                    String prod_date = aRow.get("dyna_create_time").toString().split("\\+")[0];
414
-                    aRow.put("dyna_create_time", prod_date);
415
-                    if (aRow.get("displacement") != null && !aRow.get("displacement").equals("") && aRow.get("disp_load") != null && !aRow.get("disp_load").equals("")) {
416
-                        String[] displacements = aRow.get("displacement").toString().split(";");//10 四舍五入
417
-                        //String[] displacements = wy(displacementsOld);
418
-                        String[] disp_loads = aRow.get("disp_load").toString().split(";");
419
-                        Double susp_max_load = max(disp_loads);
420
-                        Double susp_min_load = min(disp_loads);
421
-                        String sgt = "";
422
-                        for (int i = 0; i < displacements.length; i++) {
423
-                            sgt = sgt + displacements[i] + "," + disp_loads[i] + ",";
424
-                        }
425
-                        String[] s = sgt.split(",");
426
-                        String w = "";
427
-                        for (int i = 0; i < s.length; i++) {
428
-                            w += new BigDecimal(Math.round(Double.parseDouble(s[i]) * 100)).stripTrailingZeros().toPlainString() + ",";
429
-                        }
430
-                        Double[][] doubles = SGTUtil.encodeToDoubleArray(w);
431
-                        aRow.put("sgt", SGTUtil.encodeToString(doubles));
432
-                        aRow.put("susp_max_load",susp_max_load);
433
-                        aRow.put("susp_min_load",susp_min_load);
434
-                    }
435
-                    if (aRow.get("stroke") == null) aRow.put("stroke", "0.0");
436
-                    if (aRow.get("frequency") == null) aRow.put("frequency", "0.0");
437
-                    if (aRow.get("susp_max_load") == null) aRow.put("susp_max_load", "0.0");
438
-                    if (aRow.get("susp_min_load") == null) aRow.put("susp_min_load", "0.0");
439
-                    if (aRow.get("frequency") != null){
440
-                        BigDecimal bd=new BigDecimal(aRow.get("frequency").toString());
441
-                        double frequency=bd.setScale(1,BigDecimal.ROUND_HALF_UP).doubleValue();
442
-                        aRow.put("frequency",frequency);
443
-                    }
444
-                    if (aRow.get("stroke") != null){
445
-                        double stroke1 = Double.parseDouble(aRow.get("stroke").toString());
446
-                        BigDecimal bd=new BigDecimal(stroke1);
447
-                        double stroke=bd.setScale(1,BigDecimal.ROUND_HALF_UP).doubleValue();
448
-                        aRow.put("stroke",stroke);
449
-                    }
450
-                })
451
-                        .doTry()
452
-                        .setBody(simple("insert into centralbase.cb_temp_well_mech_runtime(well_id,prod_date,stroke_length,stroke_frequency,susp_max_load,susp_min_load,sgt) " +
453
-                                "select '${body[well_name]}','${body[dyna_create_time]}','${body[stroke]}','${body[frequency]}','${body[susp_max_load]}','${body[susp_min_load]}','${body[sgt]}' " +
454
-                                "where NOT EXISTS (SELECT * FROM centralbase.cb_temp_well_mech_runtime WHERE well_id = '${body[well_name]}' and  prod_date = '${body[dyna_create_time]}' )"))
455
-                        .to("jdbc:centralbase")
456
-                        .doCatch(Exception.class)
457
-                        .log("${header.date}"+" routeId:jdbc-gtsj-?->  centralbase.cb_temp_well_mech_runtime insert data failed ${body}")
458
-                        .end();
459
-
460
-                  from("timer:mytimer9?period=3600000")
461
-                        .routeId("centralbase-2")
462
-                        .setHeader("date", constant(getDate() + " 00:00:00"))
463
-                        .setBody(simple("select distinct rn.well_id,cb.prod_date,rn.pump_diameter  from centralbase.cb_temp_well_mech_runtime rn ,centralbase.cb_pc_pro_wellbore_status_daily cb where cb.well_id = rn.well_id\n" +
464
-                                "and cb.prod_date = '${header.date}' "))
465
-                        .to("jdbc:centralbase")//.log("${body}")
466
-                        .split(body()).process(exchange -> {
467
-                            Message in = exchange.getIn();
468
-                            HashMap<String, Object> aRow = in.getBody(HashMap.class);
469
-                            aRow.putIfAbsent("pump_diameter", "0.0");
470
-                })
471
-                        .doTry()
472
-                        .setBody(simple("update centralbase.cb_pc_pro_wellbore_status_daily set oil_nozzle = '${body[pump_diameter]}' where well_id ='${body[well_id]}' and prod_date='${body[prod_date]}' "))
473
-                        .to("jdbc:centralbase")
474
-                        .doCatch(Exception.class)
475
-                        .log("${header.date}"+" routeId:centralbase-2->  centralbase.cb_pc_pro_wellbore_status_daily update data failed")
476
-                        .end();
477
-
478
-               from("timer:mytimer10?period=3600000")
479
-                        .routeId("centralbase-3")
480
-                        .setHeader("date", constant(getDate()))
481
-                        .setBody(simple("select well_id,avg(stroke_length) stroke_length  ,avg(stroke_frequency) stroke_frequency from centralbase.cb_temp_well_mech_runtime where prod_date::date='${header.date}' group by well_id"))
482
-                        .to("jdbc:centralbase")//.log("${body}")
483
-                        .split(body()).process(exchange -> {
484
-                    Message in = exchange.getIn();
485
-                    HashMap<String, Object> aRow = in.getBody(HashMap.class);
486
-                    if (aRow.get("stroke_length")!=null && aRow.get("stroke_frequency")!=null){
487
-                        double stroke_length=Double.parseDouble(aRow.get("stroke_length").toString());
488
-                        double stroke_frequency=Double.parseDouble(aRow.get("stroke_frequency").toString());
489
-                        BigDecimal bd=new BigDecimal(stroke_length);
490
-                        double stroke_lengt1=bd.setScale(1,BigDecimal.ROUND_HALF_UP).doubleValue();
491
-                        BigDecimal bd1=new BigDecimal(stroke_frequency);
492
-                        double stroke_frequency1=bd1.setScale(1,BigDecimal.ROUND_HALF_UP).doubleValue();
493
-                        aRow.put("strokeLength",stroke_lengt1);
494
-                        aRow.put("strokeFrequency",stroke_frequency1);
495
-                    }
496
-                })
497
-                        .doTry()
498
-                        .setBody(simple("update centralbase.cb_temp_well_mech_daily set stroke_length='${body[strokeLength]}' ,stroke_frequency ='${body[strokeFrequency]}' where well_id = '${body[well_id]}' and prod_date::date='${header.date}' "))
499
-                        .to("jdbc:centralbase")
500
-                        .doCatch(Exception.class)
501
-                        .log("${header.date}"+" routeId:centralbase-3->  centralbase.cb_temp_well_mech_daily update data failed")
348
+                        .log("${header.date}"+" routeId:update-volDaily-liq_prod_daily ->  centralbase.cb_pc_pro_wellbore_vol_daily update data failed")
502 349
                         .end();
350
+            };
351
+        };
503 352
 
504
-                from("timer:mytimer11?period=3600000")
505
-                        .routeId("centralbase-1")
506
-                        .setBody(simple("select so.well_id,so.well_common_name,so.org_id,ti.prod_date,ti.stroke_frequency,ti.stroke_length,ti.sgt from centralbase.cb_temp_well_mech_runtime ti, centralbase.cb_cd_well_source so where ti.well_id = so.well_id and ti.prod_date = (select max(prod_date) from centralbase.cb_temp_well_mech_runtime) "))
507
-                        .to("jdbc:centralbase").process(exchange -> {
508
-                            sendMsgRunTime=0;
509
-                            DefaultMsgSendSuccessCallBack.count.set(0);
510
-                        })
511
-                        .split(body())
512
-                        .doTry()
513
-                        .process(exchange -> {
514
-                            Message in = exchange.getIn();
515
-                            HashMap<String, Object> aRow = in.getBody(HashMap.class);
516
-                            String wellName =aRow.get("well_common_name").toString();
517
-                            String wellId =aRow.get("well_id").toString();
518
-                            String orgId = aRow.get("org_id").toString();
519
-                            String prodDate = aRow.get("prod_date").toString().substring(0,19);
520
-                            Double strokeLength = Double.valueOf(aRow.get("stroke_length").toString());
521
-                            Double strokeFrequency = Double.valueOf(aRow.get("stroke_frequency").toString());
522
-                            String sgt = aRow.get("sgt").toString();
523
-                            if (sgt == null || sgt.length() ==0){
524
-                                sgt = "0,0";
525
-                            }
526
-                            DiagnoseMsg diagnoseMsg = new DiagnoseMsg(wellId, wellName, orgId, prodDate, sgt, LocalDateTime.now().toString(), strokeLength, strokeFrequency);
527
-                            sendMsgRunTime++;
528
-                            System.out.println("sendMsgRunTime = " + sendMsgRunTime);
529
-                            producer.send(diagnoseMsg);
530
-                        })
531
-                        .doCatch(Exception.class)
532
-                        .log("${header.date}"+" rocketMQ send data failed")
533
-                        .end();
534 353
 
535 354
 
536
-            };
537
-        };
538 355
 
539 356
     }
540 357
 }