菜单 学习猿地 - LMONKEY

VIP

开通学习猿地VIP

尊享10项VIP特权 持续新增

知识通关挑战

打卡带练!告别无效练习

接私单赚外块

VIP优先接,累计金额超百万

学习猿地私房课免费学

大厂实战课仅对VIP开放

你的一对一导师

每月可免费咨询大牛30次

领取更多软件工程师实用特权

入驻
422
0

python-grpc

原创
05/13 14:22
阅读数 80846

一:安装

  1. 安装 grpc : pip install grpcio
  2. 安装 grpc tools
    1. pip install grpcio-tools
    2. 包含 protocol buffer 编译器,用于从 .proto 文件生成服务端和客户端代码的插件

二:编写.proto 文件

介绍:

grpc默认使用文件,结构化数据存储格式,适合做数据存储或RPC数据交换格式

基本使用:

定义接口
service DynamicMasker{
    // 动态脱敏接口
    rpc DataMasker(DataMaskerReq) returns(DataMaskerResp) {}
    // 数据识别接口
    rpc DataIdentity(DataIdentityReq) returns(DataIdentityResp) {}
}
分配表示符:

每个字段都有唯一的一个数字标识符

[1,15] :占用一个字节

[16,2047] :占用两个字节

数据标识符如上的1,2,按顺序。

指定字段规则
  1. sigular

  2. repeated: 表示这个字段可以重复任意多次(包括0次)

    message DataMaskerReq{
        repeated FieldData field_data_list = 1;
    }
    
    message FieldData{
        string field_name  = 1;  // 字段名称
        string data_type = 2;  // 脱敏类型
        repeated FieldValues field_values_list = 3;  // 脱敏数据列表
    }
    message FieldValues {
        string field_value = 1;
    }
    

    与 json 格式相对照:

    {
        field_data_list:[
            {
                'field_name': 'id',
                'data_type': 'mobile',
                'field_values_list': [
                    {'field_value': '1'},
                    {'field_value': '2'}
                ]
            },
            {
                'field_name': 'name',
                'data_type': 'mobile',
                'field_values_list': [
                    {'field_value': '1'},
                    {'field_value': '2'},
                    {'field_value': '3'}
                ]
            }
        ]
    }
    
标量数值类型
.proto 文件类型 python类型
double float
float float
int32 int
unit32 int/long
bool bool
默认值
  • string 默认值为空string
  • bytes 默认值为空bytes
  • bool 默认值为false
  • 数值类型 默认值为0
  • 枚举类型 默认值为第一个定义的枚举值,必须为0

注意:当定义了一个接口返回字段为 code,code为0时,接口返回的数据中不会出现 code 这个字段。

枚举
enum ImportType{
    IMPOERT_TYPE_UNKNOWN = 0;  // 第一个定义的枚举值,必须为0
    IMPOERT_TYPE_APPEND = 1; // 追加
    IMPOERT_TYPE_FULLCOVER = 2; // 全量覆盖
}
使用其他消息类型
message Auth{
    string organization_uuid = 1;
    string project_uuid = 2;
    string user_uuid = 3;
    string task_uuid = 4;

}

message RDBMSSqlExportDataReq{
    Auth auth = 1;//权限相关
    string source_uri = 2; //来源数据源
    string sql = 3; //sql语句
}

完整的 .proto 文件

syntax = "proto3";

option java_multiple_files = true;
option java_package = "io.grpc.dynamic_masker";

package dynamic_masker;

service DynamicMasker{
    // 动态脱敏接口
    rpc DataMasker(DataMaskerReq) returns(DataMaskerResp) {}
    // 数据识别接口
    rpc DataIdentity(DataIdentityReq) returns(DataIdentityResp) {}
}

message DataMaskerReq{
    repeated FieldData field_data_list = 1;
}

message FieldData{
    string field_name  = 1;  // 字段名称
    string masker_algorithm = 2;  // 脱敏类型
    repeated FieldValues field_values_list = 3;  // 脱敏数据列表
}

message FieldValues {
    string field_value = 1;
}

message DataMaskerResp{
    int32 code = 1;  // 返回状态码 0 成功 -1失败
    string msg = 2;  // 失败时,返回错误信息
    repeated MaskedData masked_data_list = 4;  // 返回脱敏后的数据
}

message MaskedResult{
    int32 total_count = 1;  // 脱敏数据总量
    int32 failed_count = 2;  // 脱敏失败数量
}

message MaskedData{
    string field_name = 1;  //字段名称
    repeated FieldValues field_values_list = 2;  // 脱敏后的数据列表
    MaskedResult masked_result = 3; // 每个字段脱敏数量
}

message DataIdentityReq{
    repeated DataIdentityData field_data_list = 1;
}

message DataIdentityResp{
    int32 code = 1;  // 返回状态码 0 成功 -1失败
    string msg = 2;  // 失败时,返回错误信息
    repeated IdentityHitData field_hit_result = 3;
}

message IdentityHitData{
    string field_name = 1;
    string hit = 2;
    string missed_hit = 3;
}

