Merge branch 'jraft-naming' of https://github.com/KomachiSion/nacos into jraft_naming

This commit is contained in:
chuntaojun 2020-09-23 20:41:16 +08:00
commit bb4bc3c457
5 changed files with 125 additions and 47 deletions

View File

@ -497,17 +497,17 @@ public class JRaftServer {
Configuration oldConf = instance.getConfiguration(groupName);
String oldLeader = Optional.ofNullable(instance.selectLeader(groupName)).orElse(PeerId.emptyPeer())
.getEndpoint().toString();
status = instance.refreshConfiguration(this.cliClientService, groupName, rpcRequestTimeoutMs);
if (!status.isOk()) {
Loggers.RAFT
.error("Fail to refresh route configuration for group : {}, status is : {}", groupName, status);
}
// fix issue #3661 https://github.com/alibaba/nacos/issues/3661
status = instance.refreshLeader(this.cliClientService, groupName, rpcRequestTimeoutMs);
if (!status.isOk()) {
Loggers.RAFT
.error("Fail to refresh leader for group : {}, status is : {}", groupName, status);
}
status = instance.refreshConfiguration(this.cliClientService, groupName, rpcRequestTimeoutMs);
if (!status.isOk()) {
Loggers.RAFT
.error("Fail to refresh route configuration for group : {}, status is : {}", groupName, status);
}
} catch (Exception e) {
Loggers.RAFT.error("Fail to refresh raft metadata info for group : {}, error is : {}", groupName, e);
}

View File

