Disruptor的简单介绍与应用

前言

最近工作比较忙,在工作项目中,看了很多人都自己实现了一套数据任务处理机制,个人感觉有点乱,且也方便他人的后续维护,所以想到了一种数据处理模式,即生产者、缓冲队列、消费者的模式来统一大家的实现逻辑。

下面时是对Disruptor基本使用的演示。使用中需要引入依赖

<dependency>
  <groupId>com.lmax</groupId>
  <artifactId>disruptor</artifactId>
  <version>3.4.2</version>
</dependency>

名称解释

  • Ring Buffer

    环境的缓存区,3.0版本以前被认为是Disruptor的主要成员。3.0版本以后,环形缓冲区只负责通过Disruptor的事件方式来对数据进行存储和更新。在一些高级的应用场景中,Ring Buffer可以由用户的自定义实现完全替代。

  • Sequence

    Disruptor使用Sequence作为一种方法来确定特定组件的位置。每个使用者(EventProcessor)与Disruptor本身一样维护一个序列。大多数并发代码依赖于这些序列值的移动,因此序列支持AtomicLong的许多当前特性。事实上,两者之间唯一真正的区别是序列包含额外的功能,以防止序列和其他值之间的错误共享。

  • Sequencer

    Sequencer是真正的核心,该接口的两个实现(单生产者, 多消费者)实现了所有用于在生产者和使用者之间的快速、正确的传递数据的并发算法。

  • Sequence Barrier

    序列屏障由Sequencer产生,包含对Sequencer和任何依赖消费者的序列的引用。它包含确定是否有任何事件可供使用者处理的逻辑。

  • Wait Strategy

    等待策略确定消费者将如何等待生产者产生的消息,Disruptor将消息放到事件(Event)中。

  • Event

    从生产者到消费者的数据单位。不存在完全由用户定义的事件的特定代码的表示形式。

  • EventProcessor

    EventProcessor持有特定消费者(Consumer)的Sequence,并提供用于调用事件处理实现的事件循环。

  • BatchEventProcessor

    BatchEventProcessor它包含事件循环的有效实现,并将回调到已使用的EventHandle接口实现。

  • EventHandler

    Disruptor定义的事件处理接口,由用户实现,用于处理事件,是Consumer的真正实现。

  • Producer

    生产者,只是泛指调用Disruptor发布事件的用户代码,Disruptor没有定义特定接口或类型。

架构图

简单实用Disruptor

1 定义事件

事件就是通过Disruptor进行交换的数据类型。
package com.disruptor;

public class Data {

    private long value;

    public long getValue() {
        return value;
    }

    public void setValue(long value) {
        this.value = value;
    }
}

2 定义事件工厂

事件工厂定义了如何实例化第一步中定义的事件。Disruptor通过EventFactory在RingBuffer中预创建Event的实例。

一个Event实例被用作一个数据槽,发布者发布前,先从RingBuffer获得一个Event的实例,然后往Event实例中插入数据,然后再发布到RingBuffer中,最后由Consumer获得Event实例并从中读取数据。

package com.disruptor;

import com.lmax.disruptor.EventFactory;

public class DataFactory implements EventFactory<Data> {

    @Override
    public Data newInstance() {
        return new Data();
    }
}

3 定义生产者

package com.disruptor;

import com.lmax.disruptor.RingBuffer;

import java.nio.ByteBuffer;

public class Producer {

    private final RingBuffer<Data> ringBuffer;

    public Producer(RingBuffer<Data> ringBuffer) {
        this.ringBuffer = ringBuffer;
    }

    public void pushData(ByteBuffer byteBuffer) {
        long sequence = ringBuffer.next();

        try {
            Data even = ringBuffer.get(sequence);
            even.setValue(byteBuffer.getLong(0));
        } finally {
            ringBuffer.publish(sequence);
        }
    }
}

4 定义消费者

package com.disruptor;

import com.lmax.disruptor.WorkHandler;

import java.text.MessageFormat;


public class Consumer implements WorkHandler<Data> {

    @Override
    public void onEvent(Data data) throws Exception {
        long result = data.getValue() + 1;

        System.out.println(MessageFormat.format("Data process : {0} + 1 = {1}", data.getValue(), result));
    }
}

5 启动Disruptor

  • 测试Demo
package com.disruptor;

import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;

import java.nio.ByteBuffer;
import java.util.concurrent.ThreadFactory;


public class Main {

    private static final int NUMS = 10;

    private static final int SUM = 1000000;

    public static void main(String[] args) {
        try {
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        long start = System.currentTimeMillis();

        DataFactory factory = new DataFactory();

        int buffersize = 1024;

        Disruptor<Data> disruptor = new Disruptor<Data>(factory, buffersize, new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r);
            }
        });

        Consumer[] consumers = new Consumer[NUMS];
        for (int i = 0; i < NUMS; i++) {
            consumers[i] = new Consumer();
        }

        disruptor.handleEventsWithWorkerPool(consumers);
        disruptor.start();

        RingBuffer<Data> ringBuffer = disruptor.getRingBuffer();
        Producer producer = new Producer(ringBuffer);

        ByteBuffer bb = ByteBuffer.allocate(8);
        for (long i = 0; i < SUM; i++) {
            bb.putLong(0, i);
            producer.pushData(bb);
            System.out.println("Success producer data : " + i);
        }
        long end = System.currentTimeMillis();

        disruptor.shutdown();

        System.out.println("Total time : " + (end - start));
    }
}
  • 结果(部分结果展示)
Data process : 999,987 + 1 = 999,988
Success producer data : 999995
Data process : 999,990 + 1 = 999,991
Data process : 999,989 + 1 = 999,990
Data process : 999,991 + 1 = 999,992
Data process : 999,992 + 1 = 999,993
Data process : 999,993 + 1 = 999,994
Data process : 999,995 + 1 = 999,996
Success producer data : 999996
Success producer data : 999997
Success producer data : 999998
Success producer data : 999999
Data process : 999,994 + 1 = 999,995
Data process : 999,996 + 1 = 999,997
Data process : 999,997 + 1 = 999,998
Data process : 999,998 + 1 = 999,999
Data process : 999,999 + 1 = 1,000,000
Total time : 14202
由结果展示可见,边生产、边消费。

彩蛋

1 事件转换类

package com.mm.demo.disruptor.translator;

import com.lmax.disruptor.EventTranslatorOneArg;
import com.mm.demo.disruptor.entity.Data;

public class DataEventTranslator implements EventTranslatorOneArg<Data, Long> {

    @Override
    public void translateTo(Data event, long sequence, Long arg0) {
        System.out.println(MessageFormat.format("DataEventTranslator arg0 = {0}, seq = {1}", arg0, sequence));
        event.setValue(arg0);
    }
}

2 消费者

2.1 消费者Demo1

消费者每次将event的结果加1。

package com.mm.demo.disruptor.handler;

import com.lmax.disruptor.EventHandler;
import com.mm.demo.disruptor.entity.Data;

import java.text.MessageFormat;

public class D1DataEventHandler implements EventHandler<Data> {

    @Override
    public void onEvent(Data event, long sequence, boolean endOfBatch) throws Exception {
        long result = event.getValue() + 1;
        Thread t = new Thread();
        String name = t.getName();
        System.out.println(MessageFormat.format("consumer "+name+": {0} + 1 = {1}", event.getValue(), result));
    }

}
这里是使用的是EventHandler。也是使用WorkHandler,EventHandler和WorkHandler的区别是前者不需要池化,后者需要池化。

2.2 消费者Demo2

package com.mm.demo.disruptor.handler;

import com.lmax.disruptor.EventHandler;
import com.mm.demo.disruptor.entity.Data;

import java.text.MessageFormat;


public class D2DataEventHandler implements EventHandler<Data> {

    @Override
    public void onEvent(Data event, long sequence, boolean endOfBatch) throws Exception {
        long result = event.getValue() + 2;
        System.out.println(MessageFormat.format("consumer 2: {0} + 2 = {1}", event.getValue(), result));
    }
}

2.3 串行依次计算

Consumer1执行完成再执行Consumer2。

package com.mm.demo.disruptor.process;

import com.lmax.disruptor.dsl.Disruptor;
import com.mm.demo.disruptor.entity.Data;
import com.mm.demo.disruptor.handler.D1DataEventHandler;
import com.mm.demo.disruptor.handler.D2DataEventHandler;

/**
 * 串行依次计算
 * @DateT: 2020-01-07
 */
public class Serial {

    public static void serial(Disruptor<Data> disruptor) {
        disruptor.handleEventsWith(new D1DataEventHandler()).then(new D2DataEventHandler());
        disruptor.start();
    }
}

