Chapter 11: Processing Unstructured Data at Scale¶
Learning Objectives¶
By the end of this chapter, you will be able to:

- Design pipelines for processing large volumes of unstructured OSINT data
- Apply NLP techniques for entity extraction, classification, and summarization
- Process and analyze images and documents at scale
- Build data normalization and enrichment workflows
- Use vector databases and embeddings for semantic search
- Handle multilingual content effectively
- Design scalable processing architectures for OSINT data
11.1 The Unstructured Data Problem¶
The fundamental challenge of modern OSINT is not finding data — it is processing data. A comprehensive investigation of a major corporation might involve:
- Thousands of news articles across dozens of languages
- Hundreds of SEC and regulatory filings
- Thousands of social media posts
- Dozens of court documents
- Satellite imagery from multiple dates
- Audio recordings from interviews or transcribed calls
A human analyst reading at 250 words per minute would need months of full-time reading to work through this volume. The intelligence value of the data does not diminish because there is too much of it to read; it simply becomes inaccessible.
AI-powered processing pipelines address this problem. They do not replace human analysis — they make human analysis possible by transforming unstructured data into structured, searchable, analyzable formats.
11.2 Natural Language Processing for OSINT¶
The NLP Toolkit¶
The core NLP tasks relevant to OSINT processing are:
Named Entity Recognition (NER): Identifying and classifying entities — people, organizations, locations, dates, financial amounts, products — within unstructured text. The output is structured entity data that can be stored, searched, and analyzed.
Relation Extraction: Identifying semantic relationships between entities: "X works for Y," "X is located in Y," "X acquired Y." Converts unstructured text into relationship graphs.
Coreference Resolution: Determining that "he," "the CEO," "Mr. Smith," and "John Smith" in a document all refer to the same entity. Critical for building coherent entity profiles from documents.
Text Classification: Categorizing documents or text passages by topic, sentiment, relevance, or other criteria. Used to filter and prioritize large document collections.
Summarization: Generating concise summaries of long documents. Both extractive (selecting key sentences) and abstractive (generating new summary text) approaches are used.
Information Extraction: The combined task of extracting structured information (events, facts, relationships) from unstructured text.
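Coreference resolution is the one task in this list that the rest of the chapter does not implement directly. A minimal sketch, assuming the fastcoref library is installed (the package and its FCoref API are an assumption here, not part of the toolkit used later):

# Minimal coreference sketch; assumes `pip install fastcoref` and that the
# FCoref API behaves as documented by that project
from fastcoref import FCoref

model = FCoref(device='cpu')  # use 'cuda:0' if a GPU is available

text = ("John Smith joined AcmeCorp in 2019. The CEO previously ran "
        "TechStartup, and he remains on its board.")

preds = model.predict(texts=[text])
for cluster in preds[0].get_clusters():
    # e.g. ['John Smith', 'The CEO', 'he'] -> one real-world entity
    print(cluster)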
Production NLP with spaCy¶
spaCy is a production-grade Python NLP library. It provides efficient, accurate NLP pipelines suitable for processing large document volumes.
import spacy
from spacy import displacy
from collections import defaultdict
import json
# Load English model (install with: python -m spacy download en_core_web_trf)
# en_core_web_trf is transformer-based: more accurate but slower
# en_core_web_lg is the large statistical model: faster but somewhat less accurate
nlp = spacy.load("en_core_web_trf")
def extract_entities_from_document(text, source_url=None):
"""
Extract named entities from text with context preservation
"""
doc = nlp(text)
entities = defaultdict(list)
for ent in doc.ents:
# Get surrounding context
start = max(0, ent.start_char - 100)
end = min(len(text), ent.end_char + 100)
context = text[start:end].replace('\n', ' ')
entities[ent.label_].append({
'text': ent.text,
'label': ent.label_,
'label_description': spacy.explain(ent.label_),
'start_char': ent.start_char,
'end_char': ent.end_char,
'context': context,
# spaCy's standard NER does not expose per-entity confidence; this stays
# None unless a custom 'score' Span extension has been registered
'confidence': ent._.score if ent.has_extension('score') else None,
'source_url': source_url,
})
return dict(entities)
def extract_relations_from_text(text):
"""
Extract subject-verb-object relations from text
"""
doc = nlp(text)
relations = []
for sent in doc.sents:
for token in sent:
if token.dep_ in ('nsubj', 'nsubjpass'):
subject = token.text
verb = token.head.text
# Find objects
objects = [
child.text for child in token.head.children
if child.dep_ in ('dobj', 'pobj', 'attr', 'acomp')
]
if objects:
for obj in objects:
relations.append({
'subject': subject,
'predicate': verb,
'object': obj,
'sentence': sent.text.strip()
})
return relations
def batch_process_documents(documents, batch_size=50):
"""
Process multiple documents efficiently using spaCy's batch processing
documents: list of {'text': str, 'url': str, 'date': str}
"""
texts = [d['text'] for d in documents]
all_results = []
# Process in batches for efficiency
for i in range(0, len(texts), batch_size):
batch_texts = texts[i:i+batch_size]
batch_docs = documents[i:i+batch_size]
for doc_text, doc_meta, spacy_doc in zip(
batch_texts,
batch_docs,
nlp.pipe(batch_texts, batch_size=batch_size)
):
entities = defaultdict(list)
for ent in spacy_doc.ents:
entities[ent.label_].append(ent.text)
all_results.append({
'url': doc_meta.get('url'),
'date': doc_meta.get('date'),
'entities': dict(entities),
'sentence_count': len(list(spacy_doc.sents)),
'word_count': len(spacy_doc),
})
return all_results
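A brief usage sketch that ties the three functions together; the sample documents below are illustrative placeholders:

# Illustrative usage of the spaCy helpers above on a tiny sample corpus
sample_documents = [
    {'text': 'AcmeCorp acquired TechStartup Inc. for $500 million in January 2024.',
     'url': 'https://news.example.com/1', 'date': '2024-01-15'},
    {'text': 'John Smith, CEO of AcmeCorp, met with regulators in Brussels.',
     'url': 'https://news.example.com/2', 'date': '2024-02-02'},
]

for result in batch_process_documents(sample_documents):
    print(result['url'], result['entities'])

# Subject-verb-object triples from a single document
for relation in extract_relations_from_text(sample_documents[0]['text']):
    print(relation['subject'], '->', relation['predicate'], '->', relation['object'])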
Transformer-Based NLP¶
For higher accuracy, particularly on specialized domains, transformer-based models from Hugging Face provide state-of-the-art NLP capability:
from transformers import pipeline, AutoTokenizer, AutoModelForTokenClassification
import torch
class InvestigativeNLPPipeline:
"""Production NLP pipeline for investigative text processing"""
def __init__(self):
# Named entity recognition
self.ner = pipeline(
"ner",
model="dslim/bert-large-NER",
aggregation_strategy="simple",
device=0 if torch.cuda.is_available() else -1
)
# Zero-shot classification for document categorization
self.classifier = pipeline(
"zero-shot-classification",
model="facebook/bart-large-mnli",
device=0 if torch.cuda.is_available() else -1
)
# Sentiment analysis
self.sentiment = pipeline(
"sentiment-analysis",
model="cardiffnlp/twitter-roberta-base-sentiment-latest",
device=0 if torch.cuda.is_available() else -1
)
# Summarization
self.summarizer = pipeline(
"summarization",
model="facebook/bart-large-cnn",
device=0 if torch.cuda.is_available() else -1
)
def extract_entities(self, text):
"""Extract named entities with confidence scores"""
# Handle long texts by chunking
max_length = 512
chunks = [text[i:i+max_length] for i in range(0, len(text), max_length)]
all_entities = []
for chunk in chunks:
entities = self.ner(chunk)
all_entities.extend(entities)
# Deduplicate and aggregate
entity_map = {}
for ent in all_entities:
key = (ent['word'].lower(), ent['entity_group'])
if key not in entity_map or ent['score'] > entity_map[key]['score']:
entity_map[key] = ent
return list(entity_map.values())
def classify_document(self, text, candidate_labels):
"""
Classify document into predefined categories
Useful for: relevance scoring, topic categorization, alert prioritization
"""
# Summarize first if text is long
if len(text.split()) > 300:
summary = self.summarizer(text[:1024], max_length=200, min_length=50)[0]['summary_text']
else:
summary = text
result = self.classifier(summary, candidate_labels, multi_label=True)
return dict(zip(result['labels'], result['scores']))
def analyze_sentiment_by_entity(self, text, target_entity):
"""Analyze sentiment specifically about a target entity"""
# Find sentences mentioning the entity
sentences = [s.strip() for s in text.split('.') if target_entity.lower() in s.lower()]
sentiments = []
for sentence in sentences[:20]: # Limit for performance
if len(sentence) > 10:
sentiment = self.sentiment(sentence[:512])[0]
sentiments.append({
'sentence': sentence,
'label': sentiment['label'],
'score': sentiment['score']
})
if not sentiments:
return {'entity': target_entity, 'found': False}
avg_positive = sum(s['score'] for s in sentiments if 'POS' in s['label']) / len(sentiments)
avg_negative = sum(s['score'] for s in sentiments if 'NEG' in s['label']) / len(sentiments)
return {
'entity': target_entity,
'sentence_count': len(sentiments),
'avg_positive_sentiment': avg_positive,
'avg_negative_sentiment': avg_negative,
'examples': sentiments[:3]
}
def summarize_document_set(self, documents, max_words=500):
"""Generate a coherent summary across multiple documents"""
# Concatenate document summaries
individual_summaries = []
for doc in documents[:10]: # Limit input size
if len(doc.split()) > 100:
summary = self.summarizer(doc[:1024], max_length=100, min_length=30)[0]['summary_text']
individual_summaries.append(summary)
combined = ' '.join(individual_summaries)
# Generate meta-summary
if len(combined.split()) > 200:
final_summary = self.summarizer(combined[:1024], max_length=max_words, min_length=100)[0]['summary_text']
else:
final_summary = combined
return final_summary
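A usage sketch for the class above; the sample article and candidate labels are illustrative, not prescriptive:

# Illustrative usage of InvestigativeNLPPipeline; text and labels are examples
pipeline_nlp = InvestigativeNLPPipeline()

article = ("AcmeCorp announced that CEO John Smith will step down amid an "
           "investigation into accounting irregularities at its Dublin subsidiary.")

entities = pipeline_nlp.extract_entities(article)
print([(e['word'], e['entity_group'], round(e['score'], 3)) for e in entities])

labels = ['financial fraud', 'executive change', 'product launch', 'cybersecurity']
print(pipeline_nlp.classify_document(article, labels))

print(pipeline_nlp.analyze_sentiment_by_entity(article, 'AcmeCorp'))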
11.3 Document Processing at Scale¶
OSINT investigations frequently involve processing large volumes of PDF documents, court filings, news articles, and regulatory submissions. Industrial-strength document processing requires handling diverse formats reliably.
PDF Processing¶
import pdfplumber
from pathlib import Path
import re
import json
def extract_text_from_pdf(pdf_path, preserve_layout=False):
"""
Extract text from PDF with optional layout preservation
"""
results = {
'file': str(pdf_path),
'pages': [],
'full_text': '',
'metadata': {}
}
try:
with pdfplumber.open(pdf_path) as pdf:
# Extract metadata
results['metadata'] = pdf.metadata or {}
for page_num, page in enumerate(pdf.pages, 1):
page_data = {
'page': page_num,
'text': page.extract_text() or '',
'tables': [],
}
# Extract tables
tables = page.extract_tables()
for table in tables:
if table:
cleaned_table = [
[cell if cell else '' for cell in row]
for row in table
]
page_data['tables'].append(cleaned_table)
results['pages'].append(page_data)
results['full_text'] += page_data['text'] + '\n'
except Exception as e:
results['error'] = str(e)
return results
def batch_process_pdfs(pdf_directory, output_file=None):
"""Process all PDFs in a directory"""
pdf_paths = list(Path(pdf_directory).glob('**/*.pdf'))
print(f"Processing {len(pdf_paths)} PDFs...")
all_results = []
for i, path in enumerate(pdf_paths):
print(f" Processing {i+1}/{len(pdf_paths)}: {path.name}")
result = extract_text_from_pdf(path)
all_results.append(result)
if output_file:
with open(output_file, 'w') as f:
json.dump(all_results, f, indent=2)
return all_results
def extract_financial_data_from_document(text):
"""
Extract financial amounts, percentages, and dates from document text
"""
patterns = {
'dollar_amounts': r'\$[\d,]+(?:\.\d{2})?(?:\s*(?:million|billion|trillion))?',
'percentages': r'\d+(?:\.\d+)?%',
'dates': r'(?:January|February|March|April|May|June|July|August|September|October|November|December)\s+\d{1,2},\s+\d{4}',
'quarters': r'Q[1-4]\s+\d{4}',
'fiscal_years': r'(?:fiscal year|FY)\s+\d{4}',
}
results = {}
for pattern_name, pattern in patterns.items():
matches = re.findall(pattern, text, re.IGNORECASE)
results[pattern_name] = list(set(matches)) # Deduplicate
return results
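The two pieces combine naturally: extract the text of each filing, then pull financial figures out of it. A sketch, assuming a local ./filings directory of PDFs:

# Combine PDF extraction with the financial-pattern extraction above;
# ./filings is an assumed example directory
filings = batch_process_pdfs('./filings')

for filing in filings:
    if 'error' in filing:
        print(f"Failed to parse {filing['file']}: {filing['error']}")
        continue
    financials = extract_financial_data_from_document(filing['full_text'])
    print(filing['file'])
    print('  amounts:', financials['dollar_amounts'][:5])
    print('  dates:  ', financials['dates'][:5])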
OCR for Image-Based Documents¶
Many OSINT documents are image-based scans rather than text PDFs. OCR (Optical Character Recognition) converts these to searchable text.
import pytesseract
from PIL import Image
import cv2
import numpy as np
from pathlib import Path
def preprocess_image_for_ocr(image_path):
"""Preprocess image to improve OCR accuracy"""
# Load image
img = cv2.imread(str(image_path))
# Convert to grayscale
gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
# Denoise
denoised = cv2.fastNlMeansDenoising(gray, None, h=10)
# Adaptive thresholding for better black/white separation
thresh = cv2.adaptiveThreshold(
denoised, 255,
cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
cv2.THRESH_BINARY, 11, 2
)
# Deskew if needed. Invert first so the (dark) text pixels, rather than the
# white background, drive the skew estimate; note that the angle convention
# of cv2.minAreaRect changed in OpenCV 4.5+
coords = np.column_stack(np.where(cv2.bitwise_not(thresh) > 0))
angle = cv2.minAreaRect(coords)[-1]
if angle < -45:
    angle = -(90 + angle)
else:
    angle = -angle
if abs(angle) > 0.5:
(h, w) = thresh.shape
center = (w // 2, h // 2)
M = cv2.getRotationMatrix2D(center, angle, 1.0)
thresh = cv2.warpAffine(thresh, M, (w, h),
flags=cv2.INTER_CUBIC,
borderMode=cv2.BORDER_REPLICATE)
return thresh
def ocr_document(image_path, language='eng', improve_quality=True):
"""
Extract text from an image document using Tesseract OCR
"""
if improve_quality:
preprocessed = preprocess_image_for_ocr(image_path)
img = Image.fromarray(preprocessed)
else:
img = Image.open(image_path)
# Tesseract configuration for documents; the language is passed via the
# lang argument below rather than hard-coded into the config string
config = '--psm 3 --oem 3'
text = pytesseract.image_to_string(img, lang=language, config=config)
data = pytesseract.image_to_data(img, lang=language, output_type=pytesseract.Output.DICT)
# Calculate confidence (conf values may be strings or numbers depending on
# the pytesseract version; -1 marks non-text boxes and is excluded)
confidences = [float(c) for c in data['conf'] if float(c) >= 0]
avg_confidence = sum(confidences) / len(confidences) if confidences else 0
return {
'text': text,
'average_confidence': avg_confidence,
'word_count': len(text.split()),
}
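A short sketch for running OCR across a directory of scanned pages and flagging low-confidence results for manual review; the 60-point threshold is an assumption to tune against your own material, not a Tesseract default:

# Batch OCR over a directory of scanned images; pages whose average Tesseract
# confidence falls below an assumed threshold are flagged for human review
REVIEW_THRESHOLD = 60  # assumed cut-off; tune against your own document set

def ocr_directory(image_dir, language='eng'):
    results = []
    for path in sorted(Path(image_dir).glob('*.png')):
        result = ocr_document(path, language=language)
        result['file'] = str(path)
        result['needs_review'] = result['average_confidence'] < REVIEW_THRESHOLD
        results.append(result)
    return results

for page in ocr_directory('./scans'):
    flag = 'REVIEW' if page['needs_review'] else 'ok'
    print(f"{page['file']}: {page['word_count']} words, "
          f"confidence {page['average_confidence']:.1f} [{flag}]")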
11.4 Vector Embeddings and Semantic Search¶
Traditional keyword search requires knowing what words to search for. Semantic search using vector embeddings enables finding conceptually related content even when exact keywords differ.
An embedding is a numerical representation of text (or an image) in a high-dimensional vector space. Documents with similar meaning are close together in this space. Given a query, semantic search finds the closest documents regardless of exact wording.
For OSINT investigations, semantic search enables:

- Finding all news articles discussing the same event without knowing the exact terminology used
- Finding social media posts about a topic across multiple languages
- Matching entity descriptions across documents that use different names for the same entity
- Identifying semantically similar documents in a large collection
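Before building a full store, the core idea fits in a few lines: embed two texts and compare them with cosine similarity. A minimal sketch using sentence-transformers (the sample sentences are illustrative):

# Minimal illustration of semantic similarity with sentence embeddings
from sentence_transformers import SentenceTransformer
import numpy as np

model = SentenceTransformer('all-MiniLM-L6-v2')

a = model.encode("AcmeCorp acquired TechStartup for $500 million.")
b = model.encode("A technology company was purchased in a half-billion dollar deal.")
c = model.encode("The weather in Lisbon was unseasonably warm this week.")

def cosine(x, y):
    return float(np.dot(x, y) / (np.linalg.norm(x) * np.linalg.norm(y)))

print(cosine(a, b))  # high: same event described in different words
print(cosine(a, c))  # low: unrelated content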
from sentence_transformers import SentenceTransformer
import numpy as np
import sqlite3
import json
from typing import List, Tuple
class OSINTVectorStore:
"""Vector database for semantic search of OSINT documents"""
def __init__(self, db_path='osint_vectors.db', model_name='all-MiniLM-L6-v2'):
self.model = SentenceTransformer(model_name)
self.db_path = db_path
self._init_db()
def _init_db(self):
"""Initialize SQLite database with vector storage"""
conn = sqlite3.connect(self.db_path)
conn.execute('''
CREATE TABLE IF NOT EXISTS documents (
id INTEGER PRIMARY KEY AUTOINCREMENT,
source_url TEXT,
title TEXT,
date TEXT,
content TEXT,
entities TEXT,
embedding BLOB,
metadata TEXT
)
''')
conn.commit()
conn.close()
def add_document(self, content: str, source_url: str, title: str = '',
date: str = '', entities: dict = None, metadata: dict = None):
"""Add a document with its embedding to the store"""
# Generate embedding
embedding = self.model.encode(content[:512]) # Truncate for embedding
embedding_bytes = embedding.tobytes()
conn = sqlite3.connect(self.db_path)
conn.execute(
'INSERT INTO documents (source_url, title, date, content, entities, embedding, metadata) VALUES (?,?,?,?,?,?,?)',
(source_url, title, date, content[:10000], # Store first 10k chars
json.dumps(entities or {}), embedding_bytes, json.dumps(metadata or {}))
)
conn.commit()
conn.close()
def semantic_search(self, query: str, top_k: int = 10,
min_similarity: float = 0.3) -> List[Tuple]:
"""
Find documents semantically similar to the query
Returns: list of (similarity_score, document) tuples
"""
query_embedding = self.model.encode(query)
conn = sqlite3.connect(self.db_path)
cursor = conn.execute('SELECT id, source_url, title, date, content, entities, embedding, metadata FROM documents')
results = []
for row in cursor:
doc_embedding = np.frombuffer(row[6], dtype=np.float32)
# Cosine similarity
similarity = np.dot(query_embedding, doc_embedding) / (
np.linalg.norm(query_embedding) * np.linalg.norm(doc_embedding)
)
if similarity >= min_similarity:
results.append((
float(similarity),
{
'id': row[0],
'source_url': row[1],
'title': row[2],
'date': row[3],
'content_preview': row[4][:500],
'entities': json.loads(row[5]) if row[5] else {},
'metadata': json.loads(row[7]) if row[7] else {},
}
))
conn.close()
return sorted(results, key=lambda x: x[0], reverse=True)[:top_k]
def find_similar_documents(self, document_id: int, top_k: int = 5) -> List:
"""Find documents similar to an already-indexed document"""
conn = sqlite3.connect(self.db_path)
row = conn.execute(
'SELECT embedding FROM documents WHERE id = ?', (document_id,)
).fetchone()
conn.close()
if not row:
return []
# Use document's embedding as query
reference_embedding = np.frombuffer(row[0], dtype=np.float32)
# Create temporary query
conn = sqlite3.connect(self.db_path)
cursor = conn.execute(
'SELECT id, title, source_url, embedding FROM documents WHERE id != ?',
(document_id,)
)
results = []
for row in cursor:
doc_embedding = np.frombuffer(row[3], dtype=np.float32)
similarity = np.dot(reference_embedding, doc_embedding) / (
np.linalg.norm(reference_embedding) * np.linalg.norm(doc_embedding)
)
results.append((float(similarity), row[0], row[1], row[2]))
conn.close()
return sorted(results, reverse=True)[:top_k]
# Usage example
store = OSINTVectorStore()
# Add documents to the store
store.add_document(
content="AcmeCorp CEO John Smith announced the acquisition of TechStartup Inc. for $500 million.",
source_url="https://news.example.com/acmecorp-acquisition",
title="AcmeCorp Acquires TechStartup",
date="2024-01-15"
)
# Semantic search
results = store.semantic_search("technology company purchase deal", top_k=5)
for score, doc in results:
print(f"Similarity: {score:.3f} — {doc['title']}: {doc['content_preview'][:100]}")
11.5 Multilingual Processing¶
OSINT investigations frequently involve content in multiple languages. Effective multilingual processing requires reliable language detection, translation that preserves names and other proper nouns, and awareness of which findings rest on translated rather than original text.
Translation Architecture¶
from langdetect import detect
import anthropic
class MultilingualProcessor:
"""Process multilingual OSINT content"""
# Language detection
def detect_language(self, text):
"""Detect the language of a text sample"""
try:
return detect(text)
except Exception:
return 'unknown'
def translate_to_english(self, text, source_language=None):
"""
Translate text to English using appropriate method
For production, cloud APIs provide better quality than local models
"""
if source_language is None:
source_language = self.detect_language(text)
if source_language in ('en', 'unknown'):
return text
# Use Claude for high-quality translation with context awareness
client = anthropic.Anthropic()
response = client.messages.create(
model="claude-haiku-4-5-20251001", # Haiku is cost-effective for translation
max_tokens=2048,
messages=[{
"role": "user",
"content": f"""Translate the following {source_language} text to English.
Provide only the English translation, maintaining the original meaning and tone.
If the text contains proper nouns (names, organizations, places), keep them in the original language.
Text to translate:
{text}"""
}]
)
return response.content[0].text
def process_multilingual_corpus(self, documents):
"""
Process a corpus with mixed languages
documents: list of {'text': str, 'url': str, 'date': str}
"""
processed = []
for doc in documents:
lang = self.detect_language(doc['text'])
translated = self.translate_to_english(doc['text'], lang)
processed.append({
**doc,
'original_language': lang,
'english_text': translated,
'is_translation': lang != 'en'
})
return processed
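A brief usage sketch; the sample corpus is illustrative, and translation requires an Anthropic API key in the environment:

# Illustrative usage of MultilingualProcessor on a mixed-language corpus;
# the Spanish sentence reads "AcmeCorp announced the purchase of a tech startup"
processor = MultilingualProcessor()

corpus = [
    {'text': 'AcmeCorp anunció la compra de una startup tecnológica.',
     'url': 'https://noticias.example.es/1', 'date': '2024-03-01'},
    {'text': 'AcmeCorp confirmed the acquisition in a press release.',
     'url': 'https://news.example.com/3', 'date': '2024-03-01'},
]

for doc in processor.process_multilingual_corpus(corpus):
    print(doc['original_language'], doc['is_translation'], doc['english_text'][:80])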
11.6 Building Processing Pipelines¶
Combining the components above into cohesive processing pipelines:
from dataclasses import dataclass, field
from typing import List, Dict, Optional, Any
import asyncio
import aiohttp
from datetime import datetime
import json
@dataclass
class ProcessedDocument:
"""Fully processed OSINT document"""
source_url: str
collection_date: str
original_language: str
english_text: str
entities: Dict[str, List[str]] = field(default_factory=dict)
relations: List[Dict] = field(default_factory=list)
financial_data: Dict = field(default_factory=dict)
classification: Dict[str, float] = field(default_factory=dict)
summary: str = ""
metadata: Dict = field(default_factory=dict)
processing_errors: List[str] = field(default_factory=list)
class OSINTProcessingPipeline:
"""End-to-end OSINT document processing pipeline"""
def __init__(self, ai_client=None):
self.nlp_pipeline = InvestigativeNLPPipeline()
self.multilingual = MultilingualProcessor()
self.vector_store = OSINTVectorStore()
self.ai_client = ai_client or anthropic.Anthropic()
async def process_url(self, url: str, session: aiohttp.ClientSession) -> Optional[ProcessedDocument]:
"""Fetch and process a single URL"""
try:
async with session.get(url, timeout=aiohttp.ClientTimeout(total=30)) as response:
if response.status != 200:
return None
html = await response.text()
# Parse HTML to get text
from bs4 import BeautifulSoup
soup = BeautifulSoup(html, 'html.parser')
# Remove script and style elements
for element in soup(['script', 'style', 'nav', 'footer', 'header']):
element.decompose()
text = soup.get_text(separator=' ', strip=True)
return await self.process_text(text, url)
except Exception as e:
return ProcessedDocument(
source_url=url,
collection_date=datetime.now().isoformat(),
original_language='unknown',
english_text='',
processing_errors=[str(e)]
)
async def process_text(self, text: str, source_url: str) -> ProcessedDocument:
"""Process raw text through the full pipeline"""
doc = ProcessedDocument(
source_url=source_url,
collection_date=datetime.now().isoformat(),
original_language='en',
english_text=text
)
try:
# 1. Language detection and translation
lang = self.multilingual.detect_language(text)
doc.original_language = lang
if lang != 'en':
doc.english_text = self.multilingual.translate_to_english(text, lang)
else:
doc.english_text = text
# 2. Entity extraction
entities = self.nlp_pipeline.extract_entities(doc.english_text[:4096])
doc.entities = self._aggregate_entities(entities)
# 3. Financial data extraction
doc.financial_data = extract_financial_data_from_document(doc.english_text)
# 4. Document classification
doc.classification = self.nlp_pipeline.classify_document(
doc.english_text,
candidate_labels=[
'corporate/business', 'criminal/legal', 'financial fraud',
'political', 'cybersecurity', 'geographic', 'personal/biographical'
]
)
# 5. Generate summary for long documents
if len(doc.english_text.split()) > 200:
doc.summary = self.nlp_pipeline.summarizer(
doc.english_text[:1024], max_length=150, min_length=40
)[0]['summary_text']
# 6. Add to vector store for semantic search
self.vector_store.add_document(
content=doc.english_text,
source_url=source_url,
entities=doc.entities,
metadata={'language': lang, 'classification': doc.classification}
)
except Exception as e:
doc.processing_errors.append(str(e))
return doc
def _aggregate_entities(self, entities: List[Dict]) -> Dict[str, List[str]]:
"""Aggregate entity list into type-grouped dictionary"""
aggregated = {}
for ent in entities:
label = ent.get('entity_group', 'MISC')
word = ent.get('word', '')
if label not in aggregated:
aggregated[label] = []
if word and word not in aggregated[label]:
aggregated[label].append(word)
return aggregated
async def process_url_batch(self, urls: List[str], concurrency: int = 10) -> List[ProcessedDocument]:
"""Process a batch of URLs with controlled concurrency"""
semaphore = asyncio.Semaphore(concurrency)
async with aiohttp.ClientSession() as session:
async def process_with_semaphore(url):
async with semaphore:
return await self.process_url(url, session)
tasks = [process_with_semaphore(url) for url in urls]
results = await asyncio.gather(*tasks, return_exceptions=True)
return [r for r in results if isinstance(r, ProcessedDocument)]
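A driver sketch for running the pipeline end to end; the URLs are placeholders and the output formatting is illustrative:

# Illustrative driver for OSINTProcessingPipeline; URLs are placeholders
async def run_investigation(urls):
    pipeline = OSINTProcessingPipeline()
    documents = await pipeline.process_url_batch(urls, concurrency=5)
    for doc in documents:
        status = '; '.join(doc.processing_errors) if doc.processing_errors else 'ok'
        preview = {label: names[:3] for label, names in doc.entities.items()}
        print(f"{doc.source_url} [{doc.original_language}] ({status})")
        print(f"  entities: {preview}")
    return documents

if __name__ == '__main__':
    asyncio.run(run_investigation([
        'https://news.example.com/acmecorp-acquisition',
        'https://news.example.com/acmecorp-investigation',
    ]))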
Summary¶
Processing unstructured OSINT data at scale requires combining multiple technologies into coherent pipelines. NLP provides entity extraction, relation identification, classification, and summarization. Document processing handles PDFs and scanned materials. Semantic search via vector embeddings enables finding conceptually related content without exact keyword matching. Multilingual processing makes globally sourced content accessible.
The key architectural principle is pipeline composition: each processing stage transforms data into a form suitable for the next stage, ending with structured, searchable, analytically usable intelligence from originally unstructured raw data.
Human oversight remains essential at the output end. Automated pipelines can process volume that humans cannot, but the analytical conclusions drawn from that processing require human judgment.
Common Mistakes and Pitfalls¶
- Single-language pipelines: OSINT data is multilingual by nature; English-only processing systematically misses non-English content
- PDF extraction errors: PDFs with complex layouts, images, or security restrictions produce corrupt text extraction without specialized handling
- Embedding dimensionality mismatch: Mixing embeddings from different models in the same vector store produces unreliable similarity results
- Processing pipeline brittleness: Pipelines that fail silently on errors produce incomplete results without indication of incompleteness
- NER accuracy overestimation: NER models have meaningful error rates, particularly for domain-specific entities and non-standard text
- Context loss in chunking: Breaking long documents into chunks without overlap loses cross-chunk context (see the overlap-chunking sketch below)
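A common remedy for the last pitfall is to chunk with overlap, so any entity or reference that straddles a boundary appears whole in at least one chunk. A minimal sketch; the chunk and overlap sizes are illustrative:

# Overlapping chunking: each chunk repeats the tail of the previous one so
# context that straddles a boundary survives in at least one chunk
def chunk_with_overlap(text, chunk_size=1000, overlap=200):
    if overlap >= chunk_size:
        raise ValueError("overlap must be smaller than chunk_size")
    chunks = []
    step = chunk_size - overlap
    for start in range(0, len(text), step):
        chunk = text[start:start + chunk_size]
        if chunk:
            chunks.append(chunk)
        if start + chunk_size >= len(text):
            break
    return chunks

# A 2,500-character document becomes three overlapping chunks (1000, 1000, 900)
print([len(c) for c in chunk_with_overlap('x' * 2500)])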
Further Reading¶
- spaCy documentation — production NLP implementation
- Hugging Face documentation — transformer model usage
- Haystack framework — end-to-end NLP pipelines for question answering and document search
- Chroma, Pinecone, Weaviate — production vector database options
- FastText — efficient multilingual embedding models from Meta