登陆

netty结构完成websocket到达高并发

admin 2019-10-31 277人围观 ,发现0个评论

导言:

在前面两篇文章中,咱们对原生websocket进行了了解,且用demo来简略的讲解了其用法。可是在实践项目中,那样的用法是不可取的,理由是tomcat对高并发的支撑不怎么好,特别是tomcat9之前,能够测验发现websocket衔接到达的数量很低,且简略断开。

所以有现在的第三篇,对websocket的一种进阶办法。

什么是Netty

Nenetty结构完成websocket到达高并发tty是业界最盛行的NIO结构之一,它的健壮性、功用、功用、可定制性和可扩展性在同类结构中都是名列前茅的,它现已得到成百上千的商用项目验证,例如Hadoop的RPC结构Avro就运用了Netty作为底层通讯结构,其他还有业界干流的RPC结构,也运用Netty来构建高功用的异步通讯才能。

经过对Netty的剖析,咱们将它的长处总结如下:

API运用简略,开发门槛低;

功用强大,预置了多种编解码功用,支撑多种干流协议;

定制才能强,能够经过ChannelHandler对通讯结构进行灵敏地扩展;

功用高,经过与其他业界干流的NIO结构比照,Netty的归纳功用最优;

老练、安稳,Netty修正了现已发现的一切JDK NIO BUG,事务开发人员不需求再为NIO的BUG而烦恼;

社区活泼,版别迭代周期短,发现的BUG能够被及时修正,一起,更多的新功用会参加;

阅历了大规模的商业运用检测,肖泽青质量得到验证。Netty在互联网、大数据、网络游戏、企业运用、电信软件等很多职业现已得到了成功商用,证明它现已彻底能够满意不同职业的商业运用了。

依据Netty的websocket压力测验

Demo详解

1.导入netty包

io.netty

netty-all

5.0.0.Alpha1

2.server发动类

以下@Service,@PostConstruct注解是标示spring发动时发动的注解,新开一个线程去敞开netty服务器端口。

package com.nettywebsocket;

import javax.annotation.PostConstruct;

import org.springframework.stereotype.Service;

import io.netty.bootstrap.ServerBootstrap;

import io.netty.channel.Channel;

import io.netty.channel.EventLoopGroup;

import io.netty.channel.nio.NioEventLoopGroup;

import io.netty.channel.socket.nio.NioServerSocketChannel;

/**

* ClassName:NettyServer 注解式随spring发动

* Function: TODO ADD FUNCTION.

* @author hxy

*/

@Service

public class NettyServer {

public static void main(String[] args) {

new NettyServer().run();

}

@PostConstruct

public void initNetty(){

new Thread(){

public void run() {

new NettyServer().run();

}

}.start();

}

public void run(){

System.out.println("===========================Netty端口发动========");

// Boss线程:由这个线程池供给的线程是boss品种的,用于创立、衔接、绑定socket, (有点像门卫)然后把这些socket传给worker线程池。

// 在服务器端每个监听的socket都有一个boss线程来处理。在客户端,只要一个boss线程来处理一切的socket。

EventLoopGroup bossGroup = new NioEventLoopGroup();

// Worker线程:Worker线程履行一切的异步I/O,即处理操作

EventLoopGroup workGroup = new NioEventLoopGroup();

try {

// ServerBootstrap 发动NIO服务的辅佐发动类,担任初始话netty服务器,而且开端监听端口的socket恳求

ServerBootstrap b = new ServerBootstrap();

b.group(bossGroup, workGroup);

// 设置非堵塞,用它来建立新accept的衔接,用于结构serversocketchannel的工厂类

b.channel(NioServerSocketChannel.class);

// ChildChannelHandler 对收支的数据进行的事务操作,其承继ChannelInitializer

b.childHandler(new ChildChannelHandler());

System.out.println("服务端敞开等候客户端衔接 ... ...");

Channel ch = b.bind(7397).sync().channel();

ch.closeFuture().sync();

} catchnetty结构完成websocket到达高并发 (Exception e) {

e.printStackTrace();

}finally{

bossGroup.shutdownGracefully();

workGroup.shutdownGracefully();

}

}

}

