TreapIndex.php

PHP

Path: src/Table/Index/TreapIndex.php

<?php
namespace mini\Table\Index;

use Traversable;

/**
 * Hybrid hash/sorted-array/treap index optimized for real-world access patterns.
 *
 * Three stages based on usage:
 * 1. Hash only: eq() lookups, no range yet
 * 2. Sorted array: first range() called, assume no more inserts
 * 3. Treap: insert after range, convert sorted→treap once, stay in treap
 *
 * This avoids merge overhead by using only one structure at a time for range().
 */
final class TreapIndex implements IndexInterface
{
    private const MODE_HASH = 0;
    private const MODE_SORTED = 1;
    private const MODE_TREAP = 2;

    /** @var array<string, string> key => packed rowIds (always maintained) */
    private array $hash = [];

    /** @var int Current mode */
    private int $mode = self::MODE_HASH;

    /** @var ?array<string> Sorted keys (MODE_SORTED only) */
    private ?array $sorted = null;

    /** @var ?TreapNode Treap root (MODE_TREAP only) */
    private ?TreapNode $treap = null;

    /**
     * Build index from a generator function.
     * Generator should yield [string $key, int $rowId] pairs.
     */
    public static function fromGenerator(\Closure $fn): self
    {
        $index = new self();
        foreach ($fn() as [$key, $rowId]) {
            $index->hash[$key] = ($index->hash[$key] ?? '') . pack('Q', $rowId);
        }
        return $index;
    }

    /**
     * Build index from array of [key, rowId] pairs.
     */
    public static function fromArray(array $rows): self
    {
        $index = new self();
        foreach ($rows as [$key, $rowId]) {
            $index->hash[$key] = ($index->hash[$key] ?? '') . pack('Q', $rowId);
        }
        return $index;
    }

    public function insert(string $key, int $rowId): void
    {
        $packed = pack('Q', $rowId);
        $isNew = !isset($this->hash[$key]);

        if ($isNew) {
            $this->hash[$key] = $packed;
        } else {
            $this->hash[$key] .= $packed;
        }

        // Handle new key based on current mode
        if ($isNew && $this->mode === self::MODE_SORTED) {
            // Insert after range - convert to treap mode
            $this->convertToTreap();
            $this->treap = $this->treapInsert($this->treap, $key);
        } elseif ($isNew && $this->mode === self::MODE_TREAP) {
            $this->treap = $this->treapInsert($this->treap, $key);
        }
    }

    public function delete(string $key, int $rowId): void
    {
        if (!isset($this->hash[$key])) {
            return;
        }

        $packed = pack('Q', $rowId);
        $blob = $this->hash[$key];
        $pos = strpos($blob, $packed);
        while ($pos !== false && $pos % 8 !== 0) {
            $pos = strpos($blob, $packed, $pos + 1);
        }

        if ($pos === false) {
            return;
        }

        $this->hash[$key] = substr($blob, 0, $pos) . substr($blob, $pos + 8);

        if ($this->hash[$key] === '') {
            unset($this->hash[$key]);
            // Note: we don't remove from sorted/treap - range() checks hash
        }
    }

    public function has(string $key): bool
    {
        return isset($this->hash[$key]);
    }

    public function eq(string $key): Traversable
    {
        if (isset($this->hash[$key])) {
            foreach (unpack('Q*', $this->hash[$key]) as $rowId) {
                yield $rowId;
            }
        }
    }

    /**
     * Count rowIds for a specific key.
     */
    public function count(string $key): int
    {
        return intdiv(strlen($this->hash[$key] ?? ''), 8);
    }

    /**
     * Count unique keys.
     */
    public function keyCount(): int
    {
        return count($this->hash);
    }

    /**
     * Count total rowIds across all keys.
     */
    public function rowCount(): int
    {
        $sum = 0;
        foreach ($this->hash as $blob) {
            $sum += strlen($blob);
        }
        return intdiv($sum, 8);
    }

    /**
     * Clear all data from the index.
     */
    public function clear(): void
    {
        $this->hash = [];
        $this->sorted = null;
        $this->treap = null;
        $this->mode = self::MODE_HASH;
    }

    /**
     * Count rowIds in a range.
     */
    public function countRange(?string $start = null, ?string $end = null): int
    {
        $count = 0;
        foreach ($this->range($start, $end) as $_) {
            $count++;
        }
        return $count;
    }

    public function range(?string $start = null, ?string $end = null, bool $reverse = false): Traversable
    {
        // First range() call: build sorted array
        if ($this->mode === self::MODE_HASH) {
            $this->sorted = array_keys($this->hash);
            sort($this->sorted, SORT_STRING);
            $this->mode = self::MODE_SORTED;
        }

        // Use appropriate structure
        if ($this->mode === self::MODE_SORTED) {
            yield from $this->sortedRange($start, $end, $reverse);
        } else {
            yield from $this->treapRangeWithBlobs($start, $end, $reverse);
        }
    }

    /**
     * Convert sorted array to treap (called on first insert after range).
     */
    private function convertToTreap(): void
    {
        // Filter out keys deleted since sorted was built
        $live = [];
        foreach ($this->sorted as $key) {
            if (isset($this->hash[$key])) {
                $live[] = $key;
            }
        }

        $this->treap = $this->buildBalanced($live, 0, count($live) - 1);
        $this->sorted = null;
        $this->mode = self::MODE_TREAP;
    }

