瀏覽代碼

新接入井信息及获取井方式的修改

lijian 3 年之前
父節點
當前提交
92f0905ec1

+ 12 - 5
src/main/java/com/gct/tools/etlcamelhuge/camelconfig/MyDataSourceConfiguration.java

@@ -36,9 +36,16 @@ public class MyDataSourceConfiguration {
36 36
         return DataSourceBuilder.create().build();
37 37
     }
38 38
 
39
-//    @Bean(name = "gtsj")
40
-//    @ConfigurationProperties(prefix = "spring.datasource.ds4")
41
-//    public DataSource dataSource4() {
42
-//        return DataSourceBuilder.create().build();
43
-//    }
39
+    @Bean(name = "aoid")
40
+    @ConfigurationProperties(prefix = "spring.datasource.ds4")
41
+    public DataSource dataSource4() {
42
+        return DataSourceBuilder.create().build();
43
+    }
44
+
45
+    @Bean(name = "oracle_A2")
46
+    @ConfigurationProperties(prefix = "spring.datasource.ds5")
47
+    public DataSource dataSource5() {
48
+        return DataSourceBuilder.create().build();
49
+    }
50
+
44 51
 }

+ 364 - 12
src/main/java/com/gct/tools/etlcamelhuge/routeconfig/CamelJDBCConfiguration.java

@@ -1,19 +1,31 @@
1 1
 package com.gct.tools.etlcamelhuge.routeconfig;
2 2
 
3
+import ch.qos.logback.core.db.dialect.DBUtil;
4
+import com.alibaba.fastjson.JSONArray;
5
+import com.alibaba.fastjson.JSONObject;
3 6
 import com.gct.common.util.SGTUtil;
4 7
 import com.gct.tools.etlcamelhuge.MQ.MessageBody;
5 8
 import com.gct.tools.etlcamelhuge.MQ.MessageProducer;
6 9
 import com.gct.tools.etlcamelhuge.entity.DiagnoseMsg;
10
+import com.gct.tools.etlcamelhuge.entity.ZDJG_WELL;
7 11
 import org.apache.camel.*;
8 12
 import org.apache.camel.builder.RouteBuilder;
9 13
 //import org.apache.rocketmq.common.message.Message;
14
+import org.apache.camel.model.ExpressionNode;
15
+import org.apache.camel.model.ProcessorDefinition;
16
+import org.apache.camel.model.RouteDefinition;
10 17
 import org.apache.rocketmq.spring.core.RocketMQTemplate;
11 18
 import org.springframework.beans.factory.annotation.Autowired;
12 19
 import org.springframework.context.annotation.Bean;
13 20
 import org.springframework.context.annotation.Configuration;
21
+import org.springframework.util.StringUtils;
14 22
 
15 23
 import javax.annotation.Resource;
24
+import java.io.*;
16 25
 import java.math.BigDecimal;
26
+import java.net.HttpURLConnection;
27
+import java.net.URL;
28
+import java.net.URLEncoder;
17 29
 import java.text.DecimalFormat;
18 30
 import java.text.SimpleDateFormat;
19 31
 import java.time.LocalDateTime;
@@ -30,6 +42,137 @@ import java.util.*;
30 42
 @Configuration
