并发场景下缓存的创建

问题背景:
现场提了一个新的需求:jdbc需要提供一个新的接口,用于查询session的执行进度。后台提供了查询视图,jdbc要做的只是在这个接口中查询这个视图,获得当前session的执行进度返回给客户。
查询当前session的执行进度,说明当前session很有可能正在执行某条sql,是阻塞的。所以需要通过创建新的session并以当前session的sessionID作为条件在视图中查找当前session的执行情况。这个接口的调用在某些特定场景下是比较频繁的,比如用户每隔5秒就需要调用一次,那么每次都创建新的session去查询是不是显得太low?是不是可以在第一次查询时创建新的session,然后缓存起来,下次可以直接用?于是涉及到了缓存session的问题。(其实完全可以使用连接池来做到缓存,这次讨论的是并发场景下缓存的创建问题~~思路主要来自于《java并发编程实战》)

使用HashMap建立缓存

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class ConnCache {
private final Map<String, Connection> cache;
private final ConnFactory factory;
public ConnCache(ConnFactory factory) {
cache = new HashMap<>();
this.factory = factory;
}
public synchronized Connection get(String key) throws SQLException {
Connection connection = cache.get(key);
if (connection == null) {
connection = factory.getConn();
cache.put(key, connection);
}
return connection;
}
}
interface ConnFactory {
Connection getConn() throws SQLException;
}

上面的ConnFactory.getConn()创建新的session,是一个相对耗时的操作。用户需要获得session时调用ConnCache.get()方法,先从map中查找是否有对应的session(key可以设计为连接串+用户名映射到指定的session),如果没有,那么创建一个新的session,放到map里面,然后返回。

注意到整个get方法是被synchronized修饰的,因为HashMap不是线程安全的,如果有多个线程同时访问HashMap会出现并发问题
synchronized确保了两个线程不会同时访问HashMap。但是这么做也有一个问题,对整个get方法同步会使访问同一ConnCache对象get方法的线程串行化,如果一个线程正在调用这个方法,那么其他想要调用get方法的线程需要排队等候,很有可能被阻塞很长时间(创建session是个耗时的动作)。这种情况是由于锁的粒度较大带来的伸缩性问题。

使用HashMap建立缓存

使用ConcurrentHashMap建立缓存

我们很容易想到使用ConcurrentHashMap来代替HashMap,ConcurrentHashMap本身就是线程安全的,采用了分段锁的技术,并发性能相对于加锁的HashMap要好上很多。使用ConcurrentHashMap后,我们就不需要在访问底层的Map时进行同步了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class ConnCache {
private final Map<String, Connection> cache;
private final ConnFactory factory;
public ConnCache(ConnFactory factory) {
cache = new ConcurrentHashMap<>();
this.factory = factory;
}
public Connection get(String key) throws SQLException {
Connection connection = cache.get(key);
if (connection == null) {
connection = factory.getConn();
cache.put(key, connection);
}
return connection;
}
}
interface ConnFactory {
Connection getConn() throws SQLException;
}

上面这种方法相对于第一种方法,减小了锁的粒度,有着更好的并发性能。但是他也有一个严重的问题:如果一个线程在调用get方法时没有命中缓存,那么他会去创建一个新的session,然后放到map里面。如果在创建session的过程中,另一个线程也调用了get方法传入同样的key,那么就会导致重复创建的问题(这种情况很有可能出现,因为创建session是个耗时的操作)。

使用ConcurrentHashMap建立缓存

所以,我们需要某种方法来知道当前是否有其他线程在创建指定的session,如果有,则等待这个线程创建完毕,然后直接获取创建好的session。这样就能避免一次session多余的创建。

这时,我们就需要FutureTask来实现这个功能。FutureTask表示一个计算过程,这个过程可能计算完成,也可能正在运行。如果计算完毕,那么调用FutureTask.get()就会立即返回结果,否则,该方法会一直阻塞,直到有结果可用。
categories: 生活
tags: 食物

基于FutureTask建立缓存

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public class ConnCache {
private final Map<String, Future<Connection>> cache;
private final ConnFactory factory;
public ConnCache(ConnFactory factory) {
cache = new ConcurrentHashMap<>();
this.factory = factory;
}
public Connection get(String key) throws SQLException {
Future<Connection> future = cache.get(key);
if (future == null) {
Callable<Connection> eval = new Callable<Connection>() {
@Override
public Connection call() throws Exception {
return factory.getConn();
}
};
FutureTask<Connection> task = new FutureTask<>(eval);
future = task;
cache.put(key, task);
task.run();
}
try {
return future.get();
} catch (InterruptedException e) {
throw new SQLException(e);
} catch (ExecutionException e) {
throw new SQLException(e);
}
}
}
interface ConnFactory {categories: 生活
tags: 食物
Connection getConn() throws SQLException;
}

与第二种方法相反,上面的方法是先检查创建session的动作是否开始(第二种方法是检查session创建是否完成),如果已经有线程在创建指定的session,就等待其创建完毕,然后获取结果。

看起来已经很完美了,但是还有一个并发缺陷: if代码块中不是原子的先检查再执行操作,两个线程很有可能同时检查到缓存为空,然后重复创建了session。

基于FutureTask建立缓存

解决这个问题的方法有一种思路:把创建好的FutureTask放入到Map这一步需要是一个原子操作,如果对应的FutureTask已经存在了,调用已存在的FutureTask.get()方法即可。

最终的实现

ConcurrentHashMap提供了一个同步方法:putIfAbsent()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public class ConnCache {
private final Map<String, Future<Connection>> cache;
private final ConnFactory factory;
public ConnCache(ConnFactory factory) {
cache = new ConcurrentHashMap<>();
this.factory = factory;
}
public Connection get(String key) throws SQLException {
Future<Connection> future = cache.get(key);
if (future == null) {
Callable<Connection> eval = new Callable<Connection>() {
@Override
public Connection call() throws Exception {
return factory.getConn();
}
};
FutureTask<Connection> task = new FutureTask<>(eval);
future = cache.putIfAbsent(key, task);
if (future == null) {
future = task;
task.run();
}
}
try {
return future.get();
} catch (InterruptedException e) {
cache.remove(future);
throw new SQLException(e);
} catch (ExecutionException e) {
cache.remove(future);
throw new SQLException(e);
}
}
}
interface ConnFactory {
Connection getConn() throws SQLException;
}

上面的演示是对并发场景的一个思考。实际的缓存在使用中还要考虑缓存过期时间(可以在FutureTask的子类中实现),缓存清理算法等问题。我们也可以通过泛型将上面的代码设计为一个通用的缓存框架:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public class Cache<K, V> implements Factory<K, V>{
private final Map<K, Future<V>> cache;
private final Factory<K,V> factory;
public Cache(Factory<K, V> factory) {
cache = new ConcurrentHashMap<>();
this.factory = factory;
}
public V get(K key) throws InterruptedException {
Future<V> future = cache.get(key);
if (future == null) {
Callable<V> eval = new Callable<V>() {
@Override
public V call() throws Exception {
return factory.get(key);
}
};
FutureTask<V> task = new FutureTask<>(eval);
future = cache.putIfAbsent(key, task);
if (future == null) {
future = task;
task.run();
}
}
try {
return future.get();
} catch (ExecutionException e) {
cache.remove(future);
throw new IllegalStateException(e);
}
}
}
interface Factory<K, V> {
V get(K key) throws InterruptedException;
}

推荐文章