본문 바로가기
개발 이론/Spring Batch

[Spring Batch] 예제만들어보기

by dal_been 2024. 2. 29.
728x90

이전 블로그에서  간단하게 개념을 살펴보았다. 이번에는 Spring Batch 5 기준 예제를 만들어서 적용해볼 예정이다.

이번 예제는 PayHistory테이블에서 pay_done상태의 데이터를 뽑아 Adjust와 AdjustDetail테이블에 정산하는 것이다.

 

여기서 주의하실점 PayHistory테이블의 userId는 서비스 수행자의 id라고 가정한다.

 

Tasklet을 이용

 

일단 나의 로직은 이러하다

PayHistory에서 pay_done인 데이터들 가져와서

-> Adjust에 PayHistory의 userId기준으로 데이터가 있다면 거기다가 값을 더해주고 

-> AdjustDetail 데이터도 생성한다.

 

-> 만약 Adjust에 PayHistory의 userId기준으로 데이터가 없다면 Adjust 데이터를 만들고

-> AdjustDetail 데이터를 생성한다.

 

@Slf4j
@Configuration
@RequiredArgsConstructor
@EnableBatchProcessing
public class SimpleJobConfig {

  private final PayHistoryRepository payHistoryRespository;
  private final WalkerAdjustRepository walkerAdjustRepository;
  private final WalkerAdjustDetailRepository walkerAdjustDetailRepository;

  @Bean
  public Job adjustJob(JobRepository jobRepository , Step step1) {
    return new JobBuilder("adjust" , jobRepository)
        .start(step1)
        .build();
  }
 }

(코드의 일부분만 가져왔다. 차차 밑에서 더 가져올 예정)

 

  • adjust라는 잡을 생성하고 step1이라는 step을 등록했다.
  @Bean
  public Step adjustStep(JobRepository jobRepository , PlatformTransactionManager transactionManager) {
    return new StepBuilder("step1" , jobRepository)
        .tasklet(myTasklet() , transactionManager)
        .build();
  }
  • step에는 myTasklet을 등록
  @Bean
  public Tasklet myTasklet () {
    return ((contribution, chunkContext) -> {
		
      //PayHistory 찾고
      List <PayHistory> payHistoryList = payHistoryRespository.findByPayStatusOrderByUserId(PAY_DONE);
      for (PayHistory payHistory : payHistoryList) {
      
        Optional <WalkerAdjust> walkerAdjust = walkerAdjustRepository.findByUserIdAndAndWalkerAdjustDate(
            payHistory.getUserId() , LocalDate.now());
		//이미 Adjust가 있다면
        if(walkerAdjust.isPresent()) {
          WalkerAdjust adjust = walkerAdjust.get();
          adjust.setPrice(adjust.getWalkerTtlPrice()+payHistory.getPayPrice());
          walkerAdjustRepository.save(adjust);

          WalkerAdjustDetail walkerAdjustDetail = WalkerAdjustDetail.builder()
              .walkerAdjust(adjust)
              .walkerAdjustPrice(payHistory.getPayPrice())
              .payhistoryId(payHistory.getPayId())
              .build();
          walkerAdjustDetailRepository.save(walkerAdjustDetail);
		
        //adjust가 업다면
        } else {
          WalkerAdjust adjust = WalkerAdjust.builder()
              .userId(payHistory.getUserId())
              .walkerAdjustDate(LocalDate.now())
              .build();

          adjust.setPrice(payHistory.getPayPrice());
          walkerAdjustRepository.save(adjust);

          WalkerAdjustDetail walkerAdjustDetail = WalkerAdjustDetail.builder()
              .walkerAdjust(adjust)
              .walkerAdjustPrice(payHistory.getPayPrice())
              .payhistoryId(payHistory.getPayId())
              .build();
          walkerAdjustDetailRepository.save(walkerAdjustDetail);
        }
      }

      return RepeatStatus.FINISHED;
    });
  }

 

  • 좀 코드가 길지만 어째든 앞서 얘기한 흐름이다.

tasklet을 사용하면서 느낌점은 구분점이 없다는 것이다... 뭐랄까... 데이터를 읽는 부분과 수정해야하는 부분과 save해야하는 부분이 한곳에 다 모여있으니 일단 가독성이 떨어지기도 하지만... 중요한 것은 만약 저기 어디가에서 에러가 나오면..저 코드를 하나씩 분석해봐야한다..

 

