diff options
Diffstat (limited to 'hsm-core/Hsm/Core/Zmq/Server.hs')
-rw-r--r-- | hsm-core/Hsm/Core/Zmq/Server.hs | 81 |
1 files changed, 81 insertions, 0 deletions
diff --git a/hsm-core/Hsm/Core/Zmq/Server.hs b/hsm-core/Hsm/Core/Zmq/Server.hs new file mode 100644 index 0000000..5663cd8 --- /dev/null +++ b/hsm-core/Hsm/Core/Zmq/Server.hs @@ -0,0 +1,81 @@ +{-# LANGUAGE AllowAmbiguousTypes #-} +{-# LANGUAGE DataKinds #-} +{-# LANGUAGE OverloadedRecordDot #-} +{-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE TypeFamilies #-} + +module Hsm.Core.Zmq.Server + ( Server + , send + , runServer + ) where + +import Data.Binary (Binary) +import Data.Text (Text, pack, unpack) +import Effectful (Dispatch(Static), DispatchOf, Eff, IOE, (:>)) +import Effectful.Dispatch.Static qualified as E +import Effectful.Log (Log, localDomain, logInfo_, logTrace_) +import Effectful.Reader.Static (Reader, ask) +import Effectful.Resource (Resource) +import GHC.Records (HasField) +import Hsm.Core.Message (message) +import Hsm.Core.Zmq (withSocket) +import Streamly.Data.Fold qualified as S (drain) +import Streamly.Data.Stream qualified as S (Stream, fold, mapM) +import System.ZMQ4 qualified as Z + +data Server a b + +type instance DispatchOf Server = Static E.WithSideEffects + +newtype instance E.StaticRep Server = + Server (Z.Socket Z.Pub) + +domain :: Text +domain = "server" + +send :: + forall env es a. + ( HasField "name" env Text + , Log :> es + , Reader env :> es + , Server :> es + , Binary a + , Show a + ) + => S.Stream (Eff es) a + -> Eff es () +send = S.fold S.drain . S.mapM sender + where + sender :: a -> Eff es () + sender payload = do + Server sock <- E.getStaticRep + env <- ask @env + E.unsafeEff_ $ Z.send sock [] $ message env.name payload + localDomain domain $ logTrace_ $ "Message sent: " <> pack (show payload) + +runServer :: + forall env es a. + ( HasField "pubEp" env Text + , IOE :> es + , Log :> es + , Reader env :> es + , Resource :> es + ) + => Eff (Server : es) a + -> Eff es a +runServer action = withSocket Z.Pub >>= run + where + run :: Z.Socket Z.Pub -> Eff es a + run sock = E.evalStaticRep (Server sock) $ initialize >> action + where + bind :: Text -> Eff (Server : es) () + bind = E.unsafeEff_ . Z.bind sock . unpack + -- + initialize :: Eff (Server : es) () + initialize = + localDomain domain $ do + logInfo_ "Initializing ZMQ server" + env <- ask @env + bind env.pubEp + logTrace_ $ "Publishing to " <> env.pubEp |