package com.gct.tools.etlcamelhuge.controller; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import com.gct.common.util.SGTUtil; import com.gct.tools.etlcamelhuge.MQ.MessageBody; import com.gct.tools.etlcamelhuge.MQ.MessageProducer; import com.gct.tools.etlcamelhuge.entity.DiagnoseMsg; import com.gct.tools.etlcamelhuge.entity.GTBody; import io.swagger.annotations.Api; import io.swagger.annotations.ApiOperation; import io.swagger.annotations.ApiParam; import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.web.bind.annotation.*; import javax.annotation.Resource; import javax.sql.DataSource; import java.math.BigDecimal; import java.time.LocalDateTime; import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.stream.Collectors; /** * class name: GtController * * @author gxt * @version 1.0 * @since 2021/8/26 下午2:50 周四 */ @RestController @RequestMapping("/GTData") @Api(value = "GtController",description = "功图数据操作controller") public class GtController { private JdbcTemplate jdbcTemplate; @Resource(name = "centralbase") DataSource dataSource; @Resource(name = "gtsj") DataSource dataSourceOfGTSJ; @Resource(name = "diagnoseMessageProducer") private MessageProducer producer; @PostMapping("/sendGTSJ") @ApiOperation(value = "从 RunTime 数据表查询数据并且发送功图数据到 MQ 中") public JSONObject sendGTSJ(@RequestBody GTBody gtBody){ String startDate = gtBody.getStartDate(); String endDate = gtBody.getEndDate(); List wellList = gtBody.getWellList().stream().map(x->x.toString()).collect(Collectors.toList()); JSONObject jsonObject = new JSONObject(); jdbcTemplate = new JdbcTemplate(dataSource); long sumData = 0; try { if (wellList.isEmpty() || wellList.size()==0 ){ String sql = String.format("select so.well_id,so.well_common_name,so.org_id,ti.prod_date,ti.stroke_frequency,ti.stroke_length,ti.sgt from centralbase.cb_temp_well_mech_runtime ti, centralbase.cb_cd_well_source so " + " where ti.well_id = so.well_id and ti.prod_date between '%s' and '%s' " ,startDate,endDate); sumData = sendDataToMQ(sql); }else { for (int i = 0; i < wellList.size(); i++) { String sql = String.format("select so.well_id,so.well_common_name,so.org_id,ti.prod_date,ti.stroke_frequency,ti.stroke_length,ti.sgt from centralbase.cb_temp_well_mech_runtime ti, centralbase.cb_cd_well_source so " + " where ti.well_id = so.well_id and ti.well_id = '%s' and ti.prod_date between '%s' and '%s' ", wellList.get(i), startDate, endDate); sumData = sendDataToMQ(sql); } } }catch (Exception e){ e.printStackTrace(); jsonObject.put("error",e.getMessage()); }finally { jsonObject.put("发送数据为sumData",sumData); } return jsonObject; } public long sendDataToMQ(String sql){ int sumData = 0; List> list = jdbcTemplate.queryForList(sql); for (Map map : list) { String wellName = map.get("well_common_name").toString(); String wellId = map.get("well_id").toString(); String orgId = map.get("org_id").toString(); String prodDate = map.get("prod_date").toString().substring(0, 19); Double strokeLength = Double.valueOf(map.get("stroke_length").toString()); Double strokeFrequency = Double.valueOf(map.get("stroke_frequency").toString()); String sgt = map.get("sgt").toString(); if (sgt == null || sgt.length() == 0) { sgt = "0,0,0,0,0,0,0,0,0,0"; } DiagnoseMsg diagnoseMsg = new DiagnoseMsg(wellId, wellName, orgId, prodDate, sgt, LocalDateTime.now().toString(), strokeLength, strokeFrequency); producer.send((MessageBody) diagnoseMsg); sumData++; } return sumData; } @GetMapping("/getGTSJ") @ApiOperation(value = "从实时的机采数据生产的表中获取数据放入到 Runtime 表中") public JSONObject getGTSJ(@RequestBody GTBody gtBody){ String startDate = gtBody.getStartDate(); String endDate = gtBody.getEndDate(); JSONArray wellList = gtBody.getWellList(); JSONObject jsonObject = new JSONObject(); jdbcTemplate = new JdbcTemplate(dataSourceOfGTSJ); int curPage = 0; int pageSize = 5000; int sumData = 0; try { for (int i = 0; i < wellList.size(); i++) { String sql = String.format("select distinct well_name,dyna_create_time,check_date,displacement,disp_load,stroke,frequency,susp_max_load,susp_min_load from public.pc_fd_pumpjack_dyna_dia_t where dyna_create_time > '%s' and well_name = '%s' offset %d limit %d ", startDate, wellList.get(i)); List> list = jdbcTemplate.queryForList(sql); for (Map map : list) { String prod_date = map.get("dyna_create_time").toString().split("\\+")[0]; map.put("dyna_create_time", prod_date); if (map.get("displacement") != null && !map.get("displacement").equals("") && map.get("disp_load") != null && !map.get("disp_load").equals("")) { String[] displacements = map.get("displacement").toString().split(";");//10 四舍五入 String[] disp_loads = map.get("disp_load").toString().split(";"); Double susp_max_load = max(disp_loads); Double susp_min_load = min(disp_loads); String sgt = ""; for (int y = 0; y < displacements.length; y++) { sgt = sgt + displacements[y] + "," + disp_loads[y] + ","; } String[] s = sgt.split(","); String w = ""; for (int x = 0; x < s.length; x++) { w += new BigDecimal(Math.round(Double.parseDouble(s[x]) * 100)).stripTrailingZeros().toPlainString() + ","; } Double[][] doubles = SGTUtil.encodeToDoubleArray(w); map.put("sgt", SGTUtil.encodeToString(doubles)); map.put("susp_max_load",susp_max_load); map.put("susp_min_load",susp_min_load); } if (map.get("stroke") == null) map.put("stroke", "0.0"); if (map.get("frequency") == null) map.put("frequency", "0.0"); if (map.get("susp_max_load") == null) map.put("susp_max_load", "0.0"); if (map.get("susp_min_load") == null) map.put("susp_min_load", "0.0"); if (map.get("frequency") != null){ BigDecimal bd=new BigDecimal(map.get("frequency").toString()); double frequency=bd.setScale(1,BigDecimal.ROUND_HALF_UP).doubleValue(); map.put("frequency",frequency); } if (map.get("stroke") != null){ double stroke1 = Double.parseDouble(map.get("stroke").toString()); BigDecimal bd=new BigDecimal(stroke1); double stroke=bd.setScale(1,BigDecimal.ROUND_HALF_UP).doubleValue(); map.put("stroke",stroke); } jdbcTemplate.update("insert into centralbase.cb_temp_well_mech_runtime(well_id,prod_date,stroke_length,stroke_frequency,susp_max_load,susp_min_load,sgt) " + "values (?,?,?,?,?,?,?)",map.get("well_name").toString(),map.get("dyna_create_time"),map.get("stroke"),map.get("frequency"),map.get("susp_max_load"),map.get("susp_min_load"),map.get("sgt")); } sumData += list.size(); if (list.size()< pageSize){ break; } curPage ++ ; } }catch (Exception e){ jsonObject.put("error",e.getMessage()); }finally { jsonObject.put("sumData",sumData); } return jsonObject; } public Double min(String[] strings){ double[] doubles = new double[strings.length]; for (int i = 0; i < strings.length; i++) { doubles[i] = Double.parseDouble(strings[i]); } return Arrays.stream(doubles).min().getAsDouble(); } //获取最大载荷 public Double max(String[] strings){ double[] doubles = new double[strings.length]; for (int i = 0; i < strings.length; i++) { doubles[i] = Double.parseDouble(strings[i]); } return Arrays.stream(doubles).max().getAsDouble(); } }