aboutsummaryrefslogtreecommitdiff
path: root/hsm-core/Hsm/Core
diff options
context:
space:
mode:
authorPaul Oliver <contact@pauloliver.dev>2025-01-17 14:37:20 -0800
committerPaul Oliver <contact@pauloliver.dev>2025-01-17 19:16:43 -0800
commitebb88408c1d0884b5ca9b7d68bf76d31c33d2e5b (patch)
treec7c2c6b636e8eb89f2d4c6accf77a8c671b8ab9f /hsm-core/Hsm/Core
parentdc6bf1472c930ff1448c419d3205148bce1b787e (diff)
Allows services to publish different topicsHEADmaster
Diffstat (limited to 'hsm-core/Hsm/Core')
-rw-r--r--hsm-core/Hsm/Core/Zmq/Client.hs19
-rw-r--r--hsm-core/Hsm/Core/Zmq/Server.hs27
2 files changed, 16 insertions, 30 deletions
diff --git a/hsm-core/Hsm/Core/Zmq/Client.hs b/hsm-core/Hsm/Core/Zmq/Client.hs
index 6093e54..97728f7 100644
--- a/hsm-core/Hsm/Core/Zmq/Client.hs
+++ b/hsm-core/Hsm/Core/Zmq/Client.hs
@@ -13,7 +13,7 @@ module Hsm.Core.Zmq.Client
import Control.Monad (forM_)
import Control.Monad.Loops (whileM)
-import Data.Binary (Binary)
+import Data.ByteString (ByteString)
import Data.Text (Text, pack, unpack)
import Data.Text.Encoding (encodeUtf8)
import Effectful (Dispatch(Static), DispatchOf, Eff, IOE, (:>))
@@ -22,7 +22,7 @@ import Effectful.Log (Log, localDomain, logInfo_, logTrace_)
import Effectful.Reader.Static (Reader, ask)
import Effectful.Resource (Resource)
import GHC.Records (HasField)
-import Hsm.Core.Message (body, topic)
+import Hsm.Core.Message (topic)
import Hsm.Core.Zmq (withSocket)
import Streamly.Data.Stream (Stream, repeatM)
import System.ZMQ4 qualified as Z
@@ -37,24 +37,19 @@ newtype instance E.StaticRep Client =
domain :: Text
domain = "client"
-receiver ::
- forall es a. (Log :> es, Client :> es, Binary a, Show a)
- => Eff es a
+receiver :: (Log :> es, Client :> es) => Eff es ByteString
receiver = do
Client sock <- E.getStaticRep
message <- E.unsafeEff_ $ Z.receive sock
localDomain domain
$ logTrace_
- $ "Message received ["
- <> topic message
- <> "]: "
- <> pack (show $ body @a message)
- return $ body message
+ $ "Message received with topic: " <> topic message
+ return message
-receive :: (Log :> es, Client :> es, Binary a, Show a) => Stream (Eff es) a
+receive :: (Log :> es, Client :> es) => Stream (Eff es) ByteString
receive = repeatM receiver
-poll :: (Log :> es, Client :> es, Binary a, Show a) => Stream (Eff es) [a]
+poll :: (Log :> es, Client :> es) => Stream (Eff es) [ByteString]
poll =
repeatM $ do
ms <- whileM newMsg receiver
diff --git a/hsm-core/Hsm/Core/Zmq/Server.hs b/hsm-core/Hsm/Core/Zmq/Server.hs
index 2e9217b..6ea8aeb 100644
--- a/hsm-core/Hsm/Core/Zmq/Server.hs
+++ b/hsm-core/Hsm/Core/Zmq/Server.hs
@@ -10,15 +10,15 @@ module Hsm.Core.Zmq.Server
, runServer
) where
-import Data.Binary (Binary)
-import Data.Text (Text, pack, unpack)
+import Data.ByteString (ByteString)
+import Data.Text (Text, unpack)
import Effectful (Dispatch(Static), DispatchOf, Eff, IOE, (:>))
import Effectful.Dispatch.Static qualified as E
import Effectful.Log (Log, localDomain, logInfo_, logTrace_)
import Effectful.Reader.Static (Reader, ask)
import Effectful.Resource (Resource)
import GHC.Records (HasField)
-import Hsm.Core.Message (message)
+import Hsm.Core.Message (topic)
import Hsm.Core.Zmq (withSocket)
import Streamly.Data.Fold qualified as S (drain)
import Streamly.Data.Stream qualified as S (Stream, fold, mapM)
@@ -34,24 +34,15 @@ newtype instance E.StaticRep Server =
domain :: Text
domain = "server"
-send ::
- forall env es a.
- ( HasField "name" env Text
- , Log :> es
- , Reader env :> es
- , Server :> es
- , Binary a
- , Show a
- )
- => S.Stream (Eff es) a
- -> Eff es ()
+send :: (Log :> es, Server :> es) => S.Stream (Eff es) ByteString -> Eff es ()
send = S.fold S.drain . S.mapM sender
where
- sender payload = do
+ sender message = do
Server sock <- E.getStaticRep
- env <- ask @env
- E.unsafeEff_ $ Z.send sock [] $ message env.name payload
- localDomain domain $ logTrace_ $ "Message sent: " <> pack (show payload)
+ E.unsafeEff_ $ Z.send sock [] message
+ localDomain domain
+ $ logTrace_
+ $ "Message sent with topic: " <> topic message
runServer ::
forall env es a.