diff options
Diffstat (limited to 'hsm-core/Hsm/Core/Zmq.hs')
-rw-r--r-- | hsm-core/Hsm/Core/Zmq.hs | 175 |
1 files changed, 175 insertions, 0 deletions
diff --git a/hsm-core/Hsm/Core/Zmq.hs b/hsm-core/Hsm/Core/Zmq.hs new file mode 100644 index 0000000..fe764eb --- /dev/null +++ b/hsm-core/Hsm/Core/Zmq.hs @@ -0,0 +1,175 @@ +{-# LANGUAGE AllowAmbiguousTypes #-} +{-# LANGUAGE ImportQualifiedPost #-} +{-# LANGUAGE OverloadedRecordDot #-} +{-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE TypeFamilies #-} + +module Hsm.Core.Zmq + ( Client + , receive + , runClient + , Server + , send + , runServer + ) +where + +import Control.Monad (forM_) +import Data.Aeson.Types (Value, emptyObject) +import Data.Binary (Binary) +import Data.Function ((&)) +import Data.Text (Text, show, unpack) +import Data.Text.Encoding (encodeUtf8) +import Data.Time.Clock (UTCTime, getCurrentTime) +import Effectful (Dispatch (Static), DispatchOf, Eff, Effect, IOE, (:>)) +import Effectful.Dispatch.Static qualified as S +import Effectful.Log qualified as L +import Effectful.Reader.Static (Reader, ask) +import Effectful.Resource (Resource, allocate) +import GHC.Records (HasField) +import Hsm.Core.Message (body, message, topic) +import System.ZMQ4 qualified as Z +import Prelude hiding (show) + +type DefaultLoggerIO = UTCTime -> L.LogLevel -> Text -> Value -> IO () + +type LoggerIO = L.LogLevel -> Text -> IO () + +type ZmqResource t = (Z.Context, Z.Socket t) + +type ZmqConstraint es = (IOE :> es, L.Log :> es, Resource :> es) + +withSocket + :: forall t es + . ( Z.SocketType t + , ZmqConstraint es + ) + => t + -> Eff es (Z.Socket t) +withSocket stype = L.getLoggerIO >>= withResource . logIO + where + logIO :: DefaultLoggerIO -> LoggerIO + logIO loggerIO level msg = do + now <- getCurrentTime + loggerIO now level msg emptyObject + + withResource :: LoggerIO -> Eff es (Z.Socket t) + withResource logger = snd . snd <$> allocate acquire release + where + acquire :: IO (ZmqResource t) + acquire = do + logger L.LogTrace "Acquiring ZMQ context" + context <- Z.context + socket <- Z.socket context stype + return (context, socket) + + release :: ZmqResource t -> IO () + release (context, socket) = do + logger L.LogTrace "Releasing ZMQ context" + Z.close socket + Z.shutdown context + +data Client :: Effect + +type instance DispatchOf Client = Static S.WithSideEffects + +newtype instance S.StaticRep Client = Client (Z.Socket Z.Sub) + +clientDomain :: Text +clientDomain = "client" + +receive + :: forall a es + . ( Binary a + , Client :> es + , L.Log :> es + , Show a + ) + => Eff es a +receive = do + Client socket <- S.getStaticRep + msg <- S.unsafeEff_ $ Z.receive socket + "Message received [" <> topic msg <> "]: " <> show (body @a msg) + & L.logTrace_ + & L.localDomain clientDomain + return $ body msg + +runClient + :: forall e es a + . ( HasField "subEps" e [Text] + , HasField "topics" e [Text] + , Reader e :> es + , ZmqConstraint es + ) + => Eff (Client : es) a + -> Eff es a +runClient action = withSocket Z.Sub >>= runAction + where + runAction :: Z.Socket Z.Sub -> Eff es a + runAction socket = S.evalStaticRep (Client socket) $ initialize >> action + where + connect :: Text -> Eff (Client : es) () + connect = S.unsafeEff_ . Z.connect socket . unpack + + subscribe :: Text -> Eff (Client : es) () + subscribe = S.unsafeEff_ . Z.subscribe socket . encodeUtf8 + + initialize :: Eff (Client : es) () + initialize = + L.localDomain clientDomain $ do + L.logInfo_ "Initializing ZMQ client" + env <- ask @e + forM_ env.subEps connect + forM_ env.topics subscribe + L.logTrace_ $ "Listening to " <> show env.subEps + L.logTrace_ $ "Subscribed to " <> show env.topics + +data Server :: Effect + +type instance DispatchOf Server = Static S.WithSideEffects + +newtype instance S.StaticRep Server = Server (Z.Socket Z.Pub) + +serverDomain :: Text +serverDomain = "server" + +send + :: forall a e es + . ( Binary a + , HasField "name" e Text + , L.Log :> es + , Reader e :> es + , Server :> es + , Show a + ) + => a + -> Eff es () +send payload = do + Server s <- S.getStaticRep + env <- ask @e + S.unsafeEff_ $ Z.send s [] $ message env.name payload + L.localDomain serverDomain $ L.logTrace_ $ "Message sent: " <> show payload + +runServer + :: forall e es a + . ( HasField "pubEp" e Text + , Reader e :> es + , ZmqConstraint es + ) + => Eff (Server : es) a + -> Eff es a +runServer action = withSocket Z.Pub >>= runAction + where + runAction :: Z.Socket Z.Pub -> Eff es a + runAction socket = S.evalStaticRep (Server socket) $ initialize >> action + where + bind :: Text -> Eff (Server : es) () + bind = S.unsafeEff_ . Z.bind socket . unpack + + initialize :: Eff (Server : es) () + initialize = + L.localDomain serverDomain $ do + L.logInfo_ "Initializing ZMQ server" + env <- ask @e + bind env.pubEp + L.logTrace_ $ "Publishing to " <> env.pubEp |