package com.gct.tools.etlcamelhuge.routeconfig; import com.gct.common.util.SGTUtil; import com.gct.tools.etlcamelhuge.MQ.MessageBody; import com.gct.tools.etlcamelhuge.MQ.MessageProducer; import com.gct.tools.etlcamelhuge.entity.DiagnoseMsg; import org.apache.camel.Message; import org.apache.camel.builder.RouteBuilder; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import javax.annotation.Resource; import java.math.BigDecimal; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; import java.util.List; /** * class name: CamelJDBCCofRealTimeConfiguration.java * 实时导数据类 * @author lloyd * @version 1.0 * @since 2021/4/14 下午3:16 */ @Configuration public class CamelJDBCCofRealTimeConfiguration { private static long sendMsgRunTime=0; public Double min(String[] strings){ double[] doubles = new double[strings.length]; for (int i = 0; i < strings.length; i++) { doubles[i] = Double.parseDouble(strings[i]); } return Arrays.stream(doubles).min().getAsDouble(); } //获取最大载荷 public Double max(String[] strings){ double[] doubles = new double[strings.length]; for (int i = 0; i < strings.length; i++) { doubles[i] = Double.parseDouble(strings[i]); } return Arrays.stream(doubles).max().getAsDouble(); } public String getDate(){ return LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd")); } /*** * 异步发送 RunTime 消息到MQ中 * */ // @Async public void sendDataToRocketMQ(String wellName, String wellId, String prodDate, Double stroke_length , Double stroke_frequency, String sgt){ String orgId = "0"; prodDate = prodDate.substring(0,19); if (sgt == null || sgt.length() ==0){ sgt = "0,0"; } DiagnoseMsg diagnoseMsg = new DiagnoseMsg(wellId, wellName, orgId, prodDate, sgt, LocalDateTime.now().toString(), stroke_length, stroke_frequency); sendMsgRunTime++; producer.send(diagnoseMsg); } @Resource(name = "diagnoseMessageProducer") private MessageProducer producer; @Bean public RouteBuilder routeBuilderWithRealTime() { return new RouteBuilder() { @Override public void configure() throws Exception { /*from("timer:mytimer-update-runTime?period=3600000") .routeId("update-runTime") .setHeader("date", constant(getDate() + " 00:00:00")) .setBody(simple("SELECT distinct jh,max(rq),bj FROM DBA01 WHERE dym is not null group by jh,bj")) .to("jdbc:oracle") .split(body()).process(exchange -> { Message in = exchange.getIn(); HashMap aRow = in.getBody(HashMap.class); if (aRow.get("BJ") ==null || aRow.get("BJ").equals("")) aRow.put("BJ","0.0"); }) .setBody(simple("update centralbase.cb_temp_well_mech_runtime set pump_diameter = '${body[BJ]}' where well_id = '${body[JH]}' ")) .doTry() .to("jdbc:centralbase") .doCatch(Exception.class) .log("${header.date}"+" routeId:update-runTime -> centralbase.cb_temp_well_mech_runtime update data failed") .end();*/ from("timer:mytimer-insert-runtime?period=1800000") .routeId("insert-runtime") .setBody(simple("select max(prod_date) from centralbase.cb_temp_well_mech_runtime ")) .to("jdbc:centralbase") .split(body()) .setHeader("date", simple("${body[max]}")) .setBody(simple("select distinct well_name,dyna_create_time,check_date,displacement,disp_load,stroke,frequency,susp_max_load,susp_min_load from public.pc_fd_pumpjack_dyna_dia_t where dyna_create_time > '${header.date}' ")) .to("jdbc:gtsj") .process(exchange -> { sendMsgRunTime = 0; }) .split(body()).process(exchange -> { Message in = exchange.getIn(); HashMap aRow = in.getBody(HashMap.class); String prod_date = aRow.get("dyna_create_time").toString().split("\\+")[0]; System.out.println("prod_date---"+prod_date); aRow.put("dyna_create_time", prod_date); if (aRow.get("displacement") != null && !aRow.get("displacement").equals("") && aRow.get("disp_load") != null && !aRow.get("disp_load").equals("")) { String[] displacements = aRow.get("displacement").toString().split(";");//10 四舍五入 String[] disp_loads = aRow.get("disp_load").toString().split(";"); Double susp_max_load = max(disp_loads); Double susp_min_load = min(disp_loads); String sgt = ""; for (int i = 0; i < displacements.length; i++) { sgt = sgt + displacements[i] + "," + disp_loads[i] + ","; } String[] s = sgt.split(","); String w = ""; for (int i = 0; i < s.length; i++) { w += new BigDecimal(Math.round(Double.parseDouble(s[i]) * 100)).stripTrailingZeros().toPlainString() + ","; } Double[][] doubles = SGTUtil.encodeToDoubleArray(w); aRow.put("sgt", SGTUtil.encodeToString(doubles)); aRow.put("susp_max_load",susp_max_load); aRow.put("susp_min_load",susp_min_load); } //对于位移没有数据,所有数据都在载荷中的特殊数据做特别处理 if ((aRow.get("displacement") == null || aRow.get("displacement").equals("")) && (aRow.get("disp_load") != null && !aRow.get("disp_load").equals(""))){ String disp_load = aRow.get("disp_load").toString().replaceAll(";", ","); Double[][] doubles = SGTUtil.encodeToDoubleArray(disp_load); aRow.put("sgt", SGTUtil.encodeToString(doubles)); String[] split = disp_load.split(","); List list = new ArrayList<>(); for (int i = 0; i < split.length; i++) { if (i%2 != 0){ BigDecimal bigDecimal = new BigDecimal(split[i]).divide(new BigDecimal("100.0")); list.add(bigDecimal.toString()); } } String[] loads = list.toArray(new String[0]); Double susp_max_load = max(loads); Double susp_min_load = min(loads); aRow.put("susp_max_load",susp_max_load); aRow.put("susp_min_load",susp_min_load); } if (aRow.get("stroke") == null) aRow.put("stroke", "0.0"); if (aRow.get("frequency") == null) aRow.put("frequency", "0.0"); if (aRow.get("susp_max_load") == null) aRow.put("susp_max_load", "0.0"); if (aRow.get("susp_min_load") == null) aRow.put("susp_min_load", "0.0"); if (aRow.get("frequency") != null){ BigDecimal bd=new BigDecimal(aRow.get("frequency").toString()); double frequency=bd.setScale(1,BigDecimal.ROUND_HALF_UP).doubleValue(); aRow.put("frequency",frequency); } if (aRow.get("stroke") != null){ double stroke1 = Double.parseDouble(aRow.get("stroke").toString()); BigDecimal bd=new BigDecimal(stroke1); double stroke=bd.setScale(1,BigDecimal.ROUND_HALF_UP).doubleValue(); aRow.put("stroke",stroke); } // long count = sendDataToRocketMQ(aRow.get("well_name").toString(), aRow.get("well_name").toString(), aRow.get("dyna_create_time").toString(), aRow.get("stroke").toString(), aRow.get("frequency").toString(), aRow.get("sgt").toString()); // System.out.println("消息发送时间为: "+LocalDateTime.now()+" 发送的消息数量为: "+count + " 数据时间为:" + aRow.get("dyna_create_time")); String wellName =aRow.get("well_name").toString(); String wellId =aRow.get("well_name").toString(); String prodDate = aRow.get("dyna_create_time").toString().substring(0,19); Double strokeLength = Double.valueOf(aRow.get("stroke").toString()); Double strokeFrequency = Double.valueOf(aRow.get("frequency").toString()); String sgt = aRow.get("sgt").toString(); sendDataToRocketMQ(wellName,wellId,prodDate,strokeLength,strokeFrequency,sgt); }) .setBody(simple("insert into centralbase.cb_temp_well_mech_runtime(well_id,prod_date,stroke_length,stroke_frequency,susp_max_load,susp_min_load,sgt) " + "select '${body[well_name]}','${body[dyna_create_time]}','${body[stroke]}','${body[frequency]}','${body[susp_max_load]}','${body[susp_min_load]}','${body[sgt]}' " + "where NOT EXISTS (SELECT * FROM centralbase.cb_temp_well_mech_runtime WHERE well_id = '${body[well_name]}' and prod_date = '${body[dyna_create_time]}' )")) .to("jdbc:centralbase") .end(); /*from("timer:mytimer-SendToMQ?period=180000") .routeId("SendToMQ") .setBody(simple("select so.well_id,so.well_common_name,so.org_id,ti.prod_date,ti.stroke_frequency,ti.stroke_length,ti.sgt from centralbase.cb_temp_well_mech_runtime ti, centralbase.cb_cd_well_source so where ti.well_id = so.well_id and ti.prod_date =(select max(prod_date) from centralbase.cb_temp_well_mech_runtime) ")) .to("jdbc:centralbase") .split(body()) .doTry() .process(exchange -> { Message in = exchange.getIn(); HashMap aRow = in.getBody(HashMap.class); String wellName =aRow.get("well_common_name").toString(); String wellId =aRow.get("well_id").toString(); String orgId = aRow.get("org_id").toString(); String prodDate = aRow.get("prod_date").toString().substring(0,19); Double strokeLength = Double.valueOf(aRow.get("stroke_length").toString()); Double strokeFrequency = Double.valueOf(aRow.get("stroke_frequency").toString()); String sgt = aRow.get("sgt").toString(); DiagnoseMsg diagnoseMsg = new DiagnoseMsg(wellId, wellName, orgId, prodDate, sgt, LocalDateTime.now().toString(), strokeLength, strokeFrequency); producer.send((MessageBody) diagnoseMsg); }) .doCatch(Exception.class) .log("${header.date}"+" rocketMQ send data failed") .endDoTry() .end();*/ from("timer:mytimer-update-avg-mech_daily?period=3600000") .routeId("update-avg-mech_daily") .process(exchange -> { Message in = exchange.getIn(); in.setHeader("date",getDate()); }) .setBody(simple("select well_id,avg(stroke_length) stroke_length ,avg(stroke_frequency) stroke_frequency from centralbase.cb_temp_well_mech_runtime where prod_date::date='${header.date}' group by well_id")) .to("jdbc:centralbase") .split(body()).process(exchange -> { Message in = exchange.getIn(); HashMap aRow = in.getBody(HashMap.class); if (aRow.get("stroke_length")!=null && aRow.get("stroke_frequency")!=null){ double stroke_length=Double.parseDouble(aRow.get("stroke_length").toString()); double stroke_frequency=Double.parseDouble(aRow.get("stroke_frequency").toString()); BigDecimal bd=new BigDecimal(stroke_length); double stroke_lengt1=bd.setScale(1,BigDecimal.ROUND_HALF_UP).doubleValue(); BigDecimal bd1=new BigDecimal(stroke_frequency); double stroke_frequency1=bd1.setScale(1,BigDecimal.ROUND_HALF_UP).doubleValue(); aRow.put("strokeLength",stroke_lengt1); aRow.put("strokeFrequency",stroke_frequency1); } }) .setBody(simple("update centralbase.cb_temp_well_mech_daily set stroke_length='${body[strokeLength]}' ,stroke_frequency ='${body[strokeFrequency]}' where well_id = '${body[well_id]}' and prod_date::date='${header.date}' ")) .doTry() .to("jdbc:centralbase") .doCatch(Exception.class) .log("${header.date}"+" routeId:update-avg-mech_daily -> centralbase.cb_temp_well_mech_daily update data failed") .end(); from("timer:mytimer-insert-mechDaily?period=3600000") .routeId("insert-mech_daily") .process(exchange -> { Message in = exchange.getIn(); in.setHeader("date",getDate()); }) .setBody(simple("select distinct jh,rq,dym,jy,ly,bj,bs,bx,zs,cc,cs,blx,dl from DBA01 where rq = to_date('${header.date}','yyyy-MM-dd') and qyrq is not null ")) .to("jdbc:oracle") .split(body()).process(exchange -> { Message in = exchange.getIn(); HashMap aRow = in.getBody(HashMap.class); if (aRow.get("JY") == null) aRow.put("JY", "0.0"); if (aRow.get("LY") == null) aRow.put("LY", "0.0"); if (aRow.get("BJ") == null) aRow.put("BJ", "0.0"); if (aRow.get("BS") == null) aRow.put("BS", "0.0"); if (aRow.get("BX") == null) aRow.put("BX", "0.0"); if (aRow.get("ZS") == null) aRow.put("ZS", "0.0"); if (aRow.get("CC") == null) aRow.put("CC", "0.0"); if (aRow.get("CS") == null) aRow.put("CS", "0.0"); if (aRow.get("BLX") == null) aRow.put("BLX", ""); if (aRow.get("DL") == null) aRow.put("DL", "0.0"); }) .setBody(simple("insert into centralbase.cb_temp_well_mech_daily(well_id,prod_date,static_pressure,flow_pres,pump_diameter,pump_depth,pump_efficiency,rotate_frequency,stroke_length,stroke_frequency,pump_type,elec_frequency) " + "select '${body[JH]}','${body[RQ]}','${body[JY]}','${body[LY]}','${body[BJ]}','${body[BS]}','${body[BX]}','${body[ZS]}','${body[CC]}','${body[CS]}','${body[BLX]}','${body[DL]}' " + "where NOT EXISTS ( SELECT * FROM centralbase.cb_temp_well_mech_daily WHERE well_id = '${body[JH]}' and prod_date = '${body[RQ]}' )")) .doTry() .to("jdbc:centralbase") .doCatch(Exception.class) .log("${header.date}"+" routeId:insert-mech_daily -> centralbase.cb_temp_well_mech_daily insert data failed") .end(); }; }; } }