2016-07-25 25 views
4

我已经通过Insert 2 million rows into SQL Server quickly链接,发现我可以通过使用批量插入来完成此操作。所以我试图创建数据表(如下面的代码),而是因为这是一个巨大的文件(超过300K行)我得到在我的代码的OutOfMemoryEexception如何从文本文件中读取数百万行并快速插入表格

string line; 
DataTable data = new DataTable(); 
string[] columns = null;  
bool isInserted = false;   

using (TextReader tr = new StreamReader(_fileName, Encoding.Default)) 
{ 
    if (columns == null) 
    { 
     line = tr.ReadLine(); 
     columns = line.Split(','); 
    } 

    for (int iColCount = 0; iColCount < columns.Count(); iColCount++) 
    { 
     data.Columns.Add("Column" + iColCount, typeof(string)); 
    }      

    string[] columnVal; 

    while ((line = tr.ReadLine()) != null) 
    { 
     columnVal = line.Split(','); // OutOfMemoryException throwing in this line 
     data.Rows.Add(columnVal); 
    } 
} 

经过长期的工作,我修改了我的代码为以下,但随后还我收到OutOfMemoryException异常在添加行的时间到数据表

DataTable data = new DataTable(); 
string[] columns = null; 
var line = string.Empty; 
using (TextReader tr = new StreamReader(_fileName, Encoding.Default)) 
{ 
    if (columns == null) 
    { 
     line = tr.ReadLine(); 
     columns = line.Split(','); 
    } 

    for (int iColCount = 0; iColCount < columns.Count(); iColCount++) 
    { 
     data.Columns.Add("Column" + iColCount, typeof(string)); 
    } 
    } 

    // Split the rows in 20000 rows in different list 

    var _fileList = File.ReadLines(_fileName, Encoding.Default).ToList(); 
    var splitChunks = new List<List<string>>(); 
    splitChunks = SplitFile(_fileList, 20000); 

Parallel.ForEach(splitChunks, lstChunks => 
{ 
    foreach (var rows in lstChunks) 
    { 
    string[] lineFields = rows.Split(','); 
    DataRow row = datatbl.NewRow(); 
    for (int iCount = 0; iCount < lineFields.Count(); iCount++) 
    { 
     row[iCount] = lineFields[iCount] == string.Empty ? "" : lineFields[iCount].ToString(); 
    } 
    datatbl.Rows.Add(row); 
    } 
}); 

我能为下一级为下面的代码做批量插入:

SqlConnection SqlConnectionObj = GetSQLConnection(); 
SqlBulkCopy bulkCopy = new SqlBulkCopy(SqlConnectionObj, SqlBulkCopyOptions.TableLock | SqlBulkCopyOptions.FireTriggers | SqlBulkCopyOptions.UseInternalTransaction, null); 
bulkCopy.DestinationTableName = "TempTable"; 
bulkCopy.WriteToServer(data); 

文件包含以下类型的数据

4714,1370的,AUSRICHTEN MASCHINELL

4870,1370,PLATTE STECKEN

0153,1900,填缝枪

0154,1900,新的终结者

0360,1470,MU 186 MACCH。 X LAV。 S/A ASTE PS174

9113-H22,1970,MC钻头

代码需要此转换成6行和3列。

有没有更快的方法来实现上述功能来读取文件并创建用于批量插入的数据表?所以我不应该从索引异常中获取内存。

在此先感谢。

+0

这是否需要以编程方式完成?如果这只是一个关闭,你可以使用SSMS工具 – Mark

+0

你可以分区吗?像读取数千行,批量插入这些数据,然后重新使用数据表为接下来的数千行等等。 – dlatikay

+0

是的,它需要,因为我有更多的代码来根据env从不同的服务器获取文件。 (DEV,TST,PROD)。并有更多的功能。 – Rocky

回答

3

您得到OutOfMemoryException的原因是因为你是 创建一个内存中的数据表,并尝试插入300K行 进去

这是一个很大的数据放在内存。

取而代之,您应该做的是从文本文件中读取的每一定数量的行 - 您需要将其插入到数据库中。

如何做到这一点取决于您,您可以使用SQL或批量复制 - 但请记住,您无法读取整个文本文件并将其保存在内存中,因此请以块为单位。

+0

看来你还在将所有数据加载到一个数据表对象中,如果我错了,请纠正我,因为我找不到你正在实例化datatbl。要么在每个foreach事件中创建一个新的数据表,要么在每次插入后删除parallel.foreach并清除表数据。 – gilmishal

+0

就我个人而言,我发现使用SQL查询为了将数据插入表中可以更简单快捷 - 它也可能更快,因为您正在跳过使用数据表对象 - 但您的方式也可以工作。 – gilmishal

0

我发现忘记了一个DataTable,并且逐行使用普通的旧SQLClient的速度更快。也更简单。这也击败了流式SQL函数,该函数被认为是在SQL Server数据库中获取数据的最快方式。

试试看,并测量速度,看看它是否足够快为你。如果不是的话,你总是可以尝试重新格式化文件(如果需要的话),让SQL Server使用它的Bulk Insert为你做工作。

1

使用SqlBulkCopy.WriteToServerIDataReader的解决方案。我正在使用CSV,但我希望能够轻松修改其他类型。 SqlBulkCopyIDateReader只使用了3件事,我们必须实现它们:

  • public int FieldCount {get; }
  • public bool Read()
  • public object GetValue(int i)

