Java Stream Api를 활용해 OOM 해결, 성능 개선
사내 서비스에서는 상품을 검색할 때 사용하는 자동완성 데이터를 생성해주는 스프링부트 서버(이하 자동완성 키워드 전처리 서버)가 있는데, 스케쥴링을 통해 주기적으로 동작합니다.
어느 날 자동완성 키워드 전처리 서버에서 OOM 오류가 발생했습니다.
GC 오버헤드로 인한 OOM의 원인은 다음과 같습니다.
1. Garbage Collector 가 너무 빈번하게 일어나서 오버헤드가 발생하는 경우,
2. Java 프로세스가 GC를 수행하는 시간의 98%를 소비했지만, Heap 메모리의 2% 미만으로 복구되는 경우,
3. 응용 프로그램이 사용 가능한 거의 모든 메모리를 소진했고, GC가 메모리를 정리하는데 너무 많은 시간과 반복적 실패를 했을 경우
출처 : https://shs2810.tistory.com/69
GC Overhead Limit Exceeded 발생..
웹서비스 운영 담당하고있으면 장소와 시간의 제약없이 핸드폰이 울린다.. 여러 Exception들이나 무수한 에러들, 여러 서버의 상태 등등 평소와 다른 일반적이지 않은 상태에 대한 내용이 핸드폰
shs2810.tistory.com
OOM이 발생하는 코드는 JDBC의 ResultSet에서 가져온 모든 데이터를 map에 담는 로직이었었는데, 중간중간에 String 객체를 너무 많이 생성하는 등 GC 수행은 많아지고, 힙 메모리는 점차 줄어드는 상황이었습니다.
3월 1일과 3월 29일에 수집한 자동완성 데이터의 개수를 비교했을 때, 시간이 지남에 따라 키워드 수가 지속적으로 증가하여 서버가 수용할 수 있는 한계에 도달한 것을 확인할 수 있었습니다.
이 문제를 해결하기 위한 몇 가지 방안을 고려했습니다.
첫 번째 방법은 서버의 힙 메모리를 증가시키는 것으로, 이는 단기적인 해결책일 수 있으나 장기적으로 동일한 문제가 재발할 가능성이 높습니다.
두 번째 방법은 데이터를 페이징하여 쿼리하는 것으로, 이 방식은 DB 서버에 부하를 줄 수 있으며 쿼리마다 새로운 커넥션과 statement 객체를 생성해야 한다는 단점이 있습니다.
세 번째 방법은 Out Of Memory(OOM) 오류의 주요 원인인 한 번에 모든 값을 map에 저장하는 대신, 여러 연산을 연결하여 처리하는 파이프라이닝 기법을 적용하는 것입니다.
첫 번째 방법의 경우에는 당장에는 오류가 해결되겠지만 근 시일 내에 다시 문제가 발생하는 것이 자명하기 때문에, 세 번째 방법을 선택했고, 자바의 스트림 API를 활용했습니다.
또한, 비즈니스 로직에서 사용되는 String 객체 대신 StringBuilder를 사용함으로써 가비지 컬렉션(GC)의 수행 횟수를 최소화하여 성능을 개선했습니다.
이를 구현하기 위해, ResultSet의 각 row를 순차적으로 처리할 수 있도록 Spliterator 인터페이스를 커스텀 구현했습니다.
개선된 접근 방식과 Spliterator 구현 예제는 다음과 같습니다
private static class CustomSpliterator<T> implements Spliterator<T> {
private int rowNum = 0;
private final ResultSet rs;
private final RowMapper<T> rowMapper;
public CustomSpliterator(ResultSet rs, RowMapper<T> rowMapper) {
this.rs = rs;
this.rowMapper = rowMapper;
}
public static <T> Stream<T> queryForStream(DataSource dataSource, String sql, RowMapper<T> rowMapper) throws SQLException {
Connection connection = DataSourceUtils.getConnection(dataSource);
Statement stmt = connection.createStatement();
ResultSet rs = stmt.executeQuery(sql);
return StreamSupport.stream(new CustomSpliterator<>(rs, rowMapper), false).onClose(() -> {
JdbcUtils.closeResultSet(rs);
JdbcUtils.closeStatement(stmt);
DataSourceUtils.releaseConnection(connection, dataSource);
});
}
@Override
public boolean tryAdvance(Consumer<? super T> consumer) {
try {
if (rs.next()) {
consumer.accept(rowMapper.mapRow(rs, rowNum++));
return true;
} else {
return false;
}
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
@Override
public Spliterator<T> trySplit() {
return null;
}
@Override
public long estimateSize() {
return 0;
}
@Override
public int characteristics() {
return ORDERED;
}
}
tryAdvance 메서드의 경우 ResultSet에서 다음 row가 존재할 경우에 RowMapper를 반환하도록 하여, 중간 연산으로 RowMapper 객체에 대한 작업이 가능하도록 구현했습니다.
사용 방법은 다음과 같습니다.
//EmployeeRepository.java
public <T> Stream<T> resultSetStream(RowMapper<T> rowMapper) {
String sql = "SELECT * FROM employee";
try {
return CustomSpliterator.queryForStream(dataSource, sql, rowMapper);
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
.
.
.
.
//EmployeeService.java
public void findAllEmployees(String filePath) {
fileWriteService.writeHeader(EmployeeDto.class, filePath);
employeeRepositoryV3.resultSetStream((rs, rowNum) -> new Employee(
rs.getString("first_name"),
rs.getString("last_name"),
rs.getString("email"),
rs.getString("department"),
rs.getDouble("salary"),
rs.getDate("hire_date")))
.map(EmployeeDto::create)
.forEach(employeeDto -> fileWriteService.writeBody(EmployeeDto.class, employeeDto, filePath));
}
ResultSet에서 하나의 row를 받아올 때 마다 중간연산으로 필요한 객체를 생성해, 최종 연산에서 파일을 생성해줍니다.
이 방법을 사용했을 경우 이전과 다르게 ResulSet의 모든 row를 서버 메모리에 저장할 필요가 없고, 가독성이 좋은 코드를 작성할 수 있습니다.
좀 더 다양한 기능을 추가한 커스텀한 Spliterator의 코드는 다음과 같습니다.
사실 queryForStream 메서드의 경우, JdbcTemplate에서 똑같은 기능을 지원해주는 메서드가 있습니다. 하지만 커스텀하게 구현했을 경우 개발자가 추가적으로 필요한 기능을 넣어줄 수 있습니다.
이 예제에서는 queryForStream 메서드를 커스텀하여, Consumer 함수형 인터페이스를 파라미터로 받아 특정 지점(pinPoint)에서 원하는 동작을 실행할 수 있도록 확장했습니다.
예제는 다음과 같습니다.
public void writeFileByInterface(String filePath) {
fileWriteService.writeHeader(EmployeeDto.class, filePath);
employeeRepositoryV4.resultSetStream((rs, rowNum) -> new Employee(
rs.getString("first_name"),
rs.getString("last_name"),
rs.getString("email"),
rs.getString("department"),
rs.getDouble("salary"),
rs.getDate("hire_date")
), rowNum -> log.info("현재 {}개 처리되었습니다.", rowNum))
.map(EmployeeDto::create)
.forEach(employeeDto -> fileWriteService.writeBody(employeeDto, filePath));
}
.
.
.
.
public static <T> Stream<T> queryForStream(DataSource dataSource,
String sql,
RowMapper<T> rowMapper,
Consumer<Integer> listener) throws SQLException {
ComfortableConnection connection = new ComfortableConnection(dataSource, sql);
return StreamSupport.stream(new CustomSpliterator<>(connection.resultSet, rowMapper)
.pinned(listener), false)
.onClose(connection::close);
}
이전의 코드와 달라진 점은, queryForStream의 파라미터로 Consumer 함수형 인터페이스를 받을 수 있는 점입니다.
구현한 CustomSpliterator의 전체 코드를 먼저 확인하겠습니다.
private static class CustomSpliterator<T> implements Spliterator<T> {
private static final int DEFAULT_BULK_POINT = 10_000;
private int bulkPoint = DEFAULT_BULK_POINT;
private int rowNum = 0;
private Consumer<Integer> listener;
private final ResultSet rs;
private final RowMapper<T> rowMapper;
public CustomSpliterator(ResultSet rs, RowMapper<T> rowMapper, int bulkPoint) {
this.rs = rs;
this.rowMapper = rowMapper;
this.bulkPoint = bulkPoint;
}
public CustomSpliterator(ResultSet rs, RowMapper<T> rowMapper) {
this.rs = rs;
this.rowMapper = rowMapper;
}
public EmployeeRepositoryV4.CustomSpliterator<T> pinned(Consumer<Integer> listener) {
this.listener = listener;
return this;
}
public static <T> Stream<T> queryForStream(DataSource dataSource,
String sql,
RowMapper<T> rowMapper,
Consumer<Integer> listener) throws SQLException {
ComfortableConnection connection = new ComfortableConnection(dataSource, sql);
return StreamSupport.stream(new CustomSpliterator<>(connection.resultSet, rowMapper)
.pinned(listener), false)
.onClose(connection::close);
}
.
.
.
private static class ComfortableConnection {
.
.
.
}
@Override
public boolean tryAdvance(Consumer<? super T> consumer) {
try {
if (rs.next()) {
T t = rowMapper.mapRow(rs, rowNum++);
consumer.accept(t);
if (rowNum > 0 && rowNum % bulkPoint == 0) {
listener.accept(rowNum);
}
return true;
} else {
listener.accept(rowNum);
return false;
}
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
@Override
public Spliterator<T> trySplit() {
return null;
}
@Override
public long estimateSize() {
return 0;
}
@Override
public int characteristics() {
return ORDERED;
}
}
구현된 CustomSpliterator의 주요 변경점은 추가된 필드와 메서드로, Consumer 함수형 인터페이스를 사용하여 지정된 시점마다 특정 메서드를 실행할 수 있게 구성되어 있습니다.
private static final int DEFAULT_BULK_POINT = 10_000;
private int bulkPoint = DEFAULT_BULK_POINT;
.
.
.
@Override
public boolean tryAdvance(Consumer<? super T> consumer) {
try {
if (rs.next()) {
T t = rowMapper.mapRow(rs, rowNum++);
consumer.accept(t);
if (rowNum > 0 && rowNum % bulkPoint == 0) {
listener.accept(rowNum);
}
return true;
} else {
listener.accept(rowNum);
return false;
}
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
.
.
.
public void writeFileByInterface(String filePath) {
fileWriteService.writeHeader(EmployeeDto.class, filePath);
employeeRepositoryV4.resultSetStream((rs, rowNum) -> new Employee(
rs.getString("first_name"),
rs.getString("last_name"),
rs.getString("email"),
rs.getString("department"),
rs.getDouble("salary"),
rs.getDate("hire_date")
), rowNum -> log.info("현재 {}개 처리되었습니다.", rowNum))
.map(EmployeeDto::create)
.forEach(employeeDto -> fileWriteService.writeBody(employeeDto, filePath));
}
tryAdvance 메서드를 살펴보면, 새로운 조건문이 포함된 것을 볼 수 있습니다.
이를 통해 resultSetStream에 rowNum을 로깅하는 새로운 파라미터를 추가하였고, 이 조건문이 참이 될 때마다 로그를 출력하도록 설정되어 있습니다.
간단히 말해, 10,000건마다 로그를 출력하도록 구성된 것입니다.
이러한 커스텀 구현을 통해, 표준 기능을 넘어서는 유연성과 확장성을 얻을 수 있습니다.
Spliterator 인터페이스를 커스텀하게 구현하는 것 외에도, 두 가지 방법을 통해 성능을 개선했습니다.
자동완성 전처리 서버에서는 동일한 쿼리를 사용하여 두 가지 파일을 생성합니다.
하나는 Elasticsearch 인덱스 구축을 위한 dump 파일이고, 다른 하나는 다른 서버에서 활용하기 위한 JSON 파일입니다.
원래 이 두 파일은 순차적으로 생성되도록 코드가 구현되어 있었으나, I/O 작업의 효율성을 높이기 위해 멀티 쓰레드를 활용하여 동시에 진행하도록 리팩토링하였습니다.
또한, 쿼리 실행 시간이 긴 점을 고려하여 전처리가 시작될 때 쿼리를 비동기적으로 수행하도록 개선했습니다. 이러한 변경을 통해 전체적인 성능을 향상시켰습니다.
개선한 방식은 다음과 같습니다.
ExecutorService executorService = Executors.newFixedThreadPool(5);
.
.
.
try(Connection conn = databaseConnector.getConn();
PreparedStatement preparedStatement = conn.prepareStatement(selectSql)) {
// 메서드 실행 시점에 곧바로 별개의 쓰레드로 쿼리를 수행함.
// dump, json 파일을 만드는데 각각의 ResultSet이 필요
CompletableFuture<ResultSet[]> resultSetFuture = CompletableFuture.supplyAsync(() -> {
try {
ResultSet[] resultSets = {preparedStatement.executeQuery(), preparedStatement.executeQuery()};
return resultSets;
} catch (SQLException e) {
logger.error("sql 수행 도중 오류 발생");
job.setStatus(IndexJobRunner.STATUS.ERROR.name());
}
}, executorService);
.
.
.
// dump파일, json파일 동시 생성
CompletableFuture.allOf(
CompletableFuture.runAsync(() -> makeDump(resultSetFuture, fastcatSavePath, acKeywordValueSet, standardCount, accKeywordResultMap), executorService),
CompletableFuture.runAsync(() -> makeJson(resultSetFuture, elasticSavePath, acKeywordValueSet, acKeyword, standardCount), executorService)
).join();
개선 결과는 다음과 같습니다.
↓ ↓ ↓ ↓
아래의 경우 9시 35분에 수동으로 수행한 결과입니다. cpu의 사용량이 현저히 줄어든 것을 알 수 있습니다.
↓ ↓ ↓ ↓
소요 시간 또한 크게 줄어든 것을 확인할 수 있습니다.