본문 바로가기
Spring Batch

Spring Batch JpaCursorItemReader 도입되다.

by 향로 (기억보단 기록을) 2021. 1. 25.
반응형

Spring Batch 4.3이 릴리즈 되면서 JpaCursorItemReader 가 도입되었습니다.

intro

(Spring Batch 4.3 release notes)

그 전 버전까지 (~4.2.x)는 JpaCursorItemReader가 없었음을 의미하는데요.
HibernateCursorItemReader는 존재하는데, 왜 JpaCursorItemReader는 여태 없었던 것이지? 라고 의문이 들 수 있습니다.

이는 JPA 스펙 때문인데, JPA 2.1 전까지는 데이터 스트리밍이 가능한 스펙이 별도로 없었습니다.

그래서 Hibernate의 상태 비저장 세션 (StatelessSession)과 유사한 개념이 JPA에는 없어서 Cursor 기능을 구현할 수 없었습니다.

상태 비저장 세션 (StatelessSession) 은 Hibernate에서만 지원하는 모드로서 1차/2차 캐시가 없고 상태가 없는 세션 (Session) 모드를 이야기합니다.
가장 Jdbc와 유사한 형태의 데이터 조회가 가능하여 일반적으로 데이터베이스에서 데이터를 스트리밍할때 주로 사용됩니다.

그러다 JPA 2.2부터 드디어 Query#getResultStream() 가 도입되어 이런 데이터 스트리밍이 가능하게 되었는데요.
(Add ability to stream the result of a query execution)

JPA 2.2 스펙 도입이 예전에 되었지만, 스프링 배치에서는 최근에서야 이 부분을 적용하게 되어 드디어 스프링 배치 4.3부터 Jpa에도 CursorItemReader가 도입되게 되었습니다.

기본적인 작동원리는 기존의 다른 CursorItemReader (Jdbc/Hibernate)와 다르지 않습니다.

자 그럼 실제 간단한 예제를 통해 JpaCursorItemReader를 배워보겠습니다.

1. 예제

빠르게 JpaCursorItemReader를 활용한 예제 코드를 만들어보겠습니다.

@Slf4j // log 사용을 위한 lombok 어노테이션
@RequiredArgsConstructor // 생성자 DI를 위한 lombok 어노테이션
@Configuration
public class JpaCursorItemReaderJobConfig {
    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;
    private final EntityManagerFactory entityManagerFactory;

    private int chunkSize;
    @Value("${chunkSize:100}")
    public void setChunkSize(int chunkSize) {
        this.chunkSize = chunkSize;
    }

    @Bean
    public Job jpaCursorItemReaderJob() {
        return jobBuilderFactory.get("jpaCursorItemReaderJob")
                .start(jpaCursorItemReaderStep())
                .build();
    }

    @Bean
    public Step jpaCursorItemReaderStep() {
        return stepBuilderFactory.get("jpaCursorItemReaderStep")
                .<Pay, Pay>chunk(chunkSize)
                .reader(jpaCursorItemReader())
                .writer(jpaCursorItemWriter())
                .build();
    }

    @Bean
    public JpaCursorItemReader<Pay> jpaCursorItemReader() {
        return new JpaCursorItemReaderBuilder<Pay>()
                .name("jpaCursorItemReader")
                .entityManagerFactory(entityManagerFactory)
                .queryString("SELECT p FROM Pay p")
                .build();
    }

    private ItemWriter<Pay> jpaCursorItemWriter() {
        return list -> {
            for (Pay pay: list) {
                log.info("Current Pay={}", pay);
            }
        };
    }
}

기존의 JpaPagingItemReader와 크게 다르지 않는 포맷인데요.
각 설정들이 하는 역할은 다음과 같습니다.

속성 소개 기본값
name 실행 컨텍스트 (ExecutionContext) 내에서 구분하기 위한 Key.
saveStatetrue 로 설정된 경우 필수
entityManagerFactory JPA를 사용하기 위한 EntityManagerFactory
queryString 사용할 JPQL 쿼리문
maxItemCount 조회할 최대 item 수 Integer.MAX_VALUE
currentItemCount 조회 Item의 시작지점 0
saveState 동일 Job 재실행시 실행 컨텍스트 내에서 ItemStream Support의 상태를 유지할지 여부 true

JpaPagingItemReader와 달리 JpaCursorItemReader에는 pageSize 설정이 없고, maxItemCount, currentItemCount 이 추가되었습니다.
Cursor 방식이 스트리밍이기 때문에 한번에 몇개의 데이터를 읽어올지를 결정하는 pageSize 는 Cursor에서는 필요가 없습니다.
그리고 maxItemCount, currentItemCount 의 경우에는 다음과 같은 역할을 하는데요.

예를 들어 아래와 같이 10개의 데이터가 조회되는 JpaCursorItemReader가 있다고 하겠습니다.

Current Pay=Pay(id=1, amount=0, txName=0, txDateTime=2021-01-24T19:36:33.690)
Current Pay=Pay(id=2, amount=1000, txName=1, txDateTime=2021-01-24T19:36:33.735)
Current Pay=Pay(id=3, amount=2000, txName=2, txDateTime=2021-01-24T19:36:33.736)
Current Pay=Pay(id=4, amount=3000, txName=3, txDateTime=2021-01-24T19:36:33.737)
Current Pay=Pay(id=5, amount=4000, txName=4, txDateTime=2021-01-24T19:36:33.738)
Current Pay=Pay(id=6, amount=5000, txName=5, txDateTime=2021-01-24T19:36:33.739)
Current Pay=Pay(id=7, amount=6000, txName=6, txDateTime=2021-01-24T19:36:33.740)
Current Pay=Pay(id=8, amount=7000, txName=7, txDateTime=2021-01-24T19:36:33.740)
Current Pay=Pay(id=9, amount=8000, txName=8, txDateTime=2021-01-24T19:36:33.741)
Current Pay=Pay(id=10, amount=9000, txName=9, txDateTime=2021-01-24T19:36:33.742)

