Asynchronism

백엔드 시스템의 성능을 최적화하는 것은 현대 애플리케이션 개발에서 핵심적인 과제이다. 특히 비동기 처리(Asynchronism)는 시스템의 응답성과 확장성을 크게 향상시킬 수 있는 강력한 패러다임이다.

Offloading Heavy Tasks to Background Jobs or Queues

기본 개념

백그라운드 작업 처리는 시간이 오래 걸리거나 리소스를 많이 소모하는 작업을 주 실행 스레드에서 분리하여 별도의 프로세스나 스레드에서 비동기적으로 실행하는 기법이다. 이를 통해 사용자 요청에 대한 응답 시간을 크게 개선할 수 있다.

구현 방법

작업 큐 시스템 활용

작업 큐 시스템은 백그라운드 작업 처리의 가장 일반적인 구현 방법이다.

이 시스템은 주로 다음 세 가지 구성 요소로 이루어진다:

  1. 큐(Queue): 처리해야 할 작업들이 저장되는 장소이다.
  2. 생산자(Producer): 큐에 작업을 추가하는 주체이다.
  3. 소비자(Consumer/Worker): 큐에서 작업을 가져와 실행하는 주체이다.
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# Python과 Celery를 사용한 백그라운드 작업 예시
from celery import Celery

# Celery 인스턴스 생성 및 설정
app = Celery('tasks', broker='redis://localhost:6379/0')

# 백그라운드에서 실행될 작업 정의
@app.task
def process_image(image_id):
    # 이미지 처리 로직
    image = download_image(image_id)
    thumbnail = create_thumbnail(image)
    upload_thumbnail(thumbnail, image_id)
    send_notification(image_id)
    return {"status": "success", "image_id": image_id}

# API 엔드포인트에서 작업 큐에 작업 추가
def upload_image_endpoint(request):
    image_data = request.files['image']
    image_id = save_original_image(image_data)
    
    # 비동기 작업 시작 - 즉시 반환됨
    process_image.delay(image_id)
    
    return {"status": "processing", "image_id": image_id}
스케줄링 시스템 활용

주기적으로 실행해야 하는 백그라운드 작업은 스케줄링 시스템을 통해 관리할 수 있다.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
// Node.js에서 node-cron을 사용한 스케줄링 예시
const cron = require('node-cron');
const reportGenerator = require('./report-generator');

// 매일 자정에 일일 보고서 생성 작업 스케줄링
cron.schedule('0 0 * * *', async () => {
  try {
    console.log('일일 보고서 생성 시작...');
    await reportGenerator.generateDailyReport();
    console.log('일일 보고서 생성 완료');
  } catch (error) {
    console.error('보고서 생성 중 오류 발생:', error);
  }
});

주요 사용 사례

  1. 이메일 및 알림 전송: 사용자 등록 완료 후 환영 이메일 발송, 알림 전송 등
  2. 미디어 처리: 이미지 리사이징, 동영상 인코딩, 파일 변환
  3. 보고서 생성: 복잡한 데이터 집계 및 보고서 생성
  4. 데이터 내보내기/가져오기: 대용량 CSV 파일 생성, 데이터 마이그레이션
  5. 정기 유지보수 작업: 데이터베이스 청소, 캐시 갱신, 만료된 콘텐츠 삭제

구현 시 고려사항

  1. 멱등성(Idempotency): 작업이 여러 번 실행되더라도 동일한 결과를 보장해야 한다. 네트워크 오류 등으로 작업이 중복 실행될 수 있기 때문이다.

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    
    // 멱등성을 보장하는 작업 예시
    async function processPayment(paymentId) {
      // 작업이 이미 처리되었는지 확인
      const paymentStatus = await getPaymentStatus(paymentId);
      if (paymentStatus === 'processed') {
        return { status: 'already_processed', paymentId };
      }
    
      // 작업 처리
      const result = await executePayment(paymentId);
    
      // 작업 처리 상태 기록
      await markPaymentAsProcessed(paymentId);
    
      return result;
    }
    
  2. 재시도 메커니즘: 일시적인 오류로 실패한 작업을 자동으로 재시도할 수 있는 메커니즘이 필요하다.

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    
    # Python과 Celery를 사용한 재시도 메커니즘 예시
    @app.task(bind=True, max_retries=3, default_retry_delay=60)
    def send_notification(self, user_id, message):
        try:
            # 알림 전송 로직
            result = notification_service.send(user_id, message)
            return result
        except TemporaryFailure as exc:
            # 일시적인 오류 발생 시 재시도
            self.retry(exc=exc)
        except PermanentFailure as exc:
            # 영구적인 오류는 로깅 후 실패 처리
            logger.error(f"Notification failed permanently: {exc}")
            raise
    
  3. 우선순위 관리: 작업의 중요도에 따라 우선순위를 설정하여 중요한 작업이 먼저 처리되도록 해야 한다.

  4. 모니터링 및 로깅: 백그라운드 작업의 상태, 실행 시간, 성공/실패 여부 등을 모니터링하고 로깅해야 한다.

  5. 데드 레터 큐(Dead Letter Queue): 여러 번 재시도해도 실패하는 작업을 분리하여 관리하는 전략이 필요하다.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
