Skip to content
SON BLOG
Go back

aiohttp로 임베딩 API 클라이언트 만들기: 타임아웃과 배치 분할 최적화

Edit page

배경

XGEN RAG 시스템의 임베딩은 llama-cpp로 구동되는 별도 임베딩 서버에서 처리한다. XGEN 워크플로우 서비스에서 이 서버에 HTTP 요청을 보내 텍스트를 벡터로 변환한다.

초기 구현은 매 요청마다 새로운 HTTP 세션을 열고 닫았다. 임베딩 요청이 잦은 RAG 시스템에서 이 방식은 세션 생성 오버헤드가 상당했다. 또한 llama-cpp 서버가 무거운 요청을 처리하는 동안 타임아웃이 너무 짧으면 연결이 끊기고, 너무 길면 실제 장애를 감지하지 못한다.

세 가지를 개선했다. 영구 세션 유지, 타임아웃 세분화, 배치 분할 + 재시도.

# 2025-12-01 커밋: feat: Refactor CustomHTTPEmbedding client for improved functionality and logging
# 2025-12-01 커밋: feat: Update timeout settings and batch size in CustomHTTPEmbedding
# 2025-12-30 커밋: feat: Enhance CustomHTTPEmbedding with persistent session management

클래스 구조

# src/service/embedding/custom_http_embedding.py

import aiohttp
import asyncio
import logging
from typing import Optional

logger = logging.getLogger(__name__)

class CustomHTTPEmbedding:
    """llama-cpp 임베딩 서버 HTTP 클라이언트"""

    def __init__(
        self,
        api_url: str,
        api_key: Optional[str] = None,
        timeout_total: int = 120,
        timeout_connect: int = 10,
        timeout_sock_read: int = 60,
        batch_size: int = 5,
        max_retries: int = 3,
    ):
        self.api_url = api_url
        self.api_key = api_key
        self.timeout = aiohttp.ClientTimeout(
            total=timeout_total,
            connect=timeout_connect,
            sock_read=timeout_sock_read,
        )
        self.batch_size = batch_size
        self.max_retries = max_retries
        self._session: Optional[aiohttp.ClientSession] = None
        self._connector: Optional[aiohttp.TCPConnector] = None

영구 세션 관리

TCPConnector 설정

async def _ensure_session(self) -> aiohttp.ClientSession:
    """세션이 닫혔거나 없으면 새로 생성"""
    if self._session is None or self._session.closed:
        self._connector = aiohttp.TCPConnector(
            limit=100,              # 전체 동시 연결 최대
            limit_per_host=30,     # 호스트당 동시 연결 최대
            ttl_dns_cache=300,     # DNS 캐시 5분
            force_close=False,     # keep-alive 활성화
            enable_cleanup_closed=True,
        )

        headers = {"Content-Type": "application/json"}
        if self.api_key:
            headers["Authorization"] = f"Bearer {self.api_key}"

        self._session = aiohttp.ClientSession(
            connector=self._connector,
            timeout=self.timeout,
            headers=headers,
        )
        logger.info("New aiohttp session created for %s", self.api_url)

    return self._session

async def close(self) -> None:
    """세션 명시적 종료"""
    if self._session and not self._session.closed:
        await self._session.close()
        self._session = None
    if self._connector and not self._connector.closed:
        await self._connector.close()
        self._connector = None

매번 aiohttp.ClientSession()을 새로 만들면 TCP 연결을 매번 새로 맺는다. force_close=False로 keep-alive를 활성화하고 커넥터를 재사용하면 반복 요청에서 연결 수립 오버헤드가 없어진다.

타임아웃 세분화

self.timeout = aiohttp.ClientTimeout(
    total=120,       # 전체 요청 완료까지 최대 2분
    connect=10,      # TCP 연결 수립 타임아웃 10초
    sock_read=60,    # 데이터 수신 타임아웃 60초
)

total, connect, sock_read를 분리한 이유가 있다.

초기에는 total=30으로 설정했다가 긴 문서(수천 토큰) 임베딩에서 타임아웃이 발생했다. llama-cpp 서버의 처리 시간 특성을 보고 120초로 늘렸다.

배치 분할 처리

배치 크기 5의 이유

self.batch_size = 5   # llama-cpp 안정성 기준

한 번에 보내는 텍스트 수를 제한하는 이유는 llama-cpp 서버의 메모리 사용량 때문이다. 배치를 크게 보내면 서버 메모리가 급증하고 OOM이 발생할 수 있다. 실험 결과 배치 크기 5가 안정성과 처리 속도의 균형점이었다.

async def embed(self, texts: list[str]) -> list[list[float]]:
    """텍스트 목록을 배치로 분할해서 임베딩"""
    all_embeddings = []

    # 배치 분할
    batches = [
        texts[i:i + self.batch_size]
        for i in range(0, len(texts), self.batch_size)
    ]

    for batch_idx, batch in enumerate(batches):
        logger.debug(
            "Processing batch %d/%d (%d texts)",
            batch_idx + 1, len(batches), len(batch)
        )
        embeddings = await self._embed_batch_with_retry(batch)
        all_embeddings.extend(embeddings)

    return all_embeddings

