Jump to content

Building WebSockets in PHP from Scratch

From WikiJournal

Some time ago, I was looking for a library to work with WebSockets in PHP. During my research, I came across multiple articles discussing Node.js integration with Yii, while most WebSocket-related articles limited themselves to instructions on using phpdaemon.

I explored libraries like phpdaemon and Ratchet. They seemed overly complex, especially since Ratchet recommends using WAMP for sending messages to specific users. I couldn't understand why such heavyweight solutions were necessary, particularly when they required installing additional dependencies. After reviewing the source code of these and other libraries, I figured out how everything works and decided to write a simple WebSocket server in PHP myself. This helped me reinforce my understanding and discover some hidden pitfalls I hadn't considered before.

Thus, I set out to build the required functionality from scratch.

At the end of this article, you will find the complete code and a link to a demo chat.

Goals

  1. Understand server-side sockets in PHP.
  2. Learn the WebSocket protocol.
  3. Write a simple WebSocket server from scratch.

1) Server-side Sockets in PHP

Before this, I had only a vague understanding of server-side sockets. After reviewing several WebSocket library implementations, I encountered two common approaches:

Using the PHP socket extension:

 $socket = socket_create(AF_INET, SOCK_STREAM, SOL_TCP); // Create socket
 socket_bind($socket, '127.0.0.1', 8000); // Bind to IP and port
 socket_set_option($socket, SOL_SOCKET, SO_REUSEADDR, 1); // Allow multiple connections on the same port
 socket_listen($socket); // Start listening

Using the PHP stream extension:

$socket = stream_socket_server("tcp://127.0.0.1:8000", $errno, $errstr);

I preferred the second option for its simplicity.

Now that we have created a server socket, we need to handle incoming connections. There are two main approaches:

Basic while-loop:

while ($connect = stream_socket_accept($socket, -1)) { // Wait for new connection (no timeout)
     ... // Handle $connect
 }

Using stream_select for multiple connections:

$connects = array();
 while (true) {
     $read = $connects;
     $read[] = $socket;
     $write = $except = null;
     
     if (!stream_select($read, $write, $except, null)) { // Wait for readable sockets (no timeout)
         break;
     }
 
     if (in_array($socket, $read)) { // New connection detected
         $connect = stream_socket_accept($socket, -1);
         $connects[] = $connect;
         unset($read[array_search($socket, $read)]);
     }
 
     foreach ($read as $connect) { // Process all active connections
         ... // Handle $connect
         unset($connects[array_search($connect, $connects)]);
     }
 }

Since we need to handle both new connections and existing ones for incoming messages, we will use the stream_select approach.

2) WebSocket Protocol

A great explanation of the WebSocket protocol can be found in this article. Here, we focus on two key aspects:

WebSocket Handshake

To establish a WebSocket connection, we need to read the Sec-WebSocket-Key header from the client request, compute the Sec-WebSocket-Accept value, and send a proper response:

 $SecWebSocketAccept = base64_encode(pack('H*', sha1($SecWebSocketKey . '258EAFA5-E914-47DA-95CA-C5AB0DC85B11')));
 $response = "HTTP/1.1 101 Web Socket Protocol Handshake\r\n" .
     "Upgrade: websocket\r\n" .
     "Connection: Upgrade\r\n" .
     "Sec-WebSocket-Accept: $SecWebSocketAccept\r\n\r\n";

Message Encoding and Decoding

When receiving data from a WebSocket, we need to decode it, and when sending data, we must encode it. The encoding process is well-documented in WebSocket specifications, but in practice, we only need two functions: decode() and encode().

3) Simple WebSocket Server

Now that we have all the necessary components, we can combine the HTTP server logic with handshake, decoding, and encoding functions to create a basic WebSocket server.

Example of a simple WebSocket server:

// Implementation of a basic WebSocket server

This example allows customization of event handlers like onOpen, onClose, and onMessage for custom functionality.

Goals Achieved

With this implementation, we have successfully:

  • Understood PHP server sockets.
  • Implemented the WebSocket protocol.
  • Built a simple WebSocket server from scratch.

