InnerJoinTable.php

PHP

Path: src/Table/Wrappers/InnerJoinTable.php

<?php

namespace mini\Table\Wrappers;

use mini\Table\AbstractTable;
use mini\Table\ColumnDef;
use mini\Table\Contracts\SetInterface;
use mini\Table\Contracts\TableInterface;
use mini\Table\OrderDef;
use mini\Table\Predicate;
use mini\Table\Types\IndexType;
use Traversable;

/**
 * Inner join of two tables with equi-join condition
 *
 * Yields rows where the join condition matches between left and right tables.
 * Uses property-based binding: left table must have '__bind__' property with Predicate.
 *
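 * Strategy: a sort-merge join when at least one join column is indexed,
 * otherwise a block hash join that hashes the left side in fixed-size chunks
 * and scans the right side once per chunk.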
 */
class InnerJoinTable extends AbstractTable
{
    /** Number of left rows hashed per block before the right side is scanned. */
    private const HASH_CHUNK_SIZE = 64;

    /** @var string Left column name for join */
    private string $leftCol;

    /** @var string Right column name for join */
    private string $rightCol;

    /**
     * Build the join from two tables; the join condition is read from the
     * left table's '__bind__' property.
     *
     * The resulting column list is the left columns followed by the right
     * columns, each copied with its type and index definition.
     *
     * @param TableInterface $left  Left table; must carry a Predicate in '__bind__'
     * @param TableInterface $right Right table
     *
     * @throws \InvalidArgumentException If the bind predicate is missing, does not hold
     *                                   exactly one condition, names a column that does
     *                                   not exist, or the two tables share a column name
     */
    public function __construct(
        private TableInterface $left,
        private TableInterface $right,
    ) {
        // Extract bind predicate from left's property
        $bindPredicate = $left->getProperty('__bind__');
        if (!$bindPredicate instanceof Predicate) {
            throw new \InvalidArgumentException(
                'INNER JOIN requires __bind__ property with Predicate on left table'
            );
        }

        // Extract the single equi-join condition
        $conditions = $bindPredicate->getConditions();
        if (count($conditions) !== 1) {
            throw new \InvalidArgumentException(
                'INNER JOIN currently only supports single equi-join condition'
            );
        }

        // The condition's 'value' is the right column name with a leading ':'
        // (the same shape withFilteredSources() passes to eqBind()).
        $cond = $conditions[0];
        $this->leftCol = $cond['column'];
        $this->rightCol = ltrim($cond['value'], ':');

        $leftCols = $left->getColumns();
        $rightCols = $right->getColumns();

        // Validate columns exist
        if (!isset($leftCols[$this->leftCol])) {
            throw new \InvalidArgumentException(
                "Left join column '{$this->leftCol}' does not exist"
            );
        }
        if (!isset($rightCols[$this->rightCol])) {
            throw new \InvalidArgumentException(
                "Right join column '{$this->rightCol}' does not exist"
            );
        }

        // Validate no column name conflicts
        foreach ($leftCols as $name => $_) {
            if (isset($rightCols[$name])) {
                throw new \InvalidArgumentException(
                    "Column name conflict in INNER JOIN: '$name'. Use withAlias() to disambiguate."
                );
            }
        }

        // Merge column definitions: left columns first, then right columns
        $merged = [];
        foreach ($leftCols as $name => $def) {
            $merged[] = new ColumnDef($name, $def->type, $def->index);
        }
        foreach ($rightCols as $name => $def) {
            $merged[] = new ColumnDef($name, $def->type, $def->index);
        }

        parent::__construct(...$merged);
    }

    /**
     * Produce the joined rows, applying this table's offset and limit.
     *
     * Picks the sort-merge strategy when at least one join column is indexed
     * and the block hash strategy otherwise. Row keys are sequential integers
     * starting at 0 for the first emitted row.
     *
     * @param string ...$additionalColumns Not used by this implementation
     */
    protected function materialize(string ...$additionalColumns): Traversable
    {
        $limit = $this->getLimit();
        if ($limit !== null && $limit <= 0) {
            // Without this guard one row would be emitted before the limit check below.
            return;
        }
        $offset = $this->getOffset();

        // Check index status on join columns
        $leftIndexed = $this->left->getColumns()[$this->leftCol]->index->isIndexed();
        $rightIndexed = $this->right->getColumns()[$this->rightCol]->index->isIndexed();

        $pairs = ($leftIndexed || $rightIndexed)
            ? $this->sortMergeJoin()
            : $this->blockHashJoin();

        $rowId = 0;
        $skipped = 0;
        foreach ($pairs as [$leftRow, $rightRow]) {
            if ($skipped < $offset) {
                // Skipped pairs are never merged into a row object.
                $skipped++;
                continue;
            }

            // Column names are disjoint (enforced by the constructor), so the
            // array union simply concatenates the two rows' properties.
            yield $rowId++ => (object) ((array) $leftRow + (array) $rightRow);

            if ($limit !== null && $rowId >= $limit) {
                return;
            }
        }
    }

    /**
     * Sort-merge join: order both sides by their join column and merge
     * matching runs.
     *
     * For every key present on both sides, each right row of the run is paired
     * with each left row of the run (full cross product of the two runs).
     *
     * @return Traversable Yields [leftRow, rightRow] pairs
     */
    private function sortMergeJoin(): Traversable
    {
        $leftCol = $this->leftCol;
        $rightCol = $this->rightCol;

        // Use order() so each table can sort in whatever way is efficient for it
        $leftIter = $this->left->order($leftCol)->getIterator();
        $rightIter = $this->right->order($rightCol)->getIterator();

        $leftIter->rewind();
        $rightIter->rewind();

        while ($leftIter->valid() && $rightIter->valid()) {
            $leftKey = $leftIter->current()->$leftCol;
            $rightKey = $rightIter->current()->$rightCol;

            if ($leftKey < $rightKey) {
                // Left is behind, advance it
                $leftIter->next();
                continue;
            }
            if ($leftKey > $rightKey) {
                // Right is behind, advance it
                $rightIter->next();
                continue;
            }

            // Keys match. Buffer the complete left run for this key. Run
            // membership uses loose equality to agree with the loose < / >
            // comparisons that detected the match. The do/while always
            // consumes the current row, so the merge is guaranteed to progress.
            $leftRun = [];
            do {
                $leftRun[] = $leftIter->current();
                $leftIter->next();
            } while ($leftIter->valid() && $leftIter->current()->$leftCol == $leftKey);

            // Pair the buffered left run with EVERY right row of the matching
            // run. The left iterator has already moved past the run, so the
            // right run must be drained here; returning to the merge
            // comparison would skip the remaining right duplicates.
            do {
                $rightRow = $rightIter->current();
                foreach ($leftRun as $leftRow) {
                    yield [$leftRow, $rightRow];
                }
                $rightIter->next();
            } while ($rightIter->valid() && $rightIter->current()->$rightCol == $leftKey);
        }
    }

    /**
     * Block nested loop join with hash probe.
     *
     * Processes the left side in chunks, scanning the right side once per
     * chunk. Only one chunk of left rows is held in the hash table at a time.
     *
     * @return Traversable Yields [leftRow, rightRow] pairs
     */
    private function blockHashJoin(): Traversable
    {
        $rightCol = $this->rightCol;

        foreach ($this->leftChunks(self::HASH_CHUNK_SIZE) as $hashTable) {
            // Full scan of right, probe hash table
            foreach ($this->right as $rightRow) {
                $key = self::hashKey($rightRow->$rightCol);
                if (!isset($hashTable[$key])) {
                    continue;
                }

                foreach ($hashTable[$key] as $matchedLeft) {
                    yield [$matchedLeft, $rightRow];
                }
            }
        }
    }

    /**
     * Read the left table in blocks and hash each block on the join column.
     *
     * @param int $chunkSize Maximum number of left rows per block
     *
     * @return Traversable Yields one hash table per block: bucket key => list of left rows.
     *                     A trailing partial block is yielded too; an empty left side
     *                     yields nothing.
     */
    private function leftChunks(int $chunkSize): Traversable
    {
        $leftCol = $this->leftCol;
        $hashTable = [];
        $chunkCount = 0;

        foreach ($this->left->getIterator() as $leftRow) {
            $hashTable[self::hashKey($leftRow->$leftCol)][] = $leftRow;

            if (++$chunkCount >= $chunkSize) {
                yield $hashTable;
                $hashTable = [];
                $chunkCount = 0;
            }
        }

        if ($chunkCount > 0) {
            yield $hashTable;
        }
    }

    /**
     * Normalise a join value so it is safe to use as a PHP array key.
     *
     * PHP truncates float array keys to int, which would put 1.2 and 1.5 in
     * the same bucket and join them. Integral floats that fit in an int are
     * mapped to that int (the same bucket PHP would pick); every other float
     * is mapped to its exported string form. Non-float values are returned
     * unchanged and follow PHP's normal key casting.
     *
     * @param mixed $value Join column value
     *
     * @return mixed Value to use as the bucket key
     */
    private static function hashKey(mixed $value): mixed
    {
        if (!is_float($value)) {
            return $value;
        }

        $fitsInt = is_finite($value)
            && floor($value) === $value
            && $value >= PHP_INT_MIN
            && $value < PHP_INT_MAX;

        return $fitsInt ? (int) $value : var_export($value, true);
    }

    /**
     * Order the joined rows.
     *
     * @param string|null $spec Order specification understood by OrderDef::parse()
     *
     * @return TableInterface This table when the spec yields no order terms,
     *                        otherwise a SortedTable wrapping this table
     */
    public function order(?string $spec): TableInterface
    {
        $orders = $spec ? OrderDef::parse($spec) : [];
        if (empty($orders)) {
            return $this;
        }
        return new SortedTable($this, ...$orders);
    }

    // ─────────────────────────────────────────────────────────────────────────
    // Filter pushdown
    // ─────────────────────────────────────────────────────────────────────────

    /** Filter on equality, delegated to the source table that owns $column. */
    public function eq(string $column, mixed $value): TableInterface
    {
        return $this->pushFilter(__FUNCTION__, $column, $value);
    }

    /** Filter on less-than, delegated to the source table that owns $column. */
    public function lt(string $column, mixed $value): TableInterface
    {
        return $this->pushFilter(__FUNCTION__, $column, $value);
    }

    /** Filter on less-than-or-equal, delegated to the source table that owns $column. */
    public function lte(string $column, mixed $value): TableInterface
    {
        return $this->pushFilter(__FUNCTION__, $column, $value);
    }

    /** Filter on greater-than, delegated to the source table that owns $column. */
    public function gt(string $column, mixed $value): TableInterface
    {
        return $this->pushFilter(__FUNCTION__, $column, $value);
    }

    /** Filter on greater-than-or-equal, delegated to the source table that owns $column. */
    public function gte(string $column, mixed $value): TableInterface
    {
        return $this->pushFilter(__FUNCTION__, $column, $value);
    }

    /** Filter on set membership, delegated to the source table that owns $column. */
    public function in(string $column, SetInterface $values): TableInterface
    {
        return $this->pushFilter(__FUNCTION__, $column, $values);
    }

    /** Filter on a LIKE pattern, delegated to the source table that owns $column. */
    public function like(string $column, string $pattern): TableInterface
    {
        return $this->pushFilter(__FUNCTION__, $column, $pattern);
    }

    /**
     * Count the joined rows by iterating the table.
     */
    public function count(): int
    {
        return iterator_count($this);
    }

    /**
     * Push a filter operation to the appropriate source table.
     *
     * The left table is checked first; the constructor guarantees a column
     * name cannot belong to both sides.
     *
     * NOTE(review): the filter is applied to the source table and this join's
     * limit/offset are re-applied on top of the filtered join. Confirm that
     * this is the intended semantics when a limit was set before the filter.
     *
     * @param string $method Name of the filter method to call on the source table
     * @param string $column Column to filter on
     * @param mixed  $value  Filter argument, passed through unchanged
     *
     * @throws \InvalidArgumentException If neither source table has the column
     */
    private function pushFilter(string $method, string $column, mixed $value): TableInterface
    {
        $leftCols = $this->left->getColumns();
        $rightCols = $this->right->getColumns();

        if (isset($leftCols[$column])) {
            $filtered = $this->left->$method($column, $value);
            return $this->withFilteredSources($filtered, $this->right);
        }

        if (isset($rightCols[$column])) {
            $filtered = $this->right->$method($column, $value);
            return $this->withFilteredSources($this->left, $filtered);
        }

        throw new \InvalidArgumentException("Unknown column in INNER JOIN: '$column'");
    }

    /**
     * Create a new join over the given source tables, carrying over this
     * join's condition, limit and offset.
     */
    private function withFilteredSources(TableInterface $left, TableInterface $right): TableInterface
    {
        // Recreate the bind predicate on the (possibly new) left table
        $predicate = (new Predicate())->eqBind($this->leftCol, ':' . $this->rightCol);
        $leftWithBind = $left->withProperty('__bind__', $predicate);

        $new = new self($leftWithBind, $right);
        if ($this->getLimit() !== null) {
            $new = $new->limit($this->getLimit());
        }
        if ($this->getOffset() > 0) {
            $new = $new->offset($this->getOffset());
        }
        return $new;
    }
}