迟来的干货 | Kafka权限管理实战

今年有很多小伙伴在公众号或者微信留言问能不能整一篇有关Kafka权限管理的文章,迫于工作关系,这个需求一直拖后。后来知道我的好友兼Kafka高玩 —— 【一岁小宝 】他一直在弄这一块的东西,所以我就厚着face皮去催他整一篇,也就是你们接下来看到的内容。本文原文地址:https://www.jianshu.com/p/09129c9f4c80。

有关Kafka权限管理的资料可谓是少之又少,当你遇到这块内容的时候,如果靠自己摸索的话会耗费很长的时间。如果现在还没有接触到这一块,也建议收藏本文,以备不时之需。

本片篇幅很大,在编辑本文的时候,右下角字数统计18000字(算上了英文字符,tx的字数统计有点怪异),建议先马后看。

一、概述

1、Kafka的权限分类

1)、身份认证(Authentication):对client 与服务器的连接进行身份认证,brokers和zookeeper之间的连接进行Authentication(producer 和 consumer)、其他 brokers、tools与 brokers 之间连接的认证。

2)、权限控制(Authorization):实现对于消息级别的权限控制,clients的读写操作进行Authorization:(生产/消费/group)数据权限。

2、实现方式

自0.9.0.0版本开始Kafka社区添加了许多功能用于提高Kafka群集的安全性,Kafka提供SSL或者SASL两种安全策略。SSL方式主要是通过CA令牌实现,此文主要介绍SASL方式。

1)SASL验证:

验证方式Kafka版本特点
SASL/PLAIN0.10.0.0不能动态增加用户
SASL/SCRAM0.10.2.0可以动态增加用户
SASL/Kerberos0.9.0.0需要独立部署验证服务
SASL/OAUTHBEARER2.0.0需自己实现接口实现token的创建和验证,需要额外Oauth服务

2)SSL加密: 使用SSL加密在代理和客户端之间,代理之间或代理和工具之间传输的数据。

二、使用SASL进行身份验证

1、SASL/PLAIN验证

1.1 版本

相关包版本下载地址
Kafka0.11.0.0http://kafka.apache.org/downloads
Zookeeper3.4.8https://www.apache.org/dyn/closer.cgi/zookeeper/

1.2 Zookeeper配置

1)修改zoo.cfg增加两行配置:

authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl

2)配置JAAS文件:

Server {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin-sec"
};

说明:定义了需要链接到Zookeeper服务器的用户名和密码

3)加入需要的包:

kafka-clients-0.10.0.1.jar
lz4-1.3.0.jar
slf4j-api-1.7.21.jar
slf4j-log4j12-1.7.21.jar
snappy-java-1.1.2.6.jar

如果没有引入对应的kafka包,启动kafka时会报找不到org.apache.kafka.common.security.plain.PlainLoginModule包的错误。

Zookeeper引入加载包需改Zookeeper脚本zkEnv.sh,在最后加入:

