CamelJDBCConfiguration.java 42 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637
  1. package com.gct.tools.etlcamelhuge.routeconfig;
  2. import com.gct.tools.etlcamelhuge.MQ.MessageBody;
  3. import com.gct.tools.etlcamelhuge.MQ.MessageProducer;
  4. import com.gct.tools.etlcamelhuge.entity.DiagnoseMsg;
  5. import org.apache.camel.*;
  6. import org.apache.camel.builder.RouteBuilder;
  7. //import org.apache.rocketmq.common.message.Message;
  8. import org.apache.rocketmq.spring.core.RocketMQTemplate;
  9. import org.springframework.beans.factory.annotation.Autowired;
  10. import org.springframework.context.annotation.Bean;
  11. import org.springframework.context.annotation.Configuration;
  12. import javax.annotation.Resource;
  13. import java.math.BigDecimal;
  14. import java.text.SimpleDateFormat;
  15. import java.time.LocalDateTime;
  16. import java.util.*;
  17. /**
  18. * class name: CamelJDBCConfiguration
  19. *
  20. * @author lloyd
  21. * @version 1.0
  22. * @since 2021/4/14 3:16 PM
  23. */
  24. @Configuration
  25. public class CamelJDBCConfiguration /*extends RouteBuilder */ {
  26. @Autowired
  27. private RocketMQTemplate rocketMQTemplate;
  28. /***
  29. * Fetch dynamometer-card (indicator diagram) data from the Tian'an source.
  30. */
  31. /* @Override
  32. public void configure() throws Exception {*/
  33. /* SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd");
  34. Date date = new Date(System.currentTimeMillis());
  35. String formatDate = formatter.format(date) + " 00:00:00";
  36. // 一小时执行一次
  37. from("timer:mytimer1?period=3600000")
  38. .routeId("jdbc-gtsj-?")
  39. .setHeader("dyna_create_time", constant(formatDate))
  40. .setBody(simple("select well_id,dyna_create_time,check_date,displacement,disp_load from public.pc_fd_pumpjack_dyna_dia_t where well_id ='SQ2306' and dyna_create_time > '${header.dyna_create_time}'::timestamp and dyna_create_time <'${header.dyna_create_time}'::timestamp + '1 day' order by dyna_create_time limit 1 "))
  41. .to("jdbc:gtsj")
  42. .split(body()).process(exchange -> {
  43. Message in = exchange.getIn();
  44. HashMap<String, Object> aRow = in.getBody(HashMap.class);
  45. String prod_date = aRow.get("dyna_create_time").toString().split("\\+")[0];
  46. aRow.put("dyna_create_time", prod_date);
  47. if (aRow.get("displacement") != null && !aRow.get("displacement").equals("") && aRow.get("disp_load") != null && !aRow.get("disp_load").equals("")) {
  48. String[] displacements = aRow.get("displacement").toString().split(";");
  49. String[] disp_loads = aRow.get("disp_load").toString().split(";");
  50. String sgt = "";
  51. for (int i = 0; i < displacements.length; i++) {
  52. sgt = sgt + displacements[i] + "," + disp_loads[i] + ",";
  53. }
  54. String[] s = sgt.split(",");
  55. String w = "";
  56. for (int i = 0; i < s.length; i++) {
  57. w += new BigDecimal(Math.round(Double.parseDouble(s[i]) * 100)).stripTrailingZeros().toPlainString() + ",";
  58. }
  59. aRow.put("sgt", w);
  60. }
  61. })//从这里开始
  62. .setHeader("MQOne", simple("${body}"))
  63. .setHeader("well_name", simple("${body[well_id]}"))
  64. //.setHeader("pord_date", simple("${body[dyna_create_time]}"))
  65. //.setHeader("rcv_date", simple("${body[check_date]}"))
  66. //.setHeader("sgt", simple("${body[sgt]}"))
  67. .setBody(simple("update centralbase.cb_temp_well_mech_runtime set prod_date = '${body[dyna_create_time]}',sgt = '${body[sgt]}' where well_id = (select well_id from centralbase.cb_cd_well_source where well_common_name = '${body[well_id]}')"))
  68. .to("jdbc:centralbase")
  69. .setBody(simple("select well_id,org_id from centralbase.cb_cd_well_source where well_common_name = '${in.header.well_name}' "))
  70. .to("jdbc:centralbase")
  71. .setHeader("MQTwo", simple("${body}"))
  72. //.setHeader("well_id", simple("${body[well_id]}"))
  73. //.setHeader("org_id", simple("${body[org_id]}"))
  74. .process(exchange -> {
  75. HashMap mqOne = exchange.getMessage().getHeader("MQOne", HashMap.class);
  76. HashMap mqTwo = exchange.getMessage().getHeader("MQTwo", HashMap.class);
  77. JSONObject jsonObject = new JSONObject();
  78. jsonObject.put("well_name", mqOne.get("well_id"));
  79. jsonObject.put("org_id", mqTwo.get("org_id"));
  80. jsonObject.put("prod_date", mqOne.get("dyna_create_time"));
  81. jsonObject.put("rcv_date", mqOne.get("check_date"));
  82. jsonObject.put("sgt", mqOne.get("sgt"));
  83. rocketMQTemplate.sendOneWayOrderly("Diagnose_MsgV1", jsonObject, mqTwo.get("well_id").toString());
  84. }).log("insert!!!").end();
  85. //从天安那边获取状态日数据
  86. from("timer:mytimer2?period=3600000")
  87. .routeId("jdbc-ztr-?")
  88. .setHeader("acqui_time", constant(formatDate))
  89. .setBody(simple("select well_id,acqui_time,phase_a_current,phase_b_current,phase_c_current,tubing_pre,casing_pre,phase_a_voltage,phase_b_voltage,phase_c_voltage from public.t_dyna_well_oil where acqui_time > '${header.acqui_time}'::timestamp and acqui_time <'${header.acqui_time}'::timestamp + '1 day'"))
  90. .to("jdbc:gtsj")
  91. .split(body()).process(exchange -> {
  92. Message in = exchange.getIn();
  93. HashMap<String, Object> aRow = in.getBody(HashMap.class);
  94. if (aRow.get("phase_a_current").toString() == null || aRow.get("phase_b_current").toString() == null || aRow.get("phase_c_current").toString() == null || aRow.get("tubing_pre").toString() == null || aRow.get("casing_pre").toString() == null || aRow.get("phase_a_voltage").toString() == null || aRow.get("phase_b_voltage").toString() == null || aRow.get("phase_c_voltage").toString() == null) {
  95. aRow.put("phase_a_current", "0.0");
  96. aRow.put("phase_b_current", "0.0");
  97. aRow.put("phase_c_current", "0.0");
  98. aRow.put("tubing_pre", "0.0");
  99. aRow.put("casing_pre", "0.0");
  100. aRow.put("phase_a_voltage", "0.0");
  101. aRow.put("phase_b_voltage", "0.0");
  102. aRow.put("phase_c_voltage", "0.0");
  103. }
  104. })
  105. .setBody(simple("update centralbase.cb_pc_pro_wellbore_status_daily set prod_date = '${body[acqui_time]}'::timestamp,elec_pump_current_a = '${body[phase_a_current]}',elec_pump_current_b = '${body[phase_b_current]}',elec_pump_current_c = '${body[phase_c_current]}',tubing_pres = '${body[tubing_pre]}',casing_pres = '${body[casing_pre]}' ,elec_pump_voltage_a = '${body[phase_a_voltage]}',elec_pump_voltage_b = '${body[phase_b_voltage]}',elec_pump_voltage_c = '${body[phase_c_voltage]}' where well_id = (select well_id from centralbase.cb_cd_well_source where well_common_name = '${body[well_id]}')"))
  106. .to("jdbc:centralbase")
  107. .log("insert completed")
  108. .end();*/
  109. /* from("timer:send?period=30000")
  110. .routeId("Diagnose_MsgV1")
  111. //.setBody(simple("select * from "))
  112. .process(exchange -> {
  113. JSONObject jo = new JSONObject();
  114. jo.put("well_name", "XJ_001");
  115. jo.put("org_id", "14");
  116. jo.put("prod_date", "2021-01-25 09:00:00");
  117. jo.put("sgt", "0,4176,0,4208,1,4256,3,4304,6,4416,9,4496,12,4576,17,4768,21,4864,25,4912,32,4864,38,4784,43,4704,52,4560,59,4496,66,4480,73,4496,84,4608,92,4688,100,4736,112,4816,121,4816,129,4800,143,4704,152,4640,161,4592,175,4560,185,4560,194,4576,204,4592,218,4640,227,4656,237,4656,251,4640,260,4624,269,4592,283,4512,292,4480,300,4464,313,4432,321,4432,329,4448,337,4448,348,4432,355,4400,362,4368,371,4304,377,4256,383,4208,390,4128,395,4112,399,4096,405,4064,408,4064,411,4047,414,4064,416,4064,418,4047,419,4047,419,4032,419,4015,419,3968,417,3904,416,3856,414,3792,410,3600,407,3536,403,3504,399,3520,393,3584,388,3647,383,3711,374,3824,368,3808,362,3776,352,3679,345,3615,337,3568,326,3504,317,3520,309,3536,301,3584,288,3647,279,3664,270,3664,256,3647,247,3600,237,3568,223,3520,213,3504,204,3504,190,3536,180,3568,171,3600,161,3615,148,3647,139,3647,130,3632,117,3615,108,3615,100,3615,88,3632,80,3664,73,3696,63,3760,56,3792,50,3808,44,3840,35,3856,30,3856,25,3872,19,3904,15,3920,12,3952,7,4015,5,4047,3,4064,1,4096,0,4112,0,4112");
  118. //RocketMQTemplate rocketMQTemplate = new RocketMQTemplate();
  119. DefaultMQProducer producer = new DefaultMQProducer("test1.json");
  120. producer.setNamesrvAddr("10.72.143.2:9876");
  121. // producer.setCreateTopicKey("AUTO_CREATE_TOPIC_KEY");
  122. producer.start();
  123. org.apache.rocketmq.common.message.Message message = new org.apache.rocketmq.common.message.Message();
  124. message.setKeys("XJ_001");
  125. message.setBody(jo.toJSONString().getBytes(StandardCharsets.UTF_8));
  126. message.setTopic("Diagnose_MsgV1");
  127. rocketMQTemplate.sendOneWayOrderly("Diagnose_MsgV1",jo,"XJ_001");
  128. producer.sendOneway(message);
  129. System.out.println(" send ");
  130. }).end();
  131. };*/
  132. /* @Bean
  133. public RouteBuilder routeBuilder() {
  134. ArrayList<HashMap<String, Object>> hashMaps = new ArrayList<>();
  135. return new RouteBuilder() {
  136. @Override
  137. public void configure() throws Exception {
  138. from("timer://foo?period=1000")
  139. .routeId("jdbc-pg-?")
  140. .setHeader("prod_time_ctl", constant("2021-02-27 10:00:00.000000"))
  141. //.setBody(constant("select * from centralbase.cb_temp_well_mech_runtime where prod_date= :prod_time_ctl "))
  142. .setBody(simple("select * from centralbase.cb_temp_well_mech_runtime where prod_date= :?prod_time_ctl ::timestamp "))
  143. .log("select body :${body}")
  144. .to("jdbc:centralbase?useHeadersAsParameters=true")
  145. .tracing()
  146. //via transform
  147. .transform()
  148. .body(result -> {
  149. ArrayList<HashMap<String, Object>> list = hashMaps.getClass().cast(result);
  150. log.info("The data count:{}", list.toString());
  151. return list;
  152. })
  153. //split data to one by one
  154. .split(body()).process(exchange -> {
  155. Message in = exchange.getIn();
  156. HashMap<String, Object> aRow = in.getBody(HashMap.class);
  157. log.info("The data count2222:{}", aRow.toString());
  158. })
  159. .log("ok?").setHeader("dddd", constant("ssss"))
  160. .process().message(message -> {
  161. log.info("message:{}", message.getHeader("dddd"));
  162. ArrayList<HashMap<String, Object>> body = message.getBody(ArrayList.class);
  163. log.info("body::::{}",body.toString());
  164. })
  165. .process(exchange -> {
  166. Message in = exchange.getIn();
  167. ArrayList<HashMap<String, Object>> aRow = in.getBody(ArrayList.class);
  168. log.info("The data count3333:{}", aRow.toString());
  169. })
  170. //create "insert sql" with parameters
  171. .setBody(simple("insert into dgns_gtbaseparam(wellname,recivetime,sgt) values('${body[well_id]}','${body[prod_date]}' :: timestamp,'${body[sgt]}')"))
  172. //now for test1.json
  173. //.setBody(simple("insert into dgns_gtbaseparam(wellname,recivetime,sgt) values('${body[well_id]}',now(),'${body[sgt]}')"))
  174. //look up the insert sql
  175. .process().message(message -> {
  176. log.info("The insert sql {}", message.getBody(String.class));
  177. })
  178. //try test1.json
  179. .doTry()
  180. .to("jdbc:diagnosis").log("insert completed")
  181. .doCatch(Throwable.class)
  182. .log("primary key already exited,insert failed")
  183. //"if" statement test1.json
  184. .choice()
  185. .when(exchange -> {
  186. return exchange.getException() != null;
  187. if (exchange.getException() != null) {
  188. log.error(exchange.getException().getMessage());
  189. return exchange.getException().getMessage().contains("already");
  190. }
  191. return false;
  192. }).log("primary key already exited,insert failed")
  193. .otherwise().log("insert completed")
  194. .endChoice()
  195. .end();
  196. }
  197. ;
  198. };
  199. }*/
  200. /*@Bean
  201. public RouteBuilder routeBuilderWithOracle() {
  202. ArrayList<HashMap<String, Object>> hashMaps = new ArrayList<>();
  203. MQDiagnoseMsg mqDiagnoseMsg = new MQDiagnoseMsg();
  204. return new RouteBuilder() {
  205. @Override
  206. public void configure() throws Exception {
  207. from("timer:send?period=30000")
  208. .routeId("Diagnose_MsgV1")
  209. .setBody(simple("select count(org_id)+1 as count from centralbase.cb_pc_organization "))
  210. .to("jdbc:centralbase")
  211. .process(exchange -> {
  212. Message in = exchange.getIn();
  213. HashMap<String, Object> aRow = in.getBody(HashMap.class);
  214. exchange.getIn().setHeader("count" ,aRow.get("count"));
  215. })
  216. .setBody(simple("select distinct station_id,org_id,station_name from centralbase.cb_cd_well_source where station_id not in (select org_id_pre from centralbase.cb_pc_organization) "))
  217. .to("jdbc:centralbase")
  218. .split(body()).process(exchange -> {
  219. Message in = exchange.getIn();
  220. HashMap<String, Object> aRow = in.getBody(HashMap.class);
  221. aRow.put("newId",aRow.get("org_id")+"@"+aRow.get("station_id"));
  222. })
  223. .setBody(simple("insert into centralbase.cb_pc_organization(org_id,org_id_pre,parent_id,org_name,org_level) values('${in.header.count}','${body[station_id]}','${body[org_id]}','${body[station_name]}','3')"))
  224. .to("jdbc:centralbase")
  225. // 因为使用的是每次count 求和之后当作org_id 插入,且只有一个唯一约束,所以在新增之后,会出现脏数据,加了一个删除,将不符合要求脏数据的都去掉
  226. .setBody(simple("delete from centralbase.cb_pc_organization where org_name is null or parent_id is null or org_id_pre is null"))
  227. .to("jdbc:centralbase")
  228. .setBody(simple("select c1.org_id,c1.org_id_pre from centralbase.cb_pc_organization c1 ,centralbase.cb_cd_well_source c2 where c1.org_id_pre = c2.station_id"))
  229. .to("jdbc:centralbase")
  230. .split(body())
  231. .setBody(simple("update centralbase.cb_cd_well_source set org_id ='${body[org_id]}' where station_id = '${body[org_id_pre]}' "))
  232. .to("jdbc:centralbase")
  233. .end();
  234. };
  235. };
  236. }*/
  237. //获取最小载荷
  238. public Double min(String[] strings){
  239. double[] doubles = new double[strings.length];
  240. for (int i = 0; i < strings.length; i++) {
  241. doubles[i] = Double.parseDouble(strings[i]);
  242. }
  243. return Arrays.stream(doubles).min().getAsDouble();
  244. }
  245. //获取最大载荷
  246. public Double max(String[] strings){
  247. double[] doubles = new double[strings.length];
  248. for (int i = 0; i < strings.length; i++) {
  249. doubles[i] = Double.parseDouble(strings[i]);
  250. }
  251. return Arrays.stream(doubles).max().getAsDouble();
  252. }
  253. @Resource(name = "diagnoseMessageProducer")
  254. private MessageProducer producer;
  255. @Bean
  256. public RouteBuilder routeBuilderWithOracle1() {
  257. SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd");
  258. Date date = new Date(System.currentTimeMillis());
  259. String date1 = formatter.format(date);
  260. String formatDate = formatter.format(date) + " 00:00:00";
  261. return new RouteBuilder() {
  262. private SortedSet<String> organization;
  263. private Map<String, Integer> orgIDs;
  264. private Integer orgID;
  265. //全部执行完成的大概时间在30-40分钟
  266. @Override
  267. public void configure() throws Exception {
  268. //24小时执行一次
  269. //单个执行时间30s左右,在之前有数据的情况下
  270. from("timer:mytimer1?period=604800000")
  271. .routeId("oracle-1")
  272. .setHeader("date", constant(date1))
  273. .setBody(simple("select distinct jh,cydmc,zyq,zk,qyrq,sccw,qk,bz from zd_zdgs.dba01@A2 where rq = to_date('${header.date}','yyyy-MM-dd') and qyrq is not null "))
  274. .to("jdbc:oracle")
  275. .transform()
  276. .body((result) -> {
  277. organization = new TreeSet<>();
  278. orgID = 0;
  279. orgIDs = new HashMap<>();
  280. return result;
  281. })
  282. .step("1")
  283. .split(body()).process(exchange -> {
  284. Message in = exchange.getIn();
  285. HashMap<String, Object> aRow = in.getBody(HashMap.class);
  286. String org_level3 = aRow.get("ZYQ") + "@" + aRow.get("CYDMC") + "@" + aRow.get("ZK");
  287. String org_level2 = aRow.get("ZYQ") + "@" + aRow.get("CYDMC");
  288. String org_level1 = aRow.get("ZYQ").toString();
  289. aRow.put("station_id", org_level3);
  290. //这里是重新生成的orgid,最好先查一下centralbase里已有的
  291. //默认一张新表
  292. if (organization.add(org_level1)) {
  293. orgID++;
  294. orgIDs.put(org_level1, orgID);
  295. }
  296. if (organization.add(org_level2)) {
  297. orgID++;
  298. orgIDs.put(org_level2, orgID);
  299. }
  300. if (organization.add(org_level3)) {
  301. orgID++;
  302. orgIDs.put(org_level3, orgID);
  303. }
  304. })
  305. .setBody(simple("insert into centralbase.cb_cd_well_source (well_id,well_common_name,spud_date,station_id,station_name,completion_name,PRODUCING_AREA_name,remarks) " +
  306. "values ('${body[JH]}','${body[JH]}','${body[QYRQ]}'::timestamp,'${body[station_id]}','${body[ZK]}','${body[SCCW]}','${body[QK]}','${body[BZ]}')" +
  307. " ON conflict(well_id) DO UPDATE set remarks = '${body[BZ]}' "))
  308. .to("jdbc:centralbase")
  309. .end()
  310. .transform().body((re) -> {
  311. List<Map<String, Object>> rows = new ArrayList<>();
  312. int code = 0;
  313. for (String s : organization) {
  314. code++;// code is same as org_id
  315. String[] orgs = s.split("@");
  316. Map<String, Object> row = new HashMap<>();
  317. row.put("org_id_pre", s);
  318. row.put("org_code", code);
  319. row.put("org_id", "" + code);
  320. switch (orgs.length) {
  321. case 1:
  322. row.put("org_name", orgs[0]);
  323. row.put("org_level", 1);
  324. row.put("org_parent", "0");
  325. break;
  326. case 2:
  327. row.put("org_name", orgs[1]);
  328. row.put("org_level", 2);
  329. row.put("org_parent", orgIDs.get(orgs[0]).toString());
  330. break;
  331. case 3:
  332. row.put("org_name", orgs[2]);
  333. row.put("org_level", 3);
  334. row.put("org_parent", orgIDs.get(orgs[0] + "@" + orgs[1]).toString());
  335. break;
  336. }
  337. rows.add(row);
  338. }
  339. return rows;
  340. }).split(body())
  341. .setBody(simple("insert into centralbase.cb_pc_organization(org_id,org_code,org_name,org_level,parent_id,org_id_pre)" +
  342. "values('${body[org_id]}','${body[org_code]}','${body[org_name]}','${body[org_level]}','${body[org_parent]}','${body[org_id_pre]}')" +
  343. "ON conflict(org_id_pre) DO UPDATE set org_code = '${body[org_code]}' "))
  344. .to("jdbc:centralbase")
  345. .end()
  346. .setBody(simple("select org_id,org_name from centralbase.cb_pc_organization where org_level = '3' "))
  347. .to("jdbc:centralbase")
  348. .split(body())
  349. .setBody(simple("update centralbase.cb_cd_well_source set org_id = '${body[org_id]}' where station_name = '${body[org_name]}'"))
  350. .to("jdbc:centralbase")
  351. .log("insert")
  352. .end();
  353. //单独执行时间10s
  354. from("timer:mytimer2?period=3600000")
  355. .routeId("oracle-2")
  356. .setHeader("date", constant(date1))
  357. .setBody(simple("select distinct jh,rq,cyfs,yz,hysx , yysx ,tysx,bs,dym from zd_zdgs.dba01@A2 where rq = to_date('${header.date}','yyyy-MM-dd') and qyrq is not null "))
  358. //.setBody(simple("select distinct jh,rq,cyfs,yz,hysx , yysx ,tysx,bs,dym from zd_zdgs.dba01@A2 where rq = to_date('2021-07-05','yyyy-MM-dd') and qyrq is not null "))
  359. .to("jdbc:oracle")
  360. .split(body()).process(exchange -> {
  361. Message in = exchange.getIn();
  362. HashMap<String, Object> aRow = in.getBody(HashMap.class);
  363. if (aRow.get("YZ") == null) aRow.put("YZ", "0.0");
  364. if (aRow.get("HYSX") == null) aRow.put("HYSX", "0.0");
  365. if (aRow.get("YYSX") == null) aRow.put("YYSX", "0.0");
  366. if (aRow.get("TYSX") == null) aRow.put("TYSX", "0.0");
  367. if (aRow.get("BS") == null) aRow.put("BS", "0.0");
  368. })
  369. .setBody(simple("insert into centralbase.cb_pc_pro_wellbore_status_daily(well_id,prod_date,oil_prod_method,oil_nozzle,back_pres,tubing_pres,casing_pres,pump_depth)" +
  370. "values ('${body[JH]}','${body[RQ]}','${body[CYFS]}','${body[YZ]}','${body[HYSX]}','${body[YYSX]}','${body[TYSX]}','${body[BS]}')"))
  371. .setBody(simple("insert into centralbase.cb_pc_pro_wellbore_status_daily(well_id,prod_date,oil_prod_method,oil_nozzle,back_pres,tubing_pres,casing_pres,pump_depth) " +
  372. "select '${body[JH]}','${body[RQ]}','${body[CYFS]}','${body[YZ]}','${body[HYSX]}','${body[YYSX]}','${body[TYSX]}','${body[BS]}' " +
  373. "where NOT EXISTS ( SELECT * FROM centralbase.cb_pc_pro_wellbore_status_daily WHERE well_id = '${body[JH]}' and prod_date = '${body[RQ]}')"))
  374. .to("jdbc:centralbase")
  375. .log("insert !!!")
  376. .end();
  377. //查询井对应dym不为空的数据 --目前是只要对应井能查到dym不为空的,无论是什么时间的,都放进去
  378. //将查询到的DYM数据更新到cb_pc_pro_wellbore_status_daily中
  379. //0 0 */1 * * ? 每1个小时执行一次
  380. //单独执行时间是4m15s 317条数据
  381. from("timer:mytimer5?period=3600000")
  382. .routeId("oracle-5")
  383. .setHeader("date", constant(date1 + " 00:00:00"))
  384. //三个月之内dym不为空的数据
  385. //.setBody(simple("SELECT distinct jh,rq,dym FROM zd_zdgs.dba01@A2 WHERE rq between to_date(TO_CHAR(ADD_MONTHS(SYSDATE,-3),'yyyy-MM-dd'),'yyyy-MM-dd')and to_date(TO_CHAR(SYSDATE,'yyyy-MM-dd'),'yyyy-MM-dd') and dym is not null;"))
  386. .setBody(simple("SELECT distinct jh,max(rq),dym FROM zd_zdgs.dba01@A2 WHERE dym is not null group by jh,dym"))
  387. .to("jdbc:oracle")
  388. .split(body())
  389. .setBody(simple("update centralbase.cb_pc_pro_wellbore_status_daily set start_pump_liq_level = '${body[DYM]}' where well_id = '${body[JH]}' "))
  390. .to("jdbc:centralbase")
  391. .log("insert !!!")
  392. .end();
  393. //单独执行时间30s
  394. from("timer:mytimer3?period=3600000")
  395. .routeId("oracle-3")
  396. .setHeader("date", constant(date1))
  397. .setBody(simple("select distinct jh,rq,scsj, rcyl1,rcyl,rcql,hs, bz from zd_zdgs.dba01@A2 where rq = to_date('${header.date}','yyyy-MM-dd') and qyrq is not null "))
  398. .to("jdbc:oracle")
  399. .split(body()).process(exchange -> {
  400. Message in = exchange.getIn();
  401. HashMap<String, Object> aRow = in.getBody(HashMap.class);
  402. if (aRow.get("SCSJ") == null) aRow.put("SCSJ", "0.0");
  403. if (aRow.get("RCYL1") == null) aRow.put("RCYL1", "0.0");
  404. if (aRow.get("RCYL") == null) aRow.put("RCYL", "0.0");
  405. if (aRow.get("RCQL") == null) aRow.put("RCQL", "0.0");
  406. if (aRow.get("HS") == null) aRow.put("HS", "0.0");
  407. if (aRow.get("BZ") == null) aRow.put("DYM", "");
  408. })
  409. .setBody(simple("insert into centralbase.cb_pc_pro_wellbore_vol_daily(well_id,prod_date,prod_time,liq_prod_daily,oil_prod_daily,gas_prod_daily,water_cut,remarks) " +
  410. "values ('${body[JH]}','${body[RQ]}','${body[SCSJ]}','${body[RCYL1]}','${body[RCYL]}','${body[RCQL]}','${body[HS]}','${body[BZ]}')"))
  411. .setBody(simple("insert into centralbase.cb_pc_pro_wellbore_vol_daily(well_id,prod_date,prod_time,liq_prod_daily,oil_prod_daily,gas_prod_daily,water_cut,remarks) " +
  412. "select '${body[JH]}','${body[RQ]}','${body[SCSJ]}','${body[RCYL1]}','${body[RCYL]}','${body[RCQL]}','${body[HS]}','${body[BZ]}' " +
  413. "where NOT EXISTS ( SELECT * FROM centralbase.cb_pc_pro_wellbore_vol_daily WHERE well_id = '${body[JH]}' and prod_date = '${body[RQ]}' )"))
  414. .to("jdbc:centralbase")
  415. .log("insert !!!")
  416. .end();
  417. //0 0 */1 * * ? 每1个小时执行一次
  418. //单独执行一次30s
  419. from("timer:mytimer4?period=3600000")
  420. .routeId("oracle-4")
  421. .setHeader("date", constant(date1))
  422. .setBody(simple("select distinct jh,rq,dym,jy,ly,bj,bs,bx,zs,cc,cs,blx,dl from zd_zdgs.dba01@A2 where rq = to_date('${header.date}','yyyy-MM-dd') and qyrq is not null "))
  423. .to("jdbc:oracle")
  424. .split(body()).process(exchange -> {
  425. Message in = exchange.getIn();
  426. HashMap<String, Object> aRow = in.getBody(HashMap.class);
  427. if (aRow.get("JY") == null) aRow.put("JY", "0.0");
  428. if (aRow.get("LY") == null) aRow.put("LY", "0.0");
  429. if (aRow.get("BJ") == null) aRow.put("BJ", "0.0");
  430. if (aRow.get("BS") == null) aRow.put("BS", "0.0");
  431. if (aRow.get("BX") == null) aRow.put("BX", "0.0");
  432. if (aRow.get("ZS") == null) aRow.put("ZS", "0.0");
  433. if (aRow.get("CC") == null) aRow.put("CC", "0.0");
  434. if (aRow.get("CS") == null) aRow.put("CS", "0.0");
  435. if (aRow.get("BLX") == null) aRow.put("BLX", "");
  436. if (aRow.get("DL") == null) aRow.put("DL", "0.0");
  437. })
  438. .setBody(simple("insert into centralbase.cb_temp_well_mech_daily(well_id,prod_date,static_pressure,flow_pres,pump_diameter,pump_depth,pump_efficiency,rotate_frequency,stroke_length,stroke_frequency,pump_type,elec_frequency)" +
  439. "values ('${body[JH]}','${body[RQ]}','${body[JY]}','${body[LY]}','${body[BJ]}','${body[BS]}','${body[BX]}','${body[ZS]}','${body[CC]}','${body[CS]}','${body[BLX]}','${body[DL]}')"))
  440. .setBody(simple("insert into centralbase.cb_temp_well_mech_daily(well_id,prod_date,static_pressure,flow_pres,pump_diameter,pump_depth,pump_efficiency,rotate_frequency,stroke_length,stroke_frequency,pump_type,elec_frequency) " +
  441. "select '${body[JH]}','${body[RQ]}','${body[JY]}','${body[LY]}','${body[BJ]}','${body[BS]}','${body[BX]}','${body[ZS]}','${body[CC]}','${body[CS]}','${body[BLX]}','${body[DL]}' " +
  442. "where NOT EXISTS ( SELECT * FROM centralbase.cb_temp_well_mech_daily WHERE well_id = '${body[JH]}' and prod_date = '${body[RQ]}' )"))
  443. .to("jdbc:centralbase")
  444. .log("insert !!!")
  445. .end();
  446. from("timer:mytimer6?period=3600000")
  447. .routeId("oracle-6")
  448. .setHeader("date", constant(date1 + " 00:00:00"))
  449. //五个月之内bj不为空的数据
  450. //.setBody(simple("SELECT distinct jh,rq,dym FROM zd_zdgs.dba01@A2 WHERE rq between to_date(TO_CHAR(ADD_MONTHS(SYSDATE,-3),'yyyy-MM-dd'),'yyyy-MM-dd')and to_date(TO_CHAR(SYSDATE,'yyyy-MM-dd'),'yyyy-MM-dd') and dym is not null;"))
  451. .setBody(simple("SELECT distinct jh,bj FROM zd_zdgs.dba01@A2 WHERE rq between to_date(TO_CHAR(ADD_MONTHS(SYSDATE,-5),'yyyy-MM-dd'),'yyyy-MM-dd')and to_date(TO_CHAR(SYSDATE,'yyyy-MM-dd'),'yyyy-MM-dd') and bj is not null"))
  452. .to("jdbc:oracle")
  453. .split(body())
  454. .setBody(simple("update centralbase.cb_temp_well_mech_runtime set pump_diameter = '${body[BJ]}' where well_id = '${body[JH]}' "))
  455. .to("jdbc:centralbase")
  456. .log("insert !!!")
  457. .end();
  458. //从天安哪里获取的数据
  459. //0 0 */1 * * ? 每1个小时执行一次
  460. //单独执行一小时的数据30s
  461. from("timer:mytimer7?period=3600000")
  462. .routeId("jdbc-gtsj-?")
  463. .setBody(simple("select max(prod_date) from centralbase.cb_temp_well_mech_runtime "))
  464. .to("jdbc:centralbase")
  465. .split(body())
  466. .setHeader("date", simple("${body[max]}"))
  467. //.setHeader("dyna_create_time", constant(formatDate))
  468. .setBody(simple("select distinct well_name,dyna_create_time,check_date,displacement,disp_load,stroke,frequency,susp_max_load,susp_min_load from public.pc_fd_pumpjack_dyna_dia_t where dyna_create_time > '${header.date}' "))
  469. .to("jdbc:gtsj")
  470. .split(body()).process(exchange -> {
  471. Message in = exchange.getIn();
  472. HashMap<String, Object> aRow = in.getBody(HashMap.class);
  473. String prod_date = aRow.get("dyna_create_time").toString().split("\\+")[0];
  474. aRow.put("dyna_create_time", prod_date);
  475. if (aRow.get("displacement") != null && !aRow.get("displacement").equals("") && aRow.get("disp_load") != null && !aRow.get("disp_load").equals("")) {
  476. String[] displacements = aRow.get("displacement").toString().split(";");
  477. String[] disp_loads = aRow.get("disp_load").toString().split(";");
  478. Double susp_max_load = max(disp_loads);
  479. Double susp_min_load = min(disp_loads);
  480. String sgt = "";
  481. for (int i = 0; i < displacements.length; i++) {
  482. sgt = sgt + displacements[i] + "," + disp_loads[i] + ",";
  483. }
  484. String[] s = sgt.split(",");
  485. String w = "";
  486. for (int i = 0; i < s.length; i++) {
  487. w += new BigDecimal(Math.round(Double.parseDouble(s[i]) * 100)).stripTrailingZeros().toPlainString() + ",";
  488. }
  489. aRow.put("sgt", w);
  490. aRow.put("susp_max_load",susp_max_load);
  491. aRow.put("susp_min_load",susp_min_load);
  492. }
  493. if (aRow.get("stroke") == null) aRow.put("stroke", "0.0");
  494. if (aRow.get("frequency") == null) aRow.put("frequency", "0.0");
  495. if (aRow.get("susp_max_load") == null) aRow.put("susp_max_load", "0.0");
  496. if (aRow.get("susp_min_load") == null) aRow.put("susp_min_load", "0.0");
  497. })
  498. .setBody(simple("insert into centralbase.cb_temp_well_mech_runtime(well_id,prod_date,stroke_length,stroke_frequency,susp_max_load,susp_min_load,sgt) " +
  499. "values ('${body[well_name]}','${body[dyna_create_time]}','${body[stroke]}','${body[frequency]}','${body[susp_max_load]}','${body[susp_min_load]}','${body[sgt]}')"))
  500. .setBody(simple("insert into centralbase.cb_temp_well_mech_runtime(well_id,prod_date,stroke_length,stroke_frequency,susp_max_load,susp_min_load,sgt) " +
  501. "select '${body[well_name]}','${body[dyna_create_time]}','${body[stroke]}','${body[frequency]}','${body[susp_max_load]}','${body[susp_min_load]}','${body[sgt]}' " +
  502. "where NOT EXISTS (SELECT * FROM centralbase.cb_temp_well_mech_runtime WHERE well_id = '${body[well_name]}' and prod_date = '${body[dyna_create_time]}' )"))
  503. .to("jdbc:centralbase")
  504. .log("insert!!!").end();
  505. from("timer:mytimer1?period=3600000")
  506. .routeId("centralbase-1")
  507. .setBody(simple("select so.well_id,so.well_common_name,so.org_id,ti.prod_date,ti.stroke_frequency,ti.stroke_length,ti.sgt from centralbase.cb_temp_well_mech_runtime ti, centralbase.cb_cd_well_source so where ti.well_id = so.well_id and ti.prod_date =(select max(prod_date) from centralbase.cb_temp_well_mech_runtime) "))
  508. //.setBody(simple("select so.well_id,so.well_common_name,so.org_id,ti.prod_date,ti.stroke_frequency,ti.stroke_length,ti.sgt from centralbase.cb_temp_well_mech_runtime ti, centralbase.cb_cd_well_source so where ti.well_id = so.well_id and ti.prod_date ='2021-07-12 10:25:00' and ti.well_id = '台24' limit 1 "))
  509. .to("jdbc:centralbase")
  510. .split(body())
  511. .process(exchange -> {
  512. Message in = exchange.getIn();
  513. HashMap<String, Object> aRow = in.getBody(HashMap.class);
  514. String wellName =aRow.get("well_common_name").toString();
  515. String wellId =aRow.get("well_id").toString();
  516. String orgId = aRow.get("org_id").toString();
  517. String prodDate = aRow.get("prod_date").toString().substring(0,19);
  518. Double strokeLength = Double.valueOf(aRow.get("stroke_length").toString());
  519. Double strokeFrequency = Double.valueOf(aRow.get("stroke_frequency").toString());
  520. String sgt = aRow.get("sgt").toString();
  521. DiagnoseMsg diagnoseMsg = new DiagnoseMsg(wellId, wellName, orgId, prodDate, sgt, LocalDateTime.now().toString(), strokeLength, strokeFrequency);
  522. producer.send((MessageBody) diagnoseMsg);
  523. }).log("send success").end();
  524. //-------------------------------------------------------------------------------------------------------------------------------------------------------------------
  525. /* from("timer:mytimer2?period=3600000")
  526. .routeId("oracle-2")
  527. .setBody(simple("select distinct station_id,station_name from centralbase.cb_cd_well_source"))
  528. .to("jdbc:centralbase")
  529. .split(body()).process(exchange -> {
  530. Message in = exchange.getIn();
  531. HashMap<String, Object> aRow = in.getBody(HashMap.class);
  532. String parent_id = aRow.get("station_id").toString().substring(0, aRow.get("station_id").toString().lastIndexOf("@"));
  533. aRow.put("parent_id",parent_id);
  534. String org_id = UUID.randomUUID().toString().replace("-","").substring(0,10);
  535. aRow.put("org_id",org_id);
  536. })
  537. .setBody(simple("insert into centralbase.cb_pc_organization(org_id,org_name,org_level,parent_id)" +
  538. "values('${body[org_id]}','${body[station_name]}','3','${body[parent_id]}')"))
  539. .to("jdbc:centralbase")
  540. .log("insert two")
  541. .end();
  542. from("timer:mytimer3?period=999999999")
  543. .routeId("oracle-3")
  544. .setBody(simple("select distinct parent_id from centralbase.cb_pc_organization"))
  545. .to("jdbc:centralbase")
  546. .split(body()).process(exchange -> {
  547. Message in = exchange.getIn();
  548. HashMap<String, Object> aRow = in.getBody(HashMap.class);//沙南作业区@沙采二区
  549. String parent_id = aRow.get("parent_id").toString().split("@")[0];
  550. String org_name = aRow.get("parent_id").toString().split("@")[1];
  551. //if (!aRow.containsValue(org_name)){
  552. aRow.put("org_name",org_name);
  553. // }
  554. //if (!aRow.containsValue(parent_id)){
  555. aRow.put("parent_id",parent_id);
  556. //}
  557. String org_id = UUID.randomUUID().toString().replace("-","").substring(0,10);
  558. aRow.put("levelTwoOrgId",org_id);
  559. })
  560. .setBody(simple("insert into centralbase.cb_pc_organization(org_id,org_name,org_level,parent_id)" +
  561. "values('${body[levelTwoOrgId]}','${body[org_name]}','2','${body[parent_id]}')"))
  562. .to("jdbc:centralbase")
  563. .log("insert !!!")
  564. .end();
  565. from("timer:mytimer4?period=999999999")
  566. .routeId("oracle-4")
  567. .setBody(simple("select distinct parent_id from centralbase.cb_pc_organization where org_level = '2'"))
  568. .to("jdbc:centralbase")
  569. .split(body()).process(exchange -> {
  570. Message in = exchange.getIn();
  571. HashMap<String, Object> aRow = in.getBody(HashMap.class);//沙南作业区@沙采二区
  572. String org_id = UUID.randomUUID().toString().replace("-","").substring(0,10);
  573. aRow.put("levelOneOrgId",org_id);
  574. })
  575. .setBody(simple("insert into centralbase.cb_pc_organization(org_id,org_name,org_level,parent_id)" +
  576. "values('${body[levelOneOrgId]}','${body[parent_id]}','1','0')"))
  577. .to("jdbc:centralbase")
  578. .log("insert !!!")
  579. .end();
  580. from("timer:mytimer5?period=999999999")
  581. .routeId("oracle-5")
  582. .setBody(simple("select org_id,org_name,parent_id from centralbase.cb_pc_organization where org_level = '1' "))
  583. .to("jdbc:centralbase")
  584. .split(body())
  585. .setBody(simple("update centralbase.cb_pc_organization set parent_id = (select org_id from centralbase.cb_pc_organization where org_name = '${body[org_name]}' ) " +
  586. "where org_level = '2' and parent_id = '${body[org_name]}'"))
  587. .to("jdbc:centralbase")
  588. .setBody(simple("select org_id,org_name,parent_id from centralbase.cb_pc_organization where org_level = '2'"))
  589. .to("jdbc:centralbase")
  590. .split(body())
  591. .setBody(simple("update centralbase.cb_pc_organization set parent_id = (select org_id from centralbase.cb_pc_organization where org_name = '${body[org_name]}' ) " +
  592. "where org_level = '3' and split_part(parent_id,'@',2) = '${body[org_name]}'"))
  593. .to("jdbc:centralbase")
  594. .log("insert !!!")
  595. .end();
  596. */
  597. // from("quartz://name?cron=0 0 */1 * * ?") // cron "0 0 */1 * * ?": runs once every hour
  598. // .routeId("oracle-6")
  599. // .setBody(simple("select org_id,org_name from centralbase.cb_pc_organization where org_level = '3' "))
  600. // .to("jdbc:centralbase")
  601. // .split(body()).log("${body}")
  602. // .end();
  603. }
  604. ;
  605. };
  606. }
  607. }