공부 할 겸 로스트아크 API를 만지작 거리고 있었는데 계속 RestTemplate만 쓰다가 WebFlux를 한번 써 보았다..
아래가 문제가 있는 코드였는데 그냥 늘 하듯이 정해진 uri로 get요청을 보내고 받은 json데이터를 dto로 파싱하는 작업이었다. 그런데 계속 unexpected end-of-input오류가 발생하고, 정작 로그로 확인하면 json데이터는 아무런 문제없이 정상적으로 왔었다...
public Flux<String> getCharacterInfo(String characterName) {
return webClient.get()
.uri("/armories/characters/" + characterName+"/profiles")
.retrieve()
.bodyToFlux(DataBuffer.class)
.map(dataBuffer -> {
try {
String stringData = dataBuffer.toString(StandardCharsets.UTF_8);
stringData = stringData.replaceAll("<[^>]*>", "");
CharacterProfileDto dto = objectMapper.readValue(stringData, CharacterProfileDto.class);
return stringData;
} catch (JsonMappingException e) {
throw new RuntimeException(e);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
} finally {
DataBufferUtils.release(dataBuffer);
}
});
}
문제는 WebFlux에서 스트리밍 방식으로 데이터를 처리했기 때문. Mono가 아니라 Flux로 처리하면 JSON데이터가 순차적으로 스트리밍 되는 중간 단계에서 파싱을 시도하기 때문에 JSON이 완전히 로드되지 않아 문제가 발생할 가능성이 크다. 이 경우 JSON 데이터는 완전하지 않아 Unexpected end-of-input과 같은 오류가 발생할 수 있다. bodyToFlux(DataBuffer.class)로 스트리밍된 데이터를 map 으로 처리할 때 JSON 데이터가 조각(fragment) 단위로 들어오기 때문에 JSON 데이터가 불완전한 상태에서 파싱을 시도. 그럼 당연히 파싱 오류가 발생한다.
원인을 알았으니 해결 방법은 당연히 JSON 데이터를 조각 단위로 처리하지 않고, 데이터를 완전히 수집한 후 파싱하면 된다. reduce()를 사용해 Flux<dataBuffer>를 하나의 StringBuilder로 축적하여 JSON데이터를 완전히 수집
DataBufferUtils.release를 호출하여 스트리밍 데이터를 수동으로 해제시켜주고 이제 완전한 JSON데이터를 파싱하면 정상적으로 작동한다.
완전한 하나의 데이터니가 반환형도 Mono로 변경.
public Mono<CharacterProfileDto> getCharacterInfo(String characterName) {
return webClient.get()
.uri("/armories/characters/" + characterName + "/profiles")
.retrieve()
.bodyToFlux(DataBuffer.class)
.reduce(new StringBuilder(), (accumulated, buffer) -> {
String chunk = buffer.toString(StandardCharsets.UTF_8);
DataBufferUtils.release(buffer);
return accumulated.append(chunk);
})
.map(StringBuilder::toString) // StringBuilder를 String으로 변환
.map(jsonString -> {
try {
jsonString = jsonString.replaceAll("<[^>]*>", "");
return objectMapper.readValue(jsonString, CharacterProfileDto.class);
} catch (JsonProcessingException e) {
throw new RuntimeException("JSON 파싱 오류: " + e.getMessage(), e);
}
});
}