CamelInsertA2Configuration.java 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869
  1. package com.gct.tools.etlcamelhuge.routeconfig;
  2. import com.alibaba.fastjson.JSON;
  3. import org.apache.camel.Message;
  4. import org.apache.camel.builder.RouteBuilder;
  5. import org.apache.camel.model.ExpressionNode;
  6. import org.apache.camel.model.ProcessorDefinition;
  7. import org.apache.camel.model.RouteDefinition;
  8. import org.springframework.beans.factory.annotation.Value;
  9. import org.springframework.context.annotation.Bean;
  10. import org.springframework.context.annotation.Configuration;
  11. import org.springframework.util.StringUtils;
  12. import java.math.BigDecimal;
  13. import java.sql.Timestamp;
  14. import java.time.Instant;
  15. import java.time.LocalDateTime;
  16. import java.time.ZoneId;
  17. import java.time.ZoneOffset;
  18. import java.time.format.DateTimeFormatter;
  19. import java.util.*;
  20. /**
  21. * class name: CamelJDBCConfiguration
  22. *
  23. * @author lloyd
  24. * @version 1.0
  25. * @since 2021/4/14 下午3:16
  26. */
  27. @Configuration
  28. public class CamelInsertA2Configuration {
  29. @Value("${insertA2.cron}")
  30. private String cron;
  31. @Bean
  32. public RouteBuilder routeBuilderWithInsertA2() {
  33. return new RouteBuilder() {
  34. @Override
  35. public void configure() throws Exception {
  36. from("quartz:tab?cron="+cron)
  37. .routeId("update-test")
  38. .process(exchange -> {
  39. Message in = exchange.getIn();
  40. in.setHeader("date",getDate());
  41. })
  42. .setBody(simple("select well_id,prod_date,liq_prod_daily from aoid.aoid_daily_yield where prod_date='${header.date}' "))
  43. .to("jdbc:aoid")
  44. .split(body()).process(exchange -> {
  45. Message in = exchange.getIn();
  46. HashMap<String, Object> aRow = in.getBody(HashMap.class);
  47. //aRow.putIfAbsent("liq_prod_daily", null);
  48. Timestamp timestamp = (Timestamp) aRow.get("prod_date");
  49. Instant instant = Instant.ofEpochMilli(timestamp.getTime());
  50. LocalDateTime localDateTime = LocalDateTime.ofInstant(instant, ZoneId.systemDefault());
  51. aRow.put("prod_date",localDateTime.plusDays(1).toLocalDate().format(DateTimeFormatter.ofPattern("yyyy-MM-dd")));
  52. })
  53. .setBody(simple(" insert into DMS_DBA01_RCYL (JH, RQ, RCYL) values('${body[well_id]}', to_date(to_char('${body[prod_date]}'),'yyyy-MM-dd'),'${body[liq_prod_daily]}') "))
  54. .to("jdbc:insertA2")
  55. .end();
  56. };
  57. };
  58. }
  59. private String getDate(){
  60. return LocalDateTime.now().minusDays(1).format(DateTimeFormatter.ofPattern("yyyy-MM-dd"));
  61. }
  62. }