Concatenate Strategy¶
The Concatenate strategy is designed to handle content that exceeds the LLM's context window by splitting it into manageable chunks, processing them separately, and then combining the results.

How It Works¶
1. Initial Request
- Sends the content to the LLM with the desired response structure
- Monitors the LLM's response completion status
2. Continuation Process
- If response is truncated (finish_reason="length"), builds a continuation request
- Includes previous partial response for context
- Continues until LLM indicates completion
3. Validation
- When LLM indicates completion (finish_reason="stop")
- Validates the combined JSON response
- Raises error if invalid JSON is received on completion
4. Response Processing
- Combines all response parts
- Validates against the specified response model
- Returns structured data
Usage¶
from extract_thinker import Extractor
from extract_thinker.models.completion_strategy import CompletionStrategy
extractor = Extractor()
extractor.load_llm("gpt-4o")
result = extractor.extract(
    file_path,
    ResponseModel,
    completion_strategy=CompletionStrategy.CONCATENATE
)
Benefits¶
- Handles Large Content: Can process documents larger than the output context window
- Maintains Context: Attempts to keep related content together
Implementation Details¶
Concatenation Handler Implementation
The ConcatenationHandler implements the CONCATENATE strategy:
import copy
import yaml
import json
from typing import Any, Dict, List, Optional
from pydantic import BaseModel
from extract_thinker.completion_handler import CompletionHandler
from extract_thinker.utils import encode_image, add_classification_structure
class ConcatenationHandler(CompletionHandler):
    """Completion handler implementing the CONCATENATE strategy.

    Sends the document to the LLM, and whenever the response is cut off,
    issues continuation requests. All raw fragments are collected in
    ``self.json_parts``, stitched into one JSON string, parsed, and
    validated against the caller's Pydantic response model.
    """

    def __init__(self, llm):
        super().__init__(llm)
        # Raw response fragments accumulated across continuation rounds.
        self.json_parts: List[str] = []

    def _is_valid_json_continuation(self, response: str) -> bool:
        """Return True if the response plausibly contains JSON content.

        This is only a cheap heuristic gate (fence marker or an opening
        brace/bracket); real parsing happens in ``_process_json_parts``.
        """
        cleaned_response = response.strip()
        return (
            "```json" in cleaned_response
            or "{" in cleaned_response
            or "[" in cleaned_response
        )

    def handle(
        self,
        content: Any,
        response_model: type[BaseModel],
        vision: bool = False,
        extra_content: Optional[str] = None,
    ) -> Any:
        """Run the request/continuation loop and return a validated model.

        Args:
            content: Document content (string, dict, or list of page dicts).
            response_model: Pydantic model the combined JSON must satisfy.
            vision: When True, build image-bearing message content.
            extra_content: Optional extra user message inserted right after
                the system prompt.

        Returns:
            An instance of ``response_model`` built from the combined JSON.

        Raises:
            ValueError: After ``max_retries`` invalid continuations or
                failed parse/validation attempts.
        """
        self.json_parts = []
        messages = self._build_messages(content, vision, response_model)
        if extra_content:
            self._add_extra_content(messages, extra_content)

        retry_count = 0
        max_retries = 3
        # Pre-bind so the except-branch below can never hit an unbound
        # `response` if raw_completion itself raises on the first pass.
        response = ""
        while True:
            try:
                response = self.llm.raw_completion(messages)

                # Reject fragments with no JSON markers; retry with the
                # same messages rather than appending garbage.
                if not self._is_valid_json_continuation(response):
                    retry_count += 1
                    if retry_count >= max_retries:
                        raise ValueError("Maximum retries reached with invalid JSON continuations")
                    continue

                self.json_parts.append(response)

                # Parse/validate everything collected so far; raises
                # ValueError when the JSON is still incomplete or invalid.
                return self._process_json_parts(response_model)
            except ValueError as e:
                # Count this failure first, then decide whether to give up
                # (consistent with the invalid-continuation branch above).
                retry_count += 1
                if retry_count >= max_retries:
                    raise ValueError(f"Maximum retries reached: {str(e)}")
                # Ask the model to continue from its partial output.
                messages = self._build_continuation_messages(messages, response)

    def _process_json_parts(self, response_model: type[BaseModel]) -> Any:
        """Process collected JSON parts into a complete validated response.

        Raises:
            ValueError: If nothing usable was collected, the combined text
                is not valid JSON, or it fails model validation.
        """
        if not self.json_parts:
            raise ValueError("No JSON content collected")

        processed_parts = []
        for content in self.json_parts:
            # Strip code fences and formatting artifacts so the fragments
            # can be concatenated into one parseable string.
            cleaned = (content
                       .replace('```json', '')
                       .replace('```', '')
                       .replace('\njson', '')
                       .replace('\n', ' ')
                       .strip())
            if cleaned:
                processed_parts.append(cleaned)

        if not processed_parts:
            raise ValueError("No valid JSON content found in the response")

        # Fragments are truncation continuations, so they join with no
        # separator to reconstruct the original document.
        combined_json = "".join(processed_parts)

        try:
            parsed = json.loads(combined_json)
        except json.JSONDecodeError as e:
            raise ValueError(f"Failed to parse combined JSON: {str(e)}\nJSON: {combined_json}")

        try:
            return response_model.model_validate(parsed)
        except Exception as e:
            raise ValueError(f"Failed to validate parsed JSON: {str(e)}\nJSON: {combined_json}")

    def _build_continuation_messages(
        self,
        messages: List[Dict[str, Any]],
        partial_content: str
    ) -> List[Dict[str, Any]]:
        """Build messages for a continuation request.

        Deep-copies the prior messages, replays the partial output as an
        assistant turn, and appends a user prompt asking for more JSON.
        """
        continuation_messages = copy.deepcopy(messages)
        continuation_messages.append({
            "role": "assistant",
            "content": partial_content
        })
        continuation_messages.append({
            "role": "user",
            "content": "## CONTINUE JSON"
        })
        return continuation_messages

    def _build_messages(self, content: Any, vision: bool, response_model: type[BaseModel]) -> List[Dict[str, Any]]:
        """Build the initial system + user messages for the LLM request."""
        system_message = {
            "role": "system",
            "content": (
                "You are a server API that receives document information and returns specific fields in JSON format.\n"
                "Please follow the response structure exactly as specified below.\n\n"
                f"{add_classification_structure(response_model)}\n"
            )
        }
        # Only the user-message payload differs between vision and text.
        message_content = (
            self._build_vision_content(content) if vision
            else self._build_text_content(content)
        )
        return [
            system_message,
            {
                "role": "user",
                "content": message_content
            }
        ]

    def _build_vision_content(self, content: Any) -> List[Dict[str, Any]]:
        """Build multimodal content parts (text + base64 images).

        Accepts either a single dict or a list of dicts; each dict may
        carry "content" text plus "image"/"images" entries.
        """
        message_content: List[Dict[str, Any]] = []
        # Normalize to a list so single items reuse the same logic.
        items = content if isinstance(content, list) else [content]
        for item in items:
            if not isinstance(item, dict):
                continue
            if "content" in item:
                message_content.append({
                    "type": "text",
                    "text": f"##Content\n\n{item['content']}"
                })
            images = []
            if "images" in item and isinstance(item["images"], list):
                images.extend(item["images"])
            if "image" in item and item["image"] is not None:
                images.append(item["image"])
            for img in images:
                if img:
                    message_content.append({
                        "type": "image_url",
                        "image_url": {
                            "url": f"data:image/jpeg;base64,{encode_image(img)}"
                        }
                    })
        return message_content

    def _build_text_content(self, content: Any) -> str:
        """Render the content as a "##Content" text block (YAML for dicts)."""
        if isinstance(content, dict):
            return f"##Content\n\n{yaml.dump(content)}"
        elif isinstance(content, str):
            return f"##Content\n\n{content}"
        else:
            return f"##Content\n\n{str(content)}"

    def _add_extra_content(self, messages: List[Dict[str, Any]], extra_content: str) -> None:
        """Insert extra content as a user message right after the system prompt."""
        messages.insert(1, {
            "role": "user",
            "content": f"##Extra Content\n\n{extra_content}"
        })
When to Use¶
CONCATENATE is the best choice when:
Context window is large
- For models like gpt-4o, claude-3-5-sonnet, etc.
The content is moderately sized
- Best suited for documents that do not run to many hundreds of pages (roughly under 500 pages)
For handling bigger documents, consider using the PAGINATE strategy.