首页
学习
活动
专区
工具
TVP
发布
社区首页 >问答首页 >一个处理管道,2个相同类型的IO源

一个处理管道,2个相同类型的IO源
EN

Stack Overflow用户
提问于 2013-05-26 15:25:04
回答 1查看 2.3K关注 0票数 62

在使用stm、网络管道和管道的GHC Haskell应用程序中,每个套接字都有一个使用runTCPServer自动派生的链。Strands可以通过使用广播TChan与其他strands通信。

这展示了我想如何设置管道“链”:

因此,我们这里有两个源(每个源都绑定到辅助管道),它们生成一个Packet对象,encoder将接受该对象并将其转换为ByteString,然后将套接字发送出去。我在有效地融合这两个输入(性能是一个问题)方面遇到了很大的困难。

如果有人能给我指出正确的方向,我将不胜感激。

因为我没有尝试就把这个问题贴出来是不礼貌的,所以我会把我之前尝试过的东西放在这里;

我已经编写/cherrypicked了一个函数,它(阻塞)从一个TMChan (可关闭通道)产生一个源代码;

代码语言:javascript
复制
-- | Takes a generic type of STM chan and, given read and close functionality,
--   returns a conduit 'Source' which consumes the elements of the channel.
chanSource 
    :: (MonadIO m, MonadSTM m)
    => a                    -- ^ The channel
    -> (a -> STM (Maybe b)) -- ^ The read function
    -> (a -> STM ())        -- ^ The close/finalizer function
    -> Source m b
chanSource ch readCh closeCh = ConduitM pull
    where close     = liftSTM $ closeCh ch
          pull      = PipeM $ liftSTM $ readCh ch >>= translate
          translate = return . maybe (Done ()) (HaveOutput pull close)

同样,将Chan转换为接收器的函数;

代码语言:javascript
复制
-- | Takes a stream and, given write and close functionality, returns a sink
--   which wil consume elements and broadcast them into the channel 
chanSink
    :: (MonadIO m, MonadSTM m)
    => a                 -- ^ The channel
    -> (a -> b -> STM()) -- ^ The write function
    -> (a -> STM())      -- ^ The close/finalizer function
    -> Sink b m ()
chanSink ch writeCh closeCh = ConduitM sink
    where close  = const . liftSTM $ closeCh ch
          sink   = NeedInput push close
          write  = liftSTM . writeCh ch
          push x = PipeM $ write x >> return sink

然后mergeSources是直接的;fork 2线程(我真的不想做,但什么鬼东西),可以把他们的新项目到一个列表中,然后我产生一个源代码;

代码语言:javascript
复制
-- | Merges a list of 'Source' objects, sinking them into a 'TMChan' and returns
--   a source which consumes the elements of the channel.
mergeSources
    :: (MonadIO m, MonadBaseControl IO m, MonadSTM m)
    => [Source (ResourceT m) a]             -- ^ The list of sources
    -> ResourceT m (Source (ResourceT m) a)
mergeSources sx = liftSTM newTMChan >>= liftA2 (>>) (fsrc sx) retn
    where push c s = s $$ chanSink c writeTMChan closeTMChan
          fsrc x c = mapM_ (\s -> resourceForkIO $ push c s) x
          retn c   = return $ chanSource c readTMChan closeTMChan

虽然我成功地使这些函数进行了类型检查,但我没有成功地利用这些函数进行类型检查;

代码语言:javascript
复制
-- | Helper which represents a conduit chain for each client connection
serverApp :: Application SessionIO
serverApp appdata = do
    use ssBroadcast >>= liftIO . atomically . dupTMChan >>= assign ssBroadcast
    -- appSource appdata $$ decoder $= protocol =$= encoder =$ appSink appdata
    mergsrc $$ protocol $= encoder =$ appSink appdata
    where chansrc = chanSource (use ssBroadcast) readTMChan closeTMChan
          mergsrc = mergeSources [appSource appdata $= decoder, chansrc]

-- | Structure which holds mutable information for clients
data SessionState = SessionState
    { _ssBroadcast     :: TMChan Packet -- ^ Outbound packet broadcast channel
    }

makeLenses ''SessionState

-- | A transformer encompassing both SessionReader and SessionState
type Session m = ReaderT SessionReader (StateT SessionState m)

-- | Macro providing Session applied to an IO monad
type SessionIO = Session IO

我认为这种方法无论如何都是有缺陷的--有许多中间列表和转换。这不会对性能有好处。寻求指导。

PS。据我所知,这不是;Fusing conduits with multiple inputs的副本,因为在我的情况下,两个源都产生相同的类型,我并不关心Packet对象是从哪个源产生的,只要我不是在等待一个而另一个已经准备好了要使用的对象。

PPS。我为示例代码中镜头的使用(因此对知识的要求)道歉。

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2013-07-06 03:08:56

我不知道这是否有帮助,但我试着实现了Iain的建议,并制作了一个mergeSources'的变体,它会在任何一个通道发生变化时立即停止:

代码语言:javascript
复制
mergeSources' :: (MonadIO m, MonadBaseControl IO m)
              => [Source (ResourceT m) a] -- ^ The sources to merge.
              -> Int -- ^ The bound of the intermediate channel.
              -> ResourceT m (Source (ResourceT m) a)
mergeSources' sx bound = do
    c <- liftSTM $ newTBMChan bound
    mapM_ (\s -> resourceForkIO $
                    s $$ chanSink c writeTBMChan closeTBMChan) sx
    return $ sourceTBMChan c

(这个简单的添加可以在here上找到)。

对你的mergeSources版本的一些评论(对它们持保留态度,可能是我不太理解一些东西):

  • 使用...TMChan而不是...TBMChan似乎很危险。如果写入者比读取者更快,你的堆就会爆炸。看一下您的图表,如果您的TCP同级读取数据的速度不够快,似乎很容易发生这种情况。所以我肯定会使用...TBMChan,它可能很大,但是有限制,

  • ,你不需要MonadSTM m约束。所有STM内容都封装到IO中,并使用

liftSTM = liftIO。原子地

我发现,当在serverApp.

  • Just中使用mergeSources'时,这可能会稍微帮助你解决美容问题

liftSTM newTMChan >>= liftA2 (>>) (fsrc ) retn

由于它在(->) r monad上使用了liftA2,所以很难阅读。我会说

do c <- liftSTM newTMChan fsrc sx c retn c

会更长,但更容易阅读。

您是否可以创建一个可以使用serverApp的自包含项目

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/16757060

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档