Quellcode durchsuchen

同步井信息

xsr vor 1 Monat
Ursprung
Commit
f2f770f3be

+ 0 - 0
mvnw


+ 4 - 0
pom.xml

@@ -34,6 +34,10 @@
34 34
     </dependencyManagement>
35 35
     <dependencies>
36 36
         <dependency>
37
+            <groupId>org.springframework.boot</groupId>
38
+            <artifactId>spring-boot-starter-quartz</artifactId>
39
+        </dependency>
40
+        <dependency>
37 41
             <groupId>mysql</groupId>
38 42
             <artifactId>mysql-connector-java</artifactId>
39 43
         </dependency>

+ 30 - 0
src/main/java/com/gct/tools/etlcamelhuge/camelconfig/QuartzConfig.java

@@ -0,0 +1,30 @@
1
+package com.gct.tools.etlcamelhuge.camelconfig;
2
+
3
+import com.gct.tools.etlcamelhuge.quartz.SyncWellSourceTask;
4
+import org.quartz.*;
5
+import org.springframework.context.annotation.Bean;
6
+import org.springframework.context.annotation.Configuration;
7
+
8
+import static org.quartz.CronScheduleBuilder.cronSchedule;
9
+
10
+@Configuration
11
+public class QuartzConfig {
12
+
13
+
14
+    @Bean
15
+    public JobDetail jobDetail() {
16
+        return JobBuilder.newJob(SyncWellSourceTask.class)
17
+                .withIdentity("ModbusReadTask" ,"Group1")
18
+                .storeDurably()
19
+                .build();
20
+    }
21
+
22
+    @Bean
23
+    public Trigger trigger() {
24
+        return TriggerBuilder.newTrigger()
25
+                .forJob("ModbusReadTask", "Group1")
26
+                .withSchedule(SimpleScheduleBuilder.simpleSchedule()
27
+                        .withIntervalInHours(1)  // 间隔1小时
28
+                        .repeatForever()).build();
29
+    }
30
+}

+ 102 - 0
src/main/java/com/gct/tools/etlcamelhuge/quartz/SyncWellSourceTask.java

