XGEN 1.0 워크플로우 엔진과 Qdrant 하이브리드 검색¶
2025.12 | FastAPI, Qdrant, MCP Station, Circuit Breaker
개요¶
XGEN 1.0에서 워크플로우 엔진을 구축하면서 가장 중요했던 것은 안정성과 확장성이었다. 단순한 파이프라인이 아니라 복잡한 AI 작업들을 체인으로 연결하고, 각 단계에서 발생할 수 있는 장애를 우아하게 처리하는 시스템이 필요했다.
워크플로우 엔진 아키텍처¶
헬스체크 시스템 강화¶
기존의 단순한 ping/pong 방식을 버리고 Circuit Breaker 패턴을 도입했다:
class CircuitBreaker:
def __init__(self, failure_threshold=5, recovery_timeout=60):
self.failure_threshold = failure_threshold
self.recovery_timeout = recovery_timeout
self.failure_count = 0
self.last_failure_time = None
self.state = "CLOSED" # CLOSED, OPEN, HALF_OPEN
async def call(self, func, *args, **kwargs):
if self.state == "OPEN":
if time.time() - self.last_failure_time < self.recovery_timeout:
raise CircuitBreakerOpenException()
self.state = "HALF_OPEN"
try:
result = await func(*args, **kwargs)
if self.state == "HALF_OPEN":
self.state = "CLOSED"
self.failure_count = 0
return result
except Exception as e:
self.failure_count += 1
if self.failure_count >= self.failure_threshold:
self.state = "OPEN"
self.last_failure_time = time.time()
raise
캐싱 시스템¶
헬스체크 결과를 TTL 기반으로 캐싱하여 부하를 줄였다. Thread-safe한 double-check locking으로 동시성 문제를 해결:
class HealthCache:
def __init__(self, ttl=30):
self.cache = {}
self.ttl = ttl
self.lock = threading.Lock()
def get_or_compute(self, key, compute_func):
now = time.time()
if key in self.cache:
value, timestamp = self.cache[key]
if now - timestamp < self.ttl:
return value
with self.lock:
# Double-check locking
if key in self.cache:
value, timestamp = self.cache[key]
if now - timestamp < self.ttl:
return value
result = compute_func()
self.cache[key] = (result, now)
return result
MCP (Model Context Protocol) 통합¶
TableData MCP 노드 처리기¶
워크플로우에서 테이블 데이터를 처리하는 노드들을 위한 전용 프로세서를 개발했다:
class TableDataMCPProcessor:
async def process_node(self, node, interaction_id):
# temp_storage_id 자동 설정
if not node.get_parameter("temp_storage_id"):
node.set_parameter("temp_storage_id", interaction_id)
logger.info(f"Set temp_storage_id: {interaction_id}")
# 페이지네이션 한계 적용 (max 100 rows)
max_rows = min(node.get_parameter("max_rows", 50), 100)
node.set_parameter("max_rows", max_rows)
이렇게 하면 사용자가 매번 temp_storage_id를 설정할 필요 없이 자동으로 세션별 임시 스토리지가 관리된다.
Playwright MCP 최적화¶
브라우저 자동화를 위한 Playwright MCP 설정을 최적화했다:
playwright_mcp:
server_type: "stdio"
server_args: [
"--image-responses", # 이미지 포함 응답
"--headless", # 헤드리스 모드
"--timeout=30000" # 30초 타임아웃
]
context_char_limit: 200000 # 대용량 컨텍스트 지원
Qdrant 하이브리드 검색¶
벡터 + 키워드 융합 검색¶
단순한 벡터 유사도 검색의 한계를 극복하기 위해 하이브리드 검색을 구현했다:
class HybridSearchEngine:
def __init__(self, qdrant_client, embedding_model):
self.qdrant = qdrant_client
self.embedding_model = embedding_model
async def search(self, query, collection_name, limit=10):
# 1. 벡터 검색
query_vector = await self.embedding_model.encode(query)
vector_results = await self.qdrant.search(
collection_name=collection_name,
query_vector=query_vector,
limit=limit * 2 # 더 많이 가져와서 재순위
)
# 2. 키워드 검색 (payload 필터링)
keyword_results = await self.qdrant.search(
collection_name=collection_name,
query_filter=models.Filter(
must=[
models.FieldCondition(
key="content",
match=models.MatchText(text=query)
)
]
),
limit=limit
)
# 3. RRF (Reciprocal Rank Fusion)로 결과 융합
return self.fuse_results(vector_results, keyword_results, limit)
def fuse_results(self, vector_results, keyword_results, limit):
scores = {}
k = 60 # RRF 상수
# 벡터 검색 점수
for i, result in enumerate(vector_results):
doc_id = result.id
scores[doc_id] = scores.get(doc_id, 0) + 1 / (k + i + 1)
# 키워드 검색 점수
for i, result in enumerate(keyword_results):
doc_id = result.id
scores[doc_id] = scores.get(doc_id, 0) + 1 / (k + i + 1)
# 최종 순위
return sorted(scores.items(), key=lambda x: x[1], reverse=True)[:limit]
스케줄링 시스템¶
KST 타임존 처리¶
국내 서비스 특성상 KST 기반 스케줄링이 필수였다:
def get_kst_now():
"""현재 KST 시간 반환"""
utc_now = datetime.utcnow()
kst_now = utc_now.replace(tzinfo=timezone.utc).astimezone(
timezone(timedelta(hours=9))
)
return kst_now
class SessionScheduler:
async def register_job(self, workflow_id, cron_expr, timezone="Asia/Seoul"):
# 중복 등록 방지
if workflow_id in self.active_jobs:
logger.warning(f"Job {workflow_id} already registered")
return
# CronTrigger 생성 (한국 시간 기준)
trigger = CronTrigger.from_crontab(cron_expr, timezone=timezone)
job = await self.scheduler.add_job(
self.execute_workflow,
trigger=trigger,
args=[workflow_id],
id=workflow_id,
replace_existing=True
)
self.active_jobs[workflow_id] = job
logger.info(f"Scheduled job {workflow_id} with cron: {cron_expr}")
성능 최적화¶
스트리밍 응답 지연 최소화¶
워크플로우 실행 중 실시간 피드백을 위해 스트리밍 지연을 대폭 줄였다:
async def stream_execution_logs(self, session_id):
while True:
logs = await self.get_recent_logs(session_id)
if logs:
for log in logs:
yield f"data: {json.dumps(log)}\n\n"
# 기존 1초 → 100ms로 단축
await asyncio.sleep(0.1)
MCP 응답 로그 트렁케이션¶
대용량 응답으로 인한 로그 폭증 문제를 해결:
def log_mcp_response(response):
if len(response) > 500:
truncated = response[:500] + "... (truncated)"
logger.info(f"MCP Response: {truncated}")
else:
logger.info(f"MCP Response: {response}")
결과 및 배운 점¶
시스템 안정성 향상¶
- Circuit Breaker 도입으로 장애 전파 차단
- 헬스체크 캐싱으로 부하 50% 감소
- 스트리밍 지연 90% 단축 (1초 → 100ms)
검색 품질 개선¶
- 하이브리드 검색으로 정확도 30% 향상
- RRF 알고리즘으로 벡터-키워드 검색 균형 확보
- 대용량 문서 처리 성능 최적화
개발 생산성¶
- MCP 프로토콜 표준화로 노드 개발 속도 2배 향상
- 자동 파라미터 설정으로 사용자 편의성 증대
- KST 기반 스케줄링으로 국내 서비스 최적화
XGEN 1.0 워크플로우 엔진은 단순한 파이프라인을 넘어서 AI 서비스의 핵심 인프라로 성장했다. 안정성, 성능, 사용성 모든 면에서 엔터프라이즈급 요구사항을 만족하는 시스템이 되었다.¶
관련 글
- vLLM에서 llama.cpp로: LLM 서빙 아키텍처 통합 마이그레이션
FastAPILLMXGEN - 벡터DB 컬렉션 문서 요약 및 페이지네이션 API 설계
FastAPIPythonQdrant - FastAPI 워크플로우 엔진: 접근 제어와 감사 로깅 구현
FastAPIRedisXGEN - K3s + ArgoCD로 AI 플랫폼 GitOps 배포 구축하기
ArgoCDGitOpsGrafana - Qdrant 하이브리드 검색: Sparse + Dense 벡터 통합
BM25QdrantRAG