当前位置 : 首页 » 文章分类 :  开发  »  Netty

Netty

Netty 是一个异步事件驱动网络应用框架,可用来快速开发高性能且可维护的客户端服务器应用。

Netty
https://netty.io/

netty / netty
https://github.com/netty/netty

netty 4.0 的一项重大改进就是将全部入栈、出栈流量数据都放入堆外内存中了。


Netty 堆外内存

Netty堆外内存泄漏排查,这一篇全讲清楚了
https://segmentfault.com/a/1190000021469481

先要熟悉带 Cleaner 的 DirectByteBuffer 的堆外内存回收原理。

noCleaner 的 DirectByteBuffer

Netty 在 4.1 引入 noCleaner 策略:创建不带 Cleaner 的 DirectByteBuffer 对象,这样做的好处是绕开带 Cleaner 的 DirectByteBuffer 执行构造方法和执行 Cleaner 的 clean() 方法中一些额外开销,当堆外内存不够的时候,不会触发 System.gc(),提高性能。

以非池化直接内存申请为例 UnpooledByteBufAllocator.newDirectBuffer() 申请堆外内存时,先判断 noCleaner 是否可用

  • noCleaner 为 true:创建 InstrumentedUnpooledUnsafeNoCleanerDirectByteBuf 对象,即 noCleaner 的 DirectByteBuffer
  • noCleaner 为 false:创建 InstrumentedUnpooledUnsafeDirectByteBuf 对象,即 hasCleaner 的 DirectByteBuffer
public final class UnpooledByteBufAllocator extends AbstractByteBufAllocator implements ByteBufAllocatorMetricProvider {
    public UnpooledByteBufAllocator(boolean preferDirect, boolean disableLeakDetector) {
        this(preferDirect, disableLeakDetector, PlatformDependent.useDirectBufferNoCleaner());
    }

    public UnpooledByteBufAllocator(boolean preferDirect, boolean disableLeakDetector, boolean tryNoCleaner) {
        super(preferDirect);
        this.disableLeakDetector = disableLeakDetector;
        noCleaner = tryNoCleaner && PlatformDependent.hasUnsafe() && PlatformDependent.hasDirectBufferNoCleanerConstructor();
    }

    @Override
    protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
        final ByteBuf buf;
        if (PlatformDependent.hasUnsafe()) {
            buf = noCleaner ? new InstrumentedUnpooledUnsafeNoCleanerDirectByteBuf(this, initialCapacity, maxCapacity) :
                    new InstrumentedUnpooledUnsafeDirectByteBuf(this, initialCapacity, maxCapacity);
        } else {
            buf = new InstrumentedUnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
        }
        return disableLeakDetector ? buf : toLeakAwareBuffer(buf);
    }
}

noCleaner 的结果来自静态方法 PlatformDependent.useDirectBufferNoCleaner()

public final class PlatformDependent {
    private static final boolean USE_DIRECT_BUFFER_NO_CLEANER;

    static {
        long maxDirectMemory = SystemPropertyUtil.getLong("io.netty.maxDirectMemory", -1);

        if (maxDirectMemory == 0 || !hasUnsafe() || !PlatformDependent0.hasDirectBufferNoCleanerConstructor()) {
            USE_DIRECT_BUFFER_NO_CLEANER = false;
            DIRECT_MEMORY_COUNTER = null;
        } else {
            USE_DIRECT_BUFFER_NO_CLEANER = true;
            if (maxDirectMemory < 0) {
                maxDirectMemory = MAX_DIRECT_MEMORY;
                if (maxDirectMemory <= 0) {
                    DIRECT_MEMORY_COUNTER = null;
                } else {
                    DIRECT_MEMORY_COUNTER = new AtomicLong();
                }
            } else {
                DIRECT_MEMORY_COUNTER = new AtomicLong();
            }
        }
    }
    
    public static boolean useDirectBufferNoCleaner() {
        return USE_DIRECT_BUFFER_NO_CLEANER;
    }
}

PlatformDependent 类在初始化时会根据系统属性、-XX:MaxDirectMemorySize 和 -Dio.netty.maxDirectMemory 等判断是否使用 noCleaner 策略。
有 arthas 的话,可以直接执行 ognl '@io.netty.util.internal.PlatformDependent@useDirectBufferNoCleaner()' 确认 Netty 使用的是否是 noCleaner 的堆外内存

noCleaner 的 DirectByteBuffer 和 hasCleaner 的 DirectByteBuffer 的区别如下:
1、构造方式不同:
noCleaner 的 DirectByteBuffer:由反射调用 private DirectByteBuffer(long addr, int cap) 创建,cleaner 是 null
hasCleaner 的 DirectByteBuffer:由 new DirectByteBuffer(int cap) 创建

