ArrayTable.php
PHP
Path: src/Table/ArrayTable.php
<?php
namespace mini\Table;
use mini\Table\Contracts\MutableTableInterface;
use mini\Table\Contracts\SetInterface;
use mini\Table\Contracts\TableInterface;
use mini\Table\Index\TreapIndex;
use mini\Table\Types\ColumnType;
use mini\Table\Types\IndexType;
use mini\Table\Utility\EmptyTable;
use mini\Table\Wrappers\SortedTable;
use Traversable;
/**
* Pure PHP array-backed in-memory table implementation
*
* Unlike InMemoryTable which uses SQLite as a backend, this implementation
* stores all data in PHP arrays. Useful for benchmarking and environments
* without SQLite extension.
*
* ```php
* $table = new ArrayTable(
* new ColumnDef('id', ColumnType::Int, IndexType::Primary),
* new ColumnDef('name', ColumnType::Text),
* );
*
* $table->insert(['id' => 1, 'name' => 'Alice']);
* ```
*/
class ArrayTable extends AbstractTable implements MutableTableInterface
{
/** @var array<int, object> All rows indexed by rowid (stored as objects for fast iteration) */
private array $rows = [];
/** @var int Auto-increment counter for rowid */
private int $nextRowId = 1;
/** @var array<string, TreapIndex> Index: column name => TreapIndex (for primary/unique columns) */
private array $indexes = [];
/** @var array{column: string, op: string, value: mixed}[] */
private array $where = [];
/** @var OrderDef[] */
private array $orderBy = [];
public function __construct(ColumnDef ...$columns)
{
if (empty($columns)) {
throw new \InvalidArgumentException('ArrayTable requires at least one column');
}
parent::__construct(...$columns);
// Initialize TreapIndex for all indexed columns
foreach ($columns as $col) {
if ($col->index->isIndexed()) {
$this->indexes[$col->name] = new TreapIndex();
}
}
}
public function __clone()
{
// Keep same row data reference - clones share the data (like InMemoryTable)
}
// =========================================================================
// Mutation methods
// =========================================================================
public function insert(array $row): int|string
{
$rowId = $this->nextRowId++;
// Update indexes (TreapIndex uses string keys - pack numeric types for proper ordering)
foreach ($this->indexes as $col => $index) {
if (isset($row[$col])) {
$index->insert($this->toIndexKey($col, $row[$col]), $rowId);
}
}
$this->rows[$rowId] = (object) $row;
return $rowId;
}
public function update(TableInterface $query, array $changes): int
{
$this->validateQuery($query);
$affected = 0;
foreach ($query as $rowId => $_) {
// Update indexes for changed indexed columns
foreach ($this->indexes as $col => $index) {
if (isset($changes[$col])) {
// Remove old index entry
$oldValue = $this->rows[$rowId]->$col ?? null;
if ($oldValue !== null) {
$index->delete($this->toIndexKey($col, $oldValue), $rowId);
}
// Add new index entry
$index->insert($this->toIndexKey($col, $changes[$col]), $rowId);
}
}
// Apply changes
foreach ($changes as $col => $value) {
$this->rows[$rowId]->$col = $value;
}
$affected++;
}
return $affected;
}
public function delete(TableInterface $query): int
{
$this->validateQuery($query);
// Collect rowids first to avoid modifying array during iteration
$rowIds = [];
foreach ($query as $rowId => $_) {
$rowIds[] = $rowId;
}
foreach ($rowIds as $rowId) {
// Remove from indexes
foreach ($this->indexes as $col => $index) {
$value = $this->rows[$rowId]->$col ?? null;
if ($value !== null) {
$index->delete($this->toIndexKey($col, $value), $rowId);
}
}
unset($this->rows[$rowId]);
}
return count($rowIds);
}
private function validateQuery(TableInterface $query): void
{
if (!$query instanceof self) {
throw new \InvalidArgumentException(
'Query must be an ArrayTable derived from this table'
);
}
if ($query->rows !== $this->rows) {
throw new \InvalidArgumentException(
'Query must be derived from the same table instance'
);
}
}
/**
* Convert a value to an index key based on column type.
* Uses Index::packInt/packFloat for numeric types to ensure proper strcmp ordering.
*/
private function toIndexKey(string $column, mixed $value): string
{
$cols = $this->getAllColumns();
$type = $cols[$column]->type ?? ColumnType::Text;
return match ($type) {
ColumnType::Int => Index::packInt((int) $value),
ColumnType::Float, ColumnType::Decimal => Index::packFloat((float) $value),
default => (string) $value,
};
}
// =========================================================================
// Filter methods
// =========================================================================
public function eq(string $column, int|float|string|null $value): TableInterface
{
$clone = clone $this;
$clone->where[] = ['column' => $column, 'op' => '=', 'value' => $value];
return $clone;
}
public function lt(string $column, int|float|string $value): TableInterface
{
$clone = clone $this;
$clone->where[] = ['column' => $column, 'op' => '<', 'value' => $value];
return $clone;
}
public function lte(string $column, int|float|string $value): TableInterface
{
$clone = clone $this;
$clone->where[] = ['column' => $column, 'op' => '<=', 'value' => $value];
return $clone;
}
public function gt(string $column, int|float|string $value): TableInterface
{
$clone = clone $this;
$clone->where[] = ['column' => $column, 'op' => '>', 'value' => $value];
return $clone;
}
public function gte(string $column, int|float|string $value): TableInterface
{
$clone = clone $this;
$clone->where[] = ['column' => $column, 'op' => '>=', 'value' => $value];
return $clone;
}
public function in(string $column, SetInterface $values): TableInterface
{
// Materialize the set values
$members = [];
foreach ($values as $row) {
$cols = array_keys($values->getColumns());
if (count($cols) === 1) {
$members[] = $row->{$cols[0]};
}
}
if (empty($members)) {
return EmptyTable::from($this);
}
$clone = clone $this;
$clone->where[] = ['column' => $column, 'op' => 'IN', 'value' => $members];
return $clone;
}
public function like(string $column, string $pattern): TableInterface
{
$clone = clone $this;
$clone->where[] = ['column' => $column, 'op' => 'LIKE', 'value' => $pattern];
return $clone;
}
// =========================================================================
// Ordering
// =========================================================================
public function order(?string $spec): TableInterface
{
$orders = $spec ? OrderDef::parse($spec) : [];
if (empty($orders)) {
$clone = clone $this;
$clone->orderBy = [];
return $clone;
}
// If primary order column is indexed, we can satisfy it via index scan
$primaryCol = $orders[0]->column;
if (isset($this->indexes[$primaryCol]) && count($orders) === 1) {
$clone = clone $this;
$clone->orderBy = $orders;
return $clone;
}
// Otherwise use SortedTable wrapper
return new SortedTable($this, ...$orders);
}
// =========================================================================
// Materialization
// =========================================================================
protected function materialize(string ...$additionalColumns): Traversable
{
$offset = $this->offset;
$limit = $this->limit;
$count = 0;
$yielded = 0;
// Analyze filters and ordering to find best index strategy
[$indexCol, $lowBound, $highBound, $includeLow, $includeHigh, $remainingFilters] = $this->planIndexScan();
// Check if we can use an index
if ($indexCol !== null && isset($this->indexes[$indexCol])) {
$index = $this->indexes[$indexCol];
// Determine iteration direction from ORDER BY
$reverse = false;
if (!empty($this->orderBy) && $this->orderBy[0]->column === $indexCol) {
$reverse = !$this->orderBy[0]->asc;
}
// Stream from index - yields rowIds in sorted order
foreach ($index->range($lowBound, $highBound, $reverse) as $rowId) {
if (!isset($this->rows[$rowId])) {
continue;
}
$row = $this->rows[$rowId];
// Apply remaining (non-indexed) filters only
if (!$this->matchesFilterList($row, $rowId, $remainingFilters)) {
continue;
}
if ($offset > 0 && $count < $offset) {
$count++;
continue;
}
if ($limit !== null && $yielded >= $limit) {
return;
}
yield $rowId => $row;
$count++;
$yielded++;
}
return;
}
// Full scan path - no usable index
// Note: orderBy is only set when it can be satisfied by index,
// otherwise order() returns SortedTable wrapper
foreach ($this->rows as $rowId => $row) {
if (!$this->matchesFilters($row, $rowId)) {
continue;
}
if ($offset > 0 && $count < $offset) {
$count++;
continue;
}
if ($limit !== null && $yielded >= $limit) {
break;
}
yield $rowId => $row;
$count++;
$yielded++;
}
}
/**
* Plan an index scan based on filters and ordering
*
* Returns: [indexCol, lowBound, highBound, includeLow, includeHigh, remainingFilters]
* - indexCol: column to use for index scan, or null for full scan
* - lowBound/highBound: range bounds (null = unbounded)
* - includeLow/includeHigh: whether bounds are inclusive
* - remainingFilters: filters that couldn't use the index
*/
private function planIndexScan(): array
{
$indexCol = null;
$lowBound = null;
$highBound = null;
$includeLow = false;
$includeHigh = false;
$remainingFilters = [];
// Prefer ORDER BY column if indexed (gives us sorted output for free)
if (!empty($this->orderBy)) {
$orderCol = $this->orderBy[0]->column;
if (isset($this->indexes[$orderCol])) {
$indexCol = $orderCol;
}
}
// Process filters to find bounds and remaining filters
foreach ($this->where as $filter) {
$column = $filter['column'];
$op = $filter['op'];
$value = $filter['value'];
// If this filter is on an indexed column
if (isset($this->indexes[$column])) {
// Prefer eq filter (most selective) or match ORDER BY column
if ($indexCol === null || $column === $indexCol || $op === '=') {
if ($op === '=' && $value !== null) {
// Exact match - set both bounds to same value
$indexCol = $column;
$indexKey = $this->toIndexKey($column, $value);
$lowBound = $indexKey;
$highBound = $indexKey;
// Don't add to remaining - already handled by bounds
continue;
} elseif (($indexCol === null || $column === $indexCol) && $value !== null) {
// Range filter - use this column for index scan
$indexCol = $column;
// TreapIndex uses inclusive bounds, so convert exclusive to inclusive:
// > X becomes >= X+1 (for integers) or add to remainingFilters
// < X becomes <= X-1 (for integers) or add to remainingFilters
$cols = $this->getAllColumns();
$type = $cols[$column]->type ?? ColumnType::Text;
if ($op === '>' || $op === '>=') {
if ($op === '>' && $type === ColumnType::Int) {
// Exclusive: > X is same as >= X+1
$adjustedKey = $this->toIndexKey($column, (int) $value + 1);
} else {
$adjustedKey = $this->toIndexKey($column, $value);
}
if ($lowBound === null || $adjustedKey > $lowBound) {
$lowBound = $adjustedKey;
}
// For non-integer exclusive bounds, add to remainingFilters
if ($op === '>' && $type !== ColumnType::Int) {
$remainingFilters[] = $filter;
}
continue;
} elseif ($op === '<' || $op === '<=') {
if ($op === '<' && $type === ColumnType::Int) {
// Exclusive: < X is same as <= X-1
$adjustedKey = $this->toIndexKey($column, (int) $value - 1);
} else {
$adjustedKey = $this->toIndexKey($column, $value);
}
if ($highBound === null || $adjustedKey < $highBound) {
$highBound = $adjustedKey;
}
// For non-integer exclusive bounds, add to remainingFilters
if ($op === '<' && $type !== ColumnType::Int) {
$remainingFilters[] = $filter;
}
continue;
}
}
}
}
// Filter couldn't use index
$remainingFilters[] = $filter;
}
return [$indexCol, $lowBound, $highBound, $includeLow, $includeHigh, $remainingFilters];
}
/**
* Check if row matches a specific list of filters
*/
private function matchesFilterList(object $row, int $rowId, array $filters): bool
{
foreach ($filters as $filter) {
$column = $filter['column'];
$op = $filter['op'];
$filterValue = $filter['value'];
if ($column === '_rowid_') {
$rowValue = $rowId;
} else {
$rowValue = $row->$column ?? null;
}
$matches = match ($op) {
'=' => $this->compareEqual($rowValue, $filterValue),
'<' => $this->compareLt($rowValue, $filterValue),
'<=' => $this->compareLte($rowValue, $filterValue),
'>' => $this->compareGt($rowValue, $filterValue),
'>=' => $this->compareGte($rowValue, $filterValue),
'IN' => in_array($rowValue, $filterValue, false),
'LIKE' => $this->compareLike($rowValue, $filterValue),
default => true,
};
if (!$matches) {
return false;
}
}
return true;
}
private function matchesFilters(object $row, int $rowId): bool
{
foreach ($this->where as $filter) {
$column = $filter['column'];
$op = $filter['op'];
$filterValue = $filter['value'];
// Special handling for _rowid_ (internal row identifier)
if ($column === '_rowid_') {
$rowValue = $rowId;
} else {
$rowValue = $row->$column ?? null;
}
$matches = match ($op) {
'=' => $this->compareEqual($rowValue, $filterValue),
'<' => $this->compareLt($rowValue, $filterValue),
'<=' => $this->compareLte($rowValue, $filterValue),
'>' => $this->compareGt($rowValue, $filterValue),
'>=' => $this->compareGte($rowValue, $filterValue),
'IN' => in_array($rowValue, $filterValue, false),
'LIKE' => $this->compareLike($rowValue, $filterValue),
default => true,
};
if (!$matches) {
return false;
}
}
return true;
}
private function compareEqual(mixed $a, mixed $b): bool
{
// NULL handling: NULL = NULL is true, NULL = anything_else is false
if ($a === null && $b === null) {
return true;
}
if ($a === null || $b === null) {
return false;
}
// Numeric comparison for numeric strings
if (is_numeric($a) && is_numeric($b)) {
return (float) $a == (float) $b;
}
return $a == $b;
}
private function compareLt(mixed $a, mixed $b): bool
{
if ($a === null || $b === null) {
return false;
}
if (is_numeric($a) && is_numeric($b)) {
return (float) $a < (float) $b;
}
return $a < $b;
}
private function compareLte(mixed $a, mixed $b): bool
{
if ($a === null || $b === null) {
return false;
}
if (is_numeric($a) && is_numeric($b)) {
return (float) $a <= (float) $b;
}
return $a <= $b;
}
private function compareGt(mixed $a, mixed $b): bool
{
if ($a === null || $b === null) {
return false;
}
if (is_numeric($a) && is_numeric($b)) {
return (float) $a > (float) $b;
}
return $a > $b;
}
private function compareGte(mixed $a, mixed $b): bool
{
if ($a === null || $b === null) {
return false;
}
if (is_numeric($a) && is_numeric($b)) {
return (float) $a >= (float) $b;
}
return $a >= $b;
}
private function compareLike(mixed $value, string $pattern): bool
{
if ($value === null) {
return false;
}
// Convert SQL LIKE pattern to regex
// % = any characters, _ = single character
$regex = '/^';
$len = strlen($pattern);
for ($i = 0; $i < $len; $i++) {
$char = $pattern[$i];
if ($char === '%') {
$regex .= '.*';
} elseif ($char === '_') {
$regex .= '.';
} else {
$regex .= preg_quote($char, '/');
}
}
$regex .= '$/is';
return preg_match($regex, (string) $value) === 1;
}
private function sortRows(array $rows): array
{
$compareFn = $this->getCompareFn();
uasort($rows, function ($a, $b) use ($compareFn) {
foreach ($this->orderBy as $order) {
$col = $order->column;
$valA = $a->$col ?? null;
$valB = $b->$col ?? null;
// NULL handling: NULLs sort last in ASC, first in DESC
if ($valA === null && $valB === null) {
continue;
}
if ($valA === null) {
return $order->asc ? 1 : -1;
}
if ($valB === null) {
return $order->asc ? -1 : 1;
}
// Compare values
if (is_numeric($valA) && is_numeric($valB)) {
$cmp = (float) $valA <=> (float) $valB;
} elseif (is_string($valA) && is_string($valB)) {
$cmp = $compareFn($valA, $valB);
} else {
$cmp = $valA <=> $valB;
}
if ($cmp !== 0) {
return $order->asc ? $cmp : -$cmp;
}
}
return 0;
});
return $rows;
}
public function count(): int
{
$count = 0;
$offset = $this->offset;
$limit = $this->limit;
$skipped = 0;
foreach ($this->rows as $rowId => $row) {
if (!$this->matchesFilters($row, $rowId)) {
continue;
}
if ($offset > 0 && $skipped < $offset) {
$skipped++;
continue;
}
$count++;
if ($limit !== null && $count >= $limit) {
break;
}
}
return $count;
}
public function load(string|int $rowId): ?object
{
if (!isset($this->rows[$rowId])) {
return null;
}
$row = $this->rows[$rowId];
// Check filters
if (!$this->matchesFilters($row, (int) $rowId)) {
return null;
}
return $row; // Already an object
}
}