Browse Source

修改sendDataToMQ logic and add select a2 and gtsj 接口

gxt 3 years ago
parent
commit
df02c29f20

+ 66 - 2
src/main/java/com/gct/tools/etlcamelhuge/controller/BaseDataController.java

@@ -14,8 +14,10 @@ import org.springframework.web.bind.annotation.RestController;
14 14
 import javax.annotation.Resource;
15 15
 import javax.sql.DataSource;
16 16
 import java.math.BigDecimal;
17
+import java.util.ArrayList;
17 18
 import java.util.List;
18 19
 import java.util.Map;
20
+import java.util.stream.Collectors;
19 21
 
20 22
 /**
21 23
  * class name: BaseDataController
@@ -36,8 +38,11 @@ public class BaseDataController {
36 38
     @Resource(name = "centralbase")
37 39
     DataSource baseDataSource;
38 40
 
41
+    @Resource(name = "gtsj")
42
+    DataSource gtsjDataSource;
43
+
39 44
     @PostMapping("/saveDataToStatusDaily")
40
-    @ApiOperation("从 A2 获取 数据插入到 centrlBase-StatusDaily 中")
45
+    @ApiOperation("从 A2 获取 数据插入到 centrlBase-StatusDaily 中,只需要传入查询时间")
41 46
     public JSONObject saveDataToStatusDaily(@RequestBody BaseDataBody baseDataBody) {
42 47
         JSONObject jsonObject = new JSONObject();
43 48
         int insertCount = 0;
@@ -122,7 +127,7 @@ public class BaseDataController {
122 127
 
123 128
 
124 129
     @PostMapping("/saveDataToVolDaily")
125
-    @ApiOperation("从 A2 获取 数据插入到 centrlBase-VolDaily 中")
130
+    @ApiOperation("从 A2 获取 数据插入到 centrlBase-VolDaily 中,只需要传入查询时间")
126 131
     public JSONObject saveVolDaily(@RequestBody BaseDataBody baseDataBody) {
127 132
         JSONObject jsonObject = new JSONObject();
128 133
         int insertCount = 0;
@@ -198,4 +203,63 @@ public class BaseDataController {
198 203
         jdbcTemplate = new JdbcTemplate(baseDataSource);
199 204
         return jdbcTemplate.update(sql);
200 205
     }
206
+
207
+    @PostMapping("/getNeedRunTimeData")
208
+    @ApiOperation("查询天安的数据库 只需要传过来 需要查询的sql 就行")
209
+    public JSONObject getNeedRunTimeData(@RequestBody BaseDataBody baseDataBody){
210
+        JSONObject jsonObject = new JSONObject();
211
+        jdbcTemplate = new JdbcTemplate(gtsjDataSource);
212
+        try{
213
+            List<Map<String, Object>> list = jdbcTemplate.queryForList(baseDataBody.getSql());
214
+            jsonObject.put("RunTimedata",list);
215
+        }catch (Exception e){
216
+            jsonObject.put("error",e.getMessage());
217
+        }
218
+         return jsonObject;
219
+    }
220
+
221
+    @PostMapping("/getNeedA2Data")
222
+    @ApiOperation("查询A2数据库 只需要传过来 需要查询的sql 就行")
223
+    public JSONObject getNeedA2Data(@RequestBody BaseDataBody baseDataBody){
224
+        JSONObject jsonObject = new JSONObject();
225
+        jdbcTemplate = new JdbcTemplate(oracleDataSource);
226
+        try{
227
+            List<Map<String, Object>> list = jdbcTemplate.queryForList(baseDataBody.getSql());
228
+            jsonObject.put("A2data",list);
229
+        }catch (Exception e){
230
+            jsonObject.put("error",e.getMessage());
231
+        }
232
+        return jsonObject;
233
+    }
234
+
235
+    @PostMapping("/getA2WellSorceData")
236
+    @ApiOperation("查询A2数据库 并且和 wellSource 做对比 只填写需要查询A2Data 的时间就行")
237
+    public JSONObject getA2WellSorceData(@RequestBody BaseDataBody baseDataBody){
238
+        JSONObject jsonObject = new JSONObject();
239
+        jdbcTemplate = new JdbcTemplate(oracleDataSource);
240
+        try{
241
+            String sql = "select  distinct jh  from DBA01 where rq  = to_date('"+baseDataBody.getDate()+"','yyyy-MM-dd') and qyrq is not null ";
242
+            List<Map<String, Object>>  A2DataList = jdbcTemplate.queryForList(sql);
243
+            List<Map<String, Object>>  wellSourceList = getWellSource();
244
+            Object collect = A2DataList.stream().filter(item -> !wellSourceList.contains(item)).collect(Collectors.toList());
245
+            Object collect1 = wellSourceList.stream().filter(item -> !A2DataList.contains(item)).collect(Collectors.toList());
246
+            jsonObject.put("wellSource 中不存在 A2 的井号为",collect);
247
+            jsonObject.put("A2 中不存在 wellSource 的井号为",collect1);
248
+        }catch (Exception e){
249
+            jsonObject.put("error",e.getMessage());
250
+        }
251
+        return jsonObject;
252
+    }
253
+
254
+    public List<Map<String, Object>>  getWellSource(){
255
+        List<Map<String, Object>>  list = new ArrayList<>();
256
+        try{
257
+            jdbcTemplate = new JdbcTemplate(baseDataSource);
258
+            String sql = " select distinct well_id from centralbase.cb_cd_well_source ";
259
+            list = jdbcTemplate.queryForList(sql);
260
+        }catch (Exception e){
261
+            e.printStackTrace();
262
+        }
263
+        return list;
264
+    }
201 265
 }

+ 5 - 2
src/main/java/com/gct/tools/etlcamelhuge/entity/BaseDataBody.java

@@ -18,8 +18,11 @@ public class BaseDataBody {
18 18
 
19 19
     @ApiModelProperty(value = "时间")
20 20
     public String date;
21
+
22
+    @ApiModelProperty(value = "sql语句")
23
+    public String sql;
21 24
     /*@ApiModelProperty(value = "结束时间")
22 25
     public String endDate;*/
