FilteredTable.php
PHP
Path: src/Table/Wrappers/FilteredTable.php
<?php
namespace mini\Table\Wrappers;
use mini\Table\AbstractTable;
use mini\Table\ColumnDef;
use mini\Table\Contracts\SetInterface;
use mini\Table\Contracts\TableInterface;
use mini\Table\Types\Operator;
use mini\Table\OrderDef;
use mini\Table\Utility\EmptyTable;
use mini\Table\Utility\TablePropertiesTrait;
use Traversable;
/**
* Filters rows from source table using a column/operator/value condition
*
* Unlike AbstractTableWrapper's default pushdown behavior, FilteredTable
* applies filtering in-memory during materialization. This is used by
* BarrierTable to filter already-selected rows.
*
* ```php
* new FilteredTable($source, 'status', Operator::Eq, 'active')
* new FilteredTable($source, 'age', Operator::Gte, 18)
* new FilteredTable($source, 'id', Operator::In, $setOfIds)
* ```
*/
class FilteredTable extends AbstractTableWrapper
{
use TablePropertiesTrait;
private ColumnDef $columnDef;
/**
 * Wrap $source so that only rows satisfying "$column $operator $value" pass.
 *
 * A limit/offset already present on $source is taken over by this wrapper
 * and removed from the source, so pagination is applied after filtering.
 *
 * @throws \LogicException When $column is not one of the source's columns.
 */
public function __construct(
    AbstractTable $source,
    private string $column,
    private Operator $operator,
    private mixed $value,
) {
    // Take over the source's pagination window
    $absorbedLimit = $source->getLimit();
    $absorbedOffset = $source->getOffset();
    $this->limit = $absorbedLimit;
    $this->offset = $absorbedOffset;

    // Strip the window from the source; this wrapper applies it after filtering
    $unpaged = $source;
    if ($absorbedLimit !== null) {
        $unpaged = $unpaged->limit(null);
    }
    if ($absorbedOffset !== 0) {
        $unpaged = $unpaged->offset(0);
    }

    // The filter column may be hidden, so look it up among all columns
    $definitions = $unpaged->getAllColumns();
    if (!isset($definitions[$column])) {
        throw new \LogicException("Unknown column '$column'");
    }
    $this->columnDef = $definitions[$column];

    parent::__construct($unpaged);
}
// -------------------------------------------------------------------------
// Accessors for predicate inspection
// -------------------------------------------------------------------------
/**
 * Name of the column this filter tests.
 */
public function getFilterColumn(): string
{
return $this->column;
}
/**
 * Operator applied between the column and the filter value.
 */
public function getFilterOperator(): Operator
{
return $this->operator;
}
/**
 * Value the column is compared against, exactly as passed to the constructor.
 */
public function getFilterValue(): mixed
{
return $this->value;
}
/**
 * Test if a row matches this filter
 *
 * Rows that do not carry the filter column at all are accepted (open world
 * assumption). Ordering operators are evaluated with the same type-aware
 * rules as materialization, so has()/load() agree with iteration.
 */
public function test(object $row): bool
{
    $col = $this->column;
    if (!property_exists($row, $col)) {
        return true; // Open world assumption - missing properties pass
    }
    $rowValue = $row->$col;

    return match ($this->operator) {
        Operator::Eq => $this->testEq($rowValue),
        Operator::Lt, Operator::Lte, Operator::Gt, Operator::Gte => $this->testOrdering($rowValue),
        Operator::In => $this->testIn($rowValue),
        Operator::Like => $this->testLike($rowValue),
    };
}

/**
 * Evaluate Lt/Lte/Gt/Gte for a single value, mirroring filterLt()..filterGte().
 *
 * - NULL row values never satisfy an ordering comparison
 * - Numeric columns compare with PHP's native operators
 * - Other columns need a string/numeric filter value, which is cast to
 *   string and compared through compare() (collator for text columns,
 *   binary ordering otherwise)
 */
private function testOrdering(mixed $rowValue): bool
{
    if ($rowValue === null) {
        return false;
    }

    $value = $this->value;

    if ($this->columnDef->type->isNumeric()) {
        return match ($this->operator) {
            Operator::Lt => $rowValue < $value,
            Operator::Lte => $rowValue <= $value,
            Operator::Gt => $rowValue > $value,
            Operator::Gte => $rowValue >= $value,
        };
    }

    // Same bail-out as the materializing filters: nothing matches
    if (!is_string($value) && !is_numeric($value)) {
        return false;
    }

    $order = $this->compare($rowValue, (string) $value);

    return match ($this->operator) {
        Operator::Lt => $order < 0,
        Operator::Lte => $order <= 0,
        Operator::Gt => $order > 0,
        Operator::Gte => $order >= 0,
    };
}
/**
 * Equality check for a single value.
 *
 * A null filter value means "IS NULL". When both sides are numeric
 * (numbers or numeric strings) they are compared loosely so that 5 matches
 * 5.0; everything else must be identical.
 */
private function testEq(mixed $rowValue): bool
{
    $expected = $this->value;

    if ($expected === null) {
        return $rowValue === null;
    }

    $bothNumeric = is_numeric($rowValue) && is_numeric($expected);

    return $bothNumeric
        ? $rowValue == $expected
        : $rowValue === $expected;
}
/**
 * IN check for a single value.
 *
 * Delegates to matchesIn() so that test() and materialization share one
 * implementation, including the guard that the filter value is a set.
 *
 * @throws \LogicException When the filter value is not a SetInterface.
 */
private function testIn(mixed $rowValue): bool
{
    return $this->matchesIn($rowValue);
}
/**
 * LIKE check for a single value.
 *
 * Delegates to matchesLike() so the pattern translation exists in exactly
 * one place and test() cannot drift from materialization.
 */
private function testLike(mixed $rowValue): bool
{
    return $this->matchesLike($rowValue);
}
/**
 * Stream the source rows that pass the filter, then apply limit/offset.
 *
 * The filter column is always requested from the source in addition to
 * $additionalColumns so it can be inspected on every row.
 */
protected function materialize(string ...$additionalColumns): Traversable
{
    $filterColumn = $this->column;
    $requested = array_unique([...$additionalColumns, $filterColumn]);
    $rows = parent::materialize(...$requested);

    // Pick the filtering generator once, based on the operator
    $matching = match ($this->operator) {
        Operator::Eq => $this->filterEq($rows, $filterColumn),
        Operator::Lt => $this->filterLt($rows, $filterColumn),
        Operator::Lte => $this->filterLte($rows, $filterColumn),
        Operator::Gt => $this->filterGt($rows, $filterColumn),
        Operator::Gte => $this->filterGte($rows, $filterColumn),
        Operator::In => $this->filterIn($rows, $filterColumn),
        Operator::Like => $this->filterLike($rows, $filterColumn),
    };

    // Skip the pagination generator entirely when there is no window
    $unbounded = $this->getLimit() === null && $this->getOffset() === 0;

    yield from $unbounded ? $matching : $this->paginate($matching);
}
/**
 * Yield the rows whose $col equals the filter value.
 *
 * - null filter value: SQL "col IS NULL"
 * - numeric column: loose comparison so 5 matches 5.0; NULL row values are
 *   rejected explicitly because PHP evaluates `null == 0` as true
 * - any other column: strict identity
 */
private function filterEq(iterable $source, string $col): \Generator
{
    $value = $this->value;

    if ($value === null) {
        // eq(col, null) implements SQL "col IS NULL"
        foreach ($source as $id => $row) {
            if ($row->$col === null) {
                yield $id => $row;
            }
        }
        return;
    }

    if ($this->columnDef->type->isNumeric()) {
        // Numeric: use == for type coercion (5 == 5.0 is true), but never
        // let NULL through - it would loosely equal 0, 0.0 and ''
        foreach ($source as $id => $row) {
            $current = $row->$col;
            if ($current !== null && $current == $value) {
                yield $id => $row;
            }
        }
        return;
    }

    // String: use === for exact match
    foreach ($source as $id => $row) {
        if ($row->$col === $value) {
            yield $id => $row;
        }
    }
}
/**
 * Yield the rows whose $col is strictly below the filter value.
 *
 * NULL row values never match. Numeric columns compare natively; collated
 * columns go through the comparison function; all remaining columns compare
 * as strings. Non-numeric columns yield nothing unless the filter value is a
 * string or numeric.
 */
private function filterLt(iterable $source, string $col): \Generator
{
    $bound = $this->value;
    $type = $this->columnDef->type;

    if ($type->isNumeric()) {
        foreach ($source as $key => $record) {
            $current = $record->$col;
            if ($current === null || !($current < $bound)) {
                continue;
            }
            yield $key => $record;
        }
        return;
    }

    $collated = $type->shouldUseCollator();
    if (!is_string($bound) && !is_numeric($bound)) {
        return;
    }
    $bound = (string) $bound;

    if ($collated) {
        $collate = $this->getCompareFn();
        foreach ($source as $key => $record) {
            $current = $record->$col;
            if ($current === null || !($collate($current, $bound) < 0)) {
                continue;
            }
            yield $key => $record;
        }
        return;
    }

    // DateTime, Binary - plain string ordering
    foreach ($source as $key => $record) {
        $current = $record->$col;
        if ($current === null || !($current < $bound)) {
            continue;
        }
        yield $key => $record;
    }
}
/**
 * Yield the rows whose $col is below or equal to the filter value.
 *
 * NULL row values never match. Numeric columns compare natively; collated
 * columns go through the comparison function; all remaining columns compare
 * as strings. Non-numeric columns yield nothing unless the filter value is a
 * string or numeric.
 */
private function filterLte(iterable $source, string $col): \Generator
{
    $bound = $this->value;
    $type = $this->columnDef->type;

    if ($type->isNumeric()) {
        foreach ($source as $key => $record) {
            $current = $record->$col;
            if ($current === null || !($current <= $bound)) {
                continue;
            }
            yield $key => $record;
        }
        return;
    }

    $collated = $type->shouldUseCollator();
    if (!is_string($bound) && !is_numeric($bound)) {
        return;
    }
    $bound = (string) $bound;

    if ($collated) {
        $collate = $this->getCompareFn();
        foreach ($source as $key => $record) {
            $current = $record->$col;
            if ($current === null || !($collate($current, $bound) <= 0)) {
                continue;
            }
            yield $key => $record;
        }
        return;
    }

    // DateTime, Binary - plain string ordering
    foreach ($source as $key => $record) {
        $current = $record->$col;
        if ($current === null || !($current <= $bound)) {
            continue;
        }
        yield $key => $record;
    }
}
/**
 * Yield the rows whose $col is strictly above the filter value.
 *
 * NULL row values never match. Numeric columns compare natively; collated
 * columns go through the comparison function; all remaining columns compare
 * as strings. Non-numeric columns yield nothing unless the filter value is a
 * string or numeric.
 */
private function filterGt(iterable $source, string $col): \Generator
{
    $bound = $this->value;
    $type = $this->columnDef->type;

    if ($type->isNumeric()) {
        foreach ($source as $key => $record) {
            $current = $record->$col;
            if ($current === null || !($current > $bound)) {
                continue;
            }
            yield $key => $record;
        }
        return;
    }

    $collated = $type->shouldUseCollator();
    if (!is_string($bound) && !is_numeric($bound)) {
        return;
    }
    $bound = (string) $bound;

    if ($collated) {
        $collate = $this->getCompareFn();
        foreach ($source as $key => $record) {
            $current = $record->$col;
            if ($current === null || !($collate($current, $bound) > 0)) {
                continue;
            }
            yield $key => $record;
        }
        return;
    }

    // DateTime, Binary - plain string ordering
    foreach ($source as $key => $record) {
        $current = $record->$col;
        if ($current === null || !($current > $bound)) {
            continue;
        }
        yield $key => $record;
    }
}
/**
 * Yield the rows whose $col is above or equal to the filter value.
 *
 * NULL row values never match. Numeric columns compare natively; collated
 * columns go through the comparison function; all remaining columns compare
 * as strings. Non-numeric columns yield nothing unless the filter value is a
 * string or numeric.
 */
private function filterGte(iterable $source, string $col): \Generator
{
    $bound = $this->value;
    $type = $this->columnDef->type;

    if ($type->isNumeric()) {
        foreach ($source as $key => $record) {
            $current = $record->$col;
            if ($current === null || !($current >= $bound)) {
                continue;
            }
            yield $key => $record;
        }
        return;
    }

    $collated = $type->shouldUseCollator();
    if (!is_string($bound) && !is_numeric($bound)) {
        return;
    }
    $bound = (string) $bound;

    if ($collated) {
        $collate = $this->getCompareFn();
        foreach ($source as $key => $record) {
            $current = $record->$col;
            if ($current === null || !($collate($current, $bound) >= 0)) {
                continue;
            }
            yield $key => $record;
        }
        return;
    }

    // DateTime, Binary - plain string ordering
    foreach ($source as $key => $record) {
        $current = $record->$col;
        if ($current === null || !($current >= $bound)) {
            continue;
        }
        yield $key => $record;
    }
}
/**
 * Yield the rows whose $col value is a member of the filter set.
 *
 * A row without the column is checked as if its value were null.
 */
private function filterIn(iterable $source, string $col): \Generator
{
    foreach ($source as $key => $record) {
        $candidate = $record->$col ?? null;
        if (!$this->matchesIn($candidate)) {
            continue;
        }
        yield $key => $record;
    }
}
/**
 * Yield the rows whose $col value matches the LIKE pattern.
 *
 * A row without the column is checked as if its value were null.
 */
private function filterLike(iterable $source, string $col): \Generator
{
    foreach ($source as $key => $record) {
        $candidate = $record->$col ?? null;
        if (!$this->matchesLike($candidate)) {
            continue;
        }
        yield $key => $record;
    }
}
/**
 * Apply this table's offset and limit to an already filtered row stream.
 *
 * The first getOffset() rows are dropped, then at most getLimit() rows are
 * yielded (all remaining rows when the limit is null). Keys are preserved.
 */
private function paginate(iterable $source): \Generator
{
    $limit = $this->getLimit();
    $offset = $this->getOffset();

    // A limit of zero must yield nothing. Without this guard the first row
    // would be emitted before the limit is ever checked.
    if ($limit === 0) {
        return;
    }

    $skipped = 0;
    $emitted = 0;
    foreach ($source as $id => $row) {
        if ($skipped < $offset) {
            $skipped++;
            continue;
        }
        yield $id => $row;
        $emitted++;
        // Stop right after the last wanted row so the source is not read further
        if ($limit !== null && $emitted >= $limit) {
            return;
        }
    }
}
// -------------------------------------------------------------------------
// Limit/offset must be stored locally, not pushed to source
// -------------------------------------------------------------------------
/**
 * Return a table limited to $n rows (null removes the limit).
 *
 * The limit is kept on this wrapper rather than handed to the source.
 * The current instance is returned when the limit would not change.
 */
public function limit(?int $n): TableInterface
{
    if ($n === $this->limit) {
        return $this;
    }

    $copy = clone $this;
    $copy->limit = $n;

    return $copy;
}
/**
 * Return a table that skips the first $n matching rows.
 *
 * The offset is kept on this wrapper rather than handed to the source.
 * The current instance is returned when the offset would not change.
 */
public function offset(int $n): TableInterface
{
    if ($n === $this->offset) {
        return $this;
    }

    $copy = clone $this;
    $copy->offset = $n;

    return $copy;
}
/**
 * Decide whether a single column value satisfies this filter.
 *
 * NULL follows the same rules as the materializing filters: it only matches
 * an equality filter whose value is itself null (IS NULL) and never
 * satisfies an ordering comparison. Without this guard `null <=> 5` is -1,
 * so a NULL row would wrongly pass Lt/Lte.
 *
 * NOTE(review): no caller of this method is visible in this file.
 */
private function matches(mixed $rowValue): bool
{
    $filterValue = $this->value;
    $involvesNull = $rowValue === null || $filterValue === null;

    return match ($this->operator) {
        Operator::Eq => $involvesNull
            ? ($rowValue === null && $filterValue === null)
            : $this->compare($rowValue, $filterValue) === 0,
        Operator::Lt => !$involvesNull && $this->compare($rowValue, $filterValue) < 0,
        Operator::Lte => !$involvesNull && $this->compare($rowValue, $filterValue) <= 0,
        Operator::Gt => !$involvesNull && $this->compare($rowValue, $filterValue) > 0,
        Operator::Gte => !$involvesNull && $this->compare($rowValue, $filterValue) >= 0,
        Operator::In => $this->matchesIn($rowValue),
        Operator::Like => $this->matchesLike($rowValue),
    };
}
/**
 * Three-way comparison of two values according to the filter column's type.
 *
 * - Collated (text) columns with at least one string operand: both operands
 *   are cast to string and handed to the comparison function
 * - Everything else (Int, Float, DateTime, Binary, or no string operand):
 *   PHP's <=> operator
 *
 * @return int Negative, zero or positive, like <=>
 */
private function compare(mixed $left, mixed $right): int
{
    $collated = $this->columnDef->type->shouldUseCollator()
        && (is_string($left) || is_string($right));

    if (!$collated) {
        return $left <=> $right;
    }

    $collate = $this->getCompareFn();

    return $collate((string) $left, (string) $right);
}
/**
 * Check whether a column value is a member of the filter set.
 *
 * The value is wrapped in an object keyed by the filter column, which is
 * the shape passed to the set's has() method.
 *
 * @throws \LogicException When the filter value is not a SetInterface.
 */
private function matchesIn(mixed $rowValue): bool
{
    $candidates = $this->value;

    if ($candidates instanceof SetInterface) {
        return $candidates->has((object)[$this->column => $rowValue]);
    }

    throw new \LogicException('IN operator requires SetInterface value');
}
/**
 * Check a column value against the SQL LIKE pattern held in the filter value.
 *
 * `%` matches any run of characters and `_` exactly one; matching is
 * case-insensitive and must cover the whole value. NULL never matches.
 */
private function matchesLike(mixed $rowValue): bool
{
    if ($rowValue === null) {
        return false;
    }

    // Convert SQL LIKE pattern to regex. preg_quote() leaves % and _ alone,
    // so they can be swapped for their regex equivalents afterwards.
    $body = str_replace(
        ['%', '_'],
        ['.*', '.'],
        preg_quote((string) $this->value, '/')
    );

    // s: let the wildcards match newlines, as LIKE wildcards do
    // D: make $ match only at the very end, not before a trailing newline
    $regex = '/^' . $body . '$/isD';

    return preg_match($regex, (string) $rowValue) === 1;
}
/**
 * Number of rows in this table, obtained by iterating it in full.
 */
public function count(): int
{
return iterator_count($this);
}
/**
 * Membership test.
 *
 * The filter is evaluated on $member first; the parent lookup only runs
 * for members that pass it.
 */
public function has(object $member): bool
{
    return $this->test($member) && parent::has($member);
}
/**
 * Load one row by id from the source.
 *
 * @return object|null The row, or null when the source has no such row or
 *                     the row does not pass this filter.
 */
public function load(string|int $rowId): ?object
{
    $candidate = $this->source->load($rowId);

    $visible = $candidate !== null && $this->test($candidate);

    return $visible ? $candidate : null;
}
// -------------------------------------------------------------------------
// Filter methods - optimize same-column filters, wrap otherwise
// -------------------------------------------------------------------------
/**
 * Copy this wrapper's limit/offset onto a replacement table.
 *
 * The same-column optimizations return a table built directly on the
 * source, which no longer carries the window this wrapper absorbed; this
 * puts it back. Nothing is applied for a null limit or a zero offset.
 */
private function withPagination(TableInterface $table): TableInterface
{
    $limit = $this->limit;
    $offset = $this->offset;

    $limited = $limit === null ? $table : $table->limit($limit);

    return $offset === 0 ? $limited : $limited->offset($offset);
}
/**
 * Narrow the table with "$column = $value" (null means IS NULL).
 *
 * When $column is the column already filtered here, the two predicates are
 * combined up front: the result is this table, an equality pushed to the
 * source, or an empty table. Anything else is delegated to the parent.
 *
 * NULL is handled explicitly instead of through compare(), because
 * `null <=> 0` is 0 and `null <=> 5` is -1, which would make IS NULL look
 * compatible with "= 0" or "< 5".
 */
public function eq(string $column, int|float|string|null $value): TableInterface
{
    if ($column !== $this->column) {
        return parent::eq($column, $value);
    }

    // IN + eq: check if value is in the set (only for simple Sets, not subqueries)
    if ($this->operator === Operator::In && !$this->value instanceof TableInterface) {
        $member = (object)[$column => $value];
        if ($this->value->has($member)) {
            // Value in set: eq is more specific, use it
            return $this->withPagination($this->source->eq($column, $value));
        }
        return EmptyTable::from($this);
    }

    // LIKE + eq: check if value matches pattern
    if ($this->operator === Operator::Like) {
        if ($this->matchesLike($value)) {
            return $this->withPagination($this->source->eq($column, $value));
        }
        return EmptyTable::from($this);
    }

    // eq + eq: identical predicates keep this table, different ones are empty
    if ($this->operator === Operator::Eq) {
        if ($value === null || $this->value === null) {
            return $value === $this->value ? $this : EmptyTable::from($this);
        }
        return $this->compare($value, $this->value) === 0 ? $this : EmptyTable::from($this);
    }

    $isRange = in_array(
        $this->operator,
        [Operator::Lt, Operator::Lte, Operator::Gt, Operator::Gte],
        true
    );

    if ($isRange) {
        // Range filters reject NULL rows, so IS NULL can never match
        if ($value === null) {
            return EmptyTable::from($this);
        }

        if ($this->value !== null) {
            // Check if current filter allows this eq value
            $cmp = $this->compare($value, $this->value);
            $compatible = match ($this->operator) {
                Operator::Lt => $cmp < 0,
                Operator::Lte => $cmp <= 0,
                Operator::Gt => $cmp > 0,
                Operator::Gte => $cmp >= 0,
            };

            return $compatible
                ? $this->withPagination($this->source->eq($column, $value))
                : EmptyTable::from($this);
        }
    }

    // IN over a subquery (or anything unrecognised): no shortcut. compare()
    // is deliberately not called, since the filter value may be a table object.
    return parent::eq($column, $value);
}
/**
 * Narrow the table with "$column < $value".
 *
 * When $column is the column already filtered by eq/lt/lte/gt/gte, the two
 * predicates are combined up front. IN and LIKE filters, and range filters
 * with a null bound, are delegated to the parent without calling compare():
 * their filter value is a set, a pattern or null, not a comparable bound.
 */
public function lt(string $column, int|float|string $value): TableInterface
{
    if ($column !== $this->column) {
        return parent::lt($column, $value);
    }

    $operator = $this->operator;

    if ($operator === Operator::Eq) {
        // IS NULL rows never satisfy a range predicate
        if ($this->value === null) {
            return EmptyTable::from($this);
        }
        return $this->compare($this->value, $value) < 0 ? $this : EmptyTable::from($this);
    }

    $isRange = in_array($operator, [Operator::Lt, Operator::Lte, Operator::Gt, Operator::Gte], true);
    if (!$isRange || $this->value === null) {
        return parent::lt($column, $value);
    }

    $cmp = $this->compare($value, $this->value);

    if ($operator === Operator::Lt) {
        // lt + lt: keep stricter (smaller) bound
        return $cmp < 0 ? $this->withPagination($this->source->lt($column, $value)) : $this;
    }
    if ($operator === Operator::Lte) {
        // lte + lt: lt is stricter when value <= existing
        return $cmp <= 0 ? $this->withPagination($this->source->lt($column, $value)) : $this;
    }

    // gt/gte + lt: the ranges do not overlap when value <= existing bound
    if ($cmp <= 0) {
        return EmptyTable::from($this);
    }

    return parent::lt($column, $value);
}
/**
 * Narrow the table with "$column <= $value".
 *
 * When $column is the column already filtered by eq/lt/lte/gt/gte, the two
 * predicates are combined up front. IN and LIKE filters, and range filters
 * with a null bound, are delegated to the parent without calling compare():
 * their filter value is a set, a pattern or null, not a comparable bound.
 */
public function lte(string $column, int|float|string $value): TableInterface
{
    if ($column !== $this->column) {
        return parent::lte($column, $value);
    }

    $operator = $this->operator;

    if ($operator === Operator::Eq) {
        // IS NULL rows never satisfy a range predicate
        if ($this->value === null) {
            return EmptyTable::from($this);
        }
        return $this->compare($this->value, $value) <= 0 ? $this : EmptyTable::from($this);
    }

    $isRange = in_array($operator, [Operator::Lt, Operator::Lte, Operator::Gt, Operator::Gte], true);
    if (!$isRange || $this->value === null) {
        return parent::lte($column, $value);
    }

    $cmp = $this->compare($value, $this->value);

    if ($operator === Operator::Lt || $operator === Operator::Lte) {
        // Only a strictly smaller bound tightens the existing upper limit
        return $cmp < 0 ? $this->withPagination($this->source->lte($column, $value)) : $this;
    }

    if ($operator === Operator::Gt) {
        // gt + lte: nothing is both > existing and <= value when value <= existing
        if ($cmp <= 0) {
            return EmptyTable::from($this);
        }
        return parent::lte($column, $value);
    }

    // gte + lte
    if ($cmp > 0) {
        // valid range, wrap normally
        return parent::lte($column, $value);
    }
    if ($cmp === 0) {
        // Bounds coincide: the range collapses to a single value
        return $this->withPagination($this->source->eq($column, $value));
    }

    return EmptyTable::from($this);
}
/**
 * Narrow the table with "$column > $value".
 *
 * When $column is the column already filtered by eq/lt/lte/gt/gte, the two
 * predicates are combined up front. IN and LIKE filters, and range filters
 * with a null bound, are delegated to the parent without calling compare():
 * their filter value is a set, a pattern or null, not a comparable bound.
 */
public function gt(string $column, int|float|string $value): TableInterface
{
    if ($column !== $this->column) {
        return parent::gt($column, $value);
    }

    $operator = $this->operator;

    if ($operator === Operator::Eq) {
        // IS NULL rows never satisfy a range predicate
        if ($this->value === null) {
            return EmptyTable::from($this);
        }
        return $this->compare($this->value, $value) > 0 ? $this : EmptyTable::from($this);
    }

    $isRange = in_array($operator, [Operator::Lt, Operator::Lte, Operator::Gt, Operator::Gte], true);
    if (!$isRange || $this->value === null) {
        return parent::gt($column, $value);
    }

    $cmp = $this->compare($value, $this->value);

    if ($operator === Operator::Gt) {
        // gt + gt: keep stricter (larger) bound
        return $cmp > 0 ? $this->withPagination($this->source->gt($column, $value)) : $this;
    }
    if ($operator === Operator::Gte) {
        // gte + gt: gt is stricter when value >= existing
        return $cmp >= 0 ? $this->withPagination($this->source->gt($column, $value)) : $this;
    }

    // lt/lte + gt: the ranges do not overlap when value >= existing bound
    if ($cmp >= 0) {
        return EmptyTable::from($this);
    }

    return parent::gt($column, $value);
}
/**
 * Narrow the table with "$column >= $value".
 *
 * When $column is the column already filtered by eq/lt/lte/gt/gte, the two
 * predicates are combined up front. IN and LIKE filters, and range filters
 * with a null bound, are delegated to the parent without calling compare():
 * their filter value is a set, a pattern or null, not a comparable bound.
 */
public function gte(string $column, int|float|string $value): TableInterface
{
    if ($column !== $this->column) {
        return parent::gte($column, $value);
    }

    $operator = $this->operator;

    if ($operator === Operator::Eq) {
        // IS NULL rows never satisfy a range predicate (note `null <=> 0` is 0)
        if ($this->value === null) {
            return EmptyTable::from($this);
        }
        return $this->compare($this->value, $value) >= 0 ? $this : EmptyTable::from($this);
    }

    $isRange = in_array($operator, [Operator::Lt, Operator::Lte, Operator::Gt, Operator::Gte], true);
    if (!$isRange || $this->value === null) {
        return parent::gte($column, $value);
    }

    $cmp = $this->compare($value, $this->value);

    if ($operator === Operator::Gt || $operator === Operator::Gte) {
        // Only a strictly larger bound tightens the existing lower limit
        return $cmp > 0 ? $this->withPagination($this->source->gte($column, $value)) : $this;
    }

    if ($operator === Operator::Lt) {
        // lt + gte: nothing is both < existing and >= value when value >= existing
        if ($cmp >= 0) {
            return EmptyTable::from($this);
        }
        return parent::gte($column, $value);
    }

    // lte + gte
    if ($cmp < 0) {
        // valid range, wrap normally
        return parent::gte($column, $value);
    }
    if ($cmp === 0) {
        // Bounds coincide: the range collapses to a single value
        return $this->withPagination($this->source->eq($column, $value));
    }

    return EmptyTable::from($this);
}
/**
 * Return this table sorted according to $spec.
 *
 * An empty spec, or one that parses to no order definitions, leaves the
 * table as it is; otherwise the table is wrapped in a SortedTable.
 */
public function order(?string $spec): TableInterface
{
    if (!$spec) {
        return $this;
    }

    $orders = OrderDef::parse($spec);

    return empty($orders)
        ? $this
        : new SortedTable($this, ...$orders);
}
}