前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >得物自建 DTS 平台的技术演进 | 精选

得物自建 DTS 平台的技术演进 | 精选

原创
作者头像
得物技术
发布2023-07-13 17:25:02
3250
发布2023-07-13 17:25:02
举报
文章被收录于专栏:得物技术得物技术

0 前言

DTS 是数据传输平台 (Data Transfer Platform 的缩写)

随着得物 App 的用户流量增长,业务选择的数据库越来越多样化,异构数据源之间的数据同步需求也逐渐增多。为了控制成本并更好地支持业务发展,我们决定自建 DTS 平台。本文主要从技术选型、能力支持与演化的角度出发,分享了在 DTS 平台升级过程中获得的经验,并提供一些参考。

1 技术选型

DTS 的主要目标是支持不同类型的数据源之间的数据交互,包括关系型数据库(RDBMS)、NoSQL 数据库、OLAP 等,同时整合了数据库配置管理、数据订阅、数据同步、数据迁移、DRC 双活数据同步支持、数据巡检、监控报警、统一权限等多个模块,以构建安全、可扩展、高可用的数据架构平台。

1.1 能力对比

图片
图片

1.2 DTS 1.0 - 以 canal/otter/datax 作为执行引擎

图片
图片

1.3 为什么要切换到 Flink?

为了支持多种读端数据源和写端数据源,需要一个统一数据处理框架,以减少重复组件和提高开发效率。同时数据源类型和组件的维护难度与复杂度呈线性增长,现有的组件需要统一维护到一个项目中。

Canal 和 Otter 等组件的社区活跃度低,很长时间没有得到维护更新。因此,需要选择一个新的、活跃的框架。此外,现有组件也无法有效支持全量 + 增量一体化的操作。

因此,使用一个统一的数据处理框架,能够同时支持多种读端数据源和写端数据源,以及全量 + 增量一体化的功能,是必要的。这样能够降低组件的维护难度和复杂度,提高开发效率。

通过 DTS 2.0,我们希望将 canal/otter/datax 演化为一个任务执行框架 + 管理平台,能够为后续大量数据源迭代提速。

1.4 DTS 2.0 以 Flink 作为执行引擎

现有的开发流程:

  • 统一的任务执行框架,集成 flink 并引入 connectors 根据配置组装出具体的 DTS 任务
  • 维护并研发新的 connector

当我们需要支持新的数据源,首先将数据源相关插件维护在 connector 中,接着在执行框架中引入需要的组件,其中存在大量的可复用的功能,这样就做到了 connector 及功能组件复用的效果。

2 DTS 现有能力

图片
图片

3 我们做了什么?

3.1 DTS Connectors 框架 - 数据源支持提速

在 Flink CDC 基础上实现的全量 / 增量任务同步框架,基本的架构如下

图片
图片

其中 Connector 中分别实现了 Flink 提供的 SourceFunction 和 SinkFunction 函数,分别负责从读端读取数据,往写端写入数据,因此一个 Connector 可同时存在于上游或者下游。

任务的启动流程:

- 指定任务 Json 配置,根据类型加载 SourceFunction 和 SinkFunction 构建通用能力函数并启动

a. 任务的 Main 函数如下所示,根据如下的 Json 文件加载到对应的 Connector 中的 SourceFactory 或者 SinkFactory 来构造对应的 DataStream。

DataStream 是 Flink 中提供的数据流操作类

