聊聊chronos的DeleteBgWorker

本文主要研究一下chronos的DeleteBgWorker

DeleteBgWorker

DDMQ/carrera-chronos/src/main/java/com/xiaojukeji/chronos/workers/DeleteBgWorker.java

public class DeleteBgWorker {
    private static final Logger LOGGER = LoggerFactory.getLogger(DeleteBgWorker.class);

    private static final DeleteConfig DELETE_CONFIG = ConfigManager.getConfig().getDeleteConfig();
    private static final int SAVE_HOURS_OF_DATA = DELETE_CONFIG.getSaveHours();
    private static final long INITIAL_DELAY_MINUTES = 1; // 1 分钟
    private static final long PERIOD_MINUTES = 10;       // 10 分钟

    private static volatile DeleteBgWorker instance = null;
    private static final SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

    /**
     * 2017/10/13 00:00:00
     */
    private static final long MIN_TIMESTAMP = 1507824000;

    private static final ScheduledExecutorService SCHEDULE = new ScheduledThreadPoolExecutor(1,
            new BasicThreadFactory.Builder().namingPattern("delete-bg-worker-schedule-%d").daemon(true).build());

    private DeleteBgWorker() {
    }

    public void start() {
        SCHEDULE.scheduleAtFixedRate(() -> {
            byte[] beginKey = String.valueOf(MIN_TIMESTAMP).getBytes(Charsets.UTF_8);

            final long seekTimestampInSecond = MetaService.getSeekTimestamp();
            byte[] endKey = String.valueOf(seekTimestampInSecond - SAVE_HOURS_OF_DATA * 60 * 60).getBytes(Charsets.UTF_8);
            deleteRange(beginKey, endKey);
        }, INITIAL_DELAY_MINUTES, PERIOD_MINUTES, TimeUnit.MINUTES);
        LOGGER.info("DeleteBgWorker has started, initialDelayInMinutes:{}", INITIAL_DELAY_MINUTES);
    }

    private void deleteRange(final byte[] beginKey, final byte[] endKey) {
        LOGGER.info("deleteRange start, beginKey:{}, endKey:{}", new String(beginKey), new String(endKey));
        final long start = System.currentTimeMillis();
        RDB.deleteFilesInRange(CFManager.CFH_DEFAULT, beginKey, endKey);
        LOGGER.info("deleteRange end, beginKey:{}({}), endKey:{}({}), cost:{}ms",
                new String(beginKey), formatter.format(Long.parseLong(new String(beginKey)) * 1000),
                new String(endKey), formatter.format(Long.parseLong(new String(endKey)) * 1000),
                System.currentTimeMillis() - start);
    }

    public void stop() {
        SCHEDULE.shutdownNow();
        while (!SCHEDULE.isShutdown()) {
            LOGGER.info("DeleteBgWorker is shutting down!");
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                LOGGER.info("DeleteBgWorker was forced to shutdown, err:{}", e.getMessage(), e);
            }
        }
        LOGGER.info("DeleteBgWorker was shutdown!");
    }

    public static DeleteBgWorker getInstance() {
        if (instance == null) {
            synchronized (DeleteBgWorker.class) {
                if (instance == null) {
                    instance = new DeleteBgWorker();
                }
            }
        }
        return instance;
    }
}
  • DeleteBgWorker提供了静态方法getInstance来获取或创建单例,该类提供了start、stop两个方法;start方法会往SCHEDULE注册一个调度任务,每隔PERIOD_MINUTES执行一次,它主要执行deleteRange方法;deleteRange主要是执行RDB.deleteFilesInRange(CFManager.CFH_DEFAULT, beginKey, endKey),它会从MetaService获取seekTimestamp来计算endKey;stop方法则是关闭SCHEDULE

deleteFilesInRange

DDMQ/carrera-chronos/src/main/java/com/xiaojukeji/chronos/db/RDB.java

public class RDB {
    private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(RDB.class);

    static RocksDB DB;

    //......

