Simulating a Data Feed in Java: Reading a Local File into Kafka and Consuming It from Flink
路人王 (Tianjin)

1. Java producer: read a local file and produce its lines to Kafka

import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

/**
 * Created by 王一宁 on 2019/11/6.
 */
public class KafkaFileProducer {

    public static void main(String[] args) throws Exception {
        Properties prop = new Properties();
        // Kafka broker address
        prop.put("bootstrap.servers", "hadoop1:9092");
        // key/value serializers
        prop.put("key.serializer", StringSerializer.class.getName());
        prop.put("value.serializer", StringSerializer.class.getName());
        // topic name
        String topic = "wang";

        // create the producer connection
        KafkaProducer<String, String> producer = new KafkaProducer<>(prop);
        // open the local data file with Java IO
        InputStream file = new FileInputStream("D:\\APP\\IDEA\\workplace\\FlinkTurbineFaultDiagnosis\\src\\main\\resources\\turbine\\GW20000120160101.txt");
        InputStreamReader fileInputStream = new InputStreamReader(file);
        BufferedReader reader = new BufferedReader(fileInputStream);
        String line;
        while ((line = reader.readLine()) != null) {
            // send one message per line, throttled to one message per second
            producer.send(new ProducerRecord<>(topic, line));
            Thread.sleep(1000);
        }
        reader.close();
        fileInputStream.close();
        file.close();

        // close the producer
        producer.close();
    }
}

2. On the Linux server, simply start a console consumer to watch the produced data arrive, or hand-write a Java consumer that reads the same topic.
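For the console route, the stock script that ships with Kafka works: `bin/kafka-console-consumer.sh --bootstrap-server hadoop1:9092 --topic wang` (adjust the path to your Kafka install). The hand-written alternative can be sketched with the plain kafka-clients API as below; the broker address `hadoop1:9092`, topic `wang`, and group `consumer1` are taken from the surrounding sections, while the class name and the `earliest` offset reset (so already-produced lines are visible) are choices made for this sketch:

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class SimpleKafkaConsumer {

    // consumer configuration, mirroring the producer section
    static Properties buildProps() {
        Properties prop = new Properties();
        prop.put("bootstrap.servers", "hadoop1:9092");
        prop.put("key.deserializer", StringDeserializer.class.getName());
        prop.put("value.deserializer", StringDeserializer.class.getName());
        prop.put("group.id", "consumer1");
        // read from the beginning of the topic when no committed offset exists
        prop.put("auto.offset.reset", "earliest");
        return prop;
    }

    public static void main(String[] args) {
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(buildProps());
        consumer.subscribe(Collections.singletonList("wang"));
        // poll in a loop and print every record's value
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.value());
            }
        }
    }
}
```

Run it while the producer from step 1 is sending, and each line of the file should be printed roughly once per second.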

3. Java implementation of the Flink-integrated Kafka consumer

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;

/**
 * Consume the data from Kafka.
 * @author 王一宁
 * @date 2020/1/2 12:12
 */
public class StreamingFromKafka {
    public static void main(String[] args) throws Exception {
        // get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Kafka configuration
        String topic = "wang";
        Properties prop = new Properties();
        prop.setProperty("bootstrap.servers", "hadoop1:9092"); // comma-separated list for multiple brokers
        prop.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        prop.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        prop.setProperty("auto.offset.reset", "latest");
        prop.setProperty("group.id", "consumer1");

        FlinkKafkaConsumer010<String> myConsumer = new FlinkKafkaConsumer010<>(topic, new SimpleStringSchema(), prop);
        // get the data as a stream
        DataStream<String> text = env.addSource(myConsumer);

        // print to stdout with a single parallel task
        text.print().setParallelism(1);
        // execute the job
        env.execute("StreamingFromKafka");
    }
}
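Note that `FlinkKafkaConsumer010` lives in the Kafka 0.10 connector, which is a separate artifact from Flink core. Assuming a Flink 1.7.x build with Scala 2.11 (the exact version and Scala suffix depend on your cluster), the Maven dependency looks roughly like:

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.10_2.11</artifactId>
    <version>1.7.2</version>
</dependency>
```

Newer Flink releases replace the versioned connectors with a universal `flink-connector-kafka` and the `FlinkKafkaConsumer` class.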