    /**
     * Build balanced treap from sorted keys.
     */
    private function buildBalanced(array $keys, int $lo, int $hi): ?TreapNode
    {
        if ($lo > $hi) {
            return null;
        }

        $mid = ($lo + $hi) >> 1;
        $key = $keys[$mid];
        $node = new TreapNode($key, crc32($key) & 0xffffffff); // Deterministic priority
        $node->left = $this->buildBalanced($keys, $lo, $mid - 1);
        $node->right = $this->buildBalanced($keys, $mid + 1, $hi);

        return $node;
    }

    // =========================================================================
    // Sorted array operations
    // =========================================================================

    private function sortedRange(?string $start, ?string $end, bool $reverse): \Generator
    {
        $n = count($this->sorted);

        if (!$reverse) {
            $i = $start !== null ? $this->lowerBound($start) : 0;
            $stop = $end !== null ? $this->upperBound($end) : $n;
            for (; $i < $stop; $i++) {
                $key = $this->sorted[$i];
                if (!isset($this->hash[$key])) continue;
                foreach (unpack('Q*', $this->hash[$key]) as $rowId) {
                    yield $rowId;
                }
            }
        } else {
            $i = $end !== null ? $this->upperBound($end) - 1 : $n - 1;
            $stop = $start !== null ? $this->lowerBound($start) - 1 : -1;
            for (; $i > $stop; $i--) {
                $key = $this->sorted[$i];
                if (!isset($this->hash[$key])) continue;
                $u = unpack('Q*', $this->hash[$key]);
                for ($j = count($u); $j >= 1; $j--) {
                    yield $u[$j];
                }
            }
        }
    }

    private function lowerBound(string $key): int
    {
        $lo = 0;
        $hi = count($this->sorted);
        while ($lo < $hi) {
            $mid = ($lo + $hi) >> 1;
            if (strcmp($this->sorted[$mid], $key) < 0) {
                $lo = $mid + 1;
            } else {
                $hi = $mid;
            }
        }
        return $lo;
    }

    private function upperBound(string $key): int
    {
        $lo = 0;
        $hi = count($this->sorted);
        while ($lo < $hi) {
            $mid = ($lo + $hi) >> 1;
            if (strcmp($this->sorted[$mid], $key) <= 0) {
                $lo = $mid + 1;
            } else {
                $hi = $mid;
            }
        }
        return $lo;
    }

    // =========================================================================
    // Treap operations
    // =========================================================================

    private function treapInsert(?TreapNode $node, string $key): TreapNode
    {
        if ($node === null) {
            return new TreapNode($key, crc32($key) & 0xffffffff); // Deterministic priority
        }

        $cmp = strcmp($key, $node->key);
        if ($cmp === 0) {
            return $node;
        }

        if ($cmp < 0) {
            $node->left = $this->treapInsert($node->left, $key);
            if ($node->left->priority > $node->priority) {
                $node = $this->rotateRight($node);
            }
        } else {
            $node->right = $this->treapInsert($node->right, $key);
            if ($node->right->priority > $node->priority) {
                $node = $this->rotateLeft($node);
            }
        }

        return $node;
    }

    /**
     * Treap range with blob yielding (iterative, strict pruning).
     */
    private function treapRangeWithBlobs(?string $start, ?string $end, bool $reverse): \Generator
    {
        $node = $this->treap;
        $stack = [];

        if (!$reverse) {
            while ($node !== null || $stack) {
                // Go left, but prune subtrees that are entirely out of range
                while ($node !== null) {
                    // If node < start, entire left subtree is also < start, skip to right
                    if ($start !== null && strcmp($node->key, $start) < 0) {
                        $node = $node->right;
                        continue;
                    }
                    $stack[] = $node;
                    $node = $node->left;
                }

                if (!$stack) break;
                $node = array_pop($stack);

                // If past end, we're done (in-order means all remaining are > end)
                if ($end !== null && strcmp($node->key, $end) > 0) {
                    return;
                }

                // Yield if key exists in hash (lazy delete check)
                if (isset($this->hash[$node->key])) {
                    foreach (unpack('Q*', $this->hash[$node->key]) as $rowId) {
                        yield $rowId;
                    }
                }

                $node = $node->right;
            }
        } else {
            while ($node !== null || $stack) {
                // Go right, but prune subtrees that are entirely out of range
                while ($node !== null) {
                    // If node > end, entire right subtree is also > end, skip to left
                    if ($end !== null && strcmp($node->key, $end) > 0) {
                        $node = $node->left;
                        continue;
                    }
                    $stack[] = $node;
                    $node = $node->right;
                }

                if (!$stack) break;
                $node = array_pop($stack);

                // If past start, we're done (reverse in-order means all remaining are < start)
                if ($start !== null && strcmp($node->key, $start) < 0) {
                    return;
                }

                // Yield if key exists in hash (lazy delete check)
                if (isset($this->hash[$node->key])) {
                    $u = unpack('Q*', $this->hash[$node->key]);
                    for ($j = count($u); $j >= 1; $j--) {
                        yield $u[$j];
                    }
                }

                $node = $node->left;
            }
        }
    }

    private function rotateRight(TreapNode $node): TreapNode
    {
        $left = $node->left;
        $node->left = $left->right;
        $left->right = $node;
        return $left;
    }

    private function rotateLeft(TreapNode $node): TreapNode
    {
        $right = $node->right;
        $node->right = $right->left;
        $right->left = $node;
        return $right;
    }
}

/**
 * @internal
 */
final class TreapNode
{
    public ?TreapNode $left = null;
    public ?TreapNode $right = null;

    public function __construct(
        public string $key,
        public int $priority,
    ) {}
}