Spring Kafka · open

PR #4505 — GH-4504: Fix silent record loss with two failing async records on one partition

suspend @KafkaListener + DefaultErrorHandler에서 같은 파티션의 두 레코드가 같이 비동기 실패하면 earlier-offset 레코드(r1)가 한 번만 invoke되고 recoverer에 도달하지 못해 silent 유실되는 회귀(#4504). offset 오름차순 정렬 후 partition-in-retry set으로 head 외 레코드를 큐에서 drop하는 fix를 PR로 제출. PR #4469의 후속.

증상

같은 파티션에 항상 실패하는 두 레코드 r1(offset 0), r2(offset 1)를 보낸 경우:

  • r1은 1회만 delivered, retried 0회, recoverer 호출 안 됨 — DeadLetterPublishingRecoverer가 있어도 DLT 도달 못 함.
  • r2 recover 후 committed offset이 2로 advance — r1 영구 유실.

원인

ListenerConsumer#handleAsyncFailurefailedRecords를 도착 순서로 하나씩 꺼내 handleRemaining(rte, List.of(cRecord), ...)를 호출. SeekUtils.doSeeks는 호출마다 자기 record의 partition·offset을 seek 맵에 등록 → 두 번째 호출의 r2 seek이 첫 번째 호출의 r1 seek을 덮어씀 → 다음 poll에서 컨슈머가 r2.offset으로 점프 → r1 건너뜀.

수정

handleAsyncFailure의 snapshot을 (topic, partition, offset)으로 정렬. 같은 파티션에서 RecordInRetryException이 throw된 직후, 같은 파티션의 더 높은 offset 레코드들은 이번 iter에서 큐 claim만 하고 처리는 skip — 등록된 seek이 다음 poll에서 자연 재배달하도록 맡김. partitionsInRetry set으로 iter 내 상태 추적. 재배달 시 fresh FailedRecordTuple이 콜백으로 다시 큐에 들어오기 때문에 re-queue 없이도 attempt 카운터가 정상 누적 — GH-4465의 no-re-queue 불변량 유지.

회귀 테스트

EnableKafkaKotlinCoroutinesTests에 2-record 시나리오 추가. r1·r2 둘 다 recoverer 도달 + committed offset이 두 레코드 너머 advance 검증. 픽스 없이는 recoverer latch가 timeout으로 fail, 픽스 후엔 통과.

스코프 및 트레이드오프

데이터 유실은 잡혔으나 head-of-line amplification (later-offset 레코드의 invocation 수가 (N+2)/(n+1) 배로 증가)은 본 PR 스코프에서 제외. 엄격한 per-record N+1을 원하면 @RetryableTopic 권장 — non-blocking retry 모델로 분리.

관련 자료