Batch Processing

배치 처리 (Batch Processing) 는 여러 데이터 작업을 묶어 한 번에 처리하는 데이터 처리 방식이다. 이는 주로 대용량 데이터 집계, 변환, 백업, 보고서 생성 등 반복적이고 대규모 작업에 적용된다. 배치 처리는 데이터 흐름 아키텍처의 한 형태로, 자동화, 확장성, 일관성, 오류 복구 등에서 강점을 가지며, 실시간 처리와는 다른 효율성과 안정성을 제공한다. 현대 IT 시스템에서는 분산 처리, 스케줄링, 장애 복구 등과 결합되어 다양한 산업에서 핵심 역할을 수행한다.

배경

목적 및 필요성

주요 목적

필요성

핵심 개념

배치 처리 (Batch Processing) 란, 대량의 데이터를 일정한 시간 간격이나 조건에 따라 일괄적으로 처리하는 데이터 처리 방식이다. 실시간 처리 (Real-time Processing) 와 달리, 즉각적인 결과가 필요한 것이 아니라, 데이터가 충분히 모인 후 한 번에 처리한다. 데이터 플로우 아키텍처 (Data-Flow Architecture) 내에서, 데이터가 여러 처리 단계를 거치며 흐르는 구조로 설계된다.

기본 개념

배치 처리에서 반드시 알아야 하는 핵심 개념들을 분석하면 다음과 같다:

항목정의주요 특징 및 목적
배치 작업 (Batch Job)사용자 개입 없이 자동 실행되는 일련의 명령어 또는 프로그램 집합- 순차 실행
- 정해진 스케줄 기반 실행
- 무인 처리, 반복 가능
청크 처리 (Chunk Processing)대용량 데이터를 작은 단위 (청크) 로 나누어 순차적으로 처리- 메모리 사용 최소화
- 장애 발생 시 청크 단위 복구
- 효율적 리소스 관리
배치 윈도우 (Batch Window)시스템이 배치 작업을 실행하도록 허용된 특정 시간 구간- 낮은 트래픽 시간 활용
- 시스템 부하 최소화
- 운영 안정성 확보
ETL 패턴데이터를 추출 (Extract) 하고, 변환 (Transform) 하여, 적재 (Load) 하는 3 단계 처리 패턴- 데이터 웨어하우스, BI 시스템의 기본
- 주로 배치 작업으로 수행됨

실무 구현을 위한 연관성 분석

핵심 개념들이 실무 구현에서 갖는 연관성을 다음과 같이 분석할 수 있다:

처리 효율성 측면

시스템 안정성 측면

운영 관리 측면

주요 기능 및 역할

데이터 수집 및 집계

스케줄링 및 워크플로우 관리

리소스 관리

특징

구분특징설명
처리 방식일괄 처리데이터를 모아서 한 번에 처리
실행 시점지연 처리실시간이 아닌 스케줄된 시점에 실행
리소스 사용집중적 사용특정 시간대에 리소스를 집중 활용
처리량고처리량단위 시간당 많은 데이터 처리 가능

핵심 원칙

  1. 단순성 (Simplicity)

    • 복잡한 실시간 동기화 없이 단순한 순차 처리
  2. 확장성 (Scalability)

    • 데이터 양과 처리 요구사항에 따른 수평적 확장
  3. 내결함성 (Fault Tolerance)

    • 장애 발생 시 복구 및 재시작 능력
  4. 재현성 (Reproducibility)

    • 동일한 입력에 대해 일관된 결과 보장

주요 원리

graph TD
    A[데이터 소스] --> B[데이터 수집]
    B --> C[큐/버퍼]
    C --> D[배치 작업 스케줄러]
    D --> E[배치 프로세서]
    E --> F[데이터 변환]
    F --> G[결과 저장]
    G --> H[모니터링 및 로깅]
    
    D --> I[작업1]
    D --> J[작업2]
    D --> K[작업N]
    
    I --> L[청크 처리]
    J --> M[청크 처리]
    K --> N[청크 처리]

배치 처리의 핵심 작동 방식은 다음과 같다:

  1. 데이터 수집 단계: 다양한 소스에서 데이터를 수집하여 임시 저장소에 축적
  2. 스케줄링 단계: 미리 정의된 스케줄에 따라 배치 작업 실행
  3. 처리 단계: 수집된 데이터를 청크 단위로 나누어 순차적 또는 병렬 처리
  4. 출력 단계: 처리 결과를 목적지 시스템에 저장

작동 원리 및 방식

Batch Processing 은 보통 다음과 같은 순서로 동작한다:

sequenceDiagram
  participant Scheduler
  participant Source
  participant Staging
  participant Processor
  participant Target
  participant Monitor

  Scheduler->>Source: Trigger Batch Job
  Source->>Staging: Extract Data
  Staging->>Processor: Provide Raw Data
  Processor->>Target: Load Transformed Data
  Processor->>Monitor: Emit Logs & Metrics
  Monitor-->>Scheduler: Notify Completion/Error
  1. Trigger: 특정 시간/이벤트 기반으로 배치 작업 시작
  2. Ingestion: 원본 시스템에서 데이터 추출
  3. Staging: 임시 저장소에 데이터 저장 (HDFS, S3 등)
  4. Processing: Spark, Hadoop, SQL 등으로 처리 수행
  5. Validation: 처리 결과 유효성 검사 (null, outlier 등)
  6. Load: 결과를 Data Warehouse 또는 DB 에 저장
  7. Logging/Monitoring: 실행 이력 및 에러 로깅
  8. Alerting: 실패 시 알림 전송

구조 및 아키텍처

Batch Processing 시스템은 다음과 같은 주요 컴포넌트로 구성된다.
이 구조는 데이터 수집 → 처리 → 저장 → 모니터링/알림까지 전체 워크플로우를 포괄한다.

