Pipe-Filter Pattern

파이프-필터 패턴(Pipe-Filter Pattern)은 데이터 스트림을 처리하는 시스템에서 사용되는 소프트웨어 아키텍처 패턴.
이 패턴은 복잡한 처리 과정을 독립적인 단계로 나누어 모듈화하고, 이들을 순차적으로 연결하여 데이터를 처리한다.

주요 구성 요소

  1. 필터(Filter):

    • 단일 작업을 수행하는 처리 컴포넌트입니다
    • 입력을 받아 처리하고 출력을 생성합니다
    • 독립적으로 동작하며 다른 필터에 대해 알지 못합니다
    • 재사용이 가능하고 조합할 수 있어야 합니다
  2. 파이프(Pipe):

    • 필터 간의 데이터 전달을 담당합니다
    • 데이터 버퍼링과 동기화를 처리합니다
    • 필터들을 느슨하게 결합시킵니다
    • 대개 큐나 스트림으로 구현됩니다
  3. 파이프라인(Pipeline):

    • 필터들을 특정 순서로 연결한 전체 처리 과정입니다
    • 필터의 추가/제거/재배치가 가능합니다
    • 전체 데이터 흐름을 관리합니다

작동 방식

  1. 데이터 소스에서 원본 데이터가 생성됩니다.
  2. 데이터는 파이프를 통해 첫 번째 필터로 전달됩니다.
  3. 각 필터는 받은 데이터를 처리하고 결과를 다음 파이프로 전달합니다.
  4. 이 과정이 마지막 필터까지 반복됩니다.
  5. 최종 결과는 데이터 싱크(Data Sink)에 저장됩니다.

장점

  1. 모듈성: 각 필터가 독립적으로 작동하여 시스템의 모듈성이 향상됩니다.
  2. 재사용성: 필터는 다른 파이프라인에서도 재사용될 수 있습니다.
  3. 유연성: 필터를 추가, 제거, 재배치하여 다양한 파이프라인을 구축할 수 있습니다.
  4. 확장성: 새로운 필터를 쉽게 추가할 수 있어 시스템 확장이 용이합니다.
  5. 병렬 처리: 필터들이 동시에 작업을 수행할 수 있어 성능 향상이 가능합니다.

단점

  1. 성능 제한: 가장 느린 필터에 의해 전체 시스템의 성능이 제한될 수 있습니다.
  2. 데이터 변환 오버헤드: 필터 간 데이터 이동 시 변환 작업으로 인한 오버헤드가 발생할 수 있습니다.
  3. 복잡성: 대규모 시스템에서는 파이프-필터 구조가 복잡해질 수 있습니다.

구현 시 고려사항

  1. 데이터 형식:

    • 필터 간 데이터 형식을 표준화해야 합니다
    • 데이터 변환 오버헤드를 고려해야 합니다
    • 데이터 무결성을 보장해야 합니다
  2. 버퍼 관리:

    • 파이프의 버퍼 크기를 적절히 설정해야 합니다
    • 메모리 사용량을 관리해야 합니다
    • 데드락을 방지해야 합니다
  3. 에러 처리:

    • 각 필터의 에러를 적절히 처리해야 합니다
    • 파이프라인 전체의 에러 복구 전략이 필요합니다
    • 실패한 처리를 어떻게 재시도할지 결정해야 합니다

활용 사례

  1. 컴파일러: 어휘 분석, 구문 분석, 의미 분석, 코드 생성 등의 단계를 필터로 구현.
  2. UNIX 셸: 명령어를 파이프로 연결하여 복잡한 작업을 수행.
  3. ETL(Extract, Transform, Load) 프로세스: 데이터 웨어하우스에서 데이터 처리.
  4. 디지털 신호 처리: 오디오나 비디오 파일 처리.
  5. IoT 데이터 처리: 센서 데이터의 수집, 필터링, 분석.

구현 예시

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
from abc import ABC, abstractmethod
from typing import Any, List
import queue
import threading
import time

