3 Commits f3010811c5 ... 00da73ddb2

Auteur SHA1 Message Date
  xsr 00da73ddb2 同步井信息 il y a 1 semaine
  xsr 8a75bb5d80 Merge remote-tracking branch 'origin/zygs-1024' into zygs-1024 il y a 1 mois
  xsr f2f770f3be 同步井信息 il y a 1 mois

+ 0 - 0
mvnw


+ 4 - 0
pom.xml

@@ -34,6 +34,10 @@
34 34
     </dependencyManagement>
35 35
     <dependencies>
36 36
         <dependency>
37
+            <groupId>org.springframework.boot</groupId>
38
+            <artifactId>spring-boot-starter-quartz</artifactId>
39
+        </dependency>
40
+        <dependency>
37 41
             <groupId>mysql</groupId>
38 42
             <artifactId>mysql-connector-java</artifactId>
39 43
         </dependency>

+ 1 - 1
src/main/java/com/gct/tools/etlcamelhuge/MQ/DiagnoseMessageProducer.java

@@ -49,7 +49,7 @@ public class DiagnoseMessageProducer implements MessageProducer {
49 49
             return;
50 50
         }
51 51
         DiagnoseMsg actualMsg = (DiagnoseMsg) msgBody;
52
-        System.out.println(properties);
52
+//        System.out.println(properties);
53 53
         Message msg = new Message(properties.getTopic(), properties.getTags(), actualMsg.getWellId(), JSONObject.toJSONString(actualMsg).getBytes(StandardCharsets.UTF_8));
54 54
         sendDefault(producer, msg, (MessageBody) actualMsg, failCallBack, successCallBack,0);
55 55
     }

+ 30 - 0
src/main/java/com/gct/tools/etlcamelhuge/camelconfig/QuartzConfig.java

@@ -0,0 +1,30 @@
1
+package com.gct.tools.etlcamelhuge.camelconfig;
2
+
3
+import com.gct.tools.etlcamelhuge.quartz.SyncWellSourceTask;
4
+import org.quartz.*;
5
+import org.springframework.context.annotation.Bean;
6
+import org.springframework.context.annotation.Configuration;
7
+
8
+import static org.quartz.CronScheduleBuilder.cronSchedule;
9
+
10
+@Configuration
11
+public class QuartzConfig {
12
+
13
+
14
+    @Bean
15
+    public JobDetail jobDetail() {
16
+        return JobBuilder.newJob(SyncWellSourceTask.class)
17
+                .withIdentity("ModbusReadTask" ,"Group1")
18
+                .storeDurably()
19
+                .build();
20
+    }
21
+
22
+    @Bean
23
+    public Trigger trigger() {
24
+        return TriggerBuilder.newTrigger()
25
+                .forJob("ModbusReadTask", "Group1")
26
+                .withSchedule(SimpleScheduleBuilder.simpleSchedule()
27
+                        .withIntervalInHours(1)  // 间隔1小时
28
+                        .repeatForever()).build();
29
+    }
30
+}

+ 106 - 0
src/main/java/com/gct/tools/etlcamelhuge/quartz/SyncWellSourceTask.java