2、释放内存的方式不同
noCleaner 的 DirectByteBuffer:使用 UnSafe.freeMemory(address);
hasCleaner 的 DirectByteBuffer:使用 DirectByteBuffer 的 Cleaner 的 clean() 方法

Netty 在启动时需要判断检查当前环境、环境配置参数是否允许 noCleaner 策略(具体逻辑位于 PlatformDependent 的 static 代码块),例如运行在 Android 下时,是没有 Unsafe 类的,不允许使用noCleaner 策略,如果不允许,则使用 hasCleaner 策略


为什么要使用 noCleaner 的堆外内存

一是因为带 cleaner 的 DirectByteBuffer 堆外内存自动回收不实时:需要 ByteBuffer 对象被 GC 线程回收才会触发,如果 ByteBuffer 对象进入老年代后才变得可回收,则需要等到发送频率较低老年代GC才会触发

另一方面,Netty 需要基于 ByteBuf.release() 方法执行其他操作,例如池化内存释放回内存池,否则该对象会被内存池一直标记为已使用。


-XX:MaxDirectMemorySize 和 -Dio.netty.maxDirectMemory

-XX:MaxDirectMemorySize
用于限制 Netty 中 hasCleaner 策略的 DirectByteBuffer 堆外内存的大小,默认值是 JVM 能从操作系统申请的最大内存,如果内存本身没限制,则值为 Long.MAX_VALUE 个字节(默认值由 Runtime.getRuntime().maxMemory() 返回),代码位于 java.nio.Bits#reserveMemory() 方法中
-XX:MaxDirectMemorySize 无法限制 Netty 中 noCleaner 策略的 DirectByteBuffer 堆外内存的大小

-Dio.netty.maxDirectMemory
用于限制 noCleaner 策略下 Netty 的 DirectByteBuffer 分配的最大堆外内存的大小,如果该值为0,则使用 hasCleaner 策略,代码位于 PlatformDependent#incrementMemoryCounter() 方法中


查看 Netty 使用了多少堆外内存

有 arthas 的话,先通过 ognl '@io.netty.util.internal.PlatformDependent@useDirectBufferNoCleaner()' 确认使用的是否是 noCleaner 的堆外内存。

1、hasCleaner 的 DirectByteBuffer 使用量监控
java.nio.Bits 类是有记录 hasCleaner 的 DirectByteBuffer 堆外内存的使用情况,但是该类是包级别的访问权限,不能直接获取,可以通过 MXBean 来获取

List<BufferPoolMXBean> bufferPoolMXBeans = ManagementFactoryHelper.getBufferPoolMXBeans();
BufferPoolMXBean directBufferMXBean = bufferPoolMXBeans.get(0);
// hasCleaner的DirectBuffer的数量
long count = directBufferMXBean.getCount();
// hasCleaner的DirectBuffer的堆外内存占用大小,单位字节
long memoryUsed = directBufferMXBean.getMemoryUsed();

此外,MappedByteBuffer 是基于 FileChannelImpl.map 进行进行 mmap 内存映射(零拷贝的一种实现)得到的另外一种堆外内存的 ByteBuffer,可以通过 ManagementFactoryHelper.getBufferPoolMXBeans().get(1) 获取到该堆外内存的监控指标

2、noCleaner 的 DirectByteBuffer 使用量监控
看静态方法 PlatformDependent.usedDirectMemory() 的返回即可
arthas 执行

ognl '@io.netty.util.internal.PlatformDependent@maxDirectMemory()'
ognl '@io.netty.util.internal.PlatformDependent@usedDirectMemory()'

Netty 内存泄漏检测

Netty 自带了内存泄漏检测工具,可用于检测出 ByteBuf 对象被 GC 回收但 ByteBuf 管理的内存没有释放的情况,但不适用 ByteBuf 对象还没被 GC 回收内存泄漏的情况,例如任务队列积压。

Netty 提供 4 个内存泄漏检测级别:

  • disabled 完全关闭内存泄露检测
  • simple 以约1%的抽样率检测是否泄露,默认级别
  • advanced 抽样率同simple,但显示详细的泄露报告
  • paranoid 抽样率为100%,显示报告信息同 advanced

在命令行参数配置,比如 -Dio.netty.leakDetectionLevel=paranoid

Netty 内存泄漏检测原理(WeakReference 的引用队列)

