在使用stm、网络管道和管道的GHC Haskell
应用程序中,每个套接字都有一个使用runTCPServer
自动派生的链。Strands可以通过使用广播TChan与其他strands通信。
这展示了我想如何设置管道“链”:
因此,我们这里有两个源(每个源都绑定到辅助管道),它们生成一个Packet
对象,encoder
将接受该对象并将其转换为ByteString
,然后将套接字发送出去。我在有效地融合这两个输入(性能是一个问题)方面遇到了很大的困难。
如果有人能给我指出正确的方向,我将不胜感激。
因为我没有尝试就把这个问题贴出来是不礼貌的,所以我会把我之前尝试过的东西放在这里;
我已经编写/cherrypicked了一个函数,它(阻塞)从一个TMChan (可关闭通道)产生一个源代码;
-- | Takes a generic type of STM chan and, given read and close functionality,
-- returns a conduit 'Source' which consumes the elements of the channel.
chanSource
:: (MonadIO m, MonadSTM m)
=> a -- ^ The channel
-> (a -> STM (Maybe b)) -- ^ The read function
-> (a -> STM ()) -- ^ The close/finalizer function
-> Source m b
chanSource ch readCh closeCh = ConduitM pull
where close = liftSTM $ closeCh ch
pull = PipeM $ liftSTM $ readCh ch >>= translate
translate = return . maybe (Done ()) (HaveOutput pull close)
同样,将Chan转换为接收器的函数;
-- | Takes a stream and, given write and close functionality, returns a sink
-- which wil consume elements and broadcast them into the channel
chanSink
:: (MonadIO m, MonadSTM m)
=> a -- ^ The channel
-> (a -> b -> STM()) -- ^ The write function
-> (a -> STM()) -- ^ The close/finalizer function
-> Sink b m ()
chanSink ch writeCh closeCh = ConduitM sink
where close = const . liftSTM $ closeCh ch
sink = NeedInput push close
write = liftSTM . writeCh ch
push x = PipeM $ write x >> return sink
然后mergeSources是直接的;fork 2线程(我真的不想做,但什么鬼东西),可以把他们的新项目到一个列表中,然后我产生一个源代码;
-- | Merges a list of 'Source' objects, sinking them into a 'TMChan' and returns
-- a source which consumes the elements of the channel.
mergeSources
:: (MonadIO m, MonadBaseControl IO m, MonadSTM m)
=> [Source (ResourceT m) a] -- ^ The list of sources
-> ResourceT m (Source (ResourceT m) a)
mergeSources sx = liftSTM newTMChan >>= liftA2 (>>) (fsrc sx) retn
where push c s = s $$ chanSink c writeTMChan closeTMChan
fsrc x c = mapM_ (\s -> resourceForkIO $ push c s) x
retn c = return $ chanSource c readTMChan closeTMChan
虽然我成功地使这些函数进行了类型检查,但我没有成功地利用这些函数进行类型检查;
-- | Helper which represents a conduit chain for each client connection
serverApp :: Application SessionIO
serverApp appdata = do
use ssBroadcast >>= liftIO . atomically . dupTMChan >>= assign ssBroadcast
-- appSource appdata $$ decoder $= protocol =$= encoder =$ appSink appdata
mergsrc $$ protocol $= encoder =$ appSink appdata
where chansrc = chanSource (use ssBroadcast) readTMChan closeTMChan
mergsrc = mergeSources [appSource appdata $= decoder, chansrc]
-- | Structure which holds mutable information for clients
data SessionState = SessionState
{ _ssBroadcast :: TMChan Packet -- ^ Outbound packet broadcast channel
}
makeLenses ''SessionState
-- | A transformer encompassing both SessionReader and SessionState
type Session m = ReaderT SessionReader (StateT SessionState m)
-- | Macro providing Session applied to an IO monad
type SessionIO = Session IO
我认为这种方法无论如何都是有缺陷的--有许多中间列表和转换。这不会对性能有好处。寻求指导。
PS。据我所知,这不是;Fusing conduits with multiple inputs的副本,因为在我的情况下,两个源都产生相同的类型,我并不关心Packet
对象是从哪个源产生的,只要我不是在等待一个而另一个已经准备好了要使用的对象。
PPS。我为示例代码中镜头的使用(因此对知识的要求)道歉。
发布于 2013-07-06 03:08:56
我不知道这是否有帮助,但我试着实现了Iain的建议,并制作了一个mergeSources'
的变体,它会在任何一个通道发生变化时立即停止:
mergeSources' :: (MonadIO m, MonadBaseControl IO m)
=> [Source (ResourceT m) a] -- ^ The sources to merge.
-> Int -- ^ The bound of the intermediate channel.
-> ResourceT m (Source (ResourceT m) a)
mergeSources' sx bound = do
c <- liftSTM $ newTBMChan bound
mapM_ (\s -> resourceForkIO $
s $$ chanSink c writeTBMChan closeTBMChan) sx
return $ sourceTBMChan c
(这个简单的添加可以在here上找到)。
对你的mergeSources
版本的一些评论(对它们持保留态度,可能是我不太理解一些东西):
...TMChan
而不是...TBMChan
似乎很危险。如果写入者比读取者更快,你的堆就会爆炸。看一下您的图表,如果您的TCP同级读取数据的速度不够快,似乎很容易发生这种情况。所以我肯定会使用...TBMChan
,它可能很大,但是有限制,,
MonadSTM m
约束。所有STM内容都封装到IO
中,并使用liftSTM = liftIO。原子地
我发现,当在serverApp
.
mergeSources'
时,这可能会稍微帮助你解决美容问题liftSTM newTMChan >>= liftA2 (>>) (fsrc ) retn
由于它在(->) r
monad上使用了liftA2
,所以很难阅读。我会说
do c <- liftSTM newTMChan fsrc sx c retn c
会更长,但更容易阅读。
您是否可以创建一个可以使用serverApp
的自包含项目
https://stackoverflow.com/questions/16757060
复制相似问题