여기서 .maxItemCount(5) 를 추가해서 수행하게 되면 다음과 같이 5개만 최대 조회 됩니다.
즉, .maxItemCount 이란 최대로 조회할 데이터 갯수를 설정하는 것입니다.

Current Pay=Pay(id=1, amount=0, txName=0, txDateTime=2021-01-24T19:38:39.569)
Current Pay=Pay(id=2, amount=1000, txName=1, txDateTime=2021-01-24T19:38:39.616)
Current Pay=Pay(id=3, amount=2000, txName=2, txDateTime=2021-01-24T19:38:39.617)
Current Pay=Pay(id=4, amount=3000, txName=3, txDateTime=2021-01-24T19:38:39.618)
Current Pay=Pay(id=5, amount=4000, txName=4, txDateTime=2021-01-24T19:38:39.619)

이 외에 .currentItemCount(2) 를 추가하게 되면 다음과 같이 .currentItemCount 지정값 다음부터 데이터를 조회하게 됩니다.

  • .maxItemCount(5)
  • .currentItemCount(2)
Current Pay=Pay(id=3, amount=2000, txName=2, txDateTime=2021-01-24T19:35:28.344)
Current Pay=Pay(id=4, amount=3000, txName=3, txDateTime=2021-01-24T19:35:28.345)
Current Pay=Pay(id=5, amount=4000, txName=4, txDateTime=2021-01-24T19:35:28.346)
  • .maxItemCount(5)를 통해 최대 5개를 조회하도록 제한 뒤,
  • .currentItemCount(2) 를 통해 총 읽어야할 데이터 중 시작지점을 어디로 할지

각각의 설정들을 알아보았으니, 이제 테스트 코드로 검증을 해보겠습니다.

2. 테스트 코드

전체 코드는 Github에 있습니다.

Junit5를 통해 테스트 코드를 작성합니다.

@ExtendWith(SpringExtension.class)
@SpringBatchTest
@SpringBootTest(classes = {JpaCursorItemReaderJobConfig.class, TestBatchConfig.class})
public class JpaCursorItemReaderJobConfigTest {

    @Autowired
    private JobLauncherTestUtils jobLauncherTestUtils;

    @Autowired
    private PayRepository payRepository;

    @AfterEach
    public void tearDown() throws Exception {
        payRepository.deleteAllInBatch();
    }

    @SuppressWarnings("Duplicates")
    @Test
    void JPA_Cursor_조회() throws Exception {
        //given
        for (long i = 0; i < 10; i++) {
            payRepository.save(new Pay(i * 1000, String.valueOf(i), LocalDateTime.now()));
        }

        JobParameters jobParameters = jobLauncherTestUtils.getUniqueJobParametersBuilder()
                .addString("version", "1")
                .toJobParameters();

        //when
        JobExecution jobExecution = jobLauncherTestUtils.launchJob(jobParameters);

        //then
        assertThat(jobExecution.getStatus()).isEqualTo(BatchStatus.COMPLETED);
    }
}

총 10개의 pay 엔티티를 등록하고, 이들이 잘 노출되는지 검증하는 것인데요.
수행해보시면, 아래와 같이 Cursor용 select쿼리와 writer 로그를 볼 수 있습니다.

Hibernate: select pay0_.id as id1_0_, pay0_.amount as amount2_0_, pay0_.tx_date_time as tx_date_3_0_, pay0_.tx_name as tx_name4_0_ from pay pay0_
Current Pay=Pay(id=1, amount=0, txName=0, txDateTime=2021-01-24T19:36:33.690)
Current Pay=Pay(id=2, amount=1000, txName=1, txDateTime=2021-01-24T19:36:33.735)
Current Pay=Pay(id=3, amount=2000, txName=2, txDateTime=2021-01-24T19:36:33.736)
Current Pay=Pay(id=4, amount=3000, txName=3, txDateTime=2021-01-24T19:36:33.737)
Current Pay=Pay(id=5, amount=4000, txName=4, txDateTime=2021-01-24T19:36:33.738)
Current Pay=Pay(id=6, amount=5000, txName=5, txDateTime=2021-01-24T19:36:33.739)
Current Pay=Pay(id=7, amount=6000, txName=6, txDateTime=2021-01-24T19:36:33.740)
Current Pay=Pay(id=8, amount=7000, txName=7, txDateTime=2021-01-24T19:36:33.740)
Current Pay=Pay(id=9, amount=8000, txName=8, txDateTime=2021-01-24T19:36:33.741)
Current Pay=Pay(id=10, amount=9000, txName=9, txDateTime=2021-01-24T19:36:33.742)

마무리

JpaCursorItemReader를 통해 HQL이 아닌 JPQL로도 데이터 스트리밍 배치를 구현할 수 있게 되었습니다.
Cursor를 이용하여 1) 데이터 변경에 무관한 무결성 조회 2) 페이징 보다 높은 성능 의 배치 조회가 가능합니다.
단, 페이징과 달리 타임아웃이 굉장히 길어야하니 이 점은 주의해야겠죠?

스프링 배치는 여전히 발전중이라서, 이후에도 추가되는 기능 중 많은 분들이 도움 될만한 요소가 있다면 공유하겠습니다.


반응형