graph TD
  A[Data Source] --> B[Data Ingestion]
  B --> C[Staging Area / Raw Storage]
  C --> D["Batch Scheduler (e.g., Airflow, StepFn)"]
  D --> E["Batch Processing Engine (e.g., Spark, Hadoop)"]
  E --> F["Target System (e.g., Data Warehouse)"]
  E --> G[Monitoring & Logging]
  G --> H["Alerting System (e.g., SNS, Email)"]

구성 요소

구분구성 요소설명기능 및 역할
필수Data Source원본 데이터운영 DB, 로그, API, 파일 등 다양한 소스에서 데이터 수집
Ingestion Layer데이터 수집 계층배치 주기 또는 이벤트 기반으로 원본 데이터를 수집
Staging / Raw Storage임시 저장소수집된 원본 데이터를 저장하여 이후 처리나 검증을 위한 중간 저장소 역할
Batch Scheduler작업 스케줄러정해진 시간이나 조건에 따라 배치 작업 흐름 제어 (예: cron, Airflow 등)
Processing Engine데이터 처리 엔진Spark, Hadoop, SQL 엔진 등을 이용하여 데이터를 변환, 가공, 집계
Target System결과 저장소처리된 데이터를 적재할 최종 저장소 (예: DWH, 데이터 마트, 분석 시스템 등)
선택Monitoring System모니터링 도구작업 실행 상태 모니터링, 성능 분석, 리소스 사용량 시각화 (예: Grafana, CloudWatch)
Alerting System알림 시스템오류/실패 발생 시 즉시 알림 전송 (예: SNS, Slack, PagerDuty 등)
Metadata Store메타데이터 저장소배치 작업 실행 이력, 파이프라인 버전 관리, 작업 상태 저장
Lineage Tracker데이터 라인리지 추적 시스템데이터가 어디서 왔고 어떻게 가공되었는지를 추적하여 거버넌스와 감사 용도로 사용

구현 기법

카테고리구현 기법정의/핵심 기술대표 도구/프레임워크적용 시나리오/예시
처리 단위 설계청크 기반 처리 (Chunking)대용량 데이터를 일정 단위로 분할하여 메모리 효율적 처리Spring Batch, JSR-3521 천만 건 주문 데이터 → 1 천 건 단위로 청크 처리
증분 처리 (Incremental Load)변경된 데이터만 추출하여 적재; 전체 리소스 소모 절감Airbyte, dbt, Kafka Connect전날 이후 변경된 사용자 로그만 적재
파이프라인 구성파이프라인 처리 (Pipeline)데이터 흐름을 단계별로 처리 (필터링, 변환, 집계 등)Spring Batch, Cascading, NiFi로그 → 파싱 → 필터링 → 집계 → 저장
파이프라인 추상화Source-Pipe-Sink 추상화 구조로 흐름 재사용Cascading, StreamSets재사용 가능한 데이터 흐름 구성
실행 및 스케줄링정기 스케줄링 (Scheduling)정해진 시간에 자동 실행; 주로 cron 기반Linux Cron, Windows Scheduler매일 새벽 2 시 로그 집계 작업 수행
워크플로 오케스트레이션작업 간 의존성, 재시도, 분기 흐름을 제어하는 복합 스케줄링Apache Airflow, AWS Step Functions, Luigi단계별 ETL 처리: 추출 → 정제 → 분석 → 적재
병렬 및 분산 처리병렬 처리 (Parallelism)병렬로 독립적인 작업 실행; 처리 시간 단축Java ThreadPool, Python multiprocessing지역별 데이터 집계 병렬 수행
분산 처리 (Distributed Batch)여러 노드/서버에 작업 분산; 대용량 고속 처리Hadoop MapReduce, Apache Spark수 TB 규모의 로그 처리 작업 분산 수행
이벤트 기반이벤트 트리거 처리이벤트 발생 시 배치 트리거 실행 (on-demand batch)AWS Lambda + S3, Cloud Functions + PubSub파일 업로드 → 자동 데이터 추출 및 적재 실행
트랜잭션/복구트랜잭션 및 롤백 처리청크 단위 트랜잭션, 실패 시 롤백 및 재처리 지원Spring Batch, Quartz일부 실패 시 재시도 가능하도록 트랜잭션 설정
체크포인트/라인리지처리 중단 시 상태 저장 → 이후 이어서 실행 가능Apache Spark Lineage, Apache Flink Checkpoint실패 시 체크포인트부터 다시 실행
최적화 기법파티셔닝파티션 기준으로 데이터 분할 → I/O 최적화Hive, BigQuery, Delta Lake일자별 파티션으로 로그 처리 속도 향상
병렬 청크 처리청크 단위로 병렬 처리 및 병합; 트랜잭션 범위 내에서 동작Spring Batch Parallel Steps고객 데이터 10 만 건 병렬 청크 처리
ETL 구성전통적 ETL 패턴Extract → Transform → Load 순서의 일괄 처리Informatica, Talend, Apache BeamERP 데이터 → 정규화 → 분석용 적재

장점

