lijian 3 vuotta sitten
vanhempi
commit
0ce1ab57a2

+ 5 - 5
src/main/java/com/gct/tools/etlcamelhuge/camelconfig/MyDataSourceConfiguration.java

@@ -36,9 +36,9 @@ public class MyDataSourceConfiguration {
36 36
         return DataSourceBuilder.create().build();
37 37
     }
38 38
 
39
-    @Bean(name = "gtsj")
40
-    @ConfigurationProperties(prefix = "spring.datasource.ds4")
41
-    public DataSource dataSource4() {
42
-        return DataSourceBuilder.create().build();
43
-    }
39
+//    @Bean(name = "gtsj")
40
+//    @ConfigurationProperties(prefix = "spring.datasource.ds4")
41
+//    public DataSource dataSource4() {
42
+//        return DataSourceBuilder.create().build();
43
+//    }
44 44
 }

+ 387 - 248
src/main/java/com/gct/tools/etlcamelhuge/routeconfig/CamelJDBCConfiguration.java

@@ -64,102 +64,298 @@ public class CamelJDBCConfiguration /*extends RouteBuilder */ {
64 64
             public void configure() throws Exception {
65 65
                 //24小时执行一次
66 66
                 //单个执行时间30s左右,在之前有数据的情况下
67
-                from("timer:mytimer1?period=604800000")
68
-                        .routeId("oracle-1")
69
-                        .setHeader("date", constant(getDate()))
70
-                        .setBody(simple("select  distinct jh,cydmc,zyq,zk,qyrq,sccw,qk,bz from DBA01 where rq  = to_date('${header.date}','yyyy-MM-dd') and qyrq is not null "))
71
-                        .to("jdbc:oracle")
72
-                        .transform()
73
-                        .body((result) -> {
74
-                            organization = new TreeSet<>();
75
-                            orgID = 0;
76
-                            orgIDs = new HashMap<>();
77
-                            return result;
78
-                        })
79
-                        .step("1")
80
-                        .split(body()).process(exchange -> {
81
-                    Message in = exchange.getIn();
82
-                    HashMap<String, Object> aRow = in.getBody(HashMap.class);
83
-                    String org_level3 = aRow.get("ZYQ") + "@" + aRow.get("CYDMC") + "@" + aRow.get("ZK");
84
-                    String org_level2 = aRow.get("ZYQ") + "@" + aRow.get("CYDMC");
85
-                    String org_level1 = aRow.get("ZYQ").toString();
86
-                    aRow.put("station_id", org_level3);
87
-                    //这里是重新生成的orgid,最好先查一下centralbase里已有的
88
-                    //默认一张新表
89
-                    if (organization.add(org_level1)) {
90
-                        orgID++;
91
-                        orgIDs.put(org_level1, orgID);
92
-                    }
93
-                    if (organization.add(org_level2)) {
94
-                        orgID++;
95
-                        orgIDs.put(org_level2, orgID);
96
-                    }
97
-                    if (organization.add(org_level3)) {
98
-                        orgID++;
99
-                        orgIDs.put(org_level3, orgID);
100
-                    }
101
-                })
102
-                        .doTry()
103
-                        .setBody(simple("insert into centralbase.cb_cd_well_source (well_id,well_common_name,spud_date,station_id,station_name,completion_name,PRODUCING_AREA_name,remarks) " +
104
-                                "values ('${body[JH]}','${body[JH]}','${body[QYRQ]}'::timestamp,'${body[station_id]}','${body[ZK]}','${body[SCCW]}','${body[QK]}','${body[BZ]}')" +
105
-                                " ON conflict(well_id) DO UPDATE set remarks = '${body[BZ]}' "))
106
-                        .to("jdbc:centralbase")
107
-                        .doCatch(Exception.class)
108
-                        .log("${header.date}"+" routeId:oracle-1->  centralbase.cb_cd_well_source 导入数据失败")
109
-                        .end()
110
-                        .transform().body((re) -> {
111
-                    List<Map<String, Object>> rows = new ArrayList<>();
112
-                    int code = 0;
113
-                    for (String s : organization) {
114
-                        code++;// code is same as org_id
115
-                        String[] orgs = s.split("@");
116
-                        Map<String, Object> row = new HashMap<>();
117
-                        row.put("org_id_pre", s);
118
-                        row.put("org_code", code);
119
-                        row.put("org_id", "" + code);
120
-                        switch (orgs.length) {
121
-                            case 1:
122
-                                row.put("org_name", orgs[0]);
123
-                                row.put("org_level", 1);
124
-                                row.put("org_parent", "0");
125
-                                break;
126
-                            case 2:
127
-                                row.put("org_name", orgs[1]);
128
-                                row.put("org_level", 2);
129
-                                row.put("org_parent", orgIDs.get(orgs[0]).toString());
130
-                                break;
131
-                            case 3:
132
-                                row.put("org_name", orgs[2]);
133
-                                row.put("org_level", 3);
134
-                                row.put("org_parent", orgIDs.get(orgs[0] + "@" + orgs[1]).toString());
135
-                                break;
136
-                        }
137
-                        rows.add(row);
138
-                    }
139
-                    return rows;
140
-                }).split(body())
141
-                        .doTry()
142
-                        .setBody(simple("insert into centralbase.cb_pc_organization(org_id,org_code,org_name,org_level,parent_id,org_id_pre)" +
143
-                                "values('${body[org_id]}','${body[org_code]}','${body[org_name]}','${body[org_level]}','${body[org_parent]}','${body[org_id_pre]}')" +
144
-                                "ON conflict(org_id_pre) DO UPDATE set org_code = '${body[org_code]}' "))
145
-                        .to("jdbc:centralbase")
146
-                        .doCatch(Exception.class)
147
-                        .log("${header.date}"+" routeId:oracle-1->  centralbase.cb_pc_organization insert date faild")
148
-                        .end()
149
-                        .setBody(simple("select org_id,org_name from centralbase.cb_pc_organization where org_level = '3' "))
150
-                        .to("jdbc:centralbase")
151
-                        .split(body())
152
-                        .doTry()
153
-                        .setBody(simple("update centralbase.cb_cd_well_source set org_id = '${body[org_id]}' where station_name = '${body[org_name]}'"))
154
-                        .to("jdbc:centralbase")
155
-                        .doCatch(Exception.class)
156
-                        .log("${header.date}"+" routeId:oracle-1->  centralbase.cb_pc_organization update date faild")
157
-                        .end();
158
-                //单独执行时间10s
67
+
68
+//                //修改org_id
69
+//                from("timer:mytimer1?period=604800000")
70
+//                        .routeId("oracle-1")
71
+//                        .setHeader("date", constant(getDate()))
72
+//                        .setBody(simple("select org_id,org_id_pre from centralbase.cb_pc_organization where org_level = '3' "))
73
+//                        .to("jdbc:centralbase")
74
+//                        .split(body())
75
+//                        .doTry()
76
+//                        .setBody(simple("update centralbase.cb_cd_well_source set org_id = '${body[org_id]}' where station_id = '${body[org_id_pre]}'"))
77
+//                        .to("jdbc:centralbase")
78
+//                        .doCatch(Exception.class)
79
+//                        .log("${header.date}"+" routeId:oracle-1->  centralbase.cb_pc_organization update date faild")
80
+//                        .end();
81
+
82
+//                        //level3   ------>  用来导入层级
83
+//                        .setBody(simple("select distinct a.org_name_cj zyq,  a.project_name qk , a.station_name zd  from V_CD_WELL_SOURCE_YC a "))
84
+//                        .to("jdbc:oracle")
85
+//                        .split(body()).process(exchange -> {
86
+//                            Message in = exchange.getIn();
87
+//                            HashMap<String, Object> aRow = in.getBody(HashMap.class);
88
+//                            String org_level2 = aRow.get("ZYQ") + "@" + aRow.get("QK")+ "@" + aRow.get("ZD");
89
+//                            String org_level3 = aRow.get("ZYQ") + "@" + aRow.get("QK");
90
+//                            String orgname = aRow.get("ZD").toString();
91
+//                            String zyq = aRow.get("ZYQ").toString();
92
+//                            aRow.put("station_id", org_level2);
93
+//                            aRow.put("org_name",orgname);
94
+//                            aRow.put("zyq",zyq);
95
+//                            aRow.put("lev3",org_level3);
96
+//                        })
97
+//
98
+//                        .log("${body[ZYQ]}")
99
+//                        .doTry()
100
+//                        .setBody(simple("insert into centralbase.cb_pc_organization(org_name,org_level,parent_id,org_id_pre)" +
101
+//                                "values('${body[org_name]}','3',(select org_id from centralbase.cb_pc_organization where org_id_pre = '${body[lev3]}'),'${body[station_id]}') "))
102
+//                        .to("jdbc:centralbase")
103
+//                        .doCatch(Exception.class)
104
+//                        .log("${header.date}"+" routeId:oracle-1->  centralbase.cb_pc_organization insert date faild")
105
+//                        .end();
106
+
107
+//                            level2
108
+//                        .setBody(simple("select org_id_pre from centralbase.cb_pc_organization where org_id_pre like '%@%'"))
109
+//                        .to("jdbc:centralbase")
110
+////                        .doCatch(Exception.class)
111
+//                        .split(body())
112
+//                        .log("${body[org_id_pre]}")
113
+//                        .doTry()
114
+//                        .setBody(simple("update centralbase.cb_pc_organization set parent_id = (select org_id from centralbase.cb_pc_organization where org_id_pre = (select split_part(org_id_pre, '@', 1) from centralbase.cb_pc_organization  where org_id_pre = '${body[org_id_pre]}') ) where org_id_pre = '${body[org_id_pre]}' "))
115
+//                        .to("jdbc:centralbase")
116
+//                        .doCatch(Exception.class)
117
+//                        .log("${header.date}"+" routeId:oracle-2->  centralbase.cb_pc_organization insert date faild")
118
+//                        .end();
119
+                            //  ,  a.project_name qk , a.station_name zd
120
+//                        .setBody(simple("select distinct a.org_name_cj zyq ,  a.project_name qk from V_CD_WELL_SOURCE_YC a "))
121
+//                        .to("jdbc:oracle")
122
+//                        .split(body()).process(exchange -> {
123
+//                            Message in = exchange.getIn();
124
+//                            HashMap<String, Object> aRow = in.getBody(HashMap.class);
125
+//                            String org_level2 = aRow.get("ZYQ") + "@" + aRow.get("QK");
126
+//                            String org_level3 = aRow.get("ZYQ").toString() ;
127
+//                            String orgname = aRow.get("QK").toString();
128
+//                            String zyq = aRow.get("ZYQ").toString();
129
+//                            aRow.put("station_id", org_level2);
130
+//                            aRow.put("org_name",orgname);
131
+//                            aRow.put("zyq",zyq);
132
+//                            aRow.put("lev3",org_level3);
133
+//                        })
134
+//                        .log("${body[ZYQ]}")
135
+//                        .doTry()
136
+//                        .setBody(simple("insert into centralbase.cb_pc_organization(org_name,org_level,parent_id,org_id_pre)" +
137
+//                                "values('${body[org_name]}','2','0','${body[station_id]}') "))
138
+//                        .to("jdbc:centralbase")
139
+//                        .doCatch(Exception.class)
140
+//                        .log("${header.date}"+" routeId:oracle-1->  centralbase.cb_pc_organization insert date faild")
141
+//                        .end();
142
+
143
+//                        .transform()
144
+//                        .body((result) -> {
145
+//                            organization = new TreeSet<>();
146
+//                            orgID = 0;
147
+//                            orgIDs = new HashMap<>();
148
+//                            return result;
149
+//                        })
150
+//                        .step("1")
151
+//                        .split(body()).process(exchange -> {
152
+//                    Message in = exchange.getIn();
153
+//                    HashMap<String, Object> aRow = in.getBody(HashMap.class);
154
+//                    String org_level3 = aRow.get("ZYQ") + "@" + aRow.get("QK") + "@" + aRow.get("ZD");
155
+//                    String org_level2 = aRow.get("ZYQ") + "@" + aRow.get("QK");
156
+//                    String org_level1 = aRow.get("ZYQ").toString();
157
+//                    aRow.put("station_id", org_level3);
158
+//                    //这里是重新生成的orgid,最好先查一下centralbase里已有的
159
+//                    //默认一张新表
160
+//                    if (organization.add(org_level1)) {
161
+//                        orgID++;
162
+//                        orgIDs.put(org_level1, orgID);
163
+//                    }
164
+//                    if (organization.add(org_level2)) {
165
+//                        orgID++;
166
+//                        orgIDs.put(org_level2, orgID);
167
+//                    }
168
+//                    if (organization.add(org_level3)) {
169
+//                        orgID++;
170
+//                        orgIDs.put(org_level3, orgID);
171
+//                    }
172
+//                })
173
+//                    .transform().body((re) -> {
174
+//                    List<Map<String, Object>> rows = new ArrayList<>();
175
+//                    int code = 0;
176
+//                    for (String s : organization) {
177
+//                        code++;// code is same as org_id
178
+//                        String[] orgs = s.split("@");
179
+//                        Map<String, Object> row = new HashMap<>();
180
+//                        row.put("org_id_pre", s);
181
+//                        row.put("org_code", code);
182
+//                        row.put("org_id", "" + code);
183
+//                        switch (orgs.length) {
184
+//                            case 1:
185
+//                                row.put("org_name", orgs[0]);
186
+//                                row.put("org_level", 1);
187
+//                                row.put("org_parent", "0");
188
+//                                break;
189
+//                            case 2:
190
+//                                row.put("org_name", orgs[1]);
191
+//                                row.put("org_level", 2);
192
+//                                row.put("org_parent", orgIDs.get(orgs[0]).toString());
193
+//                                break;
194
+//                            case 3:
195
+//                                row.put("org_name", orgs[2]);
196
+//                                row.put("org_level", 3);
197
+//                                row.put("org_parent", orgIDs.get(orgs[0] + "@" + orgs[1]).toString());
198
+//                                break;
199
+//                        }
200
+//                        rows.add(row);
201
+//                    }
202
+//                    return rows;
203
+//                }).split(body())
204
+//                        .doTry()
205
+//                        .setBody(simple("insert into centralbase.cb_pc_organization(org_id,org_code,org_name,org_level,parent_id,org_id_pre)" +
206
+//                                "values('${body[org_id]}','${body[org_code]}','${body[org_name]}','${body[org_level]}','${body[org_parent]}','${body[org_id_pre]}') "))
207
+//                        .to("jdbc:centralbase")
208
+//                        .doCatch(Exception.class)
209
+//                        .log("${header.date}"+" routeId:oracle-1->  centralbase.cb_pc_organization insert date faild")
210
+//                        .end()
211
+
212
+
213
+                //导入井的基本信息 导一次就够了
214
+//                from("timer:mytimer1?period=604800000")
215
+//                        .routeId("oracle-1")
216
+//                        .setHeader("date", constant(getDate()))
217
+//                        .setBody(simple("select distinct a.well_common_name jh, a.org_name_cj zyq, a.GROUP_UNIT_NAME cw, a.station_name zd, a.project_name qk from V_CD_WELL_SOURCE_YC a "))
218
+//                        .to("jdbc:oracle")
219
+//                        .transform()
220
+//                        .body((result) -> {
221
+//                            organization = new TreeSet<>();
222
+//                            orgID = 0;
223
+//                            orgIDs = new HashMap<>();
224
+//                            return result;
225
+//                        })
226
+//                        .step("1")
227
+//                        .split(body()).process(exchange -> {
228
+//                    Message in = exchange.getIn();
229
+//                    HashMap<String, Object> aRow = in.getBody(HashMap.class);
230
+//                    String org_level3 = aRow.get("ZYQ") + "@" + aRow.get("QK") + "@" + aRow.get("ZD");
231
+//                    String org_level2 = aRow.get("ZYQ") + "@" + aRow.get("QK");
232
+//                    String org_level1 = aRow.get("ZYQ").toString();
233
+//                    aRow.put("station_id", org_level3);
234
+
235
+
236
+
237
+
238
+
239
+
240
+//                查询井对应dym不为空的数据 --目前是只要对应井能查到dym不为空的,无论是什么时间的,都放进去
241
+//                将查询到的DYM数据更新到cb_pc_pro_wellbore_status_daily中
242
+//                0 0 */1 * * ? 每1个小时执行一次
243
+//                单独执行时间是4m15s 317条数据
244
+
245
+
246
+//                 没用from("timer:mytimer5?period=3600000")
247
+//                        .routeId("oracle-5")
248
+//                        .setHeader("date", constant(getDate() + " 00:00:00"))
249
+//                        //三个月之内dym不为空的数据
250
+//                        //.setBody(simple("SELECT distinct jh,rq,dym FROM DBA01 WHERE  rq between to_date(TO_CHAR(ADD_MONTHS(SYSDATE,-3),'yyyy-MM-dd'),'yyyy-MM-dd')and to_date(TO_CHAR(SYSDATE,'yyyy-MM-dd'),'yyyy-MM-dd') and dym is not null;"))
251
+//                        .setBody(simple("SELECT  distinct jh,max(rq),dym FROM DBA01 WHERE dym is not null group by jh,dym"))
252
+//                        .to("jdbc:oracle")
253
+//                        .split(body())
254
+//                        .doTry()
255
+//                        .setBody(simple("update centralbase.cb_pc_pro_wellbore_status_daily set start_pump_liq_level = '${body[DYM]}' where well_id = '${body[JH]}' and prod_date::date  = '${header.date}' "))
256
+//                        .to("jdbc:centralbase")
257
+//                        .doCatch(Exception.class)
258
+//                        .log("${header.date}"+" routeId:oracle-5->  centralbase.cb_pc_pro_wellbore_status_daily update date faild")
259
+//                        .end();
260
+
261
+
262
+
263
+
264
+
265
+
266
+//
267
+//                from("timer:mytimer6?period=3600000")
268
+//                        .routeId("oracle-6")
269
+//                        .setHeader("date", constant(getDate() + " 00:00:00"))
270
+//                        //五个月之内bj不为空的数据
271
+//                        .setBody(simple("SELECT distinct jh,bj FROM DBA01 WHERE  rq between to_date(TO_CHAR(ADD_MONTHS(SYSDATE,-5),'yyyy-MM-dd'),'yyyy-MM-dd')and to_date(TO_CHAR(SYSDATE,'yyyy-MM-dd'),'yyyy-MM-dd') and bj is not null"))
272
+//                        .to("jdbc:oracle")
273
+//                        .split(body())
274
+//                        .doTry()
275
+//                        .setBody(simple("update centralbase.cb_temp_well_mech_runtime set pump_diameter = '${body[BJ]}' where well_id = '${body[JH]}' "))
276
+//                        .to("jdbc:centralbase")
277
+//                        .doCatch(Exception.class)
278
+//                        .log("${header.date}"+" routeId:oracle-6->  centralbase.cb_temp_well_mech_runtime update date faild")
279
+//                        .end();
280
+//                //从天安哪里获取的数据
281
+//                //0 0 */1 * * ? 每1个小时执行一次
282
+//                //单独执行一小时的数据30s
283
+//                from("timer:mytimer8?period=3600000")
284
+//                        .routeId("jdbc-gtsj-?")
285
+//                        .setBody(simple("select max(prod_date) from centralbase.cb_temp_well_mech_runtime "))
286
+//                        .to("jdbc:centralbase")
287
+//                        .split(body())
288
+//                        .setHeader("date", simple("${body[max]}"))
289
+//                        .setBody(simple("select distinct well_name,dyna_create_time,check_date,displacement,disp_load,stroke,frequency,susp_max_load,susp_min_load from public.pc_fd_pumpjack_dyna_dia_t where   dyna_create_time > '${header.date}' "))
290
+//                        .to("jdbc:gtsj")
291
+//                        .split(body()).process(exchange -> {
292
+//                    Message in = exchange.getIn();
293
+//                    HashMap<String, Object> aRow = in.getBody(HashMap.class);
294
+//                    String prod_date = aRow.get("dyna_create_time").toString().split("\\+")[0];
295
+//                    aRow.put("dyna_create_time", prod_date);
296
+//                    if (aRow.get("displacement") != null && !aRow.get("displacement").equals("") && aRow.get("disp_load") != null && !aRow.get("disp_load").equals("")) {
297
+//                        String[] displacements = aRow.get("displacement").toString().split(";");//10 四舍五入
298
+//                        //String[] displacements = wy(displacementsOld);
299
+//                        String[] disp_loads = aRow.get("disp_load").toString().split(";");
300
+//                        Double susp_max_load = max(disp_loads);
301
+//                        Double susp_min_load = min(disp_loads);
302
+//                        String sgt = "";
303
+//                        for (int i = 0; i < displacements.length; i++) {
304
+//                            sgt = sgt + displacements[i] + "," + disp_loads[i] + ",";
305
+//                        }
306
+//                        String[] s = sgt.split(",");
307
+//                        String w = "";
308
+//                        for (int i = 0; i < s.length; i++) {
309
+//                            w += new BigDecimal(Math.round(Double.parseDouble(s[i]) * 100)).stripTrailingZeros().toPlainString() + ",";
310
+//                        }
311
+//                        Double[][] doubles = SGTUtil.encodeToDoubleArray(w);
312
+//                      /*  for (int i = 0;i<doubles.length;i++){
313
+//                            doubles[i][0] = doubles[i][0] / 10;
314
+//                        }*/
315
+//                        aRow.put("sgt", SGTUtil.encodeToString(doubles));
316
+//                        aRow.put("susp_max_load",susp_max_load);
317
+//                        aRow.put("susp_min_load",susp_min_load);
318
+//                    }
319
+//                    if (aRow.get("stroke") == null) aRow.put("stroke", "0.0");
320
+//                    if (aRow.get("frequency") == null) aRow.put("frequency", "0.0");
321
+//                    if (aRow.get("susp_max_load") == null) aRow.put("susp_max_load", "0.0");
322
+//                    if (aRow.get("susp_min_load") == null) aRow.put("susp_min_load", "0.0");
323
+//                    if (aRow.get("frequency") != null){
324
+//                        BigDecimal bd=new BigDecimal(aRow.get("frequency").toString());
325
+//                        double frequency=bd.setScale(1,BigDecimal.ROUND_HALF_UP).doubleValue();
326
+//                        aRow.put("frequency",frequency);
327
+//                    }
328
+//                    if (aRow.get("stroke") != null){
329
+//                        double stroke1 = Double.parseDouble(aRow.get("stroke").toString());
330
+//                        BigDecimal bd=new BigDecimal(stroke1);
331
+//                        double stroke=bd.setScale(1,BigDecimal.ROUND_HALF_UP).doubleValue();
332
+//                        aRow.put("stroke",stroke);
333
+//                    }
334
+//                })
335
+//                        .doTry()
336
+//                        .setBody(simple("insert into centralbase.cb_temp_well_mech_runtime(well_id,prod_date,stroke_length,stroke_frequency,susp_max_load,susp_min_load,sgt) " +
337
+//                                "select '${body[well_name]}','${body[dyna_create_time]}','${body[stroke]}','${body[frequency]}','${body[susp_max_load]}','${body[susp_min_load]}','${body[sgt]}' " +
338
+//                                "where NOT EXISTS (SELECT * FROM centralbase.cb_temp_well_mech_runtime WHERE well_id = '${body[well_name]}' and  prod_date = '${body[dyna_create_time]}' )"))
339
+//                        .to("jdbc:centralbase")
340
+//                        .doCatch(Exception.class)
341
+//                        .log("${header.date}"+" routeId:jdbc-gtsj-?->  centralbase.cb_temp_well_mech_runtime insert date faild")
342
+//                        .end();
343
+
344
+                /*——————————————————————————————————————————————————————————-*/
345
+
346
+                //单独执行时间10s    A2数据
159 347
                 from("timer:mytimer2?period=3600000")
160 348
                         .routeId("oracle-2")
161 349
                         .setHeader("date", constant(getDate()))
162
-                        .setBody(simple("select  distinct jh,rq,cyfs,yz,hysx , yysx ,tysx,bs,dym from DBA01 where rq  = to_date('${header.date}','yyyy-MM-dd') and qyrq is not null "))
350
+                        .setBody(simple("SELECT DISTINCT well_id wellid FROM centralbase.cb_temp_well_mech_runtime where prod_date > '${header.date}' "))
351
+                        .to("jdbc:centralbase")
352
+                        .split(body())
353
+//                        .split(body())
354
+//                       .setBody(simple("select  distinct jh,rq,cyfs,yz,hysx , yysx ,tysx,bs,dym from DBA01 where rq  = to_date('${header.date}','yyyy-MM-dd') and qyrq is not null "))
355
+//                        .setBody(simple("select v.WELL_COMMON_NAME jh,v.PROD_DATE rq,v.OIL_PRODUCTION_METHOD cyfs,v.PUMP_DIAMETER yz,v.BACK_PRES hysx,v.MAX_TUBING_PRES yysx,v.MAX_CASING_PRES tysx,v.PUMP_DEPTH bs , l.DYNAMIC_LIQ_LEVEL dym from V_PC_PRO_COM_DAILY_CYYC v " +
356
+//                                " left join ( select * from V_TEMP_WELL_MECH_ALL where PROD_DATE = to_date('${header.date}','yyyy-MM-dd') ) l on l.well_id = v.well_id  where v.PROD_DATE = to_date('${header.date}','yyyy-MM-dd') "))
357
+                        .setBody(simple("select v.WELL_COMMON_NAME jh,v.PROD_DATE rq,v.OIL_PRODUCTION_METHOD cyfs,v.PUMP_DIAMETER yz,v.BACK_PRES hysx,v.MAX_TUBING_PRES yysx,v.MAX_CASING_PRES tysx,v.PUMP_DEPTH bs from V_PC_PRO_COM_DAILY_CYYC v  where  v.PROD_DATE = to_date('${header.date}','yyyy-MM-dd')  and  v.WELL_COMMON_NAME = '${body[wellid]}' "))
358
+
163 359
                         .to("jdbc:oracle")
164 360
                         .split(body()).process(exchange -> {
165 361
                     Message in = exchange.getIn();
@@ -169,64 +365,34 @@ public class CamelJDBCConfiguration /*extends RouteBuilder */ {
169 365
                     if (aRow.get("YYSX") == null) aRow.put("YYSX", "0.0");
170 366
                     if (aRow.get("TYSX") == null) aRow.put("TYSX", "0.0");
171 367
                     if (aRow.get("BS") == null) aRow.put("BS", "0.0");
368
+                    if (aRow.get("DYM") == null)aRow.put("DYM","0.0");
172 369
                 })
370
+                        .log("mytimer2")
173 371
                         .doTry()
174
-                        .setBody(simple("insert into centralbase.cb_pc_pro_wellbore_status_daily(well_id,prod_date,oil_prod_method,oil_nozzle,back_pres,tubing_pres,casing_pres,pump_depth) " +
175
-                                "select '${body[JH]}','${body[RQ]}','${body[CYFS]}','${body[YZ]}','${body[HYSX]}','${body[YYSX]}','${body[TYSX]}','${body[BS]}' " +
372
+                        .setBody(simple("insert into centralbase.cb_pc_pro_wellbore_status_daily(well_id,prod_date,oil_prod_method,oil_nozzle,back_pres,tubing_pres,casing_pres,pump_depth,start_pump_liq_level) " +
373
+                                "select '${body[JH]}','${body[RQ]}','${body[CYFS]}','${body[YZ]}','${body[HYSX]}','${body[YYSX]}','${body[TYSX]}','${body[BS]}' ,'${body[DYM]}'" +
176 374
                                 "where NOT EXISTS ( SELECT * FROM centralbase.cb_pc_pro_wellbore_status_daily WHERE well_id = '${body[JH]}' and  prod_date = '${body[RQ]}')"))
177 375
                         .to("jdbc:centralbase")
178 376
                         .doCatch(Exception.class)
179 377
                         .log("${header.date}"+" routeId:oracle-2->  centralbase.cb_pc_pro_wellbore_status_daily insert date faild")
180 378
                         .end();
181
-                //查询井对应dym不为空的数据 --目前是只要对应井能查到dym不为空的,无论是什么时间的,都放进去
182
-                //将查询到的DYM数据更新到cb_pc_pro_wellbore_status_daily中
183
-                //0 0 */1 * * ? 每1个小时执行一次
184
-                //单独执行时间是4m15s 317条数据
185
-                from("timer:mytimer5?period=3600000")
186
-                        .routeId("oracle-5")
187
-                        .setHeader("date", constant(getDate() + " 00:00:00"))
188
-                        //三个月之内dym不为空的数据
189
-                        //.setBody(simple("SELECT distinct jh,rq,dym FROM DBA01 WHERE  rq between to_date(TO_CHAR(ADD_MONTHS(SYSDATE,-3),'yyyy-MM-dd'),'yyyy-MM-dd')and to_date(TO_CHAR(SYSDATE,'yyyy-MM-dd'),'yyyy-MM-dd') and dym is not null;"))
190
-                        .setBody(simple("SELECT  distinct jh,max(rq),dym FROM DBA01 WHERE dym is not null group by jh,dym"))
191
-                        .to("jdbc:oracle")
192
-                        .split(body())
193
-                        .doTry()
194
-                        .setBody(simple("update centralbase.cb_pc_pro_wellbore_status_daily set start_pump_liq_level = '${body[DYM]}' where well_id = '${body[JH]}' and prod_date::date  = '${header.date}' "))
195
-                        .to("jdbc:centralbase")
196
-                        .doCatch(Exception.class)
197
-                        .log("${header.date}"+" routeId:oracle-5->  centralbase.cb_pc_pro_wellbore_status_daily update date faild")
198
-                        .end();
199 379
 
200
-                from("timer:mytimer7?period=3600000")
201
-                        .routeId("oracle-7")
202
-                        .setHeader("date", constant(getDate() + " 00:00:00"))
203
-                        //三个月之内dym不为空的数据
204
-                        //.setBody(simple("SELECT distinct jh,rq,dym FROM DBA01 WHERE  rq between to_date(TO_CHAR(ADD_MONTHS(SYSDATE,-3),'yyyy-MM-dd'),'yyyy-MM-dd')and to_date(TO_CHAR(SYSDATE,'yyyy-MM-dd'),'yyyy-MM-dd') and dym is not null;"))
205
-                        .setBody(simple("select well_id,prod_date,start_pump_liq_level,pump_depth from centralbase.cb_pc_pro_wellbore_status_daily where prod_date = '${header.date}' "))
206
-                        .to("jdbc:centralbase")
207
-                        .split(body()).process(exchange -> {
208
-                            Message in = exchange.getIn();
209
-                            HashMap<String, Object> aRow = in.getBody(HashMap.class);
210
-                            aRow.put("submergence_depth",null);
211
-                            if (aRow.get("start_pump_liq_level")!=null && aRow.get("pump_depth")!=null){
212
-                                double cmd= Double.valueOf(aRow.get("pump_depth").toString())-Double.valueOf(aRow.get("start_pump_liq_level").toString())/10;
213
-                                BigDecimal bd=new BigDecimal(cmd);
214
-                                double cmd1=bd.setScale(1,BigDecimal.ROUND_HALF_UP).doubleValue();
215
-                                aRow.put("submergence_depth",cmd1);
216
-                            }
217
-                })
218
-                        .doTry()
219
-                        .setBody(simple("update centralbase.cb_pc_pro_wellbore_status_daily set submergence_depth = '${body[submergence_depth]}' where well_id = '${body[well_id]}' and prod_date  = '${body[prod_date]}'"))
220
-                        .to("jdbc:centralbase")
221
-                        .doCatch(Exception.class)
222
-                        .log("${header.date}"+" routeId:oracle-7->  centralbase.cb_pc_pro_wellbore_status_daily update date faild")
223
-                        .end();
224 380
                 //单独执行时间30s
225 381
                 from("timer:mytimer3?period=3600000")
226 382
                         .routeId("oracle-3")
227 383
                         .setHeader("date", constant(getDate()))
228
-                        .setBody(simple("select distinct  jh,rq,scsj, rcyl1,rcyl,rcql,hs, bz from DBA01 where rq  = to_date('${header.date}','yyyy-MM-dd') and qyrq is not null "))
384
+                        .setBody(simple("SELECT DISTINCT well_id wellid FROM centralbase.cb_temp_well_mech_runtime where prod_date > '${header.date}' "))
385
+                        .doTry()
386
+                        .to("jdbc:centralbase")
387
+                        .doCatch(Exception.class)
388
+                        .log("centralbase error")
389
+                        .split(body())
390
+//                        .setBody(simple("select distinct  jh,rq,scsj, rcyl1,rcyl,rcql,hs, bz from DBA01 where rq  = to_date('${header.date}','yyyy-MM-dd') and qyrq is not null "))
391
+                        .setBody(simple("select  WELL_COMMON_NAME jh, PROD_DATE rq,PROD_TIME scsj, LIQ_PROD_DAILY rcyl1,OIL_PROD_DAILY rcyl,GAS_PROD_DAILY rcql,WATER_CUT hs,REMARKS bz from V_PC_PRO_COM_DAILY_CYYC where prod_date = to_date('${header.date}','yyyy-MM-dd') and WELL_COMMON_NAME = '${body[wellid]}'"))
392
+                        .doTry()
229 393
                         .to("jdbc:oracle")
394
+                        .doCatch(Exception.class)
395
+                        .log("oracle error")
230 396
                         .split(body()).process(exchange -> {
231 397
                     Message in = exchange.getIn();
232 398
                     HashMap<String, Object> aRow = in.getBody(HashMap.class);
@@ -270,6 +436,7 @@ public class CamelJDBCConfiguration /*extends RouteBuilder */ {
270 436
                         aRow.put("YMD",0.85);
271 437
                     }
272 438
                 })
439
+                        .log("mytimer3")
273 440
                         .doTry()
274 441
                         .setBody(simple("insert into centralbase.cb_pc_pro_wellbore_vol_daily(well_id,prod_date,prod_time,liq_prod_daily,oil_prod_daily,gas_prod_daily,water_cut,remarks,gas_oil_ratio,water_prod_daily,water_gas_ratio,surface_crude_water_density,surface_crude_oil_density) " +
275 442
                                 "select '${body[JH]}','${body[RQ]}','${body[SCSJ]}','${body[RCYL1]}','${body[RCYL]}','${body[RCQL]}','${body[HS]}','${body[BZ]}','${body[QYB]}','${body[RCSL]}','${body[SQB]}','${body[SMD]}','${body[YMD]}' " +
@@ -288,141 +455,113 @@ public class CamelJDBCConfiguration /*extends RouteBuilder */ {
288 455
                         .log("${header.date}"+" routeId:oracle-3->  centralbase.cb_pc_pro_wellbore_vol_daily update date faild")
289 456
                         .end();
290 457
 
291
-
292
-                //0 0 */1 * * ? 每1个小时执行一次
293
-                //单独执行一次30s
458
+//                //0 0 */1 * * ? 每1个小时执行一次
459
+//                //单独执行一次30s
294 460
                 from("timer:mytimer4?period=3600000")
295
-                        .routeId("oracle-4")
296
-                        .setHeader("date", constant(getDate()))
297
-                        .setBody(simple("select distinct jh,rq,dym,jy,ly,bj,bs,bx,zs,cc,cs,blx,dl from DBA01 where rq  = to_date('${header.date}','yyyy-MM-dd') and qyrq is not null  "))
298
-                        .to("jdbc:oracle")
299
-                        .split(body()).process(exchange -> {
300
-                    Message in = exchange.getIn();
301
-                    HashMap<String, Object> aRow = in.getBody(HashMap.class);
302
-                    if (aRow.get("JY") == null) aRow.put("JY", "0.0");
303
-                    if (aRow.get("LY") == null) aRow.put("LY", "0.0");
304
-                    if (aRow.get("BJ") == null) aRow.put("BJ", "0.0");
305
-                    if (aRow.get("BS") == null) aRow.put("BS", "0.0");
306
-                    if (aRow.get("BX") == null) aRow.put("BX", "0.0");
307
-                    if (aRow.get("ZS") == null) aRow.put("ZS", "0.0");
308
-                    if (aRow.get("CC") == null) aRow.put("CC", "0.0");
309
-                    if (aRow.get("CS") == null) aRow.put("CS", "0.0");
310
-                    if (aRow.get("BLX") == null) aRow.put("BLX", "");
311
-                    if (aRow.get("DL") == null) aRow.put("DL", "0.0");
461
+                    .routeId("oracle-4")
462
+                    .setHeader("date", constant(getDate()))
463
+//                        .setBody(simple("select distinct jh,rq,dym,jy,ly,bj,bs,bx,zs,cc,cs,blx,dl from DBA01 where rq  = to_date('${header.date}','yyyy-MM-dd') and qyrq is not null  "))
464
+//                        .setBody(simple("select distinct b.well_common_name jh,  a.prod_date rq,a.static_pressure jy,a.flow_pres ly,a.pump_diameter bj,a.pump_depth bs,a.pump_efficiency bx,a.rotate_frequency zs,a.stroke_length cc,a.stroke_frequency cs,a.pump_type blx,a.elec_frequency dl from (select * from V_TEMP_WELL_MECH_ALL a1 where a1.prod_date = to_date('${header.date}', 'yyyy-MM-dd')) a left join  V_CD_WELL_SOURCE_YC    b on a.well_id = b.well_id "))
465
+                    .setBody(simple("SELECT DISTINCT well_id jh FROM centralbase.cb_temp_well_mech_runtime where prod_date > '${header.date}' "))
466
+                    .to("jdbc:centralbase")
467
+                    .split(body()).process(exchange -> {
468
+                        Message in = exchange.getIn();
469
+                        HashMap<String, Object> aRow = in.getBody(HashMap.class);
470
+                        aRow.put("JH", aRow.get("jh").toString());
471
+                        in.setHeader("JH",aRow.get("jh").toString());
312 472
                 })
473
+                    .setBody(simple("   select well_id wellid from  V_CD_WELL_SOURCE_YC  where well_common_name = '${body[JH]}' "))
474
+                    .to("jdbc:oracle")
475
+                    .split(body()).process(exchange -> {
476
+                        Message in = exchange.getIn();
477
+                        HashMap<String, Object> aRow = in.getBody(HashMap.class);
478
+                        aRow.put("wellid", aRow.get("WELLID").toString());
479
+                })
480
+                    .setBody(simple("  select * from( Select t.prod_date rq,t.static_pressure jy,t.flow_pres ly,t.pump_diameter bj,\n" +
481
+                            "                                      t.pump_depth bs,t.pump_efficiency bx,t.rotate_frequency zs,t.stroke_length cc,t.stroke_frequency cs,\n" +
482
+                            "                                      t.pump_type blx,t.elec_frequency dl, row_number() Over(Partition By t.well_id\n" +
483
+                            "                                Order By t.prod_date desc ) rn   FROM V_TEMP_WELL_MECH_ALL  t where well_id = '${body[wellid]}' and dynamic_liq_level > 0)where rn = 1 "))
484
+                    .to("jdbc:oracle")
485
+                    .split(body()).process(exchange -> {
486
+                        Message in = exchange.getIn();
487
+                        HashMap<String, Object> aRow = in.getBody(HashMap.class);
488
+                        if (aRow.get("JY") == null) aRow.put("JY", "0.0");
489
+                        if (aRow.get("LY") == null) aRow.put("LY", "0.0");
490
+                        if (aRow.get("BJ") == null) aRow.put("BJ", "0.0");
491
+                        if (aRow.get("BS") == null) aRow.put("BS", "0.0");
492
+                        if (aRow.get("BX") == null) aRow.put("BX", "0.0");
493
+                        if (aRow.get("ZS") == null) aRow.put("ZS", "0.0");
494
+                        if (aRow.get("CC") == null) aRow.put("CC", "0.0");
495
+                        if (aRow.get("CS") == null) aRow.put("CS", "0.0");
496
+                        if (aRow.get("BLX") == null) aRow.put("BLX", "");
497
+                        if (aRow.get("DL") == null) aRow.put("DL", "0.0");
498
+                })
499
+                        .log("mytimer4")
313 500
                         .doTry()
314 501
                         .setBody(simple("insert into centralbase.cb_temp_well_mech_daily(well_id,prod_date,static_pressure,flow_pres,pump_diameter,pump_depth,pump_efficiency,rotate_frequency,stroke_length,stroke_frequency,pump_type,elec_frequency) " +
315
-                                "select '${body[JH]}','${body[RQ]}','${body[JY]}','${body[LY]}','${body[BJ]}','${body[BS]}','${body[BX]}','${body[ZS]}','${body[CC]}','${body[CS]}','${body[BLX]}','${body[DL]}' " +
502
+                                "select '${header.JH}','${body[RQ]}','${body[JY]}','${body[LY]}','${body[BJ]}','${body[BS]}','${body[BX]}','${body[ZS]}','${body[CC]}','${body[CS]}','${body[BLX]}','${body[DL]}' " +
316 503
                                 "where NOT EXISTS ( SELECT * FROM centralbase.cb_temp_well_mech_daily WHERE well_id = '${body[JH]}' and  prod_date = '${body[RQ]}' )"))
317 504
                         .to("jdbc:centralbase")
318 505
                         .doCatch(Exception.class)
319 506
                         .log("${header.date}"+" routeId:oracle-4->  centralbase.cb_temp_well_mech_daily insert date faild")
320 507
                         .end();
321 508
 
322
-                from("timer:mytimer6?period=3600000")
323
-                        .routeId("oracle-6")
324
-                        .setHeader("date", constant(getDate() + " 00:00:00"))
325
-                        //五个月之内bj不为空的数据
326
-                        .setBody(simple("SELECT distinct jh,bj FROM DBA01 WHERE  rq between to_date(TO_CHAR(ADD_MONTHS(SYSDATE,-5),'yyyy-MM-dd'),'yyyy-MM-dd')and to_date(TO_CHAR(SYSDATE,'yyyy-MM-dd'),'yyyy-MM-dd') and bj is not null"))
327
-                        .to("jdbc:oracle")
328
-                        .split(body())
329
-                        .doTry()
330
-                        .setBody(simple("update centralbase.cb_temp_well_mech_runtime set pump_diameter = '${body[BJ]}' where well_id = '${body[JH]}' "))
331
-                        .to("jdbc:centralbase")
332
-                        .doCatch(Exception.class)
333
-                        .log("${header.date}"+" routeId:oracle-6->  centralbase.cb_temp_well_mech_runtime update date faild")
334
-                        .end();
335
-                //从天安哪里获取的数据
336
-                //0 0 */1 * * ? 每1个小时执行一次
337
-                //单独执行一小时的数据30s
338
-                from("timer:mytimer8?period=3600000")
339
-                        .routeId("jdbc-gtsj-?")
340
-                        .setBody(simple("select max(prod_date) from centralbase.cb_temp_well_mech_runtime "))
341
-                        .to("jdbc:centralbase")
342
-                        .split(body())
343
-                        .setHeader("date", simple("${body[max]}"))
344
-                        .setBody(simple("select distinct well_name,dyna_create_time,check_date,displacement,disp_load,stroke,frequency,susp_max_load,susp_min_load from public.pc_fd_pumpjack_dyna_dia_t where   dyna_create_time > '${header.date}' "))
345
-                        .to("jdbc:gtsj")
346
-                        .split(body()).process(exchange -> {
347
-                    Message in = exchange.getIn();
348
-                    HashMap<String, Object> aRow = in.getBody(HashMap.class);
349
-                    String prod_date = aRow.get("dyna_create_time").toString().split("\\+")[0];
350
-                    aRow.put("dyna_create_time", prod_date);
351
-                    if (aRow.get("displacement") != null && !aRow.get("displacement").equals("") && aRow.get("disp_load") != null && !aRow.get("disp_load").equals("")) {
352
-                        String[] displacements = aRow.get("displacement").toString().split(";");//10 四舍五入
353
-                        //String[] displacements = wy(displacementsOld);
354
-                        String[] disp_loads = aRow.get("disp_load").toString().split(";");
355
-                        Double susp_max_load = max(disp_loads);
356
-                        Double susp_min_load = min(disp_loads);
357
-                        String sgt = "";
358
-                        for (int i = 0; i < displacements.length; i++) {
359
-                            sgt = sgt + displacements[i] + "," + disp_loads[i] + ",";
360
-                        }
361
-                        String[] s = sgt.split(",");
362
-                        String w = "";
363
-                        for (int i = 0; i < s.length; i++) {
364
-                            w += new BigDecimal(Math.round(Double.parseDouble(s[i]) * 100)).stripTrailingZeros().toPlainString() + ",";
365
-                        }
366
-                        Double[][] doubles = SGTUtil.encodeToDoubleArray(w);
367
-                      /*  for (int i = 0;i<doubles.length;i++){
368
-                            doubles[i][0] = doubles[i][0] / 10;
369
-                        }*/
370
-                        aRow.put("sgt", SGTUtil.encodeToString(doubles));
371
-                        aRow.put("susp_max_load",susp_max_load);
372
-                        aRow.put("susp_min_load",susp_min_load);
373
-                    }
374
-                    if (aRow.get("stroke") == null) aRow.put("stroke", "0.0");
375
-                    if (aRow.get("frequency") == null) aRow.put("frequency", "0.0");
376
-                    if (aRow.get("susp_max_load") == null) aRow.put("susp_max_load", "0.0");
377
-                    if (aRow.get("susp_min_load") == null) aRow.put("susp_min_load", "0.0");
378
-                    if (aRow.get("frequency") != null){
379
-                        BigDecimal bd=new BigDecimal(aRow.get("frequency").toString());
380
-                        double frequency=bd.setScale(1,BigDecimal.ROUND_HALF_UP).doubleValue();
381
-                        aRow.put("frequency",frequency);
382
-                    }
383
-                    if (aRow.get("stroke") != null){
384
-                        double stroke1 = Double.parseDouble(aRow.get("stroke").toString());
385
-                        BigDecimal bd=new BigDecimal(stroke1);
386
-                        double stroke=bd.setScale(1,BigDecimal.ROUND_HALF_UP).doubleValue();
387
-                        aRow.put("stroke",stroke);
388
-                    }
389
-                })
390
-                        .doTry()
391
-                        .setBody(simple("insert into centralbase.cb_temp_well_mech_runtime(well_id,prod_date,stroke_length,stroke_frequency,susp_max_load,susp_min_load,sgt) " +
392
-                                "select '${body[well_name]}','${body[dyna_create_time]}','${body[stroke]}','${body[frequency]}','${body[susp_max_load]}','${body[susp_min_load]}','${body[sgt]}' " +
393
-                                "where NOT EXISTS (SELECT * FROM centralbase.cb_temp_well_mech_runtime WHERE well_id = '${body[well_name]}' and  prod_date = '${body[dyna_create_time]}' )"))
394
-                        .to("jdbc:centralbase")
395
-                        .doCatch(Exception.class)
396
-                        .log("${header.date}"+" routeId:jdbc-gtsj-?->  centralbase.cb_temp_well_mech_runtime insert date faild")
397
-                        .end();
398 509
 
399
-                from("timer:mytimer9?period=3600000")
400
-                        .routeId("centralbase-2")
510
+                from("timer:mytimer7?period=3600000")
511
+                        .routeId("oracle-7")
401 512
                         .setHeader("date", constant(getDate() + " 00:00:00"))
402
-                        .setBody(simple("select distinct rn.well_id,cb.prod_date,rn.pump_diameter  from centralbase.cb_temp_well_mech_runtime rn ,centralbase.cb_pc_pro_wellbore_status_daily cb where cb.well_id = rn.well_id\n" +
403
-                                "and cb.prod_date = '${header.date}' "))
404
-                        .to("jdbc:centralbase")//.log("${body}")
513
+                        //三个月之内dym不为空的数据
514
+                        //.setBody(simple("SELECT distinct jh,rq,dym FROM DBA01 WHERE  rq between to_date(TO_CHAR(ADD_MONTHS(SYSDATE,-3),'yyyy-MM-dd'),'yyyy-MM-dd')and to_date(TO_CHAR(SYSDATE,'yyyy-MM-dd'),'yyyy-MM-dd') and dym is not null;"))
515
+                        .setBody(simple("select well_id,prod_date,start_pump_liq_level,pump_depth from centralbase.cb_pc_pro_wellbore_status_daily where prod_date = '${header.date}' "))
516
+                        .to("jdbc:centralbase")
405 517
                         .split(body()).process(exchange -> {
406 518
                             Message in = exchange.getIn();
407 519
                             HashMap<String, Object> aRow = in.getBody(HashMap.class);
408
-                            aRow.putIfAbsent("pump_diameter", "0.0");
520
+                            aRow.put("submergence_depth",null);
521
+                    if (aRow.get("start_pump_liq_level")!=null && aRow.get("pump_depth")!=null){
522
+                        double cmd= Double.valueOf(aRow.get("pump_depth").toString())-Double.valueOf(aRow.get("start_pump_liq_level").toString())/10;
523
+                        BigDecimal bd=new BigDecimal(cmd);
524
+                        double cmd1=bd.setScale(1,BigDecimal.ROUND_HALF_UP).doubleValue();
525
+                        aRow.put("submergence_depth",cmd1);
526
+                    }
409 527
                 })
410 528
                         .doTry()
411
-                        .setBody(simple("update centralbase.cb_pc_pro_wellbore_status_daily set oil_nozzle = '${body[pump_diameter]}' where well_id ='${body[well_id]}' and prod_date='${body[prod_date]}' "))
529
+                        .setBody(simple("update centralbase.cb_pc_pro_wellbore_status_daily set submergence_depth = '${body[submergence_depth]}' where well_id = '${body[well_id]}' and prod_date  = '${body[prod_date]}'"))
412 530
                         .to("jdbc:centralbase")
413 531
                         .doCatch(Exception.class)
414
-                        .log("${header.date}"+" routeId:centralbase-2->  centralbase.cb_pc_pro_wellbore_status_daily update date faild")
532
+                        .log("${header.date}"+" routeId:oracle-7->  centralbase.cb_pc_pro_wellbore_status_daily update date faild")
415 533
                         .end();
416 534
 
535
+
536
+
537
+
538
+//                from("timer:mytimer9?period=3600000")
539
+//                        .routeId("centralbase-2")
540
+//                        .setHeader("date", constant(getDate() + " 00:00:00"))
541
+//                        .setBody(simple("select distinct rn.well_id,cb.prod_date,rn.pump_diameter  from centralbase.cb_temp_well_mech_runtime rn ,centralbase.cb_pc_pro_wellbore_status_daily cb where cb.well_id = rn.well_id " +
542
+//                                "and cb.prod_date = '${header.date}' "))
543
+//                        .to("jdbc:centralbase")//.log("${body}")
544
+//                        .split(body()).process(exchange -> {
545
+//                    Message in = exchange.getIn();
546
+//                    HashMap<String, Object> aRow = in.getBody(HashMap.class);
547
+//                    aRow.putIfAbsent("pump_diameter", "0.0");
548
+//                })
549
+//                        .doTry()
550
+//                        .setBody(simple("update centralbase.cb_pc_pro_wellbore_status_daily set oil_nozzle = '${body[pump_diameter]}' where well_id ='${body[well_id]}' and prod_date='${body[prod_date]}' "))
551
+//                        .to("jdbc:centralbase")
552
+//                        .doCatch(Exception.class)
553
+//                        .log("${header.date}"+" routeId:centralbase-2->  centralbase.cb_pc_pro_wellbore_status_daily update date faild")
554
+//                        .end();
555
+//
417 556
                 from("timer:mytimer10?period=3600000")
418 557
                         .routeId("centralbase-3")
419 558
                         .setHeader("date", constant(getDate()))
420 559
                         .setBody(simple("select well_id,avg(stroke_length) stroke_length  ,avg(stroke_frequency) stroke_frequency from centralbase.cb_temp_well_mech_runtime where prod_date::date='${header.date}' group by well_id"))
421 560
                         .to("jdbc:centralbase")//.log("${body}")
422 561
                         .split(body()).process(exchange -> {
423
-                    Message in = exchange.getIn();
424
-                    HashMap<String, Object> aRow = in.getBody(HashMap.class);
425
-                    if (aRow.get("stroke_length")!=null && aRow.get("stroke_frequency")!=null){
562
+                        Message in = exchange.getIn();
563
+                        HashMap<String, Object> aRow = in.getBody(HashMap.class);
564
+                        if (aRow.get("stroke_length")!=null && aRow.get("stroke_frequency")!=null){
426 565
                         double stroke_length=Double.parseDouble(aRow.get("stroke_length").toString());
427 566
                         double stroke_frequency=Double.parseDouble(aRow.get("stroke_frequency").toString());
428 567
                         BigDecimal bd=new BigDecimal(stroke_length);
@@ -442,7 +581,7 @@ public class CamelJDBCConfiguration /*extends RouteBuilder */ {
442 581
 
443 582
                 from("timer:mytimer11?period=3600000")
444 583
                         .routeId("centralbase-1")
445
-                        .setBody(simple("select so.well_id,so.well_common_name,so.org_id,ti.prod_date,ti.stroke_frequency,ti.stroke_length,ti.sgt from centralbase.cb_temp_well_mech_runtime ti, centralbase.cb_cd_well_source so where ti.well_id = so.well_id and ti.prod_date =(select max(prod_date) from centralbase.cb_temp_well_mech_runtime) "))
584
+                        .setBody(simple("select so.well_id,so.well_common_name,so.org_id,ti.prod_date,ti.stroke_frequency,ti.stroke_length,ti.sgt from centralbase.cb_temp_well_mech_runtime ti, centralbase.cb_cd_well_source so where ti.well_id = so.well_id and ti.prod_date =(select max(prod_date) from centralbase.cb_temp_well_mech_runtime) and ti.stroke_length > 0 "))
446 585
                         .to("jdbc:centralbase")
447 586
                         .split(body())
448 587
                         .doTry()
@@ -463,7 +602,7 @@ public class CamelJDBCConfiguration /*extends RouteBuilder */ {
463 602
                         .log("${header.date}"+" rocketMQ send data faild")
464 603
                         .end();
465 604
 
466
-            };
605
+            }
467 606
         };
468 607
     }
469 608
 }

Tiedoston diff-näkymää rajattu, sillä se on liian suuri
+ 225 - 22
src/main/java/com/gct/tools/etlcamelhuge/routeconfig/CamelRestConfiguration.java


+ 40 - 34
src/main/resources/application.yml

@@ -7,7 +7,7 @@ spring:
7 7
     ds1:
8 8
       ## Hikari连接池的设置 Hikari 时间单位都是毫秒
9 9
       type: com.zaxxer.hikari.HikariDataSource
10
-      jdbc-url: jdbc:postgresql://10.72.143.2:54321/database
10
+      jdbc-url: jdbc:postgresql://11.72.128.68:54321/database
11 11
       username: root
12 12
       password: 123456
13 13
       driver-class-name: org.postgresql.Driver
@@ -17,7 +17,7 @@ spring:
17 17
         ## 最小空闲连接数量
18 18
         minimum-idle: 5
19 19
         ## 空闲连接存活最大时间,默认600000(10分钟)
20
-        idle-timeout: 60000
20
+        idle-timeout: 600000
21 21
         ## 连接池最大连接数,默认是10
22 22
         maximum-pool-size: 15
23 23
         ## 此属性控制从池返回的连接的默认自动提交行为,默认值:true
@@ -26,10 +26,11 @@ spring:
26 26
         max-lifetime: 1800000
27 27
         ## 数据库连接超时时间,默认30秒,即30000
28 28
         connection-timeout: 30000
29
+
29 30
     ds2:
30 31
       ## Hikari连接池的设置 Hikari 时间单位都是毫秒
31 32
       type: com.zaxxer.hikari.HikariDataSource
32
-      jdbc-url: jdbc:postgresql://10.72.143.2:54321/diagnosis
33
+      jdbc-url: jdbc:postgresql://11.72.128.68:54321/diagnosis
33 34
       username: root
34 35
       password: 123456
35 36
       driver-class-name: org.postgresql.Driver
@@ -39,7 +40,7 @@ spring:
39 40
         ## 最小空闲连接数量
40 41
         minimum-idle: 5
41 42
         ## 空闲连接存活最大时间,默认600000(10分钟)
42
-        idle-timeout: 60000
43
+        idle-timeout: 600000
43 44
         ## 连接池最大连接数,默认是10
44 45
         maximum-pool-size: 15
45 46
         ## 此属性控制从池返回的连接的默认自动提交行为,默认值:true
@@ -48,12 +49,16 @@ spring:
48 49
         max-lifetime: 1800000
49 50
         ## 数据库连接超时时间,默认30秒,即30000
50 51
         connection-timeout: 30000
52
+
51 53
     ds3:
52 54
       ## Hikari连接池的设置 Hikari 时间单位都是毫秒
53 55
       type: com.zaxxer.hikari.HikariDataSource
54
-      jdbc-url: jdbc:oracle:thin:@11.71.15.28:1521:oracycn2
55
-      username: ZD_FGD
56
-      password: ZJGSDDS#2014ZD
56
+      jdbc-url: jdbc:oracle:thin:@10.72.48.8:1521:ora920
57
+      username: cyyc_a2dms
58
+      password: cyyca2dms2018#
59
+     ## jdbc-url: jdbc:oracle:thin:@11.72.128.73:1521:orcl
60
+     ## username: dhzdly
61
+     ## password: gctgct1QAZ2Wsx
57 62
       driver-class-name: oracle.jdbc.driver.OracleDriver
58 63
       hikari:
59 64
         ## 连接池名字
@@ -61,29 +66,7 @@ spring:
61 66
         ## 最小空闲连接数量
62 67
         minimum-idle: 5
63 68
         ## 空闲连接存活最大时间,默认600000(10分钟)
64
-        idle-timeout: 60000
65
-        ## 连接池最大连接数,默认是10
66
-        maximum-pool-size: 15
67
-        ## 此属性控制从池返回的连接的默认自动提交行为,默认值:true
68
-        auto-commit: true
69
-        ## 此属性控制池中连接的最长生命周期,值0表示无限生命周期,默认1800000即30分钟
70
-        max-lifetime: 1800000
71
-        ## 数据库连接超时时间,默认30秒,即30000
72
-        connection-timeout: 30000
73
-    ds4:
74
-      ## Hikari连接池的设置 Hikari 时间单位都是毫秒
75
-      type: com.zaxxer.hikari.HikariDataSource
76
-      jdbc-url: jdbc:postgresql://11.72.187.11:5432/GTDATE
77
-      username: postgres
78
-      password: Postgres_2021#
79
-      driver-class-name: org.postgresql.Driver
80
-      hikari:
81
-        ## 连接池名字
82
-        pool-name: SystemHikariCP
83
-        ## 最小空闲连接数量
84
-        minimum-idle: 5
85
-        ## 空闲连接存活最大时间,默认600000(10分钟)
86
-        idle-timeout: 60000
69
+        idle-timeout: 600000
87 70
         ## 连接池最大连接数,默认是10
88 71
         maximum-pool-size: 15
89 72
         ## 此属性控制从池返回的连接的默认自动提交行为,默认值:true
@@ -92,6 +75,29 @@ spring:
92 75
         max-lifetime: 1800000
93 76
         ## 数据库连接超时时间,默认30秒,即30000
94 77
         connection-timeout: 30000
78
+        connection-test-query: SELECT 1 FROM DUAL
79
+##    ds4:
80
+##      ## Hikari连接池的设置 Hikari 时间单位都是毫秒
81
+##      type: com.zaxxer.hikari.HikariDataSource
82
+##      jdbc-url: jdbc:postgresql://11.72.187.11:5432/GTDATE
83
+##      username: postgres
84
+##      password: Postgres_2021#
85
+##      driver-class-name: org.postgresql.Driver
86
+##      hikari:
87
+##        ## 连接池名字
88
+##        pool-name: SystemHikariCP
89
+##        ## 最小空闲连接数量
90
+##        minimum-idle: 5
91
+##        ## 空闲连接存活最大时间,默认600000(10分钟)
92
+##        idle-timeout: 60000
93
+##        ## 连接池最大连接数,默认是10
94
+##        maximum-pool-size: 15
95
+##        ## 此属性控制从池返回的连接的默认自动提交行为,默认值:true
96
+##        auto-commit: true
97
+##        ## 此属性控制池中连接的最长生命周期,值0表示无限生命周期,默认1800000即30分钟
98
+##        max-lifetime: 1800000
99
+##        ## 数据库连接超时时间,默认30秒,即30000
100
+##        connection-timeout: 30000
95 101
 
96 102
 management:
97 103
   info:
@@ -111,12 +117,12 @@ management:
111 117
       cors:
112 118
         allow-credentials: off
113 119
 server:
114
-  port: 8080
120
+  port: 1010
115 121
 
116 122
 
117 123
 rocketmq:
118 124
   #rocketmq的路由调度器的地址
119
-  name-server: 10.72.143.2:9876
125
+  name-server: 11.72.128.68:9876
120 126
   producer:
121 127
     # 消息分组
122 128
     group: aoid
@@ -134,7 +140,7 @@ gct:
134 140
         group: diagnose
135 141
         access-key: diagnose-msg-v1
136 142
         secret-key: diagnose-msg-v1
137
-      name-server: 10.72.143.2:9876
143
+      name-server: 11.72.128.68:9876
138 144
       consumer:
139 145
         group: diagnose
140 146
         access-key: diagnose-msg-v1
@@ -147,7 +153,7 @@ gct:
147 153
         group: warn
148 154
         access-key: warn-msg-v1
149 155
         secret-key: warn-msg-v1
150
-      name-server: 10.72.143.2:9876
156
+      name-server: 11.72.128.68:9876
151 157
       consumer:
152 158
         group: warn
153 159
         access-key: warn-msg-v1