浏览代码

新的稳定版

gxt 3 年之前
父节点
当前提交
4394ead81b

+ 12 - 12
src/main/java/com/gct/tools/etlcamelhuge/routeconfig/CamelJDBCCofRealTimeConfiguration.java

@@ -4,12 +4,10 @@ import com.gct.common.util.SGTUtil;
4
 import com.gct.tools.etlcamelhuge.MQ.MessageBody;
4
 import com.gct.tools.etlcamelhuge.MQ.MessageBody;
5
 import com.gct.tools.etlcamelhuge.MQ.MessageProducer;
5
 import com.gct.tools.etlcamelhuge.MQ.MessageProducer;
6
 import com.gct.tools.etlcamelhuge.entity.DiagnoseMsg;
6
 import com.gct.tools.etlcamelhuge.entity.DiagnoseMsg;
7
-import lombok.Data;
8
 import org.apache.camel.Message;
7
 import org.apache.camel.Message;
9
 import org.apache.camel.builder.RouteBuilder;
8
 import org.apache.camel.builder.RouteBuilder;
10
 import org.springframework.context.annotation.Bean;
9
 import org.springframework.context.annotation.Bean;
11
 import org.springframework.context.annotation.Configuration;
10
 import org.springframework.context.annotation.Configuration;
12
-import org.springframework.scheduling.annotation.Async;
13
 
11
 
14
 import javax.annotation.Resource;
12
 import javax.annotation.Resource;
15
 import java.math.BigDecimal;
13
 import java.math.BigDecimal;
@@ -17,8 +15,6 @@ import java.time.LocalDateTime;
17
 import java.time.format.DateTimeFormatter;
15
 import java.time.format.DateTimeFormatter;
18
 import java.util.Arrays;
16
 import java.util.Arrays;
19
 import java.util.HashMap;
17
 import java.util.HashMap;
20
-import java.util.Map;
21
-import java.util.SortedSet;
22
 
18
 
23
 /**
19
 /**
24
  * class name: CamelJDBCCofRealTimeConfiguration.java
20
  * class name: CamelJDBCCofRealTimeConfiguration.java
@@ -58,18 +54,15 @@ public class CamelJDBCCofRealTimeConfiguration  {
58
      * 异步发送 RunTime 消息到MQ中
54
      * 异步发送 RunTime 消息到MQ中
59
      * */
55
      * */
60
    // @Async
56
    // @Async
61
-    public long sendDataToRocketMQ(String wellName,String wellId,String prodDate,String stroke_length ,String stroke_frequency,String sgt){
57
+    public void sendDataToRocketMQ(String wellName, String wellId, String prodDate, Double stroke_length , Double stroke_frequency, String sgt){
62
         String orgId = "0";
58
         String orgId = "0";
63
         prodDate = prodDate.substring(0,19);
59
         prodDate = prodDate.substring(0,19);
64
-        Double strokeLength = Double.valueOf(stroke_length);
65
-        Double strokeFrequency = Double.valueOf(stroke_frequency);
66
         if (sgt == null || sgt.length() ==0){
60
         if (sgt == null || sgt.length() ==0){
67
             sgt = "0,0";
61
             sgt = "0,0";
68
         }
62
         }
69
-        DiagnoseMsg diagnoseMsg = new DiagnoseMsg(wellId, wellName, orgId, prodDate, sgt, LocalDateTime.now().toString(), strokeLength, strokeFrequency);
63
+        DiagnoseMsg diagnoseMsg = new DiagnoseMsg(wellId, wellName, orgId, prodDate, sgt, LocalDateTime.now().toString(), stroke_length, stroke_frequency);
70
         sendMsgRunTime++;
64
         sendMsgRunTime++;
71
         producer.send(diagnoseMsg);
65
         producer.send(diagnoseMsg);
72
-        return sendMsgRunTime;
73
     }
66
     }
74
 
67
 
75
     @Resource(name = "diagnoseMessageProducer")
68
     @Resource(name = "diagnoseMessageProducer")
@@ -149,7 +142,13 @@ public class CamelJDBCCofRealTimeConfiguration  {
149
                     }
142
                     }
150
                            // long count = sendDataToRocketMQ(aRow.get("well_name").toString(), aRow.get("well_name").toString(), aRow.get("dyna_create_time").toString(), aRow.get("stroke").toString(), aRow.get("frequency").toString(), aRow.get("sgt").toString());
