服务接入 Sentinel 集群流控¶
为服务接入 Sentinel 集群流控时,需要从服务器和客户端两方面调整配置。
集群流控服务器¶
DCE 5.0 不提供托管的 Sentinel 服务器,所以用户需自己启动 Sentinel 官方提供的集群流控服务器,且该服务器需要满足以下条件:
-
连接 Nacos,以便从 Nacos 读取持久化配置
-
为 Sentinel 服务器命名,以便作为配置前缀来区分多个集群流控服务器的
NamespaceSet
和ServerTransport
配置ReadableDataSource<String, Set<String>> namespaceDs = new NacosDataSource<>(nacosAddress, DEFAULT_GROUP, serverName + NAMESPACESET_POSTFIX, source -> JSON.parseObject(source, new TypeReference<Set<String>>() {})); ClusterServerConfigManager.registerNamespaceSetProperty(namespaceDs.getProperty()); ReadableDataSource<String, ServerTransportConfig> transportConfigDs = new NacosDataSource<>(nacosAddress, DEFAULT_GROUP, serverName + SERVERTRANSPORT_POSTFIX, source -> JSON.parseObject(source, new TypeReference<ServerTransportConfig>() {})); ClusterServerConfigManager.registerServerTransportProperty(transportConfigDs.getProperty());
-
相关的代码实现,可参考 sentinel-cluster-flow-control-java
集群流控客户端¶
由于 Sentinel 官方提供的客户端 sentinel-cluster-client-default
SDK 只适配内存模式使用。而 DCE 5.0 也为集群流控客户端配置做了持久化,所以官方原始客户端 SDK 已无效。
-
首先,用户需引入如下代码:
public class SentinelClusterClientInitFunc implements InitFunc { private String nacosAddress; private Properties properties = new Properties(); private String groupId; private String dataId; @Override public void init() throws Exception { getNacosAddr(); parseAppName(); initDynamicRuleProperty(); initClientConfigProperty(); initClientServerAssignProperty(); initStateProperty(); } private void getNacosAddr() { nacosAddress = System.getProperty("nacos.address"); if (StringUtil.isBlank(nacosAddress)){ throw new RuntimeException("nacos address start param must be set"); } System.out.printf("nacos address: %s\n", nacosAddress); } private void parseAppName() { String[] apps = APPNAME.split("@@"); System.out.printf("app name: %s\n", APPNAME); if (apps.length != 3) { throw new RuntimeException("app name format must be set like this: {{namespaceId}}@@{{groupName}}@@{{appName}}"); } else if (StringUtil.isBlank(apps[1])){ throw new RuntimeException("group name cannot be empty"); } properties.put(PropertyKeyConst.NAMESPACE, apps[0]); properties.put(PropertyKeyConst.SERVER_ADDR, nacosAddress); properties.put(PropertyKeyConst.USERNAME, DEFAULT_NACOS_USERNAME); properties.put(PropertyKeyConst.PASSWORD, DEFAULT_NACOS_PASSWORD); groupId = apps[1]; dataId = apps[2]; } private void initDynamicRuleProperty() { ReadableDataSource<String, List<FlowRule>> ruleSource = new NacosDataSource<>(properties, groupId, dataId + FLOW_POSTFIX, source -> JSON.parseObject(source, new TypeReference<List<FlowRule>>() {})); FlowRuleManager.register2Property(ruleSource.getProperty()); ReadableDataSource<String, List<ParamFlowRule>> paramRuleSource = new NacosDataSource<>(properties, groupId, dataId + PARAM_FLOW_POSTFIX, source -> JSON.parseObject(source, new TypeReference<List<ParamFlowRule>>() {})); ParamFlowRuleManager.register2Property(paramRuleSource.getProperty()); } private void initClientConfigProperty() { ReadableDataSource<String, ClusterClientConfig> clientConfigDs = new NacosDataSource<>(properties, groupId, dataId + CLUSTER_CLIENT_POSTFIX, source -> JSON.parseObject(source, new TypeReference<ClusterClientConfig>() {})); ClusterClientConfigManager.registerClientConfigProperty(clientConfigDs.getProperty()); } private void initClientServerAssignProperty() { // Cluster map format: // [ // { // "machineId": "10.64.0.81@8720", // "ip": "10.64.0.81", // "port": 18730, // "clientSet": ["10.64.0.81@8721", "10.64.0.81@8722"] // } // ] ReadableDataSource<String, ClusterClientAssignConfig> clientAssignDs = new NacosDataSource<>(properties, groupId, dataId + CLUSTER_MAP_POSTFIX, source -> { List<ClusterGroupDto> groupList = JSON.parseObject(source, new TypeReference<List<ClusterGroupDto>>() {}); return Optional.ofNullable(groupList) .flatMap(this::extractClientAssignment) .orElse(null); }); ClusterClientConfigManager.registerServerAssignProperty(clientAssignDs.getProperty()); } private void initStateProperty() { ReadableDataSource<String, Integer> clusterModeDs = new NacosDataSource<>(properties, groupId, dataId + CLUSTER_MAP_POSTFIX, source -> { List<ClusterGroupDto> groupList = JSON.parseObject(source, new TypeReference<List<ClusterGroupDto>>() {}); return Optional.ofNullable(groupList) .map(this::extractMode) .orElse(ClusterStateManager.CLUSTER_NOT_STARTED); }); ClusterStateManager.registerProperty(clusterModeDs.getProperty()); } private int extractMode(List<ClusterGroupDto> groupList) { if (groupList.stream().anyMatch(this::machineEqual)) { return ClusterStateManager.CLUSTER_SERVER; } boolean canBeClient = groupList.stream() .flatMap(e -> e.getClientSet().stream()) .filter(Objects::nonNull) .anyMatch(e -> e.equals(getCurrentMachineId())); return canBeClient ? ClusterStateManager.CLUSTER_CLIENT : ClusterStateManager.CLUSTER_NOT_STARTED; } private Optional<ClusterClientAssignConfig> extractClientAssignment(List<ClusterGroupDto> groupList) { if (groupList.stream().anyMatch(this::machineEqual)) { return Optional.empty(); } for (ClusterGroupDto group : groupList) { if (group.getClientSet().contains(getCurrentMachineId())) { String ip = group.getIp(); Integer port = group.getPort(); return Optional.of(new ClusterClientAssignConfig(ip, port)); } } return Optional.empty(); } private boolean machineEqual(ClusterGroupDto group) { return getCurrentMachineId().equals(group.getMachineId()); } private String getCurrentMachineId() { return HostNameUtil.getIp() + SEPARATOR + TransportConfig.getRuntimePort(); } private static final String SEPARATOR = "@"; }
-
接着在 Spring 容器中通过如下方式加载: