瀏覽代碼

接入平衡率 以及 动液面修复

lijian 3 年之前
父節點
當前提交
01a4b5f8bf

+ 37 - 8
src/main/java/com/gct/tools/etlcamelhuge/routeconfig/CamelJDBCConfiguration.java

@@ -268,7 +268,7 @@ public class CamelJDBCConfiguration /*extends RouteBuilder */ {
268 268
             //全部执行完成的大概时间在30-40分钟
269 269
             @Override
270 270
             public void configure() throws Exception {
271
-                RouteDefinition OrgAndWellSource= (RouteDefinition) from("timer:insert-OrgAndWellSource?period=86400000")
271
+             /*   RouteDefinition OrgAndWellSource= (RouteDefinition) from("timer:insert-OrgAndWellSource?period=86400000")
272 272
                         .routeId("insert-OrgAndWellSource")
273 273
                         .setHeader("date", constant(getDate()))
274 274
                         .process(exchange -> {
@@ -722,11 +722,6 @@ public class CamelJDBCConfiguration /*extends RouteBuilder */ {
722 722
 
723 723
                     }
724 724
                     })
725
-                        .log("${body[JH]}")
726
-                        .log("${body[SCJBRQ]}")
727
-                        .log("${body[JBYY]}")
728
-                        .log("${body[ZQ]}")
729
-                        .log("${header.date}")
730 725
                         .doTry()