31 43
 public class CamelJDBCConfiguration /*extends RouteBuilder */ {
32 44
 
45
+    public  ArrayList<ZDJG_WELL> loadKey() throws java.lang.Exception {
46
+        String url = "http://11.72.128.71/api-service/device/query?appKey=SR5vY4bED7";
47
+        URL restURL = new URL(url);
48
+        /*
49
+         * 此处的urlConnection对象实际上是根据URL的请求协议(此处是http)生成的URLConnection类 的子类HttpURLConnection
50
+         */
51
+        HttpURLConnection conn = (HttpURLConnection) restURL.openConnection();
52
+        //请求方式
53
+        conn.setRequestMethod("POST");
54
+        //设置是否从httpUrlConnection读入,默认情况下是true; httpUrlConnection.setDoInput(true);
55
+        conn.setDoOutput(true);
56
+        //allowUserInteraction 如果为 true,则在允许用户交互(例如弹出一个验证对话框)的上下文中对此 URL 进行检查。
57
+        conn.setAllowUserInteraction(false);
58
+//        conn.setRequestProperty("Content-type", "application/json");
59
+        conn.connect();
60
+
61
+        DataOutputStream out = new DataOutputStream(conn
62
+                .getOutputStream());
63
+        String content = "type" + "=simple";
64
+        content += "&pageSize=" + 5000 ;
65
+        content += "&page="+1;
66
+//        String encode = URLEncoder.encode(content, "UTF-8");
67
+        out.writeBytes(content);
68
+
69
+        out.flush();
70
+        out.close();
71
+
72
+        InputStream inputStream = conn.getInputStream();
73
+        InputStreamReader inputStreamReader = new InputStreamReader(inputStream,"UTF-8");
74
+        BufferedReader bReader = null;
75
+        bReader= new BufferedReader(inputStreamReader);
76
+
77
+        String line, resultStr = "";
78
+
79
+        while (null != (line = bReader.readLine())) {
80
+            resultStr += line;
81
+        }
82
+        bReader.close();
83
+
84
+        JSONObject json = JSONObject.parseObject(resultStr);
85
+        JSONArray data = json.getJSONArray("data");
86
+        ArrayList<ZDJG_WELL> list = new ArrayList<>();
87
+        long startTime=System.currentTimeMillis();
88
+        ZDJG_WELL zdjg_well = null;
89
+        for (int i = 0; i<data.size(); i++){
90
+//            r.delay(404);
91
+            String id = data.getJSONObject(i).get("id").toString();
92
+            String well_id = data.getJSONObject(i).get("code").toString();
93
+            String param = id+"/upData/计量间计量日产液";
94
+            String s = loadDate(param, well_id);
95
+//            System.out.println(i);
96
+            zdjg_well = new ZDJG_WELL();
97
+            if (!s.equals("")){
98
+                zdjg_well.setJljcyl(Double.parseDouble(s));
99
+                zdjg_well.setWell_id(well_id);
100
+                list.add(zdjg_well);
101
+            }
102
+
103
+        }
104
+        long endTime=System.currentTimeMillis();
105
+        System.out.println(endTime-startTime);
106
+        return list;
107
+    }
108
+
109
+    public  String loadDate(String param,String wellId) throws java.lang.Exception {
110
+        String date = getDate();
111
+        Calendar ca = Calendar.getInstance();//得到一个Calendar的实例
112
+        ca.setTime(new Date()); //设置时间为当前时间
113
+        ca.add(Calendar.DATE, -1);
114
+        SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd");
115
+        String date1 =  sf.format(ca.getTime());
116
+
117
+        URL restURL = new URL("http://11.72.128.71/api-service/tsdb/getSeries?appKey=SR5vY4bED7");
118
+
119
+        //此处的urlConnection对象实际上是根据URL的请求协议(此处是http) 生成的URLConnection类 的子类HttpURLConnection
120
+        HttpURLConnection conn = (HttpURLConnection) restURL.openConnection();
121
+
122
+        //请求方式
123
+        conn.setRequestMethod("POST");
124
+
125
+        //设置是否从httpUrlConnection读入,默认情况下是true; httpUrlConnection.setDoInput(true);
126
+        conn.setDoOutput(true);
127
+
128
+        //allowUserInteraction 如果为 true,则在允许用户交互(例如弹出一个验证对话框)的上下文中对此 URL 进行检查。
129
+        conn.setAllowUserInteraction(false);
130
+        DataOutputStream out = new DataOutputStream(conn
131
+                .getOutputStream());
132
+
133
+        String content = "id=" +   URLEncoder.encode(param, "UTF-8");
134
+        content += "&type=number";
135
+        content += "&startTime=" +  URLEncoder.encode(date1+"T00:00:00+0800" , "UTF-8");
136
+
137
+        content += "&endTime=" + URLEncoder.encode(date+"T00:00:00+0800", "UTF-8");
138
+        String encode = URLEncoder.encode(content, "UTF-8");
139
+        out.writeBytes(content);
140
+        out.flush();
141
+        out.close();
142
+        PrintStream ps = new PrintStream(conn.getOutputStream());
143
+
144
+        ps.close();
145
+
146
+        InputStream is = null;
147
+        if (conn.getResponseCode() == 200){
148
+            is = conn.getInputStream();
149
+        }else {
150
+            is= conn.getErrorStream();
151
+        }
152
+        BufferedReader bReader = new BufferedReader(new InputStreamReader(is));
153
+        List<String> list = new ArrayList<>();
154
+
155
+        String line, resultStr = "";
156
+        while (null != (line = bReader.readLine())) {
157
+            resultStr += line;
158
+        }
159
+
160
+        JSONObject json = null;
161
+        String data = "";
162
+        try {
163
+            json = JSONObject.parseObject(resultStr);
164
+            if (json != null) {
165
+                JSONArray data1 = json.getJSONArray("data");
166
+                data = data1.getJSONObject(0).getString("value");
167
+            }
168
+        }catch (java.lang.Exception e){
169
+//            System.out.println(resultStr);
170
+        }
171
+        bReader.close();
172
+        is.close();
173
+        return data;
174
+    }
175
+
33 176
     public Double min(String[] strings){
34 177
         double[] doubles = new double[strings.length];
35 178
         for (int i = 0; i < strings.length; i++) {
@@ -63,12 +206,190 @@ public class CamelJDBCConfiguration /*extends RouteBuilder */ {
63 206
     public RouteBuilder routeBuilderWithOracle1() {
64 207
         return new RouteBuilder() {
65 208
             private SortedSet<String> organization;
66
-            private Map<String, Integer> orgIDs;
67
-            private Integer orgID;
209
+            private Map<String, Integer>  orgIDs = new HashMap<>();
210
+            private int orgID;
211
+            private Map<Object, Object> orgIdPreList;
212
+            private Integer org;
213
+
214
+            public ProcessorDefinition<ExpressionNode> setMyBody(RouteDefinition route){
215
+                return  route.setBody(simple("select well_id from centralbase.sys_access_well_control  where  access_status='1'  "))
216
+                        .to("jdbc:centralbase")
217
+                        .split(body()).process(exchange -> {
218
+                            HashMap body = exchange.getIn().getBody(HashMap.class);
219
+                            exchange.getIn().setHeader("well_id",body.get("well_id"));
220
+                        });
221
+            }
68 222
 
69 223
             //全部执行完成的大概时间在30-40分钟
70 224
             @Override
71 225
             public void configure() throws Exception {
226
+                RouteDefinition OrgAndWellSource= (RouteDefinition) from("timer:insert-OrgAndWellSource?period=86400000")
227
+                        .routeId("insert-OrgAndWellSource")
228
+                        .setHeader("date", constant(getDate()))
229
+                        .process(exchange -> {
230
+                            org = 0;
231
+                            orgIdPreList = new HashMap<>();
232
+                        })
233
+                        .setBody(simple("select  max(cast(org_id as int))  from centralbase.cb_pc_organization"))
234
+                        .to("jdbc:centralbase")
235
+                        .process(exchange -> {
236
+                            HashMap body = exchange.getIn().getBody(HashMap.class);
237
+                            if (body == null || StringUtils.isEmpty(body.get("max"))) org = 0;
238
+                            else
239
+                                org = Integer.valueOf(body.get("max").toString());
240
+                        })
241
+                        .setBody(simple("select org_id_pre,org_id from centralbase.cb_pc_organization"))
242
+                        .to("jdbc:centralbase")
243
+                        .split(body()).process(exchange -> {
244
+                            HashMap body = exchange.getIn().getBody(HashMap.class);
245
+                            orgIdPreList.put(body.get("org_id_pre"),body.get("org_id"));
246
+                            orgIDs.put(body.get("org_id_pre").toString(),Integer.valueOf(body.get("org_id").toString()));
247
+                        }).end();
248
+                setMyBody(OrgAndWellSource)
249
+
250
+                        .setBody(simple("select distinct a.well_common_name jh, a.org_name_cj zyq, a.GROUP_UNIT_NAME cw, a.station_name zd, a.project_name qk from V_CD_WELL_SOURCE_YC a where a.well_common_name ='${header.well_id}' "))
251
+//                        .setBody(simple("select  distinct jh,cydmc,zyq,zk,qyrq,sccw,qk,bz from DBA01 where jh ='${header.well_id}' "))
252
+                        .to("jdbc:oracle_A2")
253
+                        .transform()
254
+                        .body((result) -> {
255
+                            organization = new TreeSet<>();
256
+                            return result;
257
+                        })
258
+                        .step("1")
259
+                        .split(body()).process(exchange -> {
260
+                    Message in = exchange.getIn();
261
+                    HashMap<String, Object> aRow = in.getBody(HashMap.class);
262
+                    String org_level3 = aRow.get("ZYQ") + "@" + aRow.get("QK") + "@" + aRow.get("ZD");
263
+                    String org_level2 = aRow.get("ZYQ") + "@" + aRow.get("QK");
264
+                    String org_level1 = aRow.get("ZYQ").toString();
265
+                    aRow.put("station_id", org_level3);
266
+                    orgID = org;
267
+                    if ((!orgIdPreList.containsKey(org_level1)) || (!orgIdPreList.containsKey(org_level2)) || (!orgIdPreList.containsKey(org_level3))) {
268
+                        if (organization.add(org_level1)) {
269
+                            if (!orgIDs.containsKey(org_level1)) orgIDs.put(org_level1,++orgID);
270
+                        }
271
+                        if (organization.add(org_level2)) {
272
+                            if (!orgIDs.containsKey(org_level2)) orgIDs.put(org_level2,++orgID);
273
+                        }
274
+                        if (organization.add(org_level3)) {
275
+                            if (!orgIDs.containsKey(org_level3)) orgIDs.put(org_level3,++orgID);
276
+                        }
277
+                    }
278
+                    if(orgIdPreList.get(org_level3) !=null){
279
+                        aRow.put("org_id",orgIdPreList.get(org_level3));
280
+                        return;
281
+                    }
282
+                    if(orgIDs.get(org_level3) !=null){
283
+                        aRow.put("org_id",orgIDs.get(org_level3));
284
+                    }
285
+                })                                                                             //                                         完井层位ID 井站名称|站库名称 生产层段名称    生产区块名称
286
+                        .setBody(simple("insert into centralbase.cb_cd_well_source (well_id,well_common_name,org_id,station_id,station_name,completion_name,PRODUCING_AREA_name) " +
287
+                                "select '${body[JH]}','${body[JH]}','${body[org_id]}','${body[station_id]}','${body[ZD]}','${body[CW]}','${body[QK]}' " +
288
+                                "where NOT EXISTS ( SELECT * FROM centralbase.cb_cd_well_source WHERE well_id = '${body[JH]}' )"))
289
+                        .to("jdbc:centralbase")
290
+                        .end()
291
+                        .transform().body((re) -> {
292
+                    List<Map<String, Object>> rows = new ArrayList<>();
293
+                    for (String s : organization) {
294
+                        Map<String, Object> row = new HashMap<>();
295
+                        String[] orgs = s.split("@");
296
+                        row.put("org_id_pre", s);
297
+                        switch (orgs.length) {
298
+                            case 1:
299
+                                row.put("org_name", orgs[0]);
300
+                                row.put("org_level", 1);
301
+                                row.put("org_parent", "0");
302
+                                break;
303
+                            case 2:
304
+                                row.put("org_name", orgs[1]);
305
+                                row.put("org_level", 2);
306
+                                row.put("org_parent", orgIDs.get(orgs[0]).toString());
307
+                                break;
308
+                            case 3:
309
+                                row.put("org_name", orgs[2]);
310
+                                row.put("org_level", 3);
311
+                                row.put("org_parent", orgIDs.get(orgs[0] + "@" + orgs[1]).toString());
312
+                                break;
313
+                        }
314
+                        if (!orgIdPreList.containsKey(s)) {
315
+                            org++;
316
+                            row.put("org_code", org);
317
+                            row.put("org_id", "" + org);
318
+                            orgIdPreList.put(s, row.get("org_id"));
319
+                            rows.add(row);
320
+                        }
321
+                    }
322
+                    return rows;
323
+                }).split(body())
324
+                        .setBody(simple("insert into centralbase.cb_pc_organization(org_id,org_code,org_name,org_level,parent_id,org_id_pre)" +
325
+                                "select '${body[org_id]}','${body[org_code]}','${body[org_name]}','${body[org_level]}','${body[org_parent]}','${body[org_id_pre]}' " +
326
+                                "where NOT EXISTS ( SELECT * FROM centralbase.cb_pc_organization WHERE org_id = '${body[org_id]}')"))
327
+                        .doTry()
328
+                        .to("jdbc:centralbase")
329
+                        .doCatch(Exception.class)
330
+                        .log("${header.date}"+" routeId:insert-OrgAndWellSource->  centralbase.cb_pc_organization insert data failed")
331
+                        .end();
332
+
333
+
334
+
335
+                from("timer:update-wellControl?period=3600000")
336
+                        .routeId("update-wellControl")
337
+                        .setBody(simple("select scc.well_id,wo.well_common_name,op.org_id from centralbase.sys_access_well_control scc\n" +
338
+                                "left join centralbase.cb_cd_well_source wo on scc.well_id = wo.well_id\n" +
339
+                                "left join centralbase.cb_pc_organization op on wo.org_id = op.org_id\n" +
340
+                                "where scc.access_status ='1' "))
341
+                        .to("jdbc:centralbase")
342
+                        .split(body())
343
+                        .split(body()).process(exchange -> {
344
+                    HashMap<String, Object> body = exchange.getIn().getBody(HashMap.class);
345
+                    String format = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
346
+                    if (body.get("well_common_name") == null || body.get("well_common_name").equals("")) {
347
+                        body.put("remarks","暂无井信息");
348
+                        body.put("error_id",0);
349
+                        body.put("updateTime",format);
350
+                    }else if (body.get("org_id") == null || body.get("org_id").equals("")){
351
+                        body.put("remarks","暂无机构信息");
352
+                        body.put("error_id",0);
353
+                        body.put("updateTime",format);
354
+                    }else {
355
+                        body.put("remarks","");
356
+                        body.put("error_id",1);
357
+                        body.put("updateTime",format);
358
+                    }
359
+                })
360
+                        .setBody(simple("update centralbase.sys_access_well_control set well_common_name='${body[well_common_name]}',org_id='${body[org_id]}',update_time = '${body[updateTime]}'::timestamp, remarks ='${body[remarks]}' ,error_id ='${body[error_id]}' where well_id ='${body[well_id]}' "))
361
+                        .to("jdbc:centralbase")
362
+                        .end();
363
+
364
+
365
+                from("timer:selectAllWellName?period=3600000")
366
+                        .routeId("selectAllWellName")
367
+                        .setBody(simple("SELECT DISTINCT\n" +
368
+                                "\tr.well_id WELLID \n" +
369
+                                "FROM\n" +
370
+                                "\tcentralbase.cb_temp_well_mech_runtime r\n" +
371
+                                "\tLEFT JOIN centralbase.sys_access_well_control s ON r.well_id = s.well_id\n" +
372
+                                "WHERE\n" +
373
+                                "\tr.prod_date > '2021-10-01' \n" +
374
+                                "\tAND stroke_length > 0\n" +
375
+                                "\tand s.well_id is null"))
376
+                        .to("jdbc:centralbase")
377
+                        .split(body()).process(exchange -> {
378
+                            Message in = exchange.getIn();
379
+                            HashMap<String, Object> aRow = in.getBody(HashMap.class);
380
+                            in.setHeader("date", getDate()+ " 00:00:00");
381
+                            in.setHeader("jh",aRow.get("wellid"));
382
+                })
383
+                        .log("${header.jh}")
384
+                        .doTry()
385
+                        .setBody(simple("insert into centralbase.sys_access_well_control(well_id,well_common_name,access_status,access_status_msg,access_date,error_id)" +
386
+                                " select '${header.jh}','${header.jh}','1','已接入','${header.date}','1' " +
387
+                                " where NOT EXISTS ( SELECT * FROM centralbase.sys_access_well_control WHERE well_id = '${header.jh}')"))
388
+                        .to("jdbc:centralbase")
389
+                       .doCatch(Exception.class)
390
+                        .log("${header.date}"+" routeId:selectAllWellName->  centralbase.sys_access_well_control update date faild")
391
+                        .end();
392
+
72 393
 
73 394
 
74 395
                 //   A2数据
@@ -80,7 +401,7 @@ public class CamelJDBCConfiguration /*extends RouteBuilder */ {
80 401
                             Message in = exchange.getIn();
81 402
                             in.setHeader("date", getDate());
82 403
                 })
83
-                        .setBody(simple("SELECT DISTINCT well_id wellid FROM centralbase.cb_temp_well_mech_runtime where prod_date > '${header.date}' "))
404
+                        .setBody(simple("SELECT well_id wellid FROM centralbase.sys_access_well_control where access_status = '1' and error_id = '1' "))
84 405
                         .to("jdbc:centralbase")
85 406
                         .split(body())
86 407
                         .setBody(simple("select v.WELL_COMMON_NAME jh,v.PROD_DATE rq,v.OIL_PRODUCTION_METHOD cyfs,v.PUMP_DIAMETER yz,v.BACK_PRES hysx,v.MAX_TUBING_PRES yysx,v.MAX_CASING_PRES tysx,v.PUMP_DEPTH bs from V_PC_PRO_COM_DAILY_CYYC_A2 v  where  v.PROD_DATE = to_date('${header.date}','yyyy-MM-dd')  and  v.WELL_COMMON_NAME = '${body[wellid]}' "))
@@ -117,7 +438,7 @@ public class CamelJDBCConfiguration /*extends RouteBuilder */ {
117 438
                             in.setHeader("date", getDate());
118 439
                 })
119 440
                         .log("mytimer3b")
120
-                        .setBody(simple("SELECT DISTINCT well_id wellid FROM centralbase.cb_temp_well_mech_runtime where prod_date > '${header.date}' "))
441
+                        .setBody(simple("SELECT well_id wellid FROM centralbase.sys_access_well_control where access_status = '1' and error_id = '1' "))
121 442
                         .to("jdbc:centralbase")
122 443
                         .split(body())
123 444
                         .setBody(simple("select  WELL_COMMON_NAME jh, PROD_DATE rq,PROD_TIME scsj, LIQ_PROD_DAILY rcyl1,OIL_PROD_DAILY rcyl,GAS_PROD_DAILY rcql,WATER_CUT hs,REMARKS bz from V_PC_PRO_COM_DAILY_CYYC_A2 where prod_date = to_date('${header.date}','yyyy-MM-dd') and WELL_COMMON_NAME = '${body[wellid]}'"))
@@ -165,7 +486,7 @@ public class CamelJDBCConfiguration /*extends RouteBuilder */ {
165 486
                         aRow.put("YMD",0.85);
166 487
                     }
167 488
                 })
168
-                        .log("mytimer3a")
489
+                        .log("mytimer3")
169 490
                         .setBody(simple("insert into centralbase.cb_pc_pro_wellbore_vol_daily(well_id,prod_date,prod_time,liq_prod_daily,oil_prod_daily,gas_prod_daily,water_cut,remarks,gas_oil_ratio,water_prod_daily,water_gas_ratio,surface_crude_water_density,surface_crude_oil_density) " +
170 491
                                 "select '${body[JH]}','${body[RQ]}','${body[SCSJ]}','${body[RCYL1]}','${body[RCYL]}','${body[RCQL]}','${body[HS]}','${body[BZ]}','${body[QYB]}','${body[RCSL]}','${body[SQB]}','${body[SMD]}','${body[YMD]}' " +
171 492
                                 "where NOT EXISTS ( SELECT * FROM centralbase.cb_pc_pro_wellbore_vol_daily WHERE well_id = '${body[JH]}' and  prod_date = '${body[RQ]}' )"))
@@ -192,7 +513,7 @@ public class CamelJDBCConfiguration /*extends RouteBuilder */ {
192 513
                         Message in = exchange.getIn();
193 514
                         in.setHeader("date", getDate());
194 515
             })
195
-                    .setBody(simple("SELECT DISTINCT well_id jh FROM centralbase.cb_temp_well_mech_runtime where prod_date > '${header.date}' "))
516
+                    .setBody(simple("SELECT well_id jh FROM centralbase.sys_access_well_control where access_status = '1' and error_id = '1'"))
196 517
                     .to("jdbc:centralbase")
197 518
                     .split(body()).process(exchange -> {
198 519
                         Message in = exchange.getIn();
@@ -217,7 +538,7 @@ public class CamelJDBCConfiguration /*extends RouteBuilder */ {
217 538
                         if (aRow.get("DL") == null) aRow.put("DL", "0.0");
218 539
                         if (aRow.get("DYM") == null) aRow.put("DYM", "0.0");
219 540
                 })
220
-                        .log("mytimer4")
541
+//                        .log("mytimer4")
221 542
                         .doTry()
222 543
                         .setBody(simple("insert into centralbase.cb_temp_well_mech_daily(well_id,prod_date,static_pressure,flow_pres,pump_diameter,pump_depth,pump_efficiency,rotate_frequency,stroke_length,stroke_frequency,pump_type,elec_frequency,elec_pump_current_b) " +
223 544
                                 "select '${body[JH]}','${header.date}','${body[JY]}','${body[LY]}','${body[BJ]}','${body[BS]}','${body[BX]}','${body[ZS]}','${body[CC]}','${body[CS]}','${body[BLX]}','${body[DL]}','${body[DYM]}' " +
@@ -245,6 +566,36 @@ public class CamelJDBCConfiguration /*extends RouteBuilder */ {
245 566
                         .log("${header.date}"+" routeId:oracle-5->  centralbase.cb_pc_pro_wellbore_status_daily update date faild")
246 567
                         .end();
247 568
 
569
+                //计量间产液量
570
+                from("timer:mytimer6?period=3600000")
571
+                        .routeId("oracle-6")
572
+                        .setBody(simple("select max(prod_date) from centralbase.cb_temp_well_mech_runtime "))
573
+                        .to("jdbc:centralbase")
574
+                        .split(body()).process(exchange -> {
575
+                            Message in = exchange.getIn();
576
+                            in.setHeader("date", getDate()+ " 00:00:00");
577
+                            in.setHeader("data",loadKey());
578
+                })
579
+                        .setBody(simple("${in.header.data}"))
580
+                        .split(body()).process(exchange -> {
581
+                            Message in = exchange.getIn();
582
+                            ZDJG_WELL body = in.getBody(ZDJG_WELL.class);
583
+                            in.setHeader("jljcyl",body.getJljcyl());
584
+                            in.setHeader("wellname",body.getWell_id());
585
+
586
+                })
587
+//                        .log("${in.header.date}")
588
+//                        .log("${in.header.jljcyl}")
589
+//                        .log("${in.header.wellname}")
590
+                        .doTry()
591
+                        .setBody(simple("insert into centralbase.cb_pc_pro_well_meter_team(well_id,start_prod_date,test_liqu_vol_daily) " +
592
+                                " select '${in.header.wellname}','${in.header.date}','${in.header.jljcyl}'" +
593
+                                " where NOT EXISTS(select * from centralbase.cb_pc_pro_well_meter_team  where well_id = '${in.header.wellname}' and start_prod_date = '${in.header.date}') "))
594
+                        .to("jdbc:centralbase")
595
+                        .doCatch(Exception.class)
596
+                        .log("${header.date}"+" routeId:oracle-6->  aoid.aoid_singlegt_yield update date faild")
597
+                        .end();
598
+
248 599
                 from("timer:mytimer7?period=3600000")
249 600
                         .routeId("oracle-7")
250 601
                         .setBody(simple("select max(prod_date) from centralbase.cb_temp_well_mech_runtime "))
@@ -279,10 +630,10 @@ public class CamelJDBCConfiguration /*extends RouteBuilder */ {
279 630
                         .setBody(simple("select max(prod_date) from centralbase.cb_temp_well_mech_runtime "))
280 631
                         .to("jdbc:centralbase")
281 632
                         .split(body()).process(exchange -> {
282
-                    Message in = exchange.getIn();
283
-                        in.setHeader("date", getDate());
633
+                             Message in = exchange.getIn();
634
+                             in.setHeader("date", getDate());
284 635
                 })
285
-                        .setBody(simple("select wellname , jbsj , jbyy, jbzq  from JBSJ where SYDATE > date'2021-09-22'"))
636
+                        .setBody(simple("select wellname , jbsj , jbyy, jbzq  from JBSJ where SYDATE >= date'${header.date}'"))
286 637
                         .to("jdbc:oracle")
287 638
                         .split(body()).process(exchange -> {
288 639
                             Message in = exchange.getIn();
@@ -308,7 +659,7 @@ public class CamelJDBCConfiguration /*extends RouteBuilder */ {
308 659
                             Message in = exchange.getIn();
309 660
                             in.setHeader("date", getDate());
310 661
                 })
311
-                        .setBody(simple("select well_common_name,QK,CW,A_RCYOU,A_RCYE,RCYOU,RCYE,HS,LRQ,CZ_YOU,CZ_YE,CMD,SCXJRQ,YCBZ,UP_DATE from V_Cyyc_YcjData where UP_DATE > to_date('2021-08-01','yyyy-MM-dd')"))
662
+                        .setBody(simple("select well_common_name,QK,CW,A_RCYOU,A_RCYE,RCYOU,RCYE,HS,LRQ,CZ_YOU,CZ_YE,CMD,SCXJRQ,YCBZ,UP_DATE from V_Cyyc_YcjData where UP_DATE >= to_date('${header.date}','yyyy-MM-dd')"))
312 663
                         .to("jdbc:oracle")
313 664
                         .split(body()).process(exchange -> {
314 665
                             Message in = exchange.getIn();
@@ -364,7 +715,7 @@ public class CamelJDBCConfiguration /*extends RouteBuilder */ {
364 715
 
365 716
                 from("timer:mytimer11?period=3600000")
366 717
                         .routeId("centralbase-1")
367
-                        .setBody(simple("select so.well_id,so.well_common_name,so.org_id,ti.prod_date,ti.stroke_frequency,ti.stroke_length,ti.sgt from centralbase.cb_temp_well_mech_runtime ti, centralbase.cb_cd_well_source so where ti.well_id = so.well_id and ti.prod_date =(select max(prod_date) from centralbase.cb_temp_well_mech_runtime) and ti.stroke_length > 0 "))
718
+                        .setBody(simple("select so.well_id,so.well_common_name,so.org_id,ti.prod_date,ti.stroke_frequency,ti.stroke_length,ti.sgt from centralbase.cb_temp_well_mech_runtime ti, centralbase.cb_cd_well_source so where ti.well_id = so.well_id and ti.prod_date =(select  prod_date  from centralbase.cb_temp_well_mech_runtime GROUP BY prod_date  ORDER BY prod_date desc LIMIT  1 OFFSET 1) and ti.stroke_length > 0 "))
368 719
                         .to("jdbc:centralbase")
369 720
                         .split(body())
370 721
                         .doTry()
@@ -668,4 +1019,5 @@ public class CamelJDBCConfiguration /*extends RouteBuilder */ {
668 1019
             }
669 1020
         };
670 1021
     }
1022
+
671 1023
 }

+ 5 - 5
src/main/java/com/gct/tools/etlcamelhuge/routeconfig/CamelRestConfiguration.java

@@ -174,7 +174,7 @@ public class CamelRestConfiguration extends RouteBuilder {
174 174
 
175 175
         JSONObject body = new JSONObject();
176 176
 
177
-          restConfiguration()
177
+          /*restConfiguration()
178 178
                 .component("servlet")
179 179
                 .host("11.72.128.71");
180 180
         from("timer:mytimer?period=3600000")
@@ -191,7 +191,7 @@ public class CamelRestConfiguration extends RouteBuilder {
191 191
 //                .setBody(simple("${header[data]}"))
192 192
 //                .setBody(simple(body.toJSONString()))
193 193
                 .setBody(simple("${in.header.id}"))
194
-                .log("${header.id}")
194
+//                .log("${header.id}")
195 195
                 .split(body()).process(
196 196
                 exchange -> {
197 197
                     Message in = exchange.getIn();
@@ -212,11 +212,11 @@ public class CamelRestConfiguration extends RouteBuilder {
212 212
                 .log("${in.header.date}")
213 213
                 .doTry()
214 214
                 .setBody(simple("insert into centralbase.cb_temp_well_mech_runtime(well_id,prod_date,stroke_length,stroke_frequency,susp_max_load,susp_min_load,sgt) " +
215
-                        "select '${body[well_name]}','${in.header.date}','${body[stroke]}','${body[frequency]}','${body[susp_max_load]}','${body[susp_min_load]}','${body[sgt]}' " +
216
-                        "where NOT EXISTS (SELECT * FROM centralbase.cb_temp_well_mech_runtime WHERE well_id = '${body[well_name]}' and  prod_date = '${in.header.date}' )"))
215
+                        "values( '${body[well_name]}','${in.header.date}','${body[stroke]}','${body[frequency]}','${body[susp_max_load]}','${body[susp_min_load]}','${body[sgt]}') " ))
216
+//                        "where NOT EXISTS (SELECT * FROM centralbase.cb_temp_well_mech_runtime WHERE well_id = '${body[well_name]}' and  prod_date = '${in.header.date}' )"))
217 217
                 .to("jdbc:centralbase")
218 218
                 .doCatch(Exception.class)
219
-                .end();
219
+                .end();*/
220 220
 
221 221
       /* JSONArray jsonArray = new JSONArray();
222 222
         JSONObject search = new JSONObject();

+ 50 - 3
src/main/resources/application.yml

@@ -53,9 +53,9 @@ spring:
53 53
     ds3:
54 54
       ## Hikari连接池的设置 Hikari 时间单位都是毫秒
55 55
       type: com.zaxxer.hikari.HikariDataSource
56
-      ##jdbc-url: jdbc:oracle:thin:@10.72.48.8:1521:ora920
57
-      ##username: cyyc_a2dms
58
-      ##password: cyyca2dms2018#
56
+#      jdbc-url: jdbc:oracle:thin:@10.72.48.8:1521:ora920
57
+      ##      username: cyyc_a2dms
58
+      ##      password: cyyca2dms2018#
59 59
       jdbc-url: jdbc:oracle:thin:@11.72.128.73:1521:orcl
60 60
       username: dhzdly
61 61
       password: gctgct1QAZ2Wsx
@@ -75,6 +75,53 @@ spring:
75 75
         max-lifetime: 1800000
76 76
         ## 数据库连接超时时间,默认30秒,即30000
77 77
         connection-timeout: 30000
78
+    ds5:
79
+      ## Hikari连接池的设置 Hikari 时间单位都是毫秒
80
+      type: com.zaxxer.hikari.HikariDataSource
81
+      jdbc-url: jdbc:oracle:thin:@10.72.48.8:1521:ora920
82
+      username: cyyc_a2dms
83
+      password: cyyca2dms2018#
84
+#      jdbc-url: jdbc:oracle:thin:@11.72.128.73:1521:orcl
85
+#      username: dhzdly
86
+#      password: gctgct1QAZ2Wsx
87
+      driver-class-name: oracle.jdbc.OracleDriver
88
+      hikari:
89
+        ## 连接池名字
90
+        pool-name: SystemHikariCP
91
+        ## 最小空闲连接数量
92
+        minimum-idle: 5
93
+        ## 空闲连接存活最大时间,默认600000(10分钟)
94
+        idle-timeout: 600000
95
+        ## 连接池最大连接数,默认是10
96
+        maximum-pool-size: 150
97
+        ## 此属性控制从池返回的连接的默认自动提交行为,默认值:true
98
+        auto-commit: true
99
+        ## 此属性控制池中连接的最长生命周期,值0表示无限生命周期,默认1800000即30分钟
100
+        max-lifetime: 1800000
101
+        ## 数据库连接超时时间,默认30秒,即30000
102
+        connection-timeout: 30000
103
+    ds4:
104
+      ## Hikari连接池的设置 Hikari 时间单位都是毫秒
105
+      type: com.zaxxer.hikari.HikariDataSource
106
+      jdbc-url: jdbc:postgresql://11.72.128.68:54321/calliquid
107
+      username: root
108
+      password: 123456
109
+      driver-class-name: org.postgresql.Driver
110
+      hikari:
111
+        ## 连接池名字
112
+        pool-name: SystemHikariCP
113
+        ## 最小空闲连接数量
114
+        minimum-idle: 5
115
+        ## 空闲连接存活最大时间,默认600000(10分钟)
116
+        idle-timeout: 600000
117
+        ## 连接池最大连接数,默认是10
118
+        maximum-pool-size: 150
119
+        ## 此属性控制从池返回的连接的默认自动提交行为,默认值:true
120
+        auto-commit: true
121
+        ## 此属性控制池中连接的最长生命周期,值0表示无限生命周期,默认1800000即30分钟
122
+        max-lifetime: 1800000
123
+        ## 数据库连接超时时间,默认30秒,即30000
124
+        connection-timeout: 30000
78 125
 ##    ds4:
79 126
 ##      ## Hikari连接池的设置 Hikari 时间单位都是毫秒
80 127
 ##      type: com.zaxxer.hikari.HikariDataSource