카테고리항목설명
1. 자원 효율성리소스 집중 사용오프피크 시간대에 작업 수행하여 유휴 자원을 효과적으로 활용
높은 처리량대량 데이터를 청크 기반으로 효율적으로 병렬 처리 가능
시스템 리소스 최적화특정 시간대에 집중 처리하여 전체적인 시스템 리소스 사용률 향상
2. 신뢰성과 일관성데이터 일관성 보장트랜잭션 단위 처리와 검증을 통해 정합성과 무결성 유지 가능
장애 복구 능력처리 중 장애 발생 시 중단된 청크부터 재처리 가능, 복구 메커니즘 내장
예측 가능성정해진 흐름과 순서로 동작하여 안정적이고 일관된 결과 제공
3. 자동화 및 운영 효율자동화된 실행스케줄러 또는 이벤트 기반으로 무중단 자동 실행 가능
운영 효율성반복 작업을 자동화하여 인적 개입 최소화, 오류 가능성 감소
4. 확장성과 유연성확장성분산 처리 엔진 (Spark 등) 과 결합해 대규모 워크로드도 손쉽게 확장 가능
다양한 처리 방식 지원청크 처리, 병렬 처리, 파이프라인 구성 등 다양한 처리 전략 적용 가능
5. 비용 최적화인프라 비용 절감고성능 실시간 시스템 대신 저비용 인스턴스로도 운영 가능
온디맨드 자원 사용클라우드 환경에서 작업 시간에만 리소스를 할당하여 비용 효율 극대화

단점과 문제점 그리고 해결방안

단점

카테고리항목설명대표 해결책
1. 실시간성 부족지연 시간배치 주기 사이의 공백으로 인해 실시간 피드백 불가마이크로 배치, Lambda/Kappa 아키텍처 구성
실시간성 미지원긴 주기 또는 단일 대량 처리로 인해 실시간 이벤트 대응 어려움실시간 처리와의 하이브리드 구성
2. 리소스 문제리소스 집중특정 시간대 (배치 윈도우) 에 과도한 부하 발생작업 분산, Auto-Scaling, 스케줄링 최적화
자원 경합복수 배치 병렬 실행 시 CPU, IO 병목 발생실행 타임슬롯 분리, QoS 기반 스케줄링, 쓰로틀링 적용
3. 복잡성 및 운영 이슈시스템 복잡성클러스터, 의존성, 스케줄, 실패 처리 등 복잡한 오케스트레이션 필요표준화된 파이프라인 설계, 워크플로우 엔진 사용
디버깅 어려움대량 처리 중 발생한 오류를 찾기 어려움세분화된 로그, 샘플링 기반 테스트, APM 도구 도입
의존성 관리 어려움순서와 의존성이 많은 경우 관리가 복잡해짐의존성 그래프 및 워크플로우 엔진 활용
4. 장애에 대한 취약성전체 실패 위험하나의 오류로 전체 작업 실패 가능성청크 단위 처리, 단계별 체크포인트, 실패 격리 로직 적용

문제점

카테고리항목원인영향탐지 및 진단 방식예방 전략해결 기법
1. 처리 실패작업 실패네트워크 장애, 시스템 다운 등전체 작업 실패, 데이터 유실시스템/네트워크 모니터링, 알림 시스템장애 복구 구조 (Failover), 이중화재시도 로직, 체크포인트 복원
스케줄 실패트리거 누락, 시간 이벤트 불일치배치 누락, 데이터 손실스케줄 모니터링, 로그 분석이벤트 조합 기반 스케줄링재스케줄링, 예외 알람
2. 데이터 무결성데이터 불일치처리 도중 원본 데이터 변경결과 오류, 비즈니스 영향버전 관리, 체크섬 검증스냅샷 처리, 락 메커니즘 적용트랜잭션 격리, 재처리 로직
부분 실패 시 전체 실패일괄 커밋 구조에서 일부 청크 오류가 전체 실패로 확대됨재처리 비용 증가, 운영 지연에러 로그 추적, 청크 단위 상태 기록청크 단위 커밋, 중간 체크포인트 적용실패 청크 재처리, Retry 로직 적용
3. 리소스 관리 실패메모리 부족대용량 데이터 처리 중 메모리 초과작업 실패, OOM, 성능 저하JVM 모니터링, 메모리 사용량 추적청크 크기 조정, 힙 메모리 설정 최적화GC 튜닝, 스트리밍 처리 적용
리소스 과다 사용동시 작업 및 대량 데이터 투입시스템 전체 성능 저하, 비용 상승Prometheus, CloudWatch 등 리소스 모니터링배치 분산, 시간대 분리분산 처리 엔진 도입, 리소스 제한 설정
4. 의존성 오류의존성 충돌 및 순서 오류설계 미비, 선행 작업 실패데이터 누락, 오류 전파워크플로우 상태 시각화, 그래프 기반 검증순서 명확화, 의존성 도식화작업 재설계, 조건 기반 실행 제어

도전 과제

카테고리도전 과제원인/배경영향해결 방안적용 아키텍처/기법
실시간성 통합실시간성 요구 증가데이터 기반 의사결정 속도 요구 증가기존 배치 처리의 지연 한계 노출Kappa/Lambda 구조, 하이브리드 처리 설계Lambda, Kappa, Streaming + Batch 병합
복잡도 관리워크플로우 복잡성 증가의존성, 조건 분기, 트랜잭션 제어 로직 증가설계/테스트/운영 난이도 상승Airflow/Luigi 등 DSL 기반 워크플로 도구 활용, 작업 단위 모듈화DAG 기반 워크플로우, Step Functions
장애 복구/안정성분산 환경에서의 장애 대응병렬·분산 처리 환경 확대로 오류 감지/복구 어려움처리 실패 후 전체 재처리 부담, SLA 위반 위험체크포인트, 라인리지, 멱등성 설계, 이벤트 소싱Spark Lineage, Flink Checkpoint
데이터 품질자동 검증/정합성 보장대량 처리 시 수동 검증 불가, 오류 발생 위치 추적 어려움품질 저하, 데이터 분석/ML 결과 왜곡검증 파이프라인 삽입, 테스트 더블, Schema Registry, Validation Rule 자동화Great Expectations, dbt tests
규제 대응/보안데이터 거버넌스 및 컴플라이언스GDPR, SOC2, ISMS 등 외부 감사 기준 강화로그 누락/무단 접근 시 법적 이슈Masking, Logging 강화, Role 기반 접근제어, 감사로그 기록Fine-grained Audit Logging
클라우드 전환클라우드 네이티브 최적화온프레미스 배치 시스템의 클라우드 이식성 부족리소스 낭비, 배포 복잡성, 비용 상승서버리스 (FaaS) 배치, 자동 스케일링 도구, 관리형 워크플로우 도입AWS Batch, GCP Dataflow, Argo
확장성/성능대규모 데이터 볼륨 증가로그, IoT, 이벤트 기반 시스템에서 데이터 폭증처리 지연, I/O 병목, 스토리지 비용 증가데이터 파티셔닝, 병렬 청크 처리, 마이크로서비스 기반 분산 설계Kafka + Spark, Containerized Batcher
비용/리소스리소스 최적화 요구클라우드 과금 모델로 비용 예측 어려움비효율적인 리소스 할당으로 비용 급등Auto-scaling, 비용 예측 모델링, 스팟 인스턴스 활용Kubernetes CronJob + Horizontal Autoscaler