If you found this material useful, in the next article, I will describe how to run multiple processes for handling connections (one master and several workers), inter-process communication, and integration with frameworks like Yii.

Demo Chat with the Above Functionality

#!/usr/bin/env php
<?php

class SimpleWebSocketServer
{
    public function __construct($settings) {
        $this->settings = $settings;
    }

    public function launch() {
        // Открываем серверный сокет
        $mainSocket = stream_socket_server("tcp://{$this->settings['host']}:{$this->settings['port']}", $errorNum, $errorMsg);

        if (!$mainSocket) {
            die("Ошибка: stream_socket_server: $errorMsg ($errorNum)\r\n");
        }

        list($processId, $primary, $workers) = $this->initializeWorkers();

        if ($processId) { // Главный процесс
            fclose($mainSocket); // Главный процесс не будет обрабатывать соединения
            $mainHandler = new WebSocketCoordinator($workers);
            $mainHandler->run();
        } else { // Воркер-процесс
            $clientHandler = new WebSocketClientHandler($mainSocket, $primary);
            $clientHandler->run();
        }
    }

    protected function initializeWorkers() {
        $primary = null;
        $workerPool = [];
        $counter = 0;

        while ($counter < $this->settings['workers']) {
            $counter++;

            // Создаём сокеты для связи между мастер-процессом и воркерами
            $socketPair = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP);

            $processId = pcntl_fork();
            if ($processId == -1) {
                die("Ошибка: pcntl_fork\r\n");
            } elseif ($processId) { // Главный процесс
                fclose($socketPair[0]);
                $workerPool[$processId] = $socketPair[1];
            } else { // Воркер-процесс
                fclose($socketPair[1]);
                $primary = $socketPair[0];
                break;
            }
        }

        return [$processId, $primary, $workerPool];
    }
}

class WebSocketCoordinator
{
    protected $workerPool = [];
    protected $clientSockets = [];

    public function __construct($workers) {
        $this->clientSockets = $this->workerPool = $workers;
    }

    public function run() {
        while (true) {
            $activeSockets = $this->clientSockets;

            stream_select($activeSockets, $write, $except, null);

            if ($activeSockets) {
                foreach ($activeSockets as $client) {
                    $data = fread($client, 1000);

                    if (!$data) {
                        unset($this->clientSockets[intval($client)]);
                        @fclose($client);
                        continue;
                    }

                    foreach ($this->workerPool as $worker) {
                        if ($worker !== $client) {
                            fwrite($worker, $data);
                        }
                    }
                }
            }
        }
    }
}

abstract class WebSocketWorker
{
    protected $connectedClients = [];
    protected $serverSocket;
    protected $primarySocket;
    protected $processId;
    protected $pendingHandshakes = [];
    protected $ipAddresses = [];

    public function __construct($server, $primary) {
        $this->serverSocket = $server;
        $this->primarySocket = $primary;
        $this->processId = posix_getpid();
    }

