您的当前位置:首页>全部文章>文章详情

THINKPHP5整合workerman+gateway

发表于:2023-04-09 22:56:28浏览:858次TAG: #php #thinkphp

这次要开发聊天系统 , 需要用到WebSocket 我使用的是workerman+gateway,为了方便后面再用,做个简单记录

首先要特别注意的是,端口要开放,如果端口未开放,会出现连接时握手失败的情况,这里我用的商品是 801 和 802

1、安装workerman和gateway

composer require topthink/think-worker
composer require workerman/gatewayclient

2、添加server.php文件,后成需要在CLI模式下运行

#!/usr/bin/env php
<?php
ini_set('display_errors', 'on');
if(strpos(strtolower(PHP_OS), 'win') === 0)
{
    exit("start.php not support windows.\n");
}
// 检查扩展
if(!extension_loaded('pcntl'))
{
    exit("Please install pcntl extension. See http://doc3.workerman.net/appendices/install-extension.html\n");
}
if(!extension_loaded('posix'))
{
    exit("Please install posix extension. See http://doc3.workerman.net/appendices/install-extension.html\n");
}
define('APP_PATH', __DIR__ . '/application/');
define('BIND_MODULE','push/Run');
// 加载框架引导文件
require __DIR__ . '/thinkphp/start.php';

3、创建push模块新建两个控制器Run.php Events.php

Start.php

<?php
namespace app\push\controller;
use Workerman\Worker;
use GatewayWorker\Register;
use GatewayWorker\BusinessWorker;
use GatewayWorker\Gateway;
class Run extends Worker{
  /**
     * 构造函数
     * @access public
     */

    public function __construct(){
        //由于是手动添加,因此需要注册命名空间,方便自动加载,具体代码路径以实际情况为准
                \think\Loader::addNamespace([
                    'Workerman' => VENDOR_PATH . 'Workerman/workerman',
                    'GatewayWorker' =>VENDOR_PATH . 'Workerman/gateway-worker/src',
                ]);
                //初始化各个GatewayWorker
        //初始化register
        new Register('text://0.0.0.0:801');

         //初始化 bussinessWorker 进程
        $worker = new BusinessWorker();
        $worker->name = 'WebIMBusinessWorker';
        $worker->count = 4;
        $worker->registerAddress = '127.0.0.1:801';
        //设置处理业务的类,此处制定Events的命名空间
        $worker->eventHandler = '\app\push\controller\Events';
        // 初始化 gateway 进程
        $gateway = new Gateway("websocket://0.0.0.0:802");
        $gateway->name = 'WebIMGateway';
        $gateway->count = 4;
        $gateway->lanIp = '127.0.0.1';
        $gateway->startPort = 2900;
        $gateway->registerAddress = '127.0.0.1:801';

         //运行所有Worker;
        Worker::runAll();
    }
}

Events.php

<?php
namespace app\push\controller;
use GatewayWorker\Lib\Gateway;
/**
 * 主逻辑
 * 主要是处理 onConnect onMessage onClose 三个方法
 * onConnect 和 onClose 如果不需要可以不用实现并删除
 */
class Events{
/**
    * 当客户端发来消息时触发
    * @param int $client_id 连接id
    * @param mixed $data 具体消息
    */
    public static function onMessage($client_id, $data){
       $message = json_decode($data, true);
       $message_type = $message['type'];
       switch($message_type) {
           case 'init':
               // uid
               $uid = $message['id'];
               // 设置session
               $_SESSION = [
                   'username' => $message['username'],
                   'avatar'   => isset($message['avatar'])?$message['avatar']:'',
                   'id'       => $uid,
                   'sign'     => $message['sign'],
                    'type'=> $type, 
               ];
               // 将当前链接与uid绑定
               Gateway::bindUid($client_id, $uid);
               // 通知当前客户端初始化
               $init_message = array(
                   'message_type' => 'init',
                   'id'           => $uid,
               );
               Gateway::sendToClient($client_id, json_encode($init_message));
               return;
               break;
           case 'chatMessage':
               // 聊天消息
               $type = $message['data']['to']['type'];
               $to_id = $message['data']['to']['id'];
               $uid = $_SESSION['id'];

               $chat_message = [
                    'message_type' => 'chatMessage',
                    'data' => [
                        'username' => $_SESSION['username'],
                        'avatar'   => $_SESSION['avatar'],
                        'id'       => $type === 'friend' ? $uid : $to_id,
                        'type'     => $type,
                        'content'  => htmlspecialchars($message['data']['mine']['content']),
                        'timestamp'=> time()*1000,
                    ]
               ];

               return Gateway::sendToUid($to_id, json_encode($chat_message));
               break;
           case 'hide':
           case 'online':
               $status_message = [
                   'message_type' => $message_type,
                   'id'           => $_SESSION['id'],
               ];
               $_SESSION['online'] = $message_type;
               Gateway::sendToAll(json_encode($status_message));
               return;
               break;
           case 'ping':
               return;
           default:
               echo "unknown message $data" . PHP_EOL;
       }
    }
    /**
     * 当客户端连接时触发
     * 如果业务不需此回调可以删除onConnect
     * 
     * @param int $client_id 连接id
     */
    public static function onConnect($client_id)
    {

        var_dump($client_id);
        Gateway::sendToClient($client_id,json_encode(['status'=>"success",'msg'=>"连接成功"]));
        Gateway::sendToAll("连接成功");
    }
    /**
     * 当连接断开时触发的回调函数
     * @param $connection
     */
    public static function onClose($client_id){
       $logout_message = [
           'message_type' => 'logout',
           'id'           => $_SESSION['id']
       ];
       Gateway::sendToAll(json_encode($logout_message));
    }
    /**
     * 当客户端的连接上发生错误时触发
     * @param $connection
     * @param $code
     * @param $msg
     */
    public static function onError($client_id, $code, $msg)
    {
        echo "error $code $msg\n";
    }
    /**
     * 每个进程启动
     * @param $worker
     */
    public static function onWorkerStart($worker)
    {
    }
}

然后执行命令开启进程

php server.php start

前端示例:

<!DOCTYPE html>
<html>
<head>
    <meta charset="UTF-8">
    <meta http-equiv="Content-Type" content="text/html; charset=utf-8" />
    <title>Title</title>
    <script src="https://cdn.bootcss.com/jquery/3.4.1/jquery.min.js"></script>
</head>
<body>

<script>
    ws = new WebSocket("ws:0.0.0.0:802");    //填你线上服务端地址
    // 服务端主动推送消息时会触发这里的onmessage
    ws.onopen = function(){
        console.info("与服务端连接成功");
        ws.send('test msg\n');//相当于发送一个初始化信息
        console.info("向服务端发送心跳包字符串");
    };

    ws.onmessage = function(e){
        // json数据转换成js对象
        var data = eval("("+e.data+")");
        console.log(data.msg);
    };

    ws.onclose = function(e){
        console.log(e);
    };

</script>

</body>
</html>