Browse Source

错误时间信息报警
接入实时数据

lijian 2 years ago
parent
commit
d56408f53e

+ 206 - 68
src/main/java/com/gct/tools/etlcamelhuge/routeconfig/CamelRestConfiguration.java

@@ -10,41 +10,41 @@ import org.apache.camel.Message;
10 10
 import org.apache.camel.builder.RouteBuilder;
11 11
 import org.apache.camel.model.dataformat.JsonLibrary;
12 12
 import org.springframework.context.annotation.Configuration;
13
+import org.springframework.jdbc.core.JdbcTemplate;
13 14
 
15
+import javax.annotation.Resource;
16
+import javax.sql.DataSource;
14 17
 import java.io.*;
15 18
 import java.math.BigDecimal;
16 19
 import java.net.HttpURLConnection;
17 20
 import java.net.URL;
18 21
 import java.net.URLEncoder;
19 22
 import java.text.SimpleDateFormat;
20
-import java.util.ArrayList;
21
-import java.util.Date;
22
-import java.util.HashMap;
23
-import java.util.List;
23
+import java.util.*;
24 24
 
25 25
 
26 26
 @Configuration
27 27
 public class CamelRestConfiguration extends RouteBuilder {
28 28
 
29 29
 
30
+    private static Map<String, String> accessWellMap = new HashMap();
31
+
30 32
     public String gtTransUtil(String gtdata)//处理示功图
31 33
     {
32 34
         String gt = "";
33
-        if (gtdata == null){
35
+        if (gtdata == null) {
34 36
             gtdata = "";
35 37
         }
36 38
 
37 39
         Double[][] doubles = null;
38
-        int len = gtdata.length()/8;
39
-        if(!gtdata.equals(""))
40
-        {
41
-            for(int i=0;i<len;i++)
42
-            {
43
-                gt += String.valueOf(Double.parseDouble(gtdata.substring(i*8, i*8+3))/100)+ ",";
44
-                if (i ==len-1){
45
-                    gt += String.valueOf(Double.parseDouble(gtdata.substring(i*8+3, i*8+8))/100);
46
-                }else {
47
-                    gt += String.valueOf(Double.parseDouble(gtdata.substring(i*8+3, i*8+8))/100)+",";
40
+        int len = gtdata.length() / 8;
41
+        if (!gtdata.equals("")) {
42
+            for (int i = 0; i < len; i++) {
43
+                gt += String.valueOf(Double.parseDouble(gtdata.substring(i * 8, i * 8 + 3)) / 100) + ",";
44
+                if (i == len - 1) {
45
+                    gt += String.valueOf(Double.parseDouble(gtdata.substring(i * 8 + 3, i * 8 + 8)) / 100);
46
+                } else {
47
+                    gt += String.valueOf(Double.parseDouble(gtdata.substring(i * 8 + 3, i * 8 + 8)) / 100) + ",";
48 48
                 }
49 49
             }
50 50
 
@@ -59,14 +59,57 @@ public class CamelRestConfiguration extends RouteBuilder {
59 59
         String string = SGTUtil.encodeToString(doubles);
60 60
         return string;
61 61
     }
62
-    public static String getDate()
63
-    {
62
+
63
+    public static String getDate() {
64 64
         SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");//设置日期格式
65 65
         String date = df.format(new java.util.Date());// new Date()为获取当前系统时间
66 66
         return date;
67 67
     }
68 68
 
69
-    public String loadPhdDate( String well_id) throws Exception {
69
+
70
+    public void getCurrentTimeParams(String wellId, String wellName) throws Exception {
71
+
72
+        Map<String, Double> map = new HashMap<>();
73
+
74
+        String ACurrent = "/upData/A相电流";
75
+        String BCurrent = "/upData/B相电流";
76
+        String CCurrent = "/upData/C相电流";
77
+        String APress = "/upData/A相电压";
78
+        String BPress = "/upData/B相电压";
79
+        String CPress = "/upData/C相电压";
80
+        String noTotalPower = "/upData/总无功功率";
81
+        String totalPower = "/upData/总有功功率";
82
+        String casePress = "/upData/套压";
83
+        String oilPress = "/upData/油压";
84
+
85
+        map.put("ACurrent", Double.valueOf("".equals(getParams(wellId + ACurrent)) ? "0" : getParams(wellId + ACurrent)));
86
+        map.put("BCurrent", Double.valueOf("".equals(getParams(wellId + BCurrent)) ? "0" : getParams(wellId + BCurrent)));
87
+        map.put("CCurrent", Double.valueOf("".equals(getParams(wellId + CCurrent)) ? "0" : getParams(wellId + CCurrent)));
88
+        map.put("APress", Double.valueOf("".equals(getParams(wellId + APress)) ? "0" : getParams(wellId + APress)));
89
+        map.put("BPress", Double.valueOf("".equals(getParams(wellId + BPress)) ? "0" : getParams(wellId + BPress)));
90
+        map.put("CPress", Double.valueOf("".equals(getParams(wellId + CPress)) ? "0" : getParams(wellId + CPress)));
91
+        map.put("noTotalPower", Double.valueOf("".equals(getParams(wellId + noTotalPower)) ? "0" : getParams(wellId + noTotalPower)));
92
+        map.put("totalPower", Double.valueOf("".equals(getParams(wellId + totalPower)) ? "0" : getParams(wellId + totalPower)));
93
+        map.put("casePress", Double.valueOf("".equals(getParams(wellId + casePress)) ? "0" : getParams(wellId + casePress)));
94
+        map.put("oilPress", Double.valueOf("".equals(getParams(wellId + oilPress)) ? "0" : getParams(wellId + oilPress)));
95
+        insertCurrentTimeParams(map, wellName);
96
+
97
+
98
+    }
99
+
100
+    public void insertCurrentTimeParams(Map<String, Double> map, String wellName) {
101
+        SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd");//设置日期格式
102
+        String dateNow = df.format(new java.util.Date());
103
+        JdbcTemplate jdbcTemplate = new JdbcTemplate(baseDataSource);
104
+        String sql = "INSERT INTO centralbase.cb_pc_current_time_params (well_name, prod_date,a_current, b_current, c_current, a_press, b_press, c_press, no_total_power, total_power, case_press, oil_press) " +
105
+                "VALUES ('" + wellName + "','" + dateNow + "'," + (map.get("ACurrent") == 0 ? null : map.get("ACurrent")) + "," + (map.get("BCurrent") == 0 ? null : map.get("BCurrent")) + "," + (map.get("CCurrent") == 0 ? null : map.get("CCurrent")) + "," + (map.get("APress") == 0 ? null : map.get("APress")) + "," + (map.get("BPress") == 0 ? null : map.get("BPress")) + ","
106
+                + (map.get("CPress") == 0 ? null : map.get("CPress")) + "," + (map.get("noTotalPower") == 0 ? null : map.get("noTotalPower")) + "," + (map.get("totalPower") == 0 ? null : map.get("totalPower")) + "," + (map.get("casePress") == 0 ? null : map.get("casePress")) + "," + (map.get("oilPress") == 0 ? null : map.get("oilPress")) + ")  ON CONFLICT (well_name,prod_date)  DO NOTHING;";
107
+        if (accessWellMap.get(wellName) != null)
108
+            jdbcTemplate.update(sql);
109
+    }
110
+
111
+
112
+    public String getParams(String param) throws Exception {
70 113
         URL restURL = new URL("http://11.72.128.71/api-service/tsdb/getLast?appKey=MOsEWVl8S0");
71 114
 
72 115
         //此处的urlConnection对象实际上是根据URL的请求协议(此处是http) 生成的URLConnection类 的子类HttpURLConnection
@@ -82,7 +125,59 @@ public class CamelRestConfiguration extends RouteBuilder {
82 125
         conn.setAllowUserInteraction(false);
83 126
         DataOutputStream out = new DataOutputStream(conn
84 127
                 .getOutputStream());
85
-        String content = "id="+URLEncoder.encode(well_id+"/upData/电流平衡率", "UTF-8");
128
+        String content = "id=" + URLEncoder.encode(param, "UTF-8");
129
+        out.writeBytes(content);
130
+        out.flush();
131
+        out.close();
132
+        PrintStream ps = new PrintStream(conn.getOutputStream());
133
+
134
+        ps.close();
135
+
136
+        InputStream is = null;
137
+        if (conn.getResponseCode() == 200) {
138
+            is = conn.getInputStream();
139
+        } else {
140
+            is = conn.getErrorStream();
141
+        }
142
+        BufferedReader bReader = new BufferedReader(new InputStreamReader(is, "UTF-8"));
143
+        String line;
144
+        StringBuilder resultStr = new StringBuilder();
145
+        while (null != (line = bReader.readLine())) {
146
+            resultStr.append(line);
147
+        }
148
+
149
+        String value = "";
150
+        JSONObject json = null;
151
+        try {
152
+            json = JSONObject.parseObject(resultStr.toString());
153
+
154
+            if (json.size() > 0) {
155
+                value = json.get("value").toString();
156
+            }
157
+
158
+        } catch (Exception e) {
159
+        }
160
+        return value;
161
+    }
162
+
163
+
164
+    public String loadPhdDate(String well_id) throws Exception {
165
+        URL restURL = new URL("http://11.72.128.71/api-service/tsdb/getLast?appKey=MOsEWVl8S0");
166
+
167
+        //此处的urlConnection对象实际上是根据URL的请求协议(此处是http) 生成的URLConnection类 的子类HttpURLConnection
168
+        HttpURLConnection conn = (HttpURLConnection) restURL.openConnection();
169
+
170
+        //请求方式
171
+        conn.setRequestMethod("POST");
172
+
173
+        //设置是否从httpUrlConnection读入,默认情况下是true; httpUrlConnection.setDoInput(true);
174
+        conn.setDoOutput(true);
175
+
176
+        //allowUserInteraction 如果为 true,则在允许用户交互(例如弹出一个验证对话框)的上下文中对此 URL 进行检查。
177
+        conn.setAllowUserInteraction(false);
178
+        DataOutputStream out = new DataOutputStream(conn
179
+                .getOutputStream());
180
+        String content = "id=" + URLEncoder.encode(well_id + "/upData/电流平衡率", "UTF-8");
86 181
         out.writeBytes(content);
87 182
         out.flush();
88 183
         out.close();
@@ -114,12 +209,32 @@ public class CamelRestConfiguration extends RouteBuilder {
114 209
             }
115 210
 
116 211
         } catch (Exception e) {
117
-            System.out.println(resultStr);
118 212
         }
119 213
 //        System.out.println(value);
120 214
         return value;
121 215
     }
122 216
 
217
+    @Resource(name = "centralbase")
218
+    DataSource baseDataSource;
219
+
220
+
221
+    public void insertData(ZDJG_WELL z) {
222
+        JdbcTemplate jdbcTemplate = new JdbcTemplate(baseDataSource);
223
+        String sql = "INSERT INTO centralbase.collection_time_error (well_id, prod_date,rut_time, stroke_length, stroke_frequency, susp_max_load, susp_min_load, sgt, error_log) " +
224
+                "VALUES ('" + z.getWell_common_name() + "','" + z.getProd_date() + "','" + z.getBz() + "','" + z.getStroke() + "','" + z.getFrequence() + "','" + z.getSxzh() + "','" + z.getXxzh() + "','" + z.getZd_init_sgt() + "','" + z.getErrorInfo() + "')  ON CONFLICT (well_id,prod_date)  DO NOTHING;";
225
+        jdbcTemplate.update(sql);
226
+    }
227
+
228
+    public void getAccessWell() {
229
+        JdbcTemplate jdbcTemplate = new JdbcTemplate(baseDataSource);
230
+        String sql = "SELECT well_id wellid FROM centralbase.sys_access_well_control where access_status = '1' and error_id = '1' ";
231
+        List<String> accessWells = jdbcTemplate.queryForList(sql, String.class);
232
+
233
+        accessWells.forEach(x -> {
234
+            accessWellMap.put(x, x);
235
+        });
236
+    }
237
+
123 238
 
124 239
     public ZDJG_WELL loadGtData(String param, String well_id, String sysDate) throws Exception {
125 240
 //        String date = getDate();
@@ -148,12 +263,12 @@ public class CamelRestConfiguration extends RouteBuilder {
148 263
         ps.close();
149 264
 
150 265
         InputStream is = null;
151
-        if (conn.getResponseCode() == 200){
266
+        if (conn.getResponseCode() == 200) {
152 267
             is = conn.getInputStream();
153
-        }else {
154
-            is= conn.getErrorStream();
268
+        } else {
269
+            is = conn.getErrorStream();
155 270
         }
156
-        BufferedReader bReader = new BufferedReader(new InputStreamReader(is,"UTF-8"));
271
+        BufferedReader bReader = new BufferedReader(new InputStreamReader(is, "UTF-8"));
157 272
         String line, resultStr = "";
158 273
         while (null != (line = bReader.readLine())) {
159 274
             resultStr += line;
@@ -164,29 +279,29 @@ public class CamelRestConfiguration extends RouteBuilder {
164 279
         try {
165 280
             json = JSONObject.parseObject(resultStr);
166 281
 
167
-            if(json.size()>0){
282
+            if (json.size() > 0) {
168 283
                 String value = json.get("value").toString();
169 284
                 String time = json.get("time").toString();
170
-                time= time.replace("Z", " UTC"); //2019-06-27T16:00:00.000 UTC
285
+                time = time.replace("Z", " UTC"); //2019-06-27T16:00:00.000 UTC
171 286
                 SimpleDateFormat format1 = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS Z");//转换时区格式
172
-                Date cjDate = format1 .parse(time) ;
287
+                Date cjDate = format1.parse(time);
173 288
                 SimpleDateFormat fomat2 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
174 289
                 SimpleDateFormat fomat3 = new SimpleDateFormat("yyyy-MM-dd HH:mm");
175 290
                 String mydate = fomat2.format(cjDate);
176
-                JSONObject json2= JSONObject.parseObject(value);
291
+                JSONObject json2 = JSONObject.parseObject(value);
177 292
                 String bool = json2.getString("state").toString();
178
-                z.setIs_device_close(bool.equals("true")? "在线": "不在线");
293
+                z.setIs_device_close(bool.equals("true") ? "在线" : "不在线");
179 294
                 z.setIs_well_close("关井");
180
-                if (!bool.equals("false")){
295
+                if (!bool.equals("false")) {
181 296
                     String data = json2.get("data").toString();
182 297
                     JSONObject jsonObject = JSONObject.parseObject(data);
183 298
 //                    String offset = jsonObject.getString("offset");
184 299
                     String jh = well_id.replace(" ", "");
185 300
                     z.setWell_common_name(jh);
186
-                    String gtsj = TransUtil.tranSGT(jsonObject.getString("offset"),jsonObject.getString("load"));
301
+                    String gtsj = TransUtil.tranSGT(jsonObject.getString("offset"), jsonObject.getString("load"));
187 302
                     Date timeRtu = null;
188
-                    String  rtutime = jsonObject.getString("time");
189
-                    timeRtu = fomat3.parse("0000-00-00 00:".equals(rtutime)? "2021-04-10 10:10": rtutime);
303
+                    String rtutime = jsonObject.getString("time");
304
+                    timeRtu = fomat3.parse("0000-00-00 00:".equals(rtutime) ? "2021-04-10 10:10" : rtutime);
190 305
                     Date yunpingtai = fomat2.parse(mydate);
191 306
                     long time1 = timeRtu.getTime();
192 307
                     long time2 = yunpingtai.getTime();
@@ -195,37 +310,59 @@ public class CamelRestConfiguration extends RouteBuilder {
195 310
                     int hours1 = (int) ((time3 - time2) / (1000 * 60 * 60));
196 311
                     int hours = (int) ((time2 - time1) / (1000 * 60 * 60));
197 312
                     //云平台和采集时间相差要小于4小时
198
-                    if (hours <4 && !"".equals(gtsj) && hours1< 4 ){
313
+                    if (hours < 4 && !"".equals(gtsj) && hours1 < 4) {
314
+                        z.setZd_init_sgt(gtsj);//位移载荷点
315
+//
316
+                        if (jsonObject.getString("strokeFrequency") != null) {
317
+                            z.setStroke(CALUtil.round((Double.valueOf(jsonObject.getString("stroke"))) * 100) / 100);//冲程,但GTSB_2的冲程是CC字段
318
+                        }
319
+                        if (jsonObject.getString("stroke") != null) {
320
+                            z.setFrequence(CALUtil.round((Double.valueOf(jsonObject.getString("strokeFrequency"))) * 100) / 100);//冲次,但GTSB_2的冲次是PJXC字段
321
+                        }
322
+                        z.setSxzh(Double.parseDouble(jsonObject.getString("maxLoad")));//最大载荷
323
+                        z.setXxzh(Double.parseDouble(jsonObject.getString("minLoad")));//最小载荷
324
+                        z.setWell_common_name(well_id);
325
+                    } else {
326
+                        SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd");//设置日期格式
327
+                        String dateNow = df.format(new java.util.Date());
328
+
199 329
                         z.setZd_init_sgt(gtsj);//位移载荷点
200
-//                        z.setProd_date(mydate);
201
-//                        z.setSys_date(sysDate);
202
-//                        z.setIs_well_close(Double.valueOf(jsonObject.getString("minCurrent")) > 0 ? "开井": "关井");
203
-//                        z.setPressure(0);
204
-                        if (jsonObject.getString("strokeFrequency") != null){
205
-                            z.setStroke(CALUtil.round((Double.valueOf(jsonObject.getString("strokeFrequency"))) * 100) / 100);//冲程,但GTSB_2的冲程是CC字段
330
+//
331
+                        if (jsonObject.getString("strokeFrequency") != null) {
332
+                            z.setStroke(CALUtil.round((Double.valueOf(jsonObject.getString("stroke"))) * 100) / 100);//冲程,但GTSB_2的冲程是CC字段
206 333
                         }
207
-                        if (jsonObject.getString("stroke") != null){
208
-                            z.setFrequence(CALUtil.round((Double.valueOf(jsonObject.getString("stroke"))) * 100) / 100);//冲次,但GTSB_2的冲次是PJXC字段
334
+                        if (jsonObject.getString("stroke") != null) {
335
+                            z.setFrequence(CALUtil.round((Double.valueOf(jsonObject.getString("strokeFrequency"))) * 100) / 100);//冲次,但GTSB_2的冲次是PJXC字段
209 336
                         }
337
+                        z.setProd_date(dateNow);
338
+                        z.setBz(rtutime);
210 339
                         z.setSxzh(Double.parseDouble(jsonObject.getString("maxLoad")));//最大载荷
211 340
                         z.setXxzh(Double.parseDouble(jsonObject.getString("minLoad")));//最小载荷
212 341
                         z.setWell_common_name(well_id);
342
+
343
+                        z.setErrorInfo("采集时间错误");
344
+                        if (accessWellMap.get(z.getWell_common_name()) != null)
345
+                            insertData(z);
213 346
                     }
347
+
214 348
                 }
215 349
             }
216
-        }catch (Exception e){
350
+        } catch (Exception e) {
217 351
 //            System.out.println(resultStr);
218 352
         }
219 353
         bReader.close();
220 354
         is.close();
221 355
         return z;
222 356
     }
357
+
223 358
     @Override
224 359
     public synchronized void configure() throws Exception {
225 360
 
361
+
362
+        getAccessWell();
226 363
         JSONObject body = new JSONObject();
227 364
 
228
-          restConfiguration()
365
+        restConfiguration()
229 366
                 .component("servlet")
230 367
                 .host("11.72.128.71");
231 368
         from("timer:mytimer?period=3600000")
@@ -233,35 +370,36 @@ public class CamelRestConfiguration extends RouteBuilder {
233 370
                 .to("rest:post:/api-service/device/query?appKey=MOsEWVl8S0&type=simple&pageSize=8000&page=1")
234 371
                 .unmarshal().json(JsonLibrary.Jackson)
235 372
                 .split(body()).process(
236
-                exchange -> {
237
-                    Message in = exchange.getIn();
238
-                    HashMap<String, Object> aRow = in.getBody(HashMap.class);
239
-                    in.setHeader("id",aRow.get("data"));
240
-                    in.setHeader("date", getDate());
241
-                })
242
-
373
+                        exchange -> {
374
+                            Message in = exchange.getIn();
375
+                            HashMap<String, Object> aRow = in.getBody(HashMap.class);
376
+                            in.setHeader("id", aRow.get("data"));
377
+                            in.setHeader("date", getDate());
378
+                        })
243 379
                 .setBody(simple("${in.header.id}"))
380
+
244 381
                 .split(body()).process(
245
-                exchange -> {
246
-                    Message in = exchange.getIn();
247
-                    HashMap<String, Object> aRow = in.getBody(HashMap.class);
248
-                    if (aRow.get("typeName").toString().indexOf("油井")> 0){
249
-                        Double phd = "".equals(loadPhdDate(aRow.get("id").toString())) ?  0 : Double.valueOf(loadPhdDate(aRow.get("id").toString()) );
250
-                        ZDJG_WELL zdjg_well = loadGtData(aRow.get("id")+"/analyse/功图数据", aRow.get("code").toString(), "2021-08-17");
251
-                        aRow.put("sgt",zdjg_well.getZd_init_sgt() );//null
252
-                        aRow.put("susp_max_load",zdjg_well.getSxzh());//最大载荷
253
-                        aRow.put("susp_min_load",zdjg_well.getXxzh());//最小载荷
254
-                        aRow.put("stroke",zdjg_well.getStroke());
255
-                        aRow.put("frequency",zdjg_well.getFrequence());
256
-                        aRow.put("dyna_create_time",zdjg_well.getProd_date());
257
-                        aRow.put("well_name",aRow.get("code"));
258
-                        aRow.put("phd",phd== null ? 0: phd);
259
-                    }
260
-                })
382
+                        exchange -> {
383
+                            Message in = exchange.getIn();
384
+                            HashMap<String, Object> aRow = in.getBody(HashMap.class);
385
+                            if (aRow.get("typeName").toString().indexOf("油井") > 0) {
386
+                                getCurrentTimeParams(aRow.get("id").toString(), aRow.get("code").toString());
387
+                                Double phd = "".equals(loadPhdDate(aRow.get("id").toString())) ? 0 : Double.valueOf(loadPhdDate(aRow.get("id").toString()));
388
+                                ZDJG_WELL zdjg_well = loadGtData(aRow.get("id") + "/analyse/功图数据", aRow.get("code").toString(), "2021-08-17");
389
+                                aRow.put("sgt", zdjg_well.getZd_init_sgt());//null
390
+                                aRow.put("susp_max_load", zdjg_well.getSxzh());//最大载荷
391
+                                aRow.put("susp_min_load", zdjg_well.getXxzh());//最小载荷
392
+                                aRow.put("stroke", zdjg_well.getStroke());
393
+                                aRow.put("frequency", zdjg_well.getFrequence());
394
+                                aRow.put("dyna_create_time", zdjg_well.getProd_date());
395
+                                aRow.put("well_name", zdjg_well.getStroke() == 0 || zdjg_well.getSxzh() == 0 ? "No01" : aRow.get("code"));
396
+                                aRow.put("phd", phd == null ? 0 : phd);
397
+                            }
398
+                        })
261 399
 //                .log("${body[phd]}"+ "sql" + "${body[well_name]}")
262 400
                 .doTry()
263 401
                 .setBody(simple("insert into centralbase.cb_temp_well_mech_runtime(well_id,prod_date,stroke_length,stroke_frequency,susp_max_load,susp_min_load,sgt,elec_pump_current_b) " +
264
-                        "values( '${body[well_name]}','${in.header.date}','${body[stroke]}','${body[frequency]}','${body[susp_max_load]}','${body[susp_min_load]}','${body[sgt]}','${body[phd]}')  " ))
402
+                        "values( '${body[well_name]}','${in.header.date}','${body[stroke]}','${body[frequency]}','${body[susp_max_load]}','${body[susp_min_load]}','${body[sgt]}','${body[phd]}')  "))
265 403
 //                        "where NOT EXISTS (SELECT * FROM centralbase.cb_temp_well_mech_runtime WHERE well_id = '${body[well_name]}' and  prod_date = '${in.header.date}' )"))
266 404
                 .to("jdbc:centralbase")
267 405
                 .doCatch(Exception.class)