Skip to content
SON BLOG
Go back

FastAPI 워크플로우 엔진에 Qdrant 하이브리드 검색 붙이기

Edit page
시리즈 · XGEN 개발기 (3 / 6편)
  1. Code Assistant 개발기 - AI 기반 개발 도우미 시스템 구축
  2. vLLM + llama.cpp GPU 모델 서빙 최적화 실전기
  3. FastAPI 워크플로우 엔진에 Qdrant 하이브리드 검색 붙이기 — 현재 글
  4. vLLM에서 llama.cpp로: LLM 서빙 아키텍처 통합 마이그레이션
  5. 임베딩 전용 서버 분리와 대용량 배치 처리 최적화
  6. FastAPI 워크플로우 엔진: 접근 제어와 감사 로깅 구현

XGEN 1.0 워크플로우 엔진과 Qdrant 하이브리드 검색

2025.12 | FastAPI, Qdrant, MCP Station, Circuit Breaker

개요

XGEN 1.0에서 워크플로우 엔진을 구축하면서 가장 중요했던 것은 안정성과 확장성이었다. 단순한 파이프라인이 아니라 복잡한 AI 작업들을 체인으로 연결하고, 각 단계에서 발생할 수 있는 장애를 우아하게 처리하는 시스템이 필요했다.

워크플로우 엔진 아키텍처

헬스체크 시스템 강화

기존의 단순한 ping/pong 방식을 버리고 Circuit Breaker 패턴을 도입했다:

class CircuitBreaker:
    def __init__(self, failure_threshold=5, recovery_timeout=60):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.failure_count = 0
        self.last_failure_time = None
        self.state = "CLOSED"  # CLOSED, OPEN, HALF_OPEN
    
    async def call(self, func, *args, **kwargs):
        if self.state == "OPEN":
            if time.time() - self.last_failure_time < self.recovery_timeout:
                raise CircuitBreakerOpenException()
            self.state = "HALF_OPEN"
        
        try:
            result = await func(*args, **kwargs)
            if self.state == "HALF_OPEN":
                self.state = "CLOSED"
                self.failure_count = 0
            return result
        except Exception as e:
            self.failure_count += 1
            if self.failure_count >= self.failure_threshold:
                self.state = "OPEN"
                self.last_failure_time = time.time()
            raise

캐싱 시스템

헬스체크 결과를 TTL 기반으로 캐싱하여 부하를 줄였다. Thread-safe한 double-check locking으로 동시성 문제를 해결:

class HealthCache:
    def __init__(self, ttl=30):
        self.cache = {}
        self.ttl = ttl
        self.lock = threading.Lock()
    
    def get_or_compute(self, key, compute_func):
        now = time.time()
        if key in self.cache:
            value, timestamp = self.cache[key]
            if now - timestamp < self.ttl:
                return value
        
        with self.lock:
            # Double-check locking
            if key in self.cache:
                value, timestamp = self.cache[key]
                if now - timestamp < self.ttl:
                    return value
            
            result = compute_func()
            self.cache[key] = (result, now)
            return result

MCP (Model Context Protocol) 통합

TableData MCP 노드 처리기

워크플로우에서 테이블 데이터를 처리하는 노드들을 위한 전용 프로세서를 개발했다:

class TableDataMCPProcessor:
    async def process_node(self, node, interaction_id):
        # temp_storage_id 자동 설정
        if not node.get_parameter("temp_storage_id"):
            node.set_parameter("temp_storage_id", interaction_id)
            logger.info(f"Set temp_storage_id: {interaction_id}")
        
        # 페이지네이션 한계 적용 (max 100 rows)
        max_rows = min(node.get_parameter("max_rows", 50), 100)
        node.set_parameter("max_rows", max_rows)

이렇게 하면 사용자가 매번 temp_storage_id를 설정할 필요 없이 자동으로 세션별 임시 스토리지가 관리된다.

Playwright MCP 최적화

브라우저 자동화를 위한 Playwright MCP 설정을 최적화했다:

playwright_mcp:
  server_type: "stdio"
  server_args: [
    "--image-responses",  # 이미지 포함 응답
    "--headless",        # 헤드리스 모드
    "--timeout=30000"    # 30초 타임아웃
  ]
  context_char_limit: 200000  # 대용량 컨텍스트 지원

