Nio

NIO编程

什么是NIO

NIO编程是jdk1.4在IO基础之上进行改进而产生的。

Java NIO(New IO)是一个可以替代标准Java IO API的IO API(从Java 1.4开始),Java NIO提供了与标准IO不同的IO工作方式。

Java NIO: Channels and Buffers(通道和缓冲区)

标准的IO基于字节流和字符流进行操作的,而NIO是基于通道(Channel)和缓冲区(Buffer)进行操作,数据总是从通道读取到缓冲区中,或者从缓冲区写入到通道中。

Java NIO: Non-blocking IO(非阻塞IO)

Java NIO可以让你非阻塞的使用IO,例如:当线程从通道读取数据到缓冲区时,线程还是可以进行其他事情。当数据被写入到缓冲区时,线程可以继续处理它。从缓冲区写入通道也类似。

Java NIO: Selectors(选择器)

Java NIO引入了选择器的概念,选择器用于监听多个通道的事件(比如:连接打开,数据到达)。因此,单个的线程可以监听多个数据通道。

注意:传统IO是单向。 NIO类似

NIO和IO的区别

​ IO是阻塞的,面向流的,网络通信时,客户端访问服务器时,就会阻塞,直到关闭之前都不能再继续访问,如果需要继续用另外的客户端访问就可以用多线程的方法,新建链接时,就打开新的线程,才能避免阻塞的情况。称之为伪异步,读写都是单向的。

ss

​ NIO(new io)中有一个非阻塞IO的,面向缓冲区的,保留了IO本质上的读写操作,但是对数据的存放都是在缓冲区里面进行操作,而且是双向的,通道传送数据

ss

IO NIO
面向流 面向缓冲区
阻塞IO 非阻塞IO
选择器

Buffer


import java.nio.Buffer;
import java.nio.ByteBuffer;

import org.junit.Test;

/**
 *
 * 缓冲区是NIO 提高给传输文件和通道一起配合使用,存储数据.<br>
 * Buffer<br>
 * ByteBuffer<br>
 * LongBuffer<br>
 * InteigBuffer<br>
 * FloatBuffer<br>
 * DubboBuffer<br>
 *
 */
public class BuffTest {
	/**
	 * position<br> 缓冲区正在操作的位置 默认从0开始。
	 * limit<br>  界面(缓冲区可用大小)
	 * capacity;<br> 缓冲区最大容量,一旦声明不能改变
	 *
	 * 核心方法:
	 * put() 往buff存放数据
	 * get() 获取数据
	 *
	 *  */
	@Test
	public void test001() {
		try {
			// 初始化byteBuffer大小
			ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
		    System.out.println(byteBuffer.position());
		    System.out.println(byteBuffer.limit());
		    System.out.println(byteBuffer.capacity());
		    System.out.println("----------往bytebuff存放数据....----------");
		    System.out.println("");
		    byteBuffer.put("abcd1".getBytes());
		    System.out.println(byteBuffer.position());
		    System.out.println(byteBuffer.limit());
		    System.out.println(byteBuffer.capacity());
		    System.out.println("----------读取值...----------");
		    //开启读取模式
		    byteBuffer.flip();
		    System.out.println("position:"+byteBuffer.position());
		    System.out.println(byteBuffer.limit());
		    System.out.println(byteBuffer.capacity());
		    byte[] bytes=    new byte[byteBuffer.limit()];
		    byteBuffer.get(bytes);
		    System.out.println(new String(bytes,0,bytes.length));

		    System.out.println("----------重复读取----------");
		    byteBuffer.rewind();//重复读取
		    System.out.println("position:"+byteBuffer.position());
		    System.out.println(byteBuffer.limit());
		    System.out.println(byteBuffer.capacity());
		    byte[] bytes2=    new byte[byteBuffer.limit()];
		    byteBuffer.get(bytes2);
		    System.out.println(new String(bytes2,0,bytes2.length));
		    //清空缓冲数据被遗忘值...
		    System.out.println("----------清空缓冲区--------------");
		    byteBuffer.clear();
		    System.out.println("position:"+byteBuffer.position());
		    System.out.println(byteBuffer.limit());
		    System.out.println(byteBuffer.capacity());
		    System.out.println((char)byteBuffer.get());

		} catch (Exception e) {
			e.printStackTrace();
		}
	}

make与rest用法

标记(mark)与重置(reset):标记是一个索引,通过Buffer中的mark()方法指定Buffer中一个特定的position,之后可以通过调用reset()方法恢复到这个position。

import java.nio.ByteBuffer;

public class Test002 {