代码语言:javascript
复制
public class Main {    public static void main(String[] args) throws Exception {
        // 解析参数        ParameterTool parameterTool = ParameterTool.fromArgs(args);        String[] parsedArgs = parseArgs(parameterTool);
        Options options = new OptionParser(parsedArgs).getOptions();        options.setJobName(options.getJobName());
        // 执行任务        StreamExecutionEnvironment environment =                EnvFactory.createStreamExecutionEnvironment(options);        exeJob(environment, options);    }

任务 Json 配置:

代码语言:javascript
复制
{  "job":{    "content":{      "reader":{        "name":"binlogreader",        "parameter":{          "accessKey":"",          "binlogOssApiUrl":"",          "delayBetweenRestartAttempts":2000,          "fetchSize":1,          "instanceId":"",          "rdsPlatform":"",          "restartAttempts":5,          "secretKey":"",          "serverTimezone":"",          "splitSize":1024,          "startupMode":"LATEST_OFFSET"        }      },      "writer":{        "name":"jdbcwriter",        "parameter":{          "batchSize":10000,          "concurrentWrite":true,          ],          "dryRun":false,          "dumpCommitData":false,          "errorRecord":0,          "flushIntervalMills":30000,          "poolSize":10,          "retries":3,          "smallBatchSize":200        }      }    },
  }}

b. 我们提供了两个抽象工厂类,SourceFactory, SinkFactory, 其中的 createSource, createSink 便是子工厂需要实现的方法,不同的数据源实现不同。

代码语言:javascript
复制
public abstract class SourceFactory<T> {    public abstract DataStream<T> createSource();}public abstract class SinkFactory<T> {    public abstract void createSink(DataStream<T> rowData) throws Exception;}

c. 接下来,我们只需要实现对应的子工厂方法就可以了

代码语言:javascript
复制
public class BinlogSourceFactory extends AbstractJdbcSourceFactory {    @Override    public DataStream<TableRowData> createSource() {
        List<String> tables = this.binlogSourceConf.getConnection().getTable();        Set<String> databaseList = new HashSet<>(2);
        // 使用对应的Connector构建DataStream    }}

d. 通用能力函数:RateLimitFunction, BinlogPositionFunction 其中分别实现了对应的任务能力,例如限流,任务位点保存等。

代码语言:javascript
复制
public class RateLimiterMapFunction<T> extends RichMapFunction<T, T> {

    private transient FlinkConnectorRateLimiter rateLimiter;

    @Override    public T map(T value) throws Exception {        if (rateLimiterEnabled) {            rateLimiter.acquire(1);        }        return value;    }

当任务所需的函数都创建完成后,任务就真正开始运行了。

收益:

使用一套封装完善且易扩展的框架能够提高开发效率并降低后续代码的维护成本。相比于 DTS1.0、Canal 和 Otter 等项目,该项目的维护成本大大降低,同时提供了更好的扩展性,使得我们能够在短期内支持 PostgreSQL、MongoDB、Hbase、StarRocks 等不同的数据源。

3.2 RDS 日志获取

DTS 通过提供增量和全量同步能力为业务提供数据同步功能,但在增量订阅 / 同步任务执行过程中,可能会遇到一些异常情况。其中,以下三种情况需要特别处理:

  • Binlog 可用性

云厂商的数据库实例本地 binlog 有效期 8 小时,过期部分进行 OSS 备份。MySQL 业务高峰期或者 DDL 变产生大量的 binlog,  DTS 任务尝试获取过期数据失败,任务因此中断。因此,DTS 支持了本地 binlog+OSS 备份 binlog 的获取及切换,保障日志可用性。

  • 数据库 实例主从切换

RDS 经常会发生主备节点切换,在切换的过程中要保证数据不丢。由于切换前后两个数据库实例 Binlog 文件一般都是不一致的,此时任务位点记录方式是 BinlogPosition 模式,则在切换之后任务需要自动进行 Binlog 对齐操作,进而保证数据的完整性。将新数据实例上的位点查询时间戳提前 1-2 分钟即可。

  • 读实例订阅支持

DTS 任务 binlog dump 连接数过多造成主库压力及影响 DDL 变更,因此需要支持读库订阅。云厂商的读库不提供备份,在读库日志过期时需要切换到主库进行读取。

3.3 全量增量一体化功能

图片
图片

全量增量一体化是指先同步存量数据,待存量结束之后再开始同步增量数据。其中也加入了增量阶段的 OSS 备份日志获取。但存量阶段依然存在一些问题,需要进一步改造优化。

全量模式下新增表先进行存量数据同步再进行增量数据同步,该任务中已存在的表会因此导致数据延迟。待新增表数据同步完成,任务延迟则会恢复正常。

3.4 数据源接入 - starrocks, postgres 等

支持从 mysql 同步到 starrocks 和 postgres, 在任务执行框架的基础上,只需要开发 starrocks-connector, postgres connector 支持对应的数据源即可。其中的其他能力,像多表同步、分库分表等场景都可以达到复用的效果。

3.5 JBDC 写入改造

脚本扩展和动态表名路由:

图片
图片

数据合并和多线程写入:

图片
图片

3.6 监控告警

DTS 任务需要采集 flink 任务指标,主要包括任务延迟、各个算子阶段的写入速率,算子被压及使用率等。其中 任务延迟需要接入告警服务,于是我们选择了引入 redis 来缓存任务的延迟时间,再上报到告警服务来完成飞书的消息和电话告警。

4 最佳实践

4.1 0000-00-00 00:00:00 时间戳的问题

MySQL 的时间戳允许为 0000-00-00 00:00:00, 在 Flink 任务中通常会被转换为 null, 导致写入下游数据源失败,因此需要做特殊标记对于不同的数据源做不同的转化保证写入的正切行。

4.2 Flink CDC 任务 serverId 唯一性

Flink CDC source 会伪装成 MySQL slave 节点,为了保证数据的准确性,每个 slave 必须拥有唯一的 serverId 来标记该 slave 的唯一性。因此在 flink cdc 的任务中我们为每一个任务分配了一个唯一的 serverId 区间 (范围区间是为了支持多并行度)。

4.3 Flink 任务数据序列化瓶颈

在 flink 任务中使用 DataStreamAPI 并使用比较复杂的数据结构进行传输时,算子之间的序列化成本较高,两个方向,一是建立更为高效的数据结构进行传输,二是开启 flink 对象复用,并尽可能减少不同并行度之间的数据传输。

5 未来演进

DTS 作为一个数据同步平台主要功能是尽可能提供高效的数据源同步功能,助力于多变的业务场景。

5.1 基于 Flink SQL 的 ETL 任务管理

流式数据处理除了现有的 DataStream API 还存在 SQL 的形式,SQL 作为一种通用的语言,对于数据相关的业务同学极大的降低了学习成本。而通过 Flink SQL 可以做到的 ETL 流式数据加工也能解决一些复杂业务场景的处理逻辑,将业务逻辑转化为 DAG 的流式处理图,通过拖拽的方式也能方便使用,FLINK SQL 的演进方向能够和现有的 Flink DataStream API 互补。

应用场景:ETL 强大的流式数据转换处理能力大幅提升数据集成效率,也能建实时报表体系,提高分析效率,同时也可以应用于一些实时大屏的场景。

5.2 统一技术栈

将现有的 DTS 能力都迁移到 Flink 平台上,保持统一的技术栈,能够极大的降低维护成本。现有遗留的双向同步、数据比对等能力需要做进一步的改造和迁移,符合整体技术收敛的趋势。

6 总结

本文主要分享了以下几个方面:Flink 相比现有的技术栈带来的收益,切换到 Flink 以后的迭代方向及架构功能上的变更、带来新的问题如何解决,以及未来的一些迭代方向,希望能让大家有所收获。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
云数据库 MySQL
腾讯云数据库 MySQL(TencentDB for MySQL)为用户提供安全可靠,性能卓越、易于维护的企业级云数据库服务。其具备6大企业级特性,包括企业级定制内核、企业级高可用、企业级高可靠、企业级安全、企业级扩展以及企业级智能运维。通过使用腾讯云数据库 MySQL,可实现分钟级别的数据库部署、弹性扩展以及全自动化的运维管理,不仅经济实惠,而且稳定可靠,易于运维。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档