Reservoir Sampling: Random Selection from Data Streams
Master the essential randomized algorithm for selecting k items uniformly at random from unbounded data streams. Reservoir sampling handles streams of large or unknown size, which makes it a frequent topic in interviews at Google, Amazon, and other tech giants.
🎯 What is Reservoir Sampling?
Reservoir sampling is a family of randomized algorithms for selecting a uniform random sample from a data stream whose size is large or unknown. The algorithm maintains a "reservoir" of exactly k items, and every element in the stream has an equal probability of ending up in the final sample, regardless of the stream's total length.
The core challenge it solves is: how do you fairly select k items from a stream when you don't know the total size? Traditional random sampling requires knowing the population size in order to draw indices within the right range, which is impossible for unbounded or unknown-length streams.
The key idea is that each new item replaces one of the items already in the reservoir with a carefully chosen probability, which guarantees statistical fairness no matter how long the stream turns out to be.
Step-by-step explanation: the implementation later in this guide follows the classic reservoir sampling method (Algorithm R). It maintains a reservoir array of exactly k items and, for each incoming stream element, decides whether to keep it based on a calculated probability.
Algorithm Deep Dive
🎯 How It Works (Simple Explanation)
Think of it like this: you're trying to pick k random people from a never-ending line, but you can only remember k names at a time! (A short code sketch of this idea follows the steps below.)
- Step 1: Keep the first k people (they're guaranteed spots)
- Step 2: For each new person, flip a weighted coin to decide if they deserve a spot
- Step 3: If they win, randomly kick out someone already selected
- Magic Result: Everyone gets a perfectly fair chance, no matter when they arrived!
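To make these steps concrete, here is a minimal sketch in Python (the function and variable names are illustrative; the full Algorithm R implementation appears later in this guide):

import random

def pick_k_from_line(people, k):
    chosen = []
    for count, person in enumerate(people, start=1):
        if count <= k:
            # Step 1: the first k people get guaranteed spots
            chosen.append(person)
        else:
            # Step 2: weighted coin flip - this person wins a spot with probability k/count
            spot = random.randint(1, count)
            if spot <= k:
                # Step 3: they take over a randomly chosen existing spot
                chosen[spot - 1] = person
    return chosen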
⚡ Complexity Analysis
Time Complexity: O(n) - single pass through stream
Space Complexity: O(k) - only store k items
Stream Size: Unknown/infinite - works with any size
Randomness: Uniform distribution guaranteed
🎨 Algorithm Visualization
Watch how reservoir sampling maintains fairness as the stream grows
🧮 Mathematical Foundation & Proof
The Key Equation
P(element i is in the reservoir) = k/i immediately after processing element i, and k/n once all n elements have been processed, where k is the reservoir size and i is the current element's position in the stream.
Mathematical Proof by Induction
Base Case (i ≤ k):
The first k elements are automatically included with probability 1. Each has probability k/k = 1 of being selected initially.
Inductive Step (i > k):
Assume after processing (i-1) elements, each element has probability k/(i-1) of being in the reservoir.
When element i arrives:
• Probability of including element i: k/i
• If included, it replaces a random existing element
• Probability any existing element gets replaced: (k/i) × (1/k) = 1/i
Therefore, probability any existing element remains = 1 - 1/i = (i-1)/i
Combined probability = [k/(i-1)] × [(i-1)/i] = k/i ✓
🎯 Result: After processing n elements, every element has exactly k/n probability of being selected!
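As a quick numerical check of this proof, the short snippet below (an illustrative calculation, not part of the algorithm itself) multiplies the survival probabilities element by element and confirms that the telescoping product collapses to k/n for every position:

from fractions import Fraction

def inclusion_probability(position, n, k):
    """Probability that the element at `position` (1-indexed) ends up in the final reservoir."""
    if position <= k:
        # Included for sure at first, then must survive every later round
        prob = Fraction(1)
    else:
        # Selected with probability k/position when it arrives
        prob = Fraction(k, position)
    # Survive each subsequent element i with probability (i - 1)/i
    for i in range(max(position, k) + 1, n + 1):
        prob *= Fraction(i - 1, i)
    return prob

n, k = 10, 3
for pos in (1, 4, 10):
    print(pos, inclusion_probability(pos, n, k))  # each prints 3/10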
Intuitive Understanding
🎯 Perfect Fairness
Every element gets equal treatment regardless of when it arrives in the stream
⚡ Efficiency
O(k) space complexity, O(1) per-element processing time
🌊 Stream-Friendly
Works with infinite streams without knowing total size
🐍 Complete Python Implementation
Basic Reservoir Sampling Implementation
import random

def reservoir_sampling(stream, k):
    """
    Reservoir Sampling Algorithm (Algorithm R)

    Args:
        stream: Iterable of elements to sample from
        k: Number of elements to sample

    Returns:
        List of k randomly sampled elements
    """
    reservoir = []
    for i, element in enumerate(stream):
        if i < k:
            # Fill reservoir with first k elements
            reservoir.append(element)
        else:
            # Keep the (i+1)-th element with probability k/(i+1) (enumerate is 0-indexed)
            j = random.randint(0, i)
            if j < k:
                reservoir[j] = element
    return reservoir

# Example usage
stream = range(1, 1001)  # Stream of numbers 1-1000
k = 10
sample = reservoir_sampling(stream, k)
print("Random sample from stream:", sample)
Advanced Implementation with Visualization
import matplotlib.pyplot as plt
import seaborn as sns

sns.set_context('poster')

def plot_reservoir_sampling(stream, k, iterations=1000):
    """
    Visualize the fairness of reservoir sampling

    Args:
        stream: Range of elements to sample from
        k: Reservoir size
        iterations: Number of sampling iterations for statistics
    """
    counts = [0] * len(stream)

    # Run multiple iterations to gather statistics
    for _ in range(iterations):
        sample = reservoir_sampling(stream, k)
        for elem in sample:
            counts[elem - 1] += 1

    # Create visualization
    plt.figure(figsize=(16, 6))
    plt.plot(stream, counts, linewidth=2, color='#3b82f6')
    plt.xlabel('Element in Stream', fontsize=14)
    plt.ylabel('Selection Frequency', fontsize=14)
    plt.title(f'Reservoir Sampling Fairness Test (k={k}, iterations={iterations})', fontsize=16)
    plt.grid(True, alpha=0.3)

    # Add expected frequency line
    expected_freq = iterations * k / len(stream)
    plt.axhline(y=expected_freq, color='red', linestyle='--',
                label=f'Expected frequency: {expected_freq:.1f}')
    plt.legend()
    plt.tight_layout()
    plt.show()

# Test fairness with visualization
stream = range(1, 101)  # Stream of 100 elements
plot_reservoir_sampling(stream, 10)
Object-Oriented Reservoir Sampler
from dataclasses import dataclass
import typing as t
import numpy as np

A = t.TypeVar("A")

@dataclass
class Reservoir:
    """
    Object-oriented reservoir sampler for streaming data

    Attributes:
        k: Reservoir size
        seed: Random seed for reproducibility
    """
    k: int
    seed: t.Optional[int] = None

    def __post_init__(self):
        # Initialize state
        self.i = 0  # Number of items processed
        self.sample: t.List[A] = []
        if self.seed is not None:
            assert self.seed > 0

    def add(self, item: A) -> None:
        """
        Add a new item to the stream and update reservoir

        Args:
            item: New item from the stream
        """
        self.i += 1

        # Set seed for reproducibility
        if self.seed is not None:
            np.random.seed(self.i * self.seed)

        if len(self.sample) < self.k:
            # Fill reservoir with first k elements
            self.sample.append(item)
        else:
            # Replace with probability k/i
            if np.random.rand() <= self.k / self.i:
                idx = np.random.choice(self.k)
                self.sample[idx] = item

    def get_sample(self) -> t.List[A]:
        """
        Get current reservoir sample

        Returns:
            Current sample in the reservoir
        """
        return self.sample.copy()

    def get_stats(self) -> dict:
        """
        Get sampling statistics

        Returns:
            Dictionary with sampling statistics
        """
        return {
            'items_processed': self.i,
            'reservoir_size': len(self.sample),
            'target_size': self.k,
            'current_probability': self.k / max(self.i, 1)
        }

# Example usage
reservoir = Reservoir(k=5, seed=42)

# Process stream elements
for x in range(20):
    reservoir.add(x)
    if x % 5 == 4:  # Print every 5 elements
        print(f"After {x+1} elements: {reservoir.get_sample()}")
        print(f"Stats: {reservoir.get_stats()}")
        print()
Statistical Validation Test
import collections

def test_reservoir_fairness(stream_size=6, k=5, iterations=100000):
    """
    Test the fairness of reservoir sampling algorithm

    Args:
        stream_size: Size of the stream to test
        k: Reservoir size
        iterations: Number of test iterations

    Returns:
        Dictionary showing probability distribution
    """
    cnt = collections.defaultdict(int)

    for idx in range(iterations):
        r = Reservoir(k=k, seed=idx + 1)
        for x in range(stream_size):
            r.add(x)
        cnt[frozenset(r.get_sample())] += 1

    # Convert counts to probabilities
    probabilities = {subset: count / iterations for subset, count in cnt.items()}
    return probabilities

# Test with small example: select 5 from 6 elements
results = test_reservoir_fairness(stream_size=6, k=5, iterations=100000)

print("Probability distribution for selecting 5 from 6 elements:")
for subset, prob in sorted(results.items(), key=lambda item: sorted(item[0])):
    print(f"{sorted(subset)}: {prob:.4f}")

# Expected: each subset should have probability ≈ 1/6 = 0.1667
expected_prob = 1 / 6
print(f"\nExpected probability: {expected_prob:.4f}")
print(f"Actual probabilities range: {min(results.values()):.4f} - {max(results.values()):.4f}")
✅ Expected Output:
Each possible subset of 5 elements should appear with probability ≈ 0.1667 (1/6), demonstrating perfect fairness of the algorithm.
🤖 Machine Learning & Data Science Applications
🎯 Online Learning & Model Training
Reservoir sampling is crucial for training ML models on streaming data where you can't store the entire dataset in memory.
from sklearn.linear_model import SGDClassifier
import numpy as np

class OnlineLearner:
    def __init__(self, reservoir_size=1000, model_update_freq=100):
        self.reservoir = Reservoir(k=reservoir_size)
        self.model = SGDClassifier()
        self.model_update_freq = model_update_freq
        self.samples_seen = 0
        self.is_fitted = False

    def process_sample(self, X, y):
        """Process a new training sample from the stream"""
        self.reservoir.add((X, y))
        self.samples_seen += 1

        # Retrain model periodically
        if self.samples_seen % self.model_update_freq == 0:
            self._retrain_model()

    def _retrain_model(self):
        """Retrain model on current reservoir sample"""
        if len(self.reservoir.get_sample()) < 10:
            return

        # Extract features and labels from reservoir
        samples = self.reservoir.get_sample()
        X_batch = np.array([sample[0] for sample in samples])
        y_batch = np.array([sample[1] for sample in samples])

        if not self.is_fitted:
            self.model.fit(X_batch, y_batch)
            self.is_fitted = True
        else:
            # Partial fit for online learning
            self.model.partial_fit(X_batch, y_batch)

    def predict(self, X):
        """Make prediction using current model"""
        if not self.is_fitted:
            return None
        return self.model.predict([X])[0]
💡 Key Benefits:
- Maintains representative sample for model training
- Handles concept drift in streaming data
- Memory-efficient for large-scale applications
🔍 Dynamic Feature Selection
Use reservoir sampling to maintain a diverse set of features for high-dimensional streaming data analysis.
class FeatureSelector:
    def __init__(self, max_features=100):
        self.feature_reservoir = Reservoir(k=max_features)
        self.feature_importance = {}

    def update_features(self, new_features, importance_scores):
        """Update feature set based on streaming importance scores"""
        for feature, score in zip(new_features, importance_scores):
            # Store (feature, score) pairs; note that plain reservoir sampling
            # selects uniformly and does not itself weight by importance
            weighted_feature = (feature, score)
            self.feature_reservoir.add(weighted_feature)
            self.feature_importance[feature] = score

    def get_selected_features(self):
        """Get current set of selected features"""
        selected = self.feature_reservoir.get_sample()
        return [feature for feature, _ in selected]

    def get_feature_matrix(self, data):
        """Extract selected features from data (assumes features are column indices)"""
        selected_features = self.get_selected_features()
        return data[:, selected_features]
📊 A/B Testing & Experimentation
Maintain representative samples for statistical analysis in streaming A/B tests.
import numpy as np
import scipy.stats as stats

class ABTestAnalyzer:
    def __init__(self, sample_size=1000):
        self.control_reservoir = Reservoir(k=sample_size)
        self.treatment_reservoir = Reservoir(k=sample_size)

    def add_observation(self, group, outcome):
        """Add new observation to appropriate group"""
        if group == 'control':
            self.control_reservoir.add(outcome)
        elif group == 'treatment':
            self.treatment_reservoir.add(outcome)

    def get_statistical_power(self):
        """Run a two-sample t-test on the current reservoir samples"""
        control_data = self.control_reservoir.get_sample()
        treatment_data = self.treatment_reservoir.get_sample()

        if len(control_data) < 30 or len(treatment_data) < 30:
            return None

        # Perform t-test
        t_stat, p_value = stats.ttest_ind(control_data, treatment_data)

        return {
            'control_mean': np.mean(control_data),
            'treatment_mean': np.mean(treatment_data),
            'p_value': p_value,
            'significant': p_value < 0.05,
            'sample_sizes': (len(control_data), len(treatment_data))
        }
⚡ Real-time Data Pipeline Sampling
Integrate reservoir sampling into data pipelines for monitoring and quality assurance.
import numpy as np

class DataQualityMonitor:
    def __init__(self, sample_size=500):
        self.data_reservoir = Reservoir(k=sample_size)
        self.quality_metrics = {
            'null_rate': [],
            'outlier_rate': [],
            'schema_violations': []
        }

    def process_record(self, record):
        """Process incoming data record"""
        self.data_reservoir.add(record)
        # Update quality metrics
        self._update_quality_metrics(record)

    def _update_quality_metrics(self, record):
        """Update data quality metrics"""
        # Calculate null rate
        null_count = sum(1 for v in record.values() if v is None)
        null_rate = null_count / len(record)

        # Detect outliers (simplified)
        numeric_values = [v for v in record.values() if isinstance(v, (int, float))]
        outlier_rate = self._detect_outliers(numeric_values)

        # Update running metrics
        self.quality_metrics['null_rate'].append(null_rate)
        self.quality_metrics['outlier_rate'].append(outlier_rate)

    def _detect_outliers(self, values, z_threshold=3.0):
        """Simplified z-score outlier rate (illustrative helper)"""
        if len(values) < 2:
            return 0.0
        arr = np.array(values, dtype=float)
        std = arr.std()
        if std == 0:
            return 0.0
        z_scores = np.abs((arr - arr.mean()) / std)
        return float(np.mean(z_scores > z_threshold))

    def _calculate_freshness(self, sample):
        """Placeholder freshness metric (a real implementation would use record timestamps)"""
        return len(sample) / max(self.data_reservoir.k, 1)

    def get_quality_report(self):
        """Generate data quality report from sample"""
        sample = self.data_reservoir.get_sample()
        return {
            'sample_size': len(sample),
            'avg_null_rate': np.mean(self.quality_metrics['null_rate']),
            'avg_outlier_rate': np.mean(self.quality_metrics['outlier_rate']),
            'data_freshness': self._calculate_freshness(sample)
        }
🚀 Production Use Cases:
- Recommendation Systems: Sample user interactions for model updates
- Fraud Detection: Maintain diverse transaction samples
- Log Analysis: Sample log entries for anomaly detection
- IoT Data Processing: Sample sensor readings for trend analysis
🌍 Traditional Applications
Data Analytics
Sample large datasets for statistical analysis, A/B testing, and machine learning model training without loading entire datasets into memory.
Stream Processing
Real-time sampling of network traffic, social media feeds, IoT sensor data, and financial transaction streams for monitoring and analysis.
Gaming & Randomization
Fair random selection in games, lottery systems, survey sampling, and any scenario requiring unbiased random selection from unknown populations.
Search Engines
Sample web pages for indexing, select representative documents for search result diversity, and choose random pages for quality assessment.
Financial Systems
Risk assessment through transaction sampling, fraud detection pattern analysis, and regulatory compliance reporting from continuous transaction streams.
Healthcare Analytics
Sample patient records for clinical studies, monitor real-time vital signs data, and select representative cases for medical research and quality improvement.
Interview Success Tips
💡 Key Points to Mention
- Emphasize O(1) space complexity relative to stream size
- Highlight single-pass processing capability
- Explain uniform probability distribution guarantee
- Mention scalability for infinite streams
- Discuss real-world applications in your domain
🎯 Common Follow-up Questions
- How would you modify for weighted sampling? (A sketch follows this list.)
- What if k is larger than the stream size?
- How to handle distributed streams?
- Can you prove the uniform distribution?
- What are the trade-offs vs. other sampling methods?
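For the weighted-sampling follow-up, one standard answer is the Efraimidis-Spirakis approach (often called A-Res): assign each item a key of u^(1/w), where u is uniform in (0, 1) and w is the item's weight, and keep the k items with the largest keys. A minimal sketch, assuming items arrive as (value, weight) pairs:

import heapq
import random

def weighted_reservoir_sampling(stream, k):
    """Efraimidis-Spirakis (A-Res): keep the k items with the largest keys u**(1/w)."""
    heap = []  # min-heap of (key, item)
    for item, weight in stream:
        if weight <= 0:
            continue  # items with non-positive weight are never selected
        key = random.random() ** (1.0 / weight)
        if len(heap) < k:
            heapq.heappush(heap, (key, item))
        elif key > heap[0][0]:
            heapq.heapreplace(heap, (key, item))
    return [item for _, item in heap]

# Example: heavier items appear in the sample more often
stream = [("a", 1.0), ("b", 2.0), ("c", 5.0), ("d", 0.5)]
print(weighted_reservoir_sampling(stream, 2))

For the other follow-ups: if k exceeds the stream length, the reservoir simply ends up holding every element; distributed streams are commonly handled by sampling each partition independently and then merging the per-partition reservoirs, weighting by partition size, in a reduce step.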
🚀 Pro Interview Tips
Begin with the basic algorithm, then discuss optimizations and edge cases.
Visualize the reservoir and probability calculations to demonstrate understanding.
Write readable code with clear variable names and handle edge cases gracefully.
Discuss test cases including empty streams, k=1, and k > stream size scenarios (see the edge-case checks below).
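To support the test-case discussion, here are a few edge-case checks written against the reservoir_sampling function defined earlier in this guide:

# Empty stream: the reservoir stays empty
assert reservoir_sampling([], 5) == []

# k = 1: a single uniformly chosen element
assert len(reservoir_sampling(range(100), 1)) == 1

# k larger than the stream: every element is kept
assert sorted(reservoir_sampling(range(3), 10)) == [0, 1, 2]

print("All edge-case checks passed")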
🔗 Related Algorithm & Interview Topics
Master these complementary algorithms and concepts for comprehensive interview preparation:
String & Graph Algorithms
- KMP Algorithm Guide - Pattern matching techniques
- Floyd-Warshall Algorithm - Graph shortest paths
Design & Architecture
- OOP Design Patterns - Software architecture principles
- System Design Questions - Large-scale architecture
Advanced Topics
- Machine Learning Guide - ML concepts and algorithms
- Functional Programming - Modern paradigms