1.java实现读取本地数据到kafka生产数据
/**
* Created by 王一宁 on 2019/11/6.
*/
public class kafkaProducer {
public static void main(String[] args) throws Exception{
Properties prop = new Properties();
//指定kafka broker地址
prop.put("bootstrap.servers", "hadoop1:9092");
//指定key value的序列化方式
prop.put("key.serializer", StringSerializer.class.getName());
prop.put("value.serializer", StringSerializer.class.getName());
//指定topic名称
String topic = "wang";
//创建producer链接
KafkaProducer<String, String> producer = new KafkaProducer<String,String>(prop);
//创建Java IO
InputStream file = new FileInputStream("D:\\APP\\IDEA\\workplace\\FlinkTurbineFaultDiagnosis\\src\\main\\resources\\turbine\\GW20000120160101.txt");
InputStreamReader fileInputStream = new InputStreamReader(file);
BufferedReader reader = new BufferedReader(fileInputStream);
String line = null;
while ((line = reader.readLine()) != null) {
//生产消息
producer.send(new ProducerRecord<String, String>(topic,line));
Thread.sleep(1000);
}
reader.close();
file.close();
fileInputStream.close();
//关闭链接
producer.close();
}
}
2.在linux服务器中,直接开启一个消费者,就可以看到生产的数据了,或者手写一个java消费者,消费同一个Topic的数据。
3.java实现flink集成kafka消费者的实现代码
/**
* 消费Kafka中得数据
* @author 王一宁
* @date 2020/1/2 12:12
*/
public class StreamingFromKafka {
public static void main(String[] args) throws Exception{
//获取环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//kafka配置
String topic = "wang";
Properties prop = new Properties();
prop.setProperty("bootstrap.servers","hadoop1:9092");//多个的话可以指定
prop.setProperty("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
prop.setProperty("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
prop.setProperty("auto.offset.reset","latest");
prop.setProperty("group.id","consumer1");
FlinkKafkaConsumer010<String> myConsumer = new FlinkKafkaConsumer010<String>(topic, new SimpleStringSchema(), prop);
//获取数据
DataStream<String> text = env.addSource(myConsumer);
//打印
text.print().setParallelism(1);
//执行
//env.execute("StreamingFormCollection");
env.execute();
}
}
评论