그래서 사용하는게 itemreader, itemprocessor, itemwriter이다

 

 

itemreader, itemprocessor, itemwriter

 

플로우는 위에거랑  비슷하지만 여기서 조금 다른점은 step이 두개로 나누었다. 

그 이유는 하나의 reader에서 payHistory도 조회해야하고 Adjust도 존재하는지 조회해야하는데

만약 존재하지 않는다면 processor에서 Adjust를 생성하고 save를 한뒤에 AdjustDetail을 생성할 수 있다. 그러면 결국에는 processor에 AdjustRepository의 save 코드가 들어가야한다.. 이게 맞을까..? processor에 insert 코드가 들어가는 것이..?? 

 

너무 Processor에 많은 기능을 추가한 것같고 insert 쿼리가  processr의 기능인 가공의 느낌이 아니다.

그래서 step을 둘로 나누었다.

step1 : PayHistory에서 userId기준으로 'PAY_DONE'인 데이터를 groupBy한후 userId기준으로 Adjust를 생성한다

step2 : PayHistory에서 'PAY_DONE'인 데이터를 찾아온 후 step1에서 생성한 Adjust를 찾아온다음에 금액 변경 + AdjustDetail 데이터를 생성해준다.

 

간단하게 글로 설명하면 위와 같고 코드로 봐보자

 

@Slf4j
@Configuration
@RequiredArgsConstructor
@EnableBatchProcessing
public class SimpleJobConfig {

  @Bean
  public Job adjustJob(JobRepository jobRepository ,@Qualifier("Adjust") Step adjustStep, @Qualifier("AdjustDetail")
      Step adjustDetailStep) {
    return new JobBuilder("adjust" , jobRepository)
        .start(adjustStep)
        .next(adjustDetailStep)
        .build();
  }

}

 

  • start에 앞서 말한 step1을 등록하고 next에 step2를 등록한 것이다

 

@Configuration
@RequiredArgsConstructor
public class AdjustStepConfig {

  private final EntityManagerFactory entityManagerFactory;


  @Bean(name = "Adjust")
  @JobScope
  public Step adjustStep(JobRepository jobRepository , PlatformTransactionManager transactionManager) {
    return new StepBuilder("adjustStep" , jobRepository)
        .<Long,WalkerAdjust>chunk(10,transactionManager)
        .reader(payHistoryItemReader())
        .processor(payHistoryItemProcessor())
        .writer(walkerAdjustDetailItemWriter())
        .build();
  }

  @Bean
  public ItemReader<Long> payHistoryItemReader() {
    Map<String, Object> parameters = new HashMap <>();
    parameters.put("status", PAY_DONE);
    JpaPagingItemReader <Long> reader = new JpaPagingItemReader <>();
    reader.setName("payHistoryReader");
    reader.setEntityManagerFactory(entityManagerFactory);
    reader.setQueryString("SELECT p.userId FROM PayHistory p WHERE p.payStatus = :status GROUP BY p.userId ORDER BY p.userId");

    reader.setPageSize(10);
    reader.setParameterValues(parameters);
    return reader;
  }

   @Bean
  public ItemProcessor <Long, WalkerAdjust> payHistoryItemProcessor() {
    return userId -> {

      return WalkerAdjust.builder()
          .userId(userId)
          .walkerAdjustDate(LocalDate.now())
          .build();
    };
  }

  @Bean
  public ItemWriter<WalkerAdjust> walkerAdjustDetailItemWriter(){
    return new JpaItemWriterBuilder <WalkerAdjust>()
        .entityManagerFactory(entityManagerFactory)
        .build();
  }

}
  • ItemReader -> PayHistory테이블에서 'PAY_DONE'인 데이터들만 userId로 groupBy해서 조회한다(user한명당 하나의 Adjust만 생성되면 되기 때문에)
  • ItemProcessor -> Adjust를 생성한다
  • ItemWriter -> DB에 반영

 

@Configuration
@RequiredArgsConstructor
public class AdjustDetailConfig {
  private final WalkerAdjustRepository walkerAdjustRepository;
  private final EntityManagerFactory entityManagerFactory;

  @Bean (name = "AdjustDetail")
  @JobScope
  public Step adjustStep(JobRepository jobRepository , PlatformTransactionManager transactionManager) {
    return new StepBuilder("adjustStep" , jobRepository)
        .<PayHistory, WalkerAdjustDetail>chunk(10,transactionManager)
        .reader(adjustDetailReader())
        .processor(adjustDetailProcessor())
        .writer(adjustDetailItemWriter())
        .build();
  }

