Netty
Netty 是一个异步事件驱动网络应用框架,可用来快速开发高性能且可维护的客户端服务器应用。
Netty
https://netty.io/
netty / netty
https://github.com/netty/netty
netty 4.0 的一项重大改进就是将全部入栈、出栈流量数据都放入堆外内存中了。
Netty 堆外内存
Netty堆外内存泄漏排查,这一篇全讲清楚了
https://segmentfault.com/a/1190000021469481
先要熟悉带 Cleaner 的 DirectByteBuffer 的堆外内存回收原理。
noCleaner 的 DirectByteBuffer
Netty 在 4.1 引入 noCleaner 策略:创建不带 Cleaner 的 DirectByteBuffer 对象,这样做的好处是绕开带 Cleaner 的 DirectByteBuffer 执行构造方法和执行 Cleaner 的 clean() 方法中一些额外开销,当堆外内存不够的时候,不会触发 System.gc(),提高性能。
以非池化直接内存申请为例 UnpooledByteBufAllocator.newDirectBuffer()
申请堆外内存时,先判断 noCleaner 是否可用
- noCleaner 为 true:创建 InstrumentedUnpooledUnsafeNoCleanerDirectByteBuf 对象,即 noCleaner 的 DirectByteBuffer
- noCleaner 为 false:创建 InstrumentedUnpooledUnsafeDirectByteBuf 对象,即 hasCleaner 的 DirectByteBuffer
public final class UnpooledByteBufAllocator extends AbstractByteBufAllocator implements ByteBufAllocatorMetricProvider {
public UnpooledByteBufAllocator(boolean preferDirect, boolean disableLeakDetector) {
this(preferDirect, disableLeakDetector, PlatformDependent.useDirectBufferNoCleaner());
}
public UnpooledByteBufAllocator(boolean preferDirect, boolean disableLeakDetector, boolean tryNoCleaner) {
super(preferDirect);
this.disableLeakDetector = disableLeakDetector;
noCleaner = tryNoCleaner && PlatformDependent.hasUnsafe() && PlatformDependent.hasDirectBufferNoCleanerConstructor();
}
@Override
protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
final ByteBuf buf;
if (PlatformDependent.hasUnsafe()) {
buf = noCleaner ? new InstrumentedUnpooledUnsafeNoCleanerDirectByteBuf(this, initialCapacity, maxCapacity) :
new InstrumentedUnpooledUnsafeDirectByteBuf(this, initialCapacity, maxCapacity);
} else {
buf = new InstrumentedUnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
}
return disableLeakDetector ? buf : toLeakAwareBuffer(buf);
}
}
noCleaner 的结果来自静态方法 PlatformDependent.useDirectBufferNoCleaner()
public final class PlatformDependent {
private static final boolean USE_DIRECT_BUFFER_NO_CLEANER;
static {
long maxDirectMemory = SystemPropertyUtil.getLong("io.netty.maxDirectMemory", -1);
if (maxDirectMemory == 0 || !hasUnsafe() || !PlatformDependent0.hasDirectBufferNoCleanerConstructor()) {
USE_DIRECT_BUFFER_NO_CLEANER = false;
DIRECT_MEMORY_COUNTER = null;
} else {
USE_DIRECT_BUFFER_NO_CLEANER = true;
if (maxDirectMemory < 0) {
maxDirectMemory = MAX_DIRECT_MEMORY;
if (maxDirectMemory <= 0) {
DIRECT_MEMORY_COUNTER = null;
} else {
DIRECT_MEMORY_COUNTER = new AtomicLong();
}
} else {
DIRECT_MEMORY_COUNTER = new AtomicLong();
}
}
}
public static boolean useDirectBufferNoCleaner() {
return USE_DIRECT_BUFFER_NO_CLEANER;
}
}
PlatformDependent 类在初始化时会根据系统属性、-XX:MaxDirectMemorySize 和 -Dio.netty.maxDirectMemory 等判断是否使用 noCleaner 策略。
有 arthas 的话,可以直接执行 ognl '@io.netty.util.internal.PlatformDependent@useDirectBufferNoCleaner()'
确认 Netty 使用的是否是 noCleaner 的堆外内存
noCleaner 的 DirectByteBuffer 和 hasCleaner 的 DirectByteBuffer 的区别如下:
1、构造方式不同:
noCleaner 的 DirectByteBuffer:由反射调用 private DirectByteBuffer(long addr, int cap) 创建,cleaner 是 null
hasCleaner 的 DirectByteBuffer:由 new DirectByteBuffer(int cap) 创建
2、释放内存的方式不同
noCleaner 的 DirectByteBuffer:使用 UnSafe.freeMemory(address);
hasCleaner 的 DirectByteBuffer:使用 DirectByteBuffer 的 Cleaner 的 clean() 方法
Netty 在启动时需要判断检查当前环境、环境配置参数是否允许 noCleaner 策略(具体逻辑位于 PlatformDependent 的 static 代码块),例如运行在 Android 下时,是没有 Unsafe 类的,不允许使用noCleaner 策略,如果不允许,则使用 hasCleaner 策略
为什么要使用 noCleaner 的堆外内存
一是因为带 cleaner 的 DirectByteBuffer 堆外内存自动回收不实时:需要 ByteBuffer 对象被 GC 线程回收才会触发,如果 ByteBuffer 对象进入老年代后才变得可回收,则需要等到发送频率较低老年代 GC 才会触发
另一方面,Netty 需要基于 ByteBuf.release() 方法执行其他操作,例如池化内存释放回内存池,否则该对象会被内存池一直标记为已使用。
-XX:MaxDirectMemorySize 和 -Dio.netty.maxDirectMemory
-XX:MaxDirectMemorySize
用于限制 Netty 中 hasCleaner 策略的 DirectByteBuffer 堆外内存的大小,默认值是 JVM 能从操作系统申请的最大内存,如果内存本身没限制,则值为 Long.MAX_VALUE 个字节(默认值由 Runtime.getRuntime().maxMemory() 返回),代码位于 java.nio.Bits#reserveMemory()
方法中
-XX:MaxDirectMemorySize 无法限制 Netty 中 noCleaner 策略的 DirectByteBuffer 堆外内存的大小
-Dio.netty.maxDirectMemory
用于限制 noCleaner 策略下 Netty 的 DirectByteBuffer 分配的最大堆外内存的大小,如果该值为0,则使用 hasCleaner 策略,代码位于 PlatformDependent#incrementMemoryCounter()
方法中
查看 Netty 使用了多少堆外内存
有 arthas 的话,先通过 ognl '@io.netty.util.internal.PlatformDependent@useDirectBufferNoCleaner()'
确认使用的是否是 noCleaner 的堆外内存。
1、hasCleaner 的 DirectByteBuffer 使用量监控
java.nio.Bits 类是有记录 hasCleaner 的 DirectByteBuffer 堆外内存的使用情况,但是该类是包级别的访问权限,不能直接获取,可以通过 MXBean 来获取
List<BufferPoolMXBean> bufferPoolMXBeans = ManagementFactoryHelper.getBufferPoolMXBeans();
BufferPoolMXBean directBufferMXBean = bufferPoolMXBeans.get(0);
// hasCleaner的DirectBuffer的数量
long count = directBufferMXBean.getCount();
// hasCleaner的DirectBuffer的堆外内存占用大小,单位字节
long memoryUsed = directBufferMXBean.getMemoryUsed();
此外,MappedByteBuffer 是基于 FileChannelImpl.map 进行进行 mmap 内存映射(零拷贝的一种实现)得到的另外一种堆外内存的 ByteBuffer,可以通过 ManagementFactoryHelper.getBufferPoolMXBeans().get(1)
获取到该堆外内存的监控指标
2、noCleaner 的 DirectByteBuffer 使用量监控
看静态方法 PlatformDependent.usedDirectMemory()
的返回即可
arthas 执行
ognl '@io.netty.util.internal.PlatformDependent@maxDirectMemory()'
ognl '@io.netty.util.internal.PlatformDependent@usedDirectMemory()'
Netty 内存泄漏检测
Netty 自带了内存泄漏检测工具,可用于检测出 ByteBuf 对象被 GC 回收但 ByteBuf 管理的内存没有释放的情况,但不适用 ByteBuf 对象还没被 GC 回收内存泄漏的情况,例如任务队列积压。
Netty 提供 4 个内存泄漏检测级别:
disabled
完全关闭内存泄露检测simple
以约1%的抽样率检测是否泄露,默认级别advanced
抽样率同simple,但显示详细的泄露报告paranoid
抽样率为100%,显示报告信息同 advanced
在命令行参数配置,比如 -Dio.netty.leakDetectionLevel=paranoid
Netty 内存泄漏检测原理(WeakReference 的引用队列)
Netty 内存泄漏检测的原理是利用弱引用(WeakReference)的引用队列(refQueue),通过将 ByteBuf 对象用弱引用包装起来,当发生GC时,如果 GC 线程检测到 ByteBuf 对象只被弱引用对象关联,会将该 WeakReference 加入 refQueue;
当 ByteBuf 内存被正常释放,会调用 WeakReference 的 clear() 方法解除对 ByteBuf 的引用,后续 GC 线程不会再将该 WeakReference 加入 refQueue;
Netty 在每次创建 ByteBuf 时,基于抽样率,抽样命中时会轮询(poll) refQueue 中的 WeakReference 对象,轮询返回的非null的 WeakReference 关联的 ByteBuf 即为泄漏的堆外内存
如果禁用了内存泄漏检测,直接返回 ByteBuf,否则通过 toLeakAwareBuffer(buf)
返回一个经过包装的 ByteBuf:
toLeakAwareBuffer()
中根据不同的内存泄漏检测级别返回不同的包装结构
public abstract class AbstractByteBufAllocator implements ByteBufAllocator {
protected static ByteBuf toLeakAwareBuffer(ByteBuf buf) {
ResourceLeakTracker<ByteBuf> leak;
switch (ResourceLeakDetector.getLevel()) {
case SIMPLE:
leak = AbstractByteBuf.leakDetector.track(buf);
if (leak != null) {
buf = new SimpleLeakAwareByteBuf(buf, leak);
}
break;
case ADVANCED:
case PARANOID:
leak = AbstractByteBuf.leakDetector.track(buf);
if (leak != null) {
buf = new AdvancedLeakAwareByteBuf(buf, leak);
}
break;
default:
break;
}
return buf;
}
}
track()
中根据采样级别 随机/全部 采样将 ByteBuf 包装为 DefaultResourceLeak
public class ResourceLeakDetector<T> {
private final ReferenceQueue<Object> refQueue = new ReferenceQueue<Object>();
public final ResourceLeakTracker<T> track(T obj) {
return track0(obj);
}
@SuppressWarnings("unchecked")
private DefaultResourceLeak track0(T obj) {
Level level = ResourceLeakDetector.level;
if (level == Level.DISABLED) {
return null;
}
// PARANOID 以下级别,随机抽样
if (level.ordinal() < Level.PARANOID.ordinal()) {
if ((PlatformDependent.threadLocalRandom().nextInt(samplingInterval)) == 0) {
reportLeak();
return new DefaultResourceLeak(obj, refQueue, allLeaks);
}
return null;
}
// PARANOID 级别,全部采样
reportLeak();
return new DefaultResourceLeak(obj, refQueue, allLeaks);
}
}
DefaultResourceLeak
是弱引用 WeakReference 的子类,内部调用了 WeakReference 的带引用队列的构造方法,传入 refQueue
private static final class DefaultResourceLeak<T> extends WeakReference<Object> implements ResourceLeakTracker<T>, ResourceLeak {
DefaultResourceLeak(
Object referent,
ReferenceQueue<Object> refQueue,
Set<DefaultResourceLeak<?>> allLeaks) {
super(referent, refQueue);
assert referent != null;
// Store the hash of the tracked object to later assert it in the close(...) method.
// It's important that we not store a reference to the referent as this would disallow it from
// be collected via the WeakReference.
trackedHash = System.identityHashCode(referent);
allLeaks.add(this);
// Create a new Record so we always have the creation stacktrace included.
headUpdater.set(this, new TraceRecord(TraceRecord.BOTTOM));
this.allLeaks = allLeaks;
}
}
Netty 中的三种帧编码方式
方式 | 解码 | 特点 |
---|---|---|
固定长度 | FixedLengthFrameDecoder | 简单,浪费空间 |
分隔符 | DelimiterBasedFrameDecoder | 简单,需要转义分隔符 |
length字段 | LengthFieldBasedFrameDecoder | 推荐 |
Netty解决粘包半包问题(含netty实例)
https://segmentfault.com/a/1190000023664467
FixedLengthFrameDecoder
DelimiterBasedFrameDecoder
LengthFieldBasedFrameDecoder
RxNetty
RxNetty 是将 Reactive 异步机制(类似Node.js) 带入了 Netty.
RxNetty是Netty的Reactive扩展
https://www.jdon.com/46629
Netty Pipeline 流水线
一条 Netty 通道需要很多业务处理器(Handler)来处理业务。每条通道内部都有一条流水线(Pipeline)将 Handler 装配起来。Netty 的业务处理器流水线 ChannelPipeline 是基于责任链设计模式来设计的,内部是一个双向链表结构,能够支持动态地添加和删除业务处理器。
在 Netty 的设计中 Handler 是无状态的,不保存和 Channel 有关的信息。Handler 的目标是将自己的处理逻辑做得很通用,可以给不同的 Channel 使用。与 Handler 不同的是,Pipeline 是有状态的,保存了 Channel 的关系。于是,Handler 和 Pipeline 之间需要一个中间角色将它们联系起来: ChannelHandlerContext(通道处理器上下文)
当业务处理器被添加到流水线中时会为其专门创建一个 ChannelHandlerContext 实例,主要封装了 ChannelHandler(通道处理器)和 ChannelPipeline(通道流水线)之间的关联关系。所以,流水线 ChannelPipeline 中的双向链接实质是一个由 ChannelHandlerContext 组成的双向链表。
Channel 拥有一条 ChannelPipeline,每一个流水线节点为一个 ChannelHandlerContext 上下文对象,每一个上下文中包裹了一个 ChannelHandler。在 ChannelHandler 的入站/出站处理方法中,Netty 会传递一个 Context 实例作为实际参数。处理器中的回调代码可以通过 Context 实参,在业务处理过程中去获取 ChannelPipeline 实例或者 Channel 实例。
页面信息
location:
protocol
: host
: hostname
: origin
: pathname
: href
: document:
referrer
: navigator:
platform
: userAgent
: