作者 | 周来

导读:2018 年初,51 信用卡为了上线一版全新的风控主模型,前后花了将近 3 个月时间,其中大部分时间耗在修复数据偏差上,开发同学苦不堪言,所以对于风控计算能力以及效率的提升,迫在眉睫。

1. 业务背景与痛点

51 信用卡是中国最大的线上信用卡管理平台, 业务涵盖个人负债管理、信用卡科技服务、线上信贷撮合及投资理财服务。目前在 51 人品贷借贷场景,用户需要先导入个人资料,主要包括信用卡账单、运营商、通讯录等数据,然后触发一系列风控模型计算,包括实时的授信出额、A 卡信用评分准入 (要求秒级完成),以及近线的反欺诈审核流程 (要求 30 分钟内完成)。风控模型依托大量变量,这些变量从开发到上线在以前的研发流程如下

  • 风控模型同学先基于离线数据源使用 Hive SQL 挖掘一系列变量,经过建模筛选后将变量需求提给开发同学 ;

  • 开发同学与模型同学一起核对数据源以及变量实现逻辑,将每个变量用 Groovy 代码实现;

  • 在线灰度运行,与离线 Hive T+1 产出的变量以及模型分数做一致性比对,满足较高的一致性和稳定性后才最终发布;

这个过程中主要存在两个问题,严重影响了模型的交付速度与质量:

  • 1、在线 / 离线数据源不一致: 实时数据源的清洗逻辑与离线数据源清洗逻辑不一致,同样是信用卡账单数据源,数据字段的标准和格式并不统一;

  • 2、在线 / 离线变量实现逻辑不一致: 离线 Hive SQL 变量逻辑较复杂,一般会经过多层 ETL 才产出,对应的 Groovy 代码会迅速膨胀,当出现结果变量不一致时, 需要按依赖逐层排查,十分耗时; 在 2018 年初为了上线一版全新的风控主模型,前后花了将近 3 个月时间,其中大部分时间耗在修复数据偏差上,开发同学苦不堪言,所以对于风控计算能力以及效率的提升,迫在眉睫。

2. 技术目标与方案选型

2018 下半年,CTO 启动了"光锥"风控平台项目,旨在从根本上提升金融风控整体迭代效率 (范围包括但不限于变量计算、策略模型部署、变量快速分析、策略仿真实验、变量 / 策略 / 模型监控等)。

光锥首先完成了在线 / 离线数据源的统一,对借贷用户下单时刻的主要风控数据源做关联快照,业务人员基于快照做离线挖掘,离线数据源标准向在线数据源看齐,保持一致。在变量计算方面,为了复用模型人员提供的 HiveSQL,减少代码翻译,我们在反欺诈审核环节引入了基于 SparkSQL 的近线计算引擎 (SparkSQL 基本兼容 HiveSQL),在单机 Local 模式下创建多个 SparkSession 并行处理微批订单,与微服务无缝融合,基本上分钟级别可以完成任务。

但是对于实时性要求较高的授信以及订单准入场景,Spark 处理性能达不到要求,我们需要一个实时的高性能 SQL 计算引擎,而且最好能在一定程度上兼容 HiveSQL。

在方案调研上,首先想到的是内存数据库如 H2、Apache Ignite, 随便拿几个业务 SQL 做了测试,发现稍微复杂点有嵌套的 SQL 语句就解析失败,直接放弃。然后重点放在了目前火热的流计算框架上,SparkStreaming、Flink。

不管是 Spark 还是 Flink,在对流计算的 SQL 支持上都比较弱 (当时 Flink1.4 版本的 Streaming SQL 连 LEFT OUTER JOIN 也不支持)。虽然我们已经在一些风控指标计算场景上线了 SparkStreaming 和 Flink 任务,但是主要的风控变量计算任务并不是一个典型的 Streaming 场景,业务上用户可以先导入信用卡账单进行管理,等 1 个月后再来下单触发风控计算,如果实时拉取数据再加载到 Streaming 系统节点显得多余。

