您的当前位置:首页正文

[文件格式/数据存储] Parquet:开源、高效的列式存储文件格式协议

2025-02-05 来源:个人技术集锦

序:缘起 => 用 java 读取 parquet 文件

  • 生产环境有设备出重大事故,又因一关键功能无法使用,亟需将生产环境的原始MQTT报文(以 parquet 文件格式 + zstd 压缩格式 落盘)DOWN到本地,读取并解析。

概述:Parquet

  • 摘要:

Apache Parquet是一种高效、灵活且可扩展的列式存储格式,专为大规模数据处理而设计。
它通过列式存储、数据压缩和编码优化,显著提高了数据的读取和写入性能。
Parquet与多种数据处理框架和数据模型兼容,使其成为大数据生态系统中不可或缺的一部分。

什么是 Parquet? Apache Hadoop社区发起的、开源的、大数据量场景的、高效的列式存储、压缩与传输的文件格式协议

  • Parquet 是一个开源的、用于存储和传输大数据的、高效的、基于列式存储的文件格式,它使用一种称为 Columnar数据压缩算法来优化数据压缩和传输。
  • 缘起。Parquet 的灵感来源于2010Google 发表的 Dremel 论文,该论文介绍了一种支持嵌套结构列式存储格式
  • 创始与开发团队。最初由Twitter和Cloudera联合开发,并于2013年捐赠给Apache软件基金会,于2015年5月从 Apache 孵化器毕业,成为 Apache 顶级项目。

Apache 开源项目 Hadoop 创建,并作为 Hadoop 的一部分进行维护。

  • 应用领域。Parquet专为大规模数据处理而设计,支持多种数据处理框架,如: Apache SparkHadoop MapReducePrestoApache Flink等。
  • 优势。Parquet文件格式的优势在于高效的压缩高效的列式存储,使得它在大数据处理中具有很高的性能。

核心特点

  • 列式存储:Parquet以列式格式存储数据。

这意味着每一列的数据被连续存储。这种存储方式使得对特定列的读取操作更加高效,因为只需要读取相关的列数据,而无需扫描整个行。这在处理大规模数据集时尤其有用,尤其是在执行聚合查询和分析时。

  • 数据压缩:Parquet支持多种压缩算法,如:zstdSnappyGzipLZO等。

压缩可以显著减少存储空间的占用,并提高数据的读写速度。通过压缩,数据在磁盘上的存储效率更高,同时在读取时可以更快地加载到内存中。

  • 数据编码:Parquet提供了多种数据编码方式,如字典编码、RLE(Run-Length Encoding)等。这些编码方式可以进一步优化数据的存储和读取性能,尤其是在处理重复数据时。

  • 可扩展性:Parquet文件可以被分割成多个块Row Groups),每个块可以独立读取和处理。

这种设计使得Parquet文件非常适合分布式处理框架,如Apache Spark和Hadoop MapReduce,因为它们可以并行处理文件的不同部分。

  • 与多种数据模型兼容:Parquet支持多种数据模型,包括AvroThriftProtobuf

这意味着您可以使用不同的数据模型来定义和存储数据,而Parquet能够无缝地处理这些数据。

应用场景

Parquet广泛应用于以下领域:

  • 大数据分析:由于其高效的存储和读取性能,Parquet非常适合用于大规模数据分析,如数据仓库、数据湖等。
  • 机器学习:在机器学习中,Parquet可以用于存储和处理训练数据,支持快速的数据加载和特征提取。
  • 实时数据处理:Parquet的列式存储和压缩特性使其在实时数据处理中表现出色,能够快速读取和处理数据。

官方文献

原理分析:Parquet文件格式( File Format)

  • 文件格式:
  • Block(HDFS块):指的是HDFS中的一个块,对于描述这种文件格式,其含义不变。该文件格式设计得能够在HDFS上良好运作。
  • 文件(File):一个必须包含文件元数据的HDFS文件。实际上,它不需要包含数据本身。
  • 行组(Row group):数据在行方向的逻辑分区。对于行组,没有保证其存在物理结构。一个行组由数据集中每个列块组成。
  • 列块(Column chunk):一个特定列的数据块。它们存在于特定的行组中,并且在文件中保证是连续的。
  • 页面(Page)列块被划分成页面。页面在概念上是一个不可分割的最小单元(就压缩编码而言)。在一个列块中可以有多个页面类型,它们交织在一起。
  • 从层次上看,一个文件包含一个或多个行组。一个行组包含每列恰好一个列块列块包含一个或多个页面

Parquet文件 - 行组 - 列块 - 页面

实现框架与客户端

ParquetViewer : 开源的 Parquet 文件查看器

  • ParquetViewer 项目

