Pipe-and-Filter

Pipe-and-Filter 는 데이터를 처리 흐름에 따라 독립된 필터 (Filter) 모듈로 나누고, 각 모듈 간 데이터를 파이프 (Pipe) 를 통해 전달하며 시스템 전체를 구성하는 아키텍처 스타일이다. 각 필터는 특정 작업 (검증, 변환, 집계 등) 에 집중하며, 파이프는 데이터를 순서대로 전달한다. 이런 설계는 모듈성, 재사용성, 조합성 등을 극대화해 유지보수와 확장을 용이하게 한다. 반면, 필터의 수가 많아질수록 오버헤드, 지연 (latency), 디버깅 복잡도, 포맷 호환 문제 등이 발생할 수 있다. 실무에서는 컴파일러, Unix 셸 명령어, ETL 파이프라인, 미디어 처리 시스템 등에 주로 적용된다.

배경

Pipe-and-Filter 는 1970 년대 Unix 운영체제의 파이프라인 철학에서 기원했다. Doug McIlroy 가 제안한 " 작은 프로그램들을 조합하여 복잡한 작업을 수행한다 " 는 Unix 철학이 이 아키텍처의 기반이 되었다. 이후 소프트웨어 공학 분야에서 정형화되어 Garlan 과 Shaw 의 소프트웨어 아키텍처 연구를 통해 체계적으로 정리되었다.

목적 및 필요성

주요 목적:

필요성:

핵심 개념

Pipe-and-Filter 는 데이터 스트림을 일련의 처리 단계로 나누는 아키텍처 스타일이다. 각 단계는 “Filter” 로 구현되며, 처리 결과는 “Pipe” 를 통해 다음 필터로 전달된다.

이와 같은 특성을 가지고 있다:

기본 개념

  1. 파이프 (Pipe)

    • 데이터 전송을 담당하는 커넥터 또는 채널
    • 한 필터의 출력을 다음 필터의 입력으로 연결
    • 데이터 스트림의 순서와 동기화 보장
    • 구현 방식: 메시지 큐, 소켓, 공유 메모리, 파일 시스템
  2. 필터 (Filter)

    • 독립적인 데이터 처리 컴포넌트
    • 입력 데이터를 받아 변환, 가공, 검증 후 출력
    • 상태를 가지지 않는 순수 함수적 특성 (Stateless)
    • 재사용 가능하고 교체 가능한 모듈
  3. 데이터 소스 (Data Source)

    • 파이프라인의 시작점
    • 외부 시스템, 파일, 사용자 입력 등에서 데이터 제공
    • 초기 데이터를 첫 번째 필터로 전송
  4. 데이터 싱크 (Data Sink)

    • 파이프라인의 종료점
    • 최종 처리된 데이터를 외부 시스템, 파일, 화면 등에 출력
    • 결과 데이터의 최종 목적지

심화 개념

  1. 포트 (Port)

    • 필터와 파이프 간의 연결점
    • 입력 포트 (Sink Port): 데이터가 들어오는 지점
    • 출력 포트 (Source Port): 데이터가 나가는 지점
    • 데이터 타입과 프로토콜 협상 지원
  2. 버퍼링 (Buffering)

    • 파이프에서 데이터를 임시 저장하는 메커니즘
    • 필터 간 처리 속도 차이 조절
    • 백프레셔 (Backpressure) 처리
  3. 라우팅 (Routing)

    • 조건부 데이터 흐름 제어
    • 분기 및 병합 패턴 지원
    • 동적 경로 선택

실무 구현 연관성

  1. 스트림 처리 시스템과의 연관성:

    • Apache Kafka Streams, Apache Flink 의 핵심 설계 패러다임
    • 실시간 데이터 처리 파이프라인 구축의 기본 원칙
    • 마이크로서비스 아키텍처에서 데이터 흐름 설계
  2. 미디어 처리 시스템과의 연관성:

    • GStreamer, FFmpeg 의 핵심 아키텍처
    • 비디오/오디오 인코딩/디코딩 파이프라인
    • 실시간 멀티미디어 스트리밍
  3. 컴파일러 설계와의 연관성:

    • 렉시컬 분석 → 구문 분석 → 의미 분석 → 코드 생성
    • 각 단계가 독립적인 필터로 구현
    • 최적화 패스의 모듈화

실무 연계

활용 분야적용 방식 및 특징
데이터 처리ETL 파이프라인, 스트림 처리 시스템에서 필터를 연산 모듈로 사용
이미지/비디오 처리GStreamer, FFmpeg 와 같은 프레임워크에서 필터 체인으로 변환 수행
컴파일러 설계Lexer → Parser → Optimizer 등의 모듈 연결
IoT 처리센서 데이터 → 필터 처리 → 전송 → 경보 시스템 등으로 처리 단계화 가능

주요 기능 및 역할

  1. 시스템 분해 기능:

    • 복잡한 처리 로직을 독립적인 단위로 분해
    • 각 필터가 단일 책임을 갖도록 설계
    • 관심사의 분리 (Separation of Concerns) 구현
  2. 데이터 흐름 제어:

    • 순차적 데이터 처리 흐름 관리
    • 데이터 변환 체인 구성
    • 오류 전파 및 복구 메커니즘
  3. 조합성 제공:

    • 필터들의 다양한 조합으로 새로운 기능 창출
    • 런타임 구성 변경 지원
    • 플러그인 아키텍처 구현

특징

  1. 독립성 (Independence):

    • 각 필터는 다른 필터와 독립적으로 동작
    • 필터 간 직접적인 상태 공유 없음
    • 인터페이스를 통한 느슨한 결합
  2. 순차성 (Sequential Processing):

    • 데이터가 파이프라인을 따라 순차적으로 흐름
    • 각 단계에서 점진적인 데이터 변환
    • 선형적 데이터 흐름 패턴
  3. 투명성 (Transparency):

    • 각 필터는 이전/다음 필터를 알 필요 없음
    • 데이터 소스와 싱크의 추상화
    • 구현 세부사항 은닉

핵심 원칙

  1. 단일 책임 원칙 (Single Responsibility Principle):

    • 각 필터는 하나의 명확한 기능만 수행
    • 응집도 높은 컴포넌트 설계
    • 기능별 독립적 테스트 가능
  2. 개방 - 폐쇄 원칙 (Open-Closed Principle):

    • 새로운 필터 추가에는 열려있음
    • 기존 필터 수정에는 닫혀있음
    • 확장 가능한 아키텍처 구현
  3. 의존성 역전 원칙 (Dependency Inversion Principle):

    • 필터는 추상화된 인터페이스에 의존
    • 구체적 구현이 아닌 계약에 의존
    • 유연한 컴포넌트 교체 가능

주요 원리

  1. 필터 독립성

    • 무상태 (stateless): 필터는 상태를 공유하지 않아야 하며, 내부 로직만 수행.
    • 블라인드 연결: 필터는 ’ 위/아래 필터 ’ 이름을 알 필요 없음.
  2. 데이터 스트림 처리

    • 입력을 읽어 바로 출력 → 파이프 단계마다 대기 없는 흐름
    • 유형: 배치형 (batch-sequential) 또는 스트림 (streaming) 형태로 구현 가능
  3. 병렬성 및 확장성

    • 각 필터는 독립 실행 가능 → 다중 인스턴스로 확장 가능
    • 병렬성에는 버퍼, 직렬화 비용 고려 필요

작동 원리 및 방식

핵심 원리

데이터 흐름 원리:

  1. 순방향 흐름: 데이터는 소스에서 싱크로 한 방향으로 흐름
  2. 변환 체인: 각 필터에서 데이터가 점진적으로 변환
  3. 비동기 처리: 필터들이 독립적으로 병렬 처리 가능

작동 방식

sequenceDiagram
    participant Source
    participant Filter1
    participant Filter2
    participant Filter3
    participant Sink

    Source->>Filter1: 데이터 스트림 전송
    Filter1->>Filter2: 처리된 데이터 전송
    Filter2->>Filter3: 추가 처리 후 전송
    Filter3->>Sink: 최종 결과 출력

10. 구조 및 아키텍처

구성 요소

구성 요소설명기능/특징필수 여부
Pump소스데이터 스트림 시작필수
Pipe전달 채널필터 간 데이터 전송, 버퍼링필수
Filter처리 모듈transform, validate, enrich 등필수
Sink결과 소비파일, DB, API 등에 최종 저장필수
버퍼/큐비동기 처리처리 불일치 보완, 고성능 지원선택
메시징 시스템비동기, 장애 대응Kafka, RabbitMQ 기반 파이프선택
모니터링/로깅상태 추적필터 간 메트릭, 장애 탐지선택
컨트롤러/오케스트레이터파이프라인 구성 관리YAML/DSL 기반 연결 정의선택

구조 및 아키텍처

Pipe-and-Filter 아키텍처는 아래 세 가지 구성 요소를 중심으로 구성된다:

graph LR
    A[Data Source] --> B[Pipe 1]
    B --> C[Filter 1]
    C --> D[Pipe 2]
    D --> E[Filter 2]
    E --> F[Pipe 3]
    F --> G[Filter 3]
    G --> H[Pipe 4]
    H --> I[Data Sink]
    
    style A fill:#e1f5fe
    style C fill:#f3e5f5
    style E fill:#f3e5f5
    style G fill:#f3e5f5
    style I fill:#e8f5e8

구성요소

구분구성요소기능역할특징
필수데이터 소스 (Data Source)초기 데이터를 파이프라인에 공급외부 시스템 (파일, API 등) 로부터 데이터 수신파이프라인 시작점, 데이터 생성/수신 담당
필터 (Filter)데이터 변환 및 처리 수행입력을 받아 변환 후 출력 생성상태 없음, 모듈화 가능, 독립 실행 가능
파이프 (Pipe)필터 간 데이터 전달데이터 흐름 전달 및 형식 표준화 역할버퍼링, 동기화, 데이터 타입 통합 지원
데이터 싱크 (Data Sink)처리된 데이터 최종 수집외부 저장소/시스템으로 결과 데이터 전송파이프라인 종료점, 결과 저장/출력 담당
선택스플리터 (Splitter)데이터 스트림을 여러 경로로 분기병렬 처리 또는 라우팅 분기점조건부 분기, 복사 및 분산 지원
집계기 (Aggregator)다중 스트림을 하나로 병합분산 처리된 결과 통합데이터 정합성 보장, 순서 제어, 병합 처리
라우터 (Router)조건 기반 라우팅 수행동적 처리 경로 선택, 부하 분산규칙 기반 조건 분기, 우선순위 분기
트랜스포머 (Transformer)데이터 포맷/스키마 변환 수행인코딩 변경, 형식 변환, 스키마 매핑양방향 변환 가능, 복잡한 구조도 처리 가능

구성 요소 상세 분석

필터 상세 구조
graph TB
    subgraph "Filter Internal Structure"
        A[Input Port] --> B[Input Validation]
        B --> C[Processing Logic]
        C --> D[Output Generation]
        D --> E[Output Port]
        
        F[Configuration] --> C
        G[Error Handler] --> C
        H[Monitoring] --> C
    end
