MAPREDUCE服务 MRS-使用Kafka Token认证:代码样例

时间:2024-07-29 08:59:50

代码样例

Token认证机制支持API,用户可在二次开发样例的Producer()Consumer()中对其进行配置。

  • Producer()配置的样例代码如下:
    public static Properties initProperties() {
    	Properties props = new Properties();
    	KafkaProperties kafkaProc = KafkaProperties.getInstance();
    
    	// Broker地址列表
    	props.put(BOOTSTRAP_SERVER, kafkaProc.getValues(BOOTSTRAP_SERVER, "localhost:21007"));
    	// 客户端ID
    	props.put(CLIENT_ID, kafkaProc.getValues(CLIENT_ID, "DemoProducer"));
    	// Key序列化类
    	props.put(KEY_SERIALIZER,
    			kafkaProc.getValues(KEY_SERIALIZER, "org.apache.kafka.common.serialization.StringSerializer"));
    	// Value序列化类
    	props.put(VALUE_SERIALIZER,
    			kafkaProc.getValues(VALUE_SERIALIZER, "org.apache.kafka.common.serialization.StringSerializer"));
    	// 协议类型:当前支持配置为SASL_PLAINTEXT或者PLAINTEXT
    	props.put(SECURITY_PROTOCOL, kafkaProc.getValues(SECURITY_PROTOCOL, "SASL_PLAINTEXT"));
    	// 服务名
    	props.put(SASL_KERBEROS_SERVICE_NAME, "kafka");
    	//  域名 
    	props.put(KERBEROS_DOMAIN_NAME, kafkaProc.getValues(KERBEROS_DOMAIN_NAME, "hadoop.hadoop.com"));
    	// 分区类名
    	props.put(PARTITIONER_NAME,
    			kafkaProc.getValues(PARTITIONER_NAME, "com.huawei.bigdata.kafka.example.SimplePartitioner"));
    	// 生成Token配置
    	StringBuilder token = new StringBuilder();
    	String LINE_SEPARATOR = System.getProperty("line.separator");
    	token.append("org.apache.kafka.common.security.scram.ScramLoginModule required").append(LINE_SEPARATOR);
    	/**
    	 * 用户自己生成的Token的TOKENID
    	 */
    	token.append("username=\"PPVz2cxuQC-okwJVZnFKFg\"").append(LINE_SEPARATOR);
    	/**
    	 * 用户自己生成的Token的HMAC
    	 */
    	token.append("password=\"pL5nHsIUODg5u0dRM+o62cOIf/j6yATSt6uaPBYfIb29dj/jbpiAnRGSWDJ6tL4KXo89dot0axcRIDsMagyN4g==\"").append(LINE_SEPARATOR);
    	token.append("tokenauth=true;");
    	// 用户使用的SASL机制,配置为SC RAM -SHA-512
    	props.put("sasl.mechanism", "SCRAM-SHA-512");
    	props.put("sasl.jaas.config", token.toString());
    
    	return props;
    }
  • Consumer()配置的样例代码如下:
    public static Properties initProperties() {
    	Properties props = new Properties();
    	KafkaProperties kafkaProc = KafkaProperties.getInstance();
            // Broker连接地址
            props.put(BOOTSTRAP_SERVER, kafkaProc.getValues(BOOTSTRAP_SERVER, "localhost:21007"));
            // Group id
            props.put(GROUP_ID, kafkaProc.getValues(GROUP_ID, "DemoConsumer"));
            // 是否自动提交offset
            props.put(ENABLE_AUTO_COMMIT, kafkaProc.getValues(ENABLE_AUTO_COMMIT, "true"));
            // 自动提交offset的时间间隔
            props.put(AUTO_COMMIT_INTERVAL_MS, kafkaProc.getValues(AUTO_COMMIT_INTERVAL_MS,"1000"));
            // 会话超时时间
            props.put(SESSION_TIMEOUT_MS, kafkaProc.getValues(SESSION_TIMEOUT_MS, "30000"));
            // 消息Key值使用的反序列化类
            props.put(KEY_DESERIALIZER,
                kafkaProc.getValues(KEY_DESERIALIZER, "org.apache.kafka.common.serialization.StringDeserializer"));
            // 消息内容使用的反序列化类
            props.put(VALUE_DESERIALIZER,
                kafkaProc.getValues(VALUE_DESERIALIZER, "org.apache.kafka.common.serialization.StringDeserializer"));
            // 安全协议类型
            props.put(SECURITY_PROTOCOL, kafkaProc.getValues(SECURITY_PROTOCOL, "SASL_PLAINTEXT"));
            // 服务名
            props.put(SASL_KERBEROS_SERVICE_NAME, "kafka");
            // 域名
            props.put(KERBEROS_DOMAIN_NAME, kafkaProc.getValues(KERBEROS_DOMAIN_NAME, "hadoop.hadoop.com"));
    	// 生成Token配置
    	StringBuilder token = new StringBuilder();
    	String LINE_SEPARATOR = System.getProperty("line.separator");
    	token.append("org.apache.kafka.common.security.scram.ScramLoginModule required").append(LINE_SEPARATOR);
    	/**
    	 * 用户自己生成的Token的TOKENID
    	 */
    	token.append("username=\"PPVz2cxuQC-okwJVZnFKFg\"").append(LINE_SEPARATOR);
    	/**
    	 * 用户自己生成的Token的HMAC
    	 */
    	token.append("password=\"pL5nHsIUODg5u0dRM+o62cOIf/j6yATSt6uaPBYfIb29dj/jbpiAnRGSWDJ6tL4KXo89dot0axcRIDsMagyN4g==\"").append(LINE_SEPARATOR);
    	token.append("tokenauth=true;");
    	// 用户使用的SASL机制,配置为SCRAM-SHA-512
    	props.put("sasl.mechanism", "SCRAM-SHA-512");
    	props.put("sasl.jaas.config", token.toString());
    
    	return props;
    }
support.huaweicloud.com/devg-lts-mrs/mrs_07_130021.html