Lily Blooper's Blog

savvy & pragmatic


  • Home

  • Categories

  • Tags

  • Archives

  • About

digest

Posted on 2017-12-25 | In Tech |

to update

1. Why coordination in a distributed system is so challenging

After getting introduced to Apache ZooKeeper and its role in the design and
development of a distributed application, let’s drill down deeper into why
coordination in a distributed system is a hard problem.
Let’s take the example of doing configuration management for a distributed application that comprises
multiple software components running independently and concurrently, spanning across multiple physical servers.
Now, having a master node where the cluster configuration is stored and other worker nodes that download it from this master
node and auto configure themselves seems to be a simple and elegant solution.
However, this solution suffers from a potential problem of the master node being a single point of failure.
Even if we assume that the master node is designed to be fault-tolerant,
designing a system where change in the configuration is propagated to all worker nodes dynamically is not straightforward.

Another coordination problem in a distributed system is service discovery. Often,
to sustain the load and increase the availability of the application, we add more
physical servers to the system. However, we can get the client or worker nodes
to know about this change in the cluster memberships and availability of newer
machines that host different services in the cluster is something. This needs careful
design and implementation of logic in the client application itself.

Scalability improves availability, but it complicates coordination. A horizontally
scalable distributed system that spans over hundreds and thousands of physical
machines is often prone to failures such as hardware faults, system crashes,
communication link failures, and so on. These types of failures don’t really follow
any pattern, and hence, to handle such failures in the application logic and design
the system to be fault-tolerant is truly a difficult problem.

Thus, from what has been noted so far, it’s apparent that architecting a distributed
system is not so simple. Making correct, fast, and scalable cluster coordination is
hard and often prone to errors, thus leading to an overall inconsistency in the cluster.
This is where Apache ZooKeeper comes to the rescue as a robust coordination
service in the design and development of distributed systems.

2.

3.

Zookeeper 核心问题解答

Posted on 2016-08-28 | In Tech |

zookeeper faq:

  1. zookeeper是什么框架?

ZooKeeper是一个高可用的分布式数据管理与系统协调框架。
基于对Paxos算法的实现,使该框架保证了分布式环境中数据的强一致性.

  1. 有哪些应用场景?
  • PUB/SUB
  • Load Balance
  • Naming Service
  • 分布式协调
  • Cluster管理
  • Master 选举
  • 分布式 锁
  • 分布式 队列

应用: Hadoop, Hbase , Kafka , Dubbo | Canal | Otter

  1. 使用什么协议?

ZAB (Zookeeper Atomic Broadcast) 协议
支持崩溃恢复的原子广播协议

  • 所有的事务请求由一个全局唯一的 Leader来处理
  • Leader将客户端事务请求转成一个事务 Proposal , 发给所有 Follower
  • 超过半数Follower回复后, Leader再次发送一个 Commit消息,要求 Follower将之前的 Proposal提交
  1. 说说分布式一致性算法Paxos

TODO

  • 来源
  • 数学证明不重要
  1. 说一说选举算法及流程

当Zookeeper集群中的一台服务器出现以下两种情况之一时,需要进入Leader选举。

(1) 服务器初始化启动。
(2) 服务器运行期间无法和Leader保持连接。

http://www.cnblogs.com/leesf456/p/6107600.html

  1. zookeeper有哪几种节点类型?

Znode :
分为永久和临时 , 销毁是与所创建的客户端会话绑定的

  1. zookeeper对节点的watch监听通知是永久的吗?

一个Watch事件是一个一次性的触发器,当被设置了Watch的数据发生了改变的时候,则服务器将这个改变发送给设置了Watch的客户端,以便通知它们

http://blog.csdn.net/liu857279611/article/details/70495413

  1. 有哪几种部署模式?

单机模式、伪集群模式 和 集群模式

伪集群:即集群的所有服务器都部署在一台机器上。
当你手头上有一台比较好的机器,如果作为单机模式进行部署,就会浪费资源,这种情况下,ZooKeeper允许你在一台机器上通过启动不同的端口来启动多个 ZooKeeper 服务实例,

  1. 集群中的机器角色都有哪些?

Leader ; Follower ; Observer

Leader : 提供读写 ; 过半选举产生
Follower : 提供读
Observer : 提供读 , 不参与选举

其他基本概念:

Watcher:
TODO

ACL:
TODO

  1. 集群最少要几台机器,集群规则是怎样的

最少3台。 过半存活。
重要的一点是,只要集群中存在超过一半的机器能够正常工作,那么整个集群就能够正常对外服务。

  1. 集群如果有3台机器,挂掉一台集群还能工作吗?挂掉两台呢?

1台可以
2台就不行了

  1. 集群支持动态添加机器吗?

3.5之后是可以的。
属于动态读取配置文件而不用重启全部ZK Server

  1. zookeeper的java客户端都有哪些?
  • 官方的客户端(很难用)
  • curator (提供 best practice )
  1. chubby是什么,和zookeeper比你怎么看?

TODO

  • 分布式锁服务
  1. 说几个zookeeper常用的命令。

  2. 可以通过命令:echo stat|nc 127.0.0.1 2181 来查看哪个节点被选择作为follower或者leader

  3. 使用echo ruok|nc 127.0.0.1 2181 测试是否启动了该Server,若回复imok表示已经启动。
  4. echo dump| nc 127.0.0.1 2181 ,列出未经处理的会话和临时节点。
  5. echo kill | nc 127.0.0.1 2181 ,关掉server
  6. echo conf | nc 127.0.0.1 2181 ,输出相关服务配置的详细信息。
  7. echo cons | nc 127.0.0.1 2181 ,列出所有连接到服务器的客户端的完全的连接 / 会话的详细信息。
  8. echo envi |nc 127.0.0.1 2181 ,输出关于服务环境的详细信息(区别于 conf 命令)。
  9. echo reqs | nc 127.0.0.1 2181 ,列出未经处理的请求。
  10. echo wchs | nc 127.0.0.1 2181 ,列出服务器 watch 的详细信息。
  11. echo wchc | nc 127.0.0.1 2181 ,通过 session 列出服务器 watch 的详细信息,它的输出是一个与 watch 相关的会话的列表。
  12. echo wchp | nc 127.0.0.1 2181 ,通过路径列出服务器 watch 的详细信息。它输出一个与 session 相关的路径

数据发布与订阅(配置中心)
发布与订阅模型,即所谓的配置中心,顾名思义就是发布者将数据发布到ZK节点上,供订阅者动态获取数据,实现配置信息的集中式管理和动态更新。例如全局的配置信息,服务式服务框架的服务地址列表等就非常适合使用。 1. 应用中用到的一些配置信息放到ZK上进行集中管理。这类场景通常是这样:应用在启动的时候会主动来获取一次配置,同时,在节点上注册一个Watcher,这样一来,以后每次配置有更新的时候,都会实时通知到订阅的客户端,从来达到获取最新配置信息的目的。

  1. 分布式搜索服务中,索引的元信息和服务器集群机器的节点状态存放在ZK的一些指定节点,供各个客户端订阅使用。
  2. 分布式日志收集系统。这个系统的核心工作是收集分布在不同机器的日志。收集器通常是按照应用来分配收集任务单元,因此需要在ZK上创建一个以应用名作为path的节点P,并将这个应用的所有机器ip,以子节点的形式注册到节点P上,这样一来就能够实现机器变动的时候,能够实时通知到收集器调整任务分配。
  3. 系统中有些信息需要动态获取,并且还会存在人工手动去修改这个信息的发问。通常是暴露出接口,例如JMX接口,来获取一些运行时的信息。引入ZK之后,就不用自己实现一套方案了,只要将这些信息存放到指定的ZK节点上即可。

注意:在上面提到的应用场景中,有个默认前提是:数据量很小,但是数据更新可能会比较快的场景。

负载均衡
这里说的负载均衡是指软负载均衡。在分布式环境中,为了保证高可用性,通常同一个应用或同一个服务的提供方都会部署多份,达到对等服务。而消费者就须要在这些对等的服务器中选择一个来执行相关的业务逻辑,其中比较典型的是消息中间件中的生产者,消费者负载均衡。
消息中间件中发布者和订阅者的负载均衡,linkedin开源的KafkaMQ和阿里开源的metaq都是通过zookeeper来做到生产者、消费者的负载均衡。这里以metaq为例如讲下:

生产者负载均衡:metaq发送消息的时候,生产者在发送消息的时候必须选择一台broker上的一个分区来发送消息,因此metaq在运行过程中,会把所有broker和对应的分区信息全部注册到ZK指定节点上,默认的策略是一个依次轮询的过程,生产者在通过ZK获取分区列表之后,会按照brokerId和partition的顺序排列组织成一个有序的分区列表,发送的时候按照从头到尾循环往复的方式选择一个分区来发送消息。

