Spring
Spring Batch - 청크 기반 프로세스
- -
Chunk
- 여러 개의 아이템을 묶은 하나의 덩어리를 의미합니다.
- 한 번에 하나씩 아이템을 입력받아 Chunk 단위의 덩어리로 만든 후 Chunk 단위로 트랜잭션을 처리합니다.
- Chunk 단위로 Commit과 Rollback
- Chunk <I>
Chunk<I>, Chunk <O>
- Chunk <I>
- ItemReader로 읽은 하나의 아이템을 Chunk 크기만큼 반복해서 저장하는 타입
- Chunk <O>
- ItemReader로부터 전달받은 Chunk <I>를 참조해서 ItemProcessor에서 적절하게 가공한 뒤 ItemWriter로 전달되는 타입
- 여기서 O는 Processor가 없다면 ItemReader로부터 전달받는 타입, Processor가 있다면 Processor로부터 전달받는 타입입니다.
아키텍처
- ItemReader가 Source를 한 건씩 읽고 한 건씩 Chunk크기만큼 Chunk <I>에 저장합니다.
- Chunk 크기만큼 쌓였다면 Chunk <I>를 ItemProcessor에 전달합니다.
- ItemProcessor는 전달받은 Chunk를 적절하게 가공해서 Chunk <O>에 저장합니다.
- Chunk <O>를 ItemWriter에 전달합니다.
- itemWriter는 데이터를 쓰기 작업합니다.
ItemReader와 ItemProcessor는 각각의 하나씩 아이템을 처리하지면 ItemWriter는 Chunk 크기만큼을 한 번에 일괄 처리합니다.
ChunkOrientedTasklet
- 스프링 배치에서 제공하는 Tasklet 구현체로 Chunk 지향 프로세싱을 담당하는 도메인 객체입니다.
- ItemReader, ItemWriter, ItemProcessor를 사용해 Chunk 기반 데이터 입출력을 담당합니다.
- TaskletStep에 의해서 반복적으로 실행되며, ChunkOrientedTasklet이 실행될 때마다 매번 새로운 트랜잭션이 생성되어 처리됩니다.
- exception이 발생할 경우, 해당 Chunk는 롤백되며 이전에 커밋한 Chunk는 완료 상태가 유지됩니다.
- 내부적으로 ItemReader를 핸들링하는 ChunkProvider와 ItemProcessor, ItemWriter를 핸들링하는 ChunkProcessor 타입의 구현체를 갖습니다.
ChunkProvider
- ItemReader를 사용해서 소스로부터 아이템을 Chunk size만큼 읽어서 Chunk단위로 만들어 제공하는 도메인 객체
- Chunk <I>를 만들고 내부적으로 반복문을 사용해서 ItemReader.read()를 계속 호출하면서 item을 Chunk <I>에 쌓습니다.
- Chunk size만큼 item을 읽으면 반복문이 종료되고 ChunkProcessor로 넘깁니다.
- ItemReader가 읽은 item이 null일 경우 read 반복문이 종료되고 해당 Step의 반복문도 종료됩니다.
- 외부로부터 ChunkProvider이 호출될 때마다 새로운 Chunk를 생성합니다.
ChunkProcessor
- ItemProcessor를 사용해서 Item을 변형, 가공, 필터링하고 ItemWriter를 사용해서 Chunk 데이터를 저장, 출력합니다.
- Chunk <O>를 만들고 앞에서 넘어온 Chunk <I>의 item을 한 건씩 itemProcessor를 통해 처리한 후 Chunk <O>에 저장합니다.
- 만약 ItemProcessor가 존재하지 않는다면 바로 Chunk<O>에 저장합니다.
- ItemProcessor 처리가 완료되면 Chunk<O>에 있는 List <item>을 ItemWriter에게 전달합니다.
- ItemWriter 처리가 완료되면 Chunk 트랜잭션이 종료되고 Step 반복문에서는 다시 ChunkOrientedTasklet가 새롭게 실행됩니다.
- ItemWriter는 Chunk size만큼 데이터를 커밋하기 때문에 Chunk size는 곧 Commit Interval(커밋 간격)이 됩니다.
실행 순서
- TaskletStep이 execute 메서드로 ChunkOrientedTasklet를 호출합니다.
- ChunkOrientedTasklet는 provide 메서드로 ChunkProvider를 호출합니다.
- ChunkProvider는 ItemReader에게 Item을 한 건씩 read 하도록 지시합니다.
- 이 과정이 Chunk size만큼 반복됩니다.
- ChunkOrientedTasklet는 ChunkProcessor에게 읽은 데이터를 가공하라고 명령합니다.
- ChunkProcessor는 ItemProcessor에게 명령하고 ItemProcessor는 전달된 아이템 개수만큼 반복하여 가공합니다.
- ChunkProcessor는 가공된 아이템을 ItemWriter에 전달합니다.
- ItemWriter는 저장하는 등 쓰기 처리를 합니다.
- 이것이 하나의 Chunk Size 사이클로 이후 다시 ChunkOrientedTasklet에 가서 읽을 Item이 없을 때까지 반복합니다.
ItemReader
- 다양한 입력으로부터 데이터를 읽어서 제공하는 인터페이스입니다.
- 플랫 파일 - csv, txt
- XML, Jsono
- Database
- Message Queuing 서비스
- Custom reader
- 다수의 구현체들이 ItemReader와 ItemStream 인터페이스를 동시에 구현하고 있습니다.
- ItemStream은 파일 스트림 연결 종료, DB 커넥션 연결 종료 등의 장치 초기화 등의 작업에 사용됩니다.
- ExecutionContext에 read와 관련된 여러 가지 상태 정보를 저장해 두고 재시작 시 참조됩니다.
- ChunkOrientedTasklet 실행 시 필수적 요소로 설정해야 합니다.
- T read()
- 입력 데이터를 읽고 다음 데이터로 이동합니다.
- 아이템 하나를 리턴하며 더 이상 아이템이 없는 경우 null 리턴합니다.
- 아이템 하나는 파일의 한 줄, DB의 한 row, XML 파일에서 하나의 엘리먼트를 의미합니다.
- 더 이상 처리해야 할 item이 없어도 예외가 발생하지 않고 itemProcessor와 같은 다음 단계로 넘어갑니다.
ItemWriter
- Chunk 단위로 데이터를 받아 일괄 출력 작업을 위한 인터페이스입니다.
- 플랫 파일 - csv, txt
- XML, Jsono
- Database
- Message Queuing 서비스
- Mail Service
- Custom reader
- 다수의 구현체들이 itemReader와 같은 맥락으로 itemWriter와 ItemStream을 동시에 구현하고 있습니다.
- 하나의 아이템이 아닌 아이템 리스트를 전달받아 수행합니다.
- ChunkOrientedTasklet 실행 시 필수적 요소로 설정해야 합니다.
- void write()
- 출력 데이터를 아이템 리스트로 받아서 처리합니다.
- 출력이 완료되고 트랜잭션이 종료되면 새로운 Chunk 단위 프로세스로 이동합니다.
ItemProcessor
- 데이터를 출력하기 전에 데이터를 가공 및 필터링 역할을 하는 인터페이스입니다.
- ItemReader 및 ItemWriter와 분리되어 비즈니스 로직을 구현할 수 있습니다.
- ItemReader로부터 받은 아이템을 특정 타입으로 변환해서 ItemWriter에 넘겨줄 수 있습니다.
- Itemreader로부터 받은 아이템들 중 필터과정을 거쳐서 원하는 아이템들만 ItemWriter로 넘겨줄 수 있습니다.
- ChunkOrientedTasklet 실행 시 선택적 요소기 때문에 필수 요소는 아닙니다.
- O process()
- I 제네릭은 ItemReader에서 받을 데이터 타입
- O 제네릭은 ItemWriter에게 보낼 데이터 타입
- 아이템을 하나씩 가공 처리하며 null을 리턴할 경우 해당 아이템은 Chunk <O>에 저장되지 않습니다.
- ItemStream을 구현하지 않고 거의 대부분 Customizing 해서 사용하기 때문에 기본적으로 제공되는 구현체가 적습니다.
ItemStream
- ItemReader와 ItemWriter 처리 과정 중 상태를 저장하고 오류가 발생하여 재시작 시 해당 상태를 참조하여 실패한 곳부터 재시작하도록 지원합니다.
- 리소스를 열고(open) 닫아야(close) 하며 입출력 장치 초기화 등의 작업을 해야 하는 경우 사용합니다.
- 대부분의 구현체는 다 만들어져 있기 때문에 구현체를 사용한다면 직접 구현할 일은 없습니다.
- open과 update 메서드에서 ExecutionContext를 인수로 받는데 이는 상태 정보를 ExecutionContext에 업데이트해두고 재시작 시 open에서 해당 정보를 가져와서 사용하기 때문입니다.
- 예를 들어, 총 10개의 데이터를 Chunk 5 단위로 진행한다면 총 2번의 update가 발생합니다. 만약 9번째 데이터를 읽는 과정에서 문제가 발생하면 첫 번째 청크 커밋은 완료가 된 상태로 재시작 시에는 open에서 ExecutionContext에서 정보를 가져와 6번째 데이터부터 다시 시작할 수 있습니다.
- Stream이 구현된 ItemReader와 ItemWriter를 직접 만들려면 ItemStreamReader, ItemStreamWrtier 인터페이스를 구현하면 됩니다.
청크 기반 프로세스 아키텍처
- Job을 실행하면 TaskletStep이 실행됩니다.
- Tasklet은 내부에 RepeatTemplate라는 반복기를 가지고 있어 ChunkOrientedTasklet을 반복합니다.
- ChunkOrientedTasklet이 실행될 때 스프링 배치는 Transaction 경계를 생성합니다.
- Chunk 단위로 작업을 시작합니다.
- SimpleChunkProvider도 내부적으로 RepeatTmplate 반복기를 갖고 있어 ItemReader을 Chunk size만큼 반복시켜서 데이터를 읽습니다.
- Chunk Size만큼 읽고 읽은 아이템이 담긴 Chunk<I>를 SimpleChunkProcessor에 넘깁니다.
- SimpleChunkProcessor는 전달받은 Chunk 데이터를 한 건씩 읽어서 ItemProcessor로 데이터를 가공하여 Chunk<O>에 저장합니다.
- ItemWriter에게 Chunk가 갖고 있는 List값을 전달하고 ItemWriter는 출력 처리를 합니다.(트랜잭션 커밋)
- 이 과정이 청크 단위로 반복되고 ItemReader에서 null 값을 읽을 때 반복 작업이 끝나게 됩니다.
중간에 예외가 발생한다면 트랜잭션 롤백이 발생하고 작업이 중단되며, ItemReader에서 null값을 읽어오게 된다면 RepeatStatus.FINISHED를 통해 현재 작업을 마지막으로 다음부터는 반복 작업이 일어나지 않게 됩니다. Chunk 단위마다 새로운 트랜잭션 이 생성되고 커밋되는 과정이 존재하는 것을 알아둡시다.
'Spring' 카테고리의 다른 글
Spring Batch - ItemWriter (0) | 2024.01.28 |
---|---|
Spring Batch - ItemReader (0) | 2024.01.28 |
Spring Batch - DB 스키마와 도메인 (0) | 2024.01.24 |
Spring - hikariCP 옵션 정리 및 권장 설정 (0) | 2023.12.31 |
Spring - restTemplate 대용량 파일 업로드, 다운로드 설정 (0) | 2023.12.31 |
Contents
소중한 공감 감사합니다