一个简单的基于 Redis 的分布式任务调度器 —— Java 语言实现

折腾了一周的 Java Quartz 集群任务调度,很遗憾没能搞定,网上的相关文章也少得可怜,在多节点(多进程)环境下 Quartz 似乎无法动态增减任务,恼火。无奈之下自己撸了一个简单的任务调度器,结果只花了不到 2天时间,而且感觉非常简单好用,代码量也不多,扩展性很好。

实现一个分布式的任务调度器有几个关键的考虑点

  1. 单次任务和循环任务好做,难的是 cron 表达式的解析和时间计算怎么做?
  2. 多进程同一时间如何保证一个任务的互斥性?
  3. 如何动态变更增加和减少任务?

代码实例

在深入讲解实现方法之前,我们先来看看这个调度器是如何使用的

class Demo {    public static void main(String[] args) {        var redis = new RedisStore();        // sample 为任务分组名称        var store = new RedisTaskStore(redis, "sample");        // 5s 为任务锁寿命        var scheduler = new DistributedScheduler(store, 5);        // 注册一个单次任务        scheduler.register(Trigger.onceOfDelay(5), Task.of("once1", () -> {            System.out.println("once1");        }));        // 注册一个循环任务        scheduler.register(Trigger.periodOfDelay(5, 5), Task.of("period2", () -> {            System.out.println("period2");        }));        // 注册一个 CRON 任务        scheduler.register(Trigger.cronOfMinutes(1), Task.of("cron3", () -> {            System.out.println("cron3");        }));        // 设置全局版本号        scheduler.version(1);        // 注册监听器        scheduler.listener(ctx -> {            System.out.println(ctx.task().name() + " is complete");        });        // 启动调度器        scheduler.start();    }}

当代码升级任务需要增加减少时(或者变更调度时间),只需要递增全局版本号,现有的进程中的任务会自动被重新调度,那些没有被注册的任务(任务减少)会自动清除。新增的任务(新任务)在老代码的进程里是不会被调度的(没有新任务的代码无法调度),被清除的任务(老任务)在老代码的进程里会被取消调度。

比如我们要取消 period2 任务,增加 period4 任务

class Demo {    public static void main(String[] args) {        var redis = new RedisStore();        // sample 为任务分组名称        var store = new RedisTaskStore(redis, "sample");        // 5s 为任务锁寿命        var scheduler = new DistributedScheduler(store, 5);        // 注册一个单次任务        scheduler.register(Trigger.onceOfDelay(5), Task.of("once1", () -> {            System.out.println("once1");        }));        // 注册一个 CRON 任务        scheduler.register(Trigger.cronOfMinutes(1), Task.of("cron3", () -> {            System.out.println("cron3");        }));        // 注册一个循环任务        scheduler.register(Trigger.periodOfDelay(5, 10), Task.of("period4", () -> {            System.out.println("period4");        }));        // 递增全局版本号        scheduler.version(2);        // 注册监听器        scheduler.listener(ctx -> {            System.out.println(ctx.task().name() + " is complete");        });        // 启动调度器        scheduler.start();    }}

cron4j

<dependency>    <groupId>it.sauronsoftware.cron4j</groupId>    <artifactId>cron4j</artifactId>    <version>2.2.5</version></dependency>

这个开源的 library 包含了基础的 cron 表达式解析功能,它还提供了任务的调度功能,不过这里并不需要使用它的调度器。我只会用到它的表达式解析功能,以及一个简单的方法用来判断当前的时间是否匹配表达式(是否该运行任务了)。

我们对 cron 的时间精度要求很低,1 分钟判断一次当前的时间是否到了该运行任务的时候就可以了。

class SchedulingPattern {    // 表达式是否有效    boolean validate(String cronExpr);    // 是否应该运行任务了(一分钟判断一次)    boolean match(long nowTs);}

任务的互斥性

因为是分布式任务调度器,多进程环境下要控制同一个任务在调度的时间点只能有一个进程运行。使用 Redis 分布式锁很容易就可以搞定。锁需要保持一定的时间(比如默认 5s)。

所有的进程都会在同一时间调度这个任务,但是只有一个进程可以抢到锁。因为分布式环境下时间的不一致性,不同机器上的进程会有较小的时间差异窗口,锁必须保持一个窗口时间,这里我默认设置为 5s(可定制),这就要求不同机器的时间差不能超过 5s,超出了这个值就会出现重复调度。

public boolean grabTask(String name) {    var holder = new Holder<Boolean>();    redis.execute(jedis -> {        var lockKey = keyFor("task_lock", name);        var ok = jedis.set(lockKey, "true", SetParams.setParams().nx().ex(lockAge));        holder.value(ok != null);    });    return holder.value();}

全局版本号

我们给任务列表附上一个全局的版本号,当业务上需要增加或者减少调度任务时,通过变更版本号来触发进程的任务重加载。这个重加载的过程包含轮询全局版本号(Redis 的一个key),如果发现版本号变动,立即重新加载任务列表配置并重新调度所有的任务。

private void scheduleReload() {    // 1s 对比一次    this.scheduler.scheduleWithFixedDelay(() -> {        try {            if (this.reloadIfChanged()) {                this.rescheduleTasks();            }        } catch (Exception e) {            LOG.error("reloading tasks error", e);        }    }, 0, 1, TimeUnit.SECONDS);}

重新调度任务先要取消当前所有正在调度的任务,然后调度刚刚加载的所有任务。

private void rescheduleTasks() {    this.cancelAllTasks();    this.scheduleTasks();}private void cancelAllTasks() {    this.futures.forEach((name, future) -> {        LOG.warn("cancelling task {}", name);        future.cancel(false);    });    this.futures.clear();}

因为需要将任务持久化,所以设计了一套任务的序列化格式,这个也很简单,使用文本符号分割任务配置属性就行。

// 一次性任务(startTime)ONCE@2019-04-29T15:26:29.946+0800// 循环任务,(startTime,endTime,period),这里任务的结束时间是天荒地老PERIOD@2019-04-29T15:26:29.949+0800|292278994-08-17T15:12:55.807+0800|5// cron 任务,一分钟一次CRON@*/1 * * * *$ redis-cli127.0.0.1:6379> hgetall sample_triggers1) "task3"2) "CRON@*/1 * * * *"3) "task2"4) "PERIOD@2019-04-29T15:26:29.949+0800|292278994-08-17T15:12:55.807+0800|5"5) "task1"6) "ONCE@2019-04-29T15:26:29.946+0800"7) "task4"8) "PERIOD@2019-04-29T15:26:29.957+0800|292278994-08-17T15:12:55.807+0800|10"

线程池

时间调度会有一个单独的线程(单线程线程池),任务的运行由另外一个线程池来完成(数量可定制)。

class DistributedScheduler {    private ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();    private ExecutorService executor = Executors.newFixedThreadPool(threads);}

之所以要将线程池分开,是为了避免任务的执行(IO)影响了时间的精确调度。

支持无互斥任务

互斥任务要求任务的单进程运行,无互斥任务就是没有加分布式锁的任务,可以多进程同时运行。默认需要互斥。

class Task {    /**     * 是否需要考虑多进程互斥(true表示不互斥,多进程能同时跑)     */    private boolean concurrent;    private String name;    private Runnable runner;    ...    public static Task of(String name, Runnable runner) {        return new Task(name, false, runner);    }    public static Task concurrent(String name, Runnable runner) {        return new Task(name, true, runner);    }}

增加回调接口

考虑到调度器的使用者可能需要对任务运行状态进行监控,这里增加了一个简单的回调接口,目前功能比较简单。能汇报运行结果(成功还是异常)和运行的耗时

class TaskContext {    private Task task;    private long cost;  // 运行时间    private boolean ok;    private Throwable e;}interface ISchedulerListener {    public void onComplete(TaskContext ctx);}

支持存储扩展

目前只实现了 Redis 和 Memory 形式的任务存储,扩展到 zk、etcd、关系数据库也是可行的,实现下面的接口即可。

interface ITaskStore {  public long getRemoteVersion();  public Map<String, String> getAllTriggers();  public void saveAllTriggers(long version, Map<String, String> triggers);  public boolean grabTask(String name);}

代码地址

https://github.com/pyloque/taskin
Image placeholder
csh1989
未设置
  40人点赞

没有讨论,发表一下自己的看法吧

推荐文章
基于 Zookeeper 的分布式锁实现

1.背景最近在学习Zookeeper,在刚开始接触Zookeeper的时候,完全不知道Zookeeper有什么用。且很多资料都是将Zookeeper描述成一个“类Unix/Linux文件系统”的中间件

Peloton:优步开源的统一资源调度器

Peloton最初是在2018年11月份引入的,并在2019年3月份正式开源。Peloton是为像优步这样拥有数百万个容器和数万个节点的规模公司设计的,它提供了高级的资源管理特性,比如弹性资源共享、层

滴滴 曾奇:谈谈我所认识的分布式锁

桔妹导读:随着计算机技术和工程架构的发展,微服务变得越来越热。如今,绝大多数服务都处于分布式环境中,其中,数据一致性是我们一直关注的重点。分布式锁到底是什么?经过了哪些发展演进?工程上有哪些实现方案?

10分钟搞懂:亿级用户的分布式数据存储解决方案!

来源:IT进阶思维原创,转载请注明原出处内容提供:李智慧,前阿里巴巴技术专家,《大型网站技术架构》作者6月6日晚,林志玲与Akira公布婚讯、徐蔡坤祝福高考同学超常发挥,粉丝们百万的转发和点赞造成微博

干货 | 揭秘京东数科强一致、高性能的分布式事务中间件JDTX

导读:在分布式数据库、云原生数据库、NewSQL等名词在数据库领域层出不穷的当今,变革——在这个相对稳定的领域已愈加不可避免。相比于完全革新,渐进式增强的方案在拥有厚重沉淀的行业则更受青睐。同所有分布

快看,我们的分布式缓存就是这样把注册中心搞崩塌

写公众号两年以来,每当有机会写故障类主题的时候,我都会在开始前静静地望着显示器很久,经过多次煎熬和挣扎之后才敢提起笔来,为什么呢?因为这样的话题很容易招来吐槽,比如“说了半天,不就是配置没配好吗?”,

宜信微服务任务调度平台建设实践|分享实录

导读:如今,无论是互联网应用还是企业级应用,都充斥着大量的批处理任务,常常需要一些任务调度系统帮助我们解决问题。随着微服务化架构的逐步演进,单体架构逐渐演变为分布式、微服务架构。内容来源:宜信技术学院

宜信开源|微服务任务调度平台SIA-TASK入手实践

引言最近宜信开源微服务任务调度平台SIA-TASK,SIA-TASK属于分布式的任务调度平台,使用起来简单方便,非常容易入手,部署搭建好SIA-TASK任务调度平台之后,编写TASK后配置JOB进行调

Go 语言实战向导

实现初衷 简单可依赖,充分利用go已有的东西,不另外增加复杂、难以理解的东西,这样做的好处包括:更容易跟随go的升级而升级,降低使用者学习成本 yii提供的controller/action的路由方式

使用 Vue.js 和 Iris 共建一个简单的 Todo MVC 应用

本文用Golang的Iris框架作为后端服务,vuejs渲染前端UI,用websocket通信。基于监听hash变化director.js库实现简单路由,axios库与后方沟通,netoffos.j

使用 Vue.js 和 Iris 共建一个简单的 Todo MVC 应用

数据服务 packagetodo import"sync" //Item条目数据 typeItemstruct{ SessionIDstring`json:"-"` IDint64`json:"i

Go语言高级编程_6.1 分布式id生成器

6.1分布式id生成器 有时我们需要能够生成类似MySQL自增ID这样不断增大,同时又不会重复的id。以支持业务中的高并发场景。比较典型的,电商促销时,短时间内会有大量的订单涌入到系统,比如每秒10w

Go语言高级编程_6.2 分布式锁

6.2分布式锁 在单机程序并发或并行修改全局变量时,需要对修改行为加锁以创造临界区。为什么需要加锁呢?我们看看在不加锁的情况下并发计数会发生什么情况: packagemain import( "sy

Go语言高级编程_6.4 分布式搜索引擎

6.4分布式搜索引擎 在Web一章中,我们提到MySQL很脆弱。数据库系统本身要保证实时和强一致性,所以其功能设计上都是为了满足这种一致性需求。比如writeaheadlog的设计,基于B+树实现的索

Go语言高级编程_6.6 分布式配置管理

6.6分布式配置管理 在分布式系统中,常困扰我们的还有上线问题。虽然目前有一些优雅重启方案,但实际应用中可能受限于我们系统内部的运行情况而没有办法做到真正的“优雅”。比如我们为了对去下游的流量进行限制

Go语言高级编程_6.7 分布式爬虫

6.7分布式爬虫 互联网时代的信息爆炸是很多人倍感头痛的问题,应接不暇的新闻、信息、视频,无孔不入地侵占着我们的碎片时间。但另一方面,在我们真正需要数据的时候,却感觉数据并不是那么容易获取的。比如我们

分布式Redis深度历险-复制

摘要Redis深度历险分为两个部分,单机Redis和分布式Redis。本文为分布式Redis深度历险系列的第一篇,主要内容为Redis的复制功能。Redis的复制功能的作用和大多数分布式存储系统一样,

深入浅出百亿请求高可用Redis(codis)分布式集群揭秘

摘要:作为noSql中的kv数据库的王者,redis以其高性能,低时延,丰富的数据结构备受开发者青睐,但是由于redis在水平伸缩性上受限,如何做到能够水平扩容,同时对业务无侵入性是很多使用redis

分布式时序数据库QTSDB的设计与实现

现有的开源时序数据库influxdb只支持单机运行,在面临大量数据写入时,会出现查询慢,机器负载高,单机容量的限制。为了解决这一问题,360基础架构团队在单机influxdb的基础上,开发了集群版——

如果有人再问你怎么实现分布式延时消息,这篇文章丢给他

1.背景上篇文章介绍了RocketMQ整体架构和原理有兴趣的可以阅读一下,在这篇文章中的延时消息部分,我写道开源版的RocketMQ只提供了18个层级的消息队列延时,这个功能在开源版中显得特别鸡肋,但

PHP 进程池与轮询调度算法实现多任务

phper请了解进程调度策略,CPU时间片,进程控制【创建,销毁,回收,进程信号】与及进程运行流程和基本的进程组,信号中断原理,以及进程之间的关系。关于进程的更多内容可参考本人前面撸过的文章或是百度了

从关系型数据库到分布式机器学习,揭秘腾讯大数据十年发展历程

大数据技术在过去10多年中极大改变了企业对数据的存储、处理和分析方式。如今,大数据技术逐渐成熟,涵盖了计算、存储、数仓、数据集成、可视化、NOSQL、OLAP分析、机器学习等丰富领域。在未来,大数据技

云原生时代,分布式系统设计必备知识图谱(内含22个知识点)

作者|杨泽强(竹涧)阿里云技术专家我们身处于一个充斥着分布式系统解决方案的计算机时代,无论是支付宝、微信这样顶级流量产品、还是区块链、IOT等热门概念、抑或如火如荼的容器生态技术如Kubernetes

中国移动智能硬件质量报告解读 分布式路由市场你了解多少?

今年6月份,中国移动终端实验室发布了《中国移动2019年智能硬件质量报告》(第一期),并于近日对该报告进行了相关解读,同时对优秀智能硬件产品进行颁奖。根据介绍,本次报告在内容上主要包括手机产品综合评测

为什么分布式网络是一种新兴趋势?

互联网的大规模采用可归功于以下五个重要因素:TCP/IPTCP/IP(传输控制协议/Internet协议)是指Internet上使用的标准数据通信协议集。它由DARPA开发,并由互联网工程任务组(IE