123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393 |
- package com.gct.tools.etlcamelhuge.routeconfig;
- import com.alibaba.fastjson.JSONObject;
- import com.gct.common.util.SGTUtil;
- import com.gct.tools.etlcamelhuge.MQ.DefaultMsgSendSuccessCallBack;
- import com.gct.tools.etlcamelhuge.MQ.MessageBody;
- import com.gct.tools.etlcamelhuge.MQ.MessageProducer;
- import com.gct.tools.etlcamelhuge.camelconfig.MyDataSourceConfiguration;
- import com.gct.tools.etlcamelhuge.entity.DiagnoseMsg;
- import lombok.Data;
- import org.apache.camel.*;
- import org.apache.camel.builder.RouteBuilder;
- //import org.apache.rocketmq.common.message.Message;
- import org.apache.rocketmq.spring.core.RocketMQTemplate;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
- import org.springframework.jdbc.core.JdbcTemplate;
- import org.springframework.scheduling.annotation.Async;
- import javax.annotation.Resource;
- import javax.sql.DataSource;
- import java.math.BigDecimal;
- import java.text.DecimalFormat;
- import java.text.SimpleDateFormat;
- import java.time.LocalDateTime;
- import java.time.format.DateTimeFormatter;
- import java.util.*;
- @Data
- class LogMessage{
- String id;
- LocalDateTime date;
- Object data;
- String msg;
- }
- /**
- * class name: CamelJDBCConfiguration
- *
- * @author lloyd
- * @version 1.0
- * @since 2021/4/14 下午3:16
- */
- @Configuration
- public class CamelJDBCConfiguration {
- public String getDate(){
- return LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd"));
- }
- @Bean
- public RouteBuilder routeBuilderWithOracle1() {
- return new RouteBuilder() {
- private SortedSet<String> organization;
- private Map<String, Integer> orgIDs;
- private Integer orgID;
- private Map<String, Integer> stringIntegerMap;
- //全部执行完成的大概时间在30-40分钟
- @Override
- public void configure() throws Exception {
- /*from("timer:mytimer1?period=604800000")
- .routeId("oracle-1")
- .setHeader("date", constant(getDate()))
- .setBody(simple("select distinct jh,cydmc,zyq,zk,qyrq,sccw,qk,bz from DBA01 where rq = to_date('${header.date}','yyyy-MM-dd') and qyrq is not null "))
- .to("jdbc:oracle")
- .log("${header.date}"+"routeId:oracle-1-> select cb_cd_well_source need data failed")
- .transform()
- .body((result) -> {
- organization = new TreeSet<>();
- orgID = 0;
- orgIDs = new HashMap<>();
- return result;
- })
- .step("1")
- .split(body()).process(exchange -> {
- Message in = exchange.getIn();
- HashMap<String, Object> aRow = in.getBody(HashMap.class);
- String org_level3 = aRow.get("ZYQ") + "@" + aRow.get("CYDMC") + "@" + aRow.get("ZK");
- String org_level2 = aRow.get("ZYQ") + "@" + aRow.get("CYDMC");
- String org_level1 = aRow.get("ZYQ").toString();
- aRow.put("station_id", org_level3);
- if (organization.add(org_level1)) {
- orgID++;
- orgIDs.put(org_level1, orgID);
- }
- if (organization.add(org_level2)) {
- orgID++;
- orgIDs.put(org_level2, orgID);
- }
- if (organization.add(org_level3)) {
- orgID++;
- orgIDs.put(org_level3, orgID);
- }
- })
- .setBody(simple("insert into centralbase.cb_cd_well_source (well_id,well_common_name,spud_date,station_id,station_name,completion_name,PRODUCING_AREA_name,remarks) " +
- "values ('${body[JH]}','${body[JH]}','${body[QYRQ]}'::timestamp,'${body[station_id]}','${body[ZK]}','${body[SCCW]}','${body[QK]}','${body[BZ]}')" +
- " ON conflict(well_id) DO UPDATE set remarks = '${body[BZ]}' "))
- .to("jdbc:centralbase")
- .log("${header.date}"+" routeId:oracle-1-> centralbase.cb_cd_well_source insert data failed")
- .end()
- .transform().body((re) -> {
- List<Map<String, Object>> rows = new ArrayList<>();
- int code = 0;
- for (String s : organization) {
- code++;// code is same as org_id
- String[] orgs = s.split("@");
- Map<String, Object> row = new HashMap<>();
- row.put("org_id_pre", s);
- row.put("org_code", code);
- row.put("org_id", "" + code);
- switch (orgs.length) {
- case 1:
- row.put("org_name", orgs[0]);
- row.put("org_level", 1);
- row.put("org_parent", "0");
- break;
- case 2:
- row.put("org_name", orgs[1]);
- row.put("org_level", 2);
- row.put("org_parent", orgIDs.get(orgs[0]).toString());
- break;
- case 3:
- row.put("org_name", orgs[2]);
- row.put("org_level", 3);
- row.put("org_parent", orgIDs.get(orgs[0] + "@" + orgs[1]).toString());
- break;
- }
- rows.add(row);
- }
- return rows;
- }).split(body())
- .doTry()
- .setBody(simple("insert into centralbase.cb_pc_organization(org_id,org_code,org_name,org_level,parent_id,org_id_pre)" +
- "values('${body[org_id]}','${body[org_code]}','${body[org_name]}','${body[org_level]}','${body[org_parent]}','${body[org_id_pre]}')" +
- "ON conflict(org_id_pre) DO UPDATE set org_code = '${body[org_code]}' "))
- .to("jdbc:centralbase")
- .doCatch(Exception.class)
- .log("${header.date}"+" routeId:oracle-1-> centralbase.cb_pc_organization insert data failed")
- .end()
- .setBody(simple("select org_id,org_name from centralbase.cb_pc_organization where org_level = '3' "))
- .to("jdbc:centralbase")
- .split(body())
- .doTry()
- .setBody(simple("update centralbase.cb_cd_well_source set org_id = '${body[org_id]}' where station_name = '${body[org_name]}'"))
- .to("jdbc:centralbase")
- .doCatch(Exception.class)
- .log("${header.date}"+" routeId:oracle-1-> centralbase.cb_pc_organization update data failed")
- .end();*/
- from("timer:mytimer-insert-statusDaily?period=3600000")
- .routeId("insert-statusDaily")
- .process(exchange -> {
- Message in = exchange.getIn();
- in.setHeader("date",getDate());
- })
- .setBody(simple("select distinct jh,rq,cyfs,yz,hysx , yysx ,tysx,bs,dym from DBA01 where rq = to_date('${header.date}','yyyy-MM-dd') and qyrq is not null "))
- .to("jdbc:oracle")
- .split(body()).process(exchange -> {
- Message in = exchange.getIn();
- HashMap<String, Object> aRow = in.getBody(HashMap.class);
- if (aRow.get("YZ") == null) aRow.put("YZ", "0.0");
- if (aRow.get("HYSX") == null) aRow.put("HYSX", "0.0");
- if (aRow.get("YYSX") == null) aRow.put("YYSX", "0.0");
- if (aRow.get("TYSX") == null) aRow.put("TYSX", "0.0");
- if (aRow.get("BS") == null) aRow.put("BS", "0.0");
- })
- .setBody(simple("insert into centralbase.cb_pc_pro_wellbore_status_daily(well_id,prod_date,oil_prod_method,oil_nozzle,back_pres,tubing_pres,casing_pres,pump_depth) " +
- "select '${body[JH]}','${body[RQ]}','${body[CYFS]}','${body[YZ]}','${body[HYSX]}','${body[YYSX]}','${body[TYSX]}','${body[BS]}' " +
- "where NOT EXISTS ( SELECT * FROM centralbase.cb_pc_pro_wellbore_status_daily WHERE well_id = '${body[JH]}' and prod_date = '${body[RQ]}')"))
- .doTry()
- .to("jdbc:centralbase")
- .doCatch(Exception.class)
- .log("${header.date}"+" routeId:insert-statusDaily -> centralbase.cb_pc_pro_wellbore_status_daily insert data failed")
- .end();
- from("timer:mytimer-update-statusDaily-DYM?period=3600000")
- .routeId("update-statusDaily-DYM")
- .process(exchange -> {
- Message in = exchange.getIn();
- in.setHeader("date",getDate());
- })
- .setBody(simple("select distinct jh , rq , dym from DBA01 where (jh,rq) in (SELECT jh,max(rq) rq FROM DBA01 WHERE dym is not null and rq<= to_date('${header.date}','yyyy-MM-dd') group by jh)"))
- .to("jdbc:oracle")
- .split(body())
- .setBody(simple("update centralbase.cb_pc_pro_wellbore_status_daily set start_pump_liq_level = '${body[DYM]}' where well_id = '${body[JH]}' and prod_date::date = '${header.date}' "))
- .doTry()
- .to("jdbc:centralbase")
- .doCatch(Exception.class)
- .log("${header.date}"+" routeId:update-statusDaily-DYM -> centralbase.cb_pc_pro_wellbore_status_daily update data failed")
- .end();
- //因为数据库有可能会查出多条数据,现在的做法时,在数据库中增加两个个字段,用来记录BJ和DYM 的更新时间 ,只有获取到的时间大于数据库中存储的时间时,才会更新,并且该字段也会更新
- from("timer:mytimer-update-statusDaily-BJ?period=3600000")
- .routeId("update-statusDaily-BJ")
- .process(exchange -> {
- Message in = exchange.getIn();
- in.setHeader("date",getDate());
- })
- .setBody(simple("select distinct jh , rq , bj from DBA01 where (jh,rq) in (SELECT jh,max(rq) rq FROM DBA01 WHERE bj is not null and rq<= to_date('${header.date}','yyyy-MM-dd') group by jh)"))
- .to("jdbc:oracle")
- .split(body()).process(exchange -> {
- HashMap body = exchange.getIn().getBody(HashMap.class);
- })
- .setBody(simple("update centralbase.cb_pc_pro_wellbore_status_daily set oil_nozzle = '${body[BJ]}' where well_id = '${body[JH]}' and prod_date::date = '${header.date}' "))
- .doTry()
- .to("jdbc:centralbase")
- .doCatch(Exception.class)
- .log("${header.date}"+" routeId:update-statusDaily-BJ -> centralbase.cb_pc_pro_wellbore_status_daily update data failed")
- .end();
- from("timer:mytimer-update-statusDaily-submergenceDepth?period=3600000")
- .routeId("update-statusDaily-submergenceDepth")
- .process(exchange -> {
- Message in = exchange.getIn();
- in.setHeader("date",getDate()+" 00:00:00");
- })
- .setBody(simple("select well_id,prod_date,start_pump_liq_level,pump_depth from centralbase.cb_pc_pro_wellbore_status_daily where prod_date = '${header.date}' "))
- .to("jdbc:centralbase")
- .split(body()).process(exchange -> {
- Message in = exchange.getIn();
- HashMap<String, Object> aRow = in.getBody(HashMap.class);
- aRow.put("submergence_depth",null);
- if (aRow.get("start_pump_liq_level")!=null && aRow.get("pump_depth")!=null){
- double cmd= Double.valueOf(aRow.get("pump_depth").toString())-Double.valueOf(aRow.get("start_pump_liq_level").toString())/10;
- BigDecimal bd=new BigDecimal(cmd);
- double cmd1=bd.setScale(1,BigDecimal.ROUND_HALF_UP).doubleValue();
- aRow.put("submergence_depth",cmd1);
- }
- })
- .setBody(simple("update centralbase.cb_pc_pro_wellbore_status_daily set submergence_depth = '${body[submergence_depth]}' where well_id = '${body[well_id]}' and prod_date = '${body[prod_date]}'"))
- .doTry()
- .to("jdbc:centralbase")
- .doCatch(Exception.class)
- .log("${header.date}"+" routeId:update-statusDaily-submergenceDepth -> centralbase.cb_pc_pro_wellbore_status_daily update data failed")
- .end();
- /* from("timer:mytimer-update-statusDaily-oil_nozzle?period=1800000")
- .routeId("update-statusDaily-oil_nozzle")
- .setHeader("date", constant(getDate() + " 00:00:00"))
- .setBody(simple("select distinct rn.well_id,cb.prod_date,rn.pump_diameter from centralbase.cb_temp_well_mech_runtime rn ,centralbase.cb_pc_pro_wellbore_status_daily cb where cb.well_id = rn.well_id " +
- "and cb.prod_date = '${header.date}' "))
- .to("jdbc:centralbase")//.log("${body}")
- .split(body()).process(exchange -> {
- Message in = exchange.getIn();
- HashMap<String, Object> aRow = in.getBody(HashMap.class);
- aRow.putIfAbsent("pump_diameter", "0.0");
- })
- .setBody(simple("update centralbase.cb_pc_pro_wellbore_status_daily set oil_nozzle = '${body[pump_diameter]}' where well_id ='${body[well_id]}' and prod_date='${body[prod_date]}' "))
- .doTry()
- .to("jdbc:centralbase")
- .doCatch(Exception.class)
- .log("${header.date}"+" routeId:update-statusDaily-oil_nozzle -> centralbase.cb_pc_pro_wellbore_status_daily update data failed")
- .end();*/
- from("timer:mytimer-insert-volDaily?period=3600000")
- .routeId("insert-volDaily")
- .process(exchange -> {
- Message in = exchange.getIn();
- in.setHeader("date",getDate());
- })
- .setBody(simple("select distinct jh,rq,scsj, rcyl1,rcyl,rcql,hs, bz from DBA01 where rq = to_date('${header.date}','yyyy-MM-dd') and qyrq is not null "))
- .to("jdbc:oracle")
- .split(body()).process(exchange -> {
- Message in = exchange.getIn();
- HashMap<String, Object> aRow = in.getBody(HashMap.class);
- if (aRow.get("SCSJ") == null) aRow.put("SCSJ", "0.0");
- if (aRow.get("RCYL1") == null) aRow.put("RCYL1", "0.0");
- if (aRow.get("RCYL") == null) aRow.put("RCYL", "0.0");
- if (aRow.get("RCQL") == null) aRow.put("RCQL", "0.0");
- if (aRow.get("HS") == null) aRow.put("HS", "0.0");
- if (aRow.get("BZ") == null) aRow.put("BZ", "");
- aRow.put("RCSL",-1);
- aRow.put("QYB",-1);
- aRow.put("SQB",-1);
- if (aRow.get("RCQL")!=null && aRow.get("RCYL")!=null && !aRow.get("RCYL").equals("0.0") && aRow.get("RCYL") != "0.0"){
- double qyb = Double.valueOf(aRow.get("RCQL").toString()) / Double.valueOf(aRow.get("RCYL").toString());
- if (!Double.isNaN(qyb) && !Double.isInfinite(qyb)){
- BigDecimal bd=new BigDecimal(qyb);
- double d1=bd.setScale(1,BigDecimal.ROUND_HALF_UP).doubleValue();
- aRow.put("QYB",d1);
- }
- }
- if (aRow.get("RCYL1")!=null && aRow.get("HS") != null && !aRow.get("HS").equals("0.0") && aRow.get("HS") != "0.0"){
- double rcsl = (Double.valueOf(aRow.get("RCYL1").toString()) * Double.valueOf(aRow.get("HS").toString()))/100;
- if (!Double.isNaN(rcsl) && !Double.isInfinite(rcsl)) {
- BigDecimal bd = new BigDecimal(rcsl);
- double d1 = bd.setScale(1, BigDecimal.ROUND_HALF_UP).doubleValue();
- aRow.put("RCSL", d1);
- }
- }
- if (aRow.get("RCQL") != null && aRow.get("RCSL") != null && !aRow.get("RCQL").equals("0.0") && aRow.get("RCQL") != "0.0" ){
- double sqb = Double.valueOf(aRow.get("RCSL").toString()) / Double.valueOf(aRow.get("RCQL").toString());
- if (!Double.isNaN(sqb) && !Double.isInfinite(sqb)) {
- BigDecimal bd = new BigDecimal(sqb);
- double d1 = bd.setScale(1, BigDecimal.ROUND_HALF_UP).doubleValue();
- aRow.put("SQB", d1);
- }
- }
- if (!aRow.containsKey("SMD")){
- aRow.put("SMD",1);
- }
- if (!aRow.containsKey("YMD")){
- aRow.put("YMD",0.85);
- }
- })
- .setBody(simple("insert into centralbase.cb_pc_pro_wellbore_vol_daily(well_id,prod_date,prod_time,liq_prod_daily,oil_prod_daily,gas_prod_daily,water_cut,remarks,gas_oil_ratio,water_prod_daily,water_gas_ratio,surface_crude_water_density,surface_crude_oil_density) " +
- "select '${body[JH]}','${body[RQ]}','${body[SCSJ]}','${body[RCYL1]}','${body[RCYL]}','${body[RCQL]}','${body[HS]}','${body[BZ]}','${body[QYB]}','${body[RCSL]}','${body[SQB]}','${body[SMD]}','${body[YMD]}' " +
- "where NOT EXISTS ( SELECT * FROM centralbase.cb_pc_pro_wellbore_vol_daily WHERE well_id = '${body[JH]}' and prod_date = '${body[RQ]}' )"))
- .doTry()
- .to("jdbc:centralbase")
- .doCatch(Exception.class)
- .log("${header.date}"+" routeId:insert-volDaily -> centralbase.cb_pc_pro_wellbore_vol_daily insert data failed")
- .endDoTry()
- .setBody(simple("update centralbase.cb_pc_pro_wellbore_vol_daily set water_prod_daily =null where water_prod_daily = -1 and prod_date = '${header.date}' "))
- .to("jdbc:centralbase")
- .setBody(simple("update centralbase.cb_pc_pro_wellbore_vol_daily set gas_oil_ratio =null where gas_oil_ratio = -1 and prod_date = '${header.date}' "))
- .to("jdbc:centralbase")
- .setBody(simple("update centralbase.cb_pc_pro_wellbore_vol_daily set water_gas_ratio =null where water_gas_ratio = -1 and prod_date = '${header.date}' "))
- .to("jdbc:centralbase")
- .end();
- from("timer:mytimer-update-volDaily-liq_prod_daily?period=3600000")
- .routeId("update-volDaily-liq_prod_daily")
- .process(exchange -> {
- Message in = exchange.getIn();
- in.setHeader("date",getDate());
- })
- .setBody(simple("select distinct jh,rq,scsj, rcyl1,rcyl,rcql,hs, bz from DBA01 where rq = to_date('${header.date}','yyyy-MM-dd') and qyrq is not null "))
- .to("jdbc:oracle")
- .split(body()).process(exchange -> {
- Message in = exchange.getIn();
- HashMap<String, Object> aRow = in.getBody(HashMap.class);
- if (aRow.get("SCSJ") == null) aRow.put("SCSJ", "0.0");
- if (aRow.get("RCYL1") == null) aRow.put("RCYL1", "0.0");
- if (aRow.get("RCYL") == null) aRow.put("RCYL", "0.0");
- if (aRow.get("RCQL") == null) aRow.put("RCQL", "0.0");
- if (aRow.get("HS") == null) aRow.put("HS", "0.0");
- if (aRow.get("BZ") == null) aRow.put("BZ", "");
- aRow.put("RCSL",-1);
- aRow.put("QYB",-1);
- aRow.put("SQB",-1);
- if (aRow.get("RCQL")!=null && aRow.get("RCYL")!=null && !aRow.get("RCYL").equals("0.0") && aRow.get("RCYL") != "0.0"){
- double qyb = Double.valueOf(aRow.get("RCQL").toString()) / Double.valueOf(aRow.get("RCYL").toString());
- if (!Double.isNaN(qyb) && !Double.isInfinite(qyb)){
- BigDecimal bd=new BigDecimal(qyb);
- double d1=bd.setScale(1,BigDecimal.ROUND_HALF_UP).doubleValue();
- aRow.put("QYB",d1);
- }
- }
- if (aRow.get("RCYL1")!=null && aRow.get("HS") != null && !aRow.get("HS").equals("0.0") && aRow.get("HS") != "0.0"){
- double rcsl = (Double.valueOf(aRow.get("RCYL1").toString()) * Double.valueOf(aRow.get("HS").toString()))/100;
- if (!Double.isNaN(rcsl) && !Double.isInfinite(rcsl)) {
- BigDecimal bd = new BigDecimal(rcsl);
- double d1 = bd.setScale(1, BigDecimal.ROUND_HALF_UP).doubleValue();
- aRow.put("RCSL", d1);
- }
- }
- if (aRow.get("RCQL") != null && aRow.get("RCSL") != null && !aRow.get("RCQL").equals("0.0") && aRow.get("RCQL") != "0.0" ){
- double sqb = Double.valueOf(aRow.get("RCSL").toString()) / Double.valueOf(aRow.get("RCQL").toString());
- if (!Double.isNaN(sqb) && !Double.isInfinite(sqb)) {
- BigDecimal bd = new BigDecimal(sqb);
- double d1 = bd.setScale(1, BigDecimal.ROUND_HALF_UP).doubleValue();
- aRow.put("SQB", d1);
- }
- }
- if (!aRow.containsKey("SMD")){
- aRow.put("SMD",1);
- }
- if (!aRow.containsKey("YMD")){
- aRow.put("YMD",0.85);
- }
- })
- .setBody(simple("update centralbase.cb_pc_pro_wellbore_vol_daily set prod_time = '${body[SCSJ]}' ,liq_prod_daily='${body[RCYL1]}' ,oil_prod_daily ='${body[RCYL]}' ,gas_prod_daily ='${body[RCQL]}' ,water_cut='${body[HS]}' ,remarks='${body[BZ]}' ,gas_oil_ratio='${body[QYB]}' ,water_prod_daily='${body[RCSL]}' ,water_gas_ratio='${body[SQB]}',surface_crude_water_density='${body[SMD]}',surface_crude_oil_density= '${body[YMD]}' " +
- "where well_id = '${body[JH]}' and prod_date ='${body[RQ]}' "))
- .doTry()
- .to("jdbc:centralbase")
- .doCatch(Exception.class)
- .log("${header.date}"+" routeId:update-volDaily-liq_prod_daily -> centralbase.cb_pc_pro_wellbore_vol_daily update data failed")
- .end();
- };
- };
- }
- }
|