首页
搜索 搜索
聚焦

小米集团基于Apache Doris的OLAP实践

2023-07-31 15:33:41 DataFunTalk
一、系统选型和应用现状

首先来介绍一下小米集团OLAP系统选型与应用现状。


【资料图】

1、系统选型

在小米内部,OLAP引擎主要的应用场景是BI看板和报表分析。早期通过引入Kylin来满足面向主题式的报表分析的需求,当时没有集团层面通用的BI平台,都是各个业务部门自建自己的BI看板。后来小米决定要建立全集团通用的BI平台,Kylin的灵活性就不太够了,我们就需要做一次选型,选择一款在各个业务场景之间更通用的OLAP方案,通过调研我们选择了SparkSQL+Kudu+HDFS这种方案。

计算层使用了SparkSQL,存储层使用了Kudu和HDFS。存储层做了冷热数据的分离,热数据会写入到Kudu,冷数据会存储在HDFS。数据的存储做了按天分区,每天会将Kudu的一部分数据转冷,在晚上集群负载比较低的时候,将数据转储到HDFS。在查询层做了Kudu和HDFS的联合视图,当用户需要查询看板的时候,就通过Spark SQL查询Kudu和HDFS的联合视图,把查询的结果通过看板进行展示。

早期这一方案在一定程度上能够满足小米的需求,但仍然存在一些问题,比如需要维护的组件过多,运维成本相对较高。另外,Spark底层是基于批处理的机制,数据在做Shuffle的时候,中间的结果可能需要落盘,就会导致使用SparkSQL查询时性能相对较低。随着小米内部业务的持续增长,对于OLAP 的需求越来越多,之前的方案在一些场景上已难以满足业务需求,因此我们希望能够选择一款更加优秀的系统来替代之前的SparkSQL+Kudu+HDFS的架构。

经过调研,我们选择了Apache Doris,这个系统是存算一体的,查询性能也相对较好。小米内部在2019年的时候首次引入了Apache Doris,当时的版本还比较老,是一个非向量化的版本,数据在内存里是按照行存的形式来做计算的,所以就是没有很好地使用到CPU的向量化计算的能力。早期这个系统在性能方面也能够满足业务需求,但随着近几年公司业务的快速发展,一些业务对OLAP系统的查询性能提出了更高的要求,非向量化版本的查询性能确实在一些场景上显得捉襟见肘了。

在2022年的时候,我们又进行了一次系统选型,希望能够引入更优秀的OLAP系统。我们调研了ClickHouse,它是一款非常优秀的OLAP系统。我们在小米内部也使用了实际业务进行了测试,测试结果是它的单表查询性能非常好,但在多表查询上还是不能满足我们的业务需求。另外,数据导入方面, ClickHouse没有事务来保证数据写入的原子性。用户如果有一部分数据写入失败并进行了重试,可能最终的数据就会出现部分数据的重复,这种情况在一些业务上是难以接受的,所以我们最终就放弃了 Clickhouse 的方案。

随着Doris向量化版本的发布,且最近几年Apache Doris社区也发展非常迅速,底层的引擎也进行了更新换代,支持了向量化的存储和计算引擎。我们又进行了向量化版本的测试,当时在小米内部使用了Doris 1.1.2 这个版本在实际业务场景进行测试,发现相比之前的0.13这个非向量化版本,查询的性能整体提升有1倍以上,在部分场景上查询性能提升有3- 5倍。最终我们就选择继续留在了Apache Doris上,并推动了Apache Doris向量化版本的上线。

以上就是我们内部系统选型的历史。

2、Apache Doris优势

我们选型使用Apache Doris主要是因为Doris 有以下几个方面的优势:

首先,Doris查询性能比较好,它的底层支持了比较多的索引,能够提升查询效率,用户也可以创建物化视图或者是RollUp 来加速查询。当然在支持了向量化引擎以后,查询的性能就更好了。

其次,我们选择 Doris 是因为支持的是标准的SQL,对用户使用非常友好,用户可以使用 MySQL 客户端直接连接 Doris 来进行查询,用户使用成本低。

Doris 的第三个优势是它不依赖于外部的组件,运维非常简单。另外,它对于分布式的支持比较好,可以很方便地进行集群的扩容和缩容。数据是多副本的存储,如果有副本出现了异常,系统具有副本自动修复的能力。