재시도 로직

async def _embed_batch_with_retry(
    self,
    texts: list[str],
) -> list[list[float]]:
    last_error = None

    for attempt in range(self.max_retries):
        try:
            return await self._embed_batch(texts)

        except aiohttp.ClientConnectorError as e:
            # 연결 자체가 안 됨 — 세션 재생성 후 재시도
            logger.warning(
                "Connection error (attempt %d/%d): %s",
                attempt + 1, self.max_retries, e
            )
            await self.close()  # 세션 초기화
            last_error = e

        except aiohttp.ServerTimeoutError as e:
            logger.warning(
                "Timeout (attempt %d/%d): %s",
                attempt + 1, self.max_retries, e
            )
            last_error = e

        except Exception as e:
            logger.error("Unexpected error: %s", e)
            raise

        # 지수 백오프
        if attempt < self.max_retries - 1:
            wait_time = 2 ** attempt  # 1초, 2초, 4초
            logger.info("Retrying in %ds...", wait_time)
            await asyncio.sleep(wait_time)

    raise RuntimeError(
        f"Failed after {self.max_retries} attempts: {last_error}"
    )

ClientConnectorError는 세션이 오염된 경우도 있어서 세션 자체를 close() 후 재생성한다. 다음 _embed_batch() 호출 시 _ensure_session()이 새 세션을 만들어준다.

실제 임베딩 요청

async def _embed_batch(self, texts: list[str]) -> list[list[float]]:
    session = await self._ensure_session()

    payload = {
        "input": texts,
        "model": "text-embedding",  # llama-cpp 서버 모델명
        "encoding_format": "float",
    }

    async with session.post(
        f"{self.api_url}/v1/embeddings",
        json=payload,
    ) as response:
        response.raise_for_status()
        data = await response.json()

    # OpenAI 호환 응답 파싱: data[].embedding
    if "data" not in data:
        raise ValueError(f"Unexpected response format: {data.keys()}")

    embeddings = [item["embedding"] for item in data["data"]]

    if len(embeddings) != len(texts):
        raise ValueError(
            f"Expected {len(texts)} embeddings, got {len(embeddings)}"
        )

    return embeddings

llama-cpp 서버는 OpenAI API 형식을 따른다. 응답 형태는 {"data": [{"embedding": [...], "index": 0}, ...]}.

응답 건수(len(embeddings))가 요청 건수(len(texts))와 다르면 즉시 에러를 낸다. 이 불일치가 발생하면 벡터 인덱스가 꼬여서 나중에 디버깅하기 어려운 버그가 된다.

컨텍스트 매니저 지원

async def __aenter__(self):
    return self

async def __aexit__(self, exc_type, exc_val, exc_tb):
    await self.close()

async with 문으로 사용할 수 있게 해서 세션 누수를 방지한다.

# 사용 예
async with CustomHTTPEmbedding(api_url="http://localhost:8002") as client:
    embeddings = await client.embed(["안녕하세요", "테스트 문장"])

싱글턴으로 앱 수명 동안 세션 유지

FastAPI 앱에서는 앱 시작 시 한 번 생성하고 앱 종료 시 닫는 방식을 사용했다.

# src/main.py
from contextlib import asynccontextmanager

embedding_client: CustomHTTPEmbedding | None = None

@asynccontextmanager
async def lifespan(app: FastAPI):
    global embedding_client
    embedding_client = CustomHTTPEmbedding(
        api_url=settings.EMBEDDING_API_URL,
        batch_size=5,
        timeout_total=120,
    )
    yield
    # 종료 시 세션 정리
    if embedding_client:
        await embedding_client.close()

app = FastAPI(lifespan=lifespan)

lifespan 컨텍스트 매니저로 관리하면 앱이 살아있는 동안 하나의 세션이 유지되고, 종료 시 깔끔하게 정리된다.

로깅 전략

# 배치 시작
logger.debug("Processing batch %d/%d (%d texts)", batch_idx+1, len(batches), len(batch))

# 성공
logger.debug("Batch %d completed in %.2fs", batch_idx+1, elapsed)

# 재시도
logger.warning("Connection error (attempt %d/%d): %s", attempt+1, max_retries, e)

# 세션 재생성
logger.info("New aiohttp session created for %s", self.api_url)

DEBUG 레벨에 배치 처리 상세 정보, WARNING 레벨에 재시도 발생, INFO 레벨에 세션 생성을 기록한다. 운영 환경에서는 INFO 이상만 보이게 해서 로그가 과도하게 쌓이지 않도록 했다.

결과

임베딩 클라이언트는 RAG 파이프라인의 병목이 되기 쉽다. 세션 관리와 타임아웃 설정을 제대로 해두지 않으면 고부하에서 조용히 실패하거나 응답이 무한정 지연되는 문제가 생긴다.


Edit page
Share this post:

Previous Post
vLLM 모델 배포: 샘플링 파라미터 튜닝 가이드
Next Post
Agent Xgen Node: AI 에이전트를 워크플로우 노드로