针对IEnumerable已经有多篇文章,本篇介绍如何使用IEnumerable实现ETL. ETL,是英文 Extract-Transform-Load 的缩写,用来描述将数据从来源端经过萃取(extract)、转置(transform)、加载(load)至目的端的过程。通常来说,从原始端采集的数据有很多问题,同时可能业务需求与采集的数据格式不相匹配,所以就必须实现ETL过程。
ETL可以理解为一条清洗管线,数据从一端流入,从另一端流出。数据量可能很大,所以管线不大可能也没有必要加载全部内容。同时,一般情况下,从管线流出来的数据会进入新的数据池,很少直接修改到原表。
从管线的概念可以看出,ETL需要构造可组合的链条,首先实现一组组件,然后实现可将这些组件组装为一条ETL管线的框架。IEnumerable一大堆的LINQ扩展,正好帮我们实现了这一思想。
1. 数据的表达
我们先讨论清楚如何表达数据,因为数据处理涉及到动态增减属性的问题,因此一般的实体类是做不到的,我们采用字典来实现。为此我包装了一个实现IDictionary<string, object>的类。叫做FreeDocument。它可以简单表示如下:
////// 自由格式文档 /// public interface IFreeDocument : IDictionarySerializable, IDictionary, IComparable { #region Properties IDictionary DataItems { get; set; } IEnumerable PropertyNames { get; } #endregion }
因此数据的处理,本质就是对每一个字典对象中的键值对进行增删改查。
2 .基本组件
数据清洗组件的基接口是ICollumProcess. 定义如下:
public interface ICollumProcess : IDictionarySerializable { string CollumName { get; set; } //针对的列名 bool ShouldCalculated { get; set; } //是否需要重新计算 double Priority { get; set; } //优先级 void Finish(); //处理完成时的回收函数 void Init(IListdatas); //对数据进行初始化的探测行为 }
更清晰的说,其实派生出四部分:
(1) 生成器
生成器即提供/产生数据的组件。这可能包括生成一个从0-1000的数,获取某个数据表中的数据,或从网页检索的结果。它的接口可以表示如下:
[Interface("ICollumGenerator", "数据生成器", SearchStrategy.FolderSearch)] public interface ICollumGenerator : ICollumProcess { ////// 当前迭代的位置 /// int Position { get; set; } IEnumerableGenerate();/// /// 生成器能生成的文档数量 /// ///int? GenerateCount(); }
最主要的方法是Generate,它能够枚举出一组数据出来,同时还有可能(有时做不到)得到能够生成文档的总数量。
(2)过滤器
过滤器即能够分析一个文档是否满足条件,不满足则剔除的组件。接口也很简单:
[Interface("ICollumDataFilter", "数据列过滤器", SearchStrategy.FolderSearch)] public interface ICollumDataFilter : ICollumProcess { bool FilteData(IFreeDocument data); }
(3)排序器
顾名思义,对数据实现排序的接口,定义如下:
[Interface("ICollumDataSorter", "数据排序器", SearchStrategy.FolderSearch)] public interface ICollumDataSorter : IDictionarySerializable, ICollumProcess,IComparer
排序一般需要升序和降序,但排序最大的问题是破坏了管线的单向流动性和虚拟性。最少LINQ的标准实现上,排序是内存排序,因此必须把数据全部加载进来才能排序,这严重影响了性能。因此目前的排序最好在小数据的情况下进行。
(4)列转换器
它最重要的组件。整个ETL过程,实质上就是不同的列进行变换,组成另外一些列的过程(列就是键值对)。 定义实现如下:
[Interface("ICollumDataTransformer", "数据转换器", SearchStrategy.FolderSearch)] public interface ICollumDataTransformer : ICollumProcess { string NewCollumName { get; set; } SimpleDataType TargetDataType { get; set; } ObservableCollectionFilterLogics { get; set; } object TransformData(IFreeDocument datas); IEnumerable AffectedCollums { get; } }
看着很复杂,但其实就是将文档中的一些列转换为另外一些列。比如对一个字符串的列进行正则替换,或转换其数据类型(如从string变成int)。举个最简单的HTML编解码的例子:
public override object TransformData(IFreeDocument document) { object item = document[CollumName]; if (item == null) return ""; switch (ConvertType) { case ConvertType.Decode: return HttpUtility.HtmlDecode(item.ToString()); break; case ConvertType.Encode: return HttpUtility.HtmlEncode(item.ToString()); break; } return ""; }
3. ETL管线的设计
相信你已经想到,ETL管线的核心就是动态组装的LINQ了。
一个最基本的ETL管理类,应当具有以下的属性:
public ObservableCollection<ICollumProcess> CurrentETLTools { get; set; } //当前已经加载的ETL工具
protected List<Type> AllETLTools { get; set; } //所有能够使用的ETL工具。当然Type只是此处为了方便理解而设定的,更合适的应该是记录了组件元数据,名字和介绍的扩展类。
以及一个方法:
public IEnumerable<IFreeDocument> RefreshDatas(IEnumerable<IFreeDocument> docuts) //从原始数据转换为新的数据
那么,这个函数的实现可以如下定义:
public IEnumerableRefreshDatas(IEnumerable docuts) { if (SampleMount <= 0) { SampleMount = 10; } IEnumerable ienumable = docuts.Where(d=>d!=null).Select(d => d.DictSerialize()); Errorlogs = new List (); List samples = docuts.Take((int) SampleMount).Select(d => d as IFreeDocument).ToList(); foreach (ICollumProcess tool in CurrentETLTools.Where(d => d.ShouldCalculated).OrderByDescending(d => d.Priority)) { tool.SourceCollection = CurrentCollection; tool.Init(samples); if (tool is ICollumDataTransformer) { var ge = tool as ICollumDataTransformer; ienumable = Transform(ge, ienumable); } if (tool is ICollumGenerator) { var ge = tool as ICollumGenerator; if (!ge.CanAppend) //直接拼接 ienumable = ienumable.Concat(ge.Generate()); else { ienumable = ienumable.MergeAll(ge.Generate()); } } else if (tool is ICollumDataFilter) { var t = tool as ICollumDataFilter; ienumable = ienumable.Where(t.FilteData); } else if (tool is ICollumDataSorter) { var s = tool as ICollumDataSorter; switch (s.SortType) { case SortType.AscendSort: ienumable = ienumable.OrderBy(d => d, s); break; case SortType.DescendSort: ienumable = ienumable.OrderByDescending(d => d, s); break; } } tool.Finish(); } return ienumable; }
基本实现思路如上。即通过优先级排序所有加载的ETL组件,并提取一部分样例数据,为组件进行一次初始化。然后通过组装不同的转换器,生成器,排序器和过滤器,最后即可组装为一个新的ienumable对象。注意整个过程都是延迟计算的,只有在真正需要ETL结果时才会进行实质性的操作。
4. 优化ETL管线和实现虚拟视图
以上就是ETL的基本思路。但是仅仅做到这些是很不够的。以下才是这篇文章的核心。
ETL管线破坏了原有集合的特性,原有集合可能是能够支持索引查询甚至能够执行高性能查找的。但ETL将其退化为仅能够枚举。枚举意味着只能从头访问到尾,不能回退和索引。要想使用新集合,就只能访问其前n个元素,或者全部访问。这显然对一些操作是很不利的。
先考虑索引器。如果能满足以下条件:
(1) 管线中不包括排序器和过滤器,因为它们使得得集合产生了乱序。
(2) 原始集合能够支持索引器
(3) 使用的生成器能够提供生成的大小,同时生成器也能够实现索引器
(4) 转换器应当只实现1到1转换,没有额外的副作用。
那么原始集合和新集合元素的对应关系是可计算的。此时索引器就能发挥作用。在实际使用中,转换器是用的最多的。条件不可谓不苛刻。
关于高性能查找,我们先不考虑针对复杂的SQL查询,先考虑那种最简单的find(item[key]==value)的查询。但这个条件更加苛刻:
(1) key在原始集合中必须支持高性能查找
(2) 满足上述索引器的四个条件
(3) 针对key这一列的操作,转换器必须是可逆的。而且最好能实现1-1映射。
所谓可逆的意思就是说,转换器能从A转换为B,同时也能通过结果B反推出结果A。 但这种条件何其苛刻!a*5=b,这样的操作是可逆的,然而正则转换,替换以及绝大多数的运算都是不可逆的。
怎么办呢?可能的做法,就是转换器在转换过程中,就动态地将key的转换结果保存下来。于是,对新集合的查找操作,最后就能一步步回退到原始集合的查找操作。还有更好的办法么?
如何让新集合应对复杂的SQL查询?首先需要解析SQL, 这可能涉及到大量的数学推导和转换。以至于在实现当中因为限制太多,基本上不可能实现。以筛选key为一定范围的数据为例,每次都需要逆向推导,这种推导难度非常大。
5. 智能ETL和用户体验优化
整个ETL过程,是人为观察数据的特性,组合和配置不同的ETL组件,这一过程能够实现自动化吗?
人是很智能的,它能够观察不同数据的格式和类型,发现其中的特征,比如以下数据:
高楼层/21层,南垡头翠成馨园,2004年建,塔楼中楼层/5层,南北豆各庄5号院,2003年建,板楼
人通过观察这么两行的数据,就可以大概的判断出这些信息分别代表的是什么意思,以及如何去分割和转换。可以用正则,提取第一个出现的数字,即楼层,再使用\d{4}提取年份,而用逗号分割,即可得到小区名称。
但是,这个操作依旧需要最少懂得一定程序基础的人来参与,如果用机器来做的话,又该如何做呢?自动化步骤可以分为两个层次:
(1) 自动分割和对齐。
数据尤其是来自web的数据,由于本身是由程序生成的,因此在格式上有高度的统一性,同时分隔符也是类似的,包括逗号,分号,空格,斜杠等。因此,可以统计不同分割符出现的次数,以及对应的位置,通过概率模型,生成最可能的分割方案,使得每一条数据分割出来的长度和子项数量尽可能一致。
(2) 自动识别内容
自动识别内容可以依赖于规则或者识别器。一种比较可靠的方法是通过基于正则的文本规则,构造一组规则组。通常200x这样的数值,很容易被理解为年份,而12:32这样的结构,则很容易被识别为时间。通过基于结构的识别引擎,不仅能够识别”这是什么内容“,更能提出其元数据,比如日期中的日月年等信息,为之后的工作做准备。
Web表格最大的好处,在于它的格式一致性。只要分析很少的具有代表性的样例数据,就能够掌握整个数据集的特征。因此完全可以用比较大的代价获得一个尽可能高的识别模块,而在执行过程中尽量提升性能。