我们选择 Doris 的最后也是最重要的一个原因,就是它完全开源,社区比较活跃,便于后期的维护和升级。现在Apache Doris 背后也有商业化公司的支持,我们内部线上服务遇到的问题,如果内部不能自己解决,也能够很快得到社区的支持,帮助我们解决。

以上这些就是我们选择Apache Doris的主要原因。

3、应用现状

我们在2019年的时候引入Doris,经过几年的发展,目前 Doris 在小米内部已经得到了非常广泛的应用,比较核心的、较大的业务有几十个,还有数百个小一些的业务。

内部使用Doris比较大的业务,包括用户行为分析、AB实验、用户画像、小米造车、小米有品、新零售、天星数科、广告投放、广告BI和智能制造等等。Doris 的集群在我们内部现在有数十个,其中最大的集群有 99 个 Be 节点, 3 个 Fe 节点,集群的规模比较大。最大的集群每天有几十个流式的数据导入任务,单表每天最大的数据增量有 120 亿,这个集群每天能够承担2万次以上的有效查询。

二、小米数据生态中的Doris

在第二部分,将介绍小米内部数据生态中的Doris。

1、小米BI平台架构

在小米内部,Doris 的一个较大的应用场景是作为BI 平台的底层的数据源。在 BI 平台底层,除了使用 Doris 作为数据源之外,还有MySQL、 Hive 和Iceberg。MySQL 和 Doris 主要面向的是实时分析的业务场景,MySQL 针对的是数据量比较小,但是查询非常敏感,要求查询延迟非常低的业务。相比于MySQL ,Doris 面向的是大数据量的业务,查询的延迟要求比 MySQL 可能会稍微低一些,一般支持的是秒级的查询延迟。Hive 和 Iceberg 主要针对的是数据量更大、离线分析的场景。

在小米的 BI 平台,用户可以直接在语义模型层通过拖拽组件或者是编写 SQL 的方式进行语义建模,模型创建完成之后就可以对数据源进行分析查询,并且把查询的结果在看板上进行可视化展示。语义模型层除了支持拖拽组或编写 SQL 来建模之外,还支持了指标和维度定义、日期转换、函数处理等一些比较复杂的能力。

另外,在语义模型层,也支持了查询的自动加速。用户在创建看板的时候选择了 Hive 或Iceberg作为数据源,同时选择使用 Doris 进行查询加速,那么用户创建好看板之后,就会由平台生成数据同步任务,根据用户看板的数据分析需求,使用 Spark或Presto查询Hive或Iceberg生成中间表,再把中间表数据导入到 Doris 里。用户在查询看板的时候就不用直接通过Presto或Spark来查 Hive 或Iceberg的数据了,可以直接路由到 Doris 查询中间表的数据,进行加速查询。

小米 BI 平台除了支持PC 端看板,也支持了移动端看板。小米内部也引入了Apache Kyuubi组件,主要是尽量对上层的用户屏蔽底层的具体查询引擎。之前每一个查询引擎都是上层用户直连的,引入Kyuubi之后可以提供统一的 SQL 入口,由Kyuubi进行具体引擎的连接。当然对于历史的、直接使用 Doris 的用户场景,还是支持上层业务通过 JDBC 的方式来连接Doris进行分析查询。

这就是Doris在小米 BI 平台的使用情况。

2、数据工场

小米内部使用了数据工场对底层的存储和计算引擎进行了封装。数据工场是小米自研的一款面向数据开发和数据分析人员的数据平台,底层集成了Doris、Iceberg、Hive、MySQL、Kudu等的存储能力,同时也集成了Spark、Flink、Presto等的计算能力,对底层的存储和计算能力进行了统一的封装,提供给集团各个业务来进行数据的存储和计算。数据工场对底层的这些存储和计算引擎也提供了统一的元数据管理、统一的权限管理、数据作业管理以及数据治理的能力。

3、统一元数据管理

对于底层的每一个存储和计算引擎,在数据工场都做了统一的元数据管理,由数据工场给上层的业务提供了统一的元数据视图。比如对于Doris来说,底层的元数据就包括Doris服务信息、集群信息、库信息、表信息和分区信息,在统一元数据管理系统提供的元数据视图就是“doris.集群名.库名.表名.分区名”。通过这种方式对上层服务提供了统一的元数据视图。通过元数据管理对所有的存储资源可以进行统一管理,形成了统一的资源视图,可以对所有的资源变更和访问进行有效的审计。对于每一次Doris集群的查询或写入访问,都可以在元数据管理系统里进行记录和审计。

4、统一权限管理

底层不同的引擎可能具有不同的权限体系,Doris使用的是用户名和密码的方式进行鉴权、小米内部的Hive使用的是Kerberos的方式来鉴权、小米内部的Talos这种自研的消息队列使用的是AKSK这种方式来进行鉴权。

为了对上层用户屏蔽底层不同系统的权限体系,小米在数据工场做了权限的代理,给上层用户提供统一的权限管理能力。在数据工场引入了用户空间这一概念,在用户空间下包含不同的用户,每一个用户都具有不同的角色。owner角色是用户空间的所有者,这个用户空间下发生的所有账单都会挂在 owner 所在的团队下面;Admin角色是用户空间的管理者,可以增加或者是删除用户;Dev这个角色是普通的用户,可以访问这个用户空间下所有的存储和计算的引擎。用户不需要针对每一个引擎使用不同的鉴权体系来进行鉴权,而是由用户空间进行权限的代理,比如用户需要在数据工场查询某一个 Doris 表,不需要自己传入用户名和密码,员工 ID 就会和这个用户空间进行绑定,员工查询的时候会由他所在的用户空间去查询自己所拥有的表的用户名和密码,代替用户进行鉴权。

如果有业务想接入Doris,就需要在数据工场提交建库的申请,他需要选择具体的集群名称,描述自己的业务场景,提供查询的延迟要求,提供预估的数据量。当用户提交了请求之后,Doris运维的同学就会收到用户提交的建库请求,然后针对建库请求里的场景信息,进行审批,同时给用户提供一些使用方面的指导。用户完成建库审批之后,就可以直接在对应的集群上去做建表的操作了。建表的操作可以直接在数据工场来提交,可以通过DDL 的方式写SQL来建表,也可以通过可视化的方式来选择建表。用户建好表之后就可以进行数据的写入和数据查询了。

5、数据作业管理

小米的数据工场也提供了数据作业管理的能力,比如用户数据需要从业务侧写到 Doris ,可以在数据工场直接提交数据写入的作业,用户数据会首先写入到小米自研的消息队列Talos里,再从 Talos写到各个不同的系统里。

如果用户的数据是离线数据,会先写到Hive或者Iceberg里,如果是实时数据,就会经过Flink或Spark做一些简单的处理,再通过Stream Load的方式写到Doris里面来。如果用户的离线数据是在 Hive里,后期想把数据同步到 Doris 里进行分析查询,就可以直接在数据工场提交Broker Load作业,把Hive的表数据通过Broker Load 的方式写入到Doris,也可以通过在数据工场写 FlinkSQL或者是SparkSQL的方式,从 Hive或Iceberg里查数据,然后把查到的数据通过 Stream Load 的方式写入到Doris里。FlinkSQL和SparkSQL底层其实是对Doris的Stream Load数据写入方式进行了封装,通过Stream Load的方式把数据并发地写入到Doris里面。

6、数据治理

小米针对 Doris 也提供了数据治理的能力,包括数据安全管理,数据质量管理,数据成本管理。

数据安全管理方面,我们会定期去扫 Doris 全量集群的库和表,对每一个字段进行安全等级的定义,对于隐私级别高的数据,需要做加密的存储或者在网络层面进行隔离。

数据质量管理,就是针对服务的可用性进行持续地监测和治理,对于一些可用性方面的隐患进行持续跟进,并与不同的用户进行 SLA 的确定和握手。

数据成本管理,主要是做数据的分层存储以及数据生命周期的管理,把常用的数据分区保存在Doris上,把一些历史的、不常访问的分区从 Doris 里删除,这些数据在Hive或者Iceberg上面是有备份的。有了这种数据生命周期的管理,能够极大地降低Doris存储的成本,通过数据治理来实现数据安全、服务可靠和成本降低的目标。

三、小米用户行为分析实践

在第三部分,向大家介绍一下Doris在小米用户行为分析场景的实践。

1、小米用户行为分析平台

小米的用户行为分析平台是一个基于海量数据的一站式、全场景、多模型、多维度、自助式和可视化的分析平台。平台底层可以对接不同的业务数据,所以它是一个通用的分析平台。在集团内部需要做用户行为分析的业务都可以接入到这个平台来进行分析。这个平台目前支持了事件分析、留存分析、漏斗分析、分布分析和路径分析等分析方式。平台用户在小米用户行为分析平台根据分析需求配置查询任务,生成SQL,并下发给Doris引擎执行分析查询,并将查询结果在平台进行可视化展示。

2、事件模型

业务要做行为分析,数据源主要是来自于各个业务在网页或者APP上面的埋点数据。用户在网页或者APP上面的各种操作都会抽象成一个事件的实体,这个事件实体称为事件的模型。小米的用户行为分析平台主要是基于事件模型进行建模。

事件模型主要包含五个要素,分别是Who、When、Where、How和What。Who 表示这次事件是谁触发的,在 Doris 的表中会对应一个字段,即用户ID;When 表示事件触发的时间,在Doris表里对应的字段是一个时间戳,一般会精确到毫秒级别;Where表示事件触发的地点,这个字段在Doris表里对应的字段可能是通过 IP 解析出来的省份或城市;How表示这个事件是通过什么方式来触发的,一般是 APP 的版本号或者浏览器,或者是用户手机的信息等等;What表示这个事件到底是什么类型的事件,比如它是一个 APP 的下载事件,或者是音乐的播放事件,或者是一个点击事件等等。这就是我们进行用户行为分析的一个事件模型。

3、事件分析

在小米业务中,进行用户行为分析最多的方式是事件分析。事件分析也包含了三个要素,分别是事件、指标和维度。平台用户可以针对不同的事件、指标、维度去做灵活的组合。事件就是用户在网页或者是 APP 上面的行为,表示业务的过程。指标是具体的数值,比如页面的访问量、访问的时长等等。要做事件分析,就需要数据的实时采集,事件的实时分析,事件、维度和指标的灵活选择。通过事件分析能够研究到某个行为发生对业务的影响以及影响的程度。

这里举一个简单的例子,在小米的用户行为分析平台,要做事件分析,需要选择具体的分析数据源,这个数据源对应了一张Doris表,然后需要选择事件、维度和指标。如上图中所示,要做事件分析,先选择事件分析的时间,比如5月30号的0点到5月30号的 23 点,针对某一个 APP 按小时来统计下载这个事件触发的用户数。

我们在这个页面上执行事件、维度和指标的选择之后,点击查询,系统中就会生成一条SQL,接着由平台下发SQL到 Doris 系统进行查询,再把查询结果返回给平台,平台再根据结果做可视化的展示。我们可以在图中看到明显的变化趋势。

4、留存分析

小米的用户行为分析平台也支持做留存分析。留存分析是一种用来分析用户参与情况和活跃程度的分析模型,能够考察进入初始行为的用户中有多少人进行了后续的行为。留存分析也是用来衡量产品对于用户价值高低的重要方法。

为了支持留存分析,我们基于Apache Doris 的聚合函数的框架,开发了两个聚合函数,分别用来计算单用户的留存数据和全量用户的留存数据。

单用户的留存函数retention_info(),可以传入以下几个参数:第一个参数start_time,只有在这个时间之后发生的事件,我们才认为是有效的事件。在这个时间点之前的事件都可以忽略;第二个参数是unit,是留存分析的时间单位,现在支持的主要是按天来做留存,所以这个参数可以传入“day”的字符串;第三个参数就是event_time,每一个事件发生的时间可以通过这个参数传进去;第四个参数是 event_type,用来判断传入的这个事件是初次事件,还是留存事件。

前面图片示例中的SQL,在内层查询里是按照distinct_id来做的分组,每一个distinct_id对应的是一个用户,所以内层查询能够计算出每一个用户的留存数据。我们在这个例子里,可以看到传给retention_info的实参,把view作为初次事件,buy作为留存事件,在传入事件类型的时候,初次事件传1,留存事件传2,对于不属于初次事件也不属于留存事件的其它事件,传入的就是0。通过内层查询我们就可以计算出每一个用户实际的留存数据,再结合外层的查询 retention_count()做一次聚合,最终计算出全量用户的留存数据。