VSCode + Parquet-Viewer插件

  • url

Parquet-Viewer : Parquet文件在线浏览网站

  • url

org.apache.parquet:*: Java SDK

详情参见本文档:【案例实践】章节,"org.apache.parquet"

Pandas : 集成了 Parquet 的 Python Data Analysis Library

  • python pandas 支持Parquet 格式,与csv 一下简单对比,首先我们测试存储情况
import pandas as pd
df = pd.read_csv('results.csv')

# df.to_parquet('df_test.parquet.zstd', compression='zstd')  
df.to_parquet('df_test.parquet.gzip', compression='gzip')  
df.to_parquet('df_test.parquet.snappy', compression='snappy')  
df.to_parquet('df_test.parquet', compression=None)  

import os
file_size = os.path.getsize('results.csv')
print("results.csv size:",  f'{file_size:,}', "bytes")


file_size = os.path.getsize('df_test.parquet.gzip')
print("df_test.parquet.gzip size:",  f'{file_size:,}', "bytes")

file_size = os.path.getsize('df_test.parquet.snappy')
print("df_test.parquet.snappy size:",  f'{file_size:,}', "bytes")

file_size = os.path.getsize('df_test.parquet')
print("df_test.parquet size:",  f'{file_size:,}', "bytes")

parquet格式的存储,相比 csv 减少80%,结合压缩后是 csv 的 10%,可以节约90% 的存储空间。

  • 为数据加载测试,我们将数据放大100倍。

%time df = pd.read_csv('df_test.csv')
%time df = pd.read_parquet('df_test.parquet.gzip')
%time df = pd.read_parquet('df_test.parquet.snappy')
%time df = pd.read_parquet('df_test.parquet')

使用pandas 加载 dataframe 也有大幅度提升,提升了2-3倍。

案例实践

CASE : 基于 Java SDK 读取Parquet文件

引入依赖

<properties>
	<!-- 1.13.1 / 1.12.0 -->
	<parquet.version>1.13.1</parquet.version>
	<avro.version>1.10.2</avro.version>
	<!-- 3.2.1 / 2.7.3 -->
	<hadoop.version>3.2.1</hadoop.version>
	<!-- 1.5.0-1 / 1.5.5-5 / 与 kafka-clients:1.4-xxx 版本冲突 -->
	<zstd.version>1.5.0-1</zstd.version>
</properties>

<dependency>
	<groupId>org.apache.parquet</groupId>
	<artifactId>parquet-avro</artifactId>
	<version>${parquet.version}</version>
</dependency>
<dependency>
	<groupId>org.apache.parquet</groupId>
	<artifactId>parquet-hadoop</artifactId>
	<version>${parquet.version}</version>
</dependency>

<dependency>
	<groupId>org.apache.hadoop</groupId>
	<artifactId>hadoop-mapreduce-client-core</artifactId>
	<version>${hadoop.version}</version>
	<exclusions>
		<exclusion>
			<artifactId>commons-compress</artifactId>
			<groupId>org.apache.commons</groupId>
		</exclusion>
	</exclusions>
</dependency>

<dependency>
	<groupId>org.apache.hadoop</groupId>
	<artifactId>hadoop-common</artifactId>
	<version>${hadoop.version}</version>
	<exclusions>
		<exclusion>
			<artifactId>commons-compress</artifactId>
			<groupId>org.apache.commons</groupId>
		</exclusion>
	</exclusions>
</dependency>
<dependency>
	<groupId>org.apache.hadoop</groupId>
	<artifactId>hadoop-hdfs</artifactId>
	<version>${hadoop.version}</version>
</dependency>

<!-- ztsd
	Zstd 的 RecyclingBufferPool‌ 是一种内存管理机制,用于高效地重用缓冲区,减少内存分配和释放的开销。
	Zstd库中的BufferPool接口提供了多种实现,其中RecyclingBufferPool是一种常见的实现方式。 -->
<dependency>
	<groupId>com.github.luben</groupId>
	<artifactId>zstd-jni</artifactId>
	<!-- 1.5.5-5 / kafka-clients 包冲突 -->
	<version>${zstd.version}</version>
</dependency>

ReadParquetFormatMQTTMessageRawDataDemo

import com.xx.yy.common.utils.DatetimeUtil;
import com.xx.yy.common.utils.FileUtil;
import com.alibaba.fastjson2.JSON;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.hadoop.example.GroupReadSupport;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * 本地读取 Parquet 文件
 * @reference-doc
 *  [1] java读取parquet文件 - https://blog.csdn.net/letterss/article/details/131417952
 *  [2] datax读取Parquet格式文件总列数 - https://blog.csdn.net/letterss/article/details/131189471
 */