    public static boolean deleteFilesInRange(final ColumnFamilyHandle cfh, final byte[] beginKey,
                                             final byte[] endKey) {
        try {
            DB.deleteRange(cfh, beginKey, endKey);
            LOGGER.debug("succ delete range, columnFamilyHandle:{}, beginKey:{}, endKey:{}",
                    cfh.toString(), new String(beginKey), new String(endKey));
        } catch (RocksDBException e) {
            LOGGER.error("error while delete range, columnFamilyHandle:{}, beginKey:{}, endKey:{}, err:{}",
                    cfh.toString(), new String(beginKey), new String(endKey), e.getMessage(), e);
            return false;
        }
        return true;
    }

    //......
}
  • deleteFilesInRange方法主要是执行DB.deleteRange(cfh, beginKey, endKey)

MetaService

DDMQ/carrera-chronos/src/main/java/com/xiaojukeji/chronos/services/MetaService.java

public class MetaService {
    private static final Logger LOGGER = LoggerFactory.getLogger(MetaService.class);

    private static volatile long seekTimestamp = -1;
    private static volatile long zkSeekTimestamp = -1;
    private static volatile Map<String, Long> zkQidOffsets = new ConcurrentHashMap<>();
    private static final DbConfig dbConfig = ConfigManager.getConfig().getDbConfig();

    private static final ScheduledExecutorService SCHEDULER = new ScheduledThreadPoolExecutor(1,
            new BasicThreadFactory.Builder().namingPattern("offset-seektimestamp-schedule-%d").daemon(true).build());

    public static void load() {
        final long start = System.currentTimeMillis();
        if (seekTimestamp == -1) {
            seekTimestamp = loadSeekTimestampFromFile();
        }
        final long cost = System.currentTimeMillis() - start;
        LOGGER.info("succ load seekTimestamp, seekTimestamp:{}, cost:{}ms", seekTimestamp, cost);

        SCHEDULER.scheduleWithFixedDelay(() -> {
            // 如果是master则拉取并上报zk offset和seekOffset
            if (MasterElection.isMaster()) {
                MqConsumeStatService.getInstance().uploadOffsetsToZk();
                uploadSeekTimestampToZk();
            }
        }, 5, 1, TimeUnit.SECONDS);
    }

    private static long loadSeekTimestampFromFile() {
        String seekTimestampStr = FileIOUtils.readFile2String(dbConfig.getSeekTimestampPath());
        if (StringUtils.isBlank(seekTimestampStr)) {
            final long initSeekTimestamp = TsUtils.genTS();
            boolean result = FileIOUtils.writeFileFromString(dbConfig.getSeekTimestampPath(), String.valueOf(initSeekTimestamp));
            if (result) {
                LOGGER.info("init seekTimestamp and succ save, current seekTimestamp:{}", initSeekTimestamp);
            } else {
                LOGGER.error("init seekTimestamp and fail to save, current seekTimestamp:{}", initSeekTimestamp);
            }
            return initSeekTimestamp;
        }
        LOGGER.info("succ load seekTimestamp from file, seekTimestamp:{}", Long.parseLong(seekTimestampStr));
        return Long.parseLong(seekTimestampStr);
    }

    public static long getSeekTimestamp() {
        return seekTimestamp;
    }

    /**
     * 此处的lock不能去掉
     * 判断消息超时时
     */
    public static void nextSeekTimestamp() {
        Batcher.lock.lock();
        try {
            seekTimestamp++;
            boolean result = FileIOUtils.writeFileFromString(dbConfig.getSeekTimestampPath(), String.valueOf(seekTimestamp));
            if (result) {
                LOGGER.info("incr seekTimestamp and succ save, next seekTimestamp:{}", seekTimestamp);
            } else {
                LOGGER.error("incr seekTimestamp and fail to save, next seekTimestamp:{}", seekTimestamp);
            }
        } finally {
            Batcher.lock.unlock();
        }
    }