분류 기준에 따른 종류 및 유형

분류 기준유형설명주요 특징 및 기술 예시
1. 실행 시점시간 기반 (Time-based)지정된 시간에 정기적으로 실행됨 (예: cron)예측 가능성 높음, 주기적 데이터 수집
이벤트 기반 (Event-driven)특정 트리거 또는 이벤트 발생 시 실행실시간 연계 가능, 동적 처리
2. 처리 방식순차 처리 (Sequential)단일 스레드 또는 단일 노드에서 순차적으로 실행단순 구조, 디버깅 용이
병렬 처리 (Parallel)다중 스레드/프로세스를 활용한 동시 실행처리 성능 향상, 리소스 최적화
분산 처리 (Distributed)클러스터 기반 병렬 처리 (예: Hadoop, Spark)대규모 데이터 처리 적합, 복잡한 인프라 필요
3. 실행 환경온프레미스 (On-Premises)자체 서버 또는 데이터센터 기반에서 실행보안/제어 용이, 초기 투자 비용 큼
클라우드 (Cloud-based)AWS Batch, Google Dataflow 등 클라우드 서비스 기반 실행확장성, 유연성, 사용량 기반 비용 구조
4. 스케줄링 방식정기적 배치 (Scheduled)주기적으로 실행되는 방식 (시간 기반)크론, Airflow 등 활용 가능
이벤트 기반 배치메시지 수신, 파일 도착 등 이벤트 발생 시 트리거Lambda, Step Functions 등과 연계
5. 데이터 범위Full Batch전체 데이터를 매번 새로 적재/처리데이터 재처리 중심, 정확도 중시
Incremental / Micro-Batch변경된 데이터만 처리 또는 수초 단위의 소규모 배치 처리성능/자원 최적화, 실시간성과 유사
6. 처리 위치단일 서버 (Single-node)하나의 서버에서 전체 파이프라인 처리설정 간편, 확장성 낮음
분산 시스템 (Multi-node)여러 서버에서 역할 분산하여 처리 (ETL/저장 등 분리)병목 방지, 안정성 향상
7. 저장소 유형파일 기반 배치CSV, Parquet 등 파일 입출력을 중심으로 처리S3, HDFS 등과 연계, 로그 기반 처리에 적합
DB 기반 배치RDB, NoSQL 등에서 데이터를 읽고 쓰는 방식정형 데이터 중심, 트랜잭션 보장 필요 시 적합

실무 사용 예시

도메인/산업사용 목적대표 기술 스택성과/효과
금융일일 거래/지급 내역 정산RDBMS + 배치 스케줄러 (Quartz, cron), ETL회계 정확도 향상, 규정 준수, 수작업 90% 감소
전자상거래상품 추천 모델 학습 전처리Spark, Kafka, ML Pipeline개인화 향상, 학습 시간 2 배 단축, 매출 증대
헬스케어의료 이미지 데이터 전처리AI/ML 플랫폼 (TensorFlow, PyTorch), Object Storage진단 정확도 향상, 처리 시간 단축
제조업생산/품질 데이터 집계 및 분석IoT 센서 + Data Warehouse (Snowflake, BigQuery)생산성 향상, 품질 제어, 불량률 감소
통신통화/네트워크 로그 분석Hadoop, Flink, Spark네트워크 부하 분석, 요금 계산 정확도 개선
웹/플랫폼 서비스서버/접속 로그 집계MapReduce, Spark, CloudWatch Logs탐지 시간 단축 (24h → 1h), 스토리지 비용 절감
관리/운영정기 리포트 및 보고서 자동화Airflow, Excel Automation, PDF Generator반복 작업 자동화, 인적 오류 감소, 의사결정 속도 향상
시스템 관리데이터 마이그레이션/백업DB 덤프 + 정기 스케줄러, S3, Rsync안정성 확보, 데이터 정합성 유지, 수작업 제거

활용 사례

사례 1: 데이터 웨어하우스 ETL 배치 처리

시스템 구성:

Workflow

  1. 운영 DB 에서 데이터 추출
  2. 변환 및 정제
  3. 데이터 웨어하우스에 적재
  4. 결과 모니터링, 오류 발생 시 알림 및 재처리
flowchart TD
    A[운영 DB] --> B[배치 스케줄러]
    B --> C[ETL 처리]
    C --> D[데이터 웨어하우스]
    C --> E[모니터링/알림]

주제의 역할: 대량의 운영 데이터를 정기적으로 집계·정제·적재하여 분석 환경에 제공

유무에 따른 차이점: 배치 처리 미적용 시, 데이터 적재 지연, 인적 오류, 일관성 저하 발생

사례 2: 은행 EOD(Transaction Reconciliation)