2.4 并行实时计算

Consumer1和Consumer2同时执行。

package com.mm.demo.disruptor.process;

import com.lmax.disruptor.dsl.Disruptor;
import com.mm.demo.disruptor.entity.Data;
import com.mm.demo.disruptor.handler.D1DataEventHandler;
import com.mm.demo.disruptor.handler.D2DataEventHandler;

/**
 * 并行执行
 * @DateT: 2020-01-07
 */
public class Parallel {

    public static void parallel(Disruptor<Data> dataDisruptor) {
        dataDisruptor.handleEventsWith(new D1DataEventHandler(), new D2DataEventHandler());
        dataDisruptor.start();
    }
}

2.5 测试类

package com.mm.demo.disruptor;

import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import com.mm.demo.disruptor.entity.Data;
import com.mm.demo.disruptor.handler.D1DataEventHandler;
import com.mm.demo.disruptor.process.Parallel;
import com.mm.demo.disruptor.process.Serial;
import com.mm.demo.disruptor.translator.DataEventTranslator;

import javax.swing.plaf.synth.SynthTextAreaUI;
import java.nio.ByteBuffer;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;


public class Main {

    private static final int BUFFER = 1024 * 1024;

    public static void main(String[] args) {

        DataFactory factory = new DataFactory();

        Disruptor<Data> disruptor = new Disruptor<Data>(factory, BUFFER, Executors.defaultThreadFactory(), ProducerType.MULTI, new BlockingWaitStrategy());

      
        Serial.serial(disruptor);
//        Parallel.parallel(disruptor);

        RingBuffer<Data> ringBuffer = disruptor.getRingBuffer();
        for (int i = 0; i < 2; i++) {
            ringBuffer.publishEvent(new DataEventTranslator(), (long)i);
        }
        disruptor.shutdown();
    }
}

总结

上边只演示了串行和并行的方式,其实还是通过组合的方式创建不的计算处理方式(需要创建多个事件处理器EventHandler)。

补充等待策略

  • BlockingWaitStrategy:最低效的策略,但是对cpu的消耗是最小的,在各种不同部署环境中能提供更加一致的性能表现。
  • SleepingWaitStrategy:性能和BlockingWaitStrategy差不多少,cpu消耗也类似,但是其对生产者线程的影响最小,适合用于异步处理数据的场景。
  • YieldingWaitStrategy:性能是最好的,适用于低延迟的场景。在要求极高性能且事件处理线程数小于cpu处理核数时推荐使用此策略。
  • BusySpinWaitStrategy:低延迟,但是对cpu资源的占用较多。
  • PhasedBackoffWaitStrategy:上边几种策略的综合体,延迟大,但是占用cpu资源较少。

参考

本文参考了Disruptor源码以及github中的部分说明。

Demo源码地址

github


  • 写作不易,转载请注明出处,喜欢的小伙伴可以关注公众号查看更多喜欢的文章。
  • 联系方式:4272231@163.com
  • QQ:95472323
  • 微信:ffj2000
Image placeholder
忘返
未设置
  84人点赞

没有讨论,发表一下自己的看法吧

推荐文章
PHP接口的介绍与实现

PHP接口的介绍与实现 接口定义了实现某种服务的一般规范,声明了所需的函数和常量,但不指定如何实现。之所以不给出实现的细节,是因为不同的实体可能需要用不同的方式来实现公共的方法定义。关键是要建立必须实

Kafka 集群在马蜂窝大数据平台的优化与应用扩展

马蜂窝技术原创文章,更多干货请订阅公众号:mfwtechKafka是当下热门的消息队列中间件,它可以实时地处理海量数据,具备高吞吐、低延时等特性及可靠的消息异步传递机制,可以很好地解决不同系统间数据的

个人学习系列 - httpd的简单应用

想学习一下前端的代码,自然而然就希望能部署并观察一下自己写的烂代码了。所以,就研究一下httpd这个工具了。httpd的使用docker中的httpd的获取查询httpd的镜像并下载 查询httpd

JavaScript中对“this”的简单理解

1.this的奥秘很多时候,JS中的this对于咱们的初学者很容易产生困惑不解。this的功能很强大,但需要一定付出才能慢慢理解它。对Java、PHP或其他标准语言来看,this表示类方法中当前对象的

angular的注入器是什么?

