SortedTable.php
PHP
Path: src/Table/Wrappers/SortedTable.php
<?php
namespace mini\Table\Wrappers;
use Closure;
use mini\Table\AbstractTable;
use mini\Table\Contracts\TableInterface;
use mini\Table\OrderDef;
use mini\Table\Utility\TablePropertiesTrait;
use stdClass;
use Traversable;
/**
* Sorted table wrapper
*
* Applies ordering to a source table. If source has an index for the order
* columns, delegates to source. Otherwise, buffers and sorts in-memory.
*
* ```php
* SortedTable::from($table, new OrderDef('name'), new OrderDef('age', false))
* ```
*/
class SortedTable extends AbstractTableWrapper
{
/** @var OrderDef[] */
private array $orderBy = [];
/**
 * Factory: obtain a table ordered by the given definitions.
 *
 * - With no order definitions, the result is `$source->order(null)`.
 * - If any column definition of the source reports (via `canOrder()`) that it
 *   can serve the requested order columns, the ordering is delegated to the
 *   source and no wrapper is created.
 * - Otherwise the source is first passed through `order(null)` (which is how
 *   an already-present sort wrapper gets removed) and the result is wrapped
 *   in a new SortedTable.
 *
 * @param TableInterface $source    Table to order
 * @param OrderDef       ...$orders Order definitions, most significant first
 */
public static function from(TableInterface $source, OrderDef ...$orders): TableInterface
{
    if ($orders === []) {
        return $source->order(null);
    }

    $wantedColumns = OrderDef::columns($orders);

    // Prefer native ordering whenever a column definition says it can provide it
    foreach ($source->getColumns() as $definition) {
        if (!$definition->canOrder($wantedColumns)) {
            continue;
        }

        return $source->order(OrderDef::toSpec($orders));
    }

    // No native support: drop any previous ordering, then sort in this wrapper
    return new self($source->order(null), ...$orders);
}
/**
 * Wrap a source table and take over its limit/offset.
 *
 * The source's current limit and offset are copied onto this wrapper and then
 * reset on the source (limit to null, offset to 0), because they have to be
 * applied to the sorted result rather than to the unsorted input.
 *
 * @param AbstractTable $source    Source table to wrap
 * @param OrderDef      ...$orders Order specifications
 */
public function __construct(
    AbstractTable $source,
    OrderDef ...$orders,
) {
    // Take ownership of the window before handing the source to the parent
    $this->limit = $source->getLimit();
    $this->offset = $source->getOffset();

    // Only touch the source when it actually carries a limit ...
    if (null !== $this->limit) {
        $source = $source->limit(null);
    }

    // ... or a non-zero offset
    if (0 !== $this->offset) {
        $source = $source->offset(0);
    }

    parent::__construct($source);

    $this->orderBy = $orders;
}
/**
 * Replace this wrapper's ordering with a new one.
 *
 * The new ordering is requested from the wrapped source, so this wrapper's
 * own ordering is discarded. Because the constructor moves the source's
 * limit/offset onto this wrapper (and `limit()`/`offset()` store them here
 * too), the source no longer carries them; they are therefore re-applied to
 * the re-ordered table so that changing the order does not silently drop the
 * window.
 *
 * @param string|null $spec Order specification, or null for no ordering
 */
public function order(?string $spec): TableInterface
{
    $reordered = $this->source->order($spec);

    // Carry the locally stored window over to the new table
    if ($this->offset !== 0) {
        $reordered = $reordered->offset($this->offset);
    }
    if ($this->limit !== null) {
        $reordered = $reordered->limit($this->limit);
    }

    return $reordered;
}
/**
 * Expose the current ordering as a spec string (for predicate inspection).
 *
 * @return string|null Spec built by OrderDef::toSpec(), or null when this
 *                     wrapper holds no order definitions
 */
public function getOrderSpec(): ?string
{
    if ($this->orderBy === []) {
        return null;
    }

    return OrderDef::toSpec($this->orderBy);
}
/**
 * Yield the source rows in sorted order, applying limit/offset afterwards.
 *
 * The order columns are added to the requested additional columns so the
 * comparator can read them. Two strategies are used:
 *
 * - limit + offset <= 1000: a bounded heap keeps only the rows that can
 *   appear in the result (see materializeWithBoundedHeap()).
 * - otherwise: all rows are buffered, sorted with uasort() (keys preserved),
 *   and the offset/limit window is emitted.
 *
 * @param string ...$additionalColumns Extra columns the caller needs
 * @return Traversable Row key => row, in sorted order
 */
protected function materialize(string ...$additionalColumns): Traversable
{
    $limit = $this->getLimit();
    $offset = $this->getOffset();

    // A limit of zero can never emit a row. Without this guard the full-sort
    // loop below would yield one row before checking the limit (reachable
    // with limit 0 and offset > 1000), disagreeing with the heap path.
    if ($limit === 0) {
        return;
    }

    $orderColumns = OrderDef::columns($this->orderBy);
    $allAdditional = array_unique([...$additionalColumns, ...$orderColumns]);
    $k = $limit !== null ? $limit + $offset : null;

    // Optimization: use bounded heap when we have a small limit.
    // Only beneficial when k << n (n is unknown upfront; k <= 1000 is the heuristic).
    if ($k !== null && $k <= 1000) {
        yield from $this->materializeWithBoundedHeap($allAdditional, $k, $offset, $limit);
        return;
    }

    // Full sort for unlimited queries or large limits
    $buffer = iterator_to_array(parent::materialize(...$allAdditional));
    uasort($buffer, $this->buildRowComparator());

    // Apply limit/offset after sorting
    $skipped = 0;
    $emitted = 0;
    foreach ($buffer as $key => $row) {
        if ($skipped < $offset) {
            $skipped++;
            continue;
        }

        yield $key => $row;
        $emitted++;

        if ($limit !== null && $emitted >= $limit) {
            return;
        }
    }
}
/**
 * Materialize using a bounded heap for efficient top-k selection
 *
 * Instead of sorting all n rows (O(n log n)), we maintain a heap of size k
 * and process each row once (O(n log k)). This is much faster when k << n.
 *
 * Rows that compare equal are tie-broken by their arrival order from the
 * source. SplHeap itself gives no ordering guarantee for equal elements,
 * whereas the full-sort path relies on uasort(), which is stable; the
 * tie-breaker makes both paths emit equal rows in the same order, so results
 * do not shift depending on which strategy is chosen.
 *
 * @param array $additionalColumns Columns to include in materialization
 * @param int $k Maximum heap size (limit + offset)
 * @param int $offset Number of rows to skip
 * @param int|null $limit Number of rows to return
 * @return Traversable Row key => row, best row first
 */
private function materializeWithBoundedHeap(
    array $additionalColumns,
    int $k,
    int $offset,
    ?int $limit
): Traversable {
    // With a non-positive bound every inserted row would be evicted again
    // immediately, so nothing can ever be emitted.
    if ($k <= 0) {
        return;
    }

    $comparator = $this->buildRowComparator();

    // Heap comparator: we want the WORST element at the top for easy eviction.
    // SplHeap keeps the element with the HIGHEST compare value at the top, and
    // the row comparator is positive when a sorts after b, so using it directly
    // yields the max-heap we need. Entries are [key, row, sequence]; on a tie
    // the later-arriving row counts as "worse" so it is evicted first.
    $heapComparator = static fn(array $a, array $b): int
        => $comparator($a[1], $b[1]) ?: ($a[2] <=> $b[2]);

    $heap = new class($heapComparator) extends \SplHeap {
        private Closure $cmp;

        public function __construct(Closure $cmp)
        {
            $this->cmp = $cmp;
        }

        protected function compare($a, $b): int
        {
            return ($this->cmp)($a, $b);
        }
    };

    // Stream through source, maintaining bounded heap
    $sequence = 0;
    foreach (parent::materialize(...$additionalColumns) as $key => $row) {
        $heap->insert([$key, $row, $sequence++]);

        // Evict worst element if heap exceeds bound
        if ($heap->count() > $k) {
            $heap->extract();
        }
    }

    // Extract results from heap (they come out in reverse order - worst first)
    $results = [];
    while (!$heap->isEmpty()) {
        $results[] = $heap->extract();
    }

    // Reverse to get correct order (best first), then apply offset/limit
    $results = array_reverse($results);

    $skipped = 0;
    $emitted = 0;
    foreach ($results as [$key, $row]) {
        if ($skipped < $offset) {
            $skipped++;
            continue;
        }

        // Check the limit before yielding so a limit of 0 emits nothing
        if ($limit !== null && $emitted >= $limit) {
            return;
        }

        yield $key => $row;
        $emitted++;
    }
}
/**
 * Count the rows this table yields.
 *
 * The table is iterated in full and the yielded rows are tallied.
 */
public function count(): int
{
    $total = 0;
    foreach ($this as $unused) {
        $total++;
    }

    return $total;
}
// -------------------------------------------------------------------------
// Limit/offset must be stored locally, not pushed to source
// -------------------------------------------------------------------------
/**
 * Return a table with the given limit.
 *
 * The limit is kept on this wrapper (a clone of it); the wrapped source is
 * not modified. When the limit is unchanged the same instance is returned.
 *
 * @param int|null $n Maximum number of rows, or null for no limit
 */
public function limit(?int $n): TableInterface
{
    if ($n !== $this->limit) {
        $copy = clone $this;
        $copy->limit = $n;

        return $copy;
    }

    return $this;
}
/**
 * Return a table with the given offset.
 *
 * The offset is kept on this wrapper (a clone of it); the wrapped source is
 * not modified. When the offset is unchanged the same instance is returned.
 *
 * @param int $n Number of leading rows to skip
 */
public function offset(int $n): TableInterface
{
    if ($n !== $this->offset) {
        $copy = clone $this;
        $copy->offset = $n;

        return $copy;
    }

    return $this;
}
/**
 * Build a row comparator function from OrderDef[]
 *
 * The returned closure compares two rows column by column in the order of
 * the stored OrderDef list and returns the first non-zero result, negated
 * for descending columns. Per column:
 *
 * - null (or a missing property) sorts before every non-null value, and two
 *   nulls are equal. This is done explicitly because PHP's `null <=> $x`
 *   treats null as equal to 0 and '' yet smaller than e.g. -5, which is not
 *   a consistent ordering and makes sort results unpredictable.
 * - if either value is an int or float, `<=>` is used.
 * - otherwise, columns whose type reports shouldUseCollator() are compared
 *   as strings through the compare function from getCompareFn().
 * - everything else falls back to `<=>`.
 *
 * @return Closure function (stdClass $a, stdClass $b): int
 */
private function buildRowComparator(): Closure
{
    $orders = $this->orderBy;
    $compareFn = $this->getCompareFn();
    $columns = $this->source->getAllColumns();

    // static: the closure only needs the captured variables, not $this
    return static function (stdClass $a, stdClass $b) use ($orders, $compareFn, $columns): int {
        foreach ($orders as $order) {
            $column = $order->column;
            $mult = $order->asc ? 1 : -1;

            $valA = $a->$column ?? null;
            $valB = $b->$column ?? null;

            if ($valA === null || $valB === null) {
                // Nulls first: false (null) sorts before true (non-null)
                $cmp = ($valA !== null) <=> ($valB !== null);
            } elseif (is_int($valA) || is_float($valA) || is_int($valB) || is_float($valB)) {
                // Use <=> for int, float
                $cmp = $valA <=> $valB;
            } elseif (isset($columns[$column]) && $columns[$column]->type->shouldUseCollator()) {
                // Text columns use locale-aware collator
                $cmp = $compareFn((string)$valA, (string)$valB);
            } else {
                // Binary, Date, Time, DateTime use binary comparison
                $cmp = $valA <=> $valB;
            }

            if ($cmp !== 0) {
                return $mult * $cmp;
            }
        }

        return 0;
    };
}
}