2016-08-24 21 views
0

我需要从SqlDataReader中批量加载DataTable。 SqlDataReader将返回数百万条记录,并且DataTable Load()方法将耗尽可用内存。使用Powershell批量加载SQLDataReader中的DataTable

这里是我当前的代码:

[cmdletBinding(DefaultParameterSetName = 'Instance', 
        SupportsShouldProcess = $true, 
        ConfirmImpact = 'High')] 
Param (
    [string] $SrcServer  = "MySQLServer,12345", 
    [string] $SrcDatabase = "SourceDb", 
    [string] $SrcTable  = "dbo.SourceTable", 
    [string] $SrcQuery  = "SELECT TOP 100 HASHBYTES('SHA',stay_number) as stay_number_h, * FROM $SrcTable", 
    [string] $TgtServer, 
    [string] $TgtDatabase = "TargetDb", 
    [string] $TgtTable  = "tmp.TargetTable", 
    [switch] $Truncate  = $true 
) 

Function ConnectionString([string] $ServerName, [string] $DbName) 
{ 
    "Data Source=$ServerName;Initial Catalog=$DbName;Integrated Security=True;Connection Timeout=30" 
} 

########## Main body ############ 
Write-Host "Starting..." 

If ($TgtServer.Length –eq 0) { 
    $TgtServer = $SrcServer 
} 

If ($TgtDatabase.Length –eq 0) { 
    $TgtDatabase = $SrcDatabase 
} 

If ($TgtTable.Length –eq 0) { 
    $TgtTable = $SrcTable 
} 

If ($Truncate) { 
    Write-Host "Truncating $TgtTable" 
    $TruncateSql = "TRUNCATE TABLE " + $TgtTable 
    Sqlcmd -S $TgtServer -d $TgtDatabase -Q $TruncateSql 
} 

$SrcConnStr = ConnectionString $SrcServer $SrcDatabase 
$SrcConn = New-Object System.Data.SqlClient.SQLConnection($SrcConnStr) 
$CmdText = $SrcQuery 
$SqlCommand = New-Object system.Data.SqlClient.SqlCommand($CmdText, $SrcConn) 
$SrcConn.Open() 
[System.Data.SqlClient.SqlDataReader] $SqlReader = $SqlCommand.ExecuteReader() 

# Can we convert the SqlReader to a DataTable? 
$dtSchema = $SqlReader.GetSchemaTable() 
$dt = New-Object System.Data.DataTable 

if ($dtSchema -ne $null) 
{ 
    foreach ($drow in $dtSchema.Rows) 
    { 
     $columnName = $drow["ColumnName"] 
     $column = New-Object System.Data.DataColumn($columnName, $drow["DataType"]) 
     $column.Unique = $drow["IsUnique"] 
     $column.AllowDBNull = $drow["AllowDBNull"] 
     $column.AutoIncrement = $drow["IsAutoIncrement"] 
     $dt.Columns.Add($column) 
    } 
} 

Write-Host "Now loading DataTable" 
for ($i=0;$i -le 10; $i++) { 
    $i 
    $null = $dt.LoadDataRow($SqlReader,$true) 
} 
Write-Host "DataTable filled, how long and check memory consumption!" 

sleep 30 

$datatable.Clear() 

Write-Host "Finished" 

相关链接:

CSV到SQL Server:
https://gallery.technet.microsoft.com/scriptcenter/Import-Large-CSVs-into-SQL-216223d9 https://gallery.technet.microsoft.com/scriptcenter/Import-Large-CSVs-into-SQL-fa339046

SQL服务器到SQL Server:
https://blogs.technet.microsoft.com/heyscriptingguy/2011/05/06/use-powershell-to-copy-a-table-between-two-sql-server-instances/ https://newsqlblog.com/2011/08/12/moving-data-between-sql-servers-with-powershell/ https://raw.githubusercontent.com/RamblingCookieMonster/PowerShell/master/Invoke-SQLBulkCopy.ps1

理想的情况下我的最终解决方案将支持SS和外部文件的导入(实际上任何可以实现为一个DataTable)

回答

0

不PowerShell的,但是这是我如何处理一批来自DB:

它要求符合您的表中的字段类,并在任何浴大小,你喜欢使用的System.Reflection处理数据:

