diff --git a/api/src/main/java/com/alibaba/nacos/api/ability/ClientAbilities.java b/api/src/main/java/com/alibaba/nacos/api/ability/ClientAbilities.java index ea4dd9aa7..fe9c80e95 100644 --- a/api/src/main/java/com/alibaba/nacos/api/ability/ClientAbilities.java +++ b/api/src/main/java/com/alibaba/nacos/api/ability/ClientAbilities.java @@ -28,6 +28,7 @@ import java.io.Serializable; * @author liuzunfei * @version $Id: ClientAbilities.java, v 0.1 2021年01月24日 00:09 AM liuzunfei Exp $ */ +@Deprecated public class ClientAbilities implements Serializable { private static final long serialVersionUID = -3590789441404549261L; diff --git a/api/src/main/java/com/alibaba/nacos/api/ability/ServerAbilities.java b/api/src/main/java/com/alibaba/nacos/api/ability/ServerAbilities.java index f6b8b5591..80d3f772d 100644 --- a/api/src/main/java/com/alibaba/nacos/api/ability/ServerAbilities.java +++ b/api/src/main/java/com/alibaba/nacos/api/ability/ServerAbilities.java @@ -29,6 +29,7 @@ import java.util.Objects; * @author liuzunfei * @version $Id: ServerAbilities.java, v 0.1 2021年01月24日 00:09 AM liuzunfei Exp $ */ +@Deprecated public class ServerAbilities implements Serializable { private static final long serialVersionUID = -2120543002911304171L; diff --git a/api/src/main/java/com/alibaba/nacos/api/ability/constant/AbilityKey.java b/api/src/main/java/com/alibaba/nacos/api/ability/constant/AbilityKey.java new file mode 100644 index 000000000..d77460cf3 --- /dev/null +++ b/api/src/main/java/com/alibaba/nacos/api/ability/constant/AbilityKey.java @@ -0,0 +1,179 @@ +/* + * Copyright 1999-2022 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.nacos.api.ability.constant; + +import java.util.Collection; +import java.util.Collections; +import java.util.Map; +import java.util.HashMap; +import java.util.stream.Collectors; + +/** + * Ability key constant. It is used to constrain the ability key.
+ * Ensure that return value of {@link AbilityKey#getName()} is unique under one specify {@link AbilityMode}. + * + * @author Daydreamer + * @date 2022/8/31 12:27 + **/ +public enum AbilityKey { + + /** + * For Test temporarily. + */ + SERVER_TEST_1("test_1", "just for junit test", AbilityMode.SERVER), + + /** + * For Test temporarily. + */ + SERVER_TEST_2("test_2", "just for junit test", AbilityMode.SERVER), + + /** + * For Test temporarily. + */ + SDK_CLIENT_TEST_1("test_1", "just for junit test", AbilityMode.SDK_CLIENT), + + /** + * For Test temporarily. + */ + CLUSTER_CLIENT_TEST_1("test_1", "just for junit test", AbilityMode.CLUSTER_CLIENT); + + /** + * the name of a certain ability. + */ + private final String keyName; + + /** + * description or comment about this ability. + */ + private final String description; + + /** + * ability mode, which endpoint hold this ability. + */ + private final AbilityMode mode; + + AbilityKey(String keyName, String description, AbilityMode mode) { + this.keyName = keyName; + this.description = description; + this.mode = mode; + } + + public String getName() { + return keyName; + } + + public String getDescription() { + return description; + } + + public AbilityMode getMode() { + return mode; + } + + /** + * All key set. + */ + private static final Map> ALL_ABILITIES = new HashMap<>(); + + /** + * Get all keys. + * + * @return all keys + */ + public static Collection getAllValues(AbilityMode mode) { + return Collections.unmodifiableCollection(ALL_ABILITIES.get(mode).values()); + } + + /** + * Get all names. + * + * @return all names + */ + public static Collection getAllNames(AbilityMode mode) { + return Collections.unmodifiableCollection(ALL_ABILITIES.get(mode).keySet()); + } + + /** + * Whether contains this name. + * + * @param name key name + * @return whether contains + */ + public static boolean isLegalKey(AbilityMode mode, String name) { + return ALL_ABILITIES.get(mode).containsKey(name); + } + + /** + * Map the string key to enum. + * + * @param abilities map + * @return enum map + */ + public static Map mapEnum(AbilityMode mode, Map abilities) { + if (abilities == null || abilities.isEmpty()) { + return Collections.emptyMap(); + } + return abilities.entrySet() + .stream() + .filter(entry -> isLegalKey(mode, entry.getKey())) + .collect(Collectors.toMap((entry) -> getEnum(mode, entry.getKey()), Map.Entry::getValue)); + } + + /**. + * Map the string key to enum + * + * @param abilities map + * @return enum map + */ + public static Map mapStr(Map abilities) { + if (abilities == null || abilities.isEmpty()) { + return Collections.emptyMap(); + } + return abilities.entrySet() + .stream() + .collect(Collectors.toMap((entry) -> entry.getKey().getName(), Map.Entry::getValue)); + } + + /**. + * getter to obtain enum + * + * @param key string key + * @return enum + */ + public static AbilityKey getEnum(AbilityMode mode, String key) { + return ALL_ABILITIES.get(mode).get(key); + } + + static { + // check for developer + // ensure that name filed is unique under a AbilityMode + try { + for (AbilityKey value : AbilityKey.values()) { + AbilityMode mode = value.mode; + Map map = ALL_ABILITIES.getOrDefault(mode, new HashMap<>()); + AbilityKey previous = map.putIfAbsent(value.getName(), value); + if (previous != null) { + throw new IllegalStateException("Duplicate key name field " + value + " and " + previous + " under mode: " + mode); + } + ALL_ABILITIES.put(mode, map); + } + } catch (Throwable t) { + // for developer checking + t.printStackTrace(); + } + } +} diff --git a/api/src/main/java/com/alibaba/nacos/api/ability/constant/AbilityMode.java b/api/src/main/java/com/alibaba/nacos/api/ability/constant/AbilityMode.java new file mode 100644 index 000000000..2355a48bf --- /dev/null +++ b/api/src/main/java/com/alibaba/nacos/api/ability/constant/AbilityMode.java @@ -0,0 +1,41 @@ +/* + * Copyright 1999-2023 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.nacos.api.ability.constant; + +/** + * Ability mode. + * + * @author Daydreamer + * @date 2023/9/25 12:32 + **/ +public enum AbilityMode { + + /** + * for server ability. + */ + SERVER, + + /** + * for sdk client. + */ + SDK_CLIENT, + + /** + * for cluster client. + */ + CLUSTER_CLIENT; +} diff --git a/api/src/main/java/com/alibaba/nacos/api/ability/constant/AbilityStatus.java b/api/src/main/java/com/alibaba/nacos/api/ability/constant/AbilityStatus.java new file mode 100644 index 000000000..a762c0373 --- /dev/null +++ b/api/src/main/java/com/alibaba/nacos/api/ability/constant/AbilityStatus.java @@ -0,0 +1,40 @@ +/* + * Copyright 1999-2022 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.nacos.api.ability.constant; + +/**. + * @author Daydreamer + * @description It is used to know a certain ability whether supporting. + * @date 2022/8/31 12:27 + **/ +public enum AbilityStatus { + + /**. + * Support a certain ability + */ + SUPPORTED, + + /**. + * Not support a certain ability + */ + NOT_SUPPORTED, + + /**. + * Cannot find ability table, unknown + */ + UNKNOWN +} diff --git a/api/src/main/java/com/alibaba/nacos/api/ability/initializer/AbilityInitializer.java b/api/src/main/java/com/alibaba/nacos/api/ability/initializer/AbilityInitializer.java index f7ad356c1..71192dffa 100644 --- a/api/src/main/java/com/alibaba/nacos/api/ability/initializer/AbilityInitializer.java +++ b/api/src/main/java/com/alibaba/nacos/api/ability/initializer/AbilityInitializer.java @@ -21,6 +21,7 @@ package com.alibaba.nacos.api.ability.initializer; * * @author xiweng.yy */ +@Deprecated public interface AbilityInitializer { /** diff --git a/api/src/main/java/com/alibaba/nacos/api/ability/initializer/AbilityPostProcessor.java b/api/src/main/java/com/alibaba/nacos/api/ability/initializer/AbilityPostProcessor.java new file mode 100644 index 000000000..667c21c58 --- /dev/null +++ b/api/src/main/java/com/alibaba/nacos/api/ability/initializer/AbilityPostProcessor.java @@ -0,0 +1,40 @@ +/* + * Copyright 1999-2023 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.nacos.api.ability.initializer; + +import com.alibaba.nacos.api.ability.constant.AbilityKey; +import com.alibaba.nacos.api.ability.constant.AbilityMode; + +import java.util.Map; + +/** + * Nacos ability post processor, load by spi. + * + * @author Daydreamer-ia + */ +public interface AbilityPostProcessor { + + + /** + * process before loading by Ability Controller . + * + * @param mode mode: sdk client, server or cluster client + * @param abilities abilities + */ + void process(AbilityMode mode, Map abilities); + +} diff --git a/api/src/main/java/com/alibaba/nacos/api/ability/register/AbstractAbilityRegistry.java b/api/src/main/java/com/alibaba/nacos/api/ability/register/AbstractAbilityRegistry.java new file mode 100644 index 000000000..40af9087c --- /dev/null +++ b/api/src/main/java/com/alibaba/nacos/api/ability/register/AbstractAbilityRegistry.java @@ -0,0 +1,42 @@ +/* + * Copyright 1999-2022 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.nacos.api.ability.register; + +import com.alibaba.nacos.api.ability.constant.AbilityKey; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +/**. + * @author Daydreamer + * @description Operation for bit table. + * @date 2022/7/12 19:23 + **/ +public abstract class AbstractAbilityRegistry { + + protected final Map supportedAbilities = new HashMap<>(); + + /**. + * get static ability current server supports + * + * @return static ability + */ + public Map getSupportedAbilities() { + return Collections.unmodifiableMap(supportedAbilities); + } +} diff --git a/api/src/main/java/com/alibaba/nacos/api/ability/register/impl/ClusterClientAbilities.java b/api/src/main/java/com/alibaba/nacos/api/ability/register/impl/ClusterClientAbilities.java new file mode 100644 index 000000000..b43f43dae --- /dev/null +++ b/api/src/main/java/com/alibaba/nacos/api/ability/register/impl/ClusterClientAbilities.java @@ -0,0 +1,57 @@ +/* + * Copyright 1999-2023 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.nacos.api.ability.register.impl; + +import com.alibaba.nacos.api.ability.constant.AbilityKey; +import com.alibaba.nacos.api.ability.register.AbstractAbilityRegistry; + +import java.util.Map; + +/** + * It is used to register cluster client abilities. + * + * @author Daydreamer + **/ +public class ClusterClientAbilities extends AbstractAbilityRegistry { + + private static final ClusterClientAbilities INSTANCE = new ClusterClientAbilities(); + + { + /* + * example: + * There is a function named "compression". + * The key is from

AbilityKey

, the value is whether turn on. + * + * You can add a new public field in

AbilityKey

like: + * DATA_COMPRESSION("compression", "description about this ability") + * + * And then you need to declare whether turn on in the ability table, you can: + * supportedAbilities.put(AbilityKey.DATA_COMPRESSION, true); means that current client support compression. + * + */ + // put ability here, which you want current client supports + } + + /** + * get static ability current cluster client supports. + * + * @return static ability + */ + public static Map getStaticAbilities() { + return INSTANCE.getSupportedAbilities(); + } +} diff --git a/api/src/main/java/com/alibaba/nacos/api/ability/register/impl/SdkClientAbilities.java b/api/src/main/java/com/alibaba/nacos/api/ability/register/impl/SdkClientAbilities.java new file mode 100644 index 000000000..cb306f11d --- /dev/null +++ b/api/src/main/java/com/alibaba/nacos/api/ability/register/impl/SdkClientAbilities.java @@ -0,0 +1,58 @@ +/* + * Copyright 1999-2022 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.nacos.api.ability.register.impl; + +import com.alibaba.nacos.api.ability.constant.AbilityKey; +import com.alibaba.nacos.api.ability.register.AbstractAbilityRegistry; + +import java.util.Map; + +/** + * It is used to register client abilities. + * + * @author Daydreamer + * @date 2022/8/31 12:32 + **/ +public class SdkClientAbilities extends AbstractAbilityRegistry { + + private static final SdkClientAbilities INSTANCE = new SdkClientAbilities(); + + { + /* + * example: + * There is a function named "compression". + * The key is from

AbilityKey

, the value is whether turn on. + * + * You can add a new public field in

AbilityKey

like: + * DATA_COMPRESSION("compression", "description about this ability") + * + * And then you need to declare whether turn on in the ability table, you can: + * supportedAbilities.put(AbilityKey.DATA_COMPRESSION, true); means that current client support compression. + * + */ + // put ability here, which you want current client supports + } + + /**. + * get static ability current server supports + * + * @return static ability + */ + public static Map getStaticAbilities() { + return INSTANCE.getSupportedAbilities(); + } +} diff --git a/api/src/main/java/com/alibaba/nacos/api/ability/register/impl/ServerAbilities.java b/api/src/main/java/com/alibaba/nacos/api/ability/register/impl/ServerAbilities.java new file mode 100644 index 000000000..2fa8f9693 --- /dev/null +++ b/api/src/main/java/com/alibaba/nacos/api/ability/register/impl/ServerAbilities.java @@ -0,0 +1,59 @@ +/* + * Copyright 1999-2022 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.nacos.api.ability.register.impl; + +import com.alibaba.nacos.api.ability.constant.AbilityKey; +import com.alibaba.nacos.api.ability.register.AbstractAbilityRegistry; + +import java.util.Map; + +/** + * It is used to register server abilities. + * + * @author Daydreamer + * @date 2022/8/31 12:32 + **/ +public class ServerAbilities extends AbstractAbilityRegistry { + + private static final ServerAbilities INSTANCE = new ServerAbilities(); + + { + /* + * example: + * There is a function named "compression". + * The key is from

AbilityKey

, the value is whether turn on. + * + * You can add a new public field in

AbilityKey

like: + * DATA_COMPRESSION("compression", "description about this ability") + * + * And then you need to declare whether turn on in the ability table, you can: + * supportedAbilities.put(AbilityKey.DATA_COMPRESSION, true); means that current client support compression. + * + */ + // put ability here, which you want current server supports + } + + /**. + * get static ability current server supports + * + * @return static ability + */ + public static Map getStaticAbilities() { + return INSTANCE.getSupportedAbilities(); + } + +} diff --git a/api/src/main/java/com/alibaba/nacos/api/remote/request/ConnectResetRequest.java b/api/src/main/java/com/alibaba/nacos/api/remote/request/ConnectResetRequest.java index 1a2e14424..75c6fa0ca 100644 --- a/api/src/main/java/com/alibaba/nacos/api/remote/request/ConnectResetRequest.java +++ b/api/src/main/java/com/alibaba/nacos/api/remote/request/ConnectResetRequest.java @@ -30,11 +30,31 @@ public class ConnectResetRequest extends ServerRequest { String serverPort; + String connectionId; + @Override public String getModule() { return INTERNAL_MODULE; } + /** + * Getter method for property connectionId. + * + * @return property value of connectionId + */ + public String getConnectionId() { + return connectionId; + } + + /** + * Setter method for property connectionId. + * + * @param connectionId value to be assigned to property connectionId + */ + public void setConnectionId(String connectionId) { + this.connectionId = connectionId; + } + /** * Getter method for property serverIp. * diff --git a/api/src/main/java/com/alibaba/nacos/api/remote/request/ConnectionSetupRequest.java b/api/src/main/java/com/alibaba/nacos/api/remote/request/ConnectionSetupRequest.java index 56e94dad2..3409728f5 100644 --- a/api/src/main/java/com/alibaba/nacos/api/remote/request/ConnectionSetupRequest.java +++ b/api/src/main/java/com/alibaba/nacos/api/remote/request/ConnectionSetupRequest.java @@ -16,8 +16,6 @@ package com.alibaba.nacos.api.remote.request; -import com.alibaba.nacos.api.ability.ClientAbilities; - import java.util.HashMap; import java.util.Map; @@ -31,12 +29,12 @@ public class ConnectionSetupRequest extends InternalRequest { private String clientVersion; - private ClientAbilities abilities; - private String tenant; private Map labels = new HashMap<>(); + private Map abilityTable; + public ConnectionSetupRequest() { } @@ -64,11 +62,11 @@ public class ConnectionSetupRequest extends InternalRequest { this.tenant = tenant; } - public ClientAbilities getAbilities() { - return abilities; + public Map getAbilityTable() { + return abilityTable; } - public void setAbilities(ClientAbilities abilities) { - this.abilities = abilities; + public void setAbilityTable(Map abilityTable) { + this.abilityTable = abilityTable; } } diff --git a/api/src/main/java/com/alibaba/nacos/api/remote/request/RequestMeta.java b/api/src/main/java/com/alibaba/nacos/api/remote/request/RequestMeta.java index 82eba58db..c633f8934 100644 --- a/api/src/main/java/com/alibaba/nacos/api/remote/request/RequestMeta.java +++ b/api/src/main/java/com/alibaba/nacos/api/remote/request/RequestMeta.java @@ -16,6 +16,9 @@ package com.alibaba.nacos.api.remote.request; +import com.alibaba.nacos.api.ability.constant.AbilityKey; +import com.alibaba.nacos.api.ability.constant.AbilityStatus; + import java.util.HashMap; import java.util.Map; @@ -34,7 +37,25 @@ public class RequestMeta { private String clientVersion = ""; private Map labels = new HashMap<>(); - + + private Map abilityTable; + + public AbilityStatus getConnectionAbility(AbilityKey abilityKey) { + if (abilityTable == null || !abilityTable.containsKey(abilityKey.getName())) { + return AbilityStatus.UNKNOWN; + } + return abilityTable.get(abilityKey.getName()) ? AbilityStatus.SUPPORTED : AbilityStatus.NOT_SUPPORTED; + } + + /** + * Setter method for property abilityTable. + * + * @param abilityTable property value of clientVersion + */ + public void setAbilityTable(Map abilityTable) { + this.abilityTable = abilityTable; + } + /** * Getter method for property clientVersion. * diff --git a/api/src/main/java/com/alibaba/nacos/api/remote/request/SetupAckRequest.java b/api/src/main/java/com/alibaba/nacos/api/remote/request/SetupAckRequest.java new file mode 100644 index 000000000..cecdaa825 --- /dev/null +++ b/api/src/main/java/com/alibaba/nacos/api/remote/request/SetupAckRequest.java @@ -0,0 +1,52 @@ +/* + * Copyright 1999-2022 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.nacos.api.remote.request; + +import java.util.Map; + +import static com.alibaba.nacos.api.common.Constants.Remote.INTERNAL_MODULE; + +/** + * Server tells the client that the connection is established. + * + * @author Daydreamer. + * @date 2022/7/12 19:21 + **/ +public class SetupAckRequest extends ServerRequest { + + private Map abilityTable; + + public SetupAckRequest() { + } + + public SetupAckRequest(Map abilityTable) { + this.abilityTable = abilityTable; + } + + public Map getAbilityTable() { + return abilityTable; + } + + public void setAbilityTable(Map abilityTable) { + this.abilityTable = abilityTable; + } + + @Override + public String getModule() { + return INTERNAL_MODULE; + } +} diff --git a/api/src/main/java/com/alibaba/nacos/api/remote/response/ServerCheckResponse.java b/api/src/main/java/com/alibaba/nacos/api/remote/response/ServerCheckResponse.java index fd3981cd1..2b8e9fea8 100644 --- a/api/src/main/java/com/alibaba/nacos/api/remote/response/ServerCheckResponse.java +++ b/api/src/main/java/com/alibaba/nacos/api/remote/response/ServerCheckResponse.java @@ -26,12 +26,15 @@ public class ServerCheckResponse extends Response { private String connectionId; + private boolean supportAbilityNegotiation; + public ServerCheckResponse() { } - public ServerCheckResponse(String connectionId) { + public ServerCheckResponse(String connectionId, boolean supportAbilityNegotiation) { this.connectionId = connectionId; + this.supportAbilityNegotiation = supportAbilityNegotiation; } public String getConnectionId() { @@ -41,4 +44,12 @@ public class ServerCheckResponse extends Response { public void setConnectionId(String connectionId) { this.connectionId = connectionId; } + + public boolean isSupportAbilityNegotiation() { + return supportAbilityNegotiation; + } + + public void setSupportAbilityNegotiation(boolean supportAbilityNegotiation) { + this.supportAbilityNegotiation = supportAbilityNegotiation; + } } diff --git a/api/src/main/java/com/alibaba/nacos/api/remote/response/SetupAckResponse.java b/api/src/main/java/com/alibaba/nacos/api/remote/response/SetupAckResponse.java new file mode 100644 index 000000000..ce4abcb94 --- /dev/null +++ b/api/src/main/java/com/alibaba/nacos/api/remote/response/SetupAckResponse.java @@ -0,0 +1,26 @@ +/* + * Copyright 1999-2022 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.nacos.api.remote.response; + +/**. + * @author Daydreamer + * @description Server tells the client that the connection is established + * @date 2022/7/12 19:21 + **/ +public class SetupAckResponse extends Response { + +} diff --git a/api/src/main/resources/META-INF/services/com.alibaba.nacos.api.remote.Payload b/api/src/main/resources/META-INF/services/com.alibaba.nacos.api.remote.Payload index d838e65ea..5e9552afd 100644 --- a/api/src/main/resources/META-INF/services/com.alibaba.nacos.api.remote.Payload +++ b/api/src/main/resources/META-INF/services/com.alibaba.nacos.api.remote.Payload @@ -22,6 +22,8 @@ com.alibaba.nacos.api.remote.request.PushAckRequest com.alibaba.nacos.api.remote.request.ServerCheckRequest com.alibaba.nacos.api.remote.request.ServerLoaderInfoRequest com.alibaba.nacos.api.remote.request.ServerReloadRequest +com.alibaba.nacos.api.remote.request.SetupAckRequest +com.alibaba.nacos.api.remote.response.SetupAckResponse com.alibaba.nacos.api.remote.response.ClientDetectionResponse com.alibaba.nacos.api.remote.response.ConnectResetResponse com.alibaba.nacos.api.remote.response.ErrorResponse diff --git a/api/src/test/java/com/alibaba/nacos/api/remote/request/ConnectionSetupRequestTest.java b/api/src/test/java/com/alibaba/nacos/api/remote/request/ConnectionSetupRequestTest.java index e05860af9..03424d50a 100644 --- a/api/src/test/java/com/alibaba/nacos/api/remote/request/ConnectionSetupRequestTest.java +++ b/api/src/test/java/com/alibaba/nacos/api/remote/request/ConnectionSetupRequestTest.java @@ -16,11 +16,11 @@ package com.alibaba.nacos.api.remote.request; -import com.alibaba.nacos.api.ability.ClientAbilities; import org.junit.Assert; import org.junit.Test; import java.util.Collections; +import java.util.HashMap; public class ConnectionSetupRequestTest extends BasicRequestTest { @@ -28,7 +28,7 @@ public class ConnectionSetupRequestTest extends BasicRequestTest { public void testSerialize() throws Exception { ConnectionSetupRequest request = new ConnectionSetupRequest(); request.setClientVersion("2.2.2"); - request.setAbilities(new ClientAbilities()); + request.setAbilityTable(new HashMap<>()); request.setTenant("testNamespaceId"); request.setLabels(Collections.singletonMap("labelKey", "labelValue")); request.setRequestId("1"); @@ -37,7 +37,7 @@ public class ConnectionSetupRequestTest extends BasicRequestTest { Assert.assertTrue(json.contains("\"clientVersion\":\"2.2.2\"")); Assert.assertTrue(json.contains("\"tenant\":\"testNamespaceId\"")); Assert.assertTrue(json.contains("\"labels\":{\"labelKey\":\"labelValue\"}")); - Assert.assertTrue(json.contains("\"abilities\":{")); + Assert.assertTrue(json.contains("\"abilityTable\":{")); Assert.assertTrue(json.contains("\"module\":\"internal\"")); Assert.assertTrue(json.contains("\"requestId\":\"1\"")); } diff --git a/api/src/test/java/com/alibaba/nacos/api/remote/response/ServerCheckResponseTest.java b/api/src/test/java/com/alibaba/nacos/api/remote/response/ServerCheckResponseTest.java index 80c2ac9ea..c010f0d99 100644 --- a/api/src/test/java/com/alibaba/nacos/api/remote/response/ServerCheckResponseTest.java +++ b/api/src/test/java/com/alibaba/nacos/api/remote/response/ServerCheckResponseTest.java @@ -38,7 +38,7 @@ public class ServerCheckResponseTest { @Test public void testSerialization() throws JsonProcessingException { - ServerCheckResponse response = new ServerCheckResponse("35643245_1.1.1.1_3306"); + ServerCheckResponse response = new ServerCheckResponse("35643245_1.1.1.1_3306", false); String actual = mapper.writeValueAsString(response); assertTrue(actual.contains("\"connectionId\":\"35643245_1.1.1.1_3306\"")); } diff --git a/api/src/test/java/com/alibaba/nacos/api/utils/AbilityKeyTest.java b/api/src/test/java/com/alibaba/nacos/api/utils/AbilityKeyTest.java new file mode 100644 index 000000000..32af5ad2b --- /dev/null +++ b/api/src/test/java/com/alibaba/nacos/api/utils/AbilityKeyTest.java @@ -0,0 +1,76 @@ +/* + * Copyright 1999-2022 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.nacos.api.utils; + +import com.alibaba.nacos.api.ability.constant.AbilityKey; +import com.alibaba.nacos.api.ability.constant.AbilityMode; +import org.junit.Assert; +import org.junit.Test; + +import java.util.HashMap; +import java.util.Map; + +/**. + * @author Daydreamer + * @description Ability key test + * @date 2022/9/8 12:27 + **/ +public class AbilityKeyTest { + + @Test + public void testMapStr() { + Map enumMap = new HashMap<>(); + Map stringBooleanMap = AbilityKey.mapStr(enumMap); + Assert.assertEquals(0, stringBooleanMap.size()); + + enumMap.put(AbilityKey.SERVER_TEST_1, true); + enumMap.put(AbilityKey.SERVER_TEST_2, false); + stringBooleanMap = AbilityKey.mapStr(enumMap); + Assert.assertEquals(2, stringBooleanMap.size()); + Assert.assertTrue(stringBooleanMap.get(AbilityKey.SERVER_TEST_1.getName())); + Assert.assertFalse(stringBooleanMap.get(AbilityKey.SERVER_TEST_2.getName())); + + enumMap.put(AbilityKey.SERVER_TEST_2, true); + stringBooleanMap = AbilityKey.mapStr(enumMap); + Assert.assertEquals(2, stringBooleanMap.size()); + Assert.assertTrue(stringBooleanMap.get(AbilityKey.SERVER_TEST_1.getName())); + Assert.assertTrue(stringBooleanMap.get(AbilityKey.SERVER_TEST_2.getName())); + } + + @Test + public void testMapEnum() { + Map mapStr = new HashMap<>(); + mapStr.put("test-no-existed", true); + Map enumMap = AbilityKey.mapEnum(AbilityMode.SERVER, mapStr); + Assert.assertEquals(0, enumMap.size()); + + mapStr.put(AbilityKey.SERVER_TEST_2.getName(), false); + mapStr.put(AbilityKey.SERVER_TEST_1.getName(), true); + enumMap = AbilityKey.mapEnum(AbilityMode.SERVER, mapStr); + Assert.assertFalse(enumMap.get(AbilityKey.SERVER_TEST_2)); + Assert.assertTrue(enumMap.get(AbilityKey.SERVER_TEST_1)); + + mapStr.clear(); + mapStr.put(AbilityKey.SERVER_TEST_2.getName(), true); + mapStr.put(AbilityKey.SERVER_TEST_1.getName(), true); + enumMap = AbilityKey.mapEnum(AbilityMode.SERVER, mapStr); + Assert.assertTrue(enumMap.get(AbilityKey.SERVER_TEST_2)); + Assert.assertTrue(enumMap.get(AbilityKey.SERVER_TEST_1)); + + } + +} diff --git a/client/src/main/java/com/alibaba/nacos/client/ability/ClientAbilityControlManager.java b/client/src/main/java/com/alibaba/nacos/client/ability/ClientAbilityControlManager.java new file mode 100644 index 000000000..05d8fef47 --- /dev/null +++ b/client/src/main/java/com/alibaba/nacos/client/ability/ClientAbilityControlManager.java @@ -0,0 +1,50 @@ +/* + * Copyright 1999-2022 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.nacos.client.ability; + +import com.alibaba.nacos.api.ability.constant.AbilityKey; +import com.alibaba.nacos.api.ability.constant.AbilityMode; +import com.alibaba.nacos.api.ability.register.impl.SdkClientAbilities; +import com.alibaba.nacos.common.ability.AbstractAbilityControlManager; + +import java.util.HashMap; +import java.util.Map; + +/**. + * @author Daydreamer + * @description {@link AbstractAbilityControlManager} for nacos-client. + * @date 2022/7/13 13:38 + **/ +public class ClientAbilityControlManager extends AbstractAbilityControlManager { + + public ClientAbilityControlManager() { + } + + @Override + protected Map> initCurrentNodeAbilities() { + Map> abilities = new HashMap<>(1); + abilities.put(AbilityMode.SDK_CLIENT, SdkClientAbilities.getStaticAbilities()); + return abilities; + } + + @Override + public int getPriority() { + // if server ability manager exist, you should choose the server one + return 0; + } + +} diff --git a/client/src/main/java/com/alibaba/nacos/client/config/impl/ClientWorker.java b/client/src/main/java/com/alibaba/nacos/client/config/impl/ClientWorker.java index eb8c2d679..86dd81d46 100644 --- a/client/src/main/java/com/alibaba/nacos/client/config/impl/ClientWorker.java +++ b/client/src/main/java/com/alibaba/nacos/client/config/impl/ClientWorker.java @@ -17,7 +17,6 @@ package com.alibaba.nacos.client.config.impl; import com.alibaba.nacos.api.PropertyKeyConst; -import com.alibaba.nacos.api.ability.ClientAbilities; import com.alibaba.nacos.api.common.Constants; import com.alibaba.nacos.api.config.ConfigType; import com.alibaba.nacos.api.config.listener.Listener; @@ -37,6 +36,8 @@ import com.alibaba.nacos.api.exception.NacosException; import com.alibaba.nacos.api.remote.RemoteConstants; import com.alibaba.nacos.api.remote.request.Request; import com.alibaba.nacos.api.remote.response.Response; +import com.alibaba.nacos.common.remote.client.Connection; +import com.alibaba.nacos.plugin.auth.api.RequestResource; import com.alibaba.nacos.client.config.common.GroupKey; import com.alibaba.nacos.client.config.filter.impl.ConfigFilterChainManager; import com.alibaba.nacos.client.config.filter.impl.ConfigResponse; @@ -65,7 +66,6 @@ import com.alibaba.nacos.common.utils.MD5Utils; import com.alibaba.nacos.common.utils.StringUtils; import com.alibaba.nacos.common.utils.ThreadUtils; import com.alibaba.nacos.common.utils.VersionUtils; -import com.alibaba.nacos.plugin.auth.api.RequestResource; import com.google.gson.Gson; import com.google.gson.JsonObject; import org.slf4j.Logger; @@ -122,7 +122,7 @@ public class ClientWorker implements Closeable { private final AtomicReference> cacheMap = new AtomicReference<>(new HashMap<>()); private final ConfigFilterChainManager configFilterChainManager; - + private String uuid = UUID.randomUUID().toString(); private long timeout; @@ -132,16 +132,16 @@ public class ClientWorker implements Closeable { private int taskPenaltyTime; private boolean enableRemoteSyncConfig = false; - + private static final int MIN_THREAD_NUM = 2; - + private static final int THREAD_MULTIPLE = 1; - + /** * index(taskId)-> total cache count for this taskId. */ private final List taskIdCacheCountList = new ArrayList<>(); - + /** * Add listeners for data. * @@ -404,11 +404,11 @@ public class ClientWorker implements Closeable { private void increaseTaskIdCount(int taskId) { taskIdCacheCountList.get(taskId).incrementAndGet(); } - + private void decreaseTaskIdCount(int taskId) { taskIdCacheCountList.get(taskId).decrementAndGet(); } - + private int calculateTaskId() { int perTaskSize = (int) ParamUtil.getPerTaskConfigSize(); for (int index = 0; index < taskIdCacheCountList.size(); index++) { @@ -419,7 +419,7 @@ public class ClientWorker implements Closeable { taskIdCacheCountList.add(new AtomicInteger(0)); return taskIdCacheCountList.size() - 1; } - + public CacheData getCache(String dataId, String group) { return getCache(dataId, group, TenantUtil.getUserTenantForAcm()); } @@ -567,7 +567,7 @@ public class ClientWorker implements Closeable { public class ConfigRpcTransportClient extends ConfigTransportClient { Map multiTaskExecutor = new HashMap<>(); - + private final BlockingQueue listenExecutebell = new ArrayBlockingQueue<>(1); private Object bellItem = new Object(); @@ -575,7 +575,7 @@ public class ClientWorker implements Closeable { private long lastAllSyncTime = System.currentTimeMillis(); Subscriber subscriber = null; - + /** * 3 minutes to check all listen cache keys. */ @@ -642,7 +642,7 @@ public class ClientWorker implements Closeable { /* * Register Config Change /Config ReSync Handler */ - rpcClientInner.registerServerRequestHandler((request) -> { + rpcClientInner.registerServerRequestHandler((request, connection) -> { if (request instanceof ConfigChangeNotifyRequest) { ConfigChangeNotifyRequest configChangeNotifyRequest = (ConfigChangeNotifyRequest) request; LOGGER.info("[{}] [server-push] config changed. dataId={}, group={},tenant={}", @@ -665,7 +665,7 @@ public class ClientWorker implements Closeable { return null; }); - rpcClientInner.registerServerRequestHandler((request) -> { + rpcClientInner.registerServerRequestHandler((request, connection) -> { if (request instanceof ClientConfigMetricRequest) { ClientConfigMetricResponse response = new ClientConfigMetricResponse(); response.setMetrics(getMetrics(((ClientConfigMetricRequest) request).getMetricsKeys())); @@ -677,13 +677,13 @@ public class ClientWorker implements Closeable { rpcClientInner.registerConnectionListener(new ConnectionEventListener() { @Override - public void onConnected() { + public void onConnected(Connection connection) { LOGGER.info("[{}] Connected,notify listen context...", rpcClientInner.getName()); notifyListenConfig(); } @Override - public void onDisConnect() { + public void onDisConnect(Connection connection) { String taskId = rpcClientInner.getLabels().get("taskId"); LOGGER.info("[{}] DisConnected,clear listen context...", rpcClientInner.getName()); Collection values = cacheMap.get().values(); @@ -820,7 +820,7 @@ public class ClientWorker implements Closeable { //execute check remove listen. checkRemoveListenCache(removeListenCachesMap); - + if (needAllSync) { lastAllSyncTime = now; } @@ -828,9 +828,9 @@ public class ClientWorker implements Closeable { if (hasChangedKeys) { notifyListenConfig(); } - + } - + private ExecutorService ensureSyncExecutor(String taskId) { if (!multiTaskExecutor.containsKey(taskId)) { multiTaskExecutor.put(taskId, @@ -842,14 +842,14 @@ public class ClientWorker implements Closeable { } return multiTaskExecutor.get(taskId); } - + private void checkRemoveListenCache(Map> removeListenCachesMap) { if (!removeListenCachesMap.isEmpty()) { List listenFutures = new ArrayList<>(); - + for (Map.Entry> entry : removeListenCachesMap.entrySet()) { String taskId = entry.getKey(); - + ExecutorService executorService = ensureSyncExecutor(taskId); Future future = executorService.submit(() -> { List removeListenCaches = entry.getValue(); @@ -868,7 +868,7 @@ public class ClientWorker implements Closeable { } } } - + } catch (Throwable e) { LOGGER.error("Async remove listen config change error ", e); try { @@ -880,7 +880,7 @@ public class ClientWorker implements Closeable { } }); listenFutures.add(future); - + } for (Future future : listenFutures) { try { @@ -891,9 +891,9 @@ public class ClientWorker implements Closeable { } } } - + private boolean checkListenCache(Map> listenCachesMap) { - + final AtomicBoolean hasChangedKeys = new AtomicBoolean(false); if (!listenCachesMap.isEmpty()) { List listenFutures = new ArrayList<>(); @@ -913,9 +913,9 @@ public class ClientWorker implements Closeable { ConfigChangeBatchListenResponse listenResponse = (ConfigChangeBatchListenResponse) requestProxy( rpcClient, configChangeListenRequest); if (listenResponse != null && listenResponse.isSuccess()) { - + Set changeKeys = new HashSet(); - + List changedConfigs = listenResponse.getChangedConfigs(); //handle changed keys,notify listener if (!CollectionUtils.isEmpty(changedConfigs)) { @@ -927,9 +927,9 @@ public class ClientWorker implements Closeable { boolean isInitializing = cacheMap.get().get(changeKey).isInitializing(); refreshContentAndCheck(changeKey, !isInitializing); } - + } - + for (CacheData cacheData : listenCaches) { if (cacheData.getReceiveNotifyChanged().get()) { String changeKey = GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, @@ -940,7 +940,7 @@ public class ClientWorker implements Closeable { } } } - + //handler content configs for (CacheData cacheData : listenCaches) { cacheData.setInitializing(false); @@ -954,7 +954,7 @@ public class ClientWorker implements Closeable { } } } - + } } catch (Throwable e) { LOGGER.error("Execute listen config change error ", e); @@ -967,7 +967,7 @@ public class ClientWorker implements Closeable { } }); listenFutures.add(future); - + } for (Future future : listenFutures) { try { @@ -976,7 +976,7 @@ public class ClientWorker implements Closeable { LOGGER.error("Async listen config change error ", throwable); } } - + } return hasChangedKeys.get(); } @@ -992,7 +992,6 @@ public class ClientWorker implements Closeable { if (rpcClient.isWaitInitiated()) { initRpcClientHandler(rpcClient); rpcClient.setTenant(getTenant()); - rpcClient.clientAbilities(initAbilities()); rpcClient.start(); } @@ -1000,14 +999,7 @@ public class ClientWorker implements Closeable { } } - - private ClientAbilities initAbilities() { - ClientAbilities clientAbilities = new ClientAbilities(); - clientAbilities.getRemoteAbility().setSupportRemoteConnection(true); - clientAbilities.getConfigAbility().setSupportRemoteMetrics(true); - return clientAbilities; - } - + /** * build config string. * @@ -1178,7 +1170,7 @@ public class ClientWorker implements Closeable { ConfigRemoveResponse response = (ConfigRemoveResponse) requestProxy(getOneRunningClient(), request); return response.isSuccess(); } - + /** * check server is health. * diff --git a/client/src/main/java/com/alibaba/nacos/client/config/impl/ConfigTransportClient.java b/client/src/main/java/com/alibaba/nacos/client/config/impl/ConfigTransportClient.java index 596c99784..12e7f9903 100644 --- a/client/src/main/java/com/alibaba/nacos/client/config/impl/ConfigTransportClient.java +++ b/client/src/main/java/com/alibaba/nacos/client/config/impl/ConfigTransportClient.java @@ -23,10 +23,10 @@ import com.alibaba.nacos.client.env.NacosClientProperties; import com.alibaba.nacos.plugin.auth.api.RequestResource; import com.alibaba.nacos.client.config.filter.impl.ConfigResponse; import com.alibaba.nacos.client.security.SecurityProxy; -import com.alibaba.nacos.client.utils.ParamUtil; import com.alibaba.nacos.common.utils.ConvertUtils; import com.alibaba.nacos.common.utils.MD5Utils; import com.alibaba.nacos.common.utils.StringUtils; +import com.alibaba.nacos.client.utils.ParamUtil; import java.util.HashMap; import java.util.Map; diff --git a/client/src/main/java/com/alibaba/nacos/client/config/impl/ServerListManager.java b/client/src/main/java/com/alibaba/nacos/client/config/impl/ServerListManager.java index 062701d55..1259cb92e 100644 --- a/client/src/main/java/com/alibaba/nacos/client/config/impl/ServerListManager.java +++ b/client/src/main/java/com/alibaba/nacos/client/config/impl/ServerListManager.java @@ -23,7 +23,6 @@ import com.alibaba.nacos.client.env.NacosClientProperties; import com.alibaba.nacos.client.utils.ContextPathUtil; import com.alibaba.nacos.client.utils.EnvUtil; import com.alibaba.nacos.client.utils.LogUtils; -import com.alibaba.nacos.client.utils.ParamUtil; import com.alibaba.nacos.client.utils.TemplateUtils; import com.alibaba.nacos.common.http.HttpRestResult; import com.alibaba.nacos.common.http.client.NacosRestTemplate; @@ -48,6 +47,7 @@ import java.util.StringTokenizer; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import com.alibaba.nacos.client.utils.ParamUtil; import static com.alibaba.nacos.common.constant.RequestUrlConstants.HTTPS_PREFIX; import static com.alibaba.nacos.common.constant.RequestUrlConstants.HTTP_PREFIX; diff --git a/client/src/main/java/com/alibaba/nacos/client/naming/remote/gprc/NamingPushRequestHandler.java b/client/src/main/java/com/alibaba/nacos/client/naming/remote/gprc/NamingPushRequestHandler.java index bf6e1917a..83c828c80 100644 --- a/client/src/main/java/com/alibaba/nacos/client/naming/remote/gprc/NamingPushRequestHandler.java +++ b/client/src/main/java/com/alibaba/nacos/client/naming/remote/gprc/NamingPushRequestHandler.java @@ -21,6 +21,7 @@ import com.alibaba.nacos.api.naming.remote.response.NotifySubscriberResponse; import com.alibaba.nacos.api.remote.request.Request; import com.alibaba.nacos.api.remote.response.Response; import com.alibaba.nacos.client.naming.cache.ServiceInfoHolder; +import com.alibaba.nacos.common.remote.client.Connection; import com.alibaba.nacos.common.remote.client.ServerRequestHandler; /** @@ -37,7 +38,7 @@ public class NamingPushRequestHandler implements ServerRequestHandler { } @Override - public Response requestReply(Request request) { + public Response requestReply(Request request, Connection connection) { if (request instanceof NotifySubscriberRequest) { NotifySubscriberRequest notifyRequest = (NotifySubscriberRequest) request; serviceInfoHolder.processServiceInfo(notifyRequest.getServiceInfo()); diff --git a/client/src/main/java/com/alibaba/nacos/client/naming/remote/gprc/redo/NamingGrpcRedoService.java b/client/src/main/java/com/alibaba/nacos/client/naming/remote/gprc/redo/NamingGrpcRedoService.java index 481b4eecb..aad883dda 100644 --- a/client/src/main/java/com/alibaba/nacos/client/naming/remote/gprc/redo/NamingGrpcRedoService.java +++ b/client/src/main/java/com/alibaba/nacos/client/naming/remote/gprc/redo/NamingGrpcRedoService.java @@ -28,6 +28,7 @@ import com.alibaba.nacos.client.naming.remote.gprc.redo.data.InstanceRedoData; import com.alibaba.nacos.client.naming.remote.gprc.redo.data.SubscriberRedoData; import com.alibaba.nacos.client.utils.LogUtils; import com.alibaba.nacos.common.executor.NameThreadFactory; +import com.alibaba.nacos.common.remote.client.Connection; import com.alibaba.nacos.common.remote.client.ConnectionEventListener; import java.util.HashSet; @@ -84,13 +85,13 @@ public class NamingGrpcRedoService implements ConnectionEventListener { } @Override - public void onConnected() { + public void onConnected(Connection connection) { connected = true; LogUtils.NAMING_LOGGER.info("Grpc connection connect"); } @Override - public void onDisConnect() { + public void onDisConnect(Connection connection) { connected = false; LogUtils.NAMING_LOGGER.warn("Grpc connection disconnect, mark to redo"); synchronized (registeredInstances) { diff --git a/client/src/main/resources/META-INF/services/com.alibaba.nacos.common.ability.AbstractAbilityControlManager b/client/src/main/resources/META-INF/services/com.alibaba.nacos.common.ability.AbstractAbilityControlManager new file mode 100644 index 000000000..6434f25b1 --- /dev/null +++ b/client/src/main/resources/META-INF/services/com.alibaba.nacos.common.ability.AbstractAbilityControlManager @@ -0,0 +1,18 @@ +# +# Copyright 1999-2022 Alibaba Group Holding Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# + +com.alibaba.nacos.client.ability.ClientAbilityControlManager \ No newline at end of file diff --git a/client/src/test/java/com/alibaba/nacos/client/ability/AbilityTest.java b/client/src/test/java/com/alibaba/nacos/client/ability/AbilityTest.java new file mode 100644 index 000000000..e4b53de10 --- /dev/null +++ b/client/src/test/java/com/alibaba/nacos/client/ability/AbilityTest.java @@ -0,0 +1,174 @@ +/* + * Copyright 1999-2022 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.nacos.client.ability; + +import com.alibaba.nacos.api.ability.constant.AbilityKey; +import com.alibaba.nacos.api.ability.constant.AbilityStatus; +import com.alibaba.nacos.api.exception.NacosException; +import com.alibaba.nacos.api.remote.RequestCallBack; +import com.alibaba.nacos.api.remote.RequestFuture; +import com.alibaba.nacos.api.remote.request.Request; +import com.alibaba.nacos.api.remote.response.Response; +import com.alibaba.nacos.client.naming.remote.TestConnection; +import com.alibaba.nacos.common.remote.ConnectionType; +import com.alibaba.nacos.common.remote.client.RpcClient; +import com.alibaba.nacos.common.remote.client.Connection; +import com.alibaba.nacos.common.remote.client.RpcClientConfig; +import com.alibaba.nacos.common.remote.client.ServerListFactory; +import com.alibaba.nacos.common.remote.client.ServerRequestHandler; +import org.junit.After; +import org.junit.Assert; +import org.junit.Test; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class AbilityTest { + + private RpcClient rpcClient; + + private Connection connection; + + @Test + public void testReceive() throws Exception { + rpcClient = new RpcClient(new RpcClientConfig() { + @Override + public String name() { + return "test"; + } + + @Override + public int retryTimes() { + return 1; + } + + @Override + public long timeOutMills() { + return 3000L; + } + + @Override + public long connectionKeepAlive() { + return 5000L; + } + + @Override + public int healthCheckRetryTimes() { + return 1; + } + + @Override + public long healthCheckTimeOut() { + return 3000L; + } + + @Override + public Map labels() { + return new HashMap<>(); + } + }) { + + @Override + public ConnectionType getConnectionType() { + return null; + } + + @Override + public int rpcPortOffset() { + return 0; + } + + @Override + public Connection connectToServer(ServerInfo serverInfo) throws Exception { + connection = new Connection(new RpcClient.ServerInfo()) { + + { + super.abilityTable = new HashMap<>(); + super.abilityTable.put(AbilityKey.SERVER_TEST_1.getName(), true); + super.abilityTable.put(AbilityKey.SERVER_TEST_2.getName(), false); + } + + @Override + public Response request(Request request, long timeoutMills) throws NacosException { + return null; + } + + @Override + public RequestFuture requestFuture(Request request) throws NacosException { + return null; + } + + @Override + public void asyncRequest(Request request, RequestCallBack requestCallBack) throws NacosException { + + } + + @Override + public void close() { + + } + };; + return connection; + } + }; + rpcClient.start(); + // test not ready + Assert.assertNull(rpcClient.getConnectionAbility(AbilityKey.SERVER_TEST_1)); + + // test ready + rpcClient.serverListFactory(new ServerListFactory() { + + @Override + public String genNextServer() { + return "localhost:8848"; + } + + @Override + public String getCurrentServer() { + return "localhost:8848"; + } + + @Override + public List getServerList() { + return null; + } + }); + rpcClient.start(); + // if connect successfully + Assert.assertEquals(rpcClient.getConnectionAbility(AbilityKey.SERVER_TEST_1), AbilityStatus.SUPPORTED); + Assert.assertEquals(rpcClient.getConnectionAbility(AbilityKey.SERVER_TEST_2), AbilityStatus.NOT_SUPPORTED); + } + + @After + public void testServerRequestAbility() { + //test support + ServerRequestHandler serverRequestHandler = (request, connection) -> { + Assert.assertEquals(connection.getConnectionAbility(AbilityKey.SERVER_TEST_1), AbilityStatus.SUPPORTED); + Assert.assertEquals(connection.getConnectionAbility(AbilityKey.SERVER_TEST_2), AbilityStatus.NOT_SUPPORTED); + return new Response() { }; + }; + serverRequestHandler.requestReply(null, connection); + + // test no ability table + serverRequestHandler = (request, connection) -> { + Assert.assertEquals(connection.getConnectionAbility(AbilityKey.SERVER_TEST_1), AbilityStatus.UNKNOWN); + return new Response() { }; + }; + serverRequestHandler.requestReply(null, new TestConnection(new RpcClient.ServerInfo())); + } + +} diff --git a/client/src/test/java/com/alibaba/nacos/client/naming/remote/TestConnection.java b/client/src/test/java/com/alibaba/nacos/client/naming/remote/TestConnection.java new file mode 100644 index 000000000..43fc088fd --- /dev/null +++ b/client/src/test/java/com/alibaba/nacos/client/naming/remote/TestConnection.java @@ -0,0 +1,52 @@ +/* + * Copyright 1999-2022 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.nacos.client.naming.remote; + +import com.alibaba.nacos.api.exception.NacosException; +import com.alibaba.nacos.api.remote.RequestCallBack; +import com.alibaba.nacos.api.remote.RequestFuture; +import com.alibaba.nacos.api.remote.request.Request; +import com.alibaba.nacos.api.remote.response.Response; +import com.alibaba.nacos.common.remote.client.Connection; +import com.alibaba.nacos.common.remote.client.RpcClient; + +public class TestConnection extends Connection { + + public TestConnection(RpcClient.ServerInfo serverInfo) { + super(serverInfo); + } + + @Override + public Response request(Request request, long timeoutMills) throws NacosException { + return null; + } + + @Override + public RequestFuture requestFuture(Request request) throws NacosException { + return null; + } + + @Override + public void asyncRequest(Request request, RequestCallBack requestCallBack) throws NacosException { + + } + + @Override + public void close() { + + } +} diff --git a/client/src/test/java/com/alibaba/nacos/client/naming/remote/gprc/NamingPushRequestHandlerTest.java b/client/src/test/java/com/alibaba/nacos/client/naming/remote/gprc/NamingPushRequestHandlerTest.java index 2e70dd24c..22dc7fd64 100644 --- a/client/src/test/java/com/alibaba/nacos/client/naming/remote/gprc/NamingPushRequestHandlerTest.java +++ b/client/src/test/java/com/alibaba/nacos/client/naming/remote/gprc/NamingPushRequestHandlerTest.java @@ -24,6 +24,8 @@ import com.alibaba.nacos.api.naming.remote.response.NotifySubscriberResponse; import com.alibaba.nacos.api.remote.request.Request; import com.alibaba.nacos.api.remote.response.Response; import com.alibaba.nacos.client.naming.cache.ServiceInfoHolder; +import com.alibaba.nacos.client.naming.remote.TestConnection; +import com.alibaba.nacos.common.remote.client.RpcClient; import org.junit.Assert; import org.junit.Test; @@ -41,7 +43,7 @@ public class NamingPushRequestHandlerTest { ServiceInfo info = new ServiceInfo("name", "cluster1"); Request req = NotifySubscriberRequest.buildNotifySubscriberRequest(info); //when - Response response = handler.requestReply(req); + Response response = handler.requestReply(req, new TestConnection(new RpcClient.ServerInfo())); //then Assert.assertTrue(response instanceof NotifySubscriberResponse); verify(holder, times(1)).processServiceInfo(info); diff --git a/client/src/test/java/com/alibaba/nacos/client/naming/remote/gprc/redo/NamingGrpcRedoServiceTest.java b/client/src/test/java/com/alibaba/nacos/client/naming/remote/gprc/redo/NamingGrpcRedoServiceTest.java index 78851c386..23681502a 100644 --- a/client/src/test/java/com/alibaba/nacos/client/naming/remote/gprc/redo/NamingGrpcRedoServiceTest.java +++ b/client/src/test/java/com/alibaba/nacos/client/naming/remote/gprc/redo/NamingGrpcRedoServiceTest.java @@ -18,11 +18,13 @@ package com.alibaba.nacos.client.naming.remote.gprc.redo; import com.alibaba.nacos.api.PropertyKeyConst; import com.alibaba.nacos.api.naming.pojo.Instance; +import com.alibaba.nacos.client.naming.remote.TestConnection; import com.alibaba.nacos.client.env.NacosClientProperties; import com.alibaba.nacos.client.naming.remote.gprc.NamingGrpcClientProxy; import com.alibaba.nacos.client.naming.remote.gprc.redo.data.BatchInstanceRedoData; import com.alibaba.nacos.client.naming.remote.gprc.redo.data.InstanceRedoData; import com.alibaba.nacos.client.naming.remote.gprc.redo.data.SubscriberRedoData; +import com.alibaba.nacos.common.remote.client.RpcClient; import com.alibaba.nacos.common.utils.ReflectUtils; import org.junit.After; import org.junit.Before; @@ -75,48 +77,48 @@ public class NamingGrpcRedoServiceTest { public void testDefaultProperties() throws Exception { Field redoThreadCountField = NamingGrpcRedoService.class.getDeclaredField("redoThreadCount"); redoThreadCountField.setAccessible(true); - + Field redoDelayTimeField = NamingGrpcRedoService.class.getDeclaredField("redoDelayTime"); redoDelayTimeField.setAccessible(true); - + Long redoDelayTimeValue = (Long) redoDelayTimeField.get(redoService); Integer redoThreadCountValue = (Integer) redoThreadCountField.get(redoService); - + assertEquals(Long.valueOf(3000L), redoDelayTimeValue); assertEquals(Integer.valueOf(1), redoThreadCountValue); } - + @Test public void testCustomProperties() throws Exception { Properties prop = new Properties(); prop.setProperty(PropertyKeyConst.REDO_DELAY_TIME, "4000"); prop.setProperty(PropertyKeyConst.REDO_DELAY_THREAD_COUNT, "2"); NacosClientProperties nacosClientProperties = NacosClientProperties.PROTOTYPE.derive(prop); - + NamingGrpcRedoService redoService = new NamingGrpcRedoService(clientProxy, nacosClientProperties); - + Field redoThreadCountField = NamingGrpcRedoService.class.getDeclaredField("redoThreadCount"); redoThreadCountField.setAccessible(true); - + Field redoDelayTimeField = NamingGrpcRedoService.class.getDeclaredField("redoDelayTime"); redoDelayTimeField.setAccessible(true); - + Long redoDelayTimeValue = (Long) redoDelayTimeField.get(redoService); Integer redoThreadCountValue = (Integer) redoThreadCountField.get(redoService); assertEquals(Long.valueOf(4000L), redoDelayTimeValue); assertEquals(Integer.valueOf(2), redoThreadCountValue); } - + @Test public void testOnConnected() { assertFalse(redoService.isConnected()); - redoService.onConnected(); + redoService.onConnected(new TestConnection(new RpcClient.ServerInfo())); assertTrue(redoService.isConnected()); } @Test public void testOnDisConnect() { - redoService.onConnected(); + redoService.onConnected(new TestConnection(new RpcClient.ServerInfo())); redoService.cacheInstanceForRedo(SERVICE, GROUP, new Instance()); redoService.instanceRegistered(SERVICE, GROUP); redoService.cacheSubscriberForRedo(SERVICE, GROUP, CLUSTER); @@ -124,7 +126,7 @@ public class NamingGrpcRedoServiceTest { assertTrue(redoService.isConnected()); assertTrue(redoService.findInstanceRedoData().isEmpty()); assertTrue(redoService.findSubscriberRedoData().isEmpty()); - redoService.onDisConnect(); + redoService.onDisConnect(new TestConnection(new RpcClient.ServerInfo())); assertFalse(redoService.isConnected()); assertFalse(redoService.findInstanceRedoData().isEmpty()); assertFalse(redoService.findSubscriberRedoData().isEmpty()); diff --git a/common/src/main/java/com/alibaba/nacos/common/ability/AbstractAbilityControlManager.java b/common/src/main/java/com/alibaba/nacos/common/ability/AbstractAbilityControlManager.java new file mode 100644 index 000000000..7571f5f70 --- /dev/null +++ b/common/src/main/java/com/alibaba/nacos/common/ability/AbstractAbilityControlManager.java @@ -0,0 +1,220 @@ +/* + * Copyright 1999-2022 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.nacos.common.ability; + +import com.alibaba.nacos.api.ability.constant.AbilityKey; +import com.alibaba.nacos.api.ability.constant.AbilityMode; +import com.alibaba.nacos.api.ability.constant.AbilityStatus; +import com.alibaba.nacos.api.ability.initializer.AbilityPostProcessor; +import com.alibaba.nacos.common.notify.Event; +import com.alibaba.nacos.common.notify.NotifyCenter; +import com.alibaba.nacos.common.spi.NacosServiceLoader; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collection; +import java.util.Collections; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +/** + * It is a capability control center, manager current node abilities or other control. + * + * @author Daydreamer + * @date 2022/7/12 19:18 + **/ +public abstract class AbstractAbilityControlManager { + + private static final Logger LOGGER = LoggerFactory.getLogger(AbstractAbilityControlManager.class); + + /** + * current node support abilities. + */ + protected final Map> currentNodeAbilities = new ConcurrentHashMap<>(); + + protected AbstractAbilityControlManager() { + NotifyCenter.registerToPublisher(AbilityUpdateEvent.class, 16384); + initAbilityTable(); + } + + /** + * initialize abilities. + * + * @return abilities + */ + private void initAbilityTable() { + LOGGER.info("Ready to get current node abilities..."); + // get processors + Map> abilities = initCurrentNodeAbilities(); + // get abilities + for (AbilityMode mode : AbilityMode.values()) { + Map abilitiesTable = abilities.get(mode); + if (abilitiesTable == null) { + continue; + } + // check whether exist error key + // check for developer + for (AbilityKey abilityKey : abilitiesTable.keySet()) { + if (!mode.equals(abilityKey.getMode())) { + LOGGER.error( + "You should not contain a other mode: {} in a specify mode: {} abilities set, error key: {}, please check again.", + abilityKey.getMode(), mode, abilityKey); + throw new IllegalStateException( + "Except mode: " + mode + " but " + abilityKey + " mode: " + abilityKey.getMode() + + ", please check again."); + } + } + Collection processors = NacosServiceLoader.load(AbilityPostProcessor.class); + for (AbilityPostProcessor processor : processors) { + processor.process(mode, abilitiesTable); + } + } + // init + Set abilityModes = abilities.keySet(); + LOGGER.info("Ready to initialize current node abilities, support modes: {}", abilityModes); + for (AbilityMode abilityMode : abilityModes) { + this.currentNodeAbilities + .put(abilityMode, new ConcurrentHashMap<>(AbilityKey.mapStr(abilities.get(abilityMode)))); + } + LOGGER.info("Initialize current abilities finish..."); + } + + /** + * Turn on the ability whose key is

abilityKey

. + * + * @param abilityKey ability key{@link AbilityKey} + */ + public void enableCurrentNodeAbility(AbilityKey abilityKey) { + Map abilities = this.currentNodeAbilities.get(abilityKey.getMode()); + if (abilities != null) { + doTurn(abilities, abilityKey, true, abilityKey.getMode()); + } + } + + protected void doTurn(Map abilities, AbilityKey key, boolean turn, AbilityMode mode) { + if (!key.getMode().equals(mode)) { + throw new IllegalStateException("Except " + mode + " but " + key.getMode()); + } + LOGGER.info("Turn current node ability: {}, turn: {}", key, turn); + abilities.put(key.getName(), turn); + // notify event + AbilityUpdateEvent abilityUpdateEvent = new AbilityUpdateEvent(); + abilityUpdateEvent.setTable(Collections.unmodifiableMap(abilities)); + abilityUpdateEvent.isOn = turn; + abilityUpdateEvent.abilityKey = key; + NotifyCenter.publishEvent(abilityUpdateEvent); + } + + /** + * Turn off the ability whose key is

abilityKey

{@link AbilityKey}. + * + * @param abilityKey ability key + */ + public void disableCurrentNodeAbility(AbilityKey abilityKey) { + Map abilities = this.currentNodeAbilities.get(abilityKey.getMode()); + if (abilities != null) { + doTurn(abilities, abilityKey, false, abilityKey.getMode()); + } + } + + /** + * . Whether current node support + * + * @param abilityKey ability key from {@link AbilityKey} + * @return whether support + */ + public AbilityStatus isCurrentNodeAbilityRunning(AbilityKey abilityKey) { + Map abilities = currentNodeAbilities.get(abilityKey.getMode()); + if (abilities != null) { + Boolean support = abilities.get(abilityKey.getName()); + if (support != null) { + return support ? AbilityStatus.SUPPORTED : AbilityStatus.NOT_SUPPORTED; + } + } + return AbilityStatus.UNKNOWN; + } + + /** + * . Init current node abilities + * + * @return current node abilities + */ + protected abstract Map> initCurrentNodeAbilities(); + + /** + * . Return the abilities current node + * + * @return current abilities + */ + public Map getCurrentNodeAbilities(AbilityMode mode) { + Map abilities = currentNodeAbilities.get(mode); + if (abilities != null) { + return Collections.unmodifiableMap(abilities); + } + return Collections.emptyMap(); + } + + /** + * A legal nacos application has a ability control manager. If there are more than one, the one with higher priority + * is preferred + * + * @return priority + */ + public abstract int getPriority(); + + /** + * notify when current node ability changing. + */ + public class AbilityUpdateEvent extends Event { + + private static final long serialVersionUID = -1232411212311111L; + + private AbilityKey abilityKey; + + private boolean isOn; + + private Map table; + + private AbilityUpdateEvent() { + } + + public Map getAbilityTable() { + return table; + } + + public void setTable(Map abilityTable) { + this.table = abilityTable; + } + + public AbilityKey getAbilityKey() { + return abilityKey; + } + + public void setAbilityKey(AbilityKey abilityKey) { + this.abilityKey = abilityKey; + } + + public boolean isOn() { + return isOn; + } + + public void setOn(boolean on) { + isOn = on; + } + } +} diff --git a/common/src/main/java/com/alibaba/nacos/common/ability/discover/NacosAbilityManagerHolder.java b/common/src/main/java/com/alibaba/nacos/common/ability/discover/NacosAbilityManagerHolder.java new file mode 100644 index 000000000..3ca141718 --- /dev/null +++ b/common/src/main/java/com/alibaba/nacos/common/ability/discover/NacosAbilityManagerHolder.java @@ -0,0 +1,91 @@ +/* + * Copyright 1999-2022 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.nacos.common.ability.discover; + +import com.alibaba.nacos.common.ability.AbstractAbilityControlManager; +import com.alibaba.nacos.common.spi.NacosServiceLoader; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collection; +import java.util.Comparator; +import java.util.List; +import java.util.ServiceConfigurationError; +import java.util.stream.Collectors; + +/** + * This class is used to discover {@link AbstractAbilityControlManager} implements. All the + * ability operation will be finish in this singleton. + * + * @author Daydreamer + * @date 2022/7/14 19:58 + **/ +public class NacosAbilityManagerHolder { + + /**. + * private constructor + */ + private NacosAbilityManagerHolder() { + } + + private static final Logger LOGGER = LoggerFactory.getLogger(NacosAbilityManagerHolder.class); + + /**. + * singleton + */ + private static AbstractAbilityControlManager abstractAbilityControlManager; + + static { + // spi discover implement + Collection load = null; + try { + // if server + load = NacosServiceLoader.load(AbstractAbilityControlManager.class); + } catch (ServiceConfigurationError e) { + throw new RuntimeException("[AbilityControlManager] Cannot find AbilityControlManger"); + } + // the priority of the server is higher + List collect = load.stream() + .sorted(Comparator.comparingInt(AbstractAbilityControlManager::getPriority)) + .collect(Collectors.toList()); + // get the highest priority one + if (load.size() > 0) { + abstractAbilityControlManager = collect.get(collect.size() - 1); + LOGGER.info("[AbilityControlManager] Successfully initialize AbilityControlManager"); + } + } + + /**. + * get nacos ability control manager + * + * @return BaseAbilityControlManager + */ + public static AbstractAbilityControlManager getInstance() { + return abstractAbilityControlManager; + } + + /**. + * Return the target type of ability manager + * + * @param clazz clazz + * @param target type + * @return AbilityControlManager + */ + public static T getInstance(Class clazz) { + return clazz.cast(abstractAbilityControlManager); + } +} diff --git a/common/src/main/java/com/alibaba/nacos/common/remote/client/Connection.java b/common/src/main/java/com/alibaba/nacos/common/remote/client/Connection.java index b56eaaa3b..b8ea5a836 100644 --- a/common/src/main/java/com/alibaba/nacos/common/remote/client/Connection.java +++ b/common/src/main/java/com/alibaba/nacos/common/remote/client/Connection.java @@ -16,8 +16,12 @@ package com.alibaba.nacos.common.remote.client; +import com.alibaba.nacos.api.ability.constant.AbilityKey; +import com.alibaba.nacos.api.ability.constant.AbilityStatus; import com.alibaba.nacos.api.remote.Requester; +import java.util.Map; + /** * connection on client side. * @@ -33,6 +37,8 @@ public abstract class Connection implements Requester { protected RpcClient.ServerInfo serverInfo; + protected Map abilityTable; + public Connection(RpcClient.ServerInfo serverInfo) { this.serverInfo = serverInfo; } @@ -45,6 +51,17 @@ public abstract class Connection implements Requester { this.connectionId = connectionId; } + public AbilityStatus getConnectionAbility(AbilityKey abilityKey) { + if (abilityTable == null || !abilityTable.containsKey(abilityKey.getName())) { + return AbilityStatus.UNKNOWN; + } + return abilityTable.get(abilityKey.getName()) ? AbilityStatus.SUPPORTED : AbilityStatus.NOT_SUPPORTED; + } + + public void setAbilityTable(Map abilityTable) { + this.abilityTable = abilityTable; + } + /** * Getter method for property abandon. * diff --git a/common/src/main/java/com/alibaba/nacos/common/remote/client/ConnectionEventListener.java b/common/src/main/java/com/alibaba/nacos/common/remote/client/ConnectionEventListener.java index 7074461b1..1d689de1d 100644 --- a/common/src/main/java/com/alibaba/nacos/common/remote/client/ConnectionEventListener.java +++ b/common/src/main/java/com/alibaba/nacos/common/remote/client/ConnectionEventListener.java @@ -25,11 +25,15 @@ public interface ConnectionEventListener { /** * notify when connected to server. + * + * @param connection connection has connected */ - void onConnected(); + void onConnected(Connection connection); /** * notify when disconnected to server. + * + * @param connection connection has disconnected */ - void onDisConnect(); + void onDisConnect(Connection connection); } diff --git a/common/src/main/java/com/alibaba/nacos/common/remote/client/RpcClient.java b/common/src/main/java/com/alibaba/nacos/common/remote/client/RpcClient.java index 2e3986e70..b75cb5ee7 100644 --- a/common/src/main/java/com/alibaba/nacos/common/remote/client/RpcClient.java +++ b/common/src/main/java/com/alibaba/nacos/common/remote/client/RpcClient.java @@ -16,7 +16,8 @@ package com.alibaba.nacos.common.remote.client; -import com.alibaba.nacos.api.ability.ClientAbilities; +import com.alibaba.nacos.api.ability.constant.AbilityKey; +import com.alibaba.nacos.api.ability.constant.AbilityStatus; import com.alibaba.nacos.api.common.Constants; import com.alibaba.nacos.api.exception.NacosException; import com.alibaba.nacos.api.remote.RequestCallBack; @@ -82,11 +83,9 @@ public abstract class RpcClient implements Closeable { private final BlockingQueue reconnectionSignal = new ArrayBlockingQueue<>(1); protected volatile Connection currentConnection; - + private String tenant; - protected ClientAbilities clientAbilities; - private long lastActiveTimeStamp = System.currentTimeMillis(); /** @@ -102,9 +101,9 @@ public abstract class RpcClient implements Closeable { private static final Pattern EXCLUDE_PROTOCOL_PATTERN = Pattern.compile("(?<=\\w{1,5}://)(.*)"); protected RpcClientConfig rpcClientConfig; - + protected final ResourceLoader resourceLoader = new DefaultResourceLoader(); - + static { PayloadRegistry.init(); } @@ -118,7 +117,7 @@ public abstract class RpcClient implements Closeable { this.serverListFactory = serverListFactory; init(); } - + protected void init() { if (this.serverListFactory != null) { rpcClientStatus.compareAndSet(RpcClientStatus.WAIT_INIT, RpcClientStatus.INITIALIZED); @@ -131,16 +130,6 @@ public abstract class RpcClient implements Closeable { return Collections.unmodifiableMap(rpcClientConfig.labels()); } - /** - * init client abilities. - * - * @param clientAbilities clientAbilities. - */ - public RpcClient clientAbilities(ClientAbilities clientAbilities) { - this.clientAbilities = clientAbilities; - return this; - } - /** * init server list factory. only can init once. * @@ -160,15 +149,17 @@ public abstract class RpcClient implements Closeable { /** * Notify when client disconnected. + * + * @param connection connection has disconnected */ - protected void notifyDisConnected() { + protected void notifyDisConnected(Connection connection) { if (connectionEventListeners.isEmpty()) { return; } LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] Notify disconnected event to listeners", rpcClientConfig.name()); for (ConnectionEventListener connectionEventListener : connectionEventListeners) { try { - connectionEventListener.onDisConnect(); + connectionEventListener.onDisConnect(connection); } catch (Throwable throwable) { LoggerUtils.printIfErrorEnabled(LOGGER, "[{}] Notify disconnect listener error, listener = {}", rpcClientConfig.name(), connectionEventListener.getClass().getName()); @@ -178,15 +169,17 @@ public abstract class RpcClient implements Closeable { /** * Notify when client new connected. + * + * @param connection connection has connected */ - protected void notifyConnected() { + protected void notifyConnected(Connection connection) { if (connectionEventListeners.isEmpty()) { return; } LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] Notify connected event to listeners.", rpcClientConfig.name()); for (ConnectionEventListener connectionEventListener : connectionEventListeners) { try { - connectionEventListener.onConnected(); + connectionEventListener.onConnected(connection); } catch (Throwable throwable) { LoggerUtils.printIfErrorEnabled(LOGGER, "[{}] Notify connect listener error, listener = {}", rpcClientConfig.name(), connectionEventListener.getClass().getName()); @@ -268,9 +261,9 @@ public abstract class RpcClient implements Closeable { try { take = eventLinkedBlockingQueue.take(); if (take.isConnected()) { - notifyConnected(); + notifyConnected(take.connection); } else if (take.isDisConnected()) { - notifyDisConnected(); + notifyDisConnected(take.connection); } } catch (Throwable e) { // Do nothing @@ -376,7 +369,7 @@ public abstract class RpcClient implements Closeable { connectToServer.serverInfo.getAddress(), connectToServer.getConnectionId()); this.currentConnection = connectToServer; rpcClientStatus.set(RpcClientStatus.RUNNING); - eventLinkedBlockingQueue.offer(new ConnectionEvent(ConnectionEvent.CONNECTED)); + eventLinkedBlockingQueue.offer(new ConnectionEvent(ConnectionEvent.CONNECTED, currentConnection)); } else { switchServerAsync(); } @@ -384,7 +377,7 @@ public abstract class RpcClient implements Closeable { registerServerRequestHandler(new ConnectResetRequestHandler()); // register client detection request. - registerServerRequestHandler(request -> { + registerServerRequestHandler((request, connection) -> { if (request instanceof ClientDetectionRequest) { return new ClientDetectionResponse(); } @@ -397,7 +390,7 @@ public abstract class RpcClient implements Closeable { class ConnectResetRequestHandler implements ServerRequestHandler { @Override - public Response requestReply(Request request) { + public Response requestReply(Request request, Connection connection) { if (request instanceof ConnectResetRequest) { @@ -413,6 +406,7 @@ public abstract class RpcClient implements Closeable { } else { switchServerAsync(); } + afterReset(connectResetRequest); } } } catch (Exception e) { @@ -424,6 +418,15 @@ public abstract class RpcClient implements Closeable { } } + /**. + * invoke after receiving reset request + * + * @param request request for resetting + */ + protected void afterReset(ConnectResetRequest request) { + // hook for GrpcClient + } + @Override public void shutdown() throws NacosException { LOGGER.info("Shutdown rpc client, set status to shutdown"); @@ -521,7 +524,7 @@ public abstract class RpcClient implements Closeable { currentConnection = connectionNew; rpcClientStatus.set(RpcClientStatus.RUNNING); switchSuccess = true; - eventLinkedBlockingQueue.add(new ConnectionEvent(ConnectionEvent.CONNECTED)); + eventLinkedBlockingQueue.add(new ConnectionEvent(ConnectionEvent.CONNECTED, currentConnection)); return; } @@ -585,7 +588,7 @@ public abstract class RpcClient implements Closeable { if (connection != null) { LOGGER.info("Close current connection " + connection.getConnectionId()); connection.close(); - eventLinkedBlockingQueue.add(new ConnectionEvent(ConnectionEvent.DISCONNECTED)); + eventLinkedBlockingQueue.add(new ConnectionEvent(ConnectionEvent.DISCONNECTED, connection)); } } @@ -822,7 +825,7 @@ public abstract class RpcClient implements Closeable { lastActiveTimeStamp = System.currentTimeMillis(); for (ServerRequestHandler serverRequestHandler : serverRequestHandlers) { try { - Response response = serverRequestHandler.requestReply(request); + Response response = serverRequestHandler.requestReply(request, currentConnection); if (response != null) { LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] Ack server push request, request = {}, requestId = {}", @@ -984,8 +987,11 @@ public abstract class RpcClient implements Closeable { int eventType; - public ConnectionEvent(int eventType) { + Connection connection; + + public ConnectionEvent(int eventType, Connection connection) { this.eventType = eventType; + this.connection = connection; } public boolean isConnected() { @@ -1025,4 +1031,18 @@ public abstract class RpcClient implements Closeable { public void setTenant(String tenant) { this.tenant = tenant; } + + /** + * Return ability of current connection. + * + * @param abilityKey ability key + * @return whether support, return null if connection is not ready + */ + public AbilityStatus getConnectionAbility(AbilityKey abilityKey) { + if (currentConnection != null) { + return currentConnection.getConnectionAbility(abilityKey); + } + // return null if connection is not ready + return null; + } } diff --git a/common/src/main/java/com/alibaba/nacos/common/remote/client/ServerRequestHandler.java b/common/src/main/java/com/alibaba/nacos/common/remote/client/ServerRequestHandler.java index 19f97fde0..09ad1002e 100644 --- a/common/src/main/java/com/alibaba/nacos/common/remote/client/ServerRequestHandler.java +++ b/common/src/main/java/com/alibaba/nacos/common/remote/client/ServerRequestHandler.java @@ -31,8 +31,9 @@ public interface ServerRequestHandler { * Handle request from server. * * @param request request + * @param connection current connection, it can be used to know server ability * @return response. */ - Response requestReply(Request request); + Response requestReply(Request request, Connection connection); } diff --git a/common/src/main/java/com/alibaba/nacos/common/remote/client/grpc/DefaultGrpcClientConfig.java b/common/src/main/java/com/alibaba/nacos/common/remote/client/grpc/DefaultGrpcClientConfig.java index 685405022..fd44229af 100644 --- a/common/src/main/java/com/alibaba/nacos/common/remote/client/grpc/DefaultGrpcClientConfig.java +++ b/common/src/main/java/com/alibaba/nacos/common/remote/client/grpc/DefaultGrpcClientConfig.java @@ -60,10 +60,12 @@ public class DefaultGrpcClientConfig implements GrpcClientConfig { private long healthCheckTimeOut; + private long capabilityNegotiationTimeout; + private Map labels; - + private RpcClientTlsConfig tlsConfig = new RpcClientTlsConfig(); - + /** * constructor. * @@ -90,6 +92,8 @@ public class DefaultGrpcClientConfig implements GrpcClientConfig { this.healthCheckTimeOut = loadLongConfig(GrpcConstants.GRPC_HEALTHCHECK_TIMEOUT, builder.healthCheckTimeOut); this.channelKeepAliveTimeout = loadLongConfig(GrpcConstants.GRPC_CHANNEL_KEEP_ALIVE_TIMEOUT, builder.channelKeepAliveTimeout); + this.capabilityNegotiationTimeout = loadLongConfig(GrpcConstants.GRPC_CHANNEL_CAPABILITY_NEGOTIATION_TIMEOUT, + builder.capabilityNegotiationTimeout); this.labels = builder.labels; this.labels.put("tls.enable", "false"); if (Objects.nonNull(builder.tlsConfig)) { @@ -167,26 +171,31 @@ public class DefaultGrpcClientConfig implements GrpcClientConfig { public long channelKeepAliveTimeout() { return channelKeepAliveTimeout; } - + @Override public RpcClientTlsConfig tlsConfig() { return tlsConfig; } - + public void setTlsConfig(RpcClientTlsConfig tlsConfig) { this.tlsConfig = tlsConfig; } - + + @Override + public long capabilityNegotiationTimeout() { + return this.capabilityNegotiationTimeout; + } + @Override public int healthCheckRetryTimes() { return healthCheckRetryTimes; } - + @Override public long healthCheckTimeOut() { return healthCheckTimeOut; } - + @Override public Map labels() { return this.labels; @@ -226,10 +235,12 @@ public class DefaultGrpcClientConfig implements GrpcClientConfig { private long healthCheckTimeOut = 3000L; + private long capabilityNegotiationTimeout = 5000L; + private Map labels = new HashMap<>(); - + private RpcClientTlsConfig tlsConfig = new RpcClientTlsConfig(); - + private Builder() { } @@ -250,47 +261,51 @@ public class DefaultGrpcClientConfig implements GrpcClientConfig { this.timeOutMills = Long.parseLong(properties.getProperty(GrpcConstants.GRPC_TIMEOUT_MILLS)); } if (properties.contains(GrpcConstants.GRPC_CONNECT_KEEP_ALIVE_TIME)) { - this.connectionKeepAlive = Long.parseLong( - properties.getProperty(GrpcConstants.GRPC_CONNECT_KEEP_ALIVE_TIME)); + this.connectionKeepAlive = Long + .parseLong(properties.getProperty(GrpcConstants.GRPC_CONNECT_KEEP_ALIVE_TIME)); } if (properties.contains(GrpcConstants.GRPC_THREADPOOL_KEEPALIVETIME)) { - this.threadPoolKeepAlive = Long.parseLong( - properties.getProperty(GrpcConstants.GRPC_THREADPOOL_KEEPALIVETIME)); + this.threadPoolKeepAlive = Long + .parseLong(properties.getProperty(GrpcConstants.GRPC_THREADPOOL_KEEPALIVETIME)); } if (properties.contains(GrpcConstants.GRPC_THREADPOOL_CORE_SIZE)) { - this.threadPoolCoreSize = Integer.parseInt( - properties.getProperty(GrpcConstants.GRPC_THREADPOOL_CORE_SIZE)); + this.threadPoolCoreSize = Integer + .parseInt(properties.getProperty(GrpcConstants.GRPC_THREADPOOL_CORE_SIZE)); } if (properties.contains(GrpcConstants.GRPC_THREADPOOL_MAX_SIZE)) { - this.threadPoolMaxSize = Integer.parseInt( - properties.getProperty(GrpcConstants.GRPC_THREADPOOL_MAX_SIZE)); + this.threadPoolMaxSize = Integer + .parseInt(properties.getProperty(GrpcConstants.GRPC_THREADPOOL_MAX_SIZE)); } if (properties.contains(GrpcConstants.GRPC_SERVER_CHECK_TIMEOUT)) { - this.serverCheckTimeOut = Long.parseLong( - properties.getProperty(GrpcConstants.GRPC_SERVER_CHECK_TIMEOUT)); + this.serverCheckTimeOut = Long + .parseLong(properties.getProperty(GrpcConstants.GRPC_SERVER_CHECK_TIMEOUT)); } if (properties.contains(GrpcConstants.GRPC_QUEUESIZE)) { this.threadPoolQueueSize = Integer.parseInt(properties.getProperty(GrpcConstants.GRPC_QUEUESIZE)); } if (properties.contains(GrpcConstants.GRPC_MAX_INBOUND_MESSAGE_SIZE)) { - this.maxInboundMessageSize = Integer.parseInt( - properties.getProperty(GrpcConstants.GRPC_MAX_INBOUND_MESSAGE_SIZE)); + this.maxInboundMessageSize = Integer + .parseInt(properties.getProperty(GrpcConstants.GRPC_MAX_INBOUND_MESSAGE_SIZE)); } if (properties.contains(GrpcConstants.GRPC_CHANNEL_KEEP_ALIVE_TIME)) { - this.channelKeepAlive = Integer.parseInt( - properties.getProperty(GrpcConstants.GRPC_CHANNEL_KEEP_ALIVE_TIME)); + this.channelKeepAlive = Integer + .parseInt(properties.getProperty(GrpcConstants.GRPC_CHANNEL_KEEP_ALIVE_TIME)); + } + if (properties.contains(GrpcConstants.GRPC_CHANNEL_CAPABILITY_NEGOTIATION_TIMEOUT)) { + this.capabilityNegotiationTimeout = Integer + .parseInt(properties.getProperty(GrpcConstants.GRPC_CHANNEL_CAPABILITY_NEGOTIATION_TIMEOUT)); } if (properties.contains(GrpcConstants.GRPC_HEALTHCHECK_RETRY_TIMES)) { - this.healthCheckRetryTimes = Integer.parseInt( - properties.getProperty(GrpcConstants.GRPC_HEALTHCHECK_RETRY_TIMES)); + this.healthCheckRetryTimes = Integer + .parseInt(properties.getProperty(GrpcConstants.GRPC_HEALTHCHECK_RETRY_TIMES)); } if (properties.contains(GrpcConstants.GRPC_HEALTHCHECK_TIMEOUT)) { - this.healthCheckTimeOut = Long.parseLong( - properties.getProperty(GrpcConstants.GRPC_HEALTHCHECK_TIMEOUT)); + this.healthCheckTimeOut = Long + .parseLong(properties.getProperty(GrpcConstants.GRPC_HEALTHCHECK_TIMEOUT)); } if (properties.contains(GrpcConstants.GRPC_CHANNEL_KEEP_ALIVE_TIMEOUT)) { - this.channelKeepAliveTimeout = Integer.parseInt( - properties.getProperty(GrpcConstants.GRPC_CHANNEL_KEEP_ALIVE_TIMEOUT)); + this.channelKeepAliveTimeout = Integer + .parseInt(properties.getProperty(GrpcConstants.GRPC_CHANNEL_KEEP_ALIVE_TIMEOUT)); } this.tlsConfig = RpcClientTlsConfig.properties(properties); return this; @@ -399,6 +414,10 @@ public class DefaultGrpcClientConfig implements GrpcClientConfig { return this; } + public void setCapabilityNegotiationTimeout(long capabilityNegotiationTimeout) { + this.capabilityNegotiationTimeout = capabilityNegotiationTimeout; + } + /** * set healthCheckRetryTimes. */ @@ -422,7 +441,7 @@ public class DefaultGrpcClientConfig implements GrpcClientConfig { this.labels.putAll(labels); return this; } - + /** * set tlsConfig. * @@ -433,7 +452,7 @@ public class DefaultGrpcClientConfig implements GrpcClientConfig { this.tlsConfig = tlsConfig; return this; } - + /** * build GrpcClientConfig. */ @@ -441,5 +460,5 @@ public class DefaultGrpcClientConfig implements GrpcClientConfig { return new DefaultGrpcClientConfig(this); } } - + } diff --git a/common/src/main/java/com/alibaba/nacos/common/remote/client/grpc/GrpcClient.java b/common/src/main/java/com/alibaba/nacos/common/remote/client/grpc/GrpcClient.java index f21bf52a5..923acc997 100644 --- a/common/src/main/java/com/alibaba/nacos/common/remote/client/grpc/GrpcClient.java +++ b/common/src/main/java/com/alibaba/nacos/common/remote/client/grpc/GrpcClient.java @@ -16,30 +16,35 @@ package com.alibaba.nacos.common.remote.client.grpc; +import com.alibaba.nacos.api.ability.constant.AbilityMode; import com.alibaba.nacos.api.exception.NacosException; import com.alibaba.nacos.api.grpc.auto.BiRequestStreamGrpc; import com.alibaba.nacos.api.grpc.auto.Payload; import com.alibaba.nacos.api.grpc.auto.RequestGrpc; +import com.alibaba.nacos.api.remote.request.ConnectResetRequest; import com.alibaba.nacos.api.remote.request.ConnectionSetupRequest; import com.alibaba.nacos.api.remote.request.Request; import com.alibaba.nacos.api.remote.request.ServerCheckRequest; +import com.alibaba.nacos.api.remote.request.SetupAckRequest; import com.alibaba.nacos.api.remote.response.ErrorResponse; import com.alibaba.nacos.api.remote.response.Response; import com.alibaba.nacos.api.remote.response.ServerCheckResponse; +import com.alibaba.nacos.api.remote.response.SetupAckResponse; +import com.alibaba.nacos.common.ability.discover.NacosAbilityManagerHolder; import com.alibaba.nacos.common.packagescan.resource.Resource; import com.alibaba.nacos.common.remote.ConnectionType; -import com.alibaba.nacos.common.remote.client.RpcClient; -import com.alibaba.nacos.common.remote.client.RpcClientStatus; -import com.alibaba.nacos.common.remote.client.Connection; import com.alibaba.nacos.common.remote.client.RpcClientTlsConfig; +import com.alibaba.nacos.common.remote.client.RpcClientStatus; +import com.alibaba.nacos.common.remote.client.RpcClient; import com.alibaba.nacos.common.remote.client.ServerListFactory; +import com.alibaba.nacos.common.remote.client.Connection; +import com.alibaba.nacos.common.remote.client.ServerRequestHandler; import com.alibaba.nacos.common.utils.JacksonUtils; import com.alibaba.nacos.common.utils.LoggerUtils; import com.alibaba.nacos.common.utils.StringUtils; import com.alibaba.nacos.common.utils.VersionUtils; import com.alibaba.nacos.common.utils.TlsTypeResolve; import com.alibaba.nacos.common.utils.ThreadFactoryBuilder; -import com.google.common.base.Optional; import com.google.common.util.concurrent.ListenableFuture; import io.grpc.CompressorRegistry; import io.grpc.DecompressorRegistry; @@ -56,9 +61,11 @@ import io.grpc.stub.StreamObserver; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Arrays; import java.util.Map; +import java.util.Optional; +import java.util.Arrays; import java.util.Properties; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -78,6 +85,11 @@ public abstract class GrpcClient extends RpcClient { private ThreadPoolExecutor grpcExecutor; + /** + * Block to wait setup success response. + */ + private final RecAbilityContext recAbilityContext = new RecAbilityContext(null); + @Override public ConnectionType getConnectionType() { return ConnectionType.GRPC; @@ -91,7 +103,7 @@ public abstract class GrpcClient extends RpcClient { public GrpcClient(String name) { this(DefaultGrpcClientConfig.newBuilder().setName(name).build()); } - + /** * constructor. * @@ -109,6 +121,7 @@ public abstract class GrpcClient extends RpcClient { public GrpcClient(GrpcClientConfig clientConfig) { super(clientConfig); this.clientConfig = clientConfig; + initSetupHandler(); } /** @@ -120,6 +133,15 @@ public abstract class GrpcClient extends RpcClient { public GrpcClient(GrpcClientConfig clientConfig, ServerListFactory serverListFactory) { super(clientConfig, serverListFactory); this.clientConfig = clientConfig; + initSetupHandler(); + } + + /** + * setup handler. + */ + private void initSetupHandler() { + // register to handler setup request + registerServerRequestHandler(new SetupRequestHandler(this.recAbilityContext)); } /** @@ -161,7 +183,7 @@ public abstract class GrpcClient extends RpcClient { grpcExecutor.shutdown(); } } - + /** * Create a stub using a channel. * @@ -171,7 +193,7 @@ public abstract class GrpcClient extends RpcClient { private RequestGrpc.RequestFutureStub createNewChannelStub(ManagedChannel managedChannelTemp) { return RequestGrpc.newFutureStub(managedChannelTemp); } - + /** * create a new channel with specific server address. * @@ -190,7 +212,7 @@ public abstract class GrpcClient extends RpcClient { .keepAliveTimeout(clientConfig.channelKeepAliveTimeout(), TimeUnit.MILLISECONDS); return managedChannelBuilder.build(); } - + /** * shutdown a channel. * @@ -271,6 +293,8 @@ public abstract class GrpcClient extends RpcClient { LoggerUtils.printIfErrorEnabled(LOGGER, "[{}]Error to process server push response: {}", grpcConn.getConnectionId(), payload.getBody().getValue().toStringUtf8()); + // remove and notify + recAbilityContext.release(null); } } @@ -323,6 +347,8 @@ public abstract class GrpcClient extends RpcClient { @Override public Connection connectToServer(ServerInfo serverInfo) { + // the newest connection id + String connectionId = ""; try { if (grpcExecutor == null) { this.grpcExecutor = createGrpcExecutor(serverInfo.getServerIp()); @@ -330,62 +356,196 @@ public abstract class GrpcClient extends RpcClient { int port = serverInfo.getServerPort() + rpcPortOffset(); ManagedChannel managedChannel = createNewManagedChannel(serverInfo.getServerIp(), port); RequestGrpc.RequestFutureStub newChannelStubTemp = createNewChannelStub(managedChannel); - Response response = serverCheck(serverInfo.getServerIp(), port, newChannelStubTemp); - if (!(response instanceof ServerCheckResponse)) { - shuntDownChannel(managedChannel); - return null; + if (newChannelStubTemp != null) { + + Response response = serverCheck(serverInfo.getServerIp(), port, newChannelStubTemp); + if (response == null || !(response instanceof ServerCheckResponse)) { + shuntDownChannel(managedChannel); + return null; + } + + // submit ability table as soon as possible + // ability table will be null if server doesn't support ability table + ServerCheckResponse serverCheckResponse = (ServerCheckResponse) response; + connectionId = serverCheckResponse.getConnectionId(); + + BiRequestStreamGrpc.BiRequestStreamStub biRequestStreamStub = BiRequestStreamGrpc.newStub( + newChannelStubTemp.getChannel()); + GrpcConnection grpcConn = new GrpcConnection(serverInfo, grpcExecutor); + grpcConn.setConnectionId(connectionId); + // if not supported, it will be false + if (serverCheckResponse.isSupportAbilityNegotiation()) { + // mark + this.recAbilityContext.reset(grpcConn); + } + + //create stream request and bind connection event to this connection. + StreamObserver payloadStreamObserver = bindRequestStream(biRequestStreamStub, grpcConn); + + // stream observer to send response to server + grpcConn.setPayloadStreamObserver(payloadStreamObserver); + grpcConn.setGrpcFutureServiceStub(newChannelStubTemp); + grpcConn.setChannel(managedChannel); + //send a setup request. + ConnectionSetupRequest conSetupRequest = new ConnectionSetupRequest(); + conSetupRequest.setClientVersion(VersionUtils.getFullClientVersion()); + conSetupRequest.setLabels(super.getLabels()); + // set ability table + conSetupRequest.setAbilityTable(NacosAbilityManagerHolder.getInstance().getCurrentNodeAbilities(abilityMode())); + conSetupRequest.setTenant(super.getTenant()); + grpcConn.sendRequest(conSetupRequest); + // wait for response + if (recAbilityContext.isNeedToSync()) { + // try to wait for notify response + recAbilityContext.await(this.clientConfig.capabilityNegotiationTimeout(), TimeUnit.MILLISECONDS); + } else { + // leave for adapting old version server + // wait to register connection setup + Thread.sleep(100L); + } + return grpcConn; } - - BiRequestStreamGrpc.BiRequestStreamStub biRequestStreamStub = BiRequestStreamGrpc.newStub( - newChannelStubTemp.getChannel()); - GrpcConnection grpcConn = new GrpcConnection(serverInfo, grpcExecutor); - grpcConn.setConnectionId(((ServerCheckResponse) response).getConnectionId()); - - //create stream request and bind connection event to this connection. - StreamObserver payloadStreamObserver = bindRequestStream(biRequestStreamStub, grpcConn); - - // stream observer to send response to server - grpcConn.setPayloadStreamObserver(payloadStreamObserver); - grpcConn.setGrpcFutureServiceStub(newChannelStubTemp); - grpcConn.setChannel(managedChannel); - //send a setup request. - ConnectionSetupRequest conSetupRequest = new ConnectionSetupRequest(); - conSetupRequest.setClientVersion(VersionUtils.getFullClientVersion()); - conSetupRequest.setLabels(super.getLabels()); - conSetupRequest.setAbilities(super.clientAbilities); - conSetupRequest.setTenant(super.getTenant()); - grpcConn.sendRequest(conSetupRequest); - //wait to register connection setup - Thread.sleep(100L); - return grpcConn; + return null; } catch (Exception e) { - LOGGER.error("[{}]Fail to connect to server!", GrpcClient.this.getName(), e); + LOGGER.error("[{}]Fail to connect to server!,error={}", GrpcClient.this.getName(), e); + // remove and notify + recAbilityContext.release(null); } return null; } + + /** + * ability mode: sdk client or cluster client. + * + * @return mode + */ + protected abstract AbilityMode abilityMode(); + + @Override + protected void afterReset(ConnectResetRequest request) { + recAbilityContext.release(null); + } + + /** + * This is for receiving server abilities. + */ + class RecAbilityContext { + + /** + * connection waiting for server abilities. + */ + private volatile Connection connection; + + /** + * way to block client. + */ + private volatile CountDownLatch blocker; + + private volatile boolean needToSync = false; + + public RecAbilityContext(Connection connection) { + this.connection = connection; + this.blocker = new CountDownLatch(1); + } + + /** + * whether to sync for ability table. + * + * @return whether to sync for ability table. + */ + public boolean isNeedToSync() { + return this.needToSync; + } + + /** + * reset with new connection which is waiting for ability table. + * + * @param connection new connection which is waiting for ability table. + */ + public void reset(Connection connection) { + this.connection = connection; + this.blocker = new CountDownLatch(1); + this.needToSync = true; + } + + /** + * notify sync by abilities. + * + * @param abilities abilities. + */ + public void release(Map abilities) { + if (this.connection != null) { + this.connection.setAbilityTable(abilities); + // avoid repeat setting + this.connection = null; + } + if (this.blocker != null) { + blocker.countDown(); + } + this.needToSync = false; + } + + /** + * await for abilities. + * + * @param timeout timeout. + * @param unit unit. + * @throws InterruptedException by blocker. + */ + public void await(long timeout, TimeUnit unit) throws InterruptedException { + if (this.blocker != null) { + this.blocker.await(timeout, unit); + } + this.needToSync = false; + } + } + + /** + * Setup response handler. + */ + class SetupRequestHandler implements ServerRequestHandler { + + private final RecAbilityContext abilityContext; + + public SetupRequestHandler(RecAbilityContext abilityContext) { + this.abilityContext = abilityContext; + } + + @Override + public Response requestReply(Request request, Connection connection) { + // if finish setup + if (request instanceof SetupAckRequest) { + SetupAckRequest setupAckRequest = (SetupAckRequest) request; + // remove and count down + recAbilityContext.release(setupAckRequest.getAbilityTable()); + return new SetupAckResponse(); + } + return null; + } + } private ManagedChannelBuilder buildChannel(String serverIp, int port, Optional sslContext) { if (sslContext.isPresent()) { return NettyChannelBuilder.forAddress(serverIp, port).negotiationType(NegotiationType.TLS) .sslContext(sslContext.get()); - + } else { return ManagedChannelBuilder.forAddress(serverIp, port).usePlaintext(); } } - + private Optional buildSslContext() { - + RpcClientTlsConfig tlsConfig = clientConfig.tlsConfig(); if (!tlsConfig.getEnableTls()) { - return Optional.absent(); + return Optional.empty(); } try { SslContextBuilder builder = GrpcSslContexts.forClient(); if (StringUtils.isNotBlank(tlsConfig.getSslProvider())) { builder.sslProvider(TlsTypeResolve.getSslProvider(tlsConfig.getSslProvider())); } - + if (StringUtils.isNotBlank(tlsConfig.getProtocols())) { builder.protocols(tlsConfig.getProtocols().split(",")); } @@ -401,7 +561,7 @@ public abstract class GrpcClient extends RpcClient { Resource resource = resourceLoader.getResource(tlsConfig.getTrustCollectionCertFile()); builder.trustManager(resource.getInputStream()); } - + if (tlsConfig.getMutualAuthEnable()) { if (StringUtils.isBlank(tlsConfig.getCertChainFile()) || StringUtils.isBlank( tlsConfig.getCertPrivateKey())) { diff --git a/common/src/main/java/com/alibaba/nacos/common/remote/client/grpc/GrpcClientConfig.java b/common/src/main/java/com/alibaba/nacos/common/remote/client/grpc/GrpcClientConfig.java index 776072830..1c1b4003b 100644 --- a/common/src/main/java/com/alibaba/nacos/common/remote/client/grpc/GrpcClientConfig.java +++ b/common/src/main/java/com/alibaba/nacos/common/remote/client/grpc/GrpcClientConfig.java @@ -96,4 +96,11 @@ public interface GrpcClientConfig extends RpcClientConfig { */ void setTlsConfig(RpcClientTlsConfig tlsConfig); + /** + * get timeout of connection setup(TimeUnit.MILLISECONDS). + * + * @return timeout of connection setup + */ + long capabilityNegotiationTimeout(); + } diff --git a/common/src/main/java/com/alibaba/nacos/common/remote/client/grpc/GrpcClusterClient.java b/common/src/main/java/com/alibaba/nacos/common/remote/client/grpc/GrpcClusterClient.java index 61f5b7649..7749f69b6 100644 --- a/common/src/main/java/com/alibaba/nacos/common/remote/client/grpc/GrpcClusterClient.java +++ b/common/src/main/java/com/alibaba/nacos/common/remote/client/grpc/GrpcClusterClient.java @@ -16,6 +16,7 @@ package com.alibaba.nacos.common.remote.client.grpc; +import com.alibaba.nacos.api.ability.constant.AbilityMode; import com.alibaba.nacos.api.common.Constants; import com.alibaba.nacos.common.remote.client.RpcClientTlsConfig; @@ -75,6 +76,11 @@ public class GrpcClusterClient extends GrpcClient { super(name, threadPoolCoreSize, threadPoolMaxSize, labels, tlsConfig); } + @Override + protected AbilityMode abilityMode() { + return AbilityMode.CLUSTER_CLIENT; + } + @Override public int rpcPortOffset() { return Integer.parseInt(System.getProperty(GrpcConstants.NACOS_SERVER_GRPC_PORT_OFFSET_KEY, diff --git a/common/src/main/java/com/alibaba/nacos/common/remote/client/grpc/GrpcConstants.java b/common/src/main/java/com/alibaba/nacos/common/remote/client/grpc/GrpcConstants.java index c1afa0744..908997def 100644 --- a/common/src/main/java/com/alibaba/nacos/common/remote/client/grpc/GrpcConstants.java +++ b/common/src/main/java/com/alibaba/nacos/common/remote/client/grpc/GrpcConstants.java @@ -79,6 +79,9 @@ public class GrpcConstants { @GRpcConfigLabel public static final String GRPC_CHANNEL_KEEP_ALIVE_TIMEOUT = NACOS_CLIENT_GRPC + ".channel.keep.alive.timeout"; + @GRpcConfigLabel + public static final String GRPC_CHANNEL_CAPABILITY_NEGOTIATION_TIMEOUT = NACOS_CLIENT_GRPC + ".channel.capability.negotiation.timeout"; + private static final Set CONFIG_NAMES = new HashSet<>(); @Documented diff --git a/common/src/main/java/com/alibaba/nacos/common/remote/client/grpc/GrpcSdkClient.java b/common/src/main/java/com/alibaba/nacos/common/remote/client/grpc/GrpcSdkClient.java index 3b8f859e2..0ede2af6d 100644 --- a/common/src/main/java/com/alibaba/nacos/common/remote/client/grpc/GrpcSdkClient.java +++ b/common/src/main/java/com/alibaba/nacos/common/remote/client/grpc/GrpcSdkClient.java @@ -16,6 +16,7 @@ package com.alibaba.nacos.common.remote.client.grpc; +import com.alibaba.nacos.api.ability.constant.AbilityMode; import com.alibaba.nacos.api.common.Constants; import com.alibaba.nacos.common.remote.client.RpcClientTlsConfig; @@ -65,6 +66,11 @@ public class GrpcSdkClient extends GrpcClient { super(name, threadPoolCoreSize, threadPoolMaxSize, labels, tlsConfig); } + @Override + protected AbilityMode abilityMode() { + return AbilityMode.SDK_CLIENT; + } + /** * constructor. * diff --git a/common/src/test/java/com/alibaba/nacos/common/remote/client/RpcClientTest.java b/common/src/test/java/com/alibaba/nacos/common/remote/client/RpcClientTest.java index 796e2c6d0..281d0ebb2 100644 --- a/common/src/test/java/com/alibaba/nacos/common/remote/client/RpcClientTest.java +++ b/common/src/test/java/com/alibaba/nacos/common/remote/client/RpcClientTest.java @@ -642,7 +642,7 @@ public class RpcClientTest { return null; } }; - rpcClient.serverRequestHandlers.add(req -> { + rpcClient.serverRequestHandlers.add((req, conn) -> { throw new RuntimeException(); }); rpcClient.handleServerRequest(request); diff --git a/common/src/test/java/com/alibaba/nacos/common/remote/client/grpc/GrpcClientTest.java b/common/src/test/java/com/alibaba/nacos/common/remote/client/grpc/GrpcClientTest.java index 246952dda..c3f8bf8ed 100644 --- a/common/src/test/java/com/alibaba/nacos/common/remote/client/grpc/GrpcClientTest.java +++ b/common/src/test/java/com/alibaba/nacos/common/remote/client/grpc/GrpcClientTest.java @@ -16,6 +16,7 @@ package com.alibaba.nacos.common.remote.client.grpc; +import com.alibaba.nacos.api.ability.constant.AbilityMode; import com.alibaba.nacos.api.grpc.auto.RequestGrpc; import com.alibaba.nacos.common.remote.client.RpcClient; import com.alibaba.nacos.common.remote.client.RpcClientTlsConfig; @@ -70,6 +71,11 @@ public class GrpcClientTest { public void setUp() throws Exception { when(clientConfig.name()).thenReturn("testClient"); grpcClient = spy(new GrpcClient(clientConfig) { + @Override + protected AbilityMode abilityMode() { + return AbilityMode.SDK_CLIENT; + } + @Override public int rpcPortOffset() { return 1000; diff --git a/common/src/test/java/com/alibaba/nacos/common/remote/client/grpc/GrpcClientTlsTest.java b/common/src/test/java/com/alibaba/nacos/common/remote/client/grpc/GrpcClientTlsTest.java index 4a65577be..75397ae06 100644 --- a/common/src/test/java/com/alibaba/nacos/common/remote/client/grpc/GrpcClientTlsTest.java +++ b/common/src/test/java/com/alibaba/nacos/common/remote/client/grpc/GrpcClientTlsTest.java @@ -16,6 +16,7 @@ package com.alibaba.nacos.common.remote.client.grpc; +import com.alibaba.nacos.api.ability.constant.AbilityMode; import org.junit.Test; import static org.mockito.Mockito.when; @@ -33,6 +34,12 @@ public class GrpcClientTlsTest extends GrpcClientTest { when(clientConfig.tlsConfig()).thenReturn(tlsConfig); grpcClient = new GrpcClient(clientConfig) { + + @Override + protected AbilityMode abilityMode() { + return AbilityMode.SDK_CLIENT; + } + @Override public int rpcPortOffset() { return 1000; @@ -51,6 +58,12 @@ public class GrpcClientTlsTest extends GrpcClientTest { when(clientConfig.tlsConfig()).thenReturn(tlsConfig); grpcClient = new GrpcClient(clientConfig) { + + @Override + protected AbilityMode abilityMode() { + return AbilityMode.SDK_CLIENT; + } + @Override public int rpcPortOffset() { return 1000; @@ -72,6 +85,11 @@ public class GrpcClientTlsTest extends GrpcClientTest { when(clientConfig.name()).thenReturn("testClient"); when(clientConfig.tlsConfig()).thenReturn(tlsConfig); grpcClient = new GrpcClient(clientConfig) { + @Override + protected AbilityMode abilityMode() { + return AbilityMode.SDK_CLIENT; + } + @Override public int rpcPortOffset() { return 1000; diff --git a/core/src/main/java/com/alibaba/nacos/core/ability/config/AbilityConfigs.java b/core/src/main/java/com/alibaba/nacos/core/ability/config/AbilityConfigs.java new file mode 100644 index 000000000..45b680b4b --- /dev/null +++ b/core/src/main/java/com/alibaba/nacos/core/ability/config/AbilityConfigs.java @@ -0,0 +1,109 @@ +/* + * Copyright 1999-2022 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.nacos.core.ability.config; + +import com.alibaba.nacos.api.ability.constant.AbilityKey; +import com.alibaba.nacos.api.ability.register.impl.ServerAbilities; +import com.alibaba.nacos.common.JustForTest; +import com.alibaba.nacos.common.ability.AbstractAbilityControlManager; +import com.alibaba.nacos.common.ability.discover.NacosAbilityManagerHolder; +import com.alibaba.nacos.common.event.ServerConfigChangeEvent; +import com.alibaba.nacos.common.notify.Event; +import com.alibaba.nacos.common.notify.NotifyCenter; +import com.alibaba.nacos.common.notify.listener.Subscriber; +import com.alibaba.nacos.common.utils.ConcurrentHashSet; +import com.alibaba.nacos.sys.env.EnvUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.context.annotation.Configuration; + +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +/**. + * @author Daydreamer + * @description Dynamically load ability from config + * @date 2022/8/31 12:27 + **/ +@Configuration +public class AbilityConfigs extends Subscriber { + + public static final String PREFIX = "nacos.core.ability."; + + private static final Logger LOGGER = LoggerFactory.getLogger(AbilityConfigs.class); + + private final Set serverAbilityKeys = new ConcurrentHashSet<>(); + + private AbstractAbilityControlManager abilityHandlerRegistry = NacosAbilityManagerHolder.getInstance(); + + public AbilityConfigs() { + // load ability + serverAbilityKeys.addAll(ServerAbilities.getStaticAbilities().keySet()); + NotifyCenter.registerSubscriber(this); + } + + @Override + public void onEvent(ServerConfigChangeEvent event) { + // load config + Map newValues = new HashMap<>(serverAbilityKeys.size()); + serverAbilityKeys.forEach(abilityKey -> { + String key = PREFIX + abilityKey.getName(); + try { + // scan + Boolean property = EnvUtil.getProperty(key, Boolean.class); + if (property != null) { + newValues.put(abilityKey, property); + } + } catch (Exception e) { + LOGGER.warn("Update ability config from env failed, use old val, ability : {} , because : {}", key, e); + } + }); + // update + refresh(newValues); + } + + /**. + * refresh ability + */ + private void refresh(Map newValues) { + newValues.forEach((abilityKey, val) -> { + // do nothing if has turned on/off + if (val) { + abilityHandlerRegistry.enableCurrentNodeAbility(abilityKey); + } else { + abilityHandlerRegistry.disableCurrentNodeAbility(abilityKey); + } + }); + } + + @Override + public Class subscribeType() { + return ServerConfigChangeEvent.class; + } + + @JustForTest + protected Set getServerAbilityKeys() { + return serverAbilityKeys; + } + + @JustForTest + protected void setAbilityHandlerRegistry(AbstractAbilityControlManager abilityHandlerRegistry) { + this.abilityHandlerRegistry = abilityHandlerRegistry; + } + +} diff --git a/core/src/main/java/com/alibaba/nacos/core/ability/control/ServerAbilityControlManager.java b/core/src/main/java/com/alibaba/nacos/core/ability/control/ServerAbilityControlManager.java new file mode 100644 index 000000000..fe467ff1d --- /dev/null +++ b/core/src/main/java/com/alibaba/nacos/core/ability/control/ServerAbilityControlManager.java @@ -0,0 +1,101 @@ +/* + * Copyright 1999-2022 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.nacos.core.ability.control; + +import com.alibaba.nacos.api.ability.constant.AbilityKey; +import com.alibaba.nacos.api.ability.constant.AbilityMode; +import com.alibaba.nacos.api.ability.register.impl.ClusterClientAbilities; +import com.alibaba.nacos.api.ability.register.impl.SdkClientAbilities; +import com.alibaba.nacos.api.ability.register.impl.ServerAbilities; +import com.alibaba.nacos.common.ability.AbstractAbilityControlManager; +import com.alibaba.nacos.core.ability.config.AbilityConfigs; +import com.alibaba.nacos.sys.env.EnvUtil; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +/**. + * @author Daydreamer + * @description {@link AbstractAbilityControlManager} for nacos-server. + * @date 2022/7/13 21:14 + **/ +public class ServerAbilityControlManager extends AbstractAbilityControlManager { + + public ServerAbilityControlManager() { + } + + @Override + protected Map> initCurrentNodeAbilities() { + // init client abilities + Map> res = new HashMap<>(2); + res.put(AbilityMode.CLUSTER_CLIENT, initClusterClientAbilities()); + res.put(AbilityMode.SDK_CLIENT, initSdkClientAbilities()); + + // init server abilities + // static abilities + Map staticAbilities = ServerAbilities.getStaticAbilities(); + // all function server can support + Set abilityKeys = staticAbilities.keySet(); + Map abilityTable = new HashMap<>(abilityKeys.size()); + // if not define in config, then load from ServerAbilities + Set unIncludedInConfig = new HashSet<>(); + abilityKeys.forEach(abilityKey -> { + String key = AbilityConfigs.PREFIX + abilityKey.getName(); + try { + Boolean property = EnvUtil.getProperty(key, Boolean.class); + // if not null + if (property != null) { + abilityTable.put(abilityKey, property); + } else { + unIncludedInConfig.add(abilityKey); + } + } catch (Exception e) { + // from ServerAbilities + unIncludedInConfig.add(abilityKey); + } + }); + // load from ServerAbilities + unIncludedInConfig.forEach(abilityKey -> abilityTable.put(abilityKey, staticAbilities.get(abilityKey))); + + res.put(AbilityMode.SERVER, abilityTable); + return res; + } + + /** + * init cluster client abilities. + */ + private Map initClusterClientAbilities() { + // static abilities + return ClusterClientAbilities.getStaticAbilities(); + } + + /** + * init sdk client abilities. + */ + private Map initSdkClientAbilities() { + // static abilities + return SdkClientAbilities.getStaticAbilities(); + } + + @Override + public int getPriority() { + return 1; + } + +} diff --git a/core/src/main/java/com/alibaba/nacos/core/cluster/Member.java b/core/src/main/java/com/alibaba/nacos/core/cluster/Member.java index 090cf9f41..7f5377629 100644 --- a/core/src/main/java/com/alibaba/nacos/core/cluster/Member.java +++ b/core/src/main/java/com/alibaba/nacos/core/cluster/Member.java @@ -53,8 +53,11 @@ public class Member implements Comparable, Cloneable, Serializable { private transient int failAccessCnt = 0; + @Deprecated private ServerAbilities abilities = new ServerAbilities(); + private boolean grpcReportEnabled; + public Member() { String prefix = "nacos.core.member.meta."; extendInfo.put(MemberMetaDataConstants.SITE_KEY, @@ -65,10 +68,20 @@ public class Member implements Comparable, Cloneable, Serializable { .put(MemberMetaDataConstants.WEIGHT, EnvUtil.getProperty(prefix + MemberMetaDataConstants.WEIGHT, "1")); } + public boolean isGrpcReportEnabled() { + return grpcReportEnabled; + } + + public void setGrpcReportEnabled(boolean grpcReportEnabled) { + this.grpcReportEnabled = grpcReportEnabled; + } + + @Deprecated public ServerAbilities getAbilities() { return abilities; } + @Deprecated public void setAbilities(ServerAbilities abilities) { this.abilities = abilities; } diff --git a/core/src/main/java/com/alibaba/nacos/core/cluster/MemberUtil.java b/core/src/main/java/com/alibaba/nacos/core/cluster/MemberUtil.java index 9d6652543..97f8afcd0 100644 --- a/core/src/main/java/com/alibaba/nacos/core/cluster/MemberUtil.java +++ b/core/src/main/java/com/alibaba/nacos/core/cluster/MemberUtil.java @@ -66,6 +66,7 @@ public class MemberUtil { oldMember.setExtendInfo(newMember.getExtendInfo()); oldMember.setAddress(newMember.getAddress()); oldMember.setAbilities(newMember.getAbilities()); + oldMember.setGrpcReportEnabled(newMember.isGrpcReportEnabled()); } /** @@ -94,6 +95,8 @@ public class MemberUtil { extendInfo.put(MemberMetaDataConstants.RAFT_PORT, String.valueOf(calculateRaftPort(target))); extendInfo.put(MemberMetaDataConstants.READY_TO_UPGRADE, true); target.setExtendInfo(extendInfo); + // use grpc to report default + target.setGrpcReportEnabled(true); return target; } @@ -107,7 +110,10 @@ public class MemberUtil { if (member.getAbilities() == null || member.getAbilities().getRemoteAbility() == null) { return false; } - return member.getAbilities().getRemoteAbility().isSupportRemoteConnection(); + + boolean oldVerJudge = member.getAbilities().getRemoteAbility().isSupportRemoteConnection(); + + return member.isGrpcReportEnabled() || oldVerJudge; } public static int calculateRaftPort(Member member) { @@ -275,8 +281,9 @@ public class MemberUtil { if (!expected.getState().equals(actual.getState())) { return true; } - - if (!expected.getAbilities().equals(actual.getAbilities())) { + + // if change + if (expected.isGrpcReportEnabled() != actual.isGrpcReportEnabled()) { return true; } diff --git a/core/src/main/java/com/alibaba/nacos/core/cluster/ServerMemberManager.java b/core/src/main/java/com/alibaba/nacos/core/cluster/ServerMemberManager.java index d9ccbd792..eeee8855c 100644 --- a/core/src/main/java/com/alibaba/nacos/core/cluster/ServerMemberManager.java +++ b/core/src/main/java/com/alibaba/nacos/core/cluster/ServerMemberManager.java @@ -65,6 +65,7 @@ import java.util.Map; import java.util.Objects; import java.util.Set; import java.util.concurrent.ConcurrentSkipListMap; +import com.alibaba.nacos.core.ability.control.ServerAbilityControlManager; import static com.alibaba.nacos.api.exception.NacosException.CLIENT_INVALID_PARAM; @@ -138,7 +139,7 @@ public class ServerMemberManager implements ApplicationListener(); EnvUtil.setContextPath(servletContext.getContextPath()); @@ -163,7 +164,8 @@ public class ServerMemberManager implements ApplicationListener members = ServerMemberManager.this.allMembersWithoutSelf(); @@ -556,7 +564,7 @@ public class ServerMemberManager implements ApplicationListener members = ServerMemberManager.this.allMembersWithoutSelf(); - + if (members.isEmpty()) { return; } @@ -683,11 +702,11 @@ public class ServerMemberManager implements ApplicationListener { if (starting) { diff --git a/core/src/main/java/com/alibaba/nacos/core/remote/Connection.java b/core/src/main/java/com/alibaba/nacos/core/remote/Connection.java index 96eace5e0..190787c5e 100644 --- a/core/src/main/java/com/alibaba/nacos/core/remote/Connection.java +++ b/core/src/main/java/com/alibaba/nacos/core/remote/Connection.java @@ -16,7 +16,6 @@ package com.alibaba.nacos.core.remote; -import com.alibaba.nacos.api.ability.ClientAbilities; import com.alibaba.nacos.api.remote.Requester; import java.util.Map; @@ -32,7 +31,7 @@ public abstract class Connection implements Requester { private boolean traced = false; - private ClientAbilities abilities; + private Map abilityTable; private final ConnectionMeta metaInfo; @@ -52,22 +51,12 @@ public abstract class Connection implements Requester { this.traced = traced; } - /** - * get abilities. - * - * @return - */ - public ClientAbilities getAbilities() { - return abilities; + public void setAbilityTable(Map abilityTable) { + this.abilityTable = abilityTable; } - /** - * set abilities. - * - * @param abilities abilities. - */ - public void setAbilities(ClientAbilities abilities) { - this.abilities = abilities; + public Map getAbilityTable() { + return this.abilityTable; } /** @@ -95,7 +84,7 @@ public abstract class Connection implements Requester { @Override public String toString() { - return "Connection{" + "traced=" + traced + ", abilities=" + abilities + ", metaInfo=" + metaInfo + '}'; + return "Connection{" + "traced=" + traced + ", abilities=" + abilityTable + ", metaInfo=" + metaInfo + '}'; } } diff --git a/core/src/main/java/com/alibaba/nacos/core/remote/ConnectionManager.java b/core/src/main/java/com/alibaba/nacos/core/remote/ConnectionManager.java index ad4bf1ade..7eea24883 100644 --- a/core/src/main/java/com/alibaba/nacos/core/remote/ConnectionManager.java +++ b/core/src/main/java/com/alibaba/nacos/core/remote/ConnectionManager.java @@ -275,6 +275,7 @@ public class ConnectionManager { String[] split = redirectAddress.split(Constants.COLON); connectResetRequest.setServerIp(split[0]); connectResetRequest.setServerPort(split[1]); + connectResetRequest.setConnectionId(connectionId); } try { connection.request(connectResetRequest, 3000L); diff --git a/core/src/main/java/com/alibaba/nacos/core/remote/grpc/GrpcBiStreamRequestAcceptor.java b/core/src/main/java/com/alibaba/nacos/core/remote/grpc/GrpcBiStreamRequestAcceptor.java index 8eed5f648..25af8c0a7 100644 --- a/core/src/main/java/com/alibaba/nacos/core/remote/grpc/GrpcBiStreamRequestAcceptor.java +++ b/core/src/main/java/com/alibaba/nacos/core/remote/grpc/GrpcBiStreamRequestAcceptor.java @@ -16,11 +16,14 @@ package com.alibaba.nacos.core.remote.grpc; +import com.alibaba.nacos.api.ability.constant.AbilityMode; import com.alibaba.nacos.api.common.Constants; import com.alibaba.nacos.api.grpc.auto.BiRequestStreamGrpc; import com.alibaba.nacos.api.grpc.auto.Payload; import com.alibaba.nacos.api.remote.request.ConnectionSetupRequest; +import com.alibaba.nacos.api.remote.request.SetupAckRequest; import com.alibaba.nacos.api.remote.response.Response; +import com.alibaba.nacos.common.ability.discover.NacosAbilityManagerHolder; import com.alibaba.nacos.common.remote.ConnectionType; import com.alibaba.nacos.common.remote.client.grpc.GrpcUtils; import com.alibaba.nacos.core.remote.Connection; @@ -112,8 +115,13 @@ public class GrpcBiStreamRequestAcceptor extends BiRequestStreamGrpc.BiRequestSt remoteIp, remotePort, localPort, ConnectionType.GRPC.getType(), setUpRequest.getClientVersion(), appName, setUpRequest.getLabels()); metaInfo.setTenant(setUpRequest.getTenant()); - Connection connection = new GrpcConnection(metaInfo, responseObserver, GrpcServerConstants.CONTEXT_KEY_CHANNEL.get()); - connection.setAbilities(setUpRequest.getAbilities()); + Connection connection = new GrpcConnection(metaInfo, responseObserver, + GrpcServerConstants.CONTEXT_KEY_CHANNEL.get()); + // null if supported + if (setUpRequest.getAbilityTable() != null) { + // map to table + connection.setAbilityTable(setUpRequest.getAbilityTable()); + } boolean rejectSdkOnStarting = metaInfo.isSdkSource() && !ApplicationUtils.isStarted(); if (rejectSdkOnStarting || !connectionManager.register(connectionId, connection)) { @@ -129,6 +137,15 @@ public class GrpcBiStreamRequestAcceptor extends BiRequestStreamGrpc.BiRequestSt .warn("[{}]Send connect reset request error,error={}", connectionId, e); } } + } else { + try { + // finish register, tell client has set up successfully + connection.request(new SetupAckRequest(NacosAbilityManagerHolder.getInstance() + .getCurrentNodeAbilities(AbilityMode.SERVER)), 3000L); + } catch (Exception e) { + // nothing to do + + } } } else if (parseObj instanceof Response) { diff --git a/core/src/main/java/com/alibaba/nacos/core/remote/grpc/GrpcRequestAcceptor.java b/core/src/main/java/com/alibaba/nacos/core/remote/grpc/GrpcRequestAcceptor.java index 6354cd0a2..19dcad954 100644 --- a/core/src/main/java/com/alibaba/nacos/core/remote/grpc/GrpcRequestAcceptor.java +++ b/core/src/main/java/com/alibaba/nacos/core/remote/grpc/GrpcRequestAcceptor.java @@ -86,10 +86,10 @@ public class GrpcRequestAcceptor extends RequestGrpc.RequestImplBase { responseObserver.onCompleted(); return; } - + // server check. if (ServerCheckRequest.class.getSimpleName().equals(type)) { - Payload serverCheckResponseP = GrpcUtils.convert(new ServerCheckResponse(GrpcServerConstants.CONTEXT_KEY_CONN_ID.get())); + Payload serverCheckResponseP = GrpcUtils.convert(new ServerCheckResponse(GrpcServerConstants.CONTEXT_KEY_CONN_ID.get(), true)); traceIfNecessary(serverCheckResponseP, false); responseObserver.onNext(serverCheckResponseP); responseObserver.onCompleted(); @@ -165,6 +165,7 @@ public class GrpcRequestAcceptor extends RequestGrpc.RequestImplBase { requestMeta.setConnectionId(GrpcServerConstants.CONTEXT_KEY_CONN_ID.get()); requestMeta.setClientVersion(connection.getMetaInfo().getVersion()); requestMeta.setLabels(connection.getMetaInfo().getLabels()); + requestMeta.setAbilityTable(connection.getAbilityTable()); connectionManager.refreshActiveTime(requestMeta.getConnectionId()); Response response = requestHandler.handleRequest(request, requestMeta); Payload payloadResponse = GrpcUtils.convert(response); diff --git a/core/src/main/resources/META-INF/services/com.alibaba.nacos.common.ability.AbstractAbilityControlManager b/core/src/main/resources/META-INF/services/com.alibaba.nacos.common.ability.AbstractAbilityControlManager new file mode 100644 index 000000000..f2545ddc3 --- /dev/null +++ b/core/src/main/resources/META-INF/services/com.alibaba.nacos.common.ability.AbstractAbilityControlManager @@ -0,0 +1,18 @@ +# +# Copyright 1999-2022 Alibaba Group Holding Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# + +com.alibaba.nacos.core.ability.control.ServerAbilityControlManager \ No newline at end of file diff --git a/core/src/test/java/com/alibaba/nacos/core/ability/AbilityControlManagerTest.java b/core/src/test/java/com/alibaba/nacos/core/ability/AbilityControlManagerTest.java new file mode 100644 index 000000000..2ef69de74 --- /dev/null +++ b/core/src/test/java/com/alibaba/nacos/core/ability/AbilityControlManagerTest.java @@ -0,0 +1,66 @@ +/* + * Copyright 1999-2022 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.nacos.core.ability; + +import com.alibaba.nacos.api.ability.constant.AbilityKey; +import com.alibaba.nacos.api.ability.constant.AbilityMode; +import com.alibaba.nacos.api.ability.constant.AbilityStatus; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.springframework.boot.test.context.SpringBootTest; + +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +@SpringBootTest +public class AbilityControlManagerTest { + + private TestServerAbilityControlManager serverAbilityControlManager = new TestServerAbilityControlManager(); + + @Before + public void inject() { + Map newTable = new HashMap<>(); + newTable.put(AbilityKey.SERVER_TEST_1.getName(), true); + serverAbilityControlManager.setCurrentSupportingAbility(newTable); + } + + @Test + public void testCurrentNodeAbility() { + Set keySet = serverAbilityControlManager.getCurrentNodeAbilities(AbilityMode.SERVER).keySet(); + // diable all + keySet.forEach(key -> serverAbilityControlManager + .disableCurrentNodeAbility(AbilityKey.getEnum(AbilityMode.SERVER, key))); + // get all + keySet.forEach(key -> { + Assert.assertNotEquals(serverAbilityControlManager + .isCurrentNodeAbilityRunning(AbilityKey.getEnum(AbilityMode.SERVER, key)), AbilityStatus.SUPPORTED); + }); + // enable all + keySet.forEach(key -> serverAbilityControlManager + .enableCurrentNodeAbility(AbilityKey.getEnum(AbilityMode.SERVER, key))); + // get all + keySet.forEach(key -> { + Assert.assertEquals(serverAbilityControlManager + .isCurrentNodeAbilityRunning(AbilityKey.getEnum(AbilityMode.SERVER, key)), AbilityStatus.SUPPORTED); + }); + } + +} + + diff --git a/core/src/test/java/com/alibaba/nacos/core/ability/TestServerAbilityControlManager.java b/core/src/test/java/com/alibaba/nacos/core/ability/TestServerAbilityControlManager.java new file mode 100644 index 000000000..087b94ef1 --- /dev/null +++ b/core/src/test/java/com/alibaba/nacos/core/ability/TestServerAbilityControlManager.java @@ -0,0 +1,33 @@ +/* + * Copyright 1999-2022 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.nacos.core.ability; + +import com.alibaba.nacos.api.ability.constant.AbilityMode; +import com.alibaba.nacos.common.JustForTest; +import com.alibaba.nacos.core.ability.control.ServerAbilityControlManager; + +import java.util.Map; + +public class TestServerAbilityControlManager extends ServerAbilityControlManager { + + @JustForTest + public void setCurrentSupportingAbility(Map ability) { + currentNodeAbilities.get(AbilityMode.SERVER).clear(); + currentNodeAbilities.get(AbilityMode.SERVER).putAll(ability); + } + +} diff --git a/core/src/test/java/com/alibaba/nacos/core/ability/config/AbilityConfigsTest.java b/core/src/test/java/com/alibaba/nacos/core/ability/config/AbilityConfigsTest.java new file mode 100644 index 000000000..ccca0bb2c --- /dev/null +++ b/core/src/test/java/com/alibaba/nacos/core/ability/config/AbilityConfigsTest.java @@ -0,0 +1,134 @@ +/* + * Copyright 1999-2022 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.nacos.core.ability.config; + +import com.alibaba.nacos.api.ability.constant.AbilityKey; +import com.alibaba.nacos.api.ability.constant.AbilityStatus; +import com.alibaba.nacos.api.ability.register.AbstractAbilityRegistry; +import com.alibaba.nacos.api.ability.register.impl.ServerAbilities; +import com.alibaba.nacos.common.event.ServerConfigChangeEvent; +import com.alibaba.nacos.core.ability.TestServerAbilityControlManager; +import com.alibaba.nacos.core.ability.control.ServerAbilityControlManager; +import com.alibaba.nacos.sys.env.EnvUtil; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.springframework.mock.env.MockEnvironment; + +import java.lang.reflect.Field; +import java.util.HashMap; +import java.util.Map; + +/** + * test for ability in config. + * + * @author Daydreamer + * @date 2022/9/3 12:27 + **/ +public class AbilityConfigsTest { + + private MockEnvironment environment; + + private TestAbilityConfig abilityConfigs; + + private ServerAbilityControlManager serverAbilityControlManager; + + private Map currentAbilities; + + @Before + public void setUp() throws Exception { + environment = new MockEnvironment(); + EnvUtil.setEnvironment(environment); + abilityConfigs = new TestAbilityConfig(); + inject(abilityConfigs); + serverAbilityControlManager.enableCurrentNodeAbility(AbilityKey.SERVER_TEST_1); + serverAbilityControlManager.enableCurrentNodeAbility(AbilityKey.SERVER_TEST_2); + } + + void inject(AbilityConfigs abilityConfigs) { + TestServerAbilityControlManager serverAbilityControlManager = new TestServerAbilityControlManager(); + Map newTable = new HashMap<>(); + newTable.put(AbilityKey.SERVER_TEST_1.getName(), true); + newTable.put(AbilityKey.SERVER_TEST_2.getName(), true); + serverAbilityControlManager.setCurrentSupportingAbility(newTable); + abilityConfigs.setAbilityHandlerRegistry(serverAbilityControlManager); + this.serverAbilityControlManager = serverAbilityControlManager; + } + + /** + * fill field. + * + * @throws Exception ignore + */ + public void fill() throws Exception { + Field instanceField = ServerAbilities.class.getDeclaredField("INSTANCE"); + Field abilitiesField = AbstractAbilityRegistry.class.getDeclaredField("supportedAbilities"); + abilitiesField.setAccessible(true); + instanceField.setAccessible(true); + ServerAbilities serverAbilities = (ServerAbilities) instanceField.get(ServerAbilities.class); + currentAbilities = (Map) abilitiesField.get(serverAbilities); + currentAbilities.put(AbilityKey.SERVER_TEST_1, true); + currentAbilities.put(AbilityKey.SERVER_TEST_2, true); + } + + @Test + public void testLoadAbilities() throws Exception { + environment.setProperty(AbilityConfigs.PREFIX + AbilityKey.SERVER_TEST_1.getName(), Boolean.TRUE.toString()); + environment.setProperty(AbilityConfigs.PREFIX + AbilityKey.SERVER_TEST_2.getName(), Boolean.FALSE.toString()); + // test load + fill(); + ServerAbilityControlManager manager = new ServerAbilityControlManager(); + // config has higher priority + Assert.assertEquals(manager.isCurrentNodeAbilityRunning(AbilityKey.SERVER_TEST_1), AbilityStatus.SUPPORTED); + Assert.assertNotEquals(manager.isCurrentNodeAbilityRunning(AbilityKey.SERVER_TEST_2), AbilityStatus.SUPPORTED); + // clear + currentAbilities.clear(); + } + + @Test + public void testInit() { + Assert.assertEquals(serverAbilityControlManager.isCurrentNodeAbilityRunning(AbilityKey.SERVER_TEST_1), AbilityStatus.SUPPORTED); + Assert.assertEquals(serverAbilityControlManager.isCurrentNodeAbilityRunning(AbilityKey.SERVER_TEST_2), AbilityStatus.SUPPORTED); + } + + @Test + public void testConfigChange() throws InterruptedException { + // test no change + environment.setProperty(AbilityConfigs.PREFIX + AbilityKey.SERVER_TEST_1.getName(), Boolean.TRUE.toString()); + environment.setProperty(AbilityConfigs.PREFIX + AbilityKey.SERVER_TEST_2.getName(), Boolean.TRUE.toString()); + abilityConfigs.onEvent(new ServerConfigChangeEvent()); + Assert.assertEquals(serverAbilityControlManager.isCurrentNodeAbilityRunning(AbilityKey.SERVER_TEST_1), AbilityStatus.SUPPORTED); + Assert.assertEquals(serverAbilityControlManager.isCurrentNodeAbilityRunning(AbilityKey.SERVER_TEST_2), AbilityStatus.SUPPORTED); + + // test change + environment.setProperty(AbilityConfigs.PREFIX + AbilityKey.SERVER_TEST_1.getName(), Boolean.FALSE.toString()); + abilityConfigs.onEvent(new ServerConfigChangeEvent()); + Assert.assertNotEquals(serverAbilityControlManager.isCurrentNodeAbilityRunning(AbilityKey.SERVER_TEST_1), AbilityStatus.SUPPORTED); + Assert.assertEquals(serverAbilityControlManager.isCurrentNodeAbilityRunning(AbilityKey.SERVER_TEST_2), AbilityStatus.SUPPORTED); + + environment.setProperty(AbilityConfigs.PREFIX + AbilityKey.SERVER_TEST_1.getName(), Boolean.TRUE.toString()); + abilityConfigs.onEvent(new ServerConfigChangeEvent()); + Assert.assertEquals(serverAbilityControlManager.isCurrentNodeAbilityRunning(AbilityKey.SERVER_TEST_1), AbilityStatus.SUPPORTED); + + environment.setProperty(AbilityConfigs.PREFIX + AbilityKey.SERVER_TEST_1.getName(), Boolean.FALSE.toString()); + environment.setProperty(AbilityConfigs.PREFIX + AbilityKey.SERVER_TEST_2.getName(), Boolean.FALSE.toString()); + abilityConfigs.onEvent(new ServerConfigChangeEvent()); + Assert.assertNotEquals(serverAbilityControlManager.isCurrentNodeAbilityRunning(AbilityKey.SERVER_TEST_1), AbilityStatus.SUPPORTED); + Assert.assertNotEquals(serverAbilityControlManager.isCurrentNodeAbilityRunning(AbilityKey.SERVER_TEST_2), AbilityStatus.SUPPORTED); + } + +} diff --git a/core/src/test/java/com/alibaba/nacos/core/ability/config/TestAbilityConfig.java b/core/src/test/java/com/alibaba/nacos/core/ability/config/TestAbilityConfig.java new file mode 100644 index 000000000..446d2630b --- /dev/null +++ b/core/src/test/java/com/alibaba/nacos/core/ability/config/TestAbilityConfig.java @@ -0,0 +1,35 @@ +/* + * Copyright 1999-2022 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.nacos.core.ability.config; + +import com.alibaba.nacos.api.ability.constant.AbilityKey; + +import java.util.Set; + +/**. + * @author Daydreamer + * @description Dynamically load ability from config. just for test + * @date 2022/8/31 12:27 + **/ +public class TestAbilityConfig extends AbilityConfigs { + + public TestAbilityConfig() { + Set serverAbilityKeys = super.getServerAbilityKeys(); + serverAbilityKeys.add(AbilityKey.SERVER_TEST_1); + serverAbilityKeys.add(AbilityKey.SERVER_TEST_2); + } +} diff --git a/core/src/test/java/com/alibaba/nacos/core/cluster/MemberUtilTest.java b/core/src/test/java/com/alibaba/nacos/core/cluster/MemberUtilTest.java index c1750dbee..679e5ada8 100644 --- a/core/src/test/java/com/alibaba/nacos/core/cluster/MemberUtilTest.java +++ b/core/src/test/java/com/alibaba/nacos/core/cluster/MemberUtilTest.java @@ -243,7 +243,7 @@ public class MemberUtilTest { @Test public void testIsBasicInfoChangedForChangedAbilities() { Member newMember = buildMember(); - newMember.getAbilities().getRemoteAbility().setSupportRemoteConnection(true); + newMember.setGrpcReportEnabled(true); assertTrue(MemberUtil.isBasicInfoChanged(newMember, originalMember)); } diff --git a/test/core-test/src/test/java/com/alibaba/nacos/test/ability/AbilityDiscovery.java b/test/core-test/src/test/java/com/alibaba/nacos/test/ability/AbilityDiscovery.java new file mode 100644 index 000000000..22a711d1c --- /dev/null +++ b/test/core-test/src/test/java/com/alibaba/nacos/test/ability/AbilityDiscovery.java @@ -0,0 +1,293 @@ +/* + * Copyright 1999-2023 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.nacos.test.ability; + +import com.alibaba.nacos.Nacos; +import com.alibaba.nacos.api.NacosFactory; +import com.alibaba.nacos.api.PropertyKeyConst; +import com.alibaba.nacos.api.ability.constant.AbilityKey; +import com.alibaba.nacos.api.ability.constant.AbilityStatus; +import com.alibaba.nacos.api.config.ConfigService; +import com.alibaba.nacos.api.config.remote.request.ConfigQueryRequest; +import com.alibaba.nacos.api.config.remote.response.ConfigQueryResponse; +import com.alibaba.nacos.api.exception.NacosException; +import com.alibaba.nacos.api.remote.request.Request; +import com.alibaba.nacos.api.remote.request.RequestMeta; +import com.alibaba.nacos.api.remote.request.SetupAckRequest; +import com.alibaba.nacos.api.remote.response.Response; +import com.alibaba.nacos.common.ability.AbstractAbilityControlManager; +import com.alibaba.nacos.common.ability.discover.NacosAbilityManagerHolder; +import com.alibaba.nacos.common.remote.ConnectionType; +import com.alibaba.nacos.common.remote.client.Connection; +import com.alibaba.nacos.common.remote.client.RpcClient; +import com.alibaba.nacos.common.remote.client.RpcClientFactory; +import com.alibaba.nacos.common.remote.client.ServerListFactory; +import com.alibaba.nacos.common.remote.client.ServerRequestHandler; +import com.alibaba.nacos.core.remote.ConnectionManager; +import com.alibaba.nacos.core.remote.RequestFilters; +import com.alibaba.nacos.core.remote.RequestHandler; +import com.alibaba.nacos.core.remote.RequestHandlerRegistry; +import com.alibaba.nacos.test.ability.component.TestServerAbilityControlManager; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.web.server.LocalServerPort; +import org.springframework.test.context.junit4.SpringRunner; + +import javax.annotation.Resource; +import java.lang.reflect.Field; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.UUID; + +@RunWith(SpringRunner.class) +@SpringBootTest(classes = Nacos.class, properties = { + "server.servlet.context-path=/nacos"}, webEnvironment = SpringBootTest.WebEnvironment.DEFINED_PORT) +@SuppressWarnings("all") +public class AbilityDiscovery { + + @LocalServerPort + private int port; + + @Resource + private RequestHandlerRegistry requestHandlerRegistry; + + @Resource + private RequestFilters filters; + + @Resource + private ConnectionManager connectionManager; + + private RpcClient client; + + private RpcClient clusterClient; + + private ConfigService configService; + + private AbstractAbilityControlManager oldInstance; + + /** + * test server judge client abilities + */ + private volatile boolean serverSuccess = false; + + private volatile boolean clientSuccess = false; + + private volatile boolean clusterSuccess = false; + + private Field abstractAbilityControlManager; + + private Field registryHandlerFields; + + private Field serverReuqestHandlersField; + + private Field currentConnField; + + @Before + public void setup() throws NoSuchFieldException, IllegalAccessException, NacosException { + // load class + oldInstance = NacosAbilityManagerHolder.getInstance(); + + // replace + abstractAbilityControlManager = NacosAbilityManagerHolder.class + .getDeclaredField("abstractAbilityControlManager"); + abstractAbilityControlManager.setAccessible(true); + abstractAbilityControlManager.set(NacosAbilityManagerHolder.class, new TestServerAbilityControlManager()); + + // get registry field + registryHandlerFields = RequestHandlerRegistry.class.getDeclaredField("registryHandlers"); + registryHandlerFields.setAccessible(true); + + // currentConn + currentConnField = RpcClient.class.getDeclaredField("currentConnection"); + currentConnField.setAccessible(true); + + // init config service + Properties properties = new Properties(); + properties.put(PropertyKeyConst.SERVER_ADDR, "127.0.0.1:" + port); + configService = NacosFactory.createConfigService(properties); + + // server request handler + serverReuqestHandlersField = RpcClient.class.getDeclaredField("serverRequestHandlers"); + serverReuqestHandlersField.setAccessible(true); + + // init client + client = RpcClientFactory.createClient(UUID.randomUUID().toString(), ConnectionType.GRPC, new HashMap<>()); + client.serverListFactory(new ServerListFactory() { + @Override + public String genNextServer() { + return "127.0.0.1:" + port; + } + + @Override + public String getCurrentServer() { + return "127.0.0.1:" + port; + } + + @Override + public List getServerList() { + return Collections.singletonList("127.0.0.1:" + port); + } + }); + // connect to server + client.start(); + + clusterClient = RpcClientFactory.createClusterClient(UUID.randomUUID().toString(), ConnectionType.GRPC, new HashMap<>()); + clusterClient.serverListFactory(new ServerListFactory() { + @Override + public String genNextServer() { + return "127.0.0.1:" + port; + } + + @Override + public String getCurrentServer() { + return "127.0.0.1:" + port; + } + + @Override + public List getServerList() { + return Collections.singletonList("127.0.0.1:" + port); + } + }); + // connect to server + clusterClient.start(); + } + + @Test + public void testClientDiscovery() throws NacosException { + // client judge ability + Assert.assertEquals(client.getConnectionAbility(AbilityKey.SERVER_TEST_1), AbilityStatus.SUPPORTED); + Assert.assertEquals(client.getConnectionAbility(AbilityKey.SERVER_TEST_2), AbilityStatus.NOT_SUPPORTED); + } + + @Test + public void testServerDiscoveryAndJudge() throws Exception { + Map handlers = (Map) registryHandlerFields + .get(requestHandlerRegistry); + + // set handler + RequestHandler oldRequestHandler = handlers.remove(ConfigQueryRequest.class.getSimpleName()); + handlers.put(ConfigQueryRequest.class.getSimpleName(), new ClientRequestHandler(filters)); + configService.getConfig("test", "DEFAULT_GROUP", 2000); + // wait server invoke + Thread.sleep(3000); + Assert.assertTrue(serverSuccess); + // recover + handlers.remove(ConfigQueryRequest.class.getSimpleName()); + handlers.put(ConfigQueryRequest.class.getSimpleName(), oldRequestHandler); + } + + @Test + public void testClientJudge() throws Exception { + List handlers = (List) serverReuqestHandlersField.get(client); + handlers.clear(); + // register + client.registerServerRequestHandler(new ServerRequestHandler() { + @Override + public Response requestReply(Request request, Connection connection) { + if (connection.getConnectionAbility(AbilityKey.SERVER_TEST_1).equals(AbilityStatus.SUPPORTED) && connection + .getConnectionAbility(AbilityKey.SERVER_TEST_2).equals(AbilityStatus.NOT_SUPPORTED)) { + clientSuccess = true; + } + return new Response(){}; + } + }); + + // get id + Connection conn = (Connection) currentConnField.get(client); + + com.alibaba.nacos.core.remote.Connection connection = connectionManager.getConnection(conn.getConnectionId()); + try { + connection.request(new SetupAckRequest(), 2000L); + } catch (NacosException e) { + // nothing to do + } + + // wait client react + Thread.sleep(4000); + Assert.assertTrue(clientSuccess); + } + + @Test + public void testClusterClient() throws IllegalAccessException, NacosException, InterruptedException, NoSuchFieldException { + Map handlers = (Map) registryHandlerFields + .get(requestHandlerRegistry); + + // set handler + RequestHandler oldRequestHandler = handlers.remove(ConfigQueryRequest.class.getSimpleName()); + handlers.put(ConfigQueryRequest.class.getSimpleName(), new ClusterClientRequestHandler(filters)); + configService.getConfig("test", "DEFAULT_GROUP", 2000); + // wait server invoke + Thread.sleep(3000); + Assert.assertTrue(clusterSuccess); + // recover + handlers.remove(ConfigQueryRequest.class.getSimpleName()); + handlers.put(ConfigQueryRequest.class.getSimpleName(), oldRequestHandler); + } + + @After + public void recover() throws IllegalAccessException, NacosException { + abstractAbilityControlManager.set(NacosAbilityManagerHolder.class, oldInstance); + client.shutdown(); + } + + /** + * just to test ability + */ + class ClientRequestHandler extends RequestHandler { + + public ClientRequestHandler(RequestFilters requestFilters) throws NoSuchFieldException, IllegalAccessException { + Field declaredField = RequestHandler.class.getDeclaredField("requestFilters"); + declaredField.setAccessible(true); + declaredField.set(this, requestFilters); + } + + @Override + public ConfigQueryResponse handle(ConfigQueryRequest request, RequestMeta meta) throws NacosException { + if (meta.getConnectionAbility(AbilityKey.SDK_CLIENT_TEST_1).equals(AbilityStatus.SUPPORTED)) { + serverSuccess = true; + } + return new ConfigQueryResponse(); + } + } + + /** + * just to test ability. + */ + class ClusterClientRequestHandler extends RequestHandler { + + public ClusterClientRequestHandler(RequestFilters requestFilters) throws NoSuchFieldException, IllegalAccessException { + Field declaredField = RequestHandler.class.getDeclaredField("requestFilters"); + declaredField.setAccessible(true); + declaredField.set(this, requestFilters); + } + + @Override + public ConfigQueryResponse handle(ConfigQueryRequest request, RequestMeta meta) throws NacosException { + if (meta.getConnectionAbility(AbilityKey.CLUSTER_CLIENT_TEST_1).equals(AbilityStatus.SUPPORTED)) { + clusterSuccess = true; + } + return new ConfigQueryResponse(); + } + } +} diff --git a/test/core-test/src/test/java/com/alibaba/nacos/test/ability/component/TestServerAbilityControlManager.java b/test/core-test/src/test/java/com/alibaba/nacos/test/ability/component/TestServerAbilityControlManager.java new file mode 100644 index 000000000..fccc8b22a --- /dev/null +++ b/test/core-test/src/test/java/com/alibaba/nacos/test/ability/component/TestServerAbilityControlManager.java @@ -0,0 +1,45 @@ +/* + * Copyright 1999-2023 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.nacos.test.ability.component; + +import com.alibaba.nacos.api.ability.constant.AbilityKey; +import com.alibaba.nacos.api.ability.constant.AbilityMode; +import com.alibaba.nacos.core.ability.control.ServerAbilityControlManager; + +import java.util.HashMap; +import java.util.Map; + +public class TestServerAbilityControlManager extends ServerAbilityControlManager { + + @Override + protected Map> initCurrentNodeAbilities() { + Map map = new HashMap<>(); + map.put(AbilityKey.SERVER_TEST_1, true); + map.put(AbilityKey.SERVER_TEST_2, false); + HashMap res = new HashMap<>(); + res.put(AbilityMode.SERVER, map); + + Map map1 = new HashMap<>(); + map1.put(AbilityKey.SDK_CLIENT_TEST_1, true); + res.put(AbilityMode.SDK_CLIENT, map1); + + Map map2 = new HashMap<>(); + map2.put(AbilityKey.CLUSTER_CLIENT_TEST_1, true); + res.put(AbilityMode.CLUSTER_CLIENT, map2); + return res; + } +}