• 中美研究人员发现新型狗流感病毒 2019-05-29
  • 豫园商城升级改造:这些楼顶可见最好的风景--旅游频道 2019-05-14
  • 头条 —频道 春城壹网 七彩云南 一网天下 2019-05-14
  • 人为某种意识而奋斗是幸福的,获得成绩或成就更幸福。 2019-05-10
  • 【专题】省违反中央八项规定精神和“四风”问题线索举报平台 2019-05-09
  • 确定这是热身赛?吴前拼到大腿抽筋 拆绷带继续干 2019-05-09
  • 应对排放新规 大众德国工厂计划短暂停产 2019-04-26
  • 一师一团土地确权登记颁证工作全面展开 2019-04-26
  • 一语惊坛(5月31日):“我们不一样”,中国向世界许下一个承诺。 2019-04-22
  • 俄罗斯世界杯F组:球迷风采 2019-04-10
  • 5月份国民经济数据发布:中国经济持续稳中向好 2019-04-10
  • 贵州宣讲十九大:干部争当宣讲员 群众心窝暖洋洋 2019-03-25
  • 别空谈,说说看,这个“简单的逻辑关系”是什么关系? 2019-03-25
  • 快过闪电,MIUI 10与MIUI 9速度对比 2019-03-21
  • 泽州去年“免费教育”资金达5211万元 2019-03-19
  • disruptor 高性能之道

    快乐彩开奖号码 www.752o.com disruptor是一个高性能的线程间异步通信的框架,即在同一个JVM进程中的多线程间消息传递。应用disruptor知名项目有如下的一些:Storm, Camel, Log4j2,还有目前的美团点评技术团队也有很多不少的应用,或者说有一些借鉴了它的设计机制。 下面就跟着笔者一起去领略下disruptor高性能之道吧~

    disruptor是一款开源的高性能队列框架,github地址为 https://github.com/LMAX-Exchange/disruptor。

    分析disruptor,只要把event的生产和消费流程弄懂,基本上disruptor的七寸就已经抓住了?;安欢嗨?,赶紧上车,笔者以下面代码为例讲解disruptor:

    public static void main(String[] args) {
        Disruptor<StringEvent> disruptor = new Disruptor<>(StringEvent::new, 1024,
                new PrefixThreadFactory("consumer-pool-", new AtomicInteger(0)), ProducerType.MULTI,
                new BlockingWaitStrategy());
     
        // 注册consumer并启动
        disruptor.handleEventsWith((EventHandler<StringEvent>) (event, sequence, endOfBatch) -> {
            System.out.println(Util.threadName() + "onEvent " + event);
        });
        disruptor.start();
     
        // publisher逻辑
        Executor executor = Executors.newFixedThreadPool(2,
                new PrefixThreadFactory("publisher-pool-", new AtomicInteger(0)));
        while (true) {
            for (int i = 0; i < 2; i++) {
                executor.execute(() -> {
                    Util.sleep(1);
                    disruptor.publishEvent((event, sequence, arg0) -> {
                        event.setValue(arg0 + " " + sequence);
                    }, "hello world");
                });
            }
     
            Util.sleep(1000);
        }
    }
    class StringEvent {
        private String value;
    
        public String getValue() {
            return value;
        }
    
        public void setValue(String value) {
            this.value = value;
        }
    
        @Override
        public String toString() {
            return "StringEvent:{value=" + value + "}";
        }
    }
    
    class PrefixThreadFactory implements ThreadFactory {
        private String prefix;
        private AtomicInteger num;
    
        public PrefixThreadFactory(String prefix, AtomicInteger num) {
            this.prefix = prefix;
            this.num = num;
        }
    
        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, prefix + num.getAndIncrement());
        }
    
    }
    
    class Util {
    
        static String threadName() {
            return String.format("%-16s", Thread.currentThread().getName()) + ": ";
        }
    
        static void sleep(long millis) {
            try {
                Thread.sleep(millis);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    测试相关类

    event生产流程

    event的生产是从 RingBuffer.publishEvent 开始的,event生产流程步骤如下:
    • 获取待插入(到ringBuffer的)位置,相当于先占个位
    • 往该位置上设置event
    • 设置sequence对应event的标志,通知consumer
    public <A> void publishEvent(EventTranslatorOneArg<E, A> translator, A arg0)
    {
        // 获取当前要设置的sequence序号,然后进行设置并通知消费者
        final long sequence = sequencer.next();
        translateAndPublish(translator, sequence, arg0);
    }
     
    // 获取下一个sequence,直到获取到位置才返回
    public long next(int n) {
        long current;
        long next;
         
        do {
            // 获取当前ringBuffer的可写入sequence
            current = cursor.get();
            next = current + n;
     
            long wrapPoint = next - bufferSize;
            long cachedGatingSequence = gatingSequenceCache.get();
     
            if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current) {
                // 如果当前没有空位置写入,获取多个consumer中消费进度最小的那个的消费进度
                long gatingSequence = Util.getMinimumSequence(gatingSequences, current);
     
                if (wrapPoint > gatingSequence) {
                    // 阻塞1ns,然后continue
                    LockSupport.parkNanos(1); // TODO, should we spin based on the wait strategy?
                    continue;
                }
     
                gatingSequenceCache.set(gatingSequence);
            }
            // cas设置ringBuffer的sequence
            else if (cursor.compareAndSet(current, next)) {
                break;
            }
        } while (true);
     
        return next;
    }
     
    private <A> void translateAndPublish(EventTranslatorOneArg<E, A> translator, long sequence, A arg0) {
        try {
            // 设置event
            translator.translateTo(get(sequence), sequence, arg0);
        } finally {
            sequencer.publish(sequence);
        }
    }
    public void publish(final long sequence) {
        // 1. 设置availableBuffer,表示对应的event是否设置完成,consumer线程中会用到
        //   - 注意,到这里时,event已经设置完成,但是consumer还不知道该sequence对应的event是否设置完成,
        //   - 所以需要设置availableBuffer中sequence对应event的sequence number
        // 2. 通知consumer
        setAvailable(sequence);
        waitStrategy.signalAllWhenBlocking();
    }

    从translateAndPublish中看,如果用户的设置event方法抛出异常,这时event对象是不完整的,那么publish到consumer端,consumer消费的不是完整的数据怎么办呢?在translateAndPublish中需不需要在异常情况下reset event对象呢?关于这个问题笔者之前是有疑问的,关于这个问题笔者提了一个issue,可点击 https://github.com/LMAX-Exchange/disruptor/issues/244 进行查看。

    笔者建议在consumer消费完event之后,进行reset event操作,这样避免下次设置event异常consumer时取到不完整的数据,比如log4j2中的AsyncLogger中处理完log4jEvent之后就会调用clear方法进行重置event。

    event消费流程

    event消费流程入口是BatchEventProcessor.processEvents,event消费流程步骤:
    • 获取当前consumer线程消费的offset,即nextSequence
    • 从ringBuffer获取可用的sequence,没有新的event时,会根据consmer阻塞策略进行执行某些动作
    • 获取event,然后执行event回调
    • 设置当前consumer线程的消费进度
    private void processEvents() {
        T event = null;
        long nextSequence = sequence.get() + 1L;
     
        while (true) {
            try {
                // 获取可用的sequence,默认直到有可用sequence时才返回
                final long availableSequence = sequenceBarrier.waitFor(nextSequence);
                if (batchStartAware != null) {
                    batchStartAware.onBatchStart(availableSequence - nextSequence + 1);
                }
     
                // 执行消费回调动作,注意,这里获取到一个批次event,可能有多个,个数为availableSequence-nextSequence + 1
                // nextSequence == availableSequence表示该批次只有一个event
                while (nextSequence <= availableSequence) {
                    // 获取nextSequence位置上的event
                    event = dataProvider.get(nextSequence);
                    // 用户自定义的event 回调
                    eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
                    nextSequence++;
                }
     
                // 设置当前consumer线程的消费进度sequence
                sequence.set(availableSequence);
            } catch (final Throwable ex) {
                exceptionHandler.handleEventException(ex, nextSequence, event);
                sequence.set(nextSequence);
                nextSequence++;
            }
        }
    }
     
    public long waitFor(final long sequence)
            throws AlertException, InterruptedException, TimeoutException{
        long availableSequence = waitStrategy.waitFor(sequence, cursorSequence, dependentSequence, this);
     
        if (availableSequence < sequence) {
            return availableSequence;
        }
     
        // 获取ringBuffer中可安全读的最大的sequence number,该信息存在availableBuffer中的sequence
        // 在MultiProducerSequencer.publish方法中会设置
        return sequencer.getHighestPublishedSequence(sequence, availableSequence);
    }
     
    // 默认consumer阻塞策略 BlockingWaitStrategy
    public long waitFor(long sequence, Sequence cursorSequence, Sequence dependentSequence, SequenceBarrier barrier)
        throws AlertException, InterruptedException
    {
        long availableSequence;
        if (cursorSequence.get() < sequence) {
            // 当前ringBuffer的sequence小于sequence,阻塞等待
            // event生产之后会唤醒
            synchronized (mutex) {
                while (cursorSequence.get() < sequence) {
                    barrier.checkAlert();
                    mutex.wait();
                }
            }
        }
     
        while ((availableSequence = dependentSequence.get()) < sequence) {
            barrier.checkAlert();
            ThreadHints.onSpinWait();
        }
     
        return availableSequence;
    }

    从上面的event消费流程来看,消费线程会读取ringBuffer的sequence,然后更新本消费线程内的offset(消费进度sequence),如果有多个event的话,那么就是广播消费模式了(单consumer线程内还是顺序消费),如果不想让event被广播消费(重复消费),可使用如下方法添加consumer线程(WorkHandler是集群消费,EventHandler是广播消费):

    disruptor.handleEventsWithWorkerPool((WorkHandler<StringEvent>) event -> {
        System.out.println(Util.threadName() + "onEvent " + event);
    });

    disruptor高性能之道

    弃用锁机制改用CAS

    event生产流程中获取并自增sequence时用的就是CAS,获取之后该sequence对应位置的操作只会在单线程,没有了并发问题。

    集群消费模式下获取sequence之后也会使用CAS设置为sequence新值,设置本地消费进度,然后再执行获取event并执行回调逻辑。

    注意,disruptor中较多地方使用了CAS,但并不代表完全没有了锁机制,比如默认consumer阻塞策略 BlockingWaitStrategy发挥作用时,consumer消费线程就会阻塞,只不过这只会出现在event生产能力不足是才会存在。如果consumer消费不足,大量event生产导致ringBuffer爆满,这时event生产线程就会轮询调用LockSupport.parkNanos(1),这里的成本也不容小觑(涉及到线程切换损耗)。

     
    避免伪共享引入缓冲行填充

    伪共享讲的是多个CPU时的123级缓存的问题,通常,缓存是以缓存行的方式读取数据,如果A、B两个变量被缓冲在同一行之内,那么对于其中一个的更新会导致另一个缓冲无效,需要从内存中读取,这种无法充分利用缓存行的问题就是伪共享。disruptor相关代码如下:

    class LhsPadding {
        protected long p1, p2, p3, p4, p5, p6, p7;
    }
    class Value extends LhsPadding {
        protected volatile long value;
    }
     
    使用RingBuffer作为数据存储容器

    ringBuffer是一个环形队列,本质是一个数组,size为2的幂次方(方便做&操作),数据位置sequence值会和size做&操作得出数组下标,然后进行数据的读写操作(只在同一个线程内,无并发问题)。

     
    小结

    disruptor初衷是为了解决内存队列的延迟问题,作为一个高性能队列,包括Apache Storm、Camel、Log4j 2在内的很多知名项目都在使用。disruptor的重要机制就是CAS和RingBuffer,借助于它们两个实现数据高效的生产和消费。

    disruptor多生产者多消费者模式下,因为RingBuffer数据的写入是分为2步的(先获取到个sequence,然后写入数据),如果获取到sequence之后,生产者写入RingBuffer较慢,consumer消费较快,那么生产者最终会拖慢consumer消费进度,这一点需注意(如果已经消费到生产者占位的前一个数据了,那么consumer会执行对应的阻塞策略)。在实际使用过程中,如果consumer消费逻辑耗时较长,可以封装成任务交给线程池来处理,避免consumer端拖慢生成者的写入速度。

    disruptor的设计对于开发者来说有哪些借鉴的呢?尽量减少竞争,避免多线程对同一数据做操作,比如disruptor使用CAS获取只会在一个线程内进行读写的event对象,这种思想其实已经在JDK的thread本地内存中有所体现;尽量复用对象,避免大量的内存申请释放,增加GC损耗,disruptor通过复用event对象来保证读写时不会产生对象GC问题;选择合适数据结构,disruptor使用ringBuffer,环形数组来实现数据高效读写。

     

    参考资料:

    1、https://tech.meituan.com/disruptor.html

    posted @ 2018-11-15 09:56 向南l 阅读(...) 评论(...) 编辑 收藏
  • 中美研究人员发现新型狗流感病毒 2019-05-29
  • 豫园商城升级改造:这些楼顶可见最好的风景--旅游频道 2019-05-14
  • 头条 —频道 春城壹网 七彩云南 一网天下 2019-05-14
  • 人为某种意识而奋斗是幸福的,获得成绩或成就更幸福。 2019-05-10
  • 【专题】省违反中央八项规定精神和“四风”问题线索举报平台 2019-05-09
  • 确定这是热身赛?吴前拼到大腿抽筋 拆绷带继续干 2019-05-09
  • 应对排放新规 大众德国工厂计划短暂停产 2019-04-26
  • 一师一团土地确权登记颁证工作全面展开 2019-04-26
  • 一语惊坛(5月31日):“我们不一样”,中国向世界许下一个承诺。 2019-04-22
  • 俄罗斯世界杯F组:球迷风采 2019-04-10
  • 5月份国民经济数据发布:中国经济持续稳中向好 2019-04-10
  • 贵州宣讲十九大:干部争当宣讲员 群众心窝暖洋洋 2019-03-25
  • 别空谈,说说看,这个“简单的逻辑关系”是什么关系? 2019-03-25
  • 快过闪电,MIUI 10与MIUI 9速度对比 2019-03-21
  • 泽州去年“免费教育”资金达5211万元 2019-03-19
  • 龙珠激斗无限龙石版 新浪竞彩足球比分直接 2014年福利彩票走势图 21世界古墓奇兵 主机游戏平台有哪些 双色球字谜 热血羽毛球怎么玩 17051足彩奖金彩果 25号安徽快3开奖结果 浙江风彩排列三走势图 奇迹觉醒攻略平民职业 丛林吉姆黄金国电子游戏 鲁11选5稳赚技巧 吉林快3走势图淘宝 七星彩走势图一综自版 完美世界手游职业需求排行