在前面的文章中介绍过 观察者模式 及 并发编程的基础知识,为了让大家更好的了解
观察者模式
故而特意写了这篇番外..
在Java多线程下,我们需要知道当前执行线程的状态是什么比如 运行
, 关闭
, 异常
等状态的通知,而且不仅仅是更新当前页面。
观察者模式: 是一种使用率极高的模式,用于建立一种对象与对象之间的依赖关系,一个对象发生改变时将自动通知其他对象,其他对象将相应作出反应。在观察者模式中,发生改变的对象称为观察目标,而被通知的对象称为观察者,一个观察目标可以对应多个观察者,而且这些观察者之间可以没有任何相互联系,可以根据需要增加和删除观察者,使得系统更易于扩展。
观察者模式传送门:http://blog.battcn.com/2017/12/11/java/design-pattern/observer-pattern/#more
假设开发一个 爬虫功能
,由于数据过大需要利用多线程并行化来提升抓取的效率,并且在抓取过程中要记录执行线程的运行状态以便追溯问题原因
UML图如下
1.定义 具体观察
对象,实现JDK自带的 Observer
接口,然后在需要实现的 upload
方法中记录下每个线程执行的状态信息
class ObserverListener implements Observer {
/**
* 避免多线程锁竞争
*/
private static final Object LOCK = new Object();
@Override
public void update(Observable observable, Object runnableEvent) {
synchronized (LOCK) {
ObservableRunnable.RunnableEvent event = (ObservableRunnable.RunnableEvent) runnableEvent;
if (event != null) {
if (event.getCause() != null) {
System.out.println("The Runnable [" + event.getThread().getName() + "] process failed and state is " + event.getState().name());
event.getCause().printStackTrace();
} else {
System.out.println("The Runnable [" + event.getThread().getName() + "] data changed and state is " + event.getState().name());
}
}
}
}
}
2.定义 具体被观察
的对象,该对象需要继承 Observable
类,以及实现 Runnable
接口,这里 run
的实现非常简单,执行每一步骤操作时都进行了通知,通知 观察者
消息发生变更了
setChanged
呢setChanged
,但是在最后由于某种原因需要取消通知,我们可以使用 clearChanged
轻松解决问题。setChanged
为 protected
,而 notifyObservers
方法为 public
,这就导致存在外部随意调用 notifyObservers
的可能,但是外部无法调用 setChanged
,因此真正的控制权应该在主题这里。class ObservableRunnable extends Observable implements Runnable {
/**
* 线程名称
*/
private String name;
ObservableRunnable(String name, ObserverListener listener) {
this.name = name;
// 将被观察的对象注册到观察者中
super.addObserver(listener);
}
/**
* 发送通知
*
* @param event 通知的内容
*/
private void notifyChange(final RunnableEvent event) {
// 前面说过 JDK自带的 需要每次设置一次状态,代表当前内容更改了
super.setChanged();
super.notifyObservers(event);
}
@Override
public void run() {
try {
notifyChange(new RunnableEvent(RunnableState.RUNNING, Thread.currentThread(), null));
System.out.printf("根据 [%s] 查询 \n", this.name);
Thread.sleep(1000L);
if (this.name.equals("T3")) {
// 故意模拟报错
throw new RuntimeException("故意抛出错误");
}
notifyChange(new RunnableEvent(RunnableState.DOWN, Thread.currentThread(), null));
} catch (Exception e) {
notifyChange(new RunnableEvent(RunnableState.ERROR, Thread.currentThread(), e));
}
}
enum RunnableState {
/**
* RUNNING:运行
* ERROR:异常
* DOWN:正常结束
*/
RUNNING, ERROR, DOWN
}
static class RunnableEvent {
private final RunnableState state;
private final Thread thread;
private final Throwable cause;
RunnableEvent(RunnableState state, Thread thread, Throwable cause) {
this.state = state;
this.thread = thread;
this.cause = cause;
}
RunnableState getState() {
return state;
}
public Thread getThread() {
return thread;
}
Throwable getCause() {
return cause;
}
}
}
3.创建测试工程
public class ObserverClient {
public static void main(String[] args) {
ObserverListener listener = new ObserverListener();
List<String> names = Arrays.asList("T1", "T2", "T3");
for (String name : names) {
Thread thread = new Thread(new ObservableRunnable(name, listener));
thread.start();
}
}
}
4.运行结果,通过运行日志可以发现,启动三个线程后同时执行抓取操作,但是 Thread-2
线程在数据处理时发生了异常,在 ObserverListener
处也成功收到通知的内容,然后对信息进行了输出操作。在实际过程中我们可以为异常进行补偿操作
The Runnable [Thread-1] data changed and state is RUNNING
The Runnable [Thread-0] data changed and state is RUNNING
根据 [T1] 查询
The Runnable [Thread-2] data changed and state is RUNNING
根据 [T2] 查询
根据 [T3] 查询
java.lang.RuntimeException: 故意抛出错误
The Runnable [Thread-0] data changed and state is DOWN
at com.battcn.chapter14.ObservableRunnable.run(ObserverClient.java:67)
The Runnable [Thread-1] data changed and state is DOWN
at java.lang.Thread.run(Thread.java:745)
The Runnable [Thread-2] process failed and state is ERROR
本文,简单讲述了多线程环境下如何利用观察者模式进行线程状态监听,也是对前面所讲的基础进行巩固,在学习的过程中,既要知其然也要知其所以然。这样才能更好地驾驭它,更好地去理解和使用,也能更好地帮助我们 触类旁通
。
全文代码:https://gitee.com/battcn/battcn-concurent/tree/master/Chapter1-1/battcn-thread/src/main/java/com/battcn/chapter14