// Java와 Spring Boot를 사용한 데드 레터 큐 구성 예시
@Configuration
public class RabbitMQConfig {

    @Bean
    public Queue workQueue() {
        return QueueBuilder.durable("work-queue")
                .withArgument("x-dead-letter-exchange", "")
                .withArgument("x-dead-letter-routing-key", "dead-letter-queue")
                .build();
    }

    @Bean
    public Queue deadLetterQueue() {
        return QueueBuilder.durable("dead-letter-queue").build();
    }
}

Utilizing Message Brokers for Async Communication Between Services

기본 개념

메시지 브로커는 분산 시스템에서 서비스 간 통신을 중개하는 소프트웨어이다. 메시지 브로커를 활용한 비동기 통신은 서비스 간 직접적인 의존성을 줄이고, 시스템의 전체적인 복원력(resilience)과 확장성(scalability)을 향상시킨다.

주요 메시지 브로커 시스템

  1. RabbitMQ: AMQP(Advanced Message Queuing Protocol)를 구현한 메시지 브로커로, 높은 신뢰성과 유연한 라우팅 옵션을 제공한다.
  2. Apache Kafka: 대용량 메시지 처리에 최적화된 분산 스트리밍 플랫폼으로, 높은 처리량과 내구성을 보장한다.
  3. Redis Pub/Sub: 메모리 기반 데이터 저장소인 Redis에서 제공하는 간단한 발행-구독 패턴 구현이다.
  4. Amazon SQS/SNS: AWS에서 제공하는 메시지 큐 및 알림 서비스이다.
  5. Google Cloud Pub/Sub: Google Cloud Platform의 완전 관리형 실시간 메시지 서비스이다.

메시지 교환 패턴

  1. 점대점(Point-to-Point): 하나의 생산자가 메시지를 큐에 전송하고, 하나의 소비자가 메시지를 처리합니다. 메시지는 한 번만 처리된다.

    1
    2
    3
    4
    5
    6
    7
    8
    9
    
    // Java와 JMS를 사용한 점대점 통신 예시
    @JmsListener(destination = "order.queue")
    public void processOrder(Order order) {
        // 주문 처리 로직
        orderProcessor.process(order);
    
        // 처리 결과 로깅
        logger.info("Order {} processed successfully", order.getId());
    }
    
  2. 발행-구독(Publish-Subscribe): 하나의 생산자가 메시지를 발행하고, 여러 구독자가 동일한 메시지를 수신한다.

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    
    // Node.js와 Redis를 사용한 발행-구독 패턴 예시
    const redis = require('redis');
    
    // 발행자 클라이언트 생성
    const publisher = redis.createClient();
    
    // 구독자 클라이언트 생성
    const subscriber = redis.createClient();
    
    // 구독 설정
    subscriber.subscribe('user-activity');
    
    // 메시지 수신 처리
    subscriber.on('message', (channel, message) => {
      if (channel === 'user-activity') {
        const activity = JSON.parse(message);
        console.log(`사용자 ${activity.userId}${activity.action} 작업을 수행했습니다.`);
        // 활동 로깅, 분석 등의 처리
      }
    });
    
    // 이벤트 발행 함수
    function publishUserActivity(userId, action) {
      const activity = {
        userId,
        action,
        timestamp: Date.now()
      };
      publisher.publish('user-activity', JSON.stringify(activity));
    }
    
    // 사용 예시
    publishUserActivity('user123', 'login');
    
  3. 요청-응답(Request-Reply): 클라이언트가 요청을 보내고 서버가 응답을 반환하는 패턴으로, 비동기적으로 구현될 수 있다.

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    
    # Python과 RabbitMQ를 사용한 비동기 요청-응답 패턴 예시
    import pika
    import uuid
    import json
    
    class PricingServiceClient:
        def __init__(self):
            self.connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
            self.channel = self.connection.channel()
    
            # 응답을 받을 익명 큐 생성
            result = self.channel.queue_declare(queue='', exclusive=True)
            self.callback_queue = result.method.queue
    
            # 응답 처리를 위한 소비자 설정
            self.channel.basic_consume(
                queue=self.callback_queue,
                on_message_callback=self.on_response,
                auto_ack=True
            )
    
            self.responses = {}
    
        def on_response(self, ch, method, props, body):
            # 응답 처리
            self.responses[props.correlation_id] = json.loads(body)
    
        def get_product_price(self, product_id):
            # 상관 ID 생성
            correlation_id = str(uuid.uuid4())
    
            # 요청 전송
            self.channel.basic_publish(
                exchange='',
                routing_key='pricing_service',
                properties=pika.BasicProperties(
                    reply_to=self.callback_queue,
                    correlation_id=correlation_id,
                ),
                body=json.dumps({'product_id': product_id})
            )
    
            # 응답 대기
            while correlation_id not in self.responses:
                self.connection.process_data_events()
    
            return self.responses.pop(correlation_id)
    

