Polling Publisher

Polling publisher는 마이크로서비스 아키텍처(MSA)에서 트랜잭셔널 메시징을 구현하는 방법 중 하나이다.
이 패턴은 Transactional Outbox 패턴과 함께 사용되어 데이터 일관성을 유지하면서 메시지를 안정적으로 발행하는 데 도움을 준다.

Polling publisher는 특히 소규모 시스템이나 간단한 구현이 필요한 경우에 적합한 방식이다. 그러나 대규모 시스템이나 실시간성이 중요한 경우에는 Transaction Log Tailing과 같은 다른 방식을 고려할 수 있다.

기본 개념

Outbox 테이블에 저장된 메시지를 주기적으로 조회(polling)하여 메시지 브로커로 발행한다.
데이터베이스 트랜잭션의 일부로 메시지를 Outbox 테이블에 저장한 후, 별도의 프로세스에서 이를 읽어 발행한다.

작동 방식

  • 애플리케이션이 데이터를 변경할 때 Outbox 테이블에 메시지를 저장한다.
  • Polling publisher는 주기적으로 Outbox 테이블을 조회하여 미발행 메시지를 찾는다.
  • 발견된 메시지를 메시지 브로커(예: Kafka, RabbitMQ)로 발행한다.
  • 발행이 성공하면 해당 메시지를 Outbox 테이블에서 삭제하거나 발행 완료로 표시한다.

장점

  • 구현이 비교적 간단하다.
  • 대부분의 관계형 데이터베이스와 호환된다.
  • 메시지 발행과 데이터베이스 트랜잭션을 분리하여 안정성을 높인다.

단점

  • 주기적인 폴링으로 인해 데이터베이스에 추가적인 부하가 발생할 수 있다.
  • 실시간성이 떨어질 수 있다(폴링 주기에 따라 지연 발생).
  • 메시지 순서 보장이 어려울 수 있다.

구현 시 고려사항

  • 폴링 주기 최적화: 너무 빈번한 폴링은 DB 부하를 증가시키고, 너무 긴 주기는 메시지 지연을 초래할 수 있다.
  • 배치 처리: 한 번에 여러 메시지를 처리하여 효율성을 높일 수 있다.
  • 오류 처리: 메시지 발행 실패 시 재시도 메커니즘을 구현해야 한다.
  • 동시성 제어: 여러 인스턴스가 동시에 폴링할 경우 중복 처리를 방지해야 한다.

구현 예시

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
class TransactionalPollingPublisher:
    def __init__(self, config):
        self.database = Database(config['database_url'])
        self.message_broker = MessageBroker(config['broker_url'])
        self.outbox_table = config['outbox_table']
        self.batch_size = config['batch_size']
        self.state_manager = StateManager()
    
    async def poll_and_publish(self):
        """트랜잭션 아웃박스 패턴과 결합된 폴링 발행"""
        while True:
            async with self.database.transaction() as txn:
                try:
                    # 미발행 메시지 조회
                    messages = await self.get_unpublished_messages(
                        batch_size=self.batch_size
                    )
                    
                    if not messages:
                        continue
                    
                    # 메시지 발행
                    published_ids = []
                    for message in messages:
                        success = await self.publish_message(message)
                        if success:
                            published_ids.append(message.id)
                    
                    # 발행 상태 업데이트
                    if published_ids:
                        await self.mark_as_published(published_ids)
                        await txn.commit()
                    
                except Exception as e:
                    await txn.rollback()
                    await self.handle_publishing_error(e)
                
                finally:
                    await asyncio.sleep(self.get_adaptive_interval())
    
    async def get_unpublished_messages(self, batch_size):
        """미발행 메시지를 조회합니다"""
        query = f"""
            SELECT id, payload, created_at
            FROM {self.outbox_table}
            WHERE published = false
            ORDER BY created_at
            LIMIT {batch_size}
            FOR UPDATE SKIP LOCKED
        """
        return await self.database.fetch_all(query)

참고 및 출처