Przeglądaj źródła

bj,dym 重复导入不是最新问题解决-2

gxt 3 lat temu
rodzic
commit
0a4205c985

+ 5 - 5
src/main/java/com/gct/tools/etlcamelhuge/controller/BaseDataController.java

@@ -58,7 +58,8 @@ public class BaseDataController {
58 58
                 sql = "insert into centralbase.cb_pc_pro_wellbore_status_daily(well_id,prod_date,oil_prod_method,oil_nozzle,back_pres,tubing_pres,casing_pres,pump_depth) " +
59 59
                         "values (?,?,?,?,?,?,?,?)";
60 60
                 insertCount = insertBaseDataSourceOfStatusDaily(sql, map);
61
-                List<Map<String, Object>> dymIsNotNUllList = selectA2DataOfNotIsNullDYM();
61
+                sql = "select jh , rq , dym from centralbase.cb_pc_pro_wellbore_status_daily where  (jh,rq) in (SELECT  jh,max(rq) rq  FROM DBA01 WHERE dym is not null and to_date('"+date+"','yyyy-MM-dd') group by jh)";
62
+                List<Map<String, Object>> dymIsNotNUllList = selectA2DataOfNotIsNullDYMOrBJ(sql);
62 63
                 for (Map<String, Object> stringObjectMap : dymIsNotNUllList) {
63 64
                     sql = "update centralbase.cb_pc_pro_wellbore_status_daily set start_pump_liq_level = '" + stringObjectMap.get("DYM") + "' where well_id = '" + stringObjectMap.get("JH") + "' and prod_date::date  = '" + stringObjectMap.get("RQ") + "' ";
64 65
                     updateDYMCount = updateBaseDataSourceOfStatusDaily(sql) + updateDYMCount;
@@ -77,8 +78,8 @@ public class BaseDataController {
77 78
                     sql = " update centralbase.cb_pc_pro_wellbore_status_daily set submergence_depth = '" + stringObjectMap.get("submergence_depth") + "' where well_id = '" + stringObjectMap.get("well_id") + "' and prod_date  = '" + stringObjectMap.get("prod_date") + "' ";
78 79
                     updateCMDCount = updateBaseDataSourceOfStatusDaily(sql) + updateCMDCount;
79 80
                 }
80
-                sql = " SELECT  distinct jh,max(rq),bj FROM DBA01 WHERE dym is not null group by jh,dym ";
81
-                List<Map<String, Object>> oilNozzleList = selectBaseDataSourceStatusDaily(sql);
81
+                sql = "select jh , rq , bj from centralbase.cb_pc_pro_wellbore_status_daily where  (jh,rq) in (SELECT  jh,max(rq) rq  FROM DBA01 WHERE bj is not null and to_date('"+date+"','yyyy-MM-dd') group by jh)";
82
+                List<Map<String, Object>> oilNozzleList = selectA2DataOfNotIsNullDYMOrBJ(sql);
82 83
                 for (Map<String, Object> stringObjectMap : oilNozzleList) {
83 84
                     sql = " update centralbase.cb_pc_pro_wellbore_status_daily set oil_nozzle = '" + stringObjectMap.get("BJ") + "' where well_id ='" + stringObjectMap.get("JH") + "' and prod_date='" + stringObjectMap.get("RQ") + "' ";
84 85
                     updateOLiNozzleCount = updateBaseDataSourceOfStatusDaily(sql) + updateOLiNozzleCount;
@@ -104,9 +105,8 @@ public class BaseDataController {
104 105
         return jdbcTemplate.update(sql, map.get("JH"), map.get("RQ"), map.get("CYFS"), map.get("YZ"), map.get("HYSX"), map.get("YYSX"), map.get("BS"), map.get("DYM"));
105 106
     }
106 107
 
107
-    public List<Map<String, Object>> selectA2DataOfNotIsNullDYM() {
108
+    public List<Map<String, Object>> selectA2DataOfNotIsNullDYMOrBJ(String sql) {
108 109
         jdbcTemplate = new JdbcTemplate(oracleDataSource);
109
-        String sql = "SELECT  distinct jh,max(rq),dym FROM DBA01 WHERE dym is not null group by jh,dym";
110 110
         return jdbcTemplate.queryForList(sql);
111 111
     }
112 112
 

+ 1 - 1
src/main/java/com/gct/tools/etlcamelhuge/routeconfig/CamelJDBCCofRealTimeConfiguration.java

@@ -161,7 +161,7 @@ public class CamelJDBCCofRealTimeConfiguration  {
161 161
                         .log("${header.date}"+" routeId:insert-runtimeAndSendToMQ ->  centralbase.cb_temp_well_mech_runtime insert data failed ${body}")
162 162
                         .end();
163 163
 
164
-               from("timer:mytimer-update-avg-mech_daily?period=1800000")
164
+               from("timer:mytimer-update-avg-mech_daily?period=3600000")
165 165
                         .routeId("update-avg-mech_daily")
166 166
                         .setHeader("date", constant(getDate()))
167 167
                         .setBody(simple("select well_id,avg(stroke_length) stroke_length  ,avg(stroke_frequency) stroke_frequency from centralbase.cb_temp_well_mech_runtime where prod_date::date='${header.date}' group by well_id"))

+ 9 - 9
src/main/java/com/gct/tools/etlcamelhuge/routeconfig/CamelJDBCConfiguration.java

@@ -172,35 +172,35 @@ public class CamelJDBCConfiguration {
172 172
                         .end();
173 173
 
174 174
 
175
-                from("timer:mytimer-update-statusDaily-DYM?period=1800000")
175
+                from("timer:mytimer-update-statusDaily-DYM?period=3600000")
176 176
                         .routeId("update-statusDaily-DYM")
177
-                        .setHeader("date", constant(getDate() + " 00:00:00"))
178
-                        .setBody(simple("SELECT  distinct jh,max(rq),dym FROM DBA01 WHERE dym is not null group by jh,dym"))
177
+                        .setHeader("date", constant(getDate()))
178
+                        .setBody(simple("select jh , rq , dym from centralbase.cb_pc_pro_wellbore_status_daily where  (jh,rq) in (SELECT  jh,max(rq) rq  FROM DBA01 WHERE dym is not null and to_date('${header.date}','yyyy-MM-dd') group by jh)"))
179 179
                         .to("jdbc:oracle")
180 180
                         .split(body())
181
-                        .setBody(simple("update centralbase.cb_pc_pro_wellbore_status_daily set start_pump_liq_level = '${body[DYM]}',dym_date ='${body[RQ]}'  where well_id = '${body[JH]}' and prod_date::date  = '${header.date}' and dym_date < '${body[RQ]}' "))
181
+                        .setBody(simple("update centralbase.cb_pc_pro_wellbore_status_daily set start_pump_liq_level = '${body[DYM]}'  where well_id = '${body[JH]}' and prod_date::date  = '${header.date}' "))
182 182
                         .doTry()
183 183
                         .to("jdbc:centralbase")
184 184
                         .doCatch(Exception.class)
185 185
                         .log("${header.date}"+" routeId:update-statusDaily-DYM ->  centralbase.cb_pc_pro_wellbore_status_daily update data failed")
186 186
                         .end();
187 187
                 //因为数据库有可能会查出多条数据,现在的做法时,在数据库中增加两个个字段,用来记录BJ和DYM 的更新时间 ,只有获取到的时间大于数据库中存储的时间时,才会更新,并且该字段也会更新
188
-                from("timer:mytimer-update-statusDaily-BJ?period=1800000")
188
+                from("timer:mytimer-update-statusDaily-BJ?period=3600000")
189 189
                         .routeId("update-statusDaily-BJ")
190
-                        .setHeader("date", constant(getDate() + " 00:00:00"))
191
-                        .setBody(simple("SELECT  distinct jh,max(rq),bj FROM DBA01 WHERE bj is not null group by jh,bj"))
190
+                        .setHeader("date", constant(getDate()))
191
+                        .setBody(simple("select jh , rq , bj from centralbase.cb_pc_pro_wellbore_status_daily where  (jh,rq) in (SELECT  jh,max(rq) rq  FROM DBA01 WHERE bj is not null and to_date('${header.date}','yyyy-MM-dd') group by jh)"))
192 192
                         .to("jdbc:oracle")
193 193
                         .split(body()).process(exchange -> {
194 194
                             HashMap body = exchange.getIn().getBody(HashMap.class);
195 195
                         })
196
-                        .setBody(simple("update centralbase.cb_pc_pro_wellbore_status_daily set oil_nozzle = '${body[BJ]}',bj_date = '${body[RQ]}' where well_id = '${body[JH]}' and prod_date::date  = '${header.date}' and bj_date < '${body[RQ]}'  "))
196
+                        .setBody(simple("update centralbase.cb_pc_pro_wellbore_status_daily set oil_nozzle = '${body[BJ]}' where well_id = '${body[JH]}' and prod_date::date  = '${header.date}'  "))
197 197
                         .doTry()
198 198
                         .to("jdbc:centralbase")
199 199
                         .doCatch(Exception.class)
200 200
                         .log("${header.date}"+" routeId:update-statusDaily-BJ ->  centralbase.cb_pc_pro_wellbore_status_daily update data failed")
201 201
                         .end();
202 202
 
203
-                from("timer:mytimer-update-statusDaily-submergenceDepth?period=1800000")
203
+                from("timer:mytimer-update-statusDaily-submergenceDepth?period=3600000")
204 204
                         .routeId("update-statusDaily-submergenceDepth")
205 205
                         .setHeader("date", constant(getDate() + " 00:00:00"))
206 206
                         .setBody(simple("select well_id,prod_date,start_pump_liq_level,pump_depth from centralbase.cb_pc_pro_wellbore_status_daily where prod_date = '${header.date}' "))

+ 1 - 1
src/main/resources/application.yml

@@ -91,7 +91,7 @@ spring:
91 91
         ## 此属性控制池中连接的最长生命周期,值0表示无限生命周期,默认1800000即30分钟
92 92
         max-lifetime: 1800000
93 93
         ## 数据库连接超时时间,默认30秒,即30000
94
-        connection-timeout: 30000
94
+        connection-timeout: 60000
95 95
 
96 96
 management:
97 97
   info: