Pooling 技术浅析

前言

Pooling我们可以简单理解为“连接池”。
IO是很珍贵的资源,尤其是在高并发的场景下,每次IO(网络、磁盘等)创建销毁产生的开销是足以影响到整个服务(应用)的性能。
此时,采用pooling技术,可以对频繁的IO操作进行统一管理,日常应用中,我们常见的有如下几个场景:

  1. 数据库连接 -> 采用数据库连接池
  2. Nosql(如Redis) -> Redis采用连接池
  3. Http请求 -> HttpClient 采用连接池
  4. 线程 -> 多线程采用线程池

举例

1. 数据库连接池

最原始的,采用jdbc进行数据库crud操作,每次需要利用驱动,创建一个连接。
繁琐的进行创建和关闭操作,代码不清晰并且难以维护,最重要的,性能差!
目前,针对某个数据库,我们通常配置一个数据源,其中数据源的实现有很多,通过比较,目前还是推荐使用alibaba的druid

一个典型的配置:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
<bean id="dataSource" class="com.alibaba.druid.pool.DruidDataSource" init-method="init" destroy-method="close">
<!-- 数据源驱动类可不写,Druid默认会自动根据URL识别DriverClass -->
<property name="driverClassName" value="${driverClassName}" />
<!-- 基本属性 url、user、password -->
<property name="url" value="${url}" />
<property name="username" value="${username}" />
<property name="password" value="${password}" />
<!-- 配置初始化大小、最小、最大 -->
<property name="initialSize" value="50" />
<property name="minIdle" value="10" />
<property name="maxActive" value="100" />
<!-- 配置获取连接等待超时的时间 -->
<property name="maxWait" value="60000" />
<!-- 配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒 -->
<property name="timeBetweenEvictionRunsMillis" value="60000" />
<!-- 配置一个连接在池中最小生存的时间,单位是毫秒 -->
<property name="minEvictableIdleTimeMillis" value="300000" />
<property name="testWhileIdle" value="true" />
<property name="testOnBorrow" value="false" />
<property name="testOnReturn" value="false" />
</bean>

再此之上,可以扩展实现主从、多读、分库分表等多数据源的场景配置,必要的时候可使用一些分布式中间件,如:当当开源的sharding-jdbc

2. Redis采用连接池

java这边redis的类库主要有2个:Jedis 以及 Lettuce
后者我们一般习惯直接引用spring-data项目的配置,这里介绍下jedis的使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// 连接池配置
JedisPoolConfig poolConfig = new JedisPoolConfig();
poolConfig.setMaxTotal(Config_mpss.POOL_MAX_TOTAL);
poolConfig.setMaxIdle(Config_mpss.POOL_MAX_IDLE);
poolConfig.setMaxWaitMillis(Config_mpss.POOL_MAX_WAIT_MILLIS);
poolConfig.setTestOnBorrow(Config_mpss.POOL_TEST_ON_BORROW);
poolConfig.setTestOnReturn(Config_mpss.POOL_TEST_ON_RETURN);
// 一个简单的封装
JedisPoolWrapper.globalInit(poolConfig, Config_mpss.REDIS_STAT_HOST,
Config_mpss.REDIS_STAT_PORT, Config_mpss.REDIS_STAT_TIMEOUT,
Config_mpss.REDIS_STAT_AUTH);
// 如何使用?
Jedis jedis = null;
try {
jedis = JedisPoolWrapper.getOneJedis(); // 每次“借”一个连接
jedis.setex ...
jedis.hgetall ...
} finally {
if (jedis != null) {
jedis.close();
}
}

3. Http请求:ApacheHttpClient连接池

http请求一般我们有2个选择:

  1. java自带的HttpURLConnection
  2. apache httpclient - 注意:采用最新的版本,与旧版完全是两个东西!

