2014-01-26 104 views
4

我想要使用xml-conduit,特别是Text.XML.Stream.Parse,以便从大型XML文件中懒惰地提取对象列表。Streaming xml-conduit解析结果

作为测试用例,我使用了the recently re-released StackOverflow data dumps。为了简单起见,我打算从stackoverflow.com-Users.7z中提取所有用户名。即使该文件是一个.7zfile说,这是刚刚的bzip2压缩数据(有可能是在文件的结尾部分7zip的东西,但现在我不关心)。

的XML的简化版本将

<users> 
    <row id="1" DisplayName="StackOverflow"/> 
    ... 
    <row id="2597135" DisplayName="Uli Köhler"/> 
    ... 
</users> 

基于this previous Q&A和示例on Hackage流读取示例XML在BZ2-ED的形式完美的作品对我来说

然而,在使用runghc时运行下面的程序,它运行时不打印任何输出:

{-# LANGUAGE OverloadedStrings #-} 
import Data.Conduit (runResourceT, ($$), ($=)) 
import qualified Data.Conduit.Binary as CB 
import Data.Conduit.BZlib 
import Data.Conduit 
import Data.Text (Text) 
import System.IO 
import Text.XML.Stream.Parse 
import Control.Applicative ((<*)) 

data User = User {name :: Text} deriving (Show) 

parseUserRow = tagName "row" (requireAttr "DisplayName" <* ignoreAttrs) $ \displayName -> do 
    return $ User displayName 

parseUsers = tagNoAttr "users" $ many parseUserRow 

main = do 
    users <- runResourceT $ CB.sourceFile "stackoverflow.com-Users.7z" $= bunzip2 $= parseBytes def $$ force "users required" parseUsers 
    putStrLn $ unlines $ map show users 

我假设这个问题因为Haskell试图在开始打印之前深入评估users列表。该理论得到了该程序的内存使用量的支持,该程序的持续增长速度约为每秒2%(来源:htop)。

我如何“活流”结果到stdout?我认为这是可能的最后添加另一个像$$ CB.sinkFile "output.txt"管道声明。然而,这个特定的版本需要Conduit输出ByteString。你能否指出我正确的方向从哪里出发?

任何帮助将不胜感激!

回答

10

让我首先说xml-conduit中的流助手API已经有多年没有开发过了,并且可能会从重新设想中获益,因为在此期间管道已发生了一些变化。我认为可能有更好的方法来完成任务。

这就是说,让我解释一下你所看到的问题。函数many创建一个结果列表,并且在完成处理之前不会生成任何值。在你的情况下,有这么多的价值,这似乎永远不会发生。最终,当整个文件被读取时,整个用户列表将被一次显示。但这显然不是你想要的行为。

相反,您要做的是创建一个User值,一旦准备好就会生成这些值。你想要做的是基本上用一个新函数替换many函数调用,每次解析结果yield。一个简单的实现的,这可能是:

yieldWhileJust :: Monad m 
       => ConduitM a b m (Maybe b) 
       -> Conduit a m b 
yieldWhileJust consumer = 
    loop 
    where 
    loop = do 
     mx <- consumer 
     case mx of 
      Nothing -> return() 
      Just x -> yield x >> loop 

此外,而不是使用putStrLn $ unlines $ map show,你想整个管道安装到消费者,这将打印每个单独产生User值。这可以通过Data.Conduit.List.mapM_轻松实现,例如:CL.mapM_ (liftIO . print)

我已经根据您的代码放在一起a full example。输入是一个人工生成的无限XML文件,只是为了证明它真的在立即产生输出。

{-# LANGUAGE OverloadedStrings #-} 
{-# LANGUAGE RankNTypes  #-} 
import   Control.Applicative ((<*)) 
import   Control.Concurrent  (threadDelay) 
import   Control.Monad   (forever, void) 
import   Control.Monad.IO.Class (MonadIO (liftIO)) 
import   Data.ByteString  (ByteString) 
import   Data.Conduit 
import qualified Data.Conduit.List  as CL 
import   Data.Text    (Text) 
import   Data.Text.Encoding  (encodeUtf8) 
import   Data.XML.Types   (Event) 
import   Text.XML.Stream.Parse 

-- instead of actually including a large input data file, just for testing purposes 
infiniteInput :: MonadIO m => Source m ByteString 
infiniteInput = do 
    yield "<users>" 
    forever $ do 
     yield $ encodeUtf8 
      "<row id=\"1\" DisplayName=\"StackOverflow\"/><row id=\"2597135\" DisplayName=\"Uli Köhler\"/>" 
     liftIO $ threadDelay 1000000 
    --yield "</users>" -- will never be reached 

data User = User {name :: Text} deriving (Show) 

parseUserRow :: MonadThrow m => Consumer Event m (Maybe User) 
parseUserRow = tagName "row" (requireAttr "DisplayName" <* ignoreAttrs) $ \displayName -> do 
    return $ User displayName 

parseUsers :: MonadThrow m => Conduit Event m User 
parseUsers = void $ tagNoAttr "users" $ yieldWhileJust parseUserRow 

yieldWhileJust :: Monad m 
       => ConduitM a b m (Maybe b) 
       -> Conduit a m b 
yieldWhileJust consumer = 
    loop 
    where 
    loop = do 
     mx <- consumer 
     case mx of 
      Nothing -> return() 
      Just x -> yield x >> loop 

main :: IO() 
main = infiniteInput 
    $$ parseBytes def 
    =$ parseUsers 
    =$ CL.mapM_ print 
+0

非常感谢您抽出时间来写这个!即使我仍然不理解你使用的任何概念(由于缺乏Haskell知识),你真的理解我的问题并提供了一个很好的解决方案。我设法创建了一个解析文件而不是无限流的示例(它花费了一些时间,我会将其作为下面的答案张贴以供进​​一步参考! –

+1

在库的当前版本中,yieldWhileJust已放置在Text中。带有manyYield名称的XML.Stream.Parse。 –

1

基于Michael Snoyman's excellent answer这里是一个修改后的版本,其读取从stackoverflow.com-Users.7z而不是从人工产生的IO流采购的数据。

有关如何直接使用xml-conduit的参考,请参阅Michael's answer。这个答案只是作为一个例子来说明如何在可选的压缩文件中使用那里描述的方法。

这里的主要变化是,你需要使用runResourceT读取该文件,并最终print需要为lift编辑从IO()ResourceT IO()

{-# LANGUAGE OverloadedStrings #-} 
{-# LANGUAGE RankNTypes  #-} 
import qualified Data.Conduit.Binary as CB 
import   Control.Applicative ((<*)) 
import   Control.Concurrent  (threadDelay) 
import   Control.Monad   (forever, void) 
import   Control.Monad.IO.Class (MonadIO (liftIO)) 
import   Data.ByteString  (ByteString) 
import qualified Data.ByteString.Lazy as LB 
import   Data.Conduit 
import qualified Data.Conduit.List  as CL 
import   Data.Text    (Text) 
import   Data.Text.Encoding  (encodeUtf8) 
import   Data.XML.Types   (Event) 
import   Text.XML.Stream.Parse 
import   Data.Conduit.BZlib (bunzip2) 
import   Control.Monad.Trans.Class (lift) 
import   Control.Monad.Trans.Resource (MonadThrow, runResourceT) 

data User = User {name :: Text} deriving (Show) 

parseUserRow :: MonadThrow m => Consumer Event m (Maybe User) 
parseUserRow = tagName "row" (requireAttr "DisplayName" <* ignoreAttrs) $ \displayName -> do 
    return $ User displayName 

parseUsers :: MonadThrow m => Conduit Event m User 
parseUsers = void $ tagNoAttr "users" $ yieldWhileJust parseUserRow 

yieldWhileJust :: Monad m 
       => ConduitM a b m (Maybe b) 
       -> Conduit a m b 
yieldWhileJust consumer = 
    loop 
    where 
    loop = do 
     mx <- consumer 
     case mx of 
      Nothing -> return() 
      Just x -> yield x >> loop 

main :: IO() 
main = runResourceT $ CB.sourceFile "stackoverflow.com-Users.7z" $= bunzip2 $$ parseBytes def 
    =$ parseUsers 
    =$ CL.mapM_ (lift . print)