Theoretical Models for LSM Tree Compaction Optimization: A Mathematical Analysis
1. Introduction: The Compaction Challenge in LSM Tree Systems
Log-Structured Merge Trees (LSM trees) have become a prevalent data structure for modern storage systems due to their ability to optimize write performance by converting random writes into sequential ones. However, this advantage comes with a fundamental trade-off: as deletions and updates accumulate, the system requires periodic compaction operations to maintain read performance and space efficiency. These compactions can grow prohibitively large, causing significant performance degradation that disrupts normal system operation.
This essay presents a rigorous theoretical framework for modeling and optimizing compaction processes in LSM tree systems, with particular attention to preventing excessive compaction sizes while maintaining system efficiency. We develop formal models for compaction triggering conditions, size impact assessment, and incremental compaction strategies, providing mathematical foundations for each approach.
2. Fundamental Definitions and Notation
We begin by establishing the mathematical notation and core definitions that will serve as the foundation for our analysis.
Definition 2.1 (LSM Tree Structure): An LSM tree consists of $L+1$ levels $\{L_0, L_1, ..., L_L\}$, where each level $L_i$ contains a set of sorted string tables (SSTables). The size of level $L_i$ is denoted by $S_i$, with target sizes satisfying $S_{i+1} = k \cdot S_i$ for some growth factor $k > 1$.
Definition 2.2 (Valid Data and Tombstones): For a level $L_i$, we denote:
- $V_i$: The volume of valid (non-deleted) data in bytes
- $T_i$: The volume of tombstone data in bytes
- Thus, $S_i = V_i + T_i + O_i$, where $O_i$ represents other metadata overhead
Definition 2.3 (Deletion Ratio): The deletion ratio of level $L_i$ is defined as:
$$D_r(L_i) = \frac{T_i}{S_i}$$
Definition 2.4 (Compaction Operation): A compaction operation $C(L_i, L_j)$ merges data from levels $L_i$ and $L_j$ (typically where $j = i+1$), removing duplicate keys and obsolete values marked by tombstones.
The following code demonstrates these fundamental structures:
from dataclasses import dataclass
from typing import List, Dict, Tuple, Optional
import numpy as np

@dataclass
class LSMLevel:
    level_id: int
    size_bytes: int        # S_i
    valid_data_bytes: int  # V_i
    tombstone_bytes: int   # T_i
    metadata_bytes: int    # O_i

    @property
    def deletion_ratio(self) -> float:
        """Calculate the deletion ratio D_r(L_i)"""
        return self.tombstone_bytes / self.size_bytes if self.size_bytes > 0 else 0.0

class LSMTree:
    def __init__(self, num_levels: int, growth_factor: float = 10.0):
        """Initialize an LSM tree with specified number of levels and growth factor"""
        self.levels: List[LSMLevel] = []
        self.num_levels = num_levels
        self.growth_factor = growth_factor
        # Target sizes follow S_{i+1} = k * S_i, with an example base size of ~1MB
        self.target_sizes = [int(1_000_000 * growth_factor ** i) for i in range(num_levels)]
        # Initialize empty levels
        for i in range(num_levels):
            self.levels.append(LSMLevel(
                level_id=i,
                size_bytes=0,
                valid_data_bytes=0,
                tombstone_bytes=0,
                metadata_bytes=0
            ))
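A minimal usage sketch follows; the sizes are purely illustrative (a level holding 100 MB, 30 MB of which is tombstones):

# Illustrative usage: compute the deletion ratio of a populated level
tree = LSMTree(num_levels=4, growth_factor=10.0)
tree.levels[1] = LSMLevel(level_id=1,
                          size_bytes=100_000_000,
                          valid_data_bytes=69_000_000,
                          tombstone_bytes=30_000_000,
                          metadata_bytes=1_000_000)
print(tree.levels[1].deletion_ratio)  # 0.3, i.e. D_r(L_1) = T_1 / S_1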
3. Cost-Benefit Model for Compaction Decisions
We now establish a formal cost-benefit model to determine optimal compaction timing.
Definition 3.1 (Compaction Costs): The costs associated with a compaction operation $C(L_i, L_{i+1})$ are defined as:
$$C_{compaction}(L_i, L_{i+1}) = C_{IO} \cdot W_a \cdot (S_i + S_{i+1}) + C_{resources} \cdot (S_i + S_{i+1})$$
where:
- $C_{IO}$ is the IO cost per byte
- $W_a$ is the write amplification factor
- $C_{resources}$ represents the cost coefficient for system resources
Definition 3.2 (Deferral Costs): The costs associated with deferring compaction include:
$$C_{defer}(L_i, L_{i+1}) = C_{space}(L_i, L_{i+1}) + C_{read}(L_i, L_{i+1})$$
where:
- $C_{space}(L_i, L_{i+1}) = \alpha_s \cdot (T_i + T_{i+1})$ represents the cost of wasted space
- $C_{read}(L_i, L_{i+1}) = \alpha_r \cdot (D_r(L_i) \cdot Q(L_i) \cdot P(L_i) + D_r(L_{i+1}) \cdot Q(L_{i+1}) \cdot P(L_{i+1}))$ represents the cost of degraded read performance, where $Q(L_i)$ is the query frequency against level $L_i$ and $P(L_i)$ is the per-query read penalty induced by un-compacted tombstones
Theorem 3.1 (Optimal Compaction Timing): Under the cost-benefit model, the optimal time to trigger compaction for levels $L_i$ and $L_{i+1}$ is when:
$$C_{defer}(L_i, L_{i+1}) > C_{compaction}(L_i, L_{i+1})$$
Proof: Consider the decision to compact at time $t$ versus deferring to time $t + \Delta t$. While $C_{defer} < C_{compaction}$, deferring is cheaper than acting, so compaction should wait; once $C_{defer}$ exceeds $C_{compaction}$, every further unit of deferral adds more cost than immediate compaction would. The break-even point occurs at $C_{defer}(L_i, L_{i+1}) = C_{compaction}(L_i, L_{i+1})$, so the optimal trigger is the first time the deferral cost crosses above the compaction cost; any deferral beyond this point increases total cost. ■
Corollary 3.1 (Deletion Ratio Threshold): From Theorem 3.1, assuming the two levels share a common deletion ratio $D_r(L_i) \approx D_r(L_{i+1})$, we can derive a deletion ratio threshold that triggers compaction:
$$D_r(L_i) > \frac{C_{compaction}(L_i, L_{i+1}) - \alpha_s \cdot (T_i + T_{i+1})}{\alpha_r \cdot Q(L_i) \cdot P(L_i) + \alpha_r \cdot Q(L_{i+1}) \cdot P(L_{i+1})}$$
The following code implements this cost-benefit model:
def calculate_compaction_cost(level_i: LSMLevel,
                              level_j: LSMLevel,
                              io_cost_per_byte: float,
                              write_amp_factor: float,
                              resource_cost_coef: float) -> float:
    """
    Calculate the cost of performing compaction between two levels

    Args:
        level_i: The first level (typically i)
        level_j: The second level (typically i+1)
        io_cost_per_byte: Cost coefficient for IO operations
        write_amp_factor: Write amplification factor
        resource_cost_coef: Cost coefficient for system resources

    Returns:
        The total cost of compaction
    """
    total_size = level_i.size_bytes + level_j.size_bytes
    io_cost = io_cost_per_byte * write_amp_factor * total_size
    resource_cost = resource_cost_coef * total_size
    return io_cost + resource_cost

def calculate_deferral_cost(level_i: LSMLevel,
                            level_j: LSMLevel,
                            alpha_space: float,
                            alpha_read: float,
                            query_freq_i: float,
                            query_freq_j: float,
                            read_penalty_i: float,
                            read_penalty_j: float) -> float:
    """
    Calculate the cost of deferring compaction between two levels

    Args:
        level_i: The first level (typically i)
        level_j: The second level (typically i+1)
        alpha_space: Weight coefficient for space costs
        alpha_read: Weight coefficient for read performance costs
        query_freq_i: Query frequency for level i (Q(L_i))
        query_freq_j: Query frequency for level j (Q(L_j))
        read_penalty_i: Read performance penalty for level i (P(L_i))
        read_penalty_j: Read performance penalty for level j (P(L_j))

    Returns:
        The total cost of deferring compaction
    """
    # Space waste cost
    space_cost = alpha_space * (level_i.tombstone_bytes + level_j.tombstone_bytes)
    # Read performance degradation cost
    read_cost_i = level_i.deletion_ratio * query_freq_i * read_penalty_i
    read_cost_j = level_j.deletion_ratio * query_freq_j * read_penalty_j
    read_cost = alpha_read * (read_cost_i + read_cost_j)
    return space_cost + read_cost

def should_trigger_compaction(level_i: LSMLevel,
                              level_j: LSMLevel,
                              system_params: Dict[str, float]) -> bool:
    """
    Determine if compaction should be triggered based on cost-benefit analysis

    Args:
        level_i: The first level (typically i)
        level_j: The second level (typically i+1)
        system_params: Dictionary of system parameters

    Returns:
        True if compaction should be triggered, False otherwise
    """
    defer_cost = calculate_deferral_cost(
        level_i, level_j,
        system_params['alpha_space'],
        system_params['alpha_read'],
        system_params['query_freq_i'],
        system_params['query_freq_j'],
        system_params['read_penalty_i'],
        system_params['read_penalty_j']
    )
    compaction_cost = calculate_compaction_cost(
        level_i, level_j,
        system_params['io_cost_per_byte'],
        system_params['write_amp_factor'],
        system_params['resource_cost_coef']
    )
    return defer_cost > compaction_cost
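As a usage sketch, the following invocation uses illustrative parameter values; in practice the $\alpha$ coefficients, query frequencies, and IO costs would be calibrated per deployment:

# Illustrative parameters, not tuned for any real system
params = {
    'alpha_space': 1e-6, 'alpha_read': 5.0,
    'query_freq_i': 200.0, 'query_freq_j': 50.0,
    'read_penalty_i': 1.5, 'read_penalty_j': 3.0,
    'io_cost_per_byte': 1e-8, 'write_amp_factor': 4.0,
    'resource_cost_coef': 1e-9,
}
l1 = LSMLevel(1, 100_000_000, 69_000_000, 30_000_000, 1_000_000)
l2 = LSMLevel(2, 1_000_000_000, 940_000_000, 50_000_000, 10_000_000)
if should_trigger_compaction(l1, l2, params):
    print("compact L1 -> L2 now")  # triggers here: deferral cost exceeds compaction cost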
4. Performance Impact Modeling of Compaction Size
We now establish a mathematical model for the performance impact of compaction operations based on their size.
Definition 4.1 (Performance Impact Function): The performance impact of a compaction operation of size $S_c$ with available system resources $R$ is defined as:
$$P(S_c, R) = \alpha \cdot \frac{S_c}{R \cdot C_{max}} \cdot \left(1 + \beta \cdot e^{(S_c - \gamma \cdot R \cdot C_{max})/(R \cdot C_{max})}\right)$$
where:
- $C_{max}$ is the maximum compaction size the system can handle without significant degradation
- $\alpha$ is a scaling factor for linear impact
- $\beta$ is a scaling factor for exponential impact when compaction exceeds threshold
- $\gamma$ is a threshold coefficient (typically $0 < \gamma < 1$)
Theorem 4.1 (Non-linear Scaling of Performance Impact): The performance impact function $P(S_c, R)$ exhibits the following properties:
- For $S_c \ll \gamma \cdot R \cdot C_{max}$, $P(S_c, R) \approx \alpha \cdot \frac{S_c}{R \cdot C_{max}}$ (linear growth)
- For $S_c > \gamma \cdot R \cdot C_{max}$, $P(S_c, R)$ grows exponentially
Proof: The exponent $u = (S_c - \gamma \cdot R \cdot C_{max})/(R \cdot C_{max})$ is dimensionless. For $S_c \ll \gamma \cdot R \cdot C_{max}$ it is strongly negative, so $e^{u}$ is negligibly small and $P$ reduces to the linear term. As $S_c$ approaches and exceeds $\gamma \cdot R \cdot C_{max}$, the exponent turns positive and the exponential term dominates, causing the function to grow at a super-linear rate. Formally, differentiating $P$ with respect to $S_c$ shows that the slope itself grows exponentially once $S_c$ exceeds the threshold. ■
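For completeness, here is the derivative computation sketched in the proof, writing $N = R \cdot C_{max}$ and $u = (S_c - \gamma \cdot N)/N$:
$$\frac{\partial P}{\partial S_c} = \frac{\alpha}{N}\left(1 + \beta e^{u}\right) + \frac{\alpha S_c}{N} \cdot \frac{\beta e^{u}}{N} = \frac{\alpha}{N}\left[1 + \beta e^{u}\left(1 + \frac{S_c}{N}\right)\right]$$
For $u \ll 0$ this is approximately $\alpha / N$, the linear regime; once $u > 0$ the $e^{u}$ terms dominate and the slope grows exponentially in $S_c$.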
Definition 4.2 (Critical Compaction Size): The critical compaction size $S_{critical}$ is defined as the size at which performance degradation becomes unacceptable:
$$S_{critical} = \gamma \cdot R \cdot C_{max}$$
Here's the implementation of the performance impact model:
def calculate_performance_impact(compaction_size: int,
                                 available_resources: float,
                                 c_max: int,
                                 alpha: float = 1.0,
                                 beta: float = 2.0,
                                 gamma: float = 0.5) -> float:
    """
    Calculate the performance impact of a compaction operation

    Args:
        compaction_size: Size of the compaction operation in bytes (S_c)
        available_resources: Available system resources normalized to [0,1] (R)
        c_max: Maximum compaction size the system can handle
        alpha: Scaling factor for linear impact
        beta: Scaling factor for exponential impact
        gamma: Threshold coefficient (0 < gamma < 1)

    Returns:
        The performance impact value
    """
    # Handle edge case to prevent division by zero
    if available_resources == 0:
        return float('inf')
    capacity = available_resources * c_max  # R * C_max
    # Linear term: compaction size normalized by effective capacity
    normalized_size = compaction_size / capacity
    # Exponential term with dimensionless exponent (S_c - gamma*R*C_max) / (R*C_max)
    threshold = gamma * capacity
    exp_term = beta * np.exp((compaction_size - threshold) / capacity)
    return alpha * normalized_size * (1 + exp_term)

def get_critical_compaction_size(available_resources: float,
                                 c_max: int,
                                 gamma: float = 0.5) -> int:
    """
    Calculate the critical compaction size threshold

    Args:
        available_resources: Available system resources normalized to [0,1]
        c_max: Maximum compaction size the system can handle
        gamma: Threshold coefficient (0 < gamma < 1)

    Returns:
        The critical size threshold in bytes
    """
    return int(gamma * available_resources * c_max)
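A quick numerical check of the two regimes, with assumed values for $C_{max}$ and $R$:

c_max = 1_000_000_000  # assumed 1 GB capacity, for illustration
r = 0.8                # 80% of resources available
s_crit = get_critical_compaction_size(r, c_max)             # 400 MB with gamma = 0.5
print(calculate_performance_impact(50_000_000, r, c_max))   # well below S_critical: near-linear
print(calculate_performance_impact(s_crit, r, c_max))       # at threshold: exp term equals beta
print(calculate_performance_impact(700_000_000, r, c_max))  # above threshold: exponential regime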
5. Compaction Growth Projection and Prediction
To prevent oversized compactions, we develop a formal model for projecting compaction size growth.
Definition 5.1 (Level Growth Functions): The growth of level $L_i$ over time is modeled as:
$$V_i(t + \Delta t) = V_i(t) + \int_{t}^{t+\Delta t} [\lambda_{insert}(s) - \lambda_{compact}(s)] \, ds$$
$$T_i(t + \Delta t) = T_i(t) + \int_{t}^{t+\Delta t} [\lambda_{delete}(s) - \lambda_{compact\_tombstone}(s)] \, ds$$
Where $\lambda_{insert}$ and $\lambda_{delete}$ are the rates at which new data and tombstones arrive at the level, and $\lambda_{compact}$ and $\lambda_{compact\_tombstone}$ are the rates at which compaction removes them.
Theorem 5.1 (Predictive Compaction Sizing): Given current growth rates, the expected size of a compaction operation between levels $L_i$ and $L_{i+1}$ at time $t + \Delta t$ is:
$$S_c(t + \Delta t) = (V_i(t + \Delta t) + T_i(t + \Delta t)) + (V_{i+1}(t + \Delta t) + T_{i+1}(t + \Delta t))$$
Proof: By Definition 2.4, a compaction operation involves all data from both levels. Neglecting the metadata overhead $O_i$, we have $S_i \approx V_i + T_i$ for each level, so the total size of data involved in compaction is the sum of these components across both levels at the time of compaction. ■
Definition 5.2 (Time to Critical Size): The time until a compaction operation would reach critical size is defined as:
$$t_{critical} = \min \{t > t_{current} \mid S_c(t) > S_{critical}\}$$
The following code implements the growth projection model:
from typing import Callable
from scipy.integrate import quad

def project_level_growth(current_level: LSMLevel,
                         t_current: float,
                         t_future: float,
                         insert_rate_func: Callable[[float], float],
                         compact_rate_func: Callable[[float], float],
                         delete_rate_func: Callable[[float], float],
                         compact_tombstone_rate_func: Callable[[float], float]) -> LSMLevel:
    """
    Project the growth of a level over time

    Args:
        current_level: Current state of the level
        t_current: Current time
        t_future: Future time to project to
        insert_rate_func: Function that gives insertion rate at time t
        compact_rate_func: Function that gives compaction removal rate at time t
        delete_rate_func: Function that gives deletion (tombstone creation) rate at time t
        compact_tombstone_rate_func: Function that gives tombstone reclamation rate at time t

    Returns:
        Projected level state at future time
    """
    # Integral of insertion rate minus compaction rate (valid data growth)
    data_growth, _ = quad(lambda s: insert_rate_func(s) - compact_rate_func(s),
                          t_current, t_future)
    # Integral of deletion rate minus tombstone compaction rate (tombstone growth)
    tombstone_growth, _ = quad(lambda s: delete_rate_func(s) - compact_tombstone_rate_func(s),
                               t_current, t_future)
    # Create projected level
    return LSMLevel(
        level_id=current_level.level_id,
        size_bytes=current_level.size_bytes + int(data_growth) + int(tombstone_growth),
        valid_data_bytes=current_level.valid_data_bytes + int(data_growth),
        tombstone_bytes=current_level.tombstone_bytes + int(tombstone_growth),
        metadata_bytes=current_level.metadata_bytes  # Assuming metadata size stays constant
    )
def predict_compaction_size(level_i: LSMLevel,
                            level_j: LSMLevel,
                            t_current: float,
                            t_future: float,
                            growth_model_i: Dict[str, Callable[[float], float]],
                            growth_model_j: Dict[str, Callable[[float], float]]) -> int:
    """
    Predict the size of a compaction operation at a future time

    Args:
        level_i: Current state of first level
        level_j: Current state of second level
        t_current: Current time
        t_future: Future time to predict for
        growth_model_i: Growth rate functions for level i
        growth_model_j: Growth rate functions for level j

    Returns:
        Predicted compaction size in bytes
    """
    # Project growth for level i
    projected_i = project_level_growth(
        level_i, t_current, t_future,
        growth_model_i['insert_rate'],
        growth_model_i['compact_rate'],
        growth_model_i['delete_rate'],
        growth_model_i['compact_tombstone_rate']
    )
    # Project growth for level j
    projected_j = project_level_growth(
        level_j, t_current, t_future,
        growth_model_j['insert_rate'],
        growth_model_j['compact_rate'],
        growth_model_j['delete_rate'],
        growth_model_j['compact_tombstone_rate']
    )
    # Combined size of the projected levels (valid data plus tombstones)
    return (projected_i.valid_data_bytes + projected_i.tombstone_bytes +
            projected_j.valid_data_bytes + projected_j.tombstone_bytes)
def time_to_critical_size(level_i: LSMLevel,
                          level_j: LSMLevel,
                          t_current: float,
                          s_critical: int,
                          growth_model_i: Dict[str, Callable[[float], float]],
                          growth_model_j: Dict[str, Callable[[float], float]],
                          max_horizon: float = 30 * 24 * 3600) -> Optional[float]:
    """
    Calculate the time until compaction size reaches critical threshold.

    Assumes the projected compaction size is non-decreasing over the horizon,
    which is what makes the binary search below valid.

    Args:
        level_i: Current state of first level
        level_j: Current state of second level
        t_current: Current time
        s_critical: Critical size threshold
        growth_model_i: Growth rate functions for level i
        growth_model_j: Growth rate functions for level j
        max_horizon: Maximum time horizon to check (default: 30 days in seconds)

    Returns:
        Time at which critical size is reached, or None if not reached within horizon
    """
    # Check if already at critical size
    current_size = (level_i.valid_data_bytes + level_i.tombstone_bytes +
                    level_j.valid_data_bytes + level_j.tombstone_bytes)
    if current_size >= s_critical:
        return 0.0
    # If critical size is not reached even at the horizon, report None up front
    t_horizon = t_current + max_horizon
    if predict_compaction_size(level_i, level_j, t_current, t_horizon,
                               growth_model_i, growth_model_j) < s_critical:
        return None
    # Binary search for the earliest crossing time t_critical
    t_min, t_max = t_current, t_horizon
    while t_max - t_min > 3600:  # 1-hour precision
        t_mid = (t_min + t_max) / 2
        projected_size = predict_compaction_size(
            level_i, level_j, t_current, t_mid,
            growth_model_i, growth_model_j
        )
        if projected_size >= s_critical:
            t_max = t_mid
        else:
            t_min = t_mid
    return t_max
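A minimal sketch with constant rate functions; the rates and thresholds below are illustrative, and l1 and l2 are the levels constructed in the Section 3 example:

# Each level gains 1 MB/s of inserts and 0.2 MB/s of tombstones, nothing compacted away
const = lambda rate: (lambda t: rate)
model = {
    'insert_rate': const(1_000_000.0),
    'compact_rate': const(0.0),
    'delete_rate': const(200_000.0),
    'compact_tombstone_rate': const(0.0),
}
t_hit = time_to_critical_size(l1, l2, t_current=0.0, s_critical=2_000_000_000,
                              growth_model_i=model, growth_model_j=model)
print(t_hit)  # seconds until the projected compaction would exceed 2 GB, or None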
6. Incremental Compaction Strategy
To address the challenge of oversized compactions, we formalize an incremental compaction strategy.
Definition 6.1 (Key Space Partitioning): Given a key space $\mathcal{K}$ requiring compaction, a partitioning $\mathcal{P} = \{\mathcal{K}_1, \mathcal{K}_2, ..., \mathcal{K}_n\}$ is valid if:
- $\cup_{j=1}^{n} \mathcal{K}_j = \mathcal{K}$ (completeness)
- $\mathcal{K}_i \cap \mathcal{K}_j = \emptyset$ for all $i \neq j$ (disjointness)
Definition 6.2 (Optimal Partitioning Problem): The optimal partitioning problem is defined as:
$$\min_{n, \mathcal{P}} n$$
$$\text{subject to } S_c(\mathcal{K}_j) \leq \theta \cdot S_{critical} \text{ for all } j \in \{1,2,...,n\}$$
Where $\theta$ is a target fraction of critical capacity (e.g., $\theta = 0.3$)
Theorem 6.1 (Existence of Valid Partitioning): For any key space $\mathcal{K}$ with finite size $S_c(\mathcal{K})$, there exists a valid partitioning $\mathcal{P}$ such that $S_c(\mathcal{K}_j) \leq \theta \cdot S_{critical}$ for all partitions $\mathcal{K}_j \in \mathcal{P}$.
Proof: We construct a valid partitioning by iteratively dividing the key space. Choose $\delta$ with $0 < \delta < \theta \cdot S_{critical}$. Since the key space is finite, it can be divided into regions of size at most $\delta$, provided no single key-value entry exceeds $\theta \cdot S_{critical}$; in the worst case, each key becomes its own partition. Adjacent regions can then be merged as long as their combined size stays below the threshold, yielding a valid partitioning with finite $n$. ■
Theorem 6.2 (Greedy Partitioning Optimality): Assume partition boundaries can be placed arbitrarily within the key space. A greedy algorithm that scans the key space in order and starts a new partition whenever the accumulated size reaches $\theta \cdot S_{critical}$ produces a partitioning with at most $\lceil \frac{S_c(\mathcal{K})}{\theta \cdot S_{critical}} \rceil$ partitions.
Proof: The greedy algorithm fills each partition to the maximum allowed size $\theta \cdot S_{critical}$ (except possibly the last partition), so the number of partitions is at most $\lceil \frac{S_c(\mathcal{K})}{\theta \cdot S_{critical}} \rceil$. Conversely, since every partition is bounded by $\theta \cdot S_{critical}$, any valid partitioning requires at least $\lceil \frac{S_c(\mathcal{K})}{\theta \cdot S_{critical}} \rceil$ partitions, so the greedy solution is optimal in this idealized setting. With coarse-grained boundaries, as in the implementation below, partitions may be underfilled and the greedy count can exceed this bound, but it remains within a constant factor of optimal. ■
The following code implements the incremental compaction strategy:
from typing import Callable, List, Optional

@dataclass
class KeyRange:
    """Represents a contiguous range of keys in the key space"""
    start_key: bytes  # Inclusive
    end_key: bytes    # Exclusive
    estimated_size: int
def estimate_range_size(level_i: LSMLevel,
                        level_j: LSMLevel,
                        start_key: bytes,
                        end_key: bytes,
                        get_size_estimate: Callable[[LSMLevel, bytes, bytes], int]) -> int:
    """
    Estimate the size of data in a key range across two levels

    Args:
        level_i: First level
        level_j: Second level
        start_key: Start key of range (inclusive)
        end_key: End key of range (exclusive)
        get_size_estimate: Function to estimate size within a key range for a level

    Returns:
        Estimated size in bytes
    """
    size_i = get_size_estimate(level_i, start_key, end_key)
    size_j = get_size_estimate(level_j, start_key, end_key)
    return size_i + size_j
def greedy_key_space_partitioning(level_i: LSMLevel,
                                  level_j: LSMLevel,
                                  key_boundaries: List[bytes],
                                  get_size_estimate: Callable[[LSMLevel, bytes, bytes], int],
                                  max_partition_size: int) -> List[KeyRange]:
    """
    Partition the key space using a greedy algorithm

    Args:
        level_i: First level
        level_j: Second level
        key_boundaries: List of potential partition boundaries (sorted)
        get_size_estimate: Function to estimate size within a key range
        max_partition_size: Maximum allowed size for a partition

    Returns:
        List of key ranges representing the partitioning
    """
    partitions: List[KeyRange] = []
    current_start = key_boundaries[0]
    current_size = 0
    for i in range(1, len(key_boundaries)):
        range_size = estimate_range_size(
            level_i, level_j,
            key_boundaries[i - 1], key_boundaries[i],
            get_size_estimate
        )
        # If adding this range would exceed the cap, close the current partition.
        # A single range larger than the cap cannot be split further and becomes
        # its own (oversized) partition.
        if current_size + range_size > max_partition_size and current_size > 0:
            partitions.append(KeyRange(
                start_key=current_start,
                end_key=key_boundaries[i - 1],
                estimated_size=current_size
            ))
            current_start = key_boundaries[i - 1]
            current_size = range_size
        else:
            current_size += range_size
    # Add the final partition
    if current_size > 0:
        partitions.append(KeyRange(
            start_key=current_start,
            end_key=key_boundaries[-1],
            estimated_size=current_size
        ))
    return partitions
def create_bounded_compaction_tasks(level_i: LSMLevel,
                                    level_j: LSMLevel,
                                    s_critical: int,
                                    theta: float = 0.3,
                                    get_size_estimate: Optional[Callable[[LSMLevel, bytes, bytes], int]] = None,
                                    get_key_boundaries: Optional[Callable[[LSMLevel, LSMLevel], List[bytes]]] = None) -> List[KeyRange]:
    """
    Create bounded compaction tasks by partitioning the key space

    Args:
        level_i: First level
        level_j: Second level
        s_critical: Critical size threshold
        theta: Target fraction of critical capacity
        get_size_estimate: Function to estimate size within a key range
        get_key_boundaries: Function to get potential key boundaries

    Returns:
        List of key ranges for bounded compaction tasks
    """
    # Both callbacks would typically come from the LSM tree implementation
    if get_key_boundaries is None or get_size_estimate is None:
        raise ValueError("get_key_boundaries and get_size_estimate functions must be provided")
    key_boundaries = get_key_boundaries(level_i, level_j)
    max_partition_size = int(theta * s_critical)
    # Use the greedy algorithm to partition the key space
    return greedy_key_space_partitioning(
        level_i, level_j,
        key_boundaries,
        get_size_estimate,
        max_partition_size
    )
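A small sketch using hypothetical boundaries and a uniform size estimator; both are stand-ins for what a real LSM tree implementation would provide:

# 17 boundaries give 16 adjacent ranges; assume 10 MB per range per level
boundaries = [bytes([b]) for b in range(0, 17)]
uniform_estimate = lambda level, start, end: 10_000_000
tasks = create_bounded_compaction_tasks(
    l1, l2, s_critical=400_000_000, theta=0.3,
    get_size_estimate=uniform_estimate,
    get_key_boundaries=lambda a, b: boundaries
)
for t in tasks:
    print(t.start_key, t.end_key, t.estimated_size)  # three partitions, each <= 120 MB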
7. Dynamic Resource-Aware Scheduling Model
We now formalize a scheduling model for compaction that accounts for system resource availability.
Definition 7.1 (Utility Function): The utility of scheduling a compaction of size $S_c$ at time $t$ is defined as:
$$U(t, S_c) = w_1 \cdot (1 - R(t)) - w_2 \cdot P(S_c, R(t)) - w_3 \cdot D(t)$$
Where:
- $R(t)$ is the resource utilization at time $t$
- $P(S_c, R(t))$ is the performance impact function
- $D(t)$ is a delay penalty function
- $w_1, w_2, w_3$ are weight parameters
Definition 7.2 (Optimal Scheduling Problem): The optimal scheduling problem is defined as:
$$t^* = \arg\max_{t \in [t_{current}, t_{deadline}]} U(t, S_c)$$
Where $t_{deadline}$ is the latest time compaction can be delayed before causing severe performance impact.
Theorem 7.1 (Existence of Optimal Schedule): Given a continuous utility function $U(t, S_c)$ and a closed interval $[t_{current}, t_{deadline}]$, there exists an optimal scheduling time $t^*$ that maximizes utility.
Proof: By the extreme value theorem, a continuous function on a closed interval attains its maximum value. Since $U(t, S_c)$ is continuous in $t$ (assuming continuous resource utilization and penalty functions), it attains its maximum at some point $t^* \in [t_{current}, t_{deadline}]$. ■
Definition 7.3 (Compaction Controller): The compaction controller function $C(t)$ is defined as:
$$C(t) = \begin{cases} \text{Schedule Compaction} & \text{if } U(t, S_c) > \tau \text{ and } S_c \leq \theta \cdot S_{critical} \\ \text{Partition and Schedule} & \text{if } U(t, S_c) > \tau \text{ and } S_c > \theta \cdot S_{critical} \\ \text{Delay Compaction} & \text{if } U(t, S_c) \leq \tau \text{ and } t < t_{deadline} \\ \text{Emergency Partition and Schedule} & \text{if } t \geq t_{deadline} \end{cases}$$
Where $\tau$ is a threshold utility value.
Here's the Python implementation of this scheduling model:
from enum import Enum, auto
from typing import Callable, Tuple
import numpy as np

class CompactionAction(Enum):
    """Possible actions for the compaction controller"""
    SCHEDULE = auto()
    PARTITION_AND_SCHEDULE = auto()
    DELAY = auto()
    EMERGENCY_PARTITION_AND_SCHEDULE = auto()

def calculate_utility(t: float,
                      s_c: int,
                      resource_func: Callable[[float], float],
                      performance_impact_func: Callable[[int, float], float],
                      delay_penalty_func: Callable[[float], float],
                      weights: Tuple[float, float, float]) -> float:
    """
    Calculate the utility of scheduling a compaction at time t

    Args:
        t: Time point
        s_c: Compaction size
        resource_func: Function returning resource utilization at time t
        performance_impact_func: Function calculating performance impact
        delay_penalty_func: Function calculating delay penalty
        weights: Tuple of weight parameters (w1, w2, w3)

    Returns:
        Utility value
    """
    w1, w2, w3 = weights
    # Available resource term
    r_t = resource_func(t)
    resource_term = w1 * (1 - r_t)
    # Performance impact term
    impact = performance_impact_func(s_c, r_t)
    impact_term = -w2 * impact
    # Delay penalty term
    penalty = delay_penalty_func(t)
    penalty_term = -w3 * penalty
    return resource_term + impact_term + penalty_term
def find_optimal_schedule_time(t_current: float,
                               t_deadline: float,
                               s_c: int,
                               resource_func: Callable[[float], float],
                               performance_impact_func: Callable[[int, float], float],
                               delay_penalty_func: Callable[[float], float],
                               weights: Tuple[float, float, float],
                               resolution: float = 300) -> float:
    """
    Find the optimal time to schedule a compaction

    Args:
        t_current: Current time
        t_deadline: Deadline time
        s_c: Compaction size
        resource_func: Function returning resource utilization at time t
        performance_impact_func: Function calculating performance impact
        delay_penalty_func: Function calculating delay penalty
        weights: Tuple of weight parameters (w1, w2, w3)
        resolution: Time resolution for search in seconds (default: 5 minutes)

    Returns:
        Optimal scheduling time
    """
    # Evaluate utility at discrete time points between current time and deadline
    best_time = t_current
    best_utility = calculate_utility(
        t_current, s_c, resource_func,
        performance_impact_func, delay_penalty_func, weights
    )
    t_points = np.arange(t_current, t_deadline + resolution, resolution)
    for t in t_points:
        utility = calculate_utility(
            t, s_c, resource_func,
            performance_impact_func, delay_penalty_func, weights
        )
        if utility > best_utility:
            best_utility = utility
            best_time = t
    return best_time
def compaction_controller(t: float,
                          s_c: int,
                          t_current: float,
                          t_deadline: float,
                          s_critical: int,
                          theta: float,
                          tau: float,
                          resource_func: Callable[[float], float],
                          performance_impact_func: Callable[[int, float], float],
                          delay_penalty_func: Callable[[float], float],
                          weights: Tuple[float, float, float]) -> CompactionAction:
    """
    Compaction controller function C(t)

    Args:
        t: Time point
        s_c: Compaction size
        t_current: Current time
        t_deadline: Deadline time
        s_critical: Critical size threshold
        theta: Target fraction of critical capacity
        tau: Threshold utility value
        resource_func: Function returning resource utilization at time t
        performance_impact_func: Function calculating performance impact
        delay_penalty_func: Function calculating delay penalty
        weights: Tuple of weight parameters (w1, w2, w3)

    Returns:
        Appropriate action to take
    """
    # Calculate utility
    utility = calculate_utility(
        t, s_c, resource_func,
        performance_impact_func, delay_penalty_func, weights
    )
    # Apply decision logic in priority order
    if t >= t_deadline:
        return CompactionAction.EMERGENCY_PARTITION_AND_SCHEDULE
    if utility <= tau:
        return CompactionAction.DELAY
    if s_c > theta * s_critical:
        return CompactionAction.PARTITION_AND_SCHEDULE
    return CompactionAction.SCHEDULE
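An illustrative invocation with simple stand-in functions for the resource, impact, and penalty models; c_max is the assumed capacity from the Section 4 example:

# Stand-in models: constant utilization, impact derived from Section 4's function,
# and a linearly growing delay penalty
resource = lambda t: 0.5
impact = lambda s_c, r: calculate_performance_impact(s_c, 1 - r, c_max)  # availability = 1 - utilization
penalty = lambda t: 0.0001 * t

action = compaction_controller(
    t=0.0, s_c=150_000_000,
    t_current=0.0, t_deadline=6 * 3600,
    s_critical=400_000_000, theta=0.3, tau=0.0,
    resource_func=resource,
    performance_impact_func=impact,
    delay_penalty_func=penalty,
    weights=(1.0, 0.5, 1.0)
)
print(action)  # PARTITION_AND_SCHEDULE: utility clears tau, but 150 MB > 0.3 * 400 MB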
8. Multi-Objective Optimization Framework
Finally, we present a comprehensive multi-objective optimization framework that integrates the various aspects of compaction management.
Definition 8.1 (Multi-Objective Compaction Problem): The multi-objective compaction optimization problem is defined as:
$$\min_{schedule, partitioning} \left[ \sum_{j} P(S_c^j, R(t_j)) + \lambda_1 \cdot \sum_{j} (t_j - t_{ideal}^j)^2 + \lambda_2 \cdot n \right]$$
Subject to:
- Resource constraints: $R(t) \geq R_{min}$ at all times
- Performance constraints: $P(S_c^j, R(t_j)) \leq P_{max}$ for all compactions
- Coverage constraint: All necessary compactions must eventually complete
Definition 8.2 (Pareto Optimality): A solution to the multi-objective compaction problem is Pareto optimal if no other feasible solution improves one objective (performance impact, scheduling deviation, or partition count) without degrading another; equivalently, it is not dominated by any other solution. The implementation below enumerates candidate schedules over a parameter grid and filters out dominated ones.
Theorem 8.2 (Adaptive Parameter Convergence): Under mild conditions, adaptive parameter updates of the form:
$$\theta_{t+1} = \theta_t - \eta_\theta \cdot \nabla_\theta L_t$$
converge to locally optimal parameter values that minimize the loss function $L_t$.
Proof: This follows from standard results in stochastic gradient descent. Under the assumptions that $L_t$ is differentiable and its gradients are Lipschitz continuous, and with appropriate step sizes $\eta_\theta$, the parameter updates converge to a local minimum of the loss function. ■
Here's a Python implementation of this multi-objective optimization framework:
from typing import Callable, Dict, List, Optional, Tuple
from dataclasses import dataclass

@dataclass
class CompactionTask:
    """Represents a compaction task"""
    level_i: LSMLevel
    level_j: LSMLevel
    key_range: Optional[KeyRange]  # None for full compaction
    scheduled_time: float
    estimated_size: int

@dataclass
class CompactionSolution:
    """Represents a solution to the multi-objective compaction problem"""
    tasks: List[CompactionTask]
    performance_impact: float
    schedule_deviation: float
    partition_count: int
    total_objective: float

def calculate_multi_objective_cost(tasks: List[CompactionTask],
                                   performance_impact_func: Callable[[int, float], float],
                                   resource_func: Callable[[float], float],
                                   ideal_times: Dict[Tuple[int, int], float],
                                   lambda1: float,
                                   lambda2: float) -> Tuple[float, float, float, float]:
    """
    Calculate the cost components of the multi-objective function

    Args:
        tasks: List of compaction tasks
        performance_impact_func: Function to calculate performance impact
        resource_func: Function to get resource utilization at a time
        ideal_times: Dictionary mapping (level_i, level_j) to ideal compaction times
        lambda1: Weight for schedule deviation
        lambda2: Weight for partition count

    Returns:
        Tuple of (total cost, performance impact, schedule deviation, partition count)
    """
    # Sum of per-task performance impacts
    impact_sum = sum(
        performance_impact_func(task.estimated_size, resource_func(task.scheduled_time))
        for task in tasks
    )
    # Squared deviation from ideal scheduling times (zero when no ideal time is known)
    deviation_sum = sum(
        (task.scheduled_time - ideal_times.get(
            (task.level_i.level_id, task.level_j.level_id), task.scheduled_time)) ** 2
        for task in tasks
    )
    # Number of partitions equals the number of tasks
    partition_count = len(tasks)
    total = impact_sum + lambda1 * deviation_sum + lambda2 * partition_count
    return total, impact_sum, deviation_sum, partition_count
def find_pareto_optimal_solutions(level_i: LSMLevel,
                                  level_j: LSMLevel,
                                  s_critical: int,
                                  resource_func: Callable[[float], float],
                                  performance_impact_func: Callable[[int, float], float],
                                  get_size_estimate: Callable[[LSMLevel, bytes, bytes], int],
                                  get_key_boundaries: Callable[[LSMLevel, LSMLevel], List[bytes]],
                                  t_current: float,
                                  t_deadline: float,
                                  lambda1_values: List[float],
                                  lambda2_values: List[float],
                                  theta_values: List[float]) -> List[CompactionSolution]:
    """
    Find Pareto optimal solutions for the multi-objective compaction problem

    Args:
        level_i, level_j: Levels to compact
        s_critical: Critical size threshold
        resource_func: Function returning resource utilization at time t
        performance_impact_func: Function calculating performance impact
        get_size_estimate: Function to estimate size within a key range
        get_key_boundaries: Function to get potential key boundaries
        t_current: Current time
        t_deadline: Deadline time
        lambda1_values: Weight values for schedule deviation to try
        lambda2_values: Weight values for partition count to try
        theta_values: Values for target fraction of critical capacity to try

    Returns:
        List of Pareto optimal solutions
    """
    solutions: List[CompactionSolution] = []
    ideal_times = {(level_i.level_id, level_j.level_id): t_current}  # Simplified

    # Try different parameter combinations
    for lambda1 in lambda1_values:
        for lambda2 in lambda2_values:
            for theta in theta_values:
                max_partition_size = int(theta * s_critical)
                # Estimate full compaction size
                full_size = level_i.size_bytes + level_j.size_bytes
                # Decide whether to partition
                if full_size <= max_partition_size:
                    # Single compaction task covering the full key space
                    tasks = [CompactionTask(
                        level_i=level_i,
                        level_j=level_j,
                        key_range=None,            # Full compaction
                        scheduled_time=t_current,  # Simplified
                        estimated_size=full_size
                    )]
                else:
                    # Partition the key space and create one task per partition
                    partitions = greedy_key_space_partitioning(
                        level_i, level_j,
                        get_key_boundaries(level_i, level_j),
                        get_size_estimate,
                        max_partition_size
                    )
                    tasks = []
                    for i, partition in enumerate(partitions):
                        # Simplified scheduling - evenly distribute tasks
                        schedule_time = t_current + i * (t_deadline - t_current) / len(partitions)
                        tasks.append(CompactionTask(
                            level_i=level_i,
                            level_j=level_j,
                            key_range=partition,
                            scheduled_time=schedule_time,
                            estimated_size=partition.estimated_size
                        ))
                # Evaluate the objective function for this candidate
                total, impact, deviation, count = calculate_multi_objective_cost(
                    tasks, performance_impact_func, resource_func,
                    ideal_times, lambda1, lambda2
                )
                solutions.append(CompactionSolution(
                    tasks=tasks,
                    performance_impact=impact,
                    schedule_deviation=deviation,
                    partition_count=count,
                    total_objective=total
                ))

    # Keep only non-dominated (Pareto optimal) solutions
    pareto_optimal = []
    for solution in solutions:
        dominated = False
        for other in solutions:
            # 'other' dominates 'solution' if it is no worse on every objective
            # and strictly better on at least one
            if ((other.performance_impact < solution.performance_impact and
                 other.schedule_deviation <= solution.schedule_deviation and
                 other.partition_count <= solution.partition_count) or
                (other.performance_impact <= solution.performance_impact and
                 other.schedule_deviation < solution.schedule_deviation and
                 other.partition_count <= solution.partition_count) or
                (other.performance_impact <= solution.performance_impact and
                 other.schedule_deviation <= solution.schedule_deviation and
                 other.partition_count < solution.partition_count)):
                dominated = True
                break
        if not dominated:
            pareto_optimal.append(solution)
    return pareto_optimal
def adaptive_parameter_update(current_params: Dict[str, float],
                              gradient: Dict[str, float],
                              learning_rates: Dict[str, float]) -> Dict[str, float]:
    """
    Update parameters adaptively based on gradient

    Args:
        current_params: Current parameter values
        gradient: Gradient of loss with respect to parameters
        learning_rates: Learning rates for each parameter

    Returns:
        Updated parameters
    """
    updated_params = {}
    for param, value in current_params.items():
        if param in gradient and param in learning_rates:
            # Update rule: param = param - learning_rate * gradient
            updated_params[param] = value - learning_rates[param] * gradient[param]
        else:
            # Keep unchanged if no gradient or learning rate is available
            updated_params[param] = value
    return updated_params
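A one-step sketch of the update rule from Theorem 8.2; the gradient values are made up purely for illustration:

ctrl_params = {'theta': 0.30, 'tau': 0.0}
# Hypothetical gradient of the loss with respect to each parameter
grad = {'theta': -0.05, 'tau': 0.2}
lr = {'theta': 0.1, 'tau': 0.01}
ctrl_params = adaptive_parameter_update(ctrl_params, grad, lr)
print(ctrl_params)  # {'theta': 0.305, 'tau': -0.002}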
9. Conclusion
This essay has presented a comprehensive mathematical framework for modeling, predicting, and optimizing compaction operations in LSM tree systems. We have established formal definitions and theorems governing compaction timing decisions, performance impact assessment, incremental compaction strategies, and dynamic scheduling models.
The key theoretical contributions include:
- A rigorous cost-benefit model for determining optimal compaction timing based on deletion ratios and system parameters
- A non-linear performance impact function that models the exponential degradation caused by oversized compactions
- Mathematical formulations for predictive compaction sizing and growth projection
- Formal proof of the existence and approximation guarantees for incremental compaction strategies
- A utility-based scheduling framework that balances multiple competing objectives
These formal models provide database system designers with theoretically sound approaches to prevent performance degradation from excessive compaction operations while maintaining the space efficiency and query performance benefits of LSM tree systems. The mathematical framework is general enough to be adapted to various LSM tree implementations and can be calibrated to specific workload characteristics through its parameterization.
The complementary Python code demonstrates how these theoretical models can be implemented in practice, providing a concrete bridge between mathematical formulations and real-world database systems. This integration of theory and implementation offers a solid foundation for building robust, adaptive compaction management systems that prevent performance degradation while maintaining optimal system efficiency.