Data Streams Algorithm

Reservoir Sampling: Random Selection from Data Streams

Master the essential randomized algorithm for selecting k items uniformly at random from an unbounded data stream. Reservoir sampling is designed for streams of large or unknown size, which makes it a frequent topic in interviews at Google, Amazon, and other large tech companies.

🎯 What is Reservoir Sampling?

Reservoir sampling is a family of randomized algorithms for selecting a uniform random sample of k items from a data stream whose length is large or unknown in advance. The algorithm maintains a "reservoir" of exactly k items, and every element in the stream ends up in the final sample with equal probability, regardless of the stream's total size.

The core question it answers is: how do you fairly select k items from a stream when you don't know the total size? Traditional random sampling needs the population size up front to draw indices in the right range, which is impossible for unbounded or unknown-length streams.

The key idea is that each new item replaces one of the items currently in the reservoir with a carefully chosen probability, which keeps the sample statistically fair no matter how long the stream turns out to be.

Select k Items from a Stream

Step-by-step explanation: This implementation demonstrates the classic reservoir sampling algorithm (Algorithm R). It maintains a reservoir array of exactly k items and, for each incoming stream element, decides whether to keep it based on a simple probability calculation.

function reservoirSample(stream, k) {
  const reservoir = [];
  let i = 0;
  for (const item of stream) {
    if (i < k) {
      // Fill the reservoir with the first k items
      reservoir.push(item);
    } else {
      // Keep the new item with probability k / (i + 1)
      const j = Math.floor(Math.random() * (i + 1));
      if (j < k) {
        reservoir[j] = item;
      }
    }
    i++;
  }
  return reservoir;
}
💡 AI Coach Insight: The key insight is the probability calculation. When the i-th item arrives (counting from 1, with i > k), it gets a k/i chance of replacing a uniformly random item already in the reservoir. This ensures perfect fairness: after processing n total items, every item has exactly k/n probability of being in the sample. Beautiful, right?

Algorithm Deep Dive

🎯 How It Works (Simple Explanation)

Think of it like this: you're trying to pick k random people from a never-ending line, but you can only remember k names at a time!

  • Step 1: Keep the first k people (they're guaranteed spots)
  • Step 2: For each new person, flip a weighted coin to decide if they deserve a spot
  • Step 3: If they win, randomly kick out someone already selected
  • Magic Result: Everyone gets a perfectly fair chance, no matter when they arrived!
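
Here is a minimal trace of those three steps in Python, using k=3 and the stream A through F (an illustrative sketch; the reservoir contents after element C depend on random draws, so each run differs):

import random

def reservoir_sample_trace(stream, k):
    """Run Algorithm R and print the reservoir after every element."""
    reservoir = []
    for i, item in enumerate(stream):
        if i < k:
            reservoir.append(item)       # Step 1: first k items get guaranteed spots
        else:
            j = random.randint(0, i)     # Step 2: weighted coin flip, P(keep) = k/(i+1)
            if j < k:
                reservoir[j] = item      # Step 3: evict a random current member
        print(f"after {item}: {reservoir}")
    return reservoir

reservoir_sample_trace("ABCDEF", k=3)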

⚡ Complexity Analysis

Time Complexity: O(n) - single pass through stream

Space Complexity: O(k) - only store k items

Stream Size: Unknown/infinite - works with any size

Randomness: Uniform distribution guaranteed

🎨 Algorithm Visualization

Watch how reservoir sampling maintains fairness as the stream grows

Reservoir Sampling, k=3 example (incoming stream: A B C D E F ...):

  • Phase 1 - Fill reservoir: the first k=3 elements go directly into the reservoir, with no probability calculation.
  • Phase 2 - Random replace: for element i > k, include it with probability P = k/i and overwrite a uniformly random slot.
  • Result - Perfect fairness: every element ends up in the sample with equal probability k/n (uniform distribution).

Probability examples: element D (i=4): P = 3/4 = 75%; element E (i=5): P = 3/5 = 60%; element F (i=6): P = 3/6 = 50%; element 100 (i=100): P = 3/100 = 3%. The inclusion probability decreases as the stream grows.

Core formula: P(include element i) = k / i

🧮 Mathematical Foundation & Proof

The Key Equation

P(include i-th element) = k / i

Where k is the reservoir size and i is the current element's 1-based position in the stream. This elegant formula means that each new element has a smaller chance of being included as the stream progresses, yet every element seen so far retains an equal probability of being in the sample.

Mathematical Proof by Induction

Base Case (i ≤ k):

The first k elements are automatically included with probability 1. Each has probability k/k = 1 of being selected initially.

Inductive Step (i > k):

Assume after processing (i-1) elements, each element has probability k/(i-1) of being in the reservoir.

When element i arrives:

• Probability of including element i: k/i

• If included, it replaces a random existing element

• Probability any existing element gets replaced: (k/i) × (1/k) = 1/i

Therefore, probability any existing element remains = 1 - 1/i = (i-1)/i

Combined probability = [k/(i-1)] × [(i-1)/i] = k/i ✓

🎯 Result: After processing n elements, every element has exactly k/n probability of being selected!
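
You can check this result empirically with a quick Monte Carlo simulation (a small self-contained sketch; the exact frequencies will wobble around k/n from run to run):

import random
from collections import Counter

def sample_once(n, k):
    """One run of Algorithm R over the stream 0, 1, ..., n-1."""
    reservoir = []
    for i in range(n):
        if i < k:
            reservoir.append(i)
        else:
            j = random.randint(0, i)
            if j < k:
                reservoir[j] = i
    return reservoir

n, k, trials = 10, 3, 100_000
counts = Counter()
for _ in range(trials):
    counts.update(sample_once(n, k))

# Every element should appear in roughly k/n = 30% of the trials
for elem in range(n):
    print(elem, round(counts[elem] / trials, 3))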

Intuitive Understanding

🎯 Perfect Fairness

Every element gets equal treatment regardless of when it arrives in the stream

⚡ Efficiency

O(k) space complexity, O(1) per-element processing time

🌊 Stream-Friendly

Works with infinite streams without knowing total size

🐍 Complete Python Implementation

Basic Reservoir Sampling Implementation

import random

def reservoir_sampling(stream, k):
    """
    Reservoir Sampling Algorithm (Algorithm R)
    
    Args:
        stream: Iterable of elements to sample from
        k: Number of elements to sample
    
    Returns:
        List of k randomly sampled elements
    """
    reservoir = []
    
    for i, element in enumerate(stream):
        if i < k:
            # Fill reservoir with first k elements
            reservoir.append(element)
        else:
            # Keep this element with probability k/(i+1) (i is 0-based here)
            j = random.randint(0, i)
            if j < k:
                reservoir[j] = element
    
    return reservoir

# Example usage
stream = range(1, 1001)  # Stream of numbers 1-1000
k = 10
sample = reservoir_sampling(stream, k)
print("Random sample from stream:", sample)

Advanced Implementation with Visualization

import matplotlib.pyplot as plt
import seaborn as sns

sns.set_context('poster')

def plot_reservoir_sampling(stream, k, iterations=1000):
    """
    Visualize the fairness of reservoir sampling
    
    Args:
        stream: Range of consecutive integers starting at 1 (e.g. range(1, 101))
        k: Reservoir size
        iterations: Number of sampling iterations for statistics
    """
    counts = [0] * len(stream)
    
    # Run multiple iterations to gather statistics
    for _ in range(iterations):
        sample = reservoir_sampling(stream, k)
        for elem in sample:
            counts[elem - 1] += 1
    
    # Create visualization
    plt.figure(figsize=(16, 6))
    plt.plot(stream, counts, linewidth=2, color='#3b82f6')
    plt.xlabel('Element in Stream', fontsize=14)
    plt.ylabel('Selection Frequency', fontsize=14)
    plt.title(f'Reservoir Sampling Fairness Test (k={k}, iterations={iterations})', fontsize=16)
    plt.grid(True, alpha=0.3)
    
    # Add expected frequency line
    expected_freq = iterations * k / len(stream)
    plt.axhline(y=expected_freq, color='red', linestyle='--', 
                label=f'Expected frequency: {expected_freq:.1f}')
    plt.legend()
    plt.tight_layout()
    plt.show()

# Test fairness with visualization
stream = range(1, 101)  # Stream of 100 elements
plot_reservoir_sampling(stream, 10)

Object-Oriented Reservoir Sampler

from dataclasses import dataclass
import typing as t
import numpy as np

A = t.TypeVar("A")

@dataclass
class Reservoir(t.Generic[A]):
    """
    Object-oriented reservoir sampler for streaming data
    
    Attributes:
        k: Reservoir size
        seed: Random seed for reproducibility
    """
    k: int
    seed: t.Optional[int] = None
    
    def __post_init__(self):
        # Initialize state
        self.i = 0  # Number of items processed
        self.sample: t.List[A] = []
        
        if self.seed is not None:
            assert self.seed > 0
    
    def add(self, item: A) -> None:
        """
        Add a new item to the stream and update reservoir
        
        Args:
            item: New item from the stream
        """
        self.i += 1
        
        # Set seed for reproducibility
        if self.seed is not None:
            np.random.seed(self.i * self.seed)
        
        if len(self.sample) < self.k:
            # Fill reservoir with first k elements
            self.sample.append(item)
        else:
            # Replace with probability k/i
            if np.random.rand() <= self.k / self.i:
                idx = np.random.choice(self.k)
                self.sample[idx] = item
    
    def get_sample(self) -> t.List[A]:
        """
        Get current reservoir sample
        
        Returns:
            Current sample in the reservoir
        """
        return self.sample.copy()
    
    def get_stats(self) -> dict:
        """
        Get sampling statistics
        
        Returns:
            Dictionary with sampling statistics
        """
        return {
            'items_processed': self.i,
            'reservoir_size': len(self.sample),
            'target_size': self.k,
            'current_probability': self.k / max(self.i, 1)
        }

# Example usage
reservoir = Reservoir(k=5, seed=42)

# Process stream elements
for x in range(20):
    reservoir.add(x)
    if x % 5 == 4:  # Print every 5 elements
        print(f"After {x+1} elements: {reservoir.get_sample()}")
        print(f"Stats: {reservoir.get_stats()}")
        print()

Statistical Validation Test

import collections

def test_reservoir_fairness(stream_size=6, k=5, iterations=100000):
    """
    Test the fairness of reservoir sampling algorithm
    
    Args:
        stream_size: Size of the stream to test
        k: Reservoir size
        iterations: Number of test iterations
    
    Returns:
        Dictionary showing probability distribution
    """
    cnt = collections.defaultdict(int)
    
    for idx in range(iterations):
        r = Reservoir(k=k, seed=idx + 1)
        for x in range(stream_size):
            r.add(x)
        cnt[frozenset(r.get_sample())] += 1
    
    # Convert to probabilities
    probabilities = {subset: count / iterations for subset, count in cnt.items()}
    
    return probabilities

# Test with small example: select 5 from 6 elements
results = test_reservoir_fairness(stream_size=6, k=5, iterations=100000)

print("Probability distribution for selecting 5 from 6 elements:")
for subset, prob in sorted(results.items(), key=lambda kv: sorted(kv[0])):
    print(f"{sorted(subset)}: {prob:.4f}")

# Expected: each subset should have probability ≈ 1/6 = 0.1667
expected_prob = 1/6
print(f"\nExpected probability: {expected_prob:.4f}")
print(f"Actual probabilities range: {min(results.values()):.4f} - {max(results.values()):.4f}")

✅ Expected Output:

Each possible subset of 5 elements should appear with probability ≈ 0.1667 (1/6), demonstrating perfect fairness of the algorithm.

🤖 Machine Learning & Data Science Applications

🎯 Online Learning & Model Training

Reservoir sampling is crucial for training ML models on streaming data where you can't store the entire dataset in memory.

from sklearn.linear_model import SGDClassifier
import numpy as np

class OnlineLearner:
    def __init__(self, reservoir_size=1000, model_update_freq=100):
        self.reservoir = Reservoir(k=reservoir_size)
        self.model = SGDClassifier()
        self.model_update_freq = model_update_freq
        self.samples_seen = 0
        self.is_fitted = False
    
    def process_sample(self, X, y):
        """Process a new training sample from the stream"""
        self.reservoir.add((X, y))
        self.samples_seen += 1
        
        # Retrain model periodically
        if self.samples_seen % self.model_update_freq == 0:
            self._retrain_model()
    
    def _retrain_model(self):
        """Retrain model on current reservoir sample"""
        if len(self.reservoir.get_sample()) < 10:
            return
        
        # Extract features and labels from reservoir
        samples = self.reservoir.get_sample()
        X_batch = np.array([sample[0] for sample in samples])
        y_batch = np.array([sample[1] for sample in samples])
        
        if not self.is_fitted:
            self.model.fit(X_batch, y_batch)
            self.is_fitted = True
        else:
            # Partial fit for online learning
            self.model.partial_fit(X_batch, y_batch)
    
    def predict(self, X):
        """Make prediction using current model"""
        if not self.is_fitted:
            return None
        return self.model.predict([X])[0]
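
A brief usage sketch with synthetic two-class data (illustrative only; it assumes the Reservoir class defined earlier and scikit-learn installed, and the predicted labels are merely the likely outcomes):

import numpy as np

rng = np.random.default_rng(0)
learner = OnlineLearner(reservoir_size=200, model_update_freq=50)

# Simulate a labeled stream: class-1 feature vectors are shifted to the right
for _ in range(500):
    y = int(rng.integers(0, 2))
    X = rng.normal(loc=2.0 * y, scale=1.0, size=3)
    learner.process_sample(X, y)

print(learner.predict(np.array([2.0, 2.0, 2.0])))  # most likely 1
print(learner.predict(np.array([0.0, 0.0, 0.0])))  # most likely 0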

💡 Key Benefits:

  • Maintains representative sample for model training
  • Handles concept drift in streaming data
  • Memory-efficient for large-scale applications

🔍 Dynamic Feature Selection

Use reservoir sampling to maintain a diverse set of features for high-dimensional streaming data analysis.

class FeatureSelector:
    def __init__(self, max_features=100):
        self.feature_reservoir = Reservoir(k=max_features)
        self.feature_importance = {}
    
    def update_features(self, new_features, importance_scores):
        """Update feature set based on streaming importance scores"""
        for feature, score in zip(new_features, importance_scores):
            # Store the score alongside the feature; plain reservoir sampling keeps items uniformly at random, not by weight
            weighted_feature = (feature, score)
            self.feature_reservoir.add(weighted_feature)
            self.feature_importance[feature] = score
    
    def get_selected_features(self):
        """Get current set of selected features"""
        selected = self.feature_reservoir.get_sample()
        return [feature for feature, _ in selected]
    
    def get_feature_matrix(self, data):
        """Extract selected features from data"""
        selected_features = self.get_selected_features()
        return data[:, selected_features]

📊 A/B Testing & Experimentation

Maintain representative samples for statistical analysis in streaming A/B tests.

import numpy as np
import scipy.stats as stats

class ABTestAnalyzer:
    def __init__(self, sample_size=1000):
        self.control_reservoir = Reservoir(k=sample_size)
        self.treatment_reservoir = Reservoir(k=sample_size)
    
    def add_observation(self, group, outcome):
        """Add new observation to appropriate group"""
        if group == 'control':
            self.control_reservoir.add(outcome)
        elif group == 'treatment':
            self.treatment_reservoir.add(outcome)
    
    def get_test_results(self):
        """Run a two-sample t-test on the current reservoir samples"""
        control_data = self.control_reservoir.get_sample()
        treatment_data = self.treatment_reservoir.get_sample()
        
        if len(control_data) < 30 or len(treatment_data) < 30:
            return None
        
        # Perform t-test
        t_stat, p_value = stats.ttest_ind(control_data, treatment_data)
        
        return {
            'control_mean': np.mean(control_data),
            'treatment_mean': np.mean(treatment_data),
            'p_value': p_value,
            'significant': p_value < 0.05,
            'sample_sizes': (len(control_data), len(treatment_data))
        }
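
A short usage sketch with simulated outcome metrics (the numbers and the small treatment lift are made up for illustration):

import random

analyzer = ABTestAnalyzer(sample_size=500)

# Stream of observations with a small simulated treatment lift
for _ in range(5000):
    analyzer.add_observation('control', random.gauss(10.0, 2.0))
    analyzer.add_observation('treatment', random.gauss(10.5, 2.0))

print(analyzer.get_test_results())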

⚡ Real-time Data Pipeline Sampling

Integrate reservoir sampling into data pipelines for monitoring and quality assurance.

import numpy as np

class DataQualityMonitor:
    def __init__(self, sample_size=500):
        self.data_reservoir = Reservoir(k=sample_size)
        self.quality_metrics = {
            'null_rate': [],
            'outlier_rate': [],
            'schema_violations': []
        }
    
    def process_record(self, record):
        """Process incoming data record"""
        self.data_reservoir.add(record)
        
        # Update quality metrics
        self._update_quality_metrics(record)
    
    def _update_quality_metrics(self, record):
        """Update data quality metrics"""
        # Calculate null rate
        null_count = sum(1 for v in record.values() if v is None)
        null_rate = null_count / len(record)
        
        # Detect outliers (simplified)
        numeric_values = [v for v in record.values() if isinstance(v, (int, float))]
        outlier_rate = self._detect_outliers(numeric_values)
        
        # Update running metrics
        self.quality_metrics['null_rate'].append(null_rate)
        self.quality_metrics['outlier_rate'].append(outlier_rate)
    
    def _detect_outliers(self, numeric_values):
        """Simplified outlier rate: share of values more than 3 standard deviations from the mean"""
        if len(numeric_values) < 2:
            return 0.0
        mean, std = np.mean(numeric_values), np.std(numeric_values)
        if std == 0:
            return 0.0
        outliers = sum(1 for v in numeric_values if abs(v - mean) > 3 * std)
        return outliers / len(numeric_values)
    
    def _calculate_freshness(self, sample):
        """Placeholder freshness metric: assumes records may carry a 'timestamp' field"""
        timestamps = [r.get('timestamp') for r in sample if isinstance(r, dict) and r.get('timestamp') is not None]
        return max(timestamps) if timestamps else None
    
    def get_quality_report(self):
        """Generate data quality report from sample"""
        sample = self.data_reservoir.get_sample()
        
        return {
            'sample_size': len(sample),
            'avg_null_rate': np.mean(self.quality_metrics['null_rate']),
            'avg_outlier_rate': np.mean(self.quality_metrics['outlier_rate']),
            'data_freshness': self._calculate_freshness(sample)
        }

🚀 Production Use Cases:

  • Recommendation Systems: Sample user interactions for model updates
  • Fraud Detection: Maintain diverse transaction samples
  • Log Analysis: Sample log entries for anomaly detection
  • IoT Data Processing: Sample sensor readings for trend analysis
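
For instance, in the log-analysis case a reservoir can hold a fixed-size uniform sample of log lines no matter how large the file or firehose grows. A small sketch (the file path and usage are hypothetical):

import random

def sample_log_lines(path, k=100):
    """Keep a uniform random sample of k lines from an arbitrarily large log file."""
    reservoir = []
    with open(path) as f:
        for i, line in enumerate(f):
            if i < k:
                reservoir.append(line.rstrip("\n"))
            else:
                j = random.randint(0, i)
                if j < k:
                    reservoir[j] = line.rstrip("\n")
    return reservoir

# Hypothetical usage: inspect a random slice of today's logs
# sampled = sample_log_lines("/var/log/app/requests.log", k=100)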

🌍 Traditional Applications

📊 Data Analytics

Sample large datasets for statistical analysis, A/B testing, and machine learning model training without loading entire datasets into memory.

🌊 Stream Processing

Real-time sampling of network traffic, social media feeds, IoT sensor data, and financial transaction streams for monitoring and analysis.

🎮 Gaming & Randomization

Fair random selection in games, lottery systems, survey sampling, and any scenario requiring unbiased random selection from unknown populations.

🔍 Search Engines

Sample web pages for indexing, select representative documents for search result diversity, and choose random pages for quality assessment.

💰 Financial Systems

Risk assessment through transaction sampling, fraud detection pattern analysis, and regulatory compliance reporting from continuous transaction streams.

🏥 Healthcare Analytics

Sample patient records for clinical studies, monitor real-time vital signs data, and select representative cases for medical research and quality improvement.

Interview Success Tips

💡 Key Points to Mention

  • Emphasize O(k) space complexity, independent of the stream size
  • Highlight single-pass processing capability
  • Explain uniform probability distribution guarantee
  • Mention scalability for infinite streams
  • Discuss real-world applications in your domain

🎯 Common Follow-up Questions

  • How would you modify for weighted sampling? (see the sketch after this list)
  • What if k is larger than the stream size?
  • How to handle distributed streams?
  • Can you prove the uniform distribution?
  • What are the trade-offs vs. other sampling methods?
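
For the weighted-sampling follow-up, one standard answer is the Efraimidis-Spirakis method (A-Res): assign each item the key u^(1/w), where u is uniform on (0, 1) and w is the item's weight, and keep the k items with the largest keys. A minimal sketch (not part of the original guide's code):

import heapq
import random

def weighted_reservoir_sample(stream, k):
    """A-Res: each (item, weight) gets key u**(1/weight); keep the k largest keys."""
    heap = []  # min-heap of (key, item)
    for item, weight in stream:
        key = random.random() ** (1.0 / weight)
        if len(heap) < k:
            heapq.heappush(heap, (key, item))
        elif key > heap[0][0]:
            heapq.heapreplace(heap, (key, item))
    return [item for _, item in heap]

# Items with larger weights should appear more often across repeated runs
weighted_stream = [("a", 1.0), ("b", 1.0), ("c", 5.0), ("d", 10.0)]
print(weighted_reservoir_sample(weighted_stream, k=2))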

🚀 Pro Interview Tips

Start Simple:

Begin with the basic algorithm, then discuss optimizations and edge cases.

Draw Diagrams:

Visualize the reservoir and probability calculations to demonstrate understanding.

Code Cleanly:

Write readable code with clear variable names and handle edge cases gracefully.

Test Thoroughly:

Discuss test cases including empty streams, k=1, and k > stream size scenarios.
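
A few quick sanity checks along those lines, assuming the reservoir_sampling function from the basic Python implementation above is in scope:

# Edge cases worth calling out explicitly in an interview
assert reservoir_sampling([], 3) == []                              # empty stream -> empty sample
assert len(reservoir_sampling(range(100), 1)) == 1                  # k = 1 still works
assert sorted(reservoir_sampling(range(5), 10)) == [0, 1, 2, 3, 4]  # k > stream size keeps everything
print("All edge cases passed")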
