Merge remote-tracking branch 'upstream/feature_support_grpc_core' into develop-merge-to-2.0

This commit is contained in:
KomachiSion 2021-03-18 13:38:35 +08:00
commit bd4b63d834
13 changed files with 211 additions and 116 deletions

View File

@ -32,7 +32,7 @@
<build> <build>
<plugins> <plugins>
<!-- reuse when you need to update grpc model --> <!-- reuse when you need to update grpc model -->
<!-- <plugin> <!--<plugin>
<groupId>org.xolstice.maven.plugins</groupId> <groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId> <artifactId>protobuf-maven-plugin</artifactId>
<version>0.5.0</version> <version>0.5.0</version>

View File

@ -30,6 +30,7 @@ private static final long serialVersionUID = 0L;
} }
private Metadata() { private Metadata() {
type_ = ""; type_ = "";
clientIp_ = "";
} }
@Override @Override
@ -82,6 +83,12 @@ private static final long serialVersionUID = 0L;
headers__.getKey(), headers__.getValue()); headers__.getKey(), headers__.getValue());
break; break;
} }
case 66: {
String s = input.readStringRequireUtf8();
clientIp_ = s;
break;
}
default: { default: {
if (!parseUnknownField( if (!parseUnknownField(
input, unknownFields, extensionRegistry, tag)) { input, unknownFields, extensionRegistry, tag)) {
@ -160,6 +167,40 @@ private static final long serialVersionUID = 0L;
} }
} }
public static final int CLIENTIP_FIELD_NUMBER = 8;
private volatile Object clientIp_;
/**
* <code>string clientIp = 8;</code>
*/
public String getClientIp() {
Object ref = clientIp_;
if (ref instanceof String) {
return (String) ref;
} else {
com.google.protobuf.ByteString bs =
(com.google.protobuf.ByteString) ref;
String s = bs.toStringUtf8();
clientIp_ = s;
return s;
}
}
/**
* <code>string clientIp = 8;</code>
*/
public com.google.protobuf.ByteString
getClientIpBytes() {
Object ref = clientIp_;
if (ref instanceof String) {
com.google.protobuf.ByteString b =
com.google.protobuf.ByteString.copyFromUtf8(
(String) ref);
clientIp_ = b;
return b;
} else {
return (com.google.protobuf.ByteString) ref;
}
}
public static final int HEADERS_FIELD_NUMBER = 7; public static final int HEADERS_FIELD_NUMBER = 7;
private static final class HeadersDefaultEntryHolder { private static final class HeadersDefaultEntryHolder {
static final com.google.protobuf.MapEntry< static final com.google.protobuf.MapEntry<
@ -259,6 +300,9 @@ private static final long serialVersionUID = 0L;
internalGetHeaders(), internalGetHeaders(),
HeadersDefaultEntryHolder.defaultEntry, HeadersDefaultEntryHolder.defaultEntry,
7); 7);
if (!getClientIpBytes().isEmpty()) {
com.google.protobuf.GeneratedMessageV3.writeString(output, 8, clientIp_);
}
unknownFields.writeTo(output); unknownFields.writeTo(output);
} }
@ -281,6 +325,9 @@ private static final long serialVersionUID = 0L;
size += com.google.protobuf.CodedOutputStream size += com.google.protobuf.CodedOutputStream
.computeMessageSize(7, headers__); .computeMessageSize(7, headers__);
} }
if (!getClientIpBytes().isEmpty()) {
size += com.google.protobuf.GeneratedMessageV3.computeStringSize(8, clientIp_);
}
size += unknownFields.getSerializedSize(); size += unknownFields.getSerializedSize();
memoizedSize = size; memoizedSize = size;
return size; return size;
@ -298,6 +345,8 @@ private static final long serialVersionUID = 0L;
if (!getType() if (!getType()
.equals(other.getType())) return false; .equals(other.getType())) return false;
if (!getClientIp()
.equals(other.getClientIp())) return false;
if (!internalGetHeaders().equals( if (!internalGetHeaders().equals(
other.internalGetHeaders())) return false; other.internalGetHeaders())) return false;
if (!unknownFields.equals(other.unknownFields)) return false; if (!unknownFields.equals(other.unknownFields)) return false;
@ -313,6 +362,8 @@ private static final long serialVersionUID = 0L;
hash = (19 * hash) + getDescriptor().hashCode(); hash = (19 * hash) + getDescriptor().hashCode();
hash = (37 * hash) + TYPE_FIELD_NUMBER; hash = (37 * hash) + TYPE_FIELD_NUMBER;
hash = (53 * hash) + getType().hashCode(); hash = (53 * hash) + getType().hashCode();
hash = (37 * hash) + CLIENTIP_FIELD_NUMBER;
hash = (53 * hash) + getClientIp().hashCode();
if (!internalGetHeaders().getMap().isEmpty()) { if (!internalGetHeaders().getMap().isEmpty()) {
hash = (37 * hash) + HEADERS_FIELD_NUMBER; hash = (37 * hash) + HEADERS_FIELD_NUMBER;
hash = (53 * hash) + internalGetHeaders().hashCode(); hash = (53 * hash) + internalGetHeaders().hashCode();
@ -474,6 +525,8 @@ private static final long serialVersionUID = 0L;
super.clear(); super.clear();
type_ = ""; type_ = "";
clientIp_ = "";
internalGetMutableHeaders().clear(); internalGetMutableHeaders().clear();
return this; return this;
} }
@ -503,6 +556,7 @@ private static final long serialVersionUID = 0L;
Metadata result = new Metadata(this); Metadata result = new Metadata(this);
int from_bitField0_ = bitField0_; int from_bitField0_ = bitField0_;
result.type_ = type_; result.type_ = type_;
result.clientIp_ = clientIp_;
result.headers_ = internalGetHeaders(); result.headers_ = internalGetHeaders();
result.headers_.makeImmutable(); result.headers_.makeImmutable();
onBuilt(); onBuilt();
@ -557,6 +611,10 @@ private static final long serialVersionUID = 0L;
type_ = other.type_; type_ = other.type_;
onChanged(); onChanged();
} }
if (!other.getClientIp().isEmpty()) {
clientIp_ = other.clientIp_;
onChanged();
}
internalGetMutableHeaders().mergeFrom( internalGetMutableHeaders().mergeFrom(
other.internalGetHeaders()); other.internalGetHeaders());
this.mergeUnknownFields(other.unknownFields); this.mergeUnknownFields(other.unknownFields);
@ -658,6 +716,75 @@ private static final long serialVersionUID = 0L;
return this; return this;
} }
private Object clientIp_ = "";
/**
* <code>string clientIp = 8;</code>
*/
public String getClientIp() {
Object ref = clientIp_;
if (!(ref instanceof String)) {
com.google.protobuf.ByteString bs =
(com.google.protobuf.ByteString) ref;
String s = bs.toStringUtf8();
clientIp_ = s;
return s;
} else {
return (String) ref;
}
}
/**
* <code>string clientIp = 8;</code>
*/
public com.google.protobuf.ByteString
getClientIpBytes() {
Object ref = clientIp_;
if (ref instanceof String) {
com.google.protobuf.ByteString b =
com.google.protobuf.ByteString.copyFromUtf8(
(String) ref);
clientIp_ = b;
return b;
} else {
return (com.google.protobuf.ByteString) ref;
}
}
/**
* <code>string clientIp = 8;</code>
*/
public Builder setClientIp(
String value) {
if (value == null) {
throw new NullPointerException();
}
clientIp_ = value;
onChanged();
return this;
}
/**
* <code>string clientIp = 8;</code>
*/
public Builder clearClientIp() {
clientIp_ = getDefaultInstance().getClientIp();
onChanged();
return this;
}
/**
* <code>string clientIp = 8;</code>
*/
public Builder setClientIpBytes(
com.google.protobuf.ByteString value) {
if (value == null) {
throw new NullPointerException();
}
checkByteStringIsUtf8(value);
clientIp_ = value;
onChanged();
return this;
}
private com.google.protobuf.MapField< private com.google.protobuf.MapField<
String, String> headers_; String, String> headers_;
private com.google.protobuf.MapField<String, String> private com.google.protobuf.MapField<String, String>