for i in "$ZOOBINDIR"/../for_sasl/*.jar; do
  CLASSPATH="$i:$CLASSPATH"
done
SERVER_JVMFLAGS=" -Djava.security.auth.login.config=$ZOOCFGDIR/zk_server_jaas.conf "

将for_sasl目录下的所有jar文件追加到CLASSPATH变量,再设置一个JVM参数给SERVER_JVMFLAGS变量,这两个变量都会在Zookeeper启动时传给JVM。

4)启动Zookeeper

bin/zkServer.sh start

1.3 Kafka服务端配置

1)kafka增加认证信息:

创建JAAS文件:

KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin-sec"
user_admin="admin-sec"
user_producer="prod-sec"
user_consumer="cons-sec";
};

user_XXX 为自定义的用户,所有可以使用的用户必须在此定义,不能再之后新增。

  • producer用于开放生产权限。
  • consumer用于开放消费权限。

JAAS文件定义了链接Kafka Broker时所需要的用户名密码及broker各个节点之间相互通信的用户名密码:

  • username/ password:broker之间通信使用的用户名密码。
  • user_admin/user_producer/user_consumer:客户端(管理员、生产者、消费者)链接broker时所使用到的用户名密码。

2)配置server.properties

listeners=SASL_PLAINTEXT://主机名称:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
allow.everyone.if.no.acl.found=true     //当没有找到ACL配置时,允许所有的访问操作。

3)修改启动脚本

exec $base_dir/kafka-run-class.sh 
$EXTRA_ARGS -Djava.security.auth.login.config
=/home/qa/Downloads/kafka_2.12-0.11.0.1/config/kafka_server_jaas.conf kafka.Kafka "$@"

1.4 Kafka客户端端配置

1)创建JAAS文件:消费者:kafka_client_consumer_jaas.conf

KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="consumer"
password="cons-sec";
};

生产者:kafka_client_producer_jaas.conf

KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="consumer"
password="cons-sec";
};

2)修改客户端配置信息:分别在producer.properties和consumer.properties添加认证机制

security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN

consumer.properties中额外加入分组配置

group.id=test-group

3)修改客户端脚本指定JAAS文件加载:

export KAFKA_HEAP_OPTS="-Xmx512M -Djava.security.auth.login.config=/home/kafka_client_jaas.conf"

1.5 授权

此时已经完成了基本的配置,但是如果测试可以发现无论是生产还是消费,都不被允许。最后一步,我们需要为设置的账号授权。

1)创建主题:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 4 --topic test

2)增加生产权限:

./kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer
 --authorizer-properties zookeeper.connect=localhost:2181 
--add --allow-principal User:producer --operation Write --topic test

3)配置消费权限:

bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer
--authorizer-properties zookeeper.connect=localhost:2181 --add
 --allow-principal User:consumer --operation Read --topic test

4)配置消费分组权限:

bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer
--authorizer-properties zookeeper.connect=localhost:2181 --add 
--allow-principal User:consumer --operation Read --group test-group

5)生产数据:

./kafka-console-producer-acl.sh --broker-list 127.0.0.1:9092 --topic test

6)消费数据:

bin/kafka-console-consumer-acl.sh --bootstrap-server  127.0.0.1:9092 
--topic test --from-beginning --consumer.config ./config/consumer.properties

我们可以查看配置的权限信息:

bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer 
--authorizer-properties zookeeper.connect=localhost:2181 --list

Current ACLs for resource Topic:test: User:producer has Allow permission for operations: Write from hosts: * User:consumer has Allow permission for operations: Read from hosts: * Current ACLs for resource Group:test-group: User:consumer has Allow permission for operations: Read from hosts: *

1.6 JAVA客户端

在消费者/生产者初始化属性中引入JAAS文件:

static{
System.setProperty("java.security.auth.login.config","D://demoPeoject//JpaTest//src//main//resources//kafka_client_scram_consumer_jaas.conf");
}
properties.put("security.protocol", "SASL_PLAINTEXT");
properties.put("sasl.mechanism", "SCRAM-SHA-256");

2、SASL/SCRAM验证

上一节,我们通过配置SASL/PLAIN验证,实现了对Kafka的权限控制。但SASL/PLAIN验证有一个问题:只能在JAAS文件KafkaServer中配置用户,一但Kafka启动,无法动态新增用户。

SASL/SCRAM验证可以动态新增用户并分配权限。

2.1 版本

同上文

2.2 启动Zookeeper和Kafka

此方法是把凭证(credential)存储在Zookeeper,可以使用kafka-configs.sh在Zookeeper中创建凭据。对于每个SCRAM机制,必须添加具有机制名称的配置来创建凭证,在启动Kafka broker之前创建代理间通信的凭据。

所以第一步,在没有设置任何权限的配置下启动Kafka和Zookeeper。

2.3 创建SCRAM证书

1)创建broker建通信用户:admin(在使用sasl之前必须先创建,否则启动报错)

bin/kafka-configs.sh --zookeeper 127.0.0.1:2181 --alter 
--add-config 'SCRAM-SHA-256=[password=admin-sec],
SCRAM-SHA-512=[password=admin-sec]' --entity-type users --entity-name admin

2)创建生产用户:producer

bin/kafka-configs.sh --zookeeper 127.0.0.1:2181 --alter 
--add-config 'SCRAM-SHA-256=[iterations=8192,password=prod-sec],
SCRAM-SHA-512=[password=prod-sec]' --entity-type users --entity-name producer

3)创建消费用户:consumer

bin/kafka-configs.sh --zookeeper 127.0.0.1:2181 --alter 
--add-config 'SCRAM-SHA-256=[iterations=8192,password=cons-sec],
SCRAM-SHA-512=[password=cons-sec]' --entity-type users --entity-name consume

SCRAM-SHA-256/SCRAM-SHA-512是对密码加密的算法,二者有其一即可。

2.4 查看SCRAM证书

bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-type users --entity-name consumer

bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-type users --entity-name producer

2.5 删除SCRAM证书

bin/kafka-configs.sh --zookeeper localhost:2181 --alter --delete-config 'SCRAM-SHA-512' 
--delete-config 'SCRAM-SHA-256' --entity-type users --entity-name producer

2.6 服务端配置

在用户证书创建完毕之后开始Kafka服务端的配置:

1)创建JAAS文件:

KafkaServer {
org.apache.kafka.common.security.scram.ScramLoginModule required
username="admin"
password="admin-sec";
};

2)将JAAS配置文件位置作为JVM参数传递给每个Kafka Broker

exec $base_dir/kafka-run-class.sh $EXTRA_ARGS -Djava.security.auth.login.config=/home/qa/Downloads/kafka_2.12-0.11.0.1/config/kafka_server_jaas.conf kafka.Kafka "$@"

3)配置server.properties:

认证配置
listeners=SASL_PLAINTEXT://主机名称:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256
sasl.enabled.mechanisms=SCRAM-SHA-256
ACL配置
allow.everyone.if.no.acl.found=false
super.users=User:admin
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer

可以根据自己的需求选择SASL_SSL或SASL_PLAINTEXT, PLAINTEXT为不加密明文传输,性能好与SSL

4)重启Kafka和Zookeeper

2.7 客户端配置

1)为我们创建的三个用户分别创建三个JAAS文件:

KafkaClient {
org.apache.kafka.common.security.scram.ScramLoginModule required
username="admin"
password="admin-sec";
};


KafkaClient {
org.apache.kafka.common.security.scram.ScramLoginModule required
username="consumer"
password="cons-sec";
};


KafkaClient {
org.apache.kafka.common.security.scram.ScramLoginModule required
username="producer"
password="prod-sec";
};

2)修改启动脚本引入JAAS文件:

以生产者为例:

exec $(dirname $0)/kafka-run-class.sh 
-Djava.security.auth.login.config=/home/qa/Downloads/kafka_2.12-0.11.0.1/config/kafka_client_scram_producer_jaas.conf

3)ACL权限配置:此时如果我们生产数据则会发生如下错误:

上文2.3节中我们创建了三个用户,但是还未对其赋予操作权限,接下来我们为其增加权限。

生产者:

bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer
--authorizer-properties zookeeper.connect=localhost:2181 --add
 --allow-principal User:producer --operation Write --topic test --allow-host 192.168.2.*

消费者:

bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer 
--authorizer-properties zookeeper.connect=localhost:2181 --add 
--allow-principal User:consumer--operation Read --topic test --allow-host 192.168.2.*

为生产者增加分组权限:

bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer
--authorizer-properties zookeeper.connect=localhost:2181 --add 
--allow-principal User:producer --operation Read --group test-group --allow-host 192.168.2.*

分配权限之后就可以进行生产,消费操作了。(其他颗粒度的权限设置请参考官方文档)

4)查看权限

bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer 
--authorizer-properties zookeeper.connect=localhost:2181 --list

2.8 小结

SASL/SCRAM验证方法可以在Kafka服务启动之后,动态的新增用户分并配权限,在业务变动频繁,开发人员多的情况下比SASL/PLAIN方法更加灵活。

3. SASL/OAUTHBEARER

SASL/OAUTHBEARER验证是Kafka2.0版中新增的验证方式:

KIP-255 adds a framework for authenticating to Kafka brokers using OAuth2 bearer tokens. The SASL/ OAuthBurer implementation is customizable using callbacks for token retrieval and validation.

KIP-255增加了一个使用OAuth2承载令牌对KafkaBroker进行认证的框架。SASL/OAuthBurer实现可使用回调进行令牌检索和验证。

3.1 Oauth2.0验证

OAuth(开放授权)是一个开放标准,允许用户授权第三方移动应用访问他们存储在另外的服务提供者上的信息,而不需要将用户名和密码提供给第三方移动应用或分享他们数据的所有内容,OAuth2.0是OAuth协议的延续版本。

Oauth2.0的工作流程如下图:

(A)用户打开客户端以后,客户端要求用户给予授权。

(B)用户同意给予客户端授权。

(C)客户端使用上一步获得的授权,向认证服务器申请令牌。

(D)认证服务器对客户端进行认证以后,确认无误,同意发放令牌。

(E)客户端使用令牌,向资源服务器申请获取资源。

(F)资源服务器确认令牌无误,同意向客户端开放资源。

3.2 JWT规范

Oauth2.0返回验证令牌使用了JWT格式:JSON Web Token(缩写 JWT)是目前最流行的跨域认证解决方案,JWT 的原理是,服务器认证以后,生成一个 JSON 对象,发回给用户。

JWT由三个部分组成,分别是:

  • Header(头部)
  • Claims(载荷)
  • Signature(签名)

1)载荷

{
"sub": "1",
"iss": "http://localhost:8000/auth/login",
"iat": 1451888119,
"exp": 1454516119,
"jti": "37c107e4609ddbcc9c096ea5ee76c667"
"nbf": 1451888119
}

sub:该JWT所面向的用户 iss:该JWT的签发者 iat(issued at):在什么时候签发的token exp(expires):token什么时候过期 nbf(not before):token在此时间之前不能被接收处理 jti:JWT ID为web token提供唯一标识

2)头部

{
"typ": "JWT",
"alg": "HS256"
}

typ:指明数据格式 alg:指明加密算法

3)签名 以上两个部分base64编码之后链接成一个字符串,用HS256算法进行加密。在加密的时候,我们还需要提供一个密钥(secret):

HMACSHA256(
base64UrlEncode(header) + "." +
base64UrlEncode(payload),
secret
)

算出签名以后,把 Header、Payload、Signature 三个部分用”点”(.)分割拼成一个字符串,就是最后的结果。

完整JWT = 载荷 . 头部 . 签名

3.3 版本

相关包版本下载地址
Kafka2.12-2.2.0http://kafka.apache.org/downloads
Zookeeper3.4.8https://www.apache.org/dyn/closer.cgi/zookeeper/

Kafka2.0以上版本才支持此种验证

3.4 默认验证方式(非安全验证)

1)服务端配置:

(A)创建用于broker端通信的JAAS文件:

KafkaServer {
org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule Required
unsecuredLoginStringClaim_sub="thePrincipalName"
unsecuredLoginListClaim_scope=",KAFKA_BROKER,LOGIN_TO_KAFKA"
unsecuredValidatorRequiredScope="LOGIN_TO_KAFKA"
unsecuredValidatorAllowableClockSkewMs="3000";
};

参数说明:

选项说明
unsecuredValidatorPrincipalClaimName=”value”默认为“sub”,如果需要修改在此定义
unsecuredValidatorScopeClaimName=”value”数值类型
unsecuredValidatorRequiredScope=”value”scope范围值,可配置字符串或者List
unsecuredValidatorAllowableClockSkewMs=”value”允许的时间偏差。单位:秒,默认0

举例:

KafkaServer {
org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required
unsecuredLoginStringClaim_sub="admin";
};

admin用户作为sub的主题用户用于kafka的broker之间通讯。

(B)在启动jvm参数中加入该文件:

-Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf 

(C)配置server.properties

listeners=SASL_SSL://host.name:port (or SASL_PLAINTEXT if non-production)
security.inter.broker.protocol=SASL_SSL (or SASL_PLAINTEXT if non-production)
sasl.mechanism.inter.broker.protocol=OAUTHBEARER
sasl.enabled.mechanisms=OAUTHBEARER

2)客户端配置:

(A)配置producer.properties/consumer.properties

security.protocol=SASL_SSL (or SASL_PLAINTEXT if non-production)
sasl.mechanism=OAUTHBEARER

(B)创建客户端通信的JAAS文件(或配置文件中增加配置项):

sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required
unsecuredLoginStringClaim_sub="alice";

JAAS文件样本:

KafkaClient {
org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule Required
unsecuredLoginStringClaim_sub="thePrincipalName"
unsecuredLoginListClaim_scope="|scopeValue1|scopeValue2"
unsecuredLoginLifetimeSeconds="60";
};

参数说明:

选项说明
unsecuredLoginStringClaim_<claimname>=”value”创建一个值为“value”的claim(不能为iat/exp)
unsecuredLoginNumberClaim_<claimname>=”value”数值类型
unsecuredLoginListClaim_<claimname>=”value”List类型
unsecuredLoginPrincipalClaimName默认为“sub”,如果需要修改在此定义
unsecuredLoginLifetimeSeconds超时时间单位:秒,默认3600
unsecuredLoginScopeClaimName默认为“scope”,如果需要修改在此定义

优先级:配置文件 > JAAS文件

(C)验证过程:

在3.4中我们列出了Kafka提供的一套非安全的验证用于非正式环境,通过创建和验证JWT实现验证。只要配置对应JAAS文件即可,本节我们来看一下验证原理。

通过阅读官方文档了解到Kafka提供了一个接口,通过实现接口,创建OAuthBearer token和验证OAuthBearer token。

此类一共有两个方法,查看此接口的继承关系:

通过名字不难猜到这两个就是Kafka提供的默认的非安全的验证类

OAuthBearerUnsecuredValidatorCallbackHandler
OAuthBearerUnsecuredLoginCallbackHandler

先看OAuthBearerUnsecuredLoginCallbackHandler的configure方法:在saslMechanism是OAUTHBEARER并且jaasConfigEntries存在的情况下为moduleOptions赋值。moduleOptions保存的是JAAS里面的各种配置项。

handleCallback方法:取出配置的选项值,并组成claimsJson和headerJson,封装成OAuthBearerUnsecuredJws对象,把值赋给OAuthBearerTokenCallback。

接下来看OAuthBearerUnsecuredValidatorCallbackHandler:

在handleCallback里对callback里面的值进行校验,如果校验通过则返回OAuthBearerUnsecuredJws,验证成功,否则抛出异常。

3.5 安全验证

Kafka官方文档中说明:

Production use cases will require writing an implementation of org.apache.kafka.common.security.auth.AuthenticateCallbackHandler that can handle an instance of org.apache.kafka.common.security.oauthbearer.OAuthBearerTokenCallback and declaring it via either the sasl.login.callback.handler.class configuration option for a non-broker client or via the listener.name.sasl_ssl.oauthbearer.sasl.login.callback.handler.class configuration option for brokers (when SASL/OAUTHBEARER is the inter-broker protocol). Production use cases will also require writing an implementation of org.apache.kafka.common.security.auth.AuthenticateCallbackHandler that can handle an instance of org.apache.kafka.common.security.oauthbearer.OAuthBearerValidatorCallback and declaring it via the listener.name.sasl_ssl.oauthbearer.sasl.server.callback.handler.class broker configuration option.

分别需要编写两个实现类处理OAuthBearerTokenCallback并分别在:

listener.name.sasl_ssl.oauthbearer.sasl.server.callback.handler.class
listener.name.sasl_ssl.oauthbearer.sasl.login.callback.handler.class或者sasl.login.callback.handler.class

3.6 配置

上一节我们讨论了Kafka默认提供的两个实现类,分别实现了AuthenticateCallbackHandler接口,并验证了基于JWT格式的token。受此启发我们可以编写自己的实现类。

1)服务端配置

(A)创建JAAS文件:

KafkaServer {
org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required
LoginStringClaim_sub="admin";
};

(B)在启动jvm参数中加入该文件:

-Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf

(C)配置server.properties:

listeners=SASL_SSL://host.name:port (or SASL_PLAINTEXT if non-production)
security.inter.broker.protocol=SASL_SSL (or SASL_PLAINTEXT if non-production)
sasl.mechanism.inter.broker.protocol=OAUTHBEARER
sasl.enabled.mechanisms=OAUTHBEARER

2)实现接口:此处只给出思路,具体验证过程根据业务需求定制,我们定义两个类:

Oauth2AuthenticateLoginCallbackHandler
Oauth2AuthenticateValidatorCallbackHandler
public class Oauth2AuthenticateValidatorCallbackHandler implements AuthenticateCallbackHandler {

 ... 此处省略无关代码...

    @Override
    public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {
        if (!isConfigured())
            throw new IllegalStateException("Callback handler not configured");
        for (Callback callback : callbacks) {
            if (callback instanceof OAuthBearerValidatorCallback)
                try {
                    OAuthBearerValidatorCallback validationCallback = (OAuthBearerValidatorCallback) callback;
                    handleCallback(validationCallback);
                } catch (KafkaException e) {
                    throw new IOException(e.getMessage(), e);
                }
            else
                throw new UnsupportedCallbackException(callback);
        }
    }

    private void handleCallback(OAuthBearerValidatorCallback callback) {
        String accessToken = callback.tokenValue();
        if (accessToken == null)
            throw new IllegalArgumentException("Callback missing required token value");

        log.info("Trying to introspect Token!");
        OauthBearerTokenJwt token = OauthHttpClient.introspectBearer(accessToken);
        log.info("Trying to introspected");

        // Implement Check Expire Token..
        long now = time.milliseconds();
        if (now > token.expirationTime()) {
            OAuthBearerValidationResult.newFailure("Expired Token, needs refresh!");
        }

        log.info("Validated! token..");
        callback.token(token);
    }
}

public class Oauth2AuthenticateLoginCallbackHandler implements AuthenticateCallbackHandler {

 ... 此处省略无关代码...

    @Override
    public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {
        if (!isConfigured())
            throw new IllegalStateException("Callback handler not configured");
        for (Callback callback : callbacks) {
            if (callback instanceof OAuthBearerTokenCallback)
                try {
                    handleCallback((OAuthBearerTokenCallback) callback);
                } catch (KafkaException e) {
                    throw new IOException(e.getMessage(), e);
                }
            else
                throw new UnsupportedCallbackException(callback);
        }
    }

    private void handleCallback(OAuthBearerTokenCallback callback) {
        if (callback.token() != null)
            throw new IllegalArgumentException("Callback had a toke" +
                    "n already");

        log.info("Try to acquire token!");
        OauthBearerTokenJwt token = OauthHttpClient.login(null);
        log.info("Retrieved token..");
        if (token == null) {
            throw new IllegalArgumentException("Null token returned from server");
        }
        callback.token(token);
    }
}
OauthBearerTokenJwt token = OauthHttpClient.login(null);
OauthBearerTokenJwt token = OauthHttpClient.introspectBearer(accessToken);

区别在于这两个方法:

  • login方法主要是客户端用自己的信息(可是是用户名/密码或者token)创建http或者https请求去Oauth服务器申请token,并封装OauthBearerTokenJwt返回。
  • introspectBearer方法利用自己的accessToken去Oauth服务器做验证(查询数据库,验证失效时间等等),验证成功后同样返回OauthBearerTokenJwt。

本人用springboot搭建一个简易的后端用来模拟Oauth服务。

小结

本节介绍了自Kafka2.0版本新增的SASL/OAUTHBEARER验证以及相关Oauth和JWT技术。分别介绍了Kafka默认的非安全验证方法和正式环境的验证实现方法。

SASL/OAUTHBEARER可以加密传输验证信息,自定义实现类处理创建/验证token。在此过程中可以对接数据库,便于持久化用户权限信息。

尚未解决的问题:权限细粒度不知如何控制,对生产数据,消费数据,分组信息的控制暂时没有找到方法。

Image placeholder
Jiaongfei
未设置
  88人点赞

没有讨论,发表一下自己的看法吧

推荐文章
美团点评Kubernetes集群管理实践

背景作为国内领先的生活服务平台,美团点评很多业务都具有非常显著、规律的“高峰”和“低谷”特征。尤其遇到节假日或促销活动,流量还会在短时间内出现爆发式的增长。这对集群中心的资源弹性和可用性有非常高的要求

基于 Laravel 5.5 + H+UI 框架的权限管理后台

laravel玩的比较少,就自己写了管理系统来熟悉该框架,再次特地记录一下!目前功能只写了权限管理功能,如有不足之处,还请大拿们多多指教!介绍PHP框架:Laravel5.5 前端框架:H+后台主题U

Hyperf 权限管理组件 hyperf-permission 发布

本人正在申请版主,还望各位多评论,收藏,点赞GITHUB:https://github.com/donjan-deng/hyperf-perm...欢迎star,欢迎pr.Hyperf权限管理组件sp

基于 Laravel6.x 构建的博客应用,支持 Markdown,支持 RBAC 权限管理

基于Laravel6.x构建的博客应用,支持Markdown,支持图片拖拽上传,基于RBAC权限管理系统首页基于RBAC的权限管理后台,Dashboard页面统计了用户总数、文章发布总数、评论率、评论

Knative 实战:基于 Kafka 实现消息推送

导读:当前在Knative中已经提供了对Kafka事件源的支持,那么如何基于Kafka实现消息推送呢?本文作者将以阿里云Kafka产品为例,给大家解锁这一新的姿势。作者| 元毅 阿里云智能事业群高级开

如何基于 Kafka 构建一个关系型数据库

在这篇文章里,我将分享如何通过扩展KCache(https://github.com/rayokota/kcache)来实现一个全功能的关系型数据库,我把这个数据库叫作KarelDB(https://

Spring Cloud Stream整合Kafka

引入依赖 org.springframework.cloud spring-cloud-stream-binder-kafka 或 org.springframework.cloud spr

Kafka 优秀的架构设计!它的高性能是如何保证的?

应大部分的小伙伴的要求,今天这篇咱们用大白话带你认识Kafka。Kafka 基础消息系统的作用大部分小伙伴应该都清楚,这里用机油装箱举个例子:所以消息系统就是如上图我们所说的仓库,能在中间过程作为缓存

全网最通俗易懂的Kafka入门

前言只有光头才能变强。文本已收录至我的GitHub仓库,欢迎Star:https://github.com/ZhongFuCheng3y/3y众所周知,消息队列的产品有好几种,这里我选择学习Kafka

Kafka 如何优化内存缓冲机制造成的频繁 GC 问题?

目录1、Kafka的客户端缓冲机制2、内存缓冲造成的频繁GC问题3、Kafka设计者实现的缓冲池机制4、总结一下“ 这篇文章,给大家聊一个硬核的技术知识,我们通过Kafka内核源码中的一些设计思想,来

Kafka 集群在马蜂窝大数据平台的优化与应用扩展

马蜂窝技术原创文章,更多干货请订阅公众号:mfwtechKafka是当下热门的消息队列中间件,它可以实时地处理海量数据,具备高吞吐、低延时等特性及可靠的消息异步传递机制,可以很好地解决不同系统间数据的

通过一个示例了解kafka connect连接器

什么是kafka连接器connect在实际工作中使用kafka,有时候会有类似这样的场景。我们需要把某些数据源的数据导入到kafka,或者把kafka作为数据源导出数据。或者两种场景的需求都要。这算是

分布式场景下Kafka消息顺序性的思考

在业务中使用kafka发送消息异步消费的场景,并且需要实现在消费时实现顺序消费,利用kafka在partition内消息有序的特点,实现消息消费时的有序性。1、在发送消息时,通过指定partition

从理论到案例,请收下这篇 Nginx 监控运维干货

Nginx(“enginex”)是一个开源、免费、高性能的HTTP和反向代理服务器,也可以用于IMAP/POP3代理服务器。充分利用Nginx的特性,可以有效解决流量高并发请求、cc攻击等问题。本文

干货:计算机网络知识总结

一计算机概述 (1),基本术语 结点(node): 网络中的结点可以是计算机,集线器,交换机或路由器等。 链路(link): 从一个结点到另一个结点的一段物理线路。中间没有任何其他交点。 主机

干货:构建复杂的 Eloquent 搜索过滤

最近,我需要在开发的事件管理系统中实现搜索功能。一开始只是简单的几个选项(通过名称,邮箱等搜索),到后面参数变得越来越多。 今天,我会介绍整个过程以及如何构建灵活且可扩展的搜索系统。如果你想查看代码

干货满满 | 来自灵魂的拷问-这21道Redis面试题

1、使用redis有哪些好处?速度快,因为数据存在内存中,类似于HashMap,HashMap的优势就是查找和操作的时间复杂度都是O(1)支持丰富数据类型,支持string,list,set,sort

干货丨爱奇艺CDN IPv6系统配置

1.背景  IPv6是“InternetProtocolVersion6”(互联网协议第6版)的缩写,是互联网工程任务组(IETF)设计的用于替代IPv4的下一代IP协议。IPv4地址资源紧缺严重制约

DTCC 干货 | 腾讯营销数据平台

摘要:广告平台是一个数据驱动的平台,数据在系统中高效流动,形成闭环,产生价值。腾讯广告系统每天有上百亿次请求量,以及上百T的数据,保证数据流的稳定可靠和高性能是数据系统的核心问题。对于数据分析场景,腾

干货 | 每天十亿级数据更新,秒出查询结果,ClickHouse在携程酒店的应用

本文转自 |携程技术中心 作者 |蔡岳毅作者简介蔡岳毅,携程酒店大数据高级研发经理,负责酒店数据智能平台研发,大数据技术创新工作。喜欢探索研究大数据的开源技术框架。一、背景1)携程酒店每天有上千表,累

干货!IP桌面电话选购的五大要点

随着人们对高效便捷工作方式的追求,移动办公和分布式团队愈发普及。协作空间已不再局限于常规的会议室,它可能是一张办公桌或提供Wi-Fi的咖啡厅,空间风格也逐渐向轻松随意的氛围转变。如何随时、随地利用任何

干货 | 阿里巴巴HBase高可用8年抗战回忆录

前言2011年毕玄和竹庄两位大神将HBase引入阿里技术体系,2014年接力棒转到东8区第一位HBasecommiter天梧手中,多年来与淘宝、旺旺、菜鸟、支付宝、高德、大文娱、阿里妈妈等几乎全BU合

干货 | 揭秘京东数科强一致、高性能的分布式事务中间件JDTX

导读:在分布式数据库、云原生数据库、NewSQL等名词在数据库领域层出不穷的当今,变革——在这个相对稳定的领域已愈加不可避免。相比于完全革新,渐进式增强的方案在拥有厚重沉淀的行业则更受青睐。同所有分布

项目管理最佳实践,企业如何进行有效的项目管理

前言:企业在划分项目时,可按照项目的复杂程度、管理范围等将项目分为三个级别,分别是企业级、部门级和小组级(与目标划分原则相同),然后将每一级的目标与项目对应起来。我们知道,企业制定的目标(OKR),一

使用 Node.js 以来的感想与总结

使用PHP时期 在使用php的时候,曾经用的最多的框架就是ThinkPHP,后来尝试体验过LaravelorYii,反正不管用啥框架,PHP的那一堆环境是必不可少,虽然现在有很多集成环境或各种各样的