聊聊chronos的pullFromDefaultCFAndPush

本文主要研究一下chronos的pullFromDefaultCFAndPush

pullFromDefaultCFAndPush

DDMQ/carrera-chronos/src/main/java/com/xiaojukeji/chronos/services/MqPushService.java

public class MqPushService {

    //......

    public void pullFromDefaultCFAndPush() {
        final long seekTimestamp = MetaService.getSeekTimestamp();
        final long zkSeekTimestamp = MetaService.getZkSeekTimestamp();

        // backup的seekTimestamp不能超过master的seekTimestamp
        if (MasterElection.isBackup()) {
            if (seekTimestamp >= zkSeekTimestamp) {
                LOGGER.debug("backup's pull from db should stop for seekTimestamp > zkSeekTimestamp, seekTimestamp:{}, zkSeekTimestamp:{}, Thread:{}",
                        seekTimestamp, zkSeekTimestamp, Thread.currentThread().getName());
                try {
                    TimeUnit.SECONDS.sleep(2);
                } catch (InterruptedException e) {
                }
                return;
            }
        }

        // seekTimestamp不能超过当前时间
        final long now = TsUtils.genTS();
        if (seekTimestamp > now) {
            LOGGER.debug("pull from db should stop for seekTimestamp > now, seekTimestamp:{}, now:{}, Thread:{}",
                    seekTimestamp, now, round, Thread.currentThread().getName());
            try {
                TimeUnit.MILLISECONDS.sleep(100);
            } catch (InterruptedException e) {
            }
            return;
        }

        round++;
        final long start = System.currentTimeMillis();
        final long diff = start / 1000 - seekTimestamp;
        LOGGER.info("pull from db start, seekTimestamp:{}, currTimestamp:{}, diff:{} round:{}",
                seekTimestamp, start / 1000, diff, round);
        MetricService.putSeekLatency(MasterElection.getState().toString(), diff + 10); // 因为0上传到metric之后不显示

        // 迭代出当前 seekTimestamp 下所有数据
        int count = 0;
        try (RocksIterator it = RDB.newIterator(CFManager.CFH_DEFAULT)) {
            for (it.seek(KeyUtils.genSeekKey(seekTimestamp)); it.isValid(); it.next()) {
                final String dMsgId = new String(it.key());
                final InternalKey internalKey = new InternalKey(dMsgId);

                //......

                boolean needMetricWriteQpsAfterSplit = false;

                // 循环消息需要插入一条新的消息, 如果失效, 则不再插入
                if (internalKey.getType() == MsgTypes.LOOP_DELAY.getValue()
                        || internalKey.getType() == MsgTypes.LOOP_EXPONENT_DELAY.getValue()) {
                    final InternalKey nextInternalKey = new InternalKey(internalKey).nextUniqDelayMsgId();
                    if (!KeyUtils.isInvalidMsg(nextInternalKey)) {
                        batcher.putToDefaultCF(nextInternalKey.genUniqDelayMsgIdWithSegmentInfoIfHas(), it.value(), null, nextInternalKey, Actions.ADD.getValue());
                        needMetricWriteQpsAfterSplit = true;
                    }
                }

                byte[] bytes = it.value();
                if (internalKey.getSegmentNum() > 0) {
                    try {
                        OUTPUT.write(it.value());
                        LOGGER.info("segment merge, dMsgId:{}, value.len:{}, value.acc.len:{}", internalKey.genUniqDelayMsgIdWithSegmentInfoIfHas(), it.value().length, OUTPUT.size());
                        if (internalKey.getSegmentNum() != (internalKey.getSegmentIndex() - Constants.SEGMENT_INDEX_BASE + 1)) {
                            continue;
                        }
                        bytes = OUTPUT.toByteArray();
                        OUTPUT.reset();
                    } catch (IOException e) {
                        LOGGER.error("error while output.write byte array, msg:{}", e.getMessage(), e);
                    }
                }

                // 如果解析不出来, 说明格式有问题, 抛弃掉该条消息, 不阻塞
                final InternalValue internalValue = JsonUtils.fromJsonString(bytes, InternalValue.class);
                if (internalValue == null) {
                    continue;
                }

                //......

                count++;

                try {
                    blockingQueue.put(new InternalPair(internalKey, internalValue));
                } catch (InterruptedException e) {
                    LOGGER.error("error while put to blockingQueue, dMsgId:{}", dMsgId);
                }

                if (count % INTERNAL_PAIR_COUNT == 0) {
                    sendConcurrent(blockingQueue, round);
                }
            }

            sendConcurrent(blockingQueue, round);
        }

        needCancelMap.forEach((uniqDelayMsgId, tombstoneKey) -> {
            final InternalKey internalKey = new InternalKey(uniqDelayMsgId);
            final InternalKey tombstoneInternalKey = new InternalKey(tombstoneKey);

            // 残留的循环消息取消需要重新添加进去, 否则会删除不掉
            if (internalKey.getType() == MsgTypes.LOOP_DELAY.getValue()
                    || internalKey.getType() == MsgTypes.LOOP_EXPONENT_DELAY.getValue()) {
                final InternalKey nextTombstoneKey = tombstoneInternalKey.nextUniqDelayMsgId();
                final InternalKey nextInternalKey = internalKey.nextUniqDelayMsgId();
                if (!KeyUtils.isInvalidMsg(nextTombstoneKey)) {
                    String topic = needCancelTopicMap.get(uniqDelayMsgId);
                    batcher.putToDefaultCF(nextTombstoneKey.genUniqDelayMsgId(),
                            new CancelWrap(nextInternalKey.genUniqDelayMsgId(), topic).toJsonString(),
                            topic, nextInternalKey, Actions.CANCEL.getValue());

                } else {
                    LOGGER.info("pull from db succ cancel message of tombstone key, tombstone dMsgId:{}",
                            nextTombstoneKey.genUniqDelayMsgId());
                }
            }
        });
        batcher.flush();

        needCancelMap.clear();
        needCancelTopicMap.clear();

        // 更新offset
        MetaService.nextSeekTimestamp();

        LOGGER.info("pull from db finish push, pushCost:{}ms, count:{}, seekTimestamp:{}, round:{}",
                System.currentTimeMillis() - start, count, seekTimestamp, round);
    }

    //......
}
  • pullFromDefaultCFAndPush方法先从metaService获取seekTimestamp及zkSeekTimestamp,若seekTimestamp超过当前时间则提前返回;之后从RDB.newIterator(CFManager.CFH_DEFAULT)获取RocksIterator进行遍历,读取dMsgId构造internalKey,若其type是LOOP_DELAY或者LOOP_EXPONENT_DELAY则通过batcher.putToDefaultCF重新放入rocksdb;之后读取it.value()构造internalValue,紧接着构造InternalPair放入到blockingQueue,之后在count % INTERNAL_PAIR_COUNT == 0时执行sendConcurrent,在循环结束之后再执行一次sendConcurrent;最后更新MetaService.nextSeekTimestamp()

sendConcurrent

DDMQ/carrera-chronos/src/main/java/com/xiaojukeji/chronos/services/MqPushService.java

public class MqPushService {

    //......

    private void sendConcurrent(final BlockingQueue<InternalPair> blockingQueue, final long round) {
        if (blockingQueue.size() == 0) {
            LOGGER.info("pull from db sendConcurrent start, return for no message to send, round:{}", round);
            return;
        }

        final long sendCount = blockingQueue.size();
        LOGGER.info("pull from db sendConcurrent start, send count:{}, round:{}", sendCount, round);
        final long start = System.currentTimeMillis();
        final CountDownLatch cdl = new CountDownLatch(blockingQueue.size());
        InternalPair internalPair;
        while ((internalPair = blockingQueue.poll()) != null) {
            final InternalPair immutableInternalPair = internalPair;
            pushThreadPool.execute(() -> {
                while (!send(
                        immutableInternalPair.getInternalValue().getTopic(),
                        immutableInternalPair.getInternalValue().getBody().getBytes(Charsets.UTF_8),
                        immutableInternalPair.getInternalKey(),
                        immutableInternalPair.getInternalValue().getTags(),
                        immutableInternalPair.getInternalValue().getProperties(),
                        false)) {
                    try {
                        TimeUnit.MILLISECONDS.sleep(100);
                    } catch (InterruptedException e) {
                    }
                }
                cdl.countDown();
            });
        }

        try {
            cdl.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        final long cost = System.currentTimeMillis() - start;
        LOGGER.info("pull from db sendConcurrent end, send count:{}, round:{}, cost:{}ms", sendCount, round, cost);
    }

    //......
}
  • sendConcurrent方法会执行blockingQueue.poll(),然后执行send方法

send

DDMQ/carrera-chronos/src/main/java/com/xiaojukeji/chronos/services/MqPushService.java

public class MqPushService {