public static void sql_Reader_To_Type(Type t, SqlDataReader r) 
    { 

     List<object> ret = new List<object>(); 
     while (r.Read()) 
     { 

      FieldInfo[] f = t.GetFields(); 
      object o = Activator.CreateInstance(t); 
      for (int i = 0; i < f.Length; i++) 
      { 
       string thisType = f[i].FieldType.ToString(); 
       switch (thisType) 
       { 
        case "System.String": 

         f[i].SetValue(o, Convert.ToString(r[f[i].Name])); 
         break; 
        case "System.Int16": 
         f[i].SetValue(o, Convert.ToInt16(r[f[i].Name])); 
         break; 
        case "System.Int32": 
         f[i].SetValue(o, Convert.ToInt32(r[f[i].Name])); 
         break; 
        case "System.Int64": 
         f[i].SetValue(o, Convert.ToInt64(r[f[i].Name])); 
         break; 
        case "System.Double": 
         // Console.WriteLine("converting " + f[i].Name + " to double"); 
         double th; 
         if (r[f[i].Name] == null) 
         { 
          th = 0; 
         } 
         else 
         { 
          if (r[f[i].Name].GetType() == typeof(DBNull)) 
          { 
           th = 0; 
          } 
          else 
          { 
           th = Convert.ToDouble(r[f[i].Name]); 
          } 
         } 
         f[i].SetValue(o, th); 
         break; 
        case "System.Boolean": 
         f[i].SetValue(o, Convert.ToInt32(r[f[i].Name]) == 1 ? true : false); 
         break; 
        case "System.DateTime": 
         f[i].SetValue(o, Convert.ToDateTime(r[f[i].Name])); 
         break; 
        default: 
         throw new Exception("Missed data type in sql select in getClassMembers class line 73"); 

       } 
      } 
      ret.Add(o); 
      if (ret.Count == 50000) 
      { 
       //process data 
       ret.Clear(); 
      } 

     } 
     if (ret.Count > 0) 
     { 
      //processdata 
     } 


    } 

Create Table (firstName varchar(25), lastName varchar(50), birthday date) 

class person{public string firstName; public string lastName; public DateTime birthday} 

sql_Reader_To_Type(typeof(person), com.ExecuteReader()); 
0

如果你想避免泛型类型和使用数据行:

public static void sql_Reader_To_DataTable_With_Buffer(Type t, SqlDataReader r) 
    { 

     DataTable dt = create_DataTable_From_Generic_Class(t); 
     while (r.Read()) 
     { 

      FieldInfo[] f = t.GetFields(); 
      object[] rowData = new object[dt.Columns.Count]; 
      for (int i = 0; i < f.Length; i++) 
      { 
       string thisType = f[i].FieldType.ToString(); 
       switch (thisType) 
       { 
        case "System.String": 

         rowData[i] = Convert.ToString(r[f[i].Name]); 
         break; 
        case "System.Int16": 
         rowData[i] = Convert.ToInt16(r[f[i].Name]); 
         break; 
        case "System.Int32": 
         rowData[i] = Convert.ToInt32(r[f[i].Name]); 
         break; 
        case "System.Int64": 
         rowData[i] = Convert.ToInt64(r[f[i].Name]); 
         break; 
        case "System.Double": 
         // Console.WriteLine("converting " + f[i].Name + " to double"); 
         rowData[i] = Convert.ToDouble(r[f[i].Name]); 
         break; 
        case "System.Boolean": 
         rowData[i] = Convert.ToInt32(r[f[i].Name]) == 1 ? true : false; 
         break; 
        case "System.DateTime": 
         rowData[i] = Convert.ToDateTime(r[f[i].Name]); 
         break; 
        default: 
         throw new Exception("Missed data type in sql select in getClassMembers class line 73"); 

       } 
      } 
      dt.Rows.Add(rowData); 
      if (dt.Rows.Count == 50000) 
      { 
       //process table (dt); 
       dt.Rows.Clear(); 
      } 

     } 
     if (dt.Rows.Count > 0) 
     { 
      //processTable (dt); 

     } 


    } 
com.CommandText = "select top 10000000000000000 * from myHugeTable"; 
SqlDataReader = com.ExecuteDataReader(); 
sql_Reader_To_DataTable_With_Buffer(typeof(person),read);