Loading oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java +34 −14 Original line number Diff line number Diff line Loading @@ -18,28 +18,47 @@ package org.apache.skywalking.oap.server.library.client.elasticsearch; import java.io.IOException; import java.util.*; import org.apache.commons.lang3.StringUtils; import org.apache.http.*; import org.apache.http.HttpEntity; import org.apache.http.HttpHost; import org.apache.http.entity.ContentType; import org.apache.http.nio.entity.NStringEntity; import org.apache.skywalking.oap.server.library.client.Client; import org.elasticsearch.action.admin.indices.create.*; import org.elasticsearch.action.admin.indices.delete.*; import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse; import org.elasticsearch.action.admin.indices.get.GetIndexRequest; import org.elasticsearch.action.bulk.*; import org.elasticsearch.action.get.*; import org.elasticsearch.action.bulk.BackoffPolicy; import org.elasticsearch.action.bulk.BulkProcessor; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.get.GetRequest; import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.get.MultiGetRequest; import org.elasticsearch.action.get.MultiGetResponse; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.search.*; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.client.*; import org.elasticsearch.client.Response; import org.elasticsearch.client.RestClient; import org.elasticsearch.client.RestHighLevelClient; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.*; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.slf4j.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.Collections; import java.util.LinkedList; import java.util.List; import java.util.Map; /** * @author peng-yongsheng Loading Loading @@ -93,7 +112,7 @@ public class ElasticSearchClient implements Client { request.settings(settings); request.mapping(TYPE, mappingBuilder); CreateIndexResponse response = client.indices().create(request); logger.info("create {} index finished, isAcknowledged: {}", indexName, response.isAcknowledged()); logger.debug("create {} index finished, isAcknowledged: {}", indexName, response.isAcknowledged()); return response.isAcknowledged(); } Loading @@ -102,7 +121,7 @@ public class ElasticSearchClient implements Client { DeleteIndexRequest request = new DeleteIndexRequest(indexName); DeleteIndexResponse response; response = client.indices().delete(request); logger.info("delete {} index finished, isAcknowledged: {}", indexName, response.isAcknowledged()); logger.debug("delete {} index finished, isAcknowledged: {}", indexName, response.isAcknowledged()); return response.isAcknowledged(); } Loading Loading @@ -177,10 +196,11 @@ public class ElasticSearchClient implements Client { "}"; HttpEntity entity = new NStringEntity(jsonString, ContentType.APPLICATION_JSON); Response response = client.getLowLevelClient().performRequest("POST", "/" + indexName + "/_delete_by_query", params, entity); logger.debug("delete indexName: {}, jsonString : {}", indexName, jsonString); return response.getStatusLine().getStatusCode(); } private String formatIndexName(String indexName) { public String formatIndexName(String indexName) { if (StringUtils.isNotEmpty(namespace)) { return namespace + "_" + indexName; } Loading oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/HistoryDeleteEsDAO.java +6 −3 Original line number Diff line number Diff line Loading @@ -18,12 +18,14 @@ package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base; import java.io.IOException; import org.apache.skywalking.oap.server.core.storage.IHistoryDeleteDAO; import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; /** * @author peng-yongsheng */ Loading @@ -37,9 +39,10 @@ public class HistoryDeleteEsDAO extends EsDAO implements IHistoryDeleteDAO { @Override public void deleteHistory(String modelName, String timeBucketColumnName, Long timeBucketBefore) throws IOException { int statusCode = getClient().delete(modelName, timeBucketColumnName, timeBucketBefore); ElasticSearchClient client = getClient(); int statusCode = client.delete(modelName, timeBucketColumnName, timeBucketBefore); if (logger.isDebugEnabled()) { logger.debug("Delete history from {} index, status code {}", modelName, statusCode); logger.debug("Delete history from {} index, status code {}", client.formatIndexName(modelName), statusCode); } } } oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/StorageEsInstaller.java +13 −8 Original line number Diff line number Diff line Loading @@ -18,16 +18,21 @@ package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base; import java.io.IOException; import org.apache.skywalking.oap.server.core.storage.StorageException; import org.apache.skywalking.oap.server.core.storage.model.*; import org.apache.skywalking.oap.server.core.storage.model.Model; import org.apache.skywalking.oap.server.core.storage.model.ModelColumn; import org.apache.skywalking.oap.server.core.storage.model.ModelInstaller; import org.apache.skywalking.oap.server.library.client.Client; import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient; import org.apache.skywalking.oap.server.library.module.ModuleManager; import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.*; import org.slf4j.*; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; /** * @author peng-yongsheng Loading Loading @@ -81,9 +86,9 @@ public class StorageEsInstaller extends ModelInstaller { Settings settings = createSettingBuilder(); try { mappingBuilder = createMappingBuilder(tableDefine); logger.info("index {}'s mapping builder str: {}", tableDefine.getName(), Strings.toString(mappingBuilder.prettyPrint())); logger.info("index {}'s mapping builder str: {}", esClient.formatIndexName(tableDefine.getName()), Strings.toString(mappingBuilder.prettyPrint())); } catch (Exception e) { logger.error("create {} index mapping builder error, error message: {}", tableDefine.getName(), e.getMessage()); logger.error("create {} index mapping builder error, error message: {}", esClient.formatIndexName(tableDefine.getName()), e.getMessage()); } boolean isAcknowledged; Loading @@ -92,10 +97,10 @@ public class StorageEsInstaller extends ModelInstaller { } catch (IOException e) { throw new StorageException(e.getMessage()); } logger.info("create {} index finished, isAcknowledged: {}", tableDefine.getName(), isAcknowledged); logger.info("create {} index finished, isAcknowledged: {}", esClient.formatIndexName(tableDefine.getName()), isAcknowledged); if (!isAcknowledged) { throw new StorageException("create " + tableDefine.getName() + " index failure, "); throw new StorageException("create " + esClient.formatIndexName(tableDefine.getName()) + " index failure, "); } } Loading Loading
oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java +34 −14 Original line number Diff line number Diff line Loading @@ -18,28 +18,47 @@ package org.apache.skywalking.oap.server.library.client.elasticsearch; import java.io.IOException; import java.util.*; import org.apache.commons.lang3.StringUtils; import org.apache.http.*; import org.apache.http.HttpEntity; import org.apache.http.HttpHost; import org.apache.http.entity.ContentType; import org.apache.http.nio.entity.NStringEntity; import org.apache.skywalking.oap.server.library.client.Client; import org.elasticsearch.action.admin.indices.create.*; import org.elasticsearch.action.admin.indices.delete.*; import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse; import org.elasticsearch.action.admin.indices.get.GetIndexRequest; import org.elasticsearch.action.bulk.*; import org.elasticsearch.action.get.*; import org.elasticsearch.action.bulk.BackoffPolicy; import org.elasticsearch.action.bulk.BulkProcessor; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.get.GetRequest; import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.get.MultiGetRequest; import org.elasticsearch.action.get.MultiGetResponse; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.search.*; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.client.*; import org.elasticsearch.client.Response; import org.elasticsearch.client.RestClient; import org.elasticsearch.client.RestHighLevelClient; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.*; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.slf4j.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.Collections; import java.util.LinkedList; import java.util.List; import java.util.Map; /** * @author peng-yongsheng Loading Loading @@ -93,7 +112,7 @@ public class ElasticSearchClient implements Client { request.settings(settings); request.mapping(TYPE, mappingBuilder); CreateIndexResponse response = client.indices().create(request); logger.info("create {} index finished, isAcknowledged: {}", indexName, response.isAcknowledged()); logger.debug("create {} index finished, isAcknowledged: {}", indexName, response.isAcknowledged()); return response.isAcknowledged(); } Loading @@ -102,7 +121,7 @@ public class ElasticSearchClient implements Client { DeleteIndexRequest request = new DeleteIndexRequest(indexName); DeleteIndexResponse response; response = client.indices().delete(request); logger.info("delete {} index finished, isAcknowledged: {}", indexName, response.isAcknowledged()); logger.debug("delete {} index finished, isAcknowledged: {}", indexName, response.isAcknowledged()); return response.isAcknowledged(); } Loading Loading @@ -177,10 +196,11 @@ public class ElasticSearchClient implements Client { "}"; HttpEntity entity = new NStringEntity(jsonString, ContentType.APPLICATION_JSON); Response response = client.getLowLevelClient().performRequest("POST", "/" + indexName + "/_delete_by_query", params, entity); logger.debug("delete indexName: {}, jsonString : {}", indexName, jsonString); return response.getStatusLine().getStatusCode(); } private String formatIndexName(String indexName) { public String formatIndexName(String indexName) { if (StringUtils.isNotEmpty(namespace)) { return namespace + "_" + indexName; } Loading
oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/HistoryDeleteEsDAO.java +6 −3 Original line number Diff line number Diff line Loading @@ -18,12 +18,14 @@ package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base; import java.io.IOException; import org.apache.skywalking.oap.server.core.storage.IHistoryDeleteDAO; import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; /** * @author peng-yongsheng */ Loading @@ -37,9 +39,10 @@ public class HistoryDeleteEsDAO extends EsDAO implements IHistoryDeleteDAO { @Override public void deleteHistory(String modelName, String timeBucketColumnName, Long timeBucketBefore) throws IOException { int statusCode = getClient().delete(modelName, timeBucketColumnName, timeBucketBefore); ElasticSearchClient client = getClient(); int statusCode = client.delete(modelName, timeBucketColumnName, timeBucketBefore); if (logger.isDebugEnabled()) { logger.debug("Delete history from {} index, status code {}", modelName, statusCode); logger.debug("Delete history from {} index, status code {}", client.formatIndexName(modelName), statusCode); } } }
oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/StorageEsInstaller.java +13 −8 Original line number Diff line number Diff line Loading @@ -18,16 +18,21 @@ package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base; import java.io.IOException; import org.apache.skywalking.oap.server.core.storage.StorageException; import org.apache.skywalking.oap.server.core.storage.model.*; import org.apache.skywalking.oap.server.core.storage.model.Model; import org.apache.skywalking.oap.server.core.storage.model.ModelColumn; import org.apache.skywalking.oap.server.core.storage.model.ModelInstaller; import org.apache.skywalking.oap.server.library.client.Client; import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient; import org.apache.skywalking.oap.server.library.module.ModuleManager; import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.*; import org.slf4j.*; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; /** * @author peng-yongsheng Loading Loading @@ -81,9 +86,9 @@ public class StorageEsInstaller extends ModelInstaller { Settings settings = createSettingBuilder(); try { mappingBuilder = createMappingBuilder(tableDefine); logger.info("index {}'s mapping builder str: {}", tableDefine.getName(), Strings.toString(mappingBuilder.prettyPrint())); logger.info("index {}'s mapping builder str: {}", esClient.formatIndexName(tableDefine.getName()), Strings.toString(mappingBuilder.prettyPrint())); } catch (Exception e) { logger.error("create {} index mapping builder error, error message: {}", tableDefine.getName(), e.getMessage()); logger.error("create {} index mapping builder error, error message: {}", esClient.formatIndexName(tableDefine.getName()), e.getMessage()); } boolean isAcknowledged; Loading @@ -92,10 +97,10 @@ public class StorageEsInstaller extends ModelInstaller { } catch (IOException e) { throw new StorageException(e.getMessage()); } logger.info("create {} index finished, isAcknowledged: {}", tableDefine.getName(), isAcknowledged); logger.info("create {} index finished, isAcknowledged: {}", esClient.formatIndexName(tableDefine.getName()), isAcknowledged); if (!isAcknowledged) { throw new StorageException("create " + tableDefine.getName() + " index failure, "); throw new StorageException("create " + esClient.formatIndexName(tableDefine.getName()) + " index failure, "); } } Loading