From 70d3e37b1a088209fe84abf07a39d14dec116c6b Mon Sep 17 00:00:00 2001 From: Paul Oliver Date: Sat, 24 Aug 2024 11:57:18 -0700 Subject: Initial commit --- hsm-core/Hsm/Core/Zmq.hs | 80 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 80 insertions(+) create mode 100644 hsm-core/Hsm/Core/Zmq.hs (limited to 'hsm-core/Hsm/Core/Zmq.hs') diff --git a/hsm-core/Hsm/Core/Zmq.hs b/hsm-core/Hsm/Core/Zmq.hs new file mode 100644 index 0000000..69ff59a --- /dev/null +++ b/hsm-core/Hsm/Core/Zmq.hs @@ -0,0 +1,80 @@ +{-# LANGUAGE ImportQualifiedPost #-} +{-# LANGUAGE OverloadedRecordDot #-} +{-# LANGUAGE OverloadedStrings #-} + +-- Module : Hsm.Core.Zmq +-- Maintainer : contact@pauloliver.dev +module Hsm.Core.Zmq + ( client + , server + ) +where + +import Control.Monad (forM_, forever) +import Control.Monad.IO.Class (liftIO) +import Control.Monad.Reader (ask) +import Data.Binary (Binary) +import Data.Text (Text, pack, unpack) +import Data.Text.Encoding (encodeUtf8) +import Effectful (IOE, (:>)) +import Effectful.Log (Log, LogLevel (LogTrace), localDomain, logInfo_, logTrace_) +import Effectful.Reader.Static (Reader) +import Effectful.Resource (Resource, allocate) +import GHC.Records (HasField) +import Hsm.Core.LogIO (LoggerIO, getLoggerIO) +import Hsm.Core.Message (body, message) +import Hsm.Core.Pipes (Consumer, Producer, Proxy, await, yield) +import System.ZMQ4 qualified as Z + +data ZmqResource t = ZmqResource Z.Context (Z.Socket t) + +type ZmqC e es = (IOE :> es, Log :> es, Reader e :> es, Resource :> es) + +type UsingC t e es = (Z.SocketType t, ZmqC e es) + +type ClientC a e es = (Binary a, HasField "name" e Text, HasField "subEps" e [Text], HasField "topics" e [Text], ZmqC e es) + +type ServerC a e es = (Binary a, HasField "name" e Text, HasField "pubEp" e Text, ZmqC e es) + +type Action t a' a b' b e s es = Z.Socket t -> e -> Proxy a' a b' b e s es () + +using :: forall t a' a b' b e s es. UsingC t e es => t -> Action t a' a b' b e s es -> Proxy a' a b' b e s es () +using stype action = getLoggerIO >>= safely + where + safely :: LoggerIO -> Proxy a' a b' b e s es () + safely logger = + allocate acquire release >>= \(_, ZmqResource _ socket) -> + ask >>= action socket + where + acquire :: IO (ZmqResource t) + acquire = do + logger LogTrace "Acquiring ZMQ context" + context <- Z.context + socket <- Z.socket context stype + return $ ZmqResource context socket + + release :: ZmqResource t -> IO () + release (ZmqResource context socket) = do + logger LogTrace "Releasing ZMQ context" + Z.close socket + Z.shutdown context + +client :: ClientC a e es => Producer a e s es () +client = + using Z.Sub $ \socket e -> + localDomain "client" $ do + logInfo_ "Initializing ZMQ client" + liftIO $ forM_ e.subEps $ Z.connect socket . unpack + liftIO $ forM_ e.topics $ Z.subscribe socket . encodeUtf8 + logTrace_ $ "Listening to " <> pack (show e.subEps) + logTrace_ $ "Subscribed to " <> pack (show e.topics) + forever $ liftIO (Z.receive socket) >>= yield . body + +server :: ServerC a e es => Consumer a e s es () +server = + using Z.Pub $ \socket e -> + localDomain "server" $ do + logInfo_ "Initializing ZMQ server" + liftIO $ Z.bind socket $ unpack e.pubEp + logTrace_ $ "Publishing to " <> e.pubEp + forever $ await >>= liftIO . Z.send socket [] . message e.name -- cgit v1.2.1