消费负载均衡: 在消费过程中,一个消费者会消费一个或多个分区中的消息,但是一个分区只会由一个消费者来消费。MetaQ的消费策略是:

  1. 每个分区针对同一个group只挂载一个消费者。
  2. 如果同一个group的消费者数目大于分区数目,则多出来的消费者将不参与消费。
  3. 如果同一个group的消费者数目小于分区数目,则有部分消费者需要额外承担消费任务。
    在某个消费者故障或者重启等情况下,其他消费者会感知到这一变化(通过 zookeeper watch消费者列表),然后重新进行负载均衡,保证所有的分区都有消费者进行消费。

命名服务(Naming Service)
命名服务也是分布式系统中比较常见的一类场景。在分布式系统中,通过使用命名服务,客户端应用能够根据指定名字来获取资源或服务的地址,提供者等信息。被命名的实体通常可以是集群中的机器,提供的服务地址,远程对象等等——这些我们都可以统称他们为名字(Name)。其中较为常见的就是一些分布式服务框架中的服务地址列表。通过调用ZK提供的创建节点的API,能够很容易创建一个全局唯一的path,这个path就可以作为一个名称。
阿里巴巴集团开源的分布式服务框架Dubbo中使用ZooKeeper来作为其命名服务,维护全局的服务地址列表,点击这里查看Dubbo开源项目。在Dubbo实现中:

服务提供者在启动的时候,向ZK上的指定节点/dubbo/${serviceName}/providers目录下写入自己的URL地址,这个操作就完成了服务的发布。

服务消费者启动的时候,订阅/dubbo/${serviceName}/providers目录下的提供者URL地址, 并向/dubbo/${serviceName} /consumers目录下写入自己的URL地址。

注意,所有向ZK上注册的地址都是临时节点,这样就能够保证服务提供者和消费者能够自动感应资源的变化。 另外,Dubbo还有针对服务粒度的监控,方法是订阅/dubbo/${serviceName}目录下所有提供者和消费者的信息。

分布式通知/协调
ZooKeeper中特有watcher注册与异步通知机制,能够很好的实现分布式环境下不同系统之间的通知与协调,实现对数据变更的实时处理。使用方法通常是不同系统都对ZK上同一个znode进行注册,监听znode的变化(包括znode本身内容及子节点的),其中一个系统update了znode,那么另一个系统能够收到通知,并作出相应处理 1. 另一种心跳检测机制:检测系统和被检测系统之间并不直接关联起来,而是通过zk上某个节点关联,大大减少系统耦合。

  1. 另一种系统调度模式:某系统有控制台和推送系统两部分组成,控制台的职责是控制推送系统进行相应的推送工作。管理人员在控制台作的一些操作,实际上是修改了ZK上某些节点的状态,而ZK就把这些变化通知给他们注册Watcher的客户端,即推送系统,于是,作出相应的推送任务。
  2. 另一种工作汇报模式:一些类似于任务分发系统,子任务启动后,到zk来注册一个临时节点,并且定时将自己的进度进行汇报(将进度写回这个临时节点),这样任务管理者就能够实时知道任务进度。

总之,使用zookeeper来进行分布式通知和协调能够大大降低系统之间的耦合

集群管理与Master选举

  1. 集群机器监控:这通常用于那种对集群中机器状态,机器在线率有较高要求的场景,能够快速对集群中机器变化作出响应。这样的场景中,往往有一个监控系统,实时检测集群机器是否存活。过去的做法通常是:监控系统通过某种手段(比如ping)定时检测每个机器,或者每个机器自己定时向监控系统汇报“我还活着”。 这种做法可行,但是存在两个比较明显的问题:
  2. 集群中机器有变动的时候,牵连修改的东西比较多。
  3. 有一定的延时。
    利用ZooKeeper有两个特性,就可以实现另一种集群机器存活性监控系统:

  4. 客户端在节点 x 上注册一个Watcher,那么如果 x?的子节点变化了,会通知该客户端。

  5. 创建EPHEMERAL类型的节点,一旦客户端和服务器的会话结束或过期,那么该节点就会消失。
    例如,监控系统在 /clusterServers 节点上注册一个Watcher,以后每动态加机器,那么就往 /clusterServers 下创建一个 EPHEMERAL类型的节点:/clusterServers/{hostname}. 这样,监控系统就能够实时知道机器的增减情况,至于后续处理就是监控系统的业务了。

  6. Master选举则是zookeeper中最为经典的应用场景了。
    在分布式环境中,相同的业务应用分布在不同的机器上,有些业务逻辑(例如一些耗时的计算,网络I/O处理),往往只需要让整个集群中的某一台机器进行执行,其余机器可以共享这个结果,这样可以大大减少重复劳动,提高性能,于是这个master选举便是这种场景下的碰到的主要问题。

利用ZooKeeper的强一致性,能够保证在分布式高并发情况下节点创建的全局唯一性,即:同时有多个客户端请求创建 /currentMaster 节点,最终一定只有一个客户端请求能够创建成功。利用这个特性,就能很轻易的在分布式环境中进行集群选取了。

另外,这种场景演化一下,就是动态Master选举。这就要用到EPHEMERAL_SEQUENTIAL类型节点的特性了。

上文中提到,所有客户端创建请求,最终只有一个能够创建成功。在这里稍微变化下,就是允许所有请求都能够创建成功,但是得有个创建顺序,于是所有的请求最终在ZK上创建结果的一种可能情况是这样: /currentMaster/{sessionId}-1 ,/currentMaster/{sessionId}-2,/currentMaster/{sessionId}-3 ….. 每次选取序列号最小的那个机器作为Master,如果这个机器挂了,由于他创建的节点会马上小时,那么之后最小的那个机器就是Master了。

  1. 在搜索系统中,如果集群中每个机器都生成一份全量索引,不仅耗时,而且不能保证彼此之间索引数据一致。因此让集群中的Master来进行全量索引的生成,然后同步到集群中其它机器。另外,Master选举的容灾措施是,可以随时进行手动指定master,就是说应用在zk在无法获取master信息时,可以通过比如http方式,向一个地方获取master。
  2. 在Hbase中,也是使用ZooKeeper来实现动态HMaster的选举。在Hbase实现中,会在ZK上存储一些ROOT表的地址和HMaster的地址,HRegionServer也会把自己以临时节点(Ephemeral)的方式注册到Zookeeper中,使得HMaster可以随时感知到各个HRegionServer的存活状态,同时,一旦HMaster出现问题,会重新选举出一个HMaster来运行,从而避免了HMaster的单点问题

分布式锁
分布式锁,这个主要得益于ZooKeeper为我们保证了数据的强一致性。锁服务可以分为两类,一个是保持独占,另一个是控制时序。

  1. 所谓保持独占,就是所有试图来获取这个锁的客户端,最终只有一个可以成功获得这把锁。通常的做法是把zk上的一个znode看作是一把锁,通过create znode的方式来实现。所有客户端都去创建 /distribute_lock 节点,最终成功创建的那个客户端也即拥有了这把锁。
  2. 控制时序,就是所有视图来获取这个锁的客户端,最终都是会被安排执行,只是有个全局时序了。做法和上面基本类似,只是这里 /distribute_lock 已经预先存在,客户端在它下面创建临时有序节点(这个可以通过节点的属性控制:CreateMode.EPHEMERAL_SEQUENTIAL来指定)。Zk的父节点(/distribute_lock)维持一份sequence,保证子节点创建的时序性,从而也形成了每个客户端的全局时序。

分布式队列
队列方面,简单地讲有两种,一种是常规的先进先出队列,另一种是要等到队列成员聚齐之后的才统一按序执行。对于第一种先进先出队列,和分布式锁服务中的控制时序场景基本原理一致,这里不再赘述。 第二种队列其实是在FIFO队列的基础上作了一个增强。通常可以在 /queue 这个znode下预先建立一个/queue/num 节点,并且赋值为n(或者直接给/queue赋值n),表示队列大小,之后每次有队列成员加入后,就判断下是否已经到达队列大小,决定是否可以开始执行了。这种用法的典型场景是,分布式环境中,一个大任务Task A,需要在很多子任务完成(或条件就绪)情况下才能进行。这个时候,凡是其中一个子任务完成(就绪),那么就去 /taskList 下建立自己的临时时序节点(CreateMode.EPHEMERAL_SEQUENTIAL),当 /taskList 发现自己下面的子节点满足指定个数,就可以进行下一步按序进行处理了。
分布式系统

CountdownLatch Example

Posted on 2014-03-07 | In Tech |

J.U.C包同步工具类: CountdownLatch演示

直接上代码

我们用了2个CountdownLatch。主锁要求一个参与者。
另一个参与者个数与任务线程个数相同。
任务本身是带有返回值的,通过future.get获取

思路:
主线程创建10个任务,提交到线程池,线程池会立刻执行。
但是由于主锁,所有任务线程都会等待。
此时主线程先获取future对象集合。之后解锁。等待另一锁
10个任务分别执行 - 执行速度和先后顺序不定
每个任务结束,将任务锁减一,主线程此时已经确定所有子任务执行完毕
直接循环future.get() 一定是不需要等待直接拿到任务结果
主线程进行汇总处理
结束

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
// 主调线程
public class CountdownLatchTest {
public static void main(String[] args) throws Exception {
int workerSize = 10;
CountDownLatch mainLatch = new CountDownLatch(1);
CountDownLatch workerLatch = new CountDownLatch(5);
ExecutorService pool = Executors.newFixedThreadPool(workerSize);
List<Future<String>> futures = new ArrayList<Future<String>>();
for (int i = 0; i < workerSize; i++) {
Future<String> future = pool.submit(new SumWorker(mainLatch, workerLatch, i * 10));
futures.add(future);
}
mainLatch.countDown();
workerLatch.await();
pool.shutdown();
Integer total = 0;
for (Future<String> f : futures) {
String oneResult = f.get();
System.out.println(oneResult);
total += Integer.parseInt(oneResult.split(",")[1]);
}
String text = "";
int mainSum = 0;
for (int i = 0; i < 10 * workerSize; i++) {
mainSum += i;
}
System.out.println(String.format(text, workerSize, total, mainSum));
}
}
// 子任务实际上是一个求和工作
public class SumWorker implements Callable<String> {
private CountDownLatch mainLatch;
private CountDownLatch workerLatch;
private Integer startNumber;
private Integer size = 10;
public SumWorker(CountDownLatch mainLatch, CountDownLatch workerLatch, Integer startNumber) {
this.mainLatch = mainLatch;
this.workerLatch = workerLatch;
this.startNumber = startNumber;
}
@Override
public String call() throws Exception {
mainLatch.await(); // 等待主线程倒计时结束
Integer sum = 0;
for (int i = startNumber; i < startNumber + size; i++) {
sum += i;
}
String desc = Thread.currentThread().getName() + "," + sum;
workerLatch.countDown();
return desc;
}
}

Pooling 技术浅析

Posted on 2014-03-07 | In Tech |

前言

Pooling我们可以简单理解为“连接池”。
IO是很珍贵的资源,尤其是在高并发的场景下,每次IO(网络、磁盘等)创建销毁产生的开销是足以影响到整个服务(应用)的性能。
此时,采用pooling技术,可以对频繁的IO操作进行统一管理,日常应用中,我们常见的有如下几个场景:

  1. 数据库连接 -> 采用数据库连接池
  2. Nosql(如Redis) -> Redis采用连接池
  3. Http请求 -> HttpClient 采用连接池
  4. 线程 -> 多线程采用线程池

举例

1. 数据库连接池

最原始的,采用jdbc进行数据库crud操作,每次需要利用驱动,创建一个连接。
繁琐的进行创建和关闭操作,代码不清晰并且难以维护,最重要的,性能差!
目前,针对某个数据库,我们通常配置一个数据源,其中数据源的实现有很多,通过比较,目前还是推荐使用alibaba的druid

一个典型的配置:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
<bean id="dataSource" class="com.alibaba.druid.pool.DruidDataSource" init-method="init" destroy-method="close">
<!-- 数据源驱动类可不写,Druid默认会自动根据URL识别DriverClass -->
<property name="driverClassName" value="${driverClassName}" />
<!-- 基本属性 url、user、password -->
<property name="url" value="${url}" />
<property name="username" value="${username}" />
<property name="password" value="${password}" />
<!-- 配置初始化大小、最小、最大 -->
<property name="initialSize" value="50" />
<property name="minIdle" value="10" />
<property name="maxActive" value="100" />
<!-- 配置获取连接等待超时的时间 -->
<property name="maxWait" value="60000" />
<!-- 配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒 -->
<property name="timeBetweenEvictionRunsMillis" value="60000" />
<!-- 配置一个连接在池中最小生存的时间,单位是毫秒 -->
<property name="minEvictableIdleTimeMillis" value="300000" />
<property name="testWhileIdle" value="true" />
<property name="testOnBorrow" value="false" />
<property name="testOnReturn" value="false" />
</bean>

再此之上,可以扩展实现主从、多读、分库分表等多数据源的场景配置,必要的时候可使用一些分布式中间件,如:当当开源的sharding-jdbc

2. Redis采用连接池

java这边redis的类库主要有2个:Jedis 以及 Lettuce
后者我们一般习惯直接引用spring-data项目的配置,这里介绍下jedis的使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// 连接池配置
JedisPoolConfig poolConfig = new JedisPoolConfig();
poolConfig.setMaxTotal(Config_mpss.POOL_MAX_TOTAL);
poolConfig.setMaxIdle(Config_mpss.POOL_MAX_IDLE);
poolConfig.setMaxWaitMillis(Config_mpss.POOL_MAX_WAIT_MILLIS);
poolConfig.setTestOnBorrow(Config_mpss.POOL_TEST_ON_BORROW);
poolConfig.setTestOnReturn(Config_mpss.POOL_TEST_ON_RETURN);
// 一个简单的封装
JedisPoolWrapper.globalInit(poolConfig, Config_mpss.REDIS_STAT_HOST,
Config_mpss.REDIS_STAT_PORT, Config_mpss.REDIS_STAT_TIMEOUT,
Config_mpss.REDIS_STAT_AUTH);
// 如何使用?
Jedis jedis = null;
try {
jedis = JedisPoolWrapper.getOneJedis(); // 每次“借”一个连接
jedis.setex ...
jedis.hgetall ...
} finally {
if (jedis != null) {
jedis.close();
}
}

3. Http请求:ApacheHttpClient连接池

http请求一般我们有2个选择:

  1. java自带的HttpURLConnection
  2. apache httpclient - 注意:采用最新的版本,与旧版完全是两个东西!

不考虑性能,我们当然可以每次请求都建立一个连接,进行操作后再销毁。但是实际业务中,往往面对的都是高并发频繁的调用,每次创建和销毁显然不现实。
因此,可以自行封装一个http请求工具,根据业务需要预设一定数量的连接,并开放工具方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
public class MockHttpUtils {
public static final MockHttpUtils instance = new MockHttpUtils();
private static final Logger logger = Logger.getLogger(MockHttpUtils.class);
// 超时时间-毫秒
private static final int MAX_TIMEOUT_MILLIS = 30000;
// 请求配置
private static RequestConfig requestConfig = null;
// 连接池配置
private PoolingHttpClientConnectionManager connMgr = null;
private MockHttpUtils() {
// 连接池管理器
connMgr = new PoolingHttpClientConnectionManager();
connMgr.setMaxTotal(2048);
connMgr.setDefaultMaxPerRoute(1024);
// 设置请求参数配置
RequestConfig.Builder configBuilder = RequestConfig.custom();
configBuilder.setConnectTimeout(MAX_TIMEOUT_MILLIS);
configBuilder.setSocketTimeout(MAX_TIMEOUT_MILLIS);
configBuilder.setConnectionRequestTimeout(MAX_TIMEOUT_MILLIS);
// 在提交请求之前 测试连接是否可用
// configBuilder.setStaleConnectionCheckEnabled(true);
requestConfig = configBuilder.build();
}
public static MockHttpUtils getInstance() {
return instance;
}
/**
* 发送HTTP请求<code>json</code>方式
*
* @param url
* 请求地址
* @param postBody
* json格式
*/
public void doPost(String url, String postBody) {
CloseableHttpResponse response = null;
try {
HttpPost httpPost = genHttpPost(url, postBody);
// 初始化一个http连接
CloseableHttpClient httpClient = HttpClients.custom().setConnectionManager(connMgr)
.setDefaultRequestConfig(requestConfig).build();
response = httpClient.execute(httpPost);
if (response != null) {
int status = response.getStatusLine().getStatusCode();
String content = entity2Content(response);
// TODO
}
} catch (Exception e) {
logger.error("MockHttpUtils doPost:" + e.getMessage());
} finally {
closeQuietly(response);
}
}
/**
* 发送HTTP请求<code>json</code>方式
*
* @param url
* 请求地址
* @param postBody
* json格式
*/
public void doPostSSL(String url, String postBody) {
CloseableHttpResponse response = null;
try {
HttpPost httpPost = genHttpPost(url, postBody);
CloseableHttpClient httpsClient = HttpClients.custom()
.setSSLSocketFactory(createSSLConnSocketFactory()).setConnectionManager(connMgr)
.setDefaultRequestConfig(requestConfig).build();
response = httpsClient.execute(httpPost);
if (response != null) {
int status = response.getStatusLine().getStatusCode();
String content = entity2Content(response);
// TODO
}
} catch (Exception e) {
logger.error("MockHttpUtils doPostSSL:" + e.getMessage());
} finally {
closeQuietly(response);
}
}
/**
* 构造一个HttpPost
*
* @param url
* url
* @param postBody
* postbody
* @return httppost
*/
private HttpPost genHttpPost(String url, String postBody) {
StringEntity se = new StringEntity(postBody, "UTF-8");
HttpPost httpPost = new HttpPost(url);
httpPost.setConfig(requestConfig);
httpPost.setHeader("User-Agent", "Mozilla/5.0");
httpPost.setEntity(se);
return httpPost;
}
/**
* 创建SSL安全连接
*
* @return SSL连接socket工厂
*/
private SSLConnectionSocketFactory createSSLConnSocketFactory() {
SSLConnectionSocketFactory sslsf = null;
try {
SSLContext sslContext = new SSLContextBuilder()
.loadTrustMaterial(null, new TrustStrategy() {
public boolean isTrusted(X509Certificate[] chain, String authType)
throws CertificateException {
return true;
}
}).build();
sslsf = new SSLConnectionSocketFactory(sslContext, new HostnameVerifier() {
@Override
public boolean verify(String hostname, SSLSession session) {
return true;
}
});
} catch (GeneralSecurityException e) {
e.printStackTrace();
}
return sslsf;
}
/**
* 关闭HttpResponse
*
* @param response
* HttpResponse
*/
private void closeQuietly(CloseableHttpResponse response) {
HttpClientUtils.closeQuietly(response);
}
/**
* 转换返回实体
*
* @param response
* 返回实体
* @throws IOException
* io异常
*/
private String entity2Content(CloseableHttpResponse response) throws IOException {
return EntityUtils.toString(response.getEntity(), "utf-8");
}
}

4. 线程池

高并发的场景下,每次有请求来临,不可能单独创建一个thread,这时候通常采用线程池,主要是2个思路:

  1. 简单情况下,直接利用Executors工具类,选择合适的实现

这部分可以参考:Java线程池-Executors

  1. 自定义,根据业务配置具体的参数,自行调优

这部分可以参考:Java线程池-自定义

JVM问题集

Posted on 2013-09-04 | In Tech |

JVM问题集锦

不断更新中。。。

1. JVM了解吗,介绍一下,讲了垃圾回收机制

2. Full GC和Minor GC区别,及各自的触发条件

3. 说说CMS垃圾回收器,及其适用场景

Java并发-J.U.C包简介

Posted on 2012-08-05 | In Tech |

Java并发包简介

J.U.C 即 java.util.concurrent 包。包含了大量可以利用学习的类,本文简介如下:
直接上图,方便理清楚它们的关系:

lock包


如图,包含 lock, 以及可以重点关注的 AbstractQueuedSynchronizer (AQS)

同步工具类

有的基于AQS,有的不是

如图,Semaphore和CountdownLatch是基于AQS

线程安全集合

队列

Future相关

线程池

原子相关

后端工程师如何在技术上定位?

Posted on 2012-08-01 | In Tech |

前言

正所谓术业有专攻,一个人如果什么都会,那么他也什么都不精。 大中型公司需要专业人才,小公司需要全才,但是对于个人职业发展来说,我建议是分开。

后端工程师技术路线

对于后端java工程师:

  • Java基础
  • 设计模式
  • Spring & SpringMVC 原理和源码
  • 用得上的 linux运维
  • 用得上的 mysql :事务、索引、调优、分库分表等
  • 用得上的 redis/mongodb 应用
  • http 协议
  • 多线程和并发
  • JVM
  • 分布式架构(dubbo|spring cloud)
  • 微服务架构(springboot,docker)
  • java性能优化
  • 相关的项目管理等等 (CI、CD、CVS ==)

后端追求的是:高并发,高可用,高性能
能够精通业务是很大的加分项

术业有专攻

