Skip to content

Commit 62883be

Browse files
a-klosCopilot
andauthored
feat: exponential retry decorator (#88)
This pull request introduces a robust, configurable retry decorator with exponential backoff and rate-limit handling, and integrates it across the RAG stack for both the embedder and summarizer components. The retry behavior is now centrally managed, with clear support for both global and per-component overrides via environment variables and Helm chart values. The documentation has been updated to explain configuration and usage, and the Helm templates and values have been extended to support the new settings. **Retry decorator integration and configuration:** * Added a shared retry decorator (`retry_with_backoff`) in `rag-core-lib`, with support for both sync and async callables, rate-limit awareness, and extensive configuration via environment variables or Helm values. Documentation in `libs/README.md` details usage, configuration, and advanced features. * Updated Helm chart templates and values to define and inject retry-related settings for both backend and admin-backend deployments. This includes new configmaps, environment variable wiring, and appropriate value structure in `infrastructure/rag/values.yaml` and related templates. [[1]](diffhunk://#diff-d72bec7914fc3e7d3fe01a8c0cbdb24832a26956bae5563d109bf8bb19955e0eR27-R35) [[2]](diffhunk://#diff-fc1811e62c75e69c462701871157493af8b72480c2971bfc826f3b2d9c2eacf4R13-R16) [[3]](diffhunk://#diff-9f487482fa54d28d71fff497724bbc6741cced8a1e35c9b8829d1c6bd01dca0aR134-R135) [[4]](diffhunk://#diff-2b6f7f2ec4938055207faa53acf7a300e0ec235db31d1cfb6896703b97292348R109-R110) [[5]](diffhunk://#diff-673dd2d3d4e66a8fd4e45f9c1c9900711313f946bf8b6a89e96c954988fc14f3R200-R207) [[6]](diffhunk://#diff-673dd2d3d4e66a8fd4e45f9c1c9900711313f946bf8b6a89e96c954988fc14f3R330-R337) [[7]](diffhunk://#diff-673dd2d3d4e66a8fd4e45f9c1c9900711313f946bf8b6a89e96c954988fc14f3R465-R472) **Embedder and summarizer retry logic:** * The `StackitEmbedder` (backend) and `LangchainSummarizer` (admin-backend) now both use the shared retry decorator, with per-component settings overriding global defaults as needed. This is documented in detail in `libs/README.md` and supported by new environment variable keys and Helm values. [[1]](diffhunk://#diff-34194a117b05d75d22ca968cdb7d540839dc7a0eb33960fbca668b5a6ade87cbR103-R128) [[2]](diffhunk://#diff-34194a117b05d75d22ca968cdb7d540839dc7a0eb33960fbca668b5a6ade87cbR201-R226) * The dependency injection container for the admin API library (`DependencyContainer`) now wires the new `retry_decorator_settings` and passes it to the summarizer implementation, ensuring the retry logic is properly configured at runtime. [[1]](diffhunk://#diff-8b7c1816cb3e0a40b7965721c550eefdc184c5d914ec023e36527255613381e7R67) [[2]](diffhunk://#diff-8b7c1816cb3e0a40b7965721c550eefdc184c5d914ec023e36527255613381e7R90) [[3]](diffhunk://#diff-8b7c1816cb3e0a40b7965721c550eefdc184c5d914ec023e36527255613381e7L139-R143) **Documentation improvements:** * Expanded `libs/README.md` to include new sections describing the retry decorator, its configuration (including environment variables and Helm usage), and how the embedder and summarizer resolve their retry settings. [[1]](diffhunk://#diff-34194a117b05d75d22ca968cdb7d540839dc7a0eb33960fbca668b5a6ade87cbR11-R23) [[2]](diffhunk://#diff-34194a117b05d75d22ca968cdb7d540839dc7a0eb33960fbca668b5a6ade87cbR103-R128) [[3]](diffhunk://#diff-34194a117b05d75d22ca968cdb7d540839dc7a0eb33960fbca668b5a6ade87cbR201-R226) [[4]](diffhunk://#diff-34194a117b05d75d22ca968cdb7d540839dc7a0eb33960fbca668b5a6ade87cbR323-R372) * Minor documentation clarifications and code formatting improvements in `libs/README.md`. [[1]](diffhunk://#diff-34194a117b05d75d22ca968cdb7d540839dc7a0eb33960fbca668b5a6ade87cbL117-R147) [[2]](diffhunk://#diff-34194a117b05d75d22ca968cdb7d540839dc7a0eb33960fbca668b5a6ade87cbR76) [[3]](diffhunk://#diff-34194a117b05d75d22ca968cdb7d540839dc7a0eb33960fbca668b5a6ade87cbL159-R192) [[4]](diffhunk://#diff-34194a117b05d75d22ca968cdb7d540839dc7a0eb33960fbca668b5a6ade87cbR255) [[5]](diffhunk://#diff-34194a117b05d75d22ca968cdb7d540839dc7a0eb33960fbca668b5a6ade87cbR274) **Settings and type improvements:** * Extended `SummarizerSettings` to support optional retry-related fields, aligning with the new decorator's configuration model. These changes centralize and standardize retry logic across the stack, making it easier to tune reliability and rate-limiting behavior per environment and per component. --------- Co-authored-by: Copilot <[email protected]>
1 parent c9ae73e commit 62883be

File tree

18 files changed

+907
-55
lines changed

18 files changed

+907
-55
lines changed

infrastructure/rag/templates/_helpers.tpl

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,10 @@
1010
{{- printf "%s-usecase-configmap" .Release.Name | trunc 63 | trimSuffix "-" -}}
1111
{{- end -}}
1212

13+
{{- define "configmap.retryDecoratorName" -}}
14+
{{- printf "%s-retry-decorator-configmap" .Release.Name | trunc 63 | trimSuffix "-" -}}
15+
{{- end -}}
16+
1317
{{- define "secret.usecaseName" -}}
1418
{{- printf "%s-usecase-secret" .Release.Name | trunc 63 | trimSuffix "-" -}}
1519
{{- end -}}

infrastructure/rag/templates/admin-backend/deployment.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,8 @@ spec:
106106
name: {{ template "configmap.keyValueStoreName" . }}
107107
- configMapRef:
108108
name: {{ template "configmap.sourceUploaderName" . }}
109+
- configMapRef:
110+
name: {{ template "configmap.retryDecoratorName" . }}
109111
- secretRef:
110112
name: {{ template "secret.langfuseName" . }}
111113
- secretRef:

infrastructure/rag/templates/backend/deployment.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,8 @@ spec:
131131
name: {{ template "configmap.fakeEmbedderName" . }}
132132
- configMapRef:
133133
name: {{ template "configmap.chatHistoryName" . }}
134+
- configMapRef:
135+
name: {{ template "configmap.retryDecoratorName" . }}
134136
- secretRef:
135137
name: {{ template "secret.langfuseName" . }}
136138
- secretRef:

infrastructure/rag/templates/configmap.yaml

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,3 +24,12 @@ data:
2424
{{- range $key, $value := .Values.shared.envs.usecase }}
2525
{{ $key }}: {{ $value | quote }}
2626
{{- end }}
27+
---
28+
apiVersion: v1
29+
kind: ConfigMap
30+
metadata:
31+
name: {{ template "configmap.retryDecoratorName" . }}
32+
data:
33+
{{- range $key, $value := .Values.shared.envs.retryDecorator }}
34+
{{ $key }}: {{ $value | quote }}
35+
{{- end }}

infrastructure/rag/values.yaml

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -197,6 +197,14 @@ backend:
197197
stackitEmbedder:
198198
STACKIT_EMBEDDER_MODEL: "intfloat/e5-mistral-7b-instruct"
199199
STACKIT_EMBEDDER_BASE_URL: https://api.openai-compat.model-serving.eu01.onstackit.cloud/v1
200+
# Retry settings (optional). If omitted, fall back to shared RETRY_DECORATOR_* values.
201+
STACKIT_EMBEDDER_MAX_RETRIES: "5"
202+
STACKIT_EMBEDDER_RETRY_BASE_DELAY: "0.5"
203+
STACKIT_EMBEDDER_RETRY_MAX_DELAY: "600"
204+
STACKIT_EMBEDDER_BACKOFF_FACTOR: "2"
205+
STACKIT_EMBEDDER_ATTEMPT_CAP: "6"
206+
STACKIT_EMBEDDER_JITTER_MIN: "0.05"
207+
STACKIT_EMBEDDER_JITTER_MAX: "0.25"
200208
ollama:
201209
OLLAMA_MODEL: "llama3.2:3b-instruct-fp16"
202210
OLLAMA_BASE_URL: "http://rag-ollama:11434"
@@ -319,6 +327,14 @@ adminBackend:
319327
summarizer:
320328
SUMMARIZER_MAXIMUM_INPUT_SIZE: "8000"
321329
SUMMARIZER_MAXIMUM_CONCURRENCY: "10"
330+
# Retry settings (optional). If omitted, fall back to shared RETRY_DECORATOR_* values.
331+
SUMMARIZER_MAX_RETRIES: "5"
332+
SUMMARIZER_RETRY_BASE_DELAY: "0.5"
333+
SUMMARIZER_RETRY_MAX_DELAY: "600"
334+
SUMMARIZER_BACKOFF_FACTOR: "2"
335+
SUMMARIZER_ATTEMPT_CAP: "6"
336+
SUMMARIZER_JITTER_MIN: "0.05"
337+
SUMMARIZER_JITTER_MAX: "0.25"
322338
ragapi:
323339
RAG_API_HOST: "http://backend:8080"
324340
chunker:
@@ -446,6 +462,14 @@ shared:
446462
s3:
447463
S3_ENDPOINT: http://rag-minio:9000
448464
S3_BUCKET: documents
465+
retryDecorator:
466+
RETRY_DECORATOR_MAX_RETRIES: "5"
467+
RETRY_DECORATOR_RETRY_BASE_DELAY: "0.5"
468+
RETRY_DECORATOR_RETRY_MAX_DELAY: "600"
469+
RETRY_DECORATOR_BACKOFF_FACTOR: "2"
470+
RETRY_DECORATOR_ATTEMPT_CAP: "6"
471+
RETRY_DECORATOR_JITTER_MIN: "0.05"
472+
RETRY_DECORATOR_JITTER_MAX: "0.25"
449473
usecase:
450474

451475

libs/README.md

Lines changed: 112 additions & 4 deletions
Large diffs are not rendered by default.

libs/admin-api-lib/src/admin_api_lib/dependency_container.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@
6464
from rag_core_lib.impl.settings.langfuse_settings import LangfuseSettings
6565
from rag_core_lib.impl.settings.ollama_llm_settings import OllamaSettings
6666
from rag_core_lib.impl.settings.rag_class_types_settings import RAGClassTypeSettings
67+
from rag_core_lib.impl.settings.retry_decorator_settings import RetryDecoratorSettings
6768
from rag_core_lib.impl.settings.stackit_vllm_settings import StackitVllmSettings
6869
from rag_core_lib.impl.tracers.langfuse_traced_runnable import LangfuseTracedRunnable
6970
from rag_core_lib.impl.utils.async_threadsafe_semaphore import AsyncThreadsafeSemaphore
@@ -86,6 +87,7 @@ class DependencyContainer(DeclarativeContainer):
8687
key_value_store_settings = KeyValueSettings()
8788
summarizer_settings = SummarizerSettings()
8889
source_uploader_settings = SourceUploaderSettings()
90+
retry_decorator_settings = RetryDecoratorSettings()
8991

9092
key_value_store = Singleton(FileStatusKeyValueStore, key_value_store_settings)
9193
file_service = Singleton(S3Service, s3_settings=s3_settings)
@@ -136,7 +138,9 @@ class DependencyContainer(DeclarativeContainer):
136138
LangchainSummarizer,
137139
langfuse_manager=langfuse_manager,
138140
chunker=summary_text_splitter,
139-
semaphore=Singleton(AsyncThreadsafeSemaphore, summarizer_settings.maximum_concurrreny),
141+
semaphore=Singleton(AsyncThreadsafeSemaphore, summarizer_settings.maximum_concurrency),
142+
summarizer_settings=summarizer_settings,
143+
retry_decorator_settings=retry_decorator_settings,
140144
)
141145

142146
summary_enhancer = List(
Lines changed: 69 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
"""Contains settings for summarizer."""
22

3-
from pydantic import Field
4-
from pydantic_settings import BaseSettings
3+
from typing import Optional
4+
from pydantic import Field, PositiveInt, model_validator
5+
from pydantic_settings import BaseSettings, SettingsConfigDict
56

67

78
class SummarizerSettings(BaseSettings):
@@ -12,15 +13,74 @@ class SummarizerSettings(BaseSettings):
1213
----------
1314
maximum_input_size : int
1415
The maximum size of the input that the summarizer can handle. Default is 8000.
15-
maximum_concurrreny : int
16+
maximum_concurrency : int
1617
The maximum number of concurrent summarization processes. Default is 10.
18+
max_retries: Optional[PositiveInt]
19+
Total retries, not counting the initial attempt.
20+
retry_base_delay: Optional[float]
21+
Base delay in seconds for the first retry.
22+
retry_max_delay: Optional[float]
23+
Maximum delay cap in seconds for any single wait.
24+
backoff_factor: Optional[float]
25+
Exponential backoff factor (>= 1).
26+
attempt_cap: Optional[int]
27+
Cap for exponent growth (backoff_factor ** attempt_cap).
28+
jitter_min: Optional[float]
29+
Minimum jitter in seconds.
30+
jitter_max: Optional[float]
31+
Maximum jitter in seconds.
1732
"""
1833

19-
class Config:
20-
"""Config class for reading Fields from env."""
21-
22-
env_prefix = "SUMMARIZER_"
23-
case_sensitive = False
34+
model_config = SettingsConfigDict(env_prefix="SUMMARIZER_", case_sensitive=False)
2435

2536
maximum_input_size: int = Field(default=8000)
26-
maximum_concurrreny: int = Field(default=10)
37+
maximum_concurrency: int = Field(default=10)
38+
max_retries: Optional[PositiveInt] = Field(
39+
default=None,
40+
title="Max Retries",
41+
description="Total retries, not counting the initial attempt.",
42+
)
43+
retry_base_delay: Optional[float] = Field(
44+
default=None,
45+
ge=0,
46+
title="Retry Base Delay",
47+
description="Base delay in seconds for the first retry.",
48+
)
49+
retry_max_delay: Optional[float] = Field(
50+
default=None,
51+
gt=0,
52+
title="Retry Max Delay",
53+
description="Maximum delay cap in seconds for any single wait.",
54+
)
55+
backoff_factor: Optional[float] = Field(
56+
default=None,
57+
ge=1.0,
58+
title="Backoff Factor",
59+
description="Exponential backoff factor (>= 1).",
60+
)
61+
attempt_cap: Optional[int] = Field(
62+
default=None,
63+
ge=0,
64+
title="Attempt Cap",
65+
description="Cap for exponent growth (backoff_factor ** attempt_cap).",
66+
)
67+
jitter_min: Optional[float] = Field(
68+
default=None,
69+
ge=0.0,
70+
title="Jitter Min (s)",
71+
description="Minimum jitter in seconds.",
72+
)
73+
jitter_max: Optional[float] = Field(
74+
default=None,
75+
ge=0.0,
76+
title="Jitter Max (s)",
77+
description="Maximum jitter in seconds.",
78+
)
79+
80+
@model_validator(mode="after")
81+
def _check_relations(self) -> "SummarizerSettings":
82+
if not self.jitter_min or not self.jitter_max:
83+
return self
84+
if self.jitter_max < self.jitter_min:
85+
raise ValueError("jitter_max must be >= jitter_min")
86+
return self

libs/admin-api-lib/src/admin_api_lib/impl/summarizer/langchain_summarizer.py

Lines changed: 37 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,24 @@
11
"""Module for the LangchainSummarizer class."""
22

3+
import asyncio
34
import logging
4-
import traceback
55
from typing import Optional
66

77
from langchain.text_splitter import RecursiveCharacterTextSplitter
88
from langchain_core.documents import Document
99
from langchain_core.runnables import Runnable, RunnableConfig, ensure_config
10+
from openai import APIConnectionError, APIError, APITimeoutError, RateLimitError
1011

12+
from admin_api_lib.impl.settings.summarizer_settings import SummarizerSettings
1113
from admin_api_lib.summarizer.summarizer import (
1214
Summarizer,
1315
SummarizerInput,
1416
SummarizerOutput,
1517
)
1618
from rag_core_lib.impl.langfuse_manager.langfuse_manager import LangfuseManager
19+
from rag_core_lib.impl.settings.retry_decorator_settings import RetryDecoratorSettings
1720
from rag_core_lib.impl.utils.async_threadsafe_semaphore import AsyncThreadsafeSemaphore
21+
from rag_core_lib.impl.utils.retry_decorator import create_retry_decorator_settings, retry_with_backoff
1822

1923
logger = logging.getLogger(__name__)
2024

@@ -32,10 +36,13 @@ def __init__(
3236
langfuse_manager: LangfuseManager,
3337
chunker: RecursiveCharacterTextSplitter,
3438
semaphore: AsyncThreadsafeSemaphore,
39+
summarizer_settings: SummarizerSettings,
40+
retry_decorator_settings: RetryDecoratorSettings,
3541
):
3642
self._chunker = chunker
3743
self._langfuse_manager = langfuse_manager
3844
self._semaphore = semaphore
45+
self._retry_decorator_settings = create_retry_decorator_settings(summarizer_settings, retry_decorator_settings)
3946

4047
async def ainvoke(self, query: SummarizerInput, config: Optional[RunnableConfig] = None) -> SummarizerOutput:
4148
"""
@@ -65,40 +72,46 @@ async def ainvoke(self, query: SummarizerInput, config: Optional[RunnableConfig]
6572
"""
6673
assert query, "Query is empty: %s" % query # noqa S101
6774
config = ensure_config(config)
68-
tries_remaining = config.get("configurable", {}).get("tries_remaining", 3)
69-
logger.debug("Tries remaining %d" % tries_remaining)
7075

71-
if tries_remaining < 0:
72-
raise Exception("Summary creation failed.")
7376
document = Document(page_content=query)
7477
langchain_documents = self._chunker.split_documents([document])
78+
logger.debug("Summarizing %d chunk(s)...", len(langchain_documents))
7579

76-
outputs = []
77-
for langchain_document in langchain_documents:
78-
async with self._semaphore:
79-
try:
80-
result = await self._create_chain().ainvoke({"text": langchain_document.page_content}, config)
81-
# Extract content from AIMessage if it's not already a string
82-
content = result.content if hasattr(result, "content") else str(result)
83-
outputs.append(content)
84-
except Exception as e:
85-
logger.error("Error in summarizing langchain doc: %s %s", e, traceback.format_exc())
86-
config["tries_remaining"] = tries_remaining - 1
87-
result = await self._create_chain().ainvoke({"text": langchain_document.page_content}, config)
88-
# Extract content from AIMessage if it's not already a string
89-
content = result.content if hasattr(result, "content") else str(result)
90-
outputs.append(content)
80+
# Fan out with concurrency, bounded by your semaphore inside _summarize_chunk
81+
tasks = [asyncio.create_task(self._summarize_chunk(doc.page_content, config)) for doc in langchain_documents]
82+
outputs = await asyncio.gather(*tasks)
9183

9284
if len(outputs) == 1:
9385
return outputs[0]
94-
summary = " ".join(outputs)
86+
87+
merged = " ".join(outputs)
88+
9589
logger.debug(
96-
"Reduced number of chars from %d to %d"
97-
% (len("".join([x.page_content for x in langchain_documents])), len(summary))
90+
"Reduced number of chars from %d to %d",
91+
len("".join([x.page_content for x in langchain_documents])),
92+
len(merged),
9893
)
99-
return await self.ainvoke(summary, config)
94+
return await self._summarize_chunk(merged, config)
10095

10196
def _create_chain(self) -> Runnable:
10297
return self._langfuse_manager.get_base_prompt(self.__class__.__name__) | self._langfuse_manager.get_base_llm(
10398
self.__class__.__name__
10499
)
100+
101+
def _retry_with_backoff_wrapper(self):
102+
return retry_with_backoff(
103+
settings=self._retry_decorator_settings,
104+
exceptions=(APIError, RateLimitError, APITimeoutError, APIConnectionError),
105+
rate_limit_exceptions=(RateLimitError,),
106+
logger=logger,
107+
)
108+
109+
async def _summarize_chunk(self, text: str, config: Optional[RunnableConfig]) -> SummarizerOutput:
110+
@self._retry_with_backoff_wrapper()
111+
async def _call(text: str, config: Optional[RunnableConfig]) -> SummarizerOutput:
112+
response = await self._create_chain().ainvoke({"text": text}, config)
113+
return response.content if hasattr(response, "content") else str(response)
114+
115+
# Hold the semaphore for the entire retry lifecycle
116+
async with self._semaphore:
117+
return await _call(text, config)

libs/rag-core-api/src/rag_core_api/dependency_container.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@
6363
from rag_core_lib.impl.settings.langfuse_settings import LangfuseSettings
6464
from rag_core_lib.impl.settings.ollama_llm_settings import OllamaSettings
6565
from rag_core_lib.impl.settings.rag_class_types_settings import RAGClassTypeSettings
66+
from rag_core_lib.impl.settings.retry_decorator_settings import RetryDecoratorSettings
6667
from rag_core_lib.impl.settings.stackit_vllm_settings import StackitVllmSettings
6768
from rag_core_lib.impl.tracers.langfuse_traced_runnable import LangfuseTracedRunnable
6869
from rag_core_lib.impl.utils.async_threadsafe_semaphore import AsyncThreadsafeSemaphore
@@ -89,6 +90,7 @@ class DependencyContainer(DeclarativeContainer):
8990
stackit_embedder_settings = StackitEmbedderSettings()
9091
chat_history_settings = ChatHistorySettings()
9192
sparse_embedder_settings = SparseEmbedderSettings()
93+
retry_decorator_settings = RetryDecoratorSettings()
9294
chat_history_config.from_dict(chat_history_settings.model_dump())
9395

9496
class_selector_config.from_dict(rag_class_type_settings.model_dump() | embedder_class_type_settings.model_dump())
@@ -98,7 +100,7 @@ class DependencyContainer(DeclarativeContainer):
98100
ollama=Singleton(
99101
LangchainCommunityEmbedder, embedder=Singleton(OllamaEmbeddings, **ollama_embedder_settings.model_dump())
100102
),
101-
stackit=Singleton(StackitEmbedder, stackit_embedder_settings),
103+
stackit=Singleton(StackitEmbedder, stackit_embedder_settings, retry_decorator_settings),
102104
)
103105

104106
sparse_embedder = Singleton(FastEmbedSparse, **sparse_embedder_settings.model_dump())

0 commit comments

Comments
 (0)