[Asoc2022][issue#8458] Support ability negotiations between server and clients
This commit is contained in:
杨翊 SionYang 2023-09-28 14:09:59 +08:00 committed by GitHub
commit 1192e4c36e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
65 changed files with 2715 additions and 239 deletions

View File

@ -28,6 +28,7 @@ import java.io.Serializable;
* @author liuzunfei
* @version $Id: ClientAbilities.java, v 0.1 2021年01月24日 00:09 AM liuzunfei Exp $
*/
@Deprecated
public class ClientAbilities implements Serializable {
private static final long serialVersionUID = -3590789441404549261L;

View File

@ -29,6 +29,7 @@ import java.util.Objects;
* @author liuzunfei
* @version $Id: ServerAbilities.java, v 0.1 2021年01月24日 00:09 AM liuzunfei Exp $
*/
@Deprecated
public class ServerAbilities implements Serializable {
private static final long serialVersionUID = -2120543002911304171L;

View File

@ -0,0 +1,179 @@
/*
* Copyright 1999-2022 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.api.ability.constant;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.HashMap;
import java.util.stream.Collectors;
/**
* Ability key constant. It is used to constrain the ability key.<br/>
* <strong>Ensure that return value of {@link AbilityKey#getName()} is unique under one specify {@link AbilityMode}</strong>.
*
* @author Daydreamer
* @date 2022/8/31 12:27
**/
public enum AbilityKey {
/**
* For Test temporarily.
*/
SERVER_TEST_1("test_1", "just for junit test", AbilityMode.SERVER),
/**
* For Test temporarily.
*/
SERVER_TEST_2("test_2", "just for junit test", AbilityMode.SERVER),
/**
* For Test temporarily.
*/
SDK_CLIENT_TEST_1("test_1", "just for junit test", AbilityMode.SDK_CLIENT),
/**
* For Test temporarily.
*/
CLUSTER_CLIENT_TEST_1("test_1", "just for junit test", AbilityMode.CLUSTER_CLIENT);
/**
* the name of a certain ability.
*/
private final String keyName;
/**
* description or comment about this ability.
*/
private final String description;
/**
* ability mode, which endpoint hold this ability.
*/
private final AbilityMode mode;
AbilityKey(String keyName, String description, AbilityMode mode) {
this.keyName = keyName;
this.description = description;
this.mode = mode;
}
public String getName() {
return keyName;
}
public String getDescription() {
return description;
}
public AbilityMode getMode() {
return mode;
}
/**
* All key set.
*/
private static final Map<AbilityMode, Map<String, AbilityKey>> ALL_ABILITIES = new HashMap<>();
/**
* Get all keys.
*
* @return all keys
*/
public static Collection<AbilityKey> getAllValues(AbilityMode mode) {
return Collections.unmodifiableCollection(ALL_ABILITIES.get(mode).values());
}
/**
* Get all names.
*
* @return all names
*/
public static Collection<String> getAllNames(AbilityMode mode) {
return Collections.unmodifiableCollection(ALL_ABILITIES.get(mode).keySet());
}
/**
* Whether contains this name.
*
* @param name key name
* @return whether contains
*/
public static boolean isLegalKey(AbilityMode mode, String name) {
return ALL_ABILITIES.get(mode).containsKey(name);
}
/**
* Map the string key to enum.
*
* @param abilities map
* @return enum map
*/
public static Map<AbilityKey, Boolean> mapEnum(AbilityMode mode, Map<String, Boolean> abilities) {
if (abilities == null || abilities.isEmpty()) {
return Collections.emptyMap();
}
return abilities.entrySet()
.stream()
.filter(entry -> isLegalKey(mode, entry.getKey()))
.collect(Collectors.toMap((entry) -> getEnum(mode, entry.getKey()), Map.Entry::getValue));
}
/**.
* Map the string key to enum
*
* @param abilities map
* @return enum map
*/
public static Map<String, Boolean> mapStr(Map<AbilityKey, Boolean> abilities) {
if (abilities == null || abilities.isEmpty()) {
return Collections.emptyMap();
}
return abilities.entrySet()
.stream()
.collect(Collectors.toMap((entry) -> entry.getKey().getName(), Map.Entry::getValue));
}
/**.
* getter to obtain enum
*
* @param key string key
* @return enum
*/
public static AbilityKey getEnum(AbilityMode mode, String key) {
return ALL_ABILITIES.get(mode).get(key);
}
static {
// check for developer
// ensure that name filed is unique under a AbilityMode
try {
for (AbilityKey value : AbilityKey.values()) {
AbilityMode mode = value.mode;
Map<String, AbilityKey> map = ALL_ABILITIES.getOrDefault(mode, new HashMap<>());
AbilityKey previous = map.putIfAbsent(value.getName(), value);
if (previous != null) {
throw new IllegalStateException("Duplicate key name field " + value + " and " + previous + " under mode: " + mode);
}
ALL_ABILITIES.put(mode, map);
}
} catch (Throwable t) {
// for developer checking
t.printStackTrace();
}
}
}

View File

@ -0,0 +1,41 @@
/*
* Copyright 1999-2023 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.api.ability.constant;
/**
* Ability mode.
*
* @author Daydreamer
* @date 2023/9/25 12:32
**/
public enum AbilityMode {
/**
* for server ability.
*/
SERVER,
/**
* for sdk client.
*/
SDK_CLIENT,
/**
* for cluster client.
*/
CLUSTER_CLIENT;
}

View File

@ -0,0 +1,40 @@
/*
* Copyright 1999-2022 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.api.ability.constant;
/**.
* @author Daydreamer
* @description It is used to know a certain ability whether supporting.
* @date 2022/8/31 12:27
**/
public enum AbilityStatus {
/**.
* Support a certain ability
*/
SUPPORTED,
/**.
* Not support a certain ability
*/
NOT_SUPPORTED,
/**.
* Cannot find ability table, unknown
*/
UNKNOWN
}

View File

@ -21,6 +21,7 @@ package com.alibaba.nacos.api.ability.initializer;
*
* @author xiweng.yy
*/
@Deprecated
public interface AbilityInitializer<A> {
/**

View File

@ -0,0 +1,40 @@
/*
* Copyright 1999-2023 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.api.ability.initializer;
import com.alibaba.nacos.api.ability.constant.AbilityKey;
import com.alibaba.nacos.api.ability.constant.AbilityMode;
import java.util.Map;
/**
* Nacos ability post processor, load by spi.
*
* @author Daydreamer-ia
*/
public interface AbilityPostProcessor {
/**
* process before loading by <code>Ability Controller </code>.
*
* @param mode mode: sdk client, server or cluster client
* @param abilities abilities
*/
void process(AbilityMode mode, Map<AbilityKey, Boolean> abilities);
}

View File

@ -0,0 +1,42 @@
/*
* Copyright 1999-2022 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.api.ability.register;
import com.alibaba.nacos.api.ability.constant.AbilityKey;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
/**.
* @author Daydreamer
* @description Operation for bit table.
* @date 2022/7/12 19:23
**/
public abstract class AbstractAbilityRegistry {
protected final Map<AbilityKey, Boolean> supportedAbilities = new HashMap<>();
/**.
* get static ability current server supports
*
* @return static ability
*/
public Map<AbilityKey, Boolean> getSupportedAbilities() {
return Collections.unmodifiableMap(supportedAbilities);
}
}

View File

@ -0,0 +1,57 @@
/*
* Copyright 1999-2023 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.api.ability.register.impl;
import com.alibaba.nacos.api.ability.constant.AbilityKey;
import com.alibaba.nacos.api.ability.register.AbstractAbilityRegistry;
import java.util.Map;
/**
* It is used to register cluster client abilities.
*
* @author Daydreamer
**/
public class ClusterClientAbilities extends AbstractAbilityRegistry {
private static final ClusterClientAbilities INSTANCE = new ClusterClientAbilities();
{
/*
* example:
* There is a function named "compression".
* The key is from <p>AbilityKey</p>, the value is whether turn on.
*
* You can add a new public field in <p>AbilityKey</p> like:
* <code>DATA_COMPRESSION("compression", "description about this ability")</code>
*
* And then you need to declare whether turn on in the ability table, you can:
* <code>supportedAbilities.put(AbilityKey.DATA_COMPRESSION, true);</code> means that current client support compression.
*
*/
// put ability here, which you want current client supports
}
/**
* get static ability current cluster client supports.
*
* @return static ability
*/
public static Map<AbilityKey, Boolean> getStaticAbilities() {
return INSTANCE.getSupportedAbilities();
}
}

View File

@ -0,0 +1,58 @@
/*
* Copyright 1999-2022 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.api.ability.register.impl;
import com.alibaba.nacos.api.ability.constant.AbilityKey;
import com.alibaba.nacos.api.ability.register.AbstractAbilityRegistry;
import java.util.Map;
/**
* It is used to register client abilities.
*
* @author Daydreamer
* @date 2022/8/31 12:32
**/
public class SdkClientAbilities extends AbstractAbilityRegistry {
private static final SdkClientAbilities INSTANCE = new SdkClientAbilities();
{
/*
* example:
* There is a function named "compression".
* The key is from <p>AbilityKey</p>, the value is whether turn on.
*
* You can add a new public field in <p>AbilityKey</p> like:
* <code>DATA_COMPRESSION("compression", "description about this ability")</code>
*
* And then you need to declare whether turn on in the ability table, you can:
* <code>supportedAbilities.put(AbilityKey.DATA_COMPRESSION, true);</code> means that current client support compression.
*
*/
// put ability here, which you want current client supports
}
/**.
* get static ability current server supports
*
* @return static ability
*/
public static Map<AbilityKey, Boolean> getStaticAbilities() {
return INSTANCE.getSupportedAbilities();
}
}

View File

@ -0,0 +1,59 @@
/*
* Copyright 1999-2022 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.api.ability.register.impl;
import com.alibaba.nacos.api.ability.constant.AbilityKey;
import com.alibaba.nacos.api.ability.register.AbstractAbilityRegistry;
import java.util.Map;
/**
* It is used to register server abilities.
*
* @author Daydreamer
* @date 2022/8/31 12:32
**/
public class ServerAbilities extends AbstractAbilityRegistry {
private static final ServerAbilities INSTANCE = new ServerAbilities();
{
/*
* example:
* There is a function named "compression".
* The key is from <p>AbilityKey</p>, the value is whether turn on.
*
* You can add a new public field in <p>AbilityKey</p> like:
* <code>DATA_COMPRESSION("compression", "description about this ability")</code>
*
* And then you need to declare whether turn on in the ability table, you can:
* <code>supportedAbilities.put(AbilityKey.DATA_COMPRESSION, true);</code> means that current client support compression.
*
*/
// put ability here, which you want current server supports
}
/**.
* get static ability current server supports
*
* @return static ability
*/
public static Map<AbilityKey, Boolean> getStaticAbilities() {
return INSTANCE.getSupportedAbilities();
}
}

View File

@ -30,11 +30,31 @@ public class ConnectResetRequest extends ServerRequest {
String serverPort;
String connectionId;
@Override
public String getModule() {
return INTERNAL_MODULE;
}
/**
* Getter method for property <tt>connectionId</tt>.
*
* @return property value of connectionId
*/
public String getConnectionId() {
return connectionId;
}
/**
* Setter method for property <tt>connectionId</tt>.
*
* @param connectionId value to be assigned to property connectionId
*/
public void setConnectionId(String connectionId) {
this.connectionId = connectionId;
}
/**
* Getter method for property <tt>serverIp</tt>.
*

View File

@ -16,8 +16,6 @@
package com.alibaba.nacos.api.remote.request;
import com.alibaba.nacos.api.ability.ClientAbilities;
import java.util.HashMap;
import java.util.Map;
@ -31,12 +29,12 @@ public class ConnectionSetupRequest extends InternalRequest {
private String clientVersion;
private ClientAbilities abilities;
private String tenant;
private Map<String, String> labels = new HashMap<>();
private Map<String, Boolean> abilityTable;
public ConnectionSetupRequest() {
}
@ -64,11 +62,11 @@ public class ConnectionSetupRequest extends InternalRequest {
this.tenant = tenant;
}
public ClientAbilities getAbilities() {
return abilities;
public Map<String, Boolean> getAbilityTable() {
return abilityTable;
}
public void setAbilities(ClientAbilities abilities) {
this.abilities = abilities;
public void setAbilityTable(Map<String, Boolean> abilityTable) {
this.abilityTable = abilityTable;
}
}

View File

@ -16,6 +16,9 @@
package com.alibaba.nacos.api.remote.request;
import com.alibaba.nacos.api.ability.constant.AbilityKey;
import com.alibaba.nacos.api.ability.constant.AbilityStatus;
import java.util.HashMap;
import java.util.Map;
@ -35,6 +38,24 @@ public class RequestMeta {
private Map<String, String> labels = new HashMap<>();
private Map<String, Boolean> abilityTable;
public AbilityStatus getConnectionAbility(AbilityKey abilityKey) {
if (abilityTable == null || !abilityTable.containsKey(abilityKey.getName())) {
return AbilityStatus.UNKNOWN;
}
return abilityTable.get(abilityKey.getName()) ? AbilityStatus.SUPPORTED : AbilityStatus.NOT_SUPPORTED;
}
/**
* Setter method for property <tt>abilityTable</tt>.
*
* @param abilityTable property value of clientVersion
*/
public void setAbilityTable(Map<String, Boolean> abilityTable) {
this.abilityTable = abilityTable;
}
/**
* Getter method for property <tt>clientVersion</tt>.
*

View File

@ -0,0 +1,52 @@
/*
* Copyright 1999-2022 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.api.remote.request;
import java.util.Map;
import static com.alibaba.nacos.api.common.Constants.Remote.INTERNAL_MODULE;
/**
* Server tells the client that the connection is established.
*
* @author Daydreamer.
* @date 2022/7/12 19:21
**/
public class SetupAckRequest extends ServerRequest {
private Map<String, Boolean> abilityTable;
public SetupAckRequest() {
}
public SetupAckRequest(Map<String, Boolean> abilityTable) {
this.abilityTable = abilityTable;
}
public Map<String, Boolean> getAbilityTable() {
return abilityTable;
}
public void setAbilityTable(Map<String, Boolean> abilityTable) {
this.abilityTable = abilityTable;
}
@Override
public String getModule() {
return INTERNAL_MODULE;
}
}

View File

@ -26,12 +26,15 @@ public class ServerCheckResponse extends Response {
private String connectionId;
private boolean supportAbilityNegotiation;
public ServerCheckResponse() {
}
public ServerCheckResponse(String connectionId) {
public ServerCheckResponse(String connectionId, boolean supportAbilityNegotiation) {
this.connectionId = connectionId;
this.supportAbilityNegotiation = supportAbilityNegotiation;
}
public String getConnectionId() {
@ -41,4 +44,12 @@ public class ServerCheckResponse extends Response {
public void setConnectionId(String connectionId) {
this.connectionId = connectionId;
}
public boolean isSupportAbilityNegotiation() {
return supportAbilityNegotiation;
}
public void setSupportAbilityNegotiation(boolean supportAbilityNegotiation) {
this.supportAbilityNegotiation = supportAbilityNegotiation;
}
}

View File

@ -0,0 +1,26 @@
/*
* Copyright 1999-2022 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.api.remote.response;
/**.
* @author Daydreamer
* @description Server tells the client that the connection is established
* @date 2022/7/12 19:21
**/
public class SetupAckResponse extends Response {
}

View File

@ -22,6 +22,8 @@ com.alibaba.nacos.api.remote.request.PushAckRequest
com.alibaba.nacos.api.remote.request.ServerCheckRequest
com.alibaba.nacos.api.remote.request.ServerLoaderInfoRequest
com.alibaba.nacos.api.remote.request.ServerReloadRequest
com.alibaba.nacos.api.remote.request.SetupAckRequest
com.alibaba.nacos.api.remote.response.SetupAckResponse
com.alibaba.nacos.api.remote.response.ClientDetectionResponse
com.alibaba.nacos.api.remote.response.ConnectResetResponse
com.alibaba.nacos.api.remote.response.ErrorResponse

View File

@ -16,11 +16,11 @@
package com.alibaba.nacos.api.remote.request;
import com.alibaba.nacos.api.ability.ClientAbilities;
import org.junit.Assert;
import org.junit.Test;
import java.util.Collections;
import java.util.HashMap;
public class ConnectionSetupRequestTest extends BasicRequestTest {
@ -28,7 +28,7 @@ public class ConnectionSetupRequestTest extends BasicRequestTest {
public void testSerialize() throws Exception {
ConnectionSetupRequest request = new ConnectionSetupRequest();
request.setClientVersion("2.2.2");
request.setAbilities(new ClientAbilities());
request.setAbilityTable(new HashMap<>());
request.setTenant("testNamespaceId");
request.setLabels(Collections.singletonMap("labelKey", "labelValue"));
request.setRequestId("1");
@ -37,7 +37,7 @@ public class ConnectionSetupRequestTest extends BasicRequestTest {
Assert.assertTrue(json.contains("\"clientVersion\":\"2.2.2\""));
Assert.assertTrue(json.contains("\"tenant\":\"testNamespaceId\""));
Assert.assertTrue(json.contains("\"labels\":{\"labelKey\":\"labelValue\"}"));
Assert.assertTrue(json.contains("\"abilities\":{"));
Assert.assertTrue(json.contains("\"abilityTable\":{"));
Assert.assertTrue(json.contains("\"module\":\"internal\""));
Assert.assertTrue(json.contains("\"requestId\":\"1\""));
}

View File

@ -38,7 +38,7 @@ public class ServerCheckResponseTest {
@Test
public void testSerialization() throws JsonProcessingException {
ServerCheckResponse response = new ServerCheckResponse("35643245_1.1.1.1_3306");
ServerCheckResponse response = new ServerCheckResponse("35643245_1.1.1.1_3306", false);
String actual = mapper.writeValueAsString(response);
assertTrue(actual.contains("\"connectionId\":\"35643245_1.1.1.1_3306\""));
}

View File

@ -0,0 +1,76 @@
/*
* Copyright 1999-2022 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.api.utils;
import com.alibaba.nacos.api.ability.constant.AbilityKey;
import com.alibaba.nacos.api.ability.constant.AbilityMode;
import org.junit.Assert;
import org.junit.Test;
import java.util.HashMap;
import java.util.Map;
/**.
* @author Daydreamer
* @description Ability key test
* @date 2022/9/8 12:27
**/
public class AbilityKeyTest {
@Test
public void testMapStr() {
Map<AbilityKey, Boolean> enumMap = new HashMap<>();
Map<String, Boolean> stringBooleanMap = AbilityKey.mapStr(enumMap);
Assert.assertEquals(0, stringBooleanMap.size());
enumMap.put(AbilityKey.SERVER_TEST_1, true);
enumMap.put(AbilityKey.SERVER_TEST_2, false);
stringBooleanMap = AbilityKey.mapStr(enumMap);
Assert.assertEquals(2, stringBooleanMap.size());
Assert.assertTrue(stringBooleanMap.get(AbilityKey.SERVER_TEST_1.getName()));
Assert.assertFalse(stringBooleanMap.get(AbilityKey.SERVER_TEST_2.getName()));
enumMap.put(AbilityKey.SERVER_TEST_2, true);
stringBooleanMap = AbilityKey.mapStr(enumMap);
Assert.assertEquals(2, stringBooleanMap.size());
Assert.assertTrue(stringBooleanMap.get(AbilityKey.SERVER_TEST_1.getName()));
Assert.assertTrue(stringBooleanMap.get(AbilityKey.SERVER_TEST_2.getName()));
}
@Test
public void testMapEnum() {
Map<String, Boolean> mapStr = new HashMap<>();
mapStr.put("test-no-existed", true);
Map<AbilityKey, Boolean> enumMap = AbilityKey.mapEnum(AbilityMode.SERVER, mapStr);
Assert.assertEquals(0, enumMap.size());
mapStr.put(AbilityKey.SERVER_TEST_2.getName(), false);
mapStr.put(AbilityKey.SERVER_TEST_1.getName(), true);
enumMap = AbilityKey.mapEnum(AbilityMode.SERVER, mapStr);
Assert.assertFalse(enumMap.get(AbilityKey.SERVER_TEST_2));
Assert.assertTrue(enumMap.get(AbilityKey.SERVER_TEST_1));
mapStr.clear();
mapStr.put(AbilityKey.SERVER_TEST_2.getName(), true);
mapStr.put(AbilityKey.SERVER_TEST_1.getName(), true);
enumMap = AbilityKey.mapEnum(AbilityMode.SERVER, mapStr);
Assert.assertTrue(enumMap.get(AbilityKey.SERVER_TEST_2));
Assert.assertTrue(enumMap.get(AbilityKey.SERVER_TEST_1));
}
}

View File

@ -0,0 +1,50 @@
/*
* Copyright 1999-2022 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.client.ability;
import com.alibaba.nacos.api.ability.constant.AbilityKey;
import com.alibaba.nacos.api.ability.constant.AbilityMode;
import com.alibaba.nacos.api.ability.register.impl.SdkClientAbilities;
import com.alibaba.nacos.common.ability.AbstractAbilityControlManager;
import java.util.HashMap;
import java.util.Map;
/**.
* @author Daydreamer
* @description {@link AbstractAbilityControlManager} for nacos-client.
* @date 2022/7/13 13:38
**/
public class ClientAbilityControlManager extends AbstractAbilityControlManager {
public ClientAbilityControlManager() {
}
@Override
protected Map<AbilityMode, Map<AbilityKey, Boolean>> initCurrentNodeAbilities() {
Map<AbilityMode, Map<AbilityKey, Boolean>> abilities = new HashMap<>(1);
abilities.put(AbilityMode.SDK_CLIENT, SdkClientAbilities.getStaticAbilities());
return abilities;
}
@Override
public int getPriority() {
// if server ability manager exist, you should choose the server one
return 0;
}
}

View File

@ -17,7 +17,6 @@
package com.alibaba.nacos.client.config.impl;
import com.alibaba.nacos.api.PropertyKeyConst;
import com.alibaba.nacos.api.ability.ClientAbilities;
import com.alibaba.nacos.api.common.Constants;
import com.alibaba.nacos.api.config.ConfigType;
import com.alibaba.nacos.api.config.listener.Listener;
@ -37,6 +36,8 @@ import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.remote.RemoteConstants;
import com.alibaba.nacos.api.remote.request.Request;
import com.alibaba.nacos.api.remote.response.Response;
import com.alibaba.nacos.common.remote.client.Connection;
import com.alibaba.nacos.plugin.auth.api.RequestResource;
import com.alibaba.nacos.client.config.common.GroupKey;
import com.alibaba.nacos.client.config.filter.impl.ConfigFilterChainManager;
import com.alibaba.nacos.client.config.filter.impl.ConfigResponse;
@ -65,7 +66,6 @@ import com.alibaba.nacos.common.utils.MD5Utils;
import com.alibaba.nacos.common.utils.StringUtils;
import com.alibaba.nacos.common.utils.ThreadUtils;
import com.alibaba.nacos.common.utils.VersionUtils;
import com.alibaba.nacos.plugin.auth.api.RequestResource;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import org.slf4j.Logger;
@ -642,7 +642,7 @@ public class ClientWorker implements Closeable {
/*
* Register Config Change /Config ReSync Handler
*/
rpcClientInner.registerServerRequestHandler((request) -> {
rpcClientInner.registerServerRequestHandler((request, connection) -> {
if (request instanceof ConfigChangeNotifyRequest) {
ConfigChangeNotifyRequest configChangeNotifyRequest = (ConfigChangeNotifyRequest) request;
LOGGER.info("[{}] [server-push] config changed. dataId={}, group={},tenant={}",
@ -665,7 +665,7 @@ public class ClientWorker implements Closeable {
return null;
});
rpcClientInner.registerServerRequestHandler((request) -> {
rpcClientInner.registerServerRequestHandler((request, connection) -> {
if (request instanceof ClientConfigMetricRequest) {
ClientConfigMetricResponse response = new ClientConfigMetricResponse();
response.setMetrics(getMetrics(((ClientConfigMetricRequest) request).getMetricsKeys()));
@ -677,13 +677,13 @@ public class ClientWorker implements Closeable {
rpcClientInner.registerConnectionListener(new ConnectionEventListener() {
@Override
public void onConnected() {
public void onConnected(Connection connection) {
LOGGER.info("[{}] Connected,notify listen context...", rpcClientInner.getName());
notifyListenConfig();
}
@Override
public void onDisConnect() {
public void onDisConnect(Connection connection) {
String taskId = rpcClientInner.getLabels().get("taskId");
LOGGER.info("[{}] DisConnected,clear listen context...", rpcClientInner.getName());
Collection<CacheData> values = cacheMap.get().values();
@ -992,7 +992,6 @@ public class ClientWorker implements Closeable {
if (rpcClient.isWaitInitiated()) {
initRpcClientHandler(rpcClient);
rpcClient.setTenant(getTenant());
rpcClient.clientAbilities(initAbilities());
rpcClient.start();
}
@ -1001,13 +1000,6 @@ public class ClientWorker implements Closeable {
}
private ClientAbilities initAbilities() {
ClientAbilities clientAbilities = new ClientAbilities();
clientAbilities.getRemoteAbility().setSupportRemoteConnection(true);
clientAbilities.getConfigAbility().setSupportRemoteMetrics(true);
return clientAbilities;
}
/**
* build config string.
*

View File

@ -23,10 +23,10 @@ import com.alibaba.nacos.client.env.NacosClientProperties;
import com.alibaba.nacos.plugin.auth.api.RequestResource;
import com.alibaba.nacos.client.config.filter.impl.ConfigResponse;
import com.alibaba.nacos.client.security.SecurityProxy;
import com.alibaba.nacos.client.utils.ParamUtil;
import com.alibaba.nacos.common.utils.ConvertUtils;
import com.alibaba.nacos.common.utils.MD5Utils;
import com.alibaba.nacos.common.utils.StringUtils;
import com.alibaba.nacos.client.utils.ParamUtil;
import java.util.HashMap;
import java.util.Map;

View File

@ -23,7 +23,6 @@ import com.alibaba.nacos.client.env.NacosClientProperties;
import com.alibaba.nacos.client.utils.ContextPathUtil;
import com.alibaba.nacos.client.utils.EnvUtil;
import com.alibaba.nacos.client.utils.LogUtils;
import com.alibaba.nacos.client.utils.ParamUtil;
import com.alibaba.nacos.client.utils.TemplateUtils;
import com.alibaba.nacos.common.http.HttpRestResult;
import com.alibaba.nacos.common.http.client.NacosRestTemplate;
@ -48,6 +47,7 @@ import java.util.StringTokenizer;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import com.alibaba.nacos.client.utils.ParamUtil;
import static com.alibaba.nacos.common.constant.RequestUrlConstants.HTTPS_PREFIX;
import static com.alibaba.nacos.common.constant.RequestUrlConstants.HTTP_PREFIX;

View File

@ -21,6 +21,7 @@ import com.alibaba.nacos.api.naming.remote.response.NotifySubscriberResponse;
import com.alibaba.nacos.api.remote.request.Request;
import com.alibaba.nacos.api.remote.response.Response;
import com.alibaba.nacos.client.naming.cache.ServiceInfoHolder;
import com.alibaba.nacos.common.remote.client.Connection;
import com.alibaba.nacos.common.remote.client.ServerRequestHandler;
/**
@ -37,7 +38,7 @@ public class NamingPushRequestHandler implements ServerRequestHandler {
}
@Override
public Response requestReply(Request request) {
public Response requestReply(Request request, Connection connection) {
if (request instanceof NotifySubscriberRequest) {
NotifySubscriberRequest notifyRequest = (NotifySubscriberRequest) request;
serviceInfoHolder.processServiceInfo(notifyRequest.getServiceInfo());

View File

@ -28,6 +28,7 @@ import com.alibaba.nacos.client.naming.remote.gprc.redo.data.InstanceRedoData;
import com.alibaba.nacos.client.naming.remote.gprc.redo.data.SubscriberRedoData;
import com.alibaba.nacos.client.utils.LogUtils;
import com.alibaba.nacos.common.executor.NameThreadFactory;
import com.alibaba.nacos.common.remote.client.Connection;
import com.alibaba.nacos.common.remote.client.ConnectionEventListener;
import java.util.HashSet;
@ -84,13 +85,13 @@ public class NamingGrpcRedoService implements ConnectionEventListener {
}
@Override
public void onConnected() {
public void onConnected(Connection connection) {
connected = true;
LogUtils.NAMING_LOGGER.info("Grpc connection connect");
}
@Override
public void onDisConnect() {
public void onDisConnect(Connection connection) {
connected = false;
LogUtils.NAMING_LOGGER.warn("Grpc connection disconnect, mark to redo");
synchronized (registeredInstances) {

View File

@ -0,0 +1,18 @@
#
# Copyright 1999-2022 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#
com.alibaba.nacos.client.ability.ClientAbilityControlManager

View File

@ -0,0 +1,174 @@
/*
* Copyright 1999-2022 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.client.ability;
import com.alibaba.nacos.api.ability.constant.AbilityKey;
import com.alibaba.nacos.api.ability.constant.AbilityStatus;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.remote.RequestCallBack;
import com.alibaba.nacos.api.remote.RequestFuture;
import com.alibaba.nacos.api.remote.request.Request;
import com.alibaba.nacos.api.remote.response.Response;
import com.alibaba.nacos.client.naming.remote.TestConnection;
import com.alibaba.nacos.common.remote.ConnectionType;
import com.alibaba.nacos.common.remote.client.RpcClient;
import com.alibaba.nacos.common.remote.client.Connection;
import com.alibaba.nacos.common.remote.client.RpcClientConfig;
import com.alibaba.nacos.common.remote.client.ServerListFactory;
import com.alibaba.nacos.common.remote.client.ServerRequestHandler;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class AbilityTest {
private RpcClient rpcClient;
private Connection connection;
@Test
public void testReceive() throws Exception {
rpcClient = new RpcClient(new RpcClientConfig() {
@Override
public String name() {
return "test";
}
@Override
public int retryTimes() {
return 1;
}
@Override
public long timeOutMills() {
return 3000L;
}
@Override
public long connectionKeepAlive() {
return 5000L;
}
@Override
public int healthCheckRetryTimes() {
return 1;
}
@Override
public long healthCheckTimeOut() {
return 3000L;
}
@Override
public Map<String, String> labels() {
return new HashMap<>();
}
}) {
@Override
public ConnectionType getConnectionType() {
return null;
}
@Override
public int rpcPortOffset() {
return 0;
}
@Override
public Connection connectToServer(ServerInfo serverInfo) throws Exception {
connection = new Connection(new RpcClient.ServerInfo()) {
{
super.abilityTable = new HashMap<>();
super.abilityTable.put(AbilityKey.SERVER_TEST_1.getName(), true);
super.abilityTable.put(AbilityKey.SERVER_TEST_2.getName(), false);
}
@Override
public Response request(Request request, long timeoutMills) throws NacosException {
return null;
}
@Override
public RequestFuture requestFuture(Request request) throws NacosException {
return null;
}
@Override
public void asyncRequest(Request request, RequestCallBack requestCallBack) throws NacosException {
}
@Override
public void close() {
}
};;
return connection;
}
};
rpcClient.start();
// test not ready
Assert.assertNull(rpcClient.getConnectionAbility(AbilityKey.SERVER_TEST_1));
// test ready
rpcClient.serverListFactory(new ServerListFactory() {
@Override
public String genNextServer() {
return "localhost:8848";
}
@Override
public String getCurrentServer() {
return "localhost:8848";
}
@Override
public List<String> getServerList() {
return null;
}
});
rpcClient.start();
// if connect successfully
Assert.assertEquals(rpcClient.getConnectionAbility(AbilityKey.SERVER_TEST_1), AbilityStatus.SUPPORTED);
Assert.assertEquals(rpcClient.getConnectionAbility(AbilityKey.SERVER_TEST_2), AbilityStatus.NOT_SUPPORTED);
}
@After
public void testServerRequestAbility() {
//test support
ServerRequestHandler serverRequestHandler = (request, connection) -> {
Assert.assertEquals(connection.getConnectionAbility(AbilityKey.SERVER_TEST_1), AbilityStatus.SUPPORTED);
Assert.assertEquals(connection.getConnectionAbility(AbilityKey.SERVER_TEST_2), AbilityStatus.NOT_SUPPORTED);
return new Response() { };
};
serverRequestHandler.requestReply(null, connection);
// test no ability table
serverRequestHandler = (request, connection) -> {
Assert.assertEquals(connection.getConnectionAbility(AbilityKey.SERVER_TEST_1), AbilityStatus.UNKNOWN);
return new Response() { };
};
serverRequestHandler.requestReply(null, new TestConnection(new RpcClient.ServerInfo()));
}
}

View File

@ -0,0 +1,52 @@
/*
* Copyright 1999-2022 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.client.naming.remote;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.remote.RequestCallBack;
import com.alibaba.nacos.api.remote.RequestFuture;
import com.alibaba.nacos.api.remote.request.Request;
import com.alibaba.nacos.api.remote.response.Response;
import com.alibaba.nacos.common.remote.client.Connection;
import com.alibaba.nacos.common.remote.client.RpcClient;
public class TestConnection extends Connection {
public TestConnection(RpcClient.ServerInfo serverInfo) {
super(serverInfo);
}
@Override
public Response request(Request request, long timeoutMills) throws NacosException {
return null;
}
@Override
public RequestFuture requestFuture(Request request) throws NacosException {
return null;
}
@Override
public void asyncRequest(Request request, RequestCallBack requestCallBack) throws NacosException {
}
@Override
public void close() {
}
}

View File

@ -24,6 +24,8 @@ import com.alibaba.nacos.api.naming.remote.response.NotifySubscriberResponse;
import com.alibaba.nacos.api.remote.request.Request;
import com.alibaba.nacos.api.remote.response.Response;
import com.alibaba.nacos.client.naming.cache.ServiceInfoHolder;
import com.alibaba.nacos.client.naming.remote.TestConnection;
import com.alibaba.nacos.common.remote.client.RpcClient;
import org.junit.Assert;
import org.junit.Test;
@ -41,7 +43,7 @@ public class NamingPushRequestHandlerTest {
ServiceInfo info = new ServiceInfo("name", "cluster1");
Request req = NotifySubscriberRequest.buildNotifySubscriberRequest(info);
//when
Response response = handler.requestReply(req);
Response response = handler.requestReply(req, new TestConnection(new RpcClient.ServerInfo()));
//then
Assert.assertTrue(response instanceof NotifySubscriberResponse);
verify(holder, times(1)).processServiceInfo(info);

View File

@ -18,11 +18,13 @@ package com.alibaba.nacos.client.naming.remote.gprc.redo;
import com.alibaba.nacos.api.PropertyKeyConst;
import com.alibaba.nacos.api.naming.pojo.Instance;
import com.alibaba.nacos.client.naming.remote.TestConnection;
import com.alibaba.nacos.client.env.NacosClientProperties;
import com.alibaba.nacos.client.naming.remote.gprc.NamingGrpcClientProxy;
import com.alibaba.nacos.client.naming.remote.gprc.redo.data.BatchInstanceRedoData;
import com.alibaba.nacos.client.naming.remote.gprc.redo.data.InstanceRedoData;
import com.alibaba.nacos.client.naming.remote.gprc.redo.data.SubscriberRedoData;
import com.alibaba.nacos.common.remote.client.RpcClient;
import com.alibaba.nacos.common.utils.ReflectUtils;
import org.junit.After;
import org.junit.Before;
@ -110,13 +112,13 @@ public class NamingGrpcRedoServiceTest {
@Test
public void testOnConnected() {
assertFalse(redoService.isConnected());
redoService.onConnected();
redoService.onConnected(new TestConnection(new RpcClient.ServerInfo()));
assertTrue(redoService.isConnected());
}
@Test
public void testOnDisConnect() {
redoService.onConnected();
redoService.onConnected(new TestConnection(new RpcClient.ServerInfo()));
redoService.cacheInstanceForRedo(SERVICE, GROUP, new Instance());
redoService.instanceRegistered(SERVICE, GROUP);
redoService.cacheSubscriberForRedo(SERVICE, GROUP, CLUSTER);
@ -124,7 +126,7 @@ public class NamingGrpcRedoServiceTest {
assertTrue(redoService.isConnected());
assertTrue(redoService.findInstanceRedoData().isEmpty());
assertTrue(redoService.findSubscriberRedoData().isEmpty());
redoService.onDisConnect();
redoService.onDisConnect(new TestConnection(new RpcClient.ServerInfo()));
assertFalse(redoService.isConnected());
assertFalse(redoService.findInstanceRedoData().isEmpty());
assertFalse(redoService.findSubscriberRedoData().isEmpty());

View File

@ -0,0 +1,220 @@
/*
* Copyright 1999-2022 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.common.ability;
import com.alibaba.nacos.api.ability.constant.AbilityKey;
import com.alibaba.nacos.api.ability.constant.AbilityMode;
import com.alibaba.nacos.api.ability.constant.AbilityStatus;
import com.alibaba.nacos.api.ability.initializer.AbilityPostProcessor;
import com.alibaba.nacos.common.notify.Event;
import com.alibaba.nacos.common.notify.NotifyCenter;
import com.alibaba.nacos.common.spi.NacosServiceLoader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
/**
* It is a capability control center, manager current node abilities or other control.
*
* @author Daydreamer
* @date 2022/7/12 19:18
**/
public abstract class AbstractAbilityControlManager {
private static final Logger LOGGER = LoggerFactory.getLogger(AbstractAbilityControlManager.class);
/**
* current node support abilities.
*/
protected final Map<AbilityMode, Map<String, Boolean>> currentNodeAbilities = new ConcurrentHashMap<>();
protected AbstractAbilityControlManager() {
NotifyCenter.registerToPublisher(AbilityUpdateEvent.class, 16384);
initAbilityTable();
}
/**
* initialize abilities.
*
* @return abilities
*/
private void initAbilityTable() {
LOGGER.info("Ready to get current node abilities...");
// get processors
Map<AbilityMode, Map<AbilityKey, Boolean>> abilities = initCurrentNodeAbilities();
// get abilities
for (AbilityMode mode : AbilityMode.values()) {
Map<AbilityKey, Boolean> abilitiesTable = abilities.get(mode);
if (abilitiesTable == null) {
continue;
}
// check whether exist error key
// check for developer
for (AbilityKey abilityKey : abilitiesTable.keySet()) {
if (!mode.equals(abilityKey.getMode())) {
LOGGER.error(
"You should not contain a other mode: {} in a specify mode: {} abilities set, error key: {}, please check again.",
abilityKey.getMode(), mode, abilityKey);
throw new IllegalStateException(
"Except mode: " + mode + " but " + abilityKey + " mode: " + abilityKey.getMode()
+ ", please check again.");
}
}
Collection<AbilityPostProcessor> processors = NacosServiceLoader.load(AbilityPostProcessor.class);
for (AbilityPostProcessor processor : processors) {
processor.process(mode, abilitiesTable);
}
}
// init
Set<AbilityMode> abilityModes = abilities.keySet();
LOGGER.info("Ready to initialize current node abilities, support modes: {}", abilityModes);
for (AbilityMode abilityMode : abilityModes) {
this.currentNodeAbilities
.put(abilityMode, new ConcurrentHashMap<>(AbilityKey.mapStr(abilities.get(abilityMode))));
}
LOGGER.info("Initialize current abilities finish...");
}
/**
* Turn on the ability whose key is <p>abilityKey</p>.
*
* @param abilityKey ability key{@link AbilityKey}
*/
public void enableCurrentNodeAbility(AbilityKey abilityKey) {
Map<String, Boolean> abilities = this.currentNodeAbilities.get(abilityKey.getMode());
if (abilities != null) {
doTurn(abilities, abilityKey, true, abilityKey.getMode());
}
}
protected void doTurn(Map<String, Boolean> abilities, AbilityKey key, boolean turn, AbilityMode mode) {
if (!key.getMode().equals(mode)) {
throw new IllegalStateException("Except " + mode + " but " + key.getMode());
}
LOGGER.info("Turn current node ability: {}, turn: {}", key, turn);
abilities.put(key.getName(), turn);
// notify event
AbilityUpdateEvent abilityUpdateEvent = new AbilityUpdateEvent();
abilityUpdateEvent.setTable(Collections.unmodifiableMap(abilities));
abilityUpdateEvent.isOn = turn;
abilityUpdateEvent.abilityKey = key;
NotifyCenter.publishEvent(abilityUpdateEvent);
}
/**
* Turn off the ability whose key is <p>abilityKey</p> {@link AbilityKey}.
*
* @param abilityKey ability key
*/
public void disableCurrentNodeAbility(AbilityKey abilityKey) {
Map<String, Boolean> abilities = this.currentNodeAbilities.get(abilityKey.getMode());
if (abilities != null) {
doTurn(abilities, abilityKey, false, abilityKey.getMode());
}
}
/**
* . Whether current node support
*
* @param abilityKey ability key from {@link AbilityKey}
* @return whether support
*/
public AbilityStatus isCurrentNodeAbilityRunning(AbilityKey abilityKey) {
Map<String, Boolean> abilities = currentNodeAbilities.get(abilityKey.getMode());
if (abilities != null) {
Boolean support = abilities.get(abilityKey.getName());
if (support != null) {
return support ? AbilityStatus.SUPPORTED : AbilityStatus.NOT_SUPPORTED;
}
}
return AbilityStatus.UNKNOWN;
}
/**
* . Init current node abilities
*
* @return current node abilities
*/
protected abstract Map<AbilityMode, Map<AbilityKey, Boolean>> initCurrentNodeAbilities();
/**
* . Return the abilities current node
*
* @return current abilities
*/
public Map<String, Boolean> getCurrentNodeAbilities(AbilityMode mode) {
Map<String, Boolean> abilities = currentNodeAbilities.get(mode);
if (abilities != null) {
return Collections.unmodifiableMap(abilities);
}
return Collections.emptyMap();
}
/**
* A legal nacos application has a ability control manager. If there are more than one, the one with higher priority
* is preferred
*
* @return priority
*/
public abstract int getPriority();
/**
* notify when current node ability changing.
*/
public class AbilityUpdateEvent extends Event {
private static final long serialVersionUID = -1232411212311111L;
private AbilityKey abilityKey;
private boolean isOn;
private Map<String, Boolean> table;
private AbilityUpdateEvent() {
}
public Map<String, Boolean> getAbilityTable() {
return table;
}
public void setTable(Map<String, Boolean> abilityTable) {
this.table = abilityTable;
}
public AbilityKey getAbilityKey() {
return abilityKey;
}
public void setAbilityKey(AbilityKey abilityKey) {
this.abilityKey = abilityKey;
}
public boolean isOn() {
return isOn;
}
public void setOn(boolean on) {
isOn = on;
}
}
}

View File

@ -0,0 +1,91 @@
/*
* Copyright 1999-2022 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.common.ability.discover;
import com.alibaba.nacos.common.ability.AbstractAbilityControlManager;
import com.alibaba.nacos.common.spi.NacosServiceLoader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.ServiceConfigurationError;
import java.util.stream.Collectors;
/**
* This class is used to discover {@link AbstractAbilityControlManager} implements. All the
* ability operation will be finish in this singleton.
*
* @author Daydreamer
* @date 2022/7/14 19:58
**/
public class NacosAbilityManagerHolder {
/**.
* private constructor
*/
private NacosAbilityManagerHolder() {
}
private static final Logger LOGGER = LoggerFactory.getLogger(NacosAbilityManagerHolder.class);
/**.
* singleton
*/
private static AbstractAbilityControlManager abstractAbilityControlManager;
static {
// spi discover implement
Collection<AbstractAbilityControlManager> load = null;
try {
// if server
load = NacosServiceLoader.load(AbstractAbilityControlManager.class);
} catch (ServiceConfigurationError e) {
throw new RuntimeException("[AbilityControlManager] Cannot find AbilityControlManger");
}
// the priority of the server is higher
List<AbstractAbilityControlManager> collect = load.stream()
.sorted(Comparator.comparingInt(AbstractAbilityControlManager::getPriority))
.collect(Collectors.toList());
// get the highest priority one
if (load.size() > 0) {
abstractAbilityControlManager = collect.get(collect.size() - 1);
LOGGER.info("[AbilityControlManager] Successfully initialize AbilityControlManager");
}
}
/**.
* get nacos ability control manager
*
* @return BaseAbilityControlManager
*/
public static AbstractAbilityControlManager getInstance() {
return abstractAbilityControlManager;
}
/**.
* Return the target type of ability manager
*
* @param clazz clazz
* @param <T> target type
* @return AbilityControlManager
*/
public static <T extends AbstractAbilityControlManager> T getInstance(Class<T> clazz) {
return clazz.cast(abstractAbilityControlManager);
}
}

View File

@ -16,8 +16,12 @@
package com.alibaba.nacos.common.remote.client;
import com.alibaba.nacos.api.ability.constant.AbilityKey;
import com.alibaba.nacos.api.ability.constant.AbilityStatus;
import com.alibaba.nacos.api.remote.Requester;
import java.util.Map;
/**
* connection on client side.
*
@ -33,6 +37,8 @@ public abstract class Connection implements Requester {
protected RpcClient.ServerInfo serverInfo;
protected Map<String, Boolean> abilityTable;
public Connection(RpcClient.ServerInfo serverInfo) {
this.serverInfo = serverInfo;
}
@ -45,6 +51,17 @@ public abstract class Connection implements Requester {
this.connectionId = connectionId;
}
public AbilityStatus getConnectionAbility(AbilityKey abilityKey) {
if (abilityTable == null || !abilityTable.containsKey(abilityKey.getName())) {
return AbilityStatus.UNKNOWN;
}
return abilityTable.get(abilityKey.getName()) ? AbilityStatus.SUPPORTED : AbilityStatus.NOT_SUPPORTED;
}
public void setAbilityTable(Map<String, Boolean> abilityTable) {
this.abilityTable = abilityTable;
}
/**
* Getter method for property <tt>abandon</tt>.
*

View File

@ -25,11 +25,15 @@ public interface ConnectionEventListener {
/**
* notify when connected to server.
*
* @param connection connection has connected
*/
void onConnected();
void onConnected(Connection connection);
/**
* notify when disconnected to server.
*
* @param connection connection has disconnected
*/
void onDisConnect();
void onDisConnect(Connection connection);
}

View File

@ -16,7 +16,8 @@
package com.alibaba.nacos.common.remote.client;
import com.alibaba.nacos.api.ability.ClientAbilities;
import com.alibaba.nacos.api.ability.constant.AbilityKey;
import com.alibaba.nacos.api.ability.constant.AbilityStatus;
import com.alibaba.nacos.api.common.Constants;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.remote.RequestCallBack;
@ -85,8 +86,6 @@ public abstract class RpcClient implements Closeable {
private String tenant;
protected ClientAbilities clientAbilities;
private long lastActiveTimeStamp = System.currentTimeMillis();
/**
@ -131,16 +130,6 @@ public abstract class RpcClient implements Closeable {
return Collections.unmodifiableMap(rpcClientConfig.labels());
}
/**
* init client abilities.
*
* @param clientAbilities clientAbilities.
*/
public RpcClient clientAbilities(ClientAbilities clientAbilities) {
this.clientAbilities = clientAbilities;
return this;
}
/**
* init server list factory. only can init once.
*
@ -160,15 +149,17 @@ public abstract class RpcClient implements Closeable {
/**
* Notify when client disconnected.
*
* @param connection connection has disconnected
*/
protected void notifyDisConnected() {
protected void notifyDisConnected(Connection connection) {
if (connectionEventListeners.isEmpty()) {
return;
}
LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] Notify disconnected event to listeners", rpcClientConfig.name());
for (ConnectionEventListener connectionEventListener : connectionEventListeners) {
try {
connectionEventListener.onDisConnect();
connectionEventListener.onDisConnect(connection);
} catch (Throwable throwable) {
LoggerUtils.printIfErrorEnabled(LOGGER, "[{}] Notify disconnect listener error, listener = {}",
rpcClientConfig.name(), connectionEventListener.getClass().getName());
@ -178,15 +169,17 @@ public abstract class RpcClient implements Closeable {
/**
* Notify when client new connected.
*
* @param connection connection has connected
*/
protected void notifyConnected() {
protected void notifyConnected(Connection connection) {
if (connectionEventListeners.isEmpty()) {
return;
}
LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] Notify connected event to listeners.", rpcClientConfig.name());
for (ConnectionEventListener connectionEventListener : connectionEventListeners) {
try {
connectionEventListener.onConnected();
connectionEventListener.onConnected(connection);
} catch (Throwable throwable) {
LoggerUtils.printIfErrorEnabled(LOGGER, "[{}] Notify connect listener error, listener = {}",
rpcClientConfig.name(), connectionEventListener.getClass().getName());
@ -268,9 +261,9 @@ public abstract class RpcClient implements Closeable {
try {
take = eventLinkedBlockingQueue.take();
if (take.isConnected()) {
notifyConnected();
notifyConnected(take.connection);
} else if (take.isDisConnected()) {
notifyDisConnected();
notifyDisConnected(take.connection);
}
} catch (Throwable e) {
// Do nothing
@ -376,7 +369,7 @@ public abstract class RpcClient implements Closeable {
connectToServer.serverInfo.getAddress(), connectToServer.getConnectionId());
this.currentConnection = connectToServer;
rpcClientStatus.set(RpcClientStatus.RUNNING);
eventLinkedBlockingQueue.offer(new ConnectionEvent(ConnectionEvent.CONNECTED));
eventLinkedBlockingQueue.offer(new ConnectionEvent(ConnectionEvent.CONNECTED, currentConnection));
} else {
switchServerAsync();
}
@ -384,7 +377,7 @@ public abstract class RpcClient implements Closeable {
registerServerRequestHandler(new ConnectResetRequestHandler());
// register client detection request.
registerServerRequestHandler(request -> {
registerServerRequestHandler((request, connection) -> {
if (request instanceof ClientDetectionRequest) {
return new ClientDetectionResponse();
}
@ -397,7 +390,7 @@ public abstract class RpcClient implements Closeable {
class ConnectResetRequestHandler implements ServerRequestHandler {
@Override
public Response requestReply(Request request) {
public Response requestReply(Request request, Connection connection) {
if (request instanceof ConnectResetRequest) {
@ -413,6 +406,7 @@ public abstract class RpcClient implements Closeable {
} else {
switchServerAsync();
}
afterReset(connectResetRequest);
}
}
} catch (Exception e) {
@ -424,6 +418,15 @@ public abstract class RpcClient implements Closeable {
}
}
/**.
* invoke after receiving reset request
*
* @param request request for resetting
*/
protected void afterReset(ConnectResetRequest request) {
// hook for GrpcClient
}
@Override
public void shutdown() throws NacosException {
LOGGER.info("Shutdown rpc client, set status to shutdown");
@ -521,7 +524,7 @@ public abstract class RpcClient implements Closeable {
currentConnection = connectionNew;
rpcClientStatus.set(RpcClientStatus.RUNNING);
switchSuccess = true;
eventLinkedBlockingQueue.add(new ConnectionEvent(ConnectionEvent.CONNECTED));
eventLinkedBlockingQueue.add(new ConnectionEvent(ConnectionEvent.CONNECTED, currentConnection));
return;
}
@ -585,7 +588,7 @@ public abstract class RpcClient implements Closeable {
if (connection != null) {
LOGGER.info("Close current connection " + connection.getConnectionId());
connection.close();
eventLinkedBlockingQueue.add(new ConnectionEvent(ConnectionEvent.DISCONNECTED));
eventLinkedBlockingQueue.add(new ConnectionEvent(ConnectionEvent.DISCONNECTED, connection));
}
}
@ -822,7 +825,7 @@ public abstract class RpcClient implements Closeable {
lastActiveTimeStamp = System.currentTimeMillis();
for (ServerRequestHandler serverRequestHandler : serverRequestHandlers) {
try {
Response response = serverRequestHandler.requestReply(request);
Response response = serverRequestHandler.requestReply(request, currentConnection);
if (response != null) {
LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] Ack server push request, request = {}, requestId = {}",
@ -984,8 +987,11 @@ public abstract class RpcClient implements Closeable {
int eventType;
public ConnectionEvent(int eventType) {
Connection connection;
public ConnectionEvent(int eventType, Connection connection) {
this.eventType = eventType;
this.connection = connection;
}
public boolean isConnected() {
@ -1025,4 +1031,18 @@ public abstract class RpcClient implements Closeable {
public void setTenant(String tenant) {
this.tenant = tenant;
}
/**
* Return ability of current connection.
*
* @param abilityKey ability key
* @return whether support, return null if connection is not ready
*/
public AbilityStatus getConnectionAbility(AbilityKey abilityKey) {
if (currentConnection != null) {
return currentConnection.getConnectionAbility(abilityKey);
}
// return null if connection is not ready
return null;
}
}

View File

@ -31,8 +31,9 @@ public interface ServerRequestHandler {
* Handle request from server.
*
* @param request request
* @param connection current connection, it can be used to know server ability
* @return response.
*/
Response requestReply(Request request);
Response requestReply(Request request, Connection connection);
}

View File

@ -60,6 +60,8 @@ public class DefaultGrpcClientConfig implements GrpcClientConfig {
private long healthCheckTimeOut;
private long capabilityNegotiationTimeout;
private Map<String, String> labels;
private RpcClientTlsConfig tlsConfig = new RpcClientTlsConfig();
@ -90,6 +92,8 @@ public class DefaultGrpcClientConfig implements GrpcClientConfig {
this.healthCheckTimeOut = loadLongConfig(GrpcConstants.GRPC_HEALTHCHECK_TIMEOUT, builder.healthCheckTimeOut);
this.channelKeepAliveTimeout = loadLongConfig(GrpcConstants.GRPC_CHANNEL_KEEP_ALIVE_TIMEOUT,
builder.channelKeepAliveTimeout);
this.capabilityNegotiationTimeout = loadLongConfig(GrpcConstants.GRPC_CHANNEL_CAPABILITY_NEGOTIATION_TIMEOUT,
builder.capabilityNegotiationTimeout);
this.labels = builder.labels;
this.labels.put("tls.enable", "false");
if (Objects.nonNull(builder.tlsConfig)) {
@ -177,6 +181,11 @@ public class DefaultGrpcClientConfig implements GrpcClientConfig {
this.tlsConfig = tlsConfig;
}
@Override
public long capabilityNegotiationTimeout() {
return this.capabilityNegotiationTimeout;
}
@Override
public int healthCheckRetryTimes() {
return healthCheckRetryTimes;
@ -226,6 +235,8 @@ public class DefaultGrpcClientConfig implements GrpcClientConfig {
private long healthCheckTimeOut = 3000L;
private long capabilityNegotiationTimeout = 5000L;
private Map<String, String> labels = new HashMap<>();
private RpcClientTlsConfig tlsConfig = new RpcClientTlsConfig();
@ -250,47 +261,51 @@ public class DefaultGrpcClientConfig implements GrpcClientConfig {
this.timeOutMills = Long.parseLong(properties.getProperty(GrpcConstants.GRPC_TIMEOUT_MILLS));
}
if (properties.contains(GrpcConstants.GRPC_CONNECT_KEEP_ALIVE_TIME)) {
this.connectionKeepAlive = Long.parseLong(
properties.getProperty(GrpcConstants.GRPC_CONNECT_KEEP_ALIVE_TIME));
this.connectionKeepAlive = Long
.parseLong(properties.getProperty(GrpcConstants.GRPC_CONNECT_KEEP_ALIVE_TIME));
}
if (properties.contains(GrpcConstants.GRPC_THREADPOOL_KEEPALIVETIME)) {
this.threadPoolKeepAlive = Long.parseLong(
properties.getProperty(GrpcConstants.GRPC_THREADPOOL_KEEPALIVETIME));
this.threadPoolKeepAlive = Long
.parseLong(properties.getProperty(GrpcConstants.GRPC_THREADPOOL_KEEPALIVETIME));
}
if (properties.contains(GrpcConstants.GRPC_THREADPOOL_CORE_SIZE)) {
this.threadPoolCoreSize = Integer.parseInt(
properties.getProperty(GrpcConstants.GRPC_THREADPOOL_CORE_SIZE));
this.threadPoolCoreSize = Integer
.parseInt(properties.getProperty(GrpcConstants.GRPC_THREADPOOL_CORE_SIZE));
}
if (properties.contains(GrpcConstants.GRPC_THREADPOOL_MAX_SIZE)) {
this.threadPoolMaxSize = Integer.parseInt(
properties.getProperty(GrpcConstants.GRPC_THREADPOOL_MAX_SIZE));
this.threadPoolMaxSize = Integer
.parseInt(properties.getProperty(GrpcConstants.GRPC_THREADPOOL_MAX_SIZE));
}
if (properties.contains(GrpcConstants.GRPC_SERVER_CHECK_TIMEOUT)) {
this.serverCheckTimeOut = Long.parseLong(
properties.getProperty(GrpcConstants.GRPC_SERVER_CHECK_TIMEOUT));
this.serverCheckTimeOut = Long
.parseLong(properties.getProperty(GrpcConstants.GRPC_SERVER_CHECK_TIMEOUT));
}
if (properties.contains(GrpcConstants.GRPC_QUEUESIZE)) {
this.threadPoolQueueSize = Integer.parseInt(properties.getProperty(GrpcConstants.GRPC_QUEUESIZE));
}
if (properties.contains(GrpcConstants.GRPC_MAX_INBOUND_MESSAGE_SIZE)) {
this.maxInboundMessageSize = Integer.parseInt(
properties.getProperty(GrpcConstants.GRPC_MAX_INBOUND_MESSAGE_SIZE));
this.maxInboundMessageSize = Integer
.parseInt(properties.getProperty(GrpcConstants.GRPC_MAX_INBOUND_MESSAGE_SIZE));
}
if (properties.contains(GrpcConstants.GRPC_CHANNEL_KEEP_ALIVE_TIME)) {
this.channelKeepAlive = Integer.parseInt(
properties.getProperty(GrpcConstants.GRPC_CHANNEL_KEEP_ALIVE_TIME));
this.channelKeepAlive = Integer
.parseInt(properties.getProperty(GrpcConstants.GRPC_CHANNEL_KEEP_ALIVE_TIME));
}
if (properties.contains(GrpcConstants.GRPC_CHANNEL_CAPABILITY_NEGOTIATION_TIMEOUT)) {
this.capabilityNegotiationTimeout = Integer
.parseInt(properties.getProperty(GrpcConstants.GRPC_CHANNEL_CAPABILITY_NEGOTIATION_TIMEOUT));
}
if (properties.contains(GrpcConstants.GRPC_HEALTHCHECK_RETRY_TIMES)) {
this.healthCheckRetryTimes = Integer.parseInt(
properties.getProperty(GrpcConstants.GRPC_HEALTHCHECK_RETRY_TIMES));
this.healthCheckRetryTimes = Integer
.parseInt(properties.getProperty(GrpcConstants.GRPC_HEALTHCHECK_RETRY_TIMES));
}
if (properties.contains(GrpcConstants.GRPC_HEALTHCHECK_TIMEOUT)) {
this.healthCheckTimeOut = Long.parseLong(
properties.getProperty(GrpcConstants.GRPC_HEALTHCHECK_TIMEOUT));
this.healthCheckTimeOut = Long
.parseLong(properties.getProperty(GrpcConstants.GRPC_HEALTHCHECK_TIMEOUT));
}
if (properties.contains(GrpcConstants.GRPC_CHANNEL_KEEP_ALIVE_TIMEOUT)) {
this.channelKeepAliveTimeout = Integer.parseInt(
properties.getProperty(GrpcConstants.GRPC_CHANNEL_KEEP_ALIVE_TIMEOUT));
this.channelKeepAliveTimeout = Integer
.parseInt(properties.getProperty(GrpcConstants.GRPC_CHANNEL_KEEP_ALIVE_TIMEOUT));
}
this.tlsConfig = RpcClientTlsConfig.properties(properties);
return this;
@ -399,6 +414,10 @@ public class DefaultGrpcClientConfig implements GrpcClientConfig {
return this;
}
public void setCapabilityNegotiationTimeout(long capabilityNegotiationTimeout) {
this.capabilityNegotiationTimeout = capabilityNegotiationTimeout;
}
/**
* set healthCheckRetryTimes.
*/

View File

@ -16,30 +16,35 @@
package com.alibaba.nacos.common.remote.client.grpc;
import com.alibaba.nacos.api.ability.constant.AbilityMode;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.grpc.auto.BiRequestStreamGrpc;
import com.alibaba.nacos.api.grpc.auto.Payload;
import com.alibaba.nacos.api.grpc.auto.RequestGrpc;
import com.alibaba.nacos.api.remote.request.ConnectResetRequest;
import com.alibaba.nacos.api.remote.request.ConnectionSetupRequest;
import com.alibaba.nacos.api.remote.request.Request;
import com.alibaba.nacos.api.remote.request.ServerCheckRequest;
import com.alibaba.nacos.api.remote.request.SetupAckRequest;
import com.alibaba.nacos.api.remote.response.ErrorResponse;
import com.alibaba.nacos.api.remote.response.Response;
import com.alibaba.nacos.api.remote.response.ServerCheckResponse;
import com.alibaba.nacos.api.remote.response.SetupAckResponse;
import com.alibaba.nacos.common.ability.discover.NacosAbilityManagerHolder;
import com.alibaba.nacos.common.packagescan.resource.Resource;
import com.alibaba.nacos.common.remote.ConnectionType;
import com.alibaba.nacos.common.remote.client.RpcClient;
import com.alibaba.nacos.common.remote.client.RpcClientStatus;
import com.alibaba.nacos.common.remote.client.Connection;
import com.alibaba.nacos.common.remote.client.RpcClientTlsConfig;
import com.alibaba.nacos.common.remote.client.RpcClientStatus;
import com.alibaba.nacos.common.remote.client.RpcClient;
import com.alibaba.nacos.common.remote.client.ServerListFactory;
import com.alibaba.nacos.common.remote.client.Connection;
import com.alibaba.nacos.common.remote.client.ServerRequestHandler;
import com.alibaba.nacos.common.utils.JacksonUtils;
import com.alibaba.nacos.common.utils.LoggerUtils;
import com.alibaba.nacos.common.utils.StringUtils;
import com.alibaba.nacos.common.utils.VersionUtils;
import com.alibaba.nacos.common.utils.TlsTypeResolve;
import com.alibaba.nacos.common.utils.ThreadFactoryBuilder;
import com.google.common.base.Optional;
import com.google.common.util.concurrent.ListenableFuture;
import io.grpc.CompressorRegistry;
import io.grpc.DecompressorRegistry;
@ -56,9 +61,11 @@ import io.grpc.stub.StreamObserver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Arrays;
import java.util.Map;
import java.util.Optional;
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@ -78,6 +85,11 @@ public abstract class GrpcClient extends RpcClient {
private ThreadPoolExecutor grpcExecutor;
/**
* Block to wait setup success response.
*/
private final RecAbilityContext recAbilityContext = new RecAbilityContext(null);
@Override
public ConnectionType getConnectionType() {
return ConnectionType.GRPC;
@ -109,6 +121,7 @@ public abstract class GrpcClient extends RpcClient {
public GrpcClient(GrpcClientConfig clientConfig) {
super(clientConfig);
this.clientConfig = clientConfig;
initSetupHandler();
}
/**
@ -120,6 +133,15 @@ public abstract class GrpcClient extends RpcClient {
public GrpcClient(GrpcClientConfig clientConfig, ServerListFactory serverListFactory) {
super(clientConfig, serverListFactory);
this.clientConfig = clientConfig;
initSetupHandler();
}
/**
* setup handler.
*/
private void initSetupHandler() {
// register to handler setup request
registerServerRequestHandler(new SetupRequestHandler(this.recAbilityContext));
}
/**
@ -271,6 +293,8 @@ public abstract class GrpcClient extends RpcClient {
LoggerUtils.printIfErrorEnabled(LOGGER, "[{}]Error to process server push response: {}",
grpcConn.getConnectionId(), payload.getBody().getValue().toStringUtf8());
// remove and notify
recAbilityContext.release(null);
}
}
@ -323,6 +347,8 @@ public abstract class GrpcClient extends RpcClient {
@Override
public Connection connectToServer(ServerInfo serverInfo) {
// the newest connection id
String connectionId = "";
try {
if (grpcExecutor == null) {
this.grpcExecutor = createGrpcExecutor(serverInfo.getServerIp());
@ -330,16 +356,28 @@ public abstract class GrpcClient extends RpcClient {
int port = serverInfo.getServerPort() + rpcPortOffset();
ManagedChannel managedChannel = createNewManagedChannel(serverInfo.getServerIp(), port);
RequestGrpc.RequestFutureStub newChannelStubTemp = createNewChannelStub(managedChannel);
if (newChannelStubTemp != null) {
Response response = serverCheck(serverInfo.getServerIp(), port, newChannelStubTemp);
if (!(response instanceof ServerCheckResponse)) {
if (response == null || !(response instanceof ServerCheckResponse)) {
shuntDownChannel(managedChannel);
return null;
}
// submit ability table as soon as possible
// ability table will be null if server doesn't support ability table
ServerCheckResponse serverCheckResponse = (ServerCheckResponse) response;
connectionId = serverCheckResponse.getConnectionId();
BiRequestStreamGrpc.BiRequestStreamStub biRequestStreamStub = BiRequestStreamGrpc.newStub(
newChannelStubTemp.getChannel());
GrpcConnection grpcConn = new GrpcConnection(serverInfo, grpcExecutor);
grpcConn.setConnectionId(((ServerCheckResponse) response).getConnectionId());
grpcConn.setConnectionId(connectionId);
// if not supported, it will be false
if (serverCheckResponse.isSupportAbilityNegotiation()) {
// mark
this.recAbilityContext.reset(grpcConn);
}
//create stream request and bind connection event to this connection.
StreamObserver<Payload> payloadStreamObserver = bindRequestStream(biRequestStreamStub, grpcConn);
@ -352,16 +390,138 @@ public abstract class GrpcClient extends RpcClient {
ConnectionSetupRequest conSetupRequest = new ConnectionSetupRequest();
conSetupRequest.setClientVersion(VersionUtils.getFullClientVersion());
conSetupRequest.setLabels(super.getLabels());
conSetupRequest.setAbilities(super.clientAbilities);
// set ability table
conSetupRequest.setAbilityTable(NacosAbilityManagerHolder.getInstance().getCurrentNodeAbilities(abilityMode()));
conSetupRequest.setTenant(super.getTenant());
grpcConn.sendRequest(conSetupRequest);
// wait for response
if (recAbilityContext.isNeedToSync()) {
// try to wait for notify response
recAbilityContext.await(this.clientConfig.capabilityNegotiationTimeout(), TimeUnit.MILLISECONDS);
} else {
// leave for adapting old version server
// wait to register connection setup
Thread.sleep(100L);
}
return grpcConn;
} catch (Exception e) {
LOGGER.error("[{}]Fail to connect to server!", GrpcClient.this.getName(), e);
}
return null;
} catch (Exception e) {
LOGGER.error("[{}]Fail to connect to server!,error={}", GrpcClient.this.getName(), e);
// remove and notify
recAbilityContext.release(null);
}
return null;
}
/**
* ability mode: sdk client or cluster client.
*
* @return mode
*/
protected abstract AbilityMode abilityMode();
@Override
protected void afterReset(ConnectResetRequest request) {
recAbilityContext.release(null);
}
/**
* This is for receiving server abilities.
*/
class RecAbilityContext {
/**
* connection waiting for server abilities.
*/
private volatile Connection connection;
/**
* way to block client.
*/
private volatile CountDownLatch blocker;
private volatile boolean needToSync = false;
public RecAbilityContext(Connection connection) {
this.connection = connection;
this.blocker = new CountDownLatch(1);
}
/**
* whether to sync for ability table.
*
* @return whether to sync for ability table.
*/
public boolean isNeedToSync() {
return this.needToSync;
}
/**
* reset with new connection which is waiting for ability table.
*
* @param connection new connection which is waiting for ability table.
*/
public void reset(Connection connection) {
this.connection = connection;
this.blocker = new CountDownLatch(1);
this.needToSync = true;
}
/**
* notify sync by abilities.
*
* @param abilities abilities.
*/
public void release(Map<String, Boolean> abilities) {
if (this.connection != null) {
this.connection.setAbilityTable(abilities);
// avoid repeat setting
this.connection = null;
}
if (this.blocker != null) {
blocker.countDown();
}
this.needToSync = false;
}
/**
* await for abilities.
*
* @param timeout timeout.
* @param unit unit.
* @throws InterruptedException by blocker.
*/
public void await(long timeout, TimeUnit unit) throws InterruptedException {
if (this.blocker != null) {
this.blocker.await(timeout, unit);
}
this.needToSync = false;
}
}
/**
* Setup response handler.
*/
class SetupRequestHandler implements ServerRequestHandler {
private final RecAbilityContext abilityContext;
public SetupRequestHandler(RecAbilityContext abilityContext) {
this.abilityContext = abilityContext;
}
@Override
public Response requestReply(Request request, Connection connection) {
// if finish setup
if (request instanceof SetupAckRequest) {
SetupAckRequest setupAckRequest = (SetupAckRequest) request;
// remove and count down
recAbilityContext.release(setupAckRequest.getAbilityTable());
return new SetupAckResponse();
}
return null;
}
}
private ManagedChannelBuilder buildChannel(String serverIp, int port, Optional<SslContext> sslContext) {
@ -378,7 +538,7 @@ public abstract class GrpcClient extends RpcClient {
RpcClientTlsConfig tlsConfig = clientConfig.tlsConfig();
if (!tlsConfig.getEnableTls()) {
return Optional.absent();
return Optional.empty();
}
try {
SslContextBuilder builder = GrpcSslContexts.forClient();

View File

@ -96,4 +96,11 @@ public interface GrpcClientConfig extends RpcClientConfig {
*/
void setTlsConfig(RpcClientTlsConfig tlsConfig);
/**
* get timeout of connection setup(TimeUnit.MILLISECONDS).
*
* @return timeout of connection setup
*/
long capabilityNegotiationTimeout();
}

View File

@ -16,6 +16,7 @@
package com.alibaba.nacos.common.remote.client.grpc;
import com.alibaba.nacos.api.ability.constant.AbilityMode;
import com.alibaba.nacos.api.common.Constants;
import com.alibaba.nacos.common.remote.client.RpcClientTlsConfig;
@ -75,6 +76,11 @@ public class GrpcClusterClient extends GrpcClient {
super(name, threadPoolCoreSize, threadPoolMaxSize, labels, tlsConfig);
}
@Override
protected AbilityMode abilityMode() {
return AbilityMode.CLUSTER_CLIENT;
}
@Override
public int rpcPortOffset() {
return Integer.parseInt(System.getProperty(GrpcConstants.NACOS_SERVER_GRPC_PORT_OFFSET_KEY,

View File

@ -79,6 +79,9 @@ public class GrpcConstants {
@GRpcConfigLabel
public static final String GRPC_CHANNEL_KEEP_ALIVE_TIMEOUT = NACOS_CLIENT_GRPC + ".channel.keep.alive.timeout";
@GRpcConfigLabel
public static final String GRPC_CHANNEL_CAPABILITY_NEGOTIATION_TIMEOUT = NACOS_CLIENT_GRPC + ".channel.capability.negotiation.timeout";
private static final Set<String> CONFIG_NAMES = new HashSet<>();
@Documented

View File

@ -16,6 +16,7 @@
package com.alibaba.nacos.common.remote.client.grpc;
import com.alibaba.nacos.api.ability.constant.AbilityMode;
import com.alibaba.nacos.api.common.Constants;
import com.alibaba.nacos.common.remote.client.RpcClientTlsConfig;
@ -65,6 +66,11 @@ public class GrpcSdkClient extends GrpcClient {
super(name, threadPoolCoreSize, threadPoolMaxSize, labels, tlsConfig);
}
@Override
protected AbilityMode abilityMode() {
return AbilityMode.SDK_CLIENT;
}
/**
* constructor.
*

View File

@ -642,7 +642,7 @@ public class RpcClientTest {
return null;
}
};
rpcClient.serverRequestHandlers.add(req -> {
rpcClient.serverRequestHandlers.add((req, conn) -> {
throw new RuntimeException();
});
rpcClient.handleServerRequest(request);

View File

@ -16,6 +16,7 @@
package com.alibaba.nacos.common.remote.client.grpc;
import com.alibaba.nacos.api.ability.constant.AbilityMode;
import com.alibaba.nacos.api.grpc.auto.RequestGrpc;
import com.alibaba.nacos.common.remote.client.RpcClient;
import com.alibaba.nacos.common.remote.client.RpcClientTlsConfig;
@ -70,6 +71,11 @@ public class GrpcClientTest {
public void setUp() throws Exception {
when(clientConfig.name()).thenReturn("testClient");
grpcClient = spy(new GrpcClient(clientConfig) {
@Override
protected AbilityMode abilityMode() {
return AbilityMode.SDK_CLIENT;
}
@Override
public int rpcPortOffset() {
return 1000;

View File

@ -16,6 +16,7 @@
package com.alibaba.nacos.common.remote.client.grpc;
import com.alibaba.nacos.api.ability.constant.AbilityMode;
import org.junit.Test;
import static org.mockito.Mockito.when;
@ -33,6 +34,12 @@ public class GrpcClientTlsTest extends GrpcClientTest {
when(clientConfig.tlsConfig()).thenReturn(tlsConfig);
grpcClient = new GrpcClient(clientConfig) {
@Override
protected AbilityMode abilityMode() {
return AbilityMode.SDK_CLIENT;
}
@Override
public int rpcPortOffset() {
return 1000;
@ -51,6 +58,12 @@ public class GrpcClientTlsTest extends GrpcClientTest {
when(clientConfig.tlsConfig()).thenReturn(tlsConfig);
grpcClient = new GrpcClient(clientConfig) {
@Override
protected AbilityMode abilityMode() {
return AbilityMode.SDK_CLIENT;
}
@Override
public int rpcPortOffset() {
return 1000;
@ -72,6 +85,11 @@ public class GrpcClientTlsTest extends GrpcClientTest {
when(clientConfig.name()).thenReturn("testClient");
when(clientConfig.tlsConfig()).thenReturn(tlsConfig);
grpcClient = new GrpcClient(clientConfig) {
@Override
protected AbilityMode abilityMode() {
return AbilityMode.SDK_CLIENT;
}
@Override
public int rpcPortOffset() {
return 1000;

View File

@ -0,0 +1,109 @@
/*
* Copyright 1999-2022 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.core.ability.config;
import com.alibaba.nacos.api.ability.constant.AbilityKey;
import com.alibaba.nacos.api.ability.register.impl.ServerAbilities;
import com.alibaba.nacos.common.JustForTest;
import com.alibaba.nacos.common.ability.AbstractAbilityControlManager;
import com.alibaba.nacos.common.ability.discover.NacosAbilityManagerHolder;
import com.alibaba.nacos.common.event.ServerConfigChangeEvent;
import com.alibaba.nacos.common.notify.Event;
import com.alibaba.nacos.common.notify.NotifyCenter;
import com.alibaba.nacos.common.notify.listener.Subscriber;
import com.alibaba.nacos.common.utils.ConcurrentHashSet;
import com.alibaba.nacos.sys.env.EnvUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
/**.
* @author Daydreamer
* @description Dynamically load ability from config
* @date 2022/8/31 12:27
**/
@Configuration
public class AbilityConfigs extends Subscriber<ServerConfigChangeEvent> {
public static final String PREFIX = "nacos.core.ability.";
private static final Logger LOGGER = LoggerFactory.getLogger(AbilityConfigs.class);
private final Set<AbilityKey> serverAbilityKeys = new ConcurrentHashSet<>();
private AbstractAbilityControlManager abilityHandlerRegistry = NacosAbilityManagerHolder.getInstance();
public AbilityConfigs() {
// load ability
serverAbilityKeys.addAll(ServerAbilities.getStaticAbilities().keySet());
NotifyCenter.registerSubscriber(this);
}
@Override
public void onEvent(ServerConfigChangeEvent event) {
// load config
Map<AbilityKey, Boolean> newValues = new HashMap<>(serverAbilityKeys.size());
serverAbilityKeys.forEach(abilityKey -> {
String key = PREFIX + abilityKey.getName();
try {
// scan
Boolean property = EnvUtil.getProperty(key, Boolean.class);
if (property != null) {
newValues.put(abilityKey, property);
}
} catch (Exception e) {
LOGGER.warn("Update ability config from env failed, use old val, ability : {} , because : {}", key, e);
}
});
// update
refresh(newValues);
}
/**.
* refresh ability
*/
private void refresh(Map<AbilityKey, Boolean> newValues) {
newValues.forEach((abilityKey, val) -> {
// do nothing if has turned on/off
if (val) {
abilityHandlerRegistry.enableCurrentNodeAbility(abilityKey);
} else {
abilityHandlerRegistry.disableCurrentNodeAbility(abilityKey);
}
});
}
@Override
public Class<? extends Event> subscribeType() {
return ServerConfigChangeEvent.class;
}
@JustForTest
protected Set<AbilityKey> getServerAbilityKeys() {
return serverAbilityKeys;
}
@JustForTest
protected void setAbilityHandlerRegistry(AbstractAbilityControlManager abilityHandlerRegistry) {
this.abilityHandlerRegistry = abilityHandlerRegistry;
}
}

View File

@ -0,0 +1,101 @@
/*
* Copyright 1999-2022 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.core.ability.control;
import com.alibaba.nacos.api.ability.constant.AbilityKey;
import com.alibaba.nacos.api.ability.constant.AbilityMode;
import com.alibaba.nacos.api.ability.register.impl.ClusterClientAbilities;
import com.alibaba.nacos.api.ability.register.impl.SdkClientAbilities;
import com.alibaba.nacos.api.ability.register.impl.ServerAbilities;
import com.alibaba.nacos.common.ability.AbstractAbilityControlManager;
import com.alibaba.nacos.core.ability.config.AbilityConfigs;
import com.alibaba.nacos.sys.env.EnvUtil;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
/**.
* @author Daydreamer
* @description {@link AbstractAbilityControlManager} for nacos-server.
* @date 2022/7/13 21:14
**/
public class ServerAbilityControlManager extends AbstractAbilityControlManager {
public ServerAbilityControlManager() {
}
@Override
protected Map<AbilityMode, Map<AbilityKey, Boolean>> initCurrentNodeAbilities() {
// init client abilities
Map<AbilityMode, Map<AbilityKey, Boolean>> res = new HashMap<>(2);
res.put(AbilityMode.CLUSTER_CLIENT, initClusterClientAbilities());
res.put(AbilityMode.SDK_CLIENT, initSdkClientAbilities());
// init server abilities
// static abilities
Map<AbilityKey, Boolean> staticAbilities = ServerAbilities.getStaticAbilities();
// all function server can support
Set<AbilityKey> abilityKeys = staticAbilities.keySet();
Map<AbilityKey, Boolean> abilityTable = new HashMap<>(abilityKeys.size());
// if not define in config, then load from ServerAbilities
Set<AbilityKey> unIncludedInConfig = new HashSet<>();
abilityKeys.forEach(abilityKey -> {
String key = AbilityConfigs.PREFIX + abilityKey.getName();
try {
Boolean property = EnvUtil.getProperty(key, Boolean.class);
// if not null
if (property != null) {
abilityTable.put(abilityKey, property);
} else {
unIncludedInConfig.add(abilityKey);
}
} catch (Exception e) {
// from ServerAbilities
unIncludedInConfig.add(abilityKey);
}
});
// load from ServerAbilities
unIncludedInConfig.forEach(abilityKey -> abilityTable.put(abilityKey, staticAbilities.get(abilityKey)));
res.put(AbilityMode.SERVER, abilityTable);
return res;
}
/**
* init cluster client abilities.
*/
private Map<AbilityKey, Boolean> initClusterClientAbilities() {
// static abilities
return ClusterClientAbilities.getStaticAbilities();
}
/**
* init sdk client abilities.
*/
private Map<AbilityKey, Boolean> initSdkClientAbilities() {
// static abilities
return SdkClientAbilities.getStaticAbilities();
}
@Override
public int getPriority() {
return 1;
}
}

View File

@ -53,8 +53,11 @@ public class Member implements Comparable<Member>, Cloneable, Serializable {
private transient int failAccessCnt = 0;
@Deprecated
private ServerAbilities abilities = new ServerAbilities();
private boolean grpcReportEnabled;
public Member() {
String prefix = "nacos.core.member.meta.";
extendInfo.put(MemberMetaDataConstants.SITE_KEY,
@ -65,10 +68,20 @@ public class Member implements Comparable<Member>, Cloneable, Serializable {
.put(MemberMetaDataConstants.WEIGHT, EnvUtil.getProperty(prefix + MemberMetaDataConstants.WEIGHT, "1"));
}
public boolean isGrpcReportEnabled() {
return grpcReportEnabled;
}
public void setGrpcReportEnabled(boolean grpcReportEnabled) {
this.grpcReportEnabled = grpcReportEnabled;
}
@Deprecated
public ServerAbilities getAbilities() {
return abilities;
}
@Deprecated
public void setAbilities(ServerAbilities abilities) {
this.abilities = abilities;
}

View File

@ -66,6 +66,7 @@ public class MemberUtil {
oldMember.setExtendInfo(newMember.getExtendInfo());
oldMember.setAddress(newMember.getAddress());
oldMember.setAbilities(newMember.getAbilities());
oldMember.setGrpcReportEnabled(newMember.isGrpcReportEnabled());
}
/**
@ -94,6 +95,8 @@ public class MemberUtil {
extendInfo.put(MemberMetaDataConstants.RAFT_PORT, String.valueOf(calculateRaftPort(target)));
extendInfo.put(MemberMetaDataConstants.READY_TO_UPGRADE, true);
target.setExtendInfo(extendInfo);
// use grpc to report default
target.setGrpcReportEnabled(true);
return target;
}
@ -107,7 +110,10 @@ public class MemberUtil {
if (member.getAbilities() == null || member.getAbilities().getRemoteAbility() == null) {
return false;
}
return member.getAbilities().getRemoteAbility().isSupportRemoteConnection();
boolean oldVerJudge = member.getAbilities().getRemoteAbility().isSupportRemoteConnection();
return member.isGrpcReportEnabled() || oldVerJudge;
}
public static int calculateRaftPort(Member member) {
@ -276,7 +282,8 @@ public class MemberUtil {
return true;
}
if (!expected.getAbilities().equals(actual.getAbilities())) {
// if change
if (expected.isGrpcReportEnabled() != actual.isGrpcReportEnabled()) {
return true;
}

View File

@ -65,6 +65,7 @@ import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListMap;
import com.alibaba.nacos.core.ability.control.ServerAbilityControlManager;
import static com.alibaba.nacos.api.exception.NacosException.CLIENT_INVALID_PARAM;
@ -163,6 +164,7 @@ public class ServerMemberManager implements ApplicationListener<WebServerInitial
this.localAddress = InetUtils.getSelfIP() + ":" + port;
this.self = MemberUtil.singleParse(this.localAddress);
this.self.setExtendVal(MemberMetaDataConstants.VERSION, VersionUtils.version);
this.self.setGrpcReportEnabled(true);
// init abilities.
this.self.setAbilities(initMemberAbilities());
@ -182,6 +184,12 @@ public class ServerMemberManager implements ApplicationListener<WebServerInitial
Loggers.CORE.info("The cluster resource is initialized");
}
/**
* Init the ability of current node.
*
* @return ServerAbilities
* @deprecated ability of current node and event cluster can be managed by {@link ServerAbilityControlManager}
*/
private ServerAbilities initMemberAbilities() {
ServerAbilities serverAbilities = new ServerAbilities();
for (ServerAbilityInitializer each : ServerAbilityInitializerHolder.getInstance().getInitializers()) {
@ -566,7 +574,8 @@ public class ServerMemberManager implements ApplicationListener<WebServerInitial
Loggers.CLUSTER.debug("report the metadata to the node : {}", target.getAddress());
if (target.getAbilities().getRemoteAbility().isGrpcReportEnabled()) {
// adapt old version
if (target.getAbilities().getRemoteAbility().isGrpcReportEnabled() || target.isGrpcReportEnabled()) {
reportByGrpc(target);
} else {
reportByHttp(target);
@ -590,6 +599,9 @@ public class ServerMemberManager implements ApplicationListener<WebServerInitial
Loggers.CLUSTER.warn("failed to report new info to target node : {}, result : {}",
target.getAddress(), result);
MemberUtil.onFail(ServerMemberManager.this, target);
// try to connect by grpc next time, adapt old version
target.setGrpcReportEnabled(true);
target.getAbilities().getRemoteAbility().setGrpcReportEnabled(true);
}
}
@ -598,6 +610,9 @@ public class ServerMemberManager implements ApplicationListener<WebServerInitial
Loggers.CLUSTER.error("failed to report new info to target node : {}, error : {}",
target.getAddress(), ExceptionUtil.getAllExceptionMsg(throwable));
MemberUtil.onFail(ServerMemberManager.this, target, throwable);
// try to connect by grpc next time, adapt old version
target.setGrpcReportEnabled(true);
target.getAbilities().getRemoteAbility().setGrpcReportEnabled(true);
}
@Override
@ -608,6 +623,9 @@ public class ServerMemberManager implements ApplicationListener<WebServerInitial
} catch (Throwable ex) {
Loggers.CLUSTER.error("failed to report new info to target node by http : {}, error : {}",
target.getAddress(), ExceptionUtil.getAllExceptionMsg(ex));
// try to connect by grpc next time, adapt old version
target.setGrpcReportEnabled(true);
target.getAbilities().getRemoteAbility().setGrpcReportEnabled(true);
}
}
@ -635,6 +653,7 @@ public class ServerMemberManager implements ApplicationListener<WebServerInitial
} catch (NacosException e) {
if (e.getErrCode() == NacosException.NO_HANDLER) {
target.getAbilities().getRemoteAbility().setGrpcReportEnabled(false);
target.setGrpcReportEnabled(false);
}
Loggers.CLUSTER.error("failed to report new info to target node by grpc : {}, error : {}",
target.getAddress(), ExceptionUtil.getAllExceptionMsg(e));

View File

@ -18,11 +18,12 @@ package com.alibaba.nacos.core.listener;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.exception.runtime.NacosRuntimeException;
import com.alibaba.nacos.common.event.ServerConfigChangeEvent;
import com.alibaba.nacos.common.executor.ExecutorFactory;
import com.alibaba.nacos.common.executor.NameThreadFactory;
import com.alibaba.nacos.common.executor.ThreadPoolManager;
import com.alibaba.nacos.common.notify.NotifyCenter;
import com.alibaba.nacos.common.event.ServerConfigChangeEvent;
import com.alibaba.nacos.common.utils.StringUtils;
import com.alibaba.nacos.sys.env.EnvUtil;
import com.alibaba.nacos.sys.file.FileChangeEvent;
import com.alibaba.nacos.sys.file.FileWatcher;
@ -30,7 +31,6 @@ import com.alibaba.nacos.sys.file.WatchFileCenter;
import com.alibaba.nacos.sys.utils.ApplicationUtils;
import com.alibaba.nacos.sys.utils.DiskUtils;
import com.alibaba.nacos.sys.utils.InetUtils;
import com.alibaba.nacos.common.utils.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.env.OriginTrackedMapPropertySource;
@ -236,8 +236,8 @@ public class StartingApplicationListener implements NacosApplicationListener {
private void logStarting() {
if (!EnvUtil.getStandaloneMode()) {
scheduledExecutorService = ExecutorFactory.newSingleScheduledExecutorService(
new NameThreadFactory("com.alibaba.nacos.core.nacos-starting"));
scheduledExecutorService = ExecutorFactory
.newSingleScheduledExecutorService(new NameThreadFactory("com.alibaba.nacos.core.nacos-starting"));
scheduledExecutorService.scheduleWithFixedDelay(() -> {
if (starting) {

View File

@ -16,7 +16,6 @@
package com.alibaba.nacos.core.remote;
import com.alibaba.nacos.api.ability.ClientAbilities;
import com.alibaba.nacos.api.remote.Requester;
import java.util.Map;
@ -32,7 +31,7 @@ public abstract class Connection implements Requester {
private boolean traced = false;
private ClientAbilities abilities;
private Map<String, Boolean> abilityTable;
private final ConnectionMeta metaInfo;
@ -52,22 +51,12 @@ public abstract class Connection implements Requester {
this.traced = traced;
}
/**
* get abilities.
*
* @return
*/
public ClientAbilities getAbilities() {
return abilities;
public void setAbilityTable(Map<String, Boolean> abilityTable) {
this.abilityTable = abilityTable;
}
/**
* set abilities.
*
* @param abilities abilities.
*/
public void setAbilities(ClientAbilities abilities) {
this.abilities = abilities;
public Map<String, Boolean> getAbilityTable() {
return this.abilityTable;
}
/**
@ -95,7 +84,7 @@ public abstract class Connection implements Requester {
@Override
public String toString() {
return "Connection{" + "traced=" + traced + ", abilities=" + abilities + ", metaInfo=" + metaInfo + '}';
return "Connection{" + "traced=" + traced + ", abilities=" + abilityTable + ", metaInfo=" + metaInfo + '}';
}
}

View File

@ -275,6 +275,7 @@ public class ConnectionManager {
String[] split = redirectAddress.split(Constants.COLON);
connectResetRequest.setServerIp(split[0]);
connectResetRequest.setServerPort(split[1]);
connectResetRequest.setConnectionId(connectionId);
}
try {
connection.request(connectResetRequest, 3000L);

View File

@ -16,11 +16,14 @@
package com.alibaba.nacos.core.remote.grpc;
import com.alibaba.nacos.api.ability.constant.AbilityMode;
import com.alibaba.nacos.api.common.Constants;
import com.alibaba.nacos.api.grpc.auto.BiRequestStreamGrpc;
import com.alibaba.nacos.api.grpc.auto.Payload;
import com.alibaba.nacos.api.remote.request.ConnectionSetupRequest;
import com.alibaba.nacos.api.remote.request.SetupAckRequest;
import com.alibaba.nacos.api.remote.response.Response;
import com.alibaba.nacos.common.ability.discover.NacosAbilityManagerHolder;
import com.alibaba.nacos.common.remote.ConnectionType;
import com.alibaba.nacos.common.remote.client.grpc.GrpcUtils;
import com.alibaba.nacos.core.remote.Connection;
@ -112,8 +115,13 @@ public class GrpcBiStreamRequestAcceptor extends BiRequestStreamGrpc.BiRequestSt
remoteIp, remotePort, localPort, ConnectionType.GRPC.getType(),
setUpRequest.getClientVersion(), appName, setUpRequest.getLabels());
metaInfo.setTenant(setUpRequest.getTenant());
Connection connection = new GrpcConnection(metaInfo, responseObserver, GrpcServerConstants.CONTEXT_KEY_CHANNEL.get());
connection.setAbilities(setUpRequest.getAbilities());
Connection connection = new GrpcConnection(metaInfo, responseObserver,
GrpcServerConstants.CONTEXT_KEY_CHANNEL.get());
// null if supported
if (setUpRequest.getAbilityTable() != null) {
// map to table
connection.setAbilityTable(setUpRequest.getAbilityTable());
}
boolean rejectSdkOnStarting = metaInfo.isSdkSource() && !ApplicationUtils.isStarted();
if (rejectSdkOnStarting || !connectionManager.register(connectionId, connection)) {
@ -129,6 +137,15 @@ public class GrpcBiStreamRequestAcceptor extends BiRequestStreamGrpc.BiRequestSt
.warn("[{}]Send connect reset request error,error={}", connectionId, e);
}
}
} else {
try {
// finish register, tell client has set up successfully
connection.request(new SetupAckRequest(NacosAbilityManagerHolder.getInstance()
.getCurrentNodeAbilities(AbilityMode.SERVER)), 3000L);
} catch (Exception e) {
// nothing to do
}
}
} else if (parseObj instanceof Response) {

View File

@ -89,7 +89,7 @@ public class GrpcRequestAcceptor extends RequestGrpc.RequestImplBase {
// server check.
if (ServerCheckRequest.class.getSimpleName().equals(type)) {
Payload serverCheckResponseP = GrpcUtils.convert(new ServerCheckResponse(GrpcServerConstants.CONTEXT_KEY_CONN_ID.get()));
Payload serverCheckResponseP = GrpcUtils.convert(new ServerCheckResponse(GrpcServerConstants.CONTEXT_KEY_CONN_ID.get(), true));
traceIfNecessary(serverCheckResponseP, false);
responseObserver.onNext(serverCheckResponseP);
responseObserver.onCompleted();
@ -165,6 +165,7 @@ public class GrpcRequestAcceptor extends RequestGrpc.RequestImplBase {
requestMeta.setConnectionId(GrpcServerConstants.CONTEXT_KEY_CONN_ID.get());
requestMeta.setClientVersion(connection.getMetaInfo().getVersion());
requestMeta.setLabels(connection.getMetaInfo().getLabels());
requestMeta.setAbilityTable(connection.getAbilityTable());
connectionManager.refreshActiveTime(requestMeta.getConnectionId());
Response response = requestHandler.handleRequest(request, requestMeta);
Payload payloadResponse = GrpcUtils.convert(response);

View File

@ -0,0 +1,18 @@
#
# Copyright 1999-2022 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#
com.alibaba.nacos.core.ability.control.ServerAbilityControlManager

View File

@ -0,0 +1,66 @@
/*
* Copyright 1999-2022 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.core.ability;
import com.alibaba.nacos.api.ability.constant.AbilityKey;
import com.alibaba.nacos.api.ability.constant.AbilityMode;
import com.alibaba.nacos.api.ability.constant.AbilityStatus;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.springframework.boot.test.context.SpringBootTest;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
@SpringBootTest
public class AbilityControlManagerTest {
private TestServerAbilityControlManager serverAbilityControlManager = new TestServerAbilityControlManager();
@Before
public void inject() {
Map<String, Boolean> newTable = new HashMap<>();
newTable.put(AbilityKey.SERVER_TEST_1.getName(), true);
serverAbilityControlManager.setCurrentSupportingAbility(newTable);
}
@Test
public void testCurrentNodeAbility() {
Set<String> keySet = serverAbilityControlManager.getCurrentNodeAbilities(AbilityMode.SERVER).keySet();
// diable all
keySet.forEach(key -> serverAbilityControlManager
.disableCurrentNodeAbility(AbilityKey.getEnum(AbilityMode.SERVER, key)));
// get all
keySet.forEach(key -> {
Assert.assertNotEquals(serverAbilityControlManager
.isCurrentNodeAbilityRunning(AbilityKey.getEnum(AbilityMode.SERVER, key)), AbilityStatus.SUPPORTED);
});
// enable all
keySet.forEach(key -> serverAbilityControlManager
.enableCurrentNodeAbility(AbilityKey.getEnum(AbilityMode.SERVER, key)));
// get all
keySet.forEach(key -> {
Assert.assertEquals(serverAbilityControlManager
.isCurrentNodeAbilityRunning(AbilityKey.getEnum(AbilityMode.SERVER, key)), AbilityStatus.SUPPORTED);
});
}
}

View File

@ -0,0 +1,33 @@
/*
* Copyright 1999-2022 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.core.ability;
import com.alibaba.nacos.api.ability.constant.AbilityMode;
import com.alibaba.nacos.common.JustForTest;
import com.alibaba.nacos.core.ability.control.ServerAbilityControlManager;
import java.util.Map;
public class TestServerAbilityControlManager extends ServerAbilityControlManager {
@JustForTest
public void setCurrentSupportingAbility(Map<String, Boolean> ability) {
currentNodeAbilities.get(AbilityMode.SERVER).clear();
currentNodeAbilities.get(AbilityMode.SERVER).putAll(ability);
}
}

View File

@ -0,0 +1,134 @@
/*
* Copyright 1999-2022 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.core.ability.config;
import com.alibaba.nacos.api.ability.constant.AbilityKey;
import com.alibaba.nacos.api.ability.constant.AbilityStatus;
import com.alibaba.nacos.api.ability.register.AbstractAbilityRegistry;
import com.alibaba.nacos.api.ability.register.impl.ServerAbilities;
import com.alibaba.nacos.common.event.ServerConfigChangeEvent;
import com.alibaba.nacos.core.ability.TestServerAbilityControlManager;
import com.alibaba.nacos.core.ability.control.ServerAbilityControlManager;
import com.alibaba.nacos.sys.env.EnvUtil;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.springframework.mock.env.MockEnvironment;
import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.Map;
/**
* test for ability in config.
*
* @author Daydreamer
* @date 2022/9/3 12:27
**/
public class AbilityConfigsTest {
private MockEnvironment environment;
private TestAbilityConfig abilityConfigs;
private ServerAbilityControlManager serverAbilityControlManager;
private Map<AbilityKey, Boolean> currentAbilities;
@Before
public void setUp() throws Exception {
environment = new MockEnvironment();
EnvUtil.setEnvironment(environment);
abilityConfigs = new TestAbilityConfig();
inject(abilityConfigs);
serverAbilityControlManager.enableCurrentNodeAbility(AbilityKey.SERVER_TEST_1);
serverAbilityControlManager.enableCurrentNodeAbility(AbilityKey.SERVER_TEST_2);
}
void inject(AbilityConfigs abilityConfigs) {
TestServerAbilityControlManager serverAbilityControlManager = new TestServerAbilityControlManager();
Map<String, Boolean> newTable = new HashMap<>();
newTable.put(AbilityKey.SERVER_TEST_1.getName(), true);
newTable.put(AbilityKey.SERVER_TEST_2.getName(), true);
serverAbilityControlManager.setCurrentSupportingAbility(newTable);
abilityConfigs.setAbilityHandlerRegistry(serverAbilityControlManager);
this.serverAbilityControlManager = serverAbilityControlManager;
}
/**
* fill field.
*
* @throws Exception ignore
*/
public void fill() throws Exception {
Field instanceField = ServerAbilities.class.getDeclaredField("INSTANCE");
Field abilitiesField = AbstractAbilityRegistry.class.getDeclaredField("supportedAbilities");
abilitiesField.setAccessible(true);
instanceField.setAccessible(true);
ServerAbilities serverAbilities = (ServerAbilities) instanceField.get(ServerAbilities.class);
currentAbilities = (Map<AbilityKey, Boolean>) abilitiesField.get(serverAbilities);
currentAbilities.put(AbilityKey.SERVER_TEST_1, true);
currentAbilities.put(AbilityKey.SERVER_TEST_2, true);
}
@Test
public void testLoadAbilities() throws Exception {
environment.setProperty(AbilityConfigs.PREFIX + AbilityKey.SERVER_TEST_1.getName(), Boolean.TRUE.toString());
environment.setProperty(AbilityConfigs.PREFIX + AbilityKey.SERVER_TEST_2.getName(), Boolean.FALSE.toString());
// test load
fill();
ServerAbilityControlManager manager = new ServerAbilityControlManager();
// config has higher priority
Assert.assertEquals(manager.isCurrentNodeAbilityRunning(AbilityKey.SERVER_TEST_1), AbilityStatus.SUPPORTED);
Assert.assertNotEquals(manager.isCurrentNodeAbilityRunning(AbilityKey.SERVER_TEST_2), AbilityStatus.SUPPORTED);
// clear
currentAbilities.clear();
}
@Test
public void testInit() {
Assert.assertEquals(serverAbilityControlManager.isCurrentNodeAbilityRunning(AbilityKey.SERVER_TEST_1), AbilityStatus.SUPPORTED);
Assert.assertEquals(serverAbilityControlManager.isCurrentNodeAbilityRunning(AbilityKey.SERVER_TEST_2), AbilityStatus.SUPPORTED);
}
@Test
public void testConfigChange() throws InterruptedException {
// test no change
environment.setProperty(AbilityConfigs.PREFIX + AbilityKey.SERVER_TEST_1.getName(), Boolean.TRUE.toString());
environment.setProperty(AbilityConfigs.PREFIX + AbilityKey.SERVER_TEST_2.getName(), Boolean.TRUE.toString());
abilityConfigs.onEvent(new ServerConfigChangeEvent());
Assert.assertEquals(serverAbilityControlManager.isCurrentNodeAbilityRunning(AbilityKey.SERVER_TEST_1), AbilityStatus.SUPPORTED);
Assert.assertEquals(serverAbilityControlManager.isCurrentNodeAbilityRunning(AbilityKey.SERVER_TEST_2), AbilityStatus.SUPPORTED);
// test change
environment.setProperty(AbilityConfigs.PREFIX + AbilityKey.SERVER_TEST_1.getName(), Boolean.FALSE.toString());
abilityConfigs.onEvent(new ServerConfigChangeEvent());
Assert.assertNotEquals(serverAbilityControlManager.isCurrentNodeAbilityRunning(AbilityKey.SERVER_TEST_1), AbilityStatus.SUPPORTED);
Assert.assertEquals(serverAbilityControlManager.isCurrentNodeAbilityRunning(AbilityKey.SERVER_TEST_2), AbilityStatus.SUPPORTED);
environment.setProperty(AbilityConfigs.PREFIX + AbilityKey.SERVER_TEST_1.getName(), Boolean.TRUE.toString());
abilityConfigs.onEvent(new ServerConfigChangeEvent());
Assert.assertEquals(serverAbilityControlManager.isCurrentNodeAbilityRunning(AbilityKey.SERVER_TEST_1), AbilityStatus.SUPPORTED);
environment.setProperty(AbilityConfigs.PREFIX + AbilityKey.SERVER_TEST_1.getName(), Boolean.FALSE.toString());
environment.setProperty(AbilityConfigs.PREFIX + AbilityKey.SERVER_TEST_2.getName(), Boolean.FALSE.toString());
abilityConfigs.onEvent(new ServerConfigChangeEvent());
Assert.assertNotEquals(serverAbilityControlManager.isCurrentNodeAbilityRunning(AbilityKey.SERVER_TEST_1), AbilityStatus.SUPPORTED);
Assert.assertNotEquals(serverAbilityControlManager.isCurrentNodeAbilityRunning(AbilityKey.SERVER_TEST_2), AbilityStatus.SUPPORTED);
}
}

View File

@ -0,0 +1,35 @@
/*
* Copyright 1999-2022 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.core.ability.config;
import com.alibaba.nacos.api.ability.constant.AbilityKey;
import java.util.Set;
/**.
* @author Daydreamer
* @description Dynamically load ability from config. just for test
* @date 2022/8/31 12:27
**/
public class TestAbilityConfig extends AbilityConfigs {
public TestAbilityConfig() {
Set<AbilityKey> serverAbilityKeys = super.getServerAbilityKeys();
serverAbilityKeys.add(AbilityKey.SERVER_TEST_1);
serverAbilityKeys.add(AbilityKey.SERVER_TEST_2);
}
}

View File

@ -243,7 +243,7 @@ public class MemberUtilTest {
@Test
public void testIsBasicInfoChangedForChangedAbilities() {
Member newMember = buildMember();
newMember.getAbilities().getRemoteAbility().setSupportRemoteConnection(true);
newMember.setGrpcReportEnabled(true);
assertTrue(MemberUtil.isBasicInfoChanged(newMember, originalMember));
}

View File

@ -0,0 +1,293 @@
/*
* Copyright 1999-2023 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.test.ability;
import com.alibaba.nacos.Nacos;
import com.alibaba.nacos.api.NacosFactory;
import com.alibaba.nacos.api.PropertyKeyConst;
import com.alibaba.nacos.api.ability.constant.AbilityKey;
import com.alibaba.nacos.api.ability.constant.AbilityStatus;
import com.alibaba.nacos.api.config.ConfigService;
import com.alibaba.nacos.api.config.remote.request.ConfigQueryRequest;
import com.alibaba.nacos.api.config.remote.response.ConfigQueryResponse;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.remote.request.Request;
import com.alibaba.nacos.api.remote.request.RequestMeta;
import com.alibaba.nacos.api.remote.request.SetupAckRequest;
import com.alibaba.nacos.api.remote.response.Response;
import com.alibaba.nacos.common.ability.AbstractAbilityControlManager;
import com.alibaba.nacos.common.ability.discover.NacosAbilityManagerHolder;
import com.alibaba.nacos.common.remote.ConnectionType;
import com.alibaba.nacos.common.remote.client.Connection;
import com.alibaba.nacos.common.remote.client.RpcClient;
import com.alibaba.nacos.common.remote.client.RpcClientFactory;
import com.alibaba.nacos.common.remote.client.ServerListFactory;
import com.alibaba.nacos.common.remote.client.ServerRequestHandler;
import com.alibaba.nacos.core.remote.ConnectionManager;
import com.alibaba.nacos.core.remote.RequestFilters;
import com.alibaba.nacos.core.remote.RequestHandler;
import com.alibaba.nacos.core.remote.RequestHandlerRegistry;
import com.alibaba.nacos.test.ability.component.TestServerAbilityControlManager;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.web.server.LocalServerPort;
import org.springframework.test.context.junit4.SpringRunner;
import javax.annotation.Resource;
import java.lang.reflect.Field;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
@RunWith(SpringRunner.class)
@SpringBootTest(classes = Nacos.class, properties = {
"server.servlet.context-path=/nacos"}, webEnvironment = SpringBootTest.WebEnvironment.DEFINED_PORT)
@SuppressWarnings("all")
public class AbilityDiscovery {
@LocalServerPort
private int port;
@Resource
private RequestHandlerRegistry requestHandlerRegistry;
@Resource
private RequestFilters filters;
@Resource
private ConnectionManager connectionManager;
private RpcClient client;
private RpcClient clusterClient;
private ConfigService configService;
private AbstractAbilityControlManager oldInstance;
/**
* test server judge client abilities
*/
private volatile boolean serverSuccess = false;
private volatile boolean clientSuccess = false;
private volatile boolean clusterSuccess = false;
private Field abstractAbilityControlManager;
private Field registryHandlerFields;
private Field serverReuqestHandlersField;
private Field currentConnField;
@Before
public void setup() throws NoSuchFieldException, IllegalAccessException, NacosException {
// load class
oldInstance = NacosAbilityManagerHolder.getInstance();
// replace
abstractAbilityControlManager = NacosAbilityManagerHolder.class
.getDeclaredField("abstractAbilityControlManager");
abstractAbilityControlManager.setAccessible(true);
abstractAbilityControlManager.set(NacosAbilityManagerHolder.class, new TestServerAbilityControlManager());
// get registry field
registryHandlerFields = RequestHandlerRegistry.class.getDeclaredField("registryHandlers");
registryHandlerFields.setAccessible(true);
// currentConn
currentConnField = RpcClient.class.getDeclaredField("currentConnection");
currentConnField.setAccessible(true);
// init config service
Properties properties = new Properties();
properties.put(PropertyKeyConst.SERVER_ADDR, "127.0.0.1:" + port);
configService = NacosFactory.createConfigService(properties);
// server request handler
serverReuqestHandlersField = RpcClient.class.getDeclaredField("serverRequestHandlers");
serverReuqestHandlersField.setAccessible(true);
// init client
client = RpcClientFactory.createClient(UUID.randomUUID().toString(), ConnectionType.GRPC, new HashMap<>());
client.serverListFactory(new ServerListFactory() {
@Override
public String genNextServer() {
return "127.0.0.1:" + port;
}
@Override
public String getCurrentServer() {
return "127.0.0.1:" + port;
}
@Override
public List<String> getServerList() {
return Collections.singletonList("127.0.0.1:" + port);
}
});
// connect to server
client.start();
clusterClient = RpcClientFactory.createClusterClient(UUID.randomUUID().toString(), ConnectionType.GRPC, new HashMap<>());
clusterClient.serverListFactory(new ServerListFactory() {
@Override
public String genNextServer() {
return "127.0.0.1:" + port;
}
@Override
public String getCurrentServer() {
return "127.0.0.1:" + port;
}
@Override
public List<String> getServerList() {
return Collections.singletonList("127.0.0.1:" + port);
}
});
// connect to server
clusterClient.start();
}
@Test
public void testClientDiscovery() throws NacosException {
// client judge ability
Assert.assertEquals(client.getConnectionAbility(AbilityKey.SERVER_TEST_1), AbilityStatus.SUPPORTED);
Assert.assertEquals(client.getConnectionAbility(AbilityKey.SERVER_TEST_2), AbilityStatus.NOT_SUPPORTED);
}
@Test
public void testServerDiscoveryAndJudge() throws Exception {
Map<String, RequestHandler> handlers = (Map<String, RequestHandler>) registryHandlerFields
.get(requestHandlerRegistry);
// set handler
RequestHandler oldRequestHandler = handlers.remove(ConfigQueryRequest.class.getSimpleName());
handlers.put(ConfigQueryRequest.class.getSimpleName(), new ClientRequestHandler(filters));
configService.getConfig("test", "DEFAULT_GROUP", 2000);
// wait server invoke
Thread.sleep(3000);
Assert.assertTrue(serverSuccess);
// recover
handlers.remove(ConfigQueryRequest.class.getSimpleName());
handlers.put(ConfigQueryRequest.class.getSimpleName(), oldRequestHandler);
}
@Test
public void testClientJudge() throws Exception {
List<ServerRequestHandler> handlers = (List<ServerRequestHandler>) serverReuqestHandlersField.get(client);
handlers.clear();
// register
client.registerServerRequestHandler(new ServerRequestHandler() {
@Override
public Response requestReply(Request request, Connection connection) {
if (connection.getConnectionAbility(AbilityKey.SERVER_TEST_1).equals(AbilityStatus.SUPPORTED) && connection
.getConnectionAbility(AbilityKey.SERVER_TEST_2).equals(AbilityStatus.NOT_SUPPORTED)) {
clientSuccess = true;
}
return new Response(){};
}
});
// get id
Connection conn = (Connection) currentConnField.get(client);
com.alibaba.nacos.core.remote.Connection connection = connectionManager.getConnection(conn.getConnectionId());
try {
connection.request(new SetupAckRequest(), 2000L);
} catch (NacosException e) {
// nothing to do
}
// wait client react
Thread.sleep(4000);
Assert.assertTrue(clientSuccess);
}
@Test
public void testClusterClient() throws IllegalAccessException, NacosException, InterruptedException, NoSuchFieldException {
Map<String, RequestHandler> handlers = (Map<String, RequestHandler>) registryHandlerFields
.get(requestHandlerRegistry);
// set handler
RequestHandler oldRequestHandler = handlers.remove(ConfigQueryRequest.class.getSimpleName());
handlers.put(ConfigQueryRequest.class.getSimpleName(), new ClusterClientRequestHandler(filters));
configService.getConfig("test", "DEFAULT_GROUP", 2000);
// wait server invoke
Thread.sleep(3000);
Assert.assertTrue(clusterSuccess);
// recover
handlers.remove(ConfigQueryRequest.class.getSimpleName());
handlers.put(ConfigQueryRequest.class.getSimpleName(), oldRequestHandler);
}
@After
public void recover() throws IllegalAccessException, NacosException {
abstractAbilityControlManager.set(NacosAbilityManagerHolder.class, oldInstance);
client.shutdown();
}
/**
* just to test ability
*/
class ClientRequestHandler extends RequestHandler<ConfigQueryRequest, ConfigQueryResponse> {
public ClientRequestHandler(RequestFilters requestFilters) throws NoSuchFieldException, IllegalAccessException {
Field declaredField = RequestHandler.class.getDeclaredField("requestFilters");
declaredField.setAccessible(true);
declaredField.set(this, requestFilters);
}
@Override
public ConfigQueryResponse handle(ConfigQueryRequest request, RequestMeta meta) throws NacosException {
if (meta.getConnectionAbility(AbilityKey.SDK_CLIENT_TEST_1).equals(AbilityStatus.SUPPORTED)) {
serverSuccess = true;
}
return new ConfigQueryResponse();
}
}
/**
* just to test ability.
*/
class ClusterClientRequestHandler extends RequestHandler<ConfigQueryRequest, ConfigQueryResponse> {
public ClusterClientRequestHandler(RequestFilters requestFilters) throws NoSuchFieldException, IllegalAccessException {
Field declaredField = RequestHandler.class.getDeclaredField("requestFilters");
declaredField.setAccessible(true);
declaredField.set(this, requestFilters);
}
@Override
public ConfigQueryResponse handle(ConfigQueryRequest request, RequestMeta meta) throws NacosException {
if (meta.getConnectionAbility(AbilityKey.CLUSTER_CLIENT_TEST_1).equals(AbilityStatus.SUPPORTED)) {
clusterSuccess = true;
}
return new ConfigQueryResponse();
}
}
}

View File

@ -0,0 +1,45 @@
/*
* Copyright 1999-2023 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.test.ability.component;
import com.alibaba.nacos.api.ability.constant.AbilityKey;
import com.alibaba.nacos.api.ability.constant.AbilityMode;
import com.alibaba.nacos.core.ability.control.ServerAbilityControlManager;
import java.util.HashMap;
import java.util.Map;
public class TestServerAbilityControlManager extends ServerAbilityControlManager {
@Override
protected Map<AbilityMode, Map<AbilityKey, Boolean>> initCurrentNodeAbilities() {
Map<AbilityKey, Boolean> map = new HashMap<>();
map.put(AbilityKey.SERVER_TEST_1, true);
map.put(AbilityKey.SERVER_TEST_2, false);
HashMap res = new HashMap<>();
res.put(AbilityMode.SERVER, map);
Map<AbilityKey, Boolean> map1 = new HashMap<>();
map1.put(AbilityKey.SDK_CLIENT_TEST_1, true);
res.put(AbilityMode.SDK_CLIENT, map1);
Map<AbilityKey, Boolean> map2 = new HashMap<>();
map2.put(AbilityKey.CLUSTER_CLIENT_TEST_1, true);
res.put(AbilityMode.CLUSTER_CLIENT, map2);
return res;
}
}