Integrating a Kafka Producer with Flink in Java
路人王, Tianjin, 2020-03-12

The code is as follows:

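import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
// Package assumed for the Flink release in use in 2018; some later releases
// place this class in org.apache.flink.streaming.connectors.kafka.internals.
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;

/**
 * Kafka sink: a Flink job that acts as a Kafka producer, writing a stream
 * to a topic for downstream consumers.
 *
 * Created by 王一宁 on 2018/10/23.
 */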
public class StreamingKafkaSink {

    public static void main(String[] args) throws Exception {
        // Obtain the Flink execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpoint configuration: checkpoint every 5 s in exactly-once mode
        env.enableCheckpointing(5000);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
        env.getCheckpointConfig().setCheckpointTimeout(60000);
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        // Set the state backend (optional; disabled here)
        //env.setStateBackend(new RocksDBStateBackend("hdfs://hadoop100:9000/flink/checkpoints",true));

        DataStreamSource<String> text = env.socketTextStream("hadoop1", 9001, "\n");

        String brokerList = "hadoop1:9092";
        String topic = "t1";
        Properties prop = new Properties();
        prop.setProperty("bootstrap.servers",brokerList);

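        // With Semantic.EXACTLY_ONCE, FlinkKafkaProducer011 defaults transaction.timeout.ms
        // to 1 hour, while the Kafka broker's transaction.max.timeout.ms defaults to 15 minutes,
        // so the job fails when the producer initializes unless one of these options is applied.
        // Option 1: lower the producer-side transaction timeout to 15 minutes.
        //prop.setProperty("transaction.timeout.ms",60000*15+"");
        // Option 2: raise transaction.max.timeout.ms in the Kafka broker configuration.
        // Alternative without transactions (this constructor defaults to at-least-once):
        //FlinkKafkaProducer011<String> myProducer = new FlinkKafkaProducer011<>(brokerList, topic, new SimpleStringSchema());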

        // Use a Kafka producer with exactly-once semantics
        FlinkKafkaProducer011<String> myProducer = new FlinkKafkaProducer011<>(
                topic,
                new KeyedSerializationSchemaWrapper<String>(new SimpleStringSchema()),
                prop,
                FlinkKafkaProducer011.Semantic.EXACTLY_ONCE);
        text.addSink(myProducer);

        env.execute("StreamingKafkaSink");

    }
}
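
The listing above mentions two ways to deal with the transaction timeout. The first one (setting transaction.timeout.ms on the producer) can be sketched as a small helper that caps the requested value at the broker maximum. This is only an illustration under stated assumptions: the class TransactionalProducerProps and its build method are hypothetical names that do not exist in Flink or Kafka, and the 15-minute maximum is Kafka's default for transaction.max.timeout.ms, which your broker may override.

```java
import java.time.Duration;
import java.util.Properties;

/** Hypothetical helper (not part of Flink or Kafka) that builds producer properties. */
public class TransactionalProducerProps {

    /** Kafka's default for the broker setting transaction.max.timeout.ms (15 minutes). */
    public static final Duration DEFAULT_BROKER_MAX = Duration.ofMinutes(15);

    /**
     * Builds producer properties whose transaction.timeout.ms never exceeds
     * the broker-side maximum supplied by the caller.
     */
    public static Properties build(String brokerList, Duration requested, Duration brokerMax) {
        if (requested.isNegative() || requested.isZero()) {
            throw new IllegalArgumentException("transaction timeout must be positive");
        }
        // Cap the requested timeout at the broker maximum.
        Duration effective = requested.compareTo(brokerMax) > 0 ? brokerMax : requested;

        Properties prop = new Properties();
        prop.setProperty("bootstrap.servers", brokerList);
        prop.setProperty("transaction.timeout.ms", Long.toString(effective.toMillis()));
        return prop;
    }

    public static void main(String[] args) {
        // One hour is requested, but the assumed broker maximum is 15 minutes.
        Properties prop = build("hadoop1:9092", Duration.ofHours(1), DEFAULT_BROKER_MAX);
        System.out.println(prop.getProperty("transaction.timeout.ms")); // prints 900000
    }
}
```

The resulting Properties object could be passed to the FlinkKafkaProducer011 constructor in place of prop. If the broker's transaction.max.timeout.ms has been raised (the second option), pass that value as brokerMax instead of the default.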
