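How Sentinel flow control takes effect
1. Introduction to Sentinel
Sentinel is a traffic governance component for distributed, multi-language, heterogeneous service architectures. It takes traffic as its entry point and helps users keep microservices stable along several dimensions, including flow control, traffic routing, circuit breaking and degradation, and adaptive system protection.
As microservices become more popular, the stability of calls between services matters more and more. As a company's user base and traffic grow, the pressure on the database grows as well. Sentinel takes traffic as its entry point and protects services through flow control, circuit breaking and degradation, and system load protection.
2. Source code entry point
There are two ways to use Sentinel. One is to add the @SentinelResource annotation to the resource method that needs rate limiting and degradation; the other is to protect resources through an interceptor. Both paths end up in the same key code. This article follows the annotation path, which is implemented as an AOP aspect, to see how the Sentinel source code implements rate limiting and degradation.
Under the hood this relies on Spring Boot auto-configuration: the spring.factories file in spring-cloud-starter-alibaba-sentinel.jar registers the SentinelAutoConfiguration class. Let's look at the source of this class.
When the Spring container starts, it instantiates a SentinelResourceAspect. As the name suggests, this is an aspect class. Let's open it: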
    @Aspect
    public class SentinelResourceAspect extends AbstractSentinelAspectSupport {

        @Pointcut("@annotation(com.alibaba.csp.sentinel.annotation.SentinelResource)")
        public void sentinelResourceAnnotationPointcut() {
        }

        @Around("sentinelResourceAnnotationPointcut()")
        public Object invokeResourceWithSentinel(ProceedingJoinPoint pjp) throws Throwable {
            Method originMethod = resolveMethod(pjp);
            SentinelResource annotation = originMethod.getAnnotation(SentinelResource.class);
            if (annotation == null) {
                // Should not go through here.
                throw new IllegalStateException("Wrong state for SentinelResource annotation");
            }
            String resourceName = getResourceName(annotation.value(), originMethod);
            EntryType entryType = annotation.entryType();
            int resourceType = annotation.resourceType();
            Entry entry = null;
            try {
                entry = SphU.entry(resourceName, resourceType, entryType, pjp.getArgs());
                return pjp.proceed();
            } catch (BlockException ex) {
                return handleBlockException(pjp, annotation, ex);
            } catch (Throwable ex) {
                Class<? extends Throwable>[] exceptionsToIgnore = annotation.exceptionsToIgnore();
                // The ignore list will be checked first.
                if (exceptionsToIgnore.length > 0 && exceptionBelongsTo(ex, exceptionsToIgnore)) {
                    throw ex;
                }
                if (exceptionBelongsTo(ex, annotation.exceptionsToTrace())) {
                    traceException(ex);
                    return handleFallback(pjp, annotation, ex);
                }
                // No fallback function can handle the exception, so throw it out.
                throw ex;
            } finally {
                if (entry != null) {
                    entry.exit(1, pjp.getArgs());
                }
            }
        }
    }
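The control flow of this aspect (acquire an entry, run the business logic, handle a block, always release the entry) can be sketched in plain Java without Spring or Sentinel. This is only an illustration: DemoGuard, DemoBlockedException and the concurrency-based limit are hypothetical stand-ins, not Sentinel classes or Sentinel's actual admission logic.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical stand-in for Sentinel's BlockException; not the real class.
class DemoBlockedException extends Exception {
    DemoBlockedException(String message) {
        super(message);
    }
}

// Hypothetical guard that admits at most `limit` concurrent calls.
class DemoGuard {
    private final int limit;
    private final AtomicInteger inFlight = new AtomicInteger();

    DemoGuard(int limit) {
        this.limit = limit;
    }

    // Plays the role of SphU.entry: throws if the call must be rejected.
    void enter() throws DemoBlockedException {
        if (inFlight.incrementAndGet() > limit) {
            inFlight.decrementAndGet();
            throw new DemoBlockedException("limit " + limit + " exceeded");
        }
    }

    // Plays the role of entry.exit(): releases the slot taken by enter().
    void exit() {
        inFlight.decrementAndGet();
    }

    int inFlight() {
        return inFlight.get();
    }

    // Mirrors the aspect: enter, run the business logic, handle a block, always exit.
    <T> T call(Supplier<T> business, T blockedValue) {
        boolean entered = false;
        try {
            enter();
            entered = true;
            return business.get();
        } catch (DemoBlockedException ex) {
            // Counterpart of handleBlockException: return a substitute result.
            return blockedValue;
        } finally {
            if (entered) {
                exit();
            }
        }
    }
}
```

With a limit of 1, new DemoGuard(1).call(() -> "ok", "blocked") returns the business result, while a call nested inside another guarded call is rejected and returns the blocked value. As in the aspect, the release step runs only when an entry was actually obtained.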
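This class is clearly an AOP aspect whose pointcut is the @SentinelResource annotation mentioned above. So when an interface method carries @SentinelResource, the aspect's invokeResourceWithSentinel method runs before the method's business logic. Let's see how the aspect executes, focusing on SphU.entry(resourceName, resourceType, entryType, pjp.getArgs()):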
    public static Entry entry(String name, int resourceType, EntryType trafficType, Object[] args)
        throws BlockException {
        return Env.sph.entryWithType(name, resourceType, trafficType, 1, args);
    }
When the Env class is initialized, its static block runs:
    public class Env {

        public static final Sph sph = new CtSph();

        static {
            // If init fails, the process will exit.
            InitExecutor.doInit();
        }
    }
Let's see what InitExecutor.doInit() actually does:
    public static void doInit() {
        if (!initialized.compareAndSet(false, true)) {
            return;
        }
        try {
            List<InitFunc> initFuncs = SpiLoader.of(InitFunc.class).loadInstanceListSorted();
            List<OrderWrapper> initList = new ArrayList<OrderWrapper>();
            for (InitFunc initFunc : initFuncs) {
                RecordLog.info("[InitExecutor] Found init func: {}", initFunc.getClass().getCanonicalName());
                insertSorted(initList, initFunc);
            }
            for (OrderWrapper w : initList) {
                w.func.init();
                RecordLog.info("[InitExecutor] Executing {} with order {}",
                    w.func.getClass().getCanonicalName(), w.order);
            }
        } catch (Exception ex) {
            RecordLog.warn("[InitExecutor] WARN: Initialization failed", ex);
            ex.printStackTrace();
        } catch (Error error) {
            RecordLog.warn("[InitExecutor] ERROR: Initialization failed with fatal error", error);
            error.printStackTrace();
        }
    }
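The guard at the top of doInit() relies on AtomicBoolean.compareAndSet, so it can be tried out in isolation. The sketch below is a minimal illustration of that one-time guard; OnceInitializer and its race helper are hypothetical names introduced here, not part of Sentinel.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical one-time initializer, modelled on the guard at the top of doInit().
class OnceInitializer {
    private final AtomicBoolean initialized = new AtomicBoolean(false);
    private final AtomicInteger runs = new AtomicInteger();

    // Returns true only for the single caller that wins the CAS.
    boolean initOnce(Runnable initLogic) {
        if (!initialized.compareAndSet(false, true)) {
            return false; // another caller already claimed the initialization
        }
        initLogic.run();
        runs.incrementAndGet();
        return true;
    }

    int runs() {
        return runs.get();
    }

    // Races `threads` threads against initOnce and returns how often the logic ran.
    static int race(int threads) {
        OnceInitializer once = new OnceInitializer();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> once.initOnce(() -> { }));
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return once.runs();
    }
}
```

However many threads call initOnce, the logic runs once. Note that with this pattern the losing callers return immediately rather than waiting for the winner to finish, so the guard ensures single execution, not completion before other callers proceed.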
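A CAS operation makes this safe under concurrency: with multiple threads, only one of them gets to run the initialization. The main logic is SpiLoader.of(InitFunc.class).loadInstanceListSorted(), which in turn calls the load method: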
    /**
     * Load all Provider instances of the specified Service, sorted by order value in class's {@link Spi} annotation
     *
     * @return Sorted Provider instances list
     */
    public List<S> loadInstanceListSorted() {
        load();
        return createInstanceList(sortedClassList);
    }
    /**
     * Load the Provider class from Provider configuration file
     */
    public void load() {
        if (!loaded.compareAndSet(false, true)) {
            return;
        }
        String fullFileName = SPI_FILE_PREFIX + service.getName();
        ClassLoader classLoader;
        if (SentinelConfig.shouldUseContextClassloader()) {
            classLoader = Thread.currentThread().getContextClassLoader();
        } else {
            classLoader = service.getClassLoader();
        }
        if (classLoader == null) {
            classLoader = ClassLoader.getSystemClassLoader();
        }
        Enumeration<URL> urls = null;
        try {
            urls = classLoader.getResources(fullFileName);
        } catch (IOException e) {
            fail("Error locating SPI configuration file, filename=" + fullFileName + ", classloader=" + classLoader, e);
        }
        if (urls == null || !urls.hasMoreElements()) {
            RecordLog.warn("No SPI configuration file, filename=" + fullFileName + ", classloader=" + classLoader);
            return;
        }
        while (urls.hasMoreElements()) {
            URL url = urls.nextElement();
            InputStream in = null;
            BufferedReader br = null;
            try {
                in = url.openStream();
                br = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8));
                String line;
                while ((line = br.readLine()) != null) {
                    if (StringUtil.isBlank(line)) {
                        // Skip blank line
                        continue;
                    }
                    line = line.trim();
                    int commentIndex = line.indexOf("#");
                    if (commentIndex == 0) {
                        // Skip comment line
                        continue;
                    }
                    if (commentIndex > 0) {
                        line = line.substring(0, commentIndex);
                    }
                    line = line.trim();
                    Class<S> clazz = null;
                    try {
                        clazz = (Class<S>) Class.forName(line, false, classLoader);
                    } catch (ClassNotFoundException e) {
                        fail("class " + line + " not found", e);
                    }
                    if (!service.isAssignableFrom(clazz)) {
                        fail("class " + clazz.getName() + "is not subtype of " + service.getName() + ",SPI configuration file=" + fullFileName);
                    }
                    classList.add(clazz);
                    Spi spi = clazz.getAnnotation(Spi.class);
                    String aliasName = spi == null || "".equals(spi.value()) ? clazz.getName() : spi.value();
                    if (classMap.containsKey(aliasName)) {
                        Class<? extends S> existClass = classMap.get(aliasName);
                        fail("Found repeat alias name for " + clazz.getName() + " and "
                            + existClass.getName() + ",SPI configuration file=" + fullFileName);
                    }
                    classMap.put(aliasName, clazz);
                    if (spi != null && spi.isDefault()) {
                        if (defaultClass != null) {
                            fail("Found more than one default Provider, SPI configuration file=" + fullFileName);
                        }
                        defaultClass = clazz;
                    }
                    RecordLog.info("[SpiLoader] Found SPI implementation for SPI {}, provider={}, aliasName={}"
                            + ", isSingleton={}, isDefault={}, order={}",
                        service.getName(), line, aliasName
                        , spi == null ? true : spi.isSingleton()
                        , spi == null ? false : spi.isDefault()
                        , spi == null ? 0 : spi.order());
                }
            } catch (IOException e) {
                fail("error reading SPI configuration file", e);
            } finally {
                closeResources(in, br);
            }
        }
        sortedClassList.addAll(classList);
        Collections.sort(sortedClassList, new Comparator<Class<? extends S>>() {
            @Override
            public int compare(Class<? extends S> o1, Class<? extends S> o2) {
                Spi spi1 = o1.getAnnotation(Spi.class);
                int order1 = spi1 == null ? 0 : spi1.order();
                Spi spi2 = o2.getAnnotation(Spi.class);
                int order2 = spi2 == null ? 0 : spi2.order();
                return Integer.compare(order1, order2);
            }
        });
    }
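Here the load method reads the provider configuration files under META-INF/services in sentinel-core.jar and loads every implementation class listed there; the classes are then instantiated via reflection.
For now, let's go back and step into the entryWithType method: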
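The per-line handling inside load() (skip blank lines, skip lines starting with #, cut off trailing comments, trim) is easy to check separately. The following is a minimal sketch of just that step; SpiLineParser is a hypothetical helper written for this illustration and it returns class names as strings instead of loading classes.

```java
import java.util.ArrayList;
import java.util.List;

// Minimal sketch of the line handling inside load(); not Sentinel's SpiLoader.
class SpiLineParser {
    // Returns the provider class names found in the lines of a services file.
    static List<String> parse(List<String> lines) {
        List<String> classNames = new ArrayList<>();
        for (String raw : lines) {
            String line = raw.trim();
            if (line.isEmpty()) {
                continue; // skip blank line
            }
            int commentIndex = line.indexOf('#');
            if (commentIndex == 0) {
                continue; // skip comment line
            }
            if (commentIndex > 0) {
                // drop the trailing comment and the whitespace before it
                line = line.substring(0, commentIndex).trim();
            }
            classNames.add(line);
        }
        return classNames;
    }
}
```

Given a header comment, an empty line, a padded class name and a class name followed by a comment, parse returns only the two class names, in file order. In the real loader each name is then passed to Class.forName and the resulting classes are sorted by the order value of their @Spi annotation.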
    @Override
    public Entry entryWithType(String name, int resourceType, EntryType entryType, int count, Object[] args)
        throws BlockException {
        return entryWithType(name, resourceType, entryType, count, false, args);
    }

    @Override
    public Entry entryWithType(String name, int resourceType, EntryType entryType, int count, boolean prioritized,
                               Object[] args) throws BlockException {
        StringResourceWrapper resource = new StringResourceWrapper(name, entryType, resourceType);
        return entryWithPriority(resource, count, prioritized, args);
    }

    private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args)
        throws BlockException {
        Context context = ContextUtil.getContext();
        if (context instanceof NullContext) {
            // The {@link NullContext} indicates that the amount of context has exceeded the threshold,
            // so here init the entry only. No rule checking will be done.
            return new CtEntry(resourceWrapper, null, context);
        }
        if (context == null) {
            // Using default context.
            context = InternalContextUtil.internalEnter(Constants.CONTEXT_DEFAULT_NAME);
        }
        // Global switch is close, no rule checking will do.
        if (!Constants.ON) {
            return new CtEntry(resourceWrapper, null, context);
        }
        ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);
        // ...
    }
The call ends up in entryWithPriority, which uses the chain-of-responsibility design pattern and builds a processing chain. Let's focus on the lookProcessChain(resourceWrapper) method:
    ProcessorSlot<Object> lookProcessChain(ResourceWrapper resourceWrapper) { ... }
It first looks in a cache map; if nothing is found there, it calls SlotChainProvider.newSlotChain() to build a chain and puts the result into the map as a cache:
    public static ProcessorSlotChain newSlotChain() {
        if (slotChainBuilder != null) {
            return slotChainBuilder.build();
        }
        // Resolve the slot chain builder SPI.
        slotChainBuilder = SpiLoader.of(SlotChainBuilder.class).loadFirstInstanceOrDefault();
        if (slotChainBuilder == null) {
            // Should not go through here.
            RecordLog.warn("[SlotChainProvider] Wrong state when resolving slot chain builder, using default");
            slotChainBuilder = new DefaultSlotChainBuilder();
        } else {
            RecordLog.info("[SlotChainProvider] Global slot chain builder resolved: {}",
                slotChainBuilder.getClass().getCanonicalName());
        }
        return slotChainBuilder.build();
    }
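The body of lookProcessChain is not shown here, so the following is only a sketch of the behaviour described for it: look a chain up in a cache map keyed by resource, and build and cache it on a miss. ChainCache, the String key and the copy-on-write update under a lock are assumptions made for this illustration, not a copy of Sentinel's code.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical per-resource cache illustrating the look-up-or-build step.
class ChainCache<V> {
    private volatile Map<String, V> cache = new HashMap<>();
    private final Object lock = new Object();
    private int builds = 0;

    V lookup(String resourceName, Supplier<V> builder) {
        V chain = cache.get(resourceName);
        if (chain == null) {
            synchronized (lock) {
                chain = cache.get(resourceName); // re-check under the lock
                if (chain == null) {
                    chain = builder.get();
                    builds++;
                    // Copy-on-write: readers never observe a map being modified.
                    Map<String, V> copy = new HashMap<>(cache);
                    copy.put(resourceName, chain);
                    cache = copy;
                }
            }
        }
        return chain;
    }

    // Number of times the builder was actually invoked.
    int builds() {
        synchronized (lock) {
            return builds;
        }
    }
}
```

Two lookups for the same resource name return the same object and invoke the builder once; a different name triggers a second build. The practical consequence is that each resource gets its own chain instance, created lazily on first access.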
SlotChainProvider.newSlotChain() also relies on a JDK-style SPI mechanism: it loads the ProcessorSlot implementations configured under META-INF/services in sentinel-core.jar:
    # Sentinel default ProcessorSlots
    com.alibaba.csp.sentinel.slots.nodeselector.NodeSelectorSlot
    com.alibaba.csp.sentinel.slots.clusterbuilder.ClusterBuilderSlot
    com.alibaba.csp.sentinel.slots.logger.LogSlot
    com.alibaba.csp.sentinel.slots.statistic.StatisticSlot
    com.alibaba.csp.sentinel.slots.block.authority.AuthoritySlot
    com.alibaba.csp.sentinel.slots.system.SystemSlot
    com.alibaba.csp.sentinel.slots.block.flow.FlowSlot
    com.alibaba.csp.sentinel.slots.block.degrade.DegradeSlot
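Finally, the build method of DefaultSlotChainBuilder assembles these slots into a processor chain of responsibility, as shown in the figure below.
[Figure: the processor slot chain built by DefaultSlotChainBuilder]
chain.entry(context, resourceWrapper, null, count, prioritized, args) is then called, which invokes each slot class in the figure in turn. Here we focus on the entry methods of FlowSlot and DegradeSlot: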
    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                      boolean prioritized, Object... args) throws Throwable {
        checkFlow(resourceWrapper, context, node, count, prioritized);
        fireEntry(context, resourceWrapper, node, count, prioritized, args);
    }
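The shape of this method (do this slot's own check, then call fireEntry to hand over to the next slot) is the core of the chain-of-responsibility pattern. The sketch below shows it with a much smaller interface; DemoSlot, NamedSlot and DemoChain are hypothetical types, and an unchecked IllegalStateException stands in for a block.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified slot: does its own work, then hands over to the next slot.
abstract class DemoSlot {
    private DemoSlot next;

    void setNext(DemoSlot next) {
        this.next = next;
    }

    abstract void entry(String resource, List<String> trace);

    // Counterpart of fireEntry: forwards the call if there is a next slot.
    void fireEntry(String resource, List<String> trace) {
        if (next != null) {
            next.entry(resource, trace);
        }
    }
}

// A slot that records its name and optionally rejects the request.
class NamedSlot extends DemoSlot {
    private final String name;
    private final boolean reject;

    NamedSlot(String name, boolean reject) {
        this.name = name;
        this.reject = reject;
    }

    @Override
    void entry(String resource, List<String> trace) {
        trace.add(name);
        if (reject) {
            throw new IllegalStateException(name + " rejected " + resource);
        }
        fireEntry(resource, trace);
    }
}

class DemoChain {
    // Links the slots in order, runs the chain, and returns the slots visited.
    static List<String> run(String resource, DemoSlot... slots) {
        for (int i = 0; i + 1 < slots.length; i++) {
            slots[i].setNext(slots[i + 1]);
        }
        List<String> trace = new ArrayList<>();
        try {
            if (slots.length > 0) {
                slots[0].entry(resource, trace);
            }
        } catch (IllegalStateException rejected) {
            trace.add("blocked");
        }
        return trace;
    }
}
```

When no slot rejects, every slot is visited in order. When a slot in the middle throws, the slots after it never run, which is why a request rejected by the flow check does not reach the later slots in the chain.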
FlowSlot's entry method validates the flow control rules configured in the console, ultimately through the checkFlow method:
    void checkFlow(ResourceWrapper resource, Context context, DefaultNode node, int count, boolean prioritized)
        throws BlockException {
        checker.checkFlow(ruleProvider, resource, context, node, count, prioritized);
    }

    public void checkFlow(Function<String, Collection<FlowRule>> ruleProvider, ResourceWrapper resource,
                          Context context, DefaultNode node, int count, boolean prioritized) throws BlockException {
        if (ruleProvider == null || resource == null) {
            return;
        }
        Collection<FlowRule> rules = ruleProvider.apply(resource.getName());
        if (rules != null) {
            for (FlowRule rule : rules) {
                if (!canPassCheck(rule, context, node, count, prioritized)) {
                    throw new FlowException(rule.getLimitApp(), rule);
                }
            }
        }
    }
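The loop in checkFlow stops at the first rule that does not pass. A stripped-down version makes that behaviour concrete. Everything in this sketch is hypothetical: DemoFlowRule uses a plain "requests already passed in the current window" number as input, whereas how Sentinel actually measures traffic in canPassCheck is not covered here, and DemoFlowException is unchecked only to keep the example short.

```java
import java.util.List;

// Hypothetical rule: allow at most `threshold` requests in the current window.
class DemoFlowRule {
    final double threshold;

    DemoFlowRule(double threshold) {
        this.threshold = threshold;
    }

    // Passes if the requests already counted plus this acquisition fit the threshold.
    boolean canPass(long passedInWindow, int acquireCount) {
        return passedInWindow + acquireCount <= threshold;
    }
}

// Hypothetical unchecked stand-in for FlowException.
class DemoFlowException extends RuntimeException {
    final DemoFlowRule rule;

    DemoFlowException(DemoFlowRule rule) {
        super("blocked by threshold " + rule.threshold);
        this.rule = rule;
    }
}

class DemoFlowChecker {
    // Throws on the first rule that rejects the request, like the loop in checkFlow.
    static void check(List<DemoFlowRule> rules, long passedInWindow, int acquireCount) {
        if (rules == null) {
            return; // no rules configured: nothing to enforce
        }
        for (DemoFlowRule rule : rules) {
            if (!rule.canPass(passedInWindow, acquireCount)) {
                throw new DemoFlowException(rule);
            }
        }
    }

    // Convenience wrapper that reports the outcome as a boolean.
    static boolean tryPass(List<DemoFlowRule> rules, long passedInWindow, int acquireCount) {
        try {
            check(rules, passedInWindow, acquireCount);
            return true;
        } catch (DemoFlowException blocked) {
            return false;
        }
    }
}
```

With two rules of threshold 10 and 5, a request arriving when 4 have passed is admitted, while one arriving when 5 have passed is rejected by the stricter rule. A resource with several rules is therefore limited by whichever rule is tightest at that moment.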
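Collection<FlowRule> rules = ruleProvider.apply(resource.getName()) fetches the flow rules configured for the resource. If any rule fails canPassCheck, a FlowException is thrown; FlowException is a BlockException, so it is caught back in the aspect: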
    @Around("sentinelResourceAnnotationPointcut()")
    public Object invokeResourceWithSentinel(ProceedingJoinPoint pjp) throws Throwable {
        Method originMethod = resolveMethod(pjp);
        SentinelResource annotation = originMethod.getAnnotation(SentinelResource.class);
        if (annotation == null) {
            // Should not go through here.
            throw new IllegalStateException("Wrong state for SentinelResource annotation");
        }
        String resourceName = getResourceName(annotation.value(), originMethod);
        EntryType entryType = annotation.entryType();
        int resourceType = annotation.resourceType();
        Entry entry = null;
        try {
            entry = SphU.entry(resourceName, resourceType, entryType, pjp.getArgs());
            return pjp.proceed();
        } catch (BlockException ex) {
            return handleBlockException(pjp, annotation, ex);
        } catch (Throwable ex) {
            Class<? extends Throwable>[] exceptionsToIgnore = annotation.exceptionsToIgnore();
            // The ignore list will be checked first.
            if (exceptionsToIgnore.length > 0 && exceptionBelongsTo(ex, exceptionsToIgnore)) {
                throw ex;
            }
            if (exceptionBelongsTo(ex, annotation.exceptionsToTrace())) {
                traceException(ex);
                return handleFallback(pjp, annotation, ex);
            }
            // No fallback function can handle the exception, so throw it out.
            throw ex;
        } finally {
            if (entry != null) {
                entry.exit(1, pjp.getArgs());
            }
        }
    }
    protected Object handleBlockException(ProceedingJoinPoint pjp, SentinelResource annotation, BlockException ex)
        throws Throwable {
        // Execute block handler if configured.
        Method blockHandlerMethod = extractBlockHandlerMethod(pjp, annotation.blockHandler(),
            annotation.blockHandlerClass());
        if (blockHandlerMethod != null) {
            Object[] originArgs = pjp.getArgs();
            // Construct args.
            Object[] args = Arrays.copyOf(originArgs, originArgs.length + 1);
            args[args.length - 1] = ex;
            return invoke(pjp, blockHandlerMethod, args);
        }
        // If no block handler is present, then go to fallback.
        return handleFallback(pjp, annotation, ex);
    }
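The argument handling in handleBlockException explains why a block handler needs the same parameters as the original method plus one extra parameter for the exception at the end. The sketch below reproduces that step with plain reflection. DemoService and BlockHandlerInvoker are hypothetical, and the extra parameter is typed as Exception here for simplicity, whereas the code above passes a BlockException.

```java
import java.lang.reflect.Method;
import java.util.Arrays;

// Hypothetical service with a business method and a matching block handler.
class DemoService {
    public String query(String id) {
        return "result-" + id;
    }

    // Same parameters as query, plus the exception as the last one.
    public String queryBlocked(String id, Exception ex) {
        return "blocked-" + id + ":" + ex.getMessage();
    }
}

class BlockHandlerInvoker {
    // Appends the exception to the original arguments and calls the handler reflectively.
    static Object invokeHandler(Object target, String handlerName, Class<?>[] originTypes,
                                Object[] originArgs, Exception ex) {
        try {
            // The handler signature is the original parameter list plus one exception type.
            Class<?>[] types = Arrays.copyOf(originTypes, originTypes.length + 1);
            types[types.length - 1] = Exception.class;
            Method handler = target.getClass().getMethod(handlerName, types);
            // Same construction as in handleBlockException: copy and append.
            Object[] args = Arrays.copyOf(originArgs, originArgs.length + 1);
            args[args.length - 1] = ex;
            return handler.invoke(target, args);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("cannot invoke handler " + handlerName, e);
        }
    }
}
```

Calling invokeHandler for queryBlocked with the original argument "42" and an exception whose message is "flow" produces "blocked-42:flow". A handler whose parameter list does not match this shape would simply not be found by the lookup.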
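In the end, the exception handling method configured in the @SentinelResource annotation is looked up and invoked, and the whole Sentinel flow control protection process is complete. How the chain collects its metrics, how it decides whether QPS exceeds the configured maximum, and when Sentinel's circuit breaker opens, half-opens and closes all involve fairly complex algorithms; a dedicated article will analyze them later.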