前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >结合 Flink 学习观察者模式

结合 Flink 学习观察者模式

作者头像
shengjk1
发布2020-11-11 16:21:00
4370
发布2020-11-11 16:21:00
举报
文章被收录于专栏:码字搬砖码字搬砖
什么是观察者模式

定义了对象之间的一对多依赖,这样一来,当一个对象改变状态时,它的 所有依赖者都会收到通知并自动更新

观察者模式实现

1.可观察者,也可以称之为主题 2.观察者 其实类似于订阅邮件,订阅者就是观察者,而发送邮件的就是可观察者( 主题 )

代码语言:javascript
复制
/**
 * @author shengjk1
 * @date 2020/11/9
 */
//主题,注册观察者、remove观察者、
// nofity observer  通知完之后,具体 Observer 要做什么完全在于 Observer
// 把通知与实现解耦了
public interface Subject {
	//注册一个观察者
	void registerObserver(Observer o);
	
	void removeObserver(Observer o);
	
	void notifyObservers();
	
}
代码语言:javascript
复制
//观察者,观察者的目的是对主题数据发生变化的一种处理方式
public interface Observer {
	void update(int value);
}
代码语言:javascript
复制
/**
 * @author shengjk1
 * @date 2020/11/9
 */
/*
简单的主题
 */
public class SimpleSubject implements Subject {
	private List<Observer> observers;
	private int value = 0;
	
	public SimpleSubject() {
		observers = new ArrayList<>();
	}
	
	@Override
	public void registerObserver(Observer o) {
		observers.add(o);
	}
	
	@Override
	public void removeObserver(Observer o) {
		observers.remove(o);
	}
	
	@Override
	public void notifyObservers() {
		for (Observer observer : observers) {
			observer.update(value);
		}
	}
	
	public void setValue(int value) {
		this.value = value;
		notifyObservers();
	}
}
代码语言:javascript
复制
/**
 * @author shengjk1
 * @date 2020/11/9
 */
// 简单观察者
public class SimpleObserver implements Observer {
	private int value;
	//观察者需要有主题的指针
	private Subject subject;
	
	public SimpleObserver(Subject subject) {
		this.subject = subject;
		subject.registerObserver(this);
	}
	
	@Override
	public void update(int value) {
		this.value = value;
		display();
	}
	
	public void display() {
		System.out.println("value: " + value);
	}
	
	@Override
	public boolean equals(Object o) {
		if (this == o) return true;
		if (o == null || getClass() != o.getClass()) return false;
		SimpleObserver that = (SimpleObserver) o;
		return value == that.value &&
				Objects.equals(subject, that.subject);
	}
	
	@Override
	public int hashCode() {
		return Objects.hash(value, subject);
	}
}
代码语言:javascript
复制
/**
 * @author shengjk1
 * @date 2020/11/9
 */
public class Example {
	public static void main(String[] args) {
		SimpleSubject simpleSubject = new SimpleSubject();
		SimpleObserver simpleObserver = new SimpleObserver(simpleSubject);
		simpleSubject.setValue(80);
		simpleSubject.removeObserver(simpleObserver);
//		simpleSubject.setValue(80);
	}
}
Flink 是如何实现观察者模式的

其实 Flink 很多地方都用到了观察者模式,特别是一些 Listener 比如 ExecutionGraph

代码语言:javascript
复制
//观察者模式
	//注册 observer
	public void registerJobStatusListener(JobStatusListener listener) {
		if (listener != null) {
			jobStatusListeners.add(listener);
		}
	}

	public void registerExecutionListener(ExecutionStatusListener listener) {
		if (listener != null) {
			executionListeners.add(listener);
		}
	}

	//subject has change nofify observer
	private void notifyJobStatusChange(JobStatus newState, Throwable error) {
		if (jobStatusListeners.size() > 0) {
			final long timestamp = System.currentTimeMillis();
			final Throwable serializedError = error == null ? null : new SerializedThrowable(error);

			for (JobStatusListener listener : jobStatusListeners) {
				try {
					listener.jobStatusChanges(getJobID(), newState, timestamp, serializedError);
				} catch (Throwable t) {
					LOG.warn("Error while notifying JobStatusListener", t);
				}
			}
		}
	}
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2020-11-09 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 什么是观察者模式
  • 观察者模式实现
  • Flink 是如何实现观察者模式的
相关产品与服务
大数据
全栈大数据产品,面向海量数据场景,帮助您 “智理无数,心中有数”!
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档