Procházet zdrojové kódy

新版本数据读取

xsr před 1 týdnem
rodič
revize
7d36b78bf8

+ 18 - 0
pom.xml

@@ -33,6 +33,24 @@
33 33
         </dependencies>
34 34
     </dependencyManagement>
35 35
     <dependencies>
36
+        <!-- Apache POI (Excel处理) -->
37
+        <dependency>
38
+            <groupId>org.apache.poi</groupId>
39
+            <artifactId>poi</artifactId>
40
+            <version>5.2.3</version>
41
+        </dependency>
42
+        <dependency>
43
+            <groupId>org.apache.poi</groupId>
44
+            <artifactId>poi-ooxml</artifactId>
45
+            <version>5.2.3</version>
46
+        </dependency>
47
+        <!-- Camel Bindy CSV/Excel处理 -->
48
+        <dependency>
49
+            <groupId>org.apache.camel</groupId>
50
+            <artifactId>camel-bindy</artifactId>
51
+            <version>${camel.version}</version>
52
+        </dependency>
53
+
36 54
         <dependency>
37 55
             <groupId>org.springframework.boot</groupId>
38 56
             <artifactId>spring-boot-starter-quartz</artifactId>

+ 3 - 0
src/main/java/com/gct/tools/etlcamelhuge/EtlCamelHugeApplication.java

@@ -1,14 +1,17 @@
1 1
 package com.gct.tools.etlcamelhuge;
2 2
 
3 3
 
4
+import com.gct.tools.etlcamelhuge.camelconfig.TianAnDatabaseProperties;
4 5
 import org.springframework.boot.SpringApplication;
5 6
 import org.springframework.boot.autoconfigure.SpringBootApplication;
7
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
6 8
 import org.springframework.scheduling.annotation.EnableAsync;
7 9
 import springfox.documentation.swagger2.annotations.EnableSwagger2;
8 10
 
9 11
 @SpringBootApplication
10 12
 @EnableSwagger2
11 13
 @EnableAsync