    //......

    private boolean send(final String topic, final byte[] body, final InternalKey internalKey, final String tags, final Map<String, String> properties,
                        final boolean direct) {
        final long start = System.nanoTime();
        final String key = internalKey.genUniqDelayMsgId();
        MetricMsgType metricMsgType;

        if (internalKey.getType() == MsgTypes.DELAY.getValue()) {
            metricMsgType = MetricMsgType.DELAY;
        } else if (internalKey.getType() == MsgTypes.LOOP_DELAY.getValue()) {
            metricMsgType = MetricMsgType.LOOP_DELAY;
        } else {
            metricMsgType = MetricMsgType.UNKNOWN;
        }

        int len = 0;
        if (body != null) {
            len = body.length;
        }

        if (MasterElection.isBackup()) {
            if (direct) {
                LOGGER.info("succ send message(but cancel for backup) directly, topic:{}, dMsgId:{}, len:{}", topic, key, len);
                MetricService.incPushQps(topic, metricMsgType, MetricMsgToOrFrom.SEND, MetricPushMsgResult.BACKUP);
            } else {
                LOGGER.info("succ send message(but cancel for backup) from db, topic:{}, dMsgId:{}, len:{}", topic, key, len);
                MetricService.incPushQps(topic, metricMsgType, MetricMsgToOrFrom.DB, MetricPushMsgResult.BACKUP);
            }
            return true;
        }

        if (ConfigManager.getConfig().isFakeSend()) {
            try {
                TimeUnit.MILLISECONDS.sleep(1);
            } catch (InterruptedException e) {
            }
            if (direct) {
                LOGGER.info("succ send message directly(fakeSend), topic:{}, dMsgId:{}, len:{}", topic, key, len);
            } else {
                LOGGER.info("succ send message from db(fakeSend), topic:{}, dMsgId:{}, len:{}", topic, key, len);
            }
            return true;
        }

        MessageBuilder messageBuilder = producer.messageBuilder().setTopic(topic).setBody(body).setKey(key).setTags(tags).setRandomPartition();
        if (properties != null && properties.size() > 0) {
            for (Map.Entry<String, String> entry : properties.entrySet()) {
                LOGGER.debug("properties, topic:{}, dMsgId:{}, key:{}, value:{}", topic, key, entry.getKey(), entry.getValue());
                // IMPORTANT: If use addProperty for isPressureTraffic, the property will be ignored
                if (PRESSURE_TRAFFIC_KEY.equals(entry.getKey())) {
                    messageBuilder.setPressureTraffic(Boolean.parseBoolean(entry.getValue()));
                } else {
                    messageBuilder.addProperty(entry.getKey(), entry.getValue());
                }
            }
        }
        messageBuilder.addProperty(PROPERTY_KEY_FROM_CHRONOS, PROPERTY_KEY_FROM_CHRONOS);

        final Result result = messageBuilder.send();
        final long cost = (System.nanoTime() - start) / 1000;
        MetricService.putPushLatency(topic, cost);

        if (result.getCode() == CarreraReturnCode.OK) {
            if (direct) {
                LOGGER.info("succ send message directly, topic:{}, dMsgId:{}, len:{}, result:{}, cost:{}us", topic, key, len, result, cost);
                MetricService.incPushQps(topic, metricMsgType, MetricMsgToOrFrom.SEND, MetricPushMsgResult.OK);
            } else {
                LOGGER.info("succ send message from db, topic:{}, dMsgId:{}, len:{}, result:{}, cost:{}us", topic, key, len, result, cost);
                MetricService.incPushQps(topic, metricMsgType, MetricMsgToOrFrom.DB, MetricPushMsgResult.OK);
            }
            return true;
        } else if (result.getCode() == CarreraReturnCode.FAIL_TOPIC_NOT_EXIST
                || result.getCode() == CarreraReturnCode.FAIL_TOPIC_NOT_ALLOWED
                || result.getCode() == CarreraReturnCode.FAIL_ILLEGAL_MSG
                || result.getCode() == CarreraReturnCode.MISSING_PARAMETERS) {
            if (direct) {
                LOGGER.error("fail send message directly, topic:{}, dMsgId:{}, len:{}, result:{}, cost:{}us", topic, key, len, result, cost);
                MetricService.incPushQps(topic, metricMsgType, MetricMsgToOrFrom.SEND, MetricPushMsgResult.FAIL);
            } else {
                LOGGER.error("fail send message from db, topic:{}, dMsgId:{}, len:{}, result:{}, cost:{}us", topic, key, len, result, cost);
                MetricService.incPushQps(topic, metricMsgType, MetricMsgToOrFrom.DB, MetricPushMsgResult.FAIL);
            }
            return true;
        } else {
            if (direct) {
                LOGGER.error("error while send message directly, topic:{}, dMsgId:{}, len:{}, result:{}, cost:{}us", topic, key, len, result, cost);
                MetricService.incPushQps(topic, metricMsgType, MetricMsgToOrFrom.SEND, MetricPushMsgResult.FAIL);
            } else {
                LOGGER.error("error while send message from db, topic:{}, dMsgId:{}, len:{}, result:{}, cost:{}us", topic, key, len, result, cost);
                MetricService.incPushQps(topic, metricMsgType, MetricMsgToOrFrom.DB, MetricPushMsgResult.FAIL);
            }
            return false;
        }
    }