@@ -0,0 +1,102 @@
1
+package com.gct.tools.etlcamelhuge.quartz;
2
+
3
+import org.quartz.JobExecutionContext;
4
+import org.quartz.JobExecutionException;
5
+import org.springframework.jdbc.core.JdbcTemplate;
6
+import org.springframework.scheduling.quartz.QuartzJobBean;
7
+
8
+import javax.annotation.Resource;
9
+import javax.sql.DataSource;
10
+import java.time.LocalDateTime;
11
+import java.time.format.DateTimeFormatter;
12
+import java.util.*;
13
+import java.util.concurrent.atomic.AtomicInteger;
14
+
15
+public class SyncWellSourceTask extends QuartzJobBean {
16
+
17
+    @Resource(name = "oracle")
18
+    DataSource oracle;
19
+
20
+    @Resource(name = "centralbase")
21
+    DataSource centralbase;
22
+
23
+    private void orgUpdate(){
24
+        JdbcTemplate oracleTemplate = new JdbcTemplate(oracle);
25
+        final JdbcTemplate centralbaseTemplate = new JdbcTemplate(centralbase);
26
+        String sql = String.format("select * from zy_dxgw.V_PC_ORGANIZATION_T where CANTON = '重油公司'");
27
+        List<Map<String, Object>> results = oracleTemplate.queryForList(sql);
28
+
29
+        Map<String, Map<String, Object>> parentObjMap = new HashMap<>();
30
+        Map<String, Map<String, Object>> orgIdMap = new HashMap<>();
31
+        AtomicInteger a = new AtomicInteger(1);
32
+        results.forEach(rowMap -> {
33
+            rowMap.put("org_id_map_int",a.getAndIncrement());
34
+            orgIdMap.put(rowMap.get("ORG_ID").toString(), rowMap);
35
+        });
36
+        results.forEach(rowMap -> {
37
+            parentObjMap.put(rowMap.get("ORG_ID").toString(), orgIdMap.get(rowMap.get("PARENT_ID")));
38
+        });
39
+
40
+        results.forEach(rowMap -> {
41
+            Stack<String> orgPreStack = new Stack<>();
42
+            Map<String, Object> parentObj = rowMap;
43
+
44
+            while ((parentObj = parentObjMap.get(parentObj.get("ORG_ID").toString())) != null) {
45
+                orgPreStack.push(parentObj.get("ORG_NAME").toString());
46
+            }
47
+            StringBuilder orgIdPre = new StringBuilder();
48
+            while (!orgPreStack.isEmpty()) {
49
+                orgIdPre.append(orgPreStack.pop()).append("@");
50
+            }
51
+            orgIdPre.append(rowMap.get("ORG_NAME"));
52
+            String orgIdPreStr = orgIdPre.toString();
53
+            rowMap.put("ORG_ID_PRE",orgIdPreStr);
54
+
55
+         /*   String sqlInsertOrg = String.format("insert into centralbase.cb_pc_organization(" +
56
+                            "org_id,org_name,org_code,org_level,parent_id,org_id_pre) " +
57
+                            "values('%s','%s','%s','%s','%s','%s')", rowMap.get("org_id_map_int"), rowMap.get("ORG_NAME"), rowMap.get("ORG_CODE"),
58
+                    orgIdPreStr.split("@").length
59
+                    , Objects.nonNull(orgIdMap.get(rowMap.get("PARENT_ID").toString()))?orgIdMap.get(rowMap.get("PARENT_ID").toString()).get("org_id_map_int"):"0"
60
+                    , orgIdPreStr);
61
+//            System.out.println(sqlInsertOrg);
62
+            centralbaseTemplate.execute(sqlInsertOrg);  */ //excute finish, unused
63
+        });
64
+
65
+        if(a.get()>1)return;
66
+        //excute wellsource
67
+        sql = String.format("select WELL_ID,well_common_name,org_id from " +
68
+                "zy_dxgw.pc_dev_well_attr_info where well_purpose_name='采油井'");
69
+        results = oracleTemplate.queryForList(sql);
70
+        results.forEach(rowMap -> {
71
+            String wellId = rowMap.get("WELL_ID").toString();
72
+            String wellCommonName = rowMap.get("WELL_COMMON_NAME").toString();
73
+            String wellLegalName = rowMap.get("WELL_COMMON_NAME").toString();
74
+            String orgId = rowMap.get("ORG_ID").toString();
75
+            String orgIdMapInt = orgIdMap.get(orgId).get("org_id_map_int").toString();
76
+            String stationId = orgIdMap.get(orgId).get("ORG_ID_PRE").toString();
77
+
78
+
79
+            String sqlQueryWellSource = String.format("select * from centralbase.cb_cd_well_source where well_id = '%s'",wellId);
80
+            List<Map<String, Object>> maps = centralbaseTemplate.queryForList(sqlQueryWellSource);
81
+            if(maps.isEmpty()){
82
+                System.out.println("===========> 添加新井: "+wellCommonName);
83
+                String  sqlInsertWellSource = String.format("insert into centralbase.cb_cd_well_source(well_id,well_common_name," +
84
+                                "well_legal_name,org_id,station_id) values('%s','%s','%s','%s','%s')",
85
+                        wellId, wellCommonName, wellLegalName, orgIdMapInt, stationId);
86
+//            System.out.println(sqlInsertWellSource);
87
+                centralbaseTemplate.execute(sqlInsertWellSource);
88
+
89
+                String sqlInsertSysWellControl = String.format("insert into centralbase.sys_access_well_control(well_id," +
90
+                        "well_common_name,org_id,access_date,access_status) values('%s','%s','%s','%s,'%s')",
91
+                        wellId, wellCommonName, orgIdMapInt, LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")),"1");
92
+                centralbaseTemplate.execute(sqlInsertSysWellControl);
93
+            }
94
+        });
95
+    }
96
+
97
+    @Override
98
+    protected void executeInternal(JobExecutionContext jobExecutionContext) throws JobExecutionException {
99
+        orgUpdate();
100
+    }
101
+}
102
+

+ 0 - 107
src/main/java/com/gct/tools/etlcamelhuge/routeconfig/CamelJDBCConfiguration.java

@@ -51,113 +51,6 @@ public class CamelJDBCConfiguration {
51 51
             @Override
52 52
             public void configure() throws Exception {
53 53
 
54
-                /*RouteDefinition OrgAndWellSource= (RouteDefinition) from("timer:insert-OrgAndWellSource?period=86400000")
55
-                        .routeId("insert-OrgAndWellSource")
56
-                        .setHeader("date", constant(getDate()+" 00:00:00"))
57
-                        .process(exchange -> {
58
-                            org = 0;
59
-                            orgIdPreList = new HashMap<>();
60
-                        })
61
-                        .setBody(simple("select max(org_id) from centralbase.cb_pc_organization"))
62
-                        .to("jdbc:centralbase")
63
-                        .process(exchange -> {
64
-                            HashMap body = exchange.getIn().getBody(HashMap.class);
65
-                            if (body == null || StringUtils.isEmpty(body.get("max"))) org = 0;
66
-                            else
67
-                                org = Integer.valueOf(body.get("max").toString());
68
-                        })
69
-                        .setBody(simple("select org_id_pre,org_id from centralbase.cb_pc_organization"))
70
-                        .to("jdbc:centralbase")
71
-                        .split(body()).process(exchange -> {
72
-                            HashMap body = exchange.getIn().getBody(HashMap.class);
73
-                            orgIdPreList.put(body.get("org_id_pre"),body.get("org_id"));
74
-                        }).end();
75
-                        setMyBody(OrgAndWellSource)
76
-                       .setBody(simple("select  distinct WELL_ID,cydmc,zyq,zk,qyrq,sccw,qk,REMARKS from DBA01 where PROD_DATE=to_date('${header.date}','yyyy-mm-dd hh24:mi:ss') and WELL_ID ='${header.well_id}'"))
77
-                       .to("jdbc:oracle")
78
-                       .transform()
79
-                       .body((result) -> {
80
-                              organization = new TreeSet<>();
81
-                              return result;
82
-                          })
83
-                       .step("1")
84
-                       .split(body()).process(exchange -> {
85
-                      Message in = exchange.getIn();
86
-                      HashMap<String, Object> aRow = in.getBody(HashMap.class);
87
-                      if (StringUtils.isEmpty(aRow.get("qyrq"))){
88
-                          aRow.put("QYRQ","2021-01-01 00:00:00");
89
-                      }
90
-                      if (!aRow.containsKey("JM")) aRow.put("JM",aRow.get("WELL_ID"));
91
-                      String org_level3 = aRow.get("ZYQ") + "@" + aRow.get("CYDMC") + "@" + aRow.get("ZK");
92
-                      String org_level2 = aRow.get("ZYQ") + "@" + aRow.get("CYDMC");
93
-                      String org_level1 = aRow.get("ZYQ").toString();
94
-                      aRow.put("station_id", org_level3);
95
-                      orgID = org;
96
-                      if ((!orgIdPreList.containsKey(org_level1)) || (!orgIdPreList.containsKey(org_level2)) || (!orgIdPreList.containsKey(org_level3))) {
97
-                          if (organization.add(org_level1)) {
98
-                                if (!orgIDs.containsKey(org_level1)) orgIDs.put(org_level1,++orgID);
99
-                          }
100
-                          if (organization.add(org_level2)) {
101
-                              if (!orgIDs.containsKey(org_level2)) orgIDs.put(org_level2,++orgID);
102
-                          }
103
-                          if (organization.add(org_level3)) {
104
-                              if (!orgIDs.containsKey(org_level3)) orgIDs.put(org_level3,++orgID);
105
-                          }
106
-                      }
107
-                      if(orgIdPreList.get(org_level3) !=null){
108
-                          aRow.put("org_id",orgIdPreList.get(org_level3));
109
-                          return;
110
-                      }
111
-                      if(orgIDs.get(org_level3) !=null){
112
-                          aRow.put("org_id",orgIDs.get(org_level3));
113
-                      }
114
-                  })
115
-                  .setBody(simple("insert into centralbase.cb_cd_well_source (well_id,well_common_name,spud_date,org_id,station_id,station_name,completion_name,PRODUCING_AREA_name,remarks) " +
116
-                                  "select '${body[WELL_ID]}','${body[JM]}','${body[QYRQ]}'::timestamp,'${body[org_id]}','${body[station_id]}','${body[ZK]}','${body[SCCW]}','${body[QK]}','${body[REMARKS]}' " +
117
-                                  "where NOT EXISTS ( SELECT * FROM centralbase.cb_cd_well_source WHERE well_id = '${body[WELL_ID]}' )"))
118
-                  .to("jdbc:centralbase")
119
-                  .end()
120
-                  .transform().body((re) -> {
121
-                      List<Map<String, Object>> rows = new ArrayList<>();
122
-                      for (String s : organization) {
123
-                          Map<String, Object> row = new HashMap<>();
124
-                          String[] orgs = s.split("@");
125
-                          row.put("org_id_pre", s);
126
-                          switch (orgs.length) {
127
-                              case 1:
128
-                                  row.put("org_name", orgs[0]);
129
-                                  row.put("org_level", 1);
130
-                                  row.put("org_parent", "0");
131
-                                  break;
132
-                              case 2:
133
-                                  row.put("org_name", orgs[1]);
134
-                                  row.put("org_level", 2);
135
-                                  row.put("org_parent", orgIDs.get(orgs[0]).toString());
136
-                                  break;
137
-                              case 3:
138
-                                  row.put("org_name", orgs[2]);
139
-                                  row.put("org_level", 3);
140
-                                  row.put("org_parent", orgIDs.get(orgs[0] + "@" + orgs[1]).toString());
141
-                                  break;
142
-                          }
143
-                          if (!orgIdPreList.containsKey(s)) {
144
-                              org++;
145
-                              row.put("org_code", org);
146
-                              row.put("org_id", "" + org);
147
-                              orgIdPreList.put(s, row.get("org_id"));
148
-                              rows.add(row);
149
-                          }
150
-                      }
151
-                      return rows;
152
-                  }).split(body())
153
-                  .setBody(simple("insert into centralbase.cb_pc_organization(org_id,org_code,org_name,org_level,parent_id,org_id_pre)" +
154
-                                  "select '${body[org_id]}','${body[org_code]}','${body[org_name]}','${body[org_level]}','${body[org_parent]}','${body[org_id_pre]}' " +
155
-                                  "where NOT EXISTS ( SELECT * FROM centralbase.cb_pc_organization WHERE org_id = '${body[org_id]}')"))
156
-                  .doTry()
157
-                  .to("jdbc:centralbase")
158
-                  .doCatch(Exception.class)
159
-                  .log("${header.date}"+" routeId:insert-OrgAndWellSource->  centralbase.cb_pc_organization insert data failed")
160
-                  .end();*/
161 54
 
162 55
                 from("timer:update-wellControl?period=3600000")
163 56
                         .routeId("update-wellControl")

+ 6 - 6
src/main/resources/application.yml

@@ -7,7 +7,7 @@ spring:
7 7
     ds1:
8 8
       ## Hikari连接池的设置 Hikari 时间单位都是毫秒
9 9
       type: com.zaxxer.hikari.HikariDataSource
10
-      jdbc-url: jdbc:postgresql://11.72.150.241:54321/database
10
+      jdbc-url: jdbc:postgresql://10.72.118.135:54321/database
11 11
       username: root
12 12
       password: 123456
13 13
       driver-class-name: org.postgresql.Driver
@@ -30,7 +30,7 @@ spring:
30 30
     ds2:
31 31
       ## Hikari连接池的设置 Hikari 时间单位都是毫秒
32 32
       type: com.zaxxer.hikari.HikariDataSource
33
-      jdbc-url: jdbc:postgresql://11.72.150.241:54321/diagnosis
33
+      jdbc-url: jdbc:postgresql://10.72.118.135:54321/diagnosis
34 34
       username: root
35 35
       password: 123456
36 36
       driver-class-name: org.postgresql.Driver
@@ -75,7 +75,7 @@ spring:
75 75
     ds4:
76 76
       ## Hikari连接池的设置 Hikari 时间单位都是毫秒
77 77
       type: com.zaxxer.hikari.HikariDataSource
78
-      jdbc-url: jdbc:postgresql://11.72.150.241:54321/calliquid
78
+      jdbc-url: jdbc:postgresql://10.72.118.135:54321/calliquid
79 79
       username: root
80 80
       password: 123456
81 81
       driver-class-name: org.postgresql.Driver
@@ -118,7 +118,7 @@ server:
118 118
 
119 119
 rocketmq:
120 120
   #rocketmq的路由调度器的地址
121
-  name-server: 11.72.150.241:9876
121
+  name-server: 10.72.118.135:9876
122 122
   producer:
123 123
     # 消息分组
124 124
     group: aoid
@@ -136,7 +136,7 @@ gct:
136 136
         group: diagnose
137 137
         access-key: diagnose-msg-v1
138 138
         secret-key: diagnose-msg-v1
139
-      name-server: 11.72.150.241:9876
139
+      name-server: 10.72.118.135:9876
140 140
       consumer:
141 141
         group: diagnose
142 142
         access-key: diagnose-msg-v1
@@ -149,7 +149,7 @@ gct:
149 149
         group: warn
150 150
         access-key: warn-msg-v1
151 151
         secret-key: warn-msg-v1
152
-      name-server: 11.72.150.241:9876
152
+      name-server: 10.72.118.135:9876
153 153
       consumer:
154 154
         group: warn
155 155
         access-key: warn-msg-v1