定义了对象之间的一对多依赖,这样一来,当一个对象改变状态时,它的 所有依赖者都会收到通知并自动更新
1.可观察者,也可以称之为主题 2.观察者 其实类似于订阅邮件,订阅者就是观察者,而发送邮件的就是可观察者( 主题 )
/**
* @author shengjk1
* @date 2020/11/9
*/
//主题,注册观察者、remove观察者、
// nofity observer 通知完之后,具体 Observer 要做什么完全在于 Observer
// 把通知与实现解耦了
public interface Subject {
//注册一个观察者
void registerObserver(Observer o);
void removeObserver(Observer o);
void notifyObservers();
}
//观察者,观察者的目的是对主题数据发生变化的一种处理方式
public interface Observer {
void update(int value);
}
/**
* @author shengjk1
* @date 2020/11/9
*/
/*
简单的主题
*/
public class SimpleSubject implements Subject {
private List<Observer> observers;
private int value = 0;
public SimpleSubject() {
observers = new ArrayList<>();
}
@Override
public void registerObserver(Observer o) {
observers.add(o);
}
@Override
public void removeObserver(Observer o) {
observers.remove(o);
}
@Override
public void notifyObservers() {
for (Observer observer : observers) {
observer.update(value);
}
}
public void setValue(int value) {
this.value = value;
notifyObservers();
}
}
/**
* @author shengjk1
* @date 2020/11/9
*/
// 简单观察者
public class SimpleObserver implements Observer {
private int value;
//观察者需要有主题的指针
private Subject subject;
public SimpleObserver(Subject subject) {
this.subject = subject;
subject.registerObserver(this);
}
@Override
public void update(int value) {
this.value = value;
display();
}
public void display() {
System.out.println("value: " + value);
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
SimpleObserver that = (SimpleObserver) o;
return value == that.value &&
Objects.equals(subject, that.subject);
}
@Override
public int hashCode() {
return Objects.hash(value, subject);
}
}
/**
* @author shengjk1
* @date 2020/11/9
*/
public class Example {
public static void main(String[] args) {
SimpleSubject simpleSubject = new SimpleSubject();
SimpleObserver simpleObserver = new SimpleObserver(simpleSubject);
simpleSubject.setValue(80);
simpleSubject.removeObserver(simpleObserver);
// simpleSubject.setValue(80);
}
}
其实 Flink 很多地方都用到了观察者模式,特别是一些 Listener 比如 ExecutionGraph
//观察者模式
//注册 observer
public void registerJobStatusListener(JobStatusListener listener) {
if (listener != null) {
jobStatusListeners.add(listener);
}
}
public void registerExecutionListener(ExecutionStatusListener listener) {
if (listener != null) {
executionListeners.add(listener);
}
}
//subject has change nofify observer
private void notifyJobStatusChange(JobStatus newState, Throwable error) {
if (jobStatusListeners.size() > 0) {
final long timestamp = System.currentTimeMillis();
final Throwable serializedError = error == null ? null : new SerializedThrowable(error);
for (JobStatusListener listener : jobStatusListeners) {
try {
listener.jobStatusChanges(getJobID(), newState, timestamp, serializedError);
} catch (Throwable t) {
LOG.warn("Error while notifying JobStatusListener", t);
}
}
}
}