From ebb88408c1d0884b5ca9b7d68bf76d31c33d2e5b Mon Sep 17 00:00:00 2001 From: Paul Oliver Date: Fri, 17 Jan 2025 14:37:20 -0800 Subject: Allows services to publish different topics --- hsm-core/Hsm/Core/Zmq/Client.hs | 19 +++++++------------ 1 file changed, 7 insertions(+), 12 deletions(-) (limited to 'hsm-core/Hsm/Core/Zmq/Client.hs') diff --git a/hsm-core/Hsm/Core/Zmq/Client.hs b/hsm-core/Hsm/Core/Zmq/Client.hs index 6093e54..97728f7 100644 --- a/hsm-core/Hsm/Core/Zmq/Client.hs +++ b/hsm-core/Hsm/Core/Zmq/Client.hs @@ -13,7 +13,7 @@ module Hsm.Core.Zmq.Client import Control.Monad (forM_) import Control.Monad.Loops (whileM) -import Data.Binary (Binary) +import Data.ByteString (ByteString) import Data.Text (Text, pack, unpack) import Data.Text.Encoding (encodeUtf8) import Effectful (Dispatch(Static), DispatchOf, Eff, IOE, (:>)) @@ -22,7 +22,7 @@ import Effectful.Log (Log, localDomain, logInfo_, logTrace_) import Effectful.Reader.Static (Reader, ask) import Effectful.Resource (Resource) import GHC.Records (HasField) -import Hsm.Core.Message (body, topic) +import Hsm.Core.Message (topic) import Hsm.Core.Zmq (withSocket) import Streamly.Data.Stream (Stream, repeatM) import System.ZMQ4 qualified as Z @@ -37,24 +37,19 @@ newtype instance E.StaticRep Client = domain :: Text domain = "client" -receiver :: - forall es a. (Log :> es, Client :> es, Binary a, Show a) - => Eff es a +receiver :: (Log :> es, Client :> es) => Eff es ByteString receiver = do Client sock <- E.getStaticRep message <- E.unsafeEff_ $ Z.receive sock localDomain domain $ logTrace_ - $ "Message received [" - <> topic message - <> "]: " - <> pack (show $ body @a message) - return $ body message + $ "Message received with topic: " <> topic message + return message -receive :: (Log :> es, Client :> es, Binary a, Show a) => Stream (Eff es) a +receive :: (Log :> es, Client :> es) => Stream (Eff es) ByteString receive = repeatM receiver -poll :: (Log :> es, Client :> es, Binary a, Show a) => Stream (Eff es) [a] +poll :: (Log :> es, Client :> es) => Stream (Eff es) [ByteString] poll = repeatM $ do ms <- whileM newMsg receiver -- cgit v1.2.1