    public static void uploadSeekTimestampToZk() {
        String seekTimestampStr = String.valueOf(MetaService.getSeekTimestamp());
        ZkUtils.createOrUpdateValue(Constants.SEEK_TIMESTAMP_ZK_PATH, seekTimestampStr);
        LOGGER.debug("upload seekTimestamp to zk, seekTimestamp:{}", seekTimestampStr);
    }

    public static Map<String, Long> getZkQidOffsets() {
        return zkQidOffsets;
    }

    public static void setZkQidOffsets(Map<String, Long> zkQidOffsets) {
        MetaService.zkQidOffsets = zkQidOffsets;
    }

    public static long getZkSeekTimestamp() {
        return zkSeekTimestamp;
    }

    public static void setZkSeekTimestamp(long zkSeekTimestamp) {
        MetaService.zkSeekTimestamp = zkSeekTimestamp;
    }
}
  • MetaService提供了load、getSeekTimestamp、nextSeekTimestamp、uploadSeekTimestampToZk等方法;load方法在seekTimestamp为-1时执行loadSeekTimestampFromFile,之后注册一个定时任务每隔1秒,判断是否是master,如果是则执行MqConsumeStatService.getInstance().uploadOffsetsToZk()以及uploadSeekTimestampToZk;nextSeekTimestamp方法会先更新内存的seekTimestamp,然后使用FileIOUtils.writeFileFromString将其值写入文件

小结

DeleteBgWorker提供了静态方法getInstance来获取或创建单例,该类提供了start、stop两个方法;start方法会往SCHEDULE注册一个调度任务,每隔PERIOD_MINUTES执行一次,它主要执行deleteRange方法;deleteRange主要是执行RDB.deleteFilesInRange(CFManager.CFH_DEFAULT, beginKey, endKey),它会从MetaService获取seekTimestamp来计算endKey;stop方法则是关闭SCHEDULE

doc

Image placeholder
dxinfo
未设置
  11人点赞

没有讨论,发表一下自己的看法吧

推荐文章
聊聊chronos的pullFromDefaultCFAndPush

序本文主要研究一下chronos的pullFromDefaultCFAndPushpullFromDefaultCFAndPushDDMQ/carrera-chronos/src/main/java/

聊聊chronos的BackupDB

序本文主要研究一下chronos的BackupDBBackupDBDDMQ/carrera-chronos/src/main/java/com/xiaojukeji/chronos/db/Backup

workerman源码-workerman启动流程

前面我们跟着代码看了一遍workerman的初始化流程.但对于如何监听端口.等操作还没有具体的实现.我们这次就来看一下.workerman是如何监听端口并运行的.runAll在前面我们初始化方法过后,

react-native中IOS的webview和js层通信 - UIWebview

前言在9012的最后一篇写到了在rn中安卓的webview的通信原理,而作为0202年的第一篇,继续讨论上年rn中webview通信剩下的部分。背景:对于webview,了解过的人都知道在ios端会存

一小时快速搭建基于阿里云容器服务-Kubernetes的Web应用

本文面向的读者如果您是一个Kubernetes的初学者,本文可以帮助你快速在云上搭建一个可实际使用的集群环境,并发布自己的第一个应用。你无须提前准备任何的硬件资源或者下载任何的软件包。 如果您已经有一

Go语言高级编程_4.7 pbgo: 基于Protobuf的框架

4.7pbgo:基于Protobuf的框架 pbgo是我们专门针对本节内容设计的较为完整的迷你框架,它基于Protobuf的扩展语法,通过插件自动生成rpc和rest相关代码。在本章第二节我们已经展示

打造高逼格、可视化的Docker容器监控系统平台

关于Docker技术的文章之前也断断续续写了几篇:Docker容器系列文章|Docker技术入门(一)Docker容器系列文章|Docker技术入门(二)Docker容器系列文章|这20个Docker

为什么说谷歌Anthos是kubernetes的翻版?

在本周纽约的一次会议活动中,谷歌谈到了Anthos。那么,Anthos到底是什么?有哪些新功能?本文将逐一解答!什么是Anthos?从官方资料来看,Anthos是谷歌的混合云平台,主要作用是保护客户的

