[转]如何使用Redis流和Apache Spark处理实时数据?
1. 本文要点
- Apache Spark 的流框架(Structured Streaming)为数据流带来了 SQL 查询功能,让用户可以实时、可扩展地处理数据。
- Redis 流(Redis Stream)是 Redis 5.0 新引入的数据结构,能够以亚毫秒级的延迟高速收集、保存和分发数据。
- 用户集成 Redis 流和流框架后就能简化连续应用程序(continuous application)的扩展工作。
- 开源的 Spark-Redis 库将 Apache Spark 与 Redis 连接起来。该库为 Redis 数据结构提供 RDD 和数据帧 API,使用户可以将 Redis 流用作流框架的数据源。
流框架是 Apache Spark 2.0 新引入的一项功能,在业界和数据工程社区中引起了很大关注。流框架 API 构建于 Spark SQL 引擎之上,为流数据提供类似 SQL 的界面。
早期的 Apache Spark 以微批处理方式处理流框架查询,延迟大约为 100 毫秒。
去年的 2.3 版本引入了低延迟(1 毫秒)的“连续处理”,进一步推动了流框架的应用。
为了让 Spark 保持高速的连续处理状态,你需要使用像 Redis 这样的高速流数据库来支持它。
Redis 开源内存数据库以其高速度和亚毫秒级延迟闻名于世。最近 Redis 5.0 新推出了一种名为 Redis 流的数据结构,使 Redis 能够在多个生产者和消费者之间消费、保存和分发流数据。
现在的问题是,将 Redis 流作为流数据库,Apache Spark 作为数据处理引擎,两者共同部署,怎样才能做到最佳搭配?
用 Scala 编写的 Spark-Redis 库就集成了 Apache Spark 和 Redis,使用它可以:
- 在 Redis 中以 RDD 的形式读写数据
- 在 Redis 中以数据帧的形式读写数据(例如,它允许将 Spark SQL 表映射到 Redis 数据结构)
- 使用 Redis 流作为流框架的数据源
- 在流框架之后将 Redis 实现为接收器
本文中我将介绍一个真实场景,并指导你如何使用 Redis 和 Apache Spark 实时处理流数据。
2. 模拟场景:计算实时点击
假设我们是一家广告公司,在热门网站上投放广告。我们根据社交媒体上的热门图片制作包含流行话题梗的动图,并将其作为广告投放出去。为了最大化利润,我们必须识别出能获得病毒式传播或赢得更多点击次数的资产,这样就能加大它们的投放力度了。
我们的大部分资产传播期很短,所以能实时处理点击的话,我们就能快速生成传播趋势图,这对业务至关重要。我们理想中的流数据解决方案必须记录所有广告点击并实时处理,然后计算每项资产的实时点击次数。以下是设计思路:
2-1. 输入
对于每次点击,我们的数据提取方案(图 1 中的方框 1)将资产 ID 和广告费用放在 Redis 流中:
1 |
XADD clicks * asset [asset id] cost [actual cost] |
例如:
1 |
XADD clicks * asset aksh1hf98qw7tt9q7 cost 29 |
2-2. 输出
在图 1 中的方框 2 部分处理数据之后,我们的结果会存储在数据存储区中。数据查询方案(图 1 中的方框 3)为数据提供了一个 SQL 接口,我们可以用它查询最近几分钟的最高点击次数:
1 2 3 4 5 6 7 |
select asset, count from clicks order by count desc asset count ----------------- ----- aksh1hf98qw7tt9q7 2392 i2dfb8fg023714ins 2010 jsg82t8jasvdh2389 1938 |
3. 构建解决方案
现在我们已经定义好了业务需求,接下来探讨如何使用 Redis 5.0 和 Apache Spark 2.4 构建其解决方案。在本文中我用的是 Scala 编程语言,但你也可以在 Java 或 Python 中使用 Spark-Redis 库。
这张流程图看起来非常简单:首先系统将数据提取到 Redis 流,然后 Redis 流将数据作为 Spark 进程消费,并将结果聚合传回 Redis,最后使用 Spark-SQL 接口在 Redis 中查询结果。
- 数据提取:我选择用 Redis 流提取数据,因为它是 Redis 中的内置数据结构,每秒可处理超过一百万次读写操作。此外它还可以根据时间自动对数据排序,并支持简化数据读取方式的消费者组。Spark-Redis 库支持将 Redis 流作为数据源,因此它完全符合我们对流式数据库使用 Apache Spark 引擎的需求。
- 数据处理:Apache Spark 中的流框架 API 是我们处理数据的绝佳选择,而 Spark-Redis 库使我们能够将到达 Redis 流的数据转换为数据帧。使用流框架时,我们可以用微批处理或 Spark 的连续处理模式运行查询。我们还可以开发一个自定义的“编写器”来将数据写入指定目的地。如图 2 所示,我们将使用哈希数据结构将输出写入 Redis。
- 数据查询:Spark-Redis 库允许你将本机 Redis 数据结构映射为数据帧。我们可以声明一个将列映射到哈希数据结构特定键的“临时表”,并且由于 Redis 的速度非常快,延迟在亚毫秒级别,我们可以使用 Spark-SQL 获得实时查询能力。
之后我将逐个介绍如何开发并运行解决方案的各个组件。在那之前,我们先用适当的工具来初始化开发环境。
4. 寻找合适的开发工具
在我们的示例中,我们将使用 Homebrew 包管理器在 macOS 上下载和安装软件,你也可以根据你操作系统的情况选择其他包管理器。
- Redis 5.0或更高版本: 首先,我们需要在环境中下载并安装 Redis 5.x。旧版本的 Redis 不支持 Redis 流。
在 Homebrew 上,我们用下面的命令安装并启动 Redis 5.0:
1 2 |
$ brew install Redis $ brew services start Redis |
如果你用的还是旧版 Redis,可以用下面的命令升级它:
1 |
$ brew upgrade Redis |
- Apacke Spark 2.3或更高版本: 接下来我们从官方网站下载并安装 Apache Spark,或者使用 Homebrew 安装:
1$ brew install apache-spark - Scala 2.12.8或更高版本:Scala 也是一样的操作:
1$ brew install scala - Apache Maven:我们需要用 Maven 来构建 Spark-Redis 库。
1$ brew install maven - JDK 1.8或更高版本:我们可以使用下面的命令从甲骨文网站或 Homebrew 下载并安装这个 JDK。对于最新版本的 JDK,我们需要用 java 替换 java8。
1$ brew cask install java8 - Spark-Redis库:这是我们解决方案的核心部分,这里从 GitHub 下载库并构建软件包,如下所示:
123$ git clone https://github.com/RedisLabs/spark-redis.git$ cd spark-redis$ mvn clean package -DskipTests
这会在./target/ 目录下加入 spark-redis-<version>-jar-with-dependencies.jar
。在我的设置中这个文件是 spark-redis-2.3.1-SNAPSHOT-jar-with-dependencies.jar
- SBT 1.2.8或更高版本:SBT 是一个 Scala 构建工具,可简化管理和构建 Scala 文件的工作。
1$ brew install sbt - 开发环境:最后该设置文件夹结构并构建文件了。本示例中我们将把程序代码放在“scala”目录下。
12$ mkdir scala$ cd ./scala
使用以下内容创建一个新文件 build.sbt:
1 2 3 4 5 6 7 8 9 10 11 12 13 |
name := "RedisExample" version := "1.0" scalaVersion := "2.12.8" val sparkVersion = "2.4.0" libraryDependencies ++= Seq( "org.apache.spark" %% "spark-core" % sparkVersion, "org.apache.spark" %% "spark-sql" % sparkVersion, "org.apache.spark" %% "spark-catalyst" % sparkVersion ) |
初始化目录。用以下命令初始化包目录:
1 2 3 |
$ mkdir ./src/main/scala/ $ mkdir ./lib $ sbt package |
将spark-redis-<version>-jar-with-dependencies.jar
复制到 lib 目录。
如架构部分所述,我们的解决方案包含三个部分:数据提取组件、Spark 引擎内的数据处理器和数据查询接口。在本节中我将详细说明这三个部分并组合出一个有效的解决方案。
- 提取 Redis 流
Redis 流是一种仅附加数据结构。假设 Apache Spark 的连续处理单元将消费这些数据,我们可以将消息数限制为一百万。稍微修改一下前面提到的命令:
1 |
XADD clicks MAXLEN ~ 1000000 * asset aksh1hf98qw7tt9q7 cost 29 |
大多数流行的 Redis 客户端都支持 Redis 流,因此根据你的编程语言,你可以选择适用 Python 的 redis-py、适用 Java 的 Jedis 或 Lettuce、适用 Node.js 的 node-redis 等等。
- 数据处理
这一部分分为三个小节:
- 从 Redis 流读取和处理数据
- 将结果存储在 Redis 中
- 运行程序
- 从 Redis 流读取数据
要在 Spark 中从 Redis 流读取数据,我们需要明白怎样连接到 Redis,以及 Redis 流中数据的 Schema 结构。
为了连接到 Redis,我们必须为 Redis 创建一个带有连接参数的新 Spark 会话(SparkSession):
1 2 3 4 5 6 7 |
val spark = SparkSession .builder() .appName("redis-example") .master("local[*]") .config("spark.redis.host", "localhost") .config("spark.redis.port", "6379") .getOrCreate() |
设置 Schema 结构时,我们用“clicks”命名流,并为“stream.keys”设置一个“clicks”的选项。由于每个流元素都包含一项资产以及与之相关的成本,因此我们将创建一个包含两个 StructField 的数组的 StructType——一个用于“asset”,另一个用于“cost”,如下所示:
1 2 3 4 5 6 7 8 9 |
val clicks = spark .readStream .format("redis") .option("stream.keys","clicks") .schema(StructType(Array( StructField("asset", StringType), StructField("cost", LongType) ))) .load() |
在第一个程序中我们对每个资产的点击次数感兴趣。为此创建一个数据帧,其中包含按资产计数分组的数据:
1 |
val byasset = clicks.groupBy("asset").count |
最后一步是启动流框架查询:
1 2 3 4 5 |
val query = byasset .writeStream .outputMode("update") .foreach(clickWriter) .start() |
注意这里我们使用自己的 ForeachWriter 将结果写回 Redis。如果要将输出转到控制台,可以将查询写成:
1 2 3 4 5 |
val query = byasset .writeStream .outputMode("update") .format("console") .start() |
对于 连续处理 而言,我们希望在查询中添加’trigger’命令:.trigger(Trigger.Continuous(“1 second”))。trigger 命令不适用于聚合查询,因此我们无法把它插入这个示例。
下面是完整的程序代码。它会从 Redis 流读取新的点击数据并使用 Spark 的流框架 API 处理。如果你想在自己的环境中尝试,请将程序保存在 src/main/scala 下,命名为 ClickAnalysis.scala。(如果你的 Redis 服务器不是在端口 6379 上本地运行的,请根据具体情况设置连接参数。)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 |
// Program: ClickAnalysis.scala // import org.apache.spark.sql.SparkSession import org.apache.spark.sql.functions._ import org.apache.spark.sql.types._ import com.redislabs.provider.redis._ object ClickAnalysis { def main(args: Array[String]): Unit = { val spark = SparkSession .builder() .appName("redis-example") .master("local[*]") .config("spark.redis.host", "localhost") .config("spark.redis.port", "6379") .getOrCreate() val clicks = spark .readStream .format("redis") .option("stream.keys","clicks") .schema(StructType(Array( StructField("asset", StringType), StructField("cost", LongType) ))) .load() val byasset = clicks.groupBy("asset").count val clickWriter : ClickForeachWriter = new ClickForeachWriter("localhost","6379") val query = byasset .writeStream .outputMode("update") .foreach(clickWriter) .start() query.awaitTermination() } // End main } //End object |
- 将结果存储在 Redis 中
为了将结果写回 Redis,我们可以开发一个名为 ClickForeachWriter 的自定义 ForeachWriter。它会扩展 ForeachWriter,并使用 Redis 的 Java 客户端 Jedis 连接到 Redis 上。下面是完整的程序代码,保存为 ClickForeachWriter.scala:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 |
// Program: ClickForeachWriter.scala // import org.apache.spark.sql.ForeachWriter import org.apache.spark.sql.Row import redis.clients.jedis.Jedis class ClickForeachWriter(p_host: String, p_port: String) extends ForeachWriter[Row]{ val host: String = p_host val port: String = p_port var jedis: Jedis = _ def connect() = { jedis = new Jedis(host, port.toInt) } override def open(partitionId: Long, version: Long): Boolean = { return true } override def process(record: Row) = { var asset = record.getString(0); var count = record.getLong(1); if(jedis == null){ connect() } jedis.hset("click:"+asset, "asset", asset) jedis.hset("click:"+asset, "count", count.toString) jedis.expire("click:"+asset, 300) } override def close(errorOrNull: Throwable) = { } } |
在这部分程序中有一点需要注意:它将结果存储在哈希数据结构中,其键遵循语法“click:”。我将在本文的最后一节中将此结构转换成数据帧来使用。另一点需要指出的是键的过期时间是完全可选的。上面展示了如何在每次点击被记录时让键的寿命延长五分钟(300 秒)。
- 运行程序
在我们运行之前首先需要编译程序。转到主目录(我们存储 build.sbt 的目录)运行命令:
1 |
$ sbt package |
我们的程序应该能顺利编译通过,没有错误。如果出现了错误,请修复它们并重新运行 sbt 包。编译完成后,在同一目录中运行以下命令来启动程序:
1 2 3 |
spark-submit --class ClickAnalysis --jars ./lib/spark-redis-2.3.1-SNAPSHOT-jar-with-dependencies.jar --master local[*] ./target/scala-2.12/redisexample_2.12-1.0.jar |
如果你不喜欢调试消息,可以停止程序(按 ctrl 加 c)并编辑 /usr/local/Cellar/apache-spark/2.4.0/libexec/conf/(或 log4j.properties 文件存储的目录)下的 log4j.properties,并将 log4j.rootCategory 更改为 WARN,如下所示:
1 |
log4j.rootCategory=WARN, console |
该程序将自动从 Redis 流中提取消息。如果 Redis 流中没有消息,它将异步侦听新消息。我们可以在新的控制台中启动 redis-cli 并向 Redis 流添加一条消息,以测试它是否在正常消费消息:
1 2 |
$ redis-cli redis-cli> XADD clicks * asset test cost 100 |
一切顺利的话,我们应该能在哈希数据结构中读取结果:
1 2 3 4 5 |
redis-cli> hgetall click:test 1) "asset" 2) "test" 3) "count" 4) "1" |
- 查询数据:将 Redis 数据读取为数据帧
我们解决方案的最后一个组件实际上为 Redis 数据提供了一个 SQL 接口。通过 SQL 命令读取数据又是一个两步过程:首先,我们为 Redis 数据定义 SQL schema;其次,我们运行 SQL 命令。
但在此之前,我们需要从主目录上在控制台运行 spark-sql,如下所示:
1 2 |
$ spark-sql --jars ./lib/spark-redis-2.3.1-SNAPSHOT-jar-with-dependencies.jar |
然后会转到 spark-sql 提示符下:
1 |
spark-sql> |
现在我们要为 Redis 哈希数据结构中存储的数据定义 SQL schema。如前所述,我们将每个资产的数据存储在由键:click:表示的哈希数据结构中。哈希结构中还有一个键:count。创建 schema 并将其映射到 Redis 哈希数据结构的命令是:
1 2 |
spark-sql> CREATE TABLE IF NOT EXISTS clicks(asset STRING, count INT) USING org.apache.spark.sql.redis OPTIONS (table 'click') |
此命令创建一个名为“clicks”的新表视图。它使用 Spark-Redis 库中指定的指令将“asset”和“count”列映射到哈希结构中的对应字段。现在我们可以运行查询:
1 2 3 |
spark-sql> select * from clicks; test 1 Time taken: 0.088 seconds, Fetched 1 row(s) |
如果要以编程方式运行 SQL 查询,请参阅 Apache Spark 提供的有关如何使用 ODBC/JDBC 驱动程序连接到 Spark 引擎的文档。
5. 我们的成果是什么?
在本文中,我演示了如何使用 Redis 流作为 Apache Spark 引擎的数据源,介绍了 Redis 流是怎样为流框架用例提供支持的。我还展示了如何使用 Apache Spark 中的数据帧 API 读取 Redis 数据,并融合流框架和数据帧的理念说明了 Spark-Redis 库可以实现的功能。
Redis 流简化了高速收集和分发数据的任务。将其与 Apache Spark 中的流框架相结合,可以支持需要实时计算的各种解决方案,包括物联网、欺诈检测、人工智能和机器学习、实时分析等。
Roshan Kumar 是 Redis Labs 的高级产品经理。他在软件开发和技术领域的产品管理方面拥有丰富的经验。他曾在惠普公司和一些成功的硅谷创业公司工作。他拥有计算机科学学士学位和美国加利福尼亚州圣克拉拉大学的 MBA 学位。