	public static void main(String[] args) {
		ByteBuffer byteBuffer = ByteBuffer.allocate(10);
		String str = "abcd";
		byteBuffer.put(str.getBytes());
		// 开启读的模式
		byteBuffer.flip();
		byte[] bytes = new byte[byteBuffer.limit()];
		byteBuffer.get(bytes, 0, 2);
		byteBuffer.mark();  ///打印标记
		System.out.println(new String(bytes, 0, 2));
		System.out.println(byteBuffer.position());
		System.out.println("----------------------------");
		//byteBuffer.reset();//还原到mark位置
		byteBuffer.get(bytes, 2, 2);
		System.out.println(new String(bytes, 2, 2));
		byteBuffer.reset();
		System.out.println("重置还原到make标记..");
		System.out.println(byteBuffer.position());
	}
//没什么用
}


缓存区

非直接缓冲区:通过 allocate() 方法分配缓冲区,将缓冲区建立在 JVM 的内存中,需要来回拷贝。IO就是非直接

直接缓冲区:通过 allocateDirect() 方法分配直接缓冲区,将缓冲区建立在物理内存中。可以提高效率,不需要来回拷贝,效率高,但是不安全,而且内存占用量大。


通道(Channel)

通道表示打开到 IO 设备(例如:文件、套接字)的连接。若需要使用 NIO 系统,需要获取用于连接 IO 设备的通道以及用于容纳数据的缓冲区。然后操作缓冲区,对数据进行处理。Channel 负责传输, Buffer 负责存储。通道是由 java.nio.channels 包定义的。 Channel 表示 IO 源与目标打开的连接。Channel 类似于传统的“流”。只不过 Channel本身不能直接访问数据, Channel 只能与Buffer 进行交互。

java.nio.channels.Channel 接口:
  		|--FileChannel
  		|--SocketChannel
  		|--ServerSocketChannel
  		|--DatagramChannel

import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.FileChannel.MapMode;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

import org.junit.Test;

public class Test003 {

	//直接缓冲区
	@Test
	public void test002() throws IOException {
		long statTime=System.currentTimeMillis();
		//创建管道
		FileChannel   inChannel=	FileChannel.open(Paths.get("f://1.mp4"), StandardOpenOption.READ);
		FileChannel   outChannel=	FileChannel.open(Paths.get("f://2.mp4"), StandardOpenOption.READ,StandardOpenOption.WRITE, StandardOpenOption.CREATE);
	    //定义映射文件
		MappedByteBuffer inMappedByte = inChannel.map(MapMode.READ_ONLY,0, inChannel.size());
		MappedByteBuffer outMappedByte = outChannel.map(MapMode.READ_WRITE,0, inChannel.size());
		//直接对缓冲区操作
		byte[] dsf=new byte[inMappedByte.limit()];
		inMappedByte.get(dsf);
		outMappedByte.put(dsf);
		inChannel.close();
		outChannel.close();
		long endTime=System.currentTimeMillis();
		System.out.println("操作直接缓冲区耗时时间:"+(endTime-statTime));
	}

