#1409 Update from Nacos

This commit is contained in:
nkorange 2019-10-15 17:45:07 +08:00
parent 584d73ba17
commit 13d82582ee
2 changed files with 131 additions and 79 deletions

View File

@ -1,19 +1,18 @@
package com.alibaba.nacos.istio;
import com.alibaba.nacos.istio.mcp.NacosMcpServer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;
/**
* @author nkorange
* @since 1.1.4
*/
@EnableScheduling
@SpringBootApplication
public class IstioApp {
public static void main(String[] args) throws Exception {
final NacosMcpServer server = new NacosMcpServer();
server.start();
server.waitForTerminated();
public static void main(String[] args) {
SpringApplication.run(IstioApp.class, args);
}
}

View File

@ -1,115 +1,170 @@
package com.alibaba.nacos.istio.mcp;
import com.alibaba.nacos.api.common.Constants;
import com.alibaba.nacos.api.naming.utils.NamingUtils;
import com.alibaba.nacos.istio.misc.Loggers;
import com.alibaba.nacos.istio.model.Port;
import com.alibaba.nacos.istio.model.mcp.*;
import com.alibaba.nacos.istio.model.naming.ServiceEntry;
import com.alibaba.nacos.naming.core.Instance;
import com.alibaba.nacos.naming.core.Service;
import com.alibaba.nacos.naming.core.ServiceManager;
import com.alibaba.nacos.naming.misc.GlobalExecutor;
import com.google.protobuf.Any;
import io.grpc.stub.StreamObserver;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Service;
import org.springframework.beans.factory.annotation.Autowired;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @author nkorange
* @since 1.1.4
*/
@Service
@org.springframework.stereotype.Service
public class NacosMcpService extends ResourceSourceGrpc.ResourceSourceImplBase {
private AtomicInteger connectIdGenerator = new AtomicInteger(0);
private Map<Integer, StreamObserver<Resources>> connnections = new ConcurrentHashMap<>();
private Map<Integer, StreamObserver<Resources>> connnections = new ConcurrentHashMap<>(16);
private Map<String, Resource> resourceMap = new ConcurrentHashMap<>(16);
private Map<String, String> checksumMap = new ConcurrentHashMap<>(16);
private static final String SERVICE_NAME_SPLITTER = "nacos";
private static final String MESSAGE_TYPE_URL = "type.googleapis.com/istio.networking.v1alpha3.ServiceEntry";
private static final long MCP_PUSH_PERIOD_MILLISECONDS = 10000L;
@Autowired
private ServiceManager serviceManager;
public NacosMcpService() {
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
try {
while (true) {
TimeUnit.SECONDS.sleep(10L);
if (connnections.isEmpty()) {
continue;
}
for (StreamObserver<Resources> observer : connnections.values()) {
observer.onNext(generateResources());
}
}
} catch (Exception e) {
}
}
});
thread.start();
GlobalExecutor.schedule(new McpPushTask(), MCP_PUSH_PERIOD_MILLISECONDS * 2, MCP_PUSH_PERIOD_MILLISECONDS);
}
public Resources generateResources() {
private class McpPushTask implements Runnable {
try {
@Override
public void run() {
String serviceName = "java.mock.1";
int endpointCount = 10;
// Query all services to see if any of them have changes:
Set<String> namespaces = serviceManager.getAllNamespaces();
ServiceEntry.Builder serviceEntryBuilder = ServiceEntry.newBuilder()
.setResolution(ServiceEntry.Resolution.STATIC)
.setLocation(ServiceEntry.Location.MESH_INTERNAL)
.addHosts(serviceName + ".nacos")
.addPorts(Port.newBuilder().setNumber(8080).setName("http").setProtocol("HTTP").build());
for (String namespace : namespaces) {
for (int i = 0; i < endpointCount; i++) {
ServiceEntry.Endpoint endpoint =
ServiceEntry.Endpoint.newBuilder()
.setAddress("10.10.10." + i)
.setWeight(1)
.putPorts("http", 8080)
.putLabels("app", "nacos-istio")
.build();
serviceEntryBuilder.addEndpoints(endpoint);
Map<String, Service> services = serviceManager.getServiceMap(namespace);
if (services.isEmpty()) {
continue;
}
for (Service service : services.values()) {
String convertedName = convertName(service);
// Service not changed:
if (checksumMap.containsKey(convertedName) && checksumMap.get(convertedName).equals(service.getChecksum())) {
continue;
}
// Update the resource:
resourceMap.put(convertedName, convertService(service));
checksumMap.put(convertedName, service.getChecksum());
}
}
ServiceEntry serviceEntry = serviceEntryBuilder.build();
Any any = Any.newBuilder()
.setValue(serviceEntry.toByteString())
.setTypeUrl("type.googleapis.com/istio.networking.v1alpha3.ServiceEntry")
.build();
Metadata metadata = Metadata.newBuilder()
.setName("nacos/" + serviceName)
.putAnnotations("virtual", "1")
.build();
Resource resource = Resource.newBuilder()
.setBody(any)
.setMetadata(metadata)
.build();
Resources resources = Resources.newBuilder()
.addResources(resource)
.addAllResources(resourceMap.values())
.setCollection(CollectionTypes.SERVICE_ENTRY)
.setNonce(String.valueOf(System.currentTimeMillis()))
.build();
Loggers.MAIN.info("generated resources: {}", resources);
if (connnections.isEmpty()) {
return;
}
return resources;
} catch (Exception e) {
Loggers.MAIN.info("MCP push, resource count is: {}", resourceMap.size());
Loggers.MAIN.error("", e);
return null;
if (Loggers.MAIN.isDebugEnabled()) {
Loggers.MAIN.debug("MCP push, sending resources: {}", resources);
}
for (StreamObserver<Resources> observer : connnections.values()) {
observer.onNext(resources);
}
}
}
private String convertName(Service service) {
String serviceName = NamingUtils.getServiceName(service.getName()) + ".sn";
if (!Constants.DEFAULT_GROUP.equals(NamingUtils.getGroupName(service.getName()))) {
serviceName = serviceName + NamingUtils.getGroupName(service.getName()) + ".gn";
}
if (!Constants.DEFAULT_NAMESPACE_ID.equals(service.getNamespaceId())) {
serviceName = serviceName + service.getNamespaceId() + ".ns";
}
return serviceName;
}
private Resource convertService(Service service) {
String serviceName = convertName(service);
ServiceEntry.Builder serviceEntryBuilder = ServiceEntry.newBuilder()
.setResolution(ServiceEntry.Resolution.STATIC)
.setLocation(ServiceEntry.Location.MESH_INTERNAL)
.addHosts(serviceName + "." + SERVICE_NAME_SPLITTER)
.addPorts(Port.newBuilder().setNumber(8080).setName("http").setProtocol("HTTP").build());
for (Instance instance : service.allIPs()) {
if (!instance.isHealthy() || !instance.isEnabled()) {
continue;
}
ServiceEntry.Endpoint endpoint =
ServiceEntry.Endpoint.newBuilder()
.setAddress(instance.getIp())
.setWeight((int) instance.getWeight())
.putAllLabels(instance.getMetadata())
.putPorts("http", instance.getPort())
.build();
serviceEntryBuilder.addEndpoints(endpoint);
}
ServiceEntry serviceEntry = serviceEntryBuilder.build();
Any any = Any.newBuilder()
.setValue(serviceEntry.toByteString())
.setTypeUrl(MESSAGE_TYPE_URL)
.build();
Metadata metadata = Metadata.newBuilder()
.setName(SERVICE_NAME_SPLITTER + "/" + serviceName)
.putAllAnnotations(service.getMetadata())
.putAnnotations("virtual", "1")
.build();
Resource resource = Resource.newBuilder()
.setBody(any)
.setMetadata(metadata)
.build();
return resource;
}
@Override
public StreamObserver<RequestResources> establishResourceStream(StreamObserver<Resources> responseObserver) {
@ -146,8 +201,6 @@ public class NacosMcpService extends ResourceSourceGrpc.ResourceSourceImplBase {
.build();
responseObserver.onNext(resources);
} else {
responseObserver.onNext(generateResources());
}
}