PDODatabase.php
PHP
Path: src/Database/PDODatabase.php
<?php
namespace mini\Database;
use mini\Database\DatabaseInterface;
use mini\Mini;
use mini\Parsing\SQL\AST\ASTNode;
use mini\Parsing\SQL\SqlRenderer;
use mini\Table\ColumnDef;
use mini\Table\Contracts\TableInterface;
use mini\Table\GeneratorTable;
use mini\Table\Types\ColumnType;
use PDO;
use PDOException;
use Exception;
use function mini\sqlval;
/**
* PDO-based database implementation
*
* Wraps any PDO instance with a clean, ergonomic API that makes database
* operations pleasant while supporting any PDO-compatible database.
* Fetches PDO from container lazily to ensure proper scoping.
*/
class PDODatabase implements DatabaseInterface
{
private ?PDO $pdo = null;
private bool $inTransaction = false;
/** @var \WeakMap<Query, PartialQuery> Maps Query instances to their underlying PartialQuery */
private \WeakMap $queryMap;
/**
* Create a PDODatabase instance
*
* @param PDO|null $pdo Optional PDO instance. If null, fetches from container lazily.
*/
public function __construct(?PDO $pdo = null)
{
$this->pdo = $pdo;
$this->queryMap = new \WeakMap();
}
/**
* Get PDO instance (from constructor or container)
*/
private function lazyPdo(): PDO
{
if ($this->pdo === null) {
$this->pdo = Mini::$mini->get(PDO::class);
}
return $this->pdo;
}
/**
* Execute a SELECT query and return a composable Query
*/
public function query(string $sql, array $params = []): Query
{
$pq = PartialQuery::fromSql($this, $this->rawExecutor(), $sql, $params);
return $this->wrapQuery($pq);
}
/**
* Wrap a PartialQuery in a Query and register the mapping
*
* The returned Query's wrap closure also registers derived queries,
* ensuring all Query instances created from this database are tracked.
*/
private function wrapQuery(PartialQuery $pq): Query
{
$query = new Query($pq, function (PartialQuery $derivedPq): Query {
return $this->wrapQuery($derivedPq);
});
$this->queryMap[$query] = $pq;
return $query;
}
/**
* Get the underlying PartialQuery for a Query instance
*/
private function unwrapQuery(Query $query): PartialQuery
{
if (!isset($this->queryMap[$query])) {
throw new \InvalidArgumentException("Query was not created by this database");
}
return $this->queryMap[$query];
}
/**
* Get a raw query executor closure for PartialQuery
*
* Fast path: If AST is null, uses PartialQuery::getSql() which returns
* the original SQL string directly (no parsing/rendering).
*
* Slow path: If AST is provided (modified query or count), renders it.
*/
private function rawExecutor(): \Closure
{
$dialect = $this->getDialect();
$renderer = SqlRenderer::forDialect($dialect);
return function (PartialQuery $query, ?ASTNode $ast) use ($dialect, $renderer): \Traversable {
try {
// If AST provided (e.g., count query), render it
// Otherwise use getSql() which handles fast path internally
if ($ast !== null) {
[$sql, $params] = $renderer->renderWithParams($ast);
} else {
[$sql, $params] = $query->getSql($dialect);
}
$stmt = $this->lazyPdo()->prepare($sql);
$stmt->execute(array_map(sqlval(...), $params));
while ($row = $stmt->fetch(PDO::FETCH_OBJ)) {
yield $row;
}
} catch (PDOException $e) {
throw new Exception("Query failed: " . $e->getMessage());
}
};
}
/**
* Execute query and return first row only as object
*/
public function queryOne(string $sql, array $params = []): ?object
{
try {
$stmt = $this->lazyPdo()->prepare($sql);
$stmt->execute(array_map(sqlval(...), $params));
$result = $stmt->fetch(PDO::FETCH_OBJ);
return $result ?: null;
} catch (PDOException $e) {
throw new Exception("Query one failed: " . $e->getMessage());
}
}
/**
* Execute query and return first column of first row
*/
public function queryField(string $sql, array $params = []): mixed
{
try {
$stmt = $this->lazyPdo()->prepare($sql);
$stmt->execute(array_map(sqlval(...), $params));
return $stmt->fetchColumn();
} catch (PDOException $e) {
throw new Exception("Query field failed: " . $e->getMessage());
}
}
/**
* Execute query and return first column values as array
*/
public function queryColumn(string $sql, array $params = []): array
{
try {
$stmt = $this->lazyPdo()->prepare($sql);
$stmt->execute(array_map(sqlval(...), $params));
return $stmt->fetchAll(PDO::FETCH_COLUMN, 0);
} catch (PDOException $e) {
throw new Exception("Query column failed: " . $e->getMessage());
}
}
/**
* Execute a statement (INSERT, UPDATE, DELETE)
*/
public function exec(string $sql, array $params = []): int
{
try {
$stmt = $this->lazyPdo()->prepare($sql);
$stmt->execute(array_map(sqlval(...), $params));
return $stmt->rowCount();
} catch (PDOException $e) {
throw new Exception("Exec failed: " . $e->getMessage());
}
}
/**
* Get the last inserted row ID
*/
public function lastInsertId(): ?string
{
$id = $this->lazyPdo()->lastInsertId();
return $id !== false ? $id : null;
}
/**
* Check if a table exists in the database
*
* Note: This implementation uses a database-agnostic approach that should
* work with most SQL databases. For database-specific optimizations,
* consider creating specialized implementations.
*/
public function tableExists(string $tableName): bool
{
try {
// Use INFORMATION_SCHEMA which is supported by most databases
$result = $this->queryField(
"SELECT COUNT(*) FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_NAME = ?",
[$tableName]
);
if ($result !== null) {
return (int)$result > 0;
}
// Fallback for databases that don't support INFORMATION_SCHEMA (like SQLite)
// Try to query the table and see if it fails
$this->lazyPdo()->prepare("SELECT 1 FROM {$tableName} LIMIT 0")->execute();
return true;
} catch (PDOException $e) {
// If the query failed, the table likely doesn't exist
return false;
}
}
/**
* Execute a closure within a database transaction
*
* @throws \RuntimeException If called while already in a transaction
*/
public function transaction(\Closure $task): mixed
{
if ($this->inTransaction) {
throw new \RuntimeException(
"Already in a transaction. Nested transactions are not supported. " .
"Restructure your code to use a single transaction block."
);
}
try {
$this->lazyPdo()->beginTransaction();
$this->inTransaction = true;
} catch (PDOException $e) {
throw new \RuntimeException("Failed to start transaction: " . $e->getMessage(), 0, $e);
}
try {
$result = $task($this);
$this->lazyPdo()->commit();
$this->inTransaction = false;
return $result;
} catch (\Throwable $e) {
try {
$this->lazyPdo()->rollBack();
} catch (PDOException $rollbackException) {
error_log("Transaction rollback failed: " . $rollbackException->getMessage());
}
$this->inTransaction = false;
throw $e;
}
}
/**
* Get the underlying PDO instance for advanced operations
*
* This allows access to PDO-specific functionality when needed,
* while keeping the common operations clean through the interface.
*
* @return PDO The underlying PDO instance
*/
public function getPdo(): PDO
{
return $this->lazyPdo();
}
/**
* Get the SQL dialect for this database connection
*/
public function getDialect(): SqlDialect
{
$driver = $this->lazyPdo()->getAttribute(\PDO::ATTR_DRIVER_NAME);
return match($driver) {
'mysql' => SqlDialect::MySQL,
'pgsql' => SqlDialect::Postgres,
'sqlite' => SqlDialect::Sqlite,
'sqlsrv', 'mssql', 'dblib' => SqlDialect::SqlServer,
'oci' => SqlDialect::Oracle,
default => SqlDialect::Generic,
};
}
/**
* Quotes a value for safe use in SQL query strings
*/
public function quote(mixed $value): string
{
if ($value === null) return 'NULL';
if (is_int($value)) return $this->lazyPdo()->quote($value, \PDO::PARAM_INT);
if (is_bool($value)) return $this->lazyPdo()->quote($value, \PDO::PARAM_BOOL);
return $this->lazyPdo()->quote($value, \PDO::PARAM_STR);
}
/**
* Quotes an identifier (table name, column name) for safe use in SQL
*/
public function quoteIdentifier(string $identifier): string
{
$dialect = $this->getDialect();
// Handle dotted identifiers (e.g., "table.column")
if (str_contains($identifier, '.')) {
return implode('.', array_map(fn($part) => $this->quoteIdentifier($part), explode('.', $identifier)));
}
return match($dialect) {
SqlDialect::MySQL => '`' . str_replace('`', '``', $identifier) . '`',
SqlDialect::SqlServer => '[' . str_replace(']', ']]', $identifier) . ']',
default => '"' . str_replace('"', '""', $identifier) . '"',
};
}
/**
* Delete rows matching a query
*/
public function delete(Query|PartialQuery $query): int
{
$pq = $query instanceof Query ? $this->unwrapQuery($query) : $query;
$table = $pq->getSourceTable();
$ctes = $pq->getCTEs();
$where = $pq->getWhere();
// Require WHERE clause for safety
// Use db()->exec('DELETE FROM table') or TRUNCATE for mass deletes
if (empty($where['sql'])) {
throw new \InvalidArgumentException(
"DELETE requires a WHERE clause. Use db()->exec('DELETE FROM {$table}') for mass deletes."
);
}
$sql = $ctes['sql'] . "DELETE FROM {$table}";
$sql .= " WHERE {$where['sql']}";
$limit = $pq->getLimit();
if ($limit !== null) {
$sql .= " LIMIT {$limit}";
}
$params = array_merge($ctes['params'], $where['params']);
try {
$stmt = $this->lazyPdo()->prepare($sql);
$stmt->execute($params);
return $stmt->rowCount();
} catch (PDOException $e) {
throw new Exception("Delete failed: " . $e->getMessage());
}
}
/**
* Update rows matching a query
*/
public function update(Query|PartialQuery $query, string|array $set, array $params = []): int
{
$pq = $query instanceof Query ? $this->unwrapQuery($query) : $query;
$table = $pq->getSourceTable();
$ctes = $pq->getCTEs();
$where = $pq->getWhere();
if (is_string($set)) {
// Raw SQL expression with optional params
$sql = $ctes['sql'] . "UPDATE {$table} SET {$set}";
$params = array_merge($ctes['params'], $params, $where['params']);
} else {
// Array of column => value assignments
$setParts = [];
$setParams = [];
foreach ($set as $column => $value) {
$setParts[] = "$column = ?";
$setParams[] = $value;
}
$sql = $ctes['sql'] . "UPDATE {$table} SET " . implode(', ', $setParts);
$params = array_merge($ctes['params'], $setParams, $where['params']);
}
if ($where['sql']) {
$sql .= " WHERE {$where['sql']}";
}
$limit = $pq->getLimit();
if ($limit !== null) {
$sql .= " LIMIT {$limit}";
}
try {
$stmt = $this->lazyPdo()->prepare($sql);
$stmt->execute($params);
return $stmt->rowCount();
} catch (PDOException $e) {
throw new Exception("Update failed: " . $e->getMessage());
}
}
/**
* Insert a new row into a table
*/
public function insert(string $table, array $data): string
{
if (empty($data)) {
throw new \InvalidArgumentException("Data array cannot be empty for insert");
}
$columns = array_keys($data);
$values = array_values($data);
$placeholders = implode(', ', array_fill(0, count($data), '?'));
$columnList = implode(', ', $columns);
$sql = "INSERT INTO $table ($columnList) VALUES ($placeholders)";
try {
$stmt = $this->lazyPdo()->prepare($sql);
$stmt->execute($values);
return $this->lastInsertId() ?? '';
} catch (PDOException $e) {
throw new Exception("Insert failed: " . $e->getMessage());
}
}
/**
* Insert a row, or update if conflict on unique columns
*/
public function upsert(string $table, array $data, string ...$conflictColumns): int
{
if (empty($data)) {
throw new \InvalidArgumentException("Data array cannot be empty for upsert");
}
if (empty($conflictColumns)) {
throw new \InvalidArgumentException("At least one conflict column must be specified for upsert");
}
$columns = array_keys($data);
$values = array_values($data);
$placeholders = implode(', ', array_fill(0, count($data), '?'));
$dialect = $this->getDialect();
// Build dialect-specific UPSERT SQL
$sql = match($dialect) {
SqlDialect::MySQL => $this->buildMySQLUpsert($table, $columns, $placeholders, $conflictColumns),
SqlDialect::Postgres => $this->buildPostgresUpsert($table, $columns, $placeholders, $conflictColumns),
SqlDialect::Sqlite => $this->buildSqliteUpsert($table, $columns, $placeholders, $conflictColumns),
SqlDialect::SqlServer => $this->buildSqlServerUpsert($table, $columns, $values, $conflictColumns),
SqlDialect::Oracle => $this->buildOracleUpsert($table, $columns, $values, $conflictColumns),
SqlDialect::Generic => $this->buildPostgresUpsert($table, $columns, $placeholders, $conflictColumns), // Use Postgres syntax as generic
};
try {
// SQL Server and Oracle MERGE use values directly in SQL, others use placeholders
if ($dialect === SqlDialect::SqlServer || $dialect === SqlDialect::Oracle) {
$stmt = $this->lazyPdo()->prepare($sql);
$stmt->execute();
} else {
// MySQL, Postgres, Sqlite use placeholders only for INSERT
// UPDATE uses VALUES() for MySQL or EXCLUDED for Postgres/Sqlite
$stmt = $this->lazyPdo()->prepare($sql);
$stmt->execute($values);
}
return $stmt->rowCount();
} catch (PDOException $e) {
throw new Exception("Upsert failed: " . $e->getMessage());
}
}
private function buildMySQLUpsert(string $table, array $columns, string $placeholders, array $conflictColumns): string
{
$columnList = implode(', ', $columns);
// Build UPDATE clause for all columns except conflict columns
$updateParts = [];
foreach ($columns as $column) {
$updateParts[] = "$column = VALUES($column)";
}
$updateClause = implode(', ', $updateParts);
return "INSERT INTO $table ($columnList) VALUES ($placeholders) ON DUPLICATE KEY UPDATE $updateClause";
}
private function buildPostgresUpsert(string $table, array $columns, string $placeholders, array $conflictColumns): string
{
$columnList = implode(', ', $columns);
$conflictList = implode(', ', $conflictColumns);
// Build UPDATE clause using EXCLUDED
$updateParts = [];
foreach ($columns as $column) {
// Don't update conflict columns
if (!in_array($column, $conflictColumns)) {
$updateParts[] = "$column = EXCLUDED.$column";
}
}
if (empty($updateParts)) {
// If all columns are conflict columns, use DO NOTHING
return "INSERT INTO $table ($columnList) VALUES ($placeholders) ON CONFLICT ($conflictList) DO NOTHING";
}
$updateClause = implode(', ', $updateParts);
return "INSERT INTO $table ($columnList) VALUES ($placeholders) ON CONFLICT ($conflictList) DO UPDATE SET $updateClause";
}
private function buildSqliteUpsert(string $table, array $columns, string $placeholders, array $conflictColumns): string
{
// SQLite 3.24.0+ uses same syntax as PostgreSQL
return $this->buildPostgresUpsert($table, $columns, $placeholders, $conflictColumns);
}
private function buildSqlServerUpsert(string $table, array $columns, array $values, array $conflictColumns): string
{
// SQL Server uses MERGE statement - more complex
// Build quoted values for SQL
$quotedValues = array_map(fn($v) => $this->quote($v), $values);
$columnList = implode(', ', $columns);
$valuesList = implode(', ', $quotedValues);
// Build source values
$sourceValues = [];
foreach ($columns as $i => $column) {
$sourceValues[] = "{$quotedValues[$i]} AS $column";
}
$sourceClause = implode(', ', $sourceValues);
// Build match condition
$matchConditions = [];
foreach ($conflictColumns as $column) {
$matchConditions[] = "target.$column = source.$column";
}
$matchClause = implode(' AND ', $matchConditions);
// Build update SET clause
$updateParts = [];
foreach ($columns as $column) {
if (!in_array($column, $conflictColumns)) {
$updateParts[] = "$column = source.$column";
}
}
$updateClause = empty($updateParts) ? '' : 'UPDATE SET ' . implode(', ', $updateParts);
$insertClause = "INSERT ($columnList) VALUES ($valuesList)";
return "MERGE INTO $table AS target USING (SELECT $sourceClause) AS source ON $matchClause " .
"WHEN MATCHED THEN $updateClause " .
"WHEN NOT MATCHED THEN $insertClause;";
}
private function buildOracleUpsert(string $table, array $columns, array $values, array $conflictColumns): string
{
// Oracle uses same MERGE syntax as SQL Server
return $this->buildSqlServerUpsert($table, $columns, $values, $conflictColumns);
}
/**
* Get database schema as a TableInterface
*/
public function getSchema(): TableInterface
{
$dialect = $this->getDialect();
$generator = match ($dialect) {
SqlDialect::Sqlite => $this->generateSchemaSqlite(...),
SqlDialect::MySQL => $this->generateSchemaMySQL(...),
SqlDialect::Postgres => $this->generateSchemaPostgres(...),
default => $this->generateSchemaGeneric(...),
};
return new GeneratorTable(
$generator,
new ColumnDef('table_name', ColumnType::Text),
new ColumnDef('name', ColumnType::Text),
new ColumnDef('type', ColumnType::Text),
new ColumnDef('data_type', ColumnType::Text),
new ColumnDef('is_nullable', ColumnType::Int),
new ColumnDef('default_value', ColumnType::Text),
new ColumnDef('ordinal', ColumnType::Int),
new ColumnDef('extra', ColumnType::Text),
);
}
private function generateSchemaSqlite(): \Generator
{
// Get all table names
$tables = $this->queryColumn(
"SELECT name FROM sqlite_master WHERE type = 'table' AND name NOT LIKE 'sqlite_%' ORDER BY name"
);
$rowKey = 0;
$pdo = $this->lazyPdo();
foreach ($tables as $table) {
// Get column info via PRAGMA (use raw PDO - PRAGMA isn't a SELECT statement)
$stmt = $pdo->query("PRAGMA table_info({$table})");
$columns = $stmt->fetchAll(PDO::FETCH_OBJ);
$pkColumns = [];
foreach ($columns as $col) {
yield $rowKey++ => (object)[
'table_name' => $table,
'name' => $col->name,
'type' => 'column',
'data_type' => $col->type,
'is_nullable' => $col->notnull ? 0 : 1,
'default_value' => $col->dflt_value,
'ordinal' => $col->cid + 1,
'extra' => null,
];
if ($col->pk) {
$pkColumns[$col->pk] = $col->name;
}
}
// Add primary key as an index entry
if (!empty($pkColumns)) {
ksort($pkColumns);
yield $rowKey++ => (object)[
'table_name' => $table,
'name' => 'PRIMARY',
'type' => 'primary',
'data_type' => null,
'is_nullable' => null,
'default_value' => null,
'ordinal' => null,
'extra' => implode(', ', $pkColumns),
];
}
// Get indexes via PRAGMA
$stmt = $pdo->query("PRAGMA index_list({$table})");
$indexes = $stmt->fetchAll(PDO::FETCH_OBJ);
foreach ($indexes as $idx) {
// Skip auto-generated indexes
if (str_starts_with($idx->name, 'sqlite_autoindex_')) {
continue;
}
// Get columns in this index
$stmt = $pdo->query("PRAGMA index_info({$idx->name})");
$indexCols = $stmt->fetchAll(PDO::FETCH_OBJ);
$colNames = array_map(fn($c) => $c->name, $indexCols);
yield $rowKey++ => (object)[
'table_name' => $table,
'name' => $idx->name,
'type' => $idx->unique ? 'unique' : 'index',
'data_type' => null,
'is_nullable' => null,
'default_value' => null,
'ordinal' => null,
'extra' => implode(', ', $colNames),
];
}
}
}
private function generateSchemaMySQL(): \Generator
{
$rowKey = 0;
// Yield columns
foreach ($this->query(
"SELECT
TABLE_NAME as table_name,
COLUMN_NAME as name,
COLUMN_TYPE as data_type,
IS_NULLABLE,
COLUMN_DEFAULT as default_value,
ORDINAL_POSITION as ordinal
FROM INFORMATION_SCHEMA.COLUMNS
WHERE TABLE_SCHEMA = DATABASE()
ORDER BY TABLE_NAME, ORDINAL_POSITION"
) as $col) {
yield $rowKey++ => (object)[
'table_name' => $col->table_name,
'name' => $col->name,
'type' => 'column',
'data_type' => $col->data_type,
'is_nullable' => $col->IS_NULLABLE === 'YES' ? 1 : 0,
'default_value' => $col->default_value,
'ordinal' => (int)$col->ordinal,
'extra' => null,
];
}
// Yield indexes (grouped by index name)
foreach ($this->query(
"SELECT
TABLE_NAME as table_name,
INDEX_NAME as name,
NON_UNIQUE,
GROUP_CONCAT(COLUMN_NAME ORDER BY SEQ_IN_INDEX) as extra
FROM INFORMATION_SCHEMA.STATISTICS
WHERE TABLE_SCHEMA = DATABASE()
GROUP BY TABLE_NAME, INDEX_NAME, NON_UNIQUE
ORDER BY TABLE_NAME, INDEX_NAME"
) as $idx) {
$type = $idx->name === 'PRIMARY' ? 'primary' : ($idx->NON_UNIQUE ? 'index' : 'unique');
yield $rowKey++ => (object)[
'table_name' => $idx->table_name,
'name' => $idx->name,
'type' => $type,
'data_type' => null,
'is_nullable' => null,
'default_value' => null,
'ordinal' => null,
'extra' => $idx->extra,
];
}
}
private function generateSchemaPostgres(): \Generator
{
$rowKey = 0;
// Yield columns
foreach ($this->query(
"SELECT
table_name,
column_name as name,
data_type,
is_nullable,
column_default as default_value,
ordinal_position as ordinal
FROM information_schema.columns
WHERE table_schema = current_schema()
ORDER BY table_name, ordinal_position"
) as $col) {
yield $rowKey++ => (object)[
'table_name' => $col->table_name,
'name' => $col->name,
'type' => 'column',
'data_type' => $col->data_type,
'is_nullable' => $col->is_nullable === 'YES' ? 1 : 0,
'default_value' => $col->default_value,
'ordinal' => (int)$col->ordinal,
'extra' => null,
];
}
// Yield indexes
foreach ($this->query(
"SELECT
t.relname AS table_name,
i.relname AS name,
ix.indisprimary,
ix.indisunique,
string_agg(a.attname, ', ' ORDER BY array_position(ix.indkey, a.attnum)) as extra
FROM pg_class t
JOIN pg_index ix ON t.oid = ix.indrelid
JOIN pg_class i ON i.oid = ix.indexrelid
JOIN pg_attribute a ON a.attrelid = t.oid AND a.attnum = ANY(ix.indkey)
JOIN pg_namespace n ON n.oid = t.relnamespace
WHERE n.nspname = current_schema()
AND t.relkind = 'r'
GROUP BY t.relname, i.relname, ix.indisprimary, ix.indisunique
ORDER BY t.relname, i.relname"
) as $idx) {
$type = $idx->indisprimary ? 'primary' : ($idx->indisunique ? 'unique' : 'index');
yield $rowKey++ => (object)[
'table_name' => $idx->table_name,
'name' => $idx->name,
'type' => $type,
'data_type' => null,
'is_nullable' => null,
'default_value' => null,
'ordinal' => null,
'extra' => $idx->extra,
];
}
}
private function generateSchemaGeneric(): \Generator
{
$rowKey = 0;
// Try INFORMATION_SCHEMA for columns
try {
foreach ($this->query(
"SELECT
TABLE_NAME as table_name,
COLUMN_NAME as name,
DATA_TYPE as data_type,
IS_NULLABLE,
COLUMN_DEFAULT as default_value,
ORDINAL_POSITION as ordinal
FROM INFORMATION_SCHEMA.COLUMNS
ORDER BY TABLE_NAME, ORDINAL_POSITION"
) as $col) {
yield $rowKey++ => (object)[
'table_name' => $col->table_name,
'name' => $col->name,
'type' => 'column',
'data_type' => $col->data_type,
'is_nullable' => $col->IS_NULLABLE === 'YES' ? 1 : 0,
'default_value' => $col->default_value,
'ordinal' => (int)$col->ordinal,
'extra' => null,
];
}
} catch (\Throwable) {
// INFORMATION_SCHEMA not available
}
}
/**
* Create a VirtualDatabase with shadowed tables
*
* Registers all tables from this database as PartialQuery objects,
* then shadows with the provided tables. This allows joining between
* real database tables and mock/test data.
*
* @param array<string, TableInterface> $tables Table name => TableInterface to shadow
* @return DatabaseInterface VirtualDatabase with real + shadowed tables
*/
public function withTables(array $tables): DatabaseInterface
{
$vdb = new VirtualDatabase();
// Get all table names from schema
$tableNames = [];
foreach ($this->getSchema()->eq('type', 'column') as $row) {
$tableNames[$row->table_name] = true;
}
// Register real tables as PartialQuery (skip shadowed ones)
foreach (array_keys($tableNames) as $name) {
if (!isset($tables[$name])) {
$pq = PartialQuery::fromSql($this, $this->rawExecutor(), "SELECT * FROM {$name}");
$vdb->registerTable($name, $pq);
}
}
// Register shadow tables
foreach ($tables as $name => $table) {
$vdb->registerTable($name, $table);
}
return $vdb;
}
}