@@ -0,0 +1,106 @@
1
+package com.gct.tools.etlcamelhuge.quartz;
2
+
3
+import org.quartz.JobExecutionContext;
4
+import org.quartz.JobExecutionException;
5
+import org.springframework.jdbc.core.JdbcTemplate;
6
+import org.springframework.scheduling.quartz.QuartzJobBean;
7
+import org.springframework.transaction.annotation.Transactional;
8
+
9
+import javax.annotation.Resource;
10
+import javax.sql.DataSource;
11
+import java.time.LocalDateTime;
12
+import java.time.format.DateTimeFormatter;
13
+import java.util.*;
14
+import java.util.concurrent.atomic.AtomicInteger;
15
+
16
+public class SyncWellSourceTask extends QuartzJobBean {
17
+
18
+    @Resource(name = "oracle")
19
+    DataSource oracle;
20
+
21
+    @Resource(name = "centralbase")
22
+    DataSource centralbase;
23
+
24
+    @Transactional(rollbackFor = Exception.class)
25
+    public void orgUpdate(){
26
+        JdbcTemplate oracleTemplate = new JdbcTemplate(oracle);
27
+        final JdbcTemplate centralbaseTemplate = new JdbcTemplate(centralbase);
28
+        String sql = String.format("select * from zy_dxgw.V_PC_ORGANIZATION_T where CANTON = '重油公司'");
29
+        List<Map<String, Object>> results = oracleTemplate.queryForList(sql);
30
+
31
+        Map<String, Map<String, Object>> parentObjMap = new HashMap<>();
32
+        Map<String, Map<String, Object>> orgIdMap = new HashMap<>();
33
+        AtomicInteger a = new AtomicInteger(1);
34
+        results.forEach(rowMap -> {
35
+            rowMap.put("org_id_map_int",a.getAndIncrement());
36
+            orgIdMap.put(rowMap.get("ORG_ID").toString(), rowMap);
37
+        });
38
+        results.forEach(rowMap -> {
39
+            parentObjMap.put(rowMap.get("ORG_ID").toString(), orgIdMap.get(rowMap.get("PARENT_ID")));
40
+        });
41
+
42
+        results.forEach(rowMap -> {
43
+            Stack<String> orgPreStack = new Stack<>();
44
+            Map<String, Object> parentObj = rowMap;
45
+
46
+            while ((parentObj = parentObjMap.get(parentObj.get("ORG_ID").toString())) != null) {
47
+                orgPreStack.push(parentObj.get("ORG_NAME").toString());
48
+            }
49
+            StringBuilder orgIdPre = new StringBuilder();
50
+            while (!orgPreStack.isEmpty()) {
51
+                orgIdPre.append(orgPreStack.pop()).append("@");
52
+            }
53
+            orgIdPre.append(rowMap.get("ORG_NAME"));
54
+            String orgIdPreStr = orgIdPre.toString();
55
+            rowMap.put("ORG_ID_PRE",orgIdPreStr);
56
+
57
+         /*   String sqlInsertOrg = String.format("insert into centralbase.cb_pc_organization(" +
58
+                            "org_id,org_name,org_code,org_level,parent_id,org_id_pre) " +
59
+                            "values('%s','%s','%s','%s','%s','%s')", rowMap.get("org_id_map_int"), rowMap.get("ORG_NAME"), rowMap.get("ORG_CODE"),
60
+                    orgIdPreStr.split("@").length
61
+                    , Objects.nonNull(orgIdMap.get(rowMap.get("PARENT_ID").toString()))?orgIdMap.get(rowMap.get("PARENT_ID").toString()).get("org_id_map_int"):"0"
62
+                    , orgIdPreStr);
63
+//            System.out.println(sqlInsertOrg);
64
+            centralbaseTemplate.execute(sqlInsertOrg);  */ //excute finish, unused
65
+        });
66
+
67
+        //excute wellsource
68
+        sql = String.format("select WELL_ID,well_common_name,org_id from " +
69
+                "zy_dxgw.pc_dev_well_attr_info where well_purpose_name LIKE '%%采油井%%'");
70
+        results = oracleTemplate.queryForList(sql);
71
+        results.forEach(rowMap -> {
72
+            String wellId = rowMap.get("WELL_ID").toString();
73
+            String wellCommonName = rowMap.get("WELL_COMMON_NAME").toString();
74
+            String wellLegalName = rowMap.get("WELL_COMMON_NAME").toString();
75
+            String orgId = rowMap.get("ORG_ID").toString();
76
+            String orgIdMapInt = orgIdMap.get(orgId).get("org_id_map_int").toString();
77
+            String stationId = orgIdMap.get(orgId).get("ORG_ID_PRE").toString();
78
+
79
+
80
+            String sqlQueryWellSource = String.format("select * from centralbase.cb_cd_well_source where well_id = '%s'",wellId);
81
+            List<Map<String, Object>> maps = centralbaseTemplate.queryForList(sqlQueryWellSource);
82
+            if(maps.isEmpty()){
83
+                System.out.println("===========> 添加新井: "+wellCommonName);
84
+                String  sqlInsertWellSource = String.format("insert into centralbase.cb_cd_well_source(well_id,well_common_name," +
85
+                                "well_legal_name,org_id,station_id) select '%s','%s','%s','%s','%s' " +
86
+                                "where not exists (select * from centralbase.cb_cd_well_source where well_id = '%s')",
87
+                        wellId, wellCommonName, wellLegalName, orgIdMapInt, stationId, wellId);
88
+//            System.out.println(sqlInsertWellSource);
89
+                centralbaseTemplate.execute(sqlInsertWellSource);
90
+
91
+                String sqlInsertSysWellControl = String.format("insert into centralbase.sys_access_well_control(well_id," +
92
+                        "well_common_name,org_id,access_date,access_status) select '%s','%s','%s','%s','%s' " +
93
+                                " where not exists (select * from centralbase.sys_access_well_control where well_id = '%s')",
94
+                        wellId, wellCommonName, orgIdMapInt,
95
+                        LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")),"1",wellId);
96
+                centralbaseTemplate.execute(sqlInsertSysWellControl);
97
+            }
98
+        });
99
+    }
100
+
101
+    @Override
102
+    protected void executeInternal(JobExecutionContext jobExecutionContext) throws JobExecutionException {
103
+        orgUpdate();
104
+    }
105
+}
106
+

+ 1 - 1
src/main/java/com/gct/tools/etlcamelhuge/routeconfig/CamelJDBCCofRealTimeConfiguration.java

@@ -67,7 +67,7 @@ public class CamelJDBCCofRealTimeConfiguration {
67 67
     public void sendDataToRocketMQ(String wellName, String wellId, String prodDate, Double stroke_length, Double stroke_frequency, String sgt) {
68 68
         String orgId = "0";
69 69
         DiagnoseMsg diagnoseMsg = new DiagnoseMsg(wellId, wellName, orgId, prodDate, sgt, LocalDateTime.now().toString(), stroke_length, stroke_frequency);
70
-        System.out.println(diagnoseMsg);
70
+//        System.out.println(diagnoseMsg);
71 71
         sendMsgRunTime++;
72 72
         producer.send(diagnoseMsg);
73 73
     }

+ 4 - 0
src/main/java/com/gct/tools/etlcamelhuge/routeconfig/CamelJDBCConfiguration.java

@@ -52,6 +52,7 @@ public class CamelJDBCConfiguration {
52 52
             @Override
53 53
             public void configure() throws Exception {
54 54
 
55
+
55 56
                 from("timer:update-wellControl?period=3600000")
56 57
                         .routeId("update-wellControl")
57 58
                         .setBody(simple("select scc.well_id,wo.well_common_name,op.org_id from centralbase.sys_access_well_control scc\n" +
@@ -168,7 +169,10 @@ public class CamelJDBCConfiguration {
168 169
                         .split(body())
169 170
                         .setBody(simple("update centralbase.cb_pc_pro_wellbore_status_daily set pump_depth = '${body[PUMP_DEPTH]}'  " +
170 171
                                 "where well_id = '${header.well_id}' and prod_date::date  = '${header.date}' "))
172
+                        .doTry()
171 173
                         .to("jdbc:centralbase")
174
+                        .doCatch(Exception.class)
175
+                        .log("${header.date}" + " routeId:update-statusDaily-BS ->  centralbase.cb_pc_pro_wellbore_status_daily update data failed")
172 176
                         .end();
173 177
 
174 178
                 RouteDefinition statusDailyBJ = from("timer:mytimer-update-statusDaily-BJ?period=10800000")

+ 4 - 1
src/main/resources/application.yml

@@ -158,4 +158,7 @@ gct:
158 158
 swagger:
159 159
   authorization:
160 160
     key-name: token
161
-  enabled: true
161
+  enabled: true
162
+logging:
163
+  level:
164
+    root: error