浏览代码

修改runtime表导入数据不全的bug

gxt 3 年之前
父节点
当前提交
380f98fa6e

+ 5 - 6
src/main/java/com/gct/tools/etlcamelhuge/routeconfig/CamelJDBCCofRealTimeConfiguration.java

@@ -71,17 +71,14 @@ public class CamelJDBCCofRealTimeConfiguration  {
71 71
             public void configure() throws Exception {
72 72
                 from("timer:mytimer-insert-runtime?period=1800000")
73 73
                         .routeId("insert-runtime")
74
-                        .setBody(simple("select max(prod_date)  from centralbase.cb_temp_well_mech_runtime "))
75
-                        .to("jdbc:centralbase")
76
-                        .split(body())
77
-                        .setHeader("date", simple("${body[max]}"))
78
-                        .setBody(simple("select well_id from centralbase.sys_access_well_control  where  access_status='1'  "))
74
+                        .setBody(simple("select well_id ,sgt_last_time  from centralbase.sys_access_well_control  where  access_status='1'  "))
79 75
                         .to("jdbc:centralbase")
80 76
                         .split(body()).process(exchange -> {
81 77
                             HashMap body = exchange.getIn().getBody(HashMap.class);
82 78
                             exchange.getIn().setHeader("well_id",body.get("well_id"));
79
+                            exchange.getIn().setHeader("sgt_last_time",body.get("sgt_last_time"));
83 80
                         })
84
-                        .setBody(simple("select distinct well_name,dyna_create_time,check_date,displacement,disp_load,stroke,frequency,susp_max_load,susp_min_load from public.pc_fd_pumpjack_dyna_dia_t where   dyna_create_time > '${header.date}' and well_name='${header.well_id}' "))
81
+                        .setBody(simple("select distinct well_name,dyna_create_time,check_date,displacement,disp_load,stroke,frequency,susp_max_load,susp_min_load from public.pc_fd_pumpjack_dyna_dia_t where   dyna_create_time >= '${header.sgt_last_time}' and well_name='${header.well_id}' "))
85 82
                         .to("jdbc:gtsj")
86 83
                         .process(exchange -> {
87 84
                             sendMsgRunTime = 0;
@@ -151,12 +148,14 @@ public class CamelJDBCCofRealTimeConfiguration  {
151 148
                             Double strokeLength = Double.valueOf(aRow.get("stroke").toString());
152 149
                             Double strokeFrequency = Double.valueOf(aRow.get("frequency").toString());
153 150
                             String sgt = aRow.get("sgt").toString();
151
+                            in.setHeader("sgt_last_time",prodDate);
154 152
                             sendDataToRocketMQ(wellName,wellId,prodDate,strokeLength,strokeFrequency,sgt);
155 153
                         })
156 154
                         .setBody(simple("insert into centralbase.cb_temp_well_mech_runtime(well_id,prod_date,stroke_length,stroke_frequency,susp_max_load,susp_min_load,sgt) " +
157 155
                                 "select '${body[well_name]}','${body[dyna_create_time]}','${body[stroke]}','${body[frequency]}','${body[susp_max_load]}','${body[susp_min_load]}','${body[sgt]}' " +
158 156
                                 "where NOT EXISTS (SELECT * FROM centralbase.cb_temp_well_mech_runtime WHERE well_id = '${body[well_name]}' and  prod_date = '${body[dyna_create_time]}' )"))
159 157
                         .to("jdbc:centralbase")
158
+                        .setBody(simple("update centralbase.sys_access_well_control set sgt_last_time = '${header.sgt_last_time}' "))
160 159
                         .end();
161 160
 
162 161
                from("timer:mytimer-update-avg-mech_daily?period=3600000")