PAGINATE Strategy¶
The PAGINATE strategy processes multi-page documents by handling each page independently and then intelligently merging the results, including sophisticated conflict resolution when pages contain overlapping information.

How It Works¶
Page Separation
- Identifies individual pages
- Preserves page metadata
- Maintains document structure
Parallel Processing
- Each page processed independently
- Uses full context window per page
- Handles page-specific content
Result Collection
- Gathers results from all pages
- Validates individual page results
- Prepares for merging
Conflict Resolution
- Detects overlapping information
- Resolves conflicts using confidence scores
- Maintains data consistency
Usage¶
from extract_thinker import Extractor
from extract_thinker.models.completion_strategy import CompletionStrategy
extractor = Extractor()
extractor.load_llm("gpt-4")
result = extractor.extract(
file_path,
ResponseModel,
completion_strategy=CompletionStrategy.PAGINATE
)
Benefits¶
- Cheaper: Reduced parallel context window would be cheaper than a long Concatenate Strategy
- Parallel Processing: Pages can be processed independently
- Conflict Resolution: Smart merging of results from different pages
- Scalability: Handles documents of any length
- Accuracy: Each page gets full context window attention
Implementation Details¶
Pagination Handler Implementation
The PaginationHandler implements the PAGINATE strategy:
import copy
from typing import Any, Dict, List, Optional, get_origin, get_args, Union
from pydantic import BaseModel, Field
from instructor.exceptions import IncompleteOutputException
from extract_thinker.completion_handler import CompletionHandler
from extract_thinker.utils import encode_image, json_to_formatted_string, make_all_fields_optional
import yaml
from concurrent.futures import ThreadPoolExecutor, as_completed
class ConflictResolution(BaseModel):
resolved_fields: Dict[str, Dict[str, Any]] = Field(
description="Dictionary of resolved field values with confidence scores",
default_factory=dict
)
class PaginationHandler(CompletionHandler):
def __init__(self, llm):
super().__init__(llm)
def _make_hashable(self, item: Any) -> Any:
"""Recursively convert a value to something hashable."""
if isinstance(item, dict):
return tuple(sorted((k, self._make_hashable(v)) for k, v in item.items()))
elif isinstance(item, list):
return tuple(self._make_hashable(x) for x in item)
return item
def handle(self,
content: List[Dict[str, Any]],
response_model: type[BaseModel],
vision: bool = False,
extra_content: Optional[str] = None) -> Any:
# Make fields optional to allow partial results
response_model_optional = make_all_fields_optional(response_model)
# Process pages in parallel
results = []
with ThreadPoolExecutor() as executor:
futures = []
for page in content:
# Build messages for each page
messages = self._build_messages(page, vision)
if extra_content:
self._add_extra_content(messages, extra_content)
# Submit page processing task
future = executor.submit(
self._process_page,
messages,
response_model_optional
)
futures.append(future)
# Collect results as they complete
for future in as_completed(futures):
try:
result = future.result()
results.append(result)
except Exception as e:
# Log error but continue processing other pages
print(f"Error processing page: {str(e)}")
if not results:
raise ValueError("No valid results obtained from any page")
# Pair up the pages with their results for context
pages_data = list(zip(content, results))
return self._merge_results(results, response_model, pages_data)
def _process_page(self, messages: List[Dict[str, Any]], response_model: type[BaseModel]) -> Any:
"""Process a single page with retry logic for incomplete responses"""
try:
return self.llm.request(messages, response_model)
except IncompleteOutputException as e:
# Handle partial response
partial_result = self._handle_partial_response(e, messages, response_model)
partial_dict = partial_result.model_dump()
if self._has_conflicts(partial_dict, response_model):
# We'll resolve conflicts later after merging all pages
return partial_result
return partial_result
def _merge_results(self, results: List[Any], response_model: type[BaseModel], pages_data: List[Any]) -> Any:
"""Merge results from multiple pages into a dictionary, detect conflicts, resolve if needed, then return model."""
# Collect all values for each field
field_values = {}
for _, result in pages_data:
result_dict = result.model_dump()
for field_name, field_value in result_dict.items():
field_values.setdefault(field_name, []).append(field_value)
# Merge fields
merged = {}
for field_name, values in field_values.items():
# Get the annotated type from the response model
field_type = response_model.model_fields[field_name].annotation if field_name in response_model.model_fields else None
non_null_values = [v for v in values if v is not None]
if field_type and get_origin(field_type) is list:
# Merge list fields using a more sophisticated approach
merged_list = self._merge_list_field(field_name, values, field_type)
merged[field_name] = merged_list
else:
# Scalar field handling
if len(non_null_values) == 0:
# If the field is expected to be a string, default to an empty string.
if field_type == str or (get_origin(field_type) is Union and str in get_args(field_type)):
merged[field_name] = ""
else:
continue
else:
# Build a mapping from the hashable version of each candidate to the original candidate.
distinct_map = {}
for candidate in non_null_values:
key = self._make_hashable(candidate)
if key not in distinct_map:
distinct_map[key] = candidate
distinct_values = list(distinct_map.values())
if len(distinct_values) == 1:
merged[field_name] = distinct_values[0]
else:
# Store conflicts in a special structure
merged[field_name] = {
"_conflict": True,
"candidates": distinct_values
}
# Check for conflicts and resolve if necessary
if self._has_conflicts(merged, response_model):
merged = self._resolve_conflicts(merged, response_model, pages_data, field_values)
# Clean merged dictionary to ensure it's compatible with the response model
merged = self._clean_merged_dict(merged, response_model)
# Filter out any keys with a None value,
# now every required field (e.g., a string like "thinking") will be non-null.
merged = {k: v for k, v in merged.items() if v is not None}
return response_model(**merged)
def _merge_list_field(self, field_name: str, values: List[Any], field_type: Any) -> List[Any]:
"""
Merge list fields from multiple pages. If the list is a list of Pydantic models,
we try to merge based on a unique key field (e.g. `country` for countries, `region` for regions).
If it's not a list of models or no unique key is found, we just concatenate and
rely on conflict resolution later.
"""
# Flatten all lists
flattened = []
for v in values:
if isinstance(v, list):
flattened.extend(v)
elif v is not None:
flattened.append(v)
# Attempt to detect if we're dealing with a Pydantic model list
args = get_args(field_type)
if args:
model_type = args[0]
if hasattr(model_type, '__fields__'):
# We have a Pydantic model class in the list
# Identify a unique key to merge on.
candidate_keys = ['country', 'region', 'id', 'name']
unique_key = None
for ck in candidate_keys:
if ck in model_type.__fields__:
unique_key = ck
break
if unique_key:
# Merge by unique key using case-insensitive comparison
merged_by_key = {}
for item in flattened:
if hasattr(item, 'model_dump'):
item_dict = item.model_dump()
else:
item_dict = item
key_val = item_dict.get(unique_key)
if key_val is not None:
normalized_key = str(key_val).lower()
if normalized_key in merged_by_key:
merged_by_key[normalized_key] = self._merge_two_models(
merged_by_key[normalized_key],
item_dict
)
else:
merged_by_key[normalized_key] = item_dict
else:
# If no unique key found for this item, just store it uniquely
merged_by_key[f"no_key_{len(merged_by_key)}"] = item_dict
return list(merged_by_key.values())
else:
# No unique key found, just return flattened list
return flattened
else:
# Not a pydantic model list
return flattened
else:
# Not a parametrized list type
return flattened
def _merge_two_models(self, existing: Dict[str, Any], new: Dict[str, Any]) -> Dict[str, Any]:
"""
Merge two dictionaries representing the same entity. For scalar values,
prefer the existing if both are non-null or prefer non-null values.
For lists, combine them.
"""
merged = existing.copy()
for k, v in new.items():
if k not in merged or merged[k] is None:
merged[k] = v
else:
if isinstance(merged[k], list) and isinstance(v, list):
# Extend lists
merged[k].extend(v)
# If there's a scalar conflict, we can keep the first non-null,
# or implement custom conflict handling here.
# For now, do nothing if both have a value, keep existing.
return merged
def _clean_merged_dict(self, merged: Dict[str, Any], response_model: type[BaseModel]) -> Dict[str, Any]:
"""Clean the merged dictionary after conflict resolution to remove any leftover special structures
and ensure values are compatible with the response model."""
cleaned = {}
for field_name, field_value in merged.items():
# If there's still a conflict structure, remove it or handle it
if isinstance(field_value, dict) and field_value.get("_conflict"):
# If somehow unresolved (shouldn't happen), just pick one candidate or None
candidates = field_value.get("candidates", [])
cleaned[field_name] = candidates[0] if candidates else None
else:
cleaned[field_name] = field_value
# Pydantic will do additional type coercion upon instantiation
return cleaned
def _has_conflicts(self, result_dict: Dict[str, Any], response_model: type[BaseModel]) -> bool:
"""Check if result dictionary has any conflicting fields."""
for field_name, field_value in result_dict.items():
if field_name not in response_model.model_fields:
continue
field_type = response_model.model_fields[field_name].annotation
# Check for special conflict dictionary (scalar conflict)
if isinstance(field_value, dict) and field_value.get("_conflict"):
return True
# Check list field duplicates (if needed)
# Here you could implement more robust checks if required.
return False
def _identify_conflicts(self, result_dict: Dict[str, Any], response_model: type[BaseModel]) -> Dict[str, Any]:
"""Identify conflicting fields in the result dictionary."""
conflicts = {}
for field_name, field_value in result_dict.items():
if field_name not in response_model.model_fields:
continue
field_type = response_model.model_fields[field_name].annotation
# Scalar conflict
if isinstance(field_value, dict) and field_value.get("_conflict"):
conflicts[field_name] = field_value["candidates"]
# Could add checks for list conflicts if needed.
return conflicts
def _resolve_conflicts(self, result_dict: Dict[str, Any], response_model: type[BaseModel],
pages_data: List[Any], field_values: Dict[str, List[Any]]) -> Dict[str, Any]:
"""Resolve conflicts in the dictionary using the LLM."""
conflicts = self._identify_conflicts(result_dict, response_model)
if not conflicts:
return result_dict
resolved = self._request_conflict_resolution(conflicts, pages_data, field_values)
return self._merge_resolved_conflicts(result_dict, resolved, response_model)
def _request_conflict_resolution(
self,
conflicts: Dict[str, List[Any]],
pages_data: List[Any],
field_values: Dict[str, List[Any]]
) -> Dict[str, Dict[str, Any]]:
"""Request LLM to resolve conflicts."""
message_content = self._build_conflict_resolution_prompt(conflicts, pages_data, field_values)
messages = [
{
"role": "system",
"content": "You are a server API that resolves field conflicts. You have access to the original page contents and the conflicting values extracted from them."
},
{
"role": "user",
"content": message_content
}
]
try:
response: ConflictResolution = self.llm.request(messages, ConflictResolution)
return response.resolved_fields
except Exception as e:
raise ValueError(f"Failed to resolve conflicts: {str(e)}. This may happen if the context was too big or the LLM couldn't resolve the conflicts.")
def _build_conflict_resolution_prompt(
self,
conflicts: Dict[str, List[Any]],
pages_data: List[Any],
field_values: Dict[str, List[Any]]
) -> Union[str, List[Dict[str, Any]]]:
"""Build prompt for conflict resolution with context from all pages."""
# Check if any page has vision content
has_vision_content = any(
isinstance(page_content, dict) and ('image' in page_content or 'images' in page_content)
for page_content, _ in pages_data
)
if has_vision_content:
# Build vision-compatible message content
message_content = []
# Add initial text content
intro_text = [
"Please resolve conflicts in these fields by choosing the correct value and providing a confidence score (1-10).",
"You have the contents of each page that contributed data, and the conflicting values they produced.",
"Return JSON in this format:\n{\n \"resolved_fields\": {\n \"field_name\": {\"value\": \"chosen_value\", \"confidence\": 9}\n }\n}\n",
"Conflicts to resolve:",
str(conflicts),
"\nHere are the original pages and their extracted values:"
]
message_content.append({
"type": "text",
"text": "\n".join(intro_text)
})
# Add each page's content and images
for i, (page_content, page_result) in enumerate(pages_data):
page_text = [f"\n--- Page {i+1} ---", "Original page content:"]
if isinstance(page_content, dict):
# Add text content if available
content_data = self._process_content_data(page_content)
if content_data:
page_text.append(content_data)
# Add extracted values
page_text.append("Extracted values for all fields on this page:")
for field_name, values_for_field in field_values.items():
page_value = values_for_field[i] if i < len(values_for_field) else None
page_text.append(f"{field_name}: {page_value}")
message_content.append({
"type": "text",
"text": "\n".join(page_text)
})
# Add images if present
if 'image' in page_content:
message_content.append({
"type": "image_url",
"image_url": {
"url": f"data:image/jpeg;base64,{encode_image(page_content['image'])}"
}
})
elif 'images' in page_content:
for img in page_content['images']:
message_content.append({
"type": "image_url",
"image_url": {
"url": f"data:image/jpeg;base64,{encode_image(img)}"
}
})
return message_content
else:
# Build regular text prompt
prompt_parts = []
prompt_parts.extend([
"Please resolve conflicts in these fields by choosing the correct value and providing a confidence score (1-10).",
"You have the contents of each page that contributed data, and the conflicting values they produced.",
"Return JSON in this format:\n{\n \"resolved_fields\": {\n \"field_name\": {\"value\": \"chosen_value\", \"confidence\": 9}\n }\n}\n",
"Conflicts to resolve:",
str(conflicts),
"\nHere are the original pages and their extracted values:"
])
for i, (page_content, page_result) in enumerate(pages_data):
prompt_parts.extend([
f"\n--- Page {i+1} ---",
"Original page content:",
yaml.dump(page_content) if isinstance(page_content, dict) else str(page_content),
"Extracted values for all fields on this page:"
])
for field_name, values_for_field in field_values.items():
page_value = values_for_field[i] if i < len(values_for_field) else None
prompt_parts.append(f"{field_name}: {page_value}")
return "\n".join(prompt_parts)
def _process_content_data(self, content: Union[Dict[str, Any], List[Any], str]) -> Optional[str]:
"""Process content data by filtering out images and converting to a string."""
if isinstance(content, dict):
filtered_content = {
k: v for k, v in content.items()
if k not in ('images', 'image') and not hasattr(v, 'read')
}
if filtered_content.get("is_spreadsheet", False):
content_str = json_to_formatted_string(filtered_content.get("data", {}))
else:
content_str = yaml.dump(filtered_content, default_flow_style=True)
return content_str
return None
def _merge_resolved_conflicts(
self,
original: Dict[str, Any],
resolved: Dict[str, Dict[str, Any]],
response_model: type[BaseModel]
) -> Dict[str, Any]:
"""Merge resolved conflicts back into the dictionary."""
result_dict = copy.deepcopy(original)
for field_name, resolution in resolved.items():
if field_name in result_dict:
result_dict[field_name] = resolution["value"]
return result_dict
def _handle_partial_response(
self,
exception: IncompleteOutputException,
messages: List[Dict[str, Any]],
response_model: type[BaseModel]
) -> Any:
"""Handle partial response by continuing the request"""
partial_content = exception.last_completion.choices[0].message.content
continuation_messages = self._build_continuation_messages(messages, partial_content)
try:
return self.llm.request(continuation_messages, response_model)
except Exception as e:
raise ValueError(f"Failed to complete partial response: {str(e)}")
def _build_continuation_messages(
self,
messages: List[Dict[str, Any]],
partial_content: str
) -> List[Dict[str, Any]]:
"""Build messages for continuation request."""
continuation_messages = copy.deepcopy(messages)
# Add partial response as assistant message
continuation_messages.append({
"role": "assistant",
"content": partial_content
})
# Add continuation prompt
continuation_messages.append({
"role": "user",
"content": "## CONTINUE JSON"
})
return continuation_messages
def _build_messages(self, content: Any, vision: bool) -> List[Dict[str, Any]]:
"""Build messages for LLM request."""
system_message = {
"role": "system",
"content": "You are a server API that receives document information and returns specific fields in JSON format."
}
if vision:
message_content = self._build_vision_content(content)
messages = [
system_message,
{
"role": "user",
"content": message_content
}
]
else:
message_content = self._build_text_content(content)
messages = [
system_message,
{
"role": "user",
"content": message_content
}
]
return messages
def _build_vision_content(self, content: Any) -> List[Dict[str, Any]]:
"""Build content for vision request."""
message_content = []
# If there's textual 'content', push it first
if isinstance(content, dict) and "content" in content:
message_content.append({
"type": "text",
"text": f"##Content\n\n{content['content']}"
})
# Now handle multiple images
if isinstance(content, dict):
images = []
if "images" in content and isinstance(content["images"], list):
images.extend(content["images"])
if "image" in content and content["image"] is not None:
images.append(content["image"])
for img in images:
if img:
message_content.append({
"type": "image_url",
"image_url": {
"url": f"data:image/jpeg;base64,{encode_image(img)}"
}
})
return message_content
def _build_text_content(self, content: Any) -> str:
"""Build content for text request."""
if isinstance(content, dict):
return f"##Content\n\n{yaml.dump(content)}"
elif isinstance(content, str):
return f"##Content\n\n{content}"
else:
return f"##Content\n\n{str(content)}"
def _add_extra_content(self, messages: List[Dict[str, Any]], extra_content: str) -> None:
"""Add extra content to messages."""
messages.insert(1, {
"role": "user",
"content": f"##Extra Content\n\n{extra_content}"
})
When to Use¶
PAGINATE is the best choice when:
Context window is small
- For local LLMs with smaller context windows (e.g Llama 3.3 8k context window).
The content is too Big
- When the file will not fit in the entire context window (e.g 500 page document)
Model Accuracy
- Sometimes LLMs can lose focus when the context is too big, Paginate strategy will solve this problem