Chapter 14: Automation, Scripting, and Investigative Pipelines
Learning Objectives
By the end of this chapter, you will be able to:
- Design modular, maintainable investigative automation systems
- Build production-grade data collection pipelines with error handling and logging
- Implement scheduling and monitoring for ongoing investigations
- Create reusable investigative workflow components
- Handle rate limiting, authentication, and API management at scale
- Design alerting and notification systems for monitoring workflows
- Apply testing practices to investigative automation code
14.1 The Case for Investigative Automation
Manual OSINT investigation is fundamentally time-limited. A skilled investigator can monitor a few dozen sources, process hundreds of documents per week, and track several active investigations simultaneously. For the many OSINT applications that require continuous monitoring, large-scale data processing, or coordination across multiple data streams, manual workflows cannot scale.
Automation addresses this gap. A well-designed investigative pipeline can:
- Monitor hundreds of sources continuously for relevant changes
- Process thousands of documents per day through NLP analysis
- Alert investigators when significant findings emerge
- Maintain consistent collection documentation automatically
- Repeat workflows reliably without human error variation
The discipline required to build good automation is the same discipline required for good investigation: clear requirements, systematic design, and rigorous quality management.
14.2 Pipeline Architecture Principles
The ETL Framework Applied to OSINT
OSINT pipelines follow an Extract-Transform-Load (ETL) pattern:
- Extract: Collect raw data from sources (web scraping, API queries, database access)
- Transform: Process raw data into structured, analyzed intelligence (NLP, entity extraction, classification)
- Load: Store processed intelligence for analysis and reporting
```text
[Sources] → [Collectors] → [Raw Store] → [Processors] → [Intelligence Store] → [Analysts/Reports]
                 ↑                            ↑
            [Scheduler]                  [ML Models]
                 ↑                            ↑
             [Monitor]                 [API Services]
```
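To make the three stages concrete, here is a minimal sketch in which each stage is a plain Python function or generator, so stages can be chained lazily and tested independently. The record fields (`url`, `title`, `body`) and the in-memory dictionary standing in for the intelligence store are illustrative assumptions, not part of the pipeline built later in this chapter.

```python
import hashlib
from typing import Dict, Iterable, Iterator


def extract(raw_records: Iterable[Dict]) -> Iterator[Dict]:
    """Extract: stand-in for a collector that yields raw source records."""
    for record in raw_records:
        yield record


def transform(records: Iterable[Dict]) -> Iterator[Dict]:
    """Transform: turn raw records into structured, analyzable rows."""
    for record in records:
        text = f"{record.get('title', '')} {record.get('body', '')}".strip()
        if not text:
            continue  # drop empty records instead of loading noise
        yield {
            # Stable ID derived from the URL, so reloading a record overwrites it
            "id": hashlib.sha256(record["url"].encode("utf-8")).hexdigest()[:16],
            "url": record["url"],
            "word_count": len(text.split()),
        }


def load(rows: Iterable[Dict], store: Dict[str, Dict]) -> int:
    """Load: write rows into a store keyed by ID; return how many were written."""
    count = 0
    for row in rows:
        store[row["id"]] = row
        count += 1
    return count


raw = [
    {"url": "https://example.com/a", "title": "Filing", "body": "Quarterly report released"},
    {"url": "https://example.com/b", "title": "", "body": ""},
]
store: Dict[str, Dict] = {}
written = load(transform(extract(raw)), store)
print(written, len(store))
```

Because each stage only consumes an iterable and produces one, a collector, an NLP step, or a database writer can replace any stage without touching the others.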
Design Principles
Idempotency: Running the same pipeline stage twice on the same input should produce the same output. This enables safe retry on failure.
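One common way to get idempotency is to derive an output's identity from its input, write to a temporary file, and then atomically rename it into place. A rerun then finds the finished output and skips it, and a crash mid-write never leaves a half-written file under the final name. The sketch below assumes JSON files in an output directory; the function name `load_record` is hypothetical.

```python
import hashlib
import json
import os
import tempfile
from pathlib import Path


def load_record(out_dir: Path, record: dict) -> Path:
    """Write `record` to out_dir idempotently; safe to call again after a failure."""
    # Canonical serialization: the same record always yields the same bytes and key
    payload = json.dumps(record, sort_keys=True, ensure_ascii=False).encode("utf-8")
    key = hashlib.sha256(payload).hexdigest()
    final_path = out_dir / f"{key}.json"
    if final_path.exists():
        return final_path  # already loaded by an earlier run
    # Write to a temp file in the same directory, then atomically rename it
    fd, tmp_name = tempfile.mkstemp(dir=out_dir, suffix=".tmp")
    try:
        with os.fdopen(fd, "wb") as fh:
            fh.write(payload)
        os.replace(tmp_name, final_path)
    except BaseException:
        if os.path.exists(tmp_name):
            os.remove(tmp_name)
        raise
    return final_path


with tempfile.TemporaryDirectory() as tmp:
    out = Path(tmp)
    first = load_record(out, {"url": "https://example.com/a", "entities": ["ACME"]})
    # Same record with keys in a different order: same canonical bytes, same file
    second = load_record(out, {"entities": ["ACME"], "url": "https://example.com/a"})
    same_path = first == second
    file_count = len(list(out.glob("*.json")))
print(same_path, file_count)
```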
Observability: Pipelines should emit logs, metrics, and alerts that show what each stage is doing and make failures visible as soon as they happen.
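A lightweight starting point for observability, before adopting a metrics system such as Prometheus, is to wrap each stage so it always records its item count, error count, and duration, even when it fails. The sketch below is one such wrapper; the module-level `STAGE_METRICS` dictionary is a hypothetical stand-in for a real metrics backend.

```python
import logging
import time
from contextlib import contextmanager
from typing import Dict, Iterator

logger = logging.getLogger("pipeline.metrics")

# Hypothetical in-process metrics store: stage name -> stats from its last run
STAGE_METRICS: Dict[str, Dict[str, float]] = {}


@contextmanager
def observed_stage(name: str) -> Iterator[Dict[str, float]]:
    """Time a pipeline stage and record counts; the caller increments stats['items']."""
    stats: Dict[str, float] = {"items": 0, "errors": 0}
    start = time.perf_counter()
    try:
        yield stats
    except Exception:
        stats["errors"] += 1
        logger.exception("stage %s failed", name)
        raise  # record the failure, but never swallow it
    finally:
        stats["seconds"] = time.perf_counter() - start
        STAGE_METRICS[name] = stats
        logger.info("stage=%s items=%d errors=%d seconds=%.3f",
                    name, stats["items"], stats["errors"], stats["seconds"])


with observed_stage("extract") as stats:
    for _ in range(3):
        stats["items"] += 1

try:
    with observed_stage("transform"):
        raise ValueError("bad document")
except ValueError:
    pass

print(STAGE_METRICS["extract"]["items"], STAGE_METRICS["transform"]["errors"])
```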
Failure isolation: Failure in one pipeline stage should not corrupt data or prevent other stages from operating correctly.
Source independence: Collectors for different sources should be independent, so API restrictions or failures affecting one source don't halt the whole pipeline.
Configuration over code: Pipeline targets, frequencies, and parameters should be configurable without code changes.
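As a sketch of configuration over code, the loader below reads sources from JSON text, fills in defaults, and resolves credentials from environment variables named in the config rather than storing keys in it. The `api_key_env` field, the environment variable name, and the default values are assumptions made for this example.

```python
import copy
import json
import os
from typing import Mapping, Optional

# Assumed defaults for this sketch
DEFAULTS = {"interval_minutes": 60, "max_workers": 4, "sources": []}


def load_config(text: str, env: Optional[Mapping[str, str]] = None) -> dict:
    """Parse JSON config text, apply defaults, and resolve secrets from the environment."""
    env = os.environ if env is None else env
    # Shallow merge (top-level keys only); deepcopy so DEFAULTS is never mutated
    config = {**copy.deepcopy(DEFAULTS), **json.loads(text)}
    for source in config["sources"]:
        env_var = source.pop("api_key_env", None)
        if env_var is None:
            continue
        if env_var not in env:
            # Fail loudly at startup instead of sending unauthenticated requests later
            raise KeyError(f"source {source.get('name')!r} needs environment variable {env_var}")
        source["api_key"] = env[env_var]
    return config


config_text = """
{
  "interval_minutes": 30,
  "sources": [
    {"name": "News - Subject A", "type": "newsapi",
     "query": "Target Company", "api_key_env": "NEWSAPI_KEY"}
  ]
}
"""
config = load_config(config_text, env={"NEWSAPI_KEY": "test-key"})
print(config["interval_minutes"], config["max_workers"], config["sources"][0]["api_key"])
```

Changing targets or schedules then means editing the JSON file, and rotating a key means changing the environment, with no code change in either case.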
14.3 Modular Collector Design
```python
import abc
import hashlib
import json
import logging
import re
import sqlite3  # used by the state manager in 14.4
import time
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Dict, Generator, List, Optional

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

logger = logging.getLogger(__name__)


@dataclass
class CollectedItem:
    """Standardized container for a collected data item"""
    item_id: str
    source_name: str
    source_url: str
    collected_at: str
    content: str
    content_type: str  # 'html', 'json', 'pdf', 'text'
    metadata: Dict = field(default_factory=dict)
    content_hash: str = field(init=False)

    def __post_init__(self):
        # Hash the raw content (str or bytes) for integrity checks and change detection
        raw = self.content.encode('utf-8') if isinstance(self.content, str) else self.content
        self.content_hash = hashlib.sha256(raw).hexdigest()


class BaseCollector(abc.ABC):
    """Abstract base class for all OSINT collectors"""

    def __init__(self, source_name: str, rate_limit_seconds: float = 1.0):
        self.source_name = source_name
        self.rate_limit_seconds = rate_limit_seconds
        self._last_request_time = float('-inf')  # first request never waits
        self.session = self._create_session()

    def _create_session(self) -> requests.Session:
        """Create a requests session with retry logic"""
        session = requests.Session()
        # Retry idempotent requests on throttling and transient server errors,
        # with exponential backoff between attempts
        retry_strategy = Retry(
            total=3,
            status_forcelist=[429, 500, 502, 503, 504],
            allowed_methods=["HEAD", "GET", "OPTIONS"],
            backoff_factor=2,
        )
        adapter = HTTPAdapter(max_retries=retry_strategy)
        session.mount("https://", adapter)
        session.mount("http://", adapter)
        session.headers.update({
            'User-Agent': 'OSINT Research Bot/1.0 (research purposes)',
        })
        return session

    def _rate_limit(self):
        """Enforce a minimum interval between requests from this collector"""
        elapsed = time.monotonic() - self._last_request_time
        if elapsed < self.rate_limit_seconds:
            time.sleep(self.rate_limit_seconds - elapsed)
        self._last_request_time = time.monotonic()

    def _make_request(self, url: str, **kwargs) -> Optional[requests.Response]:
        """GET a URL with rate limiting; return None on any request failure"""
        self._rate_limit()
        try:
            response = self.session.get(url, timeout=30, **kwargs)
            response.raise_for_status()
            return response
        except requests.exceptions.RequestException as e:
            logger.warning(f"Request failed for {url}: {e}")
            return None

    @abc.abstractmethod
    def collect(self, **kwargs) -> Generator[CollectedItem, None, None]:
        """Collect items from the source, yielding CollectedItem objects"""

    @abc.abstractmethod
    def health_check(self) -> bool:
        """Verify that the source is accessible and the collector is functional"""


class NewsAPICollector(BaseCollector):
    """Collector for news articles via NewsAPI.org"""

    def __init__(self, api_key: str, rate_limit_seconds: float = 0.5):
        super().__init__('newsapi', rate_limit_seconds)
        self.api_key = api_key
        self.session.headers.update({'X-Api-Key': api_key})

    def collect(self, query: str, language: str = 'en',
                from_date: Optional[str] = None, to_date: Optional[str] = None,
                max_articles: int = 100) -> Generator[CollectedItem, None, None]:
        """Collect news articles matching a query"""
        # Keep pageSize fixed for the whole query: page offsets are computed from
        # pageSize, so changing it between pages would skip or repeat articles
        page_size = min(100, max_articles)
        page = 1
        collected_count = 0
        while collected_count < max_articles:
            params = {'q': query, 'language': language,
                      'pageSize': page_size, 'page': page}
            if from_date:
                params['from'] = from_date
            if to_date:
                params['to'] = to_date
            response = self._make_request('https://newsapi.org/v2/everything', params=params)
            if not response:
                break
            data = response.json()
            if data.get('status') != 'ok':
                logger.error(f"NewsAPI error: {data.get('message')}")
                break
            articles = data.get('articles', [])
            if not articles:
                break
            for article in articles[:max_articles - collected_count]:
                url = article.get('url') or ''
                source = (article.get('source') or {}).get('name', '')
                yield CollectedItem(
                    item_id=hashlib.md5(url.encode()).hexdigest(),  # stable ID from URL
                    source_name=self.source_name,
                    source_url=url,
                    collected_at=datetime.now(timezone.utc).isoformat(),
                    content=json.dumps({
                        'title': article.get('title', ''),
                        'description': article.get('description', ''),
                        'content': article.get('content', ''),
                        'author': article.get('author', ''),
                        'publishedAt': article.get('publishedAt', ''),
                        'source': source,
                    }),
                    content_type='json',
                    metadata={
                        'query': query,
                        'published_at': article.get('publishedAt'),
                        'source_name': source,
                    },
                )
                collected_count += 1
            # A short page means there are no further results
            if len(articles) < page_size:
                break
            page += 1

    def health_check(self) -> bool:
        """Check if NewsAPI is accessible with current credentials"""
        # The key is already sent in the X-Api-Key session header
        response = self._make_request('https://newsapi.org/v2/sources')
        return response is not None and response.status_code == 200


class RSSCollector(BaseCollector):
    """Collector for RSS/Atom feed content"""

    def __init__(self, rate_limit_seconds: float = 2.0):
        super().__init__('rss', rate_limit_seconds)

    def collect(self, feed_url: str,
                feed_name: Optional[str] = None) -> Generator[CollectedItem, None, None]:
        """Collect items from an RSS feed"""
        import feedparser  # third-party: pip install feedparser

        # Fetch through the shared session so rate limiting and retries apply;
        # feedparser.parse(feed_url) would download the feed itself and bypass both
        response = self._make_request(feed_url)
        if not response:
            return
        feed = feedparser.parse(response.content)
        if feed.bozo and not feed.entries:
            logger.error(f"RSS parse error for {feed_url}: {feed.get('bozo_exception')}")
            return
        source_name = feed_name or feed.feed.get('title', feed_url)
        for entry in feed.entries:
            entry_id = hashlib.md5(
                entry.get('link', entry.get('id', str(entry))).encode()
            ).hexdigest()
            # Prefer full content, then summary, then description
            content = ''
            if hasattr(entry, 'content'):
                content = entry.content[0].get('value', '')
            elif hasattr(entry, 'summary'):
                content = entry.summary
            elif hasattr(entry, 'description'):
                content = entry.description
            yield CollectedItem(
                item_id=entry_id,
                source_name=source_name,
                source_url=entry.get('link', feed_url),
                collected_at=datetime.now(timezone.utc).isoformat(),
                content=json.dumps({
                    'title': entry.get('title', ''),
                    'summary': entry.get('summary', ''),
                    'content': content,
                    'published': entry.get('published', ''),
                    'author': entry.get('author', ''),
                }),
                content_type='json',
                metadata={
                    'feed_url': feed_url,
                    'feed_name': source_name,
                    'published': entry.get('published', ''),
                },
            )

    def health_check(self) -> bool:
        # No fixed endpoint to probe; per-feed failures are logged by collect()
        return True


class SecEdgarCollector(BaseCollector):
    """Collector for SEC EDGAR filings"""

    def __init__(self, rate_limit_seconds: float = 0.2):
        super().__init__('sec_edgar', rate_limit_seconds)
        # SEC requires a User-Agent identifying the requester with contact info
        self.session.headers.update({
            'User-Agent': 'OSINT Research Tool research@example.com',
            'Accept-Encoding': 'gzip, deflate',
        })

    def collect(self, company_name: Optional[str] = None, cik: Optional[str] = None,
                form_types: Optional[List[str]] = None,
                date_start: Optional[str] = None) -> Generator[CollectedItem, None, None]:
        """Collect SEC filings for a company (date_start as 'YYYY-MM-DD')"""
        form_types = form_types or ['10-K', '10-Q', '8-K', 'DEF 14A']
        # Resolve the company's CIK if it was not provided
        if not cik and company_name:
            cik = self._lookup_cik(company_name)
        if not cik:
            logger.error(f"Could not find CIK for {company_name}")
            return
        # Submissions URLs pad the CIK to 10 digits; archive paths do not
        cik_int = int(cik)
        response = self._make_request(f"https://data.sec.gov/submissions/CIK{cik_int:010d}.json")
        if not response:
            return
        submissions = response.json()
        # 'recent' lists only the most recent filings; older ones are in the
        # additional files referenced under submissions['filings']['files']
        filings = submissions.get('filings', {}).get('recent', {})
        rows = zip(filings.get('form', []), filings.get('accessionNumber', []),
                   filings.get('filingDate', []), filings.get('primaryDocument', []))
        for form, accession, date, doc in rows:
            if form not in form_types:
                continue
            if date_start and date < date_start:  # ISO dates compare correctly as strings
                continue
            accession_clean = accession.replace('-', '')
            doc_url = f"https://www.sec.gov/Archives/edgar/data/{cik_int}/{accession_clean}/{doc}"
            yield CollectedItem(
                item_id=hashlib.md5(doc_url.encode()).hexdigest(),
                source_name=self.source_name,
                source_url=doc_url,
                collected_at=datetime.now(timezone.utc).isoformat(),
                content=json.dumps({
                    'company_name': submissions.get('name', ''),
                    'cik': str(cik_int),
                    'form_type': form,
                    'filing_date': date,
                    'accession_number': accession,
                    'document_url': doc_url,
                }),
                content_type='json',
                metadata={'form_type': form, 'filing_date': date, 'company_cik': str(cik_int)},
            )

    @staticmethod
    def _normalize_name(name: str) -> str:
        # Lowercase and drop punctuation so "Acme Corp." matches "ACME CORP"
        return ' '.join(re.sub(r'[^\w\s]', ' ', name.lower()).split())

    def _lookup_cik(self, company_name: str) -> Optional[str]:
        """Look up a CIK by company name using SEC's company_tickers.json.

        That file only lists companies with exchange tickers; for other
        filers, pass the CIK directly.
        """
        response = self._make_request('https://www.sec.gov/files/company_tickers.json')
        if not response:
            return None
        wanted = self._normalize_name(company_name)
        for entry in response.json().values():
            if self._normalize_name(entry.get('title', '')) == wanted:
                return str(entry['cik_str'])
        return None

    def health_check(self) -> bool:
        # Probe a known-good submissions file (CIK 320193 is Apple Inc.)
        response = self._make_request('https://data.sec.gov/submissions/CIK0000320193.json')
        return response is not None and response.status_code == 200
```
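`CollectedItem` computes a `content_hash`, but the collectors derive `item_id` from the URL, so deduplicating on `item_id` alone cannot tell when a document at a known URL has been edited. For change monitoring, comparing hashes per URL is one way to separate new, changed, and unchanged documents. A minimal sketch, using a plain dictionary as a hypothetical stand-in for persisted hash state:

```python
import hashlib
from typing import Dict, Iterable, List, Tuple


def content_hash(content: str) -> str:
    return hashlib.sha256(content.encode("utf-8")).hexdigest()


def detect_changes(known: Dict[str, str],
                   fetched: Iterable[Tuple[str, str]]) -> Dict[str, List[str]]:
    """Classify fetched (url, content) pairs against known URL -> hash state.

    Updates `known` in place so the next call compares against this run.
    """
    report: Dict[str, List[str]] = {"new": [], "changed": [], "unchanged": []}
    for url, content in fetched:
        digest = content_hash(content)
        previous = known.get(url)
        if previous is None:
            report["new"].append(url)
        elif previous != digest:
            report["changed"].append(url)
        else:
            report["unchanged"].append(url)
        known[url] = digest
    return report


state: Dict[str, str] = {}
first_run = detect_changes(state, [("https://example.com/a", "v1"), ("https://example.com/b", "v1")])
second_run = detect_changes(state, [("https://example.com/a", "v1"), ("https://example.com/b", "v2")])
print(first_run["new"], second_run["changed"])
```

A "changed" result is often the most investigatively interesting one, for example a company page quietly rewritten after an event, so it may deserve its own alert condition.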
14.4 Collection State Management
A small SQLite database records which items have been collected and whether each has been processed, which prevents duplicate processing and tracks collection state across runs:
```python
import sqlite3
from contextlib import contextmanager
from datetime import datetime, timezone
from typing import List, Optional


class CollectionStateManager:
    """Manages collection state to prevent duplicates and support resumption"""

    def __init__(self, db_path: str = 'collection_state.db'):
        self.db_path = db_path
        self._init_db()

    @contextmanager
    def _connect(self):
        # sqlite3's own context manager commits or rolls back but does not
        # close the connection, so close it explicitly
        conn = sqlite3.connect(self.db_path)
        try:
            with conn:
                yield conn
        finally:
            conn.close()

    def _init_db(self):
        with self._connect() as conn:
            conn.execute('''
                CREATE TABLE IF NOT EXISTS collected_items (
                    item_id TEXT PRIMARY KEY,
                    source_name TEXT,
                    source_url TEXT,
                    collected_at TEXT,
                    processed_at TEXT,
                    content_hash TEXT,
                    processing_status TEXT DEFAULT 'pending',
                    error_message TEXT
                )
            ''')
            conn.execute('''
                CREATE TABLE IF NOT EXISTS collection_runs (
                    run_id TEXT PRIMARY KEY,
                    source_name TEXT,
                    started_at TEXT,
                    completed_at TEXT,
                    items_collected INTEGER DEFAULT 0,
                    items_processed INTEGER DEFAULT 0,
                    status TEXT DEFAULT 'running',
                    error_message TEXT
                )
            ''')

    def is_seen(self, item_id: str) -> bool:
        """Check if an item has been previously collected"""
        with self._connect() as conn:
            row = conn.execute(
                'SELECT 1 FROM collected_items WHERE item_id = ?', (item_id,)
            ).fetchone()
        return row is not None

    def mark_collected(self, item: 'CollectedItem'):
        """Record that an item has been collected"""
        # INSERT OR IGNORE keeps this idempotent: re-collecting a known item
        # must not reset its processing status back to 'pending'
        with self._connect() as conn:
            conn.execute('''
                INSERT OR IGNORE INTO collected_items
                (item_id, source_name, source_url, collected_at, content_hash, processing_status)
                VALUES (?, ?, ?, ?, ?, 'pending')
            ''', (item.item_id, item.source_name, item.source_url,
                  item.collected_at, item.content_hash))

    def mark_processed(self, item_id: str, error: Optional[str] = None):
        """Mark an item as processed, or as failed if an error message is given"""
        status = 'error' if error else 'processed'
        with self._connect() as conn:
            conn.execute('''
                UPDATE collected_items
                SET processing_status = ?, processed_at = ?, error_message = ?
                WHERE item_id = ?
            ''', (status, datetime.now(timezone.utc).isoformat(), error, item_id))

    def get_pending_items(self, source_name: Optional[str] = None, limit: int = 100) -> List[dict]:
        """Get items pending processing"""
        query = "SELECT * FROM collected_items WHERE processing_status = 'pending'"
        params: list = []
        if source_name:
            query += " AND source_name = ?"
            params.append(source_name)
        # Bind LIMIT as a parameter rather than formatting it into the SQL
        query += " LIMIT ?"
        params.append(int(limit))
        with self._connect() as conn:
            conn.row_factory = sqlite3.Row
            return [dict(row) for row in conn.execute(query, params).fetchall()]
```
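The `collection_runs` table created in `_init_db` is not written to by any of the code in this chapter. A minimal sketch of how each cycle could leave an auditable record (start time, end time, item count, and status) follows. The `start_run` and `finish_run` helpers are hypothetical; the table definition is copied from `_init_db` so the example runs on its own against an in-memory database.

```python
import sqlite3
import uuid
from datetime import datetime, timezone
from typing import Optional

# Same schema as the collection_runs table in CollectionStateManager._init_db
RUNS_SCHEMA = """
CREATE TABLE IF NOT EXISTS collection_runs (
    run_id TEXT PRIMARY KEY,
    source_name TEXT,
    started_at TEXT,
    completed_at TEXT,
    items_collected INTEGER DEFAULT 0,
    items_processed INTEGER DEFAULT 0,
    status TEXT DEFAULT 'running',
    error_message TEXT
)
"""


def _now() -> str:
    return datetime.now(timezone.utc).isoformat()


def start_run(conn: sqlite3.Connection, source_name: str) -> str:
    """Insert a row with the default 'running' status and return its ID."""
    run_id = uuid.uuid4().hex
    with conn:  # commits on success, rolls back on error
        conn.execute(
            "INSERT INTO collection_runs (run_id, source_name, started_at) VALUES (?, ?, ?)",
            (run_id, source_name, _now()),
        )
    return run_id


def finish_run(conn: sqlite3.Connection, run_id: str, items_collected: int,
               error: Optional[str] = None) -> None:
    """Close out a run as 'completed', or 'failed' when an error message is given."""
    with conn:
        conn.execute(
            """UPDATE collection_runs
               SET completed_at = ?, items_collected = ?, status = ?, error_message = ?
               WHERE run_id = ?""",
            (_now(), items_collected, "failed" if error else "completed", error, run_id),
        )


conn = sqlite3.connect(":memory:")
conn.execute(RUNS_SCHEMA)
run_id = start_run(conn, "sec_edgar")
finish_run(conn, run_id, items_collected=12)
print(conn.execute("SELECT status, items_collected FROM collection_runs").fetchone())
```

A run left in the `running` state after a restart then signals an interrupted cycle, which supports the resume behavior the class docstring promises.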
14.5 Pipeline Orchestration
```python
import json
import logging
import os
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import List

import schedule  # third-party: pip install schedule


class OSINTPipeline:
    """Full OSINT pipeline orchestrator"""

    def __init__(self, config: dict):
        self.config = config
        self.state_manager = CollectionStateManager()
        self.nlp_pipeline = None  # spaCy model, loaded lazily on first use
        self._stop_event = threading.Event()
        self.executor = ThreadPoolExecutor(max_workers=config.get('max_workers', 4))
        # Log to both a file and the console
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler('osint_pipeline.log'),
                logging.StreamHandler(),
            ],
        )
        self.logger = logging.getLogger('OSINTPipeline')

    def collect_from_source(self, source_config: dict) -> List[CollectedItem]:
        """Run collection for a single source configuration"""
        source_type = source_config['type']
        new_items = []
        try:
            if source_type == 'newsapi':
                collector = NewsAPICollector(source_config['api_key'])
                generator = collector.collect(
                    query=source_config['query'],
                    max_articles=source_config.get('max_articles', 100),
                )
            elif source_type == 'rss':
                collector = RSSCollector()
                generator = collector.collect(
                    feed_url=source_config['url'],
                    feed_name=source_config.get('feed_name', source_config.get('name')),
                )
            elif source_type == 'sec_edgar':
                collector = SecEdgarCollector()
                generator = collector.collect(
                    company_name=source_config.get('company_name'),
                    cik=source_config.get('cik'),
                    form_types=source_config.get('form_types', ['8-K']),
                )
            else:
                self.logger.error(f"Unknown source type: {source_type}")
                return []
            for item in generator:
                # Deduplicate across runs using the persistent state store
                if not self.state_manager.is_seen(item.item_id):
                    self.state_manager.mark_collected(item)
                    new_items.append(item)
                    self.logger.info(f"Collected new item: {item.source_url[:80]}")
        except Exception as e:
            # Isolate failures: one broken source must not stop the others
            self.logger.error(f"Collection error for {source_config.get('name', source_type)}: {e}")
        return new_items

    def process_item(self, item: CollectedItem) -> dict:
        """Process a single collected item through the NLP pipeline"""
        try:
            # Flatten JSON payloads from the collectors into one text string
            if item.content_type == 'json':
                content_data = json.loads(item.content)
                text = ' '.join(filter(None, [
                    content_data.get('title', ''),
                    content_data.get('description', ''),
                    content_data.get('content', ''),
                    content_data.get('summary', ''),
                ]))
            else:
                text = item.content
            if not text.strip():
                self.state_manager.mark_processed(item.item_id)
                return {}
            if self.nlp_pipeline is None:
                import spacy  # model install: python -m spacy download en_core_web_sm
                self.nlp_pipeline = spacy.load("en_core_web_sm")
            doc = self.nlp_pipeline(text[:10000])  # cap length to bound processing time
            # Group unique entity strings by spaCy label (PERSON, ORG, MONEY, ...)
            entities = {}
            for ent in doc.ents:
                labels = entities.setdefault(ent.label_, [])
                if ent.text not in labels:
                    labels.append(ent.text)
            result = {
                'item_id': item.item_id,
                'source_url': item.source_url,
                'collected_at': item.collected_at,
                'entities': entities,
                'word_count': len(text.split()),
                'has_persons': bool(entities.get('PERSON')),
                'has_organizations': bool(entities.get('ORG')),
                'has_money': bool(entities.get('MONEY')),
            }
            self.state_manager.mark_processed(item.item_id)
            return result
        except Exception as e:
            self.state_manager.mark_processed(item.item_id, error=str(e))
            self.logger.error(f"Processing error for {item.item_id}: {e}")
            return {}

    def run_collection_cycle(self) -> List[dict]:
        """Run one collection cycle across all configured sources"""
        self.logger.info("Starting collection cycle")
        all_new_items = []
        # Collect from all enabled sources in parallel
        futures = {}
        for source in self.config.get('sources', []):
            if source.get('enabled', True):
                future = self.executor.submit(self.collect_from_source, source)
                futures[future] = source.get('name', 'unknown')
        for future in as_completed(futures):
            source_name = futures[future]
            try:
                items = future.result()
                all_new_items.extend(items)
                self.logger.info(f"Collected {len(items)} new items from {source_name}")
            except Exception as e:
                self.logger.error(f"Error collecting from {source_name}: {e}")
        # Process new items sequentially (the spaCy model is shared)
        processed_results = []
        for item in all_new_items:
            result = self.process_item(item)
            if result:
                processed_results.append(result)
        self.logger.info(
            f"Collection cycle complete. "
            f"Collected: {len(all_new_items)}, Processed: {len(processed_results)}"
        )
        self._check_alerts(processed_results)
        return processed_results

    def _check_alerts(self, results: List[dict]):
        """Check results against alert conditions and notify if triggered"""
        for condition in self.config.get('alerts', []):
            matched = []
            for result in results:
                entities = result.get('entities', {})
                # Case-insensitive substring match of each term against extracted entities
                for entity_type, search_terms in condition.get('entities', {}).items():
                    found = entities.get(entity_type, [])
                    for term in search_terms:
                        if any(term.lower() in f.lower() for f in found):
                            matched.append({
                                'result': result,
                                'matched_term': term,
                                'condition_name': condition.get('name'),
                            })
            if matched:
                self._send_alert(condition.get('name', 'Alert'), matched)

    def _send_alert(self, alert_name: str, matches: List[dict]):
        """Send alert notification"""
        # In production, implement email, Slack, PagerDuty, etc.
        self.logger.warning(f"ALERT: {alert_name} ({len(matches)} matches found)")
        for match in matches[:5]:
            self.logger.warning(
                f"  Matched '{match['matched_term']}' in: {match['result'].get('source_url', '')[:80]}"
            )

    def _safe_cycle(self):
        """Run one cycle, logging instead of raising so the scheduler keeps going"""
        try:
            self.run_collection_cycle()
        except Exception:
            # An exception escaping run_pending() would end the scheduler thread
            self.logger.exception("Collection cycle failed")

    def start_scheduled(self, collection_interval_minutes: int = 60) -> threading.Thread:
        """Start the pipeline on a schedule in a background thread"""
        self._stop_event.clear()
        scheduler = schedule.Scheduler()  # private scheduler, not the module-global one
        scheduler.every(collection_interval_minutes).minutes.do(self._safe_cycle)

        def run_loop():
            self._safe_cycle()  # run immediately on start
            while not self._stop_event.is_set():
                scheduler.run_pending()
                self._stop_event.wait(timeout=30)  # wakes early when stop() is called

        thread = threading.Thread(target=run_loop, daemon=True)
        thread.start()
        self.logger.info(f"Pipeline started, collecting every {collection_interval_minutes} minutes")
        return thread

    def stop(self):
        """Signal the scheduled pipeline to stop after the current cycle"""
        self._stop_event.set()
        self.logger.info("Pipeline stopped")


# Pipeline configuration example
PIPELINE_CONFIG = {
    'max_workers': 4,
    'sources': [
        {
            'name': 'General News - Subject A',
            'type': 'newsapi',
            # Read the key from the environment; never commit keys to code
            'api_key': os.environ.get('NEWSAPI_KEY', ''),
            'query': '"Target Company" OR "Target Person"',
            'max_articles': 100,
            'enabled': True,
        },
        {
            'name': 'SEC EDGAR - Target Company',
            'type': 'sec_edgar',
            'company_name': 'Target Company Inc',
            'form_types': ['8-K', '10-Q'],
            'enabled': True,
        },
        {
            'name': 'Industry RSS Feed',  # source label used in logs
            'type': 'rss',
            'url': 'https://example.com/feed.rss',
            'feed_name': 'Industry News',  # label stored on collected items
            'enabled': True,
        },
    ],
    'alerts': [
        {
            'name': 'Litigation Alert',
            'entities': {
                'ORG': ['Target Company', 'Target Corp'],
                'PERSON': ['Target Person'],
            },
        },
    ],
}
```
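`_send_alert` only writes to the log, with a comment noting that production systems would notify through channels such as Slack. As one hedged example, Slack incoming webhooks accept an HTTP POST whose JSON body contains a `text` field. The sketch below separates message formatting from delivery so formatting can be tested offline; the webhook URL is a placeholder you would obtain from your own Slack workspace, and the match dictionaries mirror those built in `_check_alerts`.

```python
import json
import urllib.request
from typing import List


def build_alert_payload(alert_name: str, matches: List[dict], max_lines: int = 5) -> dict:
    """Format alert matches as a Slack-style {'text': ...} message body."""
    lines = [f"ALERT: {alert_name} ({len(matches)} matches)"]
    for match in matches[:max_lines]:
        url = match["result"].get("source_url", "")
        lines.append(f"- matched '{match['matched_term']}' in {url}")
    if len(matches) > max_lines:
        lines.append(f"...and {len(matches) - max_lines} more")
    return {"text": "\n".join(lines)}


def post_webhook(webhook_url: str, payload: dict, timeout: float = 10.0) -> int:
    """POST the payload as JSON; urlopen raises urllib.error.HTTPError on non-2xx replies."""
    request = urllib.request.Request(
        webhook_url,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return response.status


matches = [
    {"result": {"source_url": f"https://example.com/article/{i}"},
     "matched_term": "Target Company", "condition_name": "Litigation Alert"}
    for i in range(7)
]
payload = build_alert_payload("Litigation Alert", matches)
print(payload["text"].splitlines()[0])
# post_webhook("https://hooks.slack.com/services/...", payload)  # placeholder URL
```

Keeping delivery in its own function means a failed notification can be retried or logged without re-running the collection cycle that produced it.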
14.6 Testing Investigative Automation
Production investigative automation requires testing. Unit tests with mocked HTTP responses and a throwaway database can verify parsing, error handling, and state tracking without touching live sources:
```python
import shutil
import tempfile
import unittest
from datetime import datetime
from unittest.mock import MagicMock, patch

import requests


class TestNewsAPICollector(unittest.TestCase):
    def setUp(self):
        self.collector = NewsAPICollector('test_api_key')

    @patch('requests.Session.get')
    def test_collect_returns_items(self, mock_get):
        """Collection returns properly structured items"""
        # Mock a single-page NewsAPI response containing one article
        mock_response = MagicMock()
        mock_response.status_code = 200
        mock_response.json.return_value = {
            'status': 'ok',
            'articles': [
                {
                    'title': 'Test Article',
                    'url': 'https://example.com/article',
                    'description': 'Test description',
                    'content': 'Test content',
                    'publishedAt': '2024-01-01T00:00:00Z',
                    'source': {'name': 'Test Source'},
                }
            ],
        }
        mock_get.return_value = mock_response
        items = list(self.collector.collect(query='test', max_articles=10))
        self.assertEqual(len(items), 1)
        self.assertIsInstance(items[0], CollectedItem)
        self.assertEqual(items[0].source_name, 'newsapi')
        self.assertIsNotNone(items[0].content_hash)

    @patch('requests.Session.get')
    def test_collect_handles_api_error(self, mock_get):
        """Network errors end collection without raising"""
        mock_get.side_effect = requests.exceptions.ConnectionError("Connection failed")
        items = list(self.collector.collect(query='test'))
        self.assertEqual(len(items), 0)


class TestCollectionStateManager(unittest.TestCase):
    def setUp(self):
        # Fresh database per test, deleted afterwards
        self.tmp_dir = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, self.tmp_dir, ignore_errors=True)
        self.state_manager = CollectionStateManager(
            db_path=f"{self.tmp_dir}/test_state.db"
        )

    def test_is_seen_returns_false_for_new_item(self):
        self.assertFalse(self.state_manager.is_seen('new_item_id'))

    def test_mark_collected_and_is_seen(self):
        item = CollectedItem(
            item_id='test_123',
            source_name='test',
            source_url='https://example.com',
            collected_at=datetime.now().isoformat(),
            content='test content',
            content_type='text',
        )
        self.state_manager.mark_collected(item)
        self.assertTrue(self.state_manager.is_seen('test_123'))

    def test_mark_processed_updates_status(self):
        item = CollectedItem(
            item_id='test_456',
            source_name='test',
            source_url='https://example.com',
            collected_at=datetime.now().isoformat(),
            content='test content',
            content_type='text',
        )
        self.state_manager.mark_collected(item)
        self.state_manager.mark_processed('test_456')
        pending_ids = [p['item_id'] for p in self.state_manager.get_pending_items()]
        self.assertNotIn('test_456', pending_ids)


if __name__ == '__main__':
    unittest.main()
```
Summary
Investigative automation enables OSINT practice to scale beyond what is possible with manual methods. Well-designed pipelines with modular collectors, state management, and processing stages provide repeatable, reliable data collection and analysis.
The architectural principles that matter most: idempotency (so pipelines can be safely restarted), observability (so failures are visible), and source independence (so one failing source doesn't halt everything). Testing automation code prevents silent failures that produce incomplete or incorrect results.
Building automation is an investment that pays off for recurring investigations, ongoing monitoring requirements, and large-scale data processing. For one-time investigations, manual methods may be more appropriate.
Common Mistakes and Pitfalls
- No deduplication: Processing the same item multiple times wastes resources and distorts analysis
- Silent failures: Pipelines that log errors but continue silently produce incomplete results without warning
- Hard-coded credentials: API keys and credentials in code are a security risk; use environment variables or secrets management
- No rate limiting: Violating API rate limits causes throttling and potential account suspension
- Untested pipelines: Automation bugs discovered in production are costly; test with realistic sample data before deployment
- Missing resume capability: Pipelines that cannot resume from failure force expensive restarts from scratch
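Silent failures are easier to catch when each source's recent history is tracked, counting consecutive cycles that produced nothing or raised an error and flagging sources that cross a threshold. A minimal sketch follows; `SourceHealthTracker` is a hypothetical helper, the threshold is an arbitrary assumption, and a real deployment would route flagged sources to the alerting channel rather than just return them.

```python
from collections import defaultdict
from typing import DefaultDict, List


class SourceHealthTracker:
    """Flag sources that repeatedly fail or return nothing (hypothetical helper)."""

    def __init__(self, max_bad_cycles: int = 3):
        self.max_bad_cycles = max_bad_cycles
        self._bad_streak: DefaultDict[str, int] = defaultdict(int)

    def record(self, source_name: str, items_fetched: int, failed: bool = False) -> bool:
        """Record one cycle's outcome; return True if the source should be flagged.

        Pass the number of items fetched before deduplication: a healthy
        source can legitimately produce zero new items in a cycle.
        """
        if failed or items_fetched == 0:
            self._bad_streak[source_name] += 1
        else:
            self._bad_streak[source_name] = 0  # any successful fetch resets the streak
        return self._bad_streak[source_name] >= self.max_bad_cycles

    def flagged_sources(self) -> List[str]:
        return sorted(name for name, streak in self._bad_streak.items()
                      if streak >= self.max_bad_cycles)


tracker = SourceHealthTracker(max_bad_cycles=2)
tracker.record("Industry RSS Feed", items_fetched=0)
tracker.record("SEC EDGAR", items_fetched=14)
second_bad = tracker.record("Industry RSS Feed", items_fetched=0, failed=True)
print(second_bad, tracker.flagged_sources())
```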
Further Reading
- Apache Airflow documentation — production workflow orchestration
- Scrapy documentation — Python web scraping framework
- Python asyncio documentation — asynchronous I/O for high-throughput collection
- Kafka documentation — stream processing for real-time OSINT data
- Prometheus and Grafana — pipeline monitoring and observability