	// 非直接缓冲区 读写操作
	@Test
	public void test001() throws IOException {
		long statTime=System.currentTimeMillis();
		// 读入流
		FileInputStream fst = new FileInputStream("f://1.mp4");
		// 写入流
		FileOutputStream fos = new FileOutputStream("f://2.mp4");
		// 创建通道
		FileChannel inChannel = fst.getChannel();
		FileChannel outChannel = fos.getChannel();
		// 分配指定大小缓冲区
		ByteBuffer buf = ByteBuffer.allocate(1024);
		while (inChannel.read(buf) != -1) {
			// 开启读取模式
			buf.flip();
			// 将数据写入到通道中
			outChannel.write(buf);
			buf.clear();
		}
		// 关闭通道 、关闭连接
		inChannel.close();
		outChannel.close();
		fos.close();
		fst.close();
		long endTime=System.currentTimeMillis();
		System.out.println("操作非直接缓冲区耗时时间:"+(endTime-statTime));
	}

}

分散读取与聚集写入

简单理解字面意思,一个通道对应多个缓冲区进行读文件,多个缓冲区又通过一个通道集中写入

RandomAccessFile raf1 = new RandomAccessFile("test.txt", "rw");
		// 1.获取通道
		FileChannel channel = raf1.getChannel();
		// 2.分配指定大小的指定缓冲区
		ByteBuffer buf1 = ByteBuffer.allocate(100);
		ByteBuffer buf2 = ByteBuffer.allocate(1024);
		// 3.分散读取
		ByteBuffer[] bufs = { buf1, buf2 };
		channel.read(bufs);
		for (ByteBuffer byteBuffer : bufs) {
			// 切换为读取模式
			byteBuffer.flip();
		}
		System.out.println(new String(bufs[0].array(), 0, bufs[0].limit()));
		System.out.println("------------------分算读取线分割--------------------");
		System.out.println(new String(bufs[1].array(), 0, bufs[1].limit()));
		// 聚集写入
		RandomAccessFile raf2 = new RandomAccessFile("2.txt", "rw");
		FileChannel channel2 = raf2.getChannel();
		channel2.write(bufs);

字符集 Charset

编码:字符串->字节数组

解码:字节数组 -> 字符串

public class Test005 {

	public static void main(String[] args) throws CharacterCodingException {
		// 获取编码器
		Charset cs1 = Charset.forName("GBK");
		// 获取编码器
		CharsetEncoder ce = cs1.newEncoder();
		// 获取解码器
		CharsetDecoder cd = cs1.newDecoder();
		CharBuffer cBuf = CharBuffer.allocate(1024);
		cBuf.put("牛逼!");
		cBuf.flip();
		// 编码
		ByteBuffer bBuf = ce.encode(cBuf);
		for (int i = 0; i < 12; i++) {
			System.out.println(bBuf.get());
		}
		// 解码
		bBuf.flip();
		CharBuffer cBuf2 = cd.decode(bBuf);
		System.out.println(cBuf2.toString());
		System.out.println("-------------------------------------");
		Charset cs2 = Charset.forName("GBK");
		bBuf.flip();
		CharBuffer cbeef = cs2.decode(bBuf);
		System.out.println(cbeef.toString());
	}

}

编码与解码类型不匹配时会报错。

什么是阻塞

阻塞概念:应用程序在获取网络数据的时候,如果网络传输很慢,那么程序就一直等着,直接到传输完毕。

同步:执行一个操作之后,等待结构,然后才继续执行后续的操作、

异步:执行一个操作之后,可以去执行其他的操作,然后等待通知在归来执行刚才没执行玩的操作

阻塞:进程给CPU传达一个任务之后,一直等待CPU处理完成,然后才执行后面的操作

非阻塞:进程给CPU传达一个任务之后,继续处理后续的操作,隔断时间在来询问之前的操作是否完成。这样的过程其实也叫做轮询

什么是非阻塞

应用程序直接可以获取已经准备好的数据,无需等待.

IO为同步阻塞形式,NIO为同步非阻塞形式。NIO没有实现异步,在JDK1.7之后,升级了NIO库包

,支持异步费阻塞通讯模型NIO2.0(AIO)

//nio   异步非阻塞
class Client {

	public static void main(String[] args) throws IOException {
		System.out.println("客户端已经启动....");
		// 1.创建通道
		SocketChannel sChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 8080));
		// 2.切换异步非阻塞
		sChannel.configureBlocking(false);
		// 3.指定缓冲区大小
		ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
		Scanner scanner=  new Scanner(System.in);
		while (scanner.hasNext()) {
			String str=scanner.next();
			byteBuffer.put((new Date().toString()+"\n"+str).getBytes());
			// 4.切换读取模式
			byteBuffer.flip();
			sChannel.write(byteBuffer);
			byteBuffer.clear();
		}
		sChannel.close();
	}

}

