Spring Kafka · open · depends on #4505
PR #4512 — GH-4504: Bound async retry to linear delivery count per record
PR #4505가 데이터 유실은 잡았으나 같은 파티션에 항상 실패하는 N개 레코드가 들어오면 suspend 리스너의 invocation 수가 N·(N+2) (quadratic)로 증가하는 head-of-line amplification이 남아 있었습니다. 메인테이너 요청으로 PR을 분리, 본 PR에서 asyncRetryOffsets + seenMultiAttemptRetry 게이트 + manual seek로 invocation을 N·(n+2)−1 (linear)로 bound. @RetryableTopic 시나리오(publish-and-done recoverer) 회귀 없이.
맥락
PR #4505 (데이터 유실 fix)가 머지되기 전 reporter가 burst 측정을 보내옴 — DefaultErrorHandler(FixedBackOff(100ms, 2))로 N개 always-failing 레코드를 한 파티션에 보냈을 때 suspend 리스너의 총 invocation이 blocking 리스너의 N·(n+1)과 다르게 N·(N+2)로 증가 (N=20에서 60 vs 440). 원인: 데이터 유실 fix는 head가 retry 중일 때 같은 파티션의 뒤 레코드를 deque에서 drop만 하지, dispatch loop의 listener invocation은 막지 않음 → 매 retry cycle마다 컨슈머가 seek-back으로 전체 backlog를 re-poll하면서 listener를 다시 호출.
이전 시도와 회귀
이전 commit 48d486bc가 같은 문제를 풀려 했으나 recovery 시점에 offsetsInThisBatch[tp]를 wholesale wipe하면서 DeadLetterPublishingRecoverer 기반 Async{CompletableFuture,Mono}RetryTopicScenarioTests(allFailCaseTest, oneLongSuccessMsgBetween49ShortFailMsg)의 in-flight ack tracking이 깨져 669a66e4로 revert. 본 PR은 wholesale wipe 없이 individual removeOffsetsInBatch만 사용.
수정
asyncRetryOffsets: Map<TopicPartition, Long>— partition별 in-flight retry offset 추적.RecordInRetryException이 throw됐을 때만 채워짐.seenMultiAttemptRetry플래그 — 첫RecordInRetryException에서 켜짐. dispatch loop의 sync-failure detection이asyncRetryOffsets을 populate하기 전 이 게이트를 확인 → publish-and-done recoverer(RetryTopic경로)는RecordInRetryException을 던지지 않아 게이트가 false로 유지, skip 자체가 발동 안 됨.doInvokeWithRecords— listener invoke 전에shouldSkipForAsyncRetry로 partition의 in-flight retry offset보다 큰 offset 레코드를 skip +removeOffsetsInBatch(List.of(cRecord))로 out-of-commit container ack pause 방지.handleAsyncFailure— handler가 normal return하고asyncRetryOffsets[tp]가 그 offset이면 head가 recover된 것 →consumer.seek(tp, offset + 1)로 컨슈머 position을 다음 레코드로 advance,asyncRetryOffsets.remove(tp).
측정
DefaultErrorHandler(FixedBackOff(100ms, 2)), N개 always-failing 레코드, 단일 파티션. ✓ = 로컬 측정, ✱ = 인접 측정점의 공식 예측.
N blocking N·(n+1) this PR (suspend) #4505만 적용 (suspend)
5 15 19 ✓ 35
10 30 39 ✓ 120
15 45 59 ✱ 255
20 60 79 ✓ 440
Linear in N at N·(n+2)−1. non-head 레코드당 +1 extra delivery는 iter-0 dispatch invocation (게이트가 첫 RecordInRetryException에서 켜지기 직전). 이걸 더 줄이려면 BackOff를 설정 시점에 introspect하거나 per-poll parallelism을 포기해야 함 — "record당 +1"에서 라인을 그음.
회귀 테스트
EnableKafkaKotlinCoroutinesTests#test suspend function bounded burst stays linear with N 추가 (N=10). 총 listener invocation이 N·(n+2)−1 = 39인지 검증. dispatch-loop skip 없으면 N·(N+2) = 120이 나와 assertion 실패. 같은 파일의 two-record 테스트 assertion도 r2 = 5 → r2 = 4로 조정 (iter-0 dispatch + own n+1 retry cycle).
RetryTopic 시나리오
@RetryableTopic 경로는 RecordInRetryException을 던지지 않아 seenMultiAttemptRetry 게이트가 false 유지 → asyncRetryOffsets이 빈 채로 남고 dispatch-loop skip 자체가 발동 안 됨. Async{CompletableFuture,Mono}RetryTopicScenarioTests 7개 + AsyncMonoRetryTopicComplexScenarioTests + RetryTopicIntegrationTests 등 회귀 없음. 전체 :spring-kafka:test sweep 통과.