이벤트 기반 아키텍처(Event-Driven Architecture)

이벤트 기반 아키텍처는 시스템 내의 상태 변화를 이벤트로 발행하고, 관심 있는 컴포넌트들이 이를 구독하여 처리하는 방식이다. 이는 서비스 간 결합도를 낮추고 확장성을 높이는 데 효과적이다.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// C#과 Azure Event Grid를 사용한 이벤트 기반 아키텍처 예시
public class OrderController : ApiController
{
    private readonly IEventGridPublisher _eventPublisher;
    private readonly IOrderRepository _orderRepository;

    public OrderController(IEventGridPublisher eventPublisher, IOrderRepository orderRepository)
    {
        _eventPublisher = eventPublisher;
        _orderRepository = orderRepository;
    }

    [HttpPost]
    public async Task<IActionResult> CreateOrder([FromBody] OrderRequest request)
    {
        // 주문 저장
        var order = new Order(request);
        await _orderRepository.SaveOrderAsync(order);
        
        // 주문 생성 이벤트 발행
        var orderCreatedEvent = new OrderCreatedEvent
        {
            OrderId = order.Id,
            CustomerId = order.CustomerId,
            TotalAmount = order.TotalAmount,
            CreatedAt = DateTime.UtcNow
        };
        
        await _eventPublisher.PublishEventAsync("order-events", "OrderCreated", orderCreatedEvent);
        
        return Ok(new { OrderId = order.Id });
    }
}

구현 시 고려사항

  1. 메시지 직렬화 포맷: JSON, Protocol Buffers, Avro 등 여러 직렬화 포맷 중 적절한 것을 선택해야 한다.

  2. 메시지 스키마 관리: 메시지 구조가 변경될 때 호환성을 유지하기 위한 스키마 관리 전략이 필요하다.

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    
    // Java와 Avro를 사용한 스키마 기반 메시지 직렬화 예시
    // 스키마 정의 (order.avsc)
    {
      "namespace": "com.example.order",
      "type": "record",
      "name": "Order",
      "fields": [
        {"name": "id", "type": "string"},
        {"name": "customerId", "type": "string"},
        {"name": "items", "type": {"type": "array", "items": {
          "type": "record",
          "name": "OrderItem",
          "fields": [
            {"name": "productId", "type": "string"},
            {"name": "quantity", "type": "int"},
            {"name": "price", "type": "double"}
          ]
        }}},
        {"name": "totalAmount", "type": "double"},
        {"name": "createdAt", "type": "long", "logicalType": "timestamp-millis"}
      ]
    }
    
    // Avro를 사용한 메시지 직렬화 및 전송
    public void sendOrderMessage(Order order) throws IOException {
        // 스키마 로드
        Schema schema = new Schema.Parser().parse(new File("order.avsc"));
    
        // 레코드 생성
        GenericRecord avroOrder = new GenericData.Record(schema);
        avroOrder.put("id", order.getId());
        avroOrder.put("customerId", order.getCustomerId());
        // ... 나머지 필드 설정
    
        // 직렬화
        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
        DatumWriter<GenericRecord> writer = new GenericDatumWriter<>(schema);
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(outputStream, null);
        writer.write(avroOrder, encoder);
        encoder.flush();
    
        // 메시지 전송
        byte[] serializedOrder = outputStream.toByteArray();
        kafkaTemplate.send("orders", order.getId(), serializedOrder);
    }
    
  3. 메시지 라우팅: 메시지를 적절한 소비자에게 전달하는 라우팅 전략을 설계해야 한다.

  4. 메시지 순서 보장: 순서가 중요한 경우, 메시지의 순서를 보장하는 메커니즘이 필요하다.

  5. 장애 처리 및 복원력: 메시지 브로커나 소비자 서비스의 장애 상황에 대비한 전략이 필요하다.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