3.channle注册类

package com.nettywebsocket;

import io.netty.channel.ChannelInitializer;

import io.netty.channel.socket.SocketChannel;

import io.netty.handler.codec.http.HttpObjectAggregator;

import io.netty.handler.codec.http.HttpServerCodec;

import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;

import io.netty.handler.stream.ChunkedWriteHandler;

/**

* ClassName:ChildChannelHandler

* Function: TODO ADD FUNCTION.

* @author hxy

*/

public class ChildChannelHandler extends ChannelInitializer{

@Override

protected void initChannel(SocketChannel e) throws Exception {

// 设置30秒没有读到数据,则触发一个READER_IDLE事情。

// pipeline.addLast(new IdleStateHandler(30, 0, 0));

// HttpServerCodec:将恳求和应对音讯解码为HTTP音讯

e.pipeline().addLast("http-codec",new HttpServerCodec());

// HttpObjectAggregator:将HTTP音讯的多个部分合成一条完好的HTTP音讯

e.pipeline().addLast("aggregator",new HttpObjectAggregator(65536));

// ChunkedWriteHandler:向客户端发送HTML5文件

e.pipeline().addLast("http-chunked",new ChunkedWriteHandler());

// 在管道中添加咱们自己的接纳数据完结办法

e.pipeline().addLast("handler",new MyWebSocketServerHandler());

}

}

4.存储类

以下类netty结构完成websocket到达高并发是用来存储拜访的channle,channelGroup的原型是set调集,确保channle的仅有,如需依据参数标示存储,能够运用currentHashMap来存储。

package com.nettywebsocket;

import io.netty.channel.group.ChannelGroup;

import io.netty.channel.group.DefaultChannelGroup;

import io.netty.util.concurrent.GlobalEventExecutor;

/**

* ClassName:Global

* Function: TODO ADD FUNCTION.

* @author hxy

*/

public class Global {

public static ChannelGroup group = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

}

5.实践处理类

以下处理类尽管做了注释,可是在这里仍是具体讲解下。

这个类是单例的,每个线程处理睬新实例化一个类。

每个成功的线程拜访次序:channelActive(敞开衔接)-handleHttpRequest(http握手处理)-messageReceived(音讯接纳处理)-handlerWebSocketFrame(实践处理,能够放到其他类里边分事务进行)

留意:这个demo中我做了路由功用,在handleHttpRequest中对每个channel衔接的时分对每个衔接的url进行绑定参数,然后在messageReceived中获取绑定的参数进行分发处理(handlerWebSocketFrame或handlerWebSocketFrame2),一起也获取了uri后置参数,有注释。

针对第三点路由分发,还有一种办法便是handshaker的uri()办法,看源码即可,简略好用。

群发的时分遍历调集或许map的时分,有必要每个channle都实例化一个TextWebSocketFrame目标,不然会报错或许发不出。

package com.nettywebsocket;

import java.util.Date;

import java.util.List;

import java.util.Map;

import java.util.logging.Level;

import java.util.logging.Logger;

import io.netty.buffer.ByteBuf;

import io.netty.buffer.Unpooled;

import io.netty.channel.ChannelFuture;

import io.netty.channel.ChannelFutureListener;

import io.netty.channel.ChannelHandlerContext;

import io.netty.channel.SimpleChannelInboundHandler;

import io.netty.handler.codec.http.DefaultFullHttpResponse;

import io.netty.handler.codec.http.FullHttpRequest;

import io.netty.handler.codec.http.HttpHeaders;

import io.netty.handler.codec.http.HttpMethod;

import io.netty.handler.codec.http.HttpResponseStatus;