  @Bean
  public ItemReader <PayHistory> adjustDetailReader() {
    Map <String, Object> parameters = new HashMap <>();
    parameters.put("status", PAY_DONE);
    JpaPagingItemReader <PayHistory> reader = new JpaPagingItemReader <>(){
      @Override
      public int getPage() {
        return 0;
      }
    };
    reader.setName("adjustReader");
    reader.setEntityManagerFactory(entityManagerFactory);
    reader.setParameterValues(parameters);
    reader.setPageSize(10);
    reader.setQueryString("select p from PayHistory p WHERE p.payStatus = :status");
    return reader;
  }

  @Bean
  public ItemProcessor <PayHistory, WalkerAdjustDetail> adjustDetailProcessor() {
    return payHistory -> {

      WalkerAdjust adjust = walkerAdjustRepository.findByUserIdAndAndWalkerAdjustDate(
          payHistory.getUserId() , LocalDate.now()).get();
      adjust.setPrice(adjust.getWalkerTtlPrice()+payHistory.getPayPrice());
      payHistory.setStatus(ADJUST_DONE);
      return WalkerAdjustDetail.builder()
          .walkerAdjust(adjust)
          .payHistory(payHistory)
          .walkerAdjustPrice(payHistory.getPayPrice())
          .build();

    };
  }

  @Bean
  public ItemWriter <WalkerAdjustDetail> adjustDetailItemWriter(){
    return new JpaItemWriterBuilder <WalkerAdjustDetail>()
        .entityManagerFactory(entityManagerFactory)
        .build();
  }
}
  • ItemReader -> PayHistory에서 'PAY_DONE'인 데이터들을 조회한다
  • ItemProcessor -> Adjust를 조회하고 값을 변경,  PayHistory 상태 변경 ,AdjustDetail생성하고 writer에 AdjustDetail만 넘긴다(연관관계 cascadeType 활용)
  • ItemWriter -> DB에 AdjustDetail을 반영한다

 

테스트코드

 

itemReader, itemProcessor, itemWriter를 이용한 코드로 테스트코드를 작성했다.

 

@SpringBootTest
@SpringBatchTest
class SimpleJobConfigTest {

  @Autowired
  private JobLauncherTestUtils jobLauncherTestUtils;

  @Autowired
  private PayHistoryRepository payHistoryRespository;
  @Autowired
  private WalkerAdjustRepository walkerAdjustRepository;


  @Test
  void test() throws Exception {
    JobParameters jobParameters=new JobParametersBuilder()
      .addLong("time",2L)
      .toJobParameters();
    jobLauncherTestUtils.launchJob(jobParameters);

  }

  @Test
  void test2(){
    payHistoryRespository.deleteAll();
    for(int i =0;i<80;i++){
      payHistoryRespository.save(PayHistory.builder()
          .payPrice((long) i)
          .userId(1L+i/10)
          .payMethod("CARD")
          .build());

    }
    System.out.println(payHistoryRespository.findAll().size());
  }
}

(test2는 데이터를 넣고 테스트 하기 위해 존재한다).

 

값의 개수를 80개로 넣은건 데이터가 잘들어갔는지 보여주려면 데이터 개수를 줄여야했다. 원래는  550개로 해봤는데 잘되더라!

일단 test2로 들어간 payHistory 값을 보면

잘 들어갔다. 

 

이후 test코드를 돌리면..

  • PayHistory의 상태값이 Adjust_done으로 변경되었다.

  • PayHistory에 'PAY_DONE'인 데이터에서 user가 8번까지 있었는데 그에따라 Adjust도 잘 생성되었다

  • AdjustDetail도 PayHistory개수 만틈 생성되었다(현재 PayHistory status가 다 PAY_DONE이라서) 

지금까지 Spring Batch의 예제를 만들어봤다. 현재 코드들은 여기 깃허브에 있다.

지금 코드들은 쿼리가 나가는 거라든지, 실패했을때의 처리라든지에 대한 것을 안되어있다. 그부분은 나중에 조금더 보충할 예정이다.

 

'개발 이론 > Spring Batch' 카테고리의 다른 글

[Spring Batch] 개념  (1) 2024.02.27