Java 的源代码学习(5)——Netty源代码学习:AttributeMap 的实现

“Java 的源代码学习”系列已经歇菜了很长时间了,其主要原因是老夫太懒,写这种文章费脑子不说还很花时间。这几天瞄了一眼 Netty 的源代码,发现其设计和实现都相当值得学习,所以这个系列暂时烂尾一下之前没写完的部分,用几篇博文的篇幅把 Netty 的源代码学习完。

AttributeMap 是 Netty 自行设计的一个用于存储属性值的东东,可以理解为是一个线程安全的 Map。而 AttributeMap 的实现 DefaultAttributeMap 是 Channel 最顶级的基类(除了 Object),示意图如下(使用 StarUML 绘制,下同)。

貌似有点混乱,但可以看出,每一个 Channel 都是 DefaultAttributeMap 的子类。按照作者的说法,自己实现而不是用 ConcurrentHashMap 的原因是为了降低内存占用(乃确定有这么复杂的 OO 设计后存储属性节省的内存真的占了很大比例么?),老夫们就从这类的设计入手开始学习 netty 的源代码。

public class DefaultAttributeMap implements AttributeMap {
 
    @SuppressWarnings("rawtypes")
    private static final AtomicReferenceFieldUpdater<DefaultAttributeMap, AtomicReferenceArray> updater =
            AtomicReferenceFieldUpdater.newUpdater(DefaultAttributeMap.class, AtomicReferenceArray.class, "attributes");
 
    private static final int BUCKET_SIZE = 4;
    private static final int MASK = BUCKET_SIZE  - 1;
 
    // Initialize lazily to reduce memory consumption; updated by AtomicReferenceFieldUpdater above.
    @SuppressWarnings("UnusedDeclaration")
    private volatile AtomicReferenceArray<DefaultAttribute<?>> attributes;

(PS:WordPress 的这个新的代码高亮器用起来方便多了,就是没找到怎么设置字体大小以及换行。)

  • 保存属性用的是 AtomicReferenceArray(12 行),与其他 atomic 包中的类类似,该类提供了一个引用数组中针对某一个元素进行原子操作的方法,其本质是访问 Unsafe 类来对数组中对应 offset 的位置进行 compareAndSet(代码省略)。4-5 行是对应的更新器,主要是检查对应的访问权限和类型后访问 Unsafe,代码省略。
  • 7-8 行:每 4 个元素被放在一个“桶”里,从这里可以看出应该是一个分段锁(数组+链表)的设计。
@Override
public <T> Attribute<T> attr(AttributeKey<T> key) {
    if (key == null) {
        throw new NullPointerException("key");
    }
    AtomicReferenceArray<DefaultAttribute<?>> attributes = this.attributes;
    if (attributes == null) {
        // Not using ConcurrentHashMap due to high memory consumption.
        attributes = new AtomicReferenceArray<DefaultAttribute<?>>(BUCKET_SIZE);
 
        if (!updater.compareAndSet(this, null, attributes)) {
            attributes = this.attributes;
        }
    }
  • 9 行:在第一次访问 attr 方法时,才开始分配内存,一共分配长度为 4 的数组,也就是说这个 Map 会把元素存在这 4 个位置中的其中一个链表上。
  • 最开始时,检测到整个 attributes 为 null 的情况,用 CAS 方式尝试更新此属性开始分配第一片内存。如果此时有另一个线程也在更新,而当前线程没有更新成功的话,attributes 仍然为 null。
int i = index(key);
DefaultAttribute<?> head = attributes.get(i);
if (head == null) {
    // No head exists yet which means we may be able to add the attribute without synchronization and just
    // use compare and set. At worst we need to fallback to synchronization and waste two allocations.
    head = new DefaultAttribute();
    DefaultAttribute<T> attr = new DefaultAttribute<T>(head, key);
    head.next = attr;
    attr.prev = head;
    if (attributes.compareAndSet(i, null, head)) {
        // we were able to add it so return the attr right away
        return attr;
    } else {
        head = attributes.get(i);
    }
}
  • 1 行:index(head) 方法为“return key.id() & MASK;”,用于计算要获取的 key 的子数组的下标。每个 key 的 id 是一个递增且不重复的序号,用这个方法来给 key 分配位置。
  • 14 行:如果 head 设置失败了,则说明另一个线程已经初始化了这个数组,那么此时只需拿到的链表进行后面的插入操作。
    synchronized (head) {
        DefaultAttribute<?> curr = head;
        for (;;) {
            DefaultAttribute<?> next = curr.next;
            if (next == null) {
                DefaultAttribute<T> attr = new DefaultAttribute<T>(head, key);
                curr.next = attr;
                attr.prev = curr;
                return attr;
            }
 
            if (next.key == key && !next.removed) {
                return (Attribute<T>) next;
            }
            curr = next;
        }
    }
}

这一段代码就是在对应的链表中查找或者更新相应的属性了。可以看到一把大锁加在的 head 上,之后就是很常见的链表操作了,找不到插入到链表末尾,找到了就返回。这里要注意的是第 12 行的 removed 字段,在一个属性被 remove 时,会调用 DefaultAttribute 的 remove 方法,我们接下来开始看 DefaultAttribute 这个类。

从上面的分析可以看出,AtomicReferenceArray 的每一个元素存储的是一个类型为 DefaultAttribute 的双向链表。它的定义如下:

private static final class DefaultAttribute<T> extends AtomicReference<T> implements Attribute<T> {
 
    private static final long serialVersionUID = -2661411462200283011L;
 
    // The head of the linked-list this attribute belongs to
    private final DefaultAttribute<?> head;
    private final AttributeKey<T> key;
 
    // Double-linked list to prev and next node to allow fast removal
    private DefaultAttribute<?> prev;
    private DefaultAttribute<?> next;
 
    // Will be set to true one the attribute is removed via getAndRemove() or remove()
    private volatile boolean removed;

head 字段存储的是该元素所在的链表表头,prev 和 next 则是双链表中的前后两个元素,removed 用来标记当前属性有没有被干掉(话说一个属性额外存储了这么多字段真的节省了内存吗?)。

@Override
public void remove() {
    removed = true;
    set(null);
    remove0();
}
 
private void remove0() {
    synchronized (head) {
        if (prev == null) {
            // Removed before.
            return;
        }
 
        prev.next = next;
 
        if (next != null) {
            next.prev = prev;
        }
 
        // Null out prev and next - this will guard against multiple remove0() calls which may corrupt
        // the linked list for the bucket.
        prev = null;
        next = null;
    }
}

在 remove 时,也是一把大锁加在了 head 上(这也就是为什么额外需要一个 head 字段),然后可以随心所欲地对链表进行动刀。总体上看,DefaultAttributeMap 为了减少内存占用使用的时一个双向链表来保存各属性值,同时设置了 4 个桶来减少并发时加锁的几率。

个人觉得这个设计【不好】,为了实现双向链表以及分段锁每个属性额外存储了多个字段,本来一个 4 个字节的 int 属性,这里至少额外需要 1 个 boolean 类型的 removed (4 个字节)、三个链表相关的引用类型(3*8=24 个字节,64 位且不压缩指针)、DefaultAttribute 继承了 AtomicReference 而导致的其他开销,所以感觉有点得不偿失。

另外位了减少竞争引用了一个长度为 4 的数组,毕竟号称要减少内存占用,所以也不能设置得太多,不然就更浪费啦。然而在高并发下,如果属性很多,要对链表进行顺序查找,这个也很花时间而导致 head 被锁的时间过长。因此个人不太赞同这个设计,不过其实际跑起来与 ConcurrentHashMap 有多大区别,恐怕只有作者自己才有数据了。

AttributeKey 继承自 AbstractConstant 类,用来记录每个属性的唯一编号以及名字:

public abstract class AbstractConstant<T extends AbstractConstant<T>> implements Constant<T> {
 
    private static final AtomicLong uniqueIdGenerator = new AtomicLong();
    private final int id;
    private final String name;
    private final long uniquifier;

这里用了一个 AtomicLong 用来做发号器,只要不重复就行(保证能放入对应的桶中),其他都无所谓。这里的泛型中的“循环”类型主要是方便进行 compareTo。

AttributeKey 本身用了一个 ConstantPool 类存储属性值。在 getOrCreate 方法中,采用了类似乐观锁的方式:

public final class AttributeKey<T> extends AbstractConstant<AttributeKey<T>> {
 
    private static final ConstantPool<AttributeKey<Object>> pool = new ConstantPool<AttributeKey<Object>>() {
        @Override
        protected AttributeKey<Object> newConstant(int id, String name) {
            return new AttributeKey<Object>(id, name);
        }
    };
 
    /**
     * Returns the singleton instance of the {@link AttributeKey} which has the specified {@code name}.
     */
    @SuppressWarnings("unchecked")
    public static <T> AttributeKey<T> valueOf(String name) {
        return (AttributeKey<T>) pool.valueOf(name);
    }
 
    /**
     * Get existing constant by name or creates new one if not exists. Threadsafe
     *
     * @param name the name of the {@link Constant}
     */
    private T getOrCreate(String name) {
        T constant = constants.get(name);
        if (constant == null) {
            final T tempConstant = newConstant(nextId(), name);
            constant = constants.putIfAbsent(name, tempConstant);
            if (constant == null) {
                return tempConstant;
            }
        }
 
        return constant;
    }

这个类最终还是用回了 ConcurrentHashMap:

public abstract class ConstantPool<T extends Constant<T>> {
 
    private final ConcurrentMap<String, T> constants = PlatformDependent.newConcurrentHashMap();
 
    private final AtomicInteger nextId = new AtomicInteger(1);

所以也就是说,AttributeMap 用来存储对应 Channel 中有哪些属性,而属性的具体值则是由 AttributeKey 自己保存在一个静态的 ConcurrentHashMap 里的。所以 AttributeKey 是一个全局唯一的属性(在同一个进程中),是值游离在 AttributeMap 之外的。

今天先研究到这里了,下面几篇博文会着重研究 Netty 中其他重要的类。

✏️ 有任何想法?欢迎发邮件告诉老夫:daozhihun@outlook.com