구성: DB→추출→Spark MapReduce 집계→데이터 웨어하우스 로드

워크플로:

  1. Trigger: 23:00 스케줄러 실행
  2. Extract: 당일 트랜잭션 추출
  3. Transform: MapReduce 집계
  4. Load: 보고용 테이블에 저장
  5. Report: 알림 및 리포트 전송
flowchart TD
  SCHEDULER-->EXTRACT[Extract DB]
  EXTRACT-->MAPREDUCE[Batch Transform]
  MAPREDUCE-->LOAD[Load to Warehouse]
  LOAD-->REPORT[Generate Reports]

사례 3: 전자상거래 상품 추천 시스템

시스템 구성

graph TB
    subgraph "상품 추천 배치 시스템"
        A[웹/앱 로그] --> B[데이터 수집]
        C[거래 DB] --> B
        D[상품 DB] --> B
        
        B --> E[데이터 전처리]
        E --> F[특성 추출]
        F --> G[모델 학습]
        G --> H[추천 결과 생성]
        H --> I[Redis 캐시]
        H --> J[추천 DB]
        
        K[Airflow] --> E
        K --> F
        K --> G
        K --> H
    end

Workflow

  1. 매일 새벽 2 시에 Airflow 가 배치 작업 실행
  2. 전날 수집된 사용자 행동 데이터 전처리
  3. Spark 클러스터에서 협업 필터링 모델 학습
  4. 모든 사용자에 대한 추천 결과 생성
  5. Redis 와 데이터베이스에 결과 저장

배치 처리의 역할

배치 처리 유무에 따른 차이점

구현 예시

Python: 전자상거래 상품 추천 배치 처리 시스템

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
# 전자상거래 상품 추천 배치 처리 시스템
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
import logging
import time

class RecommendationBatchProcessor:
    """
    상품 추천을 위한 배치 처리 시스템
    """
    
    def __init__(self, chunk_size=10000):
        """
        배치 프로세서 초기화
        
        Args:
            chunk_size: 청크 단위로 처리할 데이터 크기
        """
        self.chunk_size = chunk_size
        self.logger = self._setup_logger()
        
    def _setup_logger(self):
        """로깅 설정"""
        logging.basicConfig(level=logging.INFO)
        return logging.getLogger(__name__)
    
    def extract_user_behavior_data(self, start_date, end_date):
        """
        사용자 행동 데이터 추출 (ETL의 Extract 단계)
        
        Args:
            start_date: 데이터 추출 시작 날짜
            end_date: 데이터 추출 종료 날짜
            
        Returns:
            DataFrame: 사용자 행동 데이터
        """
        self.logger.info(f"사용자 행동 데이터 추출 시작: {start_date} ~ {end_date}")
        
        # 실제 구현에서는 데이터베이스에서 데이터를 추출
        # 여기서는 샘플 데이터 생성
        sample_data = {
            'user_id': np.random.randint(1, 10000, 50000),
            'product_id': np.random.randint(1, 1000, 50000),
            'action': np.random.choice(['view', 'cart', 'purchase'], 50000),
            'timestamp': pd.date_range(start_date, end_date, periods=50000)
        }
        
        return pd.DataFrame(sample_data)
    
    def transform_data(self, raw_data):
        """
        데이터 변환 및 전처리 (ETL의 Transform 단계)
        
        Args:
            raw_data: 원시 데이터
            
        Returns:
            DataFrame: 변환된 데이터
        """
        self.logger.info("데이터 변환 시작")
        
        # 사용자-상품 상호작용 매트릭스 생성
        user_item_matrix = raw_data.pivot_table(
            index='user_id', 
            columns='product_id', 
            values='action',
            aggfunc='count',
            fill_value=0
        )
        
        # 가중치 적용 (구매 > 장바구니 > 조회)
        weights = {'view': 1, 'cart': 3, 'purchase': 5}
        for action, weight in weights.items():
            mask = raw_data['action'] == action
            user_item_matrix += weight * mask.groupby(['user_id', 'product_id']).size().unstack(fill_value=0)
        
        return user_item_matrix
    
    def process_in_chunks(self, data, processing_func):
        """
        청크 단위로 데이터 처리
        
        Args:
            data: 처리할 데이터
            processing_func: 처리 함수
            
        Returns:
            결과 데이터
        """
        self.logger.info(f"청크 단위 처리 시작 (청크 크기: {self.chunk_size})")
        
        total_chunks = len(data) // self.chunk_size + 1
        results = []
        
        for i in range(0, len(data), self.chunk_size):
            chunk = data[i:i + self.chunk_size]
            chunk_result = processing_func(chunk)
            results.append(chunk_result)
            
            self.logger.info(f"청크 처리 완료: {i//self.chunk_size + 1}/{total_chunks}")
            
        return pd.concat(results) if results else pd.DataFrame()
    
    def generate_recommendations(self, user_item_matrix, top_k=10):
        """
        협업 필터링을 통한 추천 생성
        
        Args:
            user_item_matrix: 사용자-상품 매트릭스
            top_k: 추천할 상품 개수
            
        Returns:
            dict: 사용자별 추천 상품 리스트
        """
        self.logger.info("추천 생성 시작")
        
        # 코사인 유사도 계산
        user_similarity = cosine_similarity(user_item_matrix)
        
        recommendations = {}
        
        for user_idx, user_id in enumerate(user_item_matrix.index):
            # 유사한 사용자 찾기
            similar_users = np.argsort(user_similarity[user_idx])[::-1][1:11]  # 상위 10명
            
            # 추천 점수 계산
            scores = np.zeros(len(user_item_matrix.columns))
            
            for similar_user_idx in similar_users:
                similarity_score = user_similarity[user_idx][similar_user_idx]
                user_ratings = user_item_matrix.iloc[similar_user_idx].values
                scores += similarity_score * user_ratings
            
            # 이미 상호작용한 상품 제외
            user_interactions = user_item_matrix.iloc[user_idx].values
            scores[user_interactions > 0] = 0
            
            # 상위 K개 상품 추천
            top_products = np.argsort(scores)[::-1][:top_k]
            recommendations[user_id] = user_item_matrix.columns[top_products].tolist()
        
        return recommendations
    
    def load_recommendations(self, recommendations):
        """
        추천 결과 저장 (ETL의 Load 단계)
        
        Args:
            recommendations: 추천 결과
        """
        self.logger.info("추천 결과 저장 시작")
        
        # 실제 구현에서는 데이터베이스나 캐시에 저장
        # 여기서는 로깅으로 대체
        for user_id, products in recommendations.items():
            self.logger.info(f"사용자 {user_id}: {products}")
    
    def run_batch_job(self):
        """
        배치 작업 실행 메인 함수
        """
        start_time = time.time()
        self.logger.info("배치 작업 시작")
        
        try:
            # 1. 데이터 추출
            yesterday = datetime.now() - timedelta(days=1)
            today = datetime.now()
            
            raw_data = self.extract_user_behavior_data(yesterday, today)
            
            # 2. 데이터 변환
            user_item_matrix = self.transform_data(raw_data)
            
            # 3. 추천 생성 (청크 단위 처리)
            recommendations = self.generate_recommendations(user_item_matrix)
            
            # 4. 결과 저장
            self.load_recommendations(recommendations)
            
            # 5. 작업 완료 로깅
            end_time = time.time()
            processing_time = end_time - start_time
            self.logger.info(f"배치 작업 완료 (처리 시간: {processing_time:.2f}초)")
            
        except Exception as e:
            self.logger.error(f"배치 작업 실패: {str(e)}")
            raise

