1. 准备环境
docker pull elasticsearch:6.7.0
docker pull kibana:6.7.0
docker run -it --name elasticsearch -d -p 9200:9200 -p 9300:9300 elasticsearch:6.7.0
docker run --link elasticsearch -p 5601:5601 --name kibana -d kibana:6.7.0
2. java代码
2.1 pom引入
<elasticsearch.version>6.3.1</elasticsearch.version>
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>${elasticsearch.version}</version>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>transport</artifactId>
<version>${elasticsearch.version}</version>
</dependency>
2.2 数据写入代码
//es
TransportClient client = new PreBuiltTransportClient(
Settings.builder()
.put("cluster.name", "docker-cluster")
.build()
).addTransportAddress(
new TransportAddress(InetAddress.getByName("localhost"), 9300)
);
if (null == client) {
LOG.error("es --> client is failed!");
}
List<InData> dataList = new ArrayList<>();
for(int i=100; i <= 110; i++) {
InData data = new InData();
// final String md5AsHex = MD5Hash.getMD5AsHex(Bytes.toBytes("flag_"));
// String newRowKey = md5AsHex + "^" + MD5Hash.getMD5AsHex(Bytes.toBytes(i));;
// System.out.println(newRowKey);
// LOG.error(newRowKey);
data.setNewRowKey("flag_"+i);
data.setFlag("flag_"+i);
data.setQuery("query_"+i);
data.setCost(i+"");
data.setConversionvalue(i+"");
dataList.add(data);
}
if (dataList.size() > 0) {
for (MysqlData data: dataList) {
IndicesExistsRequest request = new IndicesExistsRequest(data.getNewRowKey());
IndicesExistsResponse response = client.admin().indices().exists(request).actionGet();
if (!response.isExists()) {
try {
client.prepareIndex(data.getNewRowKey(), data.getNewRowKey(), data.getNewRowKey())
.setSource(XContentFactory.jsonBuilder().startObject().field("flag", data.getFlag())
.field("query", data.getQuery())
.field("cost", data.getCost())
.field("value", data.getConversionvalue())
.endObject()
)
.execute()
.get();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
展示查看:
也可以使用批量的方式数据写入:
BulkRequestBuilder bulkRequest = client.prepareBulk();
if (dataList.size() > 0) {
for (InData data: dataList) {
//搜索词, cost, value
bulkRequest.add(client.prepareIndex(data.getNewRowKey(), data.getNewRowKey(), data.getNewRowKey())
.setSource(XContentFactory.jsonBuilder().startObject()
.field("flag", data.getFlag())
.field("query", data.getQuery())
.field("cost", data.getCost())
.field("value", data.getConversionvalue())
.endObject()
)
);
}
}
BulkResponse bulkResponse = bulkRequest.get();
if (bulkResponse.hasFailures()) {
LOG.error("es --> result is failed! count: " + dataList.size());
}
2.3 通过索引模糊查询
IndicesStatsRequest indicesStatsRequest = new IndicesStatsRequest();
indicesStatsRequest.indices("flag_*");
Map<String, IndexStats> stats=client.admin().indices().stats(indicesStatsRequest).actionGet().getIndices();
String[] index = new String[stats.size()];
final String[] indexes = stats.keySet().toArray(index);
// for(IndexStats stat:stats.values()) {
//获取索引值
// String index = stat.getIndex();
// System.out.println(index);
// }
SearchResponse response = client.prepareSearch(indexes)// 索引名
.setSearchType(SearchType.DFS_QUERY_THEN_FETCH) // 设置查询类型,精确查询
.execute().actionGet();
final SearchHits hits = response.getHits();
System.out.println("查询结果有:" + hits.getTotalHits() + "条");
Iterator<SearchHit> iterator = hits.iterator();
while (iterator.hasNext()) {
final SearchHit next = iterator.next();
final Map<String, Object> map = next.getSourceAsMap();
//获取字段值
System.out.println(map.get("cost"));
}
附完整代码:
import com.am.dads.comm.InData;
import org.elasticsearch.action.admin.indices.stats.IndexStats;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.transport.client.PreBuiltTransportClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.InetAddress;
import java.util.*;
public class ESWriteTest {
private static final Logger LOG = LoggerFactory.getLogger(ESWriteTest.class);
public static void main(String[] args) throws Exception {
//es
TransportClient client = new PreBuiltTransportClient(
Settings.builder()
.put("cluster.name", "docker-cluster")
.build()
).addTransportAddress(
new TransportAddress(InetAddress.getByName("localhost"), 9300)
);
if (null == client) {
LOG.error("es --> client is failed!");
}
List<InData> dataList = new ArrayList<>();
for(int i=100; i <= 110; i++) {
InData data = new InData();
// final String md5AsHex = MD5Hash.getMD5AsHex(Bytes.toBytes("flag_"));
// String newRowKey = md5AsHex + "^" + MD5Hash.getMD5AsHex(Bytes.toBytes(i));;
// System.out.println(newRowKey);
// LOG.error(newRowKey);
data.setNewRowKey("flag_"+i);
data.setFlag("flag_"+i);
data.setQuery("query_"+i);
data.setCost(i+"");
data.setConversionvalue(i+"");
dataList.add(data);
}
BulkRequestBuilder bulkRequest = client.prepareBulk();
if (dataList.size() > 0) {
for (InData data: dataList) {
//搜索词, cost, value
bulkRequest.add(client.prepareIndex(data.getNewRowKey(), data.getNewRowKey(), data.getNewRowKey())
.setSource(XContentFactory.jsonBuilder().startObject()
.field("flag", data.getFlag())
.field("query", data.getQuery())
.field("cost", data.getCost())
.field("value", data.getConversionvalue())
.endObject()
)
);
}
}
BulkResponse bulkResponse = bulkRequest.get();
if (bulkResponse.hasFailures()) {
LOG.error("es --> result is failed! count: " + dataList.size());
}
IndicesStatsRequest indicesStatsRequest = new IndicesStatsRequest();
indicesStatsRequest.indices("flag_*");
Map<String, IndexStats> stats=client.admin().indices().stats(indicesStatsRequest).actionGet().getIndices();
String[] index = new String[stats.size()];
final String[] indexes = stats.keySet().toArray(index);
// for(IndexStats stat:stats.values()) {
//获取索引值
// String index = stat.getIndex();
// System.out.println(index);
// }
SearchResponse response = client.prepareSearch(indexes)// 索引名
.setSearchType(SearchType.DFS_QUERY_THEN_FETCH) // 设置查询类型,精确查询
.execute().actionGet();
final SearchHits hits = response.getHits();
System.out.println("查询结果有:" + hits.getTotalHits() + "条");
Iterator<SearchHit> iterator = hits.iterator();
while (iterator.hasNext()) {
final SearchHit next = iterator.next();
final Map<String, Object> map = next.getSourceAsMap();
//获取字段值
System.out.println(map.get("cost"));
}
client.close();
}
}
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。