From cc639b06c7126fac7b445d8f778455620d7f8f50 Mon Sep 17 00:00:00 2001 From: Paul Oliver Date: Sun, 29 Dec 2024 17:05:34 +0000 Subject: Initial --- hsm-core/Hsm/Core/Zmq/Client.hs | 108 ++++++++++++++++++++++++++++++++++++++++ hsm-core/Hsm/Core/Zmq/Server.hs | 81 ++++++++++++++++++++++++++++++ 2 files changed, 189 insertions(+) create mode 100644 hsm-core/Hsm/Core/Zmq/Client.hs create mode 100644 hsm-core/Hsm/Core/Zmq/Server.hs (limited to 'hsm-core/Hsm/Core/Zmq') diff --git a/hsm-core/Hsm/Core/Zmq/Client.hs b/hsm-core/Hsm/Core/Zmq/Client.hs new file mode 100644 index 0000000..793841e --- /dev/null +++ b/hsm-core/Hsm/Core/Zmq/Client.hs @@ -0,0 +1,108 @@ +{-# LANGUAGE AllowAmbiguousTypes #-} +{-# LANGUAGE DataKinds #-} +{-# LANGUAGE OverloadedRecordDot #-} +{-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE TypeFamilies #-} + +module Hsm.Core.Zmq.Client + ( Client + , receive + , poll + , runClient + ) where + +import Control.Monad.Loops (whileM) +import Data.Binary (Binary) +import Data.Text (Text, pack, unpack) +import Data.Text.Encoding (encodeUtf8) +import Effectful (Dispatch(Static), DispatchOf, Eff, IOE, (:>)) +import Effectful.Dispatch.Static qualified as E +import Effectful.Log (Log, localDomain, logInfo_, logTrace_) +import Effectful.Reader.Static (Reader, ask) +import Effectful.Resource (Resource) +import GHC.Records (HasField) +import Hsm.Core.Message (body, topic) +import Hsm.Core.Zmq (withSocket) +import Streamly.Data.Stream (Stream, repeatM) +import System.ZMQ4 qualified as Z + +data Client a b + +type instance DispatchOf Client = Static E.WithSideEffects + +newtype instance E.StaticRep Client = + Client (Z.Socket Z.Sub) + +domain :: Text +domain = "client" + +receiver :: + forall es a. (Log :> es, Client :> es, Binary a, Show a) + => Eff es a +receiver = do + Client sock <- E.getStaticRep + message <- E.unsafeEff_ $ Z.receive sock + localDomain domain + $ logTrace_ + $ "Message received [" + <> topic message + <> "]: " + <> pack (show $ body @a message) + return $ body message + +receive :: + forall es a. (Log :> es, Client :> es, Binary a, Show a) + => Stream (Eff es) a +receive = repeatM receiver + +poll :: + forall es a. (Log :> es, Client :> es, Binary a, Show a) + => Stream (Eff es) [a] +poll = repeatM poller + where + newMsg :: Eff es Bool + newMsg = do + Client sock <- E.getStaticRep + peek <- E.unsafeEff_ $ Z.poll 0 [Z.Sock sock [Z.In] Nothing] + return $ peek /= [[]] + -- + poller :: Eff es [a] + poller = do + ms <- whileM newMsg receiver + localDomain domain + $ localDomain "poller" + $ logTrace_ + $ pack (show $ length ms) <> " new message(s) on queue" + return ms + +runClient :: + forall env es a. + ( HasField "subEps" env [Text] + , HasField "topics" env [Text] + , IOE :> es + , Log :> es + , Reader env :> es + , Resource :> es + ) + => Eff (Client : es) a + -> Eff es a +runClient action = withSocket Z.Sub >>= run + where + run :: Z.Socket Z.Sub -> Eff es a + run sock = E.evalStaticRep (Client sock) $ initialize >> action + where + connect :: Text -> Eff (Client : es) () + connect = E.unsafeEff_ . Z.connect sock . unpack + -- + subscribe :: Text -> Eff (Client : es) () + subscribe = E.unsafeEff_ . Z.subscribe sock . encodeUtf8 + -- + initialize :: Eff (Client : es) () + initialize = + localDomain domain $ do + logInfo_ "Initializing ZMQ client" + env <- ask @env + mapM_ connect env.subEps + mapM_ subscribe env.topics + logTrace_ $ "Listening to " <> pack (show env.subEps) + logTrace_ $ "Subscribed to " <> pack (show env.topics) diff --git a/hsm-core/Hsm/Core/Zmq/Server.hs b/hsm-core/Hsm/Core/Zmq/Server.hs new file mode 100644 index 0000000..5663cd8 --- /dev/null +++ b/hsm-core/Hsm/Core/Zmq/Server.hs @@ -0,0 +1,81 @@ +{-# LANGUAGE AllowAmbiguousTypes #-} +{-# LANGUAGE DataKinds #-} +{-# LANGUAGE OverloadedRecordDot #-} +{-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE TypeFamilies #-} + +module Hsm.Core.Zmq.Server + ( Server + , send + , runServer + ) where + +import Data.Binary (Binary) +import Data.Text (Text, pack, unpack) +import Effectful (Dispatch(Static), DispatchOf, Eff, IOE, (:>)) +import Effectful.Dispatch.Static qualified as E +import Effectful.Log (Log, localDomain, logInfo_, logTrace_) +import Effectful.Reader.Static (Reader, ask) +import Effectful.Resource (Resource) +import GHC.Records (HasField) +import Hsm.Core.Message (message) +import Hsm.Core.Zmq (withSocket) +import Streamly.Data.Fold qualified as S (drain) +import Streamly.Data.Stream qualified as S (Stream, fold, mapM) +import System.ZMQ4 qualified as Z + +data Server a b + +type instance DispatchOf Server = Static E.WithSideEffects + +newtype instance E.StaticRep Server = + Server (Z.Socket Z.Pub) + +domain :: Text +domain = "server" + +send :: + forall env es a. + ( HasField "name" env Text + , Log :> es + , Reader env :> es + , Server :> es + , Binary a + , Show a + ) + => S.Stream (Eff es) a + -> Eff es () +send = S.fold S.drain . S.mapM sender + where + sender :: a -> Eff es () + sender payload = do + Server sock <- E.getStaticRep + env <- ask @env + E.unsafeEff_ $ Z.send sock [] $ message env.name payload + localDomain domain $ logTrace_ $ "Message sent: " <> pack (show payload) + +runServer :: + forall env es a. + ( HasField "pubEp" env Text + , IOE :> es + , Log :> es + , Reader env :> es + , Resource :> es + ) + => Eff (Server : es) a + -> Eff es a +runServer action = withSocket Z.Pub >>= run + where + run :: Z.Socket Z.Pub -> Eff es a + run sock = E.evalStaticRep (Server sock) $ initialize >> action + where + bind :: Text -> Eff (Server : es) () + bind = E.unsafeEff_ . Z.bind sock . unpack + -- + initialize :: Eff (Server : es) () + initialize = + localDomain domain $ do + logInfo_ "Initializing ZMQ server" + env <- ask @env + bind env.pubEp + logTrace_ $ "Publishing to " <> env.pubEp -- cgit v1.2.1