Java方式实现数据同步

使用java方式实现两个系统之间数据的同步。

业务背景

在新系统中设置定时任务需要实时把客户系统中的数据及时同步过来,保持数据的一致性。

实现逻辑

1.根据客户提供的接口,本系统中采用Http的Post请求方式获取接口数据。
2.由于客户提供的接口必带页码和页面容量,因此会涉及到多次请求接口才能拿到全量数据,因此相同的操作可以采用递归的方式进行。
3.每次请求一次接口根据页面容量(pageSize)可获取多条数据,此时可以采用批量添加数据库的操作,使用批量SQL添加语句。
4.由于数据同步需要保持两个系统数据的一致性,因此需要使用定时任务并规定同步频率,例如:一天一次或者一天两次。
5.定时任务的使用会产生数据重复的问题,因此根据某个唯一字段建立唯一索引来避免数据重复添加的问题。
6.唯一索引的建立可以避免同一记录重复添加的问题,但是避免不了同一条记录除唯一索引字段之外其它字段对应数据发生变化问题,因此使用replace into添加SQL语句可以解决此问题。
提示: a. 如果发现表中已经有此行数据(根据主键或者唯一索引判断)则先删除此行数据,然后插入新的数据。 b. 不然的话,直接插入新的数据。

使用技术

1.设置定时任务。
2.采用Http的Post方式获取接口数据。
3.涉及多页数据采用递归方式循环调用。
4.批量操作数据库(replace into)。
5.建立唯一索引避免重复插入数据。

代码详情

1.StudentMapper.java

/**
     * 批量添加数据同步接口学生数据
     * @param studentList
     * @return
     */
    int addBatchStudent(@Param(value = "studentList") List<Student> studentList);

2.SyncStudentServiceImpl.java代码如下:

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import ***.common.utils.HttpUtils;
import ***.common.utils.StringUtils;
import ***.entity.sync.Student;
import ***.mapper.sync.StudentMapper;
import ***.service.sync.SyncStudentService;
import ***.vo.StudentVO;
import lombok.extern.slf4j.Slf4j;

/**
 * 数据同步的学生实现类
 * @author hc
 * @create 2021/03/25 11:20
 * @version 1.0
 * @since 1.0
 */
@Service
@Slf4j
public class SyncStudentServiceImpl implements SyncStudentService {
    @Autowired
    private StudentMapper studentMapper;
    @Autowired
    private HttpUtils httpUtils;

    @Override
    public void bulkAddStudent(StudentVO studentVO) {
    	log.info("调取学生接口传的参数对象studentVO:"+studentVO);
    	log.info("开始调取学生接口获取第" + studentVO.getPageIndex() + "页数据");
    	//如何页面容量小于100,则按100计算
    	if(studentVO.getPageSize() < 100) {
    		studentVO.setPageSize(100);
    	}
        //根据当前页码和页面容量调取获取学生数据接口
        JSONObject jsonObject = this.sendStudentHttpPost(studentVO);
        //判断返回JSONObject是否为null
        if (StringUtils.isNotNull(jsonObject) && jsonObject.containsKey("errcode") && jsonObject.getInteger("errcode") == 0) {
            if(jsonObject.containsKey("data")){
            	JSONArray jsonArray = jsonObject.getJSONArray("data");
            	//通过判断获取的jsonObject对象中key值为data是否为null和其 jsonArray的长度来判断是否进行递归
            	//提示:此判断方法好于通过计算总页码的方式来递归拿数据(对获取的total依赖过大,因此放弃此方式)
            	if(jsonObject.getString("data") != null && jsonArray.size() > 0) {
            	   log.info("当前数据加载到几页》》》》:{}", studentVO.getPageIndex());
    	           //调取批量添加数据
                   this.addStudentCycleData(jsonObject, studentVO);
                   //页码加1,继续调取下一页数据
               	   studentVO.setPageIndex(studentVO.getPageIndex() + 1);
              	   //采用递归方式直至循环结束
                   this.bulkAddStudent(studentVO);
            	}else {
					log.info("学生数据同步结束》》》");
				}
            }
		}
    }

