package com.gct.tools.etlcamelhuge.routeconfig; import com.gct.tools.etlcamelhuge.MQ.MessageBody; import com.gct.tools.etlcamelhuge.MQ.MessageProducer; import com.gct.tools.etlcamelhuge.entity.DiagnoseMsg; import org.apache.camel.*; import org.apache.camel.builder.RouteBuilder; //import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import javax.annotation.Resource; import java.math.BigDecimal; import java.text.SimpleDateFormat; import java.time.LocalDateTime; import java.util.*; /** * class name: CamelJDBCConfiguration * * @author lloyd * @version 1.0 * @since 2021/4/14 下午3:16 */ @Configuration public class CamelJDBCConfiguration /*extends RouteBuilder */ { @Autowired private RocketMQTemplate rocketMQTemplate; /*** * 从天安那边获取功图数据 */ /* @Override public void configure() throws Exception {*/ /* SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd"); Date date = new Date(System.currentTimeMillis()); String formatDate = formatter.format(date) + " 00:00:00"; // 一小时执行一次 from("timer:mytimer1?period=3600000") .routeId("jdbc-gtsj-?") .setHeader("dyna_create_time", constant(formatDate)) .setBody(simple("select well_id,dyna_create_time,check_date,displacement,disp_load from public.pc_fd_pumpjack_dyna_dia_t where well_id ='SQ2306' and dyna_create_time > '${header.dyna_create_time}'::timestamp and dyna_create_time <'${header.dyna_create_time}'::timestamp + '1 day' order by dyna_create_time limit 1 ")) .to("jdbc:gtsj") .split(body()).process(exchange -> { Message in = exchange.getIn(); HashMap aRow = in.getBody(HashMap.class); String prod_date = aRow.get("dyna_create_time").toString().split("\\+")[0]; aRow.put("dyna_create_time", prod_date); if (aRow.get("displacement") != null && !aRow.get("displacement").equals("") && aRow.get("disp_load") != null && !aRow.get("disp_load").equals("")) { String[] displacements = aRow.get("displacement").toString().split(";"); String[] disp_loads = aRow.get("disp_load").toString().split(";"); String sgt = ""; for (int i = 0; i < displacements.length; i++) { sgt = sgt + displacements[i] + "," + disp_loads[i] + ","; } String[] s = sgt.split(","); String w = ""; for (int i = 0; i < s.length; i++) { w += new BigDecimal(Math.round(Double.parseDouble(s[i]) * 100)).stripTrailingZeros().toPlainString() + ","; } aRow.put("sgt", w); } })//从这里开始 .setHeader("MQOne", simple("${body}")) .setHeader("well_name", simple("${body[well_id]}")) //.setHeader("pord_date", simple("${body[dyna_create_time]}")) //.setHeader("rcv_date", simple("${body[check_date]}")) //.setHeader("sgt", simple("${body[sgt]}")) .setBody(simple("update centralbase.cb_temp_well_mech_runtime set prod_date = '${body[dyna_create_time]}',sgt = '${body[sgt]}' where well_id = (select well_id from centralbase.cb_cd_well_source where well_common_name = '${body[well_id]}')")) .to("jdbc:centralbase") .setBody(simple("select well_id,org_id from centralbase.cb_cd_well_source where well_common_name = '${in.header.well_name}' ")) .to("jdbc:centralbase") .setHeader("MQTwo", simple("${body}")) //.setHeader("well_id", simple("${body[well_id]}")) //.setHeader("org_id", simple("${body[org_id]}")) .process(exchange -> { HashMap mqOne = exchange.getMessage().getHeader("MQOne", HashMap.class); HashMap mqTwo = exchange.getMessage().getHeader("MQTwo", HashMap.class); JSONObject jsonObject = new JSONObject(); jsonObject.put("well_name", mqOne.get("well_id")); jsonObject.put("org_id", mqTwo.get("org_id")); jsonObject.put("prod_date", mqOne.get("dyna_create_time")); jsonObject.put("rcv_date", mqOne.get("check_date")); jsonObject.put("sgt", mqOne.get("sgt")); rocketMQTemplate.sendOneWayOrderly("Diagnose_MsgV1", jsonObject, mqTwo.get("well_id").toString()); }).log("insert!!!").end(); //从天安那边获取状态日数据 from("timer:mytimer2?period=3600000") .routeId("jdbc-ztr-?") .setHeader("acqui_time", constant(formatDate)) .setBody(simple("select well_id,acqui_time,phase_a_current,phase_b_current,phase_c_current,tubing_pre,casing_pre,phase_a_voltage,phase_b_voltage,phase_c_voltage from public.t_dyna_well_oil where acqui_time > '${header.acqui_time}'::timestamp and acqui_time <'${header.acqui_time}'::timestamp + '1 day'")) .to("jdbc:gtsj") .split(body()).process(exchange -> { Message in = exchange.getIn(); HashMap aRow = in.getBody(HashMap.class); if (aRow.get("phase_a_current").toString() == null || aRow.get("phase_b_current").toString() == null || aRow.get("phase_c_current").toString() == null || aRow.get("tubing_pre").toString() == null || aRow.get("casing_pre").toString() == null || aRow.get("phase_a_voltage").toString() == null || aRow.get("phase_b_voltage").toString() == null || aRow.get("phase_c_voltage").toString() == null) { aRow.put("phase_a_current", "0.0"); aRow.put("phase_b_current", "0.0"); aRow.put("phase_c_current", "0.0"); aRow.put("tubing_pre", "0.0"); aRow.put("casing_pre", "0.0"); aRow.put("phase_a_voltage", "0.0"); aRow.put("phase_b_voltage", "0.0"); aRow.put("phase_c_voltage", "0.0"); } }) .setBody(simple("update centralbase.cb_pc_pro_wellbore_status_daily set prod_date = '${body[acqui_time]}'::timestamp,elec_pump_current_a = '${body[phase_a_current]}',elec_pump_current_b = '${body[phase_b_current]}',elec_pump_current_c = '${body[phase_c_current]}',tubing_pres = '${body[tubing_pre]}',casing_pres = '${body[casing_pre]}' ,elec_pump_voltage_a = '${body[phase_a_voltage]}',elec_pump_voltage_b = '${body[phase_b_voltage]}',elec_pump_voltage_c = '${body[phase_c_voltage]}' where well_id = (select well_id from centralbase.cb_cd_well_source where well_common_name = '${body[well_id]}')")) .to("jdbc:centralbase") .log("insert completed") .end();*/ /* from("timer:send?period=30000") .routeId("Diagnose_MsgV1") //.setBody(simple("select * from ")) .process(exchange -> { JSONObject jo = new JSONObject(); jo.put("well_name", "XJ_001"); jo.put("org_id", "14"); jo.put("prod_date", "2021-01-25 09:00:00"); jo.put("sgt", "0,4176,0,4208,1,4256,3,4304,6,4416,9,4496,12,4576,17,4768,21,4864,25,4912,32,4864,38,4784,43,4704,52,4560,59,4496,66,4480,73,4496,84,4608,92,4688,100,4736,112,4816,121,4816,129,4800,143,4704,152,4640,161,4592,175,4560,185,4560,194,4576,204,4592,218,4640,227,4656,237,4656,251,4640,260,4624,269,4592,283,4512,292,4480,300,4464,313,4432,321,4432,329,4448,337,4448,348,4432,355,4400,362,4368,371,4304,377,4256,383,4208,390,4128,395,4112,399,4096,405,4064,408,4064,411,4047,414,4064,416,4064,418,4047,419,4047,419,4032,419,4015,419,3968,417,3904,416,3856,414,3792,410,3600,407,3536,403,3504,399,3520,393,3584,388,3647,383,3711,374,3824,368,3808,362,3776,352,3679,345,3615,337,3568,326,3504,317,3520,309,3536,301,3584,288,3647,279,3664,270,3664,256,3647,247,3600,237,3568,223,3520,213,3504,204,3504,190,3536,180,3568,171,3600,161,3615,148,3647,139,3647,130,3632,117,3615,108,3615,100,3615,88,3632,80,3664,73,3696,63,3760,56,3792,50,3808,44,3840,35,3856,30,3856,25,3872,19,3904,15,3920,12,3952,7,4015,5,4047,3,4064,1,4096,0,4112,0,4112"); //RocketMQTemplate rocketMQTemplate = new RocketMQTemplate(); DefaultMQProducer producer = new DefaultMQProducer("test1.json"); producer.setNamesrvAddr("10.72.143.2:9876"); // producer.setCreateTopicKey("AUTO_CREATE_TOPIC_KEY"); producer.start(); org.apache.rocketmq.common.message.Message message = new org.apache.rocketmq.common.message.Message(); message.setKeys("XJ_001"); message.setBody(jo.toJSONString().getBytes(StandardCharsets.UTF_8)); message.setTopic("Diagnose_MsgV1"); rocketMQTemplate.sendOneWayOrderly("Diagnose_MsgV1",jo,"XJ_001"); producer.sendOneway(message); System.out.println(" send "); }).end(); };*/ /* @Bean public RouteBuilder routeBuilder() { ArrayList> hashMaps = new ArrayList<>(); return new RouteBuilder() { @Override public void configure() throws Exception { from("timer://foo?period=1000") .routeId("jdbc-pg-?") .setHeader("prod_time_ctl", constant("2021-02-27 10:00:00.000000")) //.setBody(constant("select * from centralbase.cb_temp_well_mech_runtime where prod_date= :prod_time_ctl ")) .setBody(simple("select * from centralbase.cb_temp_well_mech_runtime where prod_date= :?prod_time_ctl ::timestamp ")) .log("select body :${body}") .to("jdbc:centralbase?useHeadersAsParameters=true") .tracing() //via transform .transform() .body(result -> { ArrayList> list = hashMaps.getClass().cast(result); log.info("The data count:{}", list.toString()); return list; }) //split data to one by one .split(body()).process(exchange -> { Message in = exchange.getIn(); HashMap aRow = in.getBody(HashMap.class); log.info("The data count2222:{}", aRow.toString()); }) .log("ok?").setHeader("dddd", constant("ssss")) .process().message(message -> { log.info("message:{}", message.getHeader("dddd")); ArrayList> body = message.getBody(ArrayList.class); log.info("body::::{}",body.toString()); }) .process(exchange -> { Message in = exchange.getIn(); ArrayList> aRow = in.getBody(ArrayList.class); log.info("The data count3333:{}", aRow.toString()); }) //create "insert sql" with parameters .setBody(simple("insert into dgns_gtbaseparam(wellname,recivetime,sgt) values('${body[well_id]}','${body[prod_date]}' :: timestamp,'${body[sgt]}')")) //now for test1.json //.setBody(simple("insert into dgns_gtbaseparam(wellname,recivetime,sgt) values('${body[well_id]}',now(),'${body[sgt]}')")) //look up the insert sql .process().message(message -> { log.info("The insert sql {}", message.getBody(String.class)); }) //try test1.json .doTry() .to("jdbc:diagnosis").log("insert completed") .doCatch(Throwable.class) .log("primary key already exited,insert failed") //"if" statement test1.json .choice() .when(exchange -> { return exchange.getException() != null; if (exchange.getException() != null) { log.error(exchange.getException().getMessage()); return exchange.getException().getMessage().contains("already"); } return false; }).log("primary key already exited,insert failed") .otherwise().log("insert completed") .endChoice() .end(); } ; }; }*/ /*@Bean public RouteBuilder routeBuilderWithOracle() { ArrayList> hashMaps = new ArrayList<>(); MQDiagnoseMsg mqDiagnoseMsg = new MQDiagnoseMsg(); return new RouteBuilder() { @Override public void configure() throws Exception { from("timer:send?period=30000") .routeId("Diagnose_MsgV1") .setBody(simple("select count(org_id)+1 as count from centralbase.cb_pc_organization ")) .to("jdbc:centralbase") .process(exchange -> { Message in = exchange.getIn(); HashMap aRow = in.getBody(HashMap.class); exchange.getIn().setHeader("count" ,aRow.get("count")); }) .setBody(simple("select distinct station_id,org_id,station_name from centralbase.cb_cd_well_source where station_id not in (select org_id_pre from centralbase.cb_pc_organization) ")) .to("jdbc:centralbase") .split(body()).process(exchange -> { Message in = exchange.getIn(); HashMap aRow = in.getBody(HashMap.class); aRow.put("newId",aRow.get("org_id")+"@"+aRow.get("station_id")); }) .setBody(simple("insert into centralbase.cb_pc_organization(org_id,org_id_pre,parent_id,org_name,org_level) values('${in.header.count}','${body[station_id]}','${body[org_id]}','${body[station_name]}','3')")) .to("jdbc:centralbase") // 因为使用的是每次count 求和之后当作org_id 插入,且只有一个唯一约束,所以在新增之后,会出现脏数据,加了一个删除,将不符合要求脏数据的都去掉 .setBody(simple("delete from centralbase.cb_pc_organization where org_name is null or parent_id is null or org_id_pre is null")) .to("jdbc:centralbase") .setBody(simple("select c1.org_id,c1.org_id_pre from centralbase.cb_pc_organization c1 ,centralbase.cb_cd_well_source c2 where c1.org_id_pre = c2.station_id")) .to("jdbc:centralbase") .split(body()) .setBody(simple("update centralbase.cb_cd_well_source set org_id ='${body[org_id]}' where station_id = '${body[org_id_pre]}' ")) .to("jdbc:centralbase") .end(); }; }; }*/ //获取最小载荷 public Double min(String[] strings){ double[] doubles = new double[strings.length]; for (int i = 0; i < strings.length; i++) { doubles[i] = Double.parseDouble(strings[i]); } return Arrays.stream(doubles).min().getAsDouble(); } //获取最大载荷 public Double max(String[] strings){ double[] doubles = new double[strings.length]; for (int i = 0; i < strings.length; i++) { doubles[i] = Double.parseDouble(strings[i]); } return Arrays.stream(doubles).max().getAsDouble(); } @Resource(name = "diagnoseMessageProducer") private MessageProducer producer; @Bean public RouteBuilder routeBuilderWithOracle1() { SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd"); Date date = new Date(System.currentTimeMillis()); String date1 = formatter.format(date); String formatDate = formatter.format(date) + " 00:00:00"; return new RouteBuilder() { private SortedSet organization; private Map orgIDs; private Integer orgID; //全部执行完成的大概时间在30-40分钟 @Override public void configure() throws Exception { //24小时执行一次 //单个执行时间30s左右,在之前有数据的情况下 from("timer:mytimer1?period=604800000") .routeId("oracle-1") .setHeader("date", constant(date1)) .setBody(simple("select distinct jh,cydmc,zyq,zk,qyrq,sccw,qk,bz from zd_zdgs.dba01@A2 where rq = to_date('${header.date}','yyyy-MM-dd') and qyrq is not null ")) .to("jdbc:oracle") .transform() .body((result) -> { organization = new TreeSet<>(); orgID = 0; orgIDs = new HashMap<>(); return result; }) .step("1") .split(body()).process(exchange -> { Message in = exchange.getIn(); HashMap aRow = in.getBody(HashMap.class); String org_level3 = aRow.get("ZYQ") + "@" + aRow.get("CYDMC") + "@" + aRow.get("ZK"); String org_level2 = aRow.get("ZYQ") + "@" + aRow.get("CYDMC"); String org_level1 = aRow.get("ZYQ").toString(); aRow.put("station_id", org_level3); //这里是重新生成的orgid,最好先查一下centralbase里已有的 //默认一张新表 if (organization.add(org_level1)) { orgID++; orgIDs.put(org_level1, orgID); } if (organization.add(org_level2)) { orgID++; orgIDs.put(org_level2, orgID); } if (organization.add(org_level3)) { orgID++; orgIDs.put(org_level3, orgID); } }) .setBody(simple("insert into centralbase.cb_cd_well_source (well_id,well_common_name,spud_date,station_id,station_name,completion_name,PRODUCING_AREA_name,remarks) " + "values ('${body[JH]}','${body[JH]}','${body[QYRQ]}'::timestamp,'${body[station_id]}','${body[ZK]}','${body[SCCW]}','${body[QK]}','${body[BZ]}')" + " ON conflict(well_id) DO UPDATE set remarks = '${body[BZ]}' ")) .to("jdbc:centralbase") .end() .transform().body((re) -> { List> rows = new ArrayList<>(); int code = 0; for (String s : organization) { code++;// code is same as org_id String[] orgs = s.split("@"); Map row = new HashMap<>(); row.put("org_id_pre", s); row.put("org_code", code); row.put("org_id", "" + code); switch (orgs.length) { case 1: row.put("org_name", orgs[0]); row.put("org_level", 1); row.put("org_parent", "0"); break; case 2: row.put("org_name", orgs[1]); row.put("org_level", 2); row.put("org_parent", orgIDs.get(orgs[0]).toString()); break; case 3: row.put("org_name", orgs[2]); row.put("org_level", 3); row.put("org_parent", orgIDs.get(orgs[0] + "@" + orgs[1]).toString()); break; } rows.add(row); } return rows; }).split(body()) .setBody(simple("insert into centralbase.cb_pc_organization(org_id,org_code,org_name,org_level,parent_id,org_id_pre)" + "values('${body[org_id]}','${body[org_code]}','${body[org_name]}','${body[org_level]}','${body[org_parent]}','${body[org_id_pre]}')" + "ON conflict(org_id_pre) DO UPDATE set org_code = '${body[org_code]}' ")) .to("jdbc:centralbase") .end() .setBody(simple("select org_id,org_name from centralbase.cb_pc_organization where org_level = '3' ")) .to("jdbc:centralbase") .split(body()) .setBody(simple("update centralbase.cb_cd_well_source set org_id = '${body[org_id]}' where station_name = '${body[org_name]}'")) .to("jdbc:centralbase") .log("insert") .end(); //单独执行时间10s from("timer:mytimer2?period=3600000") .routeId("oracle-2") .setHeader("date", constant(date1)) .setBody(simple("select distinct jh,rq,cyfs,yz,hysx , yysx ,tysx,bs,dym from zd_zdgs.dba01@A2 where rq = to_date('${header.date}','yyyy-MM-dd') and qyrq is not null ")) //.setBody(simple("select distinct jh,rq,cyfs,yz,hysx , yysx ,tysx,bs,dym from zd_zdgs.dba01@A2 where rq = to_date('2021-07-05','yyyy-MM-dd') and qyrq is not null ")) .to("jdbc:oracle") .split(body()).process(exchange -> { Message in = exchange.getIn(); HashMap aRow = in.getBody(HashMap.class); if (aRow.get("YZ") == null) aRow.put("YZ", "0.0"); if (aRow.get("HYSX") == null) aRow.put("HYSX", "0.0"); if (aRow.get("YYSX") == null) aRow.put("YYSX", "0.0"); if (aRow.get("TYSX") == null) aRow.put("TYSX", "0.0"); if (aRow.get("BS") == null) aRow.put("BS", "0.0"); }) .setBody(simple("insert into centralbase.cb_pc_pro_wellbore_status_daily(well_id,prod_date,oil_prod_method,oil_nozzle,back_pres,tubing_pres,casing_pres,pump_depth)" + "values ('${body[JH]}','${body[RQ]}','${body[CYFS]}','${body[YZ]}','${body[HYSX]}','${body[YYSX]}','${body[TYSX]}','${body[BS]}')")) .setBody(simple("insert into centralbase.cb_pc_pro_wellbore_status_daily(well_id,prod_date,oil_prod_method,oil_nozzle,back_pres,tubing_pres,casing_pres,pump_depth) " + "select '${body[JH]}','${body[RQ]}','${body[CYFS]}','${body[YZ]}','${body[HYSX]}','${body[YYSX]}','${body[TYSX]}','${body[BS]}' " + "where NOT EXISTS ( SELECT * FROM centralbase.cb_pc_pro_wellbore_status_daily WHERE well_id = '${body[JH]}' and prod_date = '${body[RQ]}')")) .to("jdbc:centralbase") .log("insert !!!") .end(); //查询井对应dym不为空的数据 --目前是只要对应井能查到dym不为空的,无论是什么时间的,都放进去 //将查询到的DYM数据更新到cb_pc_pro_wellbore_status_daily中 //0 0 */1 * * ? 每1个小时执行一次 //单独执行时间是4m15s 317条数据 from("timer:mytimer5?period=3600000") .routeId("oracle-5") .setHeader("date", constant(date1 + " 00:00:00")) //三个月之内dym不为空的数据 //.setBody(simple("SELECT distinct jh,rq,dym FROM zd_zdgs.dba01@A2 WHERE rq between to_date(TO_CHAR(ADD_MONTHS(SYSDATE,-3),'yyyy-MM-dd'),'yyyy-MM-dd')and to_date(TO_CHAR(SYSDATE,'yyyy-MM-dd'),'yyyy-MM-dd') and dym is not null;")) .setBody(simple("SELECT distinct jh,max(rq),dym FROM zd_zdgs.dba01@A2 WHERE dym is not null group by jh,dym")) .to("jdbc:oracle") .split(body()) .setBody(simple("update centralbase.cb_pc_pro_wellbore_status_daily set start_pump_liq_level = '${body[DYM]}' where well_id = '${body[JH]}' ")) .to("jdbc:centralbase") .log("insert !!!") .end(); //单独执行时间30s from("timer:mytimer3?period=3600000") .routeId("oracle-3") .setHeader("date", constant(date1)) .setBody(simple("select distinct jh,rq,scsj, rcyl1,rcyl,rcql,hs, bz from zd_zdgs.dba01@A2 where rq = to_date('${header.date}','yyyy-MM-dd') and qyrq is not null ")) .to("jdbc:oracle") .split(body()).process(exchange -> { Message in = exchange.getIn(); HashMap aRow = in.getBody(HashMap.class); if (aRow.get("SCSJ") == null) aRow.put("SCSJ", "0.0"); if (aRow.get("RCYL1") == null) aRow.put("RCYL1", "0.0"); if (aRow.get("RCYL") == null) aRow.put("RCYL", "0.0"); if (aRow.get("RCQL") == null) aRow.put("RCQL", "0.0"); if (aRow.get("HS") == null) aRow.put("HS", "0.0"); if (aRow.get("BZ") == null) aRow.put("DYM", ""); }) .setBody(simple("insert into centralbase.cb_pc_pro_wellbore_vol_daily(well_id,prod_date,prod_time,liq_prod_daily,oil_prod_daily,gas_prod_daily,water_cut,remarks) " + "values ('${body[JH]}','${body[RQ]}','${body[SCSJ]}','${body[RCYL1]}','${body[RCYL]}','${body[RCQL]}','${body[HS]}','${body[BZ]}')")) .setBody(simple("insert into centralbase.cb_pc_pro_wellbore_vol_daily(well_id,prod_date,prod_time,liq_prod_daily,oil_prod_daily,gas_prod_daily,water_cut,remarks) " + "select '${body[JH]}','${body[RQ]}','${body[SCSJ]}','${body[RCYL1]}','${body[RCYL]}','${body[RCQL]}','${body[HS]}','${body[BZ]}' " + "where NOT EXISTS ( SELECT * FROM centralbase.cb_pc_pro_wellbore_vol_daily WHERE well_id = '${body[JH]}' and prod_date = '${body[RQ]}' )")) .to("jdbc:centralbase") .log("insert !!!") .end(); //0 0 */1 * * ? 每1个小时执行一次 //单独执行一次30s from("timer:mytimer4?period=3600000") .routeId("oracle-4") .setHeader("date", constant(date1)) .setBody(simple("select distinct jh,rq,dym,jy,ly,bj,bs,bx,zs,cc,cs,blx,dl from zd_zdgs.dba01@A2 where rq = to_date('${header.date}','yyyy-MM-dd') and qyrq is not null ")) .to("jdbc:oracle") .split(body()).process(exchange -> { Message in = exchange.getIn(); HashMap aRow = in.getBody(HashMap.class); if (aRow.get("JY") == null) aRow.put("JY", "0.0"); if (aRow.get("LY") == null) aRow.put("LY", "0.0"); if (aRow.get("BJ") == null) aRow.put("BJ", "0.0"); if (aRow.get("BS") == null) aRow.put("BS", "0.0"); if (aRow.get("BX") == null) aRow.put("BX", "0.0"); if (aRow.get("ZS") == null) aRow.put("ZS", "0.0"); if (aRow.get("CC") == null) aRow.put("CC", "0.0"); if (aRow.get("CS") == null) aRow.put("CS", "0.0"); if (aRow.get("BLX") == null) aRow.put("BLX", ""); if (aRow.get("DL") == null) aRow.put("DL", "0.0"); }) .setBody(simple("insert into centralbase.cb_temp_well_mech_daily(well_id,prod_date,static_pressure,flow_pres,pump_diameter,pump_depth,pump_efficiency,rotate_frequency,stroke_length,stroke_frequency,pump_type,elec_frequency)" + "values ('${body[JH]}','${body[RQ]}','${body[JY]}','${body[LY]}','${body[BJ]}','${body[BS]}','${body[BX]}','${body[ZS]}','${body[CC]}','${body[CS]}','${body[BLX]}','${body[DL]}')")) .setBody(simple("insert into centralbase.cb_temp_well_mech_daily(well_id,prod_date,static_pressure,flow_pres,pump_diameter,pump_depth,pump_efficiency,rotate_frequency,stroke_length,stroke_frequency,pump_type,elec_frequency) " + "select '${body[JH]}','${body[RQ]}','${body[JY]}','${body[LY]}','${body[BJ]}','${body[BS]}','${body[BX]}','${body[ZS]}','${body[CC]}','${body[CS]}','${body[BLX]}','${body[DL]}' " + "where NOT EXISTS ( SELECT * FROM centralbase.cb_temp_well_mech_daily WHERE well_id = '${body[JH]}' and prod_date = '${body[RQ]}' )")) .to("jdbc:centralbase") .log("insert !!!") .end(); from("timer:mytimer6?period=3600000") .routeId("oracle-6") .setHeader("date", constant(date1 + " 00:00:00")) //五个月之内bj不为空的数据 //.setBody(simple("SELECT distinct jh,rq,dym FROM zd_zdgs.dba01@A2 WHERE rq between to_date(TO_CHAR(ADD_MONTHS(SYSDATE,-3),'yyyy-MM-dd'),'yyyy-MM-dd')and to_date(TO_CHAR(SYSDATE,'yyyy-MM-dd'),'yyyy-MM-dd') and dym is not null;")) .setBody(simple("SELECT distinct jh,bj FROM zd_zdgs.dba01@A2 WHERE rq between to_date(TO_CHAR(ADD_MONTHS(SYSDATE,-5),'yyyy-MM-dd'),'yyyy-MM-dd')and to_date(TO_CHAR(SYSDATE,'yyyy-MM-dd'),'yyyy-MM-dd') and bj is not null")) .to("jdbc:oracle") .split(body()) .setBody(simple("update centralbase.cb_temp_well_mech_runtime set pump_diameter = '${body[BJ]}' where well_id = '${body[JH]}' ")) .to("jdbc:centralbase") .log("insert !!!") .end(); //从天安哪里获取的数据 //0 0 */1 * * ? 每1个小时执行一次 //单独执行一小时的数据30s from("timer:mytimer7?period=3600000") .routeId("jdbc-gtsj-?") .setBody(simple("select max(prod_date) from centralbase.cb_temp_well_mech_runtime ")) .to("jdbc:centralbase") .split(body()) .setHeader("date", simple("${body[max]}")) //.setHeader("dyna_create_time", constant(formatDate)) .setBody(simple("select distinct well_name,dyna_create_time,check_date,displacement,disp_load,stroke,frequency,susp_max_load,susp_min_load from public.pc_fd_pumpjack_dyna_dia_t where dyna_create_time > '${header.date}' ")) .to("jdbc:gtsj") .split(body()).process(exchange -> { Message in = exchange.getIn(); HashMap aRow = in.getBody(HashMap.class); String prod_date = aRow.get("dyna_create_time").toString().split("\\+")[0]; aRow.put("dyna_create_time", prod_date); if (aRow.get("displacement") != null && !aRow.get("displacement").equals("") && aRow.get("disp_load") != null && !aRow.get("disp_load").equals("")) { String[] displacements = aRow.get("displacement").toString().split(";"); String[] disp_loads = aRow.get("disp_load").toString().split(";"); Double susp_max_load = max(disp_loads); Double susp_min_load = min(disp_loads); String sgt = ""; for (int i = 0; i < displacements.length; i++) { sgt = sgt + displacements[i] + "," + disp_loads[i] + ","; } String[] s = sgt.split(","); String w = ""; for (int i = 0; i < s.length; i++) { w += new BigDecimal(Math.round(Double.parseDouble(s[i]) * 100)).stripTrailingZeros().toPlainString() + ","; } aRow.put("sgt", w); aRow.put("susp_max_load",susp_max_load); aRow.put("susp_min_load",susp_min_load); } if (aRow.get("stroke") == null) aRow.put("stroke", "0.0"); if (aRow.get("frequency") == null) aRow.put("frequency", "0.0"); if (aRow.get("susp_max_load") == null) aRow.put("susp_max_load", "0.0"); if (aRow.get("susp_min_load") == null) aRow.put("susp_min_load", "0.0"); }) .setBody(simple("insert into centralbase.cb_temp_well_mech_runtime(well_id,prod_date,stroke_length,stroke_frequency,susp_max_load,susp_min_load,sgt) " + "values ('${body[well_name]}','${body[dyna_create_time]}','${body[stroke]}','${body[frequency]}','${body[susp_max_load]}','${body[susp_min_load]}','${body[sgt]}')")) .setBody(simple("insert into centralbase.cb_temp_well_mech_runtime(well_id,prod_date,stroke_length,stroke_frequency,susp_max_load,susp_min_load,sgt) " + "select '${body[well_name]}','${body[dyna_create_time]}','${body[stroke]}','${body[frequency]}','${body[susp_max_load]}','${body[susp_min_load]}','${body[sgt]}' " + "where NOT EXISTS (SELECT * FROM centralbase.cb_temp_well_mech_runtime WHERE well_id = '${body[well_name]}' and prod_date = '${body[dyna_create_time]}' )")) .to("jdbc:centralbase") .log("insert!!!").end(); from("timer:mytimer1?period=3600000") .routeId("centralbase-1") .setBody(simple("select so.well_id,so.well_common_name,so.org_id,ti.prod_date,ti.stroke_frequency,ti.stroke_length,ti.sgt from centralbase.cb_temp_well_mech_runtime ti, centralbase.cb_cd_well_source so where ti.well_id = so.well_id and ti.prod_date =(select max(prod_date) from centralbase.cb_temp_well_mech_runtime) ")) //.setBody(simple("select so.well_id,so.well_common_name,so.org_id,ti.prod_date,ti.stroke_frequency,ti.stroke_length,ti.sgt from centralbase.cb_temp_well_mech_runtime ti, centralbase.cb_cd_well_source so where ti.well_id = so.well_id and ti.prod_date ='2021-07-12 10:25:00' and ti.well_id = '台24' limit 1 ")) .to("jdbc:centralbase") .split(body()) .process(exchange -> { Message in = exchange.getIn(); HashMap aRow = in.getBody(HashMap.class); String wellName =aRow.get("well_common_name").toString(); String wellId =aRow.get("well_id").toString(); String orgId = aRow.get("org_id").toString(); String prodDate = aRow.get("prod_date").toString().substring(0,19); Double strokeLength = Double.valueOf(aRow.get("stroke_length").toString()); Double strokeFrequency = Double.valueOf(aRow.get("stroke_frequency").toString()); String sgt = aRow.get("sgt").toString(); DiagnoseMsg diagnoseMsg = new DiagnoseMsg(wellId, wellName, orgId, prodDate, sgt, LocalDateTime.now().toString(), strokeLength, strokeFrequency); producer.send((MessageBody) diagnoseMsg); }).log("send success").end(); //------------------------------------------------------------------------------------------------------------------------------------------------------------------- /* from("timer:mytimer2?period=3600000") .routeId("oracle-2") .setBody(simple("select distinct station_id,station_name from centralbase.cb_cd_well_source")) .to("jdbc:centralbase") .split(body()).process(exchange -> { Message in = exchange.getIn(); HashMap aRow = in.getBody(HashMap.class); String parent_id = aRow.get("station_id").toString().substring(0, aRow.get("station_id").toString().lastIndexOf("@")); aRow.put("parent_id",parent_id); String org_id = UUID.randomUUID().toString().replace("-","").substring(0,10); aRow.put("org_id",org_id); }) .setBody(simple("insert into centralbase.cb_pc_organization(org_id,org_name,org_level,parent_id)" + "values('${body[org_id]}','${body[station_name]}','3','${body[parent_id]}')")) .to("jdbc:centralbase") .log("insert two") .end(); from("timer:mytimer3?period=999999999") .routeId("oracle-3") .setBody(simple("select distinct parent_id from centralbase.cb_pc_organization")) .to("jdbc:centralbase") .split(body()).process(exchange -> { Message in = exchange.getIn(); HashMap aRow = in.getBody(HashMap.class);//沙南作业区@沙采二区 String parent_id = aRow.get("parent_id").toString().split("@")[0]; String org_name = aRow.get("parent_id").toString().split("@")[1]; //if (!aRow.containsValue(org_name)){ aRow.put("org_name",org_name); // } //if (!aRow.containsValue(parent_id)){ aRow.put("parent_id",parent_id); //} String org_id = UUID.randomUUID().toString().replace("-","").substring(0,10); aRow.put("levelTwoOrgId",org_id); }) .setBody(simple("insert into centralbase.cb_pc_organization(org_id,org_name,org_level,parent_id)" + "values('${body[levelTwoOrgId]}','${body[org_name]}','2','${body[parent_id]}')")) .to("jdbc:centralbase") .log("insert !!!") .end(); from("timer:mytimer4?period=999999999") .routeId("oracle-4") .setBody(simple("select distinct parent_id from centralbase.cb_pc_organization where org_level = '2'")) .to("jdbc:centralbase") .split(body()).process(exchange -> { Message in = exchange.getIn(); HashMap aRow = in.getBody(HashMap.class);//沙南作业区@沙采二区 String org_id = UUID.randomUUID().toString().replace("-","").substring(0,10); aRow.put("levelOneOrgId",org_id); }) .setBody(simple("insert into centralbase.cb_pc_organization(org_id,org_name,org_level,parent_id)" + "values('${body[levelOneOrgId]}','${body[parent_id]}','1','0')")) .to("jdbc:centralbase") .log("insert !!!") .end(); from("timer:mytimer5?period=999999999") .routeId("oracle-5") .setBody(simple("select org_id,org_name,parent_id from centralbase.cb_pc_organization where org_level = '1' ")) .to("jdbc:centralbase") .split(body()) .setBody(simple("update centralbase.cb_pc_organization set parent_id = (select org_id from centralbase.cb_pc_organization where org_name = '${body[org_name]}' ) " + "where org_level = '2' and parent_id = '${body[org_name]}'")) .to("jdbc:centralbase") .setBody(simple("select org_id,org_name,parent_id from centralbase.cb_pc_organization where org_level = '2'")) .to("jdbc:centralbase") .split(body()) .setBody(simple("update centralbase.cb_pc_organization set parent_id = (select org_id from centralbase.cb_pc_organization where org_name = '${body[org_name]}' ) " + "where org_level = '3' and split_part(parent_id,'@',2) = '${body[org_name]}'")) .to("jdbc:centralbase") .log("insert !!!") .end(); */ /// from("quartz://name?cron=0 0 */1 * * ?") //0 0 */1 * * ? 每1个小时执行一次 // .routeId("oracle-6") // .setBody(simple("select org_id,org_name from centralbase.cb_pc_organization where org_level = '3' ")) // .to("jdbc:centralbase") // .split(body()).log("${body}") // .end(); } ; }; } }