{-# LANGUAGE ImportQualifiedPost #-} {-# LANGUAGE OverloadedRecordDot #-} {-# LANGUAGE OverloadedStrings #-} -- Module : Hsm.Core.Zmq -- Maintainer : contact@pauloliver.dev module Hsm.Core.Zmq ( client , server ) where import Control.Monad (forM_, forever) import Control.Monad.IO.Class (liftIO) import Control.Monad.Reader (ask) import Data.Binary (Binary) import Data.Text (Text, pack, unpack) import Data.Text.Encoding (encodeUtf8) import Effectful (IOE, (:>)) import Effectful.Log (Log, LogLevel (LogTrace), localDomain, logInfo_, logTrace_) import Effectful.Reader.Static (Reader) import Effectful.Resource (Resource, allocate) import GHC.Records (HasField) import Hsm.Core.LogIO (LoggerIO, getLoggerIO) import Hsm.Core.Message (body, message) import Hsm.Core.Pipes (Consumer, Producer, Proxy, await, yield) import System.ZMQ4 qualified as Z data ZmqResource t = ZmqResource Z.Context (Z.Socket t) type ZmqC e es = (IOE :> es, Log :> es, Reader e :> es, Resource :> es) type UsingC t e es = (Z.SocketType t, ZmqC e es) type ClientC a e es = (Binary a, HasField "name" e Text, HasField "subEps" e [Text], HasField "topics" e [Text], ZmqC e es) type ServerC a e es = (Binary a, HasField "name" e Text, HasField "pubEp" e Text, ZmqC e es) type Action t a' a b' b e s es = Z.Socket t -> e -> Proxy a' a b' b e s es () using :: forall t a' a b' b e s es. UsingC t e es => t -> Action t a' a b' b e s es -> Proxy a' a b' b e s es () using stype action = getLoggerIO >>= safely where safely :: LoggerIO -> Proxy a' a b' b e s es () safely logger = allocate acquire release >>= \(_, ZmqResource _ socket) -> ask >>= action socket where acquire :: IO (ZmqResource t) acquire = do logger LogTrace "Acquiring ZMQ context" context <- Z.context socket <- Z.socket context stype return $ ZmqResource context socket release :: ZmqResource t -> IO () release (ZmqResource context socket) = do logger LogTrace "Releasing ZMQ context" Z.close socket Z.shutdown context client :: ClientC a e es => Producer a e s es () client = using Z.Sub $ \socket e -> localDomain "client" $ do logInfo_ "Initializing ZMQ client" liftIO $ forM_ e.subEps $ Z.connect socket . unpack liftIO $ forM_ e.topics $ Z.subscribe socket . encodeUtf8 logTrace_ $ "Listening to " <> pack (show e.subEps) logTrace_ $ "Subscribed to " <> pack (show e.topics) forever $ liftIO (Z.receive socket) >>= yield . body server :: ServerC a e es => Consumer a e s es () server = using Z.Pub $ \socket e -> localDomain "server" $ do logInfo_ "Initializing ZMQ server" liftIO $ Z.bind socket $ unpack e.pubEp logTrace_ $ "Publishing to " <> e.pubEp forever $ await >>= liftIO . Z.send socket [] . message e.name