您的当前位置:首页正文

使用SeaTunnel进行一键数据同步

2024-11-08 来源:个人技术集锦

一.Apache SeaTunnel

      Apache SeaTunnel是一款数据集成工具,支持批处理和流处理。它允许用户通过简单的配置实现数据的抽取,转换和加载(ETL过程)。SaeTunnel提供了多种连接器,能够轻松集成不同的数据源项目和目标,包括关系型数据库,NoSQL数据库,文件系统等。其灵活性和扩展性使其成为企业数据集成的重要选择。

二.使用方式

2.1.使用SeaTunnel Zeta 引擎

  • 本人此次同步涉及到Oracle,PostgresSql, Doris, Mysql

  • 所需jar包有:ojdbc8.jar postgresql-42.3.3.jar mysql-connector-java-8.0.18.jar connector-doris-2.3.8.jar connector-jdbc-2.3.8.jar

三. 数据同步

DorisSink参数详解:

NameTypeRequiredDefaultDescription
fenodesStringYes-Doris 集群 fenodes 地址, 格式是 "fe_ip:fe_http_port, ..."
query-portintNo9030Doris Fenodes mysql协议查询端口
usernameStringYes-Doris 用户名
passwordStringYes-Doris 密码
databaseStringYes-Doris数据库名称 , 使用 ${database_name} 表示上游数据库名称。
tableStringYes-Doris 表名, 使用 ${table_name} 表示上游表名。
table.identifierStringYes-Doris 表的名称,2.3.5 版本后将弃用,请使用 database 和 table 代替。
sink.label-prefixStringYes-stream load导入使用的标签前缀。 在2pc场景下,需要全局唯一性来保证SeaTunnel的EOS语义。
sink.enable-2pcboolNofalse是否启用两阶段提交(2pc),默认为 false。 对于两阶段提交,请参考。
sink.enable-deleteboolNo-是否启用删除。 该选项需要Doris表开启批量删除功能(0.15+版本默认开启),且仅支持Unique模型。 您可以在此获得更多详细信息
sink.check-intervalintNo10000加载过程中检查异常时间间隔。
sink.max-retriesintNo3向数据库写入记录失败时的最大重试次数。
sink.buffer-sizeintNo256 * 1024用于缓存stream load数据的缓冲区大小。
sink.buffer-countintNo3用于缓存stream load数据的缓冲区计数。
doris.batch.sizeintNo1024每次http请求写入doris的批量大小,当row达到该大小或者执行checkpoint时,缓存的数据就会写入服务器。
needs_unsupported_type_castingbooleanNofalse是否启用不支持的类型转换,例如 Decimal64 到 Double。
schema_save_modeEnumnoCREATE_SCHEMA_WHEN_NOT_EXISTschema保存模式,请参考下面的schema_save_mode
data_save_modeEnumnoAPPEND_DATA数据保存模式,请参考下面的data_save_mode
save_mode_create_templatestringnosee below见下文。
custom_sqlStringno-当data_save_mode选择CUSTOM_PROCESSING时,需要填写CUSTOM_SQL参数。 该参数通常填写一条可以执行的SQL。 SQL将在同步任务之前执行。
doris.configmapyes-该选项用于支持自动生成sql时的insert、delete、update等操作,以及支持的格式。

schema_save_mode[Enum]

在开启同步任务之前,针对现有的表结构选择不同的处理方案。 选项介绍:
RECREATE_SCHEMA :表不存在时创建,表保存时删除并重建。

CREATE_SCHEMA_WHEN_NOT_EXIST :表不存在时会创建,表存在时跳过。
ERROR_WHEN_SCHEMA_NOT_EXIST :表不存在时会报错。
IGNORE :忽略对表的处理。

data_save_mode[Enum]

在开启同步任务之前,针对目标端已有的数据选择不同的处理方案。 选项介绍:
DROP_DATA: 保留数据库结构并删除数据。
APPEND_DATA:保留数据库结构,保留数据。
CUSTOM_PROCESSING:用户自定义处理。
ERROR_WHEN_DATA_EXISTS:有数据时报错。

PS:这个参数在本人亲测时DROP_DATA 这个选项并不会生效,原因是在Sink端无权限清空表中数据,感兴趣的可以做一下测试

save_mode_create_template

使用模板自动创建Doris表, 会根据上游数据类型和schema类型创建相应的建表语句, 默认模板可以根据情况进行修改。

默认模板:

CREATE TABLE IF NOT EXISTS `${database}`.`${table_name}` (
${rowtype_primary_key},
${rowtype_fields}
) ENGINE=OLAP
 UNIQUE KEY (${rowtype_primary_key})
DISTRIBUTED BY HASH (${rowtype_primary_key})
 PROPERTIES (
"replication_allocation" = "tag.location.default: 1",
"in_memory" = "false",
"storage_format" = "V2",
"disable_auto_compaction" = "false"
)

