CamelJDBCCofRealTimeConfiguration.java 17 KB


  1. package com.gct.tools.etlcamelhuge.routeconfig;
  2. import com.gct.common.util.SGTUtil;
  3. import com.gct.tools.etlcamelhuge.MQ.MessageBody;
  4. import com.gct.tools.etlcamelhuge.MQ.MessageProducer;
  5. import com.gct.tools.etlcamelhuge.entity.DiagnoseMsg;
  6. import org.apache.camel.Message;
  7. import org.apache.camel.builder.RouteBuilder;
  8. import org.springframework.context.annotation.Bean;
  9. import org.springframework.context.annotation.Configuration;
  10. import javax.annotation.Resource;
  11. import java.math.BigDecimal;
  12. import java.time.LocalDateTime;
  13. import java.time.format.DateTimeFormatter;
  14. import java.util.ArrayList;
  15. import java.util.Arrays;
  16. import java.util.HashMap;
  17. import java.util.List;
  18. /**
  19. * class name: CamelJDBCCofRealTimeConfiguration.java
  20. * 实时导数据类
  21. * @author lloyd
  22. * @version 1.0
  23. * @since 2021/4/14 下午3:16
  24. */
  25. @Configuration
  26. public class CamelJDBCCofRealTimeConfiguration {
  27. private static long sendMsgRunTime=0;
  28. public Double min(String[] strings){
  29. double[] doubles = new double[strings.length];
  30. for (int i = 0; i < strings.length; i++) {
  31. doubles[i] = Double.parseDouble(strings[i]);
  32. }
  33. return Arrays.stream(doubles).min().getAsDouble();
  34. }
  35. //获取最大载荷
  36. public Double max(String[] strings){
  37. double[] doubles = new double[strings.length];
  38. for (int i = 0; i < strings.length; i++) {
  39. doubles[i] = Double.parseDouble(strings[i]);
  40. }
  41. return Arrays.stream(doubles).max().getAsDouble();
  42. }
  43. public String getDate(){
  44. return LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd"));
  45. }
  46. /***
  47. * 异步发送 RunTime 消息到MQ中
  48. * */
  49. // @Async
  50. public void sendDataToRocketMQ(String wellName, String wellId, String prodDate, Double stroke_length , Double stroke_frequency, String sgt){
  51. String orgId = "0";
  52. prodDate = prodDate.substring(0,19);
  53. if (sgt == null || sgt.length() ==0){
  54. sgt = "0,0";
  55. }
  56. DiagnoseMsg diagnoseMsg = new DiagnoseMsg(wellId, wellName, orgId, prodDate, sgt, LocalDateTime.now().toString(), stroke_length, stroke_frequency);
  57. sendMsgRunTime++;
  58. producer.send(diagnoseMsg);
  59. }
  60. @Resource(name = "diagnoseMessageProducer")
  61. private MessageProducer producer;
  62. @Bean
  63. public RouteBuilder routeBuilderWithRealTime() {
  64. return new RouteBuilder() {
  65. @Override
  66. public void configure() throws Exception {
  67. /*from("timer:mytimer-update-runTime?period=3600000")
  68. .routeId("update-runTime")
  69. .setHeader("date", constant(getDate() + " 00:00:00"))
  70. .setBody(simple("SELECT distinct jh,max(rq),bj FROM DBA01 WHERE dym is not null group by jh,bj"))
  71. .to("jdbc:oracle")
  72. .split(body()).process(exchange -> {
  73. Message in = exchange.getIn();
  74. HashMap<String, Object> aRow = in.getBody(HashMap.class);
  75. if (aRow.get("BJ") ==null || aRow.get("BJ").equals("")) aRow.put("BJ","0.0");
  76. })
  77. .setBody(simple("update centralbase.cb_temp_well_mech_runtime set pump_diameter = '${body[BJ]}' where well_id = '${body[JH]}' "))
  78. .doTry()
  79. .to("jdbc:centralbase")
  80. .doCatch(Exception.class)
  81. .log("${header.date}"+" routeId:update-runTime -> centralbase.cb_temp_well_mech_runtime update data failed")
  82. .end();*/
  83. from("timer:mytimer-insert-runtime?period=1800000")
  84. .routeId("insert-runtime")
  85. .setBody(simple("select max(prod_date) from centralbase.cb_temp_well_mech_runtime "))
  86. .to("jdbc:centralbase")
  87. .split(body())
  88. .setHeader("date", simple("${body[max]}"))
  89. .setBody(simple("select distinct well_name,dyna_create_time,check_date,displacement,disp_load,stroke,frequency,susp_max_load,susp_min_load from public.pc_fd_pumpjack_dyna_dia_t where dyna_create_time > '${header.date}' "))
  90. .to("jdbc:gtsj")
  91. .process(exchange -> {
  92. sendMsgRunTime = 0;
  93. })
  94. .split(body()).process(exchange -> {
  95. Message in = exchange.getIn();
  96. HashMap<String, Object> aRow = in.getBody(HashMap.class);
  97. String prod_date = aRow.get("dyna_create_time").toString().split("\\+")[0];
  98. System.out.println("prod_date---"+prod_date);
  99. aRow.put("dyna_create_time", prod_date);
  100. if (aRow.get("displacement") != null && !aRow.get("displacement").equals("") && aRow.get("disp_load") != null && !aRow.get("disp_load").equals("")) {
  101. String[] displacements = aRow.get("displacement").toString().split(";");//10 四舍五入
  102. String[] disp_loads = aRow.get("disp_load").toString().split(";");
  103. Double susp_max_load = max(disp_loads);
  104. Double susp_min_load = min(disp_loads);
  105. String sgt = "";
  106. for (int i = 0; i < displacements.length; i++) {
  107. sgt = sgt + displacements[i] + "," + disp_loads[i] + ",";
  108. }
  109. String[] s = sgt.split(",");
  110. String w = "";
  111. for (int i = 0; i < s.length; i++) {
  112. w += new BigDecimal(Math.round(Double.parseDouble(s[i]) * 100)).stripTrailingZeros().toPlainString() + ",";
  113. }
  114. Double[][] doubles = SGTUtil.encodeToDoubleArray(w);
  115. aRow.put("sgt", SGTUtil.encodeToString(doubles));
  116. aRow.put("susp_max_load",susp_max_load);
  117. aRow.put("susp_min_load",susp_min_load);
  118. }
  119. //对于位移没有数据,所有数据都在载荷中的特殊数据做特别处理
  120. if ((aRow.get("displacement") == null || aRow.get("displacement").equals("")) && (aRow.get("disp_load") != null && !aRow.get("disp_load").equals(""))){
  121. String disp_load = aRow.get("disp_load").toString().replaceAll(";", ",");
  122. Double[][] doubles = SGTUtil.encodeToDoubleArray(disp_load);
  123. aRow.put("sgt", SGTUtil.encodeToString(doubles));
  124. String[] split = disp_load.split(",");
  125. List<String> list = new ArrayList<>();
  126. for (int i = 0; i < split.length; i++) {
  127. if (i%2 != 0){
  128. BigDecimal bigDecimal = new BigDecimal(split[i]).divide(new BigDecimal("100.0"));
  129. list.add(bigDecimal.toString());
  130. }
  131. }
  132. String[] loads = list.toArray(new String[0]);
  133. Double susp_max_load = max(loads);
  134. Double susp_min_load = min(loads);
  135. aRow.put("susp_max_load",susp_max_load);
  136. aRow.put("susp_min_load",susp_min_load);
  137. }
  138. if (aRow.get("stroke") == null) aRow.put("stroke", "0.0");
  139. if (aRow.get("frequency") == null) aRow.put("frequency", "0.0");
  140. if (aRow.get("susp_max_load") == null) aRow.put("susp_max_load", "0.0");
  141. if (aRow.get("susp_min_load") == null) aRow.put("susp_min_load", "0.0");
  142. if (aRow.get("frequency") != null){
  143. BigDecimal bd=new BigDecimal(aRow.get("frequency").toString());
  144. double frequency=bd.setScale(1,BigDecimal.ROUND_HALF_UP).doubleValue();
  145. aRow.put("frequency",frequency);
  146. }
  147. if (aRow.get("stroke") != null){
  148. double stroke1 = Double.parseDouble(aRow.get("stroke").toString());
  149. BigDecimal bd=new BigDecimal(stroke1);
  150. double stroke=bd.setScale(1,BigDecimal.ROUND_HALF_UP).doubleValue();
  151. aRow.put("stroke",stroke);
  152. }
  153. // long count = sendDataToRocketMQ(aRow.get("well_name").toString(), aRow.get("well_name").toString(), aRow.get("dyna_create_time").toString(), aRow.get("stroke").toString(), aRow.get("frequency").toString(), aRow.get("sgt").toString());
  154. // System.out.println("消息发送时间为: "+LocalDateTime.now()+" 发送的消息数量为: "+count + " 数据时间为:" + aRow.get("dyna_create_time"));
  155. String wellName =aRow.get("well_name").toString();
  156. String wellId =aRow.get("well_name").toString();
  157. String prodDate = aRow.get("dyna_create_time").toString().substring(0,19);
  158. Double strokeLength = Double.valueOf(aRow.get("stroke").toString());
  159. Double strokeFrequency = Double.valueOf(aRow.get("frequency").toString());
  160. String sgt = aRow.get("sgt").toString();
  161. sendDataToRocketMQ(wellName,wellId,prodDate,strokeLength,strokeFrequency,sgt);
  162. })
  163. .setBody(simple("insert into centralbase.cb_temp_well_mech_runtime(well_id,prod_date,stroke_length,stroke_frequency,susp_max_load,susp_min_load,sgt) " +
  164. "select '${body[well_name]}','${body[dyna_create_time]}','${body[stroke]}','${body[frequency]}','${body[susp_max_load]}','${body[susp_min_load]}','${body[sgt]}' " +
  165. "where NOT EXISTS (SELECT * FROM centralbase.cb_temp_well_mech_runtime WHERE well_id = '${body[well_name]}' and prod_date = '${body[dyna_create_time]}' )"))
  166. .to("jdbc:centralbase")
  167. .end();
  168. /*from("timer:mytimer-SendToMQ?period=180000")
  169. .routeId("SendToMQ")
  170. .setBody(simple("select so.well_id,so.well_common_name,so.org_id,ti.prod_date,ti.stroke_frequency,ti.stroke_length,ti.sgt from centralbase.cb_temp_well_mech_runtime ti, centralbase.cb_cd_well_source so where ti.well_id = so.well_id and ti.prod_date =(select max(prod_date) from centralbase.cb_temp_well_mech_runtime) "))
  171. .to("jdbc:centralbase")
  172. .split(body())
  173. .doTry()
  174. .process(exchange -> {
  175. Message in = exchange.getIn();
  176. HashMap<String, Object> aRow = in.getBody(HashMap.class);
  177. String wellName =aRow.get("well_common_name").toString();
  178. String wellId =aRow.get("well_id").toString();
  179. String orgId = aRow.get("org_id").toString();
  180. String prodDate = aRow.get("prod_date").toString().substring(0,19);
  181. Double strokeLength = Double.valueOf(aRow.get("stroke_length").toString());
  182. Double strokeFrequency = Double.valueOf(aRow.get("stroke_frequency").toString());
  183. String sgt = aRow.get("sgt").toString();
  184. DiagnoseMsg diagnoseMsg = new DiagnoseMsg(wellId, wellName, orgId, prodDate, sgt, LocalDateTime.now().toString(), strokeLength, strokeFrequency);
  185. producer.send((MessageBody) diagnoseMsg);
  186. })
  187. .doCatch(Exception.class)
  188. .log("${header.date}"+" rocketMQ send data failed")
  189. .endDoTry()
  190. .end();*/
  191. from("timer:mytimer-update-avg-mech_daily?period=3600000")
  192. .routeId("update-avg-mech_daily")
  193. .process(exchange -> {
  194. Message in = exchange.getIn();
  195. in.setHeader("date",getDate());
  196. })
  197. .setBody(simple("select well_id,avg(stroke_length) stroke_length ,avg(stroke_frequency) stroke_frequency from centralbase.cb_temp_well_mech_runtime where prod_date::date='${header.date}' group by well_id"))
  198. .to("jdbc:centralbase")
  199. .split(body()).process(exchange -> {
  200. Message in = exchange.getIn();
  201. HashMap<String, Object> aRow = in.getBody(HashMap.class);
  202. if (aRow.get("stroke_length")!=null && aRow.get("stroke_frequency")!=null){
  203. double stroke_length=Double.parseDouble(aRow.get("stroke_length").toString());
  204. double stroke_frequency=Double.parseDouble(aRow.get("stroke_frequency").toString());
  205. BigDecimal bd=new BigDecimal(stroke_length);
  206. double stroke_lengt1=bd.setScale(1,BigDecimal.ROUND_HALF_UP).doubleValue();
  207. BigDecimal bd1=new BigDecimal(stroke_frequency);
  208. double stroke_frequency1=bd1.setScale(1,BigDecimal.ROUND_HALF_UP).doubleValue();
  209. aRow.put("strokeLength",stroke_lengt1);
  210. aRow.put("strokeFrequency",stroke_frequency1);
  211. }
  212. })
  213. .setBody(simple("update centralbase.cb_temp_well_mech_daily set stroke_length='${body[strokeLength]}' ,stroke_frequency ='${body[strokeFrequency]}' where well_id = '${body[well_id]}' and prod_date::date='${header.date}' "))
  214. .doTry()
  215. .to("jdbc:centralbase")
  216. .doCatch(Exception.class)
  217. .log("${header.date}"+" routeId:update-avg-mech_daily -> centralbase.cb_temp_well_mech_daily update data failed")
  218. .end();
  219. from("timer:mytimer-insert-mechDaily?period=3600000")
  220. .routeId("insert-mech_daily")
  221. .process(exchange -> {
  222. Message in = exchange.getIn();
  223. in.setHeader("date",getDate());
  224. })
  225. .setBody(simple("select distinct jh,rq,dym,jy,ly,bj,bs,bx,zs,cc,cs,blx,dl from DBA01 where rq = to_date('${header.date}','yyyy-MM-dd') and qyrq is not null "))
  226. .to("jdbc:oracle")
  227. .split(body()).process(exchange -> {
  228. Message in = exchange.getIn();
  229. HashMap<String, Object> aRow = in.getBody(HashMap.class);
  230. if (aRow.get("JY") == null) aRow.put("JY", "0.0");
  231. if (aRow.get("LY") == null) aRow.put("LY", "0.0");
  232. if (aRow.get("BJ") == null) aRow.put("BJ", "0.0");
  233. if (aRow.get("BS") == null) aRow.put("BS", "0.0");
  234. if (aRow.get("BX") == null) aRow.put("BX", "0.0");
  235. if (aRow.get("ZS") == null) aRow.put("ZS", "0.0");
  236. if (aRow.get("CC") == null) aRow.put("CC", "0.0");
  237. if (aRow.get("CS") == null) aRow.put("CS", "0.0");
  238. if (aRow.get("BLX") == null) aRow.put("BLX", "");
  239. if (aRow.get("DL") == null) aRow.put("DL", "0.0");
  240. })
  241. .setBody(simple("insert into centralbase.cb_temp_well_mech_daily(well_id,prod_date,static_pressure,flow_pres,pump_diameter,pump_depth,pump_efficiency,rotate_frequency,stroke_length,stroke_frequency,pump_type,elec_frequency) " +
  242. "select '${body[JH]}','${body[RQ]}','${body[JY]}','${body[LY]}','${body[BJ]}','${body[BS]}','${body[BX]}','${body[ZS]}','${body[CC]}','${body[CS]}','${body[BLX]}','${body[DL]}' " +
  243. "where NOT EXISTS ( SELECT * FROM centralbase.cb_temp_well_mech_daily WHERE well_id = '${body[JH]}' and prod_date = '${body[RQ]}' )"))
  244. .doTry()
  245. .to("jdbc:centralbase")
  246. .doCatch(Exception.class)
  247. .log("${header.date}"+" routeId:insert-mech_daily -> centralbase.cb_temp_well_mech_daily insert data failed")
  248. .end();
  249. };
  250. };
  251. }
  252. }