143
                            // long count = sendDataToRocketMQ(aRow.get("well_name").toString(), aRow.get("well_name").toString(), aRow.get("dyna_create_time").toString(), aRow.get("stroke").toString(), aRow.get("frequency").toString(), aRow.get("sgt").toString());
151
                            // System.out.println("消息发送时间为:   "+LocalDateTime.now()+"   发送的消息数量为: "+count + "   数据时间为:" + aRow.get("dyna_create_time"));
144
                            // System.out.println("消息发送时间为:   "+LocalDateTime.now()+"   发送的消息数量为: "+count + "   数据时间为:" + aRow.get("dyna_create_time"));
152
-
145
+                            String wellName =aRow.get("well_name").toString();
146
+                            String wellId =aRow.get("well_name").toString();
147
+                            String prodDate = aRow.get("dyna_create_time").toString().substring(0,19);
148
+                            Double strokeLength = Double.valueOf(aRow.get("stroke").toString());
149
+                            Double strokeFrequency = Double.valueOf(aRow.get("frequency").toString());
150
+                            String sgt = aRow.get("sgt").toString();
151
+                            sendDataToRocketMQ(wellName,wellId,prodDate,strokeLength,strokeFrequency,sgt);
153
                         })
152
                         })
154
                         .setBody(simple("insert into centralbase.cb_temp_well_mech_runtime(well_id,prod_date,stroke_length,stroke_frequency,susp_max_load,susp_min_load,sgt) " +
153
                         .setBody(simple("insert into centralbase.cb_temp_well_mech_runtime(well_id,prod_date,stroke_length,stroke_frequency,susp_max_load,susp_min_load,sgt) " +
155
                                 "select '${body[well_name]}','${body[dyna_create_time]}','${body[stroke]}','${body[frequency]}','${body[susp_max_load]}','${body[susp_min_load]}','${body[sgt]}' " +
154
                                 "select '${body[well_name]}','${body[dyna_create_time]}','${body[stroke]}','${body[frequency]}','${body[susp_max_load]}','${body[susp_min_load]}','${body[sgt]}' " +
@@ -157,7 +156,7 @@ public class CamelJDBCCofRealTimeConfiguration  {
157
                         .to("jdbc:centralbase")
156
                         .to("jdbc:centralbase")
158
                         .end();
157
                         .end();
159
 
158
 
160
-                from("timer:mytimer-SendToMQ?period=180000")
159
+                /*from("timer:mytimer-SendToMQ?period=180000")
161
                         .routeId("SendToMQ")
160
                         .routeId("SendToMQ")
162
                         .setBody(simple("select so.well_id,so.well_common_name,so.org_id,ti.prod_date,ti.stroke_frequency,ti.stroke_length,ti.sgt from centralbase.cb_temp_well_mech_runtime ti, centralbase.cb_cd_well_source so where ti.well_id = so.well_id and ti.prod_date =(select max(prod_date) from centralbase.cb_temp_well_mech_runtime) "))
161
                         .setBody(simple("select so.well_id,so.well_common_name,so.org_id,ti.prod_date,ti.stroke_frequency,ti.stroke_length,ti.sgt from centralbase.cb_temp_well_mech_runtime ti, centralbase.cb_cd_well_source so where ti.well_id = so.well_id and ti.prod_date =(select max(prod_date) from centralbase.cb_temp_well_mech_runtime) "))
163
                         .to("jdbc:centralbase")
162
                         .to("jdbc:centralbase")
@@ -178,7 +177,8 @@ public class CamelJDBCCofRealTimeConfiguration  {
178
                         })
177
                         })
179
                         .doCatch(Exception.class)
178
                         .doCatch(Exception.class)
180
                         .log("${header.date}"+" rocketMQ send data failed")
179
                         .log("${header.date}"+" rocketMQ send data failed")
181
-                        .end();
180
+                        .endDoTry()
181
+                        .end();*/
182
 
182
 
183
                from("timer:mytimer-update-avg-mech_daily?period=3600000")
183
                from("timer:mytimer-update-avg-mech_daily?period=3600000")
184
                         .routeId("update-avg-mech_daily")
184
                         .routeId("update-avg-mech_daily")