123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637 |
- package com.gct.tools.etlcamelhuge.routeconfig;
- import com.gct.tools.etlcamelhuge.MQ.MessageBody;
- import com.gct.tools.etlcamelhuge.MQ.MessageProducer;
- import com.gct.tools.etlcamelhuge.entity.DiagnoseMsg;
- import org.apache.camel.*;
- import org.apache.camel.builder.RouteBuilder;
- //import org.apache.rocketmq.common.message.Message;
- import org.apache.rocketmq.spring.core.RocketMQTemplate;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
- import javax.annotation.Resource;
- import java.math.BigDecimal;
- import java.text.SimpleDateFormat;
- import java.time.LocalDateTime;
- import java.util.*;
- /**
- * class name: CamelJDBCConfiguration
- *
- * @author lloyd
- * @version 1.0
- * @since 2021/4/14 下午3:16
- */
- @Configuration
- public class CamelJDBCConfiguration /*extends RouteBuilder */ {
- @Autowired
- private RocketMQTemplate rocketMQTemplate;
- /***
- * 从天安那边获取功图数据
- */
- /* @Override
- public void configure() throws Exception {*/
- /* SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd");
- Date date = new Date(System.currentTimeMillis());
- String formatDate = formatter.format(date) + " 00:00:00";
- // 一小时执行一次
- from("timer:mytimer1?period=3600000")
- .routeId("jdbc-gtsj-?")
- .setHeader("dyna_create_time", constant(formatDate))
- .setBody(simple("select well_id,dyna_create_time,check_date,displacement,disp_load from public.pc_fd_pumpjack_dyna_dia_t where well_id ='SQ2306' and dyna_create_time > '${header.dyna_create_time}'::timestamp and dyna_create_time <'${header.dyna_create_time}'::timestamp + '1 day' order by dyna_create_time limit 1 "))
- .to("jdbc:gtsj")
- .split(body()).process(exchange -> {
- Message in = exchange.getIn();
- HashMap<String, Object> aRow = in.getBody(HashMap.class);
- String prod_date = aRow.get("dyna_create_time").toString().split("\\+")[0];
- aRow.put("dyna_create_time", prod_date);
- if (aRow.get("displacement") != null && !aRow.get("displacement").equals("") && aRow.get("disp_load") != null && !aRow.get("disp_load").equals("")) {
- String[] displacements = aRow.get("displacement").toString().split(";");
- String[] disp_loads = aRow.get("disp_load").toString().split(";");
- String sgt = "";
- for (int i = 0; i < displacements.length; i++) {
- sgt = sgt + displacements[i] + "," + disp_loads[i] + ",";
- }
- String[] s = sgt.split(",");
- String w = "";
- for (int i = 0; i < s.length; i++) {
- w += new BigDecimal(Math.round(Double.parseDouble(s[i]) * 100)).stripTrailingZeros().toPlainString() + ",";
- }
- aRow.put("sgt", w);
- }
- })//从这里开始
- .setHeader("MQOne", simple("${body}"))
- .setHeader("well_name", simple("${body[well_id]}"))
- //.setHeader("pord_date", simple("${body[dyna_create_time]}"))
- //.setHeader("rcv_date", simple("${body[check_date]}"))
- //.setHeader("sgt", simple("${body[sgt]}"))
- .setBody(simple("update centralbase.cb_temp_well_mech_runtime set prod_date = '${body[dyna_create_time]}',sgt = '${body[sgt]}' where well_id = (select well_id from centralbase.cb_cd_well_source where well_common_name = '${body[well_id]}')"))
- .to("jdbc:centralbase")
- .setBody(simple("select well_id,org_id from centralbase.cb_cd_well_source where well_common_name = '${in.header.well_name}' "))
- .to("jdbc:centralbase")
- .setHeader("MQTwo", simple("${body}"))
- //.setHeader("well_id", simple("${body[well_id]}"))
- //.setHeader("org_id", simple("${body[org_id]}"))
- .process(exchange -> {
- HashMap mqOne = exchange.getMessage().getHeader("MQOne", HashMap.class);
- HashMap mqTwo = exchange.getMessage().getHeader("MQTwo", HashMap.class);
- JSONObject jsonObject = new JSONObject();
- jsonObject.put("well_name", mqOne.get("well_id"));
- jsonObject.put("org_id", mqTwo.get("org_id"));
- jsonObject.put("prod_date", mqOne.get("dyna_create_time"));
- jsonObject.put("rcv_date", mqOne.get("check_date"));
- jsonObject.put("sgt", mqOne.get("sgt"));
- rocketMQTemplate.sendOneWayOrderly("Diagnose_MsgV1", jsonObject, mqTwo.get("well_id").toString());
- }).log("insert!!!").end();
- //从天安那边获取状态日数据
- from("timer:mytimer2?period=3600000")
- .routeId("jdbc-ztr-?")
- .setHeader("acqui_time", constant(formatDate))
- .setBody(simple("select well_id,acqui_time,phase_a_current,phase_b_current,phase_c_current,tubing_pre,casing_pre,phase_a_voltage,phase_b_voltage,phase_c_voltage from public.t_dyna_well_oil where acqui_time > '${header.acqui_time}'::timestamp and acqui_time <'${header.acqui_time}'::timestamp + '1 day'"))
- .to("jdbc:gtsj")
- .split(body()).process(exchange -> {
- Message in = exchange.getIn();
- HashMap<String, Object> aRow = in.getBody(HashMap.class);
- if (aRow.get("phase_a_current").toString() == null || aRow.get("phase_b_current").toString() == null || aRow.get("phase_c_current").toString() == null || aRow.get("tubing_pre").toString() == null || aRow.get("casing_pre").toString() == null || aRow.get("phase_a_voltage").toString() == null || aRow.get("phase_b_voltage").toString() == null || aRow.get("phase_c_voltage").toString() == null) {
- aRow.put("phase_a_current", "0.0");
- aRow.put("phase_b_current", "0.0");
- aRow.put("phase_c_current", "0.0");
- aRow.put("tubing_pre", "0.0");
- aRow.put("casing_pre", "0.0");
- aRow.put("phase_a_voltage", "0.0");
- aRow.put("phase_b_voltage", "0.0");
- aRow.put("phase_c_voltage", "0.0");
- }
- })
- .setBody(simple("update centralbase.cb_pc_pro_wellbore_status_daily set prod_date = '${body[acqui_time]}'::timestamp,elec_pump_current_a = '${body[phase_a_current]}',elec_pump_current_b = '${body[phase_b_current]}',elec_pump_current_c = '${body[phase_c_current]}',tubing_pres = '${body[tubing_pre]}',casing_pres = '${body[casing_pre]}' ,elec_pump_voltage_a = '${body[phase_a_voltage]}',elec_pump_voltage_b = '${body[phase_b_voltage]}',elec_pump_voltage_c = '${body[phase_c_voltage]}' where well_id = (select well_id from centralbase.cb_cd_well_source where well_common_name = '${body[well_id]}')"))
- .to("jdbc:centralbase")
- .log("insert completed")
- .end();*/
- /* from("timer:send?period=30000")
- .routeId("Diagnose_MsgV1")
- //.setBody(simple("select * from "))
- .process(exchange -> {
- JSONObject jo = new JSONObject();
- jo.put("well_name", "XJ_001");
- jo.put("org_id", "14");
- jo.put("prod_date", "2021-01-25 09:00:00");
- jo.put("sgt", "0,4176,0,4208,1,4256,3,4304,6,4416,9,4496,12,4576,17,4768,21,4864,25,4912,32,4864,38,4784,43,4704,52,4560,59,4496,66,4480,73,4496,84,4608,92,4688,100,4736,112,4816,121,4816,129,4800,143,4704,152,4640,161,4592,175,4560,185,4560,194,4576,204,4592,218,4640,227,4656,237,4656,251,4640,260,4624,269,4592,283,4512,292,4480,300,4464,313,4432,321,4432,329,4448,337,4448,348,4432,355,4400,362,4368,371,4304,377,4256,383,4208,390,4128,395,4112,399,4096,405,4064,408,4064,411,4047,414,4064,416,4064,418,4047,419,4047,419,4032,419,4015,419,3968,417,3904,416,3856,414,3792,410,3600,407,3536,403,3504,399,3520,393,3584,388,3647,383,3711,374,3824,368,3808,362,3776,352,3679,345,3615,337,3568,326,3504,317,3520,309,3536,301,3584,288,3647,279,3664,270,3664,256,3647,247,3600,237,3568,223,3520,213,3504,204,3504,190,3536,180,3568,171,3600,161,3615,148,3647,139,3647,130,3632,117,3615,108,3615,100,3615,88,3632,80,3664,73,3696,63,3760,56,3792,50,3808,44,3840,35,3856,30,3856,25,3872,19,3904,15,3920,12,3952,7,4015,5,4047,3,4064,1,4096,0,4112,0,4112");
- //RocketMQTemplate rocketMQTemplate = new RocketMQTemplate();
- DefaultMQProducer producer = new DefaultMQProducer("test1.json");
- producer.setNamesrvAddr("10.72.143.2:9876");
- // producer.setCreateTopicKey("AUTO_CREATE_TOPIC_KEY");
- producer.start();
- org.apache.rocketmq.common.message.Message message = new org.apache.rocketmq.common.message.Message();
- message.setKeys("XJ_001");
- message.setBody(jo.toJSONString().getBytes(StandardCharsets.UTF_8));
- message.setTopic("Diagnose_MsgV1");
- rocketMQTemplate.sendOneWayOrderly("Diagnose_MsgV1",jo,"XJ_001");
- producer.sendOneway(message);
- System.out.println(" send ");
- }).end();
- };*/
- /* @Bean
- public RouteBuilder routeBuilder() {
- ArrayList<HashMap<String, Object>> hashMaps = new ArrayList<>();
- return new RouteBuilder() {
- @Override
- public void configure() throws Exception {
- from("timer://foo?period=1000")
- .routeId("jdbc-pg-?")
- .setHeader("prod_time_ctl", constant("2021-02-27 10:00:00.000000"))
- //.setBody(constant("select * from centralbase.cb_temp_well_mech_runtime where prod_date= :prod_time_ctl "))
- .setBody(simple("select * from centralbase.cb_temp_well_mech_runtime where prod_date= :?prod_time_ctl ::timestamp "))
- .log("select body :${body}")
- .to("jdbc:centralbase?useHeadersAsParameters=true")
- .tracing()
- //via transform
- .transform()
- .body(result -> {
- ArrayList<HashMap<String, Object>> list = hashMaps.getClass().cast(result);
- log.info("The data count:{}", list.toString());
- return list;
- })
- //split data to one by one
- .split(body()).process(exchange -> {
- Message in = exchange.getIn();
- HashMap<String, Object> aRow = in.getBody(HashMap.class);
- log.info("The data count2222:{}", aRow.toString());
- })
- .log("ok?").setHeader("dddd", constant("ssss"))
- .process().message(message -> {
- log.info("message:{}", message.getHeader("dddd"));
- ArrayList<HashMap<String, Object>> body = message.getBody(ArrayList.class);
- log.info("body::::{}",body.toString());
- })
- .process(exchange -> {
- Message in = exchange.getIn();
- ArrayList<HashMap<String, Object>> aRow = in.getBody(ArrayList.class);
- log.info("The data count3333:{}", aRow.toString());
- })
- //create "insert sql" with parameters
- .setBody(simple("insert into dgns_gtbaseparam(wellname,recivetime,sgt) values('${body[well_id]}','${body[prod_date]}' :: timestamp,'${body[sgt]}')"))
- //now for test1.json
- //.setBody(simple("insert into dgns_gtbaseparam(wellname,recivetime,sgt) values('${body[well_id]}',now(),'${body[sgt]}')"))
- //look up the insert sql
- .process().message(message -> {
- log.info("The insert sql {}", message.getBody(String.class));
- })
- //try test1.json
- .doTry()
- .to("jdbc:diagnosis").log("insert completed")
- .doCatch(Throwable.class)
- .log("primary key already exited,insert failed")
- //"if" statement test1.json
- .choice()
- .when(exchange -> {
- return exchange.getException() != null;
- if (exchange.getException() != null) {
- log.error(exchange.getException().getMessage());
- return exchange.getException().getMessage().contains("already");
- }
- return false;
- }).log("primary key already exited,insert failed")
- .otherwise().log("insert completed")
- .endChoice()
- .end();
- }
- ;
- };
- }*/
- /*@Bean
- public RouteBuilder routeBuilderWithOracle() {
- ArrayList<HashMap<String, Object>> hashMaps = new ArrayList<>();
- MQDiagnoseMsg mqDiagnoseMsg = new MQDiagnoseMsg();
- return new RouteBuilder() {
- @Override
- public void configure() throws Exception {
- from("timer:send?period=30000")
- .routeId("Diagnose_MsgV1")
- .setBody(simple("select count(org_id)+1 as count from centralbase.cb_pc_organization "))
- .to("jdbc:centralbase")
- .process(exchange -> {
- Message in = exchange.getIn();
- HashMap<String, Object> aRow = in.getBody(HashMap.class);
- exchange.getIn().setHeader("count" ,aRow.get("count"));
- })
- .setBody(simple("select distinct station_id,org_id,station_name from centralbase.cb_cd_well_source where station_id not in (select org_id_pre from centralbase.cb_pc_organization) "))
- .to("jdbc:centralbase")
- .split(body()).process(exchange -> {
- Message in = exchange.getIn();
- HashMap<String, Object> aRow = in.getBody(HashMap.class);
- aRow.put("newId",aRow.get("org_id")+"@"+aRow.get("station_id"));
- })
- .setBody(simple("insert into centralbase.cb_pc_organization(org_id,org_id_pre,parent_id,org_name,org_level) values('${in.header.count}','${body[station_id]}','${body[org_id]}','${body[station_name]}','3')"))
- .to("jdbc:centralbase")
- // 因为使用的是每次count 求和之后当作org_id 插入,且只有一个唯一约束,所以在新增之后,会出现脏数据,加了一个删除,将不符合要求脏数据的都去掉
- .setBody(simple("delete from centralbase.cb_pc_organization where org_name is null or parent_id is null or org_id_pre is null"))
- .to("jdbc:centralbase")
- .setBody(simple("select c1.org_id,c1.org_id_pre from centralbase.cb_pc_organization c1 ,centralbase.cb_cd_well_source c2 where c1.org_id_pre = c2.station_id"))
- .to("jdbc:centralbase")
- .split(body())
- .setBody(simple("update centralbase.cb_cd_well_source set org_id ='${body[org_id]}' where station_id = '${body[org_id_pre]}' "))
- .to("jdbc:centralbase")
- .end();
- };
- };
- }*/
- //获取最小载荷
- public Double min(String[] strings){
- double[] doubles = new double[strings.length];
- for (int i = 0; i < strings.length; i++) {
- doubles[i] = Double.parseDouble(strings[i]);
- }
- return Arrays.stream(doubles).min().getAsDouble();
- }
- //获取最大载荷
- public Double max(String[] strings){
- double[] doubles = new double[strings.length];
- for (int i = 0; i < strings.length; i++) {
- doubles[i] = Double.parseDouble(strings[i]);
- }
- return Arrays.stream(doubles).max().getAsDouble();
- }
- @Resource(name = "diagnoseMessageProducer")
- private MessageProducer producer;
- @Bean
- public RouteBuilder routeBuilderWithOracle1() {
- SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd");
- Date date = new Date(System.currentTimeMillis());
- String date1 = formatter.format(date);
- String formatDate = formatter.format(date) + " 00:00:00";
- return new RouteBuilder() {
- private SortedSet<String> organization;
- private Map<String, Integer> orgIDs;
- private Integer orgID;
- //全部执行完成的大概时间在30-40分钟
- @Override
- public void configure() throws Exception {
- //24小时执行一次
- //单个执行时间30s左右,在之前有数据的情况下
- from("timer:mytimer1?period=604800000")
- .routeId("oracle-1")
- .setHeader("date", constant(date1))
- .setBody(simple("select distinct jh,cydmc,zyq,zk,qyrq,sccw,qk,bz from zd_zdgs.dba01@A2 where rq = to_date('${header.date}','yyyy-MM-dd') and qyrq is not null "))
- .to("jdbc:oracle")
- .transform()
- .body((result) -> {
- organization = new TreeSet<>();
- orgID = 0;
- orgIDs = new HashMap<>();
- return result;
- })
- .step("1")
- .split(body()).process(exchange -> {
- Message in = exchange.getIn();
- HashMap<String, Object> aRow = in.getBody(HashMap.class);
- String org_level3 = aRow.get("ZYQ") + "@" + aRow.get("CYDMC") + "@" + aRow.get("ZK");
- String org_level2 = aRow.get("ZYQ") + "@" + aRow.get("CYDMC");
- String org_level1 = aRow.get("ZYQ").toString();
- aRow.put("station_id", org_level3);
- //这里是重新生成的orgid,最好先查一下centralbase里已有的
- //默认一张新表
- if (organization.add(org_level1)) {
- orgID++;
- orgIDs.put(org_level1, orgID);
- }
- if (organization.add(org_level2)) {
- orgID++;
- orgIDs.put(org_level2, orgID);
- }
- if (organization.add(org_level3)) {
- orgID++;
- orgIDs.put(org_level3, orgID);
- }
- })
- .setBody(simple("insert into centralbase.cb_cd_well_source (well_id,well_common_name,spud_date,station_id,station_name,completion_name,PRODUCING_AREA_name,remarks) " +
- "values ('${body[JH]}','${body[JH]}','${body[QYRQ]}'::timestamp,'${body[station_id]}','${body[ZK]}','${body[SCCW]}','${body[QK]}','${body[BZ]}')" +
- " ON conflict(well_id) DO UPDATE set remarks = '${body[BZ]}' "))
- .to("jdbc:centralbase")
- .end()
- .transform().body((re) -> {
- List<Map<String, Object>> rows = new ArrayList<>();
- int code = 0;
- for (String s : organization) {
- code++;// code is same as org_id
- String[] orgs = s.split("@");
- Map<String, Object> row = new HashMap<>();
- row.put("org_id_pre", s);
- row.put("org_code", code);
- row.put("org_id", "" + code);
- switch (orgs.length) {
- case 1:
- row.put("org_name", orgs[0]);
- row.put("org_level", 1);
- row.put("org_parent", "0");
- break;
- case 2:
- row.put("org_name", orgs[1]);
- row.put("org_level", 2);
- row.put("org_parent", orgIDs.get(orgs[0]).toString());
- break;
- case 3:
- row.put("org_name", orgs[2]);
- row.put("org_level", 3);
- row.put("org_parent", orgIDs.get(orgs[0] + "@" + orgs[1]).toString());
- break;
- }
- rows.add(row);
- }
- return rows;
- }).split(body())
- .setBody(simple("insert into centralbase.cb_pc_organization(org_id,org_code,org_name,org_level,parent_id,org_id_pre)" +
- "values('${body[org_id]}','${body[org_code]}','${body[org_name]}','${body[org_level]}','${body[org_parent]}','${body[org_id_pre]}')" +
- "ON conflict(org_id_pre) DO UPDATE set org_code = '${body[org_code]}' "))
- .to("jdbc:centralbase")
- .end()
- .setBody(simple("select org_id,org_name from centralbase.cb_pc_organization where org_level = '3' "))
- .to("jdbc:centralbase")
- .split(body())
- .setBody(simple("update centralbase.cb_cd_well_source set org_id = '${body[org_id]}' where station_name = '${body[org_name]}'"))
- .to("jdbc:centralbase")
- .log("insert")
- .end();
- //单独执行时间10s
- from("timer:mytimer2?period=3600000")
- .routeId("oracle-2")
- .setHeader("date", constant(date1))
- .setBody(simple("select distinct jh,rq,cyfs,yz,hysx , yysx ,tysx,bs,dym from zd_zdgs.dba01@A2 where rq = to_date('${header.date}','yyyy-MM-dd') and qyrq is not null "))
- //.setBody(simple("select distinct jh,rq,cyfs,yz,hysx , yysx ,tysx,bs,dym from zd_zdgs.dba01@A2 where rq = to_date('2021-07-05','yyyy-MM-dd') and qyrq is not null "))
- .to("jdbc:oracle")
- .split(body()).process(exchange -> {
- Message in = exchange.getIn();
- HashMap<String, Object> aRow = in.getBody(HashMap.class);
- if (aRow.get("YZ") == null) aRow.put("YZ", "0.0");
- if (aRow.get("HYSX") == null) aRow.put("HYSX", "0.0");
- if (aRow.get("YYSX") == null) aRow.put("YYSX", "0.0");
- if (aRow.get("TYSX") == null) aRow.put("TYSX", "0.0");
- if (aRow.get("BS") == null) aRow.put("BS", "0.0");
- })
- .setBody(simple("insert into centralbase.cb_pc_pro_wellbore_status_daily(well_id,prod_date,oil_prod_method,oil_nozzle,back_pres,tubing_pres,casing_pres,pump_depth)" +
- "values ('${body[JH]}','${body[RQ]}','${body[CYFS]}','${body[YZ]}','${body[HYSX]}','${body[YYSX]}','${body[TYSX]}','${body[BS]}')"))
- .setBody(simple("insert into centralbase.cb_pc_pro_wellbore_status_daily(well_id,prod_date,oil_prod_method,oil_nozzle,back_pres,tubing_pres,casing_pres,pump_depth) " +
- "select '${body[JH]}','${body[RQ]}','${body[CYFS]}','${body[YZ]}','${body[HYSX]}','${body[YYSX]}','${body[TYSX]}','${body[BS]}' " +
- "where NOT EXISTS ( SELECT * FROM centralbase.cb_pc_pro_wellbore_status_daily WHERE well_id = '${body[JH]}' and prod_date = '${body[RQ]}')"))
- .to("jdbc:centralbase")
- .log("insert !!!")
- .end();
- //查询井对应dym不为空的数据 --目前是只要对应井能查到dym不为空的,无论是什么时间的,都放进去
- //将查询到的DYM数据更新到cb_pc_pro_wellbore_status_daily中
- //0 0 */1 * * ? 每1个小时执行一次
- //单独执行时间是4m15s 317条数据
- from("timer:mytimer5?period=3600000")
- .routeId("oracle-5")
- .setHeader("date", constant(date1 + " 00:00:00"))
- //三个月之内dym不为空的数据
- //.setBody(simple("SELECT distinct jh,rq,dym FROM zd_zdgs.dba01@A2 WHERE rq between to_date(TO_CHAR(ADD_MONTHS(SYSDATE,-3),'yyyy-MM-dd'),'yyyy-MM-dd')and to_date(TO_CHAR(SYSDATE,'yyyy-MM-dd'),'yyyy-MM-dd') and dym is not null;"))
- .setBody(simple("SELECT distinct jh,max(rq),dym FROM zd_zdgs.dba01@A2 WHERE dym is not null group by jh,dym"))
- .to("jdbc:oracle")
- .split(body())
- .setBody(simple("update centralbase.cb_pc_pro_wellbore_status_daily set start_pump_liq_level = '${body[DYM]}' where well_id = '${body[JH]}' "))
- .to("jdbc:centralbase")
- .log("insert !!!")
- .end();
- //单独执行时间30s
- from("timer:mytimer3?period=3600000")
- .routeId("oracle-3")
- .setHeader("date", constant(date1))
- .setBody(simple("select distinct jh,rq,scsj, rcyl1,rcyl,rcql,hs, bz from zd_zdgs.dba01@A2 where rq = to_date('${header.date}','yyyy-MM-dd') and qyrq is not null "))
- .to("jdbc:oracle")
- .split(body()).process(exchange -> {
- Message in = exchange.getIn();
- HashMap<String, Object> aRow = in.getBody(HashMap.class);
- if (aRow.get("SCSJ") == null) aRow.put("SCSJ", "0.0");
- if (aRow.get("RCYL1") == null) aRow.put("RCYL1", "0.0");
- if (aRow.get("RCYL") == null) aRow.put("RCYL", "0.0");
- if (aRow.get("RCQL") == null) aRow.put("RCQL", "0.0");
- if (aRow.get("HS") == null) aRow.put("HS", "0.0");
- if (aRow.get("BZ") == null) aRow.put("DYM", "");
- })
- .setBody(simple("insert into centralbase.cb_pc_pro_wellbore_vol_daily(well_id,prod_date,prod_time,liq_prod_daily,oil_prod_daily,gas_prod_daily,water_cut,remarks) " +
- "values ('${body[JH]}','${body[RQ]}','${body[SCSJ]}','${body[RCYL1]}','${body[RCYL]}','${body[RCQL]}','${body[HS]}','${body[BZ]}')"))
- .setBody(simple("insert into centralbase.cb_pc_pro_wellbore_vol_daily(well_id,prod_date,prod_time,liq_prod_daily,oil_prod_daily,gas_prod_daily,water_cut,remarks) " +
- "select '${body[JH]}','${body[RQ]}','${body[SCSJ]}','${body[RCYL1]}','${body[RCYL]}','${body[RCQL]}','${body[HS]}','${body[BZ]}' " +
- "where NOT EXISTS ( SELECT * FROM centralbase.cb_pc_pro_wellbore_vol_daily WHERE well_id = '${body[JH]}' and prod_date = '${body[RQ]}' )"))
- .to("jdbc:centralbase")
- .log("insert !!!")
- .end();
- //0 0 */1 * * ? 每1个小时执行一次
- //单独执行一次30s
- from("timer:mytimer4?period=3600000")
- .routeId("oracle-4")
- .setHeader("date", constant(date1))
- .setBody(simple("select distinct jh,rq,dym,jy,ly,bj,bs,bx,zs,cc,cs,blx,dl from zd_zdgs.dba01@A2 where rq = to_date('${header.date}','yyyy-MM-dd') and qyrq is not null "))
- .to("jdbc:oracle")
- .split(body()).process(exchange -> {
- Message in = exchange.getIn();
- HashMap<String, Object> aRow = in.getBody(HashMap.class);
- if (aRow.get("JY") == null) aRow.put("JY", "0.0");
- if (aRow.get("LY") == null) aRow.put("LY", "0.0");
- if (aRow.get("BJ") == null) aRow.put("BJ", "0.0");
- if (aRow.get("BS") == null) aRow.put("BS", "0.0");
- if (aRow.get("BX") == null) aRow.put("BX", "0.0");
- if (aRow.get("ZS") == null) aRow.put("ZS", "0.0");
- if (aRow.get("CC") == null) aRow.put("CC", "0.0");
- if (aRow.get("CS") == null) aRow.put("CS", "0.0");
- if (aRow.get("BLX") == null) aRow.put("BLX", "");
- if (aRow.get("DL") == null) aRow.put("DL", "0.0");
- })
- .setBody(simple("insert into centralbase.cb_temp_well_mech_daily(well_id,prod_date,static_pressure,flow_pres,pump_diameter,pump_depth,pump_efficiency,rotate_frequency,stroke_length,stroke_frequency,pump_type,elec_frequency)" +
- "values ('${body[JH]}','${body[RQ]}','${body[JY]}','${body[LY]}','${body[BJ]}','${body[BS]}','${body[BX]}','${body[ZS]}','${body[CC]}','${body[CS]}','${body[BLX]}','${body[DL]}')"))
- .setBody(simple("insert into centralbase.cb_temp_well_mech_daily(well_id,prod_date,static_pressure,flow_pres,pump_diameter,pump_depth,pump_efficiency,rotate_frequency,stroke_length,stroke_frequency,pump_type,elec_frequency) " +
- "select '${body[JH]}','${body[RQ]}','${body[JY]}','${body[LY]}','${body[BJ]}','${body[BS]}','${body[BX]}','${body[ZS]}','${body[CC]}','${body[CS]}','${body[BLX]}','${body[DL]}' " +
- "where NOT EXISTS ( SELECT * FROM centralbase.cb_temp_well_mech_daily WHERE well_id = '${body[JH]}' and prod_date = '${body[RQ]}' )"))
- .to("jdbc:centralbase")
- .log("insert !!!")
- .end();
- from("timer:mytimer6?period=3600000")
- .routeId("oracle-6")
- .setHeader("date", constant(date1 + " 00:00:00"))
- //五个月之内bj不为空的数据
- //.setBody(simple("SELECT distinct jh,rq,dym FROM zd_zdgs.dba01@A2 WHERE rq between to_date(TO_CHAR(ADD_MONTHS(SYSDATE,-3),'yyyy-MM-dd'),'yyyy-MM-dd')and to_date(TO_CHAR(SYSDATE,'yyyy-MM-dd'),'yyyy-MM-dd') and dym is not null;"))
- .setBody(simple("SELECT distinct jh,bj FROM zd_zdgs.dba01@A2 WHERE rq between to_date(TO_CHAR(ADD_MONTHS(SYSDATE,-5),'yyyy-MM-dd'),'yyyy-MM-dd')and to_date(TO_CHAR(SYSDATE,'yyyy-MM-dd'),'yyyy-MM-dd') and bj is not null"))
- .to("jdbc:oracle")
- .split(body())
- .setBody(simple("update centralbase.cb_temp_well_mech_runtime set pump_diameter = '${body[BJ]}' where well_id = '${body[JH]}' "))
- .to("jdbc:centralbase")
- .log("insert !!!")
- .end();
- //从天安哪里获取的数据
- //0 0 */1 * * ? 每1个小时执行一次
- //单独执行一小时的数据30s
- from("timer:mytimer7?period=3600000")
- .routeId("jdbc-gtsj-?")
- .setBody(simple("select max(prod_date) from centralbase.cb_temp_well_mech_runtime "))
- .to("jdbc:centralbase")
- .split(body())
- .setHeader("date", simple("${body[max]}"))
- //.setHeader("dyna_create_time", constant(formatDate))
- .setBody(simple("select distinct well_name,dyna_create_time,check_date,displacement,disp_load,stroke,frequency,susp_max_load,susp_min_load from public.pc_fd_pumpjack_dyna_dia_t where dyna_create_time > '${header.date}' "))
- .to("jdbc:gtsj")
- .split(body()).process(exchange -> {
- Message in = exchange.getIn();
- HashMap<String, Object> aRow = in.getBody(HashMap.class);
- String prod_date = aRow.get("dyna_create_time").toString().split("\\+")[0];
- aRow.put("dyna_create_time", prod_date);
- if (aRow.get("displacement") != null && !aRow.get("displacement").equals("") && aRow.get("disp_load") != null && !aRow.get("disp_load").equals("")) {
- String[] displacements = aRow.get("displacement").toString().split(";");
- String[] disp_loads = aRow.get("disp_load").toString().split(";");
- Double susp_max_load = max(disp_loads);
- Double susp_min_load = min(disp_loads);
- String sgt = "";
- for (int i = 0; i < displacements.length; i++) {
- sgt = sgt + displacements[i] + "," + disp_loads[i] + ",";
- }
- String[] s = sgt.split(",");
- String w = "";
- for (int i = 0; i < s.length; i++) {
- w += new BigDecimal(Math.round(Double.parseDouble(s[i]) * 100)).stripTrailingZeros().toPlainString() + ",";
- }
- aRow.put("sgt", w);
- aRow.put("susp_max_load",susp_max_load);
- aRow.put("susp_min_load",susp_min_load);
- }
- if (aRow.get("stroke") == null) aRow.put("stroke", "0.0");
- if (aRow.get("frequency") == null) aRow.put("frequency", "0.0");
- if (aRow.get("susp_max_load") == null) aRow.put("susp_max_load", "0.0");
- if (aRow.get("susp_min_load") == null) aRow.put("susp_min_load", "0.0");
- })
- .setBody(simple("insert into centralbase.cb_temp_well_mech_runtime(well_id,prod_date,stroke_length,stroke_frequency,susp_max_load,susp_min_load,sgt) " +
- "values ('${body[well_name]}','${body[dyna_create_time]}','${body[stroke]}','${body[frequency]}','${body[susp_max_load]}','${body[susp_min_load]}','${body[sgt]}')"))
- .setBody(simple("insert into centralbase.cb_temp_well_mech_runtime(well_id,prod_date,stroke_length,stroke_frequency,susp_max_load,susp_min_load,sgt) " +
- "select '${body[well_name]}','${body[dyna_create_time]}','${body[stroke]}','${body[frequency]}','${body[susp_max_load]}','${body[susp_min_load]}','${body[sgt]}' " +
- "where NOT EXISTS (SELECT * FROM centralbase.cb_temp_well_mech_runtime WHERE well_id = '${body[well_name]}' and prod_date = '${body[dyna_create_time]}' )"))
- .to("jdbc:centralbase")
- .log("insert!!!").end();
- from("timer:mytimer1?period=3600000")
- .routeId("centralbase-1")
- .setBody(simple("select so.well_id,so.well_common_name,so.org_id,ti.prod_date,ti.stroke_frequency,ti.stroke_length,ti.sgt from centralbase.cb_temp_well_mech_runtime ti, centralbase.cb_cd_well_source so where ti.well_id = so.well_id and ti.prod_date =(select max(prod_date) from centralbase.cb_temp_well_mech_runtime) "))
- //.setBody(simple("select so.well_id,so.well_common_name,so.org_id,ti.prod_date,ti.stroke_frequency,ti.stroke_length,ti.sgt from centralbase.cb_temp_well_mech_runtime ti, centralbase.cb_cd_well_source so where ti.well_id = so.well_id and ti.prod_date ='2021-07-12 10:25:00' and ti.well_id = '台24' limit 1 "))
- .to("jdbc:centralbase")
- .split(body())
- .process(exchange -> {
- Message in = exchange.getIn();
- HashMap<String, Object> aRow = in.getBody(HashMap.class);
- String wellName =aRow.get("well_common_name").toString();
- String wellId =aRow.get("well_id").toString();
- String orgId = aRow.get("org_id").toString();
- String prodDate = aRow.get("prod_date").toString().substring(0,19);
- Double strokeLength = Double.valueOf(aRow.get("stroke_length").toString());
- Double strokeFrequency = Double.valueOf(aRow.get("stroke_frequency").toString());
- String sgt = aRow.get("sgt").toString();
- DiagnoseMsg diagnoseMsg = new DiagnoseMsg(wellId, wellName, orgId, prodDate, sgt, LocalDateTime.now().toString(), strokeLength, strokeFrequency);
- producer.send((MessageBody) diagnoseMsg);
- }).log("send success").end();
- //-------------------------------------------------------------------------------------------------------------------------------------------------------------------
- /* from("timer:mytimer2?period=3600000")
- .routeId("oracle-2")
- .setBody(simple("select distinct station_id,station_name from centralbase.cb_cd_well_source"))
- .to("jdbc:centralbase")
- .split(body()).process(exchange -> {
- Message in = exchange.getIn();
- HashMap<String, Object> aRow = in.getBody(HashMap.class);
- String parent_id = aRow.get("station_id").toString().substring(0, aRow.get("station_id").toString().lastIndexOf("@"));
- aRow.put("parent_id",parent_id);
- String org_id = UUID.randomUUID().toString().replace("-","").substring(0,10);
- aRow.put("org_id",org_id);
- })
- .setBody(simple("insert into centralbase.cb_pc_organization(org_id,org_name,org_level,parent_id)" +
- "values('${body[org_id]}','${body[station_name]}','3','${body[parent_id]}')"))
- .to("jdbc:centralbase")
- .log("insert two")
- .end();
- from("timer:mytimer3?period=999999999")
- .routeId("oracle-3")
- .setBody(simple("select distinct parent_id from centralbase.cb_pc_organization"))
- .to("jdbc:centralbase")
- .split(body()).process(exchange -> {
- Message in = exchange.getIn();
- HashMap<String, Object> aRow = in.getBody(HashMap.class);//沙南作业区@沙采二区
- String parent_id = aRow.get("parent_id").toString().split("@")[0];
- String org_name = aRow.get("parent_id").toString().split("@")[1];
- //if (!aRow.containsValue(org_name)){
- aRow.put("org_name",org_name);
- // }
- //if (!aRow.containsValue(parent_id)){
- aRow.put("parent_id",parent_id);
- //}
- String org_id = UUID.randomUUID().toString().replace("-","").substring(0,10);
- aRow.put("levelTwoOrgId",org_id);
- })
- .setBody(simple("insert into centralbase.cb_pc_organization(org_id,org_name,org_level,parent_id)" +
- "values('${body[levelTwoOrgId]}','${body[org_name]}','2','${body[parent_id]}')"))
- .to("jdbc:centralbase")
- .log("insert !!!")
- .end();
- from("timer:mytimer4?period=999999999")
- .routeId("oracle-4")
- .setBody(simple("select distinct parent_id from centralbase.cb_pc_organization where org_level = '2'"))
- .to("jdbc:centralbase")
- .split(body()).process(exchange -> {
- Message in = exchange.getIn();
- HashMap<String, Object> aRow = in.getBody(HashMap.class);//沙南作业区@沙采二区
- String org_id = UUID.randomUUID().toString().replace("-","").substring(0,10);
- aRow.put("levelOneOrgId",org_id);
- })
- .setBody(simple("insert into centralbase.cb_pc_organization(org_id,org_name,org_level,parent_id)" +
- "values('${body[levelOneOrgId]}','${body[parent_id]}','1','0')"))
- .to("jdbc:centralbase")
- .log("insert !!!")
- .end();
- from("timer:mytimer5?period=999999999")
- .routeId("oracle-5")
- .setBody(simple("select org_id,org_name,parent_id from centralbase.cb_pc_organization where org_level = '1' "))
- .to("jdbc:centralbase")
- .split(body())
- .setBody(simple("update centralbase.cb_pc_organization set parent_id = (select org_id from centralbase.cb_pc_organization where org_name = '${body[org_name]}' ) " +
- "where org_level = '2' and parent_id = '${body[org_name]}'"))
- .to("jdbc:centralbase")
- .setBody(simple("select org_id,org_name,parent_id from centralbase.cb_pc_organization where org_level = '2'"))
- .to("jdbc:centralbase")
- .split(body())
- .setBody(simple("update centralbase.cb_pc_organization set parent_id = (select org_id from centralbase.cb_pc_organization where org_name = '${body[org_name]}' ) " +
- "where org_level = '3' and split_part(parent_id,'@',2) = '${body[org_name]}'"))
- .to("jdbc:centralbase")
- .log("insert !!!")
- .end();
- */
- /// from("quartz://name?cron=0 0 */1 * * ?") //0 0 */1 * * ? 每1个小时执行一次
- // .routeId("oracle-6")
- // .setBody(simple("select org_id,org_name from centralbase.cb_pc_organization where org_level = '3' "))
- // .to("jdbc:centralbase")
- // .split(body()).log("${body}")
- // .end();
- }
- ;
- };
- }
- }
|