Polling Publisher#
Polling publisher는 마이크로서비스 아키텍처(MSA)에서 트랜잭셔널 메시징을 구현하는 방법 중 하나이다.
이 패턴은 Transactional Outbox 패턴과 함께 사용되어 데이터 일관성을 유지하면서 메시지를 안정적으로 발행하는 데 도움을 준다.
Polling publisher는 특히 소규모 시스템이나 간단한 구현이 필요한 경우에 적합한 방식이다. 그러나 대규모 시스템이나 실시간성이 중요한 경우에는 Transaction Log Tailing과 같은 다른 방식을 고려할 수 있다.
기본 개념#
Outbox 테이블에 저장된 메시지를 주기적으로 조회(polling)하여 메시지 브로커로 발행한다.
데이터베이스 트랜잭션의 일부로 메시지를 Outbox 테이블에 저장한 후, 별도의 프로세스에서 이를 읽어 발행한다.
작동 방식#
- 애플리케이션이 데이터를 변경할 때 Outbox 테이블에 메시지를 저장한다.
- Polling publisher는 주기적으로 Outbox 테이블을 조회하여 미발행 메시지를 찾는다.
- 발견된 메시지를 메시지 브로커(예: Kafka, RabbitMQ)로 발행한다.
- 발행이 성공하면 해당 메시지를 Outbox 테이블에서 삭제하거나 발행 완료로 표시한다.
- 구현이 비교적 간단하다.
- 대부분의 관계형 데이터베이스와 호환된다.
- 메시지 발행과 데이터베이스 트랜잭션을 분리하여 안정성을 높인다.
- 주기적인 폴링으로 인해 데이터베이스에 추가적인 부하가 발생할 수 있다.
- 실시간성이 떨어질 수 있다(폴링 주기에 따라 지연 발생).
- 메시지 순서 보장이 어려울 수 있다.
구현 시 고려사항#
- 폴링 주기 최적화: 너무 빈번한 폴링은 DB 부하를 증가시키고, 너무 긴 주기는 메시지 지연을 초래할 수 있다.
- 배치 처리: 한 번에 여러 메시지를 처리하여 효율성을 높일 수 있다.
- 오류 처리: 메시지 발행 실패 시 재시도 메커니즘을 구현해야 한다.
- 동시성 제어: 여러 인스턴스가 동시에 폴링할 경우 중복 처리를 방지해야 한다.
구현 예시#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
| class TransactionalPollingPublisher:
def __init__(self, config):
self.database = Database(config['database_url'])
self.message_broker = MessageBroker(config['broker_url'])
self.outbox_table = config['outbox_table']
self.batch_size = config['batch_size']
self.state_manager = StateManager()
async def poll_and_publish(self):
"""트랜잭션 아웃박스 패턴과 결합된 폴링 발행"""
while True:
async with self.database.transaction() as txn:
try:
# 미발행 메시지 조회
messages = await self.get_unpublished_messages(
batch_size=self.batch_size
)
if not messages:
continue
# 메시지 발행
published_ids = []
for message in messages:
success = await self.publish_message(message)
if success:
published_ids.append(message.id)
# 발행 상태 업데이트
if published_ids:
await self.mark_as_published(published_ids)
await txn.commit()
except Exception as e:
await txn.rollback()
await self.handle_publishing_error(e)
finally:
await asyncio.sleep(self.get_adaptive_interval())
async def get_unpublished_messages(self, batch_size):
"""미발행 메시지를 조회합니다"""
query = f"""
SELECT id, payload, created_at
FROM {self.outbox_table}
WHERE published = false
ORDER BY created_at
LIMIT {batch_size}
FOR UPDATE SKIP LOCKED
"""
return await self.database.fetch_all(query)
|
참고 및 출처#