上代码:
/**
 * WebSocket push server built on Swoole.
 *
 * Accepts WebSocket clients (authenticated through the `uid`, `token` and
 * `client` query parameters), relays JSON messages between connections, and
 * exposes a small HTTP control interface (`act=stats|push|shutdown`) on the
 * same port.
 */
class AcePush
{
    /** @var \Swoole\WebSocket\Server Underlying Swoole server instance. */
    public $server;

    /**
     * Create the server and register the event callbacks.
     *
     * @param string $host Address to listen on.
     * @param int    $port Port to listen on.
     */
    public function __construct($host = "0.0.0.0", $port = 9573)
    {
        $this->server = new \Swoole\WebSocket\Server($host, $port);
        $this->server->set([
            // Mode 5 dispatches by the uid given to bind() (see onOpen).
            'dispatch_mode' => 5,
            'worker_num' => 1,
            'heartbeat_check_interval' => 30,
            'heartbeat_idle_time' => 62,
        ]);
        $this->server->on('open', [$this, 'onOpen']);
        $this->server->on('message', [$this, 'onMessage']);
        $this->server->on('close', [$this, 'onClose']);
        $this->server->on('request', [$this, 'onRequest']);
    }

    /**
     * Handshake-complete callback: validate the query parameters and token.
     *
     * The connection is closed when `uid`, `token` or `client` is missing or
     * when checkAccess() rejects the token.
     *
     * @param \Swoole\WebSocket\Server $server
     * @param object                   $request Handshake request; `get` and `fd` are read.
     *
     * @return bool True when the connection was accepted, false when it was closed.
     */
    public function onOpen(\Swoole\WebSocket\Server $server, $request)
    {
        $query = (isset($request->get) && is_array($request->get)) ? $request->get : [];

        // Close the connection when the query string is absent or any of
        // uid / token / client is missing.
        if (!isset($query['uid'], $query['token'], $query['client'])) {
            $this->server->close($request->fd);
            return false;
        }

        if (!$this->checkAccess($query['token'], $query['client'])) {
            $this->server->close($request->fd);
            return false;
        }

        // TODO: persist client, uid and fd (MySQL or Redis) so that two users
        // can address messages to each other by uid.
        $uid = $query['uid'];

        // bind() takes an integer uid; a non-numeric query value (or an array
        // such as ?uid[]=1) would raise a warning / TypeError inside the
        // callback, so only bind when the value is a non-zero integer string.
        if (is_string($uid) && preg_match('/^-?[1-9]\d*$/', $uid) === 1) {
            $this->server->bind($request->fd, (int) $uid);
        }

        return true;
    }

    /**
     * Message callback: decode the JSON frame and route it.
     *
     * Expected payload: `{"act": ..., "message": ..., "to_uid": ...}`.
     * - `act = "ping"` is answered with a pong and needs no other field.
     * - `to_uid = "all"` broadcasts to every established connection.
     * - any other `to_uid` is used directly as the target connection fd
     *   (no uid-to-fd lookup exists yet, see the TODO in onOpen()).
     *
     * @param \Swoole\WebSocket\Server $server
     * @param \Swoole\WebSocket\Frame  $frame
     *
     * @return bool True when the message was handled, false when it was rejected.
     */
    public function onMessage(Swoole\WebSocket\Server $server, \Swoole\WebSocket\Frame $frame)
    {
        echo "receive from {$frame->fd}:{$frame->data},opcode:{$frame->opcode},fin:{$frame->finish}\n";

        $data = json_decode($frame->data, true);
        if (!is_array($data) || !$data) {
            $this->invalidMessageResponse($frame, "Invalid data format");
            return false;
        }

        if (!isset($data['act'])) {
            $this->invalidMessageResponse($frame, "Invalid data parameter");
            return false;
        }

        // Special-case acts are handled before the routing fields are
        // required, so a bare {"act":"ping"} heartbeat is accepted.
        if ($data['act'] === 'ping') {
            $this->server->push($frame->fd, json_encode(['act' => 'pong', 'status' => 1]));
            return true;
        }

        if (!isset($data['message']) || !isset($data['to_uid'])) {
            $this->invalidMessageResponse($frame, "Invalid data parameter");
            return false;
        }

        // Strict comparison: with `==`, JSON `true` (and 0 on PHP 7) would
        // compare equal to 'all' and trigger an unintended broadcast.
        if ($data['to_uid'] === 'all') {
            $this->broadcastAll($data['message'], $data['act']);
            return true;
        }

        $target = $data['to_uid'];
        $isIntegerFd = is_int($target) || (is_string($target) && preg_match('/^\d+$/', $target) === 1);
        if (!$isIntegerFd || !$this->server->isEstablished((int) $target)) {
            $this->invalidMessageResponse($frame, "Target connection unavailable");
            return false;
        }

        $this->server->push((int) $target, json_encode($data, JSON_UNESCAPED_UNICODE));
        return true;
    }

    /**
     * Close callback: log the closed connection.
     *
     * @param mixed $ser Server instance (unused).
     * @param int   $fd  Closed connection fd.
     */
    public function onClose($ser, $fd)
    {
        echo "client {$fd} closed\n";
    }