import io.netty.handler.codec.http.HttpVersion;

import io.netty.handler.codec.http.QueryStringDecoder;

import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;

import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;

import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;

import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;

import io.netty.handler.codec.http.websocketx.WebSocketFrame;

import io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker;

import io.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory;

import io.netty.util.AttributeKey;

import io.netty.util.CharsetUtil;

/**

* ClassName:MyWebSocketServerHandler Function: TODO ADD FUNCTION.

*

* @author hxy

*/

public class MyWebSocketServerHandler extends SimpleChannelInboundHandler {

private static final Logger logger = Logger.getLogger(WebSocketServerHandshaker.class.getName());

private WebSocketServerHandshaker handshaker;

/**

* channel 通道 action 活泼的 当客户端自动链接服务端的链接后,这个通道便是活泼的了。也便是客户端与服务端建立了通讯通道而且能够传输数据

*/

@Override

public void channelActive(ChannelHandlerContext ctx) throws Exception {

// 添加

Global.group.add(ctx.channel());

System.out.println("客户端与服务端衔接敞开:" + ctx.channel().remoteAddress().toString());

}

/**

* channel 通道 Inactive 不活泼的 当客户端自动断开服务端的链接后,这个通道便是不活泼的。也便是说客户端与服务端封闭了通讯通道而且不能够传输数据

*/

@Override

public void channelInactive(ChannelHandlerContext ctx) throws Exception {

// 移除

Global.group.remove(ctx.channel());

System.out.println("客户端与服务端衔接封闭:" + ctx.channel().remoteAddress().toString());

}

/**

* 接纳客户端发送的音讯 channel 通道 Read 读 简而言之便是从通道中读取数据,也便是服务端接纳客户端发来的数据。可是这个数据在不进行解码时它是ByteBuf类型的

*/

@Override

protected void messageReceived(ChannelHandlerContext ctx, Object msg) throws Exception {

// 传统的HTTP接入

if (msg instanceof FullHttpRequest) {

handleHttpRequest(ctx, ((FullHttpRequest) msg));

// WebSocket接入

} elsenetty结构完成websocket到达高并发 if (msg instanceof WebSocketFrame) {

System.out.println(handshaker.uri());

if("anzhuo".equals(ctx.attr(AttributeKey.valueOf("type")).get())){

handlerWebSocketFrame(ctx, (WebSocketFrame) msg);

}else{

handlerWebSocketFrame2(ctx, (WebSocketFrame) msg);

}

}

}

/**

* channel 通道 Read 读取 Complete 完结 在通道读取完结后会在这个办法里告诉,对应能够做改写操作 ctx.flush()

*/

@Override

public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {

ctx.flush();

}

private void handlerWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {

// 判别是否封闭链路的指令

if (frame instanceof CloseWebSocketFrame) {

System.out.println(1);

handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());

return;

}

// 判别是否ping音讯

if (frame instanceof PingWebSocketFrame) {

ctx.channel().write(new PongWebSocketFrame(frame.content().retain()));

return;

}

// 本例程仅支撑文本音讯,不支撑二进制音讯

if (!(frame instanceof TextWebSocketFrame)) {

System.out.println("本例程仅支撑文本音讯,不支撑二进制音讯");

throw new UnsupportedOperationException(

String.format("%s frame types not supported", frame.getClass().getName()));

}

// 回来应对音讯

String request = ((TextWebSocketFrame) frame).text();

System.out.println("服务端收到:" + request);

if (logger.isLoggable(Level.FINE)) {

logger.fine(String.format("%s received %s", ctx.channel(), request));

}

TextWebSocketFrame tws = new TextWebSocketFrame(new Date().toString() + ctx.channel().id() + ":" + request);

// 群发

Global.group.writeAndFlush(tws);

// 回来【谁发的发给谁】

// ctx.channel().writeAndFlush(tws);

}

