Enhance derby sql to support limit sql. (#12490)
This commit is contained in:
parent
c56c4153eb
commit
7133411bc2
@ -56,6 +56,8 @@ import com.alibaba.nacos.persistence.repository.embedded.operate.BaseDatabaseOpe
|
||||
import com.alibaba.nacos.persistence.repository.embedded.sql.ModifyRequest;
|
||||
import com.alibaba.nacos.persistence.repository.embedded.sql.QueryType;
|
||||
import com.alibaba.nacos.persistence.repository.embedded.sql.SelectRequest;
|
||||
import com.alibaba.nacos.persistence.repository.embedded.sql.limiter.SqlLimiter;
|
||||
import com.alibaba.nacos.persistence.repository.embedded.sql.limiter.SqlTypeLimiter;
|
||||
import com.alibaba.nacos.persistence.utils.PersistenceExecutor;
|
||||
import com.alibaba.nacos.sys.utils.DiskUtils;
|
||||
import com.google.protobuf.ByteString;
|
||||
@ -171,11 +173,14 @@ public class DistributedDatabaseOperateImpl extends RequestProcessor4CP implemen
|
||||
|
||||
private final ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock();
|
||||
|
||||
private final SqlLimiter sqlLimiter;
|
||||
|
||||
public DistributedDatabaseOperateImpl(ServerMemberManager memberManager, ProtocolManager protocolManager)
|
||||
throws Exception {
|
||||
this.memberManager = memberManager;
|
||||
this.protocol = protocolManager.getCpProtocol();
|
||||
init();
|
||||
this.sqlLimiter = new SqlTypeLimiter();
|
||||
}
|
||||
|
||||
protected void init() throws Exception {
|
||||
@ -471,6 +476,7 @@ public class DistributedDatabaseOperateImpl extends RequestProcessor4CP implemen
|
||||
try {
|
||||
selectRequest = serializer.deserialize(request.getData().toByteArray(), SelectRequest.class);
|
||||
LoggerUtils.printIfDebugEnabled(LOGGER, "getData info : selectRequest : {}", selectRequest);
|
||||
sqlLimiter.doLimitForSelectRequest(selectRequest);
|
||||
final RowMapper<Object> mapper = RowMapperManager.getRowMapper(selectRequest.getClassName());
|
||||
final byte type = selectRequest.getQueryType();
|
||||
switch (type) {
|
||||
@ -518,6 +524,7 @@ public class DistributedDatabaseOperateImpl extends RequestProcessor4CP implemen
|
||||
lock.lock();
|
||||
try {
|
||||
List<ModifyRequest> sqlContext = serializer.deserialize(byteString.toByteArray(), List.class);
|
||||
sqlLimiter.doLimitForModifyRequest(sqlContext);
|
||||
boolean isOk = false;
|
||||
if (log.containsExtendInfo(DATA_IMPORT_KEY)) {
|
||||
isOk = doDataImport(jdbcTemplate, sqlContext);
|
||||
|
@ -24,6 +24,8 @@ import com.alibaba.nacos.persistence.configuration.condition.ConditionStandalone
|
||||
import com.alibaba.nacos.persistence.datasource.DataSourceService;
|
||||
import com.alibaba.nacos.persistence.datasource.DynamicDataSource;
|
||||
import com.alibaba.nacos.persistence.repository.embedded.sql.ModifyRequest;
|
||||
import com.alibaba.nacos.persistence.repository.embedded.sql.limiter.SqlLimiter;
|
||||
import com.alibaba.nacos.persistence.repository.embedded.sql.limiter.SqlTypeLimiter;
|
||||
import com.alibaba.nacos.sys.utils.DiskUtils;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
@ -54,10 +56,16 @@ public class StandaloneDatabaseOperateImpl implements BaseDatabaseOperate {
|
||||
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(StandaloneDatabaseOperateImpl.class);
|
||||
|
||||
private final SqlLimiter sqlLimiter;
|
||||
|
||||
private JdbcTemplate jdbcTemplate;
|
||||
|
||||
private TransactionTemplate transactionTemplate;
|
||||
|
||||
public StandaloneDatabaseOperateImpl() {
|
||||
this.sqlLimiter = new SqlTypeLimiter();
|
||||
}
|
||||
|
||||
@PostConstruct
|
||||
protected void init() {
|
||||
DataSourceService dataSourceService = DynamicDataSource.getInstance().getDataSource();
|
||||
@ -107,6 +115,7 @@ public class StandaloneDatabaseOperateImpl implements BaseDatabaseOperate {
|
||||
while (iterator.hasNext()) {
|
||||
String sql = iterator.next();
|
||||
if (StringUtils.isNotBlank(sql)) {
|
||||
sqlLimiter.doLimit(sql);
|
||||
batchUpdate.add(sql);
|
||||
}
|
||||
if (batchUpdate.size() == batchSize || !iterator.hasNext()) {
|
||||
|
@ -0,0 +1,79 @@
|
||||
/*
|
||||
* Copyright 1999-2023 Alibaba Group Holding Ltd.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.alibaba.nacos.persistence.repository.embedded.sql.limiter;
|
||||
|
||||
import com.alibaba.nacos.persistence.repository.embedded.sql.ModifyRequest;
|
||||
import com.alibaba.nacos.persistence.repository.embedded.sql.SelectRequest;
|
||||
|
||||
import java.sql.SQLException;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* SQL limiter.
|
||||
*
|
||||
* @author xiweng.yy
|
||||
*/
|
||||
public interface SqlLimiter {
|
||||
|
||||
/**
|
||||
* Do SQL limit for modify request.
|
||||
*
|
||||
* @param modifyRequest modify request
|
||||
* @throws SQLException when SQL match the limit rule.
|
||||
*/
|
||||
void doLimitForModifyRequest(ModifyRequest modifyRequest) throws SQLException;
|
||||
|
||||
/**
|
||||
* Do SQL limit for modify request.
|
||||
*
|
||||
* @param modifyRequests modify request
|
||||
* @throws SQLException when SQL match the limit rule.
|
||||
*/
|
||||
void doLimitForModifyRequest(List<ModifyRequest> modifyRequests) throws SQLException;
|
||||
|
||||
/**
|
||||
* Do SQL limit for select request.
|
||||
*
|
||||
* @param selectRequest select request
|
||||
* @throws SQLException when SQL match the limit rule.
|
||||
*/
|
||||
void doLimitForSelectRequest(SelectRequest selectRequest) throws SQLException;
|
||||
|
||||
/**
|
||||
* Do SQL limit for select request.
|
||||
*
|
||||
* @param selectRequests select request
|
||||
* @throws SQLException when SQL match the limit rule.
|
||||
*/
|
||||
void doLimitForSelectRequest(List<SelectRequest> selectRequests) throws SQLException;
|
||||
|
||||
/**
|
||||
* Do SQL limit for sql.
|
||||
*
|
||||
* @param sql SQL
|
||||
* @throws SQLException when SQL match the limit rule.
|
||||
*/
|
||||
void doLimit(String sql) throws SQLException;
|
||||
|
||||
/**
|
||||
* Do SQL limit for sql.
|
||||
*
|
||||
* @param sql SQL
|
||||
* @throws SQLException when SQL match the limit rule.
|
||||
*/
|
||||
void doLimit(List<String> sql) throws SQLException;
|
||||
}
|
@ -0,0 +1,146 @@
|
||||
/*
|
||||
* Copyright 1999-2023 Alibaba Group Holding Ltd.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.alibaba.nacos.persistence.repository.embedded.sql.limiter;
|
||||
|
||||
import com.alibaba.nacos.common.utils.StringUtils;
|
||||
import com.alibaba.nacos.persistence.repository.embedded.sql.ModifyRequest;
|
||||
import com.alibaba.nacos.persistence.repository.embedded.sql.SelectRequest;
|
||||
import com.alibaba.nacos.sys.env.EnvUtil;
|
||||
|
||||
import java.sql.SQLException;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
* SQL Type Limiter, Nacos only allow `INSERT`, `UPDATE`, `DELETE`, `SELECT`, `CREATE SCHEMA`, `CREATE TABLE`, `CREATE
|
||||
* INDEX` and `ALTER TABLE`.
|
||||
*
|
||||
* @author xiweng.yy
|
||||
*/
|
||||
public class SqlTypeLimiter implements SqlLimiter {
|
||||
|
||||
private static final String ENABLED_SQL_LIMIT = "nacos.persistence.sql.derby.limit.enabled";
|
||||
|
||||
private final Set<String> allowedDmlSqls;
|
||||
|
||||
private final Set<String> allowedDdlSqls;
|
||||
|
||||
private final Set<String> allowedDdlScopes;
|
||||
|
||||
private final boolean enabledLimit;
|
||||
|
||||
public SqlTypeLimiter() {
|
||||
this.enabledLimit = EnvUtil.getProperty(ENABLED_SQL_LIMIT, Boolean.class, true);
|
||||
this.allowedDmlSqls = new HashSet<>(4);
|
||||
this.allowedDmlSqls.add("INSERT");
|
||||
this.allowedDmlSqls.add("UPDATE");
|
||||
this.allowedDmlSqls.add("DELETE");
|
||||
this.allowedDmlSqls.add("SELECT");
|
||||
this.allowedDdlSqls = new HashSet<>(2);
|
||||
this.allowedDdlSqls.add("CREATE");
|
||||
this.allowedDdlSqls.add("ALTER");
|
||||
this.allowedDdlScopes = new HashSet<>(3);
|
||||
this.allowedDdlScopes.add("SCHEMA");
|
||||
this.allowedDdlScopes.add("TABLE");
|
||||
this.allowedDdlScopes.add("INDEX");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void doLimitForModifyRequest(ModifyRequest modifyRequest) throws SQLException {
|
||||
if (null == modifyRequest || !enabledLimit) {
|
||||
return;
|
||||
}
|
||||
doLimit(modifyRequest.getSql());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void doLimitForModifyRequest(List<ModifyRequest> modifyRequests) throws SQLException {
|
||||
if (null == modifyRequests || !enabledLimit) {
|
||||
return;
|
||||
}
|
||||
for (ModifyRequest each : modifyRequests) {
|
||||
doLimitForModifyRequest(each);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void doLimitForSelectRequest(SelectRequest selectRequest) throws SQLException {
|
||||
if (null == selectRequest || !enabledLimit) {
|
||||
return;
|
||||
}
|
||||
doLimit(selectRequest.getSql());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void doLimitForSelectRequest(List<SelectRequest> selectRequests) throws SQLException {
|
||||
if (null == selectRequests || !enabledLimit) {
|
||||
return;
|
||||
}
|
||||
for (SelectRequest each : selectRequests) {
|
||||
doLimitForSelectRequest(each);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void doLimit(String sql) throws SQLException {
|
||||
if (!enabledLimit) {
|
||||
return;
|
||||
}
|
||||
String trimmedSql = sql.trim();
|
||||
if (StringUtils.isEmpty(trimmedSql)) {
|
||||
return;
|
||||
}
|
||||
int firstTokenIndex = trimmedSql.indexOf(" ");
|
||||
if (-1 == firstTokenIndex) {
|
||||
throwException(trimmedSql);
|
||||
}
|
||||
String firstToken = trimmedSql.substring(0, firstTokenIndex).toUpperCase();
|
||||
if (allowedDmlSqls.contains(firstToken)) {
|
||||
return;
|
||||
}
|
||||
if (!allowedDdlSqls.contains(firstToken)) {
|
||||
throwException(trimmedSql);
|
||||
}
|
||||
checkSqlForSecondToken(firstTokenIndex, trimmedSql);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void doLimit(List<String> sql) throws SQLException {
|
||||
if (null == sql || !enabledLimit) {
|
||||
return;
|
||||
}
|
||||
for (String each : sql) {
|
||||
doLimit(each);
|
||||
}
|
||||
}
|
||||
|
||||
private void throwException(String sql) throws SQLException {
|
||||
throw new SQLException(String.format("Unsupported SQL: %s. Nacos only support DML and some DDL SQL.", sql));
|
||||
}
|
||||
|
||||
private void checkSqlForSecondToken(int firstTokenIndex, String trimmedSql) throws SQLException {
|
||||
int secondTokenIndex = trimmedSql.indexOf(" ", firstTokenIndex + 1);
|
||||
if (-1 == secondTokenIndex) {
|
||||
secondTokenIndex = trimmedSql.length();
|
||||
}
|
||||
String secondToken = trimmedSql.substring(firstTokenIndex + 1, secondTokenIndex).toUpperCase();
|
||||
if (!allowedDdlScopes.contains(secondToken)) {
|
||||
throwException(trimmedSql);
|
||||
}
|
||||
}
|
||||
}
|
@ -19,6 +19,9 @@ package com.alibaba.nacos.persistence.repository.embedded.operate;
|
||||
import com.alibaba.nacos.common.model.RestResult;
|
||||
import com.alibaba.nacos.persistence.repository.embedded.EmbeddedStorageContextHolder;
|
||||
import com.alibaba.nacos.persistence.repository.embedded.sql.ModifyRequest;
|
||||
import com.alibaba.nacos.sys.env.EnvUtil;
|
||||
import org.junit.jupiter.api.AfterAll;
|
||||
import org.junit.jupiter.api.BeforeAll;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.extension.ExtendWith;
|
||||
@ -28,6 +31,7 @@ import org.mockito.Spy;
|
||||
import org.mockito.junit.jupiter.MockitoExtension;
|
||||
import org.springframework.jdbc.core.JdbcTemplate;
|
||||
import org.springframework.jdbc.core.RowMapper;
|
||||
import org.springframework.mock.env.MockEnvironment;
|
||||
import org.springframework.test.util.ReflectionTestUtils;
|
||||
import org.springframework.transaction.support.TransactionCallback;
|
||||
import org.springframework.transaction.support.TransactionTemplate;
|
||||
@ -73,11 +77,22 @@ class StandaloneDatabaseOperateImplTest {
|
||||
@Mock
|
||||
private TransactionTemplate transactionTemplate;
|
||||
|
||||
@BeforeAll
|
||||
static void beforeAll() {
|
||||
MockEnvironment environment = new MockEnvironment();
|
||||
environment.setProperty("nacos.persistence.sql.derby.limit.enabled", "false");
|
||||
EnvUtil.setEnvironment(environment);
|
||||
}
|
||||
|
||||
@BeforeEach
|
||||
void setUp() {
|
||||
ReflectionTestUtils.setField(operate, "jdbcTemplate", jdbcTemplate);
|
||||
ReflectionTestUtils.setField(operate, "transactionTemplate", transactionTemplate);
|
||||
|
||||
}
|
||||
|
||||
@AfterAll
|
||||
static void afterAll() {
|
||||
EnvUtil.setEnvironment(null);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -0,0 +1,134 @@
|
||||
/*
|
||||
* Copyright 1999-2023 Alibaba Group Holding Ltd.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.alibaba.nacos.persistence.repository.embedded.sql.limiter;
|
||||
|
||||
import com.alibaba.nacos.persistence.repository.embedded.sql.ModifyRequest;
|
||||
import com.alibaba.nacos.persistence.repository.embedded.sql.SelectRequest;
|
||||
import com.alibaba.nacos.sys.env.EnvUtil;
|
||||
import org.junit.jupiter.api.AfterEach;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.springframework.mock.env.MockEnvironment;
|
||||
|
||||
import java.sql.SQLException;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
|
||||
class SqlTypeLimiterTest {
|
||||
|
||||
SqlTypeLimiter sqlLimiter;
|
||||
|
||||
@BeforeEach
|
||||
void setUp() {
|
||||
MockEnvironment environment = new MockEnvironment();
|
||||
EnvUtil.setEnvironment(environment);
|
||||
sqlLimiter = new SqlTypeLimiter();
|
||||
}
|
||||
|
||||
@AfterEach
|
||||
void tearDown() {
|
||||
EnvUtil.setEnvironment(null);
|
||||
}
|
||||
|
||||
@Test
|
||||
void testDoLimitForModifyRequestForDdl() throws SQLException {
|
||||
ModifyRequest createTable = new ModifyRequest("create table test(id int,name varchar(255))");
|
||||
ModifyRequest createIndex = new ModifyRequest("create index test_index on test(id)");
|
||||
ModifyRequest alterTable = new ModifyRequest("alter table test add column age int");
|
||||
List<ModifyRequest> modifyRequests = new LinkedList<>();
|
||||
modifyRequests.add(createTable);
|
||||
modifyRequests.add(createIndex);
|
||||
modifyRequests.add(alterTable);
|
||||
sqlLimiter.doLimitForModifyRequest(modifyRequests);
|
||||
}
|
||||
|
||||
@Test
|
||||
void testDoLimitForModifyRequestForDdlForEmptyToken() throws SQLException {
|
||||
ModifyRequest create = new ModifyRequest("create ");
|
||||
assertThrows(SQLException.class, () -> sqlLimiter.doLimitForModifyRequest(create));
|
||||
}
|
||||
|
||||
@Test
|
||||
void testDoLimitForModifyRequestForDdlForOneToken() throws SQLException {
|
||||
ModifyRequest create = new ModifyRequest("create");
|
||||
assertThrows(SQLException.class, () -> sqlLimiter.doLimitForModifyRequest(create));
|
||||
}
|
||||
|
||||
@Test
|
||||
void testDoLimitForModifyRequestForDdlForInvalidSecondToken() throws SQLException {
|
||||
ModifyRequest create = new ModifyRequest("create xxx");
|
||||
assertThrows(SQLException.class, () -> sqlLimiter.doLimitForModifyRequest(create));
|
||||
}
|
||||
|
||||
@Test
|
||||
void testDoLimitForModifyRequestForDml() throws SQLException {
|
||||
ModifyRequest insert = new ModifyRequest("insert into test(id,name) values(1,'test')");
|
||||
ModifyRequest update = new ModifyRequest("update test set name='test' where id=1");
|
||||
ModifyRequest delete = new ModifyRequest("delete from test where id=1");
|
||||
List<ModifyRequest> modifyRequests = new LinkedList<>();
|
||||
modifyRequests.add(insert);
|
||||
modifyRequests.add(update);
|
||||
modifyRequests.add(delete);
|
||||
sqlLimiter.doLimitForModifyRequest(modifyRequests);
|
||||
}
|
||||
|
||||
@Test
|
||||
void testDoLimitForModifyRequestForDmlInvalid() throws SQLException {
|
||||
ModifyRequest insert = new ModifyRequest("insert into test(id,name) values(1,'test')");
|
||||
ModifyRequest invalid = new ModifyRequest("CALL SALES.TOTAL_REVENUES()");
|
||||
List<ModifyRequest> modifyRequests = new LinkedList<>();
|
||||
modifyRequests.add(insert);
|
||||
modifyRequests.add(invalid);
|
||||
assertThrows(SQLException.class, () -> sqlLimiter.doLimitForModifyRequest(modifyRequests));
|
||||
}
|
||||
|
||||
@Test
|
||||
void testDoLimitForSelectRequest() throws SQLException {
|
||||
SelectRequest selectRequest = SelectRequest.builder().sql("select * from test").build();
|
||||
sqlLimiter.doLimitForSelectRequest(selectRequest);
|
||||
}
|
||||
|
||||
@Test
|
||||
void testDoLimitForSelectRequestInvalid() throws SQLException {
|
||||
SelectRequest selectRequest = SelectRequest.builder().sql("select * from test").build();
|
||||
SelectRequest invalid = SelectRequest.builder().sql("CALL SALES.TOTAL_REVENUES()").build();
|
||||
List<SelectRequest> selectRequests = new LinkedList<>();
|
||||
selectRequests.add(selectRequest);
|
||||
selectRequests.add(invalid);
|
||||
assertThrows(SQLException.class, () -> sqlLimiter.doLimitForSelectRequest(selectRequests));
|
||||
}
|
||||
|
||||
@Test
|
||||
void testDoLimit() {
|
||||
List<String> sql = new LinkedList<>();
|
||||
sql.add("create table test(id int,name varchar(255))");
|
||||
sql.add("select * from test");
|
||||
sql.add("CALL SALES.TOTAL_REVENUES();");
|
||||
assertThrows(SQLException.class, () -> sqlLimiter.doLimit(sql));
|
||||
}
|
||||
|
||||
@Test
|
||||
void testDoLimitForDisabledLimit() throws SQLException {
|
||||
MockEnvironment environment = new MockEnvironment();
|
||||
environment.setProperty("nacos.persistence.sql.derby.limit.enabled", "false");
|
||||
EnvUtil.setEnvironment(environment);
|
||||
sqlLimiter = new SqlTypeLimiter();
|
||||
sqlLimiter.doLimit("CALL SALES.TOTAL_REVENUES();");
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user