本文为 原创技术文章,发表于博客园博客,未经作者本人允许禁止任何形式的转载。
前言
很显然,你应该不至于使用 EntityFramework 直接插入 10W 数据到数据库中,那可能得用上个几分钟。EntityFramework 最被人诟病的地方就是它的性能,处理大量数据时的效率。此种条件下,通常会转回使用 ADO.NET 来完成任务。
但是,如果已经在项目中使用了 EntityFramework,如果碰到需要直接向数据库中插入 10W 的数据的需求,引入 ADO.NET 和 SqlBulkCopy 的组合将打破 EntityFramework 作为 ORM 所带来的优势,我们不得不再次去编写那些 SQL 语句,关注表结构的细节,相应的代码可维护性也在下降。
那么,假设我们将 SqlBulkCopy 的功能封装为 EntityFramework 中的一个扩展方法,通过接口像外暴露 BulkInsert 方法。这样,我们既没有改变使用 EntityFramework 的习惯,同时也隐藏了 SqlBulkCopy 的代码细节,更重要的是,合理的封装演进出复用的可能性,可以在多个 Entity 表中使用。
环境准备
以下测试基于 版本。
首先定义一个 Customer 类:
1 public class Customer2 {3 public long Id { get; set; }4 public string Name { get; set; }5 public string Address { get; set; }6 public string Phone { get; set; }7 }
通过 CustomerMap 类将 Entity 映射到数据库表结构:
1 public class CustomerMap : EntityTypeConfiguration2 { 3 public CustomerMap() 4 { 5 // Primary Key 6 this.HasKey(t => t.Id); 7 8 // Properties 9 this.Property(t => t.Name)10 .IsRequired()11 .HasMaxLength(256);12 13 this.Property(t => t.Phone)14 .IsRequired()15 .HasMaxLength(256);16 17 // Table & Column Mappings18 this.ToTable("Customer", "STORE");19 this.Property(t => t.Id).HasColumnName("Id");20 this.Property(t => t.Name).HasColumnName("Name");21 this.Property(t => t.Address).HasColumnName("Address");22 this.Property(t => t.Phone).HasColumnName("Phone");23 }24 }
我们定义数据库的名字为 “Retail”,则使用 RetailEntities 类来实现 DbContext :
1 public class RetailEntities : DbContext 2 { 3 static RetailEntities() 4 { 5 Database.SetInitializer( 6 new DropCreateDatabaseAlways ()); 7 } 8 9 public RetailEntities()10 : base("Name=RetailEntities")11 {12 }13 14 public DbSet Customers { get; set; }15 16 protected override void OnModelCreating(DbModelBuilder modelBuilder)17 {18 modelBuilder.Configurations.Add(new CustomerMap());19 }20 }
将 DatabaseInitializer 设置为 DropCreateDatabaseAlways,这样我们可以保证每次都针对新表进行测试。
如果需要更复杂的模型,我们将基于如下的模型进行测试:
测试主机
数据库:Microsoft SQL Server 2012 (64-bit)
EntityFramework 插入 10W 数据需要多久
我们先来看下EntityFramework 插入 10W 数据需要多久。
构造 10W 个 Customer 实例:
1 int customerCount = 100000; 2 3 Listcustomers = new List (); 4 for (int i = 0; i < customerCount; i++) 5 { 6 Customer customer = new Customer() 7 { 8 Name = "Dennis Gao" + i, 9 Address = "Beijing" + i,10 Phone = "18888888888" + i,11 };12 customers.Add(customer);13 14 Console.Write(".");15 }
使用如下语法来将上面构造的 10W 数据保存到数据库中:
1 using (RetailEntities context = new RetailEntities())2 {3 foreach (var entity in customers)4 {5 context.Customers.Add(entity);6 }7 context.SaveChanges();8 }
通过 context.SaveChanges() 来保证一次事务提交。
为了计算使用时间,在上面代码的前后加上 Stopwatch 来计算:
1 Stopwatch watch = Stopwatch.StartNew(); 2 3 using (RetailEntities context = new RetailEntities()) 4 { 5 foreach (var entity in customers) 6 { 7 context.Customers.Add(entity); 8 } 9 context.SaveChanges();10 }11 12 watch.Stop();13 Console.WriteLine(string.Format(14 "{0} customers are created, cost {1} milliseconds.", 15 customerCount, watch.ElapsedMilliseconds));
然后运行,
好吧,我应该没有耐心等待它运行完。
现在减少数据量进行测试,将数据数量降低到 500 条,
空表插入500 条数据耗时 5652 毫秒。我多测了几遍,这个数据稳定在 5 秒以上。
将数据量改变到 1000 条,
将数据量改变到 1500 条,
将数据量改变到 10000 条,
那么我们估计下 10W 数据大概需要 10W / 500 * 2 = 至少 400 秒 = 至少 6 分钟。
好吧,慢是毋庸置疑的。
SqlBulkCopy 接口描述
Microsoft SQL Server 提供一个称为 bcp 的流行的命令提示符实用工具,用于将数据从一个表移动到另一个表(表既可以在同一个服务器上,也可以在不同服务器上)。 SqlBulkCopy 类允许编写提供类似功能的托管代码解决方案。 还有其他将数据加载到 SQL Server 表的方法(例如 INSERT 语句),但相比之下 SqlBulkCopy 提供明显的性能优势。
使用 SqlBulkCopy 类只能向 SQL Server 表写入数据。 但是,数据源不限于 SQL Server;可以使用任何数据源,只要数据可加载到 DataTable 实例或可使用 IDataReader 实例读取数据。
将所提供的 数组中的所有行复制到 SqlBulkCopy 对象的 属性指定的目标表中。 | |
将所提供的 中的所有行复制到 SqlBulkCopy 对象的 属性指定的目标表中。 | |
将所提供的 中的所有行复制到 SqlBulkCopy 对象的 属性指定的目标表中。 | |
只将与所提供 中所提供行状态匹配的行复制到 SqlBulkCopy 对象的 属性指定的目标表中。 |
在 .NET 4.5 中还提供了支持 async 语法的接口。
的异步版本,将 数组中提供的所有行都复制到由 SqlBulkCopy 对象的 属性指定的目标表中。 | |
的异步版本,将 中提供的所有行都复制到 SqlBulkCopy 对象的 属性指定的目标表中。 | |
的异步版本,将 中提供的所有行都复制到 SqlBulkCopy 对象的 属性指定的目标表中。 | |
的异步版本,将 数组中提供的所有行都复制到由 SqlBulkCopy 对象的 属性指定的目标表中。取消标记可用于在命令超时超过前请求放弃操作。通过返回的任务对象将报告异常。 | |
的异步版本,之间与 中所提供行状态匹配的行复制到 SqlBulkCopy 对象的 属性中指定的目标表中。 | |
的异步版本,将 中提供的所有行都复制到 SqlBulkCopy 对象的 属性指定的目标表中。取消标记可用于在命令超时超过前请求放弃操作。通过返回的任务对象将报告异常。 | |
的异步版本,将 中提供的所有行都复制到 SqlBulkCopy 对象的 属性指定的目标表中。取消标记可用于在命令超时超过前请求放弃操作。通过返回的任务对象将报告异常。 | |
的异步版本,之间与 中所提供行状态匹配的行复制到 SqlBulkCopy 对象的 属性中指定的目标表中。取消标记可用于在命令超时超过前请求放弃操作。通过返回的任务对象将报告异常。 |
这里,我们选用 DataTable 来构建数据源,将 10W 数据导入 DataTable 中。可以看出,我们需要构建出给定 Entity 类型所对应的数据表的 DataTable,将所有的 entities 数据插入到 DataTable 中。
构建 TableMapping 映射
此时,我并不想手工书写表中的各字段名称,同时,我可能甚至都不想关心 Entity 类到底被映射到了数据库中的哪一张表上。
此处,我们定义一个 TableMapping 类,用于存储一张数据库表的映射信息。
在获取和生成 TableMapping 之前,我们需要先定义和获取 DbMapping 类。
1 internal class DbMapping 2 { 3 public DbMapping(DbContext context) 4 { 5 _context = context; 6 7 var objectContext = ((IObjectContextAdapter)context).ObjectContext; 8 _metadataWorkspace = objectContext.MetadataWorkspace; 9 10 _codeFirstEntityContainer = _metadataWorkspace.GetEntityContainer("CodeFirstDatabase", DataSpace.SSpace);11 12 MapDb();13 }14 }
通过读取 CodeFirstEntityContainer 中的元数据,我们可以获取到指定数据库中的所有表的信息。
1 private void MapDb() 2 { 3 ExtractTableColumnEdmMembers(); 4 5 Listtables = 6 _metadataWorkspace 7 .GetItems(DataSpace.OCSpace) 8 .Select(x => x.GetPrivateFieldValue("EdmItem") as EntityType) 9 .Where(x => x != null)10 .ToList();11 12 foreach (var table in tables)13 {14 MapTable(table);15 }16 }
进而,根据表映射类型的定义,可以获取到表中字段的映射信息。
1 private void MapTable(EntityType tableEdmType) 2 { 3 string identity = tableEdmType.FullName; 4 EdmType baseEdmType = tableEdmType; 5 EntitySet storageEntitySet = null; 6 7 while (!_codeFirstEntityContainer.TryGetEntitySetByName(baseEdmType.Name, false, out storageEntitySet)) 8 { 9 if (baseEdmType.BaseType == null) break;10 baseEdmType = baseEdmType.BaseType;11 }12 if (storageEntitySet == null) return;13 14 var tableName = (string)storageEntitySet.MetadataProperties["Table"].Value;15 var schemaName = (string)storageEntitySet.MetadataProperties["Schema"].Value;16 17 var tableMapping = new TableMapping(identity, schemaName, tableName);18 _tableMappings.Add(identity, tableMapping);19 _primaryKeysMapping.Add(identity, storageEntitySet.ElementType.KeyMembers.Select(x => x.Name).ToList());20 21 foreach (var prop in storageEntitySet.ElementType.Properties)22 {23 MapColumn(identity, _tableMappings[identity], prop);24 }25 }
然后,可以将表信息和字段信息存放到 TableMapping 和 ColumnMapping 当中。
internal class TableMapping{ public string TableTypeFullName { get; private set; } public string SchemaName { get; private set; } public string TableName { get; private set; } public ColumnMapping[] Columns { get { return _columnMappings.Values.ToArray(); } }}
构建 DataTable 数据
终于,有了 TableMapping 映射之后,我们可以开始创建 DataTable 了。
1 private static DataTable BuildDataTable(TableMapping tableMapping) 2 { 3 var entityType = typeof(T); 4 string tableName = string.Join(@".", tableMapping.SchemaName, tableMapping.TableName); 5 6 var dataTable = new DataTable(tableName); 7 var primaryKeys = new List (); 8 9 foreach (var columnMapping in tableMapping.Columns)10 {11 var propertyInfo = entityType.GetProperty(columnMapping.PropertyName, '.');12 columnMapping.Type = propertyInfo.PropertyType;13 14 var dataColumn = new DataColumn(columnMapping.ColumnName);15 16 Type dataType;17 if (propertyInfo.PropertyType.IsNullable(out dataType))18 {19 dataColumn.DataType = dataType;20 dataColumn.AllowDBNull = true;21 }22 else23 {24 dataColumn.DataType = propertyInfo.PropertyType;25 dataColumn.AllowDBNull = columnMapping.Nullable;26 }27 28 if (columnMapping.IsIdentity)29 {30 dataColumn.Unique = true;31 if (propertyInfo.PropertyType == typeof(int)32 || propertyInfo.PropertyType == typeof(long))33 {34 dataColumn.AutoIncrement = true;35 }36 else continue;37 }38 else39 {40 dataColumn.DefaultValue = columnMapping.DefaultValue;41 }42 43 if (propertyInfo.PropertyType == typeof(string))44 {45 dataColumn.MaxLength = columnMapping.MaxLength;46 }47 48 if (columnMapping.IsPk)49 {50 primaryKeys.Add(dataColumn);51 }52 53 dataTable.Columns.Add(dataColumn);54 }55 56 dataTable.PrimaryKey = primaryKeys.ToArray();57 58 return dataTable;59 }
通过 Schema 名称和表名称来构建指定 Entity 类型的 DataTable 对象。
然后将,entities 数据列表中的数据导入到 DataTable 对象之中。
1 private static DataTable CreateDataTable(TableMapping tableMapping, IEnumerable entities) 2 { 3 var dataTable = BuildDataTable (tableMapping); 4 5 foreach (var entity in entities) 6 { 7 DataRow row = dataTable.NewRow(); 8 9 foreach (var columnMapping in tableMapping.Columns)10 {11 var @value = entity.GetPropertyValue(columnMapping.PropertyName);12 13 if (columnMapping.IsIdentity) continue;14 15 if (@value == null)16 {17 row[columnMapping.ColumnName] = DBNull.Value;18 }19 else20 {21 row[columnMapping.ColumnName] = @value;22 }23 }24 25 dataTable.Rows.Add(row);26 }27 28 return dataTable;29 }
SqlBulkCopy 导入数据
终于,数据源准备好了。然后使用如下代码结构,调用 WriteToServer 方法,将数据写入数据库。
1 using (DataTable dataTable = CreateDataTable(tableMapping, entities))2 {3 using (SqlBulkCopy sqlBulkCopy = new SqlBulkCopy(transaction.Connection, options, transaction))4 {5 sqlBulkCopy.BatchSize = batchSize;6 sqlBulkCopy.DestinationTableName = dataTable.TableName;7 sqlBulkCopy.WriteToServer(dataTable);8 }9 }
看下保存 500 数据的效果,用时 1.9 秒。
看下保存 1W 数据的效果,用时 2.1 秒。
看下保存 10W 数据的效果,用时 7.5 秒。
再试下 100W 数据的效果,用时 27 秒。
封装 BulkInsert 扩展方法
我们可以为 DbContext 添加一个 BulkInsert 扩展方法。
1 internal static class DbContextBulkOperationExtensions 2 { 3 public const int DefaultBatchSize = 1000; 4 5 public static void BulkInsert(this DbContext context, IEnumerable entities, int batchSize = DefaultBatchSize) 6 { 7 var provider = new BulkOperationProvider(context); 8 provider.Insert(entities, batchSize); 9 }10 }
IBulkableRepository 接口
在下面两篇文章中,我介绍了精炼的 IRepository 接口。
1 public interface IRepository2 where T : class3 {4 IQueryable Query();5 void Insert(T entity);6 void Update(T entity);7 void Delete(T entity);8 }
当我们需要扩展 BulkInsert 功能时,可以通过继承来完成功能扩展。
1 public interface IBulkableRepository: IRepository 2 where T : class3 {4 void BulkInsert(IEnumberable entities);5 }
这样,我们就可以使用很自然的方式直接使用 BulkInsert 功能了。
代码在哪里
代码在这里:
参考资料
本文为 原创技术文章,发表于博客园博客,未经作者本人允许禁止任何形式的转载。