// Node.js와 Kafka를 사용한 장애 대응 예시
const { Kafka } = require('kafkajs');

const kafka = new Kafka({
  clientId: 'my-app',
  brokers: ['kafka1:9092', 'kafka2:9092', 'kafka3:9092'], // 여러 브로커 지정
  retry: {
    initialRetryTime: 100,
    retries: 8  // 재시도 횟수 설정
  }
});

const consumer = kafka.consumer({ groupId: 'payment-processing-group' });

// 소비자 설정
async function setupConsumer() {
  await consumer.connect();
  await consumer.subscribe({ topic: 'payment-events', fromBeginning: false });
  
  await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
      try {
        const paymentEvent = JSON.parse(message.value.toString());
        await processPaymentEvent(paymentEvent);
        
        // 메시지 처리 성공 로깅
        console.log(`[${topic}] Processed payment event: ${paymentEvent.id}`);
      } catch (error) {
        console.error(`Error processing message: ${error.message}`);
        
        // 심각한 오류가 아니라면 계속 처리
        if (!isFatalError(error)) {
          return;
        }
        
        // 심각한 오류인 경우 소비자 종료 및 재시작 로직
        await consumer.disconnect();
        setTimeout(setupConsumer, 5000);  // 5초 후 재연결 시도
      }
    }
  });
}

setupConsumer();

비동기 처리의 성능적 이점

  1. 응답 시간 개선
    비동기 처리를 통해 사용자 요청에 대한 응답 시간을 크게 개선할 수 있다. 예를 들어, 사용자가 대용량 파일을 업로드할 때, 파일 처리 작업을 백그라운드로 오프로딩하면 사용자는 파일 처리가 완료되기를 기다리지 않고 다른 작업을 계속할 수 있다.

    1
    2
    3
    4
    5
    6
    7
    
    동기 처리 방식:
    사용자 요청 → 파일 업로드 → 파일 처리 → 응답 반환
    (총 응답 시간 = 업로드 시간 + 처리 시간)
    
    비동기 처리 방식:
    사용자 요청 → 파일 업로드 → 작업 큐에 추가 → 응답 반환
    (총 응답 시간 = 업로드 시간 + 큐 추가 시간)
    
  2. 시스템 처리량 증가
    비동기 처리를 통해 시스템의 전체 처리량을 증가시킬 수 있다. 작업을 병렬로 처리하고, 자원을 효율적으로 활용할 수 있기 때문이다.

  3. 시스템 확장성 향상
    비동기 아키텍처는 시스템의 확장성을 크게 향상시킨다. 작업 처리 부하가 증가하면 워커(소비자) 인스턴스를 추가하여 수평적으로 확장할 수 있다.

    1
    2
    
    # Docker Swarm을 사용한 워커 확장 예시
    docker service scale my-worker-service=10
    
  4. 시스템 복원력 강화
    비동기 처리는 시스템의 복원력을 강화한다. 한 서비스의 장애가 전체 시스템에 미치는 영향을 최소화하고, 메시지 지속성을 통해 데이터 손실을 방지할 수 있다.


용어 정리

용어설명

참고 및 출처