aboutsummaryrefslogtreecommitdiff
path: root/hsm-core/Hsm/Core/Zmq/Client.hs
diff options
context:
space:
mode:
authorPaul Oliver <contact@pauloliver.dev>2025-01-16 19:22:18 -0800
committerPaul Oliver <contact@pauloliver.dev>2025-01-17 19:16:43 -0800
commite3ea039428545e185b38c5633fe3576ab32f1f8e (patch)
tree56ab8d1248b4387ceab6094305e7a75699c4e393 /hsm-core/Hsm/Core/Zmq/Client.hs
parente1fa79eb713c249055fb23fcc6684a94f77d8368 (diff)
Cleans excessive type annotations
Diffstat (limited to 'hsm-core/Hsm/Core/Zmq/Client.hs')
-rw-r--r--hsm-core/Hsm/Core/Zmq/Client.hs59
1 files changed, 22 insertions, 37 deletions
diff --git a/hsm-core/Hsm/Core/Zmq/Client.hs b/hsm-core/Hsm/Core/Zmq/Client.hs
index 793841e..6093e54 100644
--- a/hsm-core/Hsm/Core/Zmq/Client.hs
+++ b/hsm-core/Hsm/Core/Zmq/Client.hs
@@ -11,6 +11,7 @@ module Hsm.Core.Zmq.Client
, runClient
) where
+import Control.Monad (forM_)
import Control.Monad.Loops (whileM)
import Data.Binary (Binary)
import Data.Text (Text, pack, unpack)
@@ -50,30 +51,23 @@ receiver = do
<> pack (show $ body @a message)
return $ body message
-receive ::
- forall es a. (Log :> es, Client :> es, Binary a, Show a)
- => Stream (Eff es) a
+receive :: (Log :> es, Client :> es, Binary a, Show a) => Stream (Eff es) a
receive = repeatM receiver
-poll ::
- forall es a. (Log :> es, Client :> es, Binary a, Show a)
- => Stream (Eff es) [a]
-poll = repeatM poller
+poll :: (Log :> es, Client :> es, Binary a, Show a) => Stream (Eff es) [a]
+poll =
+ repeatM $ do
+ ms <- whileM newMsg receiver
+ localDomain domain
+ $ localDomain "poller"
+ $ logTrace_
+ $ pack (show $ length ms) <> " new message(s) on queue"
+ return ms
where
- newMsg :: Eff es Bool
newMsg = do
Client sock <- E.getStaticRep
peek <- E.unsafeEff_ $ Z.poll 0 [Z.Sock sock [Z.In] Nothing]
return $ peek /= [[]]
- --
- poller :: Eff es [a]
- poller = do
- ms <- whileM newMsg receiver
- localDomain domain
- $ localDomain "poller"
- $ logTrace_
- $ pack (show $ length ms) <> " new message(s) on queue"
- return ms
runClient ::
forall env es a.
@@ -86,23 +80,14 @@ runClient ::
)
=> Eff (Client : es) a
-> Eff es a
-runClient action = withSocket Z.Sub >>= run
- where
- run :: Z.Socket Z.Sub -> Eff es a
- run sock = E.evalStaticRep (Client sock) $ initialize >> action
- where
- connect :: Text -> Eff (Client : es) ()
- connect = E.unsafeEff_ . Z.connect sock . unpack
- --
- subscribe :: Text -> Eff (Client : es) ()
- subscribe = E.unsafeEff_ . Z.subscribe sock . encodeUtf8
- --
- initialize :: Eff (Client : es) ()
- initialize =
- localDomain domain $ do
- logInfo_ "Initializing ZMQ client"
- env <- ask @env
- mapM_ connect env.subEps
- mapM_ subscribe env.topics
- logTrace_ $ "Listening to " <> pack (show env.subEps)
- logTrace_ $ "Subscribed to " <> pack (show env.topics)
+runClient action =
+ withSocket Z.Sub >>= \sock ->
+ E.evalStaticRep (Client sock) $ do
+ localDomain domain $ do
+ logInfo_ "Initializing ZMQ client"
+ env <- ask @env
+ forM_ env.subEps $ E.unsafeEff_ . Z.connect sock . unpack
+ forM_ env.topics $ E.unsafeEff_ . Z.subscribe sock . encodeUtf8
+ logTrace_ $ "Listening to " <> pack (show env.subEps)
+ logTrace_ $ "Subscribed to " <> pack (show env.topics)
+ action