Apache SeaTunnel是一款数据集成工具,支持批处理和流处理。它允许用户通过简单的配置实现数据的抽取,转换和加载(ETL过程)。SaeTunnel提供了多种连接器,能够轻松集成不同的数据源项目和目标,包括关系型数据库,NoSQL数据库,文件系统等。其灵活性和扩展性使其成为企业数据集成的重要选择。
本人此次同步涉及到Oracle,PostgresSql, Doris, Mysql
所需jar包有:ojdbc8.jar postgresql-42.3.3.jar mysql-connector-java-8.0.18.jar connector-doris-2.3.8.jar connector-jdbc-2.3.8.jar
DorisSink参数详解:
Name | Type | Required | Default | Description |
---|---|---|---|---|
fenodes | String | Yes | - | Doris 集群 fenodes 地址, 格式是 "fe_ip:fe_http_port, ..." |
query-port | int | No | 9030 | Doris Fenodes mysql协议查询端口 |
username | String | Yes | - | Doris 用户名 |
password | String | Yes | - | Doris 密码 |
database | String | Yes | - | Doris 数据库名称 , 使用 ${database_name} 表示上游数据库名称。 |
table | String | Yes | - | Doris 表名, 使用 ${table_name} 表示上游表名。 |
table.identifier | String | Yes | - | Doris 表的名称,2.3.5 版本后将弃用,请使用 database 和 table 代替。 |
sink.label-prefix | String | Yes | - | stream load导入使用的标签前缀。 在2pc场景下,需要全局唯一性来保证SeaTunnel的EOS语义。 |
sink.enable-2pc | bool | No | false | 是否启用两阶段提交(2pc),默认为 false。 对于两阶段提交,请参考。 |
sink.enable-delete | bool | No | - | 是否启用删除。 该选项需要Doris表开启批量删除功能(0.15+版本默认开启),且仅支持Unique模型。 您可以在此获得更多详细信息 |
sink.check-interval | int | No | 10000 | 加载过程中检查异常时间间隔。 |
sink.max-retries | int | No | 3 | 向数据库写入记录失败时的最大重试次数。 |
sink.buffer-size | int | No | 256 * 1024 | 用于缓存stream load数据的缓冲区大小。 |
sink.buffer-count | int | No | 3 | 用于缓存stream load数据的缓冲区计数。 |
doris.batch.size | int | No | 1024 | 每次http请求写入doris的批量大小,当row达到该大小或者执行checkpoint时,缓存的数据就会写入服务器。 |
needs_unsupported_type_casting | boolean | No | false | 是否启用不支持的类型转换,例如 Decimal64 到 Double。 |
schema_save_mode | Enum | no | CREATE_SCHEMA_WHEN_NOT_EXIST | schema保存模式,请参考下面的schema_save_mode |
data_save_mode | Enum | no | APPEND_DATA | 数据保存模式,请参考下面的data_save_mode 。 |
save_mode_create_template | string | no | see below | 见下文。 |
custom_sql | String | no | - | 当data_save_mode选择CUSTOM_PROCESSING时,需要填写CUSTOM_SQL参数。 该参数通常填写一条可以执行的SQL。 SQL将在同步任务之前执行。 |
doris.config | map | yes | - | 该选项用于支持自动生成sql时的insert、delete、update等操作,以及支持的格式。 |
在开启同步任务之前,针对现有的表结构选择不同的处理方案。 选项介绍:RECREATE_SCHEMA
:表不存在时创建,表保存时删除并重建。
CREATE_SCHEMA_WHEN_NOT_EXIST
:表不存在时会创建,表存在时跳过。ERROR_WHEN_SCHEMA_NOT_EXIST
:表不存在时会报错。IGNORE
:忽略对表的处理。
在开启同步任务之前,针对目标端已有的数据选择不同的处理方案。 选项介绍:DROP_DATA
: 保留数据库结构并删除数据。APPEND_DATA
:保留数据库结构,保留数据。CUSTOM_PROCESSING
:用户自定义处理。ERROR_WHEN_DATA_EXISTS
:有数据时报错。
PS:这个参数在本人亲测时DROP_DATA 这个选项并不会生效,原因是在Sink端无权限清空表中数据,感兴趣的可以做一下测试
使用模板自动创建Doris表, 会根据上游数据类型和schema类型创建相应的建表语句, 默认模板可以根据情况进行修改。
默认模板:
CREATE TABLE IF NOT EXISTS `${database}`.`${table_name}` (
${rowtype_primary_key},
${rowtype_fields}
) ENGINE=OLAP
UNIQUE KEY (${rowtype_primary_key})
DISTRIBUTED BY HASH (${rowtype_primary_key})
PROPERTIES (
"replication_allocation" = "tag.location.default: 1",
"in_memory" = "false",
"storage_format" = "V2",
"disable_auto_compaction" = "false"
)
如果模板中填写了自定义字段,例如添加 id 字段
CREATE TABLE IF NOT EXISTS `${database}`.`${table_name}`
(
id,
${rowtype_fields}
) ENGINE = OLAP UNIQUE KEY (${rowtype_primary_key})
DISTRIBUTED BY HASH (${rowtype_primary_key})
PROPERTIES
(
"replication_num" = "1"
);
连接器会自动从上游获取对应类型完成填充, 并从“rowtype_fields”中删除 id 字段。 该方法可用于自定义字段类型和属性的修改。
可以使用以下占位符:
Doris 数据类型 | SeaTunnel 数据类型 |
---|---|
BOOLEAN | BOOLEAN |
TINYINT | TINYINT |
SMALLINT | SMALLINT TINYINT |
INT | INT SMALLINT TINYINT |
BIGINT | BIGINT INT SMALLINT TINYINT |
LARGEINT | BIGINT INT SMALLINT TINYINT |
FLOAT | FLOAT |
DOUBLE | DOUBLE FLOAT |
DECIMAL | DECIMAL DOUBLE FLOAT |
DATE | DATE |
DATETIME | TIMESTAMP |
CHAR | STRING |
VARCHAR | STRING |
STRING | STRING |
ARRAY | ARRAY |
MAP | MAP |
JSON | STRING |
HLL | 尚不支持 |
BITMAP | 尚不支持 |
QUANTILE_STATE | 尚不支持 |
STRUCT | 尚不支持 |
支持的格式包括 CSV 和 JSON。
适当增加sink.buffer-size
和doris.batch.size
的值可以提高写性能。
在流模式下,如果doris.batch.size
和checkpoint.interval
都配置为较大的值,最后到达的数据可能会有较大的延迟(延迟的时间就是检查点间隔的时间)。
这是因为最后到达的数据总量可能不会超过doris.batch.size指定的阈值。因此,在接收到数据的数据量没有超过该阈值之前只有检查点才会触发提交操作。因此,需要选择一个合适的检查点间隔。
此外,如果你通过sink.enable-2pc=true
属性启用2pc。sink.buffer-size
将会失去作用,只有检查点才能触发提交。
OracleSource参数详解:
Datasource | Supported Versions | Driver | Url | Maven |
---|---|---|---|---|
Oracle | Different dependency version has different driver class. | oracle.jdbc.OracleDriver | jdbc:oracle:thin:@datasource01:1523:xe |
上述参数与数据库版本相关,使用时请严格对应版本,避免因jar包与数据库版本冲突产生相关问题。
Name | Type | Required | Default | Description |
---|---|---|---|---|
url | String | Yes | - | The URL of the JDBC connection. Refer to a case: jdbc:oracle:thin:@datasource01:1523:xe |
driver | String | Yes | - | The jdbc class name used to connect to the remote data source, if you use MySQL the value is oracle.jdbc.OracleDriver . |
user | String | No | - | Connection instance user name |
password | String | No | - | Connection instance password |
query | String | Yes | - | Query statement |
connection_check_timeout_sec | Int | No | 30 | The time in seconds to wait for the database operation used to validate the connection to complete |
partition_column | String | No | - | The column name for parallelism's partition, only support numeric type,Only support numeric type primary key, and only can config one column. |
partition_lower_bound | BigDecimal | No | - | The partition_column min value for scan, if not set SeaTunnel will query database get min value. |
partition_upper_bound | BigDecimal | No | - | The partition_column max value for scan, if not set SeaTunnel will query database get max value. |
partition_num | Int | No | job parallelism | The number of partition count, only support positive integer. default value is job parallelism |
fetch_size | Int | No | 0 | For queries that return a large number of objects,you can configure the row fetch size used in the query toimprove performance by reducing the number database hits required to satisfy the selection criteria. Zero means use jdbc default value. |
properties | Map | No | - | Additional connection configuration parameters,when properties and URL have the same parameters, the priority is determined by the specific implementation of the driver. For example, in MySQL, properties take precedence over the URL. |
# Defining the runtime environment
env {
parallelism = 4
job.mode = "BATCH"
}
source{
Jdbc {
url = "jdbc:oracle:thin:@datasource01:1523:xe"
driver = "oracle.jdbc.OracleDriver"
user = "root"
password = "123456"
query = "SELECT * FROM TEST_TABLE"
}
}
transform {
# If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
# please go to https://seatunnel.apache.org/docs/transform-v2/sql
}
sink {
Console {}
}