diff options
Diffstat (limited to 'hsm-core/Hsm/Core/Zmq/Client.hs')
-rw-r--r-- | hsm-core/Hsm/Core/Zmq/Client.hs | 59 |
1 files changed, 22 insertions, 37 deletions
diff --git a/hsm-core/Hsm/Core/Zmq/Client.hs b/hsm-core/Hsm/Core/Zmq/Client.hs index 793841e..6093e54 100644 --- a/hsm-core/Hsm/Core/Zmq/Client.hs +++ b/hsm-core/Hsm/Core/Zmq/Client.hs @@ -11,6 +11,7 @@ module Hsm.Core.Zmq.Client , runClient ) where +import Control.Monad (forM_) import Control.Monad.Loops (whileM) import Data.Binary (Binary) import Data.Text (Text, pack, unpack) @@ -50,30 +51,23 @@ receiver = do <> pack (show $ body @a message) return $ body message -receive :: - forall es a. (Log :> es, Client :> es, Binary a, Show a) - => Stream (Eff es) a +receive :: (Log :> es, Client :> es, Binary a, Show a) => Stream (Eff es) a receive = repeatM receiver -poll :: - forall es a. (Log :> es, Client :> es, Binary a, Show a) - => Stream (Eff es) [a] -poll = repeatM poller +poll :: (Log :> es, Client :> es, Binary a, Show a) => Stream (Eff es) [a] +poll = + repeatM $ do + ms <- whileM newMsg receiver + localDomain domain + $ localDomain "poller" + $ logTrace_ + $ pack (show $ length ms) <> " new message(s) on queue" + return ms where - newMsg :: Eff es Bool newMsg = do Client sock <- E.getStaticRep peek <- E.unsafeEff_ $ Z.poll 0 [Z.Sock sock [Z.In] Nothing] return $ peek /= [[]] - -- - poller :: Eff es [a] - poller = do - ms <- whileM newMsg receiver - localDomain domain - $ localDomain "poller" - $ logTrace_ - $ pack (show $ length ms) <> " new message(s) on queue" - return ms runClient :: forall env es a. @@ -86,23 +80,14 @@ runClient :: ) => Eff (Client : es) a -> Eff es a -runClient action = withSocket Z.Sub >>= run - where - run :: Z.Socket Z.Sub -> Eff es a - run sock = E.evalStaticRep (Client sock) $ initialize >> action - where - connect :: Text -> Eff (Client : es) () - connect = E.unsafeEff_ . Z.connect sock . unpack - -- - subscribe :: Text -> Eff (Client : es) () - subscribe = E.unsafeEff_ . Z.subscribe sock . encodeUtf8 - -- - initialize :: Eff (Client : es) () - initialize = - localDomain domain $ do - logInfo_ "Initializing ZMQ client" - env <- ask @env - mapM_ connect env.subEps - mapM_ subscribe env.topics - logTrace_ $ "Listening to " <> pack (show env.subEps) - logTrace_ $ "Subscribed to " <> pack (show env.topics) +runClient action = + withSocket Z.Sub >>= \sock -> + E.evalStaticRep (Client sock) $ do + localDomain domain $ do + logInfo_ "Initializing ZMQ client" + env <- ask @env + forM_ env.subEps $ E.unsafeEff_ . Z.connect sock . unpack + forM_ env.topics $ E.unsafeEff_ . Z.subscribe sock . encodeUtf8 + logTrace_ $ "Listening to " <> pack (show env.subEps) + logTrace_ $ "Subscribed to " <> pack (show env.topics) + action |