Add ClientSyncAttributes to sync some client metadata to other server. (#4961)
This commit is contained in:
parent
2315a745d3
commit
d9e21cfa78
@ -125,7 +125,7 @@ public class DistroClientDataProcessor extends SmartSubscriber implements Distro
|
||||
}
|
||||
|
||||
private void handlerClientSyncData(ClientSyncData clientSyncData) {
|
||||
clientManager.syncClientConnected(clientSyncData.getClientId());
|
||||
clientManager.syncClientConnected(clientSyncData.getClientId(), clientSyncData.getAttributes());
|
||||
Client client = clientManager.getClient(clientSyncData.getClientId());
|
||||
upgradeClient(client, clientSyncData);
|
||||
}
|
||||
|
@ -23,6 +23,8 @@ package com.alibaba.nacos.naming.constants;
|
||||
*/
|
||||
public class ClientConstants {
|
||||
|
||||
public static final String CONNECTION_TYPE = "connectionType";
|
||||
|
||||
public static final String DEFAULT_FACTORY = "default";
|
||||
|
||||
public static final String EPHEMERAL_IP_PORT = "ephemeralIpPort";
|
||||
|
@ -0,0 +1,76 @@
|
||||
/*
|
||||
* Copyright 1999-2020 Alibaba Group Holding Ltd.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.alibaba.nacos.naming.core.v2.client;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* Client sync data attributes.
|
||||
*
|
||||
* @author xiweng.yy
|
||||
*/
|
||||
public class ClientSyncAttributes implements Serializable {
|
||||
|
||||
private static final long serialVersionUID = -5794675800507288793L;
|
||||
|
||||
private final Map<String, Object> clientAttributes;
|
||||
|
||||
public ClientSyncAttributes() {
|
||||
this.clientAttributes = new HashMap<>(1);
|
||||
}
|
||||
|
||||
public void addClientAttribute(String key, Object value) {
|
||||
clientAttributes.put(key, value);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get client attribute.
|
||||
*
|
||||
* @param key attribute key.
|
||||
* @param <T> Expected type of attribute.
|
||||
* @return client attribute, if not exist or type can't case, return {@code null}
|
||||
*/
|
||||
public <T> T getClientAttribute(String key) {
|
||||
try {
|
||||
return (T) clientAttributes.get(key);
|
||||
} catch (Exception e) {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get client attribute.
|
||||
*
|
||||
* @param key attribute key.
|
||||
* @param <T> Expected type of attribute.
|
||||
* @param defaultValue default value when not exist or type can't case
|
||||
* @return client attribute, if not exist or type can't case, return defaultValue
|
||||
*/
|
||||
public <T> T getClientAttribute(String key, T defaultValue) {
|
||||
Object result = clientAttributes.get(key);
|
||||
if (null == result) {
|
||||
return defaultValue;
|
||||
}
|
||||
try {
|
||||
return (T) result;
|
||||
} catch (Exception e) {
|
||||
return defaultValue;
|
||||
}
|
||||
}
|
||||
}
|
@ -32,6 +32,8 @@ public class ClientSyncData implements Serializable {
|
||||
|
||||
private String clientId;
|
||||
|
||||
private ClientSyncAttributes attributes;
|
||||
|
||||
private List<String> namespaces;
|
||||
|
||||
private List<String> groupNames;
|
||||
@ -50,6 +52,7 @@ public class ClientSyncData implements Serializable {
|
||||
this.groupNames = groupNames;
|
||||
this.serviceNames = serviceNames;
|
||||
this.instancePublishInfos = instancePublishInfos;
|
||||
this.attributes = new ClientSyncAttributes();
|
||||
}
|
||||
|
||||
public String getClientId() {
|
||||
@ -91,4 +94,12 @@ public class ClientSyncData implements Serializable {
|
||||
public void setInstancePublishInfos(List<InstancePublishInfo> instancePublishInfos) {
|
||||
this.instancePublishInfos = instancePublishInfos;
|
||||
}
|
||||
|
||||
public ClientSyncAttributes getAttributes() {
|
||||
return attributes;
|
||||
}
|
||||
|
||||
public void setAttributes(ClientSyncAttributes attributes) {
|
||||
this.attributes = attributes;
|
||||
}
|
||||
}
|
||||
|
@ -17,6 +17,7 @@
|
||||
package com.alibaba.nacos.naming.core.v2.client.factory;
|
||||
|
||||
import com.alibaba.nacos.naming.core.v2.client.Client;
|
||||
import com.alibaba.nacos.naming.core.v2.client.ClientSyncAttributes;
|
||||
|
||||
/**
|
||||
* Client factory.
|
||||
@ -43,8 +44,9 @@ public interface ClientFactory<C extends Client> {
|
||||
/**
|
||||
* Build a new {@link Client} synced from other server node.
|
||||
*
|
||||
* @param clientId client id
|
||||
* @param clientId client id
|
||||
* @param attributes client attributes
|
||||
* @return new sync {@link Client} implementation
|
||||
*/
|
||||
C newSyncedClient(String clientId);
|
||||
C newSyncedClient(String clientId, ClientSyncAttributes attributes);
|
||||
}
|
||||
|
@ -17,6 +17,7 @@
|
||||
package com.alibaba.nacos.naming.core.v2.client.factory;
|
||||
|
||||
import com.alibaba.nacos.common.spi.NacosServiceLoader;
|
||||
import com.alibaba.nacos.common.utils.StringUtils;
|
||||
import com.alibaba.nacos.naming.constants.ClientConstants;
|
||||
import com.alibaba.nacos.naming.misc.Loggers;
|
||||
|
||||
@ -56,7 +57,7 @@ public class ClientFactoryHolder {
|
||||
* @return target type {@link ClientFactory}, if not fount, return 'default' client factory.
|
||||
*/
|
||||
public ClientFactory findClientFactory(String type) {
|
||||
if (!clientFactories.containsKey(type)) {
|
||||
if (StringUtils.isEmpty(type) || !clientFactories.containsKey(type)) {
|
||||
return clientFactories.get(ClientConstants.DEFAULT_FACTORY);
|
||||
}
|
||||
return clientFactories.get(type);
|
||||
|
@ -17,6 +17,7 @@
|
||||
package com.alibaba.nacos.naming.core.v2.client.factory.impl;
|
||||
|
||||
import com.alibaba.nacos.naming.constants.ClientConstants;
|
||||
import com.alibaba.nacos.naming.core.v2.client.ClientSyncAttributes;
|
||||
import com.alibaba.nacos.naming.core.v2.client.factory.ClientFactory;
|
||||
import com.alibaba.nacos.naming.core.v2.client.impl.ConnectionBasedClient;
|
||||
|
||||
@ -38,7 +39,7 @@ public class ConnectionBasedClientFactory implements ClientFactory<ConnectionBas
|
||||
}
|
||||
|
||||
@Override
|
||||
public ConnectionBasedClient newSyncedClient(String clientId) {
|
||||
public ConnectionBasedClient newSyncedClient(String clientId, ClientSyncAttributes attributes) {
|
||||
return new ConnectionBasedClient(clientId, false);
|
||||
}
|
||||
}
|
||||
|
@ -17,6 +17,7 @@
|
||||
package com.alibaba.nacos.naming.core.v2.client.factory.impl;
|
||||
|
||||
import com.alibaba.nacos.naming.constants.ClientConstants;
|
||||
import com.alibaba.nacos.naming.core.v2.client.ClientSyncAttributes;
|
||||
import com.alibaba.nacos.naming.core.v2.client.factory.ClientFactory;
|
||||
import com.alibaba.nacos.naming.core.v2.client.impl.IpPortBasedClient;
|
||||
|
||||
@ -38,7 +39,7 @@ public class EphemeralIpPortClientFactory implements ClientFactory<IpPortBasedCl
|
||||
}
|
||||
|
||||
@Override
|
||||
public IpPortBasedClient newSyncedClient(String clientId) {
|
||||
public IpPortBasedClient newSyncedClient(String clientId, ClientSyncAttributes attributes) {
|
||||
return new IpPortBasedClient(clientId, true);
|
||||
}
|
||||
}
|
||||
|
@ -17,6 +17,7 @@
|
||||
package com.alibaba.nacos.naming.core.v2.client.factory.impl;
|
||||
|
||||
import com.alibaba.nacos.naming.constants.ClientConstants;
|
||||
import com.alibaba.nacos.naming.core.v2.client.ClientSyncAttributes;
|
||||
import com.alibaba.nacos.naming.core.v2.client.factory.ClientFactory;
|
||||
import com.alibaba.nacos.naming.core.v2.client.impl.IpPortBasedClient;
|
||||
|
||||
@ -38,7 +39,7 @@ public class PersistentIpPortClientFactory implements ClientFactory<IpPortBasedC
|
||||
}
|
||||
|
||||
@Override
|
||||
public IpPortBasedClient newSyncedClient(String clientId) {
|
||||
public IpPortBasedClient newSyncedClient(String clientId, ClientSyncAttributes attributes) {
|
||||
return new IpPortBasedClient(clientId, false);
|
||||
}
|
||||
}
|
||||
|
@ -17,6 +17,7 @@
|
||||
package com.alibaba.nacos.naming.core.v2.client.manager;
|
||||
|
||||
import com.alibaba.nacos.naming.core.v2.client.Client;
|
||||
import com.alibaba.nacos.naming.core.v2.client.ClientSyncAttributes;
|
||||
|
||||
import java.util.Collection;
|
||||
|
||||
@ -38,10 +39,11 @@ public interface ClientManager {
|
||||
/**
|
||||
* New sync client connected.
|
||||
*
|
||||
* @param clientId synced client id
|
||||
* @param clientId synced client id
|
||||
* @param attributes client sync attributes, which can help create sync client
|
||||
* @return true if add successfully, otherwise false
|
||||
*/
|
||||
boolean syncClientConnected(String clientId);
|
||||
boolean syncClientConnected(String clientId, ClientSyncAttributes attributes);
|
||||
|
||||
/**
|
||||
* Client disconnected.
|
||||
|
@ -17,6 +17,7 @@
|
||||
package com.alibaba.nacos.naming.core.v2.client.manager;
|
||||
|
||||
import com.alibaba.nacos.naming.core.v2.client.Client;
|
||||
import com.alibaba.nacos.naming.core.v2.client.ClientSyncAttributes;
|
||||
import com.alibaba.nacos.naming.core.v2.client.impl.IpPortBasedClient;
|
||||
import com.alibaba.nacos.naming.core.v2.client.manager.impl.ConnectionBasedClientManager;
|
||||
import com.alibaba.nacos.naming.core.v2.client.manager.impl.EphemeralIpPortClientManager;
|
||||
@ -54,8 +55,8 @@ public class ClientManagerDelegate implements ClientManager {
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean syncClientConnected(String clientId) {
|
||||
return getClientManagerById(clientId).syncClientConnected(clientId);
|
||||
public boolean syncClientConnected(String clientId, ClientSyncAttributes attributes) {
|
||||
return getClientManagerById(clientId).syncClientConnected(clientId, attributes);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -21,7 +21,9 @@ import com.alibaba.nacos.api.remote.RemoteConstants;
|
||||
import com.alibaba.nacos.common.notify.NotifyCenter;
|
||||
import com.alibaba.nacos.core.remote.ClientConnectionEventListener;
|
||||
import com.alibaba.nacos.core.remote.Connection;
|
||||
import com.alibaba.nacos.naming.constants.ClientConstants;
|
||||
import com.alibaba.nacos.naming.core.v2.client.Client;
|
||||
import com.alibaba.nacos.naming.core.v2.client.ClientSyncAttributes;
|
||||
import com.alibaba.nacos.naming.core.v2.client.factory.ClientFactory;
|
||||
import com.alibaba.nacos.naming.core.v2.client.factory.ClientFactoryHolder;
|
||||
import com.alibaba.nacos.naming.core.v2.client.impl.ConnectionBasedClient;
|
||||
@ -71,8 +73,10 @@ public class ConnectionBasedClientManager extends ClientConnectionEventListener
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean syncClientConnected(String clientId) {
|
||||
return clientConnected(new ConnectionBasedClient(clientId, false));
|
||||
public boolean syncClientConnected(String clientId, ClientSyncAttributes attributes) {
|
||||
String type = attributes.getClientAttribute(ClientConstants.CONNECTION_TYPE);
|
||||
ClientFactory clientFactory = ClientFactoryHolder.getInstance().findClientFactory(type);
|
||||
return clientConnected(clientFactory.newSyncedClient(clientId, attributes));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -21,6 +21,7 @@ import com.alibaba.nacos.common.notify.NotifyCenter;
|
||||
import com.alibaba.nacos.naming.constants.ClientConstants;
|
||||
import com.alibaba.nacos.naming.core.DistroMapper;
|
||||
import com.alibaba.nacos.naming.core.v2.client.Client;
|
||||
import com.alibaba.nacos.naming.core.v2.client.ClientSyncAttributes;
|
||||
import com.alibaba.nacos.naming.core.v2.client.factory.ClientFactory;
|
||||
import com.alibaba.nacos.naming.core.v2.client.factory.ClientFactoryHolder;
|
||||
import com.alibaba.nacos.naming.core.v2.client.impl.IpPortBasedClient;
|
||||
@ -69,8 +70,8 @@ public class EphemeralIpPortClientManager implements ClientManager {
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean syncClientConnected(String clientId) {
|
||||
return clientConnected(clientFactory.newSyncedClient(clientId));
|
||||
public boolean syncClientConnected(String clientId, ClientSyncAttributes attributes) {
|
||||
return clientConnected(clientFactory.newSyncedClient(clientId, attributes));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -18,6 +18,7 @@ package com.alibaba.nacos.naming.core.v2.client.manager.impl;
|
||||
|
||||
import com.alibaba.nacos.common.notify.NotifyCenter;
|
||||
import com.alibaba.nacos.naming.core.v2.client.Client;
|
||||
import com.alibaba.nacos.naming.core.v2.client.ClientSyncAttributes;
|
||||
import com.alibaba.nacos.naming.core.v2.client.impl.IpPortBasedClient;
|
||||
import com.alibaba.nacos.naming.core.v2.client.manager.ClientManager;
|
||||
import com.alibaba.nacos.naming.core.v2.event.client.ClientEvent;
|
||||
@ -53,7 +54,7 @@ public class PersistentIpPortClientManager implements ClientManager {
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean syncClientConnected(String clientId) {
|
||||
public boolean syncClientConnected(String clientId, ClientSyncAttributes attributes) {
|
||||
throw new UnsupportedOperationException("");
|
||||
}
|
||||
|
||||
|
@ -18,6 +18,7 @@ package com.alibaba.nacos.naming.core.v2.client.manager.impl;
|
||||
|
||||
import com.alibaba.nacos.naming.core.DistroMapper;
|
||||
import com.alibaba.nacos.naming.core.v2.client.Client;
|
||||
import com.alibaba.nacos.naming.core.v2.client.ClientSyncAttributes;
|
||||
import com.alibaba.nacos.naming.core.v2.client.impl.IpPortBasedClient;
|
||||
import com.alibaba.nacos.naming.misc.SwitchDomain;
|
||||
import org.junit.Before;
|
||||
@ -49,6 +50,9 @@ public class EphemeralIpPortClientManagerTest {
|
||||
@Mock
|
||||
private SwitchDomain switchDomain;
|
||||
|
||||
@Mock
|
||||
private ClientSyncAttributes attributes;
|
||||
|
||||
EphemeralIpPortClientManager ephemeralIpPortClientManager;
|
||||
|
||||
@Before
|
||||
@ -56,7 +60,7 @@ public class EphemeralIpPortClientManagerTest {
|
||||
ephemeralIpPortClientManager = new EphemeralIpPortClientManager(distroMapper, switchDomain);
|
||||
when(client.getClientId()).thenReturn(ephemeralIpPortId);
|
||||
ephemeralIpPortClientManager.clientConnected(client);
|
||||
ephemeralIpPortClientManager.syncClientConnected(syncedClientId);
|
||||
ephemeralIpPortClientManager.syncClientConnected(syncedClientId, attributes);
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -80,4 +84,4 @@ public class EphemeralIpPortClientManagerTest {
|
||||
String unUsedClientId = "127.0.0.1:8888#true";
|
||||
assertFalse(ephemeralIpPortClientManager.contains(unUsedClientId));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user