@ -18,10 +18,7 @@ package com.alibaba.nacos.naming.consistency;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.naming.consistency.ephemeral.EphemeralConsistencyService;
import com.alibaba.nacos.naming.consistency.persistent.ClusterVersionJudgement;
import com.alibaba.nacos.naming.consistency.persistent.PersistentConsistencyService;
import com.alibaba.nacos.naming.consistency.persistent.impl.PersistentServiceProcessor;
import com.alibaba.nacos.naming.consistency.persistent.raft.RaftConsistencyServiceImpl;
import com.alibaba.nacos.naming.consistency.persistent.PersistentConsistencyServiceDelegateImpl;
import com.alibaba.nacos.naming.pojo.Record;
import org.springframework.context.annotation.DependsOn;
import org.springframework.stereotype.Service;
@ -36,29 +33,14 @@ import org.springframework.stereotype.Service;
@Service("consistencyDelegate")
public class DelegateConsistencyServiceImpl implements ConsistencyService {
private final ClusterVersionJudgement versionJudgement;
private final RaftConsistencyServiceImpl oldPersistentConsistencyService;
private final PersistentServiceProcessor newPersistentConsistencyService;
private final PersistentConsistencyServiceDelegateImpl persistentConsistencyService;
private final EphemeralConsistencyService ephemeralConsistencyService;
private volatile boolean switchNewPersistentService = false;
public DelegateConsistencyServiceImpl(ClusterVersionJudgement versionJudgement,
RaftConsistencyServiceImpl oldPersistentConsistencyService,
PersistentServiceProcessor newPersistentConsistencyService,
public DelegateConsistencyServiceImpl(PersistentConsistencyServiceDelegateImpl persistentConsistencyService,
EphemeralConsistencyService ephemeralConsistencyService) {
this.versionJudgement = versionJudgement;
this.oldPersistentConsistencyService = oldPersistentConsistencyService;
this.newPersistentConsistencyService = newPersistentConsistencyService;
this.persistentConsistencyService = persistentConsistencyService;
this.ephemeralConsistencyService = ephemeralConsistencyService;
this.init();
}
private void init() {
this.versionJudgement.registerObserver(isAllNewVersion -> switchNewPersistentService = isAllNewVersion, -1);
}
@Override
@ -81,8 +63,7 @@ public class DelegateConsistencyServiceImpl implements ConsistencyService {
// this special key is listened by both:
if (KeyBuilder.SERVICE_META_KEY_PREFIX.equals(key)) {
oldPersistentConsistencyService.listen(key, listener);
newPersistentConsistencyService.listen(key, listener);
persistentConsistencyService.listen(key, listener);
ephemeralConsistencyService.listen(key, listener);
return;
}
@ -92,23 +73,15 @@ public class DelegateConsistencyServiceImpl implements ConsistencyService {
@Override
public void unListen(String key, RecordListener listener) throws NacosException {
ConsistencyService service = mapConsistencyService(key);
service.unListen(key, listener);
if (service instanceof PersistentConsistencyService && !switchNewPersistentService) {
newPersistentConsistencyService.unListen(key, listener);
}
mapConsistencyService(key).unListen(key, listener);
}
@Override
public boolean isAvailable() {
return ephemeralConsistencyService.isAvailable() && switchOne().isAvailable();
return ephemeralConsistencyService.isAvailable() && persistentConsistencyService.isAvailable();
}
private ConsistencyService mapConsistencyService(String key) {
return KeyBuilder.matchEphemeralKey(key) ? ephemeralConsistencyService : switchOne();
}
private PersistentConsistencyService switchOne() {
return switchNewPersistentService ? newPersistentConsistencyService : oldPersistentConsistencyService;
return KeyBuilder.matchEphemeralKey(key) ? ephemeralConsistencyService : persistentConsistencyService;
}
}

View File

@ -0,0 +1,91 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.naming.consistency.persistent;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.naming.consistency.Datum;
import com.alibaba.nacos.naming.consistency.RecordListener;
import com.alibaba.nacos.naming.consistency.persistent.impl.PersistentServiceProcessor;
import com.alibaba.nacos.naming.consistency.persistent.raft.RaftConsistencyServiceImpl;
import com.alibaba.nacos.naming.pojo.Record;
import org.springframework.stereotype.Component;
/**
* Persistent consistency service delegate.
*
* @author xiweng.yy
*/
@Component("persistentConsistencyServiceDelegate")
public class PersistentConsistencyServiceDelegateImpl implements PersistentConsistencyService {
private final ClusterVersionJudgement versionJudgement;
private final RaftConsistencyServiceImpl oldPersistentConsistencyService;
private final PersistentServiceProcessor newPersistentConsistencyService;
private volatile boolean switchNewPersistentService = false;
public PersistentConsistencyServiceDelegateImpl(ClusterVersionJudgement versionJudgement,
RaftConsistencyServiceImpl oldPersistentConsistencyService,
PersistentServiceProcessor newPersistentConsistencyService) {
this.versionJudgement = versionJudgement;
this.oldPersistentConsistencyService = oldPersistentConsistencyService;
this.newPersistentConsistencyService = newPersistentConsistencyService;
init();
}
private void init() {
this.versionJudgement.registerObserver(isAllNewVersion -> switchNewPersistentService = isAllNewVersion, -1);
}
@Override
public void put(String key, Record value) throws NacosException {
switchOne().put(key, value);
}
@Override
public void remove(String key) throws NacosException {
switchOne().remove(key);
}
@Override
public Datum get(String key) throws NacosException {
return switchOne().get(key);
}
@Override
public void listen(String key, RecordListener listener) throws NacosException {
oldPersistentConsistencyService.listen(key, listener);
newPersistentConsistencyService.listen(key, listener);
}
@Override
public void unListen(String key, RecordListener listener) throws NacosException {
newPersistentConsistencyService.unListen(key, listener);
oldPersistentConsistencyService.unListen(key, listener);
}
@Override
public boolean isAvailable() {
return switchOne().isAvailable();
}
private PersistentConsistencyService switchOne() {
return switchNewPersistentService ? newPersistentConsistencyService : oldPersistentConsistencyService;
}
}

View File

@ -263,7 +263,7 @@ public class PersistentServiceProcessor extends LogProcessor4CP implements Persi
.setData(ByteString.copyFrom(serializer.serialize(request))).build())
.whenComplete(((response, throwable) -> {
if (throwable == null) {
Loggers.RAFT.error("submit old raft data result : {}", response);
Loggers.RAFT.info("submit old raft data result : {}", response);
} else {
Loggers.RAFT.error("submit old raft data occur exception : {}", throwable);
}

View File

@ -16,6 +16,7 @@
package com.alibaba.nacos.naming.monitor;
import com.alibaba.nacos.naming.consistency.persistent.ClusterVersionJudgement;
import com.alibaba.nacos.naming.consistency.persistent.raft.RaftCore;
import com.alibaba.nacos.naming.consistency.persistent.raft.RaftPeer;
import com.alibaba.nacos.naming.core.ServiceManager;
@ -48,6 +49,9 @@ public class PerformanceLoggerThread {
@Autowired
private RaftCore raftCore;
@Autowired
private ClusterVersionJudgement versionJudgement;
private static final long PERIOD = 5 * 60;
@PostConstruct
@ -92,12 +96,22 @@ public class PerformanceLoggerThread {
MetricsMonitor.getTotalPushMonitor().set(pushService.getTotalPush());
MetricsMonitor.getFailedPushMonitor().set(pushService.getFailedPushCount());
if (raftCore.isLeader()) {
MetricsMonitor.getLeaderStatusMonitor().set(1);
} else if (raftCore.getPeerSet().local().state == RaftPeer.State.FOLLOWER) {
MetricsMonitor.getLeaderStatusMonitor().set(0);
} else {
MetricsMonitor.getLeaderStatusMonitor().set(2);
metricsRaftLeader();
}
/**
* Will deprecated after v1.4.x
*/
@Deprecated
private void metricsRaftLeader() {
if (!versionJudgement.allMemberIsNewVersion()) {
if (raftCore.isLeader()) {
MetricsMonitor.getLeaderStatusMonitor().set(1);
} else if (raftCore.getPeerSet().local().state == RaftPeer.State.FOLLOWER) {
MetricsMonitor.getLeaderStatusMonitor().set(0);
} else {
MetricsMonitor.getLeaderStatusMonitor().set(2);
}
}
}