|
@@ -3,6 +3,9 @@ package com.gct.tools.etlcamelhuge.routeconfig;
|
3
|
3
|
import org.apache.camel.*;
|
4
|
4
|
import org.apache.camel.builder.RouteBuilder;
|
5
|
5
|
//import org.apache.rocketmq.common.message.Message;
|
|
6
|
+import org.apache.camel.model.ExpressionNode;
|
|
7
|
+import org.apache.camel.model.ProcessorDefinition;
|
|
8
|
+import org.apache.camel.model.RouteDefinition;
|
6
|
9
|
import org.springframework.context.annotation.Bean;
|
7
|
10
|
import org.springframework.context.annotation.Configuration;
|
8
|
11
|
import org.springframework.util.StringUtils;
|
|
@@ -26,91 +29,93 @@ public class CamelJDBCConfiguration {
|
26
|
29
|
public String getDate(){
|
27
|
30
|
return LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd"));
|
28
|
31
|
}
|
29
|
|
-
|
30
|
32
|
@Bean
|
31
|
33
|
public RouteBuilder routeBuilderWithOracle1() {
|
32
|
34
|
return new RouteBuilder() {
|
33
|
35
|
private SortedSet<String> organization;
|
34
|
|
- private Map<String, Integer> orgIDs;
|
35
|
|
- private Integer orgID;
|
36
|
|
- private Map<String, Integer> stringIntegerMap;
|
37
|
|
- private List<Object> wellIdList;
|
38
|
|
- private Set<Object> orgIdPreList;
|
|
36
|
+ private Map<String, Integer> orgIDs = new HashMap<>();
|
|
37
|
+ private int orgID;
|
|
38
|
+ private Map<Object, Object> orgIdPreList;
|
39
|
39
|
private Integer org;
|
40
|
|
- //全部执行完成的大概时间在30-40分钟
|
|
40
|
+
|
|
41
|
+ public ProcessorDefinition<ExpressionNode> setMyBody(RouteDefinition route){
|
|
42
|
+ return route.setBody(simple("select well_id from centralbase.sys_access_well_control where access_status='1' "))
|
|
43
|
+ .to("jdbc:centralbase")
|
|
44
|
+ .split(body()).process(exchange -> {
|
|
45
|
+ HashMap body = exchange.getIn().getBody(HashMap.class);
|
|
46
|
+ exchange.getIn().setHeader("well_id",body.get("well_id"));
|
|
47
|
+ });
|
|
48
|
+ }
|
41
|
49
|
@Override
|
42
|
50
|
public void configure() throws Exception {
|
43
|
51
|
|
44
|
|
- from("timer:insert-OrgAndWellSource?period=86400000")
|
|
52
|
+ RouteDefinition OrgAndWellSource= (RouteDefinition) from("timer:insert-OrgAndWellSource?period=86400000")
|
45
|
53
|
.routeId("insert-OrgAndWellSource")
|
46
|
54
|
.setHeader("date", constant(getDate()))
|
47
|
55
|
.process(exchange -> {
|
48
|
56
|
org = 0;
|
49
|
|
- wellIdList = new ArrayList<>();
|
50
|
|
- orgIdPreList =new HashSet<>();
|
|
57
|
+ orgIdPreList = new HashMap<>();
|
51
|
58
|
})
|
52
|
59
|
.setBody(simple("select max(org_id) from centralbase.cb_pc_organization"))
|
53
|
60
|
.to("jdbc:centralbase")
|
54
|
61
|
.process(exchange -> {
|
55
|
62
|
HashMap body = exchange.getIn().getBody(HashMap.class);
|
56
|
|
- if(body==null|| StringUtils.isEmpty(body.get("max"))) org=0;
|
|
63
|
+ if (body == null || StringUtils.isEmpty(body.get("max"))) org = 0;
|
57
|
64
|
else
|
58
|
|
- org = Integer.valueOf(body.get("max").toString());
|
59
|
|
- })
|
60
|
|
- .setBody(simple("select well_id from centralbase.sys_access_well_control where access_status='1' "))
|
61
|
|
- .to("jdbc:centralbase")
|
62
|
|
- .split(body()).process(exchange -> {
|
63
|
|
- HashMap body = exchange.getIn().getBody(HashMap.class);
|
64
|
|
- exchange.getIn().setHeader("well_id",body.get("well_id"));
|
|
65
|
+ org = Integer.valueOf(body.get("max").toString());
|
65
|
66
|
})
|
66
|
|
- .setBody(simple("select org_id_pre from centralbase.cb_pc_organization"))
|
|
67
|
+ .setBody(simple("select org_id_pre,org_id from centralbase.cb_pc_organization"))
|
67
|
68
|
.to("jdbc:centralbase")
|
68
|
69
|
.split(body()).process(exchange -> {
|
69
|
70
|
HashMap body = exchange.getIn().getBody(HashMap.class);
|
70
|
|
- orgIdPreList.add(body.get("org_id_pre"));
|
71
|
|
- }).end()
|
|
71
|
+ orgIdPreList.put(body.get("org_id_pre"),body.get("org_id"));
|
|
72
|
+ }).end();
|
|
73
|
+ setMyBody(OrgAndWellSource)
|
72
|
74
|
.setBody(simple("select distinct jh,cydmc,zyq,zk,qyrq,sccw,qk,bz from DBA01 where jh ='${header.well_id}' "))
|
73
|
75
|
.to("jdbc:oracle")
|
74
|
|
- .transform()
|
75
|
|
- .body((result) -> {
|
|
76
|
+ .transform()
|
|
77
|
+ .body((result) -> {
|
76
|
78
|
organization = new TreeSet<>();
|
77
|
|
- orgID = 0;
|
78
|
|
- orgIDs = new HashMap<>();
|
79
|
79
|
return result;
|
80
|
80
|
})
|
81
|
|
- .step("1")
|
82
|
|
- .split(body()).process(exchange -> {
|
|
81
|
+ .step("1")
|
|
82
|
+ .split(body()).process(exchange -> {
|
83
|
83
|
Message in = exchange.getIn();
|
84
|
84
|
HashMap<String, Object> aRow = in.getBody(HashMap.class);
|
|
85
|
+ if (!aRow.containsKey("JM")) aRow.put("JM",aRow.get("JH"));
|
85
|
86
|
String org_level3 = aRow.get("ZYQ") + "@" + aRow.get("CYDMC") + "@" + aRow.get("ZK");
|
86
|
87
|
String org_level2 = aRow.get("ZYQ") + "@" + aRow.get("CYDMC");
|
87
|
88
|
String org_level1 = aRow.get("ZYQ").toString();
|
88
|
89
|
aRow.put("station_id", org_level3);
|
89
|
|
- if (organization.add(org_level1)) {
|
90
|
|
- orgID++;
|
91
|
|
- orgIDs.put(org_level1, orgID);
|
|
90
|
+ orgID = org;
|
|
91
|
+ if ((!orgIdPreList.containsKey(org_level1)) || (!orgIdPreList.containsKey(org_level2)) || (!orgIdPreList.containsKey(org_level3))) {
|
|
92
|
+ if (organization.add(org_level1)) {
|
|
93
|
+ if (!orgIDs.containsKey(org_level1)) orgIDs.put(org_level1,++orgID);
|
|
94
|
+ }
|
|
95
|
+ if (organization.add(org_level2)) {
|
|
96
|
+ if (!orgIDs.containsKey(org_level2)) orgIDs.put(org_level2,++orgID);
|
|
97
|
+ }
|
|
98
|
+ if (organization.add(org_level3)) {
|
|
99
|
+ if (!orgIDs.containsKey(org_level3)) orgIDs.put(org_level3,++orgID);
|
|
100
|
+ }
|
92
|
101
|
}
|
93
|
|
- if (organization.add(org_level2)) {
|
94
|
|
- orgID++;
|
95
|
|
- orgIDs.put(org_level2, orgID);
|
|
102
|
+ if(orgIdPreList.get(org_level3) !=null){
|
|
103
|
+ aRow.put("org_id",orgIdPreList.get(org_level3));
|
|
104
|
+ return;
|
96
|
105
|
}
|
97
|
|
- if (organization.add(org_level3)) {
|
98
|
|
- orgID++;
|
99
|
|
- orgIDs.put(org_level3, orgID);
|
|
106
|
+ if(orgIDs.get(org_level3) !=null){
|
|
107
|
+ aRow.put("org_id",orgIDs.get(org_level3));
|
100
|
108
|
}
|
101
|
|
-
|
102
|
109
|
})
|
103
|
|
- .setBody(simple("insert into centralbase.cb_cd_well_source (well_id,well_common_name,spud_date,station_id,station_name,completion_name,PRODUCING_AREA_name,remarks) " +
|
104
|
|
- "select '${body[JH]}','${body[JH]}','${body[QYRQ]}'::timestamp,'${body[station_id]}','${body[ZK]}','${body[SCCW]}','${body[QK]}','${body[BZ]}' " +
|
|
110
|
+ .setBody(simple("insert into centralbase.cb_cd_well_source (well_id,well_common_name,spud_date,org_id,station_id,station_name,completion_name,PRODUCING_AREA_name,remarks) " +
|
|
111
|
+ "select '${body[JH]}','${body[JM]}','${body[QYRQ]}'::timestamp,'${body[org_id]}','${body[station_id]}','${body[ZK]}','${body[SCCW]}','${body[QK]}','${body[BZ]}' " +
|
105
|
112
|
"where NOT EXISTS ( SELECT * FROM centralbase.cb_cd_well_source WHERE well_id = '${body[JH]}' )"))
|
106
|
|
- .to("jdbc:centralbase")
|
107
|
|
- .end()
|
108
|
|
- .transform().body((re) -> {
|
|
113
|
+ .to("jdbc:centralbase")
|
|
114
|
+ .end()
|
|
115
|
+ .transform().body((re) -> {
|
109
|
116
|
List<Map<String, Object>> rows = new ArrayList<>();
|
110
|
|
- Map<String, Object> row = new HashMap<>();
|
111
|
|
-// int code = org;
|
112
|
117
|
for (String s : organization) {
|
113
|
|
- // code is same as org_id
|
|
118
|
+ Map<String, Object> row = new HashMap<>();
|
114
|
119
|
String[] orgs = s.split("@");
|
115
|
120
|
row.put("org_id_pre", s);
|
116
|
121
|
switch (orgs.length) {
|
|
@@ -130,32 +135,24 @@ public class CamelJDBCConfiguration {
|
130
|
135
|
row.put("org_parent", orgIDs.get(orgs[0] + "@" + orgs[1]).toString());
|
131
|
136
|
break;
|
132
|
137
|
}
|
133
|
|
- if (orgIdPreList.add(s)) {
|
|
138
|
+ if (!orgIdPreList.containsKey(s)) {
|
134
|
139
|
org++;
|
135
|
140
|
row.put("org_code", org);
|
136
|
141
|
row.put("org_id", "" + org);
|
|
142
|
+ orgIdPreList.put(s, row.get("org_id"));
|
137
|
143
|
rows.add(row);
|
138
|
144
|
}
|
139
|
145
|
}
|
140
|
146
|
return rows;
|
141
|
147
|
}).split(body())
|
142
|
|
- .setBody(simple("insert into centralbase.cb_pc_organization(org_id,org_code,org_name,org_level,parent_id,org_id_pre)" +
|
|
148
|
+ .setBody(simple("insert into centralbase.cb_pc_organization(org_id,org_code,org_name,org_level,parent_id,org_id_pre)" +
|
143
|
149
|
"select '${body[org_id]}','${body[org_code]}','${body[org_name]}','${body[org_level]}','${body[org_parent]}','${body[org_id_pre]}' " +
|
144
|
|
- "where NOT EXISTS ( SELECT * FROM centralbase.cb_pc_organization WHERE org_id = '${body[org_id]}' and org_code='${body[org_code]}' and org_name = '${body[org_name]}' and org_level = '${body[org_level]}' and parent_id ='${body[org_parent]}' and org_id_pre ='${body[org_id_pre]}')"))
|
145
|
|
- .doTry()
|
146
|
|
- .to("jdbc:centralbase")
|
147
|
|
- .doCatch(Exception.class)
|
148
|
|
- .log("${header.date}"+" routeId:insert-OrgAndWellSource-> centralbase.cb_pc_organization insert data failed")
|
149
|
|
- .end()
|
150
|
|
- .setBody(simple("select org_id,org_id_pre from centralbase.cb_pc_organization where org_level = '3' "))
|
151
|
|
- .to("jdbc:centralbase")
|
152
|
|
- .split(body())
|
153
|
|
- .doTry()
|
154
|
|
- .setBody(simple("update centralbase.cb_cd_well_source set org_id = '${body[org_id]}' where station_id = '${body[org_id_pre]}'"))
|
155
|
|
- .to("jdbc:centralbase")
|
156
|
|
- .doCatch(Exception.class)
|
157
|
|
- .log("${header.date}"+" routeId:insert-OrgAndWellSource-> centralbase.cb_pc_organization update data failed")
|
158
|
|
- .end();
|
|
150
|
+ "where NOT EXISTS ( SELECT * FROM centralbase.cb_pc_organization WHERE org_id = '${body[org_id]}')"))
|
|
151
|
+ .doTry()
|
|
152
|
+ .to("jdbc:centralbase")
|
|
153
|
+ .doCatch(Exception.class)
|
|
154
|
+ .log("${header.date}"+" routeId:insert-OrgAndWellSource-> centralbase.cb_pc_organization insert data failed")
|
|
155
|
+ .end();
|
159
|
156
|
|
160
|
157
|
from("timer:update-wellControl?period=3600000")
|
161
|
158
|
.routeId("update-wellControl")
|
|
@@ -187,13 +184,14 @@ public class CamelJDBCConfiguration {
|
187
|
184
|
.end();
|
188
|
185
|
|
189
|
186
|
|
190
|
|
- from("timer:mytimer-insert-statusDaily?period=3600000")
|
|
187
|
+ RouteDefinition statusDaily= from("timer:mytimer-insert-statusDaily?period=3600000")
|
191
|
188
|
.routeId("insert-statusDaily")
|
192
|
189
|
.process(exchange -> {
|
193
|
190
|
Message in = exchange.getIn();
|
194
|
191
|
in.setHeader("date",getDate());
|
195
|
|
- })
|
196
|
|
- .setBody(simple("select distinct jh,rq,cyfs,yz,hysx , yysx ,tysx,bs,dym from DBA01 where rq = to_date('${header.date}','yyyy-MM-dd') and qyrq is not null "))
|
|
192
|
+ });
|
|
193
|
+ setMyBody(statusDaily)
|
|
194
|
+ .setBody(simple("select distinct jh,rq,cyfs,yz,hysx , yysx ,tysx,bs,dym from DBA01 where rq = to_date('${header.date}','yyyy-MM-dd') and jh='${header.well_id}' and qyrq is not null "))
|
197
|
195
|
.to("jdbc:oracle")
|
198
|
196
|
.split(body()).process(exchange -> {
|
199
|
197
|
Message in = exchange.getIn();
|
|
@@ -214,13 +212,14 @@ public class CamelJDBCConfiguration {
|
214
|
212
|
.end();
|
215
|
213
|
|
216
|
214
|
|
217
|
|
- from("timer:mytimer-update-statusDaily-DYM?period=3600000")
|
|
215
|
+ RouteDefinition statusDailyDYM = from("timer:mytimer-update-statusDaily-DYM?period=3600000")
|
218
|
216
|
.routeId("update-statusDaily-DYM")
|
219
|
217
|
.process(exchange -> {
|
220
|
218
|
Message in = exchange.getIn();
|
221
|
219
|
in.setHeader("date",getDate());
|
222
|
|
- })
|
223
|
|
- .setBody(simple("select distinct jh , rq , dym from DBA01 where (jh,rq) in (SELECT jh,max(rq) rq FROM DBA01 WHERE dym is not null and rq<= to_date('${header.date}','yyyy-MM-dd') group by jh)"))
|
|
220
|
+ });
|
|
221
|
+ setMyBody(statusDailyDYM)
|
|
222
|
+ .setBody(simple("select distinct jh , rq , dym from DBA01 where (jh,rq) in (SELECT jh,max(rq) rq FROM DBA01 WHERE dym is not null and rq<= to_date('${header.date}','yyyy-MM-dd')and jh='${header.well_id}' group by jh)"))
|
224
|
223
|
.to("jdbc:oracle")
|
225
|
224
|
.split(body())
|
226
|
225
|
.setBody(simple("update centralbase.cb_pc_pro_wellbore_status_daily set start_pump_liq_level = '${body[DYM]}' where well_id = '${body[JH]}' and prod_date::date = '${header.date}' "))
|
|
@@ -229,14 +228,14 @@ public class CamelJDBCConfiguration {
|
229
|
228
|
.doCatch(Exception.class)
|
230
|
229
|
.log("${header.date}"+" routeId:update-statusDaily-DYM -> centralbase.cb_pc_pro_wellbore_status_daily update data failed")
|
231
|
230
|
.end();
|
232
|
|
- //因为数据库有可能会查出多条数据,现在的做法时,在数据库中增加两个个字段,用来记录BJ和DYM 的更新时间 ,只有获取到的时间大于数据库中存储的时间时,才会更新,并且该字段也会更新
|
233
|
|
- from("timer:mytimer-update-statusDaily-BJ?period=3600000")
|
|
231
|
+ RouteDefinition statusDailyBJ = from("timer:mytimer-update-statusDaily-BJ?period=3600000")
|
234
|
232
|
.routeId("update-statusDaily-BJ")
|
235
|
233
|
.process(exchange -> {
|
236
|
234
|
Message in = exchange.getIn();
|
237
|
235
|
in.setHeader("date",getDate());
|
238
|
|
- })
|
239
|
|
- .setBody(simple("select distinct jh , rq , bj from DBA01 where (jh,rq) in (SELECT jh,max(rq) rq FROM DBA01 WHERE bj is not null and rq<= to_date('${header.date}','yyyy-MM-dd') group by jh)"))
|
|
236
|
+ });
|
|
237
|
+ setMyBody(statusDailyBJ)
|
|
238
|
+ .setBody(simple("select distinct jh , rq , bj from DBA01 where (jh,rq) in (SELECT jh,max(rq) rq FROM DBA01 WHERE bj is not null and rq<= to_date('${header.date}','yyyy-MM-dd') and jh='${header.well_id}' group by jh)"))
|
240
|
239
|
.to("jdbc:oracle")
|
241
|
240
|
.split(body()).process(exchange -> {
|
242
|
241
|
HashMap body = exchange.getIn().getBody(HashMap.class);
|
|
@@ -248,13 +247,14 @@ public class CamelJDBCConfiguration {
|
248
|
247
|
.log("${header.date}"+" routeId:update-statusDaily-BJ -> centralbase.cb_pc_pro_wellbore_status_daily update data failed")
|
249
|
248
|
.end();
|
250
|
249
|
|
251
|
|
- from("timer:mytimer-update-statusDaily-submergenceDepth?period=3600000")
|
|
250
|
+ RouteDefinition submergenceDepth = from("timer:mytimer-update-statusDaily-submergenceDepth?period=3600000")
|
252
|
251
|
.routeId("update-statusDaily-submergenceDepth")
|
253
|
252
|
.process(exchange -> {
|
254
|
253
|
Message in = exchange.getIn();
|
255
|
254
|
in.setHeader("date",getDate()+" 00:00:00");
|
256
|
|
- })
|
257
|
|
- .setBody(simple("select well_id,prod_date,start_pump_liq_level,pump_depth from centralbase.cb_pc_pro_wellbore_status_daily where prod_date = '${header.date}' "))
|
|
255
|
+ });
|
|
256
|
+ setMyBody(submergenceDepth)
|
|
257
|
+ .setBody(simple("select well_id,prod_date,start_pump_liq_level,pump_depth from centralbase.cb_pc_pro_wellbore_status_daily where prod_date = '${header.date}' and well_id='${header.well_id}' "))
|
258
|
258
|
.to("jdbc:centralbase")
|
259
|
259
|
.split(body()).process(exchange -> {
|
260
|
260
|
Message in = exchange.getIn();
|
|
@@ -274,13 +274,14 @@ public class CamelJDBCConfiguration {
|
274
|
274
|
.log("${header.date}"+" routeId:update-statusDaily-submergenceDepth -> centralbase.cb_pc_pro_wellbore_status_daily update data failed")
|
275
|
275
|
.end();
|
276
|
276
|
|
277
|
|
- from("timer:mytimer-insert-volDaily?period=3600000")
|
|
277
|
+ RouteDefinition volDaily = from("timer:mytimer-insert-volDaily?period=3600000")
|
278
|
278
|
.routeId("insert-volDaily")
|
279
|
279
|
.process(exchange -> {
|
280
|
280
|
Message in = exchange.getIn();
|
281
|
281
|
in.setHeader("date",getDate());
|
282
|
|
- })
|
283
|
|
- .setBody(simple("select distinct jh,rq,scsj, rcyl1,rcyl,rcql,hs, bz from DBA01 where rq = to_date('${header.date}','yyyy-MM-dd') and qyrq is not null "))
|
|
282
|
+ });
|
|
283
|
+ setMyBody(volDaily)
|
|
284
|
+ .setBody(simple("select distinct jh,rq,scsj, rcyl1,rcyl,rcql,hs, bz from DBA01 where rq = to_date('${header.date}','yyyy-MM-dd') and jh ='${header.well_id}' and qyrq is not null "))
|
284
|
285
|
.to("jdbc:oracle")
|
285
|
286
|
.split(body()).process(exchange -> {
|
286
|
287
|
Message in = exchange.getIn();
|
|
@@ -341,13 +342,14 @@ public class CamelJDBCConfiguration {
|
341
|
342
|
.to("jdbc:centralbase")
|
342
|
343
|
.end();
|
343
|
344
|
|
344
|
|
- from("timer:mytimer-update-volDaily-liq_prod_daily?period=3600000")
|
|
345
|
+ RouteDefinition volDailyLiqProdDaily = from("timer:mytimer-update-volDaily-liq_prod_daily?period=3600000")
|
345
|
346
|
.routeId("update-volDaily-liq_prod_daily")
|
346
|
347
|
.process(exchange -> {
|
347
|
348
|
Message in = exchange.getIn();
|
348
|
349
|
in.setHeader("date",getDate());
|
349
|
|
- })
|
350
|
|
- .setBody(simple("select distinct jh,rq,scsj, rcyl1,rcyl,rcql,hs, bz from DBA01 where rq = to_date('${header.date}','yyyy-MM-dd') and qyrq is not null "))
|
|
350
|
+ });
|
|
351
|
+ setMyBody(volDailyLiqProdDaily)
|
|
352
|
+ .setBody(simple("select distinct jh,rq,scsj, rcyl1,rcyl,rcql,hs, bz from DBA01 where rq = to_date('${header.date}','yyyy-MM-dd')and jh ='${header.well_id}' and qyrq is not null "))
|
351
|
353
|
.to("jdbc:oracle")
|
352
|
354
|
.split(body()).process(exchange -> {
|
353
|
355
|
Message in = exchange.getIn();
|