利用Apache Flink对物联网及时数据阐发_利用apache
使用Apache Flink对物联网实时数据分析
1、前言
最近项目涉及物联网设备实时数据、采集、传输、展示,以及大数据预警报警等功能(thingsBoard+kafka+MongoDB)。统计分析模块采用的传统的基于数据库的统计分析,对于海量实时数据统计分析性能和响应速度压力巨大,要实现低延迟的实时计算和秒级多维实时查询有技术挑战。Apache Flink 在数据分析领域中应用广泛,其实时处理能力以及运算速度都能满足大规模数据处理的需求,可以与物联网技术结合,实现对海量传感器数据的实时分析和管理。而且社区活跃技术方面也不断的创新和优化,支持许多流行数据源,如Kafka、Hadoop HDFS、ES等,具有丰富的生态系统,Flink社区也提供了丰富的工具和库。这里主要介绍基本概念,和一个实际的示例。
2、基本概念
(1)、定义
Apache Flink 是一个在有界数据流和无界数据流上进行有状态计算分布式处理引擎和框架。Flink 设计旨在所有常见的集群环境中运行,以任意规模和内存级速度执行计算。
(2)、Flink架构
Flink 是一个分布式系统,需要有效分配和管理计算资源才能执行流应用程序。Flink 运行时由两种类型的进程组成一个 JobManager 和一个或者多个 TaskManager。
对于分布式执行,Flink 将算子的 subtasks 链接成 tasks。每个 task 由一个线程执行。将算子链接成 task 是个有用的优化它减少线程间切换、缓冲的开销,并且减少延迟的增加整体吞吐量。
(3)流处理
在自然环境中,数据的产生原本就是流式的。无论是来自 Web 服务器的事件数据,证券交易所的交易数据,还是来自工厂车间机器上的传感器数据,其数据都是流式的。当你分析数据时,可以围绕 有界流(bounded)或 无界流(unbounded)两种模型来组织处理数据,,选择不同的模型,程序的执行和处理方式也都会不同。
批处理是有界数据流处理的范例。在这种模式下,你可以选择在计算结果输出之前输入整个数据集,这也就意味着你可以对整个数据集的数据进行排序、统计或汇总计算后再输出结果。
流处理正相反,其涉及无界数据流。至少理论上来说,它的数据输入永远不会结束,程序必须持续不断地对到达的数据进行处理。
在 Flink 中,应用程序由用户自定义算子转换而来的流式 dataflos 所组成。这些流式 dataflos 形成了有向图,以一个或多个源(source)开始,并以一个或多个汇(sink)结束。
核心的API,从开发步骤的角度来讲,主要分为四大部分Environment、Source、Transform、Sink,flink执行过程(env -> source -> transform -> sink)。
Environment,Flink Job在提交执行计算时,需要建立和Flink框架之间的联系,也就指的是当前的flink运行环境,只有获取了环境信息,才能将task调度到不同的taskManager执行。
流式处理环境获取
StreamExecutionEnvironment env= StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(1000, CheckpointingMode.AT_LEAST_ONCE);
批处理环境
val env = ExecutionEnvironment.getExecutionEnvironment
Source, Flink框架可以从不同的来源获取数据,将数据提交给框架进行处理, 我们将获取数据的来源称之为数据源。数据源类型很多常用的包括集合类型(数据临时存储到内存中,形成特殊的数据结构后),文件中读取数据,Kafka中读取数据,用户可以自定义数据源。
Transform 算子,转换算子,把当前的DataStream转化为另一个DataStream。数据处理的核心,有很多种算子如下图
最常用的map、flatMap、Filter、keyBy、aggregation、reduce等算子,这些算子具体作用基本上看英文名字就知道了,使用方式很灵活支持拉姆达表达式。
KeyBy
KeyBy 算子将DataStream转换成一个KeyedStream。KeyedStream是一种特殊的DataStream,事实上,KeyedStream继承了DataStream,DataStream的各元素随机分布在各Task Slot(任务槽)中,KeyedStream的各元素按照Key分组,分配到各Task Slot中。我们需要向keyBy算子传递一个参数,以告知Flink以什么字段作为Key进行分组。
stream.keyBy(0)
或者
stream.keyBy(ne KeySelector() {
@Override
public String getKey(String x) thros Exception {
return x.();
}
})
aggregation,常见的聚合操作有sum、max、min等,这些聚合操作统称为aggregation。aggregation需要一个参数来指定按照哪个字段进行聚合。跟keyBy相似,我们可以使用数字位置来指定对哪个字段进行聚合,也可以使用字段名。与批处理不同,这些聚合函数是对流数据进行聚合,流数据是依次进入Flink的,聚合操作是对之前流入的数据进行统计聚合。sum算子的功能对该字段进行加和,并将结果保存在该字段上。min操作无法确定其他字段的数值。
stream.keyBy(0).sum(1).print() //0字段分组,1字段求和
reduce 在按照同一个Key分组的数据流上生效,它接受两个输入,生成一个输出,即两两合一地进行汇总操作,生成一个同类型的新元素。
Sink其实可以表示为将处理完成数据进行存储,或者将处理完的数据发送到指定的存储系统(比如Oracle、Kafka等)(官方提供了一部分的框架的sink,用户可自定义实现自己的sink)。
(4)、有状态的数据操作(Stateful Operations)
在流处理中,有些操作仅仅在某一时间针对单一事件(如事件转换map),有些操作需要记住多个事件的信息并进行处理(indo operators),后者的这些操作称为有状态的操作。有状态的操作一般被维护在内置的key/value存储中。这些状态信息会跟数据流一起分区并且分布存储,并且可以通过有状态的数据操作来访问。这些key/value的状态信息仅在带key的数据流(通过keyBy() 函数处理过)中才能访问到。数据流按照key排列能保证所有的状态更新都是本地操作,保证一致性且无事务问题。这种排列方式使Flink能够透明的再分发状态信息和调整数据流分区。
(5)、窗口
我们在操作无界数据流时,经常需要应对以下问题,我们经常把无界数据流分解成有界数据流聚合分析。
Flink 有一些内置的窗口分配器,如下
Fixed indos(固定窗口)在 Flink 中被也称为 Tumbling indos(滚动窗口),将时间切割成具有固定时间长度的段。滚动窗口之间不会重叠。
Sliding indos(滑动窗口)滑动窗口是滚动窗口更一般化的表现的形式,由窗口大小和滑动间隔这两个属性来定义。如果滑动间隔小于窗口大小,那么不同的窗口之间就会存在重叠;如果滑动间隔大于窗口大小,不同窗口之间就会存在间隔;如果滑动间隔等于窗口大小,就相当于滚动窗口。
Session Windos(会话窗口)和滚动窗口与滑动窗口不同的是,会话窗口并没有固定的窗口大小;它是一种动态窗口,通常由超时间隔(timeout gap)来定义。当超过一段时间没有新的事件到达,则可以认为窗口关闭了。
有三种最基本的操作窗口内的事件的选项: 像批量处理,ProcessWindoFunction 会缓存 Iterable 和窗口内容,供接下来全量计算;或者像流处理,每一次有事件被分配到窗口时,都会调用 ReduceFunction 或者 AggregateFunction 来增量计算;或者结合两者,通过 ReduceFunction 或者 AggregateFunction 预聚合的增量计算结果在触发窗口时, 提供给 ProcessWindoFunction 做全量计算。
stream
.keyBy()
.indo()
.reduce|aggregate|process();
示例代码:
DataStream input = ...;
input
.keyBy(x -> x.key)
.indo(TumblingEventTimeWindos.of(Time.minutes(1)))
.process(ne MyWastefulMax());
public static class MyWastefulMax extends ProcessWindoFunction<
SensorReading, // 输入类型
Tuple3, // 输出类型
String, // 键类型
TimeWindo> { // 窗口类型
@Override
public void process(
String key,
Context context,
Iterable events,
Collector> out) {
int max = 0;
for (SensorReading event : events) {
max = Math.max(event.value, max);
}
out.collect(Tuple3.of(key, context.indo().getEnd(), max));
}
}
(6)、Watermark
怎么确定一个窗口是否已经结束,这在流式数据处理系统中并非一个很容易解决的问题。如果窗口是基于处理时间的,那么问题确实容易解决,因为处理时间是完全基于本地时钟的;如果窗口基于事件时间,由于分布式系统中消息可能存在延迟、乱序到达的问题,即便系统已经接收到窗口边界以外的数据了,也不能确定前面的所有数据都已经到达了。
(7)、Checkpoint
Flink 的 Checkpoint 机制是其可靠性的基石。当一个任务在运行过程中出现故障时,可以根据 Checkpoint 的信息恢复到故障之前的某一状态,然后从该状态恢复任务的运行。 在 Flink 中,Checkpoint 机制采用的是 chandy-lamport(分布式快照)算法,通过 Checkpoint 机制,保证了 Flink 程序内部的 Exactly Once 语义。
3、安装部署
(1)、下载
官网(archive.apache./dist/flink/),flink-1.9.3-bin-scala_2.12.tgz下载后解压到硬盘目录,
源码地址github./apache/flink
(2)、运行(需要安装java运行环境)
只需要进入到解压目录的bin目录下,运行start-cluster.bat
(3)、访问UI
(4)、运行示例
命令行输入
flink.bat run D:flinkexamplesbatchWordCount.jar -input D:flinkREADME.txt -output D:f
linkREADME_CountWord_Result.txt
可以在管理界面查看任务运行情况
4、应用示例
创建流处理环境,创建一个Java maven项目,pom文件中引入需要包
pom文件
<?xml version="1.0" encoding="UTF-8"?>
4.0.0
.example
mvntest
1.0-SNAPSHOT
mvntest
http://.example.
UTF-8
1.7
1.7
junit
junit
4.11
test
.apache.flink
flink-connector-kafka_2.12
1.9.3
test
.apache.flink
flink-java
1.9.3
.apache.flink
flink-scala_2.12
1.9.3
.apache.flink
flink-clients_2.12
1.9.3
.apache.flink
flink-streaming-scala_2.12
1.9.3
provided
.apache.flink
flink-connector-kafka_2.12
1.9.3
pile
maven-clean-plugin
3.1.0
maven-resources-plugin
3.0.2
maven-piler-plugin
3.8.0
maven-surefire-plugin
2.22.1
maven-jar-plugin
3.0.2
maven-install-plugin
2.5.2
maven-deploy-plugin
2.8.2
maven-site-plugin
3.7.1
maven-project-info-reports-plugin
3.0.0
.apache.maven.plugins
maven-piler-plugin
8
创建kafaka数据源
StreamExecutionEnvironment env= StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(1000, CheckpointingMode.AT_LEAST_ONCE);
Properties properties=ne Properties();
//kafka 连接信息
properties.setProperty("bootstrap.servers","11.11.160.158:9092");
properties.setProperty("group.id", "kafka-group");
DataStream stream = env
.addSource(ne FlinkKafkaConsumer<>("kafkaic", ne SimpleStringSchema(), properties));
数据处理
//分组
stream.keyBy(ne KeySelector() {
@Override
public String getKey(String x) thros Exception {
return ;
}
})
//窗口
//.indo(TumblingEventTimeWindos.of(Time.minutes(1)))
.timeWindo(Time.seconds(30))
//数据处理
.process(ne MyProcessHandler())
//自定义数据输出
.addSink(ne MySinkFunction()).name("sinktest");
public static class MyProcessHandler extends ProcessWindoFunction<
String, // input type
Tuple3, // output type
String, // key type
TimeWindo> { // indo type
@Override
public void process(String key, Context context, Iterable events, Collector> out) thros Exception {
out.collect(Tuple3.of(key.toString(), context.indo().getEnd(), max));
}
@Override
public void clear(Context context) thros Exception {
super.clear(context);
}
}
public static class MySinkFunction extends RichSinkFunction> {
@Override
public void open(Configuration parameters) thros Exception {
super.open(parameters);
}
@Override
public void invoke(Tuple3 value, Context context) thros Exception {
System.out.println(value.f1);
System.out.println(context.currentProcessingTime());
System.out.println("-------------------end");
}
}
上传jar包运行任务
java命令窗口可以查看输出,也可以通过管理UI查看任务运行情况。
5、
Flink功能强大,性能优秀,在众多场景下都能发挥出巨大价值,尤其是在处理大规模数据、实时分析、机器学习等领域具有广泛的应用和前景。