所有其他属性和方法可以是未实现的。有趣的文章约SqlBulkCopy。完整代码:https://dotnetfiddle.net/giG3Ai。这里是切割版本:

namespace SqlBulkCopy 
{ 
    using System; 
    using System.Collections.Generic; 
    using System.IO; 
    using System.Diagnostics; 
    using System.Data; 
    using System.Data.SqlClient; 

    public class CsvReader : IDataReader 
    { 
     private readonly char CSV_DELIMITER = ','; 

     private readonly StreamReader _sr; 
     private readonly Dictionary<string, Func<string, object>> _csv2SqlType; 
     private readonly string[] _headers; 

     private string _line; 
     private string[] _values; 

     public int FieldCount { get { return _headers.Length; } } 

     public CsvReader(string filePath, Dictionary<string, Func<string, object>> csvColumn2SqlTypeDict) 
     { 
      if (string.IsNullOrEmpty(filePath)) 
       throw new ArgumentException("is null or empty", "filePath"); 
      if (!System.IO.File.Exists(filePath)) 
       throw new IOException(string.Format("{0} doesn't exist or access denied", filePath)); 
      if (csvColumn2SqlTypeDict == null) 
       throw new ArgumentNullException("csvColumn2SqlTypeDict"); 

      _sr = new StreamReader(filePath); 
      _csv2SqlType = csvColumn2SqlTypeDict; 
      _headers = ReadHeaders(); 
      ValidateHeaders(); 
     } 
     public object GetValue(int i) 
     { 
      // Get column value 
      var colValue = _values[i]; 
      // Get column name 
      var colName = _headers[i]; 
      // Try to convert to SQL type 
      try { return _csv2SqlType[colName](colValue); } 
      catch { return null; } 
     } 
     public bool Read() 
     { 
      if (_sr.EndOfStream) return false; 

      _line = _sr.ReadLine(); 
      _values = _line.Split(CSV_DELIMITER); 
      // If row is invalid, go to next row 
      if (_values.Length != _headers.Length) 
       return Read(); 
      return true; 
     } 
     public void Dispose() 
     { 
      _sr.Dispose(); 
     } 
     private void ValidateHeaders() 
     { 
      if (_headers.Length != _csv2SqlType.Keys.Count) 
       throw new InvalidOperationException(string.Format("Read {0} columns, but csv2SqlTypeDict contains {1} columns", _headers.Length, _csv2SqlType.Keys)); 
      foreach (var column in _headers) 
      { 
       if (!_csv2SqlType.ContainsKey(column)) 
        throw new InvalidOperationException(string.Format("There is no convertor for column '{0}'", column)); 
      } 
     } 
     private string[] ReadHeaders() 
     { 
      var headerLine = _sr.ReadLine(); 
      if (string.IsNullOrEmpty(headerLine)) 
       throw new InvalidDataException("There is no header in CSV!"); 
      var headers = headerLine.Split(CSV_DELIMITER); 
      if (headers.Length == 0) 
       throw new InvalidDataException("There is no header in CSV after Split!"); 
      return headers; 
     } 
    } 
    public class Program 
    {   
     public static void Main(string[] args) 
     { 
      // Converter from CSV columns to SQL columns 
      var csvColumn2SqlTypeDict = new Dictionary<string, Func<string, object>> 
      { 
       { "int", (s) => Convert.ToInt32(s) }, 
       { "str", (s) => s }, 
       { "double", (s) => Convert.ToDouble(s) }, 
       { "date", (s) => Convert.ToDateTime(s) }, 
      }; 
      Stopwatch sw = Stopwatch.StartNew(); 
      try 
      { 
       // example.csv 
       /*** 
        int,str,double,date 
        1,abcd,2.5,15.04.2002 
        2,dab,2.7,15.04.2007 
        3,daqqb,4.7,14.04.2007 
       ***/ 
       using (var csvReader = new CsvReader("example.csv", csvColumn2SqlTypeDict)) 
       { 
        // TODO!!! Modify to your Connection string 
        var cs = @"Server=localhost\SQLEXPRESS;initial catalog=TestDb;Integrated Security=true"; 
        using (var loader = new SqlBulkCopy(cs, SqlBulkCopyOptions.Default)) 
        { 
         // TODO Modify to your Destination table 
         loader.DestinationTableName = "Test"; 
         // Write from csvReader to database 
         loader.WriteToServer(csvReader); 
        } 
       } 
      } 
      catch(Exception ex) 
      { 
       Console.WriteLine("Got an exception: {0}", ex); 
       Console.WriteLine("Press 'Enter' to quit"); 
       Console.ReadLine(); 
       return; 
      } 
      finally { sw.Stop(); } 
      Console.WriteLine("Data has been written in {0}", sw.Elapsed); 
      Console.WriteLine("Press 'Enter' to quit"); 
      Console.ReadLine(); 
     } 
     private static void ShowCsv(IDataReader dr) 
     { 
      int i = 0; 
      while (dr.Read()) 
      { 
       Console.WriteLine("Row# {0}", i); 
       for (int j = 0; j < dr.FieldCount; j++) 
       { 
        Console.WriteLine("{0} => {1}", j, dr.GetValue(j)); 
       } 
       i++; 
      } 
     } 
    } 
} 
相关问题