另外在计算过程中通过外部接口获取数据后可以直接进行内存计算,并不需要中间存储,如果单机计算足够快,也不需要引入分布式计算,避免产生额外开销。

后来我们分别调研了 Flink 和 Apache Beam 在 Batch 模式下的单机本地 SQL 执行框架,Flink 里是 DataSet+CollectionEnvironment,Beam 里是 PCollection+DirectRunner,两者都能将 Java 集合数据直接映射为表, 然后联合多表进行 SQL 查询。由于解析完 SQL 就生成执行代码直接运行,没有 Job 生成调度逻辑,也没有像 Spark 里的 Stage 划分,Executor 回收清理等操作,执行效率较高,在我们的真实业务数据集上性能表现非常不错。

Flink 和 Beam 都使用了热门的 SQL 引擎项目 Apache Calcite 进行 SQL 解析和逻辑计划生成,然后实现自己的物理计划以及生成执行代码运行 (在分布式模式下,Beam 会将计划适配代理给指定 Runner 执行)。Calcite 也被多个其他 Apache 项目使用,其中 Hive 和阿里的 MaxCompute 单独使用 Calcite 来做基于代价的优化,Hive 对 Calcite 的使用方式如下:

Hive 通过 Antlr 解析 SQL 生成抽象语法树 (ASTNode),然后翻译成 Calcite 的逻辑计划 (LogicPlan), 基于代价优化完成等价转换,再翻译回 Hive 能处理的 Optimized ASTNode。实际上 Calcite 的 SQL 语法并不兼容 HiveSQL, 在数据类型和操作符 (Operator) 支持上也有很大不同。但是在关系代数 (Relational algebra) 层面是相同的,所以可以完成上述转换。当然 Hive 也有 Local 运行模式,由于生成的执行计划面向 MR 任务,在低延时高性能计算场景,无法满足要求。

虽然 Calcite 并不兼容 HiveSQL, 但它对工业级的 SQL 高级特性有较好的支持 (如复杂类型、窗口函数、表函数等 OLAP 特性),同时它默认内置了一套基于 Linq4j 的本地 SQL 执行框架,用于执行代码生成和运行。我们的目标是计算引擎要在业务上兼容 HiveSQL, 不管是选择 Flink 还是 Beam 都需要对 Calcite 做二次改造和扩展,考虑到 Calcite 本身的简洁设计以及较强的扩展性,我们决定直接基于 Calcite 二次开发,实现一套在业务上兼容 HiveSQL 的本地实时 SQL 计算引擎。

3. 方案设计与实现

先简单介绍下 Calcite 的架构和 SQL 处理流程:

 对于一次 Sql 查询,一般经过以下流程:

  • 先由 Parser 解析生成 SqlNode 节点树 ;

  • 由 Validator 完成节点类型推导以及必要的表达式验证优化;

  • 由 SqlToRelConverter 将 SqlNode 节点树转化为代表逻辑计划的 RelNode;

  • 在查询优化器 (QueryOptimizer) 里内置了 100+ 的转换规则,分别用于逻辑计划优化以及物理计划转换;

  • 生成的物理计划 RelNode 被代码实现器遍历处理,各子节点由对应的实现器生成执行代码,最后组装成一个执行 Class;

Calcite 会为每个 SQL 动态生成一个实现了 Bindable 接口的 Class,然后动态编译创建实例后传入数据集执行计算。这里的数据集可以通过 Schema 进行灵活定义,我们在业务上针对每一次订单请求会创建一份 Schema,存储当前请求处理过程的所有表数据,处理完成后堆内存后面会被回收。

受益于 Calcite 的可插拔架构,大多数项目会使用 Calcite 先生成 LogicPlan,然后添加一系列 rules 来完成第三方物理计划转换与执行代码生成。考虑到研发成本,我们希望尽可能重用 Calcite 默认的物理计划和执行代码生成逻辑。为了支持 HiveSQL 的语法和操作符,我们主要做了以下改造和扩展:

  • 1、替换默认的语法解析器 Parser.jj,支持部分的 Hive 语法:Calcite 使用 JavaCC 来做语法解析,有一套内建的关键字和操作符。我们会按需添加一些业务要用到的关键字,如 rlike、regexp:

select * from t where c1 rlike"^\d+$"
  • 2、优先使用我们自定义的操作符注册表来查找函数:

在计算引擎初始化阶段,先通过扫描 Hive Class,将 Hive 函数自动桥接并添加到注册表,大概有 200+ 的 UDF、30+ 的 UDAF 以及部分常用的 UDTF,另外还有我们数仓自定义的业务函数,在桥接适配中,会保留内建操作符的某些特性,以便查询优化器能正确识别优化;

  • 3、支持操作符类型动态推导与验证:

Calcite 的 SqlValidator 会对操作符节点以及上下文参数做类型推导与验证。但是它的类型系统和 Hive 并不相同,我们需要对两者的类型系统做映射转换才能完成 Hive 操作符的类型推导验证。另外 Hive 函数有个很棒的特性:一个特定函数的参数和返回值可以支持不确定类型,函数内部会完成不兼容类型的强制转换。例如函数:

add_months(start_date,add_months)

start_date 可以是 date 类型,也可以是 string。 我们需要先将 Calcite 数据类型转换为 Hive 的数据类型,然后桥接 Hive 实现动态类型推导验证,再将 Hive 函数返回类型转换回 Calcite 类型。这部分类似 SparkSQL Catalyst 连接 Hive 的处理方式,主要数据类型的映射关系如下:

对于复杂类型,目前我们只按需支持了 Array;

  • 4、为 Hive 操作符实现执行代码:

除了一般函数外,还需要对一些特殊操作符的代码实现器做替换,例如数组取下标:

selectsplit("a,b,c",",")[0]

Calcite 默认索引从 [1] 开始,而 Hive 是从 [0] 开始, 我们为代码实现器的注册表添加了扩展机制,允许在初始化注册后为操作符覆盖或者添加新的实现器。对于 UDF、UDAF、UDTF 等分别实现了通用的代码实现器,可以生成相应的执行代码;

  • 5、隐式类型转换增强:

Calcite 已经具备了一定的隐式类型转换,但是相比 Hive 还不够。前面已经提到 Hive 函数自身能够处理类型转换,但在其他场景,如 Join 时,Hive 支持对不兼容类型做等值连接:

select t1.*,t2.* from t1 join t2 on t1.intVal=t2.stringVal

我们参考 Hive 规则对 HashJoin 算法的 Key 进行了隐式转换。另外在强制转换的过程中遇到无法转换的情况,如 cast("" as int) ,Calcite 会报错,而 Hive 会转换成 null;

在功能基本满足后,我们做了相应的 Benchmark 对比测试,在性能优化上主要做了几点:

  • 1、为相同的 SQL 查询缓存执行对象实例: Calcite 在 1.18.0 版本后对生成的 Class 做到了无状态,所以不用担心线程安全问题;

  • 2、Join 算法优化:

在基本的 Projection、Join、Aggregate 场景和不同量级的数据集上,相同 sql 的执行耗时和 Flink Local 模式差不多,但是对于包含非等值谓语条件的 Join,在 10000*10000 数据集上耗时却相差几十倍,如:

SELECT t1.*,t2.* FROM item1 t1 LEFTOUTERJOIN item2 t2 ON t1.i_item_sk=t2.i_item_sk and t1.i_item_sk <15000

由于包含了谓语条件 t1.i_item_sk <15000,Calcite 对这类 Join 采用 NestedLoopJoin(嵌套循环) 算法实现,没有做优化。而 Flink 和 Spark 在 Shuffle 后数据会自然排序好,所以采用了更高效的 Sort-Merge-Join 算法实现。在我们优化了相应的转换 rule 以及 HashJoin 算法后,此类 Join 性能提升了几十倍 (已提交 patch 给社区)。在真实的业务场景里,SQL 往往较复杂,有多层嵌套,由于可以复用完整的执行实例,我们的计算性能表现比 Flink 的 Local 模式更优。