不考虑性能,我们当然可以每次请求都建立一个连接,进行操作后再销毁。但是实际业务中,往往面对的都是高并发频繁的调用,每次创建和销毁显然不现实。
因此,可以自行封装一个http请求工具,根据业务需要预设一定数量的连接,并开放工具方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
public class MockHttpUtils {
public static final MockHttpUtils instance = new MockHttpUtils();
private static final Logger logger = Logger.getLogger(MockHttpUtils.class);
// 超时时间-毫秒
private static final int MAX_TIMEOUT_MILLIS = 30000;
// 请求配置
private static RequestConfig requestConfig = null;
// 连接池配置
private PoolingHttpClientConnectionManager connMgr = null;
private MockHttpUtils() {
// 连接池管理器
connMgr = new PoolingHttpClientConnectionManager();
connMgr.setMaxTotal(2048);
connMgr.setDefaultMaxPerRoute(1024);
// 设置请求参数配置
RequestConfig.Builder configBuilder = RequestConfig.custom();
configBuilder.setConnectTimeout(MAX_TIMEOUT_MILLIS);
configBuilder.setSocketTimeout(MAX_TIMEOUT_MILLIS);
configBuilder.setConnectionRequestTimeout(MAX_TIMEOUT_MILLIS);
// 在提交请求之前 测试连接是否可用
// configBuilder.setStaleConnectionCheckEnabled(true);
requestConfig = configBuilder.build();
}
public static MockHttpUtils getInstance() {
return instance;
}
/**
* 发送HTTP请求<code>json</code>方式
*
* @param url
* 请求地址
* @param postBody
* json格式
*/
public void doPost(String url, String postBody) {
CloseableHttpResponse response = null;
try {
HttpPost httpPost = genHttpPost(url, postBody);
// 初始化一个http连接
CloseableHttpClient httpClient = HttpClients.custom().setConnectionManager(connMgr)
.setDefaultRequestConfig(requestConfig).build();
response = httpClient.execute(httpPost);
if (response != null) {
int status = response.getStatusLine().getStatusCode();
String content = entity2Content(response);
// TODO
}
} catch (Exception e) {
logger.error("MockHttpUtils doPost:" + e.getMessage());
} finally {
closeQuietly(response);
}
}
/**
* 发送HTTP请求<code>json</code>方式
*
* @param url
* 请求地址
* @param postBody
* json格式
*/
public void doPostSSL(String url, String postBody) {
CloseableHttpResponse response = null;
try {
HttpPost httpPost = genHttpPost(url, postBody);
CloseableHttpClient httpsClient = HttpClients.custom()
.setSSLSocketFactory(createSSLConnSocketFactory()).setConnectionManager(connMgr)
.setDefaultRequestConfig(requestConfig).build();
response = httpsClient.execute(httpPost);
if (response != null) {
int status = response.getStatusLine().getStatusCode();
String content = entity2Content(response);
// TODO
}
} catch (Exception e) {
logger.error("MockHttpUtils doPostSSL:" + e.getMessage());
} finally {
closeQuietly(response);
}
}
/**
* 构造一个HttpPost
*
* @param url
* url
* @param postBody
* postbody
* @return httppost
*/
private HttpPost genHttpPost(String url, String postBody) {
StringEntity se = new StringEntity(postBody, "UTF-8");
HttpPost httpPost = new HttpPost(url);
httpPost.setConfig(requestConfig);
httpPost.setHeader("User-Agent", "Mozilla/5.0");
httpPost.setEntity(se);
return httpPost;
}
/**
* 创建SSL安全连接
*
* @return SSL连接socket工厂
*/
private SSLConnectionSocketFactory createSSLConnSocketFactory() {
SSLConnectionSocketFactory sslsf = null;
try {
SSLContext sslContext = new SSLContextBuilder()
.loadTrustMaterial(null, new TrustStrategy() {
public boolean isTrusted(X509Certificate[] chain, String authType)
throws CertificateException {
return true;
}
}).build();
sslsf = new SSLConnectionSocketFactory(sslContext, new HostnameVerifier() {
@Override
public boolean verify(String hostname, SSLSession session) {
return true;
}
});
} catch (GeneralSecurityException e) {
e.printStackTrace();
}
return sslsf;
}
/**
* 关闭HttpResponse
*
* @param response
* HttpResponse
*/
private void closeQuietly(CloseableHttpResponse response) {
HttpClientUtils.closeQuietly(response);
}
/**
* 转换返回实体
*
* @param response
* 返回实体
* @throws IOException
* io异常
*/
private String entity2Content(CloseableHttpResponse response) throws IOException {
return EntityUtils.toString(response.getEntity(), "utf-8");
}
}

4. 线程池

高并发的场景下,每次有请求来临,不可能单独创建一个thread,这时候通常采用线程池,主要是2个思路:

  1. 简单情况下,直接利用Executors工具类,选择合适的实现

这部分可以参考:Java线程池-Executors

  1. 自定义,根据业务配置具体的参数,自行调优

这部分可以参考:Java线程池-自定义