分类 日常 下的文章

一:事件分离器

    在IO读写时,把 IO请求 与 读写操作 分离调配进行,需要用到事件分离器。根据处理机制的不同,事件分离器又分为:同步的Reactor和异步的Proactor。

    Reactor模型:

- 应用程序在事件分离器注册 读就绪事件 和 读就绪事件处理器
- 事件分离器等待读就绪事件发生
- 读就绪事件发生,激活事件分离器,分离器调用 读就绪事件处理器(即:可以进行读操作了,开始读)
- 读事件处理器开始进行读操作,把读到的数据提供给程序使用
    Proactor模型:

   - 应用程序在事件分离器注册 读完成事件 和 读完成事件处理器,并向操作系统发出异步读请求
  • 事件分离器等待操作系统完成读取
  • 在分离器等待过程中,操作系统利用并行的内核线程执行实际的读操作,并将结果数据存入用户自定义缓冲区,最后通知事件分离器读操作完成
  • 事件分离器监听到 读完成事件 后,激活 读完成事件的处理器
  • 读完成事件处理器 处理用户自定义缓冲区中的数据给应用程序使用

    同步和异步的区别就在于 读 操作由谁完成:同步的Reactor是指程序发出读请求后,由分离器监听到可以进行读操作时(需要获得读操作条件)通知事件处理器进行读操作,异步的Proactor是指程序发出读请求后,操作系统立刻异步地进行读操作了,读完之后在通知分离器,分离器激活处理器直接取用已读到的数据。
    
     二:同步阻塞IO(BIO)
    
     我们熟知的Socket编程就是BIO,一个socket连接一个处理线程(这个线程负责这个Socket连接的一系列数据传输操作)。阻塞的原因在于:操作系统允许的线程数量是有限的,多个socket申请与服务端建立连接时,服务端不能提供相应数量的处理线程,没有分配到处理线程的连接就会阻塞等待或被拒绝。
    
     三:同步非阻塞IO(NIO)

     New IO是对BIO的改进,基于Reactor模型。我们知道,一个socket连接只有在特点时候才会发生数据传输IO操作,大部分时间这个“数据通道”是空闲的,但还是占用着线程。NIO作出的改进就是“一个请求一个线程”,在连接到服务端的众多socket中,只有需要进行IO操作的才能获取服务端的处理线程进行IO。这样就不会因为线程不够用而限制了socket的接入。客户端的socket连接到服务端时,就会在事件分离器注册一个 IO请求事件 和 IO 事件处理器。在该连接发生IO请求时,IO事件处理器就会启动一个线程来处理这个IO请求,不断尝试获取系统的IO的使用权限,一旦成功(即:可以进行IO),则通知这个socket进行IO数据传输。

     NIO还提供了两个新概念:Buffer和Channel

Buffer:
– 是一块连续的内存块。
– 是 NIO 数据读或写的中转地。
Channel:
– 数据的源头或者数据的目的地
– 用于向 buffer 提供数据或者读取 buffer 数据 ,buffer 对象的唯一接口。
– 异步 I/O 支持

  Buffer作为IO流中数据的缓冲区,而Channel则作为socket的IO流与Buffer的传输通道。客户端socket与服务端socket之间的IO传输不直接把数据交给CPU使用,

而是先经过Channel通道把数据保存到Buffer,然后CPU直接从Buffer区读写数据,一次可以读写更多的内容。

  使用Buffer提高IO效率的原因(这里与IO流里面的BufferedXXStream、BufferedReader、BufferedWriter提高性能的原理一样):IO的耗时主要花在数据传输的路上,普通的IO是一个字节一个字节地传输,

