{-# LANGUAGE AllowAmbiguousTypes #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE OverloadedRecordDot #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE TypeFamilies #-}

-- | A ZMQ @Pub@ socket exposed as a statically dispatched effect.
--
-- 'runServer' installs the socket and binds it; 'send' publishes a stream
-- of messages through it.
module Hsm.Core.Zmq.Server
  ( Server
  , send
  , runServer
  ) where

import Data.ByteString (ByteString)
import Data.Text (Text, unpack)
import Effectful (Dispatch(Static), DispatchOf, Eff, IOE, (:>))
import Effectful.Dispatch.Static qualified as E
import Effectful.Log (Log, localDomain, logInfo_, logTrace_)
import Effectful.Reader.Static (Reader, ask)
import Effectful.Resource (Resource)
import GHC.Records (HasField)
import Hsm.Core.Message (topic)
import Hsm.Core.Zmq (withSocket)
import Streamly.Data.Fold qualified as S (drain)
import Streamly.Data.Stream qualified as S (Stream, fold, mapM)
import System.ZMQ4 qualified as Z

-- | Effect tag for the publishing server. It has no constructors; it only
-- appears in effect rows such as @Server :> es@.
data Server a b

-- The effect is statically dispatched and is allowed to perform side effects.
type instance DispatchOf Server = Static E.WithSideEffects

-- The static representation of the effect is the ZMQ Pub socket itself.
newtype instance E.StaticRep Server =
  Server (Z.Socket Z.Pub)

-- | Log domain under which this module's own log lines are emitted.
domain :: Text
domain = "server"

-- | Consume the whole stream, publishing every element on the server socket.
--
-- Each message is sent with an empty flag list, after which a trace line
-- carrying @topic message@ is logged under the 'domain' log domain.
send :: (Log :> es, Server :> es) => S.Stream (Eff es) ByteString -> Eff es ()
send stream = S.fold S.drain $ S.mapM publish stream
  where
    -- The socket is looked up for every message rather than once per stream.
    publish payload = do
      Server socket <- E.getStaticRep
      E.unsafeEff_ (Z.send socket [] payload)
      localDomain domain . logTrace_ $
        "Message sent with topic: " <> topic payload

-- | Run an action that requires the 'Server' effect.
--
-- A @Pub@ socket is obtained from 'withSocket' and installed as the effect's
-- static representation. Before the action runs, the socket is bound to the
-- endpoint stored in the @pubEp@ field of the 'Reader' environment; the two
-- initialisation log lines are emitted under the 'domain' log domain, whereas
-- the action itself runs outside of that domain.
--
-- NOTE(review): socket cleanup is not visible in this module; presumably
-- 'withSocket' handles it via the 'Resource' effect -- confirm in
-- "Hsm.Core.Zmq".
runServer ::
     forall env es a.
     ( HasField "pubEp" env Text
     , IOE :> es
     , Log :> es
     , Reader env :> es
     , Resource :> es
     )
  => Eff (Server : es) a
  -> Eff es a
runServer action = do
  socket <- withSocket Z.Pub
  E.evalStaticRep (Server socket) $ do
    localDomain domain $ do
      logInfo_ "Initializing ZMQ server"
      config <- ask @env
      let endpoint = config.pubEp
      E.unsafeEff_ (Z.bind socket (unpack endpoint))
      logTrace_ ("Publishing to " <> endpoint)
    action