在依赖注入和依赖查找的时候注入器和提供器就需要使用。接下来就简单介绍一下注入器和提供器。注入器Angular提供的类,一般不需调用,会自动通过组件的构造函数注入。1.当一个提供器提供在模块中时,他是对

深入了解Nodejs Buffer的使用

JavaScript起初为浏览器而设计,没有读取或操作二进制数据流的机制。Buffer类的引入,则让NodeJS拥有操作文件流或网络二进制流的能力。Buffer基本概念Buffer对象的内存分配不是在

leveldb源代码分析系列1.1:memtable中comparator的实现

leveldb中memtable封装了一个skiplist用来存储真正的数据,跳跃列表的实现一定需要定义存储项的序关系,而在leveldb中这个序关系通过comparator相关类来实现。leveld

vue..js 编写的简单音乐播放器

闲暇之余写了一个音乐小应用项目目录代码开始index.html 每日推荐音乐 {{music.title}} {{music.author}} n

Laravel 7 的简单隐式路由模型绑定

在2020年二月份即将到来的Laravel的下一个主要发行版本 ,你可以直接在路由定义中自定义隐式路由模型绑定:Route::get('/posts/{post:slug}',function(Pos

Flink流式计算在节省资源方面的简单分析

关于Flink流式计算节省资源方面你必须知道的技巧小米在流式计算方面经历了Storm、SparkStreaming和Flink的发展历程;从2019年1月接触Flink到现在,已经过去了大半年的时间了

使用 Vue.js 和 Iris 共建一个简单的 Todo MVC 应用

本文用Golang的Iris框架作为后端服务,vuejs渲染前端UI,用websocket通信。基于监听hash变化director.js库实现简单路由,axios库与后方沟通,netoffos.j

使用 Vue.js 和 Iris 共建一个简单的 Todo MVC 应用

数据服务 packagetodo import"sync" //Item条目数据 typeItemstruct{ SessionIDstring`json:"-"` IDint64`json:"i

如何使你的JavaScript代码简单易读

解决同一问题的方法有很多,但有些解决方法很复杂,甚至有些是荒谬的。在这篇文章中,我想谈谈解决同样问题的好方法和坏方法。让我们先从怎样删除数组中的重复项这个简单问题开始。复杂-使用forEach删除重复

浅析RunLoop原理及其应用

引言:一个APP的启动与结束都是伴随着RunLoop循环往复的,不断的循环、不断的往复。当线程被杀掉、APP退出后被系统以占用内存为由杀掉,RunLoop就消失了。但平时开发中很少见到RunLoop,

JavaScript函数及应用笔记

当变量名与函数名相同时,会优先使用函数名,例如:console.log(a);vara=10;functiona(){console.log('aaaaa');}console.log

JavaScript的DOM应用笔记

获取a标签:document.links获取img标签:document.images获取form标签:document.forms

JavaScript的DOM应用笔记

通过id获取:document.getElementById()写入HTML:innerHTML()内部outerHTML()外部

JavaScript的DOM应用笔记

操作元素属性:element.attribute=newvalueelement.setAttribute(attribute,value)element.getAttribute(attribute

JavaScript的DOM应用笔记

通过id或标签名获取元素对象:getElementById(‘id’)getElementsByTagName(‘标签名’)getElementsByName(‘name’)getElementByC

JavaScript的DOM应用笔记

这里的修改className属性,因为长度会动态改变,所以需要将修改的下标设为0,

可视化的JavaScript:JavaScript引擎运行原理

JavaScript很酷,但是JavaScript引擎是如何才能理解我们编写的代码呢?作为JavaScript开发人员,我们通常不需要自己处理编译器。然而,了解JavaScript引擎的基础知识并了解

GoWeb教程_09.1. 预防CSRF攻击

什么是CSRF CSRF(Cross-siterequestforgery),中文名称:跨站请求伪造,也被称为:oneclickattack/sessionriding,缩写为:CSRF/XSRF。

介绍 Go 的数组和切片

学习在Go中使用数组和切片存储数据的优缺点,以及为什么其中一个比另一个更好图片来自于:carrotmadman6.ModifiedbyOpensource.com.CCBY-SA2.0 数组 数组是编

Java 程序员眼中的 Linux_1.0.Linux 介绍

Linux介绍 Linux这个名字 Linux的Wiki介绍:http://zh.wikipedia.org/zh/Linux Linux也称:GNU/Linux,而其中GNU的全称又是:Gnu’sN