private void handlerWebSocketFrame2(ChannelHandlerContext ctx, WebSocketFrame frame) {

// 判别是否封闭链路的指令

if (frame instanceof CloseWebSocketFrame) {

handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());

return;

}

// 判别是否ping音讯

if (frame instanceof PingWebSocketFrame) {

ctx.channel().write(new PongWebSocketFrame(frame.content().retain()));

return;

}

// 本例程仅支撑文本音讯,不支撑二进制音讯

if (!(frame instanceof TextWebSocketFrame)) {

System.out.println("本例程仅支撑文本音讯,不支撑二进制音讯");

throw new UnsupportedOperationException(

String.format("%s frame types not supported", frame.getClass().getName()));

}

// 回来应对音讯

String request = ((TextWebSocketFrame) frame).text();

System.out.println("服务端2收到:" + request);

if (logger.isLoggable(Level.FINE)) {

logger.fine(String.format("%s received %s", ctx.channel(), request));

}

TextWebSocketFrame tws = new TextWebSocketFrame(new Date().toString() + ctx.channel().id() + ":" + request);

// 群发

Global.group.writeAndFlush(tws);

// 回来【谁发的发给谁】

// ctx.channel().writeAndFlush(tws);

}

private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req) {

// 假如HTTP解码失利,回来HHTP反常

if (!req.getDecoderResult().isSuccess() || (!"websocket".equals(req.headers().get("Upgrade")))) {

sendHttpResponse(ctx, req,

new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST));

return;

}

//获取url后置参数

HttpMethod method=req.getMethod();

String uri=req.getUri();

QueryStringDecoder queryStringDecoder = new QueryStringDecoder(uri);

Map> parameters = queryStringDecoder.parameters();

System.out.println(parameters.get("request").get(0));

if(method==HttpMethod.GET&&"/webssss".equals(uri)){

//....处理

ctx.attr(AttributeKey.valueOf("type")).set("anzhuo");

}else if(method==HttpMethod.GET&&"/websocket".equals(uri)){

//...处理

ctx.attr(AttributeKey.valueOf("type")).set("live");

}

// 结构握手呼应回来,本机测验

WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(

"ws://"+req.headers().get(HttpHeaders.Names.HOST)+uri, null, false);

handshaker = wsFactory.newHandshaker(req);

if (handshaker == null) {

WebSocketServerHandshakerFactory.sendUnsupportedWebSocketVersionResponse(ctx.channel());

} else {

handshaker.handshake(ctx.channel(), req);

}

}

private static void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest req, DefaultFullHttpResponse res) {

// 回来应对给客户端

if (res.getStatus().code() != 200) {

ByteBuf buf = Unpooled.copiedBuffer(res.getStatus().toString(), CharsetUtil.UTF_8);

res.content().writeBytes(buf);

buf.release();

}

// 假如对错Keep-Alive,封闭衔接

ChannelFuture f = ctx.channel().writeAndFlush(res);

if (!HttpHeaders.isKeepAlive(req) || res.getStatus().code() != 200) {

f.addListener(ChannelFutureListener.CLOSE);

}

}

/**

* exception 反常 Caught 捉住 捉住反常,当发作反常的时分,能够做一些相应的处理,比方打印日志、封闭链接

*/

@Override

public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {

cause.printStackTrace();

ctx.close();

}

}

以上便是netty-websocket的Demo了,应该现已解说的很具体了,一起应对的并发量也满意一般企业用于websocket的衔接,假如需求不行,能够用nginx负载均衡添加。

最终给我们一条主张,在实践项目中,别让这种长衔接一向坚持,在nginx中能够设置衔接无沟通超时断开,大约设置10分钟左右,然后每8分钟守时从服务端发送一条心跳,具体主意就看你们喽~

假如我们有疑问,能够私信我“学习”收取具体的视频材料。

请关注微信公众号
微信二维码
不容错过
Powered By Z-BlogPHP