CamelJDBCCofRealTimeConfiguration.java 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221
  1. package com.gct.tools.etlcamelhuge.routeconfig;
  2. import com.gct.common.util.SGTUtil;
  3. import com.gct.tools.etlcamelhuge.MQ.MessageProducer;
  4. import com.gct.tools.etlcamelhuge.entity.DiagnoseMsg;
  5. import lombok.Data;
  6. import org.apache.camel.Message;
  7. import org.apache.camel.builder.RouteBuilder;
  8. import org.springframework.context.annotation.Bean;
  9. import org.springframework.context.annotation.Configuration;
  10. import org.springframework.scheduling.annotation.Async;
  11. import javax.annotation.Resource;
  12. import java.math.BigDecimal;
  13. import java.time.LocalDateTime;
  14. import java.time.format.DateTimeFormatter;
  15. import java.util.Arrays;
  16. import java.util.HashMap;
  17. import java.util.Map;
  18. import java.util.SortedSet;
/**
 * class name: CamelJDBCCofRealTimeConfiguration.java
 * Real-time data import configuration.
 * @author lloyd
 * @version 1.0
 * @since 2021/4/14 3:16 PM
 */
  26. @Configuration
  27. public class CamelJDBCCofRealTimeConfiguration {
  28. private static long sendMsgRunTime=0;
  29. public Double min(String[] strings){
  30. double[] doubles = new double[strings.length];
  31. for (int i = 0; i < strings.length; i++) {
  32. doubles[i] = Double.parseDouble(strings[i]);
  33. }
  34. return Arrays.stream(doubles).min().getAsDouble();
  35. }
  36. //获取最大载荷
  37. public Double max(String[] strings){
  38. double[] doubles = new double[strings.length];
  39. for (int i = 0; i < strings.length; i++) {
  40. doubles[i] = Double.parseDouble(strings[i]);
  41. }
  42. return Arrays.stream(doubles).max().getAsDouble();
  43. }
  44. public String getDate(){
  45. return LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd"));
  46. }
  47. /***
  48. * 异步发送 RunTime 消息到MQ中
  49. * */
  50. @Async
  51. public long sendDataToRocketMQ(String wellName,String wellId,String prodDate,String stroke_length ,String stroke_frequency,String sgt){
  52. String orgId = "0";
  53. prodDate = prodDate.substring(0,19);
  54. Double strokeLength = Double.valueOf(stroke_length);
  55. Double strokeFrequency = Double.valueOf(stroke_frequency);
  56. if (sgt == null || sgt.length() ==0){
  57. sgt = "0,0";
  58. }
  59. DiagnoseMsg diagnoseMsg = new DiagnoseMsg(wellId, wellName, orgId, prodDate, sgt, LocalDateTime.now().toString(), strokeLength, strokeFrequency);
  60. sendMsgRunTime++;
  61. producer.send(diagnoseMsg);
  62. return sendMsgRunTime;
  63. }
  64. @Resource(name = "diagnoseMessageProducer")
  65. private MessageProducer producer;
  66. @Bean
  67. public RouteBuilder routeBuilderWithRealTime() {
  68. return new RouteBuilder() {
  69. @Override
  70. public void configure() throws Exception {
  71. from("timer:mytimer-insert-mechDaily?period=3600000")
  72. .routeId("insert-mech_daily")
  73. .setHeader("date", constant(getDate()))
  74. .setBody(simple("select distinct jh,rq,dym,jy,ly,bj,bs,bx,zs,cc,cs,blx,dl from DBA01 where rq = to_date('${header.date}','yyyy-MM-dd') and qyrq is not null "))
  75. .to("jdbc:oracle")
  76. .split(body()).process(exchange -> {
  77. Message in = exchange.getIn();
  78. HashMap<String, Object> aRow = in.getBody(HashMap.class);
  79. if (aRow.get("JY") == null) aRow.put("JY", "0.0");
  80. if (aRow.get("LY") == null) aRow.put("LY", "0.0");
  81. if (aRow.get("BJ") == null) aRow.put("BJ", "0.0");
  82. if (aRow.get("BS") == null) aRow.put("BS", "0.0");
  83. if (aRow.get("BX") == null) aRow.put("BX", "0.0");
  84. if (aRow.get("ZS") == null) aRow.put("ZS", "0.0");
  85. if (aRow.get("CC") == null) aRow.put("CC", "0.0");
  86. if (aRow.get("CS") == null) aRow.put("CS", "0.0");
  87. if (aRow.get("BLX") == null) aRow.put("BLX", "");
  88. if (aRow.get("DL") == null) aRow.put("DL", "0.0");
  89. })
  90. .setBody(simple("insert into centralbase.cb_temp_well_mech_daily(well_id,prod_date,static_pressure,flow_pres,pump_diameter,pump_depth,pump_efficiency,rotate_frequency,stroke_length,stroke_frequency,pump_type,elec_frequency) " +
  91. "select '${body[JH]}','${body[RQ]}','${body[JY]}','${body[LY]}','${body[BJ]}','${body[BS]}','${body[BX]}','${body[ZS]}','${body[CC]}','${body[CS]}','${body[BLX]}','${body[DL]}' " +
  92. "where NOT EXISTS ( SELECT * FROM centralbase.cb_temp_well_mech_daily WHERE well_id = '${body[JH]}' and prod_date = '${body[RQ]}' )"))
  93. .doTry()
  94. .to("jdbc:centralbase")
  95. .doCatch(Exception.class)
  96. .log("${header.date}"+" routeId:insert-mech_daily -> centralbase.cb_temp_well_mech_daily insert data failed")
  97. .end();
  98. from("timer:mytimer-update-runTime?period=3600000")
  99. .routeId("update-runTime")
  100. .setHeader("date", constant(getDate() + " 00:00:00"))
  101. .setBody(simple("SELECT distinct jh,max(rq),bj FROM DBA01 WHERE dym is not null group by jh,bj"))
  102. .to("jdbc:oracle")
  103. .split(body()).process(exchange -> {
  104. Message in = exchange.getIn();
  105. HashMap<String, Object> aRow = in.getBody(HashMap.class);
  106. if (aRow.get("BJ") ==null || aRow.get("BJ").equals("")) aRow.put("BJ","0.0");
  107. })
  108. .setBody(simple("update centralbase.cb_temp_well_mech_runtime set pump_diameter = '${body[BJ]}' where well_id = '${body[JH]}' "))
  109. .doTry()
  110. .to("jdbc:centralbase")
  111. .doCatch(Exception.class)
  112. .log("${header.date}"+" routeId:update-runTime -> centralbase.cb_temp_well_mech_runtime update data failed")
  113. .end();
  114. from("timer:mytimer-insert-runtimeAndSendToMQ?period=3600000")
  115. .routeId("insert-runtimeAndSendToMQ")
  116. .setBody(simple("select max(prod_date) from centralbase.cb_temp_well_mech_runtime "))
  117. .to("jdbc:centralbase")
  118. .split(body())
  119. .setHeader("date", simple("${body[max]}"))
  120. .setBody(simple("select distinct well_name,dyna_create_time,check_date,displacement,disp_load,stroke,frequency,susp_max_load,susp_min_load from public.pc_fd_pumpjack_dyna_dia_t where dyna_create_time > '${header.date}' "))
  121. .doTry()
  122. .to("jdbc:gtsj")
  123. .doCatch(Exception.class)
  124. .log("${deader.date}" + " routeId:insert-runtimeAndSendToMQ -> select runTime data failed")
  125. .process(exchange -> {
  126. sendMsgRunTime = 0;
  127. })
  128. .split(body()).process(exchange -> {
  129. Message in = exchange.getIn();
  130. HashMap<String, Object> aRow = in.getBody(HashMap.class);
  131. String prod_date = aRow.get("dyna_create_time").toString().split("\\+")[0];
  132. aRow.put("dyna_create_time", prod_date);
  133. if (aRow.get("displacement") != null && !aRow.get("displacement").equals("") && aRow.get("disp_load") != null && !aRow.get("disp_load").equals("")) {
  134. String[] displacements = aRow.get("displacement").toString().split(";");//10 四舍五入
  135. String[] disp_loads = aRow.get("disp_load").toString().split(";");
  136. Double susp_max_load = max(disp_loads);
  137. Double susp_min_load = min(disp_loads);
  138. String sgt = "";
  139. for (int i = 0; i < displacements.length; i++) {
  140. sgt = sgt + displacements[i] + "," + disp_loads[i] + ",";
  141. }
  142. String[] s = sgt.split(",");
  143. String w = "";
  144. for (int i = 0; i < s.length; i++) {
  145. w += new BigDecimal(Math.round(Double.parseDouble(s[i]) * 100)).stripTrailingZeros().toPlainString() + ",";
  146. }
  147. Double[][] doubles = SGTUtil.encodeToDoubleArray(w);
  148. aRow.put("sgt", SGTUtil.encodeToString(doubles));
  149. aRow.put("susp_max_load",susp_max_load);
  150. aRow.put("susp_min_load",susp_min_load);
  151. }
  152. if (aRow.get("stroke") == null) aRow.put("stroke", "0.0");
  153. if (aRow.get("frequency") == null) aRow.put("frequency", "0.0");
  154. if (aRow.get("susp_max_load") == null) aRow.put("susp_max_load", "0.0");
  155. if (aRow.get("susp_min_load") == null) aRow.put("susp_min_load", "0.0");
  156. if (aRow.get("frequency") != null){
  157. BigDecimal bd=new BigDecimal(aRow.get("frequency").toString());
  158. double frequency=bd.setScale(1,BigDecimal.ROUND_HALF_UP).doubleValue();
  159. aRow.put("frequency",frequency);
  160. }
  161. if (aRow.get("stroke") != null){
  162. double stroke1 = Double.parseDouble(aRow.get("stroke").toString());
  163. BigDecimal bd=new BigDecimal(stroke1);
  164. double stroke=bd.setScale(1,BigDecimal.ROUND_HALF_UP).doubleValue();
  165. aRow.put("stroke",stroke);
  166. }
  167. long count = sendDataToRocketMQ(aRow.get("well_name").toString(), aRow.get("well_name").toString(), aRow.get("dyna_create_time").toString(), aRow.get("stroke").toString(), aRow.get("frequency").toString(), aRow.get("sgt").toString());
  168. System.out.println("消息发送时间为: "+LocalDateTime.now()+" 发送的消息数量为: "+count + " 数据时间为:" + aRow.get("dyna_create_time"));
  169. })
  170. .setBody(simple("insert into centralbase.cb_temp_well_mech_runtime(well_id,prod_date,stroke_length,stroke_frequency,susp_max_load,susp_min_load,sgt) " +
  171. "select '${body[well_name]}','${body[dyna_create_time]}','${body[stroke]}','${body[frequency]}','${body[susp_max_load]}','${body[susp_min_load]}','${body[sgt]}' " +
  172. "where NOT EXISTS (SELECT * FROM centralbase.cb_temp_well_mech_runtime WHERE well_id = '${body[well_name]}' and prod_date = '${body[dyna_create_time]}' )"))
  173. .doTry()
  174. .to("jdbc:centralbase")
  175. .doCatch(Exception.class)
  176. .log("${header.date}"+" routeId:insert-runtimeAndSendToMQ -> centralbase.cb_temp_well_mech_runtime insert data failed ${body}")
  177. .end();
  178. from("timer:mytimer-update-avg-mech_daily?period=3600000")
  179. .routeId("update-avg-mech_daily")
  180. .setHeader("date", constant(getDate()))
  181. .setBody(simple("select well_id,avg(stroke_length) stroke_length ,avg(stroke_frequency) stroke_frequency from centralbase.cb_temp_well_mech_runtime where prod_date::date='${header.date}' group by well_id"))
  182. .to("jdbc:centralbase")
  183. .split(body()).process(exchange -> {
  184. Message in = exchange.getIn();
  185. HashMap<String, Object> aRow = in.getBody(HashMap.class);
  186. if (aRow.get("stroke_length")!=null && aRow.get("stroke_frequency")!=null){
  187. double stroke_length=Double.parseDouble(aRow.get("stroke_length").toString());
  188. double stroke_frequency=Double.parseDouble(aRow.get("stroke_frequency").toString());
  189. BigDecimal bd=new BigDecimal(stroke_length);
  190. double stroke_lengt1=bd.setScale(1,BigDecimal.ROUND_HALF_UP).doubleValue();
  191. BigDecimal bd1=new BigDecimal(stroke_frequency);
  192. double stroke_frequency1=bd1.setScale(1,BigDecimal.ROUND_HALF_UP).doubleValue();
  193. aRow.put("strokeLength",stroke_lengt1);
  194. aRow.put("strokeFrequency",stroke_frequency1);
  195. }
  196. })
  197. .setBody(simple("update centralbase.cb_temp_well_mech_daily set stroke_length='${body[strokeLength]}' ,stroke_frequency ='${body[strokeFrequency]}' where well_id = '${body[well_id]}' and prod_date::date='${header.date}' "))
  198. .doTry()
  199. .to("jdbc:centralbase")
  200. .doCatch(Exception.class)
  201. .log("${header.date}"+" routeId:update-avg-mech_daily -> centralbase.cb_temp_well_mech_daily update data failed")
  202. .end();
  203. };
  204. };
  205. }
  206. }