summaryrefslogtreecommitdiff
path: root/hsm-core/Hsm/Core/Zmq.hs
diff options
context:
space:
mode:
Diffstat (limited to 'hsm-core/Hsm/Core/Zmq.hs')
-rw-r--r--hsm-core/Hsm/Core/Zmq.hs80
1 files changed, 80 insertions, 0 deletions
diff --git a/hsm-core/Hsm/Core/Zmq.hs b/hsm-core/Hsm/Core/Zmq.hs
new file mode 100644
index 0000000..69ff59a
--- /dev/null
+++ b/hsm-core/Hsm/Core/Zmq.hs
@@ -0,0 +1,80 @@
+{-# LANGUAGE ImportQualifiedPost #-}
+{-# LANGUAGE OverloadedRecordDot #-}
+{-# LANGUAGE OverloadedStrings #-}
+
+-- Module : Hsm.Core.Zmq
+-- Maintainer : contact@pauloliver.dev
+module Hsm.Core.Zmq
+ ( client
+ , server
+ )
+where
+
+import Control.Monad (forM_, forever)
+import Control.Monad.IO.Class (liftIO)
+import Control.Monad.Reader (ask)
+import Data.Binary (Binary)
+import Data.Text (Text, pack, unpack)
+import Data.Text.Encoding (encodeUtf8)
+import Effectful (IOE, (:>))
+import Effectful.Log (Log, LogLevel (LogTrace), localDomain, logInfo_, logTrace_)
+import Effectful.Reader.Static (Reader)
+import Effectful.Resource (Resource, allocate)
+import GHC.Records (HasField)
+import Hsm.Core.LogIO (LoggerIO, getLoggerIO)
+import Hsm.Core.Message (body, message)
+import Hsm.Core.Pipes (Consumer, Producer, Proxy, await, yield)
+import System.ZMQ4 qualified as Z
+
+data ZmqResource t = ZmqResource Z.Context (Z.Socket t)
+
+type ZmqC e es = (IOE :> es, Log :> es, Reader e :> es, Resource :> es)
+
+type UsingC t e es = (Z.SocketType t, ZmqC e es)
+
+type ClientC a e es = (Binary a, HasField "name" e Text, HasField "subEps" e [Text], HasField "topics" e [Text], ZmqC e es)
+
+type ServerC a e es = (Binary a, HasField "name" e Text, HasField "pubEp" e Text, ZmqC e es)
+
+type Action t a' a b' b e s es = Z.Socket t -> e -> Proxy a' a b' b e s es ()
+
+using :: forall t a' a b' b e s es. UsingC t e es => t -> Action t a' a b' b e s es -> Proxy a' a b' b e s es ()
+using stype action = getLoggerIO >>= safely
+ where
+ safely :: LoggerIO -> Proxy a' a b' b e s es ()
+ safely logger =
+ allocate acquire release >>= \(_, ZmqResource _ socket) ->
+ ask >>= action socket
+ where
+ acquire :: IO (ZmqResource t)
+ acquire = do
+ logger LogTrace "Acquiring ZMQ context"
+ context <- Z.context
+ socket <- Z.socket context stype
+ return $ ZmqResource context socket
+
+ release :: ZmqResource t -> IO ()
+ release (ZmqResource context socket) = do
+ logger LogTrace "Releasing ZMQ context"
+ Z.close socket
+ Z.shutdown context
+
+client :: ClientC a e es => Producer a e s es ()
+client =
+ using Z.Sub $ \socket e ->
+ localDomain "client" $ do
+ logInfo_ "Initializing ZMQ client"
+ liftIO $ forM_ e.subEps $ Z.connect socket . unpack
+ liftIO $ forM_ e.topics $ Z.subscribe socket . encodeUtf8
+ logTrace_ $ "Listening to " <> pack (show e.subEps)
+ logTrace_ $ "Subscribed to " <> pack (show e.topics)
+ forever $ liftIO (Z.receive socket) >>= yield . body
+
+server :: ServerC a e es => Consumer a e s es ()
+server =
+ using Z.Pub $ \socket e ->
+ localDomain "server" $ do
+ logInfo_ "Initializing ZMQ server"
+ liftIO $ Z.bind socket $ unpack e.pubEp
+ logTrace_ $ "Publishing to " <> e.pubEp
+ forever $ await >>= liftIO . Z.send socket [] . message e.name