在 SQL 处理流程图里, 我们的主要扩展点如下:

4. 业务实践与平台化

使用方面,我们参考 Flink 实现了一套简洁的 Table API:

TableEnv tableEnv = HiveTableEnv.getTableEnv();DataTable t1 = tableEnv.fromJavaPojoList(pojoList);DataTable t2 = tableEnv.fromJdbcResultSet(resultSet);tableEnv.addSubSchema("test");tableEnv.registerTable("test","t1",t1);tableEnv.registerTable("test","t2", t2);DataTable queryResult = tableEnv.sqlQuery("select * from test.t1 join test.t2 on t1.id=t2.id");

那么在真实的业务系统里,我们如何利用这个内存 SQL 计算引擎?

在金融风控场景,大多数用户的单数据域的数据量一般在千到万级,所以单条查询的性能基本在百毫秒级或者以内,但是有两类典型场景无法使用纯内存计算:

  • 黑名单匹配: 由于黑名单量较大,而且会频繁更新,无法放入计算引擎管理,我们也不希望引入额外的分布式 Join 方案。针对这个 case,我们引入了外部接口函数,例如:

    isHitBlackMobileList("t1",500)
  • 在运行时会将 t1 临时表的数据传入 isHitBlackMobileList 函数,然后以 500 行每批并行调用外部接口,将匹配到的黑名单 merge 后返回成一个虚拟表,这样做的好处是,名单存储匹配可以由业务方灵活控制,背后可能采用 HBase 或者其他基于布隆过滤器的方案;

  • 关系图谱场景下的关联查询: 和黑名单类似,我们可能需要通过用户的手机号码关联出通讯录里的借贷用户的还款记录,然后计算模型变量。可以先通过外部接口获取对应的数据,背后可能是图数据库查询,最后将未加工的数据放入内存里做 SQL 计算。

为了方便业务人员快速开发变量,我们沉淀了数据源管理和变量计算平台,提供了用于在线开发调试的 SQL IDE, 业务人员能方便地自主部署自己的 SQL 任务,并基于历史数据完成数据校验比对。当 SQL 任务审批发布上线后,配套的变量监控任务也会启动,包括 T+1 的在线 / 离线一致性比对,PSI 以及缺失率等指标监控。

在任务运行时,计算引擎会根据 SQL 提取表名,解析表与表的依赖形成执行 DAG,对于一个 SQL 节点,只要上游依赖表的任务完成,就会被提交到线程池执行。

部署方面,在 51 信用卡,Kubernetes 是通常的应用部署方式,对这类计算型应用,我们开启 HPA 功能进行自动伸缩容,并使用 CPU Manager 特性来保证性能优先的调度策略,这样可以避免因单个任务耗时而引发的排队等待。

经过一段时间试运行和验证,我们于 2019 年初在主业务的 A 卡全新模型上全面使用了自主研发的实时计算引擎,并逐步将其他模型的变量往新的计算平台迁移。在性能表现上,除开数据源拉取,完成一次订单请求的所有变量的计算平均耗时稳定在 1s 以内,而在线 / 离线数据的一致率达到 99% 以上。过去长达 2 个月的一版主模型的迭代周期,现在能缩短到 2 周以内,这从根本上提高了风控模型的迭代效率与质量。

5. 项目开源

为了进一步丰富计算引擎的功能以及适用更多业务场景,我们决定将核心库开源 https://github.com/51nb/marble,另外也提交了部分 issue 和 patch 给 Calcite 社区,有兴趣的同学可以关注。

作者介绍:周来,现负责 51 信用卡风控计算引擎以及风控数据智能分析产品等研发。

声明:本文来自AI前线,版权归作者所有。文章内容仅代表作者独立观点,不代表安全内参立场,转载目的在于传递更多信息。如有侵权,请联系 anquanneican@163.com。