LsmIndex.php

PHP

Path: src/Table/Index/LsmIndex.php

<?php
namespace mini\Table\Index;

use EmptyIterator;
use Traversable;

/**
 * LSM (Log-Structured Merge) index for dynamic insert-heavy workloads.
 *
 * Optimized for bulk inserts with occasional range queries. Creates inner
 * layers only when sorted and threshold exceeded, keeping sort costs low.
 */
final class LsmIndex implements IndexInterface, \Countable
{
    private const COMPACTION_RATIO = 0.05;

    private array $hash = [];
    private ?array $sorted = null;
    private ?self $inner = null;
    private ?self $outermost = null;
    private int $minimumSizeBeforeDelegation;

    public function __construct(int $minimumSizeBeforeDelegation = 100)
    {
        $this->minimumSizeBeforeDelegation = $minimumSizeBeforeDelegation;
    }

    public function insert(string $key, int $rowId): void
    {
        $packed = pack('Q', $rowId);

        // Existing key - append locally
        if (isset($this->hash[$key])) {
            $this->hash[$key] .= $packed;
            return;
        }

        // New key - delegate to inner if exists
        if ($this->inner !== null) {
            $this->inner->insert($key, $rowId);
            return;
        }

        // Only delegate when sorted and exceeds adaptive threshold (1% of total, min 100)
        if ($this->sorted !== null) {
            $totalKeys = ($this->outermost ?? $this)->count();
            $threshold = max($this->minimumSizeBeforeDelegation, (int)($totalKeys * 0.01));
            if (count($this->hash) >= $threshold) {
                $this->inner = new self($this->minimumSizeBeforeDelegation);
                $this->inner->outermost = $this->outermost ?? $this;
                $this->inner->insert($key, $rowId);
                return;
            }
        }

        $this->hash[$key] = $packed;
        $this->sorted = null;
    }

    public function delete(string $key, int $rowId): void
    {
        if (isset($this->hash[$key])) {
            $packed = pack('Q', $rowId);
            $blob = $this->hash[$key];
            $pos = strpos($blob, $packed);
            while ($pos !== false && $pos % 8 !== 0) {
                $pos = strpos($blob, $packed, $pos + 1);
            }
            if ($pos !== false) {
                $this->hash[$key] = substr($blob, 0, $pos) . substr($blob, $pos + 8);
                if ($this->hash[$key] === '') {
                    unset($this->hash[$key]);
                }
            }
            return;
        }
        $this->inner?->delete($key, $rowId);
    }

    public function has(string $key): bool
    {
        return isset($this->hash[$key]) || ($this->inner?->has($key) ?? false);
    }

    public function eq(string $key): Traversable
    {
        if (isset($this->hash[$key])) {
            foreach (unpack('Q*', $this->hash[$key]) as $rowId) {
                yield $rowId;
            }
        } elseif ($this->inner !== null) {
            yield from $this->inner->eq($key);
        }
    }

    public function count(): int
    {
        return count($this->hash) + ($this->inner ? count($this->inner) : 0);
    }

    public function range(?string $start = null, ?string $end = null, bool $reverse = false): Traversable
    {
        // Compact when inner has grown large relative to local
        if ($this->inner !== null && count($this->inner) > count($this->hash) * self::COMPACTION_RATIO) {
            $this->compact();
        }

        $from = $reverse ? $end : $start;
        $to = $reverse ? $start : $end;

        foreach ($this->walkSortedBlobs($from, $to, $reverse) as $blob) {
            if ($reverse) {
                $u = unpack('Q*', $blob);
                for ($j = count($u); $j >= 1; $j--) yield $u[$j];
            } else {
                foreach (unpack('Q*', $blob) as $rowId) {
                    yield $rowId;
                }
            }
        }
    }

    /**
     * Walk [key => blob] pairs in sorted key order, merging with inner layer.
     */
    private function walkSortedBlobs(?string $from, ?string $to, bool $reverseHint): Traversable
    {
        $this->ensureSorted();

        $reverse = ($from !== null && $to !== null) ? strcmp($from, $to) > 0 : $reverseHint;
        $inner = $this->inner?->walkSortedBlobs($from, $to, $reverse) ?? new EmptyIterator;
        $n = count($this->sorted);

        if (!$reverse) {
            $i = $from !== null ? self::lowerBound($this->sorted, $from) : 0;
            $stop = $to !== null ? self::upperBound($this->sorted, $to) : $n;
            $step = 1;
        } else {
            $i = $from !== null ? self::upperBound($this->sorted, $from) - 1 : $n - 1;
            $stop = $to !== null ? self::lowerBound($this->sorted, $to) - 1 : -1;
            $step = -1;
        }

        for (; $i !== $stop; $i += $step) {
            $key = $this->sorted[$i];
            if (!isset($this->hash[$key])) continue;

            while ($inner->valid() && strcmp($key, $inner->key()) * $step > 0) {
                yield $inner->key() => $inner->current();
                $inner->next();
            }

            if ($inner->valid() && $key === $inner->key()) {
                throw new \LogicException("Key '$key' exists in both local and inner layer");
            }

            yield $key => $this->hash[$key];
        }

        while ($inner->valid()) {
            yield $inner->key() => $inner->current();
            $inner->next();
        }
    }

    private function compact(): void
    {
        if ($this->inner === null) {
            return;
        }

        $sorted = [];
        foreach ($this->walkSortedBlobs(null, null, false) as $key => $blob) {
            $sorted[] = $key;
            $this->hash[$key] = $blob;
        }
        $this->sorted = $sorted;
        $this->inner = null;
    }

    private function ensureSorted(): void
    {
        if ($this->sorted === null) {
            $this->sorted = array_keys($this->hash);
            sort($this->sorted, SORT_STRING);
        }
    }

    private static function lowerBound(array $a, string $x): int
    {
        $lo = 0;
        $hi = count($a);
        while ($lo < $hi) {
            $mid = ($lo + $hi) >> 1;
            if (strcmp($a[$mid], $x) < 0) {
                $lo = $mid + 1;
            } else {
                $hi = $mid;
            }
        }
        return $lo;
    }

    private static function upperBound(array $a, string $x): int
    {
        $lo = 0;
        $hi = count($a);
        while ($lo < $hi) {
            $mid = ($lo + $hi) >> 1;
            if (strcmp($a[$mid], $x) <= 0) {
                $lo = $mid + 1;
            } else {
                $hi = $mid;
            }
        }
        return $lo;
    }
}