如果模板中填写了自定义字段,例如添加 id 字段

CREATE TABLE IF NOT EXISTS `${database}`.`${table_name}`
(   
    id,
    ${rowtype_fields}
) ENGINE = OLAP UNIQUE KEY (${rowtype_primary_key})
    DISTRIBUTED BY HASH (${rowtype_primary_key})
    PROPERTIES
(
    "replication_num" = "1"
);

连接器会自动从上游获取对应类型完成填充, 并从“rowtype_fields”中删除 id 字段。 该方法可用于自定义字段类型和属性的修改。

可以使用以下占位符:

  • database:用于获取上游schema中的数据库。
  • table_name:用于获取上游schema中的表名。
  • rowtype_fields:用于获取上游schema中的所有字段,自动映射到Doris的字段描述。
  • rowtype_primary_key:用于获取上游模式中的主键(可能是列表)
  • rowtype_unique_key:用于获取上游模式中的唯一键(可能是列表)

数据类型映射

Doris 数据类型SeaTunnel 数据类型
BOOLEANBOOLEAN
TINYINTTINYINT
SMALLINTSMALLINT
TINYINT
INTINT
SMALLINT
TINYINT
BIGINTBIGINT
INT
SMALLINT
TINYINT
LARGEINTBIGINT
INT
SMALLINT
TINYINT
FLOATFLOAT
DOUBLEDOUBLE
FLOAT
DECIMALDECIMAL
DOUBLE
FLOAT
DATEDATE
DATETIMETIMESTAMP
CHARSTRING
VARCHARSTRING
STRINGSTRING
ARRAYARRAY
MAPMAP
JSONSTRING
HLL尚不支持
BITMAP尚不支持
QUANTILE_STATE尚不支持
STRUCT尚不支持
支持的导入数据格式

支持的格式包括 CSV 和 JSON。

调优指南

适当增加sink.buffer-sizedoris.batch.size的值可以提高写性能。

在流模式下,如果doris.batch.sizecheckpoint.interval都配置为较大的值,最后到达的数据可能会有较大的延迟(延迟的时间就是检查点间隔的时间)。

这是因为最后到达的数据总量可能不会超过doris.batch.size指定的阈值。因此,在接收到数据的数据量没有超过该阈值之前只有检查点才会触发提交操作。因此,需要选择一个合适的检查点间隔。

此外,如果你通过sink.enable-2pc=true属性启用2pc。sink.buffer-size将会失去作用,只有检查点才能触发提交。

OracleSource参数详解:

DatasourceSupported VersionsDriverUrlMaven
OracleDifferent dependency version has different driver class.oracle.jdbc.OracleDriverjdbc:oracle:thin:@datasource01:1523:xe

上述参数与数据库版本相关,使用时请严格对应版本,避免因jar包与数据库版本冲突产生相关问题。

Source Options

NameTypeRequiredDefaultDescription
urlStringYes-The URL of the JDBC connection. Refer to a case: jdbc:oracle:thin:@datasource01:1523:xe
driverStringYes-The jdbc class name used to connect to the remote data source,
if you use MySQL the value is oracle.jdbc.OracleDriver.
userStringNo-Connection instance user name
passwordStringNo-Connection instance password
queryStringYes-Query statement
connection_check_timeout_secIntNo30The time in seconds to wait for the database operation used to validate the connection to complete
partition_columnStringNo-The column name for parallelism's partition, only support numeric type,Only support numeric type primary key, and only can config one column.
partition_lower_boundBigDecimalNo-The partition_column min value for scan, if not set SeaTunnel will query database get min value.
partition_upper_boundBigDecimalNo-The partition_column max value for scan, if not set SeaTunnel will query database get max value.
partition_numIntNojob parallelismThe number of partition count, only support positive integer. default value is job parallelism
fetch_sizeIntNo0For queries that return a large number of objects,you can configure
the row fetch size used in the query toimprove performance by
reducing the number database hits required to satisfy the selection criteria.
Zero means use jdbc default value.
propertiesMapNo-Additional connection configuration parameters,when properties and URL have the same parameters, the priority is determined by the
specific implementation of the driver. For example, in MySQL, properties take precedence over the URL.

 Task Example

# Defining the runtime environment
env {
  parallelism = 4
  job.mode = "BATCH"
}
source{
    Jdbc {
        url = "jdbc:oracle:thin:@datasource01:1523:xe"
        driver = "oracle.jdbc.OracleDriver"
        user = "root"
        password = "123456"
        query = "SELECT * FROM TEST_TABLE"
    }
}

transform {
    # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
    # please go to https://seatunnel.apache.org/docs/transform-v2/sql
}

sink {
    Console {}
}

显示全文