{-# LANGUAGE AllowAmbiguousTypes #-}
{-# LANGUAGE ImportQualifiedPost #-}
{-# LANGUAGE OverloadedRecordDot #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE TypeFamilies #-}

-- | Static effects wrapping a ZMQ SUB socket ('Client') and a ZMQ PUB
-- socket ('Server'), with their handlers and primitive operations.
module Hsm.Core.Zmq
  ( Client
  , receive
  , runClient
  , Server
  , send
  , runServer
  )
where

import Control.Exception (onException)
import Control.Monad (forM_)
import Data.Aeson.Types (Value, emptyObject)
import Data.Binary (Binary)
import Data.Function ((&))
import Data.Text (Text, show, unpack)
import Data.Text.Encoding (encodeUtf8)
import Data.Time.Clock (UTCTime, getCurrentTime)
import Effectful (Dispatch (Static), DispatchOf, Eff, Effect, IOE, (:>))
import Effectful.Dispatch.Static qualified as S
import Effectful.Log qualified as L
import Effectful.Reader.Static (Reader, ask)
import Effectful.Resource (Resource, allocate)
import GHC.Records (HasField)
import Hsm.Core.Message (body, message, topic)
import System.ZMQ4 qualified as Z
import Prelude hiding (show)

-- | Shape of the logging callback handed back by 'L.getLoggerIO'.
type DefaultLoggerIO = UTCTime -> L.LogLevel -> Text -> Value -> IO ()

-- | Simplified logging callback: the timestamp and the JSON payload are
-- filled in by 'withSocket'.
type LoggerIO = L.LogLevel -> Text -> IO ()

-- | A ZMQ context paired with the socket created from it.
type ZmqResource t = (Z.Context, Z.Socket t)

-- | Effects required to create and manage a ZMQ socket.
type ZmqConstraint es = (IOE :> es, L.Log :> es, Resource :> es)

-- | Create a ZMQ context and a socket of type @stype@, registering both
-- with the 'Resource' effect via 'allocate'.
--
-- Acquisition and release are each logged at trace level.
withSocket
  :: forall t es
   . ( Z.SocketType t
     , ZmqConstraint es
     )
  => t
  -> Eff es (Z.Socket t)
withSocket stype = L.getLoggerIO >>= withResource . logIO
  where
    -- Stamp every message with the current time and an empty JSON payload,
    -- so the acquire/release callbacks (plain 'IO') can log.
    logIO :: DefaultLoggerIO -> LoggerIO
    logIO loggerIO level msg = do
      now <- getCurrentTime
      loggerIO now level msg emptyObject
    -- 'allocate' yields @(key, (context, socket))@; only the socket is
    -- handed back to the caller.
    withResource :: LoggerIO -> Eff es (Z.Socket t)
    withResource logger = snd . snd <$> allocate acquire release
      where
        acquire :: IO (ZmqResource t)
        acquire = do
          logger L.LogTrace "Acquiring ZMQ context"
          context <- Z.context
          -- If socket creation fails, 'release' is never registered, so
          -- the freshly created context must be torn down here. No socket
          -- exists yet, hence terminating the context cannot block.
          socket <- Z.socket context stype `onException` Z.term context
          return (context, socket)
        release :: ZmqResource t -> IO ()
        release (context, socket) = do
          logger L.LogTrace "Releasing ZMQ context"
          Z.close socket
          -- NOTE(review): this shuts the context down but never calls
          -- 'Z.term'; confirm whether that is deliberate (e.g. to avoid
          -- blocking on linger).
          Z.shutdown context

-- | Effect giving access to a ZMQ SUB socket.
data Client :: Effect

type instance DispatchOf Client = Static S.WithSideEffects

newtype instance S.StaticRep Client = Client (Z.Socket Z.Sub)

-- | Log domain used for all client-side messages.
clientDomain :: Text
clientDomain = "client"

-- | Receive one message from the client socket, log its topic and body at
-- trace level under 'clientDomain', and return the body.
receive
  :: forall a es
   . ( Binary a
     , Client :> es
     , L.Log :> es
     , Show a
     )
  => Eff es a
receive = do
  Client socket <- S.getStaticRep
  msg <- S.unsafeEff_ $ Z.receive socket
  -- Bind the body once so the log line and the result share one value
  -- instead of applying 'body' to the message twice.
  let payload = body @a msg
  "Message received [" <> topic msg <> "]: " <> show payload
    & L.logTrace_
    & L.localDomain clientDomain
  return payload

-- | Run a 'Client' computation.
--
-- A SUB socket is created, connected to every endpoint in the environment's
-- @subEps@ field and subscribed to every entry of its @topics@ field before
-- the given action runs.
runClient
  :: forall e es a
   . ( HasField "subEps" e [Text]
     , HasField "topics" e [Text]
     , Reader e :> es
     , ZmqConstraint es
     )
  => Eff (Client : es) a
  -> Eff es a
runClient action = withSocket Z.Sub >>= runAction
  where
    runAction :: Z.Socket Z.Sub -> Eff es a
    runAction socket = S.evalStaticRep (Client socket) $ initialize >> action
      where
        connect :: Text -> Eff (Client : es) ()
        connect = S.unsafeEff_ . Z.connect socket . unpack
        subscribe :: Text -> Eff (Client : es) ()
        subscribe = S.unsafeEff_ . Z.subscribe socket . encodeUtf8
        -- Connect and subscribe first, then report what was set up.
        initialize :: Eff (Client : es) ()
        initialize =
          L.localDomain clientDomain $ do
            L.logInfo_ "Initializing ZMQ client"
            env <- ask @e
            forM_ env.subEps connect
            forM_ env.topics subscribe
            L.logTrace_ $ "Listening to " <> show env.subEps
            L.logTrace_ $ "Subscribed to " <> show env.topics

-- | Effect giving access to a ZMQ PUB socket.
data Server :: Effect

type instance DispatchOf Server = Static S.WithSideEffects

newtype instance S.StaticRep Server = Server (Z.Socket Z.Pub)

-- | Log domain used for all server-side messages.
serverDomain :: Text
serverDomain = "server"

-- | Publish @payload@ on the server socket.
--
-- The wire message is built by 'message' from the environment's @name@
-- field and the payload; the payload is then logged at trace level under
-- 'serverDomain'.
send
  :: forall a e es
   . ( Binary a
     , HasField "name" e Text
     , L.Log :> es
     , Reader e :> es
     , Server :> es
     , Show a
     )
  => a
  -> Eff es ()
send payload = do
  Server s <- S.getStaticRep
  env <- ask @e
  S.unsafeEff_ $ Z.send s [] $ message env.name payload
  L.localDomain serverDomain $ L.logTrace_ $ "Message sent: " <> show payload

-- | Run a 'Server' computation.
--
-- A PUB socket is created and bound to the environment's @pubEp@ endpoint
-- before the given action runs.
runServer
  :: forall e es a
   . ( HasField "pubEp" e Text
     , Reader e :> es
     , ZmqConstraint es
     )
  => Eff (Server : es) a
  -> Eff es a
runServer action = withSocket Z.Pub >>= runAction
  where
    runAction :: Z.Socket Z.Pub -> Eff es a
    runAction socket = S.evalStaticRep (Server socket) $ initialize >> action
      where
        bind :: Text -> Eff (Server : es) ()
        bind = S.unsafeEff_ . Z.bind socket . unpack
        initialize :: Eff (Server : es) ()
        initialize =
          L.localDomain serverDomain $ do
            L.logInfo_ "Initializing ZMQ server"
            env <- ask @e
            bind env.pubEp
            L.logTrace_ $ "Publishing to " <> env.pubEp