package com.gct.tools.etlcamelhuge.routeconfig; import org.apache.camel.*; import org.apache.camel.builder.RouteBuilder; //import org.apache.rocketmq.common.message.Message; import org.apache.camel.model.ExpressionNode; import org.apache.camel.model.ProcessorDefinition; import org.apache.camel.model.RouteDefinition; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.util.StringUtils; import java.math.BigDecimal; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; import java.util.*; /** * class name: CamelJDBCConfiguration * * @author lloyd * @version 1.0 * @since 2021/4/14 下午3:16 */ @Configuration public class CamelJDBCConfiguration { public String getDate(){ return LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd")); } @Bean public RouteBuilder routeBuilderWithOracle1() { return new RouteBuilder() { private SortedSet organization; private Map orgIDs = new HashMap<>(); private int orgID; private Map orgIdPreList; private Integer org; public ProcessorDefinition setMyBody(RouteDefinition route){ return route.setBody(simple("select well_id from centralbase.sys_access_well_control where access_status='1' ")) .to("jdbc:centralbase") .split(body()).process(exchange -> { HashMap body = exchange.getIn().getBody(HashMap.class); exchange.getIn().setHeader("well_id",body.get("well_id")); }); } @Override public void configure() throws Exception { RouteDefinition OrgAndWellSource= (RouteDefinition) from("timer:insert-OrgAndWellSource?period=86400000") .routeId("insert-OrgAndWellSource") .setHeader("date", constant(getDate()+" 00:00:00")) .process(exchange -> { org = 0; orgIdPreList = new HashMap<>(); }) .setBody(simple("select max(org_id) from centralbase.cb_pc_organization")) .to("jdbc:centralbase") .process(exchange -> { HashMap body = exchange.getIn().getBody(HashMap.class); if (body == null || StringUtils.isEmpty(body.get("max"))) org = 0; else org = Integer.valueOf(body.get("max").toString()); }) .setBody(simple("select org_id_pre,org_id from centralbase.cb_pc_organization")) .to("jdbc:centralbase") .split(body()).process(exchange -> { HashMap body = exchange.getIn().getBody(HashMap.class); orgIdPreList.put(body.get("org_id_pre"),body.get("org_id")); }).end(); setMyBody(OrgAndWellSource) .setBody(simple("select distinct jh,cydmc,zyq,zk,qyrq,sccw,qk,bz from DBA01 where rq=to_date('${header.date}','yyyy-mm-dd hh24:mi:ss') and jh ='${header.well_id}'")) .to("jdbc:oracle") .transform() .body((result) -> { organization = new TreeSet<>(); return result; }) .step("1") .split(body()).process(exchange -> { Message in = exchange.getIn(); HashMap aRow = in.getBody(HashMap.class); if (StringUtils.isEmpty(aRow.get("qyrq"))){ aRow.put("QYRQ","2021-01-01 00:00:00"); } if (!aRow.containsKey("JM")) aRow.put("JM",aRow.get("JH")); String org_level3 = aRow.get("ZYQ") + "@" + aRow.get("CYDMC") + "@" + aRow.get("ZK"); String org_level2 = aRow.get("ZYQ") + "@" + aRow.get("CYDMC"); String org_level1 = aRow.get("ZYQ").toString(); aRow.put("station_id", org_level3); orgID = org; if ((!orgIdPreList.containsKey(org_level1)) || (!orgIdPreList.containsKey(org_level2)) || (!orgIdPreList.containsKey(org_level3))) { if (organization.add(org_level1)) { if (!orgIDs.containsKey(org_level1)) orgIDs.put(org_level1,++orgID); } if (organization.add(org_level2)) { if (!orgIDs.containsKey(org_level2)) orgIDs.put(org_level2,++orgID); } if (organization.add(org_level3)) { if (!orgIDs.containsKey(org_level3)) orgIDs.put(org_level3,++orgID); } } if(orgIdPreList.get(org_level3) !=null){ aRow.put("org_id",orgIdPreList.get(org_level3)); return; } if(orgIDs.get(org_level3) !=null){ aRow.put("org_id",orgIDs.get(org_level3)); } }) .setBody(simple("insert into centralbase.cb_cd_well_source (well_id,well_common_name,spud_date,org_id,station_id,station_name,completion_name,PRODUCING_AREA_name,remarks) " + "select '${body[JH]}','${body[JM]}','${body[QYRQ]}'::timestamp,'${body[org_id]}','${body[station_id]}','${body[ZK]}','${body[SCCW]}','${body[QK]}','${body[BZ]}' " + "where NOT EXISTS ( SELECT * FROM centralbase.cb_cd_well_source WHERE well_id = '${body[JH]}' )")) .to("jdbc:centralbase") .end() .transform().body((re) -> { List> rows = new ArrayList<>(); for (String s : organization) { Map row = new HashMap<>(); String[] orgs = s.split("@"); row.put("org_id_pre", s); switch (orgs.length) { case 1: row.put("org_name", orgs[0]); row.put("org_level", 1); row.put("org_parent", "0"); break; case 2: row.put("org_name", orgs[1]); row.put("org_level", 2); row.put("org_parent", orgIDs.get(orgs[0]).toString()); break; case 3: row.put("org_name", orgs[2]); row.put("org_level", 3); row.put("org_parent", orgIDs.get(orgs[0] + "@" + orgs[1]).toString()); break; } if (!orgIdPreList.containsKey(s)) { org++; row.put("org_code", org); row.put("org_id", "" + org); orgIdPreList.put(s, row.get("org_id")); rows.add(row); } } return rows; }).split(body()) .setBody(simple("insert into centralbase.cb_pc_organization(org_id,org_code,org_name,org_level,parent_id,org_id_pre)" + "select '${body[org_id]}','${body[org_code]}','${body[org_name]}','${body[org_level]}','${body[org_parent]}','${body[org_id_pre]}' " + "where NOT EXISTS ( SELECT * FROM centralbase.cb_pc_organization WHERE org_id = '${body[org_id]}')")) .doTry() .to("jdbc:centralbase") .doCatch(Exception.class) .log("${header.date}"+" routeId:insert-OrgAndWellSource-> centralbase.cb_pc_organization insert data failed") .end(); from("timer:update-wellControl?period=3600000") .routeId("update-wellControl") .setBody(simple("select scc.well_id,wo.well_common_name,op.org_id from centralbase.sys_access_well_control scc\n" + "left join centralbase.cb_cd_well_source wo on scc.well_id = wo.well_id\n" + "left join centralbase.cb_pc_organization op on wo.org_id = op.org_id\n" + "where scc.access_status ='1' ")) .to("jdbc:centralbase") .split(body()) .split(body()).process(exchange -> { HashMap body = exchange.getIn().getBody(HashMap.class); String format = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")); if (body.get("well_common_name") == null || body.get("well_common_name").equals("")) { body.put("remarks","暂无井信息"); body.put("error_id",0); body.put("updateTime",format); }else if (body.get("org_id") == null || body.get("org_id").equals("")){ body.put("remarks","暂无机构信息"); body.put("error_id",0); body.put("updateTime",format); }else { body.put("remarks",""); body.put("error_id",1); body.put("updateTime",format); } }) .setBody(simple("update centralbase.sys_access_well_control set well_common_name='${body[well_common_name]}',org_id='${body[org_id]}',update_time = '${body[updateTime]}'::timestamp, remarks ='${body[remarks]}' ,error_id ='${body[error_id]}' where well_id ='${body[well_id]}' ")) .to("jdbc:centralbase") .end(); RouteDefinition statusDaily= from("timer:mytimer-insert-statusDaily?period=3600000") .routeId("insert-statusDaily") .process(exchange -> { Message in = exchange.getIn(); in.setHeader("date",getDate()); }); setMyBody(statusDaily) .setBody(simple("select distinct jh,rq,cyfs,yz,hysx , yysx ,tysx,bs,dym from DBA01 where rq = to_date('${header.date}','yyyy-MM-dd') and jh='${header.well_id}' and qyrq is not null ")) .to("jdbc:oracle") .split(body()).process(exchange -> { Message in = exchange.getIn(); HashMap aRow = in.getBody(HashMap.class); if (aRow.get("YZ") == null) aRow.put("YZ", "0.0"); if (aRow.get("HYSX") == null) aRow.put("HYSX", "0.0"); if (aRow.get("YYSX") == null) aRow.put("YYSX", "0.0"); if (aRow.get("TYSX") == null) aRow.put("TYSX", "0.0"); if (aRow.get("BS") == null) aRow.put("BS", "0.0"); }) .setBody(simple("insert into centralbase.cb_pc_pro_wellbore_status_daily(well_id,prod_date,oil_prod_method,oil_nozzle,back_pres,tubing_pres,casing_pres,pump_depth) " + "select '${body[JH]}','${body[RQ]}','${body[CYFS]}','${body[YZ]}','${body[HYSX]}','${body[YYSX]}','${body[TYSX]}','${body[BS]}' " + "where NOT EXISTS ( SELECT * FROM centralbase.cb_pc_pro_wellbore_status_daily WHERE well_id = '${body[JH]}' and prod_date = '${body[RQ]}')")) .doTry() .to("jdbc:centralbase") .doCatch(Exception.class) .log("${header.date}"+" routeId:insert-statusDaily -> centralbase.cb_pc_pro_wellbore_status_daily insert data failed") .end(); RouteDefinition statusDailyDYM = from("timer:mytimer-update-statusDaily-DYM?period=3600000") .routeId("update-statusDaily-DYM") .process(exchange -> { Message in = exchange.getIn(); in.setHeader("date",getDate()); }); setMyBody(statusDailyDYM) .setBody(simple("select distinct jh , rq , dym from DBA01 where (jh,rq) in (SELECT jh,max(rq) rq FROM DBA01 WHERE dym is not null and rq<= to_date('${header.date}','yyyy-MM-dd')and jh='${header.well_id}' group by jh)")) .to("jdbc:oracle") .split(body()) .setBody(simple("update centralbase.cb_pc_pro_wellbore_status_daily set start_pump_liq_level = '${body[DYM]}' where well_id = '${body[JH]}' and prod_date::date = '${header.date}' ")) .doTry() .to("jdbc:centralbase") .doCatch(Exception.class) .log("${header.date}"+" routeId:update-statusDaily-DYM -> centralbase.cb_pc_pro_wellbore_status_daily update data failed") .end(); RouteDefinition statusDailyBJ = from("timer:mytimer-update-statusDaily-BJ?period=3600000") .routeId("update-statusDaily-BJ") .process(exchange -> { Message in = exchange.getIn(); in.setHeader("date",getDate()); }); setMyBody(statusDailyBJ) .setBody(simple("select distinct jh , rq , bj from DBA01 where (jh,rq) in (SELECT jh,max(rq) rq FROM DBA01 WHERE bj is not null and rq<= to_date('${header.date}','yyyy-MM-dd') and jh='${header.well_id}' group by jh)")) .to("jdbc:oracle") .split(body()).process(exchange -> { HashMap body = exchange.getIn().getBody(HashMap.class); }) .setBody(simple("update centralbase.cb_pc_pro_wellbore_status_daily set oil_nozzle = '${body[BJ]}' where well_id = '${body[JH]}' and prod_date::date = '${header.date}' ")) .doTry() .to("jdbc:centralbase") .doCatch(Exception.class) .log("${header.date}"+" routeId:update-statusDaily-BJ -> centralbase.cb_pc_pro_wellbore_status_daily update data failed") .end(); RouteDefinition submergenceDepth = from("timer:mytimer-update-statusDaily-submergenceDepth?period=3600000") .routeId("update-statusDaily-submergenceDepth") .process(exchange -> { Message in = exchange.getIn(); in.setHeader("date",getDate()+" 00:00:00"); }); setMyBody(submergenceDepth) .setBody(simple("select well_id,prod_date,start_pump_liq_level,pump_depth from centralbase.cb_pc_pro_wellbore_status_daily where prod_date = '${header.date}' and well_id='${header.well_id}' ")) .to("jdbc:centralbase") .split(body()).process(exchange -> { Message in = exchange.getIn(); HashMap aRow = in.getBody(HashMap.class); aRow.put("submergence_depth",null); if (aRow.get("start_pump_liq_level")!=null && aRow.get("pump_depth")!=null){ double cmd= Double.valueOf(aRow.get("pump_depth").toString())-Double.valueOf(aRow.get("start_pump_liq_level").toString())/10; BigDecimal bd=new BigDecimal(cmd); double cmd1=bd.setScale(1,BigDecimal.ROUND_HALF_UP).doubleValue(); aRow.put("submergence_depth",cmd1); } }) .setBody(simple("update centralbase.cb_pc_pro_wellbore_status_daily set submergence_depth = '${body[submergence_depth]}' where well_id = '${body[well_id]}' and prod_date = '${body[prod_date]}'")) .doTry() .to("jdbc:centralbase") .doCatch(Exception.class) .log("${header.date}"+" routeId:update-statusDaily-submergenceDepth -> centralbase.cb_pc_pro_wellbore_status_daily update data failed") .end(); RouteDefinition volDaily = from("timer:mytimer-insert-volDaily?period=3600000") .routeId("insert-volDaily") .process(exchange -> { Message in = exchange.getIn(); in.setHeader("date",getDate()); }); setMyBody(volDaily) .setBody(simple("select distinct jh,rq,scsj, rcyl1,rcyl,rcql,hs, bz from DBA01 where rq = to_date('${header.date}','yyyy-MM-dd') and jh ='${header.well_id}' and qyrq is not null ")) .to("jdbc:oracle") .split(body()).process(exchange -> { Message in = exchange.getIn(); HashMap aRow = in.getBody(HashMap.class); if (aRow.get("SCSJ") == null) aRow.put("SCSJ", "0.0"); if (aRow.get("RCYL1") == null) aRow.put("RCYL1", "0.0"); if (aRow.get("RCYL") == null) aRow.put("RCYL", "0.0"); if (aRow.get("RCQL") == null) aRow.put("RCQL", "0.0"); if (aRow.get("HS") == null) aRow.put("HS", "0.0"); if (aRow.get("BZ") == null) aRow.put("BZ", ""); aRow.put("RCSL",-1); aRow.put("QYB",-1); aRow.put("SQB",-1); if (aRow.get("RCQL")!=null && aRow.get("RCYL")!=null && !aRow.get("RCYL").equals("0.0") && aRow.get("RCYL") != "0.0"){ double qyb = Double.valueOf(aRow.get("RCQL").toString()) / Double.valueOf(aRow.get("RCYL").toString()); if (!Double.isNaN(qyb) && !Double.isInfinite(qyb)){ BigDecimal bd=new BigDecimal(qyb); double d1=bd.setScale(1,BigDecimal.ROUND_HALF_UP).doubleValue(); aRow.put("QYB",d1); } } if (aRow.get("RCYL1")!=null && aRow.get("HS") != null && !aRow.get("HS").equals("0.0") && aRow.get("HS") != "0.0"){ double rcsl = (Double.valueOf(aRow.get("RCYL1").toString()) * Double.valueOf(aRow.get("HS").toString()))/100; if (!Double.isNaN(rcsl) && !Double.isInfinite(rcsl)) { BigDecimal bd = new BigDecimal(rcsl); double d1 = bd.setScale(1, BigDecimal.ROUND_HALF_UP).doubleValue(); aRow.put("RCSL", d1); } } if (aRow.get("RCQL") != null && aRow.get("RCSL") != null && !aRow.get("RCQL").equals("0.0") && aRow.get("RCQL") != "0.0" ){ double sqb = Double.valueOf(aRow.get("RCSL").toString()) / Double.valueOf(aRow.get("RCQL").toString()); if (!Double.isNaN(sqb) && !Double.isInfinite(sqb)) { BigDecimal bd = new BigDecimal(sqb); double d1 = bd.setScale(1, BigDecimal.ROUND_HALF_UP).doubleValue(); aRow.put("SQB", d1); } } if (!aRow.containsKey("SMD")){ aRow.put("SMD",1); } if (!aRow.containsKey("YMD")){ aRow.put("YMD",0.85); } }) .setBody(simple("insert into centralbase.cb_pc_pro_wellbore_vol_daily(well_id,prod_date,prod_time,liq_prod_daily,oil_prod_daily,gas_prod_daily,water_cut,remarks,gas_oil_ratio,water_prod_daily,water_gas_ratio,surface_crude_water_density,surface_crude_oil_density) " + "select '${body[JH]}','${body[RQ]}','${body[SCSJ]}','${body[RCYL1]}','${body[RCYL]}','${body[RCQL]}','${body[HS]}','${body[BZ]}','${body[QYB]}','${body[RCSL]}','${body[SQB]}','${body[SMD]}','${body[YMD]}' " + "where NOT EXISTS ( SELECT * FROM centralbase.cb_pc_pro_wellbore_vol_daily WHERE well_id = '${body[JH]}' and prod_date = '${body[RQ]}' )")) .doTry() .to("jdbc:centralbase") .doCatch(Exception.class) .log("${header.date}"+" routeId:insert-volDaily -> centralbase.cb_pc_pro_wellbore_vol_daily insert data failed") .endDoTry() .setBody(simple("update centralbase.cb_pc_pro_wellbore_vol_daily set water_prod_daily =null where water_prod_daily = -1 and prod_date = '${header.date}' ")) .to("jdbc:centralbase") .setBody(simple("update centralbase.cb_pc_pro_wellbore_vol_daily set gas_oil_ratio =null where gas_oil_ratio = -1 and prod_date = '${header.date}' ")) .to("jdbc:centralbase") .setBody(simple("update centralbase.cb_pc_pro_wellbore_vol_daily set water_gas_ratio =null where water_gas_ratio = -1 and prod_date = '${header.date}' ")) .to("jdbc:centralbase") .end(); RouteDefinition volDailyLiqProdDaily = from("timer:mytimer-update-volDaily-liq_prod_daily?period=3600000") .routeId("update-volDaily-liq_prod_daily") .process(exchange -> { Message in = exchange.getIn(); in.setHeader("date",getDate()); }); setMyBody(volDailyLiqProdDaily) .setBody(simple("select distinct jh,rq,scsj, rcyl1,rcyl,rcql,hs, bz from DBA01 where rq = to_date('${header.date}','yyyy-MM-dd')and jh ='${header.well_id}' and qyrq is not null ")) .to("jdbc:oracle") .split(body()).process(exchange -> { Message in = exchange.getIn(); HashMap aRow = in.getBody(HashMap.class); if (aRow.get("SCSJ") == null) aRow.put("SCSJ", "0.0"); if (aRow.get("RCYL1") == null) aRow.put("RCYL1", "0.0"); if (aRow.get("RCYL") == null) aRow.put("RCYL", "0.0"); if (aRow.get("RCQL") == null) aRow.put("RCQL", "0.0"); if (aRow.get("HS") == null) aRow.put("HS", "0.0"); if (aRow.get("BZ") == null) aRow.put("BZ", ""); aRow.put("RCSL",-1); aRow.put("QYB",-1); aRow.put("SQB",-1); if (aRow.get("RCQL")!=null && aRow.get("RCYL")!=null && !aRow.get("RCYL").equals("0.0") && aRow.get("RCYL") != "0.0"){ double qyb = Double.valueOf(aRow.get("RCQL").toString()) / Double.valueOf(aRow.get("RCYL").toString()); if (!Double.isNaN(qyb) && !Double.isInfinite(qyb)){ BigDecimal bd=new BigDecimal(qyb); double d1=bd.setScale(1,BigDecimal.ROUND_HALF_UP).doubleValue(); aRow.put("QYB",d1); } } if (aRow.get("RCYL1")!=null && aRow.get("HS") != null && !aRow.get("HS").equals("0.0") && aRow.get("HS") != "0.0"){ double rcsl = (Double.valueOf(aRow.get("RCYL1").toString()) * Double.valueOf(aRow.get("HS").toString()))/100; if (!Double.isNaN(rcsl) && !Double.isInfinite(rcsl)) { BigDecimal bd = new BigDecimal(rcsl); double d1 = bd.setScale(1, BigDecimal.ROUND_HALF_UP).doubleValue(); aRow.put("RCSL", d1); } } if (aRow.get("RCQL") != null && aRow.get("RCSL") != null && !aRow.get("RCQL").equals("0.0") && aRow.get("RCQL") != "0.0" ){ double sqb = Double.valueOf(aRow.get("RCSL").toString()) / Double.valueOf(aRow.get("RCQL").toString()); if (!Double.isNaN(sqb) && !Double.isInfinite(sqb)) { BigDecimal bd = new BigDecimal(sqb); double d1 = bd.setScale(1, BigDecimal.ROUND_HALF_UP).doubleValue(); aRow.put("SQB", d1); } } if (!aRow.containsKey("SMD")){ aRow.put("SMD",1); } if (!aRow.containsKey("YMD")){ aRow.put("YMD",0.85); } }) .setBody(simple("update centralbase.cb_pc_pro_wellbore_vol_daily set prod_time = '${body[SCSJ]}' ,liq_prod_daily='${body[RCYL1]}' ,oil_prod_daily ='${body[RCYL]}' ,gas_prod_daily ='${body[RCQL]}' ,water_cut='${body[HS]}' ,remarks='${body[BZ]}' ,gas_oil_ratio='${body[QYB]}' ,water_prod_daily='${body[RCSL]}' ,water_gas_ratio='${body[SQB]}',surface_crude_water_density='${body[SMD]}',surface_crude_oil_density= '${body[YMD]}' " + "where well_id = '${body[JH]}' and prod_date ='${body[RQ]}' ")) .doTry() .to("jdbc:centralbase") .doCatch(Exception.class) .log("${header.date}"+" routeId:update-volDaily-liq_prod_daily -> centralbase.cb_pc_pro_wellbore_vol_daily update data failed") .end(); }; }; } }