message DataIdentityData{
    string field_name  = 1;  // 字段名称
    string data_type = 2;  // 脱敏类型
    repeated FieldValues field_values_list = 3;  // 脱敏数据列表
}

三、编译 .proto 文件

编译 .proto 文件时,编译器将生成所选语言的代码

编译命令

python -m grpc_tools.protoc --python_out=. --grpc_python_out=. -I. dynamic_masker.proto
  • python -m 把库模块当做脚本运行
  • -I 指定 .proto 文件所在路径,proto 文件目录
  • --python_out=. 编译 .proto 文件所生成的文件路径,这里生成到当前目录
  • --grpc_python_out=. 编译生成处理 grpc 相关的代码的路径,这里生成到当前目录

编译后生成的文件

  • dynamic_masker_pb2.py 用来和 protobuf 数据进行交互
  • dynamic_masker_pb2_grpc.py 用来和 grpc 进行交互

四、客户端、服务端代码

服务端 server.py

from grpc_lib import dynamic_masker_pb2
from concurrent import futures
from grpc_lib import dynamic_masker_pb2_grpc
from dm_utils.dm_core import masker_text, data_identity
from dm_utils.dm_algorithm import DMAlg
import grpc
import time
import os


_ONE_DAY_IN_SECONDS = 60 * 60 * 24


class DynamicMasker(dynamic_masker_pb2_grpc.DynamicMaskerServicer):
    def DataMasker(self, request, context):
        algorithm = DMAlg()
        masked_data_list = []
        for field_data in request.field_data_list:
            masked_data = {}
            field_values_list = field_data.field_values_list
            masker_algorithm = field_data.masker_algorithm
            masked_data['field_name'] = field_data.field_name
            masked_data['field_values_list'] = []
            total_count = len(field_values_list)
            if masker_algorithm == 'shuffle':
                # 调用shuffle功能
                masked_data['field_values_list'] = algorithm.shuffle(list(field_values_list))
                masked_data['masked_result'] = {'total_count': total_count, 'failed_count': total_count}
                masked_data_list.append(masked_data)
            else:
                failed_count = 0
                for item in field_values_list:
                    try:
                        masked_value = masker_text(item.field_value, masker_algorithm)
                    except Exception as e:
                        return dynamic_masker_pb2.DataMaskerResp(
                            code=-1,
                            msg=str(e),
                            masked_data_list=masked_data_list
                        )
                    if not masked_value:
                        failed_count += 1
                    else:
                        masked_data['field_values_list'].append({'field_value': masked_value})
                masked_data['masked_result'] = {'total_count': total_count, 'failed_count': failed_count}
                masked_data_list.append(masked_data)

        return dynamic_masker_pb2.DataMaskerResp(
            code=0,
            msg='success',
            masked_data_list=masked_data_list
        )

    def DataIdentity(self, request, context):
        field_hit_result = []
        for field_data in request.field_data_list:
            field_name = field_data.field_name
            data_type = field_data.data_type
            field_values_list = field_data.field_values_list
            identity_data_result = {}
            hit = 0
            missed_hit = 0
            for item in field_values_list:
                try:
                    result = data_identity(item.field_value, data_type)
                except Exception as e:
                    return dynamic_masker_pb2.DataMaskerResp(
                        code=-1,
                        msg=str(e),
                        masked_data_list=field_hit_result
                    )
                if not result:
                    missed_hit += 1
                else:
                    hit += 1
            identity_data_result['field_name'] = field_name
            identity_data_result['hit'] = str(hit)
            identity_data_result['missed_hit'] = str(missed_hit)
            field_hit_result.append(identity_data_result)
        return dynamic_masker_pb2.DataIdentityResp(
            code=0,
            msg='success',
            field_hit_result=field_hit_result
        )


if __name__ == '__main__':
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=int(os.environ['MAX_WORKERS'])))
    dynamic_masker_pb2_grpc.add_DynamicMaskerServicer_to_server(DynamicMasker(), server)
    server.add_insecure_port('0.0.0.0:8000')
    server.start()
    print("sever is opening ,waiting for message...")
    try:
        while True:
            time.sleep(_ONE_DAY_IN_SECONDS)
    except KeyboardInterrupt:
        server.stop(0)

客户端 client.py

from grpc_lib import dynamic_masker_pb2_grpc, dynamic_masker_pb2
import grpc


def run():
    channel = grpc.insecure_channel('127.0.0.1:31003')
    stub = dynamic_masker_pb2_grpc.DynamicMaskerStub(channel)
    response = stub.DataMasker(dynamic_masker_pb2.DataMaskerReq(
        field_data_list=[
            {
                'field_name': 'id',
                'masker_algorithm': 'mask',
                'field_values_list': [
                    {'field_value': '1'},
                    {'field_value': '2'}
                ]

            },
            {
                'field_name': 'name',
                'masker_algorithm': 'mask',
                'field_values_list': [
                    {'field_value': '1'},
                    {'field_value': '2'},
                    {'field_value': '3'}
                ]

            }
        ]
    ))
    print("Client received: ", response)
    
if __name__ == '__main__':
    run()

发表评论

0/200
422 点赞
0 评论
收藏