请先参考Riak - 安装运维篇(1)安装部署并启动Riak集群(3个Node)。 Riak默认有两种端口,一种是protobuf端口,还有一种是HTTP Restful端口。 以前的Riak client java API会支持两种端口。最新的Riak client Java API作了很多改造,比如说利用netty4作为网络通信框架,简化了API代码,并且只支持Protobuf端口。理由如下:
我们之后主要使用riak client2.0.5和Riak的protobuf端口进行开发使用Riak客户端,在某些情况下,会穿插一些Restful端口使用。因为某些功能在riak client2.0.5还未实现或者实现的不完整:
建立maven项目,添加如下依赖:
<properties>
<netty_version>5.0.0.Alpha2netty_version>
<riak-client_version>2.0.5riak-client_version>
<jackson-core_version>2.7.3jackson-core_version>
<slf4j_version>1.7.2slf4j_version>
properties>
<dependencies>
<dependency>
<groupId>com.basho.riakgroupId>
<artifactId>riak-clientartifactId>
<version>${riak-client_version}version>
dependency>
<dependency>
<groupId>io.nettygroupId>
<artifactId>netty-allartifactId>
<version>${netty_version}version>
dependency>
<dependency>
<groupId>org.slf4jgroupId>
<artifactId>slf4j-log4j12artifactId>
<version>${slf4j_version}version>
dependency>
<dependency>
<groupId>com.fasterxml.jackson.coregroupId>
<artifactId>jackson-coreartifactId>
<version>${jackson-core_version}version>
dependency>
dependencies>
之后新建ClusterRiakClient类:
package io.timberwolf.cache;
import com.basho.riak.client.api.RiakClient;
import com.basho.riak.client.api.cap.Quorum;
import com.basho.riak.client.api.commands.kv.FetchValue;
import com.basho.riak.client.api.commands.kv.StoreValue;
import com.basho.riak.client.core.RiakCluster;
import com.basho.riak.client.core.RiakNode;
import com.basho.riak.client.core.query.Location;
import com.basho.riak.client.core.query.Namespace;
import java.net.UnknownHostException;
import java.util.LinkedList;
import java.util.concurrent.ExecutionException;
/**
* The client class of Riak
*
* @author Hash Zhang
* @version 0.0.0
* @see @https://github.com/basho/riak-java-client/wiki/RiakClient-%26-Cluster-Node-Builders-%28v2.0%29
*/
//implements AutoCloseable是为了利用Java7的新特性,try-with-resources
public class ClusterRiakClient implements AutoCloseable{
private final static int DEFAULTMAXCONNECTIONS = 50;
private final static int DEFAULTMINCONNECTIONS = 10;
private final static int RETRIES = 5;
private int maxConnections = DEFAULTMAXCONNECTIONS;
private int minConnections = DEFAULTMINCONNECTIONS;
private int retries = DEFAULTMINCONNECTIONS;
private RiakClient riakClient;
/**
* 构建Riak集群的客户端
*
* @param hosts (hosts格式:127.0.0.1:10017,127.0.0.1:10027,127.0.0.1:10037)
* @throws UnknownHostException
*/
public ClusterRiakClient(String hosts) throws UnknownHostException {
String[] addresses = hosts.split(",");
//使用RiakNode Builder用于构建每个RiakNode
RiakNode.Builder riakNodeBuilder = new RiakNode.Builder()
.withMinConnections(minConnections)
.withMaxConnections(maxConnections);
//构建每个RiakNode并保存在list中
LinkedList nodes = new LinkedList();
for (int i = 0; i < addresses.length; i++) {
int j = addresses[i].indexOf(":");
nodes.add(riakNodeBuilder.withRemoteAddress(addresses[i].substring(0, j))
.withRemotePort(Integer.parseInt(addresses[i].substring(j + 1))).build());
}
//构建Riak集群并启动,注意,必须调用start()
RiakCluster riakCliuster = RiakCluster.builder(nodes).withExecutionAttempts(retries).build();
riakCliuster.start();
riakClient = new RiakClient(riakCliuster);
}
public void close() throws Exception {
riakClient.shutdown();
}
}
将控制台(Console)日志级别设为debug,编写测试main:
public static void main(String[] args) throws Exception {
ClusterRiakClient clusterRiakClient = new ClusterRiakClient("10.202.44.206:10017,10.202.44.206:10027,10.202.44.206:10037");
clusterRiakClient.close();
}
运行,查看控制台输出
04-21 08:38:40.061 INFO [main] (RiakNode.java:282) -RiakNode started; 10.202.44.206:10017
04-21 08:38:40.147 INFO [main] (RiakNode.java:282) -RiakNode started; 10.202.44.206:10027
04-21 08:38:40.173 INFO [main] (RiakNode.java:282) -RiakNode started; 10.202.44.206:10037
04-21 08:38:40.174 INFO [main] (RiakCluster.java:142) -RiakCluster is starting.
04-21 08:38:40.175 INFO [main] (RiakCluster.java:149) -RiakCluster is shutting down.
04-21 08:38:40.677 INFO [pool-1-thread-2] (RiakCluster.java:428) -All operations have completed
04-21 08:38:40.677 INFO [pool-1-thread-2] (RiakNode.java:291) -RiakNode shutting down; 10.202.44.206:10017
04-21 08:38:40.677 INFO [pool-1-thread-1] (RiakCluster.java:416) -Retrier shutting down.
04-21 08:38:40.677 INFO [pool-1-thread-2] (DefaultNodeManager.java:159) -NodeManager removed node due to it shutting down; 10.202.44.206:10017
04-21 08:38:40.683 INFO [pool-1-thread-2] (RiakNode.java:291) -RiakNode shutting down; 10.202.44.206:10027
04-21 08:38:40.683 INFO [pool-1-thread-2] (DefaultNodeManager.java:159) -NodeManager removed node due to it shutting down; 10.202.44.206:10027
04-21 08:38:40.685 INFO [pool-1-thread-2] (RiakNode.java:291) -RiakNode shutting down; 10.202.44.206:10037
04-21 08:38:40.685 INFO [pool-1-thread-2] (DefaultNodeManager.java:159) -NodeManager removed node due to it shutting down; 10.202.44.206:10037
04-21 08:38:40.688 INFO [pool-1-thread-1] (RiakCluster.java:305) -RiakCluster has shut down
可以看出,Riak集群连接成功,并成功关闭连接。 如果日志级别为Debug,你可以看出,Riak客户端使用了Netty客户端连接的Riak集群 这里,Riakclient和RiakCluster还有实际的Riak集群之间的关系如下图所示:
Riak是一种键值的存储方式,所以必须提供键值对才能保存数据。为了防止键冲突,Riak运用了桶的概念,用户可以将键放入不同的桶中(相当于键的namespace)
首先,我们建立一个简单的POJO类,保存快递员的基本信息。
/**
* User POJO
*
* @author Hash Zhang
* @version 0.0.0
*/
public class User {
private String username;
private String password;
public String getUsername() {
return username;
}
public void setUsername(String username) {
this.username = username;
}
public String getPassword() {
return password;
}
public void setPassword(String password) {
this.password = password;
}
}
在ClusterRiakClient中添加如下方法:
/**
* * 插入POJO对象
*
* @param bucket 桶
* @param key 键
* @param value 值
* @throws ExecutionException
* @throws InterruptedException
* @throws JsonProcessingException
*/
public void set(String bucket, String key, Object value) throws ExecutionException, InterruptedException, JsonProcessingException {
Location location = new Location(new Namespace(bucket), key);
RiakObject riakObject = new RiakObject();
riakObject.setValue(BinaryValue.create(OBJECT_MAPPER.writeValueAsBytes(value)));
StoreValue sv = new StoreValue.Builder(riakObject).withLocation(location).build();
StoreValue.Response svResponse = this.riakClient.execute(sv);
}
/**
* 取得POJO对象
*
* @param bucket 桶
* @param key 键
* @return FetchValue.Response
* @throws ExecutionException
* @throws InterruptedException
*/
public RiakObject get(String bucket, String key) throws ExecutionException, InterruptedException {
Location location = new Location(new Namespace(bucket), key);
FetchValue fv = new FetchValue.Builder(location).build();
return this.riakClient.execute(fv).getValue(RiakObject.class);
}
Location通过Bucket和Key确定唯一位置。编写测试main
public static void main(String[] args) throws Exception {
ClusterRiakClient clusterRiakClient = new ClusterRiakClient("10.202.44.206:10017,10.202.44.206:10027,10.202.44.206:10037");
User user1 = new User();
user1.setUsername("zhxhash").setPassword("123456");
clusterRiakClient.set("users", "user1", user1);
RiakObject riakObject = clusterRiakClient.get("users", "user1");
User getUser1 = OBJECT_MAPPER.readValue(riakObject.getValue().getValue(),User.class);
System.out.println(getUser1.getUsername()+"|"+getUser1.getPassword());
clusterRiakClient.close();
}
结果:
04-21 09:31:03.213 INFO [main] (RiakNode.java:282) -RiakNode started; 10.202.44.206:10017
04-21 09:31:03.240 INFO [main] (RiakNode.java:282) -RiakNode started; 10.202.44.206:10027
04-21 09:31:03.264 INFO [main] (RiakNode.java:282) -RiakNode started; 10.202.44.206:10037
04-21 09:31:03.266 INFO [main] (RiakCluster.java:142) -RiakCluster is starting.
zhxhash|123456
04-21 09:31:04.048 INFO [main] (RiakCluster.java:149) -RiakCluster is shutting down.
04-21 09:31:04.549 INFO [pool-1-thread-2] (RiakCluster.java:428) -All operations have completed
04-21 09:31:04.549 INFO [pool-1-thread-2] (RiakNode.java:291) -RiakNode shutting down; 10.202.44.206:10017
04-21 09:31:04.549 INFO [pool-1-thread-2] (DefaultNodeManager.java:159) -NodeManager removed node due to it shutting down; 10.202.44.206:10017
04-21 09:31:04.555 INFO [pool-1-thread-2] (RiakNode.java:291) -RiakNode shutting down; 10.202.44.206:10027
04-21 09:31:04.556 INFO [pool-1-thread-2] (DefaultNodeManager.java:159) -NodeManager removed node due to it shutting down; 10.202.44.206:10027
04-21 09:31:04.558 INFO [pool-1-thread-2] (RiakNode.java:291) -RiakNode shutting down; 10.202.44.206:10037
04-21 09:31:04.558 INFO [pool-1-thread-2] (DefaultNodeManager.java:159) -NodeManager removed node due to it shutting down; 10.202.44.206:10037
04-21 09:31:04.562 INFO [pool-1-thread-2] (RiakCluster.java:305) -RiakCluster has shut down
04-21 09:31:04.564 INFO [pool-1-thread-1] (RiakCluster.java:416) -Retrier shutting down.
以上是比较简单的增加和查询的实现,还有异步实现的方式。
/**
* 异步插入POJO对象
*
* @param bucket 桶
* @param key 键
* @param value 值
* @return RiakFuture
* @throws JsonProcessingException
*/
public RiakFuture asyncSet(String bucket, String key, Object value) throws JsonProcessingException {
Location location = new Location(new Namespace(bucket), key);
RiakObject riakObject = new RiakObject();
riakObject.setValue(BinaryValue.create(OBJECT_MAPPER.writeValueAsBytes(value)));
StoreValue sv = new StoreValue.Builder(riakObject).withLocation(location).build();
return this.riakClient.executeAsync(sv);
}
/**
* 异步取得POJO对象
*
* @param bucket
* @param key
* @return RiakFuture
*/
public RiakFuture asyncGet(String bucket, String key) {
Location location = new Location(new Namespace(bucket), key);
FetchValue fv = new FetchValue.Builder(location).build();
return this.riakClient.executeAsync(fv);
}
测试代码:
public static void main(String[] args) throws Exception {
final ClusterRiakClient clusterRiakClient = new ClusterRiakClient("10.202.44.206:10017,10.202.44.206:10027,10.202.44.206:10037");
final CountDownLatch countDownLatch = new CountDownLatch(1);
Thread thread = new Thread() {
@Override
public void run() {
User user1 = new User();
user1.setUsername("zhxhash").setPassword("123456");
RiakFuture.Response, Location> riakFuture = null;
while(true) {
try {
riakFuture = clusterRiakClient.asyncSet("users", "user1", user1);
riakFuture.await(100L, TimeUnit.MILLISECONDS);
} catch (JsonProcessingException | InterruptedException e) {
e.printStackTrace();
}
if (riakFuture.isDone() && riakFuture.isSuccess())
{
countDownLatch.countDown();
break;
}
}
}
};
thread.start();
countDownLatch.await();
RiakFuture.Response, Location> riakFuture = null;
while(true) {
try {
riakFuture = clusterRiakClient.asyncGet("users", "user1");
riakFuture.await(100L, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
if (riakFuture.isDone() && riakFuture.isSuccess())
{
RiakObject riakObject = riakFuture.get().getValue(RiakObject.class);
User getUser1 = OBJECT_MAPPER.readValue(riakObject.getValue().getValue(),User.class);
System.out.println(getUser1.getUsername()+"|"+getUser1.getPassword());
break;
}
}
clusterRiakClient.close();
}
输出结果和之前同步的应该一样。 我们还可以删除某个键值对,同样的,有同步和异步两种实现:
/**
* 删除一个键值对
*
* @param bucket 桶
* @param key 键
* @throws ExecutionException
* @throws InterruptedException
*/
public void remove(String bucket, String key) throws ExecutionException, InterruptedException {
Location location = new Location(new Namespace(bucket), key);
DeleteValue dv = new DeleteValue.Builder(location).build();
this.riakClient.execute(dv);
}
/**
* 异步删除一个键值对
*
* @param bucket 桶
* @param key 键
* @return RiakFuture
*/
public RiakFuture asyncRemove(String bucket, String key){
Location location = new Location(new Namespace(bucket), key);
DeleteValue dv = new DeleteValue.Builder(location).build();
return this.riakClient.executeAsync(dv);
}
测试:
public static void main(String[] args) throws Exception {
ClusterRiakClient clusterRiakClient = new ClusterRiakClient("10.202.44.206:10017,10.202.44.206:10027,10.202.44.206:10037");
clusterRiakClient.remove("users", "user1");
RiakObject riakObject = clusterRiakClient.get("users", "user1");
if (riakObject!=null) {
User getUser1 = OBJECT_MAPPER.readValue(riakObject.getValue().getValue(), User.class);
System.out.println(getUser1.getUsername() + "|" + getUser1.getPassword());
} else{
System.out.println("Not Found");
}
clusterRiakClient.close();
}
更新比较特殊,我们放到之后的章节去说 取得一个bucket下的所有键:
/**
* 取得一个bucket下所有键
*
* @param bucket 桶
* @return List
* @throws ExecutionException
* @throws InterruptedException
*/
public List getKeys(String bucket) throws ExecutionException, InterruptedException {
Namespace ns = new Namespace(bucket);
ListKeys lk = new ListKeys.Builder(ns).build();
ListKeys.Response response = this.riakClient.execute(lk);
List keys = new LinkedList<>();
for (Location l : response)
{
keys.add(l.getKeyAsString());
}
return keys;
}