Browse Source

正常使用版本

gxt 3 years ago
parent
commit
47101f2695

+ 66 - 51
src/main/java/com/gct/tools/etlcamelhuge/controller/GtController.java

@@ -98,9 +98,7 @@ public class GtController {
98
     @GetMapping("/getGTSJ")
98
     @GetMapping("/getGTSJ")
99
     @ApiOperation(value = "从实时的机采数据生产的表中获取数据放入到 Runtime 表中")
99
     @ApiOperation(value = "从实时的机采数据生产的表中获取数据放入到 Runtime 表中")
100
     public JSONObject getGTSJ(@RequestBody GTBody gtBody){
100
     public JSONObject getGTSJ(@RequestBody GTBody gtBody){
101
-
102
         String startDate = gtBody.getStartDate();
101
         String startDate = gtBody.getStartDate();
103
-        String endDate = gtBody.getEndDate();
104
         JSONArray wellList = gtBody.getWellList();
102
         JSONArray wellList = gtBody.getWellList();
105
         JSONObject jsonObject = new JSONObject();
103
         JSONObject jsonObject = new JSONObject();
106
         jdbcTemplate = new JdbcTemplate(dataSourceOfGTSJ);
104
         jdbcTemplate = new JdbcTemplate(dataSourceOfGTSJ);
@@ -108,54 +106,29 @@ public class GtController {
108
         int pageSize = 5000;
106
         int pageSize = 5000;
109
         int sumData = 0;
107
         int sumData = 0;
110
         try {
108
         try {
111
-            for (int i = 0; i < wellList.size(); i++) {
112
-                String sql = String.format("select distinct well_name,dyna_create_time,check_date,displacement,disp_load,stroke,frequency,susp_max_load,susp_min_load from public.pc_fd_pumpjack_dyna_dia_t where   dyna_create_time > '%s' and well_name = '%s' offset %d limit %d ", startDate, wellList.get(i));
113
-                List<Map<String, Object>> list = jdbcTemplate.queryForList(sql);
114
-                for (Map<String, Object> map : list) {
115
-                    String prod_date = map.get("dyna_create_time").toString().split("\\+")[0];
116
-                    map.put("dyna_create_time", prod_date);
117
-                    if (map.get("displacement") != null && !map.get("displacement").equals("") && map.get("disp_load") != null && !map.get("disp_load").equals("")) {
118
-                        String[] displacements = map.get("displacement").toString().split(";");//10 四舍五入
119
-                        String[] disp_loads = map.get("disp_load").toString().split(";");
120
-                        Double susp_max_load = max(disp_loads);
121
-                        Double susp_min_load = min(disp_loads);
122
-                        String sgt = "";
123
-                        for (int y = 0; y < displacements.length; y++) {
124
-                            sgt = sgt + displacements[y] + "," + disp_loads[y] + ",";
125
-                        }
126
-                        String[] s = sgt.split(",");
127
-                        String w = "";
128
-                        for (int x = 0; x < s.length; x++) {
129
-                            w += new BigDecimal(Math.round(Double.parseDouble(s[x]) * 100)).stripTrailingZeros().toPlainString() + ",";
130
-                        }
131
-                        Double[][] doubles = SGTUtil.encodeToDoubleArray(w);
132
-                        map.put("sgt", SGTUtil.encodeToString(doubles));
133
-                        map.put("susp_max_load",susp_max_load);
134
-                        map.put("susp_min_load",susp_min_load);
135
-                    }
136
-                    if (map.get("stroke") == null) map.put("stroke", "0.0");
137
-                    if (map.get("frequency") == null) map.put("frequency", "0.0");
138
-                    if (map.get("susp_max_load") == null) map.put("susp_max_load", "0.0");
139
-                    if (map.get("susp_min_load") == null) map.put("susp_min_load", "0.0");
140
-                    if (map.get("frequency") != null){
141
-                        BigDecimal bd=new BigDecimal(map.get("frequency").toString());
142
-                        double frequency=bd.setScale(1,BigDecimal.ROUND_HALF_UP).doubleValue();
143
-                        map.put("frequency",frequency);
144
-                    }
145
-                    if (map.get("stroke") != null){
146
-                        double stroke1 = Double.parseDouble(map.get("stroke").toString());
147
-                        BigDecimal bd=new BigDecimal(stroke1);
148
-                        double stroke=bd.setScale(1,BigDecimal.ROUND_HALF_UP).doubleValue();
149
-                        map.put("stroke",stroke);
150
-                    }
151
-                    jdbcTemplate.update("insert into centralbase.cb_temp_well_mech_runtime(well_id,prod_date,stroke_length,stroke_frequency,susp_max_load,susp_min_load,sgt) " +
152
-                            "values (?,?,?,?,?,?,?)",map.get("well_name").toString(),map.get("dyna_create_time"),map.get("stroke"),map.get("frequency"),map.get("susp_max_load"),map.get("susp_min_load"),map.get("sgt"));
153
-                }
154
-                sumData += list.size();
155
-                if (list.size()< pageSize){
156
-                    break;
109
+            if (wellList.isEmpty() || wellList.size() ==0 ){
110
+                while (true){
111
+                    String sql = String.format("select distinct well_name,dyna_create_time,check_date,displacement,disp_load,stroke,frequency,susp_max_load,susp_min_load from public.pc_fd_pumpjack_dyna_dia_t where   dyna_create_time > '%s'  limit %d offset %d ", startDate,curPage,pageSize);
112
+                    List<Map<String, Object>> list = jdbcTemplate.queryForList(sql);
113
+                    sql = String.format("select count(1) from public.pc_fd_pumpjack_dyna_dia_t where   dyna_create_time > '%s' ", startDate );
114
+                    Integer count = jdbcTemplate.queryForObject(sql, Integer.class);
115
+                    int pageNum = (count % pageSize ==0 ) ?(count / pageSize) : (count/pageSize)+1;
116
+                    insertDataToRunTime(list);
117
+                    sumData += list.size();
118
+                    curPage ++;
119
+                    if (curPage>pageNum) break;
157
                 }
120
                 }
158
-                curPage ++ ;
121
+            }else {
122
+              for (int i = 0; i < wellList.size(); i++) {
123
+                  String sql = String.format("select distinct well_name,dyna_create_time,check_date,displacement,disp_load,stroke,frequency,susp_max_load,susp_min_load from public.pc_fd_pumpjack_dyna_dia_t where   dyna_create_time > '%s' and well_name = '%s'  limit %d offset %d ", startDate, wellList.get(i),curPage,pageSize);
124
+                  List<Map<String, Object>> list = jdbcTemplate.queryForList(sql);
125
+                  insertDataToRunTime(list);
126
+                  sumData += list.size();
127
+                  if (list.size()< pageSize){
128
+                      break;
129
+                  }
130
+                  curPage ++ ;
131
+              }
159
             }
132
             }
160
         }catch (Exception e){
133
         }catch (Exception e){
161
             jsonObject.put("error",e.getMessage());
134
             jsonObject.put("error",e.getMessage());
@@ -163,8 +136,50 @@ public class GtController {
163
             jsonObject.put("sumData",sumData);
136
             jsonObject.put("sumData",sumData);
164
         }
137
         }
165
         return    jsonObject;
138
         return    jsonObject;
166
-         }
167
-
139
+}
140
+    public void insertDataToRunTime(List<Map<String, Object>> list){
141
+        for (Map<String, Object> map : list) {
142
+            String prod_date = map.get("dyna_create_time").toString().split("\\+")[0];
143
+            map.put("dyna_create_time", prod_date);
144
+            if (map.get("displacement") != null && !map.get("displacement").equals("") && map.get("disp_load") != null && !map.get("disp_load").equals("")) {
145
+                String[] displacements = map.get("displacement").toString().split(";");//10 四舍五入
146
+                String[] disp_loads = map.get("disp_load").toString().split(";");
147
+                Double susp_max_load = max(disp_loads);
148
+                Double susp_min_load = min(disp_loads);
149
+                String sgt = "";
150
+                for (int y = 0; y < displacements.length; y++) {
151
+                    sgt = sgt + displacements[y] + "," + disp_loads[y] + ",";
152
+                }
153
+                String[] s = sgt.split(",");
154
+                String w = "";
155
+                for (int x = 0; x < s.length; x++) {
156
+                    w += new BigDecimal(Math.round(Double.parseDouble(s[x]) * 100)).stripTrailingZeros().toPlainString() + ",";
157
+                }
158
+                Double[][] doubles = SGTUtil.encodeToDoubleArray(w);
159
+                map.put("sgt", SGTUtil.encodeToString(doubles));
160
+                map.put("susp_max_load",susp_max_load);
161
+                map.put("susp_min_load",susp_min_load);
162
+            }
163
+            if (map.get("stroke") == null) map.put("stroke", "0.0");
164
+            if (map.get("frequency") == null) map.put("frequency", "0.0");
165
+            if (map.get("susp_max_load") == null) map.put("susp_max_load", "0.0");
166
+            if (map.get("susp_min_load") == null) map.put("susp_min_load", "0.0");
167
+            if (map.get("frequency") != null){
168
+                BigDecimal bd=new BigDecimal(map.get("frequency").toString());
169
+                double frequency=bd.setScale(1,BigDecimal.ROUND_HALF_UP).doubleValue();
170
+                map.put("frequency",frequency);
171
+            }
172
+            if (map.get("stroke") != null){
173
+                double stroke1 = Double.parseDouble(map.get("stroke").toString());
174
+                BigDecimal bd=new BigDecimal(stroke1);
175
+                double stroke=bd.setScale(1,BigDecimal.ROUND_HALF_UP).doubleValue();
176
+                map.put("stroke",stroke);
177
+            }
178
+            jdbcTemplate = new JdbcTemplate(dataSource);
179
+            jdbcTemplate.update("insert into centralbase.cb_temp_well_mech_runtime(well_id,prod_date,stroke_length,stroke_frequency,susp_max_load,susp_min_load,sgt) " +
180
+                    "values (?,?,?,?,?,?,?)",map.get("well_name").toString(),map.get("dyna_create_time"),map.get("stroke"),map.get("frequency"),map.get("susp_max_load"),map.get("susp_min_load"),map.get("sgt"));
181
+        }
182
+   }
168
 
183
 
169
     public Double min(String[] strings){
184
     public Double min(String[] strings){
170
         double[] doubles = new double[strings.length];
185
         double[] doubles = new double[strings.length];

+ 3 - 10
src/main/java/com/gct/tools/etlcamelhuge/routeconfig/CamelJDBCCofRealTimeConfiguration.java

@@ -97,16 +97,14 @@ public class CamelJDBCCofRealTimeConfiguration  {
97
                         .end();*/
97
                         .end();*/
98
 
98
 
99
                 from("timer:mytimer-insert-runtime?period=1800000")
99
                 from("timer:mytimer-insert-runtime?period=1800000")
100
-                        .routeId("insert-runtimeAndSendToMQ")
100
+                        .routeId("insert-runtime")
101
                         .setBody(simple("select max(prod_date)  from centralbase.cb_temp_well_mech_runtime "))
101
                         .setBody(simple("select max(prod_date)  from centralbase.cb_temp_well_mech_runtime "))
102
                         .to("jdbc:centralbase")
102
                         .to("jdbc:centralbase")
103
+                        .log("max(prod_date) ${body}")
103
                         .split(body())
104
                         .split(body())
104
                         .setHeader("date", simple("${body[max]}"))
105
                         .setHeader("date", simple("${body[max]}"))
105
                         .setBody(simple("select distinct well_name,dyna_create_time,check_date,displacement,disp_load,stroke,frequency,susp_max_load,susp_min_load from public.pc_fd_pumpjack_dyna_dia_t where   dyna_create_time > '${header.date}' "))
106
                         .setBody(simple("select distinct well_name,dyna_create_time,check_date,displacement,disp_load,stroke,frequency,susp_max_load,susp_min_load from public.pc_fd_pumpjack_dyna_dia_t where   dyna_create_time > '${header.date}' "))
106
-                        .doTry()
107
                         .to("jdbc:gtsj")
107
                         .to("jdbc:gtsj")
108
-                        .doCatch(Exception.class)
109
-                        .log("${header.date}" + " routeId:insert-runtimeAndSendToMQ -> select runTime data failed")
110
                         .process(exchange -> {
108
                         .process(exchange -> {
111
                             sendMsgRunTime = 0;
109
                             sendMsgRunTime = 0;
112
                         })
110
                         })
@@ -114,6 +112,7 @@ public class CamelJDBCCofRealTimeConfiguration  {
114
                     Message in = exchange.getIn();
112
                     Message in = exchange.getIn();
115
                     HashMap<String, Object> aRow = in.getBody(HashMap.class);
113
                     HashMap<String, Object> aRow = in.getBody(HashMap.class);
116
                     String prod_date = aRow.get("dyna_create_time").toString().split("\\+")[0];
114
                     String prod_date = aRow.get("dyna_create_time").toString().split("\\+")[0];
115
+                    System.out.println("prod_date---"+prod_date);
117
                     aRow.put("dyna_create_time", prod_date);
116
                     aRow.put("dyna_create_time", prod_date);
118
                     if (aRow.get("displacement") != null && !aRow.get("displacement").equals("") && aRow.get("disp_load") != null && !aRow.get("disp_load").equals("")) {
117
                     if (aRow.get("displacement") != null && !aRow.get("displacement").equals("") && aRow.get("disp_load") != null && !aRow.get("disp_load").equals("")) {
119
                         String[] displacements = aRow.get("displacement").toString().split(";");//10 四舍五入
118
                         String[] displacements = aRow.get("displacement").toString().split(";");//10 四舍五入
@@ -156,10 +155,7 @@ public class CamelJDBCCofRealTimeConfiguration  {
156
                         .setBody(simple("insert into centralbase.cb_temp_well_mech_runtime(well_id,prod_date,stroke_length,stroke_frequency,susp_max_load,susp_min_load,sgt) " +
155
                         .setBody(simple("insert into centralbase.cb_temp_well_mech_runtime(well_id,prod_date,stroke_length,stroke_frequency,susp_max_load,susp_min_load,sgt) " +
157
                                 "select '${body[well_name]}','${body[dyna_create_time]}','${body[stroke]}','${body[frequency]}','${body[susp_max_load]}','${body[susp_min_load]}','${body[sgt]}' " +
156
                                 "select '${body[well_name]}','${body[dyna_create_time]}','${body[stroke]}','${body[frequency]}','${body[susp_max_load]}','${body[susp_min_load]}','${body[sgt]}' " +
158
                                 "where NOT EXISTS (SELECT * FROM centralbase.cb_temp_well_mech_runtime WHERE well_id = '${body[well_name]}' and  prod_date = '${body[dyna_create_time]}' )"))
157
                                 "where NOT EXISTS (SELECT * FROM centralbase.cb_temp_well_mech_runtime WHERE well_id = '${body[well_name]}' and  prod_date = '${body[dyna_create_time]}' )"))
159
-                        .doTry()
160
                         .to("jdbc:centralbase")
158
                         .to("jdbc:centralbase")
161
-                        .doCatch(Exception.class)
162
-                        .log("${header.date}"+" routeId:insert-runtimeAndSendToMQ ->  centralbase.cb_temp_well_mech_runtime insert data failed ${body}")
163
                         .end();
159
                         .end();
164
 
160
 
165
                 from("timer:mytimer-SendToMQ?period=180000")
161
                 from("timer:mytimer-SendToMQ?period=180000")
@@ -178,9 +174,6 @@ public class CamelJDBCCofRealTimeConfiguration  {
178
                             Double strokeLength = Double.valueOf(aRow.get("stroke_length").toString());
174
                             Double strokeLength = Double.valueOf(aRow.get("stroke_length").toString());
179
                             Double strokeFrequency = Double.valueOf(aRow.get("stroke_frequency").toString());
175
                             Double strokeFrequency = Double.valueOf(aRow.get("stroke_frequency").toString());
180
                             String sgt = aRow.get("sgt").toString();
176
                             String sgt = aRow.get("sgt").toString();
181
-                            if (sgt == null || sgt.length() ==0){
182
-                                sgt = "0,0";
183
-                            }
184
                             DiagnoseMsg diagnoseMsg = new DiagnoseMsg(wellId, wellName, orgId, prodDate, sgt, LocalDateTime.now().toString(), strokeLength, strokeFrequency);
177
                             DiagnoseMsg diagnoseMsg = new DiagnoseMsg(wellId, wellName, orgId, prodDate, sgt, LocalDateTime.now().toString(), strokeLength, strokeFrequency);
185
                             producer.send((MessageBody) diagnoseMsg);
178
                             producer.send((MessageBody) diagnoseMsg);
186
                         })
179
                         })

+ 1 - 0
src/main/java/com/gct/tools/etlcamelhuge/routeconfig/CamelJDBCConfiguration.java

@@ -300,6 +300,7 @@ public class CamelJDBCConfiguration {
300
                         .to("jdbc:centralbase")
300
                         .to("jdbc:centralbase")
301
                         .doCatch(Exception.class)
301
                         .doCatch(Exception.class)
302
                         .log("${header.date}"+" routeId:insert-volDaily ->  centralbase.cb_pc_pro_wellbore_vol_daily insert data failed")
302
                         .log("${header.date}"+" routeId:insert-volDaily ->  centralbase.cb_pc_pro_wellbore_vol_daily insert data failed")
303
+                        .endDoTry()
303
                         .setBody(simple("update centralbase.cb_pc_pro_wellbore_vol_daily set  water_prod_daily =null where water_prod_daily = -1 and prod_date = '${header.date}' "))
304
                         .setBody(simple("update centralbase.cb_pc_pro_wellbore_vol_daily set  water_prod_daily =null where water_prod_daily = -1 and prod_date = '${header.date}' "))
304
                         .to("jdbc:centralbase")
305
                         .to("jdbc:centralbase")
305
                         .setBody(simple("update centralbase.cb_pc_pro_wellbore_vol_daily set  gas_oil_ratio =null where gas_oil_ratio = -1 and prod_date = '${header.date}' "))
306
                         .setBody(simple("update centralbase.cb_pc_pro_wellbore_vol_daily set  gas_oil_ratio =null where gas_oil_ratio = -1 and prod_date = '${header.date}' "))