LOADING STUFF...

Apache Spark Delta Lake 写数据使用及实现原理代码解析

–>

Delta Lake 写数据是其最基本的功能,而且其使用和现有的 Spark 写 Parquet 文件基本一致,在介绍 Delta Lake 实现原理之前先来看看如何使用它,具体使用如下:

Delta Lake 写数据原理

前面简单了解了如何使用 Delta Lake 来写数据,本小结我们将深入介绍 Delta Lake 是如何保证写数据的基本原理以及如何保证事务性。

得益于 Apache Spark 强大的数据源 API,我们可以很方便的给 Spark 添加任何数据源,Delta Lake 也不例外。Delta Lake 就是使用 DataSource V1 版本的 API 实现的一种新的数据源,我们调用 df.write.format("delta") 其实底层调用的是 org.apache.spark.sql.delta.sources.DeltaDataSource类。

为了简单起见,本文介绍的是 Delta Lake 批量写的实现,实时流写 Delta Lake 本文不涉及,后面有机会再介绍。

 Delta Lake 批量写扩展了 org.apache.spark.sql.sources.CreatableRelationProvider 特质,并实现了其中的方法。我们调用上面的写数据方法首先会调用 DeltaDataSource 类的 createRelation 方法,它的具体实现如下:

其中 mode 就是保持数据的模式,支持 Append、Overwrite、ErrorIfExists 以及 Ignore 等。parameters 这个传递的参数,比如分区字段、数据保存路径以及 Delta 支持的一些参数(replaceWhere、mergeSchema、overwriteSchema 等,具体参见 org.apache.spark.sql.delta.DeltaOptions);data 就是我们需要保存的数据。

createRelation 方法紧接着就是获取数据保存的路径,分区字段等信息。然后初始化 deltaLog,deltaLog 的初始化会做很多事情,比如会读取磁盘所有的事务日志(_delta_log 目录下),并构建最新事务日志的最新快照,里面可以拿到最新数据的版本。由于 deltaLog 的初始化成本比较高,所以 deltaLog 初始化完之后会缓存到 deltaLogCache 中,这是一个使用 Guava 的 CacheBuilder 类实现的一个缓存,缓存的数据保持一小时,缓存大小可以通过 delta.log.cacheSize 参数进行设置。只要写数据的路径是一样的,就只需要初始化一次 deltaLog,后面直接从缓存中拿即可。除非之前缓存的 deltaLog 被清理了,或者无效才会再次初始化。DeltaLog 类是 Delta Lake 中最重要的类之一,涉及的内容非常多,所以我们会单独使用一篇文章进行介绍。

紧接着初始化 WriteIntoDelta,WriteIntoDelta 扩展自 RunnableCommand,Delta Lake 中的更新、删除、合并都是扩展这个类的。初始化完 WriteIntoDelta 之后,就会调用 run 方法执行真正的写数据操作。WriteIntoDelta 的 run 方法实现如下:

Delta Lake 所有的更新操作都是在事务中进行的,deltaLog.withNewTransaction 就是一个事务,withNewTransaction 的实现如下:

在开启事务之前,需要更新当前表事务的快照,因为在执行写数据之前,这张表可能已经被修改了,执行 update 操作之后,就可以拿到当前表的最新版本,紧接着开启乐观事务锁。thunk(txn) 就是需要执行的事务操作,对应 deltaLog.withNewTransaction 里面的所有代码。

我们回到上面的 run 方法。val actions = write(txn, sparkSession) 就是执行写数据的操作,它的实现如下:

如果 txn.readVersion == -1,说明是第一次写数据到 Delta Lake 表,所以当这个值大于 -1 的时候,需要判断一下写数据的操作是否合法。

由于 Delta Lake 底层使用的是 Parquet 格式,所以 Delta Lake 表也支持模式的增加合并等,这就是 updateMetadata 函数对应的操作。

因为 Delta Lake 表支持分区,所以我们可能在写数据的时候指定某个分区进行覆盖。

真正写数据的操作是 txn.writeFiles 函数执行的,具体实现如下:

Delta Lake 写操作最终调用 Spark 的 FileFormatWriter.write 方法进行的,通过这个方法的复用将我们真正的数据写入到 Delta Lake 表里面去了。

在 Delta Lake 中,如果是新增文件则会在事务日志中使用 AddFile 类记录相关的信息,AddFile 持久化到事务日志里面的内容如下:

可以看出 AddFile 里面记录了新增文件的保存路径,分区信息,新增的文件大小,修改时间等信息。如果是删除文件,也会在事务日志里面记录这个删除操作,对应的就是使用 RemoveFile 类存储,RemoveFile 持久化到事务日志里面的内容如下:

RemoveFile 里面保存了删除文件的路径,删除时间等信息。如果新增一个文件,再删除一个文件,那么最新的事务日志快照里面只会保存删除这个文件的记录。从这里面也可以看出, Delta Lake 删除、新增 ACID 是针对文件级别的。

上面的写操作肯定会产生新的文件,所以写操作之后就需要拿到新增的文件(val newFiles = txn.writeFiles(data, Some(options)) )newFiles(AddFile) 和需要删除的文件(RemoveFile)。针对那些文件需要删除需要做一些判断,主要分两种情况(具体参见 write 方法里面的):

  • 如果是全表覆盖,则直接从缓存在内存中最新的事务日志快照中拿出所有 AddFile 文件,然后将其标记为 RemoveFile;

  • 如果是分区内的覆盖,则从缓存在内存中最新的事务日志快照中拿出对应分区下的 AddFile 文件,然后将其标记为 RemoveFile。

最后 write 方法返回新增的文件和需要删除的文件(newFiles ++ deletedFiles),这些文件最终需要记录到事务日志里面去。关于事务日志是如何写进去的请参见这篇文章的详细分析。


新福利:

从9月11日开始至10月15日截止,一共五周时间,每周二我会从公众号底部留言互动+分享+再看综合最多的读者中抽取一名读者,免费包邮送实体新书《HBase原理与实践》,留言互动起来吧~

猜你喜欢

1、

2、

3、

4、

本文来源 互联网收集,文章内容系作者个人观点,不代表 本站 对观点赞同或支持。如需转载,请注明文章来源,如您发现有涉嫌抄袭侵权的内容,请联系本站核实处理。

© 版权声明

相关文章