
【NIO】Chapter 4. Selectors


A single thread can monitor large numbers of channels with readiness selection.

The Selector and related classes provide the APIs to do readiness selection on channels.

Selector Basics

You register one or more previously created selectable channels with a selector object.

A key that represents the relationship between one channel and one selector is returned.

Selection keys remember what you are interested in for each channel.

They also track the operations of interest that their channel is currently ready to perform.

When you invoke select() on a selector object, the associated keys are updated by checking all the channels registered with that selector.

You can obtain a set of the keys whose channels were found to be ready at that point.

By iterating over these keys, you can service each channel that has become ready since the last time you invoked select().
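The steps above can be sketched end to end without any networking. This is only a minimal illustration, not the author's code: the class name SelectorBasicsSketch and the method serviceOnce() are made up for this note, and a java.nio.channels.Pipe stands in for a real socket so the channel can be made ready on demand.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Iterator;

// Hypothetical demo class; a Pipe replaces a real socket (assumption for illustration).
class SelectorBasicsSketch {

    /** Registers a pipe's read end, makes it ready, and returns the number of bytes serviced. */
    static int serviceOnce() throws IOException {
        try (Selector selector = Selector.open()) {
            Pipe pipe = Pipe.open();
            try (Pipe.SourceChannel source = pipe.source();
                 Pipe.SinkChannel sink = pipe.sink()) {
                // 1. Register a selectable channel; the returned key represents the registration.
                source.configureBlocking(false);
                SelectionKey registration = source.register(selector, SelectionKey.OP_READ);

                // Make the channel ready by writing to the other end of the pipe.
                sink.write(ByteBuffer.wrap(new byte[] {1, 2, 3}));

                // 2. select() updates the keys by checking the registered channels.
                selector.select();

                // 3. Iterate over the keys whose channels were found to be ready.
                int bytesRead = 0;
                Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                while (it.hasNext()) {
                    SelectionKey key = it.next();
                    it.remove();
                    if (key == registration && key.isReadable()) {
                        bytesRead += source.read(ByteBuffer.allocate(16));
                    }
                }
                return bytesRead;
            }
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("bytes serviced: " + serviceOnce());
    }
}
```

The full server examples later in this post follow the same register, select, iterate cycle with real sockets.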

Event-driven model

Register the events you are interested in with the Selector; when one of those events occurs on a channel, you are notified.

The real power of readiness selection is that a potentially large number of channels can be checked for readiness simultaneously.

Optionally, the invoking thread can ask to be put to sleep until one or more of the channels registered with the Selector is ready, or it can periodically poll the selector to see if anything has become ready since the last check.

In other words: either sleep or poll in order to discover that an event has become ready.
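The two styles map onto different Selector methods: selectNow() polls and returns immediately, while select() and select(timeout) put the calling thread to sleep. The sketch below is an illustration under assumptions: SelectModesSketch and its method are hypothetical names, and a Pipe is used in place of a socket.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

// Hypothetical demo class contrasting polling with a (bounded) sleeping select.
class SelectModesSketch {

    /** Returns {ready count from a poll before any data exists, ready count after a write}. */
    static int[] pollBeforeAndAfterWrite() throws IOException {
        try (Selector selector = Selector.open()) {
            Pipe pipe = Pipe.open();
            try (Pipe.SourceChannel source = pipe.source();
                 Pipe.SinkChannel sink = pipe.sink()) {
                source.configureBlocking(false);
                source.register(selector, SelectionKey.OP_READ);

                // Polling: selectNow() never blocks; nothing has been written yet.
                int before = selector.selectNow();

                sink.write(ByteBuffer.wrap(new byte[] {42}));

                // Sleeping: select(timeout) blocks until a channel is ready
                // or the timeout (here 5 seconds) expires.
                int after = selector.select(5000);

                return new int[] {before, after};
            }
        }
    }

    public static void main(String[] args) throws IOException {
        int[] counts = pollBeforeAndAfterWrite();
        System.out.println("before write: " + counts[0] + ", after write: " + counts[1]);
    }
}
```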

True readiness selection must be done by the operating system.

One of the most important functions performed by an operating system is to handle I/O requests and notify processes when their data is ready.

So it only makes sense to delegate this function down to the operating system.

The Selector class provides the abstraction by which Java code can request readiness selection service from the underlying operating system in a portable way.

The underlying operating system notifies the Java process that an event needs handling, and that notification is then passed on to the Selector.

The Selector, SelectableChannel, and SelectionKey

Selector

The Selector class manages information about a set of registered channels and
their readiness states. Channels are registered with selectors, and a selector can be
asked to update the readiness states of the channels currently registered with it.
When doing so, the invoking thread can optionally indicate that it would prefer to
be suspended until one of the registered channels is ready.

SelectableChannel

This abstract class provides the common methods needed to implement channel
selectability. It's the superclass of all channel classes that support readiness
selection. FileChannel objects are not selectable because they don't extend from
SelectableChannel. All the socket channel classes are selectable,
as well as the channels obtained from a Pipe object. SelectableChannel objects
can be registered with Selector objects, along with an indication of which
operations on that channel are of interest for that selector. A channel can be
registered with multiple selectors, but only once per selector.

SelectionKey

A SelectionKey encapsulates the registration relationship between a specific
channel and a specific selector. A SelectionKey object is returned from
SelectableChannel.register() and serves as a token representing the registration.
SelectionKey objects contain two bit sets (encoded as integers) indicating which
channel operations the registrant has an interest in and which operations the
channel is ready to perform.

A channel must first be placed in nonblocking mode (by calling configureBlocking(false)) before it can be registered with a selector.
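A small sketch of this rule, assuming a Pipe source channel as the selectable channel (BlockingModeSketch is a hypothetical name): registering while the channel is still in blocking mode raises IllegalBlockingModeException, and the same call succeeds after configureBlocking(false).

```java
import java.io.IOException;
import java.nio.channels.IllegalBlockingModeException;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

// Hypothetical demo class; shows that register() rejects a channel in blocking mode.
class BlockingModeSketch {

    /** Returns true if the blocking registration was rejected and the nonblocking one succeeded. */
    static boolean blockingRegistrationRejected() throws IOException {
        try (Selector selector = Selector.open()) {
            Pipe pipe = Pipe.open();
            try (Pipe.SourceChannel source = pipe.source();
                 Pipe.SinkChannel sink = pipe.sink()) {
                // Newly created channels start out in blocking mode.
                try {
                    source.register(selector, SelectionKey.OP_READ);
                    return false; // not expected: registration should have been rejected
                } catch (IllegalBlockingModeException expected) {
                    // Expected: the channel must be nonblocking before registration.
                }

                source.configureBlocking(false);
                SelectionKey key = source.register(selector, SelectionKey.OP_READ);
                return key.isValid();
            }
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("blocking registration rejected: " + blockingRegistrationRejected());
    }
}
```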



A selector maintains a set of channels to monitor.

The important thing is to remember that the Selector object controls the selection process for
the channels registered with it.

Selectors are the managing objects, not the selectable channel objects.

The Selector object performs readiness selection of channels registered with it and manages selection keys.

Setting Up Selectors

Selector selector = Selector.open();
// channel1..channel3 are assumed to be existing SelectableChannels already in nonblocking mode
channel1.register (selector, SelectionKey.OP_READ);
channel2.register (selector, SelectionKey.OP_WRITE);
channel3.register (selector, SelectionKey.OP_READ |
SelectionKey.OP_WRITE);
// Wait up to 10 seconds for a channel to become ready
int readyCount = selector.select (10000);

There are four defined selectable operations: read, write, connect, and accept.

public static final int OP_READ
public static final int OP_WRITE
public static final int OP_CONNECT
public static final int OP_ACCEPT

Not all operations are supported on all selectable channels. A SocketChannel
cannot do an accept, for example.
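One way to see this is to compare a channel's validOps() with the operation being requested. The sketch below is illustrative only (ValidOpsSketch is a hypothetical name); it uses an unconnected SocketChannel and relies on register() throwing IllegalArgumentException for an unsupported operation.

```java
import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;

// Hypothetical demo class; the SocketChannel is never connected.
class ValidOpsSketch {

    /** Returns true if a SocketChannel neither lists OP_ACCEPT as valid nor accepts it at registration. */
    static boolean socketChannelRejectsAccept() throws IOException {
        try (Selector selector = Selector.open();
             SocketChannel channel = SocketChannel.open()) {
            channel.configureBlocking(false);

            // validOps() is a bit mask of the operations this channel type supports.
            boolean acceptSupported = (channel.validOps() & SelectionKey.OP_ACCEPT) != 0;

            try {
                channel.register(selector, SelectionKey.OP_ACCEPT);
                return false; // not expected: the unsupported operation was accepted
            } catch (IllegalArgumentException expected) {
                return !acceptSupported;
            }
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("OP_ACCEPT rejected for SocketChannel: " + socketChannelRejectsAccept());
    }
}
```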

Channels are not immediately deregistered when the associated key is cancelled. They remain
registered until the next selection operation occurs.

Using Selection Keys

A key represents the registration of a particular channel object with a particular selector object.

When it's time to terminate that relationship, call the cancel() method on the SelectionKey object.

A key can be checked to see if it still represents a valid registration by calling its isValid() method. When a key is cancelled, it's placed in the cancelled set of the associated selector.

The registration is not immediately terminated, but the key is immediately invalidated. During the next selection operation, any cancelled keys will be cleared from the cancelled key set, and the corresponding deregistrations will be completed.
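The deferred deregistration can be observed through isValid() and the selector's keys() set. This is a rough sketch under assumptions (CancelSketch is a hypothetical name and a Pipe replaces a socket): it cancels a key, inspects it, then runs one selection operation with selectNow() and inspects it again.

```java
import java.io.IOException;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

// Hypothetical demo class; observes a key before and after the selection that deregisters it.
class CancelSketch {

    /** Returns {valid after cancel(), in keys() before selection, in keys() after selection}. */
    static boolean[] cancelThenSelect() throws IOException {
        try (Selector selector = Selector.open()) {
            Pipe pipe = Pipe.open();
            try (Pipe.SourceChannel source = pipe.source();
                 Pipe.SinkChannel sink = pipe.sink()) {
                source.configureBlocking(false);
                SelectionKey key = source.register(selector, SelectionKey.OP_READ);

                // cancel() invalidates the key at once but only queues the deregistration.
                key.cancel();
                boolean validAfterCancel = key.isValid();
                boolean registeredBeforeSelect = selector.keys().contains(key);

                // The next selection operation processes the cancelled key set.
                selector.selectNow();
                boolean registeredAfterSelect = selector.keys().contains(key);

                return new boolean[] {validAfterCancel, registeredBeforeSelect, registeredAfterSelect};
            }
        }
    }

    public static void main(String[] args) throws IOException {
        boolean[] r = cancelThenSelect();
        System.out.println("valid=" + r[0] + ", registered before=" + r[1] + ", registered after=" + r[2]);
    }
}
```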

A channel can be registered with many selectors.

When a channel is closed:

all keys associated with it are automatically cancelled

When a selector is closed:

all channels registered with that selector are deregistered, and the associated keys are
invalidated (cancelled).

When a key is cancelled:

calling any of its methods related to selection will throw a CancelledKeyException.


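if ((key.readyOps() & SelectionKey.OP_READ) != 0)
{
myBuffer.clear();
// key.channel() returns a SelectableChannel, so cast to a readable channel type before reading
((ReadableByteChannel) key.channel()).read (myBuffer);
doSomethingWithBuffer (myBuffer.flip());
}

// The convenience test
//     if (key.isWritable())
// is equivalent to:
//     if ((key.readyOps() & SelectionKey.OP_WRITE) != 0)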

attach (Object ob)

This is a convenience that allows you to associate an arbitrary object with a key. This object can be a reference to anything meaningful to you, such as a business object, session handle, another channel, etc. This allows you to iterate through the keys associated with a selector, using the attached object handle on each as a reference to retrieve the associated context.

The attach() method stores the provided object reference in the key object.

If the selection key is long-lived, but the object you attach should not be, remember to clear the attachment when you're done. Otherwise, your attached object will not be garbage collected, and you may have a memory leak.

SelectionKey key = channel.register (selector, SelectionKey.OP_READ,
myObject);

// is equivalent to this (an alternative to the statement above, not a continuation of it)
SelectionKey key = channel.register (selector, SelectionKey.OP_READ);
key.attach (myObject);
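Clearing an attachment can be done with attach(null), which also returns the previously attached object. The sketch below is illustrative: AttachmentSketch is a hypothetical name, and a StringBuilder stands in for whatever session object an application would attach.

```java
import java.io.IOException;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

// Hypothetical demo class; the StringBuilder is a placeholder for real per-connection state.
class AttachmentSketch {

    /** Returns true if the attachment round-trips and is gone after attach(null). */
    static boolean attachAndClear() throws IOException {
        try (Selector selector = Selector.open()) {
            Pipe pipe = Pipe.open();
            try (Pipe.SourceChannel source = pipe.source();
                 Pipe.SinkChannel sink = pipe.sink()) {
                source.configureBlocking(false);

                StringBuilder session = new StringBuilder("session-1");
                SelectionKey key = source.register(selector, SelectionKey.OP_READ, session);

                // attachment() hands back the same reference that was attached.
                boolean sameObject = key.attachment() == session;

                // attach(null) clears the reference so the object can be garbage collected;
                // the return value is the previous attachment.
                Object previous = key.attach(null);

                return sameObject && previous == session && key.attachment() == null;
            }
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("attachment cleared: " + attachAndClear());
    }
}
```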
    


The Selection Process

Each Selector object maintains three sets of keys:

Registered key set

The set of currently registered keys associated with the selector. Not every
registered key is necessarily still valid. This set is returned by the keys() method
and may be empty. The registered key set is not directly modifiable.

Selected key set

A subset of the registered key set. Each member of this set is a key whose
associated channel was determined by the selector (during a prior selection
operation) to be ready for at least one of the operations in the key's interest set.
This set is returned by the selectedKeys() method (and may be empty).

Keys can be directly removed from this set, but not added.

Cancelled key set

A subset of the registered key set, this set contains keys whose cancel() methods
have been called (the key has been invalidated), but they have not been
deregistered. This set is private to the selector object and cannot be accessed
directly.
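The modification rules for the two public sets can be probed directly. The following is a sketch under assumptions, not a reference implementation: KeySetsSketch is a hypothetical name, a Pipe replaces a socket, and the probe relies on the sets throwing UnsupportedOperationException for disallowed changes.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Set;

// Hypothetical demo class; probes which changes keys() and selectedKeys() allow.
class KeySetsSketch {

    /** Returns {keys() rejects remove, selectedKeys() rejects add, selectedKeys() allows remove}. */
    static boolean[] probeKeySets() throws IOException {
        try (Selector selector = Selector.open()) {
            Pipe pipe = Pipe.open();
            try (Pipe.SourceChannel source = pipe.source();
                 Pipe.SinkChannel sink = pipe.sink()) {
                source.configureBlocking(false);
                SelectionKey key = source.register(selector, SelectionKey.OP_READ);

                // Make the channel ready so the key lands in the selected key set.
                sink.write(ByteBuffer.wrap(new byte[] {7}));
                selector.select();

                Set<SelectionKey> registered = selector.keys();
                Set<SelectionKey> selected = selector.selectedKeys();

                boolean registeredRejectsRemove;
                try {
                    registered.remove(key);
                    registeredRejectsRemove = false;
                } catch (UnsupportedOperationException e) {
                    registeredRejectsRemove = true;
                }

                boolean selectedRejectsAdd;
                try {
                    selected.add(key);
                    selectedRejectsAdd = false;
                } catch (UnsupportedOperationException e) {
                    selectedRejectsAdd = true;
                }

                // Removal from the selected key set is the one permitted modification.
                boolean selectedAllowsRemove = selected.remove(key);

                return new boolean[] {registeredRejectsRemove, selectedRejectsAdd, selectedAllowsRemove};
            }
        }
    }

    public static void main(String[] args) throws IOException {
        boolean[] r = probeKeySets();
        System.out.println(r[0] + " " + r[1] + " " + r[2]);
    }
}
```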

Managing Selection Keys

The way to clear the ready set of a SelectionKey is to remove the key itself from the set
of selected keys.

The ready set of a selection key is modified only by the Selector object.

The conventional approach is to perform a select() call on the selector (which updates the selected key set), then iterate over the set of keys returned by selectedKeys().

As each key is examined in turn, the associated channel is dealt with according to the key's ready set.

The key is then removed from the selected key set (by calling remove() on the Iterator object), and the next key is examined.
When complete, the cycle repeats by calling select() again.

This example is bad: it reads the data synchronously in the main thread.

package com.java.nio;

import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.logging.Logger;

public class SelectSockets {
	
	private final Logger logger = Logger.getLogger(getClass().getName());  

	private static int PORT_NUMBER = 1234;
	
	public static void main(String[] argv) throws Exception {
		new SelectSockets().go(argv);
	}

	private void go(String[] argv) throws Exception  {
		int port = PORT_NUMBER;
		if(argv.length>0) {
			port = Integer.parseInt(argv[0]);
		}
		logger.info("Listen on port " + port);
		
		//Allocate an unbound server socket channel
		ServerSocketChannel serverChannel = ServerSocketChannel.open();
		
		//Get the associated ServerSocket to bind it with
		ServerSocket serverSocket = serverChannel.socket();
		
		//Create a new Selector for use below
		Selector selector = Selector.open();
		
		//Set the port the server channel will listen to 
		serverSocket.bind(new InetSocketAddress(port));
		
		//Set nonblocking mode for the listening socket
		serverChannel.configureBlocking(false);
		
		//Register the ServerSocketChannel with the Selector
		serverChannel.register(selector, SelectionKey.OP_ACCEPT);
		
		while(true) {
			//This may block for a long time. Upon returning, the selected set contains keys of the ready channels.
			int n = selector.select();
			
			if(n==0)
				continue;
			
			//Get an iterator over the set of selected key
			Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
			
			//Look for each key in the selected set
			while(iter.hasNext()) {
				SelectionKey key = iter.next();
				
				//Has a new connection come in?
				if(key.isAcceptable()) {
					ServerSocketChannel server = (ServerSocketChannel)key.channel();
					SocketChannel channel = server.accept();
					//accept() returns null on a nonblocking channel when no connection is pending
					if(channel != null) {
						registerChannel(selector, channel, SelectionKey.OP_READ);
						
						sayHello(channel);
					}
				} else if(key.isReadable()) {
					readDataFromSocket(key);
				}
				
				//Remove key from selected set; because it's been handled!
				iter.remove();
			}
		}
		
	}
	
	/**
	 * Use the same byte buffer for all channels. 
	 * A single thread is servicing all the channels, so there is no danger of concurrent access.
	 */
	private ByteBuffer buffer = ByteBuffer.allocate(1024);
	
	/**
	 * Sample data handler method for a channel with data ready to read.
	 */
	protected void readDataFromSocket(SelectionKey key) throws Exception {
		SocketChannel socketChannel = (SocketChannel)key.channel();
		int count;
		
		buffer.clear();//Empty buffer
		//Loop while data is available; channel is nonblocking
		while((count=socketChannel.read(buffer))>0) {
			buffer.flip();//Make buffer readable
			//Send the data;don't assume it goes all at once
			while(buffer.hasRemaining()) {
				socketChannel.write(buffer);//change it!
			}
			
			buffer.clear();//Empty buffer
		}
		if(count<0)
			socketChannel.close();// Close channel on EOF, invalidates the key
	}

	/**
	 * A greeting to the incoming client connection. 
	 * @throws Exception 
	 */
	private void sayHello(SocketChannel channel) throws Exception {
		buffer.clear();
		buffer.put("Hi there!\r\n".getBytes());
		
		buffer.flip();
		channel.write(buffer);
	}

	/**
	 * Register the given channel with the given selector for the given operations of interest
	 */
	protected void registerChannel(Selector selector, SocketChannel channel,
			int ops) throws Exception {
		if(channel == null)
			return; //could happen
		
		//Set the new channel nonblocking
		channel.configureBlocking(false);
		
		//Register it with the selector
		channel.register(selector, ops);
	}
	
}

    

This example is good:

It uses a thread pool to service channels with data to read.

It passes the SelectionKey object to a worker thread for servicing.

The selected key set is not safe to use from multiple threads, but individual keys can be handed to different threads for servicing.

A better approach is to use one selector for all selectable channels and delegate the
servicing of ready channels to other threads.

You have a single point to monitor channel readiness and a decoupled pool of worker threads to handle the incoming data. The thread pool size can be tuned (or tune itself, dynamically) according to deployment conditions.
Management of selectable channels remains simple, and simple is good.

That is, one selector monitors all channels, and channels in the ready state are delegated to other threads:

One thread is responsible for detecting when a channel becomes ready.

One thread pool is responsible for interacting with the ready channels (accepting requests, reading data, writing data).

The thread pool in the example should be replaced with a thread pool from the java.util.concurrent package.
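As a rough idea of what that replacement could look like, the sketch below hands a readable key to an ExecutorService instead of a hand-written pool. It is an assumption-laden illustration, not the author's implementation: ExecutorDispatchSketch, dispatch() and demo() are hypothetical names, a Pipe stands in for a socket, and the task only counts the bytes it drains rather than echoing them.

```java
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo class; java.util.concurrent replaces the custom ThreadPool/Worker pair.
class ExecutorDispatchSketch {

    /** Submits the key's channel for draining and returns a future with the byte count. */
    static Future<Integer> dispatch(SelectionKey key, ExecutorService pool) {
        // Stop selecting for reads on this channel while a pool thread owns it.
        key.interestOps(key.interestOps() & ~SelectionKey.OP_READ);
        return pool.submit(() -> {
            ReadableByteChannel channel = (ReadableByteChannel) key.channel();
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            int total = 0;
            int count;
            while ((count = channel.read(buffer)) > 0) {
                total += count;
                buffer.clear(); // a real handler would process the bytes here
            }
            if (count < 0) {
                channel.close(); // EOF: closing the channel also cancels the key
            } else {
                // Restore read interest and nudge the selector so it notices the change.
                key.interestOps(key.interestOps() | SelectionKey.OP_READ);
                key.selector().wakeup();
            }
            return total;
        });
    }

    /** Runs one select/dispatch round against a pipe and returns the bytes drained. */
    static int demo() throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(5);
        try (Selector selector = Selector.open()) {
            Pipe pipe = Pipe.open();
            try (Pipe.SourceChannel source = pipe.source();
                 Pipe.SinkChannel sink = pipe.sink()) {
                source.configureBlocking(false);
                source.register(selector, SelectionKey.OP_READ);
                sink.write(ByteBuffer.wrap("hello".getBytes(StandardCharsets.US_ASCII)));

                selector.select();
                int drained = 0;
                Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                while (it.hasNext()) {
                    SelectionKey key = it.next();
                    it.remove();
                    if (key.isReadable()) {
                        // The demo waits for the result; a server would not block here.
                        drained += dispatch(key, pool).get();
                    }
                }
                return drained;
            }
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("bytes drained by pool: " + demo());
    }
}
```

Because the executor queues tasks when all threads are busy, there is no need for the "no idle worker" case that the hand-written pool has to handle.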

Server:

package com.java.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.logging.Logger;

public class SelectSocketsThreadPool {
	
	private final Logger logger = Logger.getLogger(getClass().getName());
	
	private static final int MAX_THREADS = 5;
	
	private static int PORT_NUMBER = 1234;
	
	private ByteBuffer buffer = ByteBuffer.allocate(1024);
	
	private ThreadPool pool = new ThreadPool(MAX_THREADS);
	
	public static void main(String[] args) throws Exception {
		new SelectSocketsThreadPool().go(args);
	}
	

	private void go(String[] argv) throws Exception  {
		int port = PORT_NUMBER;
		if(argv.length>0) {
			port = Integer.parseInt(argv[0]);
		}
		logger.info("------>>>Listen on port " + port);
		
		//Allocate an unbound server socket channel
		ServerSocketChannel serverChannel = ServerSocketChannel.open();
		
		//Get the associated ServerSocket to bind it with
		ServerSocket serverSocket = serverChannel.socket();
		
		//Create a new Selector for use below
		Selector selector = Selector.open();
		
		//Set the port the server channel will listen to 
		serverSocket.bind(new InetSocketAddress(port));
		
		//Set nonblocking mode for the listening socket
		serverChannel.configureBlocking(false);
		
		//Register the ServerSocketChannel with the Selector
		serverChannel.register(selector, SelectionKey.OP_ACCEPT);
		
		while(true) {
			//This may block for a long time. Upon returning, the selected set contains keys of the ready channels.
			int n = selector.select();
			
			if(n==0)
				continue;
			
			//Get an iterator over the set of selected key
			Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
			
			//Look for each key in the selected set
			while(iter.hasNext()) {
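				SelectionKey key = iter.next();
				
				//Has a new connection come in?
				if(key.isAcceptable()) {
					ServerSocketChannel server = (ServerSocketChannel)key.channel();
					SocketChannel channel = server.accept();
					//accept() returns null on a nonblocking channel when no connection is pending
					if(channel != null) {
						//Register the newly accepted channel with the selector for read events
						registerChannel(selector, channel, SelectionKey.OP_READ);
						
						sayHello(channel);//Send one message to the client as soon as it connects
					}
				} else if(key.isReadable()) {
					//A read event occurred on the channel: start reading from it (detecting when data arrives is left to the underlying operating system)
					readDataFromSocket(key);
				}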
				
				//Remove key from selected set; because it's been handled!
				iter.remove();
			}
		}
		
	}
	
	/**
	 * Register the channel with the selector, which then monitors the channel for the given operations of interest
	 */
	private void registerChannel(Selector selector, SocketChannel channel,
			int ops) throws Exception {
		if(channel == null)
			return; //could happen
		
		//Set the new channel nonblocking
		channel.configureBlocking(false);
		
		//Register it with the selector
		channel.register(selector, ops);
	}


	/**
	 * Read the data using a thread from the thread pool
	 */
	private void readDataFromSocket(SelectionKey key) throws Exception {
		Worker worker = pool.getWorker();
		if(worker==null)
			return;
		//Invoking this wakes up the worker thread, then returns
		worker.serviceChannel(key);
	}

	private void sayHello(SocketChannel channel) throws Exception {
		buffer.clear();
		buffer.put("Hi ~ welcome you!\r\n".getBytes());
		
		buffer.flip();
		channel.write(buffer);
	}
	
	
	/**
	 * A simple thread pool maintained internally
	 */
	private class ThreadPool {
		//Threads are cycled through a FIFO idle queue. 
		List<Worker> idle = new LinkedList<>();
		
		ThreadPool(int poolSize) {
			for(int i=0; i<poolSize; i++) {
				Worker thread = new Worker(this);
				thread.setName("【Worker " + (i+1) + "】");
				thread.start();
				idle.add(thread);
			}
		}
		
		/**
		 * Find an idle worker thread, if any. Could return null.
		 */
		Worker getWorker() {
			Worker worker = null;
			synchronized (idle) {
				if(!idle.isEmpty())
					worker = idle.remove(0);
			}
			return worker;
		}
		
		/**
		 * return itself to the idle pool. 
		 */
		void returnWoker(Worker worker) {
			synchronized (idle) {
				idle.add(worker);
			}
		}
	}
	
	/**
	 * Worker thread class
	 */
	private class Worker extends Thread {
		private ByteBuffer buffer = ByteBuffer.allocate(1024);
		private ThreadPool pool;
		private SelectionKey key;
		
		Worker(ThreadPool pool) {
			this.pool = pool;
		}
		
		synchronized void serviceChannel(SelectionKey key) {
			this.key = key;
			//This will cause the selector to ignore read-readiness for this channel while the worker thread is servicing it.
			key.interestOps(key.interestOps() & (~SelectionKey.OP_READ));
			this.notify();
		}
		
		@Override
		public synchronized void run() {
			
			logger.info(this.getName() + " is ready");
			
			//Loop forever waiting for work to do
			while(true) {
				try {
					//Wait in a loop: this handles spurious wakeups and a key that was handed over before this thread began waiting
					while(key==null)
						this.wait();
				} catch (InterruptedException e) {
					e.printStackTrace();
					//wait() already cleared the interrupt status; go back to waiting for work
					continue;
				}
				
				logger.info(this.getName()+" has been awakened");
				
				try {
					drainChannel(key);
				} catch (Exception e) {
					e.printStackTrace();
					try {
						key.channel().close();//Close the channel when an exception occurs
					} catch (IOException e1) {
						e1.printStackTrace();
					}
					key.selector().wakeup();
				}
				
				key = null;
				
				// Done. Ready for more. Return to pool
				this.pool.returnWoker(this);
			}
		}
		
		
		/**
		 * Read data from the channel
		 */
		void drainChannel(SelectionKey key) throws Exception {
			SocketChannel channel = (SocketChannel) key.channel();
			int count;
			buffer.clear();
			// Loop while data is available; channel is nonblocking 
			while((count=channel.read(buffer))>0) {
				buffer.flip();
				while(buffer.hasRemaining()) 
					channel.write(buffer);
				buffer.clear();
			}
			if(count<0) {
				logger.info(channel.toString() + " closed");
				channel.close();// Close channel on EOF; invalidates the key 
				return;
			}
			logger.info(key.toString() + " register OP_READ again!");
			key.interestOps(key.interestOps() | SelectionKey.OP_READ);
			key.selector().wakeup();
		}
	}
	
	
}

    

Client:

package com.java.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CharsetEncoder;
import java.util.Iterator;
import java.util.Random;

public class SocketChannelClient {

	private CharsetEncoder encoder = Charset.forName("UTF-8").newEncoder();
	private CharsetDecoder decoder = Charset.forName("UTF-8").newDecoder();
	
	private final int SERVER_PORT = 1234;
	
	private Selector selector = null;
	private SocketChannel socket = null;
	private SelectionKey clientKey = null;
	
	public static void main(String[] args) {
		new SocketChannelClient().new ClientThread().start();
	}
	
	public SocketChannelClient() {
		init();
	}

	private void init() {
		try {
			//create a selector
			selector = Selector.open();
			
			//create socket and register
			socket = SocketChannel.open();
			socket.configureBlocking(false);
			
			clientKey = socket.register(selector, SelectionKey.OP_CONNECT);
			
			InetSocketAddress remote = new InetSocketAddress("localhost", SERVER_PORT);
			//connect to remote server
			socket.connect(remote);

		} catch (IOException e) {
			e.printStackTrace();
		}
	}
	
	
	private class ClientThread extends Thread {
		
		int times = 0;
		
		@Override
		public void run() {
			try {
				//listening for event
				for(;;) {
					selector.select();
					
					Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
					while(iter.hasNext()) {
						SelectionKey key = iter.next();
						iter.remove();
						
						if(key.isConnectable()) {
							//connection event
							SocketChannel channel = (SocketChannel)key.channel();
							if(channel.isConnectionPending())
								channel.finishConnect();
							channel.register(selector, SelectionKey.OP_READ);
							send("Hello Server!");
						} else if(key.isReadable()) {
							//read event
							SocketChannel channel = (SocketChannel)key.channel();
							
							//read data
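							ByteBuffer buffer = ByteBuffer.allocate(100);
							//read() returns -1 once the server has closed the connection
							if(channel.read(buffer) < 0)
								throw new IOException("Connection closed by server");
							buffer.flip();
							String msg = decoder.decode(buffer).toString();
							System.out.println("Receive :" + msg);
							Thread.sleep(3000);
							if(++times==10)
								throw new RuntimeException("Maximum number of exchanges reached; terminating");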
							send("abcdefghijklmnopqrst".substring(new Random().nextInt(10)));
						}
					}
				}
			} catch(Exception e) {
				e.printStackTrace(System.err);
			} finally {
				close();
			}
		}
		
		//send message to server
		public void send(String msg) {
			try {
				SocketChannel client = (SocketChannel)clientKey.channel();
				client.write(encoder.encode(CharBuffer.wrap(msg)));
			} catch (Exception e) {
				e.printStackTrace();
			}
		}
		
		//shut down
		public void close() {
			try {
				selector.close();
				socket.close();
			} catch (IOException e) {
				e.printStackTrace();
			}
		}
	}

}

    
