diff options
Diffstat (limited to 'hsm-core/Hsm/Core/Zmq')
-rw-r--r-- | hsm-core/Hsm/Core/Zmq/Client.hs | 59 | ||||
-rw-r--r-- | hsm-core/Hsm/Core/Zmq/Server.hs | 25 |
2 files changed, 31 insertions, 53 deletions
diff --git a/hsm-core/Hsm/Core/Zmq/Client.hs b/hsm-core/Hsm/Core/Zmq/Client.hs index 793841e..6093e54 100644 --- a/hsm-core/Hsm/Core/Zmq/Client.hs +++ b/hsm-core/Hsm/Core/Zmq/Client.hs @@ -11,6 +11,7 @@ module Hsm.Core.Zmq.Client , runClient ) where +import Control.Monad (forM_) import Control.Monad.Loops (whileM) import Data.Binary (Binary) import Data.Text (Text, pack, unpack) @@ -50,30 +51,23 @@ receiver = do <> pack (show $ body @a message) return $ body message -receive :: - forall es a. (Log :> es, Client :> es, Binary a, Show a) - => Stream (Eff es) a +receive :: (Log :> es, Client :> es, Binary a, Show a) => Stream (Eff es) a receive = repeatM receiver -poll :: - forall es a. (Log :> es, Client :> es, Binary a, Show a) - => Stream (Eff es) [a] -poll = repeatM poller +poll :: (Log :> es, Client :> es, Binary a, Show a) => Stream (Eff es) [a] +poll = + repeatM $ do + ms <- whileM newMsg receiver + localDomain domain + $ localDomain "poller" + $ logTrace_ + $ pack (show $ length ms) <> " new message(s) on queue" + return ms where - newMsg :: Eff es Bool newMsg = do Client sock <- E.getStaticRep peek <- E.unsafeEff_ $ Z.poll 0 [Z.Sock sock [Z.In] Nothing] return $ peek /= [[]] - -- - poller :: Eff es [a] - poller = do - ms <- whileM newMsg receiver - localDomain domain - $ localDomain "poller" - $ logTrace_ - $ pack (show $ length ms) <> " new message(s) on queue" - return ms runClient :: forall env es a. @@ -86,23 +80,14 @@ runClient :: ) => Eff (Client : es) a -> Eff es a -runClient action = withSocket Z.Sub >>= run - where - run :: Z.Socket Z.Sub -> Eff es a - run sock = E.evalStaticRep (Client sock) $ initialize >> action - where - connect :: Text -> Eff (Client : es) () - connect = E.unsafeEff_ . Z.connect sock . unpack - -- - subscribe :: Text -> Eff (Client : es) () - subscribe = E.unsafeEff_ . Z.subscribe sock . encodeUtf8 - -- - initialize :: Eff (Client : es) () - initialize = - localDomain domain $ do - logInfo_ "Initializing ZMQ client" - env <- ask @env - mapM_ connect env.subEps - mapM_ subscribe env.topics - logTrace_ $ "Listening to " <> pack (show env.subEps) - logTrace_ $ "Subscribed to " <> pack (show env.topics) +runClient action = + withSocket Z.Sub >>= \sock -> + E.evalStaticRep (Client sock) $ do + localDomain domain $ do + logInfo_ "Initializing ZMQ client" + env <- ask @env + forM_ env.subEps $ E.unsafeEff_ . Z.connect sock . unpack + forM_ env.topics $ E.unsafeEff_ . Z.subscribe sock . encodeUtf8 + logTrace_ $ "Listening to " <> pack (show env.subEps) + logTrace_ $ "Subscribed to " <> pack (show env.topics) + action diff --git a/hsm-core/Hsm/Core/Zmq/Server.hs b/hsm-core/Hsm/Core/Zmq/Server.hs index 5663cd8..2e9217b 100644 --- a/hsm-core/Hsm/Core/Zmq/Server.hs +++ b/hsm-core/Hsm/Core/Zmq/Server.hs @@ -47,7 +47,6 @@ send :: -> Eff es () send = S.fold S.drain . S.mapM sender where - sender :: a -> Eff es () sender payload = do Server sock <- E.getStaticRep env <- ask @env @@ -64,18 +63,12 @@ runServer :: ) => Eff (Server : es) a -> Eff es a -runServer action = withSocket Z.Pub >>= run - where - run :: Z.Socket Z.Pub -> Eff es a - run sock = E.evalStaticRep (Server sock) $ initialize >> action - where - bind :: Text -> Eff (Server : es) () - bind = E.unsafeEff_ . Z.bind sock . unpack - -- - initialize :: Eff (Server : es) () - initialize = - localDomain domain $ do - logInfo_ "Initializing ZMQ server" - env <- ask @env - bind env.pubEp - logTrace_ $ "Publishing to " <> env.pubEp +runServer action = + withSocket Z.Pub >>= \sock -> + E.evalStaticRep (Server sock) $ do + localDomain domain $ do + logInfo_ "Initializing ZMQ server" + env <- ask @env + E.unsafeEff_ $ Z.bind sock $ unpack env.pubEp + logTrace_ $ "Publishing to " <> env.pubEp + action |