// nio
class Server {
	public static void main(String[] args) throws IOException {
		System.out.println("服务器端已经启动....");
		// 1.创建通道
		ServerSocketChannel sChannel = ServerSocketChannel.open();
		// 2.切换读取模式
		sChannel.configureBlocking(false);
		// 3.绑定连接
		sChannel.bind(new InetSocketAddress(8080));
		// 4.获取选择器
		Selector selector = Selector.open();
		// 5.将通道注册到选择器 "并且指定监听接受事件"
		sChannel.register(selector, SelectionKey.OP_ACCEPT);
		// 6. 轮训式 获取选择 "已经准备就绪"的事件
		while (selector.select() > 0) {
			// 7.获取当前选择器所有注册的"选择键(已经就绪的监听事件)"
			Iterator<SelectionKey> it = selector.selectedKeys().iterator();
			while (it.hasNext()) {
				// 8.获取准备就绪的事件
				SelectionKey sk = it.next();
				// 9.判断具体是什么事件准备就绪
				if (sk.isAcceptable()) {
					// 10.若"接受就绪",获取客户端连接
					SocketChannel socketChannel = sChannel.accept();
					// 11.设置阻塞模式
					socketChannel.configureBlocking(false);
					// 12.将该通道注册到服务器上
					socketChannel.register(selector, SelectionKey.OP_READ);
				} else if (sk.isReadable()) {
					// 13.获取当前选择器"就绪" 状态的通道
					SocketChannel socketChannel = (SocketChannel) sk.channel();
					// 14.读取数据
					ByteBuffer buf = ByteBuffer.allocate(1024);
					int len = 0;
					while ((len = socketChannel.read(buf)) > 0) {
						buf.flip();
						System.out.println(new String(buf.array(), 0, len));
						buf.clear();
					}
				}
				it.remove();
			}
		}

	}
}


Netty框架

​ Netty 是一个基于 JAVA NIO 类库的异步通信框架,它的架构特点是:异步非阻塞、基于事件驱动、高性能、高可靠性和高可定制性。解决nio代码复杂问题,容错机制。

Netty应用场景

1.分布式开源框架中dubbo、Zookeeper,RocketMQ底层rpc通讯使用就是netty。

2.游戏开发中,底层使用netty通讯。

为什么选择netty

在本小节,我们总结下为什么不建议开发者直接使用JDK的NIO类库进行开发的原因:

1) NIO的类库和API繁杂,使用麻烦,你需要熟练掌握Selector、ServerSocketChannel、SocketChannel、ByteBuffer等;

2) 需要具备其它的额外技能做铺垫,例如熟悉Java多线程编程,因为NIO编程涉及到Reactor模式,你必须对多线程和网路编程非常熟悉,才能编写出高质量的NIO程序;

3) 可靠性能力补齐,工作量和难度都非常大。例如客户端面临断连重连、网络闪断、半包读写、失败缓存、网络拥塞和异常码流的处理等等,NIO编程的特点是功能开发相对容易,但是可靠性能力补齐工作量和难度都非常大;

4) JDK NIO的BUG,例如臭名昭著的epoll bug,它会导致Selector空轮询,最终导致CPU 100%。官方声称在JDK1.6版本的update18修复了该问题,但是直到JDK1.7版本该问题仍旧存在,只不过该bug发生概率降低了一些而已,它并没有被根本解决。

netty服务端

class ServerHandler extends SimpleChannelHandler {


	/**
	 * 通道关闭的时候触发
	 */
	@Override
	public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
		System.out.println("channelClosed");
	}

	/**
	 * 必须是连接已经建立,关闭通道的时候才会触发.
	 */
	@Override
	public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
		super.channelDisconnected(ctx, e);
		System.out.println("channelDisconnected");
	}

	/**
	 * 捕获异常
	 */
	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
		super.exceptionCaught(ctx, e);
		System.out.println("exceptionCaught");

	}

	/**
	 * 接受消息
	 */
	public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
		super.messageReceived(ctx, e);
//		System.out.println("messageReceived");
		System.out.println("服务器端收到客户端消息:"+e.getMessage());
		//回复内容
		ctx.getChannel().write("好的");
	}

}
// netty 服务器端
public class NettyServer {

