Using Kafka as a Data Source in Flink

1. Add the Flink dependencies

Declare the Flink version as a Maven property, then add the dependencies below to the pom:

        <properties>
            <flink.version>1.10.2</flink.version>
        </properties>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.25</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.16</version>
            <scope>runtime</scope>
        </dependency>
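
The log4j binding above only produces output when a log4j configuration is on the classpath. A minimal src/main/resources/log4j.properties might look like this (a sketch, not from the original article):

log4j.rootLogger=INFO, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %c{1} - %m%n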

2. Code implementation

The job consumes JSON status-change events from the Kafka topic and maps each message to a (customerId, newStatus) tuple:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.shaded.guava18.com.google.common.base.Strings;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Properties;
public class KafkaExample {
    public static void main(String[] args) {
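        // Kafka connection settings: broker list and consumer group id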
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "192.168.1.85:9092,192.168.1.86:9092,192.168.1.87:9092");
        properties.setProperty("group.id", "g2");
        DeserializationSchema<String> deserializationSchema = new SimpleStringSchema();
        String topic = "customer_statusChangedEvent";
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
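        // Create a Kafka source that deserializes each record as a UTF-8 string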
        DataStream<String> text = env.addSource(
                new FlinkKafkaConsumer<String>(topic, deserializationSchema, properties));

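        // Drop empty messages, then parse each JSON event into a (customerId, newStatus) tuple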
        DataStream<Tuple2<String, Integer>> dataStream = text
                .filter(p -> !Strings.isNullOrEmpty(p))
                .map(new MapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> map(String p) throws Exception {
                        CustomerStatusChangedEvent event = JsonHelper.fromJson(p, CustomerStatusChangedEvent.class);
                        return new Tuple2<>(Long.toString(event.getCustomerId()), event.getNewStatus());
                    }
                });

        dataStream.print();
        try {
            env.execute("Flink-Kafka");
        } catch (Exception ex) {
            ex.printStackTrace(); // don't swallow job failures silently
        }
    }
}
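
The JsonHelper referenced above is not shown in the source article. A minimal sketch assuming a Jackson-based helper (the class name matches the usage; the implementation is an assumption):

import com.fasterxml.jackson.databind.ObjectMapper;

// Hypothetical helper wrapping Jackson's ObjectMapper; not part of the original article.
public class JsonHelper {
    private static final ObjectMapper MAPPER = new ObjectMapper();

    public static <T> T fromJson(String json, Class<T> clazz) {
        try {
            return MAPPER.readValue(json, clazz);
        } catch (Exception e) {
            throw new RuntimeException("Failed to parse JSON: " + json, e);
        }
    }
}

Note that FlinkKafkaConsumer starts from the committed group offsets by default; calling consumer.setStartFromEarliest() or consumer.setStartFromLatest() on the consumer instance before addSource changes that behavior.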

3. Event class

The event class lives in its own file (Java allows only one public class per file) and uses Jackson's @JsonProperty to map the abbreviated JSON field names; the import shown is assumed:

import com.fasterxml.jackson.annotation.JsonProperty;

public class CustomerStatusChangedEvent {
    private Long customerId;
    @JsonProperty("nStatus")
    private Integer newStatus;
    @JsonProperty("oStatus")
    private Integer oldStatus;

    public Long getCustomerId() {
        return customerId;
    }

    public void setCustomerId(Long customerId) {
        this.customerId = customerId;
    }

    public Integer getNewStatus() {
        return newStatus;
    }

    public void setNewStatus(Integer newStatus) {
        this.newStatus = newStatus;
    }

    public Integer getOldStatus() {
        return oldStatus;
    }

    public void setOldStatus(Integer oldStatus) {
        this.oldStatus = oldStatus;
    }
}
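
Given the @JsonProperty mappings, an input message on the topic would look like the following (sample values chosen to match the printed output below; the oStatus value is assumed):

{"customerId": 5010, "nStatus": 1, "oStatus": 0}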

4. Output

When messages are produced to the corresponding Kafka topic, the job prints (customerId, newStatus) tuples like the following:

(5010,1)
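
To verify end to end, you can publish a test message with Kafka's console producer (broker address taken from the setup above; script path depends on your Kafka installation):

bin/kafka-console-producer.sh --broker-list 192.168.1.85:9092 --topic customer_statusChangedEvent
>{"customerId":5010,"nStatus":1,"oStatus":0}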