    public function run() {
        while (true) {
            $activeSockets = $this->connectedClients;
            $activeSockets[] = $this->serverSocket;
            $activeSockets[] = $this->primarySocket;

            $writeSockets = [];
            if ($this->pendingHandshakes) {
                foreach ($this->pendingHandshakes as $clientId => $clientInfo) {
                    if ($clientInfo) {
                        $writeSockets[] = $this->connectedClients[$clientId];
                    }
                }
            }

            stream_select($activeSockets, $writeSockets, $except, null);

            if (in_array($this->serverSocket, $activeSockets)) {
                if ($clientSocket = stream_socket_accept($this->serverSocket, -1)) {
                    $clientAddr = explode(':', stream_socket_get_name($clientSocket, true));
                    if (isset($this->ipAddresses[$clientAddr[0]]) && $this->ipAddresses[$clientAddr[0]] > 5) {
                        @fclose($clientSocket);
                    } else {
                        @$this->ipAddresses[$clientAddr[0]]++;

                        $this->connectedClients[intval($clientSocket)] = $clientSocket;
                        $this->pendingHandshakes[intval($clientSocket)] = [];
                    }
                }

                unset($activeSockets[array_search($this->serverSocket, $activeSockets)]);
            }

            if (in_array($this->primarySocket, $activeSockets)) {
                $data = fread($this->primarySocket, 1000);
                $this->onBroadcast($data);
                unset($activeSockets[array_search($this->primarySocket, $activeSockets)]);
            }

            if ($activeSockets) {
                foreach ($activeSockets as $client) {
                    if (isset($this->pendingHandshakes[intval($client)])) {
                        if ($this->pendingHandshakes[intval($client)]) {
                            continue;
                        }

                        if (!$this->performHandshake($client)) {
                            unset($this->connectedClients[intval($client)]);
                            unset($this->pendingHandshakes[intval($client)]);
                            $clientAddr = explode(':', stream_socket_get_name($client, true));
                            if (isset($this->ipAddresses[$clientAddr[0]]) && $this->ipAddresses[$clientAddr[0]] > 0) {
                                @$this->ipAddresses[$clientAddr[0]]--;
                            }
                            @fclose($client);
                        }
                    } else {
                        $data = fread($client, 1000);

                        if (!$data) {
                            unset($this->connectedClients[intval($client)]);
                            unset($this->pendingHandshakes[intval($client)]);
                            $clientAddr = explode(':', stream_socket_get_name($client, true));
                            if (isset($this->ipAddresses[$clientAddr[0]]) && $this->ipAddresses[$clientAddr[0]] > 0) {
                                @$this->ipAddresses[$clientAddr[0]]--;
                            }
                            @fclose($client);
                            $this->onDisconnect($client);
                            continue;
                        }

                        $this->onMessageReceived($client, $data);
                    }
                }
            }

            if ($writeSockets) {
                foreach ($writeSockets as $client) {
                    if (!$this->pendingHandshakes[intval($client)]) {
                        continue;
                    }
                    $info = $this->performHandshake($client);
                    $this->onClientConnect($client, $info);
                }
            }
        }
    }

    abstract protected function onClientConnect($client, $info);
    abstract protected function onDisconnect($client);
    abstract protected function onMessageReceived($client, $data);
    abstract protected function onBroadcast($data);
}

// Реализация чата
class WebSocketClientHandler extends WebSocketWorker
{
    protected function onClientConnect($client, $info) {}

    protected function onDisconnect($client) {}

    protected function onMessageReceived($client, $data) {
        $decodedMessage = $this->decode($data);

        if (!$decodedMessage['payload']) {
            return;
        }

        if (!mb_check_encoding($decodedMessage['payload'], 'utf-8')) {
            return;
        }

        $message = 'User #' . intval($client) . ' (' . $this->processId . '): ' . strip_tags($decodedMessage['payload']);
        $this->broadcastMessage($message);
    }

    protected function onBroadcast($data) {
        $this->distributeMessage($data);
    }

    protected function broadcastMessage($message) {
        @fwrite($this->primarySocket, $message);
    }

    private function distributeMessage($data) {
        $encodedMessage = $this->encode($data);

        $writeSockets = $this->connectedClients;
        if (stream_select($read, $writeSockets, $except, 0)) {
            foreach ($writeSockets as $client) {
                @fwrite($client, $encodedMessage);
            }
        }
    }
}

$settings = [
    'host' => '0.0.0.0',
    'port' => 8000,
    'workers' => 1,
];

$serverInstance = new SimpleWebSocketServer($settings);
$serverInstance->launch();

Update (Best Comments from Readers):

  • Each connection consumes about 9KB of memory.
  • Using fgets() with open sockets can cause "hanging" because WebSocket messages do not end with a newline. Use fread() instead.
  • When writing a response to a socket using fwrite(), always check if all bytes were successfully written.
  • Before sending data from the server, check if the client is ready to receive using stream_socket_accept().
  • Sending non-UTF-8 characters to the socket will cause the client to disconnect with an error: WebSocket connection to 'ws://sharoid.ru:8000/' failed: Could not decode a text frame as UTF-8.
  • To check if no data was received and the socket should be closed, use !strlen($data), not !$data.
  • You can place an Nginx server in front of the WebSocket server for better performance.