Skip to content

Commit 3e8f05b

Browse files
author
jianyan74
committed
websocket修改投递方式为task
1 parent 6f3df78 commit 3e8f05b

File tree

2 files changed

+132
-37
lines changed

2 files changed

+132
-37
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ composer require "jianyan74/yii2-websocket"
3939
'port' => 9501,// 监听端口
4040
'config' => [// 标准的swoole配置项都可以再此加入
4141
'daemonize' => false,// 守护进程执行
42+
'task_worker_num' => 4,//task进程的数量
4243
'ssl_cert_file' => '',
4344
'ssl_key_file' => '',
4445
'pid_file' => __DIR__ . '/../../backend/runtime/logs/server.pid',

src/WebSocketServer.php

Lines changed: 131 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -60,16 +60,11 @@ public function run()
6060
{
6161
// 启动进程
6262
$this->_server = new swoole_websocket_server($this->_host, $this->_port, $this->_mode, $this->_socketType | SWOOLE_SSL);
63-
$this->_server->set([
64-
// 以非守护进程执行
65-
'daemonize' => $this->_config['daemonize'],
66-
// 配置wss
67-
'ssl_cert_file' => $this->_config['ssl_cert_file'],
68-
'ssl_key_file' => $this->_config['ssl_key_file'],
69-
]);
70-
63+
$this->_server->set($this->_config);
7164
$this->_server->on('open', [$this, 'onOpen']);
7265
$this->_server->on('message', [$this, 'onMessage']);
66+
$this->_server->on('task', [$this, 'onTask']);
67+
$this->_server->on('finish', [$this, 'onFinish']);
7368
$this->_server->on('close', [$this, 'onClose']);
7469
$this->_server->start();
7570
}
@@ -90,62 +85,161 @@ public function onOpen($server, $frame)
9085

9186
/**
9287
* 消息
93-
*
9488
* @param $server
9589
* @param $frame
90+
* @throws \Exception
9691
*/
9792
public function onMessage($server, $frame)
9893
{
99-
echo "receive from {$frame->fd}:{$frame->data},opcode:{$frame->opcode},fin:{$frame->finish}\n";
100-
// 消息发送给自己
101-
$server->push($frame->fd, $frame->data);
102-
// 消息发送给别人
103-
$this->broadcast($frame->fd, $frame->data);
94+
// 调试信息
95+
echo $frame->data . "\n";
96+
//echo "receive from {$frame->fd}:{$frame->data},opcode:{$frame->opcode},fin:{$frame->finish}\n";
97+
98+
$message = json_decode($frame->data, true);
99+
if (!$message)
100+
{
101+
echo "没有消息内容";
102+
return true;
103+
}
104+
105+
// 业务逻辑
106+
switch ($message['type'])
107+
{
108+
// 心跳
109+
case 'pong':
110+
return true;
111+
break;
112+
113+
// 进入房间(登录)
114+
case 'login':
115+
// 判断是否有房间号
116+
if(!isset($message['room_id']))
117+
{
118+
throw new \Exception("\$message['room_id'] not set. client_ip:{$_SERVER['REMOTE_ADDR']} \$message:$message");
119+
}
120+
121+
$_SESSION['room_id'] = $message['room_id'];
122+
$_SESSION['client_name'] = $message['client_name'];
123+
124+
// 转播给当前房间的所有客户端,xx进入聊天室 message {type:login, client_id:xx, name:xx}
125+
$new_message = [
126+
'type' => $message['type'],
127+
'client_id' => $frame->fd,
128+
'name' => $message['client_name'],
129+
'time' => date('Y-m-d H:i:s'),
130+
];
131+
132+
//投递到task广播消息
133+
$server->task(json_encode($new_message));
134+
break;
135+
136+
// 评论消息
137+
case 'say':
138+
// 非法请求
139+
if(!isset($_SESSION['room_id']))
140+
{
141+
throw new \Exception("\$_SESSION['room_id'] not set. client_ip:{$_SERVER['REMOTE_ADDR']}");
142+
}
143+
144+
$room_id = $_SESSION['room_id'];
145+
$client_name = $_SESSION['client_name'];
146+
$message['emoji_id'] = isset($message['emoji_id']) ?? '';
147+
148+
// 私聊
149+
if($message['to_client_id'] != 'all')
150+
{
151+
$new_message = [
152+
'type' => $message['type'],
153+
'from_client_id'=> $frame->fd,
154+
'from_client_name' =>$client_name,
155+
'to_client_id' => $message['to_client_id'],
156+
'emoji_id' => $message['emoji_id'],
157+
'content' => nl2br(htmlspecialchars($message['content'])),
158+
'time' => date('Y-m-d H:i:s'),
159+
];
160+
161+
// 私发
162+
$server->push($frame->fd, json_encode($new_message));
163+
}
164+
165+
$new_message = [
166+
'type' => $message['type'],
167+
'from_client_id'=> $frame->fd,
168+
'from_client_name' =>$client_name,
169+
'to_client_id' => 'all',
170+
'emoji_id' => $message['emoji_id'],
171+
'content' => nl2br(htmlspecialchars($message['content'])),
172+
'time'=> date('Y-m-d H:i:s'),
173+
];
174+
175+
// 广播消息
176+
$server->task(json_encode($new_message));
177+
break;
178+
179+
// 礼物
180+
case 'gift':
181+
182+
$client_name = $_SESSION['client_name'];
183+
$new_message = [
184+
'type' => $message['type'],
185+
'from_client_id'=> $frame->fd,
186+
'from_client_name' => $client_name,
187+
'to_client_id' => 'all',
188+
'gift_id' => $message['gift_id'],
189+
'time'=> date('Y-m-d H:i:s'),
190+
];
191+
192+
// 广播消息
193+
$server->task(json_encode($new_message));
194+
break;
195+
}
196+
197+
return true;
104198
}
105199

106200
/**
107201
* 关闭连接
108202
*
109-
* @param $ser
203+
* @param $server
110204
* @param $fd
111205
*/
112-
public function onClose($ser, $fd)
206+
public function onClose($server, $fd)
113207
{
114208
echo "client {$fd} closed\n";
115209
// 删除
116210
$this->_table->del($fd);
117211
}
118212

119213
/**
120-
* 广播进程
214+
* 处理异步任务
121215
*
122-
* @param integer $client_id 客户端id
123-
* @param string $msg 广播消息
216+
* @param $server
217+
* @param $task_id
218+
* @param $from_id
219+
* @param $data
124220
*/
125-
public function broadcast($client_id, $msg)
221+
public function onTask($server, $task_id, $from_id, $data)
126222
{
127-
//广播
128-
foreach ($this->_table as $cid => $info)
129-
{
130-
if ($client_id != $cid)
131-
{
132-
$this->_server->push($cid, $msg);
133-
}
134-
}
223+
echo "新 AsyncTask[id=$task_id]" . PHP_EOL;
224+
225+
$server->finish($data);
135226
}
136227

137228
/**
138-
* 创建内存表
229+
* 处理异步任务的结果
139230
*
140-
* 数指定表格的最大行数,如果$size不是为2的N次方,如1024、8192,65536等,底层会自动调整为接近的一个数字
141-
* 占用的内存总数为 (结构体长度 + KEY长度64字节 + 行尺寸$size) * (1.2预留20%作为hash冲突) * (列尺寸),如果机器内存不足table会创建失败
231+
* @param $server
232+
* @param $task_id
233+
* @param $data
142234
*/
143-
private function createTable()
235+
public function onFinish($server, $task_id, $data)
144236
{
145-
$this->_table = new swoole_table(1024);
146-
$this->_table->column('fd', swoole_table::TYPE_INT);
147-
//$this->_table->column('name', swoole_table::TYPE_STRING, 255);
148-
//$this->_table->column('avatar', swoole_table::TYPE_STRING, 255);
149-
$this->_table->create();
237+
//广播
238+
foreach ($this->_table as $cid => $info)
239+
{
240+
$server->push($cid, $data);
241+
}
242+
243+
echo "AsyncTask[$task_id] 完成: $data" . PHP_EOL;
150244
}
151245
}

0 commit comments

Comments
 (0)