731 726
                         .setBody(simple("insert into centralbase.cb_cd_checkpump_source(well_common_name,check_pump_date,reason,period,sys_date) " +
732 727
                                         "select '${body[JH]}','${body[SCJBRQ]}','${body[JBYY]}','${body[ZQ]}','${header.date}' " +
@@ -788,10 +783,10 @@ public class CamelJDBCConfiguration /*extends RouteBuilder */ {
788 783
                         })
789 784
                         .doCatch(Exception.class)
790 785
                         .log("${header.date}"+" rocketMQ send data faild")
791
-                        .end();
786
+                        .end();*/
792 787
 
793 788
                 //稀稠油信息
794
-                 from("timer:mytimer9?period=3600000")
789
+                 /* from("timer:mytimer9?period=3600000")
795 790
                         .routeId("oracle-9")
796 791
                         .setBody(simple("select max(prod_date) from centralbase.cb_temp_well_mech_runtime "))
797 792
                         .to("jdbc:centralbase")
@@ -821,6 +816,40 @@ public class CamelJDBCConfiguration /*extends RouteBuilder */ {
821 816
                         .log("${header.date}"+" routeId:oracle-9->  centralbase.cb_pc_pro_wellbore_status_daily update date faild")
822 817
                         .end();
823 818
 
819
+               from("timer:mytimer1?period=604800000")
820
+                        .routeId("oracle-1")
821
+                        .setHeader("date", constant(getDate()))
822
+                        .setBody(simple("select org_id,org_name,org_id_pre from centralbase.cb_pc_organization_test where org_level = '3' "))
823
+                        .to("jdbc:centralbase")
824
+                        .split(body())
825
+                        .log("${body[org_id]}")
826
+                        .log("${body[org_name]}")
827
+                        .log("${body[org_id_pre]}")
828
+                        .doTry()
829
+                        .setBody(simple(" update centralbase.cb_pc_organization set org_id_pre = '${body[org_id_pre]}'  where org_id = '${body[org_id]}' "))
830
+                        .to("jdbc:centralbase")
831
+                        .doCatch(Exception.class)
832
+                        .log("${header.date}"+" routeId:oracle-1->  centralbase.cb_pc_organization update date faild")
833
+                        .end();
834
+
835
+
836
+                from("timer:mytimer1?period=604800000")
837
+                        .routeId("oracle-1")
838
+                        .setHeader("date", constant(getDate()))
839
+                        .setBody(simple("select  well_common_name ,station_id,station_name from centralbase.cb_cd_well_source_test  "))
840
+                        .to("jdbc:centralbase")
841
+                        .split(body())
842
+                        .log("${body[well_common_name]}")
843
+                        .log("${body[station_id]}")
844
+                        .log("${body[station_name]}")
845
+                        .doTry()
846
+                        .setBody(simple(" update centralbase.cb_cd_well_source set station_id = '${body[station_id]}'  ,   station_name = '${body[station_name]}'   where well_common_name = '${body[well_common_name]}' "))
847
+                        .to("jdbc:centralbase")
848
+                        .doCatch(Exception.class)
849
+                        .log("${header.date}"+" routeId:oracle-1->  centralbase.cb_pc_organization update date faild")
850
+                        .end(); */
851
+
852
+
824 853
 
825 854
 
826 855
 

+ 61 - 11
src/main/java/com/gct/tools/etlcamelhuge/routeconfig/CamelRestConfiguration.java

@@ -1,7 +1,6 @@
1 1
 package com.gct.tools.etlcamelhuge.routeconfig;
2 2
 
3 3
 
4
-import com.alibaba.fastjson.JSONArray;
5 4
 import com.alibaba.fastjson.JSONObject;
6 5
 import com.gct.common.util.SGTUtil;
7 6
 import com.gct.tools.etlcamelhuge.entity.ZDJG_WELL;
@@ -10,8 +9,6 @@ import com.gct.tools.etlcamelhuge.util.TransUtil;
10 9
 import org.apache.camel.Message;
11 10
 import org.apache.camel.builder.RouteBuilder;
12 11
 import org.apache.camel.model.dataformat.JsonLibrary;
13
-import org.apache.commons.lang3.ArrayUtils;
14
-import org.eclipse.jetty.util.ArrayUtil;
15 12
 import org.springframework.context.annotation.Configuration;
16 13
 
17 14
 import java.io.*;
@@ -20,13 +17,10 @@ import java.net.HttpURLConnection;
20 17
 import java.net.URL;
21 18
 import java.net.URLEncoder;
22 19
 import java.text.SimpleDateFormat;
23
-import java.time.LocalDateTime;
24
-import java.time.format.DateTimeFormatter;
25 20
 import java.util.ArrayList;
26 21
 import java.util.Date;
27 22
 import java.util.HashMap;
28 23
 import java.util.List;
29
-import java.util.concurrent.atomic.AtomicInteger;
30 24
 
31 25
 
32 26
 @Configuration
@@ -72,6 +66,60 @@ public class CamelRestConfiguration extends RouteBuilder {
72 66
         return date;
73 67
     }
74 68
 
69
+    public String loadPhdDate( String well_id) throws Exception {
70
+        URL restURL = new URL("http://11.72.128.71/api-service/tsdb/getLast?appKey=SR5vY4bED7");
71
+
72
+        //此处的urlConnection对象实际上是根据URL的请求协议(此处是http) 生成的URLConnection类 的子类HttpURLConnection
73
+        HttpURLConnection conn = (HttpURLConnection) restURL.openConnection();
74
+
75
+        //请求方式
76
+        conn.setRequestMethod("POST");
77
+
78
+        //设置是否从httpUrlConnection读入,默认情况下是true; httpUrlConnection.setDoInput(true);
79
+        conn.setDoOutput(true);
80
+
81
+        //allowUserInteraction 如果为 true,则在允许用户交互(例如弹出一个验证对话框)的上下文中对此 URL 进行检查。
82
+        conn.setAllowUserInteraction(false);
83
+        DataOutputStream out = new DataOutputStream(conn
84
+                .getOutputStream());
85
+        String content = "id="+URLEncoder.encode(well_id+"/upData/电流平衡率", "UTF-8");
86
+        out.writeBytes(content);
87
+        out.flush();
88
+        out.close();
89
+        PrintStream ps = new PrintStream(conn.getOutputStream());
90
+
91
+        ps.close();
92
+
93
+        InputStream is = null;
94
+        if (conn.getResponseCode() == 200) {
95
+            is = conn.getInputStream();
96
+        } else {
97
+            is = conn.getErrorStream();
98
+        }
99
+        BufferedReader bReader = new BufferedReader(new InputStreamReader(is, "UTF-8"));
100
+        List<String> list = new ArrayList<>();
101
+
102
+        String line, resultStr = "";
103
+        while (null != (line = bReader.readLine())) {
104
+            resultStr += line;
105
+        }
106
+
107
+        String value = "";
108
+        JSONObject json = null;
109
+        try {
110
+            json = JSONObject.parseObject(resultStr);
111
+
112
+            if (json.size() > 0) {
113
+                value = json.get("value").toString();
114
+            }
115
+
116
+        } catch (Exception e) {
117
+            System.out.println(resultStr);
118
+        }
119
+//        System.out.println(value);
120
+        return value;
121
+    }
122
+
75 123
 
76 124
     public ZDJG_WELL loadGtData(String param, String well_id, String sysDate) throws Exception {
77 125
 //        String date = getDate();
@@ -175,7 +223,7 @@ public class CamelRestConfiguration extends RouteBuilder {
175 223
     @Override
176 224
     public synchronized void configure() throws Exception {
177 225
 
178
-        /*JSONObject body = new JSONObject();
226
+        JSONObject body = new JSONObject();
179 227
 
180 228
           restConfiguration()
181 229
                 .component("servlet")
@@ -203,6 +251,7 @@ public class CamelRestConfiguration extends RouteBuilder {
203 251
 //                    in.setHeader("id",aRow.get("id")+"/analyse/功图数据");
204 252
 //                    in.setHeader("wellid",aRow.get("code"));
205 253
                     if (aRow.get("typeName").toString().indexOf("油井")> 0){
254
+                        Double phd = "".equals(loadPhdDate(aRow.get("id").toString())) ?  0 : Double.valueOf(loadPhdDate(aRow.get("id").toString()) );
206 255
                         ZDJG_WELL zdjg_well = loadGtData(aRow.get("id")+"/analyse/功图数据", aRow.get("code").toString(), "2021-08-17");
207 256
                         aRow.put("sgt",gtTransUtil(zdjg_well.getZd_init_sgt()) );//null
208 257
                         aRow.put("susp_max_load",zdjg_well.getSxzh());//最大载荷
@@ -211,18 +260,19 @@ public class CamelRestConfiguration extends RouteBuilder {
211 260
                         aRow.put("frequency",zdjg_well.getFrequence());
212 261
                         aRow.put("dyna_create_time",zdjg_well.getProd_date());
213 262
                         aRow.put("well_name",aRow.get("code"));
263
+                        aRow.put("phd",phd== null ? 0: phd);
214 264
                     }
215 265
 
216 266
 
217 267
                 })
218
-                .log("${in.header.date}")
268
+//                .log("${body[phd]}"+ "sql" + "${body[well_name]}")
219 269
                 .doTry()
220
-                .setBody(simple("insert into centralbase.cb_temp_well_mech_runtime(well_id,prod_date,stroke_length,stroke_frequency,susp_max_load,susp_min_load,sgt) " +
221
-                        "values( '${body[well_name]}','${in.header.date}','${body[stroke]}','${body[frequency]}','${body[susp_max_load]}','${body[susp_min_load]}','${body[sgt]}') " ))
270
+                .setBody(simple("insert into centralbase.cb_temp_well_mech_runtime(well_id,prod_date,stroke_length,stroke_frequency,susp_max_load,susp_min_load,sgt,elec_pump_current_b) " +
271
+                        "values( '${body[well_name]}','${in.header.date}','${body[stroke]}','${body[frequency]}','${body[susp_max_load]}','${body[susp_min_load]}','${body[sgt]}','${body[phd]}')  " ))
222 272
 //                        "where NOT EXISTS (SELECT * FROM centralbase.cb_temp_well_mech_runtime WHERE well_id = '${body[well_name]}' and  prod_date = '${in.header.date}' )"))
223 273
                 .to("jdbc:centralbase")
224 274
                 .doCatch(Exception.class)
225
-                .end();*/
275
+                .end();
226 276
 
227 277
       /* JSONArray jsonArray = new JSONArray();
228 278
         JSONObject search = new JSONObject();