术业有专攻,这样你的核心竞争力才会越来越高,正所谓你往生活中投入什么,生活就会反馈给你什么。
后续会对每一部分进行详细的介绍,重点会放在两部分:

  1. 工作中实用
  2. 面试中常问
    虽然可能是”面试造飞机,工作拧螺丝“,但是参考下还是有好处的。

Java ThreadPool - Custom

Posted on 2012-04-13 | In Tech |

Java 线程池简要-自定义线程池

自定义线程池

直接看源码,自定义线程池的几个关键参数

1
2
3
4
5
6
7
8
9
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), handler);
}

几个核心参数:

  • corePoolSize - 即时空闲也会保持的核心线程数量
  • maximumPoolSize - 池中最大线程数量
  • keepAliveTime - 超过核心数量后,等待接收任务的最大时间
  • unit - keepAliveTime 参数的单位
  • workQueue - 执行之前排队的缓存队列
  • handler - 线程数和队列数都达到最大值后的处理器

新任务来了之后的流程

直接看javadoc怎么说的,已经很详细了:

  • 核心数和最大线程数:
    ThreadPoolExecutor会根据核心线程数和最大线程数不断调整线程池大小。
    当新的任务通过 execute()方法提交后:

    1. 当前线程数 < corePoolSize : 会创建一个新线程处理请求,即时有其他的空闲线程
    2. 当前线程数 > corePoolSize 但是队列还没满 -> 任务加入队列
      队列如果也满了,并且当前线程数 < maximumPoolSize : 创建一个新线程处理请求
    3. corePoolSize = maximumPoolSize : 固定大小的线程池
    4. maximumPoolSize = Integer.MAX_VALUE,线程池处理任意数量的并发任务
      一般来说,core 和 maximum 的值是构造的时候指定,但也可以通过方法动态调整
  • 按需创建
    默认的话即时核心前程也是有新任务的时候才创建,但是可以通过方法覆盖此策略。

  • 创建新的线程
    通过 ThreadFactory 创建新的线程,不指定就用默认的。
    创建的线程有相同的 ThreadGroup,优先级,都是非守护线程的状态。
    提供一个不同的 ThreadFactory ,即可修改这些策略。

  • Keep-alive 时间
    超过 corePoolSize 的时候,如果有空闲线程,那边就会被终结。之后有需求会重新创建

  • Queuing
    任何的 BlockingQueue 都可以用来传递、持有提交的任务。

    • 小于 corePoolSize ,创建新线程,不加到队列
    • 大于 corePoolSize , 总是加入队列而不是创建新线程
    • 如果无法加入队列,创建新线程直到达到 maximumPoolSize。超过了则任务被拒
      主要有3种加入队列的策略:
    • SynchronousQueue (待补充)
    • LinkedBlockingQueue (待补充)
    • ArrayBlockingQueue (待补充)
  • 拒绝任务
    线程池已经关闭 or 队列和线程全部饱和,会触发任务拒绝处理,默认有4个预置策略

    • 抛出运行时异常(默认)
    • 发起execute线程自己执行任务
    • 直接扔掉
    • 扔掉最早的任务
  • 钩子方法
    开始和结束执行任务时提供钩子方法。可以做一些:初始化ThreadLocal,统计,加日志等

  • 队列维护

  • 终结

Java ThreadPool - Executors

Posted on 2012-04-13 | In Tech |

Java 线程池简要-使用Executors

直接使用J.U.C包内置的工具方法

通过javadoc我们发现其实只有4组:

通过源码看本质,需要提前了解自定义线程池的方法 点这里

1.固定大小的线程池

1
2
3
4
5
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}

本质上,核心和最大线程数一致。缓存队列无限。

2.单线程线程池

1
2
3
4
5
6
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}

同上,可以理解为 newFixedThreadPool(1)

3.可缓存的线程池

1
2
3
4
5
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}

核心线程数0,最大线程数不控制,可能会复用之前创建好的线程。适合短生命周期的异步任务

4. 周期执行的线程池

1
2
3
4
5
6
7
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
new DelayedWorkQueue());
}

本质上,核心线程数是指定的,最大不限,核心在于这个缓存队列
支持周期执行任务,或者给定间隔后执行

Richard J. Lee (Lily Blooper)

Richard J. Lee (Lily Blooper)

enjoy handful of articles & code snippets & experiences

9 posts
1 categories
9 tags
GitHub Weibo
Creative Commons
© 2012 - 2018 Richard J. Lee (Lily Blooper)
Powered by Hexo
Theme - NexT.Gemini