而采用了Buffer的话,通过Buffer封装的方法(比如一次读一行,则以行为单位传输而不是一个字节一次进行传输)就可以实现“一大块字节”的传输。比如:IO就是送快递,普通IO是一个快递跑一趟,采用了Buffer的IO就是一车跑一趟。很明显,buffer效率更高,花在传输路上
的时间大大缩短。

      四:异步阻塞IO(AIO)

      NIO是同步的IO,是因为程序需要IO操作时,必须获得了IO权限后亲自进行IO操作才能进行下一步操作。AIO是对NIO的改进(所以AIO又叫NIO.2),它是基于Proactor模型的。每个socket连接在事件分离器注册 IO完成事件 和 IO完成事件处理器。程序需要进行IO时,向分离器发出IO请求并把所用的Buffer区域告知分离器,分离器通知操作系统进行IO操作,操作系统自己不断尝试获取IO权限并进行IO操作(数据保存在Buffer区),操作完成后通知分离器;分离器检测到 IO完成事件,则激活 IO完成事件处理器,处理器会通知程序说“IO已完成”,程序知道后就直接从Buffer区进行数据的读写。

      也就是说:AIO是发出IO请求后,由操作系统自己去获取IO权限并进行IO操作;NIO则是发出IO请求后,由线程不断尝试获取IO权限,获取到后通知应用程序自己进行IO操作。

NIO:(new io) 同步非阻塞IO
服务端:

package com.ldyz;


import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.channels.spi.SelectorProvider;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Server {
    private Selector selector;
    private ExecutorService tp = Executors.newCachedThreadPool();
    private static Map<Socket,Long> geym_time_stat  = new HashMap<Socket,Long>(10240);
    class EchoClient{
        private LinkedList<ByteBuffer> outq;
        EchoClient(){
            outq = new LinkedList<ByteBuffer>();
        }
        public LinkedList<ByteBuffer> getOutputQueue(){
            return outq;
        }
        public void enqueue(ByteBuffer bb){
            outq.addFirst(bb);
        }
    }
    class HandleMsg implements Runnable{
        SelectionKey sk;
        ByteBuffer bb;
        public HandleMsg(SelectionKey sk,ByteBuffer bb){
            this.sk = sk;
            this.bb =  bb;
        }

        @Override
        public void run(){
            EchoClient echoClient = (EchoClient)sk.attachment();
            echoClient.enqueue(bb);
            sk.interestOps(SelectionKey.OP_READ|SelectionKey.OP_WRITE);
            //设置SK为写入 然后唤醒selector
            selector.wakeup();
        }
    }
    private void doAccept(SelectionKey sk){
        ServerSocketChannel server = (ServerSocketChannel)sk.channel();
        SocketChannel clientChannel;
        try{
            clientChannel = server.accept();
            clientChannel.configureBlocking(false);

            //register the channel for reading
            SelectionKey clientKey = clientChannel.register(selector,SelectionKey.OP_READ);
            //Allocate an EchoClient instance and attach it to this selection key
            //对每个一个客户实例都附加一个EchoClient
            EchoClient echoClient = new EchoClient();
            clientKey.attach(echoClient);

            InetAddress clientAddress = clientChannel.socket().getInetAddress();
            System.out.println("Accepted connection from "+clientAddress.getHostAddress()+".");
        }catch(Exception e){
            System.out.println("Faild to accept new client");
            e.printStackTrace();
        }
    }
    private void doRead(SelectionKey sk){
        SocketChannel channel = (SocketChannel)sk.channel();
        ByteBuffer bb = ByteBuffer.allocate(8192);//最多8192Bye
        int len;
        try{
            len = channel.read(bb);
            if(len < 0){
                disconnect(sk);
                return;
            }
        }catch (Exception e){
            System.out.println("Fail to read from client");
            e.printStackTrace();
            disconnect(sk);
            return;
        }
        bb.flip();
        //收到信息之后启动线程发送回去
        tp.execute(new HandleMsg(sk,bb));
    }
    private void doWrite(SelectionKey sk){
        SocketChannel channel = (SocketChannel)sk.channel();
        EchoClient echoClient = (EchoClient)sk.attachment();
        LinkedList<ByteBuffer> outq = echoClient.getOutputQueue();

        ByteBuffer bb = outq.getLast();
        try{
            int len = channel.write(bb);
            if(len == -1){
                disconnect(sk);
                return;
            }
            if(bb.remaining() == 0){
                outq.removeLast();
            }
        }catch (Exception e){
            e.printStackTrace();
            System.out.println("fail to write to client");
            disconnect(sk);
        }
        if(outq.size() == 0){
            sk.interestOps(SelectionKey.OP_READ);
        }
    }
    private void disconnect(SelectionKey sk){
        SocketChannel sc = (SocketChannel)sk.channel();
        try{
            sc.finishConnect();
        }catch (IOException e){

        }
    }
    public Server(int port) throws IOException {
        selector = SelectorProvider.provider().openSelector();
        ServerSocketChannel ssc = ServerSocketChannel.open();
        ssc.configureBlocking(false);//非阻塞
        //服务器端口
        InetSocketAddress isa = new InetSocketAddress(port);
        //socket 绑定在这个端口
        ssc.socket().bind(isa);
        //把一个socketchannel注册到selector上,同时选择监听的事件 acceptKey
        SelectionKey acceptKey = ssc.register(selector,SelectionKey.OP_ACCEPT);
        for(;;){
            selector.select();
            Set readyKeys = selector.selectedKeys();//获取Key
            Iterator i = readyKeys.iterator();
            long e = 0;
            while(i.hasNext()){
                SelectionKey sk = (SelectionKey)i.next();
                i.remove();
                if(sk.isAcceptable()){//可接受
                    doAccept(sk);
                }else if(sk.isValid() && sk.isReadable()){//有效且可读
                    if(!geym_time_stat.containsKey((SocketChannel)sk.channel())){
                        geym_time_stat.put(((SocketChannel)sk.channel()).socket(),System.currentTimeMillis());//记入读取时间
                        doRead(sk);
                    }
                }else if(sk.isValid() && sk.isWritable()){//有效且可写
                    doWrite(sk);
                    e = System.currentTimeMillis();
                    long b = geym_time_stat.remove(((SocketChannel) sk.channel()).socket());//记录写入时间得处理时间
                    System.out.println("speed"+(e-b)+"ms");
                }
            }
        }
    }
}

