目录
一、配置kafka信息
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.91.180:9092");
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); // 手动提交
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000"); // 自动提交时,提交时间
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "userfriend_group2");
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// 配置Kafka信息。消费消息
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Collections.singleton("user_friends"));
二、配置hbase信息,连接hbase数据库
需要先在hbase创建对应的命令空间和table
Put对象,按照不同需求,从文件中截取对应字段
// 配置Kafka信息。消费消息
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);