上图是小米用户行为分析平台进行留存分析的一个示例,也是需要选择数据源,数据源对应一个具体的业务,在Doris引擎里边对应了一张具体的表,我们需要在这个页面上选择初始行为是什么,后续行为是什么,以及留存的时间是7日的留存还是 14 日的留存,还要选择留存分析的时间范围。在这个实例中,我们选择了最近 14 天,点击查询之后就会生成一条SQL,这条 SQL 就是我们前边看到的,由我们自定义的两个聚合函数来进行留存分析的查询,然后查询下发到Doris引擎来执行,查到的结果返回给用户行为分析平台,进行可视化的展示。

不同颜色的线表示的是初始行为发生的日期,每一条线表示的是初始行为发生之后,在第 0 天、第1天、第2天、第3天分别的留存率。第 0 天就是发生了初始行为的当天就发生了留存事件的用户的留存率,第1天就表示初始行为发生日期的后一天发生了留存事件的用户留存率。

5、漏斗分析

小米的用户行为分析平台支持漏斗分析。漏斗分析是一种用来分析用户行为状态变化从起点到终点各个阶段用户转化率的分析模型。我们可以跟踪漏斗的整个转化过程,在这个转化过程里,都是以用户为单位,将用户的步骤串联起来,并不是把每一个步骤发生的次数做一次简单的计数,需要确保进入到后续步骤中的用户一定是完成了所有的前序步骤。通过漏斗分析,可以了解用户在漏斗的哪一步流失的最多,以及不同类型用户转化的情况。

上图右侧是一个简单的漏斗分析模型。漏斗定义了三个步骤,有 100 个用户触发了步骤1,其中又有 60 个用户触发了步骤2,这60个用户中又有 20 个用户触发了步骤3。我们可以看到,从步骤 1 到步骤 2 用户的转化率是60%,流失率是40%,从步骤 2 到步骤 3 用户的转化率是33%,流失率是67%。完整的漏斗,总体转化率只有20%,总体的流失率达到了80%。

为了支持漏斗分析,我们基于Apache Doris 的聚合函数框架开发了两个聚合函数,分别用来计算单个用户在漏斗的各个阶段的事件数据和全量用户在漏斗各个阶段的转化率。计算单个用户在漏斗各个阶段的事件数据的聚合函数是 funnel_info(),在这个函数中我们可以传入以下几个参数,第一个参数 start_time是我们要进行漏斗分析的起始时间,在这个时间之后的事件才是有效的事件;第二个参数time_window,是有效窗口期,从发生了漏斗第一个步骤的事件开始,在这个窗口期内完成了漏斗所有步骤的用户,才算是完成了一次完整的转化;第三个参数step是我们要传入的事件是漏斗的哪一个步骤的事件;第四个参数是event_time是事件发生的具体时间。

图中的 SQL就是用来做漏斗分析的,使用到了两个聚合函数。内层的查询是按照distinct_id进行了分组,每一个distinct_id对应一个用户。然后我们定义了一个漏斗,包含四个步骤,view作为第一个步骤, open 作为第二个步骤, buy 作为第三个步骤, use 作为第四个步骤。在内层查询里就可以计算出每一个用户在漏斗各个阶段的事件数据,然后将这些用户的事件数据再交给外层查询,由 funnel_count() 这个聚合函数进行聚合分析,最终计算出全量用户在漏斗各个阶段的转化情况。

要使用小米的用户行为分析平台进行漏斗分析,首先需要创建漏斗。创建漏斗就需要选择窗口期,用户触发完第一个步骤之后,只有在这个窗口期内完成整个漏斗的所有步骤才算作是完成了一个完整的转化。创建漏斗还需要定义漏斗的所有步骤,漏斗中至少会包含两个步骤,每一个步骤对应一个事件。创建好漏斗之后,就可以进行漏斗分析了。

在小米的用户行为分析平台,首先需要选择数据源,数据源对应了一个具体的业务,在Doris中对应了一张具体的表;然后可以选择创建好的一个漏斗模型;接着再选择时间,比如进行最近7天的漏斗分析;最后点击查询,会生成一条SQL,SQL使用到了我们自定义的两个聚合函数。平台会把SQL下发给Doris引擎进行查询,平台获取到查询结果之后做一些简单的处理,然后进行可视化展示。