    /**
     * 批量添加学生数据
     * @param jsonObject
     * @param areaVO
     * @return
     */
    public void addStudentCycleData(JSONObject jsonObject, StudentVO studentVO){
    	List<Student> studentList = null;
        //判断jsonArray时候为空
        if (jsonObject != null && StringUtils.isNotBlank(jsonObject.getString("data"))) {
	        //把JsonArray转成对应实体类集合
        	studentList = JSONObject.parseArray(jsonObject.getString("data"), Student.class);
        }
        try {
        	log.info("学生接口第" + studentVO.getPageIndex() + "页数据开始入库...");
        	//调取方法批量进行添加学生数据
        	studentMapper.addBatchStudent(studentList);
        	log.info("学生接口第" + studentVO.getPageIndex() + "页数据入库成功...");
		} catch (Exception e) {
			log.error("学生批量添加数据库异常:{}", e.getMessage());
		}
    }

    /**
     * 根据studentVO(当前页码和页面容量)发送获取学生数据的请求
     * @param studentVO
     * @return
     */
    public JSONObject sendStudentHttpPost(StudentVO studentVO){
    	JSONObject jsonObject = null;
		String studentUrl = "http://*****/async-api/jc/student";
    	try {
    		if (StringUtils.isNotEmpty(studentUrl)) {
        		Map<String, Object> param = new HashMap<>();
        		param.put("pageIndex", studentVO.getPageIndex());
        		param.put("pageSize", studentVO.getPageSize());
                log.info("开始发起http请求...");
                jsonObject = httpUtils.sendHttpPost(param, studentUrl);
    		}
		} catch (Exception e) {
			log.error("调取客户学生同步接口出现异常:{},页面容量为:{},页码:{}", e.getMessage(), 
			studentVO.getPageSize(), studentVO.getPageIndex());
		}
        return jsonObject;
    }
}

3.StudentVO.java代码如下:

import lombok.Data;
/**
 * 数据同步接口获取学生数据传的参数VO类
 * @author hc
 * @create 2021/3/11 10:35
 * @version 1.0
 * @since 1.0
 */
@Data
public class StudentVO{
	//当前页码(初始值为0)
	private Integer pageIndex = 0;
	//页码容量
	private Integer pageSize;
}

4.HttpUtils.java代码如下:

import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Component;
import org.springframework.web.client.HttpClientErrorException;
import org.springframework.web.client.RestTemplate;
import java.util.Map;

/**
 * Http请求工具类
 * @author hc
 * @create 2021/3/4
 * @version 1.0
 */
@Component
@Slf4j
public class HttpUtils {
    @Autowired
    private RestTemplate restTemplate;
	
    /**
     * 发送http的post请求方法
     * @param param
     * @return
     */
    public JSONObject sendHttpPost(Integer type, Map<String, Object> param, String url){
	    log.info("调取同步接口Url:{}", url);
        JSONObject jsonObject = null;
		//发起http的post准备工作
		HttpHeaders httpHeaders = new HttpHeaders();
		httpHeaders.add("Content-Type", "application/json");
		HttpEntity<Map<String, Object>> httpEntity = new HttpEntity<>(param, httpHeaders);
		ResponseEntity<String> response = null;
		try {
		    log.info("param参数为:{}",param.toString());
			response = restTemplate.postForEntity(url, httpEntity, String.class);
		} catch (HttpClientErrorException e) {
			log.error("发起http请求报错信息:{}",e.getResponseBodyAsString());
		}
		String bodyData = response.getBody();
		if (StringUtils.isNotEmpty(bodyData)) {
			jsonObject = JSONObject.parseObject(bodyData);
		}
        return jsonObject;
    }
}

5.StudentMapper.xml的SQL语句如下:

<!-- 批量添加数据同步接口中获取的学生数据 -->
<insert id="addBatchStudent" parameterType="***.entity.sync.Student">
	replace into xf_clue_sync_student(id, student_code, student_name, status, create_date, update_date)
	<foreach collection="studentList" item="student" open="values" separator="," >
	   (#{student.id,jdbcType=BIGINT}, #{student.studentCode,jdbcType=INTEGER}, #{student.studentName,jdbcType=VARCHAR}, 
	    #{student.status,jdbcType=INTEGER}, #{student.createDate,jdbcType=VARCHAR}, #{student.updateDate,jdbcType=VARCHAR})
	</foreach>
</insert>

功能小结

1.定时任务配置相关代码此处不再展示,SpringBoot框架使用注解的方式即可设置定时任务以及调取频率。
2.数据同步接口开发需要根据具体应用场景采用不同的方法,需视情况而定,例如:也可以使用kettle工具等等。

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