취준생 프로젝트

[Refactoring] BatchUpdate JdbcTemplate 사람인 데이터 동기화

한둥둥 2024. 12. 29. 04:18

우선 해당 작업을 해주게 된 계기는 데이터베이스에 중복된 데이터가 저장되는 문제가 발생하였다. 

그래서 positionId에 Id값을 FK로 잡아주게 되었습니다. 덕분에 api를 테스트 코드에서 호출해서 확인 해보니, 정상적으로 넣어지지 않습니다. 

 

이유는 기존에 있는 정보 positionId중에 사람인에서 바뀐 정보가 있다면 update해주는 로직이 존재하였습니다. 

 

JobInfos jobInfos = jobInfoAggregator.makeUpdateJobInfos();

        int totalUpdateCount = 0;
        for (int i = 0; i < jobInfos.size(); i += 500) {
            totalUpdateCount+= jobInfoJpaRepository.saveAll(jobInfos.subList(i, Math.min(i + 500, jobInfos.size())));
        }

        return totalUpdateCount;

 

해당 코드였습니다. 하지만, saveAll이기 때문에 해당 positionId가 그냥 저장되어버리기 때문에 중복저장되어버립니다. 

positionId는 사람인 api 아이디 값입니다. 

 

package darkoverload.itzip.feature.job.scheduler;

import darkoverload.itzip.feature.job.domain.job.JobInfoAggregator;
import darkoverload.itzip.feature.job.domain.job.JobInfos;
import darkoverload.itzip.feature.job.repository.JobInfoJpaRepository;
import darkoverload.itzip.feature.job.service.connect.JobInfoConnectService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;

@Slf4j
@Component
@RequiredArgsConstructor
public class JobInfoScheduler {
    private static final String JOB_INFO_SCHEDULER_CON = "1 30 0 * * *";

    private final JobInfoJpaRepository jobInfoJpaRepository;
    private final JobInfoConnectService service;

    /**
     * Saramin API에서 최신 JobInfo 데이터를 가져와 데이터베이스와 비교하여
     * 삭제, 업데이트, 삽입 작업을 수행하는 메서드입니다. 이 메서드는 매일 00:30에 실행됩니다.
     *
     * @Transactional 트랜잭션이 보장되는 환경에서 실행되며, 작업이 완료될 때까지
     * 트랜잭션이 유지됩니다. 오류가 발생하면 모든 작업이 롤백됩니다.
     * @Scheduled(cron = "1 30 0 * * *") 크론 표현식을 사용하여 매일 01:30에 실행됩니다.
     */

    @Transactional
    @Scheduled(cron = JOB_INFO_SCHEDULER_CON)
    public void jobInfoConnectApi() {
        // 데이터베이스에서 모든 JobInfo 데이터를 조회하고, 도메인 객체 리스트로 변환
        JobInfos dbJobInfos = new JobInfos(jobInfoJpaRepository.findAll().stream().toList());

        // Saramin API를 호출하여 최신 JobInfo 데이터를 가져옴
        JobInfos apiJobInfos = new JobInfos(service.jobInfoConnect());

        JobInfoAggregator aggregator = JobInfoAggregator.create(apiJobInfos, dbJobInfos);

        // 데이터베이스에 있는 JobInfo 데이터를 API 데이터와 비교하여 삭제 작업 수행
        int deleteCount = service.jobInfoDelete(aggregator);
        log.info("==== Saramin API Data deleteCount :: {} ====", deleteCount);

        // 데이터베이스에 있는 JobInfo 데이터를 API 데이터와 비교하여 업데이트 작업 수행
        int updateCount = service.jobInfoUpdate(aggregator);
        log.info("==== Saramin API Data updateCount :: {} ====", updateCount);

        // API 데이터 중에서 데이터베이스에 없는 데이터를 삽입하는 작업 수행
        int saveCount = service.jobInfoSave(aggregator);
        log.info("==== Saramin API Data Data saveCount :: {} ====", saveCount);
    }

}

 

스케줄러의 구조는 이런식으로 되어있습니다. 

각각은 정상적으로 동작한다면 몇개나 바꿔주었는지 로그를 통해서 남겨주도록 만들었습니다. 

각각의 도메인을 통해서 메시지를 던져서 값을 받아오도록 설계를 해주었고, 이를 통해서 해당 객체를 service에 던져주면 알아서 바뀌도록 구현해주었습니다. 

이런 점은 역할과 책임을 분리하여 조금 더 수월하게 테스트를 진행 할 수 있습니다.  << 해당 글은 이미 정리하였기에 생략하겠습니다. 

 

 

JobInfoConnectRepository

package darkoverload.itzip.feature.job.service.connect.port;

import darkoverload.itzip.feature.job.domain.job.JobInfo;

import java.util.List;

public interface JobInfoConnectRepository {

    int deleteAll(List<Long> positionId);

    int saveAll(List<JobInfo> jobInfos);

    int updateAll(List<JobInfo> jobInfos);

}

인터페이스를 만들어서 Impl 클래스에서 반드시 구현해줘야할 JobInfoConnect 사람인 데이터베이스에서 작업할 메소드들을 선언해주었습니다. 

 

 

JobInfoConnectRepositoryImpl

package darkoverload.itzip.feature.job.repository.connect;

import darkoverload.itzip.feature.job.domain.job.JobInfo;
import darkoverload.itzip.feature.job.service.connect.port.JobInfoConnectRepository;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Repository;

import java.util.List;

@Repository
@RequiredArgsConstructor
public class JobInfoConnectRepositoryImpl implements JobInfoConnectRepository{

