目录

一个简单的线程示例

如何将Socket客户端连接植入线程?

通过初始化线程个数模拟一个线程池

使用Java Executor ExecutorService线程池管理Socket多线程

一个简单的线程示例

实现一个线程可以继承Thread类或者实现Runnable接口:

package Chapter4;

import java.util.concurrent.TimeUnit;

/**

* @TODO (功能说明:线程例子)

* @author PJL

* @package Chapter4

* @motto 学习需要毅力,那就秀毅力

* @file ThreadExample.java

* @time 2019年10月24日 下午10:11:58

*/

public class ThreadExample implements Runnable {

private String greeting; // Message to print to console

public ThreadExample(String greeting) {

this.greeting = greeting;

}

public void run() {

while (true) {

System.out.println(Thread.currentThread().getName() + ": " + greeting);

try {

// Sleep 0 to 100 milliseconds

TimeUnit.MILLISECONDS.sleep(((long) Math.random() * 100));

} catch (InterruptedException e) {

} // Should not happen

}

}

public static void main(String[] args) {

new Thread(new ThreadExample("Hello")).start();

new Thread(new ThreadExample("Aloha")).start();

new Thread(new ThreadExample("Ciao")).start();

}

}

如何将Socket客户端连接植入线程?

当我们在服务端serverSocket.accept()接收到一个新的连接时,我们获取到的实际上是一个客户端连接的Socket实例。通过这个实例,我们可以实现数据的双向通信。将Socket作为参数放入线程,是为了让Socket流水作业变成线程式工作方式。

package Chapter4;

import java.io.IOException;

import java.io.InputStream;

import java.io.OutputStream;

import java.net.Socket;

import java.util.logging.Level;

import java.util.logging.Logger;

/**

*

* @TODO (功能说明:Socket日志处理线程)

* @author PJL

* @package Chapter4

* @motto 学习需要毅力,那就秀毅力

* @file EchoProtocol.java

* @time 2019年10月24日 下午10:20:18

*/

public class EchoProtocol implements Runnable {

private static final int BUFSIZE = 32; // Size (in bytes) of I/O buffer

private Socket clntSock; // Socket connect to client

private Logger logger; // Server logger

public EchoProtocol(Socket clntSock, Logger logger) {

this.clntSock = clntSock;

this.logger = logger;

}

/**

* Socket处理日志方法

* @param clntSock

* @param logger

*/

public static void handleEchoClient(Socket clntSock, Logger logger) {

try {

// Get the input and output I/O streams from socket

InputStream in = clntSock.getInputStream();

OutputStream out = clntSock.getOutputStream();

int recvMsgSize; // Size of received message

int totalBytesEchoed = 0; // Bytes received from client

byte[] echoBuffer = new byte[BUFSIZE]; // Receive Buffer

// Receive until client closes connection, indicated by -1

while ((recvMsgSize = in.read(echoBuffer)) != -1) {

out.write(echoBuffer, 0, recvMsgSize);

totalBytesEchoed += recvMsgSize;

}

logger.info("Client " + clntSock.getRemoteSocketAddress() + ", echoed " + totalBytesEchoed + " bytes.");

} catch (IOException ex) {

logger.log(Level.WARNING, "Exception in echo protocol", ex);

} finally {

try {

clntSock.close();

} catch (IOException e) {

}

}

}

/**

* 线程执行方法

*/

public void run() {

handleEchoClient(clntSock, logger);

}

}

接下来就是将线程利用起来了,请看下面:

package Chapter4;

import java.io.IOException;

import java.net.ServerSocket;

import java.net.Socket;

import java.util.logging.Logger;

/**

* @TODO (功能说明:一客户一线程方式处理Socket)

* @author PJL

* @package Chapter4

* @motto 学习需要毅力,那就秀毅力

* @file TCPEchoServerThread.java

* @time 2019年10月24日 下午10:29:46

*/

public class TCPEchoServerThread {

public static void main(String[] args) throws IOException {

if (args.length == 0) {

args = new String[1];

args[0] = "7402";

}

if (args.length != 1) { // Test for correct # of args

throw new IllegalArgumentException("Parameter(s): ");

}

int echoServPort = Integer.parseInt(args[0]); // Server port

// Create a server socket to accept client connection requests

ServerSocket servSock = new ServerSocket(echoServPort);

Logger logger = Logger.getLogger("practical");

// Run forever, accepting and spawning a thread for each connection

while (true) {

Socket clntSock = servSock.accept(); // Block waiting for connection

// Spawn thread to handle new connection

Thread thread = new Thread(new EchoProtocol(clntSock, logger));

thread.start();

logger.info("Created and started Thread " + thread.getName());

}

/* NOT REACHED */

}

}

通过初始化线程个数模拟一个线程池

线程池的作用就是为了让线程能够被管理起来合理地使用,合理利用服务器资源。如果一个用户一个线程,那么我们的资源随着用户的增长而愈来愈紧张,所以为了避免线程的滥用,线程池的使用很重要。

package Chapter4;

import java.io.IOException;

import java.net.ServerSocket;

import java.net.Socket;

import java.util.logging.Level;

import java.util.logging.Logger;

/**

* @TODO (功能说明:模拟线程池等待Socket请求处理)

* @author PJL

* @package Chapter4

* @motto 学习需要毅力,那就秀毅力

* @file TCPEchoServerPool.java

* @time 2019年10月24日 下午10:36:47

*/

