CamelJDBCConfiguration.java 25 KB


  1. package com.gct.tools.etlcamelhuge.routeconfig;
  2. import com.alibaba.fastjson.JSONObject;
  3. import com.gct.common.util.SGTUtil;
  4. import com.gct.tools.etlcamelhuge.MQ.DefaultMsgSendSuccessCallBack;
  5. import com.gct.tools.etlcamelhuge.MQ.MessageBody;
  6. import com.gct.tools.etlcamelhuge.MQ.MessageProducer;
  7. import com.gct.tools.etlcamelhuge.camelconfig.MyDataSourceConfiguration;
  8. import com.gct.tools.etlcamelhuge.entity.DiagnoseMsg;
  9. import lombok.Data;
  10. import org.apache.camel.*;
  11. import org.apache.camel.builder.RouteBuilder;
  12. //import org.apache.rocketmq.common.message.Message;
  13. import org.apache.rocketmq.spring.core.RocketMQTemplate;
  14. import org.springframework.beans.factory.annotation.Autowired;
  15. import org.springframework.context.annotation.Bean;
  16. import org.springframework.context.annotation.Configuration;
  17. import org.springframework.jdbc.core.JdbcTemplate;
  18. import org.springframework.scheduling.annotation.Async;
  19. import javax.annotation.Resource;
  20. import javax.sql.DataSource;
  21. import java.math.BigDecimal;
  22. import java.text.DecimalFormat;
  23. import java.text.SimpleDateFormat;
  24. import java.time.LocalDateTime;
  25. import java.time.format.DateTimeFormatter;
  26. import java.util.*;
  27. @Data
  28. class LogMessage{
  29. String id;
  30. LocalDateTime date;
  31. Object data;
  32. String msg;
  33. }
  34. /**
  35. * class name: CamelJDBCConfiguration
  36. *
  37. * @author lloyd
  38. * @version 1.0
  39. * @since 2021/4/14 下午3:16
  40. */
  41. @Configuration
  42. public class CamelJDBCConfiguration {
  43. public String getDate(){
  44. return LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd"));
  45. }
  46. @Bean
  47. public RouteBuilder routeBuilderWithOracle1() {
  48. return new RouteBuilder() {
  49. private SortedSet<String> organization;
  50. private Map<String, Integer> orgIDs;
  51. private Integer orgID;
  52. private Map<String, Integer> stringIntegerMap;
  53. //全部执行完成的大概时间在30-40分钟
  54. @Override
  55. public void configure() throws Exception {
  56. /*from("timer:mytimer1?period=604800000")
  57. .routeId("oracle-1")
  58. .setHeader("date", constant(getDate()))
  59. .setBody(simple("select distinct jh,cydmc,zyq,zk,qyrq,sccw,qk,bz from DBA01 where rq = to_date('${header.date}','yyyy-MM-dd') and qyrq is not null "))
  60. .to("jdbc:oracle")
  61. .log("${header.date}"+"routeId:oracle-1-> select cb_cd_well_source need data failed")
  62. .transform()
  63. .body((result) -> {
  64. organization = new TreeSet<>();
  65. orgID = 0;
  66. orgIDs = new HashMap<>();
  67. return result;
  68. })
  69. .step("1")
  70. .split(body()).process(exchange -> {
  71. Message in = exchange.getIn();
  72. HashMap<String, Object> aRow = in.getBody(HashMap.class);
  73. String org_level3 = aRow.get("ZYQ") + "@" + aRow.get("CYDMC") + "@" + aRow.get("ZK");
  74. String org_level2 = aRow.get("ZYQ") + "@" + aRow.get("CYDMC");
  75. String org_level1 = aRow.get("ZYQ").toString();
  76. aRow.put("station_id", org_level3);
  77. if (organization.add(org_level1)) {
  78. orgID++;
  79. orgIDs.put(org_level1, orgID);
  80. }
  81. if (organization.add(org_level2)) {
  82. orgID++;
  83. orgIDs.put(org_level2, orgID);
  84. }
  85. if (organization.add(org_level3)) {
  86. orgID++;
  87. orgIDs.put(org_level3, orgID);
  88. }
  89. })
  90. .setBody(simple("insert into centralbase.cb_cd_well_source (well_id,well_common_name,spud_date,station_id,station_name,completion_name,PRODUCING_AREA_name,remarks) " +
  91. "values ('${body[JH]}','${body[JH]}','${body[QYRQ]}'::timestamp,'${body[station_id]}','${body[ZK]}','${body[SCCW]}','${body[QK]}','${body[BZ]}')" +
  92. " ON conflict(well_id) DO UPDATE set remarks = '${body[BZ]}' "))
  93. .to("jdbc:centralbase")
  94. .log("${header.date}"+" routeId:oracle-1-> centralbase.cb_cd_well_source insert data failed")
  95. .end()
  96. .transform().body((re) -> {
  97. List<Map<String, Object>> rows = new ArrayList<>();
  98. int code = 0;
  99. for (String s : organization) {
  100. code++;// code is same as org_id
  101. String[] orgs = s.split("@");
  102. Map<String, Object> row = new HashMap<>();
  103. row.put("org_id_pre", s);
  104. row.put("org_code", code);
  105. row.put("org_id", "" + code);
  106. switch (orgs.length) {
  107. case 1:
  108. row.put("org_name", orgs[0]);
  109. row.put("org_level", 1);
  110. row.put("org_parent", "0");
  111. break;
  112. case 2:
  113. row.put("org_name", orgs[1]);
  114. row.put("org_level", 2);
  115. row.put("org_parent", orgIDs.get(orgs[0]).toString());
  116. break;
  117. case 3:
  118. row.put("org_name", orgs[2]);
  119. row.put("org_level", 3);
  120. row.put("org_parent", orgIDs.get(orgs[0] + "@" + orgs[1]).toString());
  121. break;
  122. }
  123. rows.add(row);
  124. }
  125. return rows;
  126. }).split(body())
  127. .doTry()
  128. .setBody(simple("insert into centralbase.cb_pc_organization(org_id,org_code,org_name,org_level,parent_id,org_id_pre)" +
  129. "values('${body[org_id]}','${body[org_code]}','${body[org_name]}','${body[org_level]}','${body[org_parent]}','${body[org_id_pre]}')" +
  130. "ON conflict(org_id_pre) DO UPDATE set org_code = '${body[org_code]}' "))
  131. .to("jdbc:centralbase")
  132. .doCatch(Exception.class)
  133. .log("${header.date}"+" routeId:oracle-1-> centralbase.cb_pc_organization insert data failed")
  134. .end()
  135. .setBody(simple("select org_id,org_name from centralbase.cb_pc_organization where org_level = '3' "))
  136. .to("jdbc:centralbase")
  137. .split(body())
  138. .doTry()
  139. .setBody(simple("update centralbase.cb_cd_well_source set org_id = '${body[org_id]}' where station_name = '${body[org_name]}'"))
  140. .to("jdbc:centralbase")
  141. .doCatch(Exception.class)
  142. .log("${header.date}"+" routeId:oracle-1-> centralbase.cb_pc_organization update data failed")
  143. .end();*/
  144. from("timer:mytimer-insert-statusDaily?period=3600000")
  145. .routeId("insert-statusDaily")
  146. .process(exchange -> {
  147. Message in = exchange.getIn();
  148. in.setHeader("date",getDate());
  149. })
  150. .setBody(simple("select distinct jh,rq,cyfs,yz,hysx , yysx ,tysx,bs,dym from DBA01 where rq = to_date('${header.date}','yyyy-MM-dd') and qyrq is not null "))
  151. .to("jdbc:oracle")
  152. .split(body()).process(exchange -> {
  153. Message in = exchange.getIn();
  154. HashMap<String, Object> aRow = in.getBody(HashMap.class);
  155. if (aRow.get("YZ") == null) aRow.put("YZ", "0.0");
  156. if (aRow.get("HYSX") == null) aRow.put("HYSX", "0.0");
  157. if (aRow.get("YYSX") == null) aRow.put("YYSX", "0.0");
  158. if (aRow.get("TYSX") == null) aRow.put("TYSX", "0.0");
  159. if (aRow.get("BS") == null) aRow.put("BS", "0.0");
  160. })
  161. .setBody(simple("insert into centralbase.cb_pc_pro_wellbore_status_daily(well_id,prod_date,oil_prod_method,oil_nozzle,back_pres,tubing_pres,casing_pres,pump_depth) " +
  162. "select '${body[JH]}','${body[RQ]}','${body[CYFS]}','${body[YZ]}','${body[HYSX]}','${body[YYSX]}','${body[TYSX]}','${body[BS]}' " +
  163. "where NOT EXISTS ( SELECT * FROM centralbase.cb_pc_pro_wellbore_status_daily WHERE well_id = '${body[JH]}' and prod_date = '${body[RQ]}')"))
  164. .doTry()
  165. .to("jdbc:centralbase")
  166. .doCatch(Exception.class)
  167. .log("${header.date}"+" routeId:insert-statusDaily -> centralbase.cb_pc_pro_wellbore_status_daily insert data failed")
  168. .end();
  169. from("timer:mytimer-update-statusDaily-DYM?period=3600000")
  170. .routeId("update-statusDaily-DYM")
  171. .process(exchange -> {
  172. Message in = exchange.getIn();
  173. in.setHeader("date",getDate());
  174. })
  175. .setBody(simple("select distinct jh , rq , dym from DBA01 where (jh,rq) in (SELECT jh,max(rq) rq FROM DBA01 WHERE dym is not null and rq<= to_date('${header.date}','yyyy-MM-dd') group by jh)"))
  176. .to("jdbc:oracle")
  177. .split(body())
  178. .setBody(simple("update centralbase.cb_pc_pro_wellbore_status_daily set start_pump_liq_level = '${body[DYM]}' where well_id = '${body[JH]}' and prod_date::date = '${header.date}' "))
  179. .doTry()
  180. .to("jdbc:centralbase")
  181. .doCatch(Exception.class)
  182. .log("${header.date}"+" routeId:update-statusDaily-DYM -> centralbase.cb_pc_pro_wellbore_status_daily update data failed")
  183. .end();
  184. //因为数据库有可能会查出多条数据,现在的做法时,在数据库中增加两个个字段,用来记录BJ和DYM 的更新时间 ,只有获取到的时间大于数据库中存储的时间时,才会更新,并且该字段也会更新
  185. from("timer:mytimer-update-statusDaily-BJ?period=3600000")
  186. .routeId("update-statusDaily-BJ")
  187. .process(exchange -> {
  188. Message in = exchange.getIn();
  189. in.setHeader("date",getDate());
  190. })
  191. .setBody(simple("select distinct jh , rq , bj from DBA01 where (jh,rq) in (SELECT jh,max(rq) rq FROM DBA01 WHERE bj is not null and rq<= to_date('${header.date}','yyyy-MM-dd') group by jh)"))
  192. .to("jdbc:oracle")
  193. .split(body()).process(exchange -> {
  194. HashMap body = exchange.getIn().getBody(HashMap.class);
  195. })
  196. .setBody(simple("update centralbase.cb_pc_pro_wellbore_status_daily set oil_nozzle = '${body[BJ]}' where well_id = '${body[JH]}' and prod_date::date = '${header.date}' "))
  197. .doTry()
  198. .to("jdbc:centralbase")
  199. .doCatch(Exception.class)
  200. .log("${header.date}"+" routeId:update-statusDaily-BJ -> centralbase.cb_pc_pro_wellbore_status_daily update data failed")
  201. .end();
  202. from("timer:mytimer-update-statusDaily-submergenceDepth?period=3600000")
  203. .routeId("update-statusDaily-submergenceDepth")
  204. .process(exchange -> {
  205. Message in = exchange.getIn();
  206. in.setHeader("date",getDate()+" 00:00:00");
  207. })
  208. .setBody(simple("select well_id,prod_date,start_pump_liq_level,pump_depth from centralbase.cb_pc_pro_wellbore_status_daily where prod_date = '${header.date}' "))
  209. .to("jdbc:centralbase")
  210. .split(body()).process(exchange -> {
  211. Message in = exchange.getIn();
  212. HashMap<String, Object> aRow = in.getBody(HashMap.class);
  213. aRow.put("submergence_depth",null);
  214. if (aRow.get("start_pump_liq_level")!=null && aRow.get("pump_depth")!=null){
  215. double cmd= Double.valueOf(aRow.get("pump_depth").toString())-Double.valueOf(aRow.get("start_pump_liq_level").toString())/10;
  216. BigDecimal bd=new BigDecimal(cmd);
  217. double cmd1=bd.setScale(1,BigDecimal.ROUND_HALF_UP).doubleValue();
  218. aRow.put("submergence_depth",cmd1);
  219. }
  220. })
  221. .setBody(simple("update centralbase.cb_pc_pro_wellbore_status_daily set submergence_depth = '${body[submergence_depth]}' where well_id = '${body[well_id]}' and prod_date = '${body[prod_date]}'"))
  222. .doTry()
  223. .to("jdbc:centralbase")
  224. .doCatch(Exception.class)
  225. .log("${header.date}"+" routeId:update-statusDaily-submergenceDepth -> centralbase.cb_pc_pro_wellbore_status_daily update data failed")
  226. .end();
  227. /* from("timer:mytimer-update-statusDaily-oil_nozzle?period=1800000")
  228. .routeId("update-statusDaily-oil_nozzle")
  229. .setHeader("date", constant(getDate() + " 00:00:00"))
  230. .setBody(simple("select distinct rn.well_id,cb.prod_date,rn.pump_diameter from centralbase.cb_temp_well_mech_runtime rn ,centralbase.cb_pc_pro_wellbore_status_daily cb where cb.well_id = rn.well_id " +
  231. "and cb.prod_date = '${header.date}' "))
  232. .to("jdbc:centralbase")//.log("${body}")
  233. .split(body()).process(exchange -> {
  234. Message in = exchange.getIn();
  235. HashMap<String, Object> aRow = in.getBody(HashMap.class);
  236. aRow.putIfAbsent("pump_diameter", "0.0");
  237. })
  238. .setBody(simple("update centralbase.cb_pc_pro_wellbore_status_daily set oil_nozzle = '${body[pump_diameter]}' where well_id ='${body[well_id]}' and prod_date='${body[prod_date]}' "))
  239. .doTry()
  240. .to("jdbc:centralbase")
  241. .doCatch(Exception.class)
  242. .log("${header.date}"+" routeId:update-statusDaily-oil_nozzle -> centralbase.cb_pc_pro_wellbore_status_daily update data failed")
  243. .end();*/
  244. from("timer:mytimer-insert-volDaily?period=3600000")
  245. .routeId("insert-volDaily")
  246. .process(exchange -> {
  247. Message in = exchange.getIn();
  248. in.setHeader("date",getDate());
  249. })
  250. .setBody(simple("select distinct jh,rq,scsj, rcyl1,rcyl,rcql,hs, bz from DBA01 where rq = to_date('${header.date}','yyyy-MM-dd') and qyrq is not null "))
  251. .to("jdbc:oracle")
  252. .split(body()).process(exchange -> {
  253. Message in = exchange.getIn();
  254. HashMap<String, Object> aRow = in.getBody(HashMap.class);
  255. if (aRow.get("SCSJ") == null) aRow.put("SCSJ", "0.0");
  256. if (aRow.get("RCYL1") == null) aRow.put("RCYL1", "0.0");
  257. if (aRow.get("RCYL") == null) aRow.put("RCYL", "0.0");
  258. if (aRow.get("RCQL") == null) aRow.put("RCQL", "0.0");
  259. if (aRow.get("HS") == null) aRow.put("HS", "0.0");
  260. if (aRow.get("BZ") == null) aRow.put("BZ", "");
  261. aRow.put("RCSL",-1);
  262. aRow.put("QYB",-1);
  263. aRow.put("SQB",-1);
  264. if (aRow.get("RCQL")!=null && aRow.get("RCYL")!=null && !aRow.get("RCYL").equals("0.0") && aRow.get("RCYL") != "0.0"){
  265. double qyb = Double.valueOf(aRow.get("RCQL").toString()) / Double.valueOf(aRow.get("RCYL").toString());
  266. if (!Double.isNaN(qyb) && !Double.isInfinite(qyb)){
  267. BigDecimal bd=new BigDecimal(qyb);
  268. double d1=bd.setScale(1,BigDecimal.ROUND_HALF_UP).doubleValue();
  269. aRow.put("QYB",d1);
  270. }
  271. }
  272. if (aRow.get("RCYL1")!=null && aRow.get("HS") != null && !aRow.get("HS").equals("0.0") && aRow.get("HS") != "0.0"){
  273. double rcsl = (Double.valueOf(aRow.get("RCYL1").toString()) * Double.valueOf(aRow.get("HS").toString()))/100;
  274. if (!Double.isNaN(rcsl) && !Double.isInfinite(rcsl)) {
  275. BigDecimal bd = new BigDecimal(rcsl);
  276. double d1 = bd.setScale(1, BigDecimal.ROUND_HALF_UP).doubleValue();
  277. aRow.put("RCSL", d1);
  278. }
  279. }
  280. if (aRow.get("RCQL") != null && aRow.get("RCSL") != null && !aRow.get("RCQL").equals("0.0") && aRow.get("RCQL") != "0.0" ){
  281. double sqb = Double.valueOf(aRow.get("RCSL").toString()) / Double.valueOf(aRow.get("RCQL").toString());
  282. if (!Double.isNaN(sqb) && !Double.isInfinite(sqb)) {
  283. BigDecimal bd = new BigDecimal(sqb);
  284. double d1 = bd.setScale(1, BigDecimal.ROUND_HALF_UP).doubleValue();
  285. aRow.put("SQB", d1);
  286. }
  287. }
  288. if (!aRow.containsKey("SMD")){
  289. aRow.put("SMD",1);
  290. }
  291. if (!aRow.containsKey("YMD")){
  292. aRow.put("YMD",0.85);
  293. }
  294. })
  295. .setBody(simple("insert into centralbase.cb_pc_pro_wellbore_vol_daily(well_id,prod_date,prod_time,liq_prod_daily,oil_prod_daily,gas_prod_daily,water_cut,remarks,gas_oil_ratio,water_prod_daily,water_gas_ratio,surface_crude_water_density,surface_crude_oil_density) " +
  296. "select '${body[JH]}','${body[RQ]}','${body[SCSJ]}','${body[RCYL1]}','${body[RCYL]}','${body[RCQL]}','${body[HS]}','${body[BZ]}','${body[QYB]}','${body[RCSL]}','${body[SQB]}','${body[SMD]}','${body[YMD]}' " +
  297. "where NOT EXISTS ( SELECT * FROM centralbase.cb_pc_pro_wellbore_vol_daily WHERE well_id = '${body[JH]}' and prod_date = '${body[RQ]}' )"))
  298. .doTry()
  299. .to("jdbc:centralbase")
  300. .doCatch(Exception.class)
  301. .log("${header.date}"+" routeId:insert-volDaily -> centralbase.cb_pc_pro_wellbore_vol_daily insert data failed")
  302. .endDoTry()
  303. .setBody(simple("update centralbase.cb_pc_pro_wellbore_vol_daily set water_prod_daily =null where water_prod_daily = -1 and prod_date = '${header.date}' "))
  304. .to("jdbc:centralbase")
  305. .setBody(simple("update centralbase.cb_pc_pro_wellbore_vol_daily set gas_oil_ratio =null where gas_oil_ratio = -1 and prod_date = '${header.date}' "))
  306. .to("jdbc:centralbase")
  307. .setBody(simple("update centralbase.cb_pc_pro_wellbore_vol_daily set water_gas_ratio =null where water_gas_ratio = -1 and prod_date = '${header.date}' "))
  308. .to("jdbc:centralbase")
  309. .end();
  310. from("timer:mytimer-update-volDaily-liq_prod_daily?period=3600000")
  311. .routeId("update-volDaily-liq_prod_daily")
  312. .process(exchange -> {
  313. Message in = exchange.getIn();
  314. in.setHeader("date",getDate());
  315. })
  316. .setBody(simple("select distinct jh,rq,scsj, rcyl1,rcyl,rcql,hs, bz from DBA01 where rq = to_date('${header.date}','yyyy-MM-dd') and qyrq is not null "))
  317. .to("jdbc:oracle")
  318. .split(body()).process(exchange -> {
  319. Message in = exchange.getIn();
  320. HashMap<String, Object> aRow = in.getBody(HashMap.class);
  321. if (aRow.get("SCSJ") == null) aRow.put("SCSJ", "0.0");
  322. if (aRow.get("RCYL1") == null) aRow.put("RCYL1", "0.0");
  323. if (aRow.get("RCYL") == null) aRow.put("RCYL", "0.0");
  324. if (aRow.get("RCQL") == null) aRow.put("RCQL", "0.0");
  325. if (aRow.get("HS") == null) aRow.put("HS", "0.0");
  326. if (aRow.get("BZ") == null) aRow.put("BZ", "");
  327. aRow.put("RCSL",-1);
  328. aRow.put("QYB",-1);
  329. aRow.put("SQB",-1);
  330. if (aRow.get("RCQL")!=null && aRow.get("RCYL")!=null && !aRow.get("RCYL").equals("0.0") && aRow.get("RCYL") != "0.0"){
  331. double qyb = Double.valueOf(aRow.get("RCQL").toString()) / Double.valueOf(aRow.get("RCYL").toString());
  332. if (!Double.isNaN(qyb) && !Double.isInfinite(qyb)){
  333. BigDecimal bd=new BigDecimal(qyb);
  334. double d1=bd.setScale(1,BigDecimal.ROUND_HALF_UP).doubleValue();
  335. aRow.put("QYB",d1);
  336. }
  337. }
  338. if (aRow.get("RCYL1")!=null && aRow.get("HS") != null && !aRow.get("HS").equals("0.0") && aRow.get("HS") != "0.0"){
  339. double rcsl = (Double.valueOf(aRow.get("RCYL1").toString()) * Double.valueOf(aRow.get("HS").toString()))/100;
  340. if (!Double.isNaN(rcsl) && !Double.isInfinite(rcsl)) {
  341. BigDecimal bd = new BigDecimal(rcsl);
  342. double d1 = bd.setScale(1, BigDecimal.ROUND_HALF_UP).doubleValue();
  343. aRow.put("RCSL", d1);
  344. }
  345. }
  346. if (aRow.get("RCQL") != null && aRow.get("RCSL") != null && !aRow.get("RCQL").equals("0.0") && aRow.get("RCQL") != "0.0" ){
  347. double sqb = Double.valueOf(aRow.get("RCSL").toString()) / Double.valueOf(aRow.get("RCQL").toString());
  348. if (!Double.isNaN(sqb) && !Double.isInfinite(sqb)) {
  349. BigDecimal bd = new BigDecimal(sqb);
  350. double d1 = bd.setScale(1, BigDecimal.ROUND_HALF_UP).doubleValue();
  351. aRow.put("SQB", d1);
  352. }
  353. }
  354. if (!aRow.containsKey("SMD")){
  355. aRow.put("SMD",1);
  356. }
  357. if (!aRow.containsKey("YMD")){
  358. aRow.put("YMD",0.85);
  359. }
  360. })
  361. .setBody(simple("update centralbase.cb_pc_pro_wellbore_vol_daily set prod_time = '${body[SCSJ]}' ,liq_prod_daily='${body[RCYL1]}' ,oil_prod_daily ='${body[RCYL]}' ,gas_prod_daily ='${body[RCQL]}' ,water_cut='${body[HS]}' ,remarks='${body[BZ]}' ,gas_oil_ratio='${body[QYB]}' ,water_prod_daily='${body[RCSL]}' ,water_gas_ratio='${body[SQB]}',surface_crude_water_density='${body[SMD]}',surface_crude_oil_density= '${body[YMD]}' " +
  362. "where well_id = '${body[JH]}' and prod_date ='${body[RQ]}' "))
  363. .doTry()
  364. .to("jdbc:centralbase")
  365. .doCatch(Exception.class)
  366. .log("${header.date}"+" routeId:update-volDaily-liq_prod_daily -> centralbase.cb_pc_pro_wellbore_vol_daily update data failed")
  367. .end();
  368. };
  369. };
  370. }
  371. }