23
-    @ApiModelProperty(value = "井号集合")
24
-    public JSONArray wellList;
26
+    /*@ApiModelProperty(value = "井号集合")
27
+    public JSONArray wellList;*/
25 28
 }

+ 31 - 4
src/main/java/com/gct/tools/etlcamelhuge/routeconfig/CamelJDBCCofRealTimeConfiguration.java

@@ -1,6 +1,7 @@
1 1
 package com.gct.tools.etlcamelhuge.routeconfig;
2 2
 
3 3
 import com.gct.common.util.SGTUtil;
4
+import com.gct.tools.etlcamelhuge.MQ.MessageBody;
4 5
 import com.gct.tools.etlcamelhuge.MQ.MessageProducer;
5 6
 import com.gct.tools.etlcamelhuge.entity.DiagnoseMsg;
6 7
 import lombok.Data;
@@ -56,7 +57,7 @@ public class CamelJDBCCofRealTimeConfiguration  {
56 57
     /***
57 58
      * 异步发送 RunTime 消息到MQ中
58 59
      * */
59
-    @Async
60
+   // @Async
60 61
     public long sendDataToRocketMQ(String wellName,String wellId,String prodDate,String stroke_length ,String stroke_frequency,String sgt){
61 62
         String orgId = "0";
62 63
         prodDate = prodDate.substring(0,19);
@@ -95,7 +96,7 @@ public class CamelJDBCCofRealTimeConfiguration  {
95 96
                         .log("${header.date}"+" routeId:update-runTime -> centralbase.cb_temp_well_mech_runtime update data failed")
96 97
                         .end();*/
97 98
 
98
-                from("timer:mytimer-insert-runtimeAndSendToMQ?period=1800000")
99
+                from("timer:mytimer-insert-runtime?period=1800000")
99 100
                         .routeId("insert-runtimeAndSendToMQ")
100 101
                         .setBody(simple("select max(prod_date)  from centralbase.cb_temp_well_mech_runtime "))
101 102
                         .to("jdbc:centralbase")
@@ -148,8 +149,8 @@ public class CamelJDBCCofRealTimeConfiguration  {
148 149
                         double stroke=bd.setScale(1,BigDecimal.ROUND_HALF_UP).doubleValue();
149 150
                         aRow.put("stroke",stroke);
150 151
                     }
151
-                            long count = sendDataToRocketMQ(aRow.get("well_name").toString(), aRow.get("well_name").toString(), aRow.get("dyna_create_time").toString(), aRow.get("stroke").toString(), aRow.get("frequency").toString(), aRow.get("sgt").toString());
152
-                            System.out.println("消息发送时间为:   "+LocalDateTime.now()+"   发送的消息数量为: "+count + "   数据时间为:" + aRow.get("dyna_create_time"));
152
+                           // long count = sendDataToRocketMQ(aRow.get("well_name").toString(), aRow.get("well_name").toString(), aRow.get("dyna_create_time").toString(), aRow.get("stroke").toString(), aRow.get("frequency").toString(), aRow.get("sgt").toString());
153
+                           // System.out.println("消息发送时间为:   "+LocalDateTime.now()+"   发送的消息数量为: "+count + "   数据时间为:" + aRow.get("dyna_create_time"));
153 154
 
154 155
                         })
155 156
                         .setBody(simple("insert into centralbase.cb_temp_well_mech_runtime(well_id,prod_date,stroke_length,stroke_frequency,susp_max_load,susp_min_load,sgt) " +
@@ -161,6 +162,32 @@ public class CamelJDBCCofRealTimeConfiguration  {
161 162
                         .log("${header.date}"+" routeId:insert-runtimeAndSendToMQ ->  centralbase.cb_temp_well_mech_runtime insert data failed ${body}")
162 163
                         .end();
163 164
 
165
+                from("timer:mytimer-SendToMQ?period=180000")
166
+                        .routeId("SendToMQ")
167
+                        .setBody(simple("select so.well_id,so.well_common_name,so.org_id,ti.prod_date,ti.stroke_frequency,ti.stroke_length,ti.sgt from centralbase.cb_temp_well_mech_runtime ti, centralbase.cb_cd_well_source so where ti.well_id = so.well_id and ti.prod_date =(select max(prod_date) from centralbase.cb_temp_well_mech_runtime) "))
168
+                        .to("jdbc:centralbase")
169
+                        .split(body())
170
+                        .doTry()
171
+                        .process(exchange -> {
172
+                            Message in = exchange.getIn();
173
+                            HashMap<String, Object> aRow = in.getBody(HashMap.class);
174
+                            String wellName =aRow.get("well_common_name").toString();
175
+                            String wellId =aRow.get("well_id").toString();
176
+                            String orgId = aRow.get("org_id").toString();
177
+                            String prodDate = aRow.get("prod_date").toString().substring(0,19);
178
+                            Double strokeLength = Double.valueOf(aRow.get("stroke_length").toString());
179
+                            Double strokeFrequency = Double.valueOf(aRow.get("stroke_frequency").toString());
180
+                            String sgt = aRow.get("sgt").toString();
181
+                            if (sgt == null || sgt.length() ==0){
182
+                                sgt = "0,0";
183
+                            }
184
+                            DiagnoseMsg diagnoseMsg = new DiagnoseMsg(wellId, wellName, orgId, prodDate, sgt, LocalDateTime.now().toString(), strokeLength, strokeFrequency);
185
+                            producer.send((MessageBody) diagnoseMsg);
186
+                        })
187
+                        .doCatch(Exception.class)
188
+                        .log("${header.date}"+" rocketMQ send data failed")
189
+                        .end();
190
+
164 191
                from("timer:mytimer-update-avg-mech_daily?period=3600000")
165 192
                         .routeId("update-avg-mech_daily")
166 193
                         .setHeader("date", constant(getDate()))

+ 1 - 0
src/main/java/com/gct/tools/etlcamelhuge/routeconfig/CamelJDBCConfiguration.java

@@ -148,6 +148,7 @@ public class CamelJDBCConfiguration {
148 148
                         .log("${header.date}"+" routeId:oracle-1->  centralbase.cb_pc_organization update data failed")
149 149
                         .end();*/
150 150
 
151
+
151 152
                 from("timer:mytimer-insert-statusDaily?period=3600000")
152 153
                         .routeId("insert-statusDaily")
153 154
                         .setHeader("date", constant(getDate()))