@Slf4j
public class ReadParquetFormatMQTTMessageRawDataDemo {
    @SneakyThrows
    public static void main(String[] args) {
        String baseDir = "file:///E:/tmp_data/parquet_mqtt_messages/ods_raw_data_history/";
        String parquetFilePathStr = baseDir + "20081-XXXX-XXXXXXXXXX/e7db5c81e70131d55d4fb4a1752b90f2-1.parquet"; //"output.parquet"

        try {
            // 指定 Parquet 文件路径
            //Path parquetFilePath = new Path("C:\\\\Users\\\\Administrator\\\\Desktop\\\\07fe7433-99c2-41d8-a91d-27a77d99f690-0_4-8-0_20230510103931772.parquet");
            Path parquetFilePath = new Path(parquetFilePathStr);

            //查询总列数,参考博客: https://blog.csdn.net/letterss/article/details/131189471
            int allColumnsCount = getParquetAllColumnsCount(parquetFilePath);
            int columnIndexMax = -1;
            columnIndexMax = allColumnsCount - 1;
            // 创建 ParquetReader.Builder 实例
            ParquetReader.Builder<Group> builder = ParquetReader.builder(new GroupReadSupport(), parquetFilePath);
            // 创建 ParquetReader 实例
            ParquetReader<Group> reader = builder.build();
            // 循环读取 Parquet 文件中的记录
            Group record;

            List<Map<String, Object>> records = new ArrayList<>();

            while ((record = reader.read()) != null) {
                Map<String, Object> recordMap = new HashMap<>();
                // 处理每个记录的逻辑
                for (int i = 0; i <= columnIndexMax; i++) {
                    String fieldKey = record.getType().getType(i).getName(); //record.getType().getFieldName(i);
                    Object fieldValue = record.getValueToString(i, 0);
                    recordMap.put( fieldKey , fieldValue);
                    System.out.println(fieldValue);
                }
                records.add( recordMap );

                //writeMqttMessageRawDataToLocal(recordMap);
            }
            System.out.println(JSON.toJSONString( records ));
            // 关闭读取器
            reader.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @SneakyThrows
    public static void writeMqttMessageRawDataToLocal(Map<String, Object> recordMap){
        String targetBaseDir = "E:\\tmp_data\\batch_parse_raw_mqtt_messages\\";
        String hexMqttMessageRawData = (String) recordMap.get("raw_data");
        String deviceId = (String) recordMap.get("device_id");
        Long collectTime = Long.valueOf( (String) recordMap.get("collect_time") );

        String mqttMessageRawDataFile = String.format("%s.%d(%s).hex-bin", deviceId, collectTime, DatetimeUtil.longToString(collectTime, DatetimeUtil.MILLISECOND_TIME_WITH_NUMBER_FORMAT) );
        FileUtil.writeToFile(hexMqttMessageRawData, targetBaseDir + mqttMessageRawDataFile);
        //FileUtils.write();
    }

    /**
	 * 获取 parquet 的 总列数
	 * @reference-doc
	 *   [1] datax读取Parquet格式文件总列数 - https://blog.csdn.net/letterss/article/details/131189471
	 */
    public static int getParquetAllColumnsCount(Path path){
        int columnCount = -1;
        Configuration configuration = new Configuration();
        //Path path = new Path(parquetFilePath);
        try {
            ParquetMetadata metadata = ParquetFileReader.readFooter(configuration, path);
            List<ColumnChunkMetaData> columns = metadata.getBlocks().get(0).getColumns();
            columnCount = columns.size();
            System.out.println("Total column count: " + columnCount);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return columnCount;
    }
}

CASE : Python Pands On Parquet

参见本文档: Pandas 的 描述

CASE : DuckDB 数据库 On Parquet

  • DuckDB 与 Parquet 的集成是无缝的,使得轻松使用SQL进行Parquet 文件上数据进行分析、查询和统计。

  • DuckDB 最大的优势在于能够对庞大的数据集运行查询,无论是具有大量小文件还是一个巨大的文件。使用熟悉的 SQL 语法在个人电脑/笔记本上执行探索性数据分析(EDA)任务,为分析大量数据带来了新的视角。

select 
    home_team    ,count(*),avg(home_score) 
from read_parquet('/home/df_test.parquet.snappy') 
group by home_team

Y FAQ

Q: Apache Parquet、Apache Avro 和 ZSTD 之间的关系和区别是什么?

Apache Parquet、Apache Avro 和 ZSTD 在大数据处理中各自扮演着不同的角色,它们之间的关系和区别如下:

  • Parquet 与 Avro
  • 存储格式:
  • Parquet:列式存储格式,数据按列存储,适合读取密集型操作和分析查询。它在处理大规模数据集时效率更高,尤其是在需要对特定列进行聚合或过滤时。
  • Avro:行式存储格式,数据按行存储,适合写入密集型操作和需要快速序列化/反序列化的场景。它在处理事务性系统或消息序列化时表现更好。
  • 模式管理:
  • Parquet:模式存储在文件尾部,使用Thrift格式,模式演变相对不那么灵活。
  • Avro:模式以JSON格式存储,支持强大的模式演变,包括向后、向前和完全兼容。
  • 压缩与性能:
  • Parquet:支持列级压缩(如Snappy、Gzip、ZSTD等),在分析查询中效率更高。
  • Avro:支持块级压缩(如Snappy、Deflate等),在处理整行数据时效率更高。
  • 使用场景:
  • Parquet:适合数据仓库、大数据分析和批处理。
  • Avro:适合流处理系统(如Apache Kafka)、消息序列化和需要频繁写入的场景。
  • Parquet 与 ZSTD
  • ZSTD 是一种高效的压缩算法,以高压缩率和快速解压缩速度著称。
  • Parquet 是一种文件格式协议,支持多种压缩算法,包括 ZSTD。使用 ZSTD 压缩可以显著减少 Parquet 文件的存储空间,同时保持较高的读取速度。
  • 在实际应用中,ZSTD 压缩在 Parquet 文件中表现出色,尤其是在需要高效存储和快速读取的场景中。
  • 总结
  • Parquet 是一种列式存储格式,适合分析和读取密集型操作,支持多种压缩算法(包括ZSTD)。
  • Avro 是一种行式存储格式,适合写入密集型操作和消息序列化,支持灵活的模式演变。
  • ZSTD 是一种高效的压缩算法,可以与 Parquet 结合使用,以提高存储效率和读取速度。

选择哪种格式取决于您的具体需求:如果需要高效读取和分析数据,Parquet 是更好的选择;如果需要快速写入和序列化数据,Avro 更适合。而 ZSTD 压缩算法可以在 Parquet 文件中提供高效的存储和读取性能。

Q: Parquet 与 Avro 如何结合使用?

Parquet和Avro可以结合使用,以充分利用两者的优势。以下是它们结合使用的方式和场景:

  1. 数据存储与读取
    Avro用于数据写入:Avro是一种基于行的存储格式,适合写入密集型操作,尤其是在需要快速序列化和反序列化数据的场景中(如Kafka)。它支持灵活的模式演变,能够轻松处理数据结构的变化。
    Parquet用于数据读取:Parquet是一种基于列的存储格式,适合读取密集型操作,尤其是在需要对数据进行分析和查询的场景中。它支持高效的列式存储和压缩,能够显著减少I/O操作。

  2. 数据转换
    从Avro到Parquet:在数据湖或数据仓库中,可以将Avro格式的数据转换为Parquet格式。这样可以在保留Avro的写入性能的同时,利用Parquet的高效读取性能。例如,可以使用Apache Spark或Hadoop MapReduce来完成这种转换。
    从Parquet到Avro:在某些需要快速写入的场景中,可以将Parquet格式的数据转换为Avro格式,以便更好地支持流处理。

  3. 混合使用
    结合使用场景:在实际应用中,可以将Avro和Parquet结合使用。例如,可以将较新的数据存储为Avro文件,以便快速写入;而将历史数据转换为Parquet文件,以便高效读取。这种混合使用方式可以在不同的数据处理阶段发挥各自的优势。

  4. 工具支持
    Apache Spark:Apache Spark支持对Avro和Parquet文件的读写操作。可以使用Spark的API将Avro文件转换为Parquet文件,或者在需要时将Parquet文件转换为Avro文件。
    Hadoop MapReduce:Hadoop MapReduce也支持对这两种格式的处理,可以编写MapReduce作业来完成数据格式的转换。

  • 总结
  • Parquet和Avro的结合使用可以充分发挥两者的优势。
  • Avro适合快速写入数据序列化,而Parquet适合高效读取数据分析。通过在不同的数据处理阶段选择合适的格式,可以优化数据存储和处理的性能。

X 参考文献

  • 【参考/推荐】
  • 【参考/推荐】
Configuration conf = new Configuration();
conf.set(Constants.ENDPOINT, "https://s3.eu-central-1.amazonaws.com/");
conf.set(Constants.AWS_CREDENTIALS_PROVIDER,
    DefaultAWSCredentialsProviderChain.class.getName());
// maybe additional configuration properties depending on the credential provider


URI uri = URI.create("s3a://bucketname/path");
org.apache.hadoop.fs.Path path = new Path(uri);

ParquetFileReader pfr = ParquetFileReader.open(HadoopInputFile.fromPath(path, conf))
显示全文