云服务器内容精选

  • 代码样例 Token认证机制支持API,用户可在二次开发样例的Producer()和Consumer()中对其进行配置。 Producer()配置的样例代码如下: public static Properties initProperties() { Properties props = new Properties(); KafkaProperties kafkaProc = KafkaProperties.getInstance(); // Broker地址列表 props.put(BOOTSTRAP_SERVER, kafkaProc.getValues(BOOTSTRAP_SERVER, "localhost:21007")); // 客户端ID props.put(CLIENT_ID, kafkaProc.getValues(CLIENT_ID, "DemoProducer")); // Key序列化类 props.put(KEY_SERIALIZER, kafkaProc.getValues(KEY_SERIALIZER, "org.apache.kafka.common.serialization.StringSerializer")); // Value序列化类 props.put(VALUE_SERIALIZER, kafkaProc.getValues(VALUE_SERIALIZER, "org.apache.kafka.common.serialization.StringSerializer")); // 协议类型:当前支持配置为SASL_PLAINTEXT或者PLAINTEXT props.put(SECURITY_PROTOCOL, kafkaProc.getValues(SECURITY_PROTOCOL, "SASL_PLAINTEXT")); // 服务名 props.put(SASL_KERBEROS_SERVICE_NAME, "kafka"); // 域名 props.put(KERBEROS_DOMAIN_NAME, kafkaProc.getValues(KERBEROS_DOMAIN_NAME, "hadoop.hadoop.com")); // 分区类名 props.put(PARTITIONER_NAME, kafkaProc.getValues(PARTITIONER_NAME, "com.huawei.bigdata.kafka.example.SimplePartitioner")); // 生成Token配置 StringBuilder token = new StringBuilder(); String LINE_SEPARATOR = System.getProperty("line.separator"); token.append("org.apache.kafka.common.security.scram.ScramLoginModule required").append(LINE_SEPARATOR); /** * 用户自己生成的Token的TOKENID */ token.append("username=\"PPVz2cxuQC-okwJVZnFKFg\"").append(LINE_SEPARATOR); /** * 用户自己生成的Token的HMAC */ token.append("password=\"pL5nHsIUODg5u0dRM+o62cOIf/j6yATSt6uaPBYfIb29dj/jbpiAnRGSWDJ6tL4KXo89dot0axcRIDsMagyN4g==\"").append(LINE_SEPARATOR); token.append("tokenauth=true;"); // 用户使用的SASL机制,配置为SC RAM -SHA-512 props.put("sasl.mechanism", "SCRAM-SHA-512"); props.put("sasl.jaas.config", token.toString()); return props; } Consumer()配置的样例代码如下: public static Properties initProperties() { Properties props = new Properties(); KafkaProperties kafkaProc = KafkaProperties.getInstance(); // Broker连接地址 props.put(BOOTSTRAP_SERVER, kafkaProc.getValues(BOOTSTRAP_SERVER, "localhost:21007")); // Group id props.put(GROUP_ID, kafkaProc.getValues(GROUP_ID, "DemoConsumer")); // 是否自动提交offset props.put(ENABLE_AUTO_COMMIT, kafkaProc.getValues(ENABLE_AUTO_COMMIT, "true")); // 自动提交offset的时间间隔 props.put(AUTO_COMMIT_INTERVAL_MS, kafkaProc.getValues(AUTO_COMMIT_INTERVAL_MS,"1000")); // 会话超时时间 props.put(SESSION_TIMEOUT_MS, kafkaProc.getValues(SESSION_TIMEOUT_MS, "30000")); // 消息Key值使用的反序列化类 props.put(KEY_DESERIALIZER, kafkaProc.getValues(KEY_DESERIALIZER, "org.apache.kafka.common.serialization.StringDeserializer")); // 消息内容使用的反序列化类 props.put(VALUE_DESERIALIZER, kafkaProc.getValues(VALUE_DESERIALIZER, "org.apache.kafka.common.serialization.StringDeserializer")); // 安全协议类型 props.put(SECURITY_PROTOCOL, kafkaProc.getValues(SECURITY_PROTOCOL, "SASL_PLAINTEXT")); // 服务名 props.put(SASL_KERBEROS_SERVICE_NAME, "kafka"); // 域名 props.put(KERBEROS_DOMAIN_NAME, kafkaProc.getValues(KERBEROS_DOMAIN_NAME, "hadoop.hadoop.com")); // 生成Token配置 StringBuilder token = new StringBuilder(); String LINE_SEPARATOR = System.getProperty("line.separator"); token.append("org.apache.kafka.common.security.scram.ScramLoginModule required").append(LINE_SEPARATOR); /** * 用户自己生成的Token的TOKENID */ token.append("username=\"PPVz2cxuQC-okwJVZnFKFg\"").append(LINE_SEPARATOR); /** * 用户自己生成的Token的HMAC */ token.append("password=\"pL5nHsIUODg5u0dRM+o62cOIf/j6yATSt6uaPBYfIb29dj/jbpiAnRGSWDJ6tL4KXo89dot0axcRIDsMagyN4g==\"").append(LINE_SEPARATOR); token.append("tokenauth=true;"); // 用户使用的SASL机制,配置为SCRAM-SHA-512 props.put("sasl.mechanism", "SCRAM-SHA-512"); props.put("sasl.jaas.config", token.toString()); return props; }
  • 代码样例 Token认证机制支持API,用户可在二次开发样例的Producer()和Consumer()中对其进行配置。 Producer()配置的样例代码如下: public static Properties initProperties() { Properties props = new Properties(); KafkaProperties kafkaProc = KafkaProperties.getInstance(); // Broker地址列表 props.put(BOOTSTRAP_SERVER, kafkaProc.getValues(BOOTSTRAP_SERVER, "localhost:21007")); // 客户端ID props.put(CLIENT_ID, kafkaProc.getValues(CLIENT_ID, "DemoProducer")); // Key序列化类 props.put(KEY_SERIALIZER, kafkaProc.getValues(KEY_SERIALIZER, "org.apache.kafka.common.serialization.StringSerializer")); // Value序列化类 props.put(VALUE_SERIALIZER, kafkaProc.getValues(VALUE_SERIALIZER, "org.apache.kafka.common.serialization.StringSerializer")); // 协议类型:当前支持配置为SASL_PLAINTEXT或者PLAINTEXT props.put(SECURITY_PROTOCOL, kafkaProc.getValues(SECURITY_PROTOCOL, "SASL_PLAINTEXT")); // 服务名 props.put(SASL_KERBEROS_SERVICE_NAME, "kafka"); // 域名 props.put(KERBEROS_DOMAIN_NAME, kafkaProc.getValues(KERBEROS_DOMAIN_NAME, "hadoop.hadoop.com")); // 分区类名 props.put(PARTITIONER_NAME, kafkaProc.getValues(PARTITIONER_NAME, "com.huawei.bigdata.kafka.example.SimplePartitioner")); // 生成Token配置 StringBuilder token = new StringBuilder(); String LINE_SEPARATOR = System.getProperty("line.separator"); token.append("org.apache.kafka.common.security.scram.ScramLoginModule required").append(LINE_SEPARATOR); /** * 用户自己生成的Token的TOKENID */ token.append("username=\"PPVz2cxuQC-okwJVZnFKFg\"").append(LINE_SEPARATOR); /** * 用户自己生成的Token的HMAC */ token.append("password=\"pL5nHsIUODg5u0dRM+o62cOIf/j6yATSt6uaPBYfIb29dj/jbpiAnRGSWDJ6tL4KXo89dot0axcRIDsMagyN4g==\"").append(LINE_SEPARATOR); token.append("tokenauth=true;"); // 用户使用的SASL机制,配置为SCRAM-SHA-512 props.put("sasl.mechanism", "SCRAM-SHA-512"); props.put("sasl.jaas.config", token.toString()); return props; } Consumer()配置的样例代码如下: public static Properties initProperties() { Properties props = new Properties(); KafkaProperties kafkaProc = KafkaProperties.getInstance(); // Broker连接地址 props.put(BOOTSTRAP_SERVER, kafkaProc.getValues(BOOTSTRAP_SERVER, "localhost:21007")); // Group id props.put(GROUP_ID, kafkaProc.getValues(GROUP_ID, "DemoConsumer")); // 是否自动提交offset props.put(ENABLE_AUTO_COMMIT, kafkaProc.getValues(ENABLE_AUTO_COMMIT, "true")); // 自动提交offset的时间间隔 props.put(AUTO_COMMIT_INTERVAL_MS, kafkaProc.getValues(AUTO_COMMIT_INTERVAL_MS,"1000")); // 会话超时时间 props.put(SESSION_TIMEOUT_MS, kafkaProc.getValues(SESSION_TIMEOUT_MS, "30000")); // 消息Key值使用的反序列化类 props.put(KEY_DESERIALIZER, kafkaProc.getValues(KEY_DESERIALIZER, "org.apache.kafka.common.serialization.StringDeserializer")); // 消息内容使用的反序列化类 props.put(VALUE_DESERIALIZER, kafkaProc.getValues(VALUE_DESERIALIZER, "org.apache.kafka.common.serialization.StringDeserializer")); // 安全协议类型 props.put(SECURITY_PROTOCOL, kafkaProc.getValues(SECURITY_PROTOCOL, "SASL_PLAINTEXT")); // 服务名 props.put(SASL_KERBEROS_SERVICE_NAME, "kafka"); // 域名 props.put(KERBEROS_DOMAIN_NAME, kafkaProc.getValues(KERBEROS_DOMAIN_NAME, "hadoop.hadoop.com")); // 生成Token配置 StringBuilder token = new StringBuilder(); String LINE_SEPARATOR = System.getProperty("line.separator"); token.append("org.apache.kafka.common.security.scram.ScramLoginModule required").append(LINE_SEPARATOR); /** * 用户自己生成的Token的TOKENID */ token.append("username=\"PPVz2cxuQC-okwJVZnFKFg\"").append(LINE_SEPARATOR); /** * 用户自己生成的Token的HMAC */ token.append("password=\"pL5nHsIUODg5u0dRM+o62cOIf/j6yATSt6uaPBYfIb29dj/jbpiAnRGSWDJ6tL4KXo89dot0axcRIDsMagyN4g==\"").append(LINE_SEPARATOR); token.append("tokenauth=true;"); // 用户使用的SASL机制,配置为SCRAM-SHA-512 props.put("sasl.mechanism", "SCRAM-SHA-512"); props.put("sasl.jaas.config", token.toString()); return props; }