# 스케줄러와 함께 사용하는 예시
def daily_recommendation_job():
    """
    일일 추천 배치 작업
    """
    processor = RecommendationBatchProcessor(chunk_size=5000)
    processor.run_batch_job()

# 실행 예시
if __name__ == "__main__":
    daily_recommendation_job()

Python + PySpark: EOD 집계 배치

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
# EOD 집계 배치 예시 (PySpark)
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("EOD_Reconciliation").getOrCreate()

# 1. 데이터 추출
df = spark.read.jdbc(url=DB_URL, table="transactions",
                     properties=DB_PROPS).filter("date = current_date()")

# 2. 집계 변환
agg = df.groupBy("account_id").sum("amount").withColumnRenamed("sum(amount)", "daily_total")

# 3. 로드
agg.write.mode("overwrite").jdbc(url=DW_URL, table="eod_totals", properties=DW_PROPS)

# 4. 오류 대응 + 로깅
try:
    # 실행 완료
    pass
except Exception as e:
    # 알림, 재시도 로직 등
    raise
spark.stop()

Python: 파일 읽기

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
# 예시: Python을 활용한 간단한 배치 처리 스크립트
import csv
import time

def batch_process(input_file, output_file):
    """
    대량의 데이터를 일괄적으로 처리하여 변환 결과를 저장하는 예시 함수
    """
    with open(input_file, 'r') as infile, open(output_file, 'w', newline='') as outfile:
        reader = csv.DictReader(infile)
        fieldnames = reader.fieldnames + ['processed_at']
        writer = csv.DictWriter(outfile, fieldnames=fieldnames)
        writer.writeheader()
        for row in reader:
            # 데이터 변환 로직(예: 타임스탬프 추가)
            row['processed_at'] = time.strftime('%Y-%m-%d %H:%M:%S')
            writer.writerow(row)

# 스케줄러(Cron 등)에서 정기적으로 호출
batch_process('input.csv', 'output.csv')

실무에서 효과적으로 적용하기 위한 고려사항 및 주의할 점

카테고리고려 항목설명권장 사항대표 도구/기술
스케줄 관리스케줄링 충돌 방지동일 자원 접근 배치 간 충돌 방지실행 시간 분산, 큐잉 처리Apache Airflow, AWS Step Functions
워크플로우 및 의존성 관리작업 간 선후 관계 명확히 정의 필요DAG 기반 설계, 조건 분기 관리Prefect, Azure Data Factory
스케줄러 통합작업 스케줄의 중앙 집중적 통제통합 스케줄러 도입Airflow, Control-M
장애 대응 및 복구체크포인트/재시도 설계실패 시 중단 지점부터 재실행정기적 상태 저장, 청크 재처리 적용Spark Checkpoint, Retry Policies
트랜잭션 및 롤백 관리데이터 상태 변경 시 원자성 보장 필요DB 트랜잭션 또는 청크 단위 트랜잭션 로그 적용Spring Batch, Kafka Transactions
실행 로그 기반 복구장애 발생 시 원인 추적 및 대상 청크 복구세밀한 로그 기록, 재처리 가능 로그 저장ELK Stack, Cloud Logging
데이터 무결성 보장데이터 품질/일관성 검증입력/출력 간 정확성 확보, 수량·형식 오류 방지체크섬, 데이터 프로파일링, Validation Rule 정의Great Expectations, Deequ
입력 데이터 사전 검증ETL 시작 전 품질 문제 탐지Schema Validation, Null/Outlier 체크JSON Schema, Marshmallow
출력 데이터 검증출력 결과값 신뢰성 보장샘플링 비교, 요약값 해시, 히스토그램 검증Custom QA Pipeline
성능 및 자원 최적화청크 크기 조절처리 효율성과 메모리 사용량 균형사전 벤치마킹 통한 최적 크기 산정Spring Batch, Spark Partitioning
리소스 할당 최적화메모리, CPU, 네트워크 등 과부하 방지사전 예측 기반 Auto-scaling 설계K8s HPA, AWS Batch
비용 효율화비효율적 리소스 사용에 따른 과금 방지Spot 인스턴스, 실행 시간 모니터링, Job 종료 조건 설정AWS Fargate, Google Cloud Composer
보안 및 컴플라이언스접근 제어 및 암호화 적용민감 정보 처리 시 권한 분리 및 암호화Role-Based Access Control (RBAC), KMS 연동IAM, KMS, Vault
규정 대응 (GDPR 등)데이터 보호법 등 외부 감사 대응마스킹, 감사 로그, 민감 필드 비식별화Data Catalog, Data Loss Prevention
감사 로그 기록작업/접근 내역 추적 가능하도록 로그화사용자/작업별 세부 이벤트 기록Cloud Audit Logs, Splunk
운영 및 모니터링실행 상태 및 성능 모니터링배치 진행률, 처리량, 실패율 추적대시보드 구축, 알림 연동, 지표 기반 Alert 설정Prometheus + Grafana, CloudWatch
알림 및 자동 대응이상 발생 시 실시간 알림 및 대응Slack/SNS 연동, 알림 기반 자동 롤백/재시작PagerDuty, Opsgenie

