|
@@ -1,28 +1,13 @@
|
1
|
1
|
package com.gct.tools.etlcamelhuge.routeconfig;
|
2
|
2
|
|
3
|
|
-import com.alibaba.fastjson.JSONObject;
|
4
|
|
-import com.gct.common.util.SGTUtil;
|
5
|
|
-import com.gct.tools.etlcamelhuge.MQ.DefaultMsgSendSuccessCallBack;
|
6
|
|
-import com.gct.tools.etlcamelhuge.MQ.MessageBody;
|
7
|
|
-import com.gct.tools.etlcamelhuge.MQ.MessageProducer;
|
8
|
|
-import com.gct.tools.etlcamelhuge.camelconfig.MyDataSourceConfiguration;
|
9
|
|
-import com.gct.tools.etlcamelhuge.entity.DiagnoseMsg;
|
10
|
|
-import lombok.Data;
|
11
|
3
|
import org.apache.camel.*;
|
12
|
4
|
import org.apache.camel.builder.RouteBuilder;
|
13
|
5
|
//import org.apache.rocketmq.common.message.Message;
|
14
|
|
-import org.apache.rocketmq.spring.core.RocketMQTemplate;
|
15
|
|
-import org.springframework.beans.factory.annotation.Autowired;
|
16
|
6
|
import org.springframework.context.annotation.Bean;
|
17
|
7
|
import org.springframework.context.annotation.Configuration;
|
18
|
|
-import org.springframework.jdbc.core.JdbcTemplate;
|
19
|
|
-import org.springframework.scheduling.annotation.Async;
|
|
8
|
+import org.springframework.util.StringUtils;
|
20
|
9
|
|
21
|
|
-import javax.annotation.Resource;
|
22
|
|
-import javax.sql.DataSource;
|
23
|
10
|
import java.math.BigDecimal;
|
24
|
|
-import java.text.DecimalFormat;
|
25
|
|
-import java.text.SimpleDateFormat;
|
26
|
11
|
import java.time.LocalDateTime;
|
27
|
12
|
import java.time.format.DateTimeFormatter;
|
28
|
13
|
import java.util.*;
|
|
@@ -49,97 +34,157 @@ public class CamelJDBCConfiguration {
|
49
|
34
|
private Map<String, Integer> orgIDs;
|
50
|
35
|
private Integer orgID;
|
51
|
36
|
private Map<String, Integer> stringIntegerMap;
|
|
37
|
+ private List<Object> wellIdList;
|
|
38
|
+ private Set<Object> orgIdPreList;
|
|
39
|
+ private Integer org;
|
52
|
40
|
//全部执行完成的大概时间在30-40分钟
|
53
|
41
|
@Override
|
54
|
42
|
public void configure() throws Exception {
|
55
|
|
- /*from("timer:mytimer1?period=604800000")
|
56
|
|
- .routeId("oracle-1")
|
|
43
|
+
|
|
44
|
+ from("timer:insert-OrgAndWellSource?period=86400000")
|
|
45
|
+ .routeId("insert-OrgAndWellSource")
|
57
|
46
|
.setHeader("date", constant(getDate()))
|
58
|
|
- .setBody(simple("select distinct jh,cydmc,zyq,zk,qyrq,sccw,qk,bz from DBA01 where rq = to_date('${header.date}','yyyy-MM-dd') and qyrq is not null "))
|
59
|
|
- .to("jdbc:oracle")
|
60
|
|
- .log("${header.date}"+"routeId:oracle-1-> select cb_cd_well_source need data failed")
|
61
|
|
- .transform()
|
62
|
|
- .body((result) -> {
|
63
|
|
- organization = new TreeSet<>();
|
64
|
|
- orgID = 0;
|
65
|
|
- orgIDs = new HashMap<>();
|
66
|
|
- return result;
|
|
47
|
+ .process(exchange -> {
|
|
48
|
+ org = 0;
|
|
49
|
+ wellIdList = new ArrayList<>();
|
|
50
|
+ orgIdPreList =new HashSet<>();
|
67
|
51
|
})
|
68
|
|
- .step("1")
|
69
|
|
- .split(body()).process(exchange -> {
|
70
|
|
- Message in = exchange.getIn();
|
71
|
|
- HashMap<String, Object> aRow = in.getBody(HashMap.class);
|
72
|
|
- String org_level3 = aRow.get("ZYQ") + "@" + aRow.get("CYDMC") + "@" + aRow.get("ZK");
|
73
|
|
- String org_level2 = aRow.get("ZYQ") + "@" + aRow.get("CYDMC");
|
74
|
|
- String org_level1 = aRow.get("ZYQ").toString();
|
75
|
|
- aRow.put("station_id", org_level3);
|
76
|
|
- if (organization.add(org_level1)) {
|
77
|
|
- orgID++;
|
78
|
|
- orgIDs.put(org_level1, orgID);
|
79
|
|
- }
|
80
|
|
- if (organization.add(org_level2)) {
|
81
|
|
- orgID++;
|
82
|
|
- orgIDs.put(org_level2, orgID);
|
83
|
|
- }
|
84
|
|
- if (organization.add(org_level3)) {
|
85
|
|
- orgID++;
|
86
|
|
- orgIDs.put(org_level3, orgID);
|
87
|
|
- }
|
88
|
|
- })
|
89
|
|
- .setBody(simple("insert into centralbase.cb_cd_well_source (well_id,well_common_name,spud_date,station_id,station_name,completion_name,PRODUCING_AREA_name,remarks) " +
|
90
|
|
- "values ('${body[JH]}','${body[JH]}','${body[QYRQ]}'::timestamp,'${body[station_id]}','${body[ZK]}','${body[SCCW]}','${body[QK]}','${body[BZ]}')" +
|
91
|
|
- " ON conflict(well_id) DO UPDATE set remarks = '${body[BZ]}' "))
|
|
52
|
+ .setBody(simple("select max(org_id) from centralbase.cb_pc_organization"))
|
92
|
53
|
.to("jdbc:centralbase")
|
93
|
|
- .log("${header.date}"+" routeId:oracle-1-> centralbase.cb_cd_well_source insert data failed")
|
94
|
|
- .end()
|
95
|
|
- .transform().body((re) -> {
|
96
|
|
- List<Map<String, Object>> rows = new ArrayList<>();
|
97
|
|
- int code = 0;
|
98
|
|
- for (String s : organization) {
|
99
|
|
- code++;// code is same as org_id
|
100
|
|
- String[] orgs = s.split("@");
|
101
|
|
- Map<String, Object> row = new HashMap<>();
|
102
|
|
- row.put("org_id_pre", s);
|
103
|
|
- row.put("org_code", code);
|
104
|
|
- row.put("org_id", "" + code);
|
105
|
|
- switch (orgs.length) {
|
106
|
|
- case 1:
|
107
|
|
- row.put("org_name", orgs[0]);
|
108
|
|
- row.put("org_level", 1);
|
109
|
|
- row.put("org_parent", "0");
|
110
|
|
- break;
|
111
|
|
- case 2:
|
112
|
|
- row.put("org_name", orgs[1]);
|
113
|
|
- row.put("org_level", 2);
|
114
|
|
- row.put("org_parent", orgIDs.get(orgs[0]).toString());
|
115
|
|
- break;
|
116
|
|
- case 3:
|
117
|
|
- row.put("org_name", orgs[2]);
|
118
|
|
- row.put("org_level", 3);
|
119
|
|
- row.put("org_parent", orgIDs.get(orgs[0] + "@" + orgs[1]).toString());
|
120
|
|
- break;
|
121
|
|
- }
|
122
|
|
- rows.add(row);
|
123
|
|
- }
|
124
|
|
- return rows;
|
125
|
|
- }).split(body())
|
126
|
|
- .doTry()
|
127
|
|
- .setBody(simple("insert into centralbase.cb_pc_organization(org_id,org_code,org_name,org_level,parent_id,org_id_pre)" +
|
128
|
|
- "values('${body[org_id]}','${body[org_code]}','${body[org_name]}','${body[org_level]}','${body[org_parent]}','${body[org_id_pre]}')" +
|
129
|
|
- "ON conflict(org_id_pre) DO UPDATE set org_code = '${body[org_code]}' "))
|
|
54
|
+ .process(exchange -> {
|
|
55
|
+ HashMap body = exchange.getIn().getBody(HashMap.class);
|
|
56
|
+ if(body==null|| StringUtils.isEmpty(body.get("max"))) org=0;
|
|
57
|
+ else
|
|
58
|
+ org = Integer.valueOf(body.get("max").toString());
|
|
59
|
+ })
|
|
60
|
+ .setBody(simple("select well_id from centralbase.sys_access_well_control where access_status='1' "))
|
130
|
61
|
.to("jdbc:centralbase")
|
131
|
|
- .doCatch(Exception.class)
|
132
|
|
- .log("${header.date}"+" routeId:oracle-1-> centralbase.cb_pc_organization insert data failed")
|
133
|
|
- .end()
|
134
|
|
- .setBody(simple("select org_id,org_name from centralbase.cb_pc_organization where org_level = '3' "))
|
|
62
|
+ .split(body()).process(exchange -> {
|
|
63
|
+ HashMap body = exchange.getIn().getBody(HashMap.class);
|
|
64
|
+ exchange.getIn().setHeader("well_id",body.get("well_id"));
|
|
65
|
+ })
|
|
66
|
+ .setBody(simple("select org_id_pre from centralbase.cb_pc_organization"))
|
|
67
|
+ .to("jdbc:centralbase")
|
|
68
|
+ .split(body()).process(exchange -> {
|
|
69
|
+ HashMap body = exchange.getIn().getBody(HashMap.class);
|
|
70
|
+ orgIdPreList.add(body.get("org_id_pre"));
|
|
71
|
+ }).end()
|
|
72
|
+ .setBody(simple("select distinct jh,cydmc,zyq,zk,qyrq,sccw,qk,bz from DBA01 where jh ='${header.well_id}' "))
|
|
73
|
+ .to("jdbc:oracle")
|
|
74
|
+ .transform()
|
|
75
|
+ .body((result) -> {
|
|
76
|
+ organization = new TreeSet<>();
|
|
77
|
+ orgID = 0;
|
|
78
|
+ orgIDs = new HashMap<>();
|
|
79
|
+ return result;
|
|
80
|
+ })
|
|
81
|
+ .step("1")
|
|
82
|
+ .split(body()).process(exchange -> {
|
|
83
|
+ Message in = exchange.getIn();
|
|
84
|
+ HashMap<String, Object> aRow = in.getBody(HashMap.class);
|
|
85
|
+ String org_level3 = aRow.get("ZYQ") + "@" + aRow.get("CYDMC") + "@" + aRow.get("ZK");
|
|
86
|
+ String org_level2 = aRow.get("ZYQ") + "@" + aRow.get("CYDMC");
|
|
87
|
+ String org_level1 = aRow.get("ZYQ").toString();
|
|
88
|
+ aRow.put("station_id", org_level3);
|
|
89
|
+ if (organization.add(org_level1)) {
|
|
90
|
+ orgID++;
|
|
91
|
+ orgIDs.put(org_level1, orgID);
|
|
92
|
+ }
|
|
93
|
+ if (organization.add(org_level2)) {
|
|
94
|
+ orgID++;
|
|
95
|
+ orgIDs.put(org_level2, orgID);
|
|
96
|
+ }
|
|
97
|
+ if (organization.add(org_level3)) {
|
|
98
|
+ orgID++;
|
|
99
|
+ orgIDs.put(org_level3, orgID);
|
|
100
|
+ }
|
|
101
|
+
|
|
102
|
+ })
|
|
103
|
+ .setBody(simple("insert into centralbase.cb_cd_well_source (well_id,well_common_name,spud_date,station_id,station_name,completion_name,PRODUCING_AREA_name,remarks) " +
|
|
104
|
+ "select '${body[JH]}','${body[JH]}','${body[QYRQ]}'::timestamp,'${body[station_id]}','${body[ZK]}','${body[SCCW]}','${body[QK]}','${body[BZ]}' " +
|
|
105
|
+ "where NOT EXISTS ( SELECT * FROM centralbase.cb_cd_well_source WHERE well_id = '${body[JH]}' )"))
|
|
106
|
+ .to("jdbc:centralbase")
|
|
107
|
+ .end()
|
|
108
|
+ .transform().body((re) -> {
|
|
109
|
+ List<Map<String, Object>> rows = new ArrayList<>();
|
|
110
|
+ Map<String, Object> row = new HashMap<>();
|
|
111
|
+// int code = org;
|
|
112
|
+ for (String s : organization) {
|
|
113
|
+ // code is same as org_id
|
|
114
|
+ String[] orgs = s.split("@");
|
|
115
|
+ row.put("org_id_pre", s);
|
|
116
|
+ switch (orgs.length) {
|
|
117
|
+ case 1:
|
|
118
|
+ row.put("org_name", orgs[0]);
|
|
119
|
+ row.put("org_level", 1);
|
|
120
|
+ row.put("org_parent", "0");
|
|
121
|
+ break;
|
|
122
|
+ case 2:
|
|
123
|
+ row.put("org_name", orgs[1]);
|
|
124
|
+ row.put("org_level", 2);
|
|
125
|
+ row.put("org_parent", orgIDs.get(orgs[0]).toString());
|
|
126
|
+ break;
|
|
127
|
+ case 3:
|
|
128
|
+ row.put("org_name", orgs[2]);
|
|
129
|
+ row.put("org_level", 3);
|
|
130
|
+ row.put("org_parent", orgIDs.get(orgs[0] + "@" + orgs[1]).toString());
|
|
131
|
+ break;
|
|
132
|
+ }
|
|
133
|
+ if (orgIdPreList.add(s)) {
|
|
134
|
+ org++;
|
|
135
|
+ row.put("org_code", org);
|
|
136
|
+ row.put("org_id", "" + org);
|
|
137
|
+ rows.add(row);
|
|
138
|
+ }
|
|
139
|
+ }
|
|
140
|
+ return rows;
|
|
141
|
+ }).split(body())
|
|
142
|
+ .setBody(simple("insert into centralbase.cb_pc_organization(org_id,org_code,org_name,org_level,parent_id,org_id_pre)" +
|
|
143
|
+ "select '${body[org_id]}','${body[org_code]}','${body[org_name]}','${body[org_level]}','${body[org_parent]}','${body[org_id_pre]}' " +
|
|
144
|
+ "where NOT EXISTS ( SELECT * FROM centralbase.cb_pc_organization WHERE org_id = '${body[org_id]}' and org_code='${body[org_code]}' and org_name = '${body[org_name]}' and org_level = '${body[org_level]}' and parent_id ='${body[org_parent]}' and org_id_pre ='${body[org_id_pre]}')"))
|
|
145
|
+ .doTry()
|
|
146
|
+ .to("jdbc:centralbase")
|
|
147
|
+ .doCatch(Exception.class)
|
|
148
|
+ .log("${header.date}"+" routeId:insert-OrgAndWellSource-> centralbase.cb_pc_organization insert data failed")
|
|
149
|
+ .end()
|
|
150
|
+ .setBody(simple("select org_id,org_id_pre from centralbase.cb_pc_organization where org_level = '3' "))
|
|
151
|
+ .to("jdbc:centralbase")
|
|
152
|
+ .split(body())
|
|
153
|
+ .doTry()
|
|
154
|
+ .setBody(simple("update centralbase.cb_cd_well_source set org_id = '${body[org_id]}' where station_id = '${body[org_id_pre]}'"))
|
|
155
|
+ .to("jdbc:centralbase")
|
|
156
|
+ .doCatch(Exception.class)
|
|
157
|
+ .log("${header.date}"+" routeId:insert-OrgAndWellSource-> centralbase.cb_pc_organization update data failed")
|
|
158
|
+ .end();
|
|
159
|
+
|
|
160
|
+ from("timer:update-wellControl?period=3600000")
|
|
161
|
+ .routeId("update-wellControl")
|
|
162
|
+ .setBody(simple("select scc.well_id,wo.well_common_name,op.org_id from centralbase.sys_access_well_control scc\n" +
|
|
163
|
+ "left join centralbase.cb_cd_well_source wo on scc.well_id = wo.well_id\n" +
|
|
164
|
+ "left join centralbase.cb_pc_organization op on wo.org_id = op.org_id\n" +
|
|
165
|
+ "where scc.access_status ='1' "))
|
135
|
166
|
.to("jdbc:centralbase")
|
136
|
167
|
.split(body())
|
137
|
|
- .doTry()
|
138
|
|
- .setBody(simple("update centralbase.cb_cd_well_source set org_id = '${body[org_id]}' where station_name = '${body[org_name]}'"))
|
|
168
|
+ .split(body()).process(exchange -> {
|
|
169
|
+ HashMap<String, Object> body = exchange.getIn().getBody(HashMap.class);
|
|
170
|
+ String format = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
|
|
171
|
+ if (body.get("well_common_name") == null || body.get("well_common_name").equals("")) {
|
|
172
|
+ body.put("remarks","暂无井信息");
|
|
173
|
+ body.put("error_id",0);
|
|
174
|
+ body.put("updateTime",format);
|
|
175
|
+ }else if (body.get("org_id") == null || body.get("org_id").equals("")){
|
|
176
|
+ body.put("remarks","暂无机构信息");
|
|
177
|
+ body.put("error_id",0);
|
|
178
|
+ body.put("updateTime",format);
|
|
179
|
+ }else {
|
|
180
|
+ body.put("remarks","");
|
|
181
|
+ body.put("error_id",1);
|
|
182
|
+ body.put("updateTime",format);
|
|
183
|
+ }
|
|
184
|
+ })
|
|
185
|
+ .setBody(simple("update centralbase.sys_access_well_control set well_common_name='${body[well_common_name]}',org_id='${body[org_id]}',update_time = '${body[updateTime]}'::timestamp, remarks ='${body[remarks]}' ,error_id ='${body[error_id]}' where well_id ='${body[well_id]}' "))
|
139
|
186
|
.to("jdbc:centralbase")
|
140
|
|
- .doCatch(Exception.class)
|
141
|
|
- .log("${header.date}"+" routeId:oracle-1-> centralbase.cb_pc_organization update data failed")
|
142
|
|
- .end();*/
|
|
187
|
+ .end();
|
143
|
188
|
|
144
|
189
|
|
145
|
190
|
from("timer:mytimer-insert-statusDaily?period=3600000")
|
|
@@ -229,27 +274,6 @@ public class CamelJDBCConfiguration {
|
229
|
274
|
.log("${header.date}"+" routeId:update-statusDaily-submergenceDepth -> centralbase.cb_pc_pro_wellbore_status_daily update data failed")
|
230
|
275
|
.end();
|
231
|
276
|
|
232
|
|
-
|
233
|
|
- /* from("timer:mytimer-update-statusDaily-oil_nozzle?period=1800000")
|
234
|
|
- .routeId("update-statusDaily-oil_nozzle")
|
235
|
|
- .setHeader("date", constant(getDate() + " 00:00:00"))
|
236
|
|
- .setBody(simple("select distinct rn.well_id,cb.prod_date,rn.pump_diameter from centralbase.cb_temp_well_mech_runtime rn ,centralbase.cb_pc_pro_wellbore_status_daily cb where cb.well_id = rn.well_id " +
|
237
|
|
- "and cb.prod_date = '${header.date}' "))
|
238
|
|
- .to("jdbc:centralbase")//.log("${body}")
|
239
|
|
- .split(body()).process(exchange -> {
|
240
|
|
- Message in = exchange.getIn();
|
241
|
|
- HashMap<String, Object> aRow = in.getBody(HashMap.class);
|
242
|
|
- aRow.putIfAbsent("pump_diameter", "0.0");
|
243
|
|
- })
|
244
|
|
- .setBody(simple("update centralbase.cb_pc_pro_wellbore_status_daily set oil_nozzle = '${body[pump_diameter]}' where well_id ='${body[well_id]}' and prod_date='${body[prod_date]}' "))
|
245
|
|
- .doTry()
|
246
|
|
- .to("jdbc:centralbase")
|
247
|
|
- .doCatch(Exception.class)
|
248
|
|
- .log("${header.date}"+" routeId:update-statusDaily-oil_nozzle -> centralbase.cb_pc_pro_wellbore_status_daily update data failed")
|
249
|
|
- .end();*/
|
250
|
|
-
|
251
|
|
-
|
252
|
|
-
|
253
|
277
|
from("timer:mytimer-insert-volDaily?period=3600000")
|
254
|
278
|
.routeId("insert-volDaily")
|
255
|
279
|
.process(exchange -> {
|