    /**
     * HTTP callback implementing the control interface, selected by `?act=`.
     *
     * - `shutdown`: stop the server.
     * - `stats`: return connection info and server stats as JSON.
     * - `push`: broadcast `message` / `voice` to every established connection.
     *
     * @param object                $request  HTTP request; `get` is read.
     * @param \Swoole\Http\Response $response
     *
     * @return bool False when `act` is missing, true otherwise.
     */
    public function onRequest($request, \Swoole\Http\Response $response)
    {
        $query = (isset($request->get) && is_array($request->get)) ? $request->get : [];

        if (!isset($query['act'])) {
            $this->invalidHttpRequest("Invalid request", $response);
            return false;
        }

        $act = $query['act'];
        if ($act === 'shutdown') {
            // NOTE(review): this endpoint is unauthenticated; anyone who can
            // reach the port can stop the server. Consider protecting it.
            $response->end('ok');
            $this->server->shutdown();
        } elseif ($act === 'stats') {
            // Legacy behaviour: an optional `message` is forwarded to the
            // connected WebSocket clients while collecting stats.
            $message = (isset($query['message']) && is_string($query['message'])) ? $query['message'] : null;

            $data = ['connections' => []];
            foreach ($this->server->connections as $fd) {
                $established = $this->server->isEstablished($fd);

                // `connections` also contains plain HTTP connections (such as
                // this request), which must not receive WebSocket frames.
                if ($message !== null && $established) {
                    $this->server->push($fd, $message);
                }

                // getClientInfo() yields false for a connection that has
                // already gone away; fall back to an empty record.
                $client = $this->server->getClientInfo($fd);
                if (!is_array($client)) {
                    $client = [];
                }
                $client['is_connect'] = $established;
                $data['connections'][] = $client;
            }
            $data['stats'] = $this->server->stats();
            $response->end(json_encode($data));
        } elseif ($act === 'push') {
            $payload = [
                'time' => date('Y-m-d H:i:s'),
                'message' => $query['message'] ?? '',
                'voice' => $query['voice'] ?? '',
            ];
            $this->broadcastAll($payload, 'message');
            $response->end('ok');
        } else {
            $response->end(json_encode('Undefined action'));
        }

        return true;
    }

    /**
     * Report an invalid HTTP request as a `{"status":403,"message":...}` JSON document.
     *
     * @param string                     $message  Error description.
     * @param \Swoole\Http\Response|null $response When given, the error is sent to the
     *                                             HTTP client with status 403; when
     *                                             omitted, the JSON is echoed as before.
     */
    public function invalidHttpRequest($message = "Invalid request", $response = null)
    {
        $body = json_encode(
            [
                'status' => 403,
                'message' => $message,
            ],
            JSON_UNESCAPED_UNICODE
        );

        if ($response === null) {
            echo $body;
            return;
        }

        $response->status(403);
        $response->header('Content-Type', 'application/json; charset=utf-8');
        $response->end($body);
    }

    /**
     * Decide whether a client may connect.
     *
     * NOTE(review): placeholder implementation. The verification URL is empty
     * and the result of httpRequest() is ignored, so every non-empty token is
     * accepted.
     *
     * @param mixed  $token  Token supplied by the client.
     * @param string $client Client type supplied by the client (currently unused).
     *
     * @return bool True when the token is truthy, false otherwise.
     */
    public function checkAccess($token, $client = 'admin')
    {
        if (!$token) {
            return false;
        }

        // TODO: call the real verification endpoint and inspect its response.
        $this->httpRequest('');

        return true;
    }

    /**
     * Push a `{"status":1,"message":...,"act":...}` JSON document to every
     * connection with an established WebSocket handshake.
     *
     * @param mixed $message Payload placed under the `message` key.
     * @param mixed $act     Value placed under the `act` key.
     */
    public function broadcastAll($message, $act)
    {
        $encoded = json_encode(
            [
                'status' => 1,
                'message' => $message,
                'act' => $act,
            ],
            JSON_UNESCAPED_UNICODE
        );

        foreach ($this->server->connections as $fd) {
            if ($this->server->isEstablished($fd)) {
                $this->server->push($fd, $encoded);
            }
        }
    }

    /**
     * Placeholder; not implemented and currently returns nothing.
     *
     * @param mixed $request
     */
    public function parsePushRequest($request)
    {
    }

    /**
     * Reply to the sender of a frame with a `{"status":0,...,"act":"none"}` error.
     *
     * @param \Swoole\WebSocket\Frame $request Frame whose sender receives the error.
     * @param string                  $message Error description.
     */
    public function invalidMessageResponse(\Swoole\WebSocket\Frame $request, $message = "Invalid Message")
    {
        $resp = [
            'status' => 0,
            'message' => $message,
            'act' => 'none',
        ];
        $this->server->push($request->fd, json_encode($resp));
    }

    /**
     * Placeholder HTTP client; performs no request and always returns ''.
     *
     * @param string $url
     * @param mixed  $data
     *
     * @return string
     */
    public function httpRequest($url, $data = null)
    {
        return '';
    }

    /**
     * Start the Swoole server.
     */
    public function start()
    {
        $this->server->start();
    }
}
// Build the push server with its default settings and start serving.
$acePush = new AcePush();
$acePush->start();