최적화하기 위한 고려사항 및 주의할 점

카테고리고려 요소설명권장 사항 및 최적화 전략
1. 리소스 최적화메모리 활용대용량 데이터 처리 시 힙 메모리 초과 방지 및 GC 부하 최소화Executor 메모리 설정 조정, 인메모리 캐시, 지연 로딩
병렬 처리 수준 조절과도한 병렬화는 오히려 자원 경합 초래스레드/워크 수 조정, 백프레셔 (backpressure) 적용
리소스 스케줄링피크 시간 자원 충돌 방지비업무 시간대 작업 예약, 우선순위 기반 실행
캐시 및 임시 저장소 활용반복 연산이나 조회 성능 최적화를 위한 임시 데이터 저장Redis, in-memory cache, 임시 파일 활용
2. 처리 성능병렬/분산 처리독립적인 작업들을 병렬 또는 분산 처리하여 처리량 극대화Spark, Hadoop, 병렬 스레드 처리
I/O 최적화디스크 또는 네트워크 병목 해소컬럼형 포맷 (Parquet), 인덱스 최적화, 파티셔닝, 압축
데이터 전송 최적화네트워크 대역폭 사용량 감소데이터 배치 전송, 압축 전송 적용
3. 데이터 처리 전략파티셔닝 전략데이터 분할을 통해 병렬 처리 및 where 절 필터링 성능 개선날짜, 범위 기반 파티셔닝, 파티션 푸시다운 적용
데이터 스킵불필요한 데이터 조회 제거로 I/O 비용 절감파티션 기반 필터링, 메타데이터 활용
청크 크기 설정과소/과대 설정 시 성능 저하 또는 장애 발생 가능10K~100K rows 수준에서 실험 후 설정
데이터 양 추적갑작스런 입력 증가 시 병목 또는 장애 발생사전 검증, 입력 샘플링, 데이터 프로파일링 도구 사용
4. 시스템 안정성중복 실행 방지중복된 배치 실행으로 인한 데이터 오염 방지실행 이력 저장, idempotent 처리, 로그 기반 실행 기록 검증
작업 실패 복구 전략장애 발생 시 전체 작업 실패 방지체크포인트 기반 재시도, 단계별 커밋, 청크 단위 격리
실행 오류 및 병목 탐지오류 지점 파악과 지속적인 성능 개선모니터링 도구 (Prometheus, Datadog), 로그 시각화
5. 실행 계획 및 운영작업 분산 스케줄링시스템 전체 부하 균형 조절크론/스케줄러 기반 시간 분산, 우선순위 기반 운영
전략적 실행 간격 설정업무 영향과 처리 속도를 균형 있게 조정Full vs Incremental 배치 구성, 주간/야간 전략
디버깅 및 테스트 전략문제 발생 시 원인 식별 및 반복 테스트 효율화로그 샘플링, A/B 테스트 기반 검증, 시뮬레이션 환경 운영

주제와 관련하여 주목할 내용

카테고리핵심 주제핵심 항목설명 및 의미
실행 아키텍처순차 vs 병렬 처리순차 처리, 병렬 처리순차는 단순하지만 느림, 병렬은 빠르지만 동기화와 리소스 조절 필요
클라우드 환경서버리스 배치 처리AWS Lambda, Google Cloud Functions인프라 관리 없이 이벤트 기반 실행 가능, 작은 단위 배치에 적합
클라우드 환경관리형 배치 서비스AWS Batch, Azure Batch컴퓨팅 자원 자동 할당/해제, 대규모 배치 처리에 적합
오케스트레이션워크플로우 제어Apache Airflow, Prefect, AWS Step Functions작업 의존성, 실패 대응, 재시도 정책 등을 코드로 선언하고 자동 제어 가능
실행 제어 및 자동화스케줄링 정책정기 스케줄, 이벤트 트리거 기반시간 기반 실행 (cron) 또는 이벤트 기반 트리거 가능
데이터 처리 방식파이프라인 처리ETL/ELT, 데이터 흐름 기반 처리단계별 처리 (추출 - 변환 - 적재) 로 구성, 병렬 처리 및 장애 대응 구조와 결합 가능
성능 최적화처리 효율성 향상파티셔닝, 병렬화, 인크리멘탈 로딩I/O 병목 완화, 중복 제거, 처리 시간 및 리소스 절감
신뢰성 및 복구장애 복구 메커니즘체크포인트, 트랜잭션 복원, 재시도 정책실패 시 중단점부터 이어서 실행 가능, 멱등성 처리 등으로 안정성 확보
모니터링 및 가시성옵저버빌리티 확보Prometheus, Grafana, 로깅 시스템작업 진행률, 성능 지표 시각화 및 알림 연동을 통한 실시간 대응
데이터 품질 관리데이터 검증 및 정합성스키마 검증, 데이터 프로파일링, 품질 규칙입력/출력의 정확성 및 완전성 확보, 오류 탐지 및 자동 리포팅 지원

