我做了一次小尝试,使用了 80 行 Haskell 代码来构建 wc 程序,并且最终得到的结果是比手动调优过的C语言实现更快。
作为参考,在开始讲述我的具体实现过程之前,先介绍一下准备工作。我使用的是Mac版本的wc,具体实现的源代码可以点击这里查看。
需要注意的是,本文并不是想要表达Haskell比C“更好”的意思,而是希望通过这个有趣的探索,使得大家能够从中掌握一些编程技巧。
下面是我们所要考虑的标准:
按惯例,我们应该先尝试最笨的办法,搞清楚背后的机制,然后再一点点改进。在Haskell中计算字符、行和单词数的最笨办法是什么呢?我们可以读取文件,然后运行length、length . words和length . lines函数来计数!
stupid :: FilePath -> IO (Int, Int, Int)
stupid fp = do
contents <- readFile fp
return (length s, length (words s), length (lines s))
这个办法竟然是可行的,并且为我们提供了与wc相同的答案,只不过你得等好一阵儿……测试大型文件的时候我等得不耐烦了(远不止几分钟),但在较小的测试文件(90 MB)上得到了以下结果: 90 MB测试文件:
wc | stupid-wc | |
---|---|---|
运行时间 | 0.37s | 17.07s |
峰值内存占用 | 1.86MB | 2403MB |
不用说还有改进的余地…
思考一下速度为什么这么慢呢。首先想到的是,我们要遍历文件的内容3次!这也意味着在我们遍历列表时GHC没法做垃圾回收,因为列表正在被占用。我们将文件的每个字符都保存在一个链表中,结果区区90MB的文件竟然占据了2.4 GB内存!
难怪结果这么差呢。那么是不是可以简化为在结构上传递一次?我们要计数的是三件很简单的事情,所以也许一次就能处理完毕了。想要遍历结构一次性得到最终结果,我想到了folds。
使用fold对字符或行来计数是很容易的:字符数总是加一,当前字符是换行字符时,行数加一;但是单词数呢?我们不能每遇到一个空格就加一个单词数,因为会有连续空格的情况。我们需要判断前一个字符是否为空格,只有在遇到一个新单词时才增加计数。这样做不是很难。我们将使用Data.List中的foldl’作为实现。
import Data.List
import Data.Char
simpleFold :: FilePath -> IO (Int, Int, Int)
simpleFold fp = do
countFile <$> readFile fp
countFile :: String -> (Int, Int, Int)
countFile s =
let (cs, ws, ls, _) = foldl' go (0, 0, 0, False) s
in (cs, ws, ls)
where
go :: (Int, Int, Int, Bool) -> Char -> (Int, Int, Int, Bool)
go (cs, ws, ls, wasSpace) c =
let addLine | c == '\n' = 1
| otherwise = 0
addWord | wasSpace = 0
| isSpace c = 1
| otherwise = 0
in (cs + 1, ws + addWord, ls + addLine, isSpace c)
结果遇到了更严重的问题!这段程序需要运行好久,而且很快就吃掉了超过3GB内存!出什么事了?原来我们使用了foldl的严格版本(由尾部打勾’表示);但只有“弱头范式”(WHNF)才是严格的,意味着它在元组累加器的结构上是严格的,而在实际值上却不是!很烦人,因为这意味着我们正在建立大量额外内容,迭代完整个文件之后我们才能完整地求值!有时稍微偷懒一下就会像这样出事。不注意的话,这种内存泄漏很容易导致Web服务器崩溃! 90MB测试文件:
wc | simple-fold-wc | |
---|---|---|
运行时间 | 0.37s | 太长了,懒得等 |
峰值内存占用 | 1.86MB | >3GB |
我们可以告诉GHC在每次迭代中严格评估元组的内容,这样就解决问题了。一种简单的方法是使用BangPatterns扩展。它让我们使用!在参数列表中强制对函数的每次运行求值。下面是go的新版本:
{-# LANGUAGE BangPatterns #-}
...
go :: (Int, Int, Int, Bool) -> Char -> (Int, Int, Int, Bool)
go (!cs, !ws, !ls, !wasSpace) c =
let addLine | c == '\n' = 1
| otherwise = 0
addWord | wasSpace = 0
| isSpace c = 1
| otherwise = 0
in (cs + 1, ws + addWord, ls + addLine, isSpace c)
简单的改动就能极大提升运行速度;下面是我们新的测试结果: 90MB测试文件:
wc | strict-fold-wc | |
---|---|---|
运行时间 | 0.37s | 8.12s |
峰值内存占用 | 1.86MB | 3.7MB |
很好,现在内存状况大大改善了,90MB的文件只用了几MB内存,说明我们终于能正确传输文件内容了!虽然偷懒在前面惹了麻烦,但现在我们是在正确的位置上偷懒,它为我们免费提供了流传输!流传输是自然发生的,因为readFile实际上在做延迟IO。延迟IO在Web服务器之类的场景中可能会是麻烦,因为你不确定IO到底何时执行,但在我们的情况下它提供了好得多的内存驻留。
现在暂时不必担心内存问题了,所以再来关心性能吧!我想到的一个办法就是尝试切换到ByteString上取代String。使用String意味着我们在读取文件时对文件隐式解码,这需要花费时间,并且我们在整个过程中都使用了链表,因此我们无法轻松利用批处理或在读取数据时缓存的好处。
实际上这种更改很容易做到,bytestring包提供了模块:Data.ByteString.Lazy.Char8,该模块提供了像字符串一样处理延迟ByteString的操作,同时拥有ByteString的所有性能优势。请注意,它实际上并没有验证每个字节是否是有效字符,也没有做任何解码,因此我们要确保传递的数据都是有效的。默认情况下,wc假定其输入是ASCII编码,因此我们也这样做的话应该会很安全。如果我们的输入是ASCII,那么此模块中的函数将正常运行。
从字面上看,我要做的唯一更改是将Data.List导入切换到Data.ByteString.Lazy.Char8,然后将readFile和foldl’函数切换到其ByteString版本:
import Data.Char
import qualified Data.ByteString.Lazy.Char8 as BS
simpleFold :: FilePath -> IO (Int, Int, Int)
simpleFold fp = do
simpleFoldCountFile <$> BS.readFile fp
simpleFoldCountFile :: BS.ByteString -> (Int, Int, Int)
simpleFoldCountFile s =
let (cs, ws, ls, _) = BS.foldl' go (0, 0, 0, False) s
in (cs, ws, ls)
where
go :: (Int, Int, Int, Bool) -> Char -> (Int, Int, Int, Bool)
go (!cs, !ws, !ls, !wasSpace) c =
let addLine | c == '\n' = 1
| otherwise = 0
addWord | wasSpace = 0
| isSpace c = 1
| otherwise = 0
in (cs + 1, ws + addWord, ls + addLine, isSpace c)
这个小小的变化使我们的时间缩短了将近一半! 90MB测试文件:
wc | bs-fold-wc | |
---|---|---|
运行时间 | 0.37s | 3.41s |
峰值内存占用 | 1.86MB | 5.48MB |
显然我们在不断进步。内存使用量略有增加,但似乎是固定的开销。不幸的是我们离wc还有几个数量级的差距;那么还有什么可以做的呢。
我想试一下这个。现代PC一般有很多CPU核心,而且新一代机器的核心数量增长速度比主频增长快得多,所以应该好好利用一下。
拆分这种计算并非易事。为了利用多个核心,我们需要将工作分解为多个部分。理论上讲这很容易,只需将文件拆分为多个块,然后为每个内核分配一个块即可!深究起来就不是这么简单了。组合字符计数非常容易,我们可以把每个块的总数加起来。行数也可以这么做,但单词数就要出问题了!如果在一个单词的中间或在几个连续的空格中间断开会发生什么?为了合并单词计数,我们需要跟踪每个块的开始和结束状态,还要妥善地把它们组合起来。这就得涉及很多簿记工作了,我可不想这么干。
这里的救星是Monoids!Monoid的关联定律意味着,只要我们能够发展出一个合法的monoid,那么就算在这种并行条件下它也能正常工作,这是很自然的事情。但我们能不能编写一个Monoid来处理单词计数的这种复杂性呢?
当然可以!虽然具体的做法不是一眼就能看出来的,但有一整类计数问题都归于这个范畴,所幸我之前就已经研究过这类问题。基本上,我们需要计算给定的不变量从一个序列的开始到结束阶段所改变的次数。之前我已经归一化了这类monoid,将其命名为flux monoids(http://hackage.haskell.org/package/flux-monoid)。我们需要做的是计算从空格字符变为非空格字符的次数。我们可以使用Flux monoid本身来表达这一点,但由于我们非常关注严格性和性能,因此我会定义一个定制版本的Flux monoid,如下:
data CharType = IsSpace | NotSpace
deriving Show
data Flux =
Flux !CharType
{-# UNPACK #-} !Int
!CharType
| Unknown
deriving Show
我们只在单词计数部分需要这些类型。 CharType表示给定字符是否被视为空格;然后Flux类型代表一小段文本,存储以下字段:最左边的字符是否为空格,整个文本块中有多少个单词,以及最右边的字符是否为空格。实际上,我们没有将文本保留在结构中,因为这个问题中我们不需要它。我UNPACK了Int,并严格限制了所有字段,以免遇到之前使用延迟元组时出现的问题。使用严格数据类型意味着我不需要在计算中使用BangPatterns。
接下来我们需要这种类型的semigroup和Monoid实例!
instance Semigroup Flux where
Unknown <> x = x
x <> Unknown = x
Flux l n NotSpace <> Flux NotSpace n' r = Flux l (n + n' - 1) r
Flux l n _ <> Flux _ n' r = Flux l (n + n') r
instance Monoid Flux where
mempty = Unknown
这里的Unknown构造函数表示一个Monoidal身份,我们实际上可以省略它,并使用Maybe来将Semigroup提升为Monoid,但是Maybe会在我们的Semigroup附加项中引入不必要的延迟!为了简单起见,我将其定义为类型的一部分。 我们定义的(<>)操作检查两个文本块的连接点是否出现在一个单词的中间,如果确实如此,则必须分别计算同一单词的开始和结束,然后总计时减去一个来平衡计数。
最后,我们需要一种从单个字符构建Flux对象的方法。
flux :: Char -> Flux
flux c | isSpace c = Flux IsSpace 0 IsSpace
| otherwise = Flux NotSpace 1 NotSpace
这很简单,我们将以非空格字符开头和结尾的字符组视为“单词”,然后一个字符两边都是零单词计数的空格的话,它自己就不算单词。 可能你还是不太明白,但计算单词数的部分到这里就全齐了!
>>> foldMap flux "testing one two three"
Flux NotSpace 4 NotSpace
>>> foldMap flux "testing on" <> foldMap flux "e two three"
Flux NotSpace 4 NotSpace
>>> foldMap flux "testing one " <> foldMap flux " two three"
Flux NotSpace 4 NotSpace
看起来一切正常! 我们已经做好了单词数统计部分,现在我们需要字符计数和行计数的Monoidal版本。这里很简单:
data Counts =
Counts { charCount :: {-# UNPACK #-} !Int
, wordCount :: !Flux
, lineCount :: {-# UNPACK #-} !Int
}
deriving (Show)
instance Semigroup Counts where
(Counts a b c) <> (Counts a' b' c') = Counts (a + a') (b <> b') (c + c')
instance Monoid Counts where
mempty = Counts 0 mempty 0
没问题!同样,我们需要一种将单个字符转换为Counts对象的方法:
countChar :: Char -> Counts
countChar c =
Counts { charCount = 1
, wordCount = flux c
, lineCount = if (c == '\n') then 1 else 0
}
我们也尝试一下:
>>> foldMap countChar "one two\nthree"
Counts {charCount = 13, wordCount = Flux NotSpace 3 NotSpace, lineCount = 1}
看上去很好!你可以随意试验一些内容,确认这是合法的Monoid。 有了合法的Monoid,我们就能轻松拆分文件了!
继续下一步之前,我们试着将Monoid加到已有代码中,确认所获得的答案是相同的。
module MonoidBSFold where
import Data.Char
import qualified Data.ByteString.Lazy.Char8 as BS
monoidBSFold :: FilePath -> IO Counts
monoidBSFold paths = monoidFoldFile <$> BS.readFile fp
monoidFoldFile :: BS.ByteString -> Counts
monoidFoldFile = BS.foldl' (\a b -> a <> countChar b) mempty
我们已经将一些复杂性移到了Counts类型中,这样就能真正简化这里的实现了。总的来说效果不错,因为测试单个数据类型要容易得多。 附带的好处是,这一更改以某种方式进一步加快了速度!
90MB测试文件:
wc | monoid-bs-fold-wc | |
---|---|---|
运行时间 | 0.37s | 1.94s |
峰值内存占用 | 1.86MB | 3.83MB |
通过这一更改,我们节省了很多时间和内存,具体原因我也不太清楚。使用完全严格的数据结构后,我们可能已经消除了潜伏在某处的延迟。
更新:guibou向我指出,我们的Flux和Counts类型使用UNPACK编译指示,而我们之前使用的是常规ol’元组。显然,GHC有时对于UNPACK元组足够聪明,但是在这种情况下可能失效了。通过UNPACK,我们可以节省一些指针间接操作并使用更少的内存!
接下来我们想内联一些定义!为什么?因为要提高性能!我们可以使用INLINE编译指示告诉GHC我们的函数对性能至关重要,然后它将为我们内联;还可能会触发进一步的优化。
monoidBSFold :: FilePath -> IO Counts
monoidBSFold paths = monoidBSFoldFile <$> BS.readFile fp
{-# INLINE monoidBSFold #-}
monoidBSFoldFile :: BS.ByteString -> Counts
monoidBSFoldFile = BS.foldl' (\a b -> a <> countChar b) mempty
{-# INLINE monoidBSFoldFile #-}
我还把INLINE添加到了countChar和flux函数里。那就看看这会不会产生不一样的结果吧: 90MB测试文件:
原始 | 内联 | |
---|---|---|
运行时间 | 1.94s | 0.47s |
峰值内存占用 | 3.83MB | 4.35MB |
有趣的是,我们的运行时间似乎减少了75%!我也搞不清楚这是侥幸还是真的碰到了什么幸运的开关,但总之我很满意!它大幅提升了我们的内存使用量,但暂时还不需要担心这件事。
下面将其与C版本对比:
90MB测试文件:
wc | inlined-monoid-bs-wc | |
---|---|---|
运行时间 | 0.37s | 0.47s |
峰值内存占用 | 1.86MB | 4.35MB |
到这一步,我们和wc的差距已经大幅缩小了。但这里的运行时间都在1秒以内,因此我要扩大测试文件的体积,然后多运行几次来看看能发现什么新东西。
我扩展到了一个543MB的纯文本文件,并连续运行了几次以预热缓存。这显然很重要,因为经过几次预热后运行时间减少了整整33%。我知道我的测试方法并不完全是“科学的”,但可以用来对我们的代码给出一个很好的估计。那么下面就是大文件上的测试表现:
543MB测试文件:
wc | inlined-monoid-bs-wc | |
---|---|---|
运行时间 | 2.06s | 2.73s |
峰值内存占用 | 1.85MB | 3.97MB |
从这里我们可以看到,结果已经非常接近了!考虑到我们把wc克隆到了一个高级垃圾回收语言中,代码却只有80行左右,这个结果还是不错的!
你可能不会认为并行化到多个核心上能有多大效果,因为整个操作可能都以IO瓶颈为主,但是我还是会这样试试,毕竟我既固执又无聊嘛。
我们已经用一个Monoid表达了问题,这意味着拆分计算应该是很简单的!这里的诀窍在于读取我们数据的过程。如果我们尝试读取所有数据,然后将其拆分为多个块,则必须立即将整个文件加载到内存中,这就会大幅提升内存驻留,而且也可能会影响性能!我们可以试着流式传输并以这种方式拆分,但随后我们必须先处理第一个块,然后才能开始第二个拆分,你应该能意识到这里的问题所在。所以我的做法是为每个核心启动一个单独的线程,然后在这些线程中各自打开一个文件句柄。然后在将合并计数之前,我将查看各个句柄以消除相交的偏移量,并对文件的各个非重叠部分执行我们的操作。
这样就OK,另外我太喜欢在Haskell中编写并发代码了!
import Types
import Control.Monad
import Data.Traversable
import Data.Bits
import GHC.Conc (numCapabilities)
import Control.Concurrent.Async
import Data.Foldable
import System.IO
import System.Posix.Files
import qualified Data.ByteString.Lazy.Char8 as BL
import Data.ByteString.Internal (c2w)
import GHC.IO.Handle
multiCoreCount :: FilePath -> IO Counts
multiCoreCount fp = do
putStrLn ("Using available cores: " <> show numCapabilities)
size <- fromIntegral . fileSize <$> getFileStatus fp
let chunkSize = fromIntegral (size `div` numCapabilities)
fold <$!> (forConcurrently [0..numCapabilities-1] $ \n -> do
-- Take all remaining bytes on the last capability due to integer division anomolies
let limiter = if n == numCapabilities - 1
then id
else BL.take (fromIntegral chunkSize)
let offset = fromIntegral (n * chunkSize)
fileHandle <- openBinaryFile fp ReadMode
hSeek fileHandle AbsoluteSeek offset
countBytes . limiter <$!> BL.hGetContents fileHandle)
{-# INLINE handleSplitUTF #-}
countBytes :: BL.ByteString -> Counts
countBytes = BL.foldl' (\a b -> a <> countChar b) mempty
{-# INLINE countBytes #-}
这里发生了很多事情,我会尽量逐一解释。 我们可以从GHC.Conc导入程序可用的“能力”数量(即我们可以访问的核心数量)。接下来,我们在要计数的文件上运行fileStat,以获取文件中的字节数。然后我们使用整数除法来确定每个核心应处理多少字节。整数除法会将结果四舍五入,因此我们必须小心回收可能被遗漏的字节。然后我们使用Control.Concurrent.Async中的forConcurrently为每个核心运行一个单独的线程。
在每个线程内,我们检查是否位于处理文件最后一个块的线程内,如果是,则应读取直到文件结尾为止,以便从之前的舍入错误中提取剩余字节;否则,我们会将自己限制为只处理chunkSize字节。然后,我们可以将块大小乘以线程号来计算文件的偏移量。我们打开一个二进制文件句柄,然后使用hSeek将我们的句柄移动到线程的起始偏移量。接着我们可以简单地读取分配的字节数,并使用与以前相同的逻辑将它们fold下来。处理完每个线程后,我们将使用一个简单的fold,将每个块的计数合并为总计数。
我们在一些地方使用<$!>来增加额外的严格性,因为我们要确保fold操作发生在每个线程内,而不是在线程联结之后。我可能加了太多严格性注解了,但是多做总比少做然后遗漏过去要好。
然后就带这只小狗出去兜风吧!
预热缓存之后,我在配备SSD的4核2013 Macbook Pro上各跑了几次测试,结果取平均:
543MB测试文件:
wc | multicore-wc | |
---|---|---|
运行时间 | 2.07s | 1.23s |
峰值内存占用 | 1.87MB | 7.06MB |
效果似乎非常明显!实际上我们比一些经过数十年手工优化的C代码还要快。大家可以抱着怀疑的态度先接受这个结果;很难这里的缓存发生了什么事情。可能有多层的磁盘缓存在起作用。也许多线程只在从缓存中读取文件时有用?
我稍微研究了下,看起来某些存储设备可能会因为并行文件读取而加快输出速度,还有些设备却可能会因此变慢。你的结果可能会不一样。如果有人是SSD方面的专家,我很乐意就此与他探讨。无论如何,我对结果是很满意的。
更新:真的有些读者是SSD方面的专家!Paul Tanner给我写了一封邮件,解释说现代的NVMe驱动器往往可以从这种并行性中受益,只要我们不访问同一个块即可(这里就没有)。不幸的是,我的古老Macbook没有这种存储器,但从好的方面来看,这意味着这些代码在现代存储器上实际会跑得更快。谢谢Paul!
作为参考,我们程序的实际用户时间为4.22s(分为4个内核),这意味着就实际处理器周期而言,并行程序的效率并不如简单版本,但使用多个内核后可以减少“真正的”挂钟时间。
到目前为止,我们一直在避免某些事情,就是假设每个文件都是简单的ASCII!可现实并不是这样的。如今许多文档都使用UTF-8编码,如果文档里只有有效的ASCII字符的话那就和ASCII文件完全相同,但如果那些疯狂的青少年在其中放了一些表情符号,那么一切都会搞砸了。
这个问题分为两个方面。首先,我们目前计数的是BYTES而不是CHARACTERS,因为在ASCII码域中它们在语义上是相同的。在我们当前的代码中,如果遇到UTF-8编码的表情,虽然它只算一个字符,但我们会把它算作至少2个字符。我们是应该解码这些东西,但是说起来容易做起来难,因为我们将文件分割成了任意字节数的块;这意味着我们可能会将某个表情分成两个不同的块,导致解码无效!简直是一场恶梦。
这也是多线程wc可能并不合适的另一个原因,但我并没有那么容易气馁。接下来我将做一些假设:
做出这两个假设后,我们可以利用UTF-8编码方案的一些细节来解决我们的问题。首先,根据UTF-8规范我们知道它与ASCII完全向后兼容。这意味着每个ASCII字节在UTF-8编码中都是完全相同的字节。其次,我们知道文件中的其他字节与有效ASCII字节的编码不会冲突。你可以在UTF-8维基百科页面(https://en.wikipedia.org/wiki/UTF-8)上的图表中查看原因。连续字节以前导“1”开头,而没有ASCII字节会以“ 1”开头。
这两个事实意味着我们可以安全地保持当前的“空格”检测逻辑不变!我们不可能“分割”空格或换行符,因为它们都被编码在单个字节中,而且我们知道不会意外地计数属于不同代码点的某个字节,因为ASCII字节的编码没有重叠 。但我们确实需要更改字符计数逻辑。
关于UTF-8的最后一个事实是,每个UTF-8编码的代码点都只包含集合中的一个字节:0xxxxxxx、110xxxxx、1110xxxx、11110xxx。连续字节全部以10开始,因此如果我们计算除以10开头的字节以外的所有字节,那么即使将代码点划分为不同的块,我们也将对每个代码点进行精确计数!
所有这些事实结合在一起,就意味着我们可以编写一个每字节的monoid来一并计算UTF-8代码点或ASCII字符!
请注意,从技术上讲Unicode代码点(codepoint)与“字符”不同,有许多代码点(比如变音符号)会“融合”起来,以显示为单个字符,但是据我所知wc也不能单独处理它们。
实际上,我们当前的Counts monoid已经很好了,我们只需调整countChar函数即可:
import Data.Bits
import Data.ByteString.Internal (c2w)
countByte :: Char -> Counts
countByte c =
Counts {
-- Only count bytes at the START of a codepoint, not continuation bytes
charCount = if (bitAt 7 && not (bitAt 6)) then 0 else 1
, wordCount = flux c
, lineCount = if (c == '\n') then 1 else 0
}
where
bitAt = testBit (c2w c)
{-# INLINE countByte #-}
就是这样!现在我们就可以处理UTF-8或ASCII了;我们甚至不需要知道正在处理的是哪种编码,总之都会给出正确的答案。 wc(至少是Macbook上的版本)有一个-m标志,用于在计数时处理多字节字符。一些快速实验表明,要求wc处理多字节字符会大大减慢处理过程(它需要解码每个字节)。拿我们的版本对比一下(我已经确认,在带有许多非ASCII字符的大型UTF-8编码文档上运行时也将获得相同的结果):
543MB测试文件:
wc-mwl | multicore-utf8-wc | |
---|---|---|
运行时间 | 5.56s | 3.07s |
峰值内存占用 | 1.86MB | 7.52MB |
就像预测的那样,我们遥遥领先!我们的新版本比之前只计数每个字节时要慢一些(现在要进行一些额外的位检查),因此在程序中添加一个utf标志可能是个好主意,这样我们就能一直以最快的速度处理给定的输入了。
自从这篇文章发布以来,Harendra Kumar为我提供了新的性能调优方法,不仅改善了性能,还使我们能够从stdin中启用流输入!代码也很漂亮!
秘密在于streamly这个库(https://github.com/composewell/streamly),这是一个出色的高级高性能流式传输库。下面来看一些代码!再次感谢Harendra Kumar的这一实现:
module Streaming where
import Types
import Data.Traversable
import GHC.Conc (numCapabilities)
import System.IO (openFile, IOMode(..))
import qualified Streamly as S
import qualified Streamly.Data.String as S
import qualified Streamly.Prelude as S
import qualified Streamly.Internal.Memory.Array as A
import qualified Streamly.Internal.FileSystem.Handle as FH
streamingBytestream :: FilePath -> IO Counts
streamingBytestream fp = do
src <- openFile fp ReadMode
S.foldl' mappend mempty
$ S.aheadly
$ S.maxThreads numCapabilities
$ S.mapM countBytes
$ FH.toStreamArraysOf 1024000 src
where
countBytes =
S.foldl' (\acc c -> acc <> countByte c) mempty
. S.decodeChar8
. A.toStream
{-# INLINE streamingBytestream #-}
注意:这里直接使用了Github存储库中的7.10版本。它还使用了一些内部模块。 首先,我们只需要打开文件即可。
接下来是流代码,我们将从下至上阅读它,遵循信息流的顺序。
FH.toStreamArraysOf 1024000 src
这会将来自文件句柄的字节分块为Byte数组流。使用Byte数组要比延迟ByteString更快!我们将为文件的大约每个MB使用一个单独的数组,你可以根据自己的喜好来调整。
S.mapM countBytes
它使用mapM在数组上运行countBytes函数。countBytes本身从数组创建一个流,并使用Monoidal字节计数器对其进行流式fold:
countBytes =
S.foldl' (\acc c -> acc <> countByte c) mempty
. S.decodeChar8
. A.toStream
接下来,我们告诉streamly以并行方式在数组上运行映射,从而允许单独的线程处理每个1MB的块。我们将线程数限制为我们的核心数。一旦我们读完数据,就可以立即对其处理,并且计数代码没有任何理由要阻塞,因此添加比核心数更多的线程可能只会增加调度程序的工作量。
S.maxThreads numCapabilities
Streamly提供了许多不同的流评估策略,我们使用aheadly这个策略,允许并行处理流元素,但仍保证输出按照与输入对应的顺序发出。由于我们使用的是Monoid,因此只要一切都以适当的顺序结束,我们就可以按照自己喜欢的方式对计算分块:
S.aheadly
至此,我们已经计数了所有1MB的输入块,但还是需要将所有块合并在一起;我们可以将它们全部映射到另一个流式fold中:
S.foldl' mappend mempty
完事了!来测试一下吧! 下面是我们的543MB测试文件的非utf版本结果:
wc | streaming-wc | |
---|---|---|
运行时间 | 2.07s | 1.07s |
峰值内存占用 | 1.87MB | 17.81MB |
我们可以看到速度变得更快了,但要消耗大量的内存,我觉得可以通过调整输入分块大小来缓解这种情况。那就尝试一下,下面是100KB块与1MB块的对比:
streaming-wc (100KB chunks) | streaming-wc (1MB chunks) | |
---|---|---|
运行时间 | 1.20s | 1.07s |
峰值内存占用 | 8.02MB | 17.81MB |
验证了我的猜想,我们可以用一些性能来换取相当大的内存空间。我对结果已经非常满意了,但你也可以随意测试其他调整策略。
最后我们测试一下543MB测试文件的UTF8版本,结果如下:
wc-mwl | streaming-utf-wc (1MB chunks) | |
---|---|---|
运行时间 | 5.56s | 2.67s |
峰值内存占用 | 1.86MB | 17.o2MB |
我们的速度还在变快!现在我们可能想要减少一些内存使用量了!
总的来说,我认为流式传输版本是我的最爱,它非常高级、易读,并且可以从任意文件句柄(包括stdin)读取,后者是wc的一种非常常见的用例。Streamly非常酷。
结果如何?我觉得非常棒!我们的单核lazy-bytestring wc表现是差不多的。切换到多核方法后事情就不一样了!没有磁盘缓存预热的话,我们的wc克隆在实践中是否更快还有待观察,但就原始性能而言,我们大大提升了运行速度!流式版本应该对磁盘缓存没有那么大的依赖才是。
Haskell作为一种语言来说并不是很完美,但如果我在编写高级的、完全经过类型检查的代码时,还能获得与C程序相当的性能,那么我就认为它是更好的选择。
原文链接:
领取专属 10元无门槛券
私享最新 技术干货