一、Zookeeper概念简介
1.zookeeper是为别的分布式应用程序提供协调服务的。
2.zookeeper本身就是一个分布式程序,(只要有半数节点存活,就能正常服务。适合奇数节点)
3.提供的服务:主从协调,服务器节点动态上下线,统一配置管理,分布式共享锁,同意名称服务。
4.底层只有两个服务:
管理(存储,读取)用户程序提交的数据
并为用户程序提供数据节点监听服务。
二、zookeeper安装
1:时间同步 ntpdate 1.cn.pool.ntp.org
2:上传文件,解压
tar -zxvf zookeeper-3.4.5.tar.gz -C /root/app/
3:修改zoo.cfg(拷贝zoo_sample.cfg)
dataDir=/root/app/zookeeper*/data
server.1=hadoop01:2888:3888
server.2=hadoop02:2888:3888
server.3=hadoop03:2888:3888
4:创建文件夹 /root/app/zookeeper*/data
在data文件夹下面创建myid文件 echo “1” > myid
5:分发 对应好 scp -r /root/app/zookeeper hadoop02:/root/app/
6:启动 bin/zkServer.sh start|status|stop 配置环境变量
三、zookeeper结构和命令
3.1 zookeeper特性
1、Zookeeper:一个leader,多个follower组成的集群
2、全局数据一致:每个server保存一份相同的数据副本,client无论连接到哪个server,数据都是一致的
3、分布式读写,更新请求转发,由leader实施
4、更新请求顺序进行,来自同一个client的更新请求按其发送顺序依次执行
5、数据更新原子性,一次数据更新要么成功,要么失败
6、实时性,在一定时间范围内,client能读到最新数
3.2 zookeeper数据结构
1、层次化的目录结构,命名符合常规文件系统规范
2、每个节点在zookeeper中叫做znode,并且其有一个唯一的路径标识
3、节点Znode可以包含数据和子节点(但是EPHEMERAL类型的节点不能有子节点,下一页详细讲解)
4、客户端应用可以在节点上设置监视器
3.3 节点类型
1、Znode有两种类型:
短暂(ephemeral)(断开连接自己删除)
持久(persistent)(断开连接不删除)
2、Znode有四种形式的目录节点(默认是persistent )
PERSISTENT
PERSISTENT_SEQUENTIAL(持久序列/test0000000019 )
EPHEMERAL
EPHEMERAL_SEQUENTIAL
3、创建znode时设置顺序标识,znode名称后会附加一个值,顺序号是一个单调递增的计数器,由父节点维护
4、在分布式系统中,顺序号可以被用于为所有的事件进行全局排序,这样客户端可以通过顺序号推断事件的顺序
3.4 zookeeper命令行操作
运行zkCli.sh start 进入命令行工具
1. ls / 查看当前zookeeper所包含的内容
2. create /zk "hello" 创建一个新的节点
3. get /zk 得到数据
4. set /zk "hi" 设置数据
5. delete /zk 删除节点
四、JavaAPI操作zookeeper
4.1 增删改查
package Zookeeper.demo;
import java.io.IOException;
import java.util.List;
import javax.xml.bind.annotation.XmlList;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs.Ids;
public class FirstTest {
private static final String connectString = "hadoop01:2181,hadoop02:2181,hadoop03:2181";
private int sessionTimeout = 2000;
ZooKeeper zkClient = null;
@Before
public void init() throws Exception{
zkClient = new ZooKeeper(connectString, sessionTimeout, null);
}
/**
* data insert
* @throws Exception
*/
@Test
public void testCreate() throws Exception{
zkClient.create("/w", "eclipse".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
/**
* get child data
*/
@Test
public void getChildren() throws Exception{
List<String> list = zkClient.getChildren("/", false);
for (String string : list) {
System.out.println(string);
}
}
/**
* the data isExits?
*/
@Test
public void testExits() throws Exception{
Stat exists = zkClient.exists("/a", false);
System.out.println(exists);
}
/**
* get data
* @throws Exception
*/
@Test
public void testGetData() throws Exception{
byte[] data = zkClient.getData("/a", false, null);
System.out.println(new String(data));
}
/**
* delete data
* @throws Exception
* @throws InterruptedException
*/
@Test
public void deleteData() throws InterruptedException, Exception{
zkClient.delete("/eclipse0000000002", -1);
}
/**
* update data
* @throws Exception
* @throws KeeperException
*/
@Test
public void setData() throws KeeperException, Exception{
zkClient.setData("/w", "hello".getBytes(), -1);
}
/**
* listener
* @throws Exception
* @throws KeeperException
*/
@Test
public void watch() throws KeeperException, Exception{
List<String> list = zkClient.getChildren("/", true);
for (String string : list) {
System.out.println(string);
}
Thread.sleep(2*60*1000);
}
/**
* release source
* @throws Exception
*/
@After
public void close() throws Exception{
zkClient.close();
}
}
4.2 模拟server-client(运用监视器功能)
server端
package Zookeeper.demo;
import java.awt.List;
import java.util.ArrayList;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
/**
* 1.注册
* 2.遍历servers
* 3.监控
* @author hasee
*
*/
public class DistributedClient {
private static final String connectString= "hadoop01:2181,hadoop02:2181,hadoop03:2181";
private static final int sessionTimeout = 2000;
ZooKeeper zkClient = null;
private static final String parentName = "/servers";
private volatile ArrayList<String> list ;
public void connect() throws Exception{
zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher(){
@Override
public void process(WatchedEvent event) {
// TODO Auto-generated method stub
try {
getList();
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
});
}
public void getList() throws Exception{
java.util.List<String> list2 = zkClient.getChildren(parentName, true);
ArrayList<String> serverList = new ArrayList<>();
for (String key : list2) {
byte[] data = zkClient.getData(parentName+"/"+key, false, null);
serverList.add(new String(data));
}
list = serverList;
System.out.println(list);
}
public void handleServer() throws Exception{
System.out.println("zkClient is running");
Thread.sleep(10000000);
}
public static void main(String[] args) throws Exception {
DistributedClient client = new DistributedClient();
client.connect();
client.getList();
client.handleServer();
}
}
client端
package Zookeeper.demo;
import java.io.IOException;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
public class DistributedServer {
private static final String connectString= "hadoop01:2181,hadoop02:2181,hadoop03:2181";
private static final int sessionTimeout = 2000;
ZooKeeper zkClient = null;
private static final String parentName = "/servers";
public void connect() throws Exception{
zkClient = new ZooKeeper(connectString, sessionTimeout, null);
}
public void register(String hostName) throws Exception{
String create = zkClient.create(parentName+"/server", hostName.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println(hostName +" is registed in "+create);
}
public void Server(String hostName) throws Exception{
System.out.println(hostName + "is working");
Thread.sleep(20000);
}
public static void main(String[] args) throws Exception {
DistributedServer server = new DistributedServer();
server.connect();
server.register(args[0]);
server.Server(args[0]);
}
}