Qdrant 하이브리드 검색

벡터 + 키워드 융합 검색

단순한 벡터 유사도 검색의 한계를 극복하기 위해 하이브리드 검색을 구현했다:

class HybridSearchEngine:
    def __init__(self, qdrant_client, embedding_model):
        self.qdrant = qdrant_client
        self.embedding_model = embedding_model
    
    async def search(self, query, collection_name, limit=10):
        # 1. 벡터 검색
        query_vector = await self.embedding_model.encode(query)
        vector_results = await self.qdrant.search(
            collection_name=collection_name,
            query_vector=query_vector,
            limit=limit * 2  # 더 많이 가져와서 재순위
        )
        
        # 2. 키워드 검색 (payload 필터링)
        keyword_results = await self.qdrant.search(
            collection_name=collection_name,
            query_filter=models.Filter(
                must=[
                    models.FieldCondition(
                        key="content",
                        match=models.MatchText(text=query)
                    )
                ]
            ),
            limit=limit
        )
        
        # 3. RRF (Reciprocal Rank Fusion)로 결과 융합
        return self.fuse_results(vector_results, keyword_results, limit)
    
    def fuse_results(self, vector_results, keyword_results, limit):
        scores = {}
        k = 60  # RRF 상수
        
        # 벡터 검색 점수
        for i, result in enumerate(vector_results):
            doc_id = result.id
            scores[doc_id] = scores.get(doc_id, 0) + 1 / (k + i + 1)
        
        # 키워드 검색 점수
        for i, result in enumerate(keyword_results):
            doc_id = result.id
            scores[doc_id] = scores.get(doc_id, 0) + 1 / (k + i + 1)
        
        # 최종 순위
        return sorted(scores.items(), key=lambda x: x[1], reverse=True)[:limit]

스케줄링 시스템

KST 타임존 처리

국내 서비스 특성상 KST 기반 스케줄링이 필수였다:

def get_kst_now():
    """현재 KST 시간 반환"""
    utc_now = datetime.utcnow()
    kst_now = utc_now.replace(tzinfo=timezone.utc).astimezone(
        timezone(timedelta(hours=9))
    )
    return kst_now

class SessionScheduler:
    async def register_job(self, workflow_id, cron_expr, timezone="Asia/Seoul"):
        # 중복 등록 방지
        if workflow_id in self.active_jobs:
            logger.warning(f"Job {workflow_id} already registered")
            return
        
        # CronTrigger 생성 (한국 시간 기준)
        trigger = CronTrigger.from_crontab(cron_expr, timezone=timezone)
        
        job = await self.scheduler.add_job(
            self.execute_workflow,
            trigger=trigger,
            args=[workflow_id],
            id=workflow_id,
            replace_existing=True
        )
        
        self.active_jobs[workflow_id] = job
        logger.info(f"Scheduled job {workflow_id} with cron: {cron_expr}")

성능 최적화

스트리밍 응답 지연 최소화

워크플로우 실행 중 실시간 피드백을 위해 스트리밍 지연을 대폭 줄였다:

async def stream_execution_logs(self, session_id):
    while True:
        logs = await self.get_recent_logs(session_id)
        if logs:
            for log in logs:
                yield f"data: {json.dumps(log)}\n\n"
        
        # 기존 1초 → 100ms로 단축
        await asyncio.sleep(0.1)

MCP 응답 로그 트렁케이션

대용량 응답으로 인한 로그 폭증 문제를 해결:

def log_mcp_response(response):
    if len(response) > 500:
        truncated = response[:500] + "... (truncated)"
        logger.info(f"MCP Response: {truncated}")
    else:
        logger.info(f"MCP Response: {response}")

결과 및 배운 점

시스템 안정성 향상

검색 품질 개선

개발 생산성

XGEN 1.0 워크플로우 엔진은 단순한 파이프라인을 넘어서 AI 서비스의 핵심 인프라로 성장했다. 안정성, 성능, 사용성 모든 면에서 엔터프라이즈급 요구사항을 만족하는 시스템이 되었다.


Edit page
Share this post:

Previous Post
Workflow Execution Panel: 검증과 에러 핸들링 UI 패턴
Next Post
SSE 기반 파일 업로드 진행률 표시 + 취소 기능