반드시 학습해야할 내용

카테고리주제항목설명
1. 아키텍처 패턴배치 처리 패턴ETL / ELT추출 → 변환 → 적재 방식 또는 순서를 바꾼 현대적 처리 패턴
스트리밍 병합 패턴Lambda / Kappa Architecture실시간 처리와 배치 처리를 통합하여 실시간성과 일괄성의 균형 확보
분산 처리 구조 설계수평 확장, 파티셔닝고가용성 및 확장 가능한 데이터 파이프라인 설계를 위한 핵심 기법
2. 데이터 처리배치 처리 기본 개념일괄 처리 개념 이해트리거 기반이 아닌 주기적/대량 데이터 중심의 처리 전략
병렬/분산 처리 프레임워크Spark, Hadoop, Spring Batch대규모 데이터를 효율적으로 처리할 수 있는 엔진과 실행 방식 이해
3. 시스템 구성 요소워크플로 오케스트레이션Apache Airflow, Step Functions배치 작업 간 의존성, 순서, 재시도 등의 흐름 제어 및 자동화
작업 스케줄링Cron, Trigger 기반 실행시간 또는 이벤트 기반으로 워크플로 실행 제어
관리형 배치 플랫폼AWS Batch, Azure Data Factory클라우드 기반의 확장 가능하고 유지보수가 쉬운 배치 플랫폼 활용
4. 성능 최적화 및 복구메모리 관리 / 튜닝GC, Memory ConfigOut-of-Memory 방지 및 안정성 확보를 위한 세부 설정
체크포인트 및 재시도 전략실패 지점 복구, 상태 저장중간 상태 저장을 통해 장애 발생 시 재시도 및 롤백 구현
스케일링 전략Auto Scaling, 분산 처리처리량 증가 시 자동 확장 또는 부하 분산 처리 구조 설계
5. 클라우드 기반 배치클라우드 환경 구성 및 사용GCP Dataflow, AWS Glue클라우드 네이티브 환경에서 배치 파이프라인을 구성하고 운영하는 역량
서버리스 오케스트레이션Step Functions, Cloud Composer유지보수 없는 상태에서 복잡한 배치 워크플로를 구성 가능
6. 데이터 품질 및 신뢰성데이터 검증 및 품질 보장무결성, 정합성, 오류 감지배치 처리 결과의 품질과 신뢰성 확보를 위한 사전/사후 검증 기법
데이터 중복 처리 방지Idempotency, 실행 기록 관리배치 중복 실행으로 인한 데이터 오염 방지

용어 정리

카테고리용어설명
1. 기본 개념 및 처리 방식배치 처리 (Batch Processing)대용량 데이터를 일정 주기나 조건에 따라 일괄적으로 처리하는 방식
청크 (Chunk)대용량 데이터를 작은 단위로 나누어 처리하여 메모리 효율성과 장애 복원력 확보
배치 윈도우 (Batch Window)배치 작업이 실행되는 특정 시간대 (오프피크 시간 활용)
ETL (Extract, Transform, Load)데이터를 추출 → 변환 → 적재하는 전통적 배치 처리 패턴
ELT (Extract, Load, Transform)데이터를 적재한 후 변환하는 현대적 배치/분산 처리 패턴
병렬 처리 (Parallel Processing)여러 스레드 또는 노드에서 동시에 처리하여 성능을 향상시키는 방식
2. 아키텍처 및 설계 패턴Pipe-and-Filter각 단계 (필터) 를 파이프로 연결하여 데이터를 순차적으로 처리하는 구조화된 아키텍처
Lambda Architecture배치 처리 + 실시간 스트리밍 처리를 통합한 아키텍처 모델
Kappa Architecture배치 없이 스트리밍 기반으로 모든 처리를 수행하는 실시간 지향 아키텍처
DAG (Directed Acyclic Graph)워크플로우의 실행 순서를 나타내는 방향성 비순환 그래프 (의존성 표현용)
3. 실행 및 스케줄링Scheduler배치 작업의 실행 시점과 주기를 제어하는 도구 (예: Cron, Airflow 등)
CronUNIX/Linux 기반의 대표적인 시간 기반 스케줄링 도구
Batch Window배치 작업을 집중적으로 수행하는 시간대로, 시스템 부하를 분산하는 데 사용됨
4. 장애 복구 및 안정성Checkpoint작업 도중 상태를 저장하여 실패 발생 시 복구가 가능한 지점 확보
Backpressure소비자의 처리 속도보다 공급 속도가 빠를 때 속도를 제어하는 메커니즘
Watermark스트리밍에서 이벤트 시간 기준 진행 상태를 추적하는 메타데이터
5. 실행 환경 및 플랫폼YARNHadoop 에코시스템의 클러스터 자원 관리 플랫폼
RDD (Resilient Distributed Dataset)Spark 에서 불변성을 가진 분산 데이터 컬렉션 구조
Spot 인스턴스AWS 에서 저비용으로 제공하는 선점형 컴퓨팅 인스턴스
6. 데이터 분산 및 처리 최적화Partitioning (파티셔닝)데이터를 논리적으로 분할하여 병렬 처리와 I/O 최적화를 가능하게 함
Sharding (샤딩)데이터를 여러 DB 서버에 물리적으로 분산 저장하여 확장성과 성능 확보

참고 및 출처