구성 요소주요 기능세부 설명
입력 포트 (Input Port)데이터 수신 인터페이스외부 파이프 또는 소스로부터 데이터 수신
데이터 형식 검증입력 데이터가 기대하는 스키마 또는 포맷인지 확인
버퍼링 관리처리 지연 또는 병목에 대비해 버퍼에 저장
처리 로직 (Processing Logic)핵심 비즈니스 로직 수행데이터 필터링, 변환, 계산 등의 로직 구현
상태 없는 처리외부 상태 의존성 없이 순수 함수 기반 처리 권장
예외 처리오류 발생 시 예외 감지 및 로깅/우회 처리
출력 포트 (Output Port)처리 결과 전송다음 필터 또는 싱크로 데이터 전달
형식 표준화출력 데이터의 스키마 및 포맷 정렬
품질 보증 (Validation)누락, 중복, 무결성 등 출력 데이터 검증
파이프 상세 구조
graph TB
    subgraph "Pipe Internal Structure"
        A[Input Interface] --> B[Buffer Management]
        B --> C[Data Serialization]
        C --> D[Transport Layer]
        D --> E[Data Deserialization]
        E --> F[Output Interface]
        
        G[Flow Control] --> B
        H[Error Recovery] --> D
        I[Monitoring] --> D
    end
구성 요소주요 기능세부 설명
버퍼 관리 (Buffer Management)데이터 임시 저장필터 간 처리 속도 차이 완충을 위해 임시 저장소 역할 수행
흐름 제어과도한 데이터 유입을 제어해 시스템 안정성 확보 (flow control)
백프레셔 처리처리 병목 시 upstream 으로 신호 전달하여 속도 조절 (backpressure)
전송 계층 (Transport Layer)네트워크 통신필터 간 데이터 전송 시 네트워크 경로 확보 및 관리
프로토콜 처리TCP, UDP, HTTP 등 다양한 통신 프로토콜 지원
안정성 보장전송 실패 시 재시도, 순서 보장, 신뢰성 있는 전달 보장
직렬화 / 역직렬화 (Serialization / Deserialization)데이터 형식 변환객체 ↔ 바이트 스트림 또는 JSON, Avro 등 포맷 간 변환
압축 / 압축 해제대역폭 절감 또는 저장 공간 최적화를 위한 처리
인코딩 / 디코딩문자 인코딩, 바이너리 ↔ 텍스트 포맷 처리 등

구현 기법

