{-# LANGUAGE AllowAmbiguousTypes #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE OverloadedRecordDot #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE TypeFamilies #-}

-- | A statically dispatched 'Server' effect wrapping a ZMQ @Pub@ socket.
--
-- 'runServer' binds the socket and installs the effect; 'send' publishes
-- every element of a stream through it.
module Hsm.Core.Zmq.Server
  ( Server
  , send
  , runServer
  ) where

import Data.Binary (Binary)
import Data.Text (Text, pack, unpack)
import Effectful (Dispatch(Static), DispatchOf, Eff, IOE, (:>))
import Effectful.Dispatch.Static qualified as E
import Effectful.Log (Log, localDomain, logInfo_, logTrace_)
import Effectful.Reader.Static (Reader, ask)
import Effectful.Resource (Resource)
import GHC.Records (HasField)
import Hsm.Core.Message (message)
import Hsm.Core.Zmq (withSocket)
import Streamly.Data.Fold qualified as S (drain)
import Streamly.Data.Stream qualified as S (Stream, fold, mapM)
import System.ZMQ4 qualified as Z

-- | Effect tag for the publishing server. It has no constructors; its two
-- type parameters exist only so the type can be used as an effect.
data Server a b

-- | 'Server' is dispatched statically and is allowed to perform side effects.
type instance DispatchOf Server = Static E.WithSideEffects

-- | The effect's static representation: the ZMQ @Pub@ socket to publish on.
newtype instance E.StaticRep Server = Server (Z.Socket Z.Pub)

-- | Log domain under which this module's log messages are emitted.
domain :: Text
domain = "server"

-- | Drain a stream, publishing each element on the server socket.
--
-- For every element the socket is read from the 'Server' effect, the
-- environment is read from the 'Reader', and the result of
-- @'message' env.name payload@ is sent with no ZMQ flags. A trace entry
-- @"Message sent: …"@ is then logged under the @"server"@ domain.
--
-- @env@ occurs only in the constraints, so callers must select it with a
-- type application (e.g. @send \@Env@).
send ::
     forall env es a.
     ( HasField "name" env Text
     , Log :> es
     , Reader env :> es
     , Server :> es
     , Binary a
     , Show a
     )
  => S.Stream (Eff es) a
  -> Eff es ()
send = S.fold S.drain . S.mapM sender
  where
    -- Publish a single payload and log it.
    sender :: a -> Eff es ()
    sender payload = do
      Server sock <- E.getStaticRep
      env <- ask @env
      -- 'Z.send' is plain IO; it is lifted without requiring 'IOE' from
      -- callers, which is what the 'E.WithSideEffects' dispatch permits.
      E.unsafeEff_ $ Z.send sock [] $ message env.name payload
      localDomain domain $ logTrace_ $ "Message sent: " <> pack (show payload)

-- | Run an action that uses the 'Server' effect.
--
-- A @Pub@ socket is obtained through 'withSocket', installed as the effect's
-- static representation, and bound to the endpoint stored in @env.pubEp@
-- before the supplied action runs. The initialisation steps are logged under
-- the @"server"@ domain.
--
-- NOTE(review): socket cleanup is presumably handled by 'withSocket' via the
-- 'Resource' effect; confirm against "Hsm.Core.Zmq".
--
-- @env@ occurs only in the constraints, so callers must select it with a
-- type application (e.g. @runServer \@Env@).
runServer ::
     forall env es a.
     ( HasField "pubEp" env Text
     , IOE :> es
     , Log :> es
     , Reader env :> es
     , Resource :> es
     )
  => Eff (Server : es) a
  -> Eff es a
runServer action = withSocket Z.Pub >>= run
  where
    -- Install the socket as the effect state, then initialise before the
    -- user action so the socket is bound by the time 'send' is used.
    run :: Z.Socket Z.Pub -> Eff es a
    run sock = E.evalStaticRep (Server sock) $ initialize >> action
      where
        -- Bind the socket to the given endpoint.
        bind :: Text -> Eff (Server : es) ()
        bind = E.unsafeEff_ . Z.bind sock . unpack
        --
        -- Bind to the configured endpoint, logging before and after.
        initialize :: Eff (Server : es) ()
        initialize =
          localDomain domain $ do
            logInfo_ "Initializing ZMQ server"
            env <- ask @env
            bind env.pubEp
            logTrace_ $ "Publishing to " <> env.pubEp