观察者模式是一种行为设计模式,它允许对象(观察者)订阅另一个对象(主题)的状态变化,当主题状态改变时,会自动通知所有观察者。这种模式在事件处理、状态监控等场景中非常有用。
下面是一个Python实现观察者模式的示例:
# 主题/可观察者接口
class Subject:
def __init__(self):
self._observers = [] # 存储所有观察者
def attach(self, observer):
"""添加观察者"""
if observer not in self._observers:
self._observers.append(observer)
def detach(self, observer):
"""移除观察者"""
try:
self._observers.remove(observer)
except ValueError:
pass
def notify(self, *args, **kwargs):
"""通知所有观察者"""
for observer in self._observers:
observer.update(*args, **kwargs)
# 具体主题实现
class ConcreteSubject(Subject):
def __init__(self):
super().__init__()
self._state = None # 主题的状态
@property
def state(self):
return self._state
@state.setter
def state(self, value):
self._state = value
# 状态改变时通知所有观察者
self.notify(self._state)
# 观察者接口
class Observer:
def update(self, *args, **kwargs):
"""当主题状态改变时调用"""
raise NotImplementedError("子类必须实现update方法")
# 具体观察者1
class ConcreteObserver1(Observer):
def update(self, *args, **kwargs):
print(f"观察者1接收到更新: {args[0]}")
# 具体观察者2
class ConcreteObserver2(Observer):
def update(self, *args, **kwargs):
print(f"观察者2接收到更新: {args[0]}")
# 使用示例
if __name__ == "__main__":
# 创建主题
subject = ConcreteSubject()
# 创建观察者
observer1 = ConcreteObserver1()
observer2 = ConcreteObserver2()
# 注册观察者
subject.attach(observer1)
subject.attach(observer2)
# 改变主题状态,会自动通知观察者
subject.state = "状态1"
subject.state = "状态2"
# 移除一个观察者
subject.detach(observer1)
print("\n移除观察者1后:")
subject.state = "状态3"attach()方法添加观察者detach()方法移除观察者notify()方法通知所有观察者notify()方法通知观察者update()方法,用于接收主题的通知这种实现方式的优点是:
你可以根据实际需求扩展这个基础实现,例如添加更复杂的状态传递、异步通知等功能。