카테고리구현 기법정의 / 구성 요소핵심 목적실제 예시
1. 실행 모델 기반동기식 파이프라인필터가 순차적으로 실행되며, 이전 필터가 끝나야 다음으로 전달 (blocking I/O)단순성, 순차 처리, 디버깅 용이`cat file
비동기식 파이프라인필터 간 큐/채널을 사용하여 독립 실행 (non-blocking, 병렬 처리 가능)처리량 증가, 지연 허용, 확장성 확보Kafka Streams, Node.js Streams, Python Queue 기반 파이프라인
스트림 기반 처리연속적인 데이터 스트림을 필터로 전달하며 흐름 중심 처리 (windowing, backpressure 포함)실시간 스트림 분석, 저지연 처리Apache Flink, Node.js Transform Stream
이벤트 기반 파이프라인이벤트 트리거에 따라 필터 실행 (event-driven consumer)유연성, 느슨한 결합, 이벤트 처리 구조화AWS Lambda + SQS, Kafka + Lambda
2. 시스템 구조 기반분산 처리 구조필터를 마이크로서비스 또는 컨테이너로 분리, 노드 간 메시지 큐로 연결확장성, 장애 격리, 고가용성Apache Flink Cluster, Celery + Redis
서버리스 기반 구성각 필터를 Function 단위로 분리, 클라우드 이벤트로 실행비용 효율성, 유지보수 최소화AWS Lambda + EventBridge, Azure Durable Functions
3. 최적화 전략 기반Filter Fusion다수 필터 연산을 하나의 노드로 통합, 연산 비용 및 컨텍스트 전환 최소화성능 최적화, 연산 병합Spark Stage Fusion, Flink Operator Chain
Buffered Pipes파이프 내부에 버퍼 큐를 도입하여 프로듀서-컨슈머 간 처리 속도 차이 완화비동기 처리, 유실 방지, 흐름 제어Node.js Stream, Akka Streams
Parallel Filters동일 필터를 다수 인스턴스로 병렬 배치하여 throughput 개선대용량 처리, 리소스 분산FastAPI + Celery, Beam ParDo with MultiWorkers
4. 구현 방식 기반함수형 파이프라인불변 데이터 + 고차 함수 + 체이닝 기반 필터 구성재사용성, 테스트 용이성JavaScript compose, Python functools, F# pipeline
프로그래밍 라이브러리 기반언어 내장 스트림 API 또는 체이닝 메커니즘 활용간결한 표현식, 가독성Java Stream API, RxJS,.NET LINQ
5. 인프라/도구 기반워크플로우 엔진 기반GUI 또는 DSL 기반으로 필터를 시각적 또는 선언적으로 정의배치 자동화, 단계 의존성 관리Apache NiFi, Airflow, Spring Cloud Data Flow
메시지 큐 기반 처리필터 간 Kafka, RabbitMQ, SQS 등 메시지 큐로 연결비동기성, 느슨한 결합, 처리 병렬화Kafka + Consumer Group, RabbitMQ Worker Pattern
6. 특수 목적 처리컴파일러 필터 체인어휘 → 구문 → 의미 분석 → IR 생성 등 순차 필터 구성구조적 처리, 분리된 책임GCC, Clang, Babel
멀티미디어 필터 체인이미지/비디오 처리 필터 체인 구성실시간 필터링, 효과 적용GStreamer, FFmpeg

동기식 파이프라인 (Synchronous Pipeline)

정의: 각 필터가 순차적으로 실행되는 구현 방식

구성:

목적: 간단한 데이터 변환 체인 구현

실제 예시:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
# 동기식 파이프라인 예시
class SyncPipeline:
    def __init__(self):
        self.filters = []
    
    def add_filter(self, filter_func):
        self.filters.append(filter_func)
    
    def execute(self, data):
        result = data
        for filter_func in self.filters:
            result = filter_func(result)
        return result

# 사용 예시
pipeline = SyncPipeline()
pipeline.add_filter(lambda x: x.upper())  # 대문자 변환
pipeline.add_filter(lambda x: x.strip())  # 공백 제거
pipeline.add_filter(lambda x: x.replace(' ', '_'))  # 공백을 언더스코어로 변경

result = pipeline.execute("  hello world  ")
print(result)  # "HELLO_WORLD"

비동기식 파이프라인 (Asynchronous Pipeline)

정의: 각 필터가 독립적인 스레드나 프로세스에서 실행

구성:

목적: 고성능 실시간 데이터 처리

실제 예시:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
import asyncio
import queue
import threading

class AsyncPipeline:
    def __init__(self):
        self.filters = []
        self.queues = []
    
    def add_filter(self, filter_func):
        self.filters.append(filter_func)
        self.queues.append(queue.Queue())
    
    def worker(self, filter_func, input_queue, output_queue):
        while True:
            try:
                data = input_queue.get(timeout=1)
                if data is None:  # 종료 신호
                    break
                result = filter_func(data)
                if output_queue:
                    output_queue.put(result)
                input_queue.task_done()
            except queue.Empty:
                continue
    
    def execute_async(self, data_stream):
        # 워커 스레드 시작
        threads = []
        for i, filter_func in enumerate(self.filters):
            input_queue = self.queues[i]
            output_queue = self.queues[i + 1] if i + 1 < len(self.queues) else None
            
            thread = threading.Thread(
                target=self.worker, 
                args=(filter_func, input_queue, output_queue)
            )
            thread.start()
            threads.append(thread)
        
        # 데이터 입력
        for data in data_stream:
            self.queues[0].put(data)
        
        # 스레드 종료
        for i, thread in enumerate(threads):
            self.queues[i].put(None)
            thread.join()

스트림 기반 파이프라인 (Stream-based Pipeline)

정의: 연속적인 데이터 스트림을 처리하는 구현 방식

구성:

목적: 실시간 스트림 데이터 처리

실제 예시:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
// Node.js 스트림 기반 파이프라인
const { Transform, Readable, Writable } = require('stream');

class UpperCaseFilter extends Transform {
    _transform(chunk, encoding, callback) {
        // 데이터를 대문자로 변환
        this.push(chunk.toString().toUpperCase());
        callback();
    }
}

class WordCountFilter extends Transform {
    constructor(options) {
        super(options);
        this.wordCount = 0;
    }
    
    _transform(chunk, encoding, callback) {
        // 단어 수 계산
        const words = chunk.toString().split(/\s+/).filter(w => w.length > 0);
        this.wordCount += words.length;
        this.push(`Words: ${this.wordCount}\n`);
        callback();
    }
}

// 파이프라인 구성
const inputStream = new Readable({
    read() {
        this.push('hello world\n');
        this.push('pipe and filter\n');
        this.push(null); // 스트림 종료
    }
});

const outputStream = new Writable({
    write(chunk, encoding, callback) {
        console.log('Output:', chunk.toString());
        callback();
    }
});

// 파이프라인 실행
inputStream
    .pipe(new UpperCaseFilter())
    .pipe(new WordCountFilter())
    .pipe(outputStream);

이벤트 기반 파이프라인 (Event-driven Pipeline)

정의: 이벤트 발생에 따라 필터가 활성화되는 구현 방식

구성:

목적: 유연하고 확장 가능한 이벤트 처리

장점

카테고리항목설명
1. 설계적 장점모듈성 (Modularity)각 필터는 단일 책임을 갖는 독립 모듈로 구성되어 변경에 유연하고, 유지보수에 강함
재사용성 (Reusability)범용적으로 설계된 필터는 다양한 파이프라인에서 재활용 가능하며, 라이브러리화가 용이함
유연성 (Flexibility)필터 순서 변경, 조건 분기, 동적 구성 등을 통해 다양한 흐름 구성 가능
조합성 (Composability)필터를 조합하여 복잡한 기능을 단순한 단계로 구성할 수 있음
2. 운영/개발 편의성테스트 용이성 (Testability)각 필터 단위로 독립 테스트가 가능하며, 입출력 명확성 덕분에 유닛 테스트가 쉬움
디버깅 편의성필터 간 경계에서 중간 데이터를 확인할 수 있어 문제 추적이 쉬움
독립 배포 (Deployability)필터를 컨테이너 등으로 독립 배포해 장애를 격리하고 점진적 릴리즈 가능
3. 확장성과 유연성확장성 (Scalability)Stateless 한 필터는 수평 확장이 용이하고, 병렬 파이프라인 구성에도 적합
분산 처리 (Distributed Processing)각 필터를 서로 다른 노드에 분산 배치해 확장성과 복원력을 향상시킬 수 있음
동적 라우팅/구성 가능성Dispatcher 혹은 조건 기반 분기로 필터 실행 흐름을 유연하게 구성 가능
4. 성능 최적화병렬 처리 (Parallelism)필터가 독립적으로 실행되어 병렬 처리 가능, Throughput 을 높이기 쉬움
리소스 분리 및 격리각 필터는 독립 실행이 가능하여 자원 할당/격리가 용이하며 성능 이슈를 국소화할 수 있음
데이터 흐름 가시화파이프라인이 시각화되어 설계 구조의 명확성과 운영 추적성이 향상됨

단점과 문제점 그리고 해결방안

단점

구분항목설명해결 방안 및 전략
설계적 한계복잡성 증가필터 수가 많아질수록 전체 시스템의 흐름 파악과 테스트, 변경이 어려워짐필터 그룹화, 표준 인터페이스 정의, 구성 자동화
성능성능 오버헤드필터 간 데이터 복사, 직렬화/역직렬화, 파이프 I/O 등으로 인한 성능 저하Zero-Copy, 병렬 처리, 로컬 캐싱, 배치 처리
상태 관리전역 상태 공유 제한각 필터는 Stateless 하게 설계되므로 복잡한 상태 공유 또는 흐름 기반 알고리즘 구현이 어려움외부 상태 저장소 사용 (Redis 등), 컨텍스트 전달 구조 적용
유지보수디버깅 어려움분산된 필터 환경에서 장애 추적 및 중간 데이터 확인이 어려움통합 로깅, 분산 추적 시스템 (OpenTelemetry, Zipkin)
형식 일관성데이터 포맷 제한필터 간 데이터 포맷 불일치 시 오류 발생 가능성 높음Avro/Protobuf 등 표준 스키마 적용, 어댑터 필터 도입
테스트제어 흐름 제한조건 분기, 반복 등 복잡한 로직 구현이 제한적조건 필터, 라우터/분기 필터, 루프백 필터 설계
운영높은 지연 (Latency)필터 수가 많아질수록 동기 흐름에서 누적 지연 발생 가능비동기 처리, 경량 필터 분할, 병렬 처리 구조 설계

문제점

구분문제 항목원인영향탐지 및 진단예방 방안해결 방안 및 기법
성능병목 필터특정 필터 처리 성능 저하전체 처리 속도 감소Prometheus/Grafana 지표, 단계별 처리 시간 추적병렬 처리, 필터 분리필터 병렬화, 캐시 적용
신뢰성데이터 손실파이프 버퍼 오버플로우, 필터 장애데이터 유실, 결과 불일치로그 분석, 재시도 횟수 모니터링버퍼 크기 조정, 신뢰성 높은 큐 사용트랜잭션 처리, 재처리 메커니즘, WAL 적용
안정성백프레셔 (Backpressure)다운스트림 필터의 느린 처리 속도시스템 과부하, 메모리 고갈큐 길이 모니터링, 처리량 측정적응형 버퍼, 처리량 기반 Throttle 설정플로우 제어, Drop 정책, 버퍼 리사이징
통신포맷 불일치필터 간 데이터 스키마 불일치필터 간 통신 오류계약 기반 테스트, 포맷 검증 로직Typed Pipe, 스키마 레지스트리데이터 변환 필터, Adaptor 도입
안정성오류 전파 부족필터 간 느슨한 결합 구조로 오류 감지가 어렵고 전파되지 않음시스템 오류 탐지 실패Error Logging, Dead Letter Queue 설정필터별 오류 처리, Retry/Circuit Breaker재시도 정책, DLQ 격리
일관성순서 보장 실패병렬 처리나 비동기 흐름에서 메시지 순서가 어긋남결과 불일치, 동기화 오류 발생메시지 ID 추적, Sequence LogOrdered Queue, 파티셔닝 설계Reorder Buffer, 순서 복원 로직
안정성데드락순환 대기, 자원 고갈 등으로 필터 간 상호 대기 발생전체 파이프라인 정지스레드/버퍼 상태 모니터링자원 우선순위 지정, 타임아웃 설정데드락 탐지 알고리즘, 자원 순서화
확장성지연 누적각 필터의 처리 지연이 누적되어 전체 응답 시간 증가실시간 요구 미충족, 성능 SLA 위반엔드투엔드 타이밍 측정병렬 처리 구조 설계, 캐싱 전략 적용필터 재구성, 사전 처리 (Preprocessing) 도입

도전 과제

카테고리항목원인영향대응 전략
1. 성능 및 확장성병목 현상 (Bottleneck)필터 간 처리 속도 불균형, 단일 필터 과부하처리량 감소, 지연 증가, 자원 낭비병렬 필터 배치, 부하 테스트, 필터 분리 또는 로드밸런싱
멀티스테이지 오버헤드파이프라인 단계 증가로 인한 컨텍스트 스위칭 및 전송 비용실시간 요구 미충족, 성능 저하Filter Fusion, 캐시 최적화, 단계를 축소한 재설계
메모리 누수장기 실행 필터에서 해제 누락, GC 미흡시스템 자원 고갈, 크래시 위험메모리 풀 도입, 프로파일링 도구 사용, 리소스 해제 자동화
상태 기반 확장성 제약필터 내부 상태 유지 (Stateful)수평 확장 제한, 상태 불일치 위험외부 상태 저장소 (예: Redis, DB) 분리 및 스냅샷 주기화
2. 안정성 및 복구력데이터 정합성 오류비동기 처리 중 순서 변경, 중복 처리, 실패 전파데이터 손상, 로직 오류 발생Exactly-once 처리 보장, 메시지 순서화 (Ordered Queue), 이벤트 소싱 도입
상태 유실시스템 재시작, 필터 충돌 시 휘발성 메모리 상태 소실데이터 손실, 재처리 불가외부 상태 저장소, Checkpoint, Drain & Replay 전략 적용
흐름 중단필터 단일 장애 발생 시 전체 파이프라인 정지전체 서비스 중단, 복구 시간 증가Circuit Breaker, Retry + Fallback, 필터별 독립 실행 환경
네트워크/노드 장애클러스터 내 통신 실패, 서비스 장애중단, 일시적 비가용 상태자동 재시작, 서비스 메시 (Canary Routing), 장애 허용 설계
3. 관찰성 및 모니터링로그 분산 및 추적 불가필터 간 모듈화로 로그가 여러 위치에 존재, 상관관계 파악 어려움원인 분석 지연, 문제 해결 시간 증가중앙 집중 로깅 (ELK, Loki), 분산 트레이싱 (OpenTelemetry, Zipkin)
실시간 모니터링 미비메트릭 노출 미흡, 지표 수집 구조 없음성능 이상 탐지 지연, SLA 위반표준화된 메트릭 (Prometheus), 대시보드 구성 (Grafana), 경고 시스템
복잡도 증가로 시각화 어려움필터 수 증가, 구조 복잡화구조 파악 곤란, 유지보수 어려움파이프라인 다이어그램 자동 생성, 의존성 그래프 도구 도입
4. 구성 및 배포 관리필터 버전 불일치독립 배포로 인한 스키마/계약 불일치다운스트림 오류, 데이터 손상API 계약 기반 개발, JSON Schema/Avro 사용, Schema Registry 연동
설정 일관성 부족필터 구성/환경 변수 누락파이프라인 오작동, 디버깅 어려움공통 구성 템플릿화, CI/CD 환경변수 검증 자동화
배포 복잡성필터 수 증가로 Helm/K8s manifest 증가관리 포인트 증가, 오류 가능성 상승GitOps 기반 배포 자동화, 필터 템플릿 통합
5. 동적 구성 대응성실시간 필터 추가/제거 제한정적 설정 기반 파이프라인 설계유연성 부족, 빠른 대응 불가동적 구성 가능 엔진 (NiFi, Spring Cloud Data Flow), 런타임 재구성 API 도입
파이프라인 단계 간 강한 결합성출력 - 입력 간 인터페이스 강결합변경 시 전체 영향, 반복 배포 필요이벤트 기반 구조 (Pub/Sub), 메시지 포맷 표준화 (e.g., Avro + Schema Evolution)
6. 데이터 처리 특수성무한 스트림 처리 시 지연 관리처리량 증가에 따른 메모리 증가, backpressure 대응 미흡처리 중단, 데이터 유실 위험윈도우 기반 집계, Watermark 설계, Kafka/Flink 기반 backpressure 지원
데이터 포맷 불일치이기종 시스템 간 포맷 차이, encoding 문제파싱 실패, 필터 작동 불가포맷 표준화 (JSON/Avro/Parquet), 포맷 변환 필터 삽입

분류 기준에 따른 종류 및 유형

분류 기준유형설명적용 사례
1. 데이터 흐름 방식선형 파이프라인 (Linear)필터가 일방향으로 순차적으로 연결되어 데이터가 흐름ETL 처리, 컴파일러
분기 파이프라인 (Branching)특정 조건이나 흐름에 따라 여러 필터로 분기됨라우팅 기반 데이터 흐름
병합 파이프라인 (Merging)여러 입력을 하나로 통합하여 후속 필터에 전달로그 집계, 데이터 통합
피드백 루프 (Feedback Loop)결과를 초기 필터로 다시 전달하는 구조, 반복 처리를 위한 설계머신러닝 재학습, 정제 반복처리 구조
2. 실행 방식동기식 (Synchronous)순차적으로 처리되며 각 필터는 이전 필터의 완료를 기다림전통적 Unix 파이프라인
비동기식 (Asynchronous)각 필터가 독립적으로 병렬 실행되며, 큐 또는 채널을 통해 데이터 전달Kafka + Flink 기반 처리
하이브리드 (Hybrid)일부 필터는 동기, 일부는 비동기로 구성된 복합 구조이벤트 기반 워크플로우
3. 상태 관리 방식Stateless Filter상태를 저장하지 않고 입력만으로 처리, 재시작 및 병렬 처리에 유리단순 필터링, 변환
Stateful Filter내부 상태 (세션, 누적값 등) 를 유지하며 처리, 외부 상태 저장소 필요집계, 세션 관리, 윈도우 기반 연산
4. 데이터 처리 단위배치 기반 (Batch-oriented)일정 단위로 데이터를 모아서 일괄 처리일일 통계, 보고서 생성
스트림 기반 (Stream-oriented)데이터가 발생할 때마다 즉시 처리 (low-latency)실시간 모니터링, IoT
이벤트 기반 (Event-driven)비동기 이벤트 발생 시 필터가 트리거됨CEP, 실시간 알림 시스템
5. 파이프 연결 구조단방향 (One-way)입력 → 출력으로만 흐름이 존재하며 단방향 전파대부분의 ETL 파이프라인
양방향 (Two-way)제어 신호 혹은 응답이 반대 방향으로도 흐를 수 있음제어 피드백 기반 시스템
Push 기반데이터 생산자가 즉시 데이터를 다음 필터로 전달Kafka Producer → Consumer 구조
Pull 기반소비자가 필요할 때 상위 필터로부터 데이터를 요청Flow Control 기반 처리 구조
6. 필터 활성화 모델Active Filter필터가 자체적으로 데이터를 가져오고 처리Kafka Consumer Worker 구조
Passive Filter외부 트리거나 파이프가 데이터를 전달해줄 때만 처리파이프 지향 모델
7. 배포 및 실행 모델모놀리식 구조 (Monolithic)단일 프로세스 내에 모든 필터 포함, 간단하지만 유연성 부족내장형 로컬 파이프라인
분산 시스템 구조 (Distributed)각 필터가 독립 노드 또는 컨테이너에서 실행, 스케일링 및 장애 격리에 유리Kafka + Flink 클러스터
마이크로서비스 구조 (Microservice)각 필터를 개별 마이크로서비스로 배포하여 유연성과 확장성 강화Kubernetes 기반 이벤트 파이프라인
8. 구현 방식소프트웨어 기반프로그래밍 언어의 스트림 처리 API 혹은 메시지 기반 미들웨어 활용Java Stream, Node.js Stream
하드웨어 기반CPU/GPU 등에서의 물리적 연산 파이프 구조 사용CPU 파이프라인, 디지털 신호 처리기 DSP

아키텍처 패턴 유형

선형 파이프라인 (Linear Pipeline)

graph LR
    A[Source] --> B[Filter1] --> C[Filter2] --> D[Filter3] --> E[Sink]

분기 파이프라인 (Branching Pipeline)

graph TD
    A[Source] --> B[Splitter]
    B --> C[Filter1]
    B --> D[Filter2]
    C --> E[Aggregator]
    D --> E
    E --> F[Sink]

복합 파이프라인 (Complex Pipeline)

graph TD
    A[Source1] --> C[Filter1]
    B[Source2] --> C
    C --> D{Router}
    D --> E[Filter2]
    D --> F[Filter3]
    E --> G[Aggregator]
    F --> G
    G --> H[Sink]

실무 사용 예시

도메인활용 목적기술 및 구성 요소핵심 효과
ETL / 데이터 파이프라인데이터 수집, 정제, 집계, 적재Apache NiFi, Apache Flink, Spark, Spring Cloud Data Flow, Talend유연한 구성, 코드 재사용성 ↑, 처리 시간 단축, 대용량 처리 최적화
로그 처리 / 보안 분석실시간 로그 필터링, 정제, 이상 탐지Kafka, Flink, ELK Stack, Splunk, Promtail, Fluent Bit실시간 처리, 보안 강화, 로그 통합 및 알림 기반 대응
스트리밍 분석 / 실시간 처리실시간 이벤트 흐름 분석, 반응형 처리Kafka, Storm, Flink, Apache Pulsar, InfluxDB, MQTT저지연 인사이트, 상태 기반 알림, IoT/센서 기반 분석 지원
컴파일러 / 코드 변환소스코드 분석, 변환, 최적화GCC, LLVM, Lexer → Parser → IR Optimizer → Code Generator코드 유지보수 용이, 다중 타겟 지원, 구조적 최적화
이미지/비디오 처리미디어 인코딩, 리사이징, 필터 효과 적용FFmpeg, GStreamer, ImageMagick, OpenCV자동화된 처리, 실시간 스트림 적용, 품질 보정 및 압축 최적화
웹 애플리케이션HTTP 요청 흐름 전처리 및 후처리FastAPI Middleware, Express.js, ASP.NET Core 미들웨어 체인요청별 모듈화 구성, 보안 처리 분리, 유지보수 효율화
IoT 시스템센서 데이터 수집 → 필터링 → 트리거 실행MQTT, Apache NiFi, AWS IoT Core, InfluxDB이벤트 기반 경고 시스템, 실시간 분석, 엣지 연산 구성 가능
Unix/Linux Shell텍스트 기반 데이터 흐름 처리cat, grep, awk, sort, uniq 등의 Unix 파이프 조합스크립트화 통한 자동화, 복잡한 처리 간소화, 데이터 전처리 간편화

기술 흐름별 파이프라인 구성 예시

파이프라인 유형구성 단계 (예시)
ETL 파이프라인Extractor → Transformer → Validator → Loader
로그 처리 파이프라인Log Collector → Filter → Enricher → Aggregator → Storage
스트리밍 분석 파이프라인Producer → Processor (e.g., Windowing) → Alert Generator → Notifier
미디어 처리 파이프라인Decoder → Filter (Noise Reduction) → Transcoder → Stream Output
컴파일러 파이프라인Lexer → Parser → IR Generator → Optimizer → Code Generator
웹 요청 처리 파이프라인Parser → Auth Middleware → Routing → Business Logic Handler → Response Formatter
이미지 처리 파이프라인Capture → Resize → Color Correct → Filter → Store/Display
보안 분석 파이프라인Packet Capture → Protocol Parser → Threat Analyzer → Alert System → Audit Log

활용 사례

사례 1: 영상 처리 파이프라인 (FFmpeg 기반)

구성도

graph LR
    I[Input Video] --> D1[Decode Filter]
    D1 --> F1[Filter: Crop]
    F1 --> F2[Filter: Scale]
    F2 --> F3[Filter: Overlay Watermark]
    F3 --> E1[Encode Filter]
    E1 --> O[Output File]

Pipe-and-Filter 의 역할:

유무 비교

기준Pipe-and-Filter 사용미사용
처리 흐름명확한 모듈 분리 및 재사용 가능처리 순서 변경 어려움
확장성필터만 추가하면 기능 확장 가능전체 시스템 수정 필요
유지보수개별 필터만 테스트 가능전체 로직 수정 필요

사례 2: 실시간 주문 처리 시스템

시스템 구성:

graph LR
    subgraph "Order Processing Pipeline"
        A[Order Source] --> B[Validation Filter]
        B --> C[Fraud Detection Filter]
        C --> D[Inventory Check Filter]
        D --> E[Payment Processing Filter]
        E --> F[Notification Filter]
        F --> G[Order Completion Sink]
        
        H[Audit Log] --> B
        H --> C
        H --> D
        H --> E
        H --> F
    end

시스템 워크플로우:

  1. 주문 수신 (Order Source)

    • 웹 애플리케이션에서 주문 데이터 수신
    • JSON 형식으로 표준화
    • 초기 메타데이터 추가
  2. 주문 검증 (Validation Filter)

    • 필수 필드 존재 여부 확인
    • 데이터 형식 검증
    • 비즈니스 규칙 적용
  3. 사기 탐지 (Fraud Detection Filter)

    • 머신러닝 모델을 통한 이상 패턴 감지
    • 블랙리스트 확인
    • 위험도 점수 계산
  4. 재고 확인 (Inventory Check Filter)

    • 실시간 재고 수량 조회
    • 재고 예약 처리
    • 배송 가능 여부 판단
  5. 결제 처리 (Payment Processing Filter)

    • 결제 게이트웨이 연동
    • 결제 승인 요청
    • 결제 결과 처리
  6. 알림 발송 (Notification Filter)

    • 고객 알림 메시지 생성
    • 이메일/SMS 발송
    • 내부 시스템 알림

Pipe-and-Filter 패턴의 역할:

패턴 유무에 따른 차이점:

구현 예시:

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
#!/usr/bin/env python3
"""
Pipe-and-Filter 패턴을 활용한 실시간 주문 처리 시스템 구현

이 예시는 전자상거래 주문 처리를 위한 파이프라인을 보여줍니다.
각 필터는 독립적인 처리 단계를 담당하며, 파이프를 통해 연결됩니다.
"""

import abc
import json
import logging
import time
from typing import Any, Dict, List, Optional
from dataclasses import dataclass
from enum import Enum
import asyncio
from concurrent.futures import ThreadPoolExecutor

# 데이터 모델 정의
@dataclass
class OrderData:
    """주문 데이터 모델"""
    order_id: str
    customer_id: str
    product_id: str
    quantity: int
    price: float
    payment_method: str
    metadata: Dict[str, Any] = None
    
    def __post_init__(self):
        if self.metadata is None:
            self.metadata = {}

class ProcessingStatus(Enum):
    """처리 상태 열거형"""
    SUCCESS = "success"
    FAILED = "failed"
    RETRY = "retry"

@dataclass
class ProcessingResult:
    """처리 결과 클래스"""
    status: ProcessingStatus
    data: Optional[OrderData] = None
    error_message: Optional[str] = None
    retry_count: int = 0

# 추상 필터 인터페이스
class Filter(abc.ABC):
    """필터 추상 클래스 - 모든 필터가 구현해야 하는 인터페이스"""
    
    def __init__(self, name: str):
        self.name = name
        self.logger = logging.getLogger(f"Filter.{name}")
        self.processed_count = 0
        self.error_count = 0
    
    @abc.abstractmethod
    async def process(self, data: OrderData) -> ProcessingResult:
        """데이터 처리 메서드 - 각 필터에서 구현"""
        pass
    
    def get_stats(self) -> Dict[str, int]:
        """필터 통계 정보 반환"""
        return {
            "processed": self.processed_count,
            "errors": self.error_count,
            "success_rate": (self.processed_count - self.error_count) / max(1, self.processed_count)
        }

# 구체적인 필터 구현들
class OrderValidationFilter(Filter):
    """주문 검증 필터 - 주문 데이터의 유효성을 검사"""
    
    def __init__(self):
        super().__init__("OrderValidation")
    
    async def process(self, data: OrderData) -> ProcessingResult:
        """주문 데이터 검증 처리"""
        self.logger.info(f"Validating order: {data.order_id}")
        
        try:
            # 필수 필드 검증
            if not all([data.order_id, data.customer_id, data.product_id]):
                raise ValueError("Missing required fields")
            
            # 수량 및 가격 검증
            if data.quantity <= 0 or data.price <= 0:
                raise ValueError("Invalid quantity or price")
            
            # 검증 완료 메타데이터 추가
            data.metadata["validated_at"] = time.time()
            data.metadata["validation_status"] = "passed"
            
            self.processed_count += 1
            return ProcessingResult(ProcessingStatus.SUCCESS, data)
            
        except Exception as e:
            self.error_count += 1
            self.logger.error(f"Validation failed for order {data.order_id}: {e}")
            return ProcessingResult(ProcessingStatus.FAILED, error_message=str(e))

class FraudDetectionFilter(Filter):
    """사기 탐지 필터 - 의심스러운 주문을 탐지"""
    
    def __init__(self):
        super().__init__("FraudDetection")
        self.suspicious_customers = {"CUST999", "CUST666"}  # 시뮬레이션용 블랙리스트
    
    async def process(self, data: OrderData) -> ProcessingResult:
        """사기 탐지 처리"""
        self.logger.info(f"Fraud detection for order: {data.order_id}")
        
        try:
            # 시뮬레이션: 머신러닝 모델 호출 (비동기 처리)
            await asyncio.sleep(0.1)  # 모델 추론 시간 시뮬레이션
            
            # 간단한 규칙 기반 사기 탐지
            risk_score = 0
            
            # 블랙리스트 고객 확인
            if data.customer_id in self.suspicious_customers:
                risk_score += 80
            
            # 고액 주문 확인
            if data.price > 10000:
                risk_score += 30
            
            # 대량 주문 확인
            if data.quantity > 100:
                risk_score += 20
            
            data.metadata["risk_score"] = risk_score
            data.metadata["fraud_checked_at"] = time.time()
            
            if risk_score > 70:
                self.error_count += 1
                return ProcessingResult(
                    ProcessingStatus.FAILED, 
                    error_message=f"High fraud risk: {risk_score}"
                )
            
            self.processed_count += 1
            return ProcessingResult(ProcessingStatus.SUCCESS, data)
            
        except Exception as e:
            self.error_count += 1
            self.logger.error(f"Fraud detection failed for order {data.order_id}: {e}")
            return ProcessingResult(ProcessingStatus.FAILED, error_message=str(e))

class InventoryCheckFilter(Filter):
    """재고 확인 필터 - 상품 재고를 확인하고 예약"""
    
    def __init__(self):
        super().__init__("InventoryCheck")
        # 시뮬레이션용 재고 데이터
        self.inventory = {
            "PROD001": 100,
            "PROD002": 50,
            "PROD003": 0  # 품절 상품
        }
    
    async def process(self, data: OrderData) -> ProcessingResult:
        """재고 확인 및 예약 처리"""
        self.logger.info(f"Checking inventory for order: {data.order_id}")
        
        try:
            # 시뮬레이션: 데이터베이스 조회
            await asyncio.sleep(0.05)
            
            current_stock = self.inventory.get(data.product_id, 0)
            
            if current_stock < data.quantity:
                self.error_count += 1
                return ProcessingResult(
                    ProcessingStatus.FAILED,
                    error_message=f"Insufficient stock. Available: {current_stock}, Requested: {data.quantity}"
                )
            
            # 재고 예약 (실제로는 데이터베이스 트랜잭션 처리)
            self.inventory[data.product_id] -= data.quantity
            
            data.metadata["inventory_reserved"] = data.quantity
            data.metadata["remaining_stock"] = self.inventory[data.product_id]
            data.metadata["inventory_checked_at"] = time.time()
            
            self.processed_count += 1
            return ProcessingResult(ProcessingStatus.SUCCESS, data)
            
        except Exception as e:
            self.error_count += 1
            self.logger.error(f"Inventory check failed for order {data.order_id}: {e}")
            return ProcessingResult(ProcessingStatus.FAILED, error_message=str(e))

class PaymentProcessingFilter(Filter):
    """결제 처리 필터 - 결제 게이트웨이와 연동하여 결제 처리"""
    
    def __init__(self):
        super().__init__("PaymentProcessing")
    
    async def process(self, data: OrderData) -> ProcessingResult:
        """결제 처리"""
        self.logger.info(f"Processing payment for order: {data.order_id}")
        
        try:
            # 시뮬레이션: 외부 결제 게이트웨이 호출
            await asyncio.sleep(0.2)  # 네트워크 지연 시뮬레이션
            
            total_amount = data.price * data.quantity
            
            # 간단한 결제 시뮬레이션 (90% 성공률)
            import random
            if random.random() < 0.9:
                transaction_id = f"TXN_{data.order_id}_{int(time.time())}"
                
                data.metadata["payment_status"] = "completed"
                data.metadata["transaction_id"] = transaction_id
                data.metadata["total_amount"] = total_amount
                data.metadata["payment_processed_at"] = time.time()
                
                self.processed_count += 1
                return ProcessingResult(ProcessingStatus.SUCCESS, data)
            else:
                self.error_count += 1
                return ProcessingResult(
                    ProcessingStatus.RETRY,  # 결제 실패는 재시도 가능
                    error_message="Payment gateway error"
                )
                
        except Exception as e:
            self.error_count += 1
            self.logger.error(f"Payment processing failed for order {data.order_id}: {e}")
            return ProcessingResult(ProcessingStatus.FAILED, error_message=str(e))

# 파이프라인 클래스
class OrderProcessingPipeline:
    """주문 처리 파이프라인 - 여러 필터를 연결하여 순차적으로 처리"""
    
    def __init__(self):
        self.filters: List[Filter] = []
        self.logger = logging.getLogger("Pipeline")
        self.executor = ThreadPoolExecutor(max_workers=4)
    
    def add_filter(self, filter_instance: Filter) -> 'OrderProcessingPipeline':
        """필터를 파이프라인에 추가 (체이닝 방식)"""
        self.filters.append(filter_instance)
        return self
    
    async def process_order(self, order: OrderData) -> ProcessingResult:
        """단일 주문을 파이프라인을 통해 처리"""
        self.logger.info(f"Starting pipeline processing for order: {order.order_id}")
        
        current_data = order
        
        for filter_instance in self.filters:
            try:
                result = await filter_instance.process(current_data)
                
                if result.status == ProcessingStatus.SUCCESS:
                    current_data = result.data
                    self.logger.debug(f"Filter {filter_instance.name} completed successfully")
                elif result.status == ProcessingStatus.RETRY:
                    # 재시도 로직 (간단한 구현)
                    self.logger.warning(f"Filter {filter_instance.name} requires retry: {result.error_message}")
                    await asyncio.sleep(1)  # 재시도 대기
                    result = await filter_instance.process(current_data)
                    if result.status != ProcessingStatus.SUCCESS:
                        return result
                    current_data = result.data
                else:
                    self.logger.error(f"Filter {filter_instance.name} failed: {result.error_message}")
                    return result
                    
            except Exception as e:
                self.logger.error(f"Unexpected error in filter {filter_instance.name}: {e}")
                return ProcessingResult(ProcessingStatus.FAILED, error_message=str(e))
        
        self.logger.info(f"Pipeline processing completed for order: {order.order_id}")
        return ProcessingResult(ProcessingStatus.SUCCESS, current_data)
    
    async def process_batch(self, orders: List[OrderData]) -> List[ProcessingResult]:
        """여러 주문을 병렬로 처리"""
        self.logger.info(f"Processing batch of {len(orders)} orders")
        
        tasks = [self.process_order(order) for order in orders]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        
        # 예외 처리
        processed_results = []
        for i, result in enumerate(results):
            if isinstance(result, Exception):
                processed_results.append(ProcessingResult(
                    ProcessingStatus.FAILED,
                    error_message=f"Batch processing error: {result}"
                ))
            else:
                processed_results.append(result)
        
        return processed_results
    
    def get_pipeline_stats(self) -> Dict[str, Any]:
        """파이프라인 전체 통계 정보 반환"""
        stats = {}
        for filter_instance in self.filters:
            stats[filter_instance.name] = filter_instance.get_stats()
        return stats

# 사용 예시 및 테스트
async def main():
    """메인 실행 함수 - 파이프라인 설정 및 테스트"""
    
    # 로깅 설정
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    )
    
    # 파이프라인 구성
    pipeline = OrderProcessingPipeline()
    pipeline.add_filter(OrderValidationFilter()) \
           .add_filter(FraudDetectionFilter()) \
           .add_filter(InventoryCheckFilter()) \
           .add_filter(PaymentProcessingFilter())
    
    # 테스트 주문 데이터 생성
    test_orders = [
        OrderData("ORD001", "CUST001", "PROD001", 2, 100.0, "credit_card"),
        OrderData("ORD002", "CUST002", "PROD002", 1, 50.0, "debit_card"),
        OrderData("ORD003", "CUST999", "PROD001", 1, 200.0, "credit_card"),  # 사기 위험
        OrderData("ORD004", "CUST003", "PROD003", 1, 75.0, "paypal"),  # 재고 부족
        OrderData("ORD005", "CUST004", "PROD001", 150, 25000.0, "credit_card"),  # 고위험 + 대량
    ]
    
    # 배치 처리 실행
    print("=== Starting Order Processing Pipeline ===")
    start_time = time.time()
    
    results = await pipeline.process_batch(test_orders)
    
    processing_time = time.time() - start_time
    
    # 결과 출력
    print(f"\n=== Processing Results (Total time: {processing_time:f}s) ===")
    for i, result in enumerate(results):
        order = test_orders[i]
        print(f"\nOrder {order.order_id}:")
        print(f"  Status: {result.status.value}")
        if result.status == ProcessingStatus.SUCCESS:
            print(f"  Metadata: {json.dumps(result.data.metadata, indent=2)}")
        else:
            print(f"  Error: {result.error_message}")
    
    # 파이프라인 통계 출력
    print("\n=== Pipeline Statistics ===")
    stats = pipeline.get_pipeline_stats()
    for filter_name, filter_stats in stats.items():
        print(f"{filter_name}:")
        print(f"  Processed: {filter_stats['processed']}")
        print(f"  Errors: {filter_stats['errors']}")
        print(f"  Success Rate: {filter_stats['success_rate']:%}")

if __name__ == "__main__":
    # 비동기 실행
    asyncio.run(main())

사례 3: 로그 데이터 정제 및 분석 파이프라인

시스템 구성: 로그 소스 → 파싱 필터 → 정제 필터 → 분석 필터 → 저장소 → 모니터링/알림

Workflow

  1. 로그 소스에서 데이터 수집
  2. 파싱 필터에서 구조화
  3. 정제 필터에서 불필요 정보 제거
  4. 분석 필터에서 패턴 분석 및 통계 산출
  5. 결과를 저장소에 적재, 모니터링 시스템 연동
flowchart LR
    S[로그 소스] --> P[파싱 필터]
    P --> C[정제 필터]
    C --> A[분석 필터]
    A --> D[저장소]
    D --> M[모니터링/알림]

Pipe-and-Filter Architecture 의 역할: 각 단계가 독립적으로 구현되어 유지보수와 확장 용이, 장애 발생 시 단계별로 문제 격리 가능

유무에 따른 차이점: 미적용 시, 단일 모놀리식 구조로 인해 유지보수, 확장, 장애 대응이 어려움

구현 예시:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# Pipe-and-Filter 구조의 간단한 로그 처리 파이프라인 예시

def parse_filter(logs):
    # 파싱: 로그를 구조화된 딕셔너리로 변환
    for log in logs:
        yield {"raw": log, "parsed": log.split(" ")}

def clean_filter(parsed_logs):
    # 정제: 불필요한 정보 제거
    for log in parsed_logs:
        if "ERROR" in log["parsed"]:
            yield log

def analyze_filter(cleaned_logs):
    # 분석: 에러 로그 카운트
    count = 0
    for log in cleaned_logs:
        count += 1
    yield {"error_count": count}

# 파이프라인 실행
logs = ["2023-01-01 INFO Start", "2023-01-01 ERROR Fail", "2023-01-01 ERROR Crash"]
parsed = parse_filter(logs)
cleaned = clean_filter(parsed)
analyzed = analyze_filter(cleaned)
for result in analyzed:
    print(result)  # {'error_count': 2}

사례 4: 실시간 로그 변환 시스템

graph LR
  LogSource --> ParseFilter --> CleanFilter --> TransformFilter --> DBSink

사례 5: Apache NiFi 를 활용한 IoT 데이터 파이프라인

Apache NiFi 는 대표적인 Pipe-and-Filter Architecture 기반의 데이터 통합 플랫폼.
IoT 센서 데이터를 실시간으로 수집, 처리, 분석하는 시스템에서 활용되었다.

시스템 구성:

시스템 구성 다이어그램:

graph TB
    subgraph "IoT Sensor Network"
        S1[Temperature Sensor]
        S2[Humidity Sensor]
        S3[Pressure Sensor]
    end
    
    subgraph "NiFi Pipeline"
        MQTT[ConsumeMQTT]
        JSON[EvaluateJsonPath]
        SPLIT[SplitJson]
        ROUTE[RouteOnAttribute]
        TRANS[JoltTransformJSON]
        VALID[ValidateRecord]
        ENRICH[EnrichData]
        AGG[MergeRecord]
    end
    
    subgraph "Storage & Analytics"
        INFLUX[InfluxDB]
        ES[Elasticsearch]
        GRAFANA[Grafana]
    end
    
    S1 --> MQTT
    S2 --> MQTT
    S3 --> MQTT
    
    MQTT --> JSON
    JSON --> SPLIT
    SPLIT --> ROUTE
    ROUTE --> TRANS
    TRANS --> VALID
    VALID --> ENRICH
    ENRICH --> AGG
    
    AGG --> INFLUX
    AGG --> ES
    INFLUX --> GRAFANA
    ES --> GRAFANA

Workflow:

  1. 데이터 수집: MQTT 브로커에서 센서 데이터 실시간 구독
  2. 데이터 파싱: JSON 구조 분석 및 필드 추출
  3. 데이터 분할: 센서 타입별로 데이터 스트림 분리
  4. 조건부 라우팅: 센서 타입에 따른 처리 경로 선택
  5. 데이터 변환: 표준 포맷으로 스키마 변환
  6. 데이터 검증: 범위 체크 및 이상 데이터 필터링
  7. 데이터 보강: 메타데이터 추가 (위치 정보, 타임스탬프)
  8. 배치 처리: 시계열 데이터를 배치 단위로 그룹화
  9. 데이터 저장: 실시간 데이터베이스와 검색 엔진에 병렬 저장

Pipe-and-Filter Architecture 의 역할:

아키텍처 유무에 따른 차이점:

구현 예시:

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
"""
IoT 센서 데이터 처리 파이프라인 구현 예시
Pipe-and-Filter Architecture를 활용한 실시간 데이터 처리
"""

import json
import queue
import threading
import time
from abc import ABC, abstractmethod
from datetime import datetime
from typing import Any, Dict, List, Optional

# 파이프 인터페이스 정의
class Pipe:
    """필터 간 데이터 전송을 담당하는 파이프 클래스"""
    
    def __init__(self, buffer_size: int = 100):
        self.queue = queue.Queue(maxsize=buffer_size)
        self.closed = False
    
    def send(self, data: Any) -> bool:
        """데이터를 파이프에 전송"""
        if self.closed:
            return False
        try:
            self.queue.put(data, block=False)
            return True
        except queue.Full:
            return False
    
    def receive(self, timeout: float = 1.0) -> Optional[Any]:
        """파이프에서 데이터를 수신"""
        try:
            return self.queue.get(timeout=timeout)
        except queue.Empty:
            return None
    
    def close(self):
        """파이프 종료"""
        self.closed = True

# 필터 기본 클래스
class Filter(ABC):
    """모든 필터의 기본 클래스"""
    
    def __init__(self, name: str):
        self.name = name
        self.input_pipe: Optional[Pipe] = None
        self.output_pipe: Optional[Pipe] = None
        self.running = False
        self.thread: Optional[threading.Thread] = None
    
    def connect_input(self, pipe: Pipe):
        """입력 파이프 연결"""
        self.input_pipe = pipe
    
    def connect_output(self, pipe: Pipe):
        """출력 파이프 연결"""
        self.output_pipe = pipe
    
    @abstractmethod
    def process(self, data: Any) -> Optional[Any]:
        """데이터 처리 로직 (서브클래스에서 구현)"""
        pass
    
    def run(self):
        """필터 실행 메인 루프"""
        self.running = True
        while self.running:
            if self.input_pipe:
                data = self.input_pipe.receive()
                if data is not None:
                    processed_data = self.process(data)
                    if processed_data is not None and self.output_pipe:
                        success = self.output_pipe.send(processed_data)
                        if not success:
                            print(f"[{self.name}] Output pipe full, dropping data")
            else:
                time.sleep(0.1)
    
    def start(self):
        """필터 시작"""
        self.thread = threading.Thread(target=self.run, name=self.name)
        self.thread.start()
    
    def stop(self):
        """필터 종료"""
        self.running = False
        if self.thread:
            self.thread.join()

# 소스 필터 - MQTT 센서 데이터 시뮬레이션
class SensorSourceFilter(Filter):
    """IoT 센서 데이터를 생성하는 소스 필터"""
    
    def __init__(self, sensor_id: str, sensor_type: str):
        super().__init__(f"SensorSource-{sensor_id}")
        self.sensor_id = sensor_id
        self.sensor_type = sensor_type
        self.data_counter = 0
    
    def process(self, data: Any) -> Optional[Any]:
        # 소스 필터는 입력 데이터를 처리하지 않음
        return None
    
    def run(self):
        """센서 데이터 생성 및 전송"""
        self.running = True
        while self.running:
            # 센서 데이터 시뮬레이션
            sensor_data = {
                "sensor_id": self.sensor_id,
                "sensor_type": self.sensor_type,
                "timestamp": datetime.now().isoformat(),
                "value": self._generate_sensor_value(),
                "sequence": self.data_counter
            }
            
            if self.output_pipe:
                success = self.output_pipe.send(sensor_data)
                if success:
                    self.data_counter += 1
                    print(f"[{self.name}] Generated: {sensor_data}")
                else:
                    print(f"[{self.name}] Output pipe full")
            
            time.sleep(2)  # 2초마다 데이터 생성
    
    def _generate_sensor_value(self) -> float:
        """센서 타입에 따른 값 생성"""
        import random
        if self.sensor_type == "temperature":
            return round(random.uniform(20.0, 30.0), 2)
        elif self.sensor_type == "humidity":
            return round(random.uniform(40.0, 80.0), 2)
        elif self.sensor_type == "pressure":
            return round(random.uniform(1000.0, 1020.0), 2)
        return 0.0

# JSON 파싱 필터
class JsonParseFilter(Filter):
    """JSON 데이터를 파싱하고 검증하는 필터"""
    
    def __init__(self):
        super().__init__("JsonParser")
    
    def process(self, data: Any) -> Optional[Any]:
        """JSON 데이터 파싱 및 구조 검증"""
        try:
            if isinstance(data, str):
                parsed_data = json.loads(data)
            else:
                parsed_data = data
            
            # 필수 필드 검증
            required_fields = ["sensor_id", "sensor_type", "timestamp", "value"]
            for field in required_fields:
                if field not in parsed_data:
                    print(f"[{self.name}] Missing field: {field}")
                    return None
            
            print(f"[{self.name}] Parsed: {parsed_data['sensor_id']}")
            return parsed_data
            
        except json.JSONDecodeError as e:
            print(f"[{self.name}] JSON parse error: {e}")
            return None
        except Exception as e:
            print(f"[{self.name}] Processing error: {e}")
            return None

# 데이터 검증 필터
class ValidationFilter(Filter):
    """센서 데이터 값의 유효성을 검증하는 필터"""
    
    def __init__(self):
        super().__init__("Validator")
        # 센서 타입별 유효 범위 정의
        self.valid_ranges = {
            "temperature": (0.0, 50.0),
            "humidity": (0.0, 100.0),
            "pressure": (950.0, 1050.0)
        }
    
    def process(self, data: Any) -> Optional[Any]:
        """데이터 값 범위 검증"""
        try:
            sensor_type = data["sensor_type"]
            value = float(data["value"])
            
            if sensor_type in self.valid_ranges:
                min_val, max_val = self.valid_ranges[sensor_type]
                if min_val <= value <= max_val:
                    print(f"[{self.name}] Valid: {sensor_type}={value}")
                    return data
                else:
                    print(f"[{self.name}] Invalid range: {sensor_type}={value}")
                    return None
            else:
                print(f"[{self.name}] Unknown sensor type: {sensor_type}")
                return None
                
        except (KeyError, ValueError, TypeError) as e:
            print(f"[{self.name}] Validation error: {e}")
            return None

# 데이터 변환 필터
class TransformFilter(Filter):
    """데이터 형식을 표준화하는 변환 필터"""
    
    def __init__(self):
        super().__init__("Transformer")
    
    def process(self, data: Any) -> Optional[Any]:
        """데이터를 표준 형식으로 변환"""
        try:
            # 표준 형식으로 데이터 변환
            transformed_data = {
                "id": data["sensor_id"],
                "type": data["sensor_type"],
                "timestamp": data["timestamp"],
                "measurement": {
                    "value": data["value"],
                    "unit": self._get_unit(data["sensor_type"])
                },
                "metadata": {
                    "sequence": data.get("sequence", 0),
                    "processed_at": datetime.now().isoformat()
                }
            }
            
            print(f"[{self.name}] Transformed: {transformed_data['id']}")
            return transformed_data
            
        except Exception as e:
            print(f"[{self.name}] Transform error: {e}")
            return None
    
    def _get_unit(self, sensor_type: str) -> str:
        """센서 타입에 따른 단위 반환"""
        units = {
            "temperature": "°C",
            "humidity": "%",
            "pressure": "hPa"
        }
        return units.get(sensor_type, "")

# 라우터 필터
class RouterFilter(Filter):
    """센서 타입에 따라 데이터를 다른 경로로 라우팅"""
    
    def __init__(self):
        super().__init__("Router")
        self.output_pipes: Dict[str, Pipe] = {}
    
    def add_route(self, sensor_type: str, pipe: Pipe):
        """특정 센서 타입에 대한 출력 파이프 추가"""
        self.output_pipes[sensor_type] = pipe
    
    def process(self, data: Any) -> Optional[Any]:
        """센서 타입에 따른 라우팅"""
        try:
            sensor_type = data["type"]
            if sensor_type in self.output_pipes:
                success = self.output_pipes[sensor_type].send(data)
                if success:
                    print(f"[{self.name}] Routed {sensor_type} data")
                else:
                    print(f"[{self.name}] Route pipe full for {sensor_type}")
            else:
                print(f"[{self.name}] No route for {sensor_type}")
            
            # 라우터는 원본 데이터를 변경하지 않고 전달만 함
            return None
            
        except Exception as e:
            print(f"[{self.name}] Routing error: {e}")
            return None

# 싱크 필터
class DatabaseSinkFilter(Filter):
    """처리된 데이터를 데이터베이스에 저장하는 싱크 필터"""
    
    def __init__(self, db_name: str):
        super().__init__(f"DatabaseSink-{db_name}")
        self.db_name = db_name
        self.stored_count = 0
    
    def process(self, data: Any) -> Optional[Any]:
        """데이터베이스에 데이터 저장 (시뮬레이션)"""
        try:
            # 실제 구현에서는 데이터베이스 연결 및 저장 수행
            print(f"[{self.name}] Storing to {self.db_name}: {data['id']} = {data['measurement']['value']}")
            self.stored_count += 1
            
            # 싱크 필터는 출력이 없음
            return None
            
        except Exception as e:
            print(f"[{self.name}] Storage error: {e}")
            return None

# 파이프라인 관리자
class Pipeline:
    """전체 파이프라인을 관리하는 클래스"""
    
    def __init__(self):
        self.filters: List[Filter] = []
        self.pipes: List[Pipe] = []
    
    def add_filter(self, filter_instance: Filter):
        """필터를 파이프라인에 추가"""
        self.filters.append(filter_instance)
    
    def add_pipe(self, pipe: Pipe):
        """파이프를 파이프라인에 추가"""
        self.pipes.append(pipe)
    
    def connect(self, source_filter: Filter, target_filter: Filter, pipe: Pipe):
        """두 필터를 파이프로 연결"""
        source_filter.connect_output(pipe)
        target_filter.connect_input(pipe)
        self.add_pipe(pipe)
    
    def start(self):
        """전체 파이프라인 시작"""
        print("Starting pipeline…")
        for filter_instance in self.filters:
            filter_instance.start()
        print("Pipeline started!")
    
    def stop(self):
        """전체 파이프라인 종료"""
        print("Stopping pipeline…")
        for filter_instance in self.filters:
            filter_instance.stop()
        for pipe in self.pipes:
            pipe.close()
        print("Pipeline stopped!")

# 메인 실행 예제
def main():
    """IoT 데이터 처리 파이프라인 실행 예제"""
    
    # 파이프라인 생성
    pipeline = Pipeline()
    
    # 필터 인스턴스 생성
    temp_source = SensorSourceFilter("TEMP_01", "temperature")
    humidity_source = SensorSourceFilter("HUM_01", "humidity")
    json_parser = JsonParseFilter()
    validator = ValidationFilter()
    transformer = TransformFilter()
    router = RouterFilter()
    temp_sink = DatabaseSinkFilter("TemperatureDB")
    humidity_sink = DatabaseSinkFilter("HumidityDB")
    
    # 파이프 생성
    temp_to_parser = Pipe()
    humidity_to_parser = Pipe()
    parser_to_validator = Pipe()
    validator_to_transformer = Pipe()
    transformer_to_router = Pipe()
    temp_route_pipe = Pipe()
    humidity_route_pipe = Pipe()
    
    # 필터들을 파이프라인에 추가
    pipeline.add_filter(temp_source)
    pipeline.add_filter(humidity_source)
    pipeline.add_filter(json_parser)
    pipeline.add_filter(validator)
    pipeline.add_filter(transformer)
    pipeline.add_filter(router)
    pipeline.add_filter(temp_sink)
    pipeline.add_filter(humidity_sink)
    
    # 파이프라인 연결
    pipeline.connect(temp_source, json_parser, temp_to_parser)
    pipeline.connect(humidity_source, json_parser, humidity_to_parser)
    pipeline.connect(json_parser, validator, parser_to_validator)
    pipeline.connect(validator, transformer, validator_to_transformer)
    pipeline.connect(transformer, router, transformer_to_router)
    
    # 라우터 설정
    router.add_route("temperature", temp_route_pipe)
    router.add_route("humidity", humidity_route_pipe)
    temp_sink.connect_input(temp_route_pipe)
    humidity_sink.connect_input(humidity_route_pipe)
    pipeline.add_pipe(temp_route_pipe)
    pipeline.add_pipe(humidity_route_pipe)
    
    try:
        # 파이프라인 시작
        pipeline.start()
        
        # 30초간 실행
        print("Pipeline running for 30 seconds…")
        time.sleep(30)
        
    except KeyboardInterrupt:
        print("\nReceived interrupt signal")
    finally:
        # 파이프라인 종료
        pipeline.stop()

if __name__ == "__main__":
    main()

사례 6: 이미지 처리 파이프라인

시스템 구성: S3(소스) → Lambda(리사이징) → Lambda(워터마킹) → S3(싱크)

flowchart LR
    U[사용자 업로드] --> S3[S3 저장소]
    S3 --> L1[Lambda: 리사이징]
    L1 --> L2[Lambda: 워터마킹]
    L2 --> S3R[결과 S3 저장소]

Workflow: 사용자가 이미지를 업로드하면, S3 이벤트로 Lambda 함수 (필터) 가 순차적으로 호출되어 리사이징, 워터마킹 후 결과를 저장

Pipe-and-Filter 역할: 각 Lambda 함수가 독립적인 필터로 동작, 파이프는 S3 이벤트 및 데이터 전달 역할

유무에 따른 차이점: Pipe-and-Filter 구조 미도입 시, 모든 처리가 하나의 모놀리식 함수에서 이루어져 유지보수와 확장성이 저하됨. 도입 시 각 단계가 분리되어 관리, 확장, 장애 대응이 용이함.

구현 예시:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# Pipe-and-Filter 구조의 간단한 예시
# 각 필터는 독립적으로 동작하며, 파이프 역할은 함수 호출로 구현

def resize_filter(image):
    # 이미지 리사이징 필터
    # (실제 구현은 생략)
    return f"리사이징({image})"

def watermark_filter(image):
    # 워터마킹 필터
    # (실제 구현은 생략)
    return f"워터마킹({image})"

def pipeline(image):
    # 파이프라인: 필터를 순차적으로 적용
    image = resize_filter(image)
    image = watermark_filter(image)
    return image

# 사용 예시
input_image = "input.jpg"
output_image = pipeline(input_image)
print(output_image)  # 출력: 워터마킹(리사이징(input.jpg))

구현 예시

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
from abc import ABC, abstractmethod
from typing import Any, List
import queue
import threading
import time

# 필터의 기본 인터페이스
class Filter(ABC):
    def __init__(self, name: str):
        self.name = name
        self.input_pipe = queue.Queue()
        self.output_pipe = queue.Queue()
        self._is_running = False
        self._thread = None

    @abstractmethod
    def process(self, data: Any) -> Any:
        """데이터를 처리하는 추상 메서드"""
        pass

    def start(self):
        """필터 처리를 시작"""
        self._is_running = True
        self._thread = threading.Thread(target=self._run)
        self._thread.daemon = True
        self._thread.start()

    def stop(self):
        """필터 처리를 중지"""
        self._is_running = False
        if self._thread:
            self._thread.join()

    def _run(self):
        """필터의 메인 처리 루프"""
        while self._is_running:
            try:
                # 입력 파이프에서 데이터를 가져옴
                data = self.input_pipe.get(timeout=1.0)
                if data is None:  # 종료 시그널
                    break
                    
                # 데이터 처리 및 결과를 출력 파이프로 전달
                result = self.process(data)
                if result is not None:
                    self.output_pipe.put(result)
                    
            except queue.Empty:
                continue
            except Exception as e:
                print(f"Error in {self.name}: {e}")

# 구체적인 필터 구현
class TextNormalizationFilter(Filter):
    """텍스트를 정규화하는 필터"""
    def process(self, text: str) -> str:
        # 소문자로 변환하고 앞뒤 공백 제거
        normalized = text.lower().strip()
        print(f"{self.name}: Normalized text to '{normalized}'")
        return normalized

class PunctuationRemovalFilter(Filter):
    """구두점을 제거하는 필터"""
    def process(self, text: str) -> str:
        # 기본적인 구두점 제거
        cleaned = ''.join(char for char in text if char.isalnum() or char.isspace())
        print(f"{self.name}: Removed punctuation -> '{cleaned}'")
        return cleaned

class WordCountFilter(Filter):
    """단어 수를 세는 필터"""
    def process(self, text: str) -> dict:
        # 단어별 출현 횟수 계산
        words = text.split()
        word_count = {}
        for word in words:
            word_count[word] = word_count.get(word, 0) + 1
        print(f"{self.name}: Counted words -> {word_count}")
        return word_count

# 파이프라인 관리자
class Pipeline:
    def __init__(self):
        self.filters: List[Filter] = []

    def add_filter(self, filter: Filter):
        """파이프라인에 필터 추가"""
        if self.filters:
            # 이전 필터의 출력을 현재 필터의 입력으로 연결
            prev_filter = self.filters[-1]
            filter.input_pipe = prev_filter.output_pipe
        self.filters.append(filter)

    def start(self):
        """모든 필터 시작"""
        print("Starting pipeline…")
        for filter in self.filters:
            filter.start()

    def stop(self):
        """모든 필터 중지"""
        print("Stopping pipeline…")
        for filter in self.filters:
            filter.stop()

    def process(self, data: Any):
        """데이터 처리 시작"""
        if not self.filters:
            return data
            
        # 첫 번째 필터에 데이터 입력
        first_filter = self.filters[0]
        first_filter.input_pipe.put(data)
        
        # 마지막 필터의 결과 반환
        return self.filters[-1].output_pipe.get()

# 사용 예시
def main():
    # 파이프라인 생성 및 필터 추가
    pipeline = Pipeline()
    pipeline.add_filter(TextNormalizationFilter("Normalizer"))
    pipeline.add_filter(PunctuationRemovalFilter("PunctuationCleaner"))
    pipeline.add_filter(WordCountFilter("WordCounter"))

    # 파이프라인 시작
    pipeline.start()

    try:
        # 테스트 데이터 처리
        input_text = "Hello, World! Hello, Python… Hello, Pipe-Filter Pattern!"
        print(f"\nProcessing text: '{input_text}'")
        
        result = pipeline.process(input_text)
        print(f"\nFinal result: {result}")

    finally:
        # 파이프라인 종료
        pipeline.stop()

if __name__ == "__main__":
    main()

기타 사항

보안 고려사항

데이터 암호화

접근 제어

테스트 전략

단위 테스트

통합 테스트

문서화 및 거버넌스

아키텍처 문서

개발 가이드라인

실무에서 효과적으로 적용하기 위한 고려사항 및 주의할 점

분류고려 항목설명권장 전략
설계필터 크기 / 역할 분리필터가 지나치게 작으면 오버헤드 발생, 크면 재사용성 저하단일 책임 원칙 (SRP) 을 유지하되 성능 고려하여 적절한 기능 단위 분할
데이터 포맷 표준화필터 간 포맷 불일치 시 오류 발생, 유지보수 어려움Avro/Protobuf 등 스키마 기반 포맷 사용, Schema Registry 도입
인터페이스 정의느슨한 결합을 위한 명확한 입력/출력 계약 필요인터페이스 명세 문서화, OpenAPI 또는 gRPC 사용 시 IDL 로 정의
처리 순서 보장필터의 순서가 의미에 따라 논리적으로 중요함흐름 정의 문서화, 순서 변경 허용 시 영향도 분석 및 테스트 체계 구축
상태 관리상태 공유로 인해 병렬화/확장성 저하 위험 존재Stateless 기본, 필요 시 외부 상태 저장소 (Redis 등) 사용
구현오류 격리 / 복구한 필터 오류로 전체 시스템 중단 방지 필요Circuit Breaker, DLQ(Dead Letter Queue), 재시도 로직 적용
백프레셔 / 버퍼링 관리처리 속도 차이로 인한 메모리 폭증 및 병목 발생 가능적응형 버퍼 크기, 비동기 큐, Flow Control, Bounded Channel 등 적용
병렬 처리 및 확장각 필터를 병렬로 실행하여 처리량 향상필터별 병렬 워커 수 조정, CPU 및 메시지 부하 기반 Autoscaling
비동기 처리I/O 중심 필터에서 동기 방식은 성능 저하 유발asyncio, Reactor, Kafka Consumer 등 논블로킹 방식 사용
테스트 자동화필터 단위 및 전체 흐름 테스트 중요Unit Test (pytest, JUnit), E2E Test + Mocking 구성
운영관찰성 (Observability)문제 원인 추적 및 성능 병목 분석을 위한 가시성 필요Prometheus, Grafana, OpenTelemetry, Zipkin 연동
구조화된 로깅로그 검색/추적 어려움, 분산 환경에서는 상관 관계 파악 필요JSON 기반 로그, Trace/Correlation ID 포함, 중앙 로그 수집 (ELK, Loki 등)
에러 처리 및 알림 체계 구축장애 발생 시 빠른 인지와 복구 중요Alertmanager, Slack/PagerDuty 연동, SLA 기반 에러 레벨 설정
운영 자동화배포/스케일링/복구 자동화 필요Kubernetes Job/CronJob, Helm, CI/CD 파이프라인 활용
보안데이터 보호민감한 데이터 전송 시 보안 미흡은 치명적전송 중 암호화 (TLS), 저장소 암호화 (AES), 인증/인가 기반 API 보안
인증/인가각 필터 또는 인터페이스 호출 시 무분별한 접근 방지OAuth 2.0, JWT, API Gateway 인증 정책 적용
확장성유연한 확장 설계요구사항 변화에 따른 필터 추가/제거의 용이성 확보마이크로서비스 형태 배포, Loose Coupling 구조 유지, 서비스 레지스트리 도입
수평 확장부하 증가 대응을 위한 스케일 아웃 구조 필요Kubernetes + Horizontal Pod Autoscaler, Sharding, Partitioning 전략 적용

최적화하기 위한 고려사항 및 주의할 점

최적화 영역구체 항목설명권장 전략 및 적용 기법
성능 최적화메모리 관리불필요한 데이터 복사, GC 부하 등으로 인한 성능 저하 방지Zero-copy, 객체 재사용, 메모리 풀
CPU 효율성필터 로직 복잡도, 연산 병목에 따른 CPU 자원 낭비 방지연산 병합, JIT, SIMD, 핫스팟 최적화
네트워크 효율화분산 필터 간 통신 시 전송량 및 지연 최소화압축, 배치 전송, CDN, 커넥션 풀링
데이터 복사 최소화파이프 간 참조 전달을 통해 처리 비용 절감Pass-by-reference 처리 방식 적용
처리량 향상병렬 처리필터 단위 병렬화 또는 내부 병렬 처리를 통해 전체 처리량 향상워커 풀, 멀티스레드, 멀티프로세스, async I/O
배치 처리단일 레코드 대신 일정 단위로 묶어 처리하여 I/O 및 처리 비용 감소윈도우 기반 집계, 버퍼 기반 배치 실행
상태 최소화상태 유지 시 성능 저하 우려, 외부 저장소 활용 권장상태 분리, Redis/DB 연동, CQRS 활용
지연 시간 감소실시간 처리 최적화실시간 스트림 처리 시 지연 최소화 필요인메모리 처리, 사전 할당, NoGC 환경
캐싱 전략자주 사용하는 데이터를 필터 앞단에서 캐싱하여 응답 속도 향상LRU, 계층 캐싱, local/distributed cache
로깅 최적화과도한 로그 출력은 지연 증가 및 디스크/CPU 사용률 상승 원인수준별 로깅, 핵심 이벤트 중심 로그 설정
리소스 관리리소스 사용량 관리필터별 자원 과소/과다 할당 방지 및 유휴 자원 활용리소스 쿼터 설정, JVM/Container 리밋 조정
자동 스케일링처리량에 따라 유동적으로 필터 수를 확장/축소HPA, KEDA, 워커 큐 기반 스케일링
버퍼 크기 최적화파이프 간 데이터 흐름 최적화를 위한 버퍼 동적 조정처리율 기반 버퍼 크기 튜닝, backpressure 연동
운영 안정성병목 분석특정 필터가 전체 흐름을 제한하는 경우 성능 급락 유발 가능병목 필터 분리, 병렬 처리, 필터 분해
장애 격리 및 복구특정 필터 실패가 전체 파이프라인에 영향을 미치지 않도록 설계Circuit Breaker, DLQ, Checkpointing
순서 보장병렬 처리 또는 메시지 재전송 시 순서 변경으로 인한 데이터 정합성 문제 가능메시지 ID, 정렬 버퍼, 순차 큐 처리
품질/테스트단위 테스트 자동화필터 단위 검증 없이 전체 파이프라인만 검증할 경우 문제 추적이 어려움필터 단위 테스트 작성, CI/CD 연동
성능 테스트실제 처리량, 지연 시간에 대한 사전 검증 부족 시 운영 중 장애 유발 가능부하 테스트, 지표 기반 벤치마킹 (JMeter, k6 등)

주제와 관련하여 주목할 내용

카테고리주제항목설명
1. 아키텍처 구조아키텍처 스타일Pipe-and-Filter모듈화, 재사용성, 병렬 처리에 적합한 흐름 기반 아키텍처 스타일
데이터 흐름일방향 / 양방향데이터가 일직선으로 또는 양방향으로 이동 가능
데이터 처리 모델Stateless / Stateful Filter필터의 상태 여부에 따라 구현 방식과 테스트 전략이 달라짐
동기화 구조동기 / 비동기 / 버퍼 기반파이프의 연결 방식 및 처리 흐름을 제어하는 구조
2. 구현 전략모듈화필터 분리 및 독립성각 필터는 독립적으로 개발/배포 가능하며, 테스트 단위로도 활용 가능
병렬 처리필터 병렬 실행 및 수평 확장멀티 스레드, 워커 풀, 컨테이너 기반 병렬 실행 가능
데이터 포맷 일관성표준화된 인터페이스필터 간 데이터 형식 일관성 확보 필요 (예: JSON Schema, Avro 등)
장애 대응예외 격리, DLQ, 백프레셔 등각 필터 단위로 장애를 격리하고 처리 실패 시 별도 처리 가능
3. 실무 적용 분야데이터 처리 파이프라인ETL, CDC, 로그 집계, 센서 데이터 등대용량 배치 및 실시간 스트림 처리 모두에 활용 가능
멀티미디어 처리이미지, 오디오, 비디오 파이프라인GStreamer 기반 필터 적용 예시 존재
스트림 처리실시간 데이터 스트림Kafka Streams, Flink 등 연계 가능
반응형 프로그래밍Reactive Streams, 백프레셔실시간 시스템에서 유연한 흐름 제어 가능
4. 기술 스택 연계프레임워크Kafka Streams, Flink, GStreamer필터 - 파이프 구조 기반으로 다양한 도메인에 활용 가능
서버리스 아키텍처AWS Lambda필터를 함수형으로 구현하여 유연하고 탄력적인 확장 가능
Container OrchestrationKubernetes, Docker필터 단위를 컨테이너로 배포 및 스케일링
Service MeshIstio, Linkerd 등필터 간 통신을 추적하고 정책 기반 제어 가능
5. 운영 및 품질 관리Observability분산 추적 (Tracing), APM파이프라인 가시성 확보 및 병목 원인 추적
CI/CDPipeline as Code파이프라인 구성을 코드로 관리 및 자동 배포 가능
데이터 계보 및 스키마 관리Data Lineage, Schema Registry데이터 흐름 추적 및 필터 간 스키마 호환성 보장
예외 처리 및 복구 전략DLQ, Circuit Breaker장애 시 데이터 유실 없이 복구 가능하도록 설계
6. 아키텍처 패턴 연계Event Sourcing이벤트 저장 기반의 처리 패턴파이프라인의 이벤트 기반 설계에 활용
CQRS읽기/쓰기 분리 구조처리 성능 최적화 및 필터 역할 분리
Event-Driven Architecture이벤트 기반 마이크로서비스 확장필터 간 느슨한 결합 유지에 효과적
Data Mesh / Orchestration도메인 중심 데이터 파이프라인 설계복잡한 엔터프라이즈 데이터 흐름 관리에 활용

반드시 학습해야할 내용

대분류중분류핵심 항목설명
기초 이론소프트웨어 아키텍처 개념Data-Flow Architecture흐름 기반 처리 아키텍처의 상위 개념으로, Pipe-and-Filter 포함됨
아키텍처 비교 분석Pipeline Pattern유사 구조의 구현 패턴 (필터 체인 등) 과 비교 분석
운영체제 기초Unix Pipe MechanismUnix/Linux 파이프 기반 동작 원리와 필터 개념 이해
분산 시스템 이론CAP TheoremPipe-and-Filter 아키텍처의 분산 처리 시 일관성/가용성/분할 허용성 고려 사항
통합 패턴Enterprise Integration Patterns시스템 간 통합에서 필터 및 메시지 흐름 구성에 활용되는 패턴 집합
핵심 설계 원칙객체지향 설계 원칙SOLID - SRP각 필터는 단일 책임 원칙 (SRP) 을 따라야 유지보수성 확보 가능
상태 관리 전략Stateless vs Stateful Filters필터의 상태 유지 여부에 따른 처리 방식, 병렬화/확장성 전략
동시성 모델Thread-safe Filter Design멀티스레드 환경에서의 안전한 필터 설계 원칙
흐름 제어Backpressure, Buffering처리 지연 방지 및 흐름 안정화를 위한 핵심 제어 전략
구현 기술비동기/동기 프로그래밍Async / Await, Threading필터 동작의 실행 방식과 흐름 컨트롤 방식에 대한 이해
스트림 처리 프레임워크Apache Flink, Kafka Streams실시간 필터 기반 스트림 처리 구현 기술
미디어 파이프라인GStreamer, FFmpeg오디오/비디오 스트리밍 처리에서의 필터 파이프 구조 구현
메시지 큐Apache Kafka, RabbitMQ필터 간 비동기 메시지 전달을 위한 인프라 기술
테스트 및 운영단위 및 통합 테스트필터 단위 테스트 전략각 필터의 유닛 테스트 구성과 파이프라인 전체 테스트 전략
모니터링/로깅Prometheus, Grafana, 로그 수집기파이프라인 상태 추적 및 성능 모니터링 체계 구축
장애 대응체크포인트, 재처리 전략중간 상태 저장 및 장애 복구를 위한 설계 기법
실무 적용 사례데이터 파이프라인ETL, 로그 처리실무에서 Pipe-and-Filter 기반으로 구성되는 처리 흐름의 대표적 적용 사례
컴파일러Lexer → Parser → Optimizer소스 코드 처리에서 Pipe-and-Filter 구조가 적용되는 대표 사례
IoT, 이벤트 기반 시스템센서 수집 → 필터 → 알림이벤트 기반 데이터 흐름에 필터 체인을 적용한 실시간 처리 예시

용어 정리

카테고리용어설명
아키텍처 개념Pipe-and-Filter Architecture데이터 흐름을 처리하는 여러 ** 필터 (Filter)** 와 이를 연결하는 ** 파이프 (Pipe)** 로 구성된 구조. 각 필터는 독립적으로 실행되며 파이프는 데이터 전송을 담당함.
구성 요소Filter입력 데이터를 변환하거나 처리한 뒤 출력하는 독립적 처리 단위. Stateless 또는 Stateful 구조로 구현될 수 있음.
Pipe필터 간 데이터를 전달하는 연결 경로. 직렬, 병렬, 버퍼링 구조로 구성 가능함.
Source (Pump)데이터 파이프라인의 시작점에서 데이터를 생성하거나 수집하는 요소.
Sink파이프라인의 종료점에서 결과 데이터를 소비하거나 저장하는 요소.
Buffer필터 간 데이터 흐름을 조절하기 위한 임시 저장 공간. 비동기 처리나 병렬성을 지원함.
Splitter하나의 데이터 스트림을 여러 개로 분기하는 역할을 수행하는 구성 요소.
Aggregator여러 데이터 스트림을 하나로 통합하여 처리하는 필터.
처리 모델Stateless Filter내부 상태를 저장하지 않고 입력만으로 처리하는 단순 구조. 병렬 처리 및 테스트에 적합함.
Stateful Filter세션, 집계 등 내부 상태를 유지하면서 처리하는 구조. 외부 저장소 (RocksDB 등) 와 연계됨.
Push-based Processing상위 필터가 데이터를 생성하여 하위 필터로 즉시 전달하는 방식. 실시간/스트림 처리에 적합.
Pull-based Processing하위 필터가 필요 시 상위 필터로부터 데이터를 요청하는 방식. 제어 흐름 관리에 유리함.
최적화 및 성능Backpressure데이터 생산과 소비 속도 차이로 인한 병목 현상을 제어하는 메커니즘.
Flow Control전체 파이프라인의 데이터 흐름을 조절하여 시스템의 안정성과 처리량 보장.
Zero-Copy데이터 복사를 줄여 처리 속도 및 효율성을 높이는 성능 최적화 기법.
Throughput단위 시간당 처리 가능한 데이터 양. 병렬 처리 구조와 밀접한 연관이 있음.
Latency데이터가 시스템을 통과하는 데 걸리는 시간. 파이프라인 단계 수 및 병렬성 영향 받음.
운영 및 장애 처리Checkpoint상태 저장 및 장애 발생 시 복구 가능한 중간 저장 지점. 스트림 처리 엔진에서 필수 요소.
Dead Letter Queue (DLQ)처리 실패한 메시지를 별도로 저장하여 재처리나 분석에 활용하는 큐.
Circuit Breaker장애 발생 시 전체 시스템 확산을 방지하고, 부분 장애를 격리하는 보호 패턴.
Idempotency동일한 연산을 여러 번 수행해도 결과가 동일한 성질. 메시지 중복 처리에 필수.
모니터링 및 추적Logging / Monitoring각 필터/파이프 상태를 추적하고, 운영 지표를 수집하여 장애를 조기에 감지함.
Distributed Tracing분산된 필터 간 처리 경로 전체를 추적하여 성능 병목, 지연 등을 분석하는 기법.
Correlation ID서로 다른 필터 또는 시스템 간 이벤트를 식별하고 추적할 수 있도록 해주는 고유 ID.
데이터 품질/보안Schema Registry필터 간 주고받는 데이터의 스키마를 중앙에서 관리하고 검증하는 시스템.
Data Lineage데이터의 생성부터 소멸까지의 흐름을 추적하여 변경 이력, 출처, 품질 보장 등을 확인함.

참고 및 출처

개념 및 아키텍처 설명

관련 구현 도구 및 프레임워크

아키텍처 분류 및 관련 이론

보조 자료 및 실전 블로그