    private final JobInfoConnectJdbcRepository repository;

    @Override
    public int deleteAll(List<Long> positionId) {
        return repository.deleteAll(positionId);
    }

    public int saveAll(List<JobInfo> jobInfos) {
        return repository.saveAll(jobInfos);
    }

    public int updateAll(List<JobInfo> jobInfos) {
        return repository.updateAll(jobInfos);
    }

}

 

JobInfoConnectJdbcRepositoryImpl은 어댑터 패턴에 의해서 언제든지 쉽게 교체 해주기 위하여 만들어주었습니다. 

덕분에 JobInfoConnectJdbcRepository와의 결합이 낮아졌기에 언제든지 JPA 또는 MyBatis로 바꿔 줄 수 있도록 구현하였습니다. 

 

 

JobInfoConnectJdbcRepository

package darkoverload.itzip.feature.job.repository.connect;

import darkoverload.itzip.feature.job.domain.job.JobInfo;
import lombok.RequiredArgsConstructor;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Repository;

import java.time.LocalDateTime;
import java.util.Arrays;
import java.util.List;

@Repository
@RequiredArgsConstructor
public class JobInfoConnectJdbcRepository {

    private final JdbcTemplate jdbcTemplate;

    public int deleteAll(List<Long> positionIds) {
        String sql = "DELETE FROM job_infos WHERE position_id = ?";

        int[][] batchUpdate = jdbcTemplate.batchUpdate(sql, positionIds, 500, (ps, positionId) -> {
            ps.setLong(1, positionId);
        });

        return Arrays.stream(batchUpdate).flatMapToInt(Arrays::stream).sum();
    }

    public int saveAll(List<JobInfo> jobInfos) {
        String sql = "INSERT INTO job_infos(position_id, company_name, url, title, industry_name, location_code, location_name, job_name, experience_min, experience_max, experience_name, keyword, posting_date, expiration_date, scrap_count, create_date, modify_date) " +
                "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";

        int[][] batchInsert = jdbcTemplate.batchUpdate(sql, jobInfos, 500, (ps, jobInfo) -> {
            ps.setLong(1, jobInfo.getPositionId());
            ps.setString(2, jobInfo.getCompanyName());
            ps.setString(3, jobInfo.getUrl());
            ps.setString(4, jobInfo.getTitle());
            ps.setString(5, jobInfo.getIndustryName());
            ps.setString(6, jobInfo.getLocationCode());
            ps.setString(7, jobInfo.getLocationName());
            ps.setString(8, jobInfo.getJobName());
            ps.setLong(9, jobInfo.getExperienceMin());
            ps.setLong(10, jobInfo.getExperienceMax());
            ps.setString(11, jobInfo.getExperienceName());
            ps.setString(12, jobInfo.getKeyword());
            ps.setObject(13, jobInfo.getPostingDate());
            ps.setObject(14, jobInfo.getExpirationDate());
            ps.setLong(15, jobInfo.getScrapCount());
            ps.setObject(16, LocalDateTime.now());
            ps.setObject(17, LocalDateTime.now());
        });

        return Arrays.stream(batchInsert).flatMapToInt(Arrays::stream).sum();
    }

    public int updateAll(List<JobInfo> jobInfos) {
        String sql = "UPDATE job_infos SET " +
                "company_name = ?, " +
                "url = ?, " +
                "title = ?, " +
                "industry_name = ?, " +
                "location_code = ?, " +
                "location_name = ?, " +
                "job_name = ?, " +
                "experience_min = ?, " +
                "experience_max = ?, " +
                "experience_name = ?, " +
                "keyword = ?, " +
                "posting_date = ?, " +
                "expiration_date = ?, " +
                "modify_date = ? " +
                "where position_id = ?";

        int[][] batchUpdate = jdbcTemplate.batchUpdate(sql, jobInfos, 500, (ps, jobInfo) -> {
            ps.setString(1, jobInfo.getCompanyName());
            ps.setString(2, jobInfo.getUrl());
            ps.setString(3, jobInfo.getTitle());
            ps.setString(4, jobInfo.getIndustryName());
            ps.setString(5, jobInfo.getLocationCode());
            ps.setString(6, jobInfo.getLocationName());
            ps.setString(7, jobInfo.getJobName());
            ps.setLong(8, jobInfo.getExperienceMin());
            ps.setLong(9, jobInfo.getExperienceMax());
            ps.setString(10, jobInfo.getExperienceName());
            ps.setString(11, jobInfo.getKeyword());
            ps.setObject(12, jobInfo.getPostingDate());
            ps.setObject(13, jobInfo.getExpirationDate());
            ps.setObject(14, LocalDateTime.now());
            ps.setLong(15, jobInfo.getPositionId());
        });

        return Arrays.stream(batchUpdate).flatMapToInt(Arrays::stream).sum();
    }

}

해당 코드는 jdbc를 통해서 bulkInsert를 해주기 위하여 구현한 코드입니다. 

 

jdbc를 하면서 느낀 점은 실수를 할 확률이 너무 높다였습니다.. 하지만 한번 만들면 왠만해서 사람인 API데이터를 불러오도록 만들어주는 컬럼 값은 바뀔 일이 거의없고 성능적으로도 좋기 때문에 해당 방법을 채택하였습니다. 

jdbc 적용하기전 43초 시간이 걸림 

jdbc 적용 후 31초라는 시간이 걸린다. 

 

정확히 데이터가 6566개를 기준으로 하였을 때, 10초라는 시간이 걸렸기에 더 많은 데이터가 있다면 무조건 jdbc를 채택하는 것이 좋아 보인다.