Theoretical Models for LSM Tree Compaction Optimization: A Mathematical Analysis

1. Introduction: The Compaction Challenge in LSM Tree Systems

Log-Structured Merge Trees (LSM trees) have become a prevalent data structure for modern storage systems due to their ability to optimize write performance by converting random writes into sequential ones. However, this advantage comes with a fundamental trade-off: as deletions and updates accumulate, the system requires periodic compaction operations to maintain read performance and space efficiency. These compactions can grow prohibitively large, causing significant performance degradation that disrupts normal system operation.

This essay presents a rigorous theoretical framework for modeling and optimizing compaction processes in LSM tree systems, with particular attention to preventing excessive compaction sizes while maintaining system efficiency. We develop formal models for compaction triggering conditions, size impact assessment, and incremental compaction strategies, providing mathematical foundations for each approach.

2. Fundamental Definitions and Notation

We begin by establishing the mathematical notation and core definitions that will serve as the foundation for our analysis.

Definition 2.1 (LSM Tree Structure): An LSM tree consists of $N$ levels $\{L_0, L_1, \ldots, L_{N-1}\}$, where each level $L_i$ contains a set of sorted string tables (SSTables). The size of level $L_i$ is denoted by $S_i$, with target sizes constrained so that $S_{i+1} = k \cdot S_i$ for some growth factor $k > 1$.

Definition 2.2 (Valid Data and Tombstones): For a level $L_i$, we denote:

  • $V_i$: The volume of valid (non-deleted) data in bytes
  • $T_i$: The volume of tombstone data in bytes
  • Thus, $S_i = V_i + T_i + O_i$, where $O_i$ represents other metadata overhead

Definition 2.3 (Deletion Ratio): The deletion ratio of level $L_i$ is defined as:
$$D_r(L_i) = \frac{T_i}{S_i}$$

Definition 2.4 (Compaction Operation): A compaction operation $C(L_i, L_j)$ merges data from levels $L_i$ and $L_j$ (typically where $j = i+1$), removing duplicate keys and obsolete values marked by tombstones.

The following code demonstrates these fundamental structures:

from dataclasses import dataclass
from typing import List, Dict, Tuple, Optional
import numpy as np


@dataclass
class LSMLevel:
    level_id: int
    size_bytes: int              # S_i
    valid_data_bytes: int        # V_i
    tombstone_bytes: int         # T_i
    metadata_bytes: int          # O_i
    
    @property
    def deletion_ratio(self) -> float:
        """Calculate the deletion ratio D_r(L_i)"""
        return self.tombstone_bytes / self.size_bytes if self.size_bytes > 0 else 0.0


class LSMTree:
    def __init__(self, num_levels: int, growth_factor: float = 10.0):
        """Initialize an LSM tree with specified number of levels and growth factor"""
        self.levels: List[LSMLevel] = []
        self.num_levels = num_levels
        self.growth_factor = growth_factor
        # Target level sizes follow S_{i+1} = k * S_i with a base size of ~1 MB
        self.target_sizes: List[float] = [1_000_000 * (growth_factor ** i)
                                          for i in range(num_levels)]
        
        # Initialize empty levels
        for i in range(num_levels):
            self.levels.append(LSMLevel(
                level_id=i,
                size_bytes=0,
                valid_data_bytes=0,
                tombstone_bytes=0,
                metadata_bytes=0
            ))
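
A brief usage sketch; the level contents below are illustrative, not drawn from a real workload:

# Illustrative usage: populate one level by hand and inspect its deletion ratio.
tree = LSMTree(num_levels=4, growth_factor=10.0)
tree.levels[1] = LSMLevel(level_id=1, size_bytes=10_000_000,
                          valid_data_bytes=7_500_000,
                          tombstone_bytes=2_400_000,
                          metadata_bytes=100_000)
print(tree.target_sizes[:2])          # [1000000.0, 10000000.0]
print(tree.levels[1].deletion_ratio)  # 0.24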

3. Cost-Benefit Model for Compaction Decisions

We now establish a formal cost-benefit model to determine optimal compaction timing.

Definition 3.1 (Compaction Costs): The costs associated with a compaction operation $C(L_i, L_{i+1})$ are defined as:
$$C_{compaction}(L_i, L_{i+1}) = C_{IO} \cdot W_a \cdot (S_i + S_{i+1}) + C_{resources} \cdot (S_i + S_{i+1})$$
where:

  • $C_{IO}$ is the IO cost per byte
  • $W_a$ is the write amplification factor
  • $C_{resources}$ represents the cost coefficient for system resources

Definition 3.2 (Deferral Costs): The costs associated with deferring compaction include:
$$C_{defer}(L_i, L_{i+1}) = C_{space}(L_i, L_{i+1}) + C_{read}(L_i, L_{i+1})$$
where:

  • $C_{space}(L_i, L_{i+1}) = \alpha_s \cdot (T_i + T_{i+1})$ represents the cost of wasted space
  • $C_{read}(L_i, L_{i+1}) = \alpha_r \cdot (D_r(L_i) \cdot Q(L_i) \cdot P(L_i) + D_r(L_{i+1}) \cdot Q(L_{i+1}) \cdot P(L_{i+1}))$ represents the cost of degraded read performance, where $Q(L_i)$ is the query frequency against level $L_i$ and $P(L_i)$ is the per-query read penalty incurred by scanning tombstoned data

Theorem 3.1 (Optimal Compaction Timing): Under the cost-benefit model, the optimal time to trigger compaction for levels $L_i$ and $L_{i+1}$ is when:
$$C_{defer}(L_i, L_{i+1}) > C_{compaction}(L_i, L_{i+1})$$

Proof: Compacting at time $t$ incurs the one-time cost $C_{compaction}(L_i, L_{i+1})$, whereas deferring leaves the system paying $C_{defer}(L_i, L_{i+1})$, which is non-decreasing over time as tombstones accumulate. While $C_{defer} < C_{compaction}$, waiting is the cheaper option; the break-even point occurs when $C_{defer}(L_i, L_{i+1}) = C_{compaction}(L_i, L_{i+1})$, and any deferral beyond it strictly increases total cost. Compaction should therefore be triggered as soon as the inequality of the theorem first holds. ■

Corollary 3.1 (Deletion Ratio Threshold): From Theorem 3.1, assuming both levels share a common deletion ratio $D_r(L_i) \approx D_r(L_{i+1})$, we can derive a deletion ratio threshold that triggers compaction:
$$D_r(L_i) > \frac{C_{compaction}(L_i, L_{i+1}) - \alpha_s \cdot (T_i + T_{i+1})}{\alpha_r \cdot Q(L_i) \cdot P(L_i) + \alpha_r \cdot Q(L_{i+1}) \cdot P(L_{i+1})}$$
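
To make the threshold concrete, here is a small numeric sketch; every value below is an illustrative placeholder rather than a measured cost:

c_compaction = 500.0          # C_compaction(L_i, L_{i+1})
space_term = 120.0            # alpha_s * (T_i + T_{i+1})
read_terms = 900.0 + 600.0    # alpha_r * Q(L_i) * P(L_i) + alpha_r * Q(L_{i+1}) * P(L_{i+1})

d_r_threshold = (c_compaction - space_term) / read_terms
print(d_r_threshold)  # ~0.253: compact once the deletion ratio exceeds about 25%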

The following code implements this cost-benefit model:

def calculate_compaction_cost(level_i: LSMLevel, 
                             level_j: LSMLevel, 
                             io_cost_per_byte: float, 
                             write_amp_factor: float, 
                             resource_cost_coef: float) -> float:
    """
    Calculate the cost of performing compaction between two levels
    
    Args:
        level_i: The first level (typically i)
        level_j: The second level (typically i+1)
        io_cost_per_byte: Cost coefficient for IO operations
        write_amp_factor: Write amplification factor
        resource_cost_coef: Cost coefficient for system resources
    
    Returns:
        The total cost of compaction
    """
    total_size = level_i.size_bytes + level_j.size_bytes
    io_cost = io_cost_per_byte * write_amp_factor * total_size
    resource_cost = resource_cost_coef * total_size
    
    return io_cost + resource_cost


def calculate_deferral_cost(level_i: LSMLevel, 
                           level_j: LSMLevel,
                           alpha_space: float,
                           alpha_read: float,
                           query_freq_i: float,
                           query_freq_j: float,
                           read_penalty_i: float,
                           read_penalty_j: float) -> float:
    """
    Calculate the cost of deferring compaction between two levels
    
    Args:
        level_i: The first level (typically i)
        level_j: The second level (typically i+1)
        alpha_space: Weight coefficient for space costs
        alpha_read: Weight coefficient for read performance costs
        query_freq_i: Query frequency for level i (Q(L_i))
        query_freq_j: Query frequency for level j (Q(L_j))
        read_penalty_i: Read performance penalty for level i (P(L_i))
        read_penalty_j: Read performance penalty for level j (P(L_j))
        
    Returns:
        The total cost of deferring compaction
    """
    # Space waste cost
    space_cost = alpha_space * (level_i.tombstone_bytes + level_j.tombstone_bytes)
    
    # Read performance degradation cost
    read_cost_i = level_i.deletion_ratio * query_freq_i * read_penalty_i
    read_cost_j = level_j.deletion_ratio * query_freq_j * read_penalty_j
    read_cost = alpha_read * (read_cost_i + read_cost_j)
    
    return space_cost + read_cost


def should_trigger_compaction(level_i: LSMLevel, 
                             level_j: LSMLevel,
                             system_params: Dict[str, float]) -> bool:
    """
    Determine if compaction should be triggered based on cost-benefit analysis
    
    Args:
        level_i: The first level (typically i)
        level_j: The second level (typically i+1)
        system_params: Dictionary of system parameters
        
    Returns:
        True if compaction should be triggered, False otherwise
    """
    defer_cost = calculate_deferral_cost(
        level_i, level_j,
        system_params['alpha_space'],
        system_params['alpha_read'],
        system_params['query_freq_i'],
        system_params['query_freq_j'],
        system_params['read_penalty_i'],
        system_params['read_penalty_j']
    )
    
    compaction_cost = calculate_compaction_cost(
        level_i, level_j,
        system_params['io_cost_per_byte'],
        system_params['write_amp_factor'],
        system_params['resource_cost_coef']
    )
    
    return defer_cost > compaction_cost
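
A usage sketch follows; all parameter values are illustrative placeholders, not tuned constants:

# Hypothetical system parameters and two hand-built levels.
params = {
    'alpha_space': 1e-6, 'alpha_read': 2.0,
    'query_freq_i': 150.0, 'query_freq_j': 40.0,
    'read_penalty_i': 1.5, 'read_penalty_j': 3.0,
    'io_cost_per_byte': 1e-8, 'write_amp_factor': 4.0,
    'resource_cost_coef': 5e-9,
}
l1 = LSMLevel(1, 10_000_000, 7_500_000, 2_400_000, 100_000)
l2 = LSMLevel(2, 100_000_000, 90_000_000, 9_000_000, 1_000_000)
print(should_trigger_compaction(l1, l2, params))  # True for these numbers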

4. Performance Impact Modeling of Compaction Size

We now establish a mathematical model for the performance impact of compaction operations based on their size.

Definition 4.1 (Performance Impact Function): The performance impact of a compaction operation of size $S_c$ with available system resources $R$ is defined as:
$$P(S_c, R) = \alpha \cdot \frac{S_c}{R \cdot C_{max}} \cdot \left(1 + \beta \cdot e^{(S_c - \gamma \cdot R \cdot C_{max})/(R \cdot C_{max})}\right)$$
where:

  • $C_{max}$ is the maximum compaction size the system can handle without significant degradation
  • $\alpha$ is a scaling factor for linear impact
  • $\beta$ is a scaling factor for exponential impact when compaction exceeds the threshold
  • $\gamma$ is a threshold coefficient (typically $0 < \gamma < 1$)

The exponent is normalized by $R \cdot C_{max}$ so that its argument is dimensionless; exponentiating raw byte counts would overflow at any realistic compaction size.

Theorem 4.1 (Non-linear Scaling of Performance Impact): Writing $u = (S_c - \gamma \cdot R \cdot C_{max})/(R \cdot C_{max})$ for the normalized exponent, the performance impact function $P(S_c, R)$ exhibits the following properties:

  1. For $S_c \leq \gamma \cdot R \cdot C_{max}$, $e^{u} \leq 1$ and hence $P(S_c, R) \leq \alpha \cdot (1 + \beta) \cdot \frac{S_c}{R \cdot C_{max}}$ (growth linear in $S_c$, up to a bounded constant)
  2. For $S_c > \gamma \cdot R \cdot C_{max}$, $P(S_c, R)$ grows exponentially in $S_c$

Proof: Below the threshold, $u \leq 0$, so the exponential factor is at most $1$ and $P$ is squeezed between $\alpha \cdot \frac{S_c}{R \cdot C_{max}}$ and $\alpha \cdot (1 + \beta) \cdot \frac{S_c}{R \cdot C_{max}}$, both linear in $S_c$. Above the threshold, $u > 0$ and $e^{u}$ grows exponentially with $S_c$, dominating the linear prefactor. Differentiating $P$ with respect to $S_c$ (spelled out below) shows that the slope itself grows exponentially once $S_c$ exceeds the threshold. ■
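
For completeness, the derivative referenced in the proof, with $u = (S_c - \gamma \cdot R \cdot C_{max})/(R \cdot C_{max})$, is:

$$\frac{\partial P}{\partial S_c} = \frac{\alpha}{R \cdot C_{max}} \left(1 + \beta e^{u}\right) + \frac{\alpha \cdot S_c \cdot \beta e^{u}}{(R \cdot C_{max})^2}$$

Both terms are positive and both contain $e^{u}$, so the slope itself grows exponentially once $S_c$ crosses $\gamma \cdot R \cdot C_{max}$.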

Definition 4.2 (Critical Compaction Size): The critical compaction size $S_{critical}$ is defined as the size at which performance degradation becomes unacceptable:
$$S_{critical} = \gamma \cdot R \cdot C_{max}$$

Here's the implementation of the performance impact model:

def calculate_performance_impact(compaction_size: int, 
                                available_resources: float, 
                                c_max: int,
                                alpha: float = 1.0,
                                beta: float = 2.0,
                                gamma: float = 0.5) -> float:
    """
    Calculate the performance impact of a compaction operation
    
    Args:
        compaction_size: Size of the compaction operation in bytes (S_c)
        available_resources: Available system resources normalized to [0,1] (R)
        c_max: Maximum compaction size the system can handle
        alpha: Scaling factor for linear impact
        beta: Scaling factor for exponential impact
        gamma: Threshold coefficient (0 < gamma < 1)
        
    Returns:
        The performance impact value
    """
    # Handle edge case to prevent division by zero
    if available_resources <= 0:
        return float('inf')
        
    # Calculate normalized size
    normalized_size = compaction_size / (available_resources * c_max)
    
    # Calculate exponential term (exponent normalized by R * c_max so it is
    # dimensionless; exp() of raw byte differences would overflow)
    threshold = gamma * available_resources * c_max
    exp_term = beta * np.exp((compaction_size - threshold) /
                             (available_resources * c_max))
    
    # Calculate impact
    impact = alpha * normalized_size * (1 + exp_term)
    
    return impact


def get_critical_compaction_size(available_resources: float,
                                c_max: int,
                                gamma: float = 0.5) -> int:
    """
    Calculate the critical compaction size threshold
    
    Args:
        available_resources: Available system resources normalized to [0,1]
        c_max: Maximum compaction size the system can handle
        gamma: Threshold coefficient (0 < gamma < 1)
        
    Returns:
        The critical size threshold in bytes
    """
    return int(gamma * available_resources * c_max)
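
A quick sketch of the two regimes from Theorem 4.1; the resource level and sizes are illustrative:

R, CMAX = 0.8, 1_000_000_000                     # 80% resources available, 1 GB cap
s_crit = get_critical_compaction_size(R, CMAX)   # 0.5 * 0.8 * 1 GB = 400 MB
below = calculate_performance_impact(100_000_000, R, CMAX)  # below the threshold
above = calculate_performance_impact(700_000_000, R, CMAX)  # above the threshold
print(s_crit, below, above)  # a 7x larger compaction costs ~11x more impact here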

5. Compaction Growth Projection and Prediction

To prevent oversized compactions, we develop a formal model for projecting compaction size growth.

Definition 5.1 (Level Growth Functions): The growth of level $L_i$ over time is modeled as:
$$V_i(t + \Delta t) = V_i(t) + \int_{t}^{t+\Delta t} [\lambda_{insert}(s) - \lambda_{compact}(s)]\, ds$$
$$T_i(t + \Delta t) = T_i(t) + \int_{t}^{t+\Delta t} [\lambda_{delete}(s) - \lambda_{compact\_tombstone}(s)] ds$$

Where the $\lambda$ terms are the instantaneous rates (in bytes per unit time) of insertions, compaction-driven removal, deletions, and tombstone reclamation, respectively.

Theorem 5.1 (Predictive Compaction Sizing): Given current growth rates, the expected size of a compaction operation between levels $L_i$ and $L_{i+1}$ at time $t + \Delta t$ is:
$$S_c(t + \Delta t) = (V_i(t + \Delta t) + T_i(t + \Delta t)) + (V_{i+1}(t + \Delta t) + T_{i+1}(t + \Delta t))$$

Proof: By Definition 2.4, a compaction operation involves all data from both levels. Neglecting the metadata overhead $O_i$, we have $S_i \approx V_i + T_i$ for each level, so the total size of data involved in the compaction is the sum of these components across both levels at the time of compaction. ■

Definition 5.2 (Time to Critical Size): The time until a compaction operation would reach critical size is defined as:
$$t_{critical} = \min \{t > t_{current} \mid S_c(t) > S_{critical}\}$$
The implementation below reports the elapsed time $t_{critical} - t_{current}$.

The following code implements the growth projection model:

from typing import Callable, Tuple
import numpy as np
from scipy.integrate import quad


def project_level_growth(current_level: LSMLevel,
                        t_current: float,
                        t_future: float,
                        insert_rate_func: Callable[[float], float],
                        compact_rate_func: Callable[[float], float],
                        delete_rate_func: Callable[[float], float],
                        compact_tombstone_rate_func: Callable[[float], float]) -> LSMLevel:
    """
    Project the growth of a level over time
    
    Args:
        current_level: Current state of the level
        t_current: Current time
        t_future: Future time to project to
        insert_rate_func: Function that gives insertion rate at time t
        compact_rate_func: Function that gives compaction rate at time t
        delete_rate_func: Function that gives deletion rate at time t
        compact_tombstone_rate_func: Function that gives tombstone compaction rate at time t
        
    Returns:
        Projected level state at future time
    """
    # Calculate the integral of insertion rate minus compaction rate
    data_growth, _ = quad(lambda s: insert_rate_func(s) - compact_rate_func(s), 
                         t_current, t_future)
    
    # Calculate the integral of deletion rate minus tombstone compaction rate
    tombstone_growth, _ = quad(lambda s: delete_rate_func(s) - compact_tombstone_rate_func(s), 
                              t_current, t_future)
    
    # Create projected level
    projected_level = LSMLevel(
        level_id=current_level.level_id,
        size_bytes=current_level.size_bytes + int(data_growth) + int(tombstone_growth),
        valid_data_bytes=current_level.valid_data_bytes + int(data_growth),
        tombstone_bytes=current_level.tombstone_bytes + int(tombstone_growth),
        metadata_bytes=current_level.metadata_bytes  # Assuming metadata size stays constant
    )
    
    return projected_level


def predict_compaction_size(level_i: LSMLevel,
                           level_j: LSMLevel,
                           t_current: float,
                           t_future: float,
                           growth_model_i: Dict[str, Callable[[float], float]],
                           growth_model_j: Dict[str, Callable[[float], float]]) -> int:
    """
    Predict the size of a compaction operation at a future time
    
    Args:
        level_i: Current state of first level
        level_j: Current state of second level
        t_current: Current time
        t_future: Future time to predict for
        growth_model_i: Growth rate functions for level i
        growth_model_j: Growth rate functions for level j
        
    Returns:
        Predicted compaction size in bytes
    """
    # Project growth for level i
    projected_i = project_level_growth(
        level_i, t_current, t_future,
        growth_model_i['insert_rate'],
        growth_model_i['compact_rate'],
        growth_model_i['delete_rate'],
        growth_model_i['compact_tombstone_rate']
    )
    
    # Project growth for level j
    projected_j = project_level_growth(
        level_j, t_current, t_future,
        growth_model_j['insert_rate'],
        growth_model_j['compact_rate'],
        growth_model_j['delete_rate'],
        growth_model_j['compact_tombstone_rate']
    )
    
    # Calculate combined size of projected levels
    compaction_size = (projected_i.valid_data_bytes + projected_i.tombstone_bytes +
                      projected_j.valid_data_bytes + projected_j.tombstone_bytes)
    
    return compaction_size


def time_to_critical_size(level_i: LSMLevel,
                         level_j: LSMLevel,
                         t_current: float,
                         s_critical: int,
                         growth_model_i: Dict[str, Callable[[float], float]],
                         growth_model_j: Dict[str, Callable[[float], float]],
                         max_horizon: float = 30 * 24 * 3600) -> Optional[float]:
    """
    Calculate the time until compaction size reaches critical threshold
    
    Args:
        level_i: Current state of first level
        level_j: Current state of second level
        t_current: Current time
        s_critical: Critical size threshold
        growth_model_i: Growth rate functions for level i
        growth_model_j: Growth rate functions for level j
        max_horizon: Maximum time horizon to check (default: 30 days in seconds)
        
    Returns:
        Time until critical size is reached, or None if not reached within horizon
    """
    # Check if already at critical size
    current_size = (level_i.valid_data_bytes + level_i.tombstone_bytes + 
                   level_j.valid_data_bytes + level_j.tombstone_bytes)
    
    if current_size >= s_critical:
        return 0.0
    
    # If the critical size is not reached even at the horizon, give up early
    # (assumes the projected size is non-decreasing in time)
    horizon_size = predict_compaction_size(
        level_i, level_j, t_current, t_current + max_horizon,
        growth_model_i, growth_model_j
    )
    if horizon_size < s_critical:
        return None
    
    # Binary search for the earliest time the projection crosses s_critical
    t_min = t_current
    t_max = t_current + max_horizon
    
    while t_max - t_min > 3600:  # 1-hour precision
        t_mid = (t_min + t_max) / 2
        
        projected_size = predict_compaction_size(
            level_i, level_j, t_current, t_mid,
            growth_model_i, growth_model_j
        )
        
        if projected_size >= s_critical:
            t_max = t_mid
        else:
            t_min = t_mid
    
    # Return the elapsed time until the threshold is crossed
    return t_max - t_current
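
A sketch with constant operation rates; the rates, level contents, and threshold are illustrative:

# Constant-rate model: inserts at 2 KB/s, deletes at 300 B/s, and no background
# compaction draining either level within the projection window.
rates = {
    'insert_rate': lambda t: 2000.0,
    'compact_rate': lambda t: 0.0,
    'delete_rate': lambda t: 300.0,
    'compact_tombstone_rate': lambda t: 0.0,
}
l1 = LSMLevel(1, 10_000_000, 7_500_000, 2_400_000, 100_000)
l2 = LSMLevel(2, 100_000_000, 90_000_000, 9_000_000, 1_000_000)

one_day = 24 * 3600.0
print(predict_compaction_size(l1, l2, 0.0, one_day, rates, rates))  # ~506 MB
eta = time_to_critical_size(l1, l2, 0.0, 600_000_000, rates, rates)
print(eta)  # ~1.2 days, in seconds, until the projection crosses 600 MB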

6. Incremental Compaction Strategy

To address the challenge of oversized compactions, we formalize an incremental compaction strategy.

Definition 6.1 (Key Space Partitioning): Given a key space $\mathcal{K}$ requiring compaction, a partitioning $\mathcal{P} = \{\mathcal{K}_1, \mathcal{K}_2, ..., \mathcal{K}_n\}$ is valid if:

  1. $\cup_{j=1}^{n} \mathcal{K}_j = \mathcal{K}$ (completeness)
  2. $\mathcal{K}_i \cap \mathcal{K}_j = \emptyset$ for all $i \neq j$ (disjointness)

Definition 6.2 (Optimal Partitioning Problem): The optimal partitioning problem is defined as:
$$\min_{n, \mathcal{P}} n$$
$$\text{subject to } S_c(\mathcal{K}_j) \leq \theta \cdot S_{critical} \text{ for all } j \in \{1,2,...,n\}$$
Where $\theta$ is a target fraction of critical capacity (e.g., $\theta = 0.3$)

Theorem 6.1 (Existence of Valid Partitioning): For any key space $\mathcal{K}$ with finite size $S_c(\mathcal{K})$, there exists a valid partitioning $\mathcal{P}$ such that $S_c(\mathcal{K}_j) \leq \theta \cdot S_{critical}$ for all partitions $\mathcal{K}_j \in \mathcal{P}$.

Proof: We construct a valid partitioning by repeatedly splitting the key space. Provided no single key-value entry exceeds $\theta \cdot S_{critical}$, ranges can be split until every region has size at most $\theta \cdot S_{critical}$; in the worst case each key becomes its own partition, which still satisfies the bound. Adjacent partitions may then be merged as long as their combined size stays below the threshold, yielding a valid partitioning with finite $n$. ■

Theorem 6.2 (Greedy Partitioning Approximation): A greedy algorithm that scans the key space in order and closes a partition just before the accumulated size would exceed $\theta \cdot S_{critical}$ produces a valid partitioning; when partition boundaries may be placed at arbitrary keys, it uses at most $\lceil \frac{S_c(\mathcal{K})}{\theta \cdot S_{critical}} \rceil$ partitions.

Proof: With arbitrary boundary placement, the greedy algorithm fills every partition to exactly $\theta \cdot S_{critical}$ (except possibly the last), so it uses $\lceil \frac{S_c(\mathcal{K})}{\theta \cdot S_{critical}} \rceil$ partitions. This is optimal: every valid partitioning must satisfy $S_c(\mathcal{K}_j) \leq \theta \cdot S_{critical}$, and therefore needs at least $\lceil \frac{S_c(\mathcal{K})}{\theta \cdot S_{critical}} \rceil$ partitions. When boundaries are restricted to a discrete candidate set, each closed partition together with the range that triggered its closure exceeds $\theta \cdot S_{critical}$, which keeps the partition count within roughly a factor of two of this ideal. ■

The following code implements the incremental compaction strategy:

from typing import Callable, List


@dataclass
class KeyRange:
    """Represents a range of keys in the key space"""
    start_key: bytes  # Inclusive
    end_key: bytes    # Exclusive
    estimated_size: int


def estimate_range_size(level_i: LSMLevel, 
                        level_j: LSMLevel, 
                        start_key: bytes, 
                        end_key: bytes,
                        get_size_estimate: Callable[[LSMLevel, bytes, bytes], int]) -> int:
    """
    Estimate the size of data in a key range across two levels
    
    Args:
        level_i: First level
        level_j: Second level
        start_key: Start key of range (inclusive)
        end_key: End key of range (exclusive)
        get_size_estimate: Function to estimate size within a key range for a level
        
    Returns:
        Estimated size in bytes
    """
    size_i = get_size_estimate(level_i, start_key, end_key)
    size_j = get_size_estimate(level_j, start_key, end_key)
    return size_i + size_j


def greedy_key_space_partitioning(level_i: LSMLevel,
                                 level_j: LSMLevel,
                                 key_boundaries: List[bytes],
                                 get_size_estimate: Callable[[LSMLevel, bytes, bytes], int],
                                 max_partition_size: int) -> List[KeyRange]:
    """
    Partition the key space using a greedy algorithm
    
    Args:
        level_i: First level
        level_j: Second level
        key_boundaries: List of potential partition boundaries (sorted)
        get_size_estimate: Function to estimate size within a key range
        max_partition_size: Maximum allowed size for a partition
        
    Returns:
        List of key ranges representing the partitioning
    """
    partitions: List[KeyRange] = []
    if len(key_boundaries) < 2:
        return partitions  # need at least two boundaries to form a range
    current_start = key_boundaries[0]
    current_size = 0
    
    for i in range(1, len(key_boundaries)):
        range_size = estimate_range_size(
            level_i, level_j,
            key_boundaries[i-1], key_boundaries[i],
            get_size_estimate
        )
        
        # If adding this range would exceed max size, close the current partition
        # (a single range larger than the cap still becomes its own partition)
        if current_size + range_size > max_partition_size and current_size > 0:
            partitions.append(KeyRange(
                start_key=current_start,
                end_key=key_boundaries[i-1],
                estimated_size=current_size
            ))
            current_start = key_boundaries[i-1]
            current_size = range_size
        else:
            current_size += range_size
    
    # Add the final partition
    if current_size > 0:
        partitions.append(KeyRange(
            start_key=current_start,
            end_key=key_boundaries[-1],
            estimated_size=current_size
        ))
    
    return partitions


def create_bounded_compaction_tasks(level_i: LSMLevel,
                                   level_j: LSMLevel,
                                   s_critical: int,
                                   theta: float = 0.3,
                                    get_size_estimate: Optional[Callable[[LSMLevel, bytes, bytes], int]] = None,
                                    get_key_boundaries: Optional[Callable[[LSMLevel, LSMLevel], List[bytes]]] = None) -> List[KeyRange]:
    """
    Create bounded compaction tasks by partitioning the key space
    
    Args:
        level_i: First level
        level_j: Second level
        s_critical: Critical size threshold
        theta: Target fraction of critical capacity
        get_size_estimate: Function to estimate size within a key range
        get_key_boundaries: Function to get potential key boundaries
        
    Returns:
        List of key ranges for bounded compaction tasks
    """
    # Both callbacks would typically come from the LSM tree implementation
    if get_key_boundaries is None or get_size_estimate is None:
        raise ValueError("get_size_estimate and get_key_boundaries must be provided")
    
    key_boundaries = get_key_boundaries(level_i, level_j)
    max_partition_size = int(theta * s_critical)
    
    # Use greedy algorithm to partition key space
    partitions = greedy_key_space_partitioning(
        level_i, level_j,
        key_boundaries,
        get_size_estimate,
        max_partition_size
    )
    
    return partitions
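
A toy usage sketch; the flat size estimator below is a stand-in for real SSTable index statistics:

# Stand-in estimator: pretend every adjacent boundary pair holds 100 MB per level.
def flat_estimate(level: LSMLevel, start_key: bytes, end_key: bytes) -> int:
    return 100_000_000

boundaries = [bytes([b]) for b in range(11)]   # 10 unit ranges
l1 = LSMLevel(1, 10_000_000, 7_500_000, 2_400_000, 100_000)
l2 = LSMLevel(2, 100_000_000, 90_000_000, 9_000_000, 1_000_000)

parts = greedy_key_space_partitioning(l1, l2, boundaries, flat_estimate,
                                      max_partition_size=500_000_000)
print([(p.start_key, p.end_key, p.estimated_size) for p in parts])
# Five partitions of 400 MB each: two 200 MB unit ranges fit under the 500 MB cap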

7. Dynamic Resource-Aware Scheduling Model

We now formalize a scheduling model for compaction that accounts for system resource availability.

Definition 7.1 (Utility Function): The utility of scheduling a compaction of size $S_c$ at time $t$ is defined as:
$$U(t, S_c) = w_1 \cdot (1 - R(t)) - w_2 \cdot P(S_c, 1 - R(t)) - w_3 \cdot D(t)$$
Where:

  • $R(t)$ is the resource utilization at time $t$
  • $P(S_c, 1 - R(t))$ is the performance impact function of Definition 4.1, evaluated at the resources still available, $1 - R(t)$
  • $D(t)$ is a delay penalty function
  • $w_1, w_2, w_3$ are weight parameters

Definition 7.2 (Optimal Scheduling Problem): The optimal scheduling problem is defined as:
$$t^* = \arg\max_{t \in [t_{current}, t_{deadline}]} U(t, S_c)$$
Where $t_{deadline}$ is the latest time compaction can be delayed before causing severe performance impact.

Theorem 7.1 (Existence of Optimal Schedule): Given a continuous utility function $U(t, S_c)$ and a closed interval $[t_{current}, t_{deadline}]$, there exists an optimal scheduling time $t^*$ that maximizes utility.

Proof: By the extreme value theorem, a continuous function on a closed interval attains its maximum value. Since $U(t, S_c)$ is continuous in $t$ (assuming continuous resource utilization and penalty functions), it attains its maximum at some point $t^* \in [t_{current}, t_{deadline}]$. ■

Definition 7.3 (Compaction Controller): The compaction controller function $C(t)$ is defined as:
$$C(t) = \begin{cases} \text{Schedule Compaction} & \text{if } U(t, S_c) > \tau \text{ and } S_c \leq \theta \cdot S_{critical} \\ \text{Partition and Schedule} & \text{if } U(t, S_c) > \tau \text{ and } S_c > \theta \cdot S_{critical} \\ \text{Delay Compaction} & \text{if } U(t, S_c) \leq \tau \text{ and } t < t_{deadline} \\ \text{Emergency Partition and Schedule} & \text{if } t \geq t_{deadline} \end{cases}$$
Where $\tau$ is a threshold utility value.

Here's the Python implementation of this scheduling model:

from enum import Enum, auto
from typing import List, Dict, Tuple, Callable, Any, Optional
import numpy as np


class CompactionAction(Enum):
    """Possible actions for the compaction controller"""
    SCHEDULE = auto()
    PARTITION_AND_SCHEDULE = auto()
    DELAY = auto()
    EMERGENCY_PARTITION_AND_SCHEDULE = auto()


def calculate_utility(t: float,
                      s_c: int,
                      resource_func: Callable[[float], float],
                      performance_impact_func: Callable[[int, float], float],
                      delay_penalty_func: Callable[[float], float],
                      weights: Tuple[float, float, float]) -> float:
    """
    Calculate the utility of scheduling a compaction at time t
    
    Args:
        t: Time point
        s_c: Compaction size
        resource_func: Function returning resource utilization at time t
        performance_impact_func: Function calculating performance impact
        delay_penalty_func: Function calculating delay penalty
        weights: Tuple of weight parameters (w1, w2, w3)
        
    Returns:
        Utility value
    """
    w1, w2, w3 = weights
    
    # Available resource term
    r_t = resource_func(t)
    resource_term = w1 * (1 - r_t)
    
    # Performance impact term (P expects available resources, i.e. 1 - utilization)
    impact = performance_impact_func(s_c, 1.0 - r_t)
    impact_term = -w2 * impact
    
    # Delay penalty term
    penalty = delay_penalty_func(t)
    penalty_term = -w3 * penalty
    
    return resource_term + impact_term + penalty_term


def find_optimal_schedule_time(t_current: float,
                              t_deadline: float,
                              s_c: int,
                              resource_func: Callable[[float], float],
                              performance_impact_func: Callable[[int, float], float],
                              delay_penalty_func: Callable[[float], float],
                              weights: Tuple[float, float, float],
                              resolution: float = 300) -> float:
    """
    Find the optimal time to schedule a compaction
    
    Args:
        t_current: Current time
        t_deadline: Deadline time
        s_c: Compaction size
        resource_func: Function returning resource utilization at time t
        performance_impact_func: Function calculating performance impact
        delay_penalty_func: Function calculating delay penalty
        weights: Tuple of weight parameters (w1, w2, w3)
        resolution: Time resolution for search in seconds (default: 5 minutes)
        
    Returns:
        Optimal scheduling time
    """
    # Check discrete time points between current and deadline
    best_time = t_current
    best_utility = calculate_utility(
        t_current, s_c, resource_func, 
        performance_impact_func, delay_penalty_func, weights
    )
    
    # Time points to check (clipped so we never evaluate past the deadline)
    t_points = np.arange(t_current, t_deadline + resolution, resolution)
    t_points = t_points[t_points <= t_deadline]
    
    for t in t_points:
        utility = calculate_utility(
            t, s_c, resource_func, 
            performance_impact_func, delay_penalty_func, weights
        )
        
        if utility > best_utility:
            best_utility = utility
            best_time = t
    
    return best_time


def compaction_controller(t: float,
                         s_c: int,
                         t_current: float,
                         t_deadline: float,
                         s_critical: int,
                         theta: float,
                         tau: float,
                         resource_func: Callable[[float], float],
                         performance_impact_func: Callable[[int, float], float],
                         delay_penalty_func: Callable[[float], float],
                         weights: Tuple[float, float, float]) -> CompactionAction:
    """
    Compaction controller function C(t)
    
    Args:
        t: Time point
        s_c: Compaction size
        t_current: Current time
        t_deadline: Deadline time
        s_critical: Critical size threshold
        theta: Target fraction of critical capacity
        tau: Threshold utility value
        resource_func: Function returning resource utilization at time t
        performance_impact_func: Function calculating performance impact
        delay_penalty_func: Function calculating delay penalty
        weights: Tuple of weight parameters (w1, w2, w3)
        
    Returns:
        Appropriate action to take
    """
    # Calculate utility
    utility = calculate_utility(
        t, s_c, resource_func, 
        performance_impact_func, delay_penalty_func, weights
    )
    
    # Apply decision logic
    if t >= t_deadline:
        return CompactionAction.EMERGENCY_PARTITION_AND_SCHEDULE
    
    if utility <= tau:
        return CompactionAction.DELAY
    
    if s_c > theta * s_critical:
        return CompactionAction.PARTITION_AND_SCHEDULE
    
    return CompactionAction.SCHEDULE
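
A usage sketch with a synthetic diurnal load curve; the sinusoid, weights, and sizes are all invented for illustration:

import math

# Synthetic utilization: peaks 1/4 of the way through the day, bottoms out at 3/4.
resource = lambda t: 0.5 + 0.4 * math.sin(2 * math.pi * t / 86400.0)
impact = lambda s_c, r_avail: calculate_performance_impact(
    s_c, max(r_avail, 1e-6), 1_000_000_000)   # floor avoids division by zero
penalty = lambda t: t / 86400.0               # linear delay penalty over one day

t_best = find_optimal_schedule_time(
    t_current=0.0, t_deadline=86400.0, s_c=300_000_000,
    resource_func=resource, performance_impact_func=impact,
    delay_penalty_func=penalty, weights=(1.0, 1.0, 0.5))
print(t_best)  # lands near the utilization trough, ~3/4 through the day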

8. Multi-Objective Optimization Framework

Finally, we present a comprehensive multi-objective optimization framework that integrates the various aspects of compaction management.

Definition 8.1 (Multi-Objective Compaction Problem): The multi-objective compaction optimization problem is defined as:
$$\min_{schedule, partitioning} \left[ \sum_{j} P(S_c^j, R(t_j)) + \lambda_1 \cdot \sum_{j} (t_j - t_{ideal}^j)^2 + \lambda_2 \cdot n \right]$$

Subject to:

  • Resource constraints: $R(t) \geq R_{min}$ at all times
  • Performance constraints: $P(S_c^j, R(t_j)) \leq P_{max}$ for all compactions
  • Coverage constraint: All necessary compactions must eventually complete

Theorem 8.1 (Pareto Optimality of Weighted-Sum Minimizers): For any strictly positive weights $\lambda_1, \lambda_2 > 0$, a solution that minimizes the weighted objective above is Pareto optimal: no other solution improves one of the three objectives (performance impact, schedule deviation, partition count) without degrading another.

Proof: Suppose a weighted-sum minimizer were dominated, i.e., some other solution is at least as good in every objective and strictly better in one. With strictly positive weights, that solution would have a strictly smaller weighted objective, contradicting minimality. This motivates the strategy in the implementation below: scan weight combinations, then filter out dominated candidates. ■

Theorem 8.2 (Adaptive Parameter Convergence): Under mild conditions, adaptive parameter updates of the form:
$$\theta_{t+1} = \theta_t - \eta_\theta \cdot \nabla_\theta L_t$$
converge to locally optimal parameter values that minimize the loss function $L_t$.

Proof: This follows from standard results in stochastic gradient descent. Under the assumptions that $L_t$ is differentiable and its gradients are Lipschitz continuous, and with appropriate step sizes $\eta_\theta$, the parameter updates converge to a local minimum of the loss function. ■

Here's a Python implementation of this multi-objective optimization framework:

from typing import List, Tuple, Dict, Callable, Optional
import numpy as np
from dataclasses import dataclass


@dataclass
class CompactionTask:
    """Represents a compaction task"""
    level_i: LSMLevel
    level_j: LSMLevel
    key_range: Optional[KeyRange]  # None for full compaction
    scheduled_time: float
    estimated_size: int


@dataclass
class CompactionSolution:
    """Represents a solution to the multi-objective compaction problem"""
    tasks: List[CompactionTask]
    performance_impact: float
    schedule_deviation: float
    partition_count: int
    total_objective: float


def calculate_multi_objective_cost(tasks: List[CompactionTask],
                                  performance_impact_func: Callable[[int, float], float],
                                  resource_func: Callable[[float], float],
                                  ideal_times: Dict[Tuple[int, int], float],
                                  lambda1: float,
                                  lambda2: float) -> Tuple[float, float, float, float]:
    """
    Calculate the cost components of the multi-objective function
    
    Args:
        tasks: List of compaction tasks
        performance_impact_func: Function to calculate performance impact
        resource_func: Function to get resource utilization at a time
        ideal_times: Dictionary mapping (level_i, level_j) to ideal compaction times
        lambda1: Weight for schedule deviation
        lambda2: Weight for partition count
        
    Returns:
        Tuple of (total cost, performance impact, schedule deviation, partition count)
    """
    # Calculate performance impact sum
    impact_sum = sum(
        performance_impact_func(task.estimated_size, resource_func(task.scheduled_time))
        for task in tasks
    )
    
    # Calculate schedule deviation
    deviation_sum = sum(
        (task.scheduled_time - ideal_times.get((task.level_i.level_id, task.level_j.level_id), 
                                              task.scheduled_time))**2
        for task in tasks
    )
    
    # Count partitions (number of tasks)
    partition_count = len(tasks)
    
    # Calculate total objective
    total = impact_sum + lambda1 * deviation_sum + lambda2 * partition_count
    
    return total, impact_sum, deviation_sum, partition_count


def find_pareto_optimal_solutions(level_i: LSMLevel,
                                 level_j: LSMLevel,
                                 s_critical: int,
                                 resource_func: Callable[[float], float],
                                 performance_impact_func: Callable[[int, float], float],
                                 get_size_estimate: Callable[[LSMLevel, bytes, bytes], int],
                                 get_key_boundaries: Callable[[LSMLevel, LSMLevel], List[bytes]],
                                 t_current: float,
                                 t_deadline: float,
                                 lambda1_values: List[float],
                                 lambda2_values: List[float],
                                 theta_values: List[float]) -> List[CompactionSolution]:
    """
    Find Pareto optimal solutions for the multi-objective compaction problem
    
    Args:
        level_i, level_j: Levels to compact
        s_critical: Critical size threshold
        resource_func: Function returning resource utilization at time t
        performance_impact_func: Function calculating performance impact
        get_size_estimate: Function to estimate size within a key range
        get_key_boundaries: Function to get potential key boundaries
        t_current: Current time
        t_deadline: Deadline time
        lambda1_values: Weight values for schedule deviation to try
        lambda2_values: Weight values for partition count to try
        theta_values: Values for target fraction of critical capacity to try
        
    Returns:
        List of Pareto optimal solutions
    """
    solutions: List[CompactionSolution] = []
    ideal_times = {(level_i.level_id, level_j.level_id): t_current}  # Simplified
    
    # Try different parameter combinations
    for lambda1 in lambda1_values:
        for lambda2 in lambda2_values:
            for theta in theta_values:
                # Create tasks based on the current parameters
                max_partition_size = int(theta * s_critical)
                
                # Estimate full compaction size
                full_size = level_i.size_bytes + level_j.size_bytes
                
                # Decide whether to partition
                if full_size <= max_partition_size:
                    # Single compaction task
                    task = CompactionTask(
                        level_i=level_i,
                        level_j=level_j,
                        key_range=None,  # Full compaction
                        scheduled_time=t_current,  # Simplified
                        estimated_size=full_size
                    )
                    tasks = [task]
                else:
                    # Partition and create multiple tasks
                    partitions = greedy_key_space_partitioning(
                        level_i, level_j,
                        get_key_boundaries(level_i, level_j),
                        get_size_estimate,
                        max_partition_size
                    )
                    
                    # Create a task for each partition
                    tasks = []
                    for i, partition in enumerate(partitions):
                        # Simplified scheduling - evenly distribute tasks
                        schedule_time = t_current + i * (t_deadline - t_current) / len(partitions)
                        
                        task = CompactionTask(
                            level_i=level_i,
                            level_j=level_j,
                            key_range=partition,
                            scheduled_time=schedule_time,
                            estimated_size=partition.estimated_size
                        )
                        tasks.append(task)
                
                # Calculate the objective function
                total, impact, deviation, count = calculate_multi_objective_cost(
                    tasks, performance_impact_func, resource_func, 
                    ideal_times, lambda1, lambda2
                )
                
                # Create solution
                solution = CompactionSolution(
                    tasks=tasks,
                    performance_impact=impact,
                    schedule_deviation=deviation,
                    partition_count=count,
                    total_objective=total
                )
                
                solutions.append(solution)
    
    # Filter for Pareto optimal solutions
    pareto_optimal = []
    for solution in solutions:
        dominated = False
        for other in solutions:
            if (other.performance_impact < solution.performance_impact and
                other.schedule_deviation <= solution.schedule_deviation and
                other.partition_count <= solution.partition_count) or (
                other.performance_impact <= solution.performance_impact and
                other.schedule_deviation < solution.schedule_deviation and
                other.partition_count <= solution.partition_count) or (
                other.performance_impact <= solution.performance_impact and
                other.schedule_deviation <= solution.schedule_deviation and
                other.partition_count < solution.partition_count):
                dominated = True
                break
        
        if not dominated:
            pareto_optimal.append(solution)
    
    return pareto_optimal


def adaptive_parameter_update(current_params: Dict[str, float],
                             gradient: Dict[str, float],
                             learning_rates: Dict[str, float]) -> Dict[str, float]:
    """
    Update parameters adaptively based on gradient
    
    Args:
        current_params: Current parameter values
        gradient: Gradient of loss with respect to parameters
        learning_rates: Learning rates for each parameter
        
    Returns:
        Updated parameters
    """
    updated_params = {}
    
    for param, value in current_params.items():
        if param in gradient and param in learning_rates:
            # Update rule: param = param - learning_rate * gradient
            updated_params[param] = value - learning_rates[param] * gradient[param]
        else:
            # Keep unchanged if no gradient or learning rate
            updated_params[param] = value
    
    return updated_params
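
A single illustrative step; the gradient values are hypothetical placeholders (in practice they would come from differentiating the observed loss $L_t$):

params = {'theta': 0.30, 'tau': 0.05}
grads = {'theta': -0.8, 'tau': 0.2}   # hypothetical estimates of dL/dparam
lrs = {'theta': 0.01, 'tau': 0.01}    # per-parameter learning rates

print(adaptive_parameter_update(params, grads, lrs))
# {'theta': 0.308, 'tau': 0.048} -> theta nudged up, tau nudged down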

9. Conclusion

This essay has presented a comprehensive mathematical framework for modeling, predicting, and optimizing compaction operations in LSM tree systems. We have established formal definitions and theorems governing compaction timing decisions, performance impact assessment, incremental compaction strategies, and dynamic scheduling models.

The key theoretical contributions include:

  1. A rigorous cost-benefit model for determining optimal compaction timing based on deletion ratios and system parameters
  2. A non-linear performance impact function that models the exponential degradation caused by oversized compactions
  3. Mathematical formulations for predictive compaction sizing and growth projection
  4. Formal proof of the existence and approximation guarantees for incremental compaction strategies
  5. A utility-based scheduling framework that balances multiple competing objectives

These formal models provide database system designers with theoretically sound approaches to prevent performance degradation from excessive compaction operations while maintaining the space efficiency and query performance benefits of LSM tree systems. The mathematical framework is general enough to be adapted to various LSM tree implementations and can be calibrated to specific workload characteristics through its parameterization.

The complementary Python code demonstrates how these theoretical models can be implemented in practice, providing a concrete bridge between mathematical formulations and real-world database systems. This integration of theory and implementation offers a solid foundation for building robust, adaptive compaction management systems that prevent performance degradation while maintaining optimal system efficiency.
