Nacos config support gRPC (#3334)

* Add gprc support-> client side code submit ,and create server side naming module abstract handler template.

* Add gprc support-> config data change  notify code submit....

* Add gprc support-> serve side healthy check code submit and solve some checkstyle problems.

* Add gprc support-> 1.config client worker ,cancel long polling task  2. test add listener,remove listener in config module  3.add response resolve
This commit is contained in:
nov.lzf 2020-07-15 13:56:29 +08:00 committed by GitHub
parent bbbeabe1e8
commit 139b211e21
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
53 changed files with 2248 additions and 1407 deletions

View File

@ -15,13 +15,12 @@
*/
package com.alibaba.nacos.api.config.remote.request;
import com.alibaba.nacos.api.remote.request.ChangeListenRequest;
/**
* @author liuzunfei
* @version $Id: ConfigChangeListenRequest.java, v 0.1 2020年07月13日 9:01 PM liuzunfei Exp $
*/
public class ConfigChangeListenRequest extends ChangeListenRequest {
public class ConfigChangeListenRequest extends ConfigCommonRequest {
private static final String LISTEN="listen";

View File

@ -13,15 +13,17 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.api.config.remote.request;
import com.alibaba.nacos.api.remote.request.CommonRequest;
import com.alibaba.nacos.api.remote.request.Request;
/**
* abstract request of config module request,all config module request should extends this class.
* @author liuzunfei
* @version $Id: ConfigCommonRequest.java, v 0.1 2020年07月13日 9:05 PM liuzunfei Exp $
*/
public abstract class ConfigCommonRequest extends CommonRequest {
public abstract class ConfigCommonRequest extends Request {
@Override
public String getModule() {

View File

@ -13,34 +13,44 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.api.config.remote.request;
/**
* request to query config content.
*
* @author liuzunfei
* @version $Id: ConfigQueryRequest.java, v 0.1 2020年07月13日 9:06 PM liuzunfei Exp $
*/
public class ConfigQueryRequest extends ConfigCommonRequest{
public class ConfigQueryRequest extends ConfigCommonRequest {
private String dataId;
private String group;
private String tenant;
@Override
public String getType() {
return ConfigRequestTypeConstants.QUERY_CONFIG;
}
public static ConfigQueryRequest build(String dataId, String group,String tenant){
ConfigQueryRequest request=new ConfigQueryRequest();
/**
* request builder.
*
* @param dataId dataId
* @param group group
* @param tenant tenant
* @return ConfigQueryRequest instance.
*/
public static ConfigQueryRequest build(String dataId, String group, String tenant) {
ConfigQueryRequest request = new ConfigQueryRequest();
request.setDataId(dataId);
request.setGroup(group);
request.setTenant(tenant);
return request;
}
/**
* Getter method for property <tt>dataId</tt>.
*
@ -49,7 +59,7 @@ public class ConfigQueryRequest extends ConfigCommonRequest{
public String getDataId() {
return dataId;
}
/**
* Setter method for property <tt>dataId</tt>.
*
@ -58,7 +68,7 @@ public class ConfigQueryRequest extends ConfigCommonRequest{
public void setDataId(String dataId) {
this.dataId = dataId;
}
/**
* Getter method for property <tt>group</tt>.
*
@ -67,7 +77,7 @@ public class ConfigQueryRequest extends ConfigCommonRequest{
public String getGroup() {
return group;
}
/**
* Setter method for property <tt>group</tt>.
*
@ -76,7 +86,7 @@ public class ConfigQueryRequest extends ConfigCommonRequest{
public void setGroup(String group) {
this.group = group;
}
/**
* Getter method for property <tt>tenant</tt>.
*
@ -85,7 +95,7 @@ public class ConfigQueryRequest extends ConfigCommonRequest{
public String getTenant() {
return tenant;
}
/**
* Setter method for property <tt>tenant</tt>.
*

View File

@ -13,18 +13,26 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.api.config.remote.response;
import com.alibaba.nacos.api.remote.response.Response;
/**
* ConfigChangeListenResponse.
*
* @author liuzunfei
* @version $Id: ConfigChangeListenResponse.java, v 0.1 2020年07月14日 3:07 PM liuzunfei Exp $
*/
public class ConfigChangeListenResponse extends Response {
public ConfigChangeListenResponse() {
super();
}
public ConfigChangeListenResponse(int resultCode, String message) {
super(ConfigResponseTypeConstants.CONFIG_CHANGE, resultCode, message);
}
}

View File

@ -16,24 +16,63 @@
package com.alibaba.nacos.api.config.remote.response;
import com.alibaba.nacos.api.remote.response.Response;
import com.alibaba.nacos.api.remote.response.ResponseCode;
/**
* @author liuzunfei
* @version $Id: ConfigChangeNotifyResponse.java, v 0.1 2020年07月14日 3:20 PM liuzunfei Exp $
*/
public class ConfigChangeNotifyResponse extends Response {
private String ackId;
private String dataId;
private String group;
private String tenant;
public ConfigChangeNotifyResponse( int resultCode, String message) {
public ConfigChangeNotifyResponse() {
}
public ConfigChangeNotifyResponse(int resultCode, String message) {
super(ConfigResponseTypeConstants.CONFIG_CHANGE_NOTIFY, resultCode, message);
}
/**
* build success response.
*
* @param dataId dataId
* @param group group
* @param tenant tenant
* @return ConfigChangeNotifyResponse
*/
public static ConfigChangeNotifyResponse buildSuccessResponse(String dataId, String group, String tenant) {
ConfigChangeNotifyResponse response = new ConfigChangeNotifyResponse(ResponseCode.SUCCESS.getCode(),
"data changed");
response.setDataId(dataId);
response.setGroup(group);
response.setTenant(tenant);
return response;
}
/**
* Getter method for property <tt>ackId</tt>.
*
* @return property value of ackId
*/
public String getAckId() {
return ackId;
}
/**
* Setter method for property <tt>ackId</tt>.
*
* @param ackId value to be assigned to property ackId
*/
public void setAckId(String ackId) {
this.ackId = ackId;
}
/**
* Getter method for property <tt>dataId</tt>.

View File

@ -13,47 +13,51 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.api.config.remote.response;
import com.alibaba.nacos.api.remote.response.Response;
import com.alibaba.nacos.api.remote.response.ResponseCode;
/**
* ConfigQueryResponse.
* @author liuzunfei
* @version $Id: ConfigQueryResponse.java, v 0.1 2020年07月14日 2:47 PM liuzunfei Exp $
*/
public class ConfigQueryResponse extends Response {
String content;
public static final int CONFIG_NOT_FOUND=300;
public static final int CONFIG_QUERY_CONFLICT=400;
public static final int CONFIG_NOT_FOUND = 300;
public static final int CONFIG_QUERY_CONFLICT = 400;
/**
* Buid fail response
* Buid fail response.
*
* @param errorCode
* @param message
* @return
*/
public static ConfigQueryResponse buildFailResponse(int errorCode,String message){
ConfigQueryResponse response=new ConfigQueryResponse(ResponseCode.FAIL.getCode(),message);
public static ConfigQueryResponse buildFailResponse(int errorCode, String message) {
ConfigQueryResponse response = new ConfigQueryResponse(ResponseCode.FAIL.getCode(), message);
response.setErrorCode(errorCode);
return response;
}
/**
* Buidl success resposne
*
* @param content
* @return
*/
public static ConfigQueryResponse buildSuccessResponse(String content){
ConfigQueryResponse response=new ConfigQueryResponse(ResponseCode.SUCCESS.getCode(),"");
public static ConfigQueryResponse buildSuccessResponse(String content) {
ConfigQueryResponse response = new ConfigQueryResponse(ResponseCode.SUCCESS.getCode(), "");
response.setContent(content);
return response;
}
/**
* Getter method for property <tt>content</tt>.
*
@ -62,7 +66,7 @@ public class ConfigQueryResponse extends Response {
public String getContent() {
return content;
}
/**
* Setter method for property <tt>content</tt>.
*
@ -71,9 +75,9 @@ public class ConfigQueryResponse extends Response {
public void setContent(String content) {
this.content = content;
}
public ConfigQueryResponse(int resultCode, String message) {
super(ConfigResponseTypeConstants.CONFIG_QUERY,resultCode, message);
super(ConfigResponseTypeConstants.CONFIG_QUERY, resultCode, message);
}
}

View File

@ -13,21 +13,23 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.api.config.remote.response;
import com.alibaba.nacos.api.remote.response.ResponseTypeConstants;
/**
* response type defined in config module
* @author liuzunfei
* @version $Id: ConfigResponseTypeConstants.java, v 0.1 2020年07月14日 3:10 PM liuzunfei Exp $
*/
public class ConfigResponseTypeConstants extends ResponseTypeConstants {
public static final String CONFIG_CHANGE="CONFIG_CHANGE";
public static final String CONFIG_CHANGE_NOTIFY="CONFIG_CHANGE_NOTIFY";
public static final String CONFIG_QUERY="CONFIG_QUERY";
public static final String CONFIG_CHANGE = "CONFIG_CHANGE";
public static final String CONFIG_CHANGE_NOTIFY = "CONFIG_CHANGE_NOTIFY";
public static final String CONFIG_QUERY = "CONFIG_QUERY";
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
* Copyright 1999-2020 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -834,7 +834,7 @@ public final class GrpcMetadata extends
if (value == null) {
throw new NullPointerException();
}
name_ = value;
onChanged();
return this;
@ -843,7 +843,7 @@ public final class GrpcMetadata extends
* <code>string name = 1;</code>
*/
public Builder clearName() {
name_ = getDefaultInstance().getName();
onChanged();
return this;
@ -857,7 +857,7 @@ public final class GrpcMetadata extends
throw new NullPointerException();
}
checkByteStringIsUtf8(value);
name_ = value;
onChanged();
return this;
@ -903,7 +903,7 @@ public final class GrpcMetadata extends
if (value == null) {
throw new NullPointerException();
}
clientIp_ = value;
onChanged();
return this;
@ -912,7 +912,7 @@ public final class GrpcMetadata extends
* <code>string client_ip = 2;</code>
*/
public Builder clearClientIp() {
clientIp_ = getDefaultInstance().getClientIp();
onChanged();
return this;
@ -926,7 +926,7 @@ public final class GrpcMetadata extends
throw new NullPointerException();
}
checkByteStringIsUtf8(value);
clientIp_ = value;
onChanged();
return this;
@ -972,7 +972,7 @@ public final class GrpcMetadata extends
if (value == null) {
throw new NullPointerException();
}
connectionId_ = value;
onChanged();
return this;
@ -981,7 +981,7 @@ public final class GrpcMetadata extends
* <code>string connection_id = 3;</code>
*/
public Builder clearConnectionId() {
connectionId_ = getDefaultInstance().getConnectionId();
onChanged();
return this;
@ -995,7 +995,7 @@ public final class GrpcMetadata extends
throw new NullPointerException();
}
checkByteStringIsUtf8(value);
connectionId_ = value;
onChanged();
return this;
@ -1086,7 +1086,7 @@ public final class GrpcMetadata extends
* <code>.google.protobuf.Timestamp create_time = 4;</code>
*/
public com.google.protobuf.Timestamp.Builder getCreateTimeBuilder() {
onChanged();
return getCreateTimeFieldBuilder().getBuilder();
}
@ -1158,7 +1158,7 @@ public final class GrpcMetadata extends
if (value == null) {
throw new NullPointerException();
}
version_ = value;
onChanged();
return this;
@ -1167,7 +1167,7 @@ public final class GrpcMetadata extends
* <code>string version = 5;</code>
*/
public Builder clearVersion() {
version_ = getDefaultInstance().getVersion();
onChanged();
return this;
@ -1181,7 +1181,7 @@ public final class GrpcMetadata extends
throw new NullPointerException();
}
checkByteStringIsUtf8(value);
version_ = value;
onChanged();
return this;

View File

@ -1,5 +1,5 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
* Copyright 1999-2020 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -20,93 +20,94 @@
package com.alibaba.nacos.api.grpc;
public interface GrpcMetadataOrBuilder extends
// @@protoc_insertion_point(interface_extends:GrpcMetadata)
com.google.protobuf.MessageOrBuilder {
/**
* <code>string name = 1;</code>
*/
String getName();
/**
* <code>string name = 1;</code>
*/
com.google.protobuf.ByteString
getNameBytes();
/**
* <code>string client_ip = 2;</code>
*/
String getClientIp();
/**
* <code>string client_ip = 2;</code>
*/
com.google.protobuf.ByteString
getClientIpBytes();
/**
* <code>string connection_id = 3;</code>
*/
String getConnectionId();
/**
* <code>string connection_id = 3;</code>
*/
com.google.protobuf.ByteString
getConnectionIdBytes();
/**
* <code>.google.protobuf.Timestamp create_time = 4;</code>
*/
boolean hasCreateTime();
/**
* <code>.google.protobuf.Timestamp create_time = 4;</code>
*/
com.google.protobuf.Timestamp getCreateTime();
/**
* <code>.google.protobuf.Timestamp create_time = 4;</code>
*/
com.google.protobuf.TimestampOrBuilder getCreateTimeOrBuilder();
/**
* <code>string version = 5;</code>
*/
String getVersion();
/**
* <code>string version = 5;</code>
*/
com.google.protobuf.ByteString
getVersionBytes();
/**
* <code>map&lt;string, string&gt; labels = 6;</code>
*/
int getLabelsCount();
/**
* <code>map&lt;string, string&gt; labels = 6;</code>
*/
boolean containsLabels(
String key);
/**
* Use {@link #getLabelsMap()} instead.
*/
@Deprecated
java.util.Map<String, String>
getLabels();
/**
* <code>map&lt;string, string&gt; labels = 6;</code>
*/
java.util.Map<String, String>
getLabelsMap();
/**
* <code>map&lt;string, string&gt; labels = 6;</code>
*/
String getLabelsOrDefault(
String key,
String defaultValue);
/**
* <code>map&lt;string, string&gt; labels = 6;</code>
*/
String getLabelsOrThrow(
String key);
// @@protoc_insertion_point(interface_extends:GrpcMetadata)
com.google.protobuf.MessageOrBuilder {
/**
* <code>string name = 1;.</code>
*/
String getName();
/**
* <code>string name = 1;.</code>
*/
com.google.protobuf.ByteString getNameBytes();
/**
* <code>string client_ip = 2;.</code>
*/
String getClientIp();
/**
* <code>string client_ip = 2;.</code>
*/
com.google.protobuf.ByteString getClientIpBytes();
/**
* <code>string connection_id = 3;.</code>
*/
String getConnectionId();
/**
* <code>string connection_id = 3;.</code>
*/
com.google.protobuf.ByteString getConnectionIdBytes();
/**
* <code>.google.protobuf.Timestamp create_time = 4;.</code>
*/
boolean hasCreateTime();
/**
* <code>.google.protobuf.Timestamp create_time = 4;.</code>
*/
com.google.protobuf.Timestamp getCreateTime();
/**
* <code>.google.protobuf.Timestamp create_time = 4;.</code>
*/
com.google.protobuf.TimestampOrBuilder getCreateTimeOrBuilder();
/**
* <code>string version = 5;.</code>
*/
String getVersion();
/**
* <code>string version = 5;.</code>
*/
com.google.protobuf.ByteString getVersionBytes();
/**
* <code>map&lt;string, string&gt; labels = 6;.</code>
*/
int getLabelsCount();
/**
* <code>map&lt;string, string&gt; labels = 6;.</code>
*/
boolean containsLabels(String key);
/**
* Use {@link #getLabelsMap()} instead.
*/
@Deprecated
java.util.Map<String, String> getLabels();
/**
* <code>map&lt;string, string&gt; labels = 6;.</code>
*/
java.util.Map<String, String> getLabelsMap();
/**
* <code>map&lt;string, string&gt; labels = 6;.</code>
*/
String getLabelsOrDefault(String key, String defaultValue);
/**
* <code>map&lt;string, string&gt; labels = 6;.</code>
*/
String getLabelsOrThrow(String key);
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
* Copyright 1999-2020 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -594,7 +594,7 @@ public final class GrpcRequest extends
if (value == null) {
throw new NullPointerException();
}
type_ = value;
onChanged();
return this;
@ -603,7 +603,7 @@ public final class GrpcRequest extends
* <code>string type = 2;</code>
*/
public Builder clearType() {
type_ = getDefaultInstance().getType();
onChanged();
return this;
@ -617,7 +617,7 @@ public final class GrpcRequest extends
throw new NullPointerException();
}
checkByteStringIsUtf8(value);
type_ = value;
onChanged();
return this;
@ -736,7 +736,7 @@ public final class GrpcRequest extends
* <code>.GrpcMetadata metadata = 1;</code>
*/
public GrpcMetadata.Builder getMetadataBuilder() {
onChanged();
return getMetadataFieldBuilder().getBuilder();
}
@ -889,7 +889,7 @@ public final class GrpcRequest extends
* <code>.google.protobuf.Any body = 3;</code>
*/
public com.google.protobuf.Any.Builder getBodyBuilder() {
onChanged();
return getBodyFieldBuilder().getBuilder();
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
* Copyright 1999-2020 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.

File diff suppressed because it is too large Load Diff

View File

@ -1,17 +1,33 @@
/*
* Copyright 1999-2020 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// Generated by the protocol buffer compiler. DO NOT EDIT!
// source: nacos_grpc_service.proto
package com.alibaba.nacos.api.grpc;
public interface GrpcResponseOrBuilder extends
// @@protoc_insertion_point(interface_extends:GrpcResponse)
com.google.protobuf.MessageOrBuilder {
// @@protoc_insertion_point(interface_extends:GrpcResponse)
com.google.protobuf.MessageOrBuilder {
/**
* <code>int32 code = 1;</code>
*/
int getCode();
/**
* <pre>
* reponse body
@ -20,6 +36,7 @@ public interface GrpcResponseOrBuilder extends
* <code>.google.protobuf.Any body = 2;</code>
*/
boolean hasBody();
/**
* <pre>
* reponse body
@ -28,6 +45,7 @@ public interface GrpcResponseOrBuilder extends
* <code>.google.protobuf.Any body = 2;</code>
*/
com.google.protobuf.Any getBody();
/**
* <pre>
* reponse body
@ -36,4 +54,14 @@ public interface GrpcResponseOrBuilder extends
* <code>.google.protobuf.Any body = 2;</code>
*/
com.google.protobuf.AnyOrBuilder getBodyOrBuilder();
/**
* <code>string type = 3;</code>
*/
String getType();
/**
* <code>string type = 3;</code>
*/
com.google.protobuf.ByteString getTypeBytes();
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
* Copyright 1999-2020 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -69,12 +69,12 @@ public final class NacosGrpcService {
"try\022\013\n\003key\030\001 \001(\t\022\r\n\005value\030\002 \001(\t:\0028\001\"`\n\013G" +
"rpcRequest\022\014\n\004type\030\002 \001(\t\022\037\n\010metadata\030\001 \001" +
"(\0132\r.GrpcMetadata\022\"\n\004body\030\003 \001(\0132\024.google",
".protobuf.Any\"@\n\014GrpcResponse\022\014\n\004code\030\001 " +
"\001(\005\022\"\n\004body\030\002 \001(\0132\024.google.protobuf.Any2" +
"A\n\rRequestStream\0220\n\rrequestStream\022\014.Grpc" +
"Request\032\r.GrpcResponse\"\0000\00123\n\007Request\022(\n" +
"\007request\022\014.GrpcRequest\032\r.GrpcResponse\"\000B" +
"\036\n\032com.alibaba.nacos.api.grpcP\001b\006proto3"
".protobuf.Any\"N\n\014GrpcResponse\022\014\n\004code\030\001 "
+ "\001(\005\022\"\n\004body\030\002 \001(\0132\024.google.protobuf.Any\022"
+ "\014\n\004type\030\003 \001(\t2A\n\rRequestStream\0220\n\rreques"
+ "tStream\022\014.GrpcRequest\032\r.GrpcResponse\"\0000\001"
+ "23\n\007Request\022(\n\007request\022\014.GrpcRequest\032\r.G"
+ "rpcResponse\"\000B\036\n\032com.alibaba.nacos.api.g" + "rpcP\001b\006proto3"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor. InternalDescriptorAssigner() {
@ -112,8 +112,7 @@ public final class NacosGrpcService {
getDescriptor().getMessageTypes().get(2);
internal_static_GrpcResponse_fieldAccessorTable = new
com.google.protobuf.GeneratedMessageV3.FieldAccessorTable(
internal_static_GrpcResponse_descriptor,
new String[] { "Code", "Body", });
internal_static_GrpcResponse_descriptor, new String[] {"Code", "Body", "Type",});
com.google.protobuf.AnyProto.getDescriptor();
com.google.protobuf.TimestampProto.getDescriptor();
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
* Copyright 1999-2020 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -107,8 +107,7 @@ public final class RequestGrpc {
* Sends a commonRequest
* </pre>
*/
public void request(GrpcRequest request,
io.grpc.stub.StreamObserver<GrpcResponse> responseObserver) {
public void request(GrpcRequest request, io.grpc.stub.StreamObserver<GrpcResponse> responseObserver) {
asyncUnimplementedUnaryCall(getRequestMethod(), responseObserver);
}
@ -148,8 +147,7 @@ public final class RequestGrpc {
* Sends a commonRequest
* </pre>
*/
public void request(GrpcRequest request,
io.grpc.stub.StreamObserver<GrpcResponse> responseObserver) {
public void request(GrpcRequest request, io.grpc.stub.StreamObserver<GrpcResponse> responseObserver) {
asyncUnaryCall(
getChannel().newCall(getRequestMethod(), getCallOptions()), request, responseObserver);
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
* Copyright 1999-2020 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -107,8 +107,7 @@ public final class RequestStreamGrpc {
* build a streamRequest
* </pre>
*/
public void requestStream(GrpcRequest request,
io.grpc.stub.StreamObserver<GrpcResponse> responseObserver) {
public void requestStream(GrpcRequest request, io.grpc.stub.StreamObserver<GrpcResponse> responseObserver) {
asyncUnimplementedUnaryCall(getRequestStreamMethod(), responseObserver);
}
@ -148,8 +147,7 @@ public final class RequestStreamGrpc {
* build a streamRequest
* </pre>
*/
public void requestStream(GrpcRequest request,
io.grpc.stub.StreamObserver<GrpcResponse> responseObserver) {
public void requestStream(GrpcRequest request, io.grpc.stub.StreamObserver<GrpcResponse> responseObserver) {
asyncServerStreamingCall(
getChannel().newCall(getRequestStreamMethod(), getCallOptions()), request, responseObserver);
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
* Copyright 1999-2020 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -13,17 +13,21 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.api.remote.request;
package com.alibaba.nacos.api.naming.remote;
import com.alibaba.nacos.api.remote.request.Request;
/**
* uniform remote request of naming module
*
* @author liuzunfei
* @version $Id: ChangeListenRequest.java, v 0.1 2020年07月13日 8:45 PM liuzunfei Exp $
* @version $Id: NamingCommonRequest.java, v 0.1 2020年07月14日 7:26 PM liuzunfei Exp $
*/
public abstract class CommonRequest extends Request {
public abstract class NamingCommonRequest extends Request {
@Override
public String getModel() {
return RequestMode.COMMON.mode;
public String getModule() {
return "naming";
}
}

View File

@ -13,16 +13,18 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.api.naming.remote;
import com.alibaba.nacos.api.remote.request.RequestTypeConstants;
/**
* retain all naming module request type constants.
*
* @author liuzunfei
* @version $Id: NamingRequestTypeConstants.java, v 0.1 2020年07月13日 9:12 PM liuzunfei Exp $
*/
public class NamingRequestTypeConstants extends RequestTypeConstants {
public static final String SERVICE_INSTANCE_CHANGE = "SERVICE_INSTANCE_CHANGE";
}

View File

@ -0,0 +1,60 @@
/*
* Copyright 1999-2020 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.api.remote;
import com.alibaba.nacos.api.config.remote.response.ConfigChangeListenResponse;
import com.alibaba.nacos.api.config.remote.response.ConfigChangeNotifyResponse;
import com.alibaba.nacos.api.config.remote.response.ConfigQueryResponse;
import com.alibaba.nacos.api.config.remote.response.ConfigResponseTypeConstants;
import com.alibaba.nacos.api.naming.remote.NamingRequestTypeConstants;
import com.alibaba.nacos.api.remote.response.ConnectResetResponse;
import com.alibaba.nacos.api.remote.response.HeartBeatResponse;
import com.alibaba.nacos.api.remote.response.ResponseTypeConstants;
import java.util.HashMap;
import java.util.Map;
/**
* ResponseRegistry.
*
* @author liuzunfei
* @version $Id: ResponseRegistry.java, v 0.1 2020年07月15日 12:43 PM liuzunfei Exp $
*/
public class ResponseRegistry {
private static final Map<String, Class> REGISTRY_RESPONSES = new HashMap<String, Class>();
static {
//internal response regitry
REGISTRY_RESPONSES.put(ResponseTypeConstants.HEART_BEAT, HeartBeatResponse.class);
REGISTRY_RESPONSES.put(ResponseTypeConstants.CONNECT_SWITCH, ConnectResetResponse.class);
//config response registry
REGISTRY_RESPONSES.put(ConfigResponseTypeConstants.CONFIG_CHANGE, ConfigChangeListenResponse.class);
REGISTRY_RESPONSES.put(ConfigResponseTypeConstants.CONFIG_CHANGE_NOTIFY, ConfigChangeNotifyResponse.class);
REGISTRY_RESPONSES.put(ConfigResponseTypeConstants.CONFIG_QUERY, ConfigQueryResponse.class);
//naming response registry
//REGISTRY_RESPONSES.put(NamingRequestTypeConstants.SERVICE_INSTANCE_CHANGE, ServiceI.class);
}
public static Class getClassByType(String type) {
return REGISTRY_RESPONSES.get(type);
}
}

View File

@ -13,53 +13,55 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.api.remote.connection;
import java.util.Date;
package com.alibaba.nacos.api.remote.connection;
import com.alibaba.nacos.api.remote.response.Response;
/**
* Connection.
*
* @author liuzunfei
* @version $Id: Connection.java, v 0.1 2020年07月13日 7:08 PM liuzunfei Exp $
*/
public abstract class Connection {
private ConnectionMetaInfo metaInfo;
public Connection(ConnectionMetaInfo metaInfo){
this.metaInfo=metaInfo;
private final ConnectionMetaInfo metaInfo;
public Connection(ConnectionMetaInfo metaInfo) {
this.metaInfo = metaInfo;
}
/**
* Send response to this client that associated to this connection
* @param reponse
* Send response to this client that associated to this connection.
*
* @param reponse reponse
*/
public abstract void sendResponse(Response reponse);
/**
* Close this connection, if this connection is not active yet,
* Close this connection, if this connection is not active yet.
*/
public abstract void closeGrapcefully();
public abstract void closeGrapcefully();
/**
* Update last Active Time to now.
*/
public void freshActiveTime(){
metaInfo.lastActiveTime=new Date();
public void freshActiveTime() {
metaInfo.setLastActiveTime(System.currentTimeMillis());
}
/**
* return last active time, include request occurs and
* return last active time, include request occurs and.
* @return
*/
public Date getLastActiveTime(){
public long getLastActiveTimestamp() {
return metaInfo.lastActiveTime;
}
public String getConnectionId() {
return metaInfo.connectionId;
}
}

View File

@ -13,47 +13,52 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.api.remote.connection;
import java.util.Date;
/**
* ConnectionMetaInfo.
*
* @author liuzunfei
* @version $Id: ConnectionMetaInfo.java, v 0.1 2020年07月13日 7:28 PM liuzunfei Exp $
*/
public class ConnectionMetaInfo{
public class ConnectionMetaInfo {
/**
* ConnectionType
*/
String connectType;
/**
* Client IP Address
*/
String clientIp;
/**
* Identify Unique connectionId
*/
String connectionId;
/**
* create time
*/
Date createTime;
/**
* astActiveTime
*/
Date lastActiveTime;
public ConnectionMetaInfo(String connectionId,String clientIp,String connectType){
this.connectionId=connectionId;
this.clientIp=clientIp;
this.connectType=connectType;
this.createTime=new Date();
this.lastActiveTime=new Date();
long lastActiveTime;
public ConnectionMetaInfo(String connectionId, String clientIp, String connectType) {
this.connectionId = connectionId;
this.clientIp = clientIp;
this.connectType = connectType;
this.createTime = new Date();
this.lastActiveTime = System.currentTimeMillis();
}
/**
* Getter method for property <tt>clientIp</tt>.
*
@ -62,7 +67,7 @@ public class ConnectionMetaInfo{
public String getClientIp() {
return clientIp;
}
/**
* Setter method for property <tt>clientIp</tt>.
*
@ -71,7 +76,7 @@ public class ConnectionMetaInfo{
public void setClientIp(String clientIp) {
this.clientIp = clientIp;
}
/**
* Getter method for property <tt>connectionId</tt>.
*
@ -80,7 +85,7 @@ public class ConnectionMetaInfo{
public String getConnectionId() {
return connectionId;
}
/**
* Setter method for property <tt>connectionId</tt>.
*
@ -89,7 +94,7 @@ public class ConnectionMetaInfo{
public void setConnectionId(String connectionId) {
this.connectionId = connectionId;
}
/**
* Getter method for property <tt>createTime</tt>.
*
@ -98,7 +103,7 @@ public class ConnectionMetaInfo{
public Date getCreateTime() {
return createTime;
}
/**
* Setter method for property <tt>createTime</tt>.
*
@ -107,22 +112,22 @@ public class ConnectionMetaInfo{
public void setCreateTime(Date createTime) {
this.createTime = createTime;
}
/**
* Getter method for property <tt>lastActiveTime</tt>.
*
* @return property value of lastActiveTime
*/
public Date getLastActiveTime() {
public long getLastActiveTime() {
return lastActiveTime;
}
/**
* Setter method for property <tt>lastActiveTime</tt>.
*
* @param lastActiveTime value to be assigned to property lastActiveTime
*/
public void setLastActiveTime(Date lastActiveTime) {
public void setLastActiveTime(long lastActiveTime) {
this.lastActiveTime = lastActiveTime;
}
}

View File

@ -20,7 +20,8 @@ package com.alibaba.nacos.api.remote.request;
* @author liuzunfei
* @version $Id: HeartBeatRequest.java, v 0.1 2020年07月14日 11:38 AM liuzunfei Exp $
*/
public class HeartBeatRequest extends CommonRequest{
public class HeartBeatRequest extends Request {
@Override
public String getType() {
return RequestTypeConstants.HEART_BEAT;
@ -28,6 +29,6 @@ public class HeartBeatRequest extends CommonRequest{
@Override
public String getModule() {
return "core";
return "internal";
}
}

View File

@ -17,48 +17,24 @@
package com.alibaba.nacos.api.remote.request;
/**
* Request.
* @author liuzunfei
* @version $Id: Request.java, v 0.1 2020年07月13日 3:46 PM liuzunfei Exp $
*/
public abstract class Request {
private String bodyString;
/**
* Getter method for property <tt>model</tt>.
*
* @return property value of model
*/
abstract public String getModel();
/**
* Getter method for property <tt>bodyString</tt>.
*
* @return property value of bodyString
*/
public String getBodyString() {
return bodyString;
}
/**
* Setter method for property <tt>bodyString</tt>.
*
* @param bodyString value to be assigned to property bodyString
*/
public void setBodyString(String bodyString) {
this.bodyString = bodyString;
}
/**
* Getter method for property <tt>type</tt>.
*
* @return property value of type
*/
abstract public String getType() ;
public abstract String getType();
/**
* Getter method for property <tt>type</tt>.
*
* @return property value of type
*/
abstract public String getModule() ;
public abstract String getModule();
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
* Copyright 1999-2020 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -13,30 +13,18 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.api.remote.request;
package com.alibaba.nacos.api.remote.response;
/**
* ConnectResetResponse.
*
* @author liuzunfei
* @version $Id: RequestMode.java, v 0.1 2020年07月13日 3:46 PM liuzunfei Exp $
* @version $Id: ConnectResetResponse.java, v 0.1 2020年07月15日 11:11 AM liuzunfei Exp $
*/
public enum RequestMode {
COMMON("COMMON","common request "),
CHANGE_LISTEN("CHANGE_LISTEN","listen change");
public String mode;
public String desc;
/**
* Private constructor
* @param mode
* @param desc
*/
private RequestMode(String mode,String desc){
this.mode=mode;
this.desc=desc;
public class ConnectResetResponse extends Response {
public ConnectResetResponse(int resultCode, String message) {
super(ResponseTypeConstants.CONNECT_SWITCH, resultCode, message);
}
}
}

View File

@ -13,17 +13,19 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.api.remote.response;
/**
* HeartBeatResponse.
*
* @author liuzunfei
* @version $Id: HeartBeatResponse.java, v 0.1 2020年07月14日 3:03 PM liuzunfei Exp $
*/
public class HeartBeatResponse extends Response{
public class HeartBeatResponse extends Response {
public HeartBeatResponse(int resultCode, String message) {
super("HeartBeat", resultCode, message);
super(ResponseTypeConstants.HEART_BEAT, resultCode, message);
}
}

View File

@ -0,0 +1,54 @@
/*
* Copyright 1999-2020 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.api.remote.response;
/**
* PlainBodyResponse.
*
* @author liuzunfei
* @version $Id: PlainBodyResponse.java, v 0.1 2020年07月15日 1:37 PM liuzunfei Exp $
*/
public class PlainBodyResponse extends Response {
private String bodyString;
public PlainBodyResponse() {
}
public PlainBodyResponse(int resultCode, String message) {
super(ResponseTypeConstants.PLAIN_BODY_STRING, resultCode, message);
}
/**
* Getter method for property <tt>bodyString</tt>.
*
* @return property value of bodyString
*/
public String getBodyString() {
return bodyString;
}
/**
* Setter method for property <tt>bodyString</tt>.
*
* @param bodyString value to be assigned to property bodyString
*/
public void setBodyString(String bodyString) {
this.bodyString = bodyString;
}
}

View File

@ -13,29 +13,43 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.api.remote.response;
/**
* abstract response model via rpc channel.
*
* @author liuzunfei
* @version $Id: Response.java, v 0.1 2020年07月13日 6:03 PM liuzunfei Exp $
*/
public abstract class Response<T> {
public abstract class Response {
int resultCode;
int errorCode;
String message;
String type;
public Response(String type,int resultCode,String message){
this.type=type;
this.resultCode=resultCode;
this.message=message;
/**
* Check Response is Successd.
* @return
*/
public boolean isSuccess() {
return this.resultCode == ResponseCode.SUCCESS.getCode();
}
public Response() {
}
public Response(String type, int resultCode, String message) {
this.type = type;
this.resultCode = resultCode;
this.message = message;
}
/**
* Getter method for property <tt>resultCode</tt>.
*
@ -44,7 +58,7 @@ public abstract class Response<T> {
public int getResultCode() {
return resultCode;
}
/**
* Setter method for property <tt>resultCode</tt>.
*
@ -53,7 +67,7 @@ public abstract class Response<T> {
public void setResultCode(int resultCode) {
this.resultCode = resultCode;
}
/**
* Getter method for property <tt>message</tt>.
*
@ -62,7 +76,7 @@ public abstract class Response<T> {
public String getMessage() {
return message;
}
/**
* Setter method for property <tt>message</tt>.
*
@ -71,7 +85,7 @@ public abstract class Response<T> {
public void setMessage(String message) {
this.message = message;
}
/**
* Getter method for property <tt>errorCode</tt>.
*
@ -80,7 +94,7 @@ public abstract class Response<T> {
public int getErrorCode() {
return errorCode;
}
/**
* Setter method for property <tt>errorCode</tt>.
*
@ -89,9 +103,16 @@ public abstract class Response<T> {
public void setErrorCode(int errorCode) {
this.errorCode = errorCode;
}
/**
* Getter method for property <tt>type</tt>.
*
* @return property value of type
*/
public String getType() {
return type;
}
/**
* Setter method for property <tt>type</tt>.
*
@ -100,4 +121,5 @@ public abstract class Response<T> {
public void setType(String type) {
this.type = type;
}
}

View File

@ -13,14 +13,20 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.api.remote.response;
/**
* internal ResponseTypeConstants.
* @author liuzunfei
* @version $Id: RequestTypeConstants.java, v 0.1 2020年07月13日 9:18 PM liuzunfei Exp $
*/
public class ResponseTypeConstants {
public static final String HEART_BEAT="HEART_BEAT";
public static final String PLAIN_BODY_STRING = "PLAIN_BODY_STRING";
public static final String HEART_BEAT = "HEART_BEAT";
public static final String CONNECT_SWITCH = "CONNECT_SWITCH";
}

View File

@ -52,6 +52,9 @@ message GrpcResponse {
int32 code = 1;
// reponse body
google.protobuf.Any body = 2;
string type = 3;
}
service RequestStream {

View File

@ -20,19 +20,21 @@ import com.alibaba.nacos.api.PropertyKeyConst;
import com.alibaba.nacos.api.common.Constants;
import com.alibaba.nacos.api.config.ConfigType;
import com.alibaba.nacos.api.config.listener.Listener;
import com.alibaba.nacos.api.config.remote.request.ConfigChangeListenRequest;
import com.alibaba.nacos.api.config.remote.response.ConfigChangeNotifyResponse;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.remote.response.Response;
import com.alibaba.nacos.client.config.common.GroupKey;
import com.alibaba.nacos.client.config.filter.impl.ConfigFilterChainManager;
import com.alibaba.nacos.client.config.http.HttpAgent;
import com.alibaba.nacos.client.config.impl.HttpSimpleClient.HttpResult;
import com.alibaba.nacos.client.config.remote.ConfigGrpcClientProxy;
import com.alibaba.nacos.client.config.utils.ContentUtils;
import com.alibaba.nacos.client.monitor.MetricsMonitor;
import com.alibaba.nacos.client.naming.utils.CollectionUtils;
import com.alibaba.nacos.client.remote.ChangeListenResponseHandler;
import com.alibaba.nacos.client.remote.RpcClient;
import com.alibaba.nacos.client.remote.ConnectionEventListener;
import com.alibaba.nacos.client.remote.ServerListFactory;
import com.alibaba.nacos.client.remote.grpc.GrpcClient;
import com.alibaba.nacos.client.utils.LogUtils;
import com.alibaba.nacos.client.utils.ParamUtil;
import com.alibaba.nacos.client.utils.TenantUtil;
@ -49,6 +51,7 @@ import java.net.HttpURLConnection;
import java.net.URLDecoder;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
@ -87,6 +90,13 @@ public class ClientWorker implements Closeable {
for (Listener listener : listeners) {
cache.addListener(listener);
}
try {
rpcClientProxy.listenConfigChange(dataId, group, "");
} catch (NacosException e) {
e.printStackTrace();
}
}
/**
@ -105,6 +115,13 @@ public class ClientWorker implements Closeable {
removeCache(dataId, group);
}
}
try {
rpcClientProxy.unListenConfigChange(dataId, group, "");
} catch (NacosException e) {
e.printStackTrace();
}
}
/**
@ -123,6 +140,9 @@ public class ClientWorker implements Closeable {
for (Listener listener : listeners) {
cache.addListener(listener);
}
rpcClientProxy.unListenConfigChange(dataId, group, tenant);
}
/**
@ -143,6 +163,9 @@ public class ClientWorker implements Closeable {
for (Listener listener : listeners) {
cache.addListener(listener);
}
rpcClientProxy.listenConfigChange(dataId, group, tenant);
}
/**
@ -162,6 +185,13 @@ public class ClientWorker implements Closeable {
removeCache(dataId, group, tenant);
}
}
try {
rpcClientProxy.unListenConfigChange(dataId, group, tenant);
} catch (NacosException e) {
e.printStackTrace();
}
}
private void removeCache(String dataId, String group) {
@ -532,7 +562,7 @@ public class ClientWorker implements Closeable {
@SuppressWarnings("PMD.ThreadPoolCreationRule")
public ClientWorker(final HttpAgent agent, final ConfigFilterChainManager configFilterChainManager,
final Properties properties) {
final Properties properties) throws NacosException {
this.agent = agent;
this.configFilterChainManager = configFilterChainManager;
@ -560,45 +590,99 @@ public class ClientWorker implements Closeable {
return t;
}
});
// cancel long polling config check task
// this.executor.scheduleWithFixedDelay(new Runnable() {
// @Override
// public void run() {
// try {
// checkConfigInfo();
// } catch (Throwable e) {
// LOGGER.error("[" + agent.getName() + "] [sub-check] rotate check error", e);
// }
// }
// }, 1L, 10L, TimeUnit.MILLISECONDS);
this.executor.scheduleWithFixedDelay(new Runnable() {
rpcClientProxy = new ConfigGrpcClientProxy();
if (rpcClientProxy.getRpcClient().isWaitInited()) {
rpcClientProxy.getRpcClient().init(new ServerListFactory() {
@Override
public String genNextServer() {
ServerListManager serverListManager = agent.getServerListManager();
serverListManager.refreshCurrentServerAddr();
return serverListManager.getCurrentServerAddr();
}
@Override
public String getCurrentServer() {
return agent.getServerListManager().getCurrentServerAddr();
}
});
rpcClientProxy.start();
}
/*
* Register Listen Change Handler
*/
rpcClientProxy.getRpcClient().registerChangeListenHandler(new ChangeListenResponseHandler() {
@Override
public void run() {
try {
checkConfigInfo();
} catch (Throwable e) {
LOGGER.error("[" + agent.getName() + "] [sub-check] rotate check error", e);
public void responseReply(Response myresponse) {
if (myresponse instanceof ConfigChangeNotifyResponse) {
ConfigChangeNotifyResponse configChangeNotifyResponse = (ConfigChangeNotifyResponse) myresponse;
String groupKey = GroupKey
.getKeyTenant(configChangeNotifyResponse.getDataId(), configChangeNotifyResponse.getGroup(),
configChangeNotifyResponse.getTenant());
if (cacheMap.get() != null && cacheMap.get().containsKey(groupKey)) {
CacheData cache = cacheMap.get().get(groupKey);
try {
String[] ct = getServerConfig(configChangeNotifyResponse.getDataId(),
configChangeNotifyResponse.getGroup(), configChangeNotifyResponse.getTenant(),
3000L);
cache.setContent(ct[0]);
if (null != ct[1]) {
cache.setType(ct[1]);
}
cache.checkListenerMd5();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
});
/*
*
*/
rpcClientProxy.getRpcClient().registerConnectionListener(new ConnectionEventListener() {
@Override
public void onConnected() {
}
@Override
public void onReconnected() {
Collection<CacheData> values = cacheMap.get().values();
for (CacheData cacheData : values) {
if (!CollectionUtils.isEmpty(cacheData.getListeners())) {
rpcClientProxy.request(ConfigChangeListenRequest
.buildListenRequest(cacheData.dataId, cacheData.group, cacheData.tenant));
}
}
}
}, 1L, 10L, TimeUnit.MILLISECONDS);
rpcClient=new GrpcClient(new ServerListFactory() {
@Override
public String genNextServer() {
ServerListManager serverListManager = agent.getServerListManager();
serverListManager.refreshCurrentServerAddr();
return serverListManager.getCurrentServerAddr();
}
@Override
public String getCurrentServer() {
return agent.getServerListManager().getCurrentServerAddr();
}
});
rpcClient.registerChangeListenHandler(new ChangeListenResponseHandler() {
@Override
public void responseReply(Response response) {
}
@Override
public Response parseBodyString(String bodyString) {
return null;
public void onDisConnect() {
}
});
}
private void init(Properties properties) {
@ -727,8 +811,8 @@ public class ClientWorker implements Closeable {
private boolean isHealthServer = true;
private long timeout;
private RpcClient rpcClient;
private ConfigGrpcClientProxy rpcClientProxy;
private double currentLongingTaskCount = 0;

View File

@ -1,27 +1,101 @@
/**
* Alipay.com Inc.
* Copyright (c) 2004-2020 All Rights Reserved.
* Alipay.com Inc. Copyright (c) 2004-2020 All Rights Reserved.
*/
package com.alibaba.nacos.client.config.remote;
import com.alibaba.nacos.api.config.remote.request.ConfigChangeListenRequest;
import com.alibaba.nacos.api.config.remote.request.ConfigQueryRequest;
import com.alibaba.nacos.api.config.remote.response.ConfigChangeListenResponse;
import com.alibaba.nacos.api.config.remote.response.ConfigQueryResponse;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.remote.request.Request;
import com.alibaba.nacos.api.remote.response.Response;
import com.alibaba.nacos.client.remote.RpcClient;
import com.alibaba.nacos.client.remote.RpcClientFactory;
import com.alibaba.nacos.client.remote.ServerListFactory;
/**
* config grpc client proxy.
*
* @author liuzunfei
* @version $Id: ConfigGrpcClientProxy.java, v 0.1 2020年07月14日 3:37 PM liuzunfei Exp $
*/
public class ConfigGrpcClientProxy {
private RpcClient rpcClient;
public ConfigGrpcClientProxy() {
rpcClient = RpcClientFactory.getClient("config");
}
public void start() throws NacosException {
rpcClient.start();
}
public void switchServer() {
rpcClient.switchServer();
}
public Response request(Request request) {
return rpcClient.request(request);
}
public RpcClient getRpcClient() {
return this.rpcClient;
}
public void initAndStart(ServerListFactory serverListFactory) throws NacosException {
rpcClient.init(serverListFactory);
rpcClient.start();
}
/**
* send congif change listen request.
*
* @param dataId
* @param group
* @param tenat
* @param dataId dataId
* @param group group
* @param tenat tenat
* @throws NacosException throws when listen fail.
*/
public void listenConfigChange(String dataId,String group,String tenat){
public void listenConfigChange(String dataId, String group, String tenat) throws NacosException {
ConfigChangeListenRequest configChangeListenRequest = ConfigChangeListenRequest
.buildListenRequest(dataId, group, tenat);
ConfigChangeListenResponse response = (ConfigChangeListenResponse) rpcClient.request(configChangeListenRequest);
if (!response.isSuccess()) {
throw new NacosException(NacosException.SERVER_ERROR, "Fail to Listen Config Change");
}
}
/**
* sned cancel listen congif change request .
*
* @param dataId dataId
* @param group group
* @param tenat tenat
*/
public void unListenConfigChange(String dataId, String group, String tenat) throws NacosException {
ConfigChangeListenRequest configChangeListenRequest = ConfigChangeListenRequest
.buildUnListenRequest(dataId, group, tenat);
ConfigChangeListenResponse response = (ConfigChangeListenResponse) rpcClient.request(configChangeListenRequest);
if (!response.isSuccess()) {
throw new NacosException(NacosException.SERVER_ERROR, "Fail to UnListen Config Change");
}
}
/**
* query config content by grpc channel .
*
* @param dataId dataId
* @param group group
* @param tenat tenat
* @return ConfigQueryResponse.
* @throws NacosException throw where query fail .
*/
public ConfigQueryResponse queryConfig(String dataId, String group, String tenat) throws NacosException {
ConfigQueryRequest request = ConfigQueryRequest.build(dataId, group, tenat);
ConfigQueryResponse response = (ConfigQueryResponse) rpcClient.request(request);
return (ConfigQueryResponse) response;
}
}

View File

@ -1,30 +1,34 @@
/**
* Alipay.com Inc.
* Copyright (c) 2004-2020 All Rights Reserved.
/*
* Copyright 1999-2020 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.client.remote;
import com.alibaba.nacos.api.remote.response.Response;
import com.alibaba.nacos.common.utils.JacksonUtils;
/**
* ChangeListenResponseHandler.
* @author liuzunfei
* @version $Id: ChangeListenResponseHandler.java, v 0.1 2020年07月14日 11:41 AM liuzunfei Exp $
*/
public abstract interface ChangeListenResponseHandler<T> {
/**
*
* @param response
* handle logic when response ceceive.
* @param response.
*/
abstract public void responseReply(Response response);
/**
*
* @param bodyString
* @param <T>
* @return
*/
public <T extends Response> T parseBodyString(String bodyString);
public abstract void responseReply(Response response);
}

View File

@ -1,6 +1,17 @@
/**
* Alipay.com Inc.
* Copyright (c) 2004-2020 All Rights Reserved.
/*
* Copyright 1999-2020 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.client.remote;

View File

@ -1,7 +1,19 @@
/**
* Alipay.com Inc.
* Copyright (c) 2004-2020 All Rights Reserved.
/*
* Copyright 1999-2020 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.client.remote;
import java.util.ArrayList;
@ -10,95 +22,100 @@ import java.util.UUID;
import javax.annotation.PostConstruct;
import com.alibaba.nacos.api.remote.response.Response;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.remote.request.Request;
import com.alibaba.nacos.api.remote.response.Response;
/**
* abstract remote client to connect to server.
*
* @author liuzunfei
* @version $Id: RpcClient.java, v 0.1 2020年07月13日 9:15 PM liuzunfei Exp $
*/
public abstract class RpcClient {
private ServerListFactory serverListFactory;
protected String connectionId;
protected RpcClientStatus rpcClientStatus=RpcClientStatus.WAIT_INIT;
protected RpcClientStatus rpcClientStatus = RpcClientStatus.WAIT_INIT;
/**
* check is this client is inited
* check is this client is inited.
*
* @return
*/
public boolean isInited(){
return this.rpcClientStatus!=RpcClientStatus.WAIT_INIT;
public boolean isWaitInited() {
return this.rpcClientStatus == RpcClientStatus.WAIT_INIT;
}
/**
* listener called where connect status changed
* listener called where connect status changed.
*/
List<ConnectionEventListener> connectionEventListeners=new ArrayList<ConnectionEventListener>();
List<ConnectionEventListener> connectionEventListeners = new ArrayList<ConnectionEventListener>();
/**
* change listeners handler registry
* change listeners handler registry.
*/
List<ChangeListenResponseHandler> changeListenReplyListeners=new ArrayList<ChangeListenResponseHandler>();
public RpcClient(){
protected List<ChangeListenResponseHandler> changeListenReplyListeners = new ArrayList<ChangeListenResponseHandler>();
public RpcClient() {
}
public RpcClient(ServerListFactory serverListFactory){
this.serverListFactory=serverListFactory;
this.connectionId= UUID.randomUUID().toString();
this.rpcClientStatus=RpcClientStatus.INITED;
}
/**
* Start this client
* init server list factory.
*
* @param serverListFactory serverListFactory
*/
public void init(ServerListFactory serverListFactory) {
this.serverListFactory = serverListFactory;
this.connectionId = UUID.randomUUID().toString();
this.rpcClientStatus = RpcClientStatus.INITED;
}
public RpcClient(ServerListFactory serverListFactory) {
this.serverListFactory = serverListFactory;
this.connectionId = UUID.randomUUID().toString();
this.rpcClientStatus = RpcClientStatus.INITED;
}
/**
* Start this client.
*/
@PostConstruct
abstract public void start() throws Exception;
public abstract void start() throws NacosException;
/**
*
* Switch Server.
*/
abstract public void switchServer();
public abstract void switchServer();
/**
* send request.
*
* @param request
* @param <T>
* @param request request.
* @return
*/
abstract public <T extends Response> T request( Request request);
public abstract Response request(Request request);
/**
* register connection handler.will be notified wher inner connect chanfed.
*
* @param request
* @param <T>
* @return
* @param connectionEventListener connectionEventListener
*/
abstract public <T extends Response> T listenChange( Request request);
/**
*
* @param connectionEventListener
*/
public void registerConnectionListener(ConnectionEventListener connectionEventListener){
public void registerConnectionListener(ConnectionEventListener connectionEventListener) {
this.connectionEventListeners.add(connectionEventListener);
}
/**
* register change listeners ,will be called when server send change notify response th current client.
*
* @param changeListenResponseHandler
* @param changeListenResponseHandler changeListenResponseHandler
*/
public void registerChangeListenHandler(ChangeListenResponseHandler changeListenResponseHandler){
public void registerChangeListenHandler(ChangeListenResponseHandler changeListenResponseHandler) {
this.changeListenReplyListeners.add(changeListenResponseHandler);
}
/**
* Getter method for property <tt>serverListFactory</tt>.
*
@ -107,6 +124,5 @@ public abstract class RpcClient {
public ServerListFactory getServerListFactory() {
return serverListFactory;
}
}

View File

@ -1,6 +1,17 @@
/**
* Alipay.com Inc.
* Copyright (c) 2004-2020 All Rights Reserved.
/*
* Copyright 1999-2020 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.client.remote;
@ -17,11 +28,12 @@ import sun.management.resources.agent;
* @version $Id: RpcClientFactory.java, v 0.1 2020年07月14日 3:41 PM liuzunfei Exp $
*/
public class RpcClientFactory {
private RpcClient sharedClient;
Map<String ,RpcClient> clientMap=new HashMap<String ,RpcClient>();
public RpcClient getClient(String module){
static private RpcClient sharedClient;
static Map<String, RpcClient> clientMap = new HashMap<String, RpcClient>();
public static RpcClient getClient(String module) {
String useIndependentClient = System.getProperty("rpc.client.independent");
if ("Y".equalsIgnoreCase(useIndependentClient)){
if(clientMap.get(module)==null){
@ -37,7 +49,8 @@ public class RpcClientFactory {
sharedClient=new GrpcClient();
return sharedClient;
}
}
}
}

View File

@ -1,6 +1,17 @@
/**
* Alipay.com Inc.
* Copyright (c) 2004-2020 All Rights Reserved.
/*
* Copyright 1999-2020 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.client.remote;
@ -14,7 +25,8 @@ public enum RpcClientStatus {
WAIT_INIT(0,"wait to init serverlist factory... "),
INITED(1,"server list factory is ready,wait to start"),
RUNNING(0,"client is running...");
STARTING(2, "server list factory is ready,wait to start"),
RUNNING(3, "client is running...");
int status;
String desc;

View File

@ -1,6 +1,17 @@
/**
* Alipay.com Inc.
* Copyright (c) 2004-2020 All Rights Reserved.
/*
* Copyright 1999-2020 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.client.remote;

View File

@ -1,74 +1,88 @@
/**
* Alipay.com Inc.
* Copyright (c) 2004-2020 All Rights Reserved.
/*
* Copyright 1999-2020 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.client.remote.grpc;
import java.net.UnknownHostException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import javax.annotation.PostConstruct;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.grpc.GrpcMetadata;
import com.alibaba.nacos.api.grpc.GrpcRequest;
import com.alibaba.nacos.api.grpc.GrpcResponse;
import com.alibaba.nacos.api.grpc.RequestGrpc;
import com.alibaba.nacos.api.grpc.RequestStreamGrpc;
import com.alibaba.nacos.api.remote.response.Response;
import com.alibaba.nacos.api.remote.ResponseRegistry;
import com.alibaba.nacos.api.remote.request.HeartBeatRequest;
import com.alibaba.nacos.api.remote.request.Request;
import com.alibaba.nacos.api.remote.response.ConnectResetResponse;
import com.alibaba.nacos.api.remote.response.PlainBodyResponse;
import com.alibaba.nacos.api.remote.response.Response;
import com.alibaba.nacos.client.naming.utils.NetUtils;
import com.alibaba.nacos.client.remote.ChangeListenResponseHandler;
import com.alibaba.nacos.client.remote.RpcClient;
import com.alibaba.nacos.client.remote.RpcClientStatus;
import com.alibaba.nacos.client.remote.ServerListFactory;
import com.alibaba.nacos.common.utils.JacksonUtils;
import com.google.protobuf.Any;
import com.google.protobuf.ByteString;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
/**
* gRPC Client.
*
* @author liuzunfei
* @version $Id: GrpcClient.java, v 0.1 2020年07月13日 9:16 PM liuzunfei Exp $
*/
public class GrpcClient extends RpcClient {
protected ManagedChannel channel;
protected RequestStreamGrpc.RequestStreamStub grpcStreamServiceStub;
protected RequestGrpc.RequestBlockingStub grpcServiceStub;
public GrpcClient(){
public GrpcClient() {
super();
}
public GrpcClient(ServerListFactory serverListFactory) {
super(serverListFactory);
try {
start();
} catch (Exception e) {
System.out.println("GrpcClient start fail .....");
e.printStackTrace();
}
}
@Override
public void start() throws Exception {
if (rpcClientStatus!=RpcClientStatus.INITED){
public void start() throws NacosException {
if (rpcClientStatus != RpcClientStatus.INITED) {
return;
}
rpcClientStatus = RpcClientStatus.STARTING;
buildClient();
ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(2, new ThreadFactory() {
@Override
@ -79,119 +93,154 @@ public class GrpcClient extends RpcClient {
return t;
}
});
executorService.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
sendBeat();
}
}, 5000, 5000, TimeUnit.MILLISECONDS);
}, 5000, 10000, TimeUnit.MILLISECONDS);
rpcClientStatus = RpcClientStatus.RUNNING;
super.registerChangeListenHandler(new ChangeListenResponseHandler() {
@Override
public void responseReply(Response response) {
if (response instanceof ConnectResetResponse) {
try {
buildClient();
} catch (Exception e) {
e.printStackTrace();
}
}
}
});
}
/**
* Send Heart Beat Request.
*/
public void sendBeat() {
GrpcMetadata meta= GrpcMetadata.newBuilder().setConnectionId(connectionId).setClientIp(
NetUtils.localIP()).build();
HeartBeatRequest heartBeatRequest=new HeartBeatRequest();
GrpcRequest streamRequest = GrpcRequest.newBuilder()
.setMetadata(meta)
.setType(heartBeatRequest.getType())
.setBody(Any.newBuilder().setValue(ByteString.copyFromUtf8(JacksonUtils.toJson(heartBeatRequest))).build())
.build();
GrpcMetadata meta = GrpcMetadata.newBuilder().setConnectionId(connectionId).setClientIp(NetUtils.localIP())
.build();
HeartBeatRequest heartBeatRequest = new HeartBeatRequest();
GrpcRequest streamRequest = GrpcRequest.newBuilder().setMetadata(meta).setType(heartBeatRequest.getType())
.setBody(Any.newBuilder().setValue(ByteString.copyFromUtf8(JacksonUtils.toJson(heartBeatRequest)))
.build()).build();
GrpcResponse response = grpcServiceStub.request(streamRequest);
System.out.println("Send heart beat message,response :"+response);
}
private void buildClient() throws UnknownHostException {
String serverAddress =getServerListFactory().genNextServer();
String serverIp="";
int serverPort=0;
private void buildClient() throws NacosException {
String serverAddress = getServerListFactory().genNextServer();
String serverIp = "";
int serverPort = 0;
if (serverAddress.contains("http")) {
serverIp = serverAddress.split(":")[1].replaceAll("//","");
serverPort=Integer.valueOf(serverAddress.split(":")[2].replaceAll("//",""));
}else{
serverIp = serverAddress.split(":")[1].replaceAll("//", "");
serverPort = Integer.valueOf(serverAddress.split(":")[2].replaceAll("//", ""));
} else {
serverIp = serverAddress.split(":")[0];
serverPort=Integer.valueOf(serverAddress.split(":")[1]);
serverPort = Integer.valueOf(serverAddress.split(":")[1]);
}
//Loggers.info("[GRPC ]init config listen stream.......,server list:"+server );
this.channel = ManagedChannelBuilder.forAddress(serverIp, serverPort+1000)
.usePlaintext(true)
.build();
this.channel = ManagedChannelBuilder.forAddress(serverIp, serverPort + 1000).usePlaintext(true).build();
grpcStreamServiceStub = RequestStreamGrpc.newStub(channel);
grpcServiceStub = RequestGrpc.newBlockingStub(channel);
GrpcMetadata meta= GrpcMetadata.newBuilder().setConnectionId(connectionId).setClientIp(
NetUtils.localIP()).build();
GrpcRequest streamRequest = GrpcRequest.newBuilder()
.setMetadata(meta)
.build();
//LOGGER.info("[GRPC ]init config listen stream......." );
System.out.println("GrpcClient send stream....."+streamRequest);
grpcStreamServiceStub.requestStream(streamRequest, new NacosStreamObserver());
//relistenKeyIfNecessary();
GrpcMetadata meta = GrpcMetadata.newBuilder().setConnectionId(connectionId).setClientIp(NetUtils.localIP())
.build();
GrpcRequest streamRequest = GrpcRequest.newBuilder().setMetadata(meta).build();
grpcStreamServiceStub.requestStream(streamRequest, new StreamObserver<GrpcResponse>() {
@Override
public void onNext(GrpcResponse grpcResponse) {
String message = grpcResponse.getBody().getValue().toStringUtf8();
String type = grpcResponse.getType();
String bodyString = grpcResponse.getBody().getValue().toStringUtf8();
Class classByType = ResponseRegistry.getClassByType(type);
final Response response;
if (classByType != null) {
response = (Response) JacksonUtils.toObj(bodyString, classByType);
} else {
PlainBodyResponse myresponse = JacksonUtils.toObj(bodyString, PlainBodyResponse.class);
myresponse.setBodyString(bodyString);
response = myresponse;
}
changeListenReplyListeners.forEach(new Consumer<ChangeListenResponseHandler>() {
@Override
public void accept(ChangeListenResponseHandler changeListenResponseHandler) {
changeListenResponseHandler.responseReply(response);
}
});
}
@Override
public void onError(Throwable throwable) {
}
@Override
public void onCompleted() {
}
});
}
private class NacosStreamObserver implements StreamObserver<GrpcResponse> {
@Override
public void onNext(GrpcResponse value) {
//LOGGER.info("[GRPC] receive config data: " + value.toString());
String message = value.getBody().getValue().toStringUtf8();
System.out.println("Receive Stream Response"+message);
//JSONObject json = JSON.parseObject(message.trim());
//LOGGER.info("[GRPC] receive config data: " + json);
//abstractStreamMessageHandler.onResponse(json);
public void onNext(GrpcResponse response) {
}
@Override
public void onError(Throwable t) {
//LOGGER.error("[GRPC] config error", t);
//rebuildClient();
}
@Override
public void onCompleted() {
//LOGGER.info("[GRPC] config connection closed.");
//rebuildClient();
}
}
@Override
public void switchServer() {
}
@Override
public <T extends Response> T request(Request request) {
GrpcMetadata meta= GrpcMetadata.newBuilder().setConnectionId(connectionId).setClientIp(
NetUtils.localIP()).build();
GrpcRequest streamRequest = GrpcRequest.newBuilder()
.setMetadata(meta)
.setType(request.getType())
.setBody(Any.newBuilder().setValue(ByteString.copyFromUtf8(JacksonUtils.toJson(request))))
.build();
GrpcResponse response =grpcServiceStub.request(streamRequest);
return null;
}
@Override
public <T extends Response> T listenChange(Request request) {
return null;
public Response request(Request request) {
GrpcMetadata meta = GrpcMetadata.newBuilder().setConnectionId(connectionId).setClientIp(NetUtils.localIP())
.build();
GrpcRequest grpcrequest = GrpcRequest.newBuilder().setMetadata(meta).setType(request.getType())
.setBody(Any.newBuilder().setValue(ByteString.copyFromUtf8(JacksonUtils.toJson(request)))).build();
GrpcResponse response = grpcServiceStub.request(grpcrequest);
String type = response.getType();
String bodyString = response.getBody().getValue().toStringUtf8();
// transfrom grpcResponse to response model
Class classByType = ResponseRegistry.getClassByType(type);
if (classByType != null) {
Object object = JacksonUtils.toObj(bodyString, classByType);
return (Response) object;
} else {
PlainBodyResponse myresponse = JacksonUtils.toObj(bodyString, PlainBodyResponse.class);
myresponse.setBodyString(bodyString);
return (PlainBodyResponse) myresponse;
}
}
}

View File

@ -36,7 +36,7 @@ public class ConfigTest {
private static ConfigService configService;
@Before
public void before() throws Exception {
public void before() throws Exception {
Properties properties = new Properties();
properties.setProperty(PropertyKeyConst.SERVER_ADDR, "127.0.0.1:28848");
configService = NacosFactory.createConfigService(properties);
@ -47,27 +47,30 @@ public class ConfigTest {
configService.shutDown();
}
@Test
public void test() throws Exception {
public void test() throws Exception {
final String dataId = "lessspring";
final String group = "lessspring";
final String content = "lessspring-" + System.currentTimeMillis();
boolean result = configService.publishConfig(dataId, group, content);
Assert.assertTrue(result);
ThreadUtils.sleep(100000L);
String response = configService.getConfigAndSignListener(dataId, group, 5000, new AbstractListener() {
@Override
public void receiveConfigInfo(String configInfo) {
System.err.println(configInfo);
}
});
Assert.assertEquals(content, response);
ThreadUtils.sleep(200L);
ConfigListener1 listener1 = new ConfigListener1();
ConfigListener2 listener2 = new ConfigListener2();
configService.getConfigAndSignListener(dataId, group, 5000, listener1);
configService.getConfigAndSignListener(dataId, group, 5000, listener2);
configService.publishConfig(dataId, group, "testchange");
configService.getConfigAndSignListener("lessspring2", group, 5000, listener1);
configService.publishConfig("lessspring2", group, "lessspring2value");
Scanner scanner = new Scanner(System.in);
System.out.println("input content");
while (scanner.hasNextLine()) {
@ -81,3 +84,20 @@ public class ConfigTest {
}
}
class ConfigListener1 extends AbstractListener {
@Override
public void receiveConfigInfo(String configInfo) {
System.err.println("Listener1 invoked." + configInfo);
}
}
class ConfigListener2 extends AbstractListener {
@Override
public void receiveConfigInfo(String configInfo) {
System.err.println("Listener2 invoked." + configInfo);
}
}

View File

@ -2,6 +2,7 @@
* Alipay.com Inc.
* Copyright (c) 2004-2020 All Rights Reserved.
*/
package com.alibaba.nacos.config.server.remote;
import java.util.List;
@ -16,23 +17,24 @@ import com.alibaba.nacos.api.remote.response.Response;
import com.alibaba.nacos.common.utils.JacksonUtils;
import com.alibaba.nacos.config.server.utils.GroupKey2;
import com.alibaba.nacos.core.remote.AsyncListenContext;
import com.alibaba.nacos.core.remote.NacosRemoteConstants;
import com.alibaba.nacos.core.remote.RequestHandler;
import com.google.common.collect.Lists;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* config change listen request handler.
* @author liuzunfei
* @version $Id: ConfigChangeListenRequestHandler.java, v 0.1 2020年07月14日 10:11 AM liuzunfei Exp $
*/
@Component
public class ConfigChangeListenRequestHandler extends RequestHandler {
@Autowired
AsyncListenContext asyncListenContext;
private static final String LISTEN_CONTEXT_TYPE="CONFIG";
@Override
public Request parseBodyString(String bodyString) {
return JacksonUtils.toObj(bodyString, ConfigChangeListenRequest.class);
@ -40,18 +42,18 @@ public class ConfigChangeListenRequestHandler extends RequestHandler {
@Override
public Response handle(Request request, RequestMeta requestMeta) throws NacosException {
ConfigChangeListenRequest configChangeListenRequest= (ConfigChangeListenRequest)request;
ConfigChangeListenRequest configChangeListenRequest = (ConfigChangeListenRequest) request;
String dataId = configChangeListenRequest.getDataId();
String group = configChangeListenRequest.getGroup();
String tenant = configChangeListenRequest.getTenant();
String configKey = GroupKey2.getKey(dataId, group, tenant);
String connectionId = requestMeta.getConnectionId();
if (configChangeListenRequest.isCancelListen()){
asyncListenContext.removeListen(LISTEN_CONTEXT_TYPE,configKey,connectionId);
}else{
asyncListenContext.addListen(LISTEN_CONTEXT_TYPE,configKey,connectionId);
if (configChangeListenRequest.isCancelListen()) {
asyncListenContext.removeListen(NacosRemoteConstants.LISTEN_CONTEXT_CONFIG, configKey, connectionId);
} else {
asyncListenContext.addListen(NacosRemoteConstants.LISTEN_CONTEXT_CONFIG, configKey, connectionId);
}
return new ConfigChangeListenResponse(200,"success");
return new ConfigChangeListenResponse(200, "success");
}
@Override

View File

@ -16,6 +16,7 @@
package com.alibaba.nacos.config.server.service;
import com.alibaba.nacos.api.config.remote.response.ConfigChangeNotifyResponse;
import com.alibaba.nacos.common.utils.CollectionUtils;
import com.alibaba.nacos.common.utils.ExceptionUtil;
import com.alibaba.nacos.config.server.model.SampleResult;
@ -27,30 +28,31 @@ import com.alibaba.nacos.config.server.utils.MD5Util;
import com.alibaba.nacos.config.server.utils.RequestUtil;
import com.alibaba.nacos.config.server.utils.event.EventDispatcher.AbstractEventListener;
import com.alibaba.nacos.config.server.utils.event.EventDispatcher.Event;
import com.alibaba.nacos.core.remote.DataChangeListenerNotifier;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import javax.servlet.AsyncContext;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Future;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.Arrays;
import static com.alibaba.nacos.config.server.utils.LogUtil.MEMORY_LOG;
import static com.alibaba.nacos.config.server.utils.LogUtil.PULL_LOG;
@ -71,6 +73,9 @@ public class LongPollingService extends AbstractEventListener {
private static final String TRUE_STR = "true";
@Autowired
private DataChangeListenerNotifier dataChangeListenerNotifier;
private Map<String, Long> retainIps = new ConcurrentHashMap<String, Long>();
private static boolean isFixedPolling() {
@ -126,8 +131,8 @@ public class LongPollingService extends AbstractEventListener {
}
/**
* Aggregate the sampling IP and monitoring configuration information in the sampling results.
* There is no problem for the merging strategy to cover the previous one with the latter.
* Aggregate the sampling IP and monitoring configuration information in the sampling results. There is no problem
* for the merging strategy to cover the previous one with the latter.
*
* @param sampleResults sample Results.
* @return Results.
@ -147,6 +152,7 @@ public class LongPollingService extends AbstractEventListener {
/**
* Collect application subscribe configinfos.
*
* @return configinfos results.
*/
public Map<String, Set<String>> collectApplicationSubscribeConfigInfos() {
@ -232,9 +238,9 @@ public class LongPollingService extends AbstractEventListener {
/**
* Add LongPollingClient.
*
* @param req HttpServletRequest.
* @param rsp HttpServletResponse.
* @param clientMd5Map clientMd5Map.
* @param req HttpServletRequest.
* @param rsp HttpServletResponse.
* @param clientMd5Map clientMd5Map.
* @param probeRequestSize probeRequestSize.
*/
public void addLongPollingClient(HttpServletRequest req, HttpServletResponse rsp, Map<String, String> clientMd5Map,
@ -245,7 +251,7 @@ public class LongPollingService extends AbstractEventListener {
String appName = req.getHeader(RequestUtil.CLIENT_APPNAME_HEADER);
String tag = req.getHeader("Vipserver-Tag");
int delayTime = SwitchService.getSwitchInteger(SwitchService.FIXED_DELAY_TIME, 500);
// Add delay time for LoadBalance, and one response is returned 500 ms in advance to avoid client timeout.
long timeout = Math.max(10000, Long.parseLong(str) - delayTime);
if (isFixedPolling()) {
@ -358,6 +364,15 @@ public class LongPollingService extends AbstractEventListener {
clientSub.sendResponse(Arrays.asList(groupKey));
}
}
String[] strings = GroupKey.parseKey(groupKey);
String dataid = strings[0];
String group = strings[1];
String tenant = strings.length > 2 ? strings[2] : "";
ConfigChangeNotifyResponse notifyResponse = ConfigChangeNotifyResponse
.buildSuccessResponse(dataid, group, tenant);
dataChangeListenerNotifier.configDataChanged(groupKey, notifyResponse);
} catch (Throwable t) {
LogUtil.DEFAULT_LOG.error("data change error: {}", ExceptionUtil.getStackTrace(t));
}
@ -403,7 +418,7 @@ public class LongPollingService extends AbstractEventListener {
public void run() {
try {
getRetainIps().put(ClientLongPolling.this.ip, System.currentTimeMillis());
// Delete subsciber's relations.
allSubs.remove(ClientLongPolling.this);
@ -439,7 +454,7 @@ public class LongPollingService extends AbstractEventListener {
}
void sendResponse(List<String> changedGroups) {
// Cancel time out task.
if (null != asyncTimeoutFuture) {
asyncTimeoutFuture.cancel(false);
@ -449,7 +464,7 @@ public class LongPollingService extends AbstractEventListener {
void generateResponse(List<String> changedGroups) {
if (null == changedGroups) {
// Tell web container to send http response.
asyncContext.complete();
return;

View File

@ -1,7 +1,19 @@
/**
* Alipay.com Inc.
* Copyright (c) 2004-2020 All Rights Reserved.
/*
* Copyright 1999-2020 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.core.remote;
import java.util.HashMap;
@ -12,62 +24,75 @@ import java.util.Set;
import org.springframework.stereotype.Service;
/**
* AsyncListenContext. conserve which clientId that insterest in which congif key.
*
* @author liuzunfei
* @version $Id: AsyncListenContext.java, v 0.1 2020年07月14日 10:13 AM liuzunfei Exp $
*/
@Service
public class AsyncListenContext {
private Map<String, Map<String,Set<String>>> listenContexts=new HashMap<String,Map<String,Set<String>>>() ;
private Map<String, Map<String, Set<String>>> listenContexts = new HashMap<String, Map<String, Set<String>>>();
/**
* add listen .
*
* @param requestType
* @param lisnteKey
* @param connectionId
* @param requestType requestType
* @param listenKey listenKey.
* @param connectionId connectionId.
*/
public void addListen(String requestType,String listenKey,String connectionId){
public void addListen(String requestType, String listenKey, String connectionId) {
Map<String, Set<String>> listenClients = listenContexts.get(requestType);
if (listenClients==null){
listenContexts.putIfAbsent(requestType,new HashMap<String,Set<String>>());
listenClients=listenContexts.get(requestType);
if (listenClients == null) {
listenContexts.putIfAbsent(requestType, new HashMap<String, Set<String>>());
listenClients = listenContexts.get(requestType);
}
Set<String> connectionIds = listenClients.get(listenKey);
if (connectionIds==null){
listenClients.putIfAbsent(listenKey,new HashSet<String>());
connectionIds=listenClients.get(listenKey);
if (connectionIds == null) {
listenClients.putIfAbsent(listenKey, new HashSet<String>());
connectionIds = listenClients.get(listenKey);
}
boolean addSuccess = connectionIds.add(connectionId);
if (addSuccess){
if (addSuccess) {
//TODO add log ...success to add listen
}
}
public void removeListen(String requestType,String lisnteKey,String connectionId){
/**
* remove listen context for connectionId ..
*
* @param requestType requestType
* @param lisnteKey lisnteKey
* @param connectionId connectionId
*/
public void removeListen(String requestType, String lisnteKey, String connectionId) {
Map<String, Set<String>> stringSetMap = listenContexts.get(requestType);
if (stringSetMap==null||stringSetMap.isEmpty()){
if (stringSetMap == null || stringSetMap.isEmpty()) {
return;
}
Set<String> connectionIds = stringSetMap.get(lisnteKey);
if (connectionIds==null){
if (connectionIds == null) {
return;
}
boolean remove = connectionIds.remove(connectionId);
if (remove){
if (remove) {
//TODO add log ...success to remove listen
}
}
public Set<String> getListeners(String requestType, String listenKey) {
if (listenContexts.containsKey(requestType)) {
Map<String, Set<String>> stringSetMap = listenContexts.get(requestType);
return stringSetMap.get(listenKey);
}
return null;
}
}

View File

@ -13,15 +13,62 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.core.remote;
import com.alibaba.nacos.api.remote.connection.Connection;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.util.Collection;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
* ConnectCoordinator.
*
* @author liuzunfei
* @version $Id: ConnectCoordinator.java, v 0.1 2020年07月14日 12:01 AM liuzunfei Exp $
*/
public class ConnectCoordinator implements ConnectionHeathyChecker{
@Service
public class ConnectCoordinator implements ConnectionHeathyChecker {
@Autowired
ConnectionManager connectionManager;
private ScheduledExecutorService executors = Executors.newScheduledThreadPool(1);
private static final long EXPIRE_MILLSECOND = 15000L;
/**
* Start TaskExpel the connection which active Time expire.
*/
@PostConstruct
public void start() {
// Start UnHeathy Conection Expel Task.
executors.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
long currentStamp = System.currentTimeMillis();
Collection<Connection> connections = connectionManager.connetions.values();
for (Connection conn : connections) {
try {
long lastActiveTimestamp = conn.getLastActiveTimestamp();
if (currentStamp - lastActiveTimestamp > EXPIRE_MILLSECOND) {
conn.closeGrapcefully();
connectionManager.unregister(conn.getConnectionId());
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
}, 500L, 5000L, TimeUnit.MILLISECONDS);
}
}

View File

@ -13,6 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.core.remote;
import java.util.HashMap;
@ -23,31 +24,39 @@ import com.alibaba.nacos.api.remote.connection.Connection;
import org.springframework.stereotype.Service;
/**
* connect manager.
* @author liuzunfei
* @version $Id: ConnectionManager.java, v 0.1 2020年07月13日 7:07 PM liuzunfei Exp $
*/
@Service
public class ConnectionManager {
private Map<String, Connection> connetions=new HashMap<String,Connection>();
public void register(String clientId,Connection connection){
connetions.putIfAbsent(clientId,connection);
System.out.println("connetions updated, connetions:"+ connetions);
Map<String, Connection> connetions = new HashMap<String, Connection>();
public void register(String connectionId, Connection connection) {
connetions.putIfAbsent(connectionId, connection);
System.out.println("connetions updated, connetions:" + connetions);
}
public void unregister(String clientId){
this.connetions.remove(clientId);
public void unregister(String connectionId) {
this.connetions.remove(connectionId);
}
public void refreshActiveTime(String connnectionId){
System.out.println("connetions activetime update , connnectionId:"+ connnectionId);
public Connection getConnection(String connectionId) {
return connetions.get(connectionId);
}
/**
* regresh connection active time.
*
* @param connnectionId connnectionId.
*/
public void refreshActiveTime(String connnectionId) {
System.out.println("connetions activetime update , connnectionId:" + connnectionId);
Connection connection = connetions.get(connnectionId);
if (connection!=null){
if (connection != null) {
connection.freshActiveTime();
}
}
}

View File

@ -1,32 +0,0 @@
/**
* Alipay.com Inc.
* Copyright (c) 2004-2020 All Rights Reserved.
*/
package com.alibaba.nacos.core.remote;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;
/**
* @author liuzunfei
* @version $Id: DataChangeLisenerNotifier.java, v 0.1 2020年07月14日 10:44 AM liuzunfei Exp $
*/
@Service
public class DataChangeLisenerNotifier {
/**
*
*/
ConnectionManager connectionManager;
/**
*
*/
AsyncListenContext asyncListenContext;
}

View File

@ -0,0 +1,71 @@
/*
* Copyright 1999-2020 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.core.remote;
import com.alibaba.nacos.api.remote.connection.Connection;
import com.alibaba.nacos.api.remote.response.Response;
import com.alibaba.nacos.common.utils.CollectionUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.Set;
/**
* DataChangeListenerNotifier.
*
* @author liuzunfei
* @version $Id: DataChangeLisenerNotifier.java, v 0.1 2020年07月14日 10:44 AM liuzunfei Exp $
*/
@Service
public class DataChangeListenerNotifier {
/**
* connect manager.
*/
@Autowired
ConnectionManager connectionManager;
/**
* asyncListenContext.
*/
@Autowired
AsyncListenContext asyncListenContext;
/**
* adaptor to config module ,when server side congif change ,invoke this method.
*
* @param groupKey groupKey
* @param notifyResponse notifyResponse
*/
public void configDataChanged(String groupKey, Response notifyResponse) {
Set<String> listeners = asyncListenContext.getListeners(NacosRemoteConstants.LISTEN_CONTEXT_CONFIG, groupKey);
if (!CollectionUtils.isEmpty(listeners)) {
for (String connectionId : listeners) {
Connection connection = connectionManager.getConnection(connectionId);
if (connection != null) {
connection.sendResponse(notifyResponse);
}
}
}
}
public void serviceIndoChanged(String serviceKey, Response notifyResponse) {
//TODO
}
}

View File

@ -1,6 +1,17 @@
/**
* Alipay.com Inc.
* Copyright (c) 2004-2020 All Rights Reserved.
/*
* Copyright 1999-2020 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.core.remote;

View File

@ -1,5 +1,5 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
* Copyright 1999-2020 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -13,18 +13,20 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.api.remote.request;
package com.alibaba.nacos.core.remote;
/**
* NacosRemoteConstants.
*
* @author liuzunfei
* @version $Id: ChangeListenRequest.java, v 0.1 2020年07月13日 8:45 PM liuzunfei Exp $
* @version $Id: NacosRemoteConstants.java, v 0.1 2020年07月14日 9:22 PM liuzunfei Exp $
*/
public abstract class ChangeListenRequest extends Request {
@Override
public String getModel() {
return RequestMode.CHANGE_LISTEN.mode;
}
public class NacosRemoteConstants {
public static final String LISTEN_CONTEXT_CONFIG = "CONFIG";
public static final String LISTEN_CONTEXT_NAMING = "NAMING";
}

View File

@ -1,6 +1,17 @@
/**
* Alipay.com Inc.
* Copyright (c) 2004-2020 All Rights Reserved.
/*
* Copyright 1999-2020 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.core.remote;

View File

@ -13,6 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.core.remote.grpc;
import com.alibaba.nacos.api.grpc.GrpcMetadata;
@ -35,44 +36,44 @@ import org.springframework.stereotype.Service;
* @version $Id: GrpcRequestHandlerReactor.java, v 0.1 2020年07月13日 4:25 PM liuzunfei Exp $
*/
@Service
public class GrpcRequestHandlerReactor extends RequestGrpc.RequestImplBase {
public class GrpcRequestHandlerReactor extends RequestGrpc.RequestImplBase {
@Autowired
RequestHandlerRegistry requestHandlerRegistry;
@Override
public void request(GrpcRequest grpcRequest, StreamObserver<GrpcResponse> responseObserver) {
Loggers.GRPC.debug(" gRpc Server receive request :"+grpcRequest);
String type =grpcRequest.getType();
Loggers.GRPC.debug(" gRpc Server receive request :" + grpcRequest);
String type = grpcRequest.getType();
RequestHandler requestHandler = requestHandlerRegistry.getByRequestType(type);
if (requestHandler!=null){
String bodyString=grpcRequest.getBody().getValue().toStringUtf8();
if (requestHandler != null) {
String bodyString = grpcRequest.getBody().getValue().toStringUtf8();
Request request = requestHandler.parseBodyString(bodyString);
try {
Response response = requestHandler.handle(request,convertMeta(grpcRequest.getMetadata()));
Response response = requestHandler.handle(request, convertMeta(grpcRequest.getMetadata()));
responseObserver.onNext(GrpcUtils.convert(response));
responseObserver.onCompleted();
} catch (Exception e) {
Loggers.GRPC.error(" gRpc Server handle request exception :"+e.getMessage(),e);
Loggers.GRPC.error(" gRpc Server handle request exception :" + e.getMessage(), e);
responseObserver.onNext(GrpcUtils.buildFailResponse("Error"));
responseObserver.onCompleted();
}
}else{
} else {
Loggers.GRPC.error(" gRpc Server requestHandler Not found ");
responseObserver.onNext(GrpcUtils.buildFailResponse("RequestHandler Not Found"));
responseObserver.onCompleted();
}
}
private RequestMeta convertMeta(GrpcMetadata metadata){
RequestMeta requestMeta=new RequestMeta();
private RequestMeta convertMeta(GrpcMetadata metadata) {
RequestMeta requestMeta = new RequestMeta();
requestMeta.setClientIp(metadata.getClientIp());
requestMeta.setConnectionId(metadata.getConnectionId());
return requestMeta;
}
}

View File

@ -13,87 +13,79 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.core.remote.grpc;
import java.util.Properties;
import javax.annotation.PostConstruct;
import com.alibaba.nacos.core.remote.ConnectionManager;
import com.alibaba.nacos.core.remote.RequestHandlerRegistry;
import com.alibaba.nacos.core.remote.RpcServer;
import com.alibaba.nacos.core.utils.ApplicationUtils;
import com.alibaba.nacos.core.utils.Loggers;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import org.apache.commons.lang3.math.NumberUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
/**
* Grpc implementation as a rpc server.
*
* @author liuzunfei
* @version $Id: GrpcServer.java, v 0.1 2020年07月13日 3:42 PM liuzunfei Exp $
*/
@Service
public class GrpcServer extends RpcServer {
public class GrpcServer extends RpcServer {
private Server server;
@Autowired
private ConnectionManager connectionManager;
@Autowired
private GrpcStreamRequestHanderImpl streamRequestHander;
@Autowired
private GrpcRequestHandlerReactor requestHander;
@Autowired
private RequestHandlerRegistry requestHandlerRegistry;
int grpcServerPort=ApplicationUtils.getPort()+1000;;
private void init() {
int grpcServerPort = ApplicationUtils.getPort() + 1000;
private void init() {
Loggers.GRPC.info("Nacos gRPC server initiazing Component ...");
Loggers.GRPC.info("Nacos gRPC server port :"+grpcServerPort);
Loggers.GRPC.info("Connection Manager inited :"+connectionManager);
Loggers.GRPC.info("Stream request handler inited :"+streamRequestHander);
Loggers.GRPC.info("Common request handler inited :"+requestHander);
Loggers.GRPC.info("Request handler Registry inited :"+requestHandlerRegistry);
Loggers.GRPC.info("Nacos gRPC server port :" + grpcServerPort);
Loggers.GRPC.info("Connection Manager inited :" + connectionManager);
Loggers.GRPC.info("Stream request handler inited :" + streamRequestHander);
Loggers.GRPC.info("Common request handler inited :" + requestHander);
Loggers.GRPC.info("Request handler Registry inited :" + requestHandlerRegistry);
}
@PostConstruct
@Override
public void start() throws Exception {
init();
server = ServerBuilder.forPort(grpcServerPort)
.addService(streamRequestHander)
.addService(requestHander)
.build();
server = ServerBuilder.forPort(grpcServerPort).addService(streamRequestHander).addService(requestHander)
.build();
server.start();
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
Loggers.GRPC.info("Stopping Nacos gRPC server...");
GrpcServer.this.stop();
Loggers.GRPC.info("Nacos gRPC server stopped...");
}
});
}
@Override
public void stop() {
if (server != null) {
server.shutdown();
}
}
}

View File

@ -13,6 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.core.remote.grpc;
import java.io.UnsupportedEncodingException;
@ -26,20 +27,23 @@ import com.google.protobuf.Any;
import com.google.protobuf.ByteString;
/**
* GrpcUtils.
*
* @author liuzunfei
* @version $Id: GrpcUtils.java, v 0.1 2020年07月14日 12:15 AM liuzunfei Exp $
*/
public class GrpcUtils {
/**
* convert Response to GrpcResponse
* @param response
* convert Response to GrpcResponse.
*
* @param response response.
* @return
*/
public static GrpcResponse convert(Response response) {
String jsonString= JacksonUtils.toJson(response);
byte[] bytes= null;
public static GrpcResponse convert(Response response) {
String jsonString = JacksonUtils.toJson(response);
byte[] bytes = null;
try {
bytes = jsonString.getBytes("UTF-8");
} catch (UnsupportedEncodingException e) {
@ -47,19 +51,20 @@ public class GrpcUtils {
return null;
}
GrpcMetadata metadata = GrpcMetadata.newBuilder().build();
GrpcResponse grpcResponse = GrpcResponse.newBuilder().setBody(Any.newBuilder().setValue(ByteString.copyFrom(bytes))).
build();
GrpcResponse grpcResponse = GrpcResponse.newBuilder().setType(response.getType())
.setBody(Any.newBuilder().setValue(ByteString.copyFrom(bytes))).build();
return grpcResponse;
}
/**
* buildFailResponse
* @param msg
* build fail response.
*
* @param msg errorMsg
* @return
*/
public static GrpcResponse buildFailResponse(String msg) {
byte[] bytes= new byte[0];
public static GrpcResponse buildFailResponse(String msg) {
byte[] bytes = new byte[0];
try {
bytes = msg.getBytes("UTF-8");
} catch (UnsupportedEncodingException e) {
@ -67,12 +72,9 @@ public class GrpcUtils {
return null;
}
GrpcMetadata metadata = GrpcMetadata.newBuilder().build();
GrpcResponse grpcResponse = GrpcResponse.newBuilder().setBody(Any.newBuilder().setValue(ByteString.copyFrom(bytes))).
build();
GrpcResponse grpcResponse = GrpcResponse.newBuilder()
.setBody(Any.newBuilder().setValue(ByteString.copyFrom(bytes))).build();
return grpcResponse;
}
}

View File

@ -0,0 +1,60 @@
/*
* Copyright 1999-2020 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.nacos.naming.remote;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.naming.remote.NamingRequestTypeConstants;
import com.alibaba.nacos.api.remote.request.Request;
import com.alibaba.nacos.api.remote.request.RequestMeta;
import com.alibaba.nacos.api.remote.response.Response;
import com.alibaba.nacos.core.remote.AsyncListenContext;
import com.alibaba.nacos.core.remote.RequestHandler;
import com.google.common.collect.Lists;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.List;
/**
* handler to handle service instance change listen request.
*
* @author liuzunfei
* @version $Id: ServiceInstanceChangeListenRequestHandler.java, v 0.1 2020年07月14日 7:55 PM liuzunfei Exp $
*/
@Component
public class ServiceInstanceChangeListenRequestHandler extends RequestHandler {
@Autowired
AsyncListenContext asyncListenContext;
private static final String LISTEN_CONTEXT_TYPE = "CONFIG";
@Override
public Request parseBodyString(String bodyString) {
return null;
}
@Override
public Response handle(Request request, RequestMeta meta) throws NacosException {
return null;
}
@Override
public List<String> getRequestTypes() {
return Lists.newArrayList(NamingRequestTypeConstants.SERVICE_INSTANCE_CHANGE);
}
}