	public static void main(String[] args) {
		// 创建服务类对象
		ServerBootstrap serverBootstrap = new ServerBootstrap();
		// 创建两个线程池 分别为监听监听端口 ,nio监听
		ExecutorService boos = Executors.newCachedThreadPool();
		ExecutorService worker = Executors.newCachedThreadPool();
		// 设置工程 并把两个线程池加入中
		serverBootstrap.setFactory(new NioServerSocketChannelFactory(boos, worker));
		// 设置管道工厂
		serverBootstrap.setPipelineFactory(new ChannelPipelineFactory() {

			public ChannelPipeline getPipeline() throws Exception {
				ChannelPipeline pipeline = Channels.pipeline();
				//将数据转换为string类型.
				pipeline.addLast("decoder", new StringDecoder());
				pipeline.addLast("encoder", new StringEncoder());
				pipeline.addLast("serverHandler", new ServerHandler());
				return pipeline;
			}
		});
		// 绑定端口号
		serverBootstrap.bind(new InetSocketAddress(9090));
		System.out.println("netty server启动....");
	}


}

netty客户端

import java.net.InetSocketAddress;
import java.util.Scanner;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
import org.jboss.netty.handler.codec.string.StringDecoder;
import org.jboss.netty.handler.codec.string.StringEncoder;
class ClientHandler extends SimpleChannelHandler {


	/**
	 * 通道关闭的时候触发
	 */
	@Override
	public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
		System.out.println("channelClosed");
	}

	/**
	 * 必须是连接已经建立,关闭通道的时候才会触发.
	 */
	@Override
	public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
		super.channelDisconnected(ctx, e);
		System.out.println("channelDisconnected");
	}

	/**
	 * 捕获异常
	 */
	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
		super.exceptionCaught(ctx, e);
		System.out.println("exceptionCaught");

	}

	/**
	 * 接受消息
	 */
	public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
		super.messageReceived(ctx, e);
//		System.out.println("messageReceived");
		System.out.println("服务器端向客户端回复内容:"+e.getMessage());
		//回复内容
//		ctx.getChannel().write("好的");
	}

}
public class NettyClient {

	public static void main(String[] args) {
		System.out.println("netty client启动...");
		// 创建客户端类
		ClientBootstrap clientBootstrap = new ClientBootstrap();
		// 线程池
		ExecutorService boos = Executors.newCachedThreadPool();
		ExecutorService worker = Executors.newCachedThreadPool();
		clientBootstrap.setFactory(new NioClientSocketChannelFactory(boos, worker));
		clientBootstrap.setPipelineFactory(new ChannelPipelineFactory() {

			public ChannelPipeline getPipeline() throws Exception {
				ChannelPipeline pipeline = Channels.pipeline();
				// 将数据转换为string类型.
				pipeline.addLast("decoder", new StringDecoder());
				pipeline.addLast("encoder", new StringEncoder());
				pipeline.addLast("clientHandler", new ClientHandler());
				return pipeline;

			}
		});
		//连接服务端
		ChannelFuture connect = clientBootstrap.connect(new InetSocketAddress("127.0.0.1", 9090));
		Channel channel = connect.getChannel();
		System.out.println("client start");
		Scanner scanner=	new Scanner(System.in);
		while (true) {
			System.out.println("请输输入内容...");
			channel.write(scanner.next());
		}
	}



}


同步和异步是相对于操作结果来说,会不会等待结果返回。

阻塞就是说在煮水的过程中,你不可以去干其他的事情,非阻塞就是在同样的情况下,可以同时去干其他的事情。阻塞和非阻塞是相对于线程是否被阻塞。

Class Singleton{
    //懒汉 面试写这个
    private static Singleton instance;
    private Singleton(){}
    public static Singleton getInstance(){
        if(singleton==null){
            synchronized(Singleton.class){
                if(singleton == null){
                    return new Instance();
                }
            }
        }
        return singleton;
    }

}


Class Singleton{
    //饿汉
    private static Singleton instance= new Instance();
    private Singleton(){}
    public static Singleton getInstance(){
        return instance;
    }
}

打赏一个呗

取消

感谢您的支持,我会继续努力的!

扫码支持
扫码支持
扫码打赏,你说多少就多少

打开支付宝扫一扫,即可进行扫码打赏哦