14
+@EnableConfigurationProperties({TianAnDatabaseProperties.class})
12 15
 public class EtlCamelHugeApplication {
13 16
     public static void main(String[] args) {
14 17
         SpringApplication.run(EtlCamelHugeApplication.class, args);

+ 12 - 0
src/main/java/com/gct/tools/etlcamelhuge/camelconfig/TianAnDatabaseProperties.java

@@ -0,0 +1,12 @@
1
+package com.gct.tools.etlcamelhuge.camelconfig;
2
+
3
+import lombok.Data;
4
+import org.springframework.boot.context.properties.ConfigurationProperties;
5
+
6
+@Data
7
+@ConfigurationProperties(prefix = "gct.tian-an")
8
+public class TianAnDatabaseProperties {
9
+    private String username;
10
+    private String password;
11
+    private String url;
12
+}

+ 24 - 0
src/main/java/com/gct/tools/etlcamelhuge/entity/TianAnGtPoint.java

@@ -0,0 +1,24 @@
1
+package com.gct.tools.etlcamelhuge.entity;
2
+
3
+import lombok.AllArgsConstructor;
4
+import lombok.Data;
5
+import lombok.NoArgsConstructor;
6
+import org.apache.camel.dataformat.bindy.annotation.DataField;
7
+
8
+@Data
9
+@AllArgsConstructor
10
+@NoArgsConstructor
11
+public class TianAnGtPoint {
12
+
13
+    @DataField(pos = 1)
14
+    private String acqUnitId;
15
+
16
+    @DataField(pos = 2)
17
+    private String tag;
18
+
19
+    @DataField(pos = 3)
20
+    private String tagDescription;
21
+
22
+    @DataField(pos = 4)
23
+    private String wellName;
24
+}

+ 330 - 47
src/main/java/com/gct/tools/etlcamelhuge/routeconfig/CamelJDBCCofRealTimeConfiguration.java

@@ -1,25 +1,36 @@
1 1
 package com.gct.tools.etlcamelhuge.routeconfig;
2 2
 
3
+import com.alibaba.fastjson.JSONArray;
4
+import com.alibaba.fastjson.JSONObject;
3 5
 import com.gct.common.util.SGTUtil;
4 6
 import com.gct.tools.etlcamelhuge.MQ.MessageProducer;
7
+import com.gct.tools.etlcamelhuge.camelconfig.TianAnDatabaseProperties;
5 8
 import com.gct.tools.etlcamelhuge.entity.DiagnoseMsg;
9
+import com.gct.tools.etlcamelhuge.entity.TianAnGtPoint;
6 10
 import org.apache.camel.Message;
7 11
 import org.apache.camel.builder.RouteBuilder;
12
+import org.apache.camel.dataformat.bindy.csv.BindyCsvDataFormat;
13
+import org.springframework.beans.factory.annotation.Autowired;
8 14
 import org.springframework.context.annotation.Bean;
9 15
 import org.springframework.context.annotation.Configuration;
10 16
 import org.springframework.util.StringUtils;
11 17
 
12 18
 import javax.annotation.Resource;
13
-import java.io.IOException;
14
-import java.io.Reader;
19
+import java.io.*;
15 20
 import java.lang.management.MemoryMXBean;
16 21
 import java.math.BigDecimal;
17 22
 import java.math.RoundingMode;
23
+import java.net.HttpURLConnection;
24
+import java.net.URL;
25
+import java.nio.charset.StandardCharsets;
18 26
 import java.sql.Clob;
19 27
 import java.sql.SQLException;
28
+import java.time.Instant;
20 29
 import java.time.LocalDateTime;
30
+import java.time.ZoneId;
21 31
 import java.time.format.DateTimeFormatter;
22 32
 import java.util.*;
33
+import java.util.stream.Collectors;
23 34
 
24 35
 /**
25 36
  * class name: CamelJDBCCofRealTimeConfiguration.java
@@ -75,7 +86,7 @@ public class CamelJDBCCofRealTimeConfiguration {
75 86
     @Resource(name = "diagnoseMessageProducer")
76 87
     private MessageProducer producer;
77 88
 
78
-    private String clobToString(Clob clob){
89
+    private String clobToString(Clob clob) {
79 90
         StringBuilder sb = new StringBuilder();
80 91
         if (clob != null) {
81 92
             try (Reader reader = clob.getCharacterStream()) {
@@ -94,6 +105,315 @@ public class CamelJDBCCofRealTimeConfiguration {
94 105
 
95 106
     }
96 107
 
108
+    @Autowired
109
+    private TianAnDatabaseProperties tianAn;
110
+
111
+    private static String postTokenBodyTemplate = "\"username\":\"%s\",\"password\":\"%s\"";
112
+
113
+    private static String postGtDataBodyTemplate = "\"unit_name\":\"%s\",\"tags\":\"%s\"";
114
+
115
+    private JSONObject sendPostRequest(String apiUrl, Map<String, String> headers, String requestBody) throws IOException {
116
+        URL url = new URL(apiUrl);
117
+        HttpURLConnection connection = (HttpURLConnection) url.openConnection();
118
+        // 配置请求
119
+        connection.setRequestMethod("POST");
120
+        connection.setDoOutput(true);
121
+        // 设置请求头
122
+        for (Map.Entry<String, String> entry : headers.entrySet()) {
123
+            connection.setRequestProperty(entry.getKey(), entry.getValue());
124
+        }
125
+        // 写入请求体
126
+        try (OutputStream os = connection.getOutputStream()) {
127
+            byte[] input = requestBody.getBytes(StandardCharsets.UTF_8);
128
+            os.write(input, 0, input.length);
129
+        }
130
+        // 处理响应
131
+        int responseCode = connection.getResponseCode();
132
+        try (BufferedReader br = new BufferedReader(
133
+                new InputStreamReader(
134
+                        responseCode == 200 ? connection.getInputStream() : connection.getErrorStream(),
135
+                        StandardCharsets.UTF_8
136
+                )
137
+        )) {
138
+            StringBuilder response = new StringBuilder();
139
+            String responseLine;
140
+            while ((responseLine = br.readLine()) != null) {
141
+                response.append(responseLine.trim());
142
+            }
143
+            JSONObject obj = new JSONObject();
144
+            obj.put("responseCode", responseCode);
145
+            obj.put("response", response.toString());
146
+            return obj;
147
+        }
148
+    }
149
+
150
+    private JSONObject sendGetRequest(String apiUrl, Map<String, String> queryParams, Map<String, String> headers) throws IOException {
151
+        // 构建带查询参数的URL
152
+        StringBuilder urlBuilder = new StringBuilder(apiUrl);
153
+        if (queryParams != null && !queryParams.isEmpty()) {
154
+            urlBuilder.append("?");
155
+            boolean firstParam = true;
156
+            for (Map.Entry<String, String> param : queryParams.entrySet()) {
157
+                if (!firstParam) {
158
+                    urlBuilder.append("&");
159
+                }
160
+                urlBuilder.append(param.getKey())
161
+                        .append("=")
162
+                        .append(param.getValue());
163
+                firstParam = false;
164
+            }
165
+        }
166
+        URL url = new URL(urlBuilder.toString());
167
+        HttpURLConnection connection = (HttpURLConnection) url.openConnection();
168
+        // 配置请求
169
+        connection.setRequestMethod("GET");
170
+        // 设置请求头
171
+        for (Map.Entry<String, String> entry : headers.entrySet()) {
172
+            connection.setRequestProperty(entry.getKey(), entry.getValue());
173
+        }
174
+        // 处理响应
175
+        int responseCode = connection.getResponseCode();
176
+        try (BufferedReader br = new BufferedReader(
177
+                new InputStreamReader(
178
+                        responseCode == 200 ? connection.getInputStream() : connection.getErrorStream(),
179
+                        StandardCharsets.UTF_8
180
+                )
181
+        )) {
182
+            StringBuilder response = new StringBuilder();
183
+            String responseLine;
184
+            while ((responseLine = br.readLine()) != null) {
185
+                response.append(responseLine.trim());
186
+            }
187
+            JSONObject obj = new JSONObject();
188
+            obj.put("responseCode", responseCode);
189
+            obj.put("response", response.toString());
190
+            return obj;
191
+        }
192
+    }
193
+
194
+    private String timestampToBeijingTime(long timestamp) {
195
+        // 创建北京时间的时区(Asia/Shanghai)
196
+        ZoneId beijingZone = ZoneId.of("Asia/Shanghai");
197
+        // 将时间戳转换为 Instant(UTC 时间点)
198
+        Instant instant = Instant.ofEpochMilli(timestamp);
199
+        // 将 Instant 转换为北京时间的 LocalDateTime
200
+        LocalDateTime localDateTime = instant.atZone(beijingZone).toLocalDateTime();
201
+        // 定义日期时间格式
202
+        DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
203
+        // 格式化为字符串
204
+        return localDateTime.format(formatter);
205
+    }
206
+
207
+    @Bean
208
+    public RouteBuilder routerBuilderRealTimeNew() {
209
+        return new RouteBuilder() {
210
+            @Override
211
+            public void configure() throws Exception {
212
+                from("timer:mytimer-insert-runtime?period=1800000")
213
+                        .to("direct:postToken")
214
+                        .to("direct:getAccessWell")
215
+                        .to("direct:getTianAnGtPointData")
216
+                        .to("direct:querySingleWellDataFromTianAn");
217
+
218
+
219
+                from("direct:postToken")
220
+                        .routeId("postToken")
221
+                        .setHeader("Content-Type", constant("application/x-www-form-urlencoded"))
222
+                        .setHeader("CamelHttpMethod", constant("POST"))
223
+                        .setBody(constant(String.format(postTokenBodyTemplate, tianAn.getUsername(), tianAn.getPassword())))
224
+                        .to(tianAn.getUrl() + "/token")
225
+                        .process(exchange -> {
226
+                            String response = exchange.getIn().getBody(String.class);
227
+                            JSONObject obj = JSONObject.parseObject(response);
228
+                            if (!obj.getString("result").equals("ok"))
229
+                                throw new RuntimeException(obj.getString("content"));
230
+                            exchange.setProperty("access_token", obj.getJSONObject("content").getString("access_token"));
231
+                            exchange.setProperty("token_type", obj.getJSONObject("content").getString("token_type"));
232
+                        });
233
+
234
+                from("direct:getAccessWell")
235
+                        .routeId("getAccessWell")
236
+                        .setBody(simple("select well_id ,sgt_last_time  from centralbase.sys_access_well_control  where  access_status='1'  "))
237
+                        .to("jdbc:centralbase")
238
+                        .process(exchange -> {
239
+                            List<Map<String, Object>> body = exchange.getIn().getBody(List.class);
240
+                            Map<String, String> sgtLastTimeMap = new HashMap<>();
241
+                            body.forEach(map -> {
242
+                                if (Objects.isNull(map.get("sgt_last_time"))) {
243
+                                    map.put("sgt_last_time", LocalDateTime.now().minusHours(4).format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
244
+                                }
245
+                                sgtLastTimeMap.put(map.get("well_id").toString(), map.get("sgt_last_time").toString().substring(0, 19));
246
+                            });
247
+                            exchange.setProperty("sgtLastTimeMap", sgtLastTimeMap);
248
+                        });
249
+
250
+                BindyCsvDataFormat bindy = new BindyCsvDataFormat(com.gct.tools.etlcamelhuge.entity.TianAnGtPoint.class);
251
+                bindy.setLocale("en_US");
252
+
253
+                from("direct:getTianAnGtPointData")
254
+                        .routeId("getTianAnGtPointData")
255
+                        .setHeader("CamelFileName", constant("employees.xlsx")) // Excel文件名
256
+                        .to("classpath:data/employees.xlsx") // 从resources/data目录读取文件
257
+                        .log("读取Excel文件成功")
258
+                        .convertBodyTo(String.class) // 转换为字符串
259
+                        .unmarshal(bindy) // 使用Bindy解析CSV/Excel
260
+                        .log("解析Excel内容完成")
261
+                        .process(exchange -> {
262
+                            List<TianAnGtPoint> list = exchange.getIn().getBody(List.class);
263
+                            //对wellname做分类,存入Exchange属性
264
+                            Map<String, List<TianAnGtPoint>> wellNameMap = new HashMap<>();
265
+                            for (TianAnGtPoint tianAnGtPoint : list) {
266
+                                wellNameMap.computeIfAbsent(tianAnGtPoint.getWellName(), k -> new ArrayList<>()).add(tianAnGtPoint);
267
+                            }
268
+                            exchange.setProperty("wellNameData", wellNameMap);
269
+                            exchange.getIn().setBody("Excel解析完成,数据已存入exchange属性");
270
+                        })
271
+                        .log("Excel数据已分类后存入Exchange属性");
272
+
273
+                from("direct:querySingleWellDataFromTianAn")
274
+                        .routeId("querySingleWellDataFromTianAn")
275
+                        .process(exchange -> {
276
+                            Map<String, List<TianAnGtPoint>> wellNameData = exchange.getProperty("wellNameData", Map.class);
277
+
278
+                            wellNameData.forEach((wellName, list) -> {
279
+                                //对acqUnitId 在进行分类 功图与冲次数据
280
+                                Map<String, List<TianAnGtPoint>> acqUnitMap = new HashMap<>();
281
+                                for (TianAnGtPoint tianAnGtPoint : list) {
282
+                                    acqUnitMap.computeIfAbsent(tianAnGtPoint.getAcqUnitId(), k -> new ArrayList<>()).add(tianAnGtPoint);
283
+                                }
284
+
285
+                                Map<String, String> rowMap = new HashMap<>();
286
+
287
+                                acqUnitMap.forEach((unitName, tianAnGtPoints) -> {
288
+                                    Map<String, String> headers = new HashMap<>();
289
+                                    headers.put("Content-Type", "application/json");
290
+                                    headers.put("Authorization", "Bearer " + exchange.getProperty("access_token"));
291
+
292
+                                    StringBuilder tags = new StringBuilder();
293
+                                    tianAnGtPoints.forEach(tianAnGtPoint -> {
294
+                                        tags.append(tianAnGtPoint.getTag()).append(",");
295
+                                    });
296
+                                    tags.deleteCharAt(tags.length() - 1);
297
+
298
+                                    Map<String, String> queryParams = new HashMap<>();
299
+                                    queryParams.put("unit_name", unitName);
300
+                                    queryParams.put("tags", tags.toString());
301
+
302
+                                    try {
303
+                                        JSONObject obj = sendGetRequest(tianAn.getUrl() + "/api_3rd/get_tags_datas", queryParams, headers);
304
+                                        JSONArray res = obj.getJSONArray("response");
305
+                                        for (int i = 0; i < res.size(); i++) {
306
+                                            JSONObject resObj = res.getJSONObject(i);
307
+                                            rowMap.put(resObj.getString("tagName"), resObj.getString("value"));
308
+                                        }
309
+                                        if (res.size() > 0) {
310
+                                            String prodDate = timestampToBeijingTime(res.getJSONObject(0).getLong("timestamp"));
311
+                                            rowMap.put("prod_date", prodDate);
312
+                                        }
313
+                                    } catch (IOException e) {
314
+                                        throw new RuntimeException(e);
315
+                                    }
316
+                                });
317
+
318
+                                log.info("wellName:{},rowData:{}", wellName, rowMap);
319
+                                Map<String, String> sgtLastTimeMap = exchange.getProperty("sgtLastTimeMap", Map.class);
320
+
321
+                                if (rowMap.containsKey("prod_date") && sgtLastTimeMap.containsKey(wellName)) {
322
+                                    DateTimeFormatter df = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
323
+                                    LocalDateTime prodDate = LocalDateTime.parse(rowMap.get("prod_date"), df);
324
+                                    LocalDateTime sgtLastTime = LocalDateTime.parse(sgtLastTimeMap.get(wellName), df);
325
+                                    if (prodDate.isAfter(sgtLastTime)) {
326
+                                        rowMap.put("sgt_last_time", sgtLastTimeMap.get(wellName));
327
+                                        rowMap.put("wellName", wellName);
328
+                                        exchange.getIn().setHeader("sgt_last_time", sgtLastTimeMap.get(wellName));
329
+                                        exchange.getIn().setHeader("wellName", wellName);
330
+
331
+                                        exchange.getIn().setBody(rowMap);
332
+                                        exchange.getContext().createProducerTemplate().send("direct:saveToCentralbaseAndSendMessage", exchange);
333
+                                    }
334
+                                }
335
+                            });
336
+                        });
337
+
338
+                from("direct:saveToCentralbaseAndSendMessage")
339
+                        .routeId("saveToCentralbaseAndSendMessage")
340
+                        .process(exchange -> {
341
+                            Map<String, Object> rowMap = exchange.getIn().getBody(Map.class);
342
+                            String wellName = rowMap.get("wellName").toString();
343
+                            String prodDate = rowMap.get("prod_date").toString();
344
+                            String frequency = rowMap.get("CHC").toString();
345
+                            if (!StringUtils.isEmpty(rowMap.get("DYNACARD_DISPLACEMENT")) && !StringUtils.isEmpty(rowMap.get("DYNACARD_LOAD"))) {
346
+                                String[] displacements = rowMap.get("DYNACARD_DISPLACEMENT").toString().split(",");//10 四舍五入
347
+                                for (int i = 0; i < displacements.length; i++) {
348
+                                    displacements[i] = Double.toString(Double.parseDouble(displacements[i]) / 10);
349
+                                }
350
+                                String[] disp_loads = rowMap.get("DYNACARD_LOAD").toString().split(",");
351
+                                Double SUSP_MAX_LOAD = max(disp_loads);
352
+                                Double SUSP_MIN_LOAD = min(disp_loads);
353
+                                String sgt = "";
354
+                                for (int i = 0; i < displacements.length; i++) {
355
+                                    sgt = sgt + displacements[i] + "," + disp_loads[i] + ",";
356
+                                }
357
+                                String[] s = sgt.split(",");
358
+                                String w = "";
359
+                                for (int i = 0; i < s.length; i++) {
360
+                                    w += new BigDecimal(Math.round(Double.parseDouble(s[i]) * 100)).stripTrailingZeros().toPlainString() + ",";
361
+                                }
362
+                                Double[][] doubles = SGTUtil.encodeToDoubleArray(w);
363
+                                rowMap.put("sgt", SGTUtil.encodeToString(doubles));
364
+                                rowMap.put("SUSP_MAX_LOAD", SUSP_MAX_LOAD);
365
+                                rowMap.put("SUSP_MIN_LOAD", SUSP_MIN_LOAD);
366
+                                double stroke = Arrays.stream(doubles)
367
+                                        .mapToDouble(row -> row[0])
368
+                                        .max()
369
+                                        .orElse(Double.NEGATIVE_INFINITY);
370
+                                rowMap.put("stroke", stroke);
371
+                                log.info("wellName:{},rowData:{}", wellName, rowMap);
372
+                                // sendDataToRocketMQ(wellName, wellName, prodDate, stroke, frequency, sgt);
373
+                            }
374
+                        })
375
+                        .setBody(simple("insert into centralbase.cb_temp_well_mech_runtime(well_id,prod_date,SUSP_MAX_LOAD,SUSP_MIN_LOAD,sgt,stroke_length,stroke_frequency) " +
376
+                                "select '${body[wellName]}','${body[prod_date]}','${body[SUSP_MAX_LOAD]}','${body[SUSP_MIN_LOAD]}','${body[sgt]}','${body[stroke]}','${body[CHC]}' " +
377
+                                "where NOT EXISTS (SELECT * FROM centralbase.cb_temp_well_mech_runtime WHERE well_id = '${body[wellName]}' and  prod_date = '${body[DYNA_CREATE_TIME]}' )"))
378
+                        .to("jdbc:centralbase")
379
+                        .setBody(simple("update centralbase.sys_access_well_control set sgt_last_time = '${header.sgt_last_time}' where well_id ='${header.well_id}' "))
380
+                        .to("jdbc:centralbase")
381
+                        .end();
382
+
383
+
384
+                from("timer:mytimer-update-avg-mech_daily?period=10800000")
385
+                        .routeId("update-avg-mech_daily")
386
+                        .process(exchange -> {
387
+                            Message in = exchange.getIn();
388
+                            in.setHeader("date", getDate());
389
+                        })
390
+                        .setBody(simple("select well_id,avg(stroke_length) stroke_length  ,avg(stroke_frequency) stroke_frequency from centralbase.cb_temp_well_mech_runtime where prod_date::date='${header.date}' group by well_id"))
391
+                        .to("jdbc:centralbase")
392
+                        .split(body()).process(exchange -> {
393
+                            Message in = exchange.getIn();
394
+                            HashMap<String, Object> aRow = in.getBody(HashMap.class);
395
+                            if (aRow.get("stroke_length") != null && aRow.get("stroke_frequency") != null) {
396
+                                double stroke_length = Double.parseDouble(aRow.get("stroke_length").toString());
397
+                                double stroke_frequency = Double.parseDouble(aRow.get("stroke_frequency").toString());
398
+                                BigDecimal bd = new BigDecimal(stroke_length);
399
+                                double stroke_lengt1 = bd.setScale(1, BigDecimal.ROUND_HALF_UP).doubleValue();
400
+                                BigDecimal bd1 = new BigDecimal(stroke_frequency);
401
+                                double stroke_frequency1 = bd1.setScale(1, BigDecimal.ROUND_HALF_UP).doubleValue();
402
+                                aRow.put("strokeLength", stroke_lengt1);
403
+                                aRow.put("strokeFrequency", stroke_frequency1);
404
+                            }
405
+                        })
406
+                        .setBody(simple("update centralbase.cb_temp_well_mech_daily set stroke_length='${body[strokeLength]}' ,stroke_frequency ='${body[strokeFrequency]}' where well_id = '${body[well_id]}' and prod_date::date='${header.date}' "))
407
+                        .doTry()
408
+                        .to("jdbc:centralbase")
409
+                        .doCatch(Exception.class)
410
+                        .log("${header.date}" + " routeId:update-avg-mech_daily ->  centralbase.cb_temp_well_mech_daily update data failed")
411
+                        .end();
412
+
413
+            }
414
+        };
415
+    }
416
+
97 417
     @Bean
98 418
     public RouteBuilder routeBuilderWithRealTime() {
99 419
         return new RouteBuilder() {
@@ -107,9 +427,9 @@ public class CamelJDBCCofRealTimeConfiguration {
107 427
                             HashMap body = exchange.getIn().getBody(HashMap.class);
108 428
                             exchange.getIn().setHeader("well_id", body.get("well_id"));
109 429
 //                            System.out.println(body.get("well_id"));
110
-                            if (Objects.isNull(body.get("sgt_last_time")))body.put("sgt_last_time"
111
-                                    ,LocalDateTime.now().minusHours(4).format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
112
-                            exchange.getIn().setHeader("sgt_last_time", body.get("sgt_last_time").toString().substring(0,19));
430
+                            if (Objects.isNull(body.get("sgt_last_time"))) body.put("sgt_last_time"
431
+                                    , LocalDateTime.now().minusHours(4).format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
432
+                            exchange.getIn().setHeader("sgt_last_time", body.get("sgt_last_time").toString().substring(0, 19));
113 433
                         })
114 434
                         .setBody(simple("select WELL_ID,WELL_NAME,DYNA_CREATE_TIME,CHECK_DATE,DISPLACEMENT,DISP_LOAD,STROKE,FREQUENCY,SUSP_MAX_LOAD,SUSP_MIN_LOAD" +
115 435
                                 " from DEFAULT_GONGTU where CHECK_DATE >= to_date('${header.sgt_last_time}','yyyy-mm-dd hh24:mi:ss') " +
@@ -126,12 +446,12 @@ public class CamelJDBCCofRealTimeConfiguration {
126 446
                             aRow.put("sgt", "");
127 447
                             //swap table error design column
128 448
                             Object tempObj = aRow.get("STROKE");
129
-                            aRow.put("STROKE",aRow.get("FREQUENCY"));
130
-                            aRow.put("FREQUENCY",tempObj);
449
+                            aRow.put("STROKE", aRow.get("FREQUENCY"));
450
+                            aRow.put("FREQUENCY", tempObj);
131 451
                             if (!StringUtils.isEmpty(aRow.get("DISPLACEMENT")) && !StringUtils.isEmpty(aRow.get("DISP_LOAD"))) {
132
-                                String[] displacements =  clobToString((Clob) aRow.get("DISPLACEMENT")).split(",");//10 四舍五入
452
+                                String[] displacements = clobToString((Clob) aRow.get("DISPLACEMENT")).split(",");//10 四舍五入
133 453
                                 for (int i = 0; i < displacements.length; i++) {
134
-                                    displacements[i] = Double.toString(Double.parseDouble(displacements[i])/10);
454
+                                    displacements[i] = Double.toString(Double.parseDouble(displacements[i]) / 10);
135 455
                                 }
136 456
 //                                System.out.println("disp="+Arrays.deepToString(displacements));
137 457
                                 String[] disp_loads = clobToString((Clob) aRow.get("DISP_LOAD")).split(",");
@@ -236,43 +556,6 @@ public class CamelJDBCCofRealTimeConfiguration {
236 556
                         .doCatch(Exception.class)
237 557
                         .log("${header.date}" + " routeId:update-avg-mech_daily ->  centralbase.cb_temp_well_mech_daily update data failed")
238 558
                         .end();
239
-
240
-               /* from("timer:mytimer-insert-mechDaily?period=3600000")
241
-                        .routeId("insert-mech_daily")
242
-                        .process(exchange -> {
243
-                            Message in = exchange.getIn();
244
-                            in.setHeader("date", getDate());
245
-                        })
246
-                        .setBody(simple("select well_id from centralbase.sys_access_well_control  where  access_status='1'  "))
247
-                        .to("jdbc:centralbase")
248
-                        .split(body()).process(exchange -> {
249
-                            HashMap body = exchange.getIn().getBody(HashMap.class);
250
-                            exchange.getIn().setHeader("well_id", body.get("well_id"));
251
-                        })
252
-                        .setBody(simple("select distinct jh,rq,dym,jy,ly,bj,bs,bx,zs,cc,cs,blx,dl from DBA01 where rq  = to_date('${header.date}','yyyy-MM-dd') and jh='${header.well_id}' and qyrq is not null  "))
253
-                        .to("jdbc:oracle")
254
-                        .split(body()).process(exchange -> {
255
-                            Message in = exchange.getIn();
256
-                            HashMap<String, Object> aRow = in.getBody(HashMap.class);
257
-                            if (aRow.get("JY") == null) aRow.put("JY", "0.0");
258
-                            if (aRow.get("LY") == null) aRow.put("LY", "0.0");
259
-                            if (aRow.get("BJ") == null) aRow.put("BJ", "0.0");
260
-                            if (aRow.get("BS") == null) aRow.put("BS", "0.0");
261
-                            if (aRow.get("BX") == null) aRow.put("BX", "0.0");
262
-                            if (aRow.get("ZS") == null) aRow.put("ZS", "0.0");
263
-                            if (aRow.get("CC") == null) aRow.put("CC", "0.0");
264
-                            if (aRow.get("CS") == null) aRow.put("CS", "0.0");
265
-                            if (aRow.get("BLX") == null) aRow.put("BLX", "");
266
-                            if (aRow.get("DL") == null) aRow.put("DL", "0.0");
267
-                        })
268
-                        .setBody(simple("insert into centralbase.cb_temp_well_mech_daily(well_id,prod_date,static_pressure,flow_pres,pump_diameter,pump_depth,pump_efficiency,rotate_frequency,stroke_length,stroke_frequency,pump_type,elec_frequency) " +
269
-                                "select '${body[JH]}','${body[RQ]}','${body[JY]}','${body[LY]}','${body[BJ]}','${body[BS]}','${body[BX]}','${body[ZS]}','${body[CC]}','${body[CS]}','${body[BLX]}','${body[DL]}' " +
270
-                                "where NOT EXISTS ( SELECT * FROM centralbase.cb_temp_well_mech_daily WHERE well_id = '${body[JH]}' and  prod_date = '${body[RQ]}' )"))
271
-                        .doTry()
272
-                        .to("jdbc:centralbase")
273
-                        .doCatch(Exception.class)
274
-                        .log("${header.date}" + " routeId:insert-mech_daily ->  centralbase.cb_temp_well_mech_daily insert data failed")
275
-                        .end();*/
276 559
             }
277 560
 
278 561
         };

+ 6 - 0
src/main/resources/application.yml

@@ -95,6 +95,8 @@ spring:
95 95
         ## 数据库连接超时时间,默认30秒,即30000
96 96
         connection-timeout: 30000
97 97
 
98
+
99
+
98 100
 management:
99 101
   info:
100 102
     camel:
@@ -127,6 +129,10 @@ rocketmq:
127 129
     access-key: 123456
128 130
     secret-key: 123456
129 131
 gct:
132
+  tian-an:
133
+    url: 11.72.150.241:8127
134
+    username: dihang
135
+    password: DH@scada1234
130 136
   rocketmq:
131 137
     one:
132 138
       topic: diagnose-msg

binární
src/main/resources/zy_gt_point.xlsx