Netty 内存泄漏检测的原理是利用弱引用(WeakReference)的引用队列(refQueue),通过将 ByteBuf 对象用弱引用包装起来,当发生GC时,如果 GC 线程检测到 ByteBuf 对象只被弱引用对象关联,会将该 WeakReference 加入 refQueue;
当 ByteBuf 内存被正常释放,会调用 WeakReference 的 clear() 方法解除对 ByteBuf 的引用,后续 GC 线程不会再将该 WeakReference 加入 refQueue;
Netty 在每次创建 ByteBuf 时,基于抽样率,抽样命中时会轮询(poll) refQueue 中的 WeakReference 对象,轮询返回的非null的 WeakReference 关联的 ByteBuf 即为泄漏的堆外内存

如果禁用了内存泄漏检测,直接返回 ByteBuf,否则通过 toLeakAwareBuffer(buf) 返回一个经过包装的 ByteBuf:

toLeakAwareBuffer() 中根据不同的内存泄漏检测级别返回不同的包装结构

public abstract class AbstractByteBufAllocator implements ByteBufAllocator {
    protected static ByteBuf toLeakAwareBuffer(ByteBuf buf) {
        ResourceLeakTracker<ByteBuf> leak;
        switch (ResourceLeakDetector.getLevel()) {
            case SIMPLE:
                leak = AbstractByteBuf.leakDetector.track(buf);
                if (leak != null) {
                    buf = new SimpleLeakAwareByteBuf(buf, leak);
                }
                break;
            case ADVANCED:
            case PARANOID:
                leak = AbstractByteBuf.leakDetector.track(buf);
                if (leak != null) {
                    buf = new AdvancedLeakAwareByteBuf(buf, leak);
                }
                break;
            default:
                break;
        }
        return buf;
    }
}

track() 中根据采样级别 随机/全部 采样将 ByteBuf 包装为 DefaultResourceLeak

public class ResourceLeakDetector<T> {
    private final ReferenceQueue<Object> refQueue = new ReferenceQueue<Object>();

    public final ResourceLeakTracker<T> track(T obj) {
        return track0(obj);
    }

    @SuppressWarnings("unchecked")
    private DefaultResourceLeak track0(T obj) {
        Level level = ResourceLeakDetector.level;
        if (level == Level.DISABLED) {
            return null;
        }

        // PARANOID 以下级别,随机抽样
        if (level.ordinal() < Level.PARANOID.ordinal()) {
            if ((PlatformDependent.threadLocalRandom().nextInt(samplingInterval)) == 0) {
                reportLeak();
                return new DefaultResourceLeak(obj, refQueue, allLeaks);
            }
            return null;
        }

        // PARANOID 级别,全部采样
        reportLeak();
        return new DefaultResourceLeak(obj, refQueue, allLeaks);
    }
}

DefaultResourceLeak 是弱引用 WeakReference 的子类,内部调用了 WeakReference 的带引用队列的构造方法,传入 refQueue

private static final class DefaultResourceLeak<T> extends WeakReference<Object> implements ResourceLeakTracker<T>, ResourceLeak {
        DefaultResourceLeak(
                Object referent,
                ReferenceQueue<Object> refQueue,
                Set<DefaultResourceLeak<?>> allLeaks) {
            super(referent, refQueue);

            assert referent != null;

            // Store the hash of the tracked object to later assert it in the close(...) method.
            // It's important that we not store a reference to the referent as this would disallow it from
            // be collected via the WeakReference.
            trackedHash = System.identityHashCode(referent);
            allLeaks.add(this);
            // Create a new Record so we always have the creation stacktrace included.
            headUpdater.set(this, new TraceRecord(TraceRecord.BOTTOM));
            this.allLeaks = allLeaks;
        }
}

netty中的三种帧编码方式

方式 解码 特点
固定长度 FixedLengthFrameDecoder 简单,浪费空间
分隔符 DelimiterBasedFrameDecoder 简单,需要转义分隔符
length字段 LengthFieldBasedFrameDecoder 推荐

Netty解决粘包半包问题(含netty实例)
https://segmentfault.com/a/1190000023664467


RxNetty

RxNetty 是将 Reactive 异步机制(类似Node.js) 带入了 Netty.

RxNetty是Netty的Reactive扩展
https://www.jdon.com/46629


上一篇 MySQL-Replication 复制

下一篇 MySQL-DataTypes 数据类型

阅读
评论
2k
阅读预计8分钟
创建日期 2021-07-30
修改日期 2022-07-08
类别
标签

页面信息

location:
protocol:
host:
hostname:
origin:
pathname:
href:
document:
referrer:
navigator:
platform:
userAgent:

评论