aboutsummaryrefslogtreecommitdiff
path: root/hsm-core/Hsm/Core/Zmq
diff options
context:
space:
mode:
Diffstat (limited to 'hsm-core/Hsm/Core/Zmq')
-rw-r--r--hsm-core/Hsm/Core/Zmq/Client.hs108
-rw-r--r--hsm-core/Hsm/Core/Zmq/Server.hs81
2 files changed, 189 insertions, 0 deletions
diff --git a/hsm-core/Hsm/Core/Zmq/Client.hs b/hsm-core/Hsm/Core/Zmq/Client.hs
new file mode 100644
index 0000000..793841e
--- /dev/null
+++ b/hsm-core/Hsm/Core/Zmq/Client.hs
@@ -0,0 +1,108 @@
+{-# LANGUAGE AllowAmbiguousTypes #-}
+{-# LANGUAGE DataKinds #-}
+{-# LANGUAGE OverloadedRecordDot #-}
+{-# LANGUAGE OverloadedStrings #-}
+{-# LANGUAGE TypeFamilies #-}
+
+module Hsm.Core.Zmq.Client
+ ( Client
+ , receive
+ , poll
+ , runClient
+ ) where
+
+import Control.Monad.Loops (whileM)
+import Data.Binary (Binary)
+import Data.Text (Text, pack, unpack)
+import Data.Text.Encoding (encodeUtf8)
+import Effectful (Dispatch(Static), DispatchOf, Eff, IOE, (:>))
+import Effectful.Dispatch.Static qualified as E
+import Effectful.Log (Log, localDomain, logInfo_, logTrace_)
+import Effectful.Reader.Static (Reader, ask)
+import Effectful.Resource (Resource)
+import GHC.Records (HasField)
+import Hsm.Core.Message (body, topic)
+import Hsm.Core.Zmq (withSocket)
+import Streamly.Data.Stream (Stream, repeatM)
+import System.ZMQ4 qualified as Z
+
+data Client a b
+
+type instance DispatchOf Client = Static E.WithSideEffects
+
+newtype instance E.StaticRep Client =
+ Client (Z.Socket Z.Sub)
+
+domain :: Text
+domain = "client"
+
+receiver ::
+ forall es a. (Log :> es, Client :> es, Binary a, Show a)
+ => Eff es a
+receiver = do
+ Client sock <- E.getStaticRep
+ message <- E.unsafeEff_ $ Z.receive sock
+ localDomain domain
+ $ logTrace_
+ $ "Message received ["
+ <> topic message
+ <> "]: "
+ <> pack (show $ body @a message)
+ return $ body message
+
+receive ::
+ forall es a. (Log :> es, Client :> es, Binary a, Show a)
+ => Stream (Eff es) a
+receive = repeatM receiver
+
+poll ::
+ forall es a. (Log :> es, Client :> es, Binary a, Show a)
+ => Stream (Eff es) [a]
+poll = repeatM poller
+ where
+ newMsg :: Eff es Bool
+ newMsg = do
+ Client sock <- E.getStaticRep
+ peek <- E.unsafeEff_ $ Z.poll 0 [Z.Sock sock [Z.In] Nothing]
+ return $ peek /= [[]]
+ --
+ poller :: Eff es [a]
+ poller = do
+ ms <- whileM newMsg receiver
+ localDomain domain
+ $ localDomain "poller"
+ $ logTrace_
+ $ pack (show $ length ms) <> " new message(s) on queue"
+ return ms
+
+runClient ::
+ forall env es a.
+ ( HasField "subEps" env [Text]
+ , HasField "topics" env [Text]
+ , IOE :> es
+ , Log :> es
+ , Reader env :> es
+ , Resource :> es
+ )
+ => Eff (Client : es) a
+ -> Eff es a
+runClient action = withSocket Z.Sub >>= run
+ where
+ run :: Z.Socket Z.Sub -> Eff es a
+ run sock = E.evalStaticRep (Client sock) $ initialize >> action
+ where
+ connect :: Text -> Eff (Client : es) ()
+ connect = E.unsafeEff_ . Z.connect sock . unpack
+ --
+ subscribe :: Text -> Eff (Client : es) ()
+ subscribe = E.unsafeEff_ . Z.subscribe sock . encodeUtf8
+ --
+ initialize :: Eff (Client : es) ()
+ initialize =
+ localDomain domain $ do
+ logInfo_ "Initializing ZMQ client"
+ env <- ask @env
+ mapM_ connect env.subEps
+ mapM_ subscribe env.topics
+ logTrace_ $ "Listening to " <> pack (show env.subEps)
+ logTrace_ $ "Subscribed to " <> pack (show env.topics)
diff --git a/hsm-core/Hsm/Core/Zmq/Server.hs b/hsm-core/Hsm/Core/Zmq/Server.hs
new file mode 100644
index 0000000..5663cd8
--- /dev/null
+++ b/hsm-core/Hsm/Core/Zmq/Server.hs
@@ -0,0 +1,81 @@
+{-# LANGUAGE AllowAmbiguousTypes #-}
+{-# LANGUAGE DataKinds #-}
+{-# LANGUAGE OverloadedRecordDot #-}
+{-# LANGUAGE OverloadedStrings #-}
+{-# LANGUAGE TypeFamilies #-}
+
+module Hsm.Core.Zmq.Server
+ ( Server
+ , send
+ , runServer
+ ) where
+
+import Data.Binary (Binary)
+import Data.Text (Text, pack, unpack)
+import Effectful (Dispatch(Static), DispatchOf, Eff, IOE, (:>))
+import Effectful.Dispatch.Static qualified as E
+import Effectful.Log (Log, localDomain, logInfo_, logTrace_)
+import Effectful.Reader.Static (Reader, ask)
+import Effectful.Resource (Resource)
+import GHC.Records (HasField)
+import Hsm.Core.Message (message)
+import Hsm.Core.Zmq (withSocket)
+import Streamly.Data.Fold qualified as S (drain)
+import Streamly.Data.Stream qualified as S (Stream, fold, mapM)
+import System.ZMQ4 qualified as Z
+
+data Server a b
+
+type instance DispatchOf Server = Static E.WithSideEffects
+
+newtype instance E.StaticRep Server =
+ Server (Z.Socket Z.Pub)
+
+domain :: Text
+domain = "server"
+
+send ::
+ forall env es a.
+ ( HasField "name" env Text
+ , Log :> es
+ , Reader env :> es
+ , Server :> es
+ , Binary a
+ , Show a
+ )
+ => S.Stream (Eff es) a
+ -> Eff es ()
+send = S.fold S.drain . S.mapM sender
+ where
+ sender :: a -> Eff es ()
+ sender payload = do
+ Server sock <- E.getStaticRep
+ env <- ask @env
+ E.unsafeEff_ $ Z.send sock [] $ message env.name payload
+ localDomain domain $ logTrace_ $ "Message sent: " <> pack (show payload)
+
+runServer ::
+ forall env es a.
+ ( HasField "pubEp" env Text
+ , IOE :> es
+ , Log :> es
+ , Reader env :> es
+ , Resource :> es
+ )
+ => Eff (Server : es) a
+ -> Eff es a
+runServer action = withSocket Z.Pub >>= run
+ where
+ run :: Z.Socket Z.Pub -> Eff es a
+ run sock = E.evalStaticRep (Server sock) $ initialize >> action
+ where
+ bind :: Text -> Eff (Server : es) ()
+ bind = E.unsafeEff_ . Z.bind sock . unpack
+ --
+ initialize :: Eff (Server : es) ()
+ initialize =
+ localDomain domain $ do
+ logInfo_ "Initializing ZMQ server"
+ env <- ask @env
+ bind env.pubEp
+ logTrace_ $ "Publishing to " <> env.pubEp