View File

@ -30,6 +30,16 @@ public interface MetadataOrBuilder extends
com.google.protobuf.ByteString com.google.protobuf.ByteString
getTypeBytes(); getTypeBytes();
/**
* <code>string clientIp = 8;</code>
*/
String getClientIp();
/**
* <code>string clientIp = 8;</code>
*/
com.google.protobuf.ByteString
getClientIpBytes();
/** /**
* <code>map&lt;string, string&gt; headers = 7;</code> * <code>map&lt;string, string&gt; headers = 7;</code>
*/ */

View File

@ -53,17 +53,17 @@ public final class NacosGrpcService {
String[] descriptorData = { String[] descriptorData = {
"\n\030nacos_grpc_service.proto\032\031google/proto" + "\n\030nacos_grpc_service.proto\032\031google/proto" +
"buf/any.proto\032\037google/protobuf/timestamp" + "buf/any.proto\032\037google/protobuf/timestamp" +
".proto\"q\n\010Metadata\022\014\n\004type\030\003 \001(\t\022\'\n\007head" + ".proto\"\203\001\n\010Metadata\022\014\n\004type\030\003 \001(\t\022\020\n\010cli" +
"ers\030\007 \003(\0132\026.Metadata.HeadersEntry\032.\n\014Hea" + "entIp\030\010 \001(\t\022\'\n\007headers\030\007 \003(\0132\026.Metadata." +
"dersEntry\022\013\n\003key\030\001 \001(\t\022\r\n\005value\030\002 \001(\t:\0028" + "HeadersEntry\032.\n\014HeadersEntry\022\013\n\003key\030\001 \001(" +
"\001\"J\n\007Payload\022\033\n\010metadata\030\002 \001(\0132\t.Metadat" + "\t\022\r\n\005value\030\002 \001(\t:\0028\001\"J\n\007Payload\022\033\n\010metad" +
"a\022\"\n\004body\030\003 \001(\0132\024.google.protobuf.Any28\n" + "ata\030\002 \001(\0132\t.Metadata\022\"\n\004body\030\003 \001(\0132\024.goo" +
"\rRequestStream\022\'\n\rrequestStream\022\010.Payloa" + "gle.protobuf.Any28\n\rRequestStream\022\'\n\rreq" +
"d\032\010.Payload\"\0000\0012*\n\007Request\022\037\n\007request\022\010." + "uestStream\022\010.Payload\032\010.Payload\"\0000\0012*\n\007Re" +
"Payload\032\010.Payload\"\0002>\n\017BiRequestStream\022+" + "quest\022\037\n\007request\022\010.Payload\032\010.Payload\"\0002>" +
"\n\017requestBiStream\022\010.Payload\032\010.Payload\"\000(" + "\n\017BiRequestStream\022+\n\017requestBiStream\022\010.P" +
"\0010\001B#\n\037com.alibaba.nacos.api.grpc.autoP\001" + "ayload\032\010.Payload\"\000(\0010\001B#\n\037com.alibaba.na" +
"b\006proto3" "cos.api.grpc.autoP\001b\006proto3"
}; };
descriptor = com.google.protobuf.Descriptors.FileDescriptor descriptor = com.google.protobuf.Descriptors.FileDescriptor
.internalBuildGeneratedFileFrom(descriptorData, .internalBuildGeneratedFileFrom(descriptorData,
@ -76,7 +76,7 @@ public final class NacosGrpcService {
internal_static_Metadata_fieldAccessorTable = new internal_static_Metadata_fieldAccessorTable = new
com.google.protobuf.GeneratedMessageV3.FieldAccessorTable( com.google.protobuf.GeneratedMessageV3.FieldAccessorTable(
internal_static_Metadata_descriptor, internal_static_Metadata_descriptor,
new String[] { "Type", "Headers", }); new String[] { "Type", "ClientIp", "Headers", });
internal_static_Metadata_HeadersEntry_descriptor = internal_static_Metadata_HeadersEntry_descriptor =
internal_static_Metadata_descriptor.getNestedTypes().get(0); internal_static_Metadata_descriptor.getNestedTypes().get(0);
internal_static_Metadata_HeadersEntry_fieldAccessorTable = new internal_static_Metadata_HeadersEntry_fieldAccessorTable = new

View File

@ -31,30 +31,10 @@ public class RequestMeta {
private String clientIp = ""; private String clientIp = "";
private int clientPort;
private String clientVersion = ""; private String clientVersion = "";
private Map<String, String> labels = new HashMap<String, String>(); private Map<String, String> labels = new HashMap<String, String>();
/**
* Getter method for property <tt>clientPort</tt>.
*
* @return property value of clientPort
*/
public int getClientPort() {
return clientPort;
}
/**
* Setter method for property <tt>clientPort</tt>.
*
* @param clientPort value to be assigned to property clientPort
*/
public void setClientPort(int clientPort) {
this.clientPort = clientPort;
}
/** /**
* Getter method for property <tt>clientVersion</tt>. * Getter method for property <tt>clientVersion</tt>.
* *

View File

@ -24,30 +24,31 @@ option java_multiple_files = true;
option java_package = "com.alibaba.nacos.api.grpc.auto"; option java_package = "com.alibaba.nacos.api.grpc.auto";
message Metadata { message Metadata {
string type = 3; string type = 3;
map<string, string> headers = 7; string clientIp = 8;
map<string, string> headers = 7;
} }
message Payload { message Payload {
Metadata metadata = 2; Metadata metadata = 2;
google.protobuf.Any body = 3; google.protobuf.Any body = 3;
} }
service RequestStream { service RequestStream {
// build a streamRequest // build a streamRequest
rpc requestStream (Payload) returns (stream Payload) { rpc requestStream (Payload) returns (stream Payload) {
} }
} }
service Request { service Request {
// Sends a commonRequest // Sends a commonRequest
rpc request (Payload) returns (Payload) { rpc request (Payload) returns (Payload) {
} }
} }
service BiRequestStream { service BiRequestStream {
// Sends a commonRequest // Sends a commonRequest
rpc requestBiStream (stream Payload) returns (stream Payload) { rpc requestBiStream (stream Payload) returns (stream Payload) {
} }
} }

View File

@ -806,9 +806,10 @@ public abstract class RpcClient implements Closeable {
for (ServerRequestHandler serverRequestHandler : serverRequestHandlers) { for (ServerRequestHandler serverRequestHandler : serverRequestHandlers) {
try { try {
Response response = serverRequestHandler.requestReply(request); Response response = serverRequestHandler.requestReply(request);
LoggerUtils.printIfInfoEnabled(LOGGER, "[{}]ack server push request,request={},requestId={}", name,
request.getClass().getSimpleName(), request.getRequestId());
if (response != null) { if (response != null) {
LoggerUtils.printIfInfoEnabled(LOGGER, "[{}]ack server push request,request={},requestId={}", name,
request.getClass().getSimpleName(), request.getRequestId());
return response; return response;
} }
} catch (Exception e) { } catch (Exception e) {

View File

@ -272,13 +272,15 @@ public abstract class GrpcClient extends RpcClient {
grpcConn.setPayloadStreamObserver(payloadStreamObserver); grpcConn.setPayloadStreamObserver(payloadStreamObserver);
grpcConn.setGrpcFutureServiceStub(newChannelStubTemp); grpcConn.setGrpcFutureServiceStub(newChannelStubTemp);
grpcConn.setChannel((ManagedChannel) newChannelStubTemp.getChannel()); grpcConn.setChannel((ManagedChannel) newChannelStubTemp.getChannel());
//send a connection setup request. //send a setup request.
ConnectionSetupRequest conSetupRequest = new ConnectionSetupRequest(); ConnectionSetupRequest conSetupRequest = new ConnectionSetupRequest();
conSetupRequest.setClientVersion(VersionUtils.getFullClientVersion()); conSetupRequest.setClientVersion(VersionUtils.getFullClientVersion());
conSetupRequest.setLabels(super.getLabels()); conSetupRequest.setLabels(super.getLabels());
conSetupRequest.setAbilities(super.clientAbilities); conSetupRequest.setAbilities(super.clientAbilities);
conSetupRequest.setTenant(super.getTenant()); conSetupRequest.setTenant(super.getTenant());
grpcConn.sendRequest(conSetupRequest); grpcConn.sendRequest(conSetupRequest);
//wait to register connection setup
Thread.sleep(100L);
return grpcConn; return grpcConn;
} }
return null; return null;

View File

@ -26,6 +26,7 @@ import com.alibaba.nacos.api.remote.PayloadRegistry;
import com.alibaba.nacos.api.remote.request.Request; import com.alibaba.nacos.api.remote.request.Request;
import com.alibaba.nacos.api.remote.request.RequestMeta; import com.alibaba.nacos.api.remote.request.RequestMeta;
import com.alibaba.nacos.api.remote.response.Response; import com.alibaba.nacos.api.remote.response.Response;
import com.alibaba.nacos.api.utils.NetUtils;
import com.alibaba.nacos.common.remote.exception.RemoteException; import com.alibaba.nacos.common.remote.exception.RemoteException;
import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.JsonProcessingException;
@ -98,6 +99,7 @@ public class GrpcUtils {
if (meta != null) { if (meta != null) {
metaBuilder.putAllHeaders(request.getHeaders()).setType(request.getClass().getSimpleName()); metaBuilder.putAllHeaders(request.getHeaders()).setType(request.getClass().getSimpleName());
} }
metaBuilder.setClientIp(NetUtils.localIP());
payloadBuilder.setMetadata(metaBuilder.build()); payloadBuilder.setMetadata(metaBuilder.build());
// request body . // request body .
@ -119,7 +121,7 @@ public class GrpcUtils {
public static Payload convert(Request request) { public static Payload convert(Request request) {
Metadata newMeta = Metadata.newBuilder().setType(request.getClass().getSimpleName()) Metadata newMeta = Metadata.newBuilder().setType(request.getClass().getSimpleName())
.putAllHeaders(request.getHeaders()).build(); .setClientIp(NetUtils.localIP()).putAllHeaders(request.getHeaders()).build();
request.clearHeaders(); request.clearHeaders();
String jsonString = toJson(request); String jsonString = toJson(request);

View File

@ -44,9 +44,14 @@ public class ConnectionMeta {
String clientIp; String clientIp;
/** /**
* Client IP Port. * Remote IP Address.
*/ */
int clientPort; String remoteIp;
/**
* Remote IP Port.
*/
int remotePort;
/** /**
* Local Ip Port. * Local Ip Port.
@ -93,14 +98,15 @@ public class ConnectionMeta {
return labels.get(VIPSERVER_TAG); return labels.get(VIPSERVER_TAG);
} }
public ConnectionMeta(String connectionId, String clientIp, int clientPort, int localPort, String connectType, public ConnectionMeta(String connectionId, String clientIp, String remoteIp, int remotePort, int localPort,
String version, String appName, Map<String, String> labels) { String connectType, String version, String appName, Map<String, String> labels) {
this.connectionId = connectionId; this.connectionId = connectionId;
this.clientIp = clientIp; this.clientIp = clientIp;
this.connectType = connectType; this.connectType = connectType;
this.version = version; this.version = version;
this.appName = appName; this.appName = appName;
this.clientPort = clientPort; this.remoteIp = remoteIp;
this.remotePort = remotePort;
this.localPort = localPort; this.localPort = localPort;
this.createTime = new Date(); this.createTime = new Date();
this.lastActiveTime = System.currentTimeMillis(); this.lastActiveTime = System.currentTimeMillis();
@ -136,24 +142,6 @@ public class ConnectionMeta {
return labels; return labels;
} }
/**
* Getter method for property <tt>clientPort</tt>.
*
* @return property value of clientPort
*/
public int getClientPort() {
return clientPort;
}
/**
* Setter method for property <tt>clientPort</tt>.
*
* @param clientPort value to be assigned to property clientPort
*/
public void setClientPort(int clientPort) {
this.clientPort = clientPort;
}
/** /**
* Setter method for property <tt>labels</tt>. * Setter method for property <tt>labels</tt>.
* *

View File

@ -81,21 +81,6 @@ public abstract class BaseGrpcServer extends BaseRpcServer {
return ConnectionType.GRPC; return ConnectionType.GRPC;
} }
static final Metadata.Key<String> X_REAL_IP = Metadata.Key.of("X-Real-IP", Metadata.ASCII_STRING_MARSHALLER);
static final Metadata.Key<String> X_FORWARDED_FOR = Metadata.Key
.of("X-Forwarded-For", Metadata.ASCII_STRING_MARSHALLER);
private static final String X_FORWARDED_FOR_SPLIT_SYMBOL = ",";
public static String getRemoteIp(Metadata headers) {
String xForwardedFor = headers.get(X_FORWARDED_FOR);
if (!org.apache.commons.lang3.StringUtils.isBlank(xForwardedFor)) {
return xForwardedFor.split(X_FORWARDED_FOR_SPLIT_SYMBOL)[0].trim();
}
return headers.get(X_REAL_IP);
}
@Override @Override
public void startServer() throws Exception { public void startServer() throws Exception {
final MutableHandlerRegistry handlerRegistry = new MutableHandlerRegistry(); final MutableHandlerRegistry handlerRegistry = new MutableHandlerRegistry();
@ -105,12 +90,10 @@ public abstract class BaseGrpcServer extends BaseRpcServer {
@Override @Override
public <T, S> ServerCall.Listener<T> interceptCall(ServerCall<T, S> call, Metadata headers, public <T, S> ServerCall.Listener<T> interceptCall(ServerCall<T, S> call, Metadata headers,
ServerCallHandler<T, S> next) { ServerCallHandler<T, S> next) {
String remoteIp = getRemoteIp(headers);
Context ctx = Context.current() Context ctx = Context.current()
.withValue(CONTEXT_KEY_CONN_ID, call.getAttributes().get(TRANS_KEY_CONN_ID)) .withValue(CONTEXT_KEY_CONN_ID, call.getAttributes().get(TRANS_KEY_CONN_ID))
.withValue(CONTEXT_KEY_CONN_CLIENT_IP, StringUtils.isNotBlank(remoteIp) ? remoteIp .withValue(CONTEXT_KEY_CONN_REMOTE_IP, call.getAttributes().get(TRANS_KEY_REMOTE_IP))
: call.getAttributes().get(TRANS_KEY_CLIENT_IP)) .withValue(CONTEXT_KEY_CONN_REMOTE_PORT, call.getAttributes().get(TRANS_KEY_REMOTE_PORT))
.withValue(CONTEXT_KEY_CONN_CLIENT_PORT, call.getAttributes().get(TRANS_KEY_CLIENT_PORT))
.withValue(CONTEXT_KEY_CONN_LOCAL_PORT, call.getAttributes().get(TRANS_KEY_LOCAL_PORT)); .withValue(CONTEXT_KEY_CONN_LOCAL_PORT, call.getAttributes().get(TRANS_KEY_LOCAL_PORT));
if (REQUEST_BI_STREAM_SERVICE_NAME.equals(call.getMethodDescriptor().getServiceName())) { if (REQUEST_BI_STREAM_SERVICE_NAME.equals(call.getMethodDescriptor().getServiceName())) {
Channel internalChannel = getInternalChannel(call); Channel internalChannel = getInternalChannel(call);
@ -138,7 +121,7 @@ public abstract class BaseGrpcServer extends BaseRpcServer {
String remoteIp = remoteAddress.getAddress().getHostAddress(); String remoteIp = remoteAddress.getAddress().getHostAddress();
Attributes attrWrapper = transportAttrs.toBuilder() Attributes attrWrapper = transportAttrs.toBuilder()
.set(TRANS_KEY_CONN_ID, System.currentTimeMillis() + "_" + remoteIp + "_" + remotePort) .set(TRANS_KEY_CONN_ID, System.currentTimeMillis() + "_" + remoteIp + "_" + remotePort)
.set(TRANS_KEY_CLIENT_IP, remoteIp).set(TRANS_KEY_CLIENT_PORT, remotePort) .set(TRANS_KEY_REMOTE_IP, remoteIp).set(TRANS_KEY_REMOTE_PORT, remotePort)
.set(TRANS_KEY_LOCAL_PORT, localPort).build(); .set(TRANS_KEY_LOCAL_PORT, localPort).build();
String connectionId = attrWrapper.get(TRANS_KEY_CONN_ID); String connectionId = attrWrapper.get(TRANS_KEY_CONN_ID);
Loggers.REMOTE_DIGEST.info("Connection transportReady,connectionId = {} ", connectionId); Loggers.REMOTE_DIGEST.info("Connection transportReady,connectionId = {} ", connectionId);
@ -161,6 +144,7 @@ public abstract class BaseGrpcServer extends BaseRpcServer {
} }
} }
}).build(); }).build();
server.start(); server.start();
} }
@ -225,17 +209,17 @@ public abstract class BaseGrpcServer extends BaseRpcServer {
static final Attributes.Key<String> TRANS_KEY_CONN_ID = Attributes.Key.create("conn_id"); static final Attributes.Key<String> TRANS_KEY_CONN_ID = Attributes.Key.create("conn_id");
static final Attributes.Key<String> TRANS_KEY_CLIENT_IP = Attributes.Key.create("client_ip"); static final Attributes.Key<String> TRANS_KEY_REMOTE_IP = Attributes.Key.create("remote_ip");
static final Attributes.Key<Integer> TRANS_KEY_CLIENT_PORT = Attributes.Key.create("client_port"); static final Attributes.Key<Integer> TRANS_KEY_REMOTE_PORT = Attributes.Key.create("remote_port");
static final Attributes.Key<Integer> TRANS_KEY_LOCAL_PORT = Attributes.Key.create("local_port"); static final Attributes.Key<Integer> TRANS_KEY_LOCAL_PORT = Attributes.Key.create("local_port");
static final Context.Key<String> CONTEXT_KEY_CONN_ID = Context.key("conn_id"); static final Context.Key<String> CONTEXT_KEY_CONN_ID = Context.key("conn_id");
static final Context.Key<String> CONTEXT_KEY_CONN_CLIENT_IP = Context.key("client_ip"); static final Context.Key<String> CONTEXT_KEY_CONN_REMOTE_IP = Context.key("remote_ip");
static final Context.Key<Integer> CONTEXT_KEY_CONN_CLIENT_PORT = Context.key("client_port"); static final Context.Key<Integer> CONTEXT_KEY_CONN_REMOTE_PORT = Context.key("remote_port");
static final Context.Key<Integer> CONTEXT_KEY_CONN_LOCAL_PORT = Context.key("local_port"); static final Context.Key<Integer> CONTEXT_KEY_CONN_LOCAL_PORT = Context.key("local_port");

View File

@ -38,10 +38,10 @@ import org.springframework.stereotype.Service;
import java.util.Map; import java.util.Map;
import static com.alibaba.nacos.core.remote.grpc.BaseGrpcServer.CONTEXT_KEY_CHANNEL; import static com.alibaba.nacos.core.remote.grpc.BaseGrpcServer.CONTEXT_KEY_CHANNEL;
import static com.alibaba.nacos.core.remote.grpc.BaseGrpcServer.CONTEXT_KEY_CONN_CLIENT_IP;
import static com.alibaba.nacos.core.remote.grpc.BaseGrpcServer.CONTEXT_KEY_CONN_CLIENT_PORT;
import static com.alibaba.nacos.core.remote.grpc.BaseGrpcServer.CONTEXT_KEY_CONN_ID; import static com.alibaba.nacos.core.remote.grpc.BaseGrpcServer.CONTEXT_KEY_CONN_ID;
import static com.alibaba.nacos.core.remote.grpc.BaseGrpcServer.CONTEXT_KEY_CONN_LOCAL_PORT; import static com.alibaba.nacos.core.remote.grpc.BaseGrpcServer.CONTEXT_KEY_CONN_LOCAL_PORT;
import static com.alibaba.nacos.core.remote.grpc.BaseGrpcServer.CONTEXT_KEY_CONN_REMOTE_IP;
import static com.alibaba.nacos.core.remote.grpc.BaseGrpcServer.CONTEXT_KEY_CONN_REMOTE_PORT;
/** /**
* grpc bi stream request . * grpc bi stream request .
@ -56,7 +56,7 @@ public class GrpcBiStreamRequestAcceptor extends BiRequestStreamGrpc.BiRequestSt
ConnectionManager connectionManager; ConnectionManager connectionManager;
private void traceDetailIfNecessary(Payload grpcRequest) { private void traceDetailIfNecessary(Payload grpcRequest) {
String clientIp = CONTEXT_KEY_CONN_CLIENT_IP.get(); String clientIp = grpcRequest.getMetadata().getClientIp();
String connectionId = CONTEXT_KEY_CONN_ID.get(); String connectionId = CONTEXT_KEY_CONN_ID.get();
try { try {
if (connectionManager.traced(clientIp)) { if (connectionManager.traced(clientIp)) {
@ -80,13 +80,16 @@ public class GrpcBiStreamRequestAcceptor extends BiRequestStreamGrpc.BiRequestSt
final Integer localPort = CONTEXT_KEY_CONN_LOCAL_PORT.get(); final Integer localPort = CONTEXT_KEY_CONN_LOCAL_PORT.get();
final int clientPort = CONTEXT_KEY_CONN_CLIENT_PORT.get(); final int remotePort = CONTEXT_KEY_CONN_REMOTE_PORT.get();
final String clientIp = CONTEXT_KEY_CONN_CLIENT_IP.get(); String remoteIp = CONTEXT_KEY_CONN_REMOTE_IP.get();
String clientIp = "";
@Override @Override
public void onNext(Payload payload) { public void onNext(Payload payload) {
clientIp = payload.getMetadata().getClientIp();
traceDetailIfNecessary(payload); traceDetailIfNecessary(payload);
Object parseObj = null; Object parseObj = null;
@ -112,9 +115,9 @@ public class GrpcBiStreamRequestAcceptor extends BiRequestStreamGrpc.BiRequestSt
appName = labels.get(Constants.APPNAME); appName = labels.get(Constants.APPNAME);
} }
ConnectionMeta metaInfo = new ConnectionMeta(connectionId, clientIp, clientPort, localPort, ConnectionMeta metaInfo = new ConnectionMeta(connectionId, payload.getMetadata().getClientIp(),
ConnectionType.GRPC.getType(), setUpRequest.getClientVersion(), appName, remoteIp, remotePort, localPort, ConnectionType.GRPC.getType(),
setUpRequest.getLabels()); setUpRequest.getClientVersion(), appName, setUpRequest.getLabels());
metaInfo.setTenant(setUpRequest.getTenant()); metaInfo.setTenant(setUpRequest.getTenant());
Connection connection = new GrpcConnection(metaInfo, responseObserver, CONTEXT_KEY_CHANNEL.get()); Connection connection = new GrpcConnection(metaInfo, responseObserver, CONTEXT_KEY_CHANNEL.get());
connection.setAbilities(setUpRequest.getAbilities()); connection.setAbilities(setUpRequest.getAbilities());

View File

@ -22,9 +22,9 @@ import com.alibaba.nacos.api.grpc.auto.RequestGrpc;
import com.alibaba.nacos.api.remote.request.Request; import com.alibaba.nacos.api.remote.request.Request;
import com.alibaba.nacos.api.remote.request.RequestMeta; import com.alibaba.nacos.api.remote.request.RequestMeta;
import com.alibaba.nacos.api.remote.request.ServerCheckRequest; import com.alibaba.nacos.api.remote.request.ServerCheckRequest;
import com.alibaba.nacos.api.remote.response.ErrorResponse;
import com.alibaba.nacos.api.remote.response.Response; import com.alibaba.nacos.api.remote.response.Response;
import com.alibaba.nacos.api.remote.response.ResponseCode; import com.alibaba.nacos.api.remote.response.ResponseCode;
import com.alibaba.nacos.api.remote.response.ErrorResponse;
import com.alibaba.nacos.api.remote.response.ServerCheckResponse; import com.alibaba.nacos.api.remote.response.ServerCheckResponse;
import com.alibaba.nacos.common.remote.client.grpc.GrpcUtils; import com.alibaba.nacos.common.remote.client.grpc.GrpcUtils;
import com.alibaba.nacos.core.remote.Connection; import com.alibaba.nacos.core.remote.Connection;
@ -37,8 +37,6 @@ import io.grpc.stub.StreamObserver;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import static com.alibaba.nacos.core.remote.grpc.BaseGrpcServer.CONTEXT_KEY_CONN_CLIENT_IP;
import static com.alibaba.nacos.core.remote.grpc.BaseGrpcServer.CONTEXT_KEY_CONN_CLIENT_PORT;
import static com.alibaba.nacos.core.remote.grpc.BaseGrpcServer.CONTEXT_KEY_CONN_ID; import static com.alibaba.nacos.core.remote.grpc.BaseGrpcServer.CONTEXT_KEY_CONN_ID;
/** /**
@ -57,9 +55,8 @@ public class GrpcRequestAcceptor extends RequestGrpc.RequestImplBase {
private ConnectionManager connectionManager; private ConnectionManager connectionManager;
private void traceIfNecessary(Payload grpcRequest, boolean receive) { private void traceIfNecessary(Payload grpcRequest, boolean receive) {
String clientIp = CONTEXT_KEY_CONN_CLIENT_IP.get(); String clientIp = grpcRequest.getMetadata().getClientIp();
String connectionId = CONTEXT_KEY_CONN_ID.get(); String connectionId = CONTEXT_KEY_CONN_ID.get();
try { try {
if (connectionManager.traced(clientIp)) { if (connectionManager.traced(clientIp)) {
Loggers.REMOTE_DIGEST.info("[{}]Payload {},meta={},body={}", connectionId, receive ? "receive" : "send", Loggers.REMOTE_DIGEST.info("[{}]Payload {},meta={},body={}", connectionId, receive ? "receive" : "send",
@ -163,9 +160,8 @@ public class GrpcRequestAcceptor extends RequestGrpc.RequestImplBase {
try { try {
Connection connection = connectionManager.getConnection(CONTEXT_KEY_CONN_ID.get()); Connection connection = connectionManager.getConnection(CONTEXT_KEY_CONN_ID.get());
RequestMeta requestMeta = new RequestMeta(); RequestMeta requestMeta = new RequestMeta();
requestMeta.setClientIp(CONTEXT_KEY_CONN_CLIENT_IP.get()); requestMeta.setClientIp(connection.getMetaInfo().getClientIp());
requestMeta.setConnectionId(CONTEXT_KEY_CONN_ID.get()); requestMeta.setConnectionId(CONTEXT_KEY_CONN_ID.get());
requestMeta.setClientPort(CONTEXT_KEY_CONN_CLIENT_PORT.get());
requestMeta.setClientVersion(connection.getMetaInfo().getVersion()); requestMeta.setClientVersion(connection.getMetaInfo().getVersion());
requestMeta.setLabels(connection.getMetaInfo().getLabels()); requestMeta.setLabels(connection.getMetaInfo().getLabels());
connectionManager.refreshActiveTime(requestMeta.getConnectionId()); connectionManager.refreshActiveTime(requestMeta.getConnectionId());
@ -178,8 +174,9 @@ public class GrpcRequestAcceptor extends RequestGrpc.RequestImplBase {
Loggers.REMOTE_DIGEST Loggers.REMOTE_DIGEST
.error("[{}] Fail to handle request from connection [{}] ,error message :{}", "grpc", connectionId, .error("[{}] Fail to handle request from connection [{}] ,error message :{}", "grpc", connectionId,
e); e);
Payload payloadResponse = GrpcUtils Payload payloadResponse = GrpcUtils.convert(buildErrorResponse(
.convert(buildErrorResponse(ResponseCode.FAIL.getCode(), e.getMessage())); (e instanceof NacosException) ? ((NacosException) e).getErrCode() : ResponseCode.FAIL.getCode(),
e.getMessage()));
traceIfNecessary(payloadResponse, false); traceIfNecessary(payloadResponse, false);
responseObserver.onNext(payloadResponse); responseObserver.onNext(payloadResponse);
responseObserver.onCompleted(); responseObserver.onCompleted();