# 필터의 기본 인터페이스
class Filter(ABC):
    def __init__(self, name: str):
        self.name = name
        self.input_pipe = queue.Queue()
        self.output_pipe = queue.Queue()
        self._is_running = False
        self._thread = None

    @abstractmethod
    def process(self, data: Any) -> Any:
        """데이터를 처리하는 추상 메서드"""
        pass

    def start(self):
        """필터 처리를 시작"""
        self._is_running = True
        self._thread = threading.Thread(target=self._run)
        self._thread.daemon = True
        self._thread.start()

    def stop(self):
        """필터 처리를 중지"""
        self._is_running = False
        if self._thread:
            self._thread.join()

    def _run(self):
        """필터의 메인 처리 루프"""
        while self._is_running:
            try:
                # 입력 파이프에서 데이터를 가져옴
                data = self.input_pipe.get(timeout=1.0)
                if data is None:  # 종료 시그널
                    break
                    
                # 데이터 처리 및 결과를 출력 파이프로 전달
                result = self.process(data)
                if result is not None:
                    self.output_pipe.put(result)
                    
            except queue.Empty:
                continue
            except Exception as e:
                print(f"Error in {self.name}: {e}")

# 구체적인 필터 구현
class TextNormalizationFilter(Filter):
    """텍스트를 정규화하는 필터"""
    def process(self, text: str) -> str:
        # 소문자로 변환하고 앞뒤 공백 제거
        normalized = text.lower().strip()
        print(f"{self.name}: Normalized text to '{normalized}'")
        return normalized

class PunctuationRemovalFilter(Filter):
    """구두점을 제거하는 필터"""
    def process(self, text: str) -> str:
        # 기본적인 구두점 제거
        cleaned = ''.join(char for char in text if char.isalnum() or char.isspace())
        print(f"{self.name}: Removed punctuation -> '{cleaned}'")
        return cleaned

class WordCountFilter(Filter):
    """단어 수를 세는 필터"""
    def process(self, text: str) -> dict:
        # 단어별 출현 횟수 계산
        words = text.split()
        word_count = {}
        for word in words:
            word_count[word] = word_count.get(word, 0) + 1
        print(f"{self.name}: Counted words -> {word_count}")
        return word_count

# 파이프라인 관리자
class Pipeline:
    def __init__(self):
        self.filters: List[Filter] = []

    def add_filter(self, filter: Filter):
        """파이프라인에 필터 추가"""
        if self.filters:
            # 이전 필터의 출력을 현재 필터의 입력으로 연결
            prev_filter = self.filters[-1]
            filter.input_pipe = prev_filter.output_pipe
        self.filters.append(filter)

    def start(self):
        """모든 필터 시작"""
        print("Starting pipeline…")
        for filter in self.filters:
            filter.start()

    def stop(self):
        """모든 필터 중지"""
        print("Stopping pipeline…")
        for filter in self.filters:
            filter.stop()

    def process(self, data: Any):
        """데이터 처리 시작"""
        if not self.filters:
            return data
            
        # 첫 번째 필터에 데이터 입력
        first_filter = self.filters[0]
        first_filter.input_pipe.put(data)
        
        # 마지막 필터의 결과 반환
        return self.filters[-1].output_pipe.get()

# 사용 예시
def main():
    # 파이프라인 생성 및 필터 추가
    pipeline = Pipeline()
    pipeline.add_filter(TextNormalizationFilter("Normalizer"))
    pipeline.add_filter(PunctuationRemovalFilter("PunctuationCleaner"))
    pipeline.add_filter(WordCountFilter("WordCounter"))

    # 파이프라인 시작
    pipeline.start()

    try:
        # 테스트 데이터 처리
        input_text = "Hello, World! Hello, Python… Hello, Pipe-Filter Pattern!"
        print(f"\nProcessing text: '{input_text}'")
        
        result = pipeline.process(input_text)
        print(f"\nFinal result: {result}")

    finally:
        # 파이프라인 종료
        pipeline.stop()

if __name__ == "__main__":
    main()

참고 및 출처