콘텐츠로 이동

XGEN 1.0 워크플로우 엔진과 Qdrant 하이브리드 검색

2025.12 | FastAPI, Qdrant, MCP Station, Circuit Breaker

개요

XGEN 1.0에서 워크플로우 엔진을 구축하면서 가장 중요했던 것은 안정성과 확장성이었다. 단순한 파이프라인이 아니라 복잡한 AI 작업들을 체인으로 연결하고, 각 단계에서 발생할 수 있는 장애를 우아하게 처리하는 시스템이 필요했다.

워크플로우 엔진 아키텍처

헬스체크 시스템 강화

기존의 단순한 ping/pong 방식을 버리고 Circuit Breaker 패턴을 도입했다:

class CircuitBreaker:
    def __init__(self, failure_threshold=5, recovery_timeout=60):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.failure_count = 0
        self.last_failure_time = None
        self.state = "CLOSED"  # CLOSED, OPEN, HALF_OPEN

    async def call(self, func, *args, **kwargs):
        if self.state == "OPEN":
            if time.time() - self.last_failure_time < self.recovery_timeout:
                raise CircuitBreakerOpenException()
            self.state = "HALF_OPEN"

        try:
            result = await func(*args, **kwargs)
            if self.state == "HALF_OPEN":
                self.state = "CLOSED"
                self.failure_count = 0
            return result
        except Exception as e:
            self.failure_count += 1
            if self.failure_count >= self.failure_threshold:
                self.state = "OPEN"
                self.last_failure_time = time.time()
            raise

캐싱 시스템

헬스체크 결과를 TTL 기반으로 캐싱하여 부하를 줄였다. Thread-safe한 double-check locking으로 동시성 문제를 해결:

class HealthCache:
    def __init__(self, ttl=30):
        self.cache = {}
        self.ttl = ttl
        self.lock = threading.Lock()

    def get_or_compute(self, key, compute_func):
        now = time.time()
        if key in self.cache:
            value, timestamp = self.cache[key]
            if now - timestamp < self.ttl:
                return value

        with self.lock:
            # Double-check locking
            if key in self.cache:
                value, timestamp = self.cache[key]
                if now - timestamp < self.ttl:
                    return value

            result = compute_func()
            self.cache[key] = (result, now)
            return result

MCP (Model Context Protocol) 통합

TableData MCP 노드 처리기

워크플로우에서 테이블 데이터를 처리하는 노드들을 위한 전용 프로세서를 개발했다:

class TableDataMCPProcessor:
    async def process_node(self, node, interaction_id):
        # temp_storage_id 자동 설정
        if not node.get_parameter("temp_storage_id"):
            node.set_parameter("temp_storage_id", interaction_id)
            logger.info(f"Set temp_storage_id: {interaction_id}")

        # 페이지네이션 한계 적용 (max 100 rows)
        max_rows = min(node.get_parameter("max_rows", 50), 100)
        node.set_parameter("max_rows", max_rows)

이렇게 하면 사용자가 매번 temp_storage_id를 설정할 필요 없이 자동으로 세션별 임시 스토리지가 관리된다.

Playwright MCP 최적화

브라우저 자동화를 위한 Playwright MCP 설정을 최적화했다:

playwright_mcp:
  server_type: "stdio"
  server_args: [
    "--image-responses",  # 이미지 포함 응답
    "--headless",        # 헤드리스 모드
    "--timeout=30000"    # 30초 타임아웃
  ]
  context_char_limit: 200000  # 대용량 컨텍스트 지원

Qdrant 하이브리드 검색

벡터 + 키워드 융합 검색

단순한 벡터 유사도 검색의 한계를 극복하기 위해 하이브리드 검색을 구현했다:

class HybridSearchEngine:
    def __init__(self, qdrant_client, embedding_model):
        self.qdrant = qdrant_client
        self.embedding_model = embedding_model

    async def search(self, query, collection_name, limit=10):
        # 1. 벡터 검색
        query_vector = await self.embedding_model.encode(query)
        vector_results = await self.qdrant.search(
            collection_name=collection_name,
            query_vector=query_vector,
            limit=limit * 2  # 더 많이 가져와서 재순위
        )

        # 2. 키워드 검색 (payload 필터링)
        keyword_results = await self.qdrant.search(
            collection_name=collection_name,
            query_filter=models.Filter(
                must=[
                    models.FieldCondition(
                        key="content",
                        match=models.MatchText(text=query)
                    )
                ]
            ),
            limit=limit
        )

        # 3. RRF (Reciprocal Rank Fusion)로 결과 융합
        return self.fuse_results(vector_results, keyword_results, limit)

    def fuse_results(self, vector_results, keyword_results, limit):
        scores = {}
        k = 60  # RRF 상수

        # 벡터 검색 점수
        for i, result in enumerate(vector_results):
            doc_id = result.id
            scores[doc_id] = scores.get(doc_id, 0) + 1 / (k + i + 1)

        # 키워드 검색 점수
        for i, result in enumerate(keyword_results):
            doc_id = result.id
            scores[doc_id] = scores.get(doc_id, 0) + 1 / (k + i + 1)

        # 최종 순위
        return sorted(scores.items(), key=lambda x: x[1], reverse=True)[:limit]

스케줄링 시스템

KST 타임존 처리

국내 서비스 특성상 KST 기반 스케줄링이 필수였다:

def get_kst_now():
    """현재 KST 시간 반환"""
    utc_now = datetime.utcnow()
    kst_now = utc_now.replace(tzinfo=timezone.utc).astimezone(
        timezone(timedelta(hours=9))
    )
    return kst_now

class SessionScheduler:
    async def register_job(self, workflow_id, cron_expr, timezone="Asia/Seoul"):
        # 중복 등록 방지
        if workflow_id in self.active_jobs:
            logger.warning(f"Job {workflow_id} already registered")
            return

        # CronTrigger 생성 (한국 시간 기준)
        trigger = CronTrigger.from_crontab(cron_expr, timezone=timezone)

        job = await self.scheduler.add_job(
            self.execute_workflow,
            trigger=trigger,
            args=[workflow_id],
            id=workflow_id,
            replace_existing=True
        )

        self.active_jobs[workflow_id] = job
        logger.info(f"Scheduled job {workflow_id} with cron: {cron_expr}")

성능 최적화

스트리밍 응답 지연 최소화

워크플로우 실행 중 실시간 피드백을 위해 스트리밍 지연을 대폭 줄였다:

async def stream_execution_logs(self, session_id):
    while True:
        logs = await self.get_recent_logs(session_id)
        if logs:
            for log in logs:
                yield f"data: {json.dumps(log)}\n\n"

        # 기존 1초 → 100ms로 단축
        await asyncio.sleep(0.1)

MCP 응답 로그 트렁케이션

대용량 응답으로 인한 로그 폭증 문제를 해결:

def log_mcp_response(response):
    if len(response) > 500:
        truncated = response[:500] + "... (truncated)"
        logger.info(f"MCP Response: {truncated}")
    else:
        logger.info(f"MCP Response: {response}")

결과 및 배운 점

시스템 안정성 향상

  • Circuit Breaker 도입으로 장애 전파 차단
  • 헬스체크 캐싱으로 부하 50% 감소
  • 스트리밍 지연 90% 단축 (1초 → 100ms)

검색 품질 개선

  • 하이브리드 검색으로 정확도 30% 향상
  • RRF 알고리즘으로 벡터-키워드 검색 균형 확보
  • 대용량 문서 처리 성능 최적화

개발 생산성

  • MCP 프로토콜 표준화로 노드 개발 속도 2배 향상
  • 자동 파라미터 설정으로 사용자 편의성 증대
  • KST 기반 스케줄링으로 국내 서비스 최적화

XGEN 1.0 워크플로우 엔진은 단순한 파이프라인을 넘어서 AI 서비스의 핵심 인프라로 성장했다. 안정성, 성능, 사용성 모든 면에서 엔터프라이즈급 요구사항을 만족하는 시스템이 되었다.

관련 글