public class TCPEchoServerPool {

public static void main(String[] args) throws IOException {

if (args.length == 0) {

args = new String[2];

args[0] = "7402";

args[1] = "10";

}

if (args.length != 2) { // Test for correct # of args

throw new IllegalArgumentException("Parameter(s): ");

}

int echoServPort = Integer.parseInt(args[0]); // Server port

int threadPoolSize = Integer.parseInt(args[1]);

// Create a server socket to accept client connection requests

final ServerSocket servSock = new ServerSocket(echoServPort);

final Logger logger = Logger.getLogger("practical");

// Spawn a fixed number of threads to service clients

for (int i = 0; i < threadPoolSize; i++) {

Thread thread = new Thread() {

public void run() {

while (true) {

try {

Socket clntSock = servSock.accept(); // Wait for a connection

EchoProtocol.handleEchoClient(clntSock, logger); // Handle it

} catch (IOException ex) {

logger.log(Level.WARNING, "Client accept failed", ex);

}

}

}

};

thread.start();

logger.info("Created and started Thread = " + thread.getName());

}

}

}

上面初始化了一定数量的线程,每个线程在没有收到请求时都是阻塞的,我们不知道哪一个线程会幸运地连接到新的请求,不确定性就是线程存在最有意思的地方。

使用Java Executor ExecutorService线程池管理Socket多线程

Executor 是线程池的基础接口类。

Executors 是线程池的一组实现方法,如下面:

我们可以这样来使用线程池对象:

int threadPoolSize=100;

Executor service0 =Executors.newFixedThreadPool(threadPoolSize);

Executor service1 = Executors.newCachedThreadPool();

ExecutorService service2=Executors.newCachedThreadPool();

Executor的方法很少:

ExecutorService提供了更为高级的方法:

下面来看一个Socket关于线程池的使用方式:

package Chapter4;

import java.io.IOException;

import java.net.ServerSocket;

import java.net.Socket;

import java.util.concurrent.Executor;

import java.util.concurrent.Executors;

import java.util.logging.Logger;

/**

* @TODO (功能说明:Java自带高级线程池管理Socket请求线程服务)

* @author PJL

* @package Chapter4

* @motto 学习需要毅力,那就秀毅力

* @file TCPEchoServerExecutor.java

* @time 2019年10月24日 下午10:45:45

*/

public class TCPEchoServerExecutor {

public static void main(String[] args) throws IOException {

if (args.length == 0) {

args = new String[1];

args[0] = "7402";

}

if (args.length != 1) { // Test for correct # of args

throw new IllegalArgumentException("Parameter(s): ");

}

int echoServPort = Integer.parseInt(args[0]); // Server port

// Create a server socket to accept client connection requests

ServerSocket servSock = new ServerSocket(echoServPort);

Logger logger = Logger.getLogger("practical");

/*

* int threadPoolSize=100;

* Executor service =Executors.newFixedThreadPool(threadPoolSize);

*/

Executor service = Executors.newCachedThreadPool(); // Dispatch svc

// Run forever, accepting and spawning threads to service each connection

while (true) {

Socket clntSock = servSock.accept(); // Block waiting for connection

service.execute(new EchoProtocol(clntSock, logger));

// set timeout socket thread

//service.execute(new TimelimitEchoProtocol(clntSock, logger));

}

/* NOT REACHED */

}

}

另外Socket连接还比较关注阻塞问题,所以会涉及到超时问题的处理,主要是对Socket设置超时时间,如下例:

package Chapter4;

import java.io.IOException;

import java.io.InputStream;

import java.io.OutputStream;

import java.net.Socket;

import java.util.logging.Level;

import java.util.logging.Logger;

/**

*

* @TODO (功能说明:时间限制-控制接收receive、读取read、等待链接accept 超时时间 核心是设置setSoTimeout)

* @author PJL

* @package Chapter4

* @motto 学习需要毅力,那就秀毅力

* @file TimeLimitEchoProtocol.java

* @time 2019年10月24日 下午10:54:00

*/

class TimelimitEchoProtocol implements Runnable {

private static final int BUFSIZE = 32; // Size (bytes) buffer

private static final String TIMELIMIT = "10000"; // Default limit (ms)

private static final String TIMELIMITPROP = "Timelimit"; // Thread property

private static int timelimit;

private Socket clntSock;

private Logger logger;

public TimelimitEchoProtocol(Socket clntSock, Logger logger) {

this.clntSock = clntSock;

this.logger = logger;

// Get the time limit from the System properties or take the default

timelimit = Integer.parseInt(System.getProperty(TIMELIMITPROP, TIMELIMIT));

}

public static void handleEchoClient(Socket clntSock, Logger logger) {

try {

// Get the input and output I/O streams from socket

InputStream in = clntSock.getInputStream();

OutputStream out = clntSock.getOutputStream();

int recvMsgSize; // Size of received message

int totalBytesEchoed = 0; // Bytes received from client

byte[] echoBuffer = new byte[BUFSIZE]; // Receive buffer

long endTime = System.currentTimeMillis() + timelimit;

int timeBoundMillis = timelimit;

clntSock.setSoTimeout(timeBoundMillis);

// Receive until client closes connection, indicated by -1

while ((timeBoundMillis > 0) && // catch zero values

((recvMsgSize = in.read(echoBuffer)) != -1)) {

out.write(echoBuffer, 0, recvMsgSize);

totalBytesEchoed += recvMsgSize;

timeBoundMillis = (int) (endTime - System.currentTimeMillis());

clntSock.setSoTimeout(timeBoundMillis);

}

logger.info("Client " + clntSock.getRemoteSocketAddress() + ", echoed " + totalBytesEchoed + " bytes.");

} catch (IOException ex) {

logger.log(Level.WARNING, "Exception in echo protocol", ex);

}

}

public void run() {

handleEchoClient(this.clntSock, this.logger);

}

}

学习代码实例请参看:https://github.com/Nuclear-Core-Learning/TCPIP-Socket/tree/master/src/Chapter4