CamelJDBCConfiguration.java 28 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427
  1. package com.gct.tools.etlcamelhuge.routeconfig;
  2. import org.apache.camel.*;
  3. import org.apache.camel.builder.RouteBuilder;
  4. //import org.apache.rocketmq.common.message.Message;
  5. import org.apache.camel.model.ExpressionNode;
  6. import org.apache.camel.model.ProcessorDefinition;
  7. import org.apache.camel.model.RouteDefinition;
  8. import org.springframework.context.annotation.Bean;
  9. import org.springframework.context.annotation.Configuration;
  10. import org.springframework.util.StringUtils;
  11. import java.math.BigDecimal;
  12. import java.math.RoundingMode;
  13. import java.time.LocalDateTime;
  14. import java.time.format.DateTimeFormatter;
  15. import java.util.*;
  16. /**
  17. * class name: CamelJDBCConfiguration
  18. *
  19. * @author lloyd
  20. * @version 1.0
  21. * @since 2021/4/14 下午3:16
  22. */
  23. @Configuration
  24. public class CamelJDBCConfiguration {
  25. public String getDate(){
  26. return LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd"));
  27. }
  28. @Bean
  29. public RouteBuilder routeBuilderWithOracle1() {
  30. return new RouteBuilder() {
  31. private SortedSet<String> organization;
  32. private Map<String, Integer> orgIDs = new HashMap<>();
  33. private int orgID;
  34. private Map<Object, Object> orgIdPreList;
  35. private Integer org;
  36. public ProcessorDefinition<ExpressionNode> setMyBody(RouteDefinition route){
  37. return route.setBody(simple("select well_id from centralbase.sys_access_well_control where access_status='1' "))
  38. .to("jdbc:centralbase")
  39. .split(body()).process(exchange -> {
  40. HashMap body = exchange.getIn().getBody(HashMap.class);
  41. exchange.getIn().setHeader("well_id",body.get("well_id"));
  42. });
  43. }
  44. @Override
  45. public void configure() throws Exception {
  46. RouteDefinition OrgAndWellSource= (RouteDefinition) from("timer:insert-OrgAndWellSource?period=86400000")
  47. .routeId("insert-OrgAndWellSource")
  48. .setHeader("date", constant(getDate()+" 00:00:00"))
  49. .process(exchange -> {
  50. org = 0;
  51. orgIdPreList = new HashMap<>();
  52. })
  53. .setBody(simple("select max(org_id) from centralbase.cb_pc_organization"))
  54. .to("jdbc:centralbase")
  55. .process(exchange -> {
  56. HashMap body = exchange.getIn().getBody(HashMap.class);
  57. if (body == null || StringUtils.isEmpty(body.get("max"))) org = 0;
  58. else
  59. org = Integer.valueOf(body.get("max").toString());
  60. })
  61. .setBody(simple("select org_id_pre,org_id from centralbase.cb_pc_organization"))
  62. .to("jdbc:centralbase")
  63. .split(body()).process(exchange -> {
  64. HashMap body = exchange.getIn().getBody(HashMap.class);
  65. orgIdPreList.put(body.get("org_id_pre"),body.get("org_id"));
  66. }).end();
  67. setMyBody(OrgAndWellSource)
  68. .setBody(simple("select distinct jh,cydmc,zyq,zk,qyrq,sccw,qk,bz from DBA01 where rq=to_date('${header.date}','yyyy-mm-dd hh24:mi:ss') and jh ='${header.well_id}'"))
  69. .to("jdbc:oracle")
  70. .transform()
  71. .body((result) -> {
  72. organization = new TreeSet<>();
  73. return result;
  74. })
  75. .step("1")
  76. .split(body()).process(exchange -> {
  77. Message in = exchange.getIn();
  78. HashMap<String, Object> aRow = in.getBody(HashMap.class);
  79. if (StringUtils.isEmpty(aRow.get("qyrq"))){
  80. aRow.put("QYRQ","2021-01-01 00:00:00");
  81. }
  82. if (!aRow.containsKey("JM")) aRow.put("JM",aRow.get("JH"));
  83. String org_level3 = aRow.get("ZYQ") + "@" + aRow.get("CYDMC") + "@" + aRow.get("ZK");
  84. String org_level2 = aRow.get("ZYQ") + "@" + aRow.get("CYDMC");
  85. String org_level1 = aRow.get("ZYQ").toString();
  86. aRow.put("station_id", org_level3);
  87. orgID = org;
  88. if ((!orgIdPreList.containsKey(org_level1)) || (!orgIdPreList.containsKey(org_level2)) || (!orgIdPreList.containsKey(org_level3))) {
  89. if (organization.add(org_level1)) {
  90. if (!orgIDs.containsKey(org_level1)) orgIDs.put(org_level1,++orgID);
  91. }
  92. if (organization.add(org_level2)) {
  93. if (!orgIDs.containsKey(org_level2)) orgIDs.put(org_level2,++orgID);
  94. }
  95. if (organization.add(org_level3)) {
  96. if (!orgIDs.containsKey(org_level3)) orgIDs.put(org_level3,++orgID);
  97. }
  98. }
  99. if(orgIdPreList.get(org_level3) !=null){
  100. aRow.put("org_id",orgIdPreList.get(org_level3));
  101. return;
  102. }
  103. if(orgIDs.get(org_level3) !=null){
  104. aRow.put("org_id",orgIDs.get(org_level3));
  105. }
  106. })
  107. .setBody(simple("insert into centralbase.cb_cd_well_source (well_id,well_common_name,spud_date,org_id,station_id,station_name,completion_name,PRODUCING_AREA_name,remarks) " +
  108. "select '${body[JH]}','${body[JM]}','${body[QYRQ]}'::timestamp,'${body[org_id]}','${body[station_id]}','${body[ZK]}','${body[SCCW]}','${body[QK]}','${body[BZ]}' " +
  109. "where NOT EXISTS ( SELECT * FROM centralbase.cb_cd_well_source WHERE well_id = '${body[JH]}' )"))
  110. .to("jdbc:centralbase")
  111. .end()
  112. .transform().body((re) -> {
  113. List<Map<String, Object>> rows = new ArrayList<>();
  114. for (String s : organization) {
  115. Map<String, Object> row = new HashMap<>();
  116. String[] orgs = s.split("@");
  117. row.put("org_id_pre", s);
  118. switch (orgs.length) {
  119. case 1:
  120. row.put("org_name", orgs[0]);
  121. row.put("org_level", 1);
  122. row.put("org_parent", "0");
  123. break;
  124. case 2:
  125. row.put("org_name", orgs[1]);
  126. row.put("org_level", 2);
  127. row.put("org_parent", orgIDs.get(orgs[0]).toString());
  128. break;
  129. case 3:
  130. row.put("org_name", orgs[2]);
  131. row.put("org_level", 3);
  132. row.put("org_parent", orgIDs.get(orgs[0] + "@" + orgs[1]).toString());
  133. break;
  134. }
  135. if (!orgIdPreList.containsKey(s)) {
  136. org++;
  137. row.put("org_code", org);
  138. row.put("org_id", "" + org);
  139. orgIdPreList.put(s, row.get("org_id"));
  140. rows.add(row);
  141. }
  142. }
  143. return rows;
  144. }).split(body())
  145. .setBody(simple("insert into centralbase.cb_pc_organization(org_id,org_code,org_name,org_level,parent_id,org_id_pre)" +
  146. "select '${body[org_id]}','${body[org_code]}','${body[org_name]}','${body[org_level]}','${body[org_parent]}','${body[org_id_pre]}' " +
  147. "where NOT EXISTS ( SELECT * FROM centralbase.cb_pc_organization WHERE org_id = '${body[org_id]}')"))
  148. .doTry()
  149. .to("jdbc:centralbase")
  150. .doCatch(Exception.class)
  151. .log("${header.date}"+" routeId:insert-OrgAndWellSource-> centralbase.cb_pc_organization insert data failed")
  152. .end();
  153. from("timer:update-wellControl?period=3600000")
  154. .routeId("update-wellControl")
  155. .setBody(simple("select scc.well_id,wo.well_common_name,op.org_id from centralbase.sys_access_well_control scc\n" +
  156. "left join centralbase.cb_cd_well_source wo on scc.well_id = wo.well_id\n" +
  157. "left join centralbase.cb_pc_organization op on wo.org_id = op.org_id\n" +
  158. "where scc.access_status ='1' "))
  159. .to("jdbc:centralbase")
  160. .split(body())
  161. .split(body()).process(exchange -> {
  162. HashMap<String, Object> body = exchange.getIn().getBody(HashMap.class);
  163. DateTimeFormatter df = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
  164. String format = LocalDateTime.now().format(df);
  165. String yesterday = LocalDateTime.now().minusDays(1).format(df);
  166. body.put("yesterday", yesterday);
  167. if (body.get("well_common_name") == null || body.get("well_common_name").equals("")) {
  168. body.put("remarks","暂无井信息");
  169. body.put("error_id",0);
  170. body.put("updateTime",format);
  171. }else if (body.get("org_id") == null || body.get("org_id").equals("")){
  172. body.put("remarks","暂无机构信息");
  173. body.put("error_id",0);
  174. body.put("updateTime",format);
  175. }else {
  176. body.put("remarks","");
  177. body.put("error_id",1);
  178. body.put("updateTime",format);
  179. }
  180. })
  181. //之前在数据导入时没有自动填写入最后更新时间,会导致新加入的数据无法自动导入
  182. .setBody(simple("update centralbase.sys_access_well_control set well_common_name='${body[well_common_name]}'," +
  183. "org_id='${body[org_id]}',update_time = '${body[updateTime]}'::timestamp,sgt_last_time='${body[updateTime]}', " +
  184. "remarks ='${body[remarks]}' ,error_id ='${body[error_id]}' " +
  185. "where well_id ='${body[well_id]}' and " +
  186. "(sgt_last_time is null or sgt_last_time<'${body[yesterday]}'::timestamp)"))
  187. .to("jdbc:centralbase")
  188. .end();
  189. RouteDefinition statusDaily= from("timer:mytimer-insert-statusDaily?period=10800000")
  190. .routeId("insert-statusDaily")
  191. .process(exchange -> {
  192. Message in = exchange.getIn();
  193. in.setHeader("date",getDate());
  194. });
  195. setMyBody(statusDaily)
  196. .setBody(simple("select distinct jh,rq,cyfs,yz,hysx ,bj, yysx ,tysx,bs,dym from DBA01 where rq = to_date('${header.date}','yyyy-MM-dd') and jh='${header.well_id}' and qyrq is not null "))
  197. .to("jdbc:oracle")
  198. .split(body()).process(exchange -> {
  199. Message in = exchange.getIn();
  200. HashMap<String, Object> aRow = in.getBody(HashMap.class);
  201. if (aRow.get("YZ") == null) aRow.put("YZ", "0.0");
  202. if (aRow.get("BJ") == null) aRow.put("BJ", "0.0");
  203. if (aRow.get("HYSX") == null) aRow.put("HYSX", "0.0");
  204. if (aRow.get("YYSX") == null) aRow.put("YYSX", "0.0");
  205. if (aRow.get("TYSX") == null) aRow.put("TYSX", "0.0");
  206. if (aRow.get("BS") == null) aRow.put("BS", "0.0");
  207. })
  208. .setBody(simple("insert into centralbase.cb_pc_pro_wellbore_status_daily(well_id,prod_date,oil_prod_method,oil_nozzle,back_pres,tubing_pres,casing_pres,pump_depth) " +
  209. "select '${body[JH]}','${body[RQ]}','${body[CYFS]}','${body[BJ]}','${body[HYSX]}','${body[YYSX]}','${body[TYSX]}','${body[BS]}' " +
  210. "where NOT EXISTS ( SELECT * FROM centralbase.cb_pc_pro_wellbore_status_daily WHERE well_id = '${body[JH]}' and prod_date = '${body[RQ]}')"))
  211. .doTry()
  212. .to("jdbc:centralbase")
  213. .doCatch(Exception.class)
  214. .log("${header.date}"+" routeId:insert-statusDaily -> centralbase.cb_pc_pro_wellbore_status_daily insert data failed")
  215. .end();
  216. RouteDefinition statusDailyDYM = from("timer:mytimer-update-statusDaily-DYM?period=10800000")
  217. .routeId("update-statusDaily-DYM")
  218. .process(exchange -> {
  219. Message in = exchange.getIn();
  220. in.setHeader("date",getDate());
  221. });
  222. setMyBody(statusDailyDYM)
  223. .setBody(simple("select distinct jh , csrq , ymsd from ZD_YTGSCX.DCA002_ZD where (jh,csrq) in (SELECT jh,max(csrq) csrq FROM ZD_YTGSCX.DCA002_ZD WHERE ymsd is not null and jh='${header.well_id}' group by jh)"))
  224. .to("jdbc:oracle")
  225. .split(body())
  226. .setBody(simple("update centralbase.cb_pc_pro_wellbore_status_daily set start_pump_liq_level = '${body[YMSD]}' where well_id = '${body[JH]}' and prod_date::date = '${header.date}' "))
  227. .doTry()
  228. .to("jdbc:centralbase")
  229. .doCatch(Exception.class)
  230. .log("${header.date}"+" routeId:update-statusDaily-DYM -> centralbase.cb_pc_pro_wellbore_status_daily update data failed")
  231. .end();
  232. RouteDefinition submergenceDepth = from("timer:mytimer-update-statusDaily-submergenceDepth?period=10800000")
  233. .routeId("update-statusDaily-submergenceDepth")
  234. .process(exchange -> {
  235. Message in = exchange.getIn();
  236. in.setHeader("date",getDate()+" 00:00:00");
  237. });
  238. setMyBody(submergenceDepth)
  239. .setBody(simple("select well_id,prod_date,start_pump_liq_level,pump_depth from centralbase.cb_pc_pro_wellbore_status_daily where prod_date = '${header.date}' and well_id='${header.well_id}' "))
  240. .to("jdbc:centralbase")
  241. .split(body()).process(exchange -> {
  242. Message in = exchange.getIn();
  243. HashMap<String, Object> aRow = in.getBody(HashMap.class);
  244. aRow.put("submergence_depth",null);
  245. if (aRow.get("start_pump_liq_level")!=null && aRow.get("pump_depth")!=null){
  246. double cmd= Double.parseDouble(aRow.get("pump_depth").toString())-Double.parseDouble(aRow.get("start_pump_liq_level").toString())/10;
  247. BigDecimal bd=new BigDecimal(cmd);
  248. double cmd1=bd.setScale(1, RoundingMode.HALF_UP).doubleValue();
  249. aRow.put("submergence_depth",cmd1);
  250. }
  251. })
  252. .setBody(simple("update centralbase.cb_pc_pro_wellbore_status_daily set submergence_depth = '${body[submergence_depth]}' where well_id = '${body[well_id]}' and prod_date = '${body[prod_date]}'"))
  253. .doTry()
  254. .to("jdbc:centralbase")
  255. .doCatch(Exception.class)
  256. .log("${header.date}"+" routeId:update-statusDaily-submergenceDepth -> centralbase.cb_pc_pro_wellbore_status_daily update data failed")
  257. .end();
  258. RouteDefinition volDaily = from("timer:mytimer-insert-volDaily?period=10800000")
  259. .routeId("insert-volDaily")
  260. .process(exchange -> {
  261. Message in = exchange.getIn();
  262. in.setHeader("date",getDate());
  263. });
  264. setMyBody(volDaily)
  265. .setBody(simple("select distinct jh,rq,scsj, rcyl1,rcyl,rcql,hs, bz,sccw from DBA01 where rq = to_date('${header.date}','yyyy-MM-dd') and jh ='${header.well_id}' and qyrq is not null "))
  266. .to("jdbc:oracle")
  267. .split(body()).process(exchange -> {
  268. Message in = exchange.getIn();
  269. HashMap<String, Object> aRow = in.getBody(HashMap.class);
  270. if (aRow.get("SCSJ") == null) aRow.put("SCSJ", "0.0");
  271. if (aRow.get("RCYL1") == null) aRow.put("RCYL1", "0.0");
  272. if (aRow.get("RCYL") == null) aRow.put("RCYL", "0.0");
  273. if (aRow.get("RCQL") == null) aRow.put("RCQL", "0.0");
  274. if (aRow.get("HS") == null) aRow.put("HS", "0.0");
  275. if (aRow.get("BZ") == null) aRow.put("BZ", "");
  276. if (aRow.get("SCCW") == null) aRow.put("SCCW", "");
  277. aRow.put("RCSL",-1);
  278. aRow.put("QYB",-1);
  279. aRow.put("SQB",-1);
  280. if (aRow.get("RCQL")!=null && aRow.get("RCYL")!=null && !aRow.get("RCYL").equals("0.0") && aRow.get("RCYL") != "0.0"){
  281. double qyb = Double.valueOf(aRow.get("RCQL").toString()) / Double.valueOf(aRow.get("RCYL").toString());
  282. if (!Double.isNaN(qyb) && !Double.isInfinite(qyb)){
  283. BigDecimal bd=new BigDecimal(qyb);
  284. double d1=bd.setScale(1,BigDecimal.ROUND_HALF_UP).doubleValue();
  285. aRow.put("QYB",d1);
  286. }
  287. }
  288. if (aRow.get("RCYL1")!=null && aRow.get("HS") != null && !aRow.get("HS").equals("0.0") && aRow.get("HS") != "0.0"){
  289. double rcsl = (Double.valueOf(aRow.get("RCYL1").toString()) * Double.valueOf(aRow.get("HS").toString()))/100;
  290. if (!Double.isNaN(rcsl) && !Double.isInfinite(rcsl)) {
  291. BigDecimal bd = new BigDecimal(rcsl);
  292. double d1 = bd.setScale(1, BigDecimal.ROUND_HALF_UP).doubleValue();
  293. aRow.put("RCSL", d1);
  294. }
  295. }
  296. if (aRow.get("RCQL") != null && aRow.get("RCSL") != null && !aRow.get("RCQL").equals("0.0") && aRow.get("RCQL") != "0.0" ){
  297. double sqb = Double.valueOf(aRow.get("RCSL").toString()) / Double.valueOf(aRow.get("RCQL").toString());
  298. if (!Double.isNaN(sqb) && !Double.isInfinite(sqb)) {
  299. BigDecimal bd = new BigDecimal(sqb);
  300. double d1 = bd.setScale(1, BigDecimal.ROUND_HALF_UP).doubleValue();
  301. aRow.put("SQB", d1);
  302. }
  303. }
  304. if (!aRow.containsKey("SMD")){
  305. aRow.put("SMD",1);
  306. }
  307. if (!aRow.containsKey("YMD")){
  308. aRow.put("YMD",0.85);
  309. }
  310. })
  311. .setBody(simple("insert into centralbase.cb_pc_pro_wellbore_vol_daily(well_id,prod_date,prod_time,liq_prod_daily,oil_prod_daily,gas_prod_daily,water_cut,remarks,gas_oil_ratio,water_prod_daily,water_gas_ratio,surface_crude_water_density,surface_crude_oil_density,stim_content) " +
  312. "select '${body[JH]}','${body[RQ]}','${body[SCSJ]}','${body[RCYL1]}','${body[RCYL]}','${body[RCQL]}','${body[HS]}','${body[BZ]}','${body[QYB]}','${body[RCSL]}','${body[SQB]}','${body[SMD]}','${body[YMD]}' ,'${body[SCCW]}'" +
  313. "where NOT EXISTS ( SELECT * FROM centralbase.cb_pc_pro_wellbore_vol_daily WHERE well_id = '${body[JH]}' and prod_date = '${body[RQ]}' )"))
  314. .doTry()
  315. .to("jdbc:centralbase")
  316. .doCatch(Exception.class)
  317. .log("${header.date}"+" routeId:insert-volDaily -> centralbase.cb_pc_pro_wellbore_vol_daily insert data failed")
  318. .endDoTry()
  319. .setBody(simple("update centralbase.cb_pc_pro_wellbore_vol_daily set water_prod_daily =null where water_prod_daily = -1 and prod_date = '${header.date}' "))
  320. .to("jdbc:centralbase")
  321. .setBody(simple("update centralbase.cb_pc_pro_wellbore_vol_daily set gas_oil_ratio =null where gas_oil_ratio = -1 and prod_date = '${header.date}' "))
  322. .to("jdbc:centralbase")
  323. .setBody(simple("update centralbase.cb_pc_pro_wellbore_vol_daily set water_gas_ratio =null where water_gas_ratio = -1 and prod_date = '${header.date}' "))
  324. .to("jdbc:centralbase")
  325. .end();
  326. RouteDefinition volDailyLiqProdDaily = from("timer:mytimer-update-volDaily-liq_prod_daily?period=10800000")
  327. .routeId("update-volDaily-liq_prod_daily")
  328. .process(exchange -> {
  329. Message in = exchange.getIn();
  330. in.setHeader("date",getDate());
  331. });
  332. setMyBody(volDailyLiqProdDaily)
  333. .setBody(simple("select distinct jh,rq,scsj, rcyl1,rcyl,rcql,hs, bz from DBA01 where rq = to_date('${header.date}','yyyy-MM-dd')and jh ='${header.well_id}' and qyrq is not null "))
  334. .to("jdbc:oracle")
  335. .split(body()).process(exchange -> {
  336. Message in = exchange.getIn();
  337. HashMap<String, Object> aRow = in.getBody(HashMap.class);
  338. if (aRow.get("SCSJ") == null) aRow.put("SCSJ", "0.0");
  339. if (aRow.get("RCYL1") == null) aRow.put("RCYL1", "0.0");
  340. if (aRow.get("RCYL") == null) aRow.put("RCYL", "0.0");
  341. if (aRow.get("RCQL") == null) aRow.put("RCQL", "0.0");
  342. if (aRow.get("HS") == null) aRow.put("HS", "0.0");
  343. if (aRow.get("BZ") == null) aRow.put("BZ", "");
  344. aRow.put("RCSL",-1);
  345. aRow.put("QYB",-1);
  346. aRow.put("SQB",-1);
  347. if (aRow.get("RCQL")!=null && aRow.get("RCYL")!=null && !aRow.get("RCYL").equals("0.0") && aRow.get("RCYL") != "0.0"){
  348. double qyb = Double.valueOf(aRow.get("RCQL").toString()) / Double.valueOf(aRow.get("RCYL").toString());
  349. if (!Double.isNaN(qyb) && !Double.isInfinite(qyb)){
  350. BigDecimal bd=new BigDecimal(qyb);
  351. double d1=bd.setScale(1,BigDecimal.ROUND_HALF_UP).doubleValue();
  352. aRow.put("QYB",d1);
  353. }
  354. }
  355. if (aRow.get("RCYL1")!=null && aRow.get("HS") != null && !aRow.get("HS").equals("0.0") && aRow.get("HS") != "0.0"){
  356. double rcsl = (Double.valueOf(aRow.get("RCYL1").toString()) * Double.valueOf(aRow.get("HS").toString()))/100;
  357. if (!Double.isNaN(rcsl) && !Double.isInfinite(rcsl)) {
  358. BigDecimal bd = new BigDecimal(rcsl);
  359. double d1 = bd.setScale(1, BigDecimal.ROUND_HALF_UP).doubleValue();
  360. aRow.put("RCSL", d1);
  361. }
  362. }
  363. if (aRow.get("RCQL") != null && aRow.get("RCSL") != null && !aRow.get("RCQL").equals("0.0") && aRow.get("RCQL") != "0.0" ){
  364. double sqb = Double.valueOf(aRow.get("RCSL").toString()) / Double.valueOf(aRow.get("RCQL").toString());
  365. if (!Double.isNaN(sqb) && !Double.isInfinite(sqb)) {
  366. BigDecimal bd = new BigDecimal(sqb);
  367. double d1 = bd.setScale(1, BigDecimal.ROUND_HALF_UP).doubleValue();
  368. aRow.put("SQB", d1);
  369. }
  370. }
  371. if (!aRow.containsKey("SMD")){
  372. aRow.put("SMD",1);
  373. }
  374. if (!aRow.containsKey("YMD")){
  375. aRow.put("YMD",0.85);
  376. }
  377. })
  378. .setBody(simple("update centralbase.cb_pc_pro_wellbore_vol_daily set prod_time = '${body[SCSJ]}' ,liq_prod_daily='${body[RCYL1]}' ,oil_prod_daily ='${body[RCYL]}' ,gas_prod_daily ='${body[RCQL]}' ,water_cut='${body[HS]}' ,remarks='${body[BZ]}' ,gas_oil_ratio='${body[QYB]}' ,water_prod_daily='${body[RCSL]}' ,water_gas_ratio='${body[SQB]}',surface_crude_water_density='${body[SMD]}',surface_crude_oil_density= '${body[YMD]}' " +
  379. "where well_id = '${body[JH]}' and prod_date ='${body[RQ]}' "))
  380. .doTry()
  381. .to("jdbc:centralbase")
  382. .doCatch(Exception.class)
  383. .log("${header.date}"+" routeId:update-volDaily-liq_prod_daily -> centralbase.cb_pc_pro_wellbore_vol_daily update data failed")
  384. .end();
  385. RouteDefinition runTime = from("timer:mytimer-update-runTime-strokeAndFrequency?period=10800000")
  386. .routeId("update-runTime-strokeAndFrequency")
  387. .process(exchange -> {
  388. Message in = exchange.getIn();
  389. in.setHeader("date",getDate());
  390. });
  391. setMyBody(runTime)
  392. .setBody(simple("select distinct jh,rq,cc,cs from DBA01 where rq = to_date('${header.date}','yyyy-MM-dd')and jh ='${header.well_id}' and qyrq is not null "))
  393. .to("jdbc:oracle")
  394. .split(body())
  395. .setBody(simple("insert into centralbase.cb_temp_well_mech_runtime(well_id,prod_date,stroke_length,stroke_frequency)" +
  396. "select '${body[JH]}','${body[RQ]}','${body[CC]}','${body[CS]}'" +
  397. "where NOT EXISTS (SELECT * FROM centralbase.cb_temp_well_mech_runtime WHERE well_id = '${body[JH]}' and prod_date = '${body[RQ]}' )"))
  398. .doTry()
  399. .to("jdbc:centralbase")
  400. .doCatch(Exception.class)
  401. .log("${header.date}"+" routeId:update-runTime-strokeAndFrequency -> centralbase.cb_temp_well_mech_runtime update data failed")
  402. .end();
  403. };
  404. };
  405. }
  406. }