MultipartMessageStream.php

PHP

Path: src/Mail/MultipartMessageStream.php

<?php

namespace mini\Mail;

use Psr\Http\Message\MessageInterface;
use Psr\Http\Message\StreamInterface;

/**
 * Streaming reader for MultipartMessage body
 *
 * Produces RFC 2046 compliant multipart content by streaming through
 * child parts without buffering entire bodies.
 *
 * For each part:
 * 1. Emit boundary delimiter
 * 2. Emit part's headers
 * 3. Emit blank line
 * 4. Forward reads to part's body stream
 * 5. Emit CRLF
 *
 * After all parts: emit closing boundary.
 *
 * @internal Used by MultipartMessage::getBody()
 */
class MultipartMessageStream implements StreamInterface
{
    private const CRLF = "\r\n";

    /** @var MessageInterface[] */
    private array $parts;

    private string $boundary;

    private int $partIndex = 0;
    private int $phase = self::PHASE_INIT;

    private const PHASE_INIT = 0;
    private const PHASE_BOUNDARY = 1;
    private const PHASE_HEADERS = 2;
    private const PHASE_BODY = 3;
    private const PHASE_AFTER_BODY = 4;
    private const PHASE_CLOSING = 5;
    private const PHASE_DONE = 6;

    private string $buffer = '';
    private ?StreamInterface $currentBody = null;
    private bool $detached = false;

    /**
     * @param MessageInterface[] $parts
     * @param string $boundary
     */
    public function __construct(array $parts, string $boundary)
    {
        $this->parts = array_values($parts);
        $this->boundary = $boundary;

        if (count($this->parts) > 0) {
            $this->phase = self::PHASE_BOUNDARY;
            $this->preparePhase();
        } else {
            $this->phase = self::PHASE_CLOSING;
            $this->preparePhase();
        }
    }

    private function preparePhase(): void
    {
        switch ($this->phase) {
            case self::PHASE_BOUNDARY:
                $this->buffer = '--' . $this->boundary . self::CRLF;
                break;

            case self::PHASE_HEADERS:
                $part = $this->parts[$this->partIndex];
                $headers = '';
                foreach ($part->getHeaders() as $name => $values) {
                    foreach ($values as $value) {
                        $headers .= "{$name}: {$value}" . self::CRLF;
                    }
                }
                $headers .= self::CRLF;
                $this->buffer = $headers;
                break;

            case self::PHASE_BODY:
                $part = $this->parts[$this->partIndex];
                $this->currentBody = $part->getBody();
                if ($this->currentBody->isSeekable()) {
                    $this->currentBody->rewind();
                }
                break;

            case self::PHASE_AFTER_BODY:
                $this->buffer = self::CRLF;
                $this->currentBody = null;
                break;

            case self::PHASE_CLOSING:
                $this->buffer = '--' . $this->boundary . '--' . self::CRLF;
                break;

            case self::PHASE_DONE:
                $this->buffer = '';
                $this->currentBody = null;
                break;
        }
    }

    private function advance(): void
    {
        switch ($this->phase) {
            case self::PHASE_BOUNDARY:
                $this->phase = self::PHASE_HEADERS;
                break;

            case self::PHASE_HEADERS:
                $this->phase = self::PHASE_BODY;
                break;

            case self::PHASE_BODY:
                $this->phase = self::PHASE_AFTER_BODY;
                break;

            case self::PHASE_AFTER_BODY:
                $this->partIndex++;
                if ($this->partIndex < count($this->parts)) {
                    $this->phase = self::PHASE_BOUNDARY;
                } else {
                    $this->phase = self::PHASE_CLOSING;
                }
                break;

            case self::PHASE_CLOSING:
                $this->phase = self::PHASE_DONE;
                break;
        }

        $this->preparePhase();
    }

    public function read(int $length): string
    {
        if ($this->detached) {
            throw new \RuntimeException('Stream is detached');
        }

        $result = '';
        $remaining = $length;

        while ($remaining > 0 && $this->phase !== self::PHASE_DONE) {
            if ($this->phase === self::PHASE_BODY) {
                if ($this->currentBody !== null && !$this->currentBody->eof()) {
                    $chunk = $this->currentBody->read($remaining);
                    $result .= $chunk;
                    $remaining -= strlen($chunk);
                }

                if ($this->currentBody === null || $this->currentBody->eof()) {
                    $this->advance();
                }
            } else {
                if ($this->buffer !== '') {
                    $chunk = substr($this->buffer, 0, $remaining);
                    $this->buffer = substr($this->buffer, strlen($chunk));
                    $result .= $chunk;
                    $remaining -= strlen($chunk);
                }

                if ($this->buffer === '') {
                    $this->advance();
                }
            }
        }

        return $result;
    }

    public function getContents(): string
    {
        if ($this->detached) {
            throw new \RuntimeException('Stream is detached');
        }

        $contents = '';
        while (!$this->eof()) {
            $contents .= $this->read(8192);
        }
        return $contents;
    }

    public function __toString(): string
    {
        try {
            // Create fresh instance to read from beginning
            $stream = new self($this->parts, $this->boundary);
            return $stream->getContents();
        } catch (\Throwable $e) {
            return '';
        }
    }

    public function eof(): bool
    {
        return $this->detached || $this->phase === self::PHASE_DONE;
    }

    public function tell(): int
    {
        throw new \RuntimeException('MultipartMessageStream does not support tell()');
    }

    public function seek(int $offset, int $whence = SEEK_SET): void
    {
        throw new \RuntimeException('MultipartMessageStream is not seekable');
    }

    public function rewind(): void
    {
        if ($this->detached) {
            throw new \RuntimeException('Stream is detached');
        }

        $this->partIndex = 0;
        $this->buffer = '';
        $this->currentBody = null;

        if (count($this->parts) > 0) {
            $this->phase = self::PHASE_BOUNDARY;
        } else {
            $this->phase = self::PHASE_CLOSING;
        }
        $this->preparePhase();
    }

    public function isSeekable(): bool
    {
        return false;
    }

    public function isWritable(): bool
    {
        return false;
    }

    public function write(string $string): int
    {
        throw new \RuntimeException('MultipartMessageStream is not writable');
    }

    public function isReadable(): bool
    {
        return !$this->detached;
    }

    public function getSize(): ?int
    {
        return null;
    }

    public function getMetadata(?string $key = null): mixed
    {
        $meta = [
            'seekable' => false,
            'eof' => $this->eof(),
        ];

        if ($key === null) {
            return $meta;
        }
        return $meta[$key] ?? null;
    }

    public function close(): void
    {
        $this->detached = true;
        $this->buffer = '';
        $this->currentBody = null;
    }

    public function detach()
    {
        $this->detached = true;
        $this->buffer = '';
        $this->currentBody = null;
        return null;
    }
}