Fish Redux中的Dispatch是怎么实现的?

前言开源地址:https://github.com/alibaba/fish-redux我们在使用fish-redux构建应用的时候,界面代码(view)和事件的处理逻辑(reducer,effe

万字长文:聊聊几种主流Docker网络的实现原理

一、容器网络简介容器网络主要解决两大核心问题:一是容器的IP地址分配,二是容器之间的相互通信。本文重在研究第二个问题并且主要研究容器的跨主机通信问题。实现容器跨主机通信的最简单方式就是直接使用host

为什么说Kubernetes的崛起预示着云原生时代到来?

现在,云原生、Kubernetes已经成为企业IT领域的时髦概念,几乎所有的企业都在关注;如果不提这些概念,好像企业就会在云市场竞争中失去绝对话语权。那么,云原生和Kubernetes是怎样一种关系?

开源监控系统Prometheus的前世今生

Prometheus是SoundCloud公司开源的监控系统,同时也是继Kubernetes之后,第二个加入CNCF的项目。Prometheus是一个优秀的监控系统,沃趣围绕着Prometheus先后

基于JS的高性能Flutter动态化框架MXFlutter

导语:18年10月份,手机QQ看点团队尝试使用Flutter,做为iOS开发,一接触到Flutter就马上感受到,Flutter虽然强大,但不能像RN一样动态化是阻碍我们使用她的唯一障碍了。看Goog

vue中的diff算法详解

1.当数据发生变化时,vue是怎么更新节点的?要知道渲染真实DOM的开销是很大的,比如有时候我们修改了某个数据,如果直接渲染到真实dom上会引起整个dom树的重绘和重排,有没有可能我们只更新我们修改的

基于中台实践的DevOps平台有何不同?

为了响应快速变化的市场需求,业务要快速迭代。IT正在向云原生架构转型,解放架构自由度,最大化业务敏捷性,解耦合、敏捷开发、快速部署是当下企业的追求,可以消除研发与运维之间鸿沟的DevOps(研发运维)

JavaScript的DOM应用笔记

获取a标签:document.links获取img标签:document.images获取form标签:document.forms

JavaScript的DOM应用笔记

通过id获取:document.getElementById()写入HTML:innerHTML()内部outerHTML()外部

JavaScript的DOM应用笔记

操作元素属性:element.attribute=newvalueelement.setAttribute(attribute,value)element.getAttribute(attribute

JavaScript的DOM应用笔记

通过id或标签名获取元素对象:getElementById(‘id’)getElementsByTagName(‘标签名’)getElementsByName(‘name’)getElementByC

JavaScript的DOM应用笔记

这里的修改className属性,因为长度会动态改变,所以需要将修改的下标设为0,

有了这8个Chrome扩展工具,Web开发事半功倍!

Chrome浏览器扩展程序,无论对开发人员还是设计人员来说,都是非常有用的,有些扩展程序会对开发工具的某类功能进行增强,也有一些会复制开发工具中的部分特性。从某种意义上来说,Chrome商店中的每个扩

pymysql fetchone () , fetchall () , fetchmany ()

最近在用python操作mysql数据库时,碰到了下面这两个函数,标记一下: 1.定义 1.1fetchone(): 返回单个的元组,也就是一条记录(row),如果没有结果则返回None 1.2fet

webpack中css的url报错?

webpack中css的url报错?css-loader://打包样式中背景图 { test:/\.(png|jpg)$/, loader:"url-loader?limit=8192&name=im

vuetifyjs的优点是什么?

官方网站:https://vuetifyjs.com/zh-Hans/Vuetify优点:几乎不需要任何CSS代码,而element-ui许多布局样式需要我们来编写Vuetify从底层构建起来的语义化

简介PWA与Service Worker

1.关于PWAPWA,全称Progressivewebapps,即渐进式Web应用。PWA技术主要作用为构建跨平台的Web应用程序,并使其具有与原生应用程序相同的用户体验。早在2014年,W3C就已经

{