客户端:

package com.ldyz;


import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.LockSupport;

public class Main {
    static ExecutorService es = Executors.newCachedThreadPool();
    static Long sleep_time = 1000*1000*1000L;
    public static class EchoClient implements Runnable{
        @Override
        public void run(){
            Socket client = null;
            PrintWriter writer = null;
            BufferedReader reader = null;
            try{
                client = new Socket();
                client.connect(new InetSocketAddress("localhost",10086));
                writer = new PrintWriter(client.getOutputStream(),true);
                writer.print("h");
                LockSupport.parkNanos(sleep_time);
                writer.print("e");
                LockSupport.parkNanos(sleep_time);
                writer.print("l");
                LockSupport.parkNanos(sleep_time);
                writer.print("l");
                LockSupport.parkNanos(sleep_time);
                writer.print("o");
                LockSupport.parkNanos(sleep_time);
                writer.print("!");
                LockSupport.parkNanos(sleep_time);
                writer.println();
                writer.flush();
                reader = new BufferedReader(new InputStreamReader(client.getInputStream()));
                System.out.println("from Server:"+reader.readLine());
            }catch (Exception e){
                e.printStackTrace();
            }finally {
                if(writer != null){
                    writer.close();
                }
                if(reader != null){
                    try {
                        reader.close();
                    }catch (IOException ex){
                        ex.printStackTrace();
                    }
                }
                if(client != null){
                    try{
                        client.close();
                    }catch (Exception e){
                        e.printStackTrace();
                    }
                }
            }
        }
    }
    public static void main(String[] args)  {
        EchoClient ec = new EchoClient();
        for (int i = 0;i<10;i++){
            es.execute(ec);
        }
    // write your code here
    }
}

第一话:新鲜屋
第二话:名门私餐厅
第三话:鬼北居酒屋
第四话:大阪双子烤肉(过500赞就填坑)