    //......
}
  • send方法主要是构造messageBuilder,然后执行messageBuilder.send()

小结

pullFromDefaultCFAndPush方法先从metaService获取seekTimestamp及zkSeekTimestamp,若seekTimestamp超过当前时间则提前返回;之后从RDB.newIterator(CFManager.CFH_DEFAULT)获取RocksIterator进行遍历,读取dMsgId构造internalKey,若其type是LOOP_DELAY或者LOOP_EXPONENT_DELAY则通过batcher.putToDefaultCF重新放入rocksdb;之后读取it.value()构造internalValue,紧接着构造InternalPair放入到blockingQueue,之后在count % INTERNAL_PAIR_COUNT == 0时执行sendConcurrent,在循环结束之后再执行一次sendConcurrent;最后更新MetaService.nextSeekTimestamp()

doc

Image placeholder
dxinfo
未设置
  70人点赞

没有讨论,发表一下自己的看法吧

推荐文章
聊聊chronos的BackupDB

序本文主要研究一下chronos的BackupDBBackupDBDDMQ/carrera-chronos/src/main/java/com/xiaojukeji/chronos/db/Backup

聊聊chronos的DeleteBgWorker

序本文主要研究一下chronos的DeleteBgWorkerDeleteBgWorkerDDMQ/carrera-chronos/src/main/java/com/xiaojukeji/chron

一文带你掌握常见的Pandas性能优化方法,让你的pandas飞起来!

微信公众号:「Python读财」如有问题或建议,请公众号留言Pandas是Python中用于数据处理与分析的屠龙刀,想必大家也都不陌生,但Pandas在使用上有一些技巧和需要注意的地方,尤其是对于较大

笨办法学 Linux Bash:Shell、`.profile`、`.bashrc`、`.bash_history`

Bash:Shell、.profile、.bashrc、.bash_history。 当使用CLI(命令行界面)来使用Linux时,你正在与一个名为shell的程序进行交互。所有你输入的都传递给she

pymysql fetchone () , fetchall () , fetchmany ()

最近在用python操作mysql数据库时,碰到了下面这两个函数,标记一下: 1.定义 1.1fetchone(): 返回单个的元组,也就是一条记录(row),如果没有结果则返回None 1.2fet

开源监控系统Prometheus的前世今生

Prometheus是SoundCloud公司开源的监控系统,同时也是继Kubernetes之后,第二个加入CNCF的项目。Prometheus是一个优秀的监控系统,沃趣围绕着Prometheus先后

使用phpStudy小皮面板,typecho的POST数据提交无反应问题

在安装typecho的过程中,遇到提交数据无反应,页面不报错的问题,十分迷惑。刚开始以为是centos系统文件夹权限问题,反复修改权限都无法解决问题。最后发现是,小皮面板默认开启的防火墙问题,直接关闭

Composer包管理工具精讲及搭建自己的PHP开发框架笔记

https://www.lmonkey.com/chapters/dvy9p2EKN/tasksComposer包管理工具精讲及搭建自己的PHP开发框架

基于Pandas+ECharts的金融大数据可视化实现方案

前言最近无意中看到一篇文章,介绍的是在IPythonNotebook里实现ECharts的可视化效果。我个人对ECharts一直是推崇有加,是baidu发布的开源项目中我比较喜欢的一个,绝对是良心之作

2019机器学习框架之争:与Tensorflow竞争白热化,进击的PyTorch赢在哪里?

大数据文摘出品来源:thegradient编译:张大笔茹、曹培信、刘俊寰、牛婉扬、Andy2019年,机器学习框架之争进入了新阶段:PyTorch与TensorFlow成为最后两大玩家,PyTorch

为什么说谷歌Anthos是kubernetes的翻版?

在本周纽约的一次会议活动中,谷歌谈到了Anthos。那么,Anthos到底是什么?有哪些新功能?本文将逐一解答!什么是Anthos?从官方资料来看,Anthos是谷歌的混合云平台,主要作用是保护客户的

react-native中IOS的webview和js层通信 - UIWebview

前言在9012的最后一篇写到了在rn中安卓的webview的通信原理,而作为0202年的第一篇,继续讨论上年rn中webview通信剩下的部分。背景:对于webview,了解过的人都知道在ios端会存

vue.js的localhost无法打开

课程推荐:web全栈开发就业班--拿到offer再缴学费--融职教育 vue项目不能使用localhost访问 问题 vue项目不能使用localhost访问,但是使用本机的ip加端口号是可以访问的

PHP 中 bind 的用法 self 和 static 的区别

bind是bindTo的静态版本,因此只说bind吧。(还不是太了解为什么要弄出两个版本)官方文档:复制一个闭包,绑定指定的$this对象和类作用域。其实后半句表述很不清楚。我的理解:把一个闭包转换为

聊聊 Vapor

近期对Vapor的无服务实现方式感兴趣,花了些时间研究了下其实践的机制。什么是Vapor? LaravelVaporisaserverlessdeploymentplatformforLaravel,

万字长文:聊聊几种主流Docker网络的实现原理

一、容器网络简介容器网络主要解决两大核心问题:一是容器的IP地址分配,二是容器之间的相互通信。本文重在研究第二个问题并且主要研究容器的跨主机通信问题。实现容器跨主机通信的最简单方式就是直接使用host

聊聊前端排序的那些事

课程推荐:前端开发工程师--学习猿地精品课程 前言貌似前端[1]圈一直以来流传着一种误解:前端用不到算法知识。[2]长久以来,我也曾受这种说法的影响。直到前阵子遇到一个产品需求,回过头来看,发现事实并

聊聊【爬虫开发】这半年来的心得

课程推荐:Python开发工程师--学习猿地--送9个上线商业项目 前言在工作中,已经陆陆续续使用爬虫做需求将近半年时间了,在这半年时间里,从一个python小白到爬虫入门,再至功能实现。从上午PHP

jquery怎么获取a标签href的值?

jquery怎么获取a标签href的值?href是a标签的一个属性,获取属性的值,在jquery中使用attr()方法。获取a标签的href属性就是$('a').attr('href')语法:$(se

有了这8个Chrome扩展工具,Web开发事半功倍!

Chrome浏览器扩展程序,无论对开发人员还是设计人员来说,都是非常有用的,有些扩展程序会对开发工具的某类功能进行增强,也有一些会复制开发工具中的部分特性。从某种意义上来说,Chrome商店中的每个扩

如何使用Vue构建Chrome扩展程序

浏览器扩展程序是可以修改和增强Web浏览器功能的小程序。它们可用于各种任务,例如阻止广告,管理密码,组织标签,改变网页的外观和行为等等。好消息是浏览器扩展并不难写。可以用你已经熟悉的Web技术(HTM

超8千Star,火遍Github的Python反直觉案例集!

大数据文摘授权转载作者:SatwikKansal译者:暮晨Python,是一个设计优美的解释型高级语言,它提供了很多能让程序员感到舒适的功能特性。但有的时候,Python的一些输出结果对于初学者来说似

流畅的python学习笔记-第4章

第4章文本和字节序列[toc]编码和解码markdom可以插入emoji表情包其中作为前缀,x1f54为对应表情的unicode编码🥺emoji 把码位转换成字节序列的过程是编码;把字节序列转换成码位

流畅的python学习笔记-第5章

第5章函数[toc]函数在python中一切都可以视作为对象,包括函数deffunction_try(): '''itisfuncitontrydoc''' print('function_tryd

如何将网站的php版本信息隐藏起来

当我们把网站上线之后,我们可以通过curl的如下命令显示指定网站的头信息,curl的安装方法参考:https://www.wj0511.com/site/d...curl-Ihttps://www.w