在这个示例中,漏斗包含了四个步骤,分别是曝光、滑动、下载和激活。可以看到,在第一个步骤和第二个步骤之间,用户的转化率是 26.51%;在第二个步骤和第三个步骤之间,用户的转化率是67%;在第三个和第四个步骤之间,用户的转化率是65%。用户的总体转化率是 11.68%。也可以看到在第一个步骤和第二个步骤之间,用户的流失率是最高的。触发了曝光事件之后,有很大一部分用户没有触发滑动这个事件,我们就需要根据计算出来的这些数据去具体地分析用户在这两个阶段之间流失的具体原因,然后对我们的系统或者产品做进一步的优化。漏斗分析为我们的系统优化提供了依据。

6、路径分析

小米用户行为分析平台支持了路径分析,用来分析用户在使用产品时的路径分布的情况,可以洞察到用户全生命周期的行为特征,可以通过可视化用户流全面来了解用户整体的行为路径。通过路径分析,我们可以挖掘出用户频繁访问的路径有哪些,可以寻找用户在单个环节流失的具体原因,为产品优化提供依据。

为了支持路径分析,我们也是基于Apache Doris 的聚合函数框架开发了两个聚合函数,分别用来计算单个用户的路径统计数据和全量用户的路径统计数据。在第一个聚合函数 session_del()中可以传入以下参数:event_name是事件名称;event_time 是事件发生的时间;session_interval 是事件的时间间隔,一个用户连续的两次事件发生的时间如果超过了我们指定的这个时间间隔,就需要把这两个时间从中间切分开,不能把它们作为同一个路径,而是分为两个路径;target_event_name 参数是目标事件,一般会选择一个具体的事件作为路径的起始事件或者是终止事件,就是通过这个参数来设定的;参数is_reverse表示前面选择的target_event_name是路径的起始事件还是路径的终止事件;参数max_level 定义的一个路径最长包含的事件数量。

要去做路径分析,一般会选择一个时间范围,很少对全量数据去做路径分析。在SQL的内层查询里使用distinct_id进行分组,每一个 distinct _id对应的一个用户,通过内层查询就可以计算出单个用户的路径统计数据。然后将所有用户的路径统计数据再交给外层的查询,通过 session_count ()这个聚合函数来做计算,得到全量用户的统计数据。

7、分布分析

小米的用户行为分析平台也支持了分布分析,这种方式是指用户在特定指标下的频次、总额等特征的结构化的分段展现。通过分布分析可以洞察到数据的分布特征,判断极端数值的占比,了解业务的健康程度。

上图就是进行分布分析的一个示例。首先选择具体的数据源,数据源对应一个具体的业务,也是在Doris引擎中对应了一张具体的表;然后选择分布分析的指标和维度,比如时间选择了2023年5月31号到6月1 号这两天的时间范围,我们要进行的是浏览的总次数分布的计算;另外还需要选择数据分布的区间,这里我们使用了默认的区间,用户也可以自定义区间。点击查询就会生成一条SQL,把 SQL 下发到Doris引擎进行查询,然后查询的结果会由小米用户行为分析平台进行一些简单的处理,并进行可视化的展现。

可以看到,在这两天的浏览事件,总次数分布在 10-30 次这个区间里的用户数是最多的,通过这种方式进行分布分析。

四、未来规划

最后简单介绍一下小米未来在Doris方面的规划。

首先,Doris的资源隔离能力不是很好,稳定性还存在一些问题,公共集群上的业务会比较多,可能一个大查询就会占满整个集群资源,影响到其它业务的使用,所以我们希望重点提升 Doris 的稳定性,并且在公共集群上支持大查询的定位和拦截的能力。

另外,小米在部分业务上线了 Doris 的向量化版本,目前内部使用的 Doris 向量化版本是1.1.2,用户反馈总体查询性能提升比较明显,所以后期我们会推动全量的 Doris 集群上线向量化版本。

其次,小米内部对于 OLAP 分析的需求仍在增长,我们还会在小米手机、小米造车等一些核心的业务上来推广Doris。

最后,目前小米内部高性能 OLAP 查询主要还是使用Doris,后面可能还会探索引入一些其它系统来解决不同业务场景的问题,我们需要尽可能对用户屏蔽底层引擎的信息,尽量减少对业务的影响,所以我们还会探索统一SQL入口的解决方案。