summaryrefslogtreecommitdiff
path: root/hsm-core/Hsm/Core/Zmq.hs
diff options
context:
space:
mode:
Diffstat (limited to 'hsm-core/Hsm/Core/Zmq.hs')
-rw-r--r--hsm-core/Hsm/Core/Zmq.hs175
1 files changed, 175 insertions, 0 deletions
diff --git a/hsm-core/Hsm/Core/Zmq.hs b/hsm-core/Hsm/Core/Zmq.hs
new file mode 100644
index 0000000..fe764eb
--- /dev/null
+++ b/hsm-core/Hsm/Core/Zmq.hs
@@ -0,0 +1,175 @@
+{-# LANGUAGE AllowAmbiguousTypes #-}
+{-# LANGUAGE ImportQualifiedPost #-}
+{-# LANGUAGE OverloadedRecordDot #-}
+{-# LANGUAGE OverloadedStrings #-}
+{-# LANGUAGE TypeFamilies #-}
+
+module Hsm.Core.Zmq
+ ( Client
+ , receive
+ , runClient
+ , Server
+ , send
+ , runServer
+ )
+where
+
+import Control.Monad (forM_)
+import Data.Aeson.Types (Value, emptyObject)
+import Data.Binary (Binary)
+import Data.Function ((&))
+import Data.Text (Text, show, unpack)
+import Data.Text.Encoding (encodeUtf8)
+import Data.Time.Clock (UTCTime, getCurrentTime)
+import Effectful (Dispatch (Static), DispatchOf, Eff, Effect, IOE, (:>))
+import Effectful.Dispatch.Static qualified as S
+import Effectful.Log qualified as L
+import Effectful.Reader.Static (Reader, ask)
+import Effectful.Resource (Resource, allocate)
+import GHC.Records (HasField)
+import Hsm.Core.Message (body, message, topic)
+import System.ZMQ4 qualified as Z
+import Prelude hiding (show)
+
+type DefaultLoggerIO = UTCTime -> L.LogLevel -> Text -> Value -> IO ()
+
+type LoggerIO = L.LogLevel -> Text -> IO ()
+
+type ZmqResource t = (Z.Context, Z.Socket t)
+
+type ZmqConstraint es = (IOE :> es, L.Log :> es, Resource :> es)
+
+withSocket
+ :: forall t es
+ . ( Z.SocketType t
+ , ZmqConstraint es
+ )
+ => t
+ -> Eff es (Z.Socket t)
+withSocket stype = L.getLoggerIO >>= withResource . logIO
+ where
+ logIO :: DefaultLoggerIO -> LoggerIO
+ logIO loggerIO level msg = do
+ now <- getCurrentTime
+ loggerIO now level msg emptyObject
+
+ withResource :: LoggerIO -> Eff es (Z.Socket t)
+ withResource logger = snd . snd <$> allocate acquire release
+ where
+ acquire :: IO (ZmqResource t)
+ acquire = do
+ logger L.LogTrace "Acquiring ZMQ context"
+ context <- Z.context
+ socket <- Z.socket context stype
+ return (context, socket)
+
+ release :: ZmqResource t -> IO ()
+ release (context, socket) = do
+ logger L.LogTrace "Releasing ZMQ context"
+ Z.close socket
+ Z.shutdown context
+
+data Client :: Effect
+
+type instance DispatchOf Client = Static S.WithSideEffects
+
+newtype instance S.StaticRep Client = Client (Z.Socket Z.Sub)
+
+clientDomain :: Text
+clientDomain = "client"
+
+receive
+ :: forall a es
+ . ( Binary a
+ , Client :> es
+ , L.Log :> es
+ , Show a
+ )
+ => Eff es a
+receive = do
+ Client socket <- S.getStaticRep
+ msg <- S.unsafeEff_ $ Z.receive socket
+ "Message received [" <> topic msg <> "]: " <> show (body @a msg)
+ & L.logTrace_
+ & L.localDomain clientDomain
+ return $ body msg
+
+runClient
+ :: forall e es a
+ . ( HasField "subEps" e [Text]
+ , HasField "topics" e [Text]
+ , Reader e :> es
+ , ZmqConstraint es
+ )
+ => Eff (Client : es) a
+ -> Eff es a
+runClient action = withSocket Z.Sub >>= runAction
+ where
+ runAction :: Z.Socket Z.Sub -> Eff es a
+ runAction socket = S.evalStaticRep (Client socket) $ initialize >> action
+ where
+ connect :: Text -> Eff (Client : es) ()
+ connect = S.unsafeEff_ . Z.connect socket . unpack
+
+ subscribe :: Text -> Eff (Client : es) ()
+ subscribe = S.unsafeEff_ . Z.subscribe socket . encodeUtf8
+
+ initialize :: Eff (Client : es) ()
+ initialize =
+ L.localDomain clientDomain $ do
+ L.logInfo_ "Initializing ZMQ client"
+ env <- ask @e
+ forM_ env.subEps connect
+ forM_ env.topics subscribe
+ L.logTrace_ $ "Listening to " <> show env.subEps
+ L.logTrace_ $ "Subscribed to " <> show env.topics
+
+data Server :: Effect
+
+type instance DispatchOf Server = Static S.WithSideEffects
+
+newtype instance S.StaticRep Server = Server (Z.Socket Z.Pub)
+
+serverDomain :: Text
+serverDomain = "server"
+
+send
+ :: forall a e es
+ . ( Binary a
+ , HasField "name" e Text
+ , L.Log :> es
+ , Reader e :> es
+ , Server :> es
+ , Show a
+ )
+ => a
+ -> Eff es ()
+send payload = do
+ Server s <- S.getStaticRep
+ env <- ask @e
+ S.unsafeEff_ $ Z.send s [] $ message env.name payload
+ L.localDomain serverDomain $ L.logTrace_ $ "Message sent: " <> show payload
+
+runServer
+ :: forall e es a
+ . ( HasField "pubEp" e Text
+ , Reader e :> es
+ , ZmqConstraint es
+ )
+ => Eff (Server : es) a
+ -> Eff es a
+runServer action = withSocket Z.Pub >>= runAction
+ where
+ runAction :: Z.Socket Z.Pub -> Eff es a
+ runAction socket = S.evalStaticRep (Server socket) $ initialize >> action
+ where
+ bind :: Text -> Eff (Server : es) ()
+ bind = S.unsafeEff_ . Z.bind socket . unpack
+
+ initialize :: Eff (Server : es) ()
+ initialize =
+ L.localDomain serverDomain $ do
+ L.logInfo_ "Initializing ZMQ server"
+ env <- ask @e
+ bind env.pubEp
+ L.logTrace_ $ "Publishing to " <> env.pubEp