Merge pull request #2759 from chuntaojun/feature_consistency

refactor: fix bugs
This commit is contained in:
liaochuntao 2020-05-08 14:55:30 +08:00 committed by GitHub
commit 79a6f453d2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
46 changed files with 256 additions and 343 deletions

View File

@ -15,8 +15,8 @@
*/
package com.alibaba.nacos.api.exception;
import com.alibaba.nacos.api.common.Constants;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
/**
* Nacos Exception
@ -63,16 +63,13 @@ public class NacosException extends Exception {
}
public String getErrMsg() {
StringBuilder builder = new StringBuilder();
if (!StringUtils.isBlank(this.errMsg)) {
builder.append("errMsg:").append(errMsg);
return errMsg;
}
if (this.causeThrowable != null) {
builder.append("causeThrowable:[").append(causeThrowable.getClass().getName())
.append("]")
.append(causeThrowable.getMessage());
return causeThrowable.getMessage();
}
return builder.toString();
return Constants.NULL;
}
public void setErrCode(int errCode) {
@ -127,7 +124,7 @@ public class NacosException extends Exception {
*/
public static final int NO_RIGHT = 403;
/**
* not found
* not found
*/
public static final int NOT_FOUND = 404;
/**

View File

@ -111,17 +111,17 @@ public class WatchFileCenter {
if (!CLOSED.compareAndSet(false, true)) {
return;
}
LOGGER.warn("WatchFileCenter start close");
LOGGER.warn("[WatchFileCenter] start close");
for (Map.Entry<String, WatchDirJob> entry : MANAGER.entrySet()) {
LOGGER.warn("start to shutdown this watcher which is watch : " + entry.getKey());
LOGGER.warn("[WatchFileCenter] start to shutdown this watcher which is watch : " + entry.getKey());
try {
entry.getValue().shutdown();
} catch (Throwable e) {
LOGGER.error("WatchFileCenter shutdown has error : {}", e);
LOGGER.error("[WatchFileCenter] shutdown has error : {}", e);
}
}
MANAGER.clear();
LOGGER.warn("WatchFileCenter already closed");
LOGGER.warn("[WatchFileCenter] already closed");
}
public synchronized static boolean deregisterWatcher(final String path, final FileWatcher watcher) {

View File

@ -0,0 +1,32 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.common.utils;
import org.slf4j.Logger;
/**
* @author <a href="mailto:liaochuntao@live.com">liaochuntao</a>
*/
public final class LoggerUtils {
public static void printIfDebugEnabled(Logger logger, String s, Object... args) {
if (logger.isDebugEnabled()) {
logger.debug(s, args);
}
}
}

View File

@ -42,7 +42,7 @@ public class GlobalExceptionHandler {
@ExceptionHandler(IllegalArgumentException.class)
public ResponseEntity<String> handleIllegalArgumentException(Exception ex) throws IOException {
MetricsMonitor.getIllegalArgumentException().increment();
return ResponseEntity.status(400).body(ExceptionUtil.getStackTrace(ex));
return ResponseEntity.status(400).body(ExceptionUtil.getAllExceptionMsg(ex));
}
/**
@ -53,7 +53,7 @@ public class GlobalExceptionHandler {
@ExceptionHandler(NacosException.class)
public ResponseEntity<String> handleNacosException(NacosException ex) throws IOException {
MetricsMonitor.getNacosException().increment();
return ResponseEntity.status(ex.getErrCode()).body(ExceptionUtil.getStackTrace(ex));
return ResponseEntity.status(ex.getErrCode()).body(ExceptionUtil.getAllExceptionMsg(ex));
}
/**
@ -64,7 +64,7 @@ public class GlobalExceptionHandler {
@ExceptionHandler(DataAccessException.class)
public ResponseEntity<String> handleDataAccessException(DataAccessException ex) throws DataAccessException {
MetricsMonitor.getDbException().increment();
return ResponseEntity.status(500).body(ExceptionUtil.getStackTrace(ex));
return ResponseEntity.status(500).body(ExceptionUtil.getAllExceptionMsg(ex));
}
}

View File

@ -43,7 +43,6 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.client.RestTemplate;
@ -58,7 +57,6 @@ import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.lang.reflect.Method;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.security.AccessControlException;
import java.util.Enumeration;
import java.util.Map;
@ -137,7 +135,7 @@ public class TransferToLeaderFilter implements Filter {
boolean isLeader = protocol.isLeader(Constants.CONFIG_MODEL_RAFT_GROUP);
if (downgrading && isLeader) {
resp.sendError(HttpStatus.LOCKED.value(),
resp.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE,
"Unable to process the request at this time: System triggered degradation");
return;
}
@ -145,7 +143,7 @@ public class TransferToLeaderFilter implements Filter {
if (downgrading || (method.isAnnotationPresent(ToLeader.class)
&& !isLeader)) {
if (StringUtils.isBlank(leaderServer)) {
resp.sendError(HttpStatus.LOCKED.value(),
resp.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE,
"Unable to process the request at this time: no Leader");
return;
}
@ -187,7 +185,7 @@ public class TransferToLeaderFilter implements Filter {
}
catch (AccessControlException e) {
resp.sendError(HttpServletResponse.SC_FORBIDDEN,
"access denied: " + ExceptionUtil.getStackTrace(e));
"access denied: " + ExceptionUtil.getAllExceptionMsg(e));
return;
}
catch (NoSuchMethodException e) {

View File

@ -24,8 +24,8 @@ import com.alibaba.nacos.config.server.utils.PropertyUtil;
*/
public class DynamicDataSource {
private volatile DataSourceService localDataSourceService = null;
private volatile DataSourceService basicDataSourceService = null;
private DataSourceService localDataSourceService = null;
private DataSourceService basicDataSourceService = null;
private static final DynamicDataSource INSTANCE = new DynamicDataSource();

View File

@ -109,10 +109,8 @@ public class LocalDataSourceServiceImpl implements DataSourceService {
try {
DriverManager.getConnection("jdbc:derby:;shutdown=true");
} catch (Exception e) {
// An error is thrown when the Derby shutdown is executed, which should be ignored
if (!StringUtils.contains(e.getMessage().toLowerCase(), derbyShutdownErrMsg.toLowerCase())) {
if (!StringUtils.containsIgnoreCase(e.getMessage(), derbyShutdownErrMsg)) {
throw e;
}
}

View File

@ -115,7 +115,7 @@ public class DumpService {
dumpOperate(processor, dumpAllProcessor, dumpAllBetaProcessor, dumpAllTagProcessor);
});
protocol.protocolMetaData()
.ubSubscribe(Constants.CONFIG_MODEL_RAFT_GROUP,
.unSubscribe(Constants.CONFIG_MODEL_RAFT_GROUP,
com.alibaba.nacos.consistency.cp.Constants.LEADER_META_DATA,
this);
}

View File

@ -16,6 +16,7 @@
package com.alibaba.nacos.config.server.service.repository;
import com.alibaba.nacos.common.utils.LoggerUtils;
import com.alibaba.nacos.config.server.service.sql.ModifyRequest;
import com.alibaba.nacos.config.server.utils.LogUtil;
import com.alibaba.nacos.common.utils.ExceptionUtil;
@ -58,7 +59,7 @@ public interface BaseDatabaseOperate {
} catch (IncorrectResultSizeDataAccessException e) {
return null;
} catch (CannotGetJdbcConnectionException e) {
fatalLog.error("[db-error] " + e.toString(), e);
fatalLog.error("[db-error] {}", e.toString());
throw e;
} catch (DataAccessException e) {
fatalLog.error("[db-error] DataAccessException sql : {}, args : {}, error : {}",
@ -76,7 +77,7 @@ public interface BaseDatabaseOperate {
} catch (IncorrectResultSizeDataAccessException e) {
return null;
} catch (CannotGetJdbcConnectionException e) {
fatalLog.error("[db-error] " + e.toString(), e);
fatalLog.error("[db-error] {}", e.toString());
throw e;
} catch (DataAccessException e) {
fatalLog.error("[db-error] DataAccessException sql : {}, args : {}, error : {}",
@ -92,7 +93,7 @@ public interface BaseDatabaseOperate {
try {
return jdbcTemplate.query(sql, args, mapper);
} catch (CannotGetJdbcConnectionException e) {
fatalLog.error("[db-error] " + e.toString(), e);
fatalLog.error("[db-error] {}", e.toString());
throw e;
} catch (DataAccessException e) {
fatalLog.error("[db-error] DataAccessException sql : {}, args : {}, error : {}",
@ -112,7 +113,7 @@ public interface BaseDatabaseOperate {
return null;
}
catch (CannotGetJdbcConnectionException e) {
fatalLog.error("[db-error] " + e.toString(), e);
fatalLog.error("[db-error] {}", e.toString());
throw e;
} catch (DataAccessException e) {
fatalLog.error("[db-error] DataAccessException sql : {}, args : {}, error : {}",
@ -128,7 +129,7 @@ public interface BaseDatabaseOperate {
try {
return jdbcTemplate.queryForList(sql, args);
} catch (CannotGetJdbcConnectionException e) {
fatalLog.error("[db-error] " + e.toString(), e);
fatalLog.error("[db-error] {}", e.toString());
throw e;
} catch (DataAccessException e) {
fatalLog.error("[db-error] DataAccessException sql : {}, args : {}, error : {}",
@ -147,18 +148,18 @@ public interface BaseDatabaseOperate {
contexts.forEach(pair -> {
errSql[0] = pair.getSql();
args[0] = pair.getArgs();
LogUtil.defaultLog.debug("current sql : {}", errSql[0]);
LogUtil.defaultLog.debug("current args : {}", args[0]);
LoggerUtils.printIfDebugEnabled(LogUtil.defaultLog, "current sql : {}", errSql[0]);
LoggerUtils.printIfDebugEnabled(LogUtil.defaultLog, "current args : {}", args[0]);
jdbcTemplate.update(pair.getSql(), pair.getArgs());
});
return Boolean.TRUE;
}
catch (BadSqlGrammarException | DataIntegrityViolationException e) {
fatalLog.error("[db-error] sql : {}, args : {}, error : {}", errSql[0], args[0], e);
fatalLog.error("[db-error] sql : {}, args : {}, error : {}", errSql[0], args[0], e.toString());
return false;
}
catch (CannotGetJdbcConnectionException e) {
fatalLog.error("[db-error] sql : {}, args : {}, error : {}", errSql[0], args[0], e);
fatalLog.error("[db-error] sql : {}, args : {}, error : {}", errSql[0], args[0], e.toString());
throw e;
} catch (DataAccessException e) {
fatalLog.error("[db-error] DataAccessException sql : {}, args : {}, error : {}",

View File

@ -17,6 +17,7 @@
package com.alibaba.nacos.config.server.service.repository;
import com.alibaba.nacos.common.JustForTest;
import com.alibaba.nacos.common.utils.LoggerUtils;
import com.alibaba.nacos.common.utils.MD5Utils;
import com.alibaba.nacos.common.utils.Observable;
import com.alibaba.nacos.common.utils.Observer;
@ -47,6 +48,7 @@ import com.alibaba.nacos.core.distributed.id.SnakeFlowerIdGenerator;
import com.alibaba.nacos.core.notify.Event;
import com.alibaba.nacos.core.notify.NotifyCenter;
import com.alibaba.nacos.core.notify.listener.Subscribe;
import com.alibaba.nacos.core.utils.ApplicationUtils;
import com.alibaba.nacos.core.utils.ClassUtils;
import com.google.common.base.Preconditions;
import com.google.protobuf.ByteString;
@ -128,12 +130,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
public class DistributedDatabaseOperateImpl extends LogProcessor4CP
implements BaseDatabaseOperate, DatabaseOperate {
@Autowired
private ServerMemberManager memberManager;
@Autowired
private ProtocolManager protocolManager;
private CPProtocol protocol;
private LocalDataSourceServiceImpl dataSourceService;
@ -144,9 +141,18 @@ public class DistributedDatabaseOperateImpl extends LogProcessor4CP
private ReentrantReadWriteLock.ReadLock readLock = lock.readLock();
private ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock();
@PostConstruct
protected void init() throws Exception {
public DistributedDatabaseOperateImpl(ServerMemberManager memberManager,
ProtocolManager protocolManager) throws Exception {
this.memberManager = memberManager;
this.protocol = protocolManager.getCpProtocol();
init();
this.protocol.addLogProcessors(Collections.singletonList(this));
}
protected void init() throws Exception {
this.dataSourceService = (LocalDataSourceServiceImpl) DynamicDataSource.getInstance()
.getDataSource();
@ -183,7 +189,7 @@ public class DistributedDatabaseOperateImpl extends LogProcessor4CP
@Override
public <R> R queryOne(String sql, Class<R> cls) {
try {
LogUtil.defaultLog.debug("queryOne info : sql : {}", sql);
LoggerUtils.printIfDebugEnabled(LogUtil.defaultLog, "queryOne info : sql : {}", sql);
byte[] data = serializer.serialize(SelectRequest.builder()
.queryType(QueryType.QUERY_ONE_NO_MAPPER_NO_ARGS).sql(sql)
@ -199,7 +205,7 @@ public class DistributedDatabaseOperateImpl extends LogProcessor4CP
}
catch (Exception e) {
LogUtil.fatalLog
.error("An exception occurred during the query operation : {}", e);
.error("An exception occurred during the query operation : {}", e.toString());
throw new NJdbcException(e);
}
}
@ -207,7 +213,7 @@ public class DistributedDatabaseOperateImpl extends LogProcessor4CP
@Override
public <R> R queryOne(String sql, Object[] args, Class<R> cls) {
try {
LogUtil.defaultLog.debug("queryOne info : sql : {}, args : {}", sql, args);
LoggerUtils.printIfDebugEnabled(LogUtil.defaultLog, "queryOne info : sql : {}, args : {}", sql, args);
byte[] data = serializer.serialize(SelectRequest.builder()
.queryType(QueryType.QUERY_ONE_NO_MAPPER_WITH_ARGS).sql(sql)
@ -223,7 +229,7 @@ public class DistributedDatabaseOperateImpl extends LogProcessor4CP
}
catch (Exception e) {
LogUtil.fatalLog
.error("An exception occurred during the query operation : {}", e);
.error("An exception occurred during the query operation : {}", e.toString());
throw new NJdbcException(e);
}
}
@ -231,8 +237,7 @@ public class DistributedDatabaseOperateImpl extends LogProcessor4CP
@Override
public <R> R queryOne(String sql, Object[] args, RowMapper<R> mapper) {
try {
LogUtil.defaultLog.debug("queryOne info : sql : {}, args : {}", sql, args);
LoggerUtils.printIfDebugEnabled(LogUtil.defaultLog, "queryOne info : sql : {}, args : {}", sql, args);
byte[] data = serializer.serialize(SelectRequest.builder()
.queryType(QueryType.QUERY_ONE_WITH_MAPPER_WITH_ARGS).sql(sql)
@ -249,7 +254,7 @@ public class DistributedDatabaseOperateImpl extends LogProcessor4CP
}
catch (Exception e) {
LogUtil.fatalLog
.error("An exception occurred during the query operation : {}", e);
.error("An exception occurred during the query operation : {}", e.toString());
throw new NJdbcException(e);
}
}
@ -257,7 +262,7 @@ public class DistributedDatabaseOperateImpl extends LogProcessor4CP
@Override
public <R> List<R> queryMany(String sql, Object[] args, RowMapper<R> mapper) {
try {
LogUtil.defaultLog.debug("queryMany info : sql : {}, args : {}", sql, args);
LoggerUtils.printIfDebugEnabled(LogUtil.defaultLog, "queryMany info : sql : {}, args : {}", sql, args);
byte[] data = serializer.serialize(SelectRequest.builder()
.queryType(QueryType.QUERY_MANY_WITH_MAPPER_WITH_ARGS).sql(sql)
@ -274,7 +279,7 @@ public class DistributedDatabaseOperateImpl extends LogProcessor4CP
}
catch (Exception e) {
LogUtil.fatalLog
.error("An exception occurred during the query operation : {}", e);
.error("An exception occurred during the query operation : {}", e.toString());
throw new NJdbcException(e);
}
}
@ -282,7 +287,7 @@ public class DistributedDatabaseOperateImpl extends LogProcessor4CP
@Override
public <R> List<R> queryMany(String sql, Object[] args, Class<R> rClass) {
try {
LogUtil.defaultLog.debug("queryMany info : sql : {}, args : {}", sql, args);
LoggerUtils.printIfDebugEnabled(LogUtil.defaultLog, "queryMany info : sql : {}, args : {}", sql, args);
byte[] data = serializer.serialize(SelectRequest.builder()
.queryType(QueryType.QUERY_MANY_NO_MAPPER_WITH_ARGS).sql(sql)
@ -298,7 +303,7 @@ public class DistributedDatabaseOperateImpl extends LogProcessor4CP
}
catch (Exception e) {
LogUtil.fatalLog
.error("An exception occurred during the query operation : {}", e);
.error("An exception occurred during the query operation : {}", e.toString());
throw new NJdbcException(e);
}
}
@ -306,8 +311,7 @@ public class DistributedDatabaseOperateImpl extends LogProcessor4CP
@Override
public List<Map<String, Object>> queryMany(String sql, Object[] args) {
try {
LogUtil.defaultLog.debug("queryMany info : sql : {}, args : {}", sql, args);
LoggerUtils.printIfDebugEnabled(LogUtil.defaultLog, "queryMany info : sql : {}, args : {}", sql, args);
byte[] data = serializer.serialize(SelectRequest.builder()
.queryType(QueryType.QUERY_MANY_WITH_LIST_WITH_ARGS).sql(sql)
@ -324,7 +328,7 @@ public class DistributedDatabaseOperateImpl extends LogProcessor4CP
}
catch (Exception e) {
LogUtil.fatalLog
.error("An exception occurred during the query operation : {}", e);
.error("An exception occurred during the query operation : {}", e.toString());
throw new NJdbcException(e);
}
}
@ -337,7 +341,7 @@ public class DistributedDatabaseOperateImpl extends LogProcessor4CP
// array elements are not lost, the serialization here is done using the java-specific
// serialization framework, rather than continuing with the protobuff
LogUtil.defaultLog.debug("modifyRequests info : {}", sqlContext);
LoggerUtils.printIfDebugEnabled(LogUtil.defaultLog, "modifyRequests info : {}", sqlContext);
// {timestamp}-{group}-{ip:port}-{signature}
@ -358,7 +362,7 @@ public class DistributedDatabaseOperateImpl extends LogProcessor4CP
throw (ConsistencyException) e;
}
LogUtil.fatalLog
.error("An exception occurred during the update operation : {}", e);
.error("An exception occurred during the update operation : {}", e.toString());
throw new NJdbcException(e);
}
}
@ -374,7 +378,7 @@ public class DistributedDatabaseOperateImpl extends LogProcessor4CP
final SelectRequest selectRequest = serializer
.deserialize(request.getData().toByteArray(), SelectRequest.class);
LogUtil.defaultLog.debug("getData info : selectRequest : {}", selectRequest);
LoggerUtils.printIfDebugEnabled(LogUtil.defaultLog, "getData info : selectRequest : {}", selectRequest);
final RowMapper<Object> mapper = RowMapperManager
.getRowMapper(selectRequest.getClassName());
@ -417,7 +421,7 @@ public class DistributedDatabaseOperateImpl extends LogProcessor4CP
catch (Exception e) {
LogUtil.fatalLog
.error("There was an error querying the data, request : {}, error : {}",
selectRequest, e);
selectRequest, e.toString());
return GetResponse.newBuilder()
.setErrMsg(e.getClass().getSimpleName() + ":" + e.getMessage())
.build();
@ -429,8 +433,7 @@ public class DistributedDatabaseOperateImpl extends LogProcessor4CP
@Override
public LogFuture onApply(Log log) {
LogUtil.defaultLog.debug("onApply info : log : {}", log);
LoggerUtils.printIfDebugEnabled(LogUtil.defaultLog, "onApply info : log : {}", log);
final ByteString byteString = log.getData();
Preconditions.checkArgument(byteString != null, "Log.getData() must not null");

View File

@ -96,18 +96,4 @@ public interface Config<L extends LogProcessor> extends Serializable {
*/
String getValOfDefault(String key, String defaultVal);
/**
* get LogProcessors
*
* @return {@link List<LogProcessor>}
*/
List<L> listLogProcessor();
/**
* add {@link LogProcessor} processor
*
* @param processors {@link LogProcessor} array
*/
void addLogProcessors(Collection<L> processors);
}

View File

@ -21,6 +21,7 @@ import com.alibaba.nacos.consistency.entity.GetRequest;
import com.alibaba.nacos.consistency.entity.GetResponse;
import com.alibaba.nacos.consistency.entity.Log;
import java.util.Collection;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
@ -38,7 +39,7 @@ import java.util.concurrent.CompletableFuture;
*
* @author <a href="mailto:liaochuntao@live.com">liaochuntao</a>
*/
public interface ConsistencyProtocol<T extends Config> extends CommandOperations {
public interface ConsistencyProtocol<T extends Config, P extends LogProcessor> extends CommandOperations {
/**
* Consistency protocol initialization: perform initialization operations based on the incoming Config
@ -48,6 +49,13 @@ public interface ConsistencyProtocol<T extends Config> extends CommandOperations
*/
void init(T config);
/**
* Add a log handler
*
* @param processors {@link LogProcessor}
*/
void addLogProcessors(Collection<P> processors);
/**
* Copy of metadata information for this consensus protocol
* 该一致性协议的元数据信息

View File

@ -18,6 +18,7 @@ package com.alibaba.nacos.consistency;
import com.alibaba.nacos.common.utils.Observable;
import com.alibaba.nacos.common.utils.Observer;
import com.alibaba.nacos.common.utils.StringUtils;
import org.javatuples.Pair;
import java.util.Map;
@ -64,13 +65,12 @@ public final class ProtocolMetaData {
});
}
public Object get(String group, String... subKey) {
if (subKey == null || subKey.length == 0) {
public Object get(String group, String subKey) {
if (StringUtils.isBlank(subKey)) {
return metaDataMap.get(group);
} else {
final String key = subKey[0];
if (metaDataMap.containsKey(group)) {
return metaDataMap.get(group).get(key);
return metaDataMap.get(group).get(subKey);
}
return null;
}
@ -84,7 +84,7 @@ public final class ProtocolMetaData {
.subscribe(key, observer);
}
public void ubSubscribe(final String group, final String key, final Observer observer) {
public void unSubscribe(final String group, final String key, final Observer observer) {
metaDataMap.computeIfAbsent(group, s -> new MetaData(group));
metaDataMap.get(group)
.unSubscribe(key, observer);

View File

@ -23,6 +23,6 @@ import com.alibaba.nacos.consistency.ConsistencyProtocol;
* @author <a href="mailto:liaochuntao@live.com">liaochuntao</a>
*/
@SuppressWarnings("all")
public interface APProtocol<C extends Config> extends ConsistencyProtocol<C> {
public interface APProtocol<C extends Config, P extends LogProcessor4AP> extends ConsistencyProtocol<C, P> {
}

View File

@ -23,7 +23,7 @@ import com.alibaba.nacos.consistency.ConsistencyProtocol;
* @author <a href="mailto:liaochuntao@live.com">liaochuntao</a>
*/
@SuppressWarnings("all")
public interface CPProtocol<C extends Config> extends ConsistencyProtocol<C> {
public interface CPProtocol<C extends Config, P extends LogProcessor4CP> extends ConsistencyProtocol<C, P> {
/**
* Returns whether this node is a leader node

View File

@ -42,12 +42,12 @@ public class ConsoleExceptionHandler {
@ExceptionHandler(IllegalArgumentException.class)
private ResponseEntity<String> handleIllegalArgumentException(IllegalArgumentException e) {
return ResponseEntity.status(HttpStatus.BAD_REQUEST).body(ExceptionUtil.getStackTrace(e));
return ResponseEntity.status(HttpStatus.BAD_REQUEST).body(ExceptionUtil.getAllExceptionMsg(e));
}
@ExceptionHandler(Exception.class)
private ResponseEntity<String> handleException(Exception e) {
logger.error("CONSOLE", e);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(ExceptionUtil.getStackTrace(e));
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(ExceptionUtil.getAllExceptionMsg(e));
}
}

View File

@ -21,8 +21,8 @@ server.port=8848
### Connect URL of DB:
# db.url.0=jdbc:mysql://127.0.0.1:3306/nacos?characterEncoding=utf8&connectTimeout=1000&socketTimeout=3000&autoReconnect=true&useUnicode=true&useSSL=false&serverTimezone=UTC
# db.user=nacos
# db.password=nacos
# db.user=root
# db.password=1017
#*************** Naming Module Related Configurations ***************#

View File

@ -135,8 +135,7 @@ public class Member implements Comparable<Member>, Cloneable {
@Override
public String toString() {
return "Member{" + "ip='" + ip + '\'' + ", port=" + port + ", state=" + state
+ ", extendInfo=" + new TreeMap<>(extendInfo) + '}';
return "Member{" + "address='" + getAddress() + '\'' + '}';
}
@Override

View File

@ -16,6 +16,7 @@
package com.alibaba.nacos.core.cluster;
import com.alibaba.nacos.common.utils.ExceptionUtil;
import com.alibaba.nacos.core.utils.ApplicationUtils;
import com.alibaba.nacos.core.utils.Loggers;
import org.apache.commons.lang3.StringUtils;
@ -36,6 +37,7 @@ import java.util.function.Predicate;
public class MemberUtils {
private static final String SEMICOLON = ":";
private static final String TARGET_MEMBER_CONNECT_REFUSE_ERRMSG = "Connection refused";
private static ServerMemberManager manager;
@ -65,13 +67,20 @@ public class MemberUtils {
port = Integer.parseInt(info[1]);
}
int raftPort = port - 1000;
Member target = Member.builder().ip(address).port(port)
.state(NodeState.UP).build();
Map<String, String> extendInfo = new HashMap<>(4);
int raftPort = calculateRaftPort(target);
Map<String, Object> extendInfo = new HashMap<>(4);
// The Raft Port information needs to be set by default
extendInfo.put(MemberMetaDataConstants.RAFT_PORT, String.valueOf(raftPort));
return Member.builder().ip(address).port(port).extendInfo(extendInfo)
.state(NodeState.UP).build();
target.setExtendInfo(extendInfo);
return target;
}
public static int calculateRaftPort(Member member) {
return member.getPort() - 1000;
}
public static Collection<Member> multiParse(Collection<String> addresses) {
@ -91,12 +100,19 @@ public class MemberUtils {
}
public static void onFail(Member member) {
onFail(member, null);
}
public static void onFail(Member member, Throwable ex) {
manager.getMemberAddressInfos().remove(member.getAddress());
member.setState(NodeState.SUSPICIOUS);
member.setFailAccessCnt(member.getFailAccessCnt() + 1);
int maxFailAccessCnt = ApplicationUtils
.getProperty("nacos.core.member.fail-access-cnt", Integer.class, 3);
if (member.getFailAccessCnt() > maxFailAccessCnt) {
// If the number of consecutive failures to access the target node reaches
// a maximum, or the link request is rejected, the state is directly down
if (member.getFailAccessCnt() > maxFailAccessCnt || StringUtils.containsIgnoreCase(ex.getMessage(), TARGET_MEMBER_CONNECT_REFUSE_ERRMSG)) {
member.setState(NodeState.DOWN);
}
manager.update(member);
@ -112,21 +128,22 @@ public class MemberUtils {
ApplicationUtils.writeClusterConf(builder.toString());
}
catch (Throwable ex) {
Loggers.CLUSTER.error("cluster member node persistence failed : {}", ex);
Loggers.CLUSTER.error("cluster member node persistence failed : {}",
ExceptionUtil.getAllExceptionMsg(ex));
}
}
@SuppressWarnings("PMD.UndefineMagicConstantRule")
public static List<Member> kRandom(Collection<Member> members,
public static Collection<Member> kRandom(Collection<Member> members,
Predicate<Member> filter) {
int k = ApplicationUtils
.getProperty("nacos.core.member.report.random-num", Integer.class, 3);
List<Member> tmp = new ArrayList<>();
// Here thinking similar consul gossip protocols random k node
Set<Member> tmp = new HashSet<>();
// Here thinking similar consul gossip protocols random k node
int totalSize = members.size();
for (int i = 0; i < 3 * totalSize && members.size() <= k; i++) {
for (int i = 0; i < 3 * totalSize && tmp.size() <= k; i++) {
for (Member member : members) {
if (filter.test(member)) {
tmp.add(member);

View File

@ -26,6 +26,7 @@ import com.alibaba.nacos.common.http.param.Header;
import com.alibaba.nacos.common.http.param.Query;
import com.alibaba.nacos.common.model.RestResult;
import com.alibaba.nacos.common.utils.ConcurrentHashSet;
import com.alibaba.nacos.common.utils.ExceptionUtil;
import com.alibaba.nacos.core.cluster.lookup.LookupFactory;
import com.alibaba.nacos.core.notify.Event;
import com.alibaba.nacos.core.notify.NotifyCenter;
@ -47,7 +48,6 @@ import javax.servlet.ServletContext;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@ -111,7 +111,7 @@ public class ServerMemberManager
/**
* self member obj
*/
private Member self;
private volatile Member self;
/**
* here is always the node information of the "UP" state
@ -230,9 +230,9 @@ public class ServerMemberManager
public Collection<Member> allMembers() {
// We need to do a copy to avoid affecting the real data
List<Member> list = new ArrayList<>(serverList.values());
Collections.sort(list);
return list;
HashSet<Member> set = new HashSet<>(serverList.values());
set.add(self);
return set;
}
public List<Member> allMembersWithoutSelf() {
@ -257,7 +257,7 @@ public class ServerMemberManager
isInIpList = false;
members.add(this.self);
Loggers.CLUSTER
.error("[serverlist] self ip {} not in serverlist {}", self, members);
.warn("[serverlist] self ip {} not in serverlist {}", self, members);
}
boolean hasChange = false;
@ -281,6 +281,10 @@ public class ServerMemberManager
serverList = tmpMap;
memberAddressInfos = tmpAddressInfo;
Collection<Member> finalMembers = allMembers();
Loggers.CLUSTER.warn("[serverlist] updated to : {}", finalMembers);
oldList.clear();
oldSet.clear();
@ -288,9 +292,8 @@ public class ServerMemberManager
// <important> need to put the event publication into a synchronized block to ensure
// that the event publication is sequential
if (hasChange) {
MemberUtils.syncToFile(members);
Loggers.CLUSTER.warn("member has changed : {}", members);
Event event = MemberChangeEvent.builder().members(allMembers()).build();
MemberUtils.syncToFile(finalMembers);
Event event = MemberChangeEvent.builder().members(finalMembers).build();
NotifyCenter.publishEvent(event);
}
@ -411,15 +414,15 @@ public class ServerMemberManager
public void onError(Throwable throwable) {
Loggers.CLUSTER
.error("failed to report new info to target node : {}, error : {}",
target.getAddress(), throwable);
MemberUtils.onFail(target);
target.getAddress(), ExceptionUtil.getAllExceptionMsg(throwable));
MemberUtils.onFail(target, throwable);
}
});
}
catch (Throwable ex) {
Loggers.CLUSTER
.error("failed to report new info to target node : {}, error : {}",
target.getAddress(), ex);
target.getAddress(), ExceptionUtil.getAllExceptionMsg(ex));
}
}

View File

@ -22,6 +22,7 @@ import com.alibaba.nacos.common.http.NSyncHttpClient;
import com.alibaba.nacos.common.http.param.Header;
import com.alibaba.nacos.common.http.param.Query;
import com.alibaba.nacos.common.model.RestResult;
import com.alibaba.nacos.common.utils.ExceptionUtil;
import com.alibaba.nacos.core.cluster.AbstractMemberLookup;
import com.alibaba.nacos.core.cluster.MemberUtils;
import com.alibaba.nacos.core.utils.ApplicationUtils;
@ -102,7 +103,7 @@ public class AddressServerMemberLookup extends AbstractMemberLookup {
break;
} catch (Throwable e) {
ex = e;
Loggers.CLUSTER.error("[serverlist] exception, error : {}", ex);
Loggers.CLUSTER.error("[serverlist] exception, error : {}", ExceptionUtil.getAllExceptionMsg(ex));
}
}
if (!success) {
@ -142,7 +143,7 @@ public class AddressServerMemberLookup extends AbstractMemberLookup {
if (addressServerFailCount >= maxFailCount) {
isAddressServerHealth = false;
}
Loggers.CLUSTER.error("[serverlist] exception, error : {}", ex);
Loggers.CLUSTER.error("[serverlist] exception, error : {}", ExceptionUtil.getAllExceptionMsg(ex));
} finally {
GlobalExecutor.scheduleByCommon(this, 5_000L);
}
@ -162,7 +163,7 @@ public class AddressServerMemberLookup extends AbstractMemberLookup {
catch (Throwable e) {
Loggers.CLUSTER
.error("[serverlist] exception for analyzeClusterConf, error : {}",
e);
ExceptionUtil.getAllExceptionMsg(e));
}
addressServerFailCount = 0;
isAddressServerHealth = false;

View File

@ -23,6 +23,7 @@ import com.alibaba.nacos.common.http.NAsyncHttpClient;
import com.alibaba.nacos.common.http.param.Header;
import com.alibaba.nacos.common.http.param.Query;
import com.alibaba.nacos.common.model.RestResult;
import com.alibaba.nacos.common.utils.LoggerUtils;
import com.alibaba.nacos.core.utils.TimerContext;
import com.alibaba.nacos.core.cluster.AbstractMemberLookup;
import com.alibaba.nacos.core.cluster.Member;
@ -147,9 +148,8 @@ public class DiscoveryMemberLookup extends AbstractMemberLookup {
@Override
public void onReceive(RestResult<Collection<String>> result) {
if (result.ok()) {
Loggers.CLUSTER
.debug("success ping to node : {}, result : {}",
member, result);
LoggerUtils.printIfDebugEnabled(Loggers.CLUSTER, "success ping to node : {}, result : {}",
member, result);
final Collection<String> data = result.getData();
if (CollectionUtils.isNotEmpty(data)) {
@ -172,14 +172,14 @@ public class DiscoveryMemberLookup extends AbstractMemberLookup {
Loggers.CLUSTER
.error("An exception occurred while reporting their "
+ "information to the node : {}, error : {}",
member.getAddress(), e);
MemberUtils.onFail(member);
member.getAddress(), e.getMessage());
MemberUtils.onFail(member, e);
}
});
}
}
catch (Exception e) {
Loggers.CLUSTER.error("node state report task has error : {}", e);
Loggers.CLUSTER.error("node state report task has error : {}", e.getMessage());
}
finally {
TimerContext.end(Loggers.CLUSTER);

View File

@ -62,7 +62,7 @@ public class FileConfigMemberLookup extends AbstractMemberLookup {
}
catch (Throwable e) {
Loggers.CLUSTER
.error("An exception occurred in the launch file monitor : {}", e);
.error("An exception occurred in the launch file monitor : {}", e.getMessage());
}
}
}
@ -81,7 +81,7 @@ public class FileConfigMemberLookup extends AbstractMemberLookup {
catch (Throwable e) {
Loggers.CLUSTER
.error("nacos-XXXX [serverlist] failed to get serverlist from disk!, error : {}",
e);
e.getMessage());
}
afterLookup(tmpMembers);

View File

@ -112,14 +112,33 @@ public class StartingSpringApplicationRunListener
public void started(ConfigurableApplicationContext context) {
starting = false;
ConfigurableEnvironment env = context.getEnvironment();
if (scheduledExecutorService != null) {
scheduledExecutorService.shutdownNow();
}
logFilePath();
LOGGER.info("Nacos started successfully in {} mode.",
System.getProperty(MODE_PROPERTY_KEY_STAND_MODE));
// External data sources are used by default in cluster mode
boolean useExternalStorage = ("mysql".equalsIgnoreCase(env.getProperty("spring.datasource.platform", "")));
// must initialize after setUseExternalDB
// This value is true in stand-alone mode and false in cluster mode
// If this value is set to true in cluster mode, nacos's distributed storage engine is turned on
// default value is depend on ${nacos.standalone}
if (!useExternalStorage) {
boolean embeddedStorage = ApplicationUtils.getStandaloneMode() || Boolean.getBoolean("embeddedStorage");
// If the embedded data source storage is not turned on, it is automatically
// upgraded to the external data source storage, as before
if (!embeddedStorage) {
useExternalStorage = true;
}
}
LOGGER.info("Nacos started successfully in {} mode. use {} storage",
System.getProperty(MODE_PROPERTY_KEY_STAND_MODE), useExternalStorage ? "external" : "embedded");
}
@Override

View File

@ -24,6 +24,7 @@ import com.alibaba.nacos.common.http.param.Header;
import com.alibaba.nacos.common.http.param.Query;
import com.alibaba.nacos.common.model.RestResult;
import com.alibaba.nacos.common.model.RestResultUtils;
import com.alibaba.nacos.common.utils.LoggerUtils;
import com.alibaba.nacos.core.cluster.Member;
import com.alibaba.nacos.core.cluster.MemberUtils;
import com.alibaba.nacos.core.cluster.NodeState;
@ -101,7 +102,7 @@ public class NacosClusterController {
if (!node.check()) {
return RestResultUtils.failedWithMsg(400, "Node information is illegal");
}
Loggers.CLUSTER.debug("node state report, receive info : {}", node);
LoggerUtils.printIfDebugEnabled(Loggers.CLUSTER, "node state report, receive info : {}", node);
node.setState(NodeState.UP);
node.setFailAccessCnt(0);
memberManager.update(node);
@ -138,7 +139,7 @@ public class NacosClusterController {
public void onReceive(RestResult<String> result) {
try {
if (result.ok()) {
Loggers.CLUSTER.debug("The node : [{}] success to process the request",
LoggerUtils.printIfDebugEnabled(Loggers.CLUSTER, "The node : [{}] success to process the request",
member);
MemberUtils.onSuccess(member);
}

View File

@ -28,7 +28,7 @@ import java.util.Map;
/**
* @author <a href="mailto:liaochuntao@live.com">liaochuntao</a>
*/
public abstract class AbstractConsistencyProtocol<T extends Config, L extends LogProcessor> implements ConsistencyProtocol<T> {
public abstract class AbstractConsistencyProtocol<T extends Config, L extends LogProcessor> implements ConsistencyProtocol<T, L> {
protected final ProtocolMetaData metaData = new ProtocolMetaData();

View File

@ -17,16 +17,13 @@
package com.alibaba.nacos.core.distributed;
import com.alibaba.nacos.consistency.Config;
import com.alibaba.nacos.consistency.ConsistencyProtocol;
import com.alibaba.nacos.consistency.LogProcessor;
import com.alibaba.nacos.consistency.ap.APProtocol;
import com.alibaba.nacos.consistency.ap.LogProcessor4AP;
import com.alibaba.nacos.consistency.cp.CPProtocol;
import com.alibaba.nacos.consistency.cp.LogProcessor4CP;
import com.alibaba.nacos.core.cluster.Member;
import com.alibaba.nacos.core.cluster.MemberChangeEvent;
import com.alibaba.nacos.core.cluster.MemberChangeListener;
import com.alibaba.nacos.core.cluster.MemberMetaDataConstants;
import com.alibaba.nacos.core.cluster.MemberUtils;
import com.alibaba.nacos.core.cluster.ServerMemberManager;
import com.alibaba.nacos.core.notify.NotifyCenter;
import com.alibaba.nacos.core.utils.ApplicationUtils;
@ -39,11 +36,8 @@ import org.springframework.context.event.ContextStartedEvent;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
@ -125,7 +119,6 @@ public class ProtocolManager
Class configType = ClassUtils.resolveGenericType(protocol.getClass());
Config config = (Config) ApplicationUtils.getBean(configType);
injectMembers4AP(config);
config.addLogProcessors(loadProcessor(LogProcessor4AP.class, protocol));
protocol.init((config));
ProtocolManager.this.apProtocol = protocol;
});
@ -136,7 +129,6 @@ public class ProtocolManager
Class configType = ClassUtils.resolveGenericType(protocol.getClass());
Config config = (Config) ApplicationUtils.getBean(configType);
injectMembers4CP(config);
config.addLogProcessors(loadProcessor(LogProcessor4CP.class, protocol));
protocol.init((config));
ProtocolManager.this.cpProtocol = protocol;
});
@ -156,14 +148,6 @@ public class ProtocolManager
config.setMembers(self, others);
}
@SuppressWarnings("all")
private List<LogProcessor> loadProcessor(Class cls, ConsistencyProtocol protocol) {
final Map<String, LogProcessor> beans = (Map<String, LogProcessor>) ApplicationUtils
.getBeansOfType(cls);
final List<LogProcessor> result = new ArrayList<>(beans.values());
return result;
}
@Override
public void onEvent(MemberChangeEvent event) {
// Here, the sequence of node change events is very important. For example,
@ -192,8 +176,7 @@ public class ProtocolManager
Set<String> nodes = new HashSet<>();
members.forEach(member -> {
final String ip = member.getIp();
final int port = member.getPort();
final int raftPort = port + 1000 >= 65535 ? port + 1 : port + 1000;
final int raftPort = MemberUtils.calculateRaftPort(member);
nodes.add(ip + ":" + raftPort);
});
return nodes;

View File

@ -43,7 +43,7 @@ public class JRaftOps {
}
public RestResult<String> execute(String[] args) {
return RestResultUtils.failed("not support yeah");
return RestResultUtils.failed("not support yet");
}
public RestResult<String> execute(Map<String, String> args) {

View File

@ -20,6 +20,7 @@ import com.alibaba.nacos.common.JustForTest;
import com.alibaba.nacos.common.model.RestResult;
import com.alibaba.nacos.common.utils.ConvertUtils;
import com.alibaba.nacos.consistency.LogFuture;
import com.alibaba.nacos.consistency.LogProcessor;
import com.alibaba.nacos.consistency.ProtocolMetaData;
import com.alibaba.nacos.consistency.cp.CPProtocol;
import com.alibaba.nacos.consistency.cp.Constants;
@ -43,6 +44,7 @@ import com.alipay.sofa.jraft.Node;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -100,7 +102,7 @@ import java.util.function.BiConsumer;
@SuppressWarnings("all")
public class JRaftProtocol
extends AbstractConsistencyProtocol<RaftConfig, LogProcessor4CP>
implements CPProtocol<RaftConfig> {
implements CPProtocol<RaftConfig, LogProcessor4CP> {
private final AtomicBoolean initialized = new AtomicBoolean(false);
private final AtomicBoolean shutdowned = new AtomicBoolean(false);
@ -131,17 +133,13 @@ public class JRaftProtocol
public void init(RaftConfig config) {
if (initialized.compareAndSet(false, true)) {
this.raftConfig = config;
// Load all LogProcessor information in advance
loadLogProcessor(config.listLogProcessor());
this.selfAddress = memberManager.getSelf().getAddress();
NotifyCenter.registerToSharePublisher(RaftEvent.class);
this.failoverRetries = ConvertUtils
.toInt(config.getVal(RaftSysConstants.REQUEST_FAILOVER_RETRIES), 1);
this.failoverRetriesStr = String.valueOf(failoverRetries);
this.raftServer.setFailoverRetries(failoverRetries);
this.raftServer.init(this.raftConfig, this.raftConfig.listLogProcessor());
this.raftServer.init(this.raftConfig);
this.raftServer.start();
// There is only one consumer to ensure that the internal consumption
@ -184,6 +182,11 @@ public class JRaftProtocol
}
}
@Override
public void addLogProcessors(Collection<LogProcessor4CP> processors) {
raftServer.createMultiRaftGroup(processors);
}
@Override
public GetResponse getData(GetRequest request) throws Exception {
int retryCnt = Integer.parseInt(
@ -247,6 +250,7 @@ public class JRaftProtocol
private void injectProtocolMetaData(ProtocolMetaData metaData) {
Member member = memberManager.getSelf();
member.setExtendVal("raft_meta_data", metaData);
memberManager.update(member);
}
@Override

View File

@ -20,6 +20,7 @@ import com.alibaba.nacos.common.model.RestResult;
import com.alibaba.nacos.common.utils.ByteUtils;
import com.alibaba.nacos.common.utils.ConvertUtils;
import com.alibaba.nacos.common.utils.DiskUtils;
import com.alibaba.nacos.common.utils.LoggerUtils;
import com.alibaba.nacos.common.utils.ThreadUtils;
import com.alibaba.nacos.consistency.LogProcessor;
import com.alibaba.nacos.consistency.SerializeFactory;
@ -143,7 +144,7 @@ public class JRaftServer {
this.failoverRetries = failoverRetries;
}
void init(RaftConfig config, Collection<LogProcessor4CP> processors) {
void init(RaftConfig config) {
this.raftConfig = config;
this.serializer = SerializeFactory.getDefault();
Loggers.RAFT.info("Initializes the Raft protocol, raft-config info : {}", config);
@ -175,8 +176,6 @@ public class JRaftServer {
this.cliClientService = new BoltCliClientService();
this.cliClientService.init(cliOptions);
this.cliService = RaftServiceFactory.createAndInitCliService(cliOptions);
this.processors.addAll(processors);
}
synchronized void start() {
@ -376,7 +375,7 @@ public class JRaftServer {
public <T> CompletableFuture<T> commit(Log data, final CompletableFuture<T> future,
final int retryLeft) {
Loggers.RAFT.debug("data requested this time : {}", data);
LoggerUtils.printIfDebugEnabled(Loggers.RAFT, "data requested this time : {}", data);
final String group = data.getGroup();
final RaftGroupTuple tuple = findTupleByGroup(group);
if (tuple == null) {
@ -409,6 +408,11 @@ public class JRaftServer {
for (String address : addresses) {
newConf.addPeer(PeerId.parsePeer(address));
}
if (Objects.equals(oldConf, newConf)) {
return;
}
for (int i = 0; i < 3; i++) {
try {
Status status = cliService.changePeers(groupId, oldConf, newConf);

View File

@ -17,6 +17,8 @@
package com.alibaba.nacos.core.distributed.raft;
import com.alibaba.fastjson.JSON;
import com.alibaba.nacos.common.utils.ExceptionUtil;
import com.alibaba.nacos.common.utils.LoggerUtils;
import com.alibaba.nacos.consistency.LogFuture;
import com.alibaba.nacos.consistency.LogProcessor;
import com.alibaba.nacos.consistency.cp.LogProcessor4CP;
@ -100,8 +102,7 @@ class NacosStateMachine extends StateMachineAdapter {
final ByteBuffer data = iter.getData();
log = Log.parseFrom(data.array());
}
Loggers.RAFT.debug("receive log : {}", log);
LoggerUtils.printIfDebugEnabled(Loggers.RAFT, "receive log : {}", log);
final String type = log
.getExtendInfoOrDefault(JRaftConstants.JRAFT_EXTEND_INFO_KEY,
@ -122,7 +123,7 @@ class NacosStateMachine extends StateMachineAdapter {
}
catch (Throwable e) {
index++;
status.setError(RaftError.UNKNOWN, e.getMessage());
status.setError(RaftError.UNKNOWN, e.toString());
Optional.ofNullable(closure)
.ifPresent(closure1 -> closure1.setThrowable(e));
throw e;
@ -142,7 +143,7 @@ class NacosStateMachine extends StateMachineAdapter {
Loggers.RAFT.error("processor : {}, stateMachine meet critical error: {}.",
processor, t);
iter.setErrorAndRollback(index - applied, new Status(RaftError.ESTATEMACHINE,
"StateMachine meet critical error: %s.", t.getMessage()));
"StateMachine meet critical error: %s.", ExceptionUtil.getStackTrace(t)));
}
}

View File

@ -41,7 +41,6 @@ public class RaftConfig implements Config<LogProcessor4CP> {
private static final long serialVersionUID = 9174789390266064002L;
private Map<String, String> data = Collections.synchronizedMap(new HashMap<>());
private List<LogProcessor4CP> processors = Collections.synchronizedList(new ArrayList<>());
private String selfAddress;
private Set<String> members = Collections.synchronizedSet(new HashSet<>());
@ -95,16 +94,6 @@ public class RaftConfig implements Config<LogProcessor4CP> {
return data.getOrDefault(key, defaultVal);
}
@Override
public List<LogProcessor4CP> listLogProcessor() {
return processors;
}
@Override
public void addLogProcessors(Collection<LogProcessor4CP> processors) {
this.processors.addAll(processors);
}
@Override
public String toString() {
try {

View File

@ -232,7 +232,7 @@ public class NotifyCenter {
return publisher.publish(event);
}
throw new NoSuchElementException(
"There are no event publishers for this event, please register");
"There are no [" + topic + "] publishers for this event, please register");
}
/**

View File

@ -532,5 +532,9 @@ public class ApplicationUtils
ApplicationUtils.environment = environment;
}
public static void injectContext(ConfigurableApplicationContext context) {
ApplicationUtils.applicationContext = context;
}
}

View File

@ -16,6 +16,7 @@
package com.alibaba.nacos.core.utils;
import com.alibaba.nacos.common.utils.LoggerUtils;
import com.alibaba.nacos.common.utils.Pair;
import org.slf4j.Logger;
@ -38,7 +39,7 @@ public class TimerContext {
public static void end(final Logger logger) {
long endTime = System.currentTimeMillis();
Pair<String, Long> record = TIME_RECORD.get();
logger.debug("{} cost time : {} ms", record.getFirst(), (endTime - record.getSecond()));
LoggerUtils.printIfDebugEnabled(logger, "{} cost time : {} ms", record.getFirst(), (endTime - record.getSecond()));
TIME_RECORD.remove();
}

View File

@ -211,7 +211,12 @@
<level value="INFO"/>
</logger>
<logger name="com.alibaba.nacos.common.file.FileWatcher">
<logger name="com.alibaba.nacos.common.file.WatchFileCenter">
<appender-ref ref="CONSOLE"/>
<level value="INFO"/>
</logger>
<logger name="com.alibaba.nacos.common.executor.ThreadPoolManager">
<appender-ref ref="CONSOLE"/>
<level value="INFO"/>
</logger>

View File

@ -630,7 +630,7 @@
<level value="INFO"/>
</logger>
<logger name="com.alibaba.nacos.common.file.FileWatcher">
<logger name="com.alibaba.nacos.common.file.WatchFileCenter">
<appender-ref ref="CONSOLE"/>
<level value="INFO"/>
</logger>

View File

@ -32,26 +32,26 @@ public class ResponseExceptionHandler {
@ExceptionHandler(NacosException.class)
public ResponseEntity<String> handleNacosException(NacosException e) {
Loggers.SRV_LOG.error("got exception. {}", e.getErrMsg(), ExceptionUtil.getStackTrace(e));
Loggers.SRV_LOG.error("got exception. {}", e.getErrMsg(), ExceptionUtil.getAllExceptionMsg(e));
return ResponseEntity.status(e.getErrCode()).body(e.getMessage());
}
@ExceptionHandler(IllegalArgumentException.class)
public ResponseEntity<String> handleParameterError(IllegalArgumentException ex) {
Loggers.SRV_LOG.error("got exception. {}", ex.getMessage(), ExceptionUtil.getStackTrace(ex));
Loggers.SRV_LOG.error("got exception. {}", ex.getMessage(), ExceptionUtil.getAllExceptionMsg(ex));
return ResponseEntity.status(HttpStatus.BAD_REQUEST).body(ex.getMessage());
}
@ExceptionHandler(MissingServletRequestParameterException.class)
public ResponseEntity<String> handleMissingParams(MissingServletRequestParameterException ex) {
Loggers.SRV_LOG.error("got exception.", ExceptionUtil.getStackTrace(ex));
Loggers.SRV_LOG.error("got exception.", ExceptionUtil.getAllExceptionMsg(ex));
String name = ex.getParameterName();
return ResponseEntity.status(HttpStatus.BAD_REQUEST).body("Parameter '" + name + "' is missing");
}
@ExceptionHandler(Exception.class)
public ResponseEntity<String> handleException(Exception e) {
Loggers.SRV_LOG.error("got exception.", ExceptionUtil.getStackTrace(e));
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(ExceptionUtil.getStackTrace(e));
Loggers.SRV_LOG.error("got exception.", ExceptionUtil.getAllExceptionMsg(e));
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(ExceptionUtil.getAllExceptionMsg(e));
}
}

View File

@ -36,6 +36,7 @@ import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
@ -111,7 +112,7 @@ public class WatchFileCenter_ITCase {
ThreadUtils.sleep(10_000L);
}
latch.await();
latch.await(10_000L, TimeUnit.MILLISECONDS);
Assert.assertEquals(3, count.get());
}
@ -139,7 +140,7 @@ public class WatchFileCenter_ITCase {
}
});
}
latch.await();
latch.await(10_000L, TimeUnit.MILLISECONDS);
ThreadUtils.sleep(5_000L);
}

View File

@ -250,7 +250,7 @@ public class ConfigDerbyRaft_DITCase
iconfig7.publishConfig(dataId, group, content);
ThreadUtils.sleep(10_000L);
latch[0].await();
latch[0].await(10_000L, TimeUnit.MILLISECONDS);
Assert.assertEquals(content, r.get());
Assert.assertEquals(content, iconfig7.getConfig(dataId, group, 2_000L));
@ -258,7 +258,7 @@ public class ConfigDerbyRaft_DITCase
iconfig7.publishConfig(dataId, group, content);
ThreadUtils.sleep(10_000L);
latch[1].await();
latch[1].await(10_000L, TimeUnit.MILLISECONDS);
Assert.assertEquals(content, r.get());
Assert.assertEquals(content, iconfig7.getConfig(dataId, group, 2_000L));
}
@ -311,7 +311,7 @@ public class ConfigDerbyRaft_DITCase
}
});
NotifyCenter.publishEvent(new RaftDBErrorEvent());
latch1.await();
latch1.await(10_000L, TimeUnit.MILLISECONDS);
result = iconfig7.publishConfig("raft_test_raft_error", "cluster_test_1",
"this.is.raft_cluster=lessspring_7");
@ -331,7 +331,7 @@ public class ConfigDerbyRaft_DITCase
}
});
NotifyCenter.publishEvent(new RaftDBErrorRecoverEvent());
latch2.await();
latch2.await(10_000L, TimeUnit.MILLISECONDS);
result = iconfig7.publishConfig("raft_test_raft_error", "cluster_test_1",
"this.is.raft_cluster=lessspring_7");

View File

@ -84,7 +84,7 @@ public class ConfigLongPollReturnChanges_ITCase {
configService.getConfig(dataId, group, 50);
latch.await();
latch.await(10_000L, TimeUnit.MILLISECONDS);
}
@Test
@ -124,7 +124,7 @@ public class ConfigLongPollReturnChanges_ITCase {
});
configService.publishConfig(dataId, group, newData);
latch.await();
latch.await(10_000L, TimeUnit.MILLISECONDS);
}
@Test
@ -162,7 +162,7 @@ public class ConfigLongPollReturnChanges_ITCase {
});
configService.removeConfig(dataId, group);
latch.await();
latch.await(10_000L, TimeUnit.MILLISECONDS);
}
}

View File

@ -1,144 +0,0 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.test.config;
import com.alibaba.nacos.config.server.service.repository.DistributedDatabaseOperateImpl;
import com.alibaba.nacos.config.server.service.sql.ModifyRequest;
import com.alibaba.nacos.consistency.LogFuture;
import com.alibaba.nacos.consistency.SerializeFactory;
import com.alibaba.nacos.consistency.Serializer;
import com.alibaba.nacos.consistency.entity.Log;
import com.alibaba.nacos.consistency.exception.ConsistencyException;
import com.alibaba.nacos.core.utils.ApplicationUtils;
import com.google.common.collect.Lists;
import com.google.protobuf.ByteString;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.mockito.Spy;
import org.mockito.junit.MockitoJUnitRunner;
import org.springframework.dao.DataIntegrityViolationException;
import org.springframework.dao.DeadlockLoserDataAccessException;
import org.springframework.dao.DuplicateKeyException;
import org.springframework.jdbc.BadSqlGrammarException;
import org.springframework.jdbc.CannotGetJdbcConnectionException;
import java.sql.SQLException;
import java.util.List;
/**
* If the SQL logic is wrong or the constraint is violated, the exception should not be
* thrown, but the return should be carried in the LogFuture, otherwise it will be thrown
* to the upper level
*
* @author <a href="mailto:liaochuntao@live.com">liaochuntao</a>
*/
@RunWith(MockitoJUnitRunner.class)
public class DerbyRaftError_ITCase {
@Spy
private DistributedDatabaseOperateImpl databaseOperate;
private Serializer serializer = SerializeFactory.getDefault();
@Before
public void before() {
MockitoAnnotations.initMocks(this);
}
@Test
public void test_logic_DuplicateKeyException() {
Mockito.doThrow(new DuplicateKeyException("DuplicateKeyException"))
.when(databaseOperate).onUpdate(Mockito.anyList());
List<ModifyRequest> list = Lists.newArrayList(new ModifyRequest());
LogFuture future = databaseOperate.onApply(
Log.newBuilder().setData(ByteString.copyFrom(serializer.serialize(list)))
.build());
Assert.assertEquals(future.getError().getMessage(), "DuplicateKeyException");
}
@Test
public void test_logic_BadSqlGrammarException() {
Mockito.doThrow(new BadSqlGrammarException("BadSqlGrammarException",
"BadSqlGrammarException", new SQLException())).when(databaseOperate)
.onUpdate(Mockito.anyList());
List<ModifyRequest> list = Lists.newArrayList(new ModifyRequest());
LogFuture future = databaseOperate.onApply(
Log.newBuilder().setData(ByteString.copyFrom(serializer.serialize(list)))
.build());
Assert.assertEquals(future.getError().getMessage(),
"BadSqlGrammarException; bad SQL grammar [BadSqlGrammarException]; nested exception is java.sql.SQLException");
}
@Test
public void test_logic_DataIntegrityViolationException() {
Mockito.doThrow(
new DataIntegrityViolationException("DataIntegrityViolationException"))
.when(databaseOperate).onUpdate(Mockito.anyList());
List<ModifyRequest> list = Lists.newArrayList(new ModifyRequest());
LogFuture future = databaseOperate.onApply(
Log.newBuilder().setData(ByteString.copyFrom(serializer.serialize(list)))
.build());
Assert.assertEquals(future.getError().getMessage(),
"DataIntegrityViolationException");
}
@Test(expected = ConsistencyException.class)
public void test_error_CannotGetJdbcConnectionException() {
Mockito.doThrow(
new CannotGetJdbcConnectionException("CannotGetJdbcConnectionException"))
.when(databaseOperate).onUpdate(Mockito.anyList());
List<ModifyRequest> list = Lists.newArrayList(new ModifyRequest());
databaseOperate.onApply(
Log.newBuilder().setData(ByteString.copyFrom(serializer.serialize(list)))
.build());
}
@Test(expected = ConsistencyException.class)
public void test_error_DataAccessException() {
Mockito.doThrow(
new DeadlockLoserDataAccessException("DeadlockLoserDataAccessException",
new SQLException())).when(databaseOperate)
.onUpdate(Mockito.anyList());
List<ModifyRequest> list = Lists.newArrayList(new ModifyRequest());
databaseOperate.onApply(
Log.newBuilder().setData(ByteString.copyFrom(serializer.serialize(list)))
.build());
}
}

View File

@ -25,6 +25,7 @@ import org.junit.Test;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import static com.alibaba.nacos.core.utils.Constants.NACOS_SERVER_IP;
@ -69,7 +70,7 @@ public class InetUtils_ITCase {
};
NotifyCenter.registerSubscribe(subscribe);
latch.await();
latch.await(10_000L, TimeUnit.MILLISECONDS);
Assert.assertEquals(testIp, reference.get());
Assert.assertEquals(testIp, InetUtils.getSelfIp());

View File

@ -39,6 +39,7 @@ import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
@ -119,7 +120,7 @@ public class ServerMemberManager_ITCase {
boolean changed = memberManager.memberJoin(members);
Assert.assertTrue(changed);
latch.await();
latch.await(10_000L, TimeUnit.MILLISECONDS);
Assert.assertEquals(1, integer.get());
}

View File

@ -252,8 +252,8 @@ public class NotifyCenter_ITCase {
NotifyCenter.publishEvent(new SlowE1());
NotifyCenter.publishEvent(new SlowE2());
latch1.await();
latch2.await();
latch1.await(10_000L, TimeUnit.MILLISECONDS);
latch2.await(10_000L, TimeUnit.MILLISECONDS);
Assert.assertEquals("SlowE1", values[0]);
Assert.assertEquals("SlowE2", values[1]);

View File

@ -17,12 +17,12 @@ server.port=8848
# spring.datasource.platform=mysql
### Count of DB:
db.num=1
# db.num=1
### Connect URL of DB:
db.url.0=jdbc:mysql://127.0.0.1:3306/nacos?characterEncoding=utf8&connectTimeout=1000&socketTimeout=3000&autoReconnect=true&useUnicode=true&useSSL=false&serverTimezone=UTC
db.user=root
db.password=1017
# db.url.0=jdbc:mysql://127.0.0.1:3306/nacos?characterEncoding=utf8&connectTimeout=1000&socketTimeout=3000&autoReconnect=true&useUnicode=true&useSSL=false&serverTimezone=UTC
# db.user=root
# db.password=1017
#*************** Naming Module Related Configurations ***************#