aboutsummaryrefslogtreecommitdiff
path: root/hsm-core/Hsm/Core/Zmq/Server.hs
diff options
context:
space:
mode:
authorPaul Oliver <contact@pauloliver.dev>2024-12-29 17:05:34 +0000
committerPaul Oliver <contact@pauloliver.dev>2025-01-16 18:30:09 -0800
commitcc639b06c7126fac7b445d8f778455620d7f8f50 (patch)
treea4c5c7c0b0a9cdb5bea0891e198003035065e57d /hsm-core/Hsm/Core/Zmq/Server.hs
Initial
Diffstat (limited to 'hsm-core/Hsm/Core/Zmq/Server.hs')
-rw-r--r--hsm-core/Hsm/Core/Zmq/Server.hs81
1 files changed, 81 insertions, 0 deletions
diff --git a/hsm-core/Hsm/Core/Zmq/Server.hs b/hsm-core/Hsm/Core/Zmq/Server.hs
new file mode 100644
index 0000000..5663cd8
--- /dev/null
+++ b/hsm-core/Hsm/Core/Zmq/Server.hs
@@ -0,0 +1,81 @@
+{-# LANGUAGE AllowAmbiguousTypes #-}
+{-# LANGUAGE DataKinds #-}
+{-# LANGUAGE OverloadedRecordDot #-}
+{-# LANGUAGE OverloadedStrings #-}
+{-# LANGUAGE TypeFamilies #-}
+
+module Hsm.Core.Zmq.Server
+ ( Server
+ , send
+ , runServer
+ ) where
+
+import Data.Binary (Binary)
+import Data.Text (Text, pack, unpack)
+import Effectful (Dispatch(Static), DispatchOf, Eff, IOE, (:>))
+import Effectful.Dispatch.Static qualified as E
+import Effectful.Log (Log, localDomain, logInfo_, logTrace_)
+import Effectful.Reader.Static (Reader, ask)
+import Effectful.Resource (Resource)
+import GHC.Records (HasField)
+import Hsm.Core.Message (message)
+import Hsm.Core.Zmq (withSocket)
+import Streamly.Data.Fold qualified as S (drain)
+import Streamly.Data.Stream qualified as S (Stream, fold, mapM)
+import System.ZMQ4 qualified as Z
+
+data Server a b
+
+type instance DispatchOf Server = Static E.WithSideEffects
+
+newtype instance E.StaticRep Server =
+ Server (Z.Socket Z.Pub)
+
+domain :: Text
+domain = "server"
+
+send ::
+ forall env es a.
+ ( HasField "name" env Text
+ , Log :> es
+ , Reader env :> es
+ , Server :> es
+ , Binary a
+ , Show a
+ )
+ => S.Stream (Eff es) a
+ -> Eff es ()
+send = S.fold S.drain . S.mapM sender
+ where
+ sender :: a -> Eff es ()
+ sender payload = do
+ Server sock <- E.getStaticRep
+ env <- ask @env
+ E.unsafeEff_ $ Z.send sock [] $ message env.name payload
+ localDomain domain $ logTrace_ $ "Message sent: " <> pack (show payload)
+
+runServer ::
+ forall env es a.
+ ( HasField "pubEp" env Text
+ , IOE :> es
+ , Log :> es
+ , Reader env :> es
+ , Resource :> es
+ )
+ => Eff (Server : es) a
+ -> Eff es a
+runServer action = withSocket Z.Pub >>= run
+ where
+ run :: Z.Socket Z.Pub -> Eff es a
+ run sock = E.evalStaticRep (Server sock) $ initialize >> action
+ where
+ bind :: Text -> Eff (Server : es) ()
+ bind = E.unsafeEff_ . Z.bind sock . unpack
+ --
+ initialize :: Eff (Server : es) ()
+ initialize =
+ localDomain domain $ do
+ logInfo_ "Initializing ZMQ server"
+ env <- ask @env
+ bind env.pubEp
+ logTrace_ $ "Publishing to " <> env.pubEp