log optimize; config re sync request (#4638)
This commit is contained in:
parent
ac73e7395e
commit
afb1a28135
@ -16,75 +16,13 @@
|
||||
|
||||
package com.alibaba.nacos.api.config.remote.request;
|
||||
|
||||
import com.alibaba.nacos.api.remote.request.ServerPushRequest;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* ConfigChangeNotifyRequest.
|
||||
*
|
||||
* @author liuzunfei
|
||||
* @version $Id: ConfigChangeNotifyRequest.java, v 0.1 2020年07月14日 3:20 PM liuzunfei Exp $
|
||||
*/
|
||||
public class ConfigChangeNotifyRequest extends ServerPushRequest {
|
||||
|
||||
private String dataId;
|
||||
|
||||
private String group;
|
||||
|
||||
private String tenant;
|
||||
|
||||
private boolean beta;
|
||||
|
||||
private List<String> betaIps;
|
||||
|
||||
private String content;
|
||||
|
||||
private String type;
|
||||
|
||||
private boolean contentPush;
|
||||
|
||||
public long lastModifiedTs;
|
||||
|
||||
@Override
|
||||
public String getModule() {
|
||||
return "config";
|
||||
}
|
||||
|
||||
/**
|
||||
* Getter method for property <tt>contentPush</tt>.
|
||||
*
|
||||
* @return property value of contentPush
|
||||
*/
|
||||
public boolean isContentPush() {
|
||||
return contentPush;
|
||||
}
|
||||
|
||||
/**
|
||||
* Setter method for property <tt>contentPush</tt>.
|
||||
*
|
||||
* @param contentPush value to be assigned to property contentPush
|
||||
*/
|
||||
public void setContentPush(boolean contentPush) {
|
||||
this.contentPush = contentPush;
|
||||
}
|
||||
|
||||
/**
|
||||
* Getter method for property <tt>lastModifiedTs</tt>.
|
||||
*
|
||||
* @return property value of lastModifiedTs
|
||||
*/
|
||||
public long getLastModifiedTs() {
|
||||
return lastModifiedTs;
|
||||
}
|
||||
|
||||
/**
|
||||
* Setter method for property <tt>lastModifiedTs</tt>.
|
||||
*
|
||||
* @param lastModifiedTs value to be assigned to property lastModifiedTs
|
||||
*/
|
||||
public void setLastModifiedTs(long lastModifiedTs) {
|
||||
this.lastModifiedTs = lastModifiedTs;
|
||||
}
|
||||
public class ConfigChangeNotifyRequest extends ConfigReSyncRequest {
|
||||
|
||||
/**
|
||||
* build success response.
|
||||
@ -101,130 +39,4 @@ public class ConfigChangeNotifyRequest extends ServerPushRequest {
|
||||
response.setTenant(tenant);
|
||||
return response;
|
||||
}
|
||||
|
||||
/**
|
||||
* Getter method for property <tt>dataId</tt>.
|
||||
*
|
||||
* @return property value of dataId
|
||||
*/
|
||||
public String getDataId() {
|
||||
return dataId;
|
||||
}
|
||||
|
||||
/**
|
||||
* Setter method for property <tt>dataId</tt>.
|
||||
*
|
||||
* @param dataId value to be assigned to property dataId
|
||||
*/
|
||||
public void setDataId(String dataId) {
|
||||
this.dataId = dataId;
|
||||
}
|
||||
|
||||
/**
|
||||
* Getter method for property <tt>group</tt>.
|
||||
*
|
||||
* @return property value of group
|
||||
*/
|
||||
public String getGroup() {
|
||||
return group;
|
||||
}
|
||||
|
||||
/**
|
||||
* Setter method for property <tt>group</tt>.
|
||||
*
|
||||
* @param group value to be assigned to property group
|
||||
*/
|
||||
public void setGroup(String group) {
|
||||
this.group = group;
|
||||
}
|
||||
|
||||
/**
|
||||
* Getter method for property <tt>tenant</tt>.
|
||||
*
|
||||
* @return property value of tenant
|
||||
*/
|
||||
public String getTenant() {
|
||||
return tenant;
|
||||
}
|
||||
|
||||
/**
|
||||
* Setter method for property <tt>tenant</tt>.
|
||||
*
|
||||
* @param tenant value to be assigned to property tenant
|
||||
*/
|
||||
public void setTenant(String tenant) {
|
||||
this.tenant = tenant;
|
||||
}
|
||||
|
||||
/**
|
||||
* Getter method for property <tt>beta</tt>.
|
||||
*
|
||||
* @return property value of beta
|
||||
*/
|
||||
public boolean isBeta() {
|
||||
return beta;
|
||||
}
|
||||
|
||||
/**
|
||||
* Setter method for property <tt>beta</tt>.
|
||||
*
|
||||
* @param beta value to be assigned to property beta
|
||||
*/
|
||||
public void setBeta(boolean beta) {
|
||||
this.beta = beta;
|
||||
}
|
||||
|
||||
/**
|
||||
* Getter method for property <tt>betaIps</tt>.
|
||||
*
|
||||
* @return property value of betaIps
|
||||
*/
|
||||
public List<String> getBetaIps() {
|
||||
return betaIps;
|
||||
}
|
||||
|
||||
/**
|
||||
* Setter method for property <tt>betaIps</tt>.
|
||||
*
|
||||
* @param betaIps value to be assigned to property betaIps
|
||||
*/
|
||||
public void setBetaIps(List<String> betaIps) {
|
||||
this.betaIps = betaIps;
|
||||
}
|
||||
|
||||
/**
|
||||
* Getter method for property <tt>content</tt>.
|
||||
*
|
||||
* @return property value of content
|
||||
*/
|
||||
public String getContent() {
|
||||
return content;
|
||||
}
|
||||
|
||||
/**
|
||||
* Setter method for property <tt>content</tt>.
|
||||
*
|
||||
* @param content value to be assigned to property content
|
||||
*/
|
||||
public void setContent(String content) {
|
||||
this.content = content;
|
||||
}
|
||||
|
||||
/**
|
||||
* Getter method for property <tt>type</tt>.
|
||||
*
|
||||
* @return property value of type
|
||||
*/
|
||||
public String getType() {
|
||||
return type;
|
||||
}
|
||||
|
||||
/**
|
||||
* Setter method for property <tt>type</tt>.
|
||||
*
|
||||
* @param type value to be assigned to property type
|
||||
*/
|
||||
public void setType(String type) {
|
||||
this.type = type;
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,109 @@
|
||||
/*
|
||||
* Copyright 1999-2020 Alibaba Group Holding Ltd.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.alibaba.nacos.api.config.remote.request;
|
||||
|
||||
import com.alibaba.nacos.api.remote.request.ServerRequest;
|
||||
|
||||
/**
|
||||
* ConfigReSyncRequest.
|
||||
*
|
||||
* @author liuzunfei
|
||||
* @version $Id: ConfigReSyncRequest.java, v 0.1 2020年07月14日 3:20 PM liuzunfei Exp $
|
||||
*/
|
||||
public class ConfigReSyncRequest extends ServerRequest {
|
||||
|
||||
private String dataId;
|
||||
|
||||
private String group;
|
||||
|
||||
private String tenant;
|
||||
|
||||
@Override
|
||||
public String getModule() {
|
||||
return "config";
|
||||
}
|
||||
|
||||
/**
|
||||
* build success response.
|
||||
*
|
||||
* @param dataId dataId
|
||||
* @param group group
|
||||
* @param tenant tenant
|
||||
* @return ConfigReSyncRequest
|
||||
*/
|
||||
public static ConfigReSyncRequest build(String dataId, String group, String tenant) {
|
||||
ConfigReSyncRequest response = new ConfigReSyncRequest();
|
||||
response.setDataId(dataId);
|
||||
response.setGroup(group);
|
||||
response.setTenant(tenant);
|
||||
return response;
|
||||
}
|
||||
|
||||
/**
|
||||
* Getter method for property <tt>dataId</tt>.
|
||||
*
|
||||
* @return property value of dataId
|
||||
*/
|
||||
public String getDataId() {
|
||||
return dataId;
|
||||
}
|
||||
|
||||
/**
|
||||
* Setter method for property <tt>dataId</tt>.
|
||||
*
|
||||
* @param dataId value to be assigned to property dataId
|
||||
*/
|
||||
public void setDataId(String dataId) {
|
||||
this.dataId = dataId;
|
||||
}
|
||||
|
||||
/**
|
||||
* Getter method for property <tt>group</tt>.
|
||||
*
|
||||
* @return property value of group
|
||||
*/
|
||||
public String getGroup() {
|
||||
return group;
|
||||
}
|
||||
|
||||
/**
|
||||
* Setter method for property <tt>group</tt>.
|
||||
*
|
||||
* @param group value to be assigned to property group
|
||||
*/
|
||||
public void setGroup(String group) {
|
||||
this.group = group;
|
||||
}
|
||||
|
||||
/**
|
||||
* Getter method for property <tt>tenant</tt>.
|
||||
*
|
||||
* @return property value of tenant
|
||||
*/
|
||||
public String getTenant() {
|
||||
return tenant;
|
||||
}
|
||||
|
||||
/**
|
||||
* Setter method for property <tt>tenant</tt>.
|
||||
*
|
||||
* @param tenant value to be assigned to property tenant
|
||||
*/
|
||||
public void setTenant(String tenant) {
|
||||
this.tenant = tenant;
|
||||
}
|
||||
}
|
@ -0,0 +1,28 @@
|
||||
/*
|
||||
* Copyright 1999-2020 Alibaba Group Holding Ltd.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.alibaba.nacos.api.config.remote.response;
|
||||
|
||||
import com.alibaba.nacos.api.remote.response.Response;
|
||||
|
||||
/**
|
||||
* config change notify response from client.
|
||||
* @author liuzunfei
|
||||
* @version $Id: ConfigChangeNotifyResponse.java, v 0.1 2020年09月01日 2:59 PM liuzunfei Exp $
|
||||
*/
|
||||
public class ConfigReSyncResponse extends Response {
|
||||
|
||||
}
|
@ -17,14 +17,14 @@
|
||||
package com.alibaba.nacos.api.naming.remote.request;
|
||||
|
||||
import com.alibaba.nacos.api.naming.pojo.ServiceInfo;
|
||||
import com.alibaba.nacos.api.remote.request.ServerPushRequest;
|
||||
import com.alibaba.nacos.api.remote.request.ServerRequest;
|
||||
|
||||
/**
|
||||
* Notify subscriber response.
|
||||
*
|
||||
* @author xiweng.yy
|
||||
*/
|
||||
public class NotifySubscriberRequest extends ServerPushRequest {
|
||||
public class NotifySubscriberRequest extends ServerRequest {
|
||||
|
||||
private String namespace;
|
||||
|
||||
|
@ -22,7 +22,7 @@ package com.alibaba.nacos.api.remote.request;
|
||||
* @author liuzunfei
|
||||
* @version $Id: ConnectResetResponse.java, v 0.1 2020年07月15日 11:11 AM liuzunfei Exp $
|
||||
*/
|
||||
public class ConnectResetRequest extends ServerPushRequest {
|
||||
public class ConnectResetRequest extends ServerRequest {
|
||||
|
||||
String serverIp;
|
||||
|
||||
|
@ -26,6 +26,8 @@ public class ServerReloadRequest extends InternalRequest {
|
||||
|
||||
int reloadCount = 0;
|
||||
|
||||
String reloadServer;
|
||||
|
||||
/**
|
||||
* Getter method for property <tt>reloadCount</tt>.
|
||||
*
|
||||
@ -43,4 +45,12 @@ public class ServerReloadRequest extends InternalRequest {
|
||||
public void setReloadCount(int reloadCount) {
|
||||
this.reloadCount = reloadCount;
|
||||
}
|
||||
|
||||
public String getReloadServer() {
|
||||
return reloadServer;
|
||||
}
|
||||
|
||||
public void setReloadServer(String reloadServer) {
|
||||
this.reloadServer = reloadServer;
|
||||
}
|
||||
}
|
||||
|
@ -23,6 +23,6 @@ package com.alibaba.nacos.api.remote.request;
|
||||
* @version $Id: ServerPushResponse.java, v 0.1 2020年07月20日 1:21 PM liuzunfei Exp $
|
||||
*/
|
||||
@SuppressWarnings("PMD.AbstractClassShouldStartWithAbstractNamingRule")
|
||||
public abstract class ServerPushRequest extends Request {
|
||||
public abstract class ServerRequest extends Request {
|
||||
|
||||
}
|
@ -24,11 +24,13 @@ import com.alibaba.nacos.api.config.remote.request.ConfigBatchListenRequest;
|
||||
import com.alibaba.nacos.api.config.remote.request.ConfigChangeNotifyRequest;
|
||||
import com.alibaba.nacos.api.config.remote.request.ConfigPublishRequest;
|
||||
import com.alibaba.nacos.api.config.remote.request.ConfigQueryRequest;
|
||||
import com.alibaba.nacos.api.config.remote.request.ConfigReSyncRequest;
|
||||
import com.alibaba.nacos.api.config.remote.request.ConfigRemoveRequest;
|
||||
import com.alibaba.nacos.api.config.remote.response.ConfigChangeBatchListenResponse;
|
||||
import com.alibaba.nacos.api.config.remote.response.ConfigChangeNotifyResponse;
|
||||
import com.alibaba.nacos.api.config.remote.response.ConfigPublishResponse;
|
||||
import com.alibaba.nacos.api.config.remote.response.ConfigQueryResponse;
|
||||
import com.alibaba.nacos.api.config.remote.response.ConfigReSyncResponse;
|
||||
import com.alibaba.nacos.api.config.remote.response.ConfigRemoveResponse;
|
||||
import com.alibaba.nacos.api.exception.NacosException;
|
||||
import com.alibaba.nacos.api.remote.RemoteConstants;
|
||||
@ -525,7 +527,7 @@ public class ClientWorker implements Closeable {
|
||||
|
||||
public class ConfigRpcTransportClient extends ConfigTransportClient {
|
||||
|
||||
private BlockingQueue<Object> listenExecutebell = new ArrayBlockingQueue<Object>(1);
|
||||
private final BlockingQueue<Object> listenExecutebell = new ArrayBlockingQueue<Object>(1);
|
||||
|
||||
private Object bellItem = new Object();
|
||||
|
||||
@ -557,29 +559,25 @@ public class ClientWorker implements Closeable {
|
||||
|
||||
private void initRpcClientHandler(final RpcClient rpcClientInner) {
|
||||
/*
|
||||
* Register Listen Change Handler
|
||||
* Register Config Change /Config ReSync Handler
|
||||
*/
|
||||
rpcClientInner.registerServerRequestHandler((request, requestMeta) -> {
|
||||
if (request instanceof ConfigChangeNotifyRequest) {
|
||||
ConfigChangeNotifyRequest configChangeNotifyRequest = (ConfigChangeNotifyRequest) request;
|
||||
LOGGER.info("[{}] [server-push] config changed. dataId={}, group={}", getName(),
|
||||
configChangeNotifyRequest.getDataId(), configChangeNotifyRequest.getGroup());
|
||||
if (request instanceof ConfigChangeNotifyRequest || request instanceof ConfigReSyncRequest) {
|
||||
ConfigReSyncRequest configReSyncRequest = (ConfigReSyncRequest) request;
|
||||
LOGGER.info("[{}] [server-push] config {}. dataId={}, group={}", rpcClientInner.getName(),
|
||||
(request instanceof ConfigChangeNotifyRequest) ? "changed" : "re sync",
|
||||
configReSyncRequest.getDataId(), configReSyncRequest.getGroup());
|
||||
String groupKey = GroupKey
|
||||
.getKeyTenant(configChangeNotifyRequest.getDataId(), configChangeNotifyRequest.getGroup(),
|
||||
configChangeNotifyRequest.getTenant());
|
||||
.getKeyTenant(configReSyncRequest.getDataId(), configReSyncRequest.getGroup(),
|
||||
configReSyncRequest.getTenant());
|
||||
|
||||
CacheData cacheData = cacheMap.get().get(groupKey);
|
||||
if (cacheData != null) {
|
||||
if (configChangeNotifyRequest.isContentPush()
|
||||
&& cacheData.getLastModifiedTs() < configChangeNotifyRequest.getLastModifiedTs()) {
|
||||
cacheData.setContent(configChangeNotifyRequest.getContent());
|
||||
cacheData.setType(configChangeNotifyRequest.getType());
|
||||
cacheData.checkListenerMd5();
|
||||
}
|
||||
cacheData.setSync(false);
|
||||
notifyListenConfig();
|
||||
}
|
||||
return new ConfigChangeNotifyResponse();
|
||||
return (request instanceof ConfigChangeNotifyRequest) ? new ConfigChangeNotifyResponse()
|
||||
: new ConfigReSyncResponse();
|
||||
}
|
||||
return null;
|
||||
});
|
||||
@ -746,7 +744,7 @@ public class ClientWorker implements Closeable {
|
||||
changeConfig.getTenant());
|
||||
changeKeys.add(changeKey);
|
||||
boolean isInitializing = cacheMap.get().get(changeKey).isInitializing();
|
||||
this.executor.execute(() -> refreshContentAndCheck(changeKey, !isInitializing));
|
||||
refreshContentAndCheck(changeKey, !isInitializing);
|
||||
|
||||
}
|
||||
}
|
||||
|
@ -130,12 +130,12 @@ public abstract class RpcClient implements Closeable {
|
||||
if (connectionEventListeners.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
LoggerUtils.printIfInfoEnabled(LOGGER, "Notify disconnected event to listeners");
|
||||
LoggerUtils.printIfInfoEnabled(LOGGER, "[{}]Notify disconnected event to listeners", name);
|
||||
for (ConnectionEventListener connectionEventListener : connectionEventListeners) {
|
||||
try {
|
||||
connectionEventListener.onDisConnect();
|
||||
} catch (Throwable throwable) {
|
||||
LoggerUtils.printIfErrorEnabled(LOGGER, "notify disconnect listener error,listener ={}",
|
||||
LoggerUtils.printIfErrorEnabled(LOGGER, "[{}]Notify disconnect listener error,listener ={}", name,
|
||||
connectionEventListener.getClass().getName());
|
||||
}
|
||||
}
|
||||
@ -148,12 +148,12 @@ public abstract class RpcClient implements Closeable {
|
||||
if (connectionEventListeners.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
LoggerUtils.printIfInfoEnabled(LOGGER, "Notify connected event to listeners.");
|
||||
LoggerUtils.printIfInfoEnabled(LOGGER, "[{}]Notify connected event to listeners.", name);
|
||||
for (ConnectionEventListener connectionEventListener : connectionEventListeners) {
|
||||
try {
|
||||
connectionEventListener.onConnected();
|
||||
} catch (Throwable throwable) {
|
||||
LoggerUtils.printIfErrorEnabled(LOGGER, "notify connect listener error,listener ={}",
|
||||
LoggerUtils.printIfErrorEnabled(LOGGER, "[{}]Notify connect listener error,listener ={}", name,
|
||||
connectionEventListener.getClass().getName());
|
||||
}
|
||||
}
|
||||
@ -198,7 +198,7 @@ public abstract class RpcClient implements Closeable {
|
||||
this.serverListFactory = serverListFactory;
|
||||
rpcClientStatus.compareAndSet(RpcClientStatus.WAIT_INIT, RpcClientStatus.INITIALIZED);
|
||||
|
||||
LoggerUtils.printIfInfoEnabled(LOGGER, "RpcClient init, ServerListFactory ={}",
|
||||
LoggerUtils.printIfInfoEnabled(LOGGER, "[{}]RpcClient init, ServerListFactory ={}", name,
|
||||
serverListFactory.getClass().getName());
|
||||
}
|
||||
|
||||
@ -209,7 +209,7 @@ public abstract class RpcClient implements Closeable {
|
||||
*/
|
||||
public void initLabels(Map<String, String> labels) {
|
||||
this.labels.putAll(labels);
|
||||
LoggerUtils.printIfInfoEnabled(LOGGER, "RpcClient init label, labels={}", this.labels);
|
||||
LoggerUtils.printIfInfoEnabled(LOGGER, "[{}]RpcClient init label, labels={}", name, this.labels);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -262,20 +262,19 @@ public abstract class RpcClient implements Closeable {
|
||||
startUpRetryTimes--;
|
||||
ServerInfo serverInfo = nextRpcServer();
|
||||
|
||||
LoggerUtils.printIfInfoEnabled(LOGGER,
|
||||
String.format("[%s] try to connect to server on start up, server: %s", name, serverInfo));
|
||||
LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] try to connect to server on start up, server: {}", name,
|
||||
serverInfo);
|
||||
|
||||
connectToServer = connectToServer(serverInfo);
|
||||
} catch (Exception e) {
|
||||
LoggerUtils.printIfWarnEnabled(LOGGER, String.format(
|
||||
"Fail to connect to server on start up, error message=%s, start up retry times left: %s",
|
||||
e.getMessage(), startUpRetryTimes));
|
||||
LoggerUtils.printIfWarnEnabled(LOGGER,
|
||||
"[{}]Fail to connect to server on start up, error message={}, start up retry times left: {}",
|
||||
name, e.getMessage(), startUpRetryTimes);
|
||||
}
|
||||
}
|
||||
|
||||
if (connectToServer != null) {
|
||||
LoggerUtils
|
||||
.printIfInfoEnabled(LOGGER, String.format("[%s] success to connect to server on start up", name));
|
||||
LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] success to connect to server on start up", name);
|
||||
this.currentConnection = connectToServer;
|
||||
rpcClientStatus.set(RpcClientStatus.RUNNING);
|
||||
eventLinkedBlockingQueue.offer(new ConnectionEvent(ConnectionEvent.CONNECTED));
|
||||
@ -290,7 +289,7 @@ public abstract class RpcClient implements Closeable {
|
||||
try {
|
||||
RpcClient.this.shutdown();
|
||||
} catch (NacosException e) {
|
||||
LoggerUtils.printIfErrorEnabled(LOGGER, "RpcClient shutdown exception, errorMessage ={}",
|
||||
LoggerUtils.printIfErrorEnabled(LOGGER, "[{}]RpcClient shutdown exception, errorMessage ={}", name,
|
||||
e.getMessage());
|
||||
}
|
||||
|
||||
@ -325,7 +324,7 @@ public abstract class RpcClient implements Closeable {
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
LoggerUtils.printIfErrorEnabled(LOGGER, "Switch server error ", e);
|
||||
LoggerUtils.printIfErrorEnabled(LOGGER, "[{}]Switch server error ,{}", name, e);
|
||||
}
|
||||
return new ConnectResetResponse();
|
||||
}
|
||||
@ -372,9 +371,8 @@ public abstract class RpcClient implements Closeable {
|
||||
if (switchingFlag.get()) {
|
||||
return;
|
||||
}
|
||||
LoggerUtils.printIfInfoEnabled(LOGGER,
|
||||
String.format("[%s] Submit server switch task : %s,onRequestFail=%s", name, recommendServerInfo,
|
||||
onRequestFail));
|
||||
LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] Submit server switch task : {},onRequestFail={}", name,
|
||||
recommendServerInfo, onRequestFail);
|
||||
executor.submit(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
@ -389,14 +387,13 @@ public abstract class RpcClient implements Closeable {
|
||||
}
|
||||
|
||||
if (onRequestFail && serverCheck()) {
|
||||
LoggerUtils.printIfInfoEnabled(LOGGER,
|
||||
String.format("[%s] Server check success : %s", name, recommendServer));
|
||||
LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] Server check success : {}", name, recommendServer);
|
||||
rpcClientStatus.set(RpcClientStatus.RUNNING);
|
||||
return;
|
||||
}
|
||||
|
||||
LoggerUtils.printIfInfoEnabled(LOGGER,
|
||||
String.format("[%s] Execute server switch task : %s", name, recommendServer));
|
||||
LoggerUtils
|
||||
.printIfInfoEnabled(LOGGER, "[{}] Execute server switch task : {}", name, recommendServer);
|
||||
|
||||
switchingFlag.compareAndSet(false, true);
|
||||
// loop until start client success.
|
||||
@ -414,8 +411,8 @@ public abstract class RpcClient implements Closeable {
|
||||
//2.create a new channel to new server
|
||||
Connection connectionNew = connectToServer(serverInfo);
|
||||
if (connectionNew != null) {
|
||||
LoggerUtils.printIfInfoEnabled(LOGGER,
|
||||
String.format("[%s] success to connect server : %s", name, serverInfo));
|
||||
LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] success to connect server : {}", name,
|
||||
serverInfo);
|
||||
//successfully create a new connect.
|
||||
if (currentConnection != null) {
|
||||
//set current connection to enable connection event.
|
||||
@ -445,9 +442,9 @@ public abstract class RpcClient implements Closeable {
|
||||
|
||||
if (reConnectTimes > 0
|
||||
&& reConnectTimes % RpcClient.this.serverListFactory.getServerList().size() == 0) {
|
||||
LoggerUtils.printIfInfoEnabled(LOGGER, String.format(
|
||||
"[%s] fail to connect server,after trying %s times, last try server is %s", name,
|
||||
reConnectTimes, serverInfo));
|
||||
LoggerUtils.printIfInfoEnabled(LOGGER,
|
||||
"[{}] fail to connect server,after trying {} times, last try server is {}", name,
|
||||
reConnectTimes, serverInfo);
|
||||
if (Integer.MAX_VALUE == retryTurns) {
|
||||
retryTurns = 50;
|
||||
} else {
|
||||
@ -469,12 +466,12 @@ public abstract class RpcClient implements Closeable {
|
||||
}
|
||||
|
||||
if (isShutdown()) {
|
||||
LoggerUtils.printIfInfoEnabled(LOGGER,
|
||||
String.format("[%s] client is shutdown ,stop reconnect to server", name));
|
||||
LoggerUtils
|
||||
.printIfInfoEnabled(LOGGER, "[{}] client is shutdown ,stop reconnect to server", name);
|
||||
}
|
||||
|
||||
} catch (Exception e) {
|
||||
LoggerUtils.printIfWarnEnabled(LOGGER, String.format("[%s] fail to connect to server", name));
|
||||
LoggerUtils.printIfWarnEnabled(LOGGER, "[{}] fail to connect to server", name);
|
||||
} finally {
|
||||
switchingFlag.set(false);
|
||||
switchingLock.unlock();
|
||||
@ -606,9 +603,9 @@ public abstract class RpcClient implements Closeable {
|
||||
&& ((NacosException) e).getErrCode() == NacosException.CLIENT_DISCONNECT) {
|
||||
// Do nothing.
|
||||
} else {
|
||||
LoggerUtils
|
||||
.printIfErrorEnabled(LOGGER, "send request fail, request={}, retryTimes={},errorMessage={}",
|
||||
request, retryTimes, e.getMessage());
|
||||
LoggerUtils.printIfErrorEnabled(LOGGER,
|
||||
"[{}]send request fail, request={}, retryTimes={},errorMessage={}", name, request,
|
||||
retryTimes, e.getMessage());
|
||||
}
|
||||
}
|
||||
retryTimes++;
|
||||
@ -649,9 +646,9 @@ public abstract class RpcClient implements Closeable {
|
||||
&& ((NacosException) e).getErrCode() == NacosException.CLIENT_DISCONNECT) {
|
||||
// Do nothing.
|
||||
} else {
|
||||
LoggerUtils
|
||||
.printIfErrorEnabled(LOGGER, "send request fail, request={}, retryTimes={},errorMessage={}",
|
||||
request, retryTimes, e.getMessage());
|
||||
LoggerUtils.printIfErrorEnabled(LOGGER,
|
||||
"[{}]send request fail, request={}, retryTimes={},errorMessage={}", name, request,
|
||||
retryTimes, e.getMessage());
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -685,7 +682,7 @@ public abstract class RpcClient implements Closeable {
|
||||
*/
|
||||
protected Response handleServerRequest(final Request request, final RequestMeta meta) {
|
||||
|
||||
LoggerUtils.printIfInfoEnabled(LOGGER, "receive server push request,request={},requestId={}",
|
||||
LoggerUtils.printIfInfoEnabled(LOGGER, "[{}]receive server push request,request={},requestId={}", name,
|
||||
request.getClass().getSimpleName(), request.getRequestId());
|
||||
for (ServerRequestHandler serverRequestHandler : serverRequestHandlers) {
|
||||
try {
|
||||
@ -694,7 +691,7 @@ public abstract class RpcClient implements Closeable {
|
||||
return response;
|
||||
}
|
||||
} catch (Exception e) {
|
||||
LoggerUtils.printIfInfoEnabled(LOGGER, "handleServerRequest:{}, errorMessage={}",
|
||||
LoggerUtils.printIfInfoEnabled(LOGGER, "[{}]handleServerRequest:{}, errorMessage={}", name,
|
||||
serverRequestHandler.getClass().getName(), e.getMessage());
|
||||
}
|
||||
|
||||
@ -709,7 +706,7 @@ public abstract class RpcClient implements Closeable {
|
||||
*/
|
||||
public synchronized void registerConnectionListener(ConnectionEventListener connectionEventListener) {
|
||||
|
||||
LoggerUtils.printIfInfoEnabled(LOGGER, "Registry connection listener to current client:{}",
|
||||
LoggerUtils.printIfInfoEnabled(LOGGER, "[{}]Registry connection listener to current client:{}", name,
|
||||
connectionEventListener.getClass().getName());
|
||||
this.connectionEventListeners.add(connectionEventListener);
|
||||
}
|
||||
@ -720,7 +717,7 @@ public abstract class RpcClient implements Closeable {
|
||||
* @param serverRequestHandler serverRequestHandler
|
||||
*/
|
||||
public synchronized void registerServerRequestHandler(ServerRequestHandler serverRequestHandler) {
|
||||
LoggerUtils.printIfInfoEnabled(LOGGER, "Register server push request handler:{}",
|
||||
LoggerUtils.printIfInfoEnabled(LOGGER, "[{}]Register server push request handler:{}", name,
|
||||
serverRequestHandler.getClass().getName());
|
||||
|
||||
this.serverRequestHandlers.add(serverRequestHandler);
|
||||
|
@ -127,8 +127,8 @@ public abstract class GrpcClient extends RpcClient {
|
||||
@Override
|
||||
public void onNext(Payload payload) {
|
||||
|
||||
LoggerUtils.printIfDebugEnabled(LOGGER, "Stream server request receive, original info: {}",
|
||||
payload.toString());
|
||||
LoggerUtils.printIfDebugEnabled(LOGGER, "[{}]Stream server request receive, original info: {}",
|
||||
GrpcClient.this.getName(), payload.toString());
|
||||
try {
|
||||
GrpcUtils.PlainRequest parse = GrpcUtils.parse(payload);
|
||||
final Request request = (Request) parse.getBody();
|
||||
@ -140,13 +140,14 @@ public abstract class GrpcClient extends RpcClient {
|
||||
response.setRequestId(request.getRequestId());
|
||||
sendResponse(response);
|
||||
} else {
|
||||
LOGGER.warn("Fail to process server request, ackId->{}", request.getRequestId());
|
||||
LOGGER.warn("[{}]Fail to process server request, ackId->{}", GrpcClient.this.getName(),
|
||||
request.getRequestId());
|
||||
}
|
||||
|
||||
} catch (Exception e) {
|
||||
LoggerUtils
|
||||
.printIfErrorEnabled(LOGGER, e.getMessage(), "Handle server request exception: {}",
|
||||
payload.toString());
|
||||
LoggerUtils.printIfErrorEnabled(LOGGER, e.getMessage(),
|
||||
"[{}]Handle server request exception: {}", GrpcClient.this.getName(),
|
||||
payload.toString());
|
||||
sendResponse(request.getRequestId(), false);
|
||||
}
|
||||
|
||||
@ -154,8 +155,8 @@ public abstract class GrpcClient extends RpcClient {
|
||||
|
||||
} catch (Exception e) {
|
||||
|
||||
LoggerUtils.printIfErrorEnabled(LOGGER, "Error tp process server push response: {}",
|
||||
payload.getBody().getValue().toStringUtf8());
|
||||
LoggerUtils.printIfErrorEnabled(LOGGER, "[{}]Error tp process server push response: {}",
|
||||
GrpcClient.this.getName(), payload.getBody().getValue().toStringUtf8());
|
||||
}
|
||||
}
|
||||
|
||||
@ -164,7 +165,8 @@ public abstract class GrpcClient extends RpcClient {
|
||||
boolean isRunning = isRunning();
|
||||
boolean isAbandon = grpcConn.isAbandon();
|
||||
if (isRunning && !isAbandon) {
|
||||
LoggerUtils.printIfErrorEnabled(LOGGER, "Request stream error, switch server,error={}", throwable);
|
||||
LoggerUtils.printIfErrorEnabled(LOGGER, "[{}]Request stream error, switch server,error={}",
|
||||
GrpcClient.this.getName(), throwable);
|
||||
if (throwable instanceof StatusRuntimeException) {
|
||||
Status.Code code = ((StatusRuntimeException) throwable).getStatus().getCode();
|
||||
if (Status.UNAVAILABLE.getCode().equals(code) || Status.CANCELLED.getCode().equals(code)) {
|
||||
@ -174,8 +176,8 @@ public abstract class GrpcClient extends RpcClient {
|
||||
}
|
||||
}
|
||||
} else {
|
||||
LoggerUtils.printIfWarnEnabled(LOGGER, "ignore error event,isRunning:{},isAbandon={}", isRunning,
|
||||
isAbandon);
|
||||
LoggerUtils.printIfWarnEnabled(LOGGER, "[{}]ignore error event,isRunning:{},isAbandon={}",
|
||||
GrpcClient.this.getName(), isRunning, isAbandon);
|
||||
}
|
||||
|
||||
}
|
||||
@ -185,13 +187,14 @@ public abstract class GrpcClient extends RpcClient {
|
||||
boolean isRunning = isRunning();
|
||||
boolean isAbandon = grpcConn.isAbandon();
|
||||
if (isRunning && !isAbandon) {
|
||||
LoggerUtils.printIfErrorEnabled(LOGGER, "Request stream onCompleted, switch server");
|
||||
LoggerUtils.printIfErrorEnabled(LOGGER, "[{}]Request stream onCompleted, switch server",
|
||||
GrpcClient.this.getName());
|
||||
if (rpcClientStatus.compareAndSet(RpcClientStatus.RUNNING, RpcClientStatus.UNHEALTHY)) {
|
||||
switchServerAsync();
|
||||
}
|
||||
} else {
|
||||
LoggerUtils.printIfInfoEnabled(LOGGER, "ignore complete event,isRunning:{},isAbandon={}", isRunning,
|
||||
isAbandon);
|
||||
LoggerUtils.printIfInfoEnabled(LOGGER, "[{}]ignore complete event,isRunning:{},isAbandon={}",
|
||||
GrpcClient.this.getName(), isRunning, isAbandon);
|
||||
}
|
||||
|
||||
}
|
||||
@ -203,7 +206,7 @@ public abstract class GrpcClient extends RpcClient {
|
||||
PushAckRequest request = PushAckRequest.build(ackId, success);
|
||||
this.currentConnection.request(request, buildMeta());
|
||||
} catch (Exception e) {
|
||||
LOGGER.error("Error to send ack response, ackId->{}", ackId);
|
||||
LOGGER.error("[{}]Error to send ack response, ackId->{}", GrpcClient.this.getName(), ackId);
|
||||
}
|
||||
}
|
||||
|
||||
@ -211,7 +214,8 @@ public abstract class GrpcClient extends RpcClient {
|
||||
try {
|
||||
((GrpcConnection) this.currentConnection).sendResponse(response);
|
||||
} catch (Exception e) {
|
||||
LOGGER.error("Error to send ack response, ackId->{}", response.getRequestId());
|
||||
LOGGER.error("[{}]Error to send ack response, ackId->{}", GrpcClient.this.getName(),
|
||||
response.getRequestId());
|
||||
}
|
||||
}
|
||||
|
||||
@ -242,7 +246,7 @@ public abstract class GrpcClient extends RpcClient {
|
||||
}
|
||||
return null;
|
||||
} catch (Exception e) {
|
||||
LOGGER.error("Fail to connect to server!", e);
|
||||
LOGGER.error("[{}]Fail to connect to server!,error={}", GrpcClient.this.getName(), e);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
@ -16,6 +16,9 @@
|
||||
|
||||
package com.alibaba.nacos.config.server.controller;
|
||||
|
||||
import com.alibaba.nacos.api.config.remote.request.ConfigChangeNotifyRequest;
|
||||
import com.alibaba.nacos.api.exception.NacosException;
|
||||
import com.alibaba.nacos.api.remote.request.RequestMeta;
|
||||
import com.alibaba.nacos.config.server.constant.Constants;
|
||||
import com.alibaba.nacos.config.server.model.SampleResult;
|
||||
import com.alibaba.nacos.config.server.remote.ConfigChangeListenContext;
|
||||
@ -27,6 +30,7 @@ import com.alibaba.nacos.core.remote.Connection;
|
||||
import com.alibaba.nacos.core.remote.ConnectionManager;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.http.ResponseEntity;
|
||||
import org.springframework.ui.ModelMap;
|
||||
import org.springframework.web.bind.annotation.GetMapping;
|
||||
import org.springframework.web.bind.annotation.RequestMapping;
|
||||
@ -35,6 +39,7 @@ import org.springframework.web.bind.annotation.RestController;
|
||||
|
||||
import javax.servlet.http.HttpServletRequest;
|
||||
import javax.servlet.http.HttpServletResponse;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
@ -99,7 +104,7 @@ public class CommunicationController {
|
||||
group = StringUtils.isBlank(group) ? Constants.DEFAULT_GROUP : group;
|
||||
// long polling listners.
|
||||
SampleResult result = longPollingService.getCollectSubscribleInfo(dataId, group, tenant);
|
||||
// rpc listerns.
|
||||
// rpc listeners.
|
||||
String groupKey = GroupKey2.getKey(dataId, group, tenant);
|
||||
Set<String> listenersClients = configChangeListenContext.getListeners(groupKey);
|
||||
SampleResult rpcSample = new SampleResult();
|
||||
@ -136,4 +141,43 @@ public class CommunicationController {
|
||||
return result;
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Notify client to check config from server.
|
||||
*/
|
||||
@GetMapping("/watcherSyncConfig")
|
||||
public ResponseEntity watcherSyncConfig(@RequestParam("dataId") String dataId, @RequestParam("group") String group,
|
||||
@RequestParam(value = "tenant", required = false) String tenant,
|
||||
@RequestParam(value = "clientIp", required = false) String clientIp, ModelMap modelMap) {
|
||||
String groupKey = GroupKey2.getKey(dataId, group, tenant);
|
||||
Set<String> listenersClients = configChangeListenContext.getListeners(groupKey);
|
||||
List<Connection> listeners = new ArrayList<>();
|
||||
for (String connectionId : listenersClients) {
|
||||
Connection connection = connectionManager.getConnection(connectionId);
|
||||
if (connection != null) {
|
||||
if (StringUtils.isNotBlank(clientIp) && !connection.getMetaInfo().getClientIp().equals(clientIp)) {
|
||||
continue;
|
||||
}
|
||||
listeners.add(connection);
|
||||
}
|
||||
|
||||
}
|
||||
if (!listeners.isEmpty()) {
|
||||
ConfigChangeNotifyRequest notifyRequest = new ConfigChangeNotifyRequest();
|
||||
notifyRequest.setDataId(dataId);
|
||||
notifyRequest.setGroup(group);
|
||||
notifyRequest.setTenant(tenant);
|
||||
for (Connection connectionByIp : listeners) {
|
||||
try {
|
||||
connectionByIp.request(notifyRequest, new RequestMeta());
|
||||
} catch (NacosException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
return ResponseEntity.ok().body(trueStr);
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -26,7 +26,6 @@ import com.alibaba.nacos.common.utils.CollectionUtils;
|
||||
import com.alibaba.nacos.config.server.model.event.LocalDataChangeEvent;
|
||||
import com.alibaba.nacos.config.server.utils.ConfigExecutor;
|
||||
import com.alibaba.nacos.config.server.utils.GroupKey;
|
||||
import com.alibaba.nacos.config.server.utils.PropertyUtil;
|
||||
import com.alibaba.nacos.core.remote.Connection;
|
||||
import com.alibaba.nacos.core.remote.ConnectionManager;
|
||||
import com.alibaba.nacos.core.remote.RpcPushService;
|
||||
@ -63,10 +62,11 @@ public class RpcConfigChangeNotifier extends Subscriber<LocalDataChangeEvent> {
|
||||
/**
|
||||
* adaptor to config module ,when server side config change ,invoke this method.
|
||||
*
|
||||
* @param groupKey groupKey
|
||||
* @param notifyRequet notifyRequet
|
||||
* @param groupKey groupKey
|
||||
* @param notifyRequest notifyRequest
|
||||
*/
|
||||
public void configDataChanged(String groupKey, final ConfigChangeNotifyRequest notifyRequet) {
|
||||
public void configDataChanged(String groupKey, final ConfigChangeNotifyRequest notifyRequest, boolean isBeta,
|
||||
List<String> betaIps) {
|
||||
|
||||
Set<String> listeners = configChangeListenContext.getListeners(groupKey);
|
||||
if (CollectionUtils.isEmpty(listeners)) {
|
||||
@ -80,14 +80,13 @@ public class RpcConfigChangeNotifier extends Subscriber<LocalDataChangeEvent> {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (notifyRequet.isBeta()) {
|
||||
List<String> betaIps = notifyRequet.getBetaIps();
|
||||
if (isBeta) {
|
||||
if (betaIps != null && !betaIps.contains(connection.getMetaInfo().getClientIp())) {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
RpcPushTask rpcPushRetryTask = new RpcPushTask(notifyRequet, 50, client,
|
||||
RpcPushTask rpcPushRetryTask = new RpcPushTask(notifyRequest, 50, client,
|
||||
connection.getMetaInfo().getClientIp(), connection.getMetaInfo().getConnectionId());
|
||||
push(rpcPushRetryTask);
|
||||
notifyCount++;
|
||||
@ -106,16 +105,7 @@ public class RpcConfigChangeNotifier extends Subscriber<LocalDataChangeEvent> {
|
||||
String group = strings[1];
|
||||
String tenant = strings.length > 2 ? strings[2] : "";
|
||||
ConfigChangeNotifyRequest notifyRequest = ConfigChangeNotifyRequest.build(dataId, group, tenant);
|
||||
notifyRequest.setBeta(isBeta);
|
||||
notifyRequest.setBetaIps(betaIps);
|
||||
if (PropertyUtil.isPushContent()) {
|
||||
notifyRequest.setContent(event.content);
|
||||
notifyRequest.setType(event.type);
|
||||
notifyRequest.setLastModifiedTs(event.lastModifiedTs);
|
||||
notifyRequest.setContentPush(true);
|
||||
}
|
||||
|
||||
configDataChanged(groupKey, notifyRequest);
|
||||
configDataChanged(groupKey, notifyRequest, isBeta, betaIps);
|
||||
|
||||
}
|
||||
|
||||
|
@ -38,8 +38,6 @@ public class PropertyUtil implements ApplicationContextInitializer<ConfigurableA
|
||||
|
||||
private static boolean isHealthCheck = true;
|
||||
|
||||
private static boolean pushContent = false;
|
||||
|
||||
private static int maxContent = 10 * 1024 * 1024;
|
||||
|
||||
/**
|
||||
@ -103,24 +101,6 @@ public class PropertyUtil implements ApplicationContextInitializer<ConfigurableA
|
||||
*/
|
||||
private static boolean embeddedStorage = EnvUtil.getStandaloneMode();
|
||||
|
||||
/**
|
||||
* Getter method for property <tt>pushContent</tt>.
|
||||
*
|
||||
* @return property value of pushContent
|
||||
*/
|
||||
public static boolean isPushContent() {
|
||||
return pushContent;
|
||||
}
|
||||
|
||||
/**
|
||||
* Setter method for property <tt>pushContent</tt>.
|
||||
*
|
||||
* @param pushContent value to be assigned to property pushContent
|
||||
*/
|
||||
public static void setPushContent(boolean pushContent) {
|
||||
PropertyUtil.pushContent = pushContent;
|
||||
}
|
||||
|
||||
public static int getNotifyConnectTimeout() {
|
||||
return notifyConnectTimeout;
|
||||
}
|
||||
@ -292,9 +272,6 @@ public class PropertyUtil implements ApplicationContextInitializer<ConfigurableA
|
||||
setDefaultMaxAggrSize(getInt("defaultMaxAggrSize", defaultMaxAggrSize));
|
||||
setCorrectUsageDelay(getInt("correctUsageDelay", correctUsageDelay));
|
||||
setInitialExpansionPercent(getInt("initialExpansionPercent", initialExpansionPercent));
|
||||
setPushContent(getBoolean("isPushContent", false));
|
||||
LOGGER.info("isPushContent:{}", pushContent);
|
||||
|
||||
// External data sources are used by default in cluster mode
|
||||
setUseExternalDB("mysql".equalsIgnoreCase(getString("spring.datasource.platform", "")));
|
||||
|
||||
|
@ -27,6 +27,7 @@ import com.alibaba.nacos.auth.common.ActionTypes;
|
||||
import com.alibaba.nacos.common.executor.ExecutorFactory;
|
||||
import com.alibaba.nacos.common.utils.StringUtils;
|
||||
import com.alibaba.nacos.config.server.utils.LogUtil;
|
||||
import com.alibaba.nacos.config.server.utils.RequestUtil;
|
||||
import com.alibaba.nacos.console.security.nacos.NacosAuthConfig;
|
||||
import com.alibaba.nacos.core.cluster.Member;
|
||||
import com.alibaba.nacos.core.cluster.MemberUtil;
|
||||
@ -38,6 +39,8 @@ import com.alibaba.nacos.core.remote.core.ServerLoaderInfoRequestHandler;
|
||||
import com.alibaba.nacos.core.remote.core.ServerReloaderRequestHandler;
|
||||
import com.alibaba.nacos.core.utils.RemoteUtils;
|
||||
import org.apache.commons.lang3.math.NumberUtils;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.http.ResponseEntity;
|
||||
import org.springframework.web.bind.annotation.GetMapping;
|
||||
@ -45,6 +48,8 @@ import org.springframework.web.bind.annotation.RequestMapping;
|
||||
import org.springframework.web.bind.annotation.RequestParam;
|
||||
import org.springframework.web.bind.annotation.RestController;
|
||||
|
||||
import javax.servlet.http.HttpServletRequest;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
@ -60,7 +65,7 @@ import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
/**
|
||||
* controller to controll server loader.
|
||||
* controller to control server loader.
|
||||
*
|
||||
* @author liuzunfei
|
||||
* @version $Id: ServerLoaderController.java, v 0.1 2020年07月22日 4:28 PM liuzunfei Exp $
|
||||
@ -69,6 +74,8 @@ import java.util.concurrent.TimeoutException;
|
||||
@RequestMapping("/v1/console/loader")
|
||||
public class ServerLoaderController {
|
||||
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(HealthController.class);
|
||||
|
||||
@Autowired
|
||||
private ConnectionManager connectionManager;
|
||||
|
||||
@ -104,7 +111,7 @@ public class ServerLoaderController {
|
||||
@GetMapping("/max")
|
||||
public ResponseEntity updateMaxClients(@RequestParam Integer count) {
|
||||
Map<String, String> responseMap = new HashMap<>(3);
|
||||
connectionManager.coordinateMaxClientsSmoth(count);
|
||||
connectionManager.setMaxClientCount(count);
|
||||
return ResponseEntity.ok().body("success");
|
||||
}
|
||||
|
||||
@ -122,6 +129,84 @@ public class ServerLoaderController {
|
||||
return ResponseEntity.ok().body("success");
|
||||
}
|
||||
|
||||
/**
|
||||
* Get server state of current server.
|
||||
*
|
||||
* @return state json.
|
||||
*/
|
||||
@Secured(resource = NacosAuthConfig.CONSOLE_RESOURCE_NAME_PREFIX + "loader", action = ActionTypes.WRITE)
|
||||
@GetMapping("/smartReload")
|
||||
public ResponseEntity smartReload(HttpServletRequest request,
|
||||
@RequestParam(value = "loaderFactor", required = false) String loaderFactorStr) {
|
||||
|
||||
LOGGER.info("Smart reload request receive,requestIp={}", RequestUtil.getRemoteIp(request));
|
||||
|
||||
Map<String, Object> serverLoadMetrics = getServerLoadMetrics();
|
||||
Object avgString = (Object) serverLoadMetrics.get("avg");
|
||||
List<ServerLoaderMetrics> details = (List<ServerLoaderMetrics>) serverLoadMetrics.get("detail");
|
||||
int avg = Integer.valueOf(avgString.toString());
|
||||
float loaderFactor =
|
||||
StringUtils.isBlank(loaderFactorStr) ? RemoteUtils.LOADER_FACTOR : Float.valueOf(loaderFactorStr);
|
||||
int overLimitCount = (int) (avg * (1 + loaderFactor));
|
||||
int lowLimitCount = (int) (avg * (1 - loaderFactor));
|
||||
|
||||
List<ServerLoaderMetrics> overLimitServer = new ArrayList<ServerLoaderMetrics>();
|
||||
List<ServerLoaderMetrics> lowLimitServer = new ArrayList<ServerLoaderMetrics>();
|
||||
|
||||
for (ServerLoaderMetrics metrics : details) {
|
||||
int sdkCount = Integer.valueOf(metrics.getMetric().get("sdkConCount"));
|
||||
if (sdkCount > overLimitCount) {
|
||||
overLimitServer.add(metrics);
|
||||
}
|
||||
if (sdkCount < lowLimitCount) {
|
||||
lowLimitServer.add(metrics);
|
||||
}
|
||||
}
|
||||
|
||||
// desc by sdkConCount
|
||||
overLimitServer.sort((o1, o2) -> {
|
||||
Integer sdkCount1 = Integer.valueOf(o1.getMetric().get("sdkConCount"));
|
||||
Integer sdkCount2 = Integer.valueOf(o2.getMetric().get("sdkConCount"));
|
||||
return sdkCount1.compareTo(sdkCount2) * -1;
|
||||
});
|
||||
|
||||
LOGGER.info("Over load limit server list ={}", overLimitServer);
|
||||
|
||||
//asc by sdkConCount
|
||||
lowLimitServer.sort((o1, o2) -> {
|
||||
Integer sdkCount1 = Integer.valueOf(o1.getMetric().get("sdkConCount"));
|
||||
Integer sdkCount2 = Integer.valueOf(o2.getMetric().get("sdkConCount"));
|
||||
return sdkCount1.compareTo(sdkCount2);
|
||||
});
|
||||
|
||||
LOGGER.info("Low load limit server list ={}", lowLimitServer);
|
||||
|
||||
CompletionService<ServerReloadResponse> completionService = new ExecutorCompletionService<ServerReloadResponse>(
|
||||
executorService);
|
||||
for (int i = 0; i < overLimitServer.size() & i < lowLimitServer.size(); i++) {
|
||||
ServerReloadRequest serverLoaderInfoRequest = new ServerReloadRequest();
|
||||
serverLoaderInfoRequest.setReloadCount(overLimitCount);
|
||||
serverLoaderInfoRequest.setReloadServer(lowLimitServer.get(i).address);
|
||||
Member member = serverMemberManager.find(overLimitServer.get(i).address);
|
||||
|
||||
LOGGER.info("Reload task submit ,fromServer ={},toServer={}, ", overLimitServer.get(i).address,
|
||||
lowLimitServer.get(i).address);
|
||||
|
||||
if (serverMemberManager.getSelf().equals(member)) {
|
||||
try {
|
||||
serverReloaderRequestHandler.handle(serverLoaderInfoRequest, new RequestMeta());
|
||||
} catch (NacosException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
} else {
|
||||
completionService.submit(new ServerReLoaderRpcTask(serverLoaderInfoRequest, member));
|
||||
}
|
||||
}
|
||||
|
||||
return ResponseEntity.ok().body("ok");
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Get server state of current server.
|
||||
*
|
||||
@ -203,7 +288,7 @@ public class ServerLoaderController {
|
||||
*/
|
||||
@Secured(resource = NacosAuthConfig.CONSOLE_RESOURCE_NAME_PREFIX + "loader", action = ActionTypes.READ)
|
||||
@GetMapping("/clustermetric")
|
||||
public ResponseEntity clusterLoader() {
|
||||
public ResponseEntity loaderMetrics() {
|
||||
|
||||
Map<String, Object> serverLoadMetrics = getServerLoadMetrics();
|
||||
|
||||
@ -212,7 +297,7 @@ public class ServerLoaderController {
|
||||
|
||||
private Map<String, Object> getServerLoadMetrics() {
|
||||
|
||||
CompletionService<ServerLoaderMetris> completionService = new ExecutorCompletionService<ServerLoaderMetris>(
|
||||
CompletionService<ServerLoaderMetrics> completionService = new ExecutorCompletionService<ServerLoaderMetrics>(
|
||||
executorService);
|
||||
|
||||
int count = 0;
|
||||
@ -224,12 +309,12 @@ public class ServerLoaderController {
|
||||
}
|
||||
}
|
||||
|
||||
List<ServerLoaderMetris> responseList = new LinkedList<ServerLoaderMetris>();
|
||||
List<ServerLoaderMetrics> responseList = new LinkedList<ServerLoaderMetrics>();
|
||||
|
||||
try {
|
||||
ServerLoaderInfoResponse handle = serverLoaderInfoRequestHandler
|
||||
.handle(new ServerLoaderInfoRequest(), new RequestMeta());
|
||||
ServerLoaderMetris metris = new ServerLoaderMetris();
|
||||
ServerLoaderMetrics metris = new ServerLoaderMetrics();
|
||||
metris.setAddress(serverMemberManager.getSelf().getAddress());
|
||||
metris.setMetric(handle.getLoaderMetrics());
|
||||
responseList.add(metris);
|
||||
@ -237,13 +322,15 @@ public class ServerLoaderController {
|
||||
e.printStackTrace();
|
||||
}
|
||||
|
||||
int resultCount = 0;
|
||||
for (int i = 0; i < count; i++) {
|
||||
try {
|
||||
Future<ServerLoaderMetris> f = completionService.poll(1000, TimeUnit.MILLISECONDS);
|
||||
Future<ServerLoaderMetrics> f = completionService.poll(1000, TimeUnit.MILLISECONDS);
|
||||
try {
|
||||
if (f != null) {
|
||||
ServerLoaderMetris response = f.get(500, TimeUnit.MILLISECONDS);
|
||||
ServerLoaderMetrics response = f.get(500, TimeUnit.MILLISECONDS);
|
||||
if (response != null) {
|
||||
resultCount++;
|
||||
responseList.add(response);
|
||||
}
|
||||
}
|
||||
@ -262,12 +349,15 @@ public class ServerLoaderController {
|
||||
Map<String, Object> responseMap = new HashMap<>(3);
|
||||
|
||||
responseMap.put("detail", responseList);
|
||||
responseMap.put("memberCount", count);
|
||||
responseMap.put("metricsCount", resultCount);
|
||||
|
||||
int max = 0;
|
||||
int min = -1;
|
||||
int total = 0;
|
||||
|
||||
for (ServerLoaderMetris serverLoaderMetris : responseList) {
|
||||
String sdkConCountStr = serverLoaderMetris.getMetric().get("sdkConCount");
|
||||
for (ServerLoaderMetrics serverLoaderMetrics : responseList) {
|
||||
String sdkConCountStr = serverLoaderMetrics.getMetric().get("sdkConCount");
|
||||
|
||||
if (StringUtils.isNotBlank(sdkConCountStr)) {
|
||||
int sdkConCount = Integer.valueOf(sdkConCountStr);
|
||||
@ -290,7 +380,7 @@ public class ServerLoaderController {
|
||||
|
||||
}
|
||||
|
||||
class ServerLoaderInfoRpcTask implements Callable<ServerLoaderMetris> {
|
||||
class ServerLoaderInfoRpcTask implements Callable<ServerLoaderMetrics> {
|
||||
|
||||
ServerLoaderInfoRequest request;
|
||||
|
||||
@ -302,18 +392,18 @@ public class ServerLoaderController {
|
||||
}
|
||||
|
||||
@Override
|
||||
public ServerLoaderMetris call() throws Exception {
|
||||
public ServerLoaderMetrics call() throws Exception {
|
||||
|
||||
ServerLoaderInfoResponse response = (ServerLoaderInfoResponse) clusterRpcClientProxy
|
||||
.sendRequest(this.member, this.request);
|
||||
ServerLoaderMetris metris = new ServerLoaderMetris();
|
||||
ServerLoaderMetrics metris = new ServerLoaderMetrics();
|
||||
metris.setAddress(member.getAddress());
|
||||
metris.setMetric(response.getLoaderMetrics());
|
||||
return metris;
|
||||
}
|
||||
}
|
||||
|
||||
class ServerLoaderMetris {
|
||||
class ServerLoaderMetrics {
|
||||
|
||||
String address;
|
||||
|
||||
|
@ -167,10 +167,7 @@ public class ConnectionManager {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
|
||||
MetricsMonitor.getLongConnectionMonitor().set(connections.size());
|
||||
|
||||
long currentStamp = System.currentTimeMillis();
|
||||
Set<Map.Entry<String, Connection>> entries = connections.entrySet();
|
||||
boolean isLoaderClient = loadClient >= 0;
|
||||
int currentMaxClient = isLoaderClient ? loadClient : maxClient;
|
||||
@ -197,8 +194,9 @@ public class ConnectionManager {
|
||||
if (connection != null) {
|
||||
connection.asyncRequest(connectResetRequest, buildMeta(), null);
|
||||
Loggers.REMOTE
|
||||
.info("expel connection ,send switch server response connection id = {},connectResetRequest={} ",
|
||||
expelledClientId, connectResetRequest);
|
||||
.info("send connection reset server , connection id = {},recommendServerIp={}, recommendServerPort={}",
|
||||
expelledClientId, connectResetRequest.getServerIp(),
|
||||
connectResetRequest.getServerPort());
|
||||
}
|
||||
|
||||
} catch (ConnectionAlreadyClosedException e) {
|
||||
@ -229,7 +227,7 @@ public class ConnectionManager {
|
||||
return meta;
|
||||
}
|
||||
|
||||
public void coordinateMaxClientsSmoth(int maxClient) {
|
||||
public void setMaxClientCount(int maxClient) {
|
||||
this.maxClient = maxClient;
|
||||
}
|
||||
|
||||
@ -259,7 +257,7 @@ public class ConnectionManager {
|
||||
} catch (ConnectionAlreadyClosedException e) {
|
||||
unregister(connectionId);
|
||||
} catch (Exception e) {
|
||||
Loggers.REMOTE.error("error occurs when expel connetion :", connectionId, e);
|
||||
Loggers.REMOTE.error("error occurs when expel connection :", connectionId, e);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -18,7 +18,7 @@ package com.alibaba.nacos.core.remote;
|
||||
|
||||
import com.alibaba.nacos.api.exception.NacosException;
|
||||
import com.alibaba.nacos.api.remote.AbstractRequestCallBack;
|
||||
import com.alibaba.nacos.api.remote.request.ServerPushRequest;
|
||||
import com.alibaba.nacos.api.remote.request.ServerRequest;
|
||||
import com.alibaba.nacos.api.remote.PushCallBack;
|
||||
import com.alibaba.nacos.api.remote.response.Response;
|
||||
import com.alibaba.nacos.common.remote.exception.ConnectionAlreadyClosedException;
|
||||
@ -47,7 +47,7 @@ public class RpcPushService {
|
||||
* @param request request.
|
||||
* @param requestCallBack requestCallBack.
|
||||
*/
|
||||
public void pushWithCallback(String connectionId, ServerPushRequest request, PushCallBack requestCallBack,
|
||||
public void pushWithCallback(String connectionId, ServerRequest request, PushCallBack requestCallBack,
|
||||
Executor executor) {
|
||||
Connection connection = connectionManager.getConnection(connectionId);
|
||||
if (connection != null) {
|
||||
@ -93,7 +93,7 @@ public class RpcPushService {
|
||||
* @param connectionId connectionId.
|
||||
* @param request request.
|
||||
*/
|
||||
public void pushWithoutAck(String connectionId, ServerPushRequest request) {
|
||||
public void pushWithoutAck(String connectionId, ServerRequest request) {
|
||||
Connection connection = connectionManager.getConnection(connectionId);
|
||||
if (connection != null) {
|
||||
try {
|
||||
|
@ -23,6 +23,7 @@ import com.alibaba.nacos.api.remote.request.ServerReloadRequest;
|
||||
import com.alibaba.nacos.api.remote.response.ServerReloadResponse;
|
||||
import com.alibaba.nacos.core.remote.ConnectionManager;
|
||||
import com.alibaba.nacos.core.remote.RequestHandler;
|
||||
import com.alibaba.nacos.core.utils.Loggers;
|
||||
import com.alibaba.nacos.core.utils.RemoteUtils;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Component;
|
||||
@ -45,6 +46,8 @@ public class ServerReloaderRequestHandler extends RequestHandler<ServerReloadReq
|
||||
@Override
|
||||
public ServerReloadResponse handle(ServerReloadRequest request, RequestMeta meta) throws NacosException {
|
||||
ServerReloadResponse response = new ServerReloadResponse();
|
||||
Loggers.REMOTE.info("server reload request receive,reload count={},redirectServer={},requestIp={}",
|
||||
request.getReloadCount(), request.getReloadServer(), meta.getClientIp());
|
||||
int reloadCount = request.getReloadCount();
|
||||
Map<String, String> filter = new HashMap<String, String>(2);
|
||||
filter.put(RemoteConstants.LABEL_SOURCE, RemoteConstants.LABEL_SOURCE_SDK);
|
||||
@ -53,7 +56,7 @@ public class ServerReloaderRequestHandler extends RequestHandler<ServerReloadReq
|
||||
response.setMessage("ignore");
|
||||
} else {
|
||||
reloadCount = (int) Math.max(reloadCount, sdkCount * (1 - RemoteUtils.LOADER_FACTOR));
|
||||
connectionManager.loadCount(reloadCount, null);
|
||||
connectionManager.loadCount(reloadCount, request.getReloadServer());
|
||||
response.setMessage("ok");
|
||||
}
|
||||
return response;
|
||||
|
Loading…
Reference in New Issue
Block a user