/**
* @description kudu测试demo
* @author IT云清
*/
@SpringBootTest
@RunWith(SpringRunner.class)
public class KuduTest {
private static Logger logger = LoggerFactory.getLogger(KuduTest.class);
private KuduClient kuduClient;
private String kuduMaster;
private String tableName;
@Before
public void init(){
kuduMaster = "";
tableName = "user";
KuduClientBuilder kuduClientBuilder = new KuduClientBuilder(kuduMaster);
kuduClientBuilder.defaultSocketReadTimeoutMs(10000);
kuduClient = kuduClientBuilder.build();
}
@Test
public void testCreateTable() throws KuduException {
if(!kuduClient.tableExists(tableName)){
ArrayList<ColumnSchema> columnSchemas = new ArrayList<>();
columnSchemas.add(new ColumnSchema.ColumnSchemaBuilder("id", Type.INT32).key(true).build());
columnSchemas.add(new ColumnSchema.ColumnSchemaBuilder("name",Type.STRING).build());
columnSchemas.add(new ColumnSchema.ColumnSchemaBuilder("date",Type.UNIXTIME_MICROS).build());//日期待定
columnSchemas.add(new ColumnSchema.ColumnSchemaBuilder("money",Type.DOUBLE).build());//小数待定
Schema schema = new Schema(columnSchemas);
CreateTableOptions options = new CreateTableOptions();
List<String> partitionList = new ArrayList<>();
//kudu表的分区字段是什么 TODO
partitionList.add("id");
//按照id.hashcode % 分区数 = 分区号
options.addHashPartitions(partitionList,6);
kuduClient.createTable(tableName,schema,options);
}
}
@Test
public void insert() throws KuduException {
KuduSession kuduSession = kuduClient.newSession();
kuduSession.setFlushMode(FlushMode.AUTO_FLUSH_SYNC);
for(int i = 0; i <= 100;i ++){
KuduTable userTable = kuduClient.openTable(tableName);
Insert insert = userTable.newInsert();
PartialRow row = insert.getRow();
row.addInt("id",i);
row.addString("name","wang"+i);
row.addDouble("money",100.342+i);
kuduSession.apply(insert);
}
}
@Test
public void query() throws KuduException {
KuduScannerBuilder kuduScannerBuilder = kuduClient
.newScannerBuilder(kuduClient.openTable(tableName));
List<String> columns = Arrays.asList("id","name","moeny");
kuduScannerBuilder.setProjectedColumnNames(columns);
KuduScanner kuduScanner = kuduScannerBuilder.build();
while (kuduScanner.hasMoreRows()){
RowResultIterator rowResults = kuduScanner.nextRows();
while(rowResults.hasNext()){
RowResult row = rowResults.next();
logger.info("id={},name={},money={}",
row.getInt("id"),
row.getString("name"),
row.getDouble("money"));
}
}
}
@Test
public void update() throws KuduException {
KuduSession kuduSession = kuduClient.newSession();
kuduSession.setFlushMode(FlushMode.AUTO_FLUSH_SYNC);
KuduTable kuduTable = kuduClient.openTable(tableName);
// Update update = kuduTable.newUpdate();
//id存在就修改,不存在就新增
Upsert upsert = kuduTable.newUpsert();
PartialRow row = upsert.getRow();
row.addInt("id",100000);
row.addString("name","yangege");
row.addDouble("money",100.222);
kuduSession.apply(upsert);
}
}