{-# LANGUAGE AllowAmbiguousTypes #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE OverloadedRecordDot #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE TypeFamilies #-}

-- | A statically dispatched effect wrapping a ZMQ @Sub@ socket, together
-- with message streams built on top of it.
module Hsm.Core.Zmq.Client
  ( Client
  , receive
  , poll
  , runClient
  ) where

import Control.Monad (forM_)
import Control.Monad.Loops (whileM)
import Data.Binary (Binary)
import Data.Text (Text, pack, unpack)
import Data.Text.Encoding (encodeUtf8)
import Effectful (Dispatch(Static), DispatchOf, Eff, IOE, (:>))
import Effectful.Dispatch.Static qualified as E
import Effectful.Log (Log, localDomain, logInfo_, logTrace_)
import Effectful.Reader.Static (Reader, ask)
import Effectful.Resource (Resource)
import GHC.Records (HasField)
import Hsm.Core.Message (body, topic)
import Hsm.Core.Zmq (withSocket)
import Streamly.Data.Stream (Stream, repeatM)
import System.ZMQ4 qualified as Z

-- | Effect tag for the ZMQ client. It has no constructors; the state it
-- carries lives in its 'E.StaticRep' instance.
data Client a b

type instance DispatchOf Client = Static E.WithSideEffects

-- | The static representation of 'Client' is the @Sub@ socket itself.
newtype instance E.StaticRep Client =
  Client (Z.Socket Z.Sub)

-- | Log domain under which every message of this module is emitted.
domain :: Text
domain = "client"

-- | Read a single message from the client socket with 'Z.receive', trace
-- its 'topic' and the shown value of its 'body', and return that value.
--
-- NOTE(review): 'body' and 'topic' come from "Hsm.Core.Message"; the
-- 'Binary' constraint suggests 'body' decodes the payload - confirm there.
receiver ::
     forall es a. (Log :> es, Client :> es, Binary a, Show a)
  => Eff es a
receiver = do
  Client socket <- E.getStaticRep
  raw <- E.unsafeEff_ (Z.receive socket)
  -- Bind the payload once so the trace line and the result share it.
  let payload = body @a raw
  localDomain domain . logTrace_ $
    "Message received [" <> topic raw <> "]: " <> pack (show payload)
  pure payload

-- | Endless stream that yields one message per step (see 'receiver').
receive :: (Log :> es, Client :> es, Binary a, Show a) => Stream (Eff es) a
receive = repeatM receiver

-- | Endless stream of batches. Each step keeps calling 'receiver' for as
-- long as a zero-timeout 'Z.poll' on the socket reports something, traces
-- how many messages were collected, and yields them as a list.
poll :: (Log :> es, Client :> es, Binary a, Show a) => Stream (Eff es) [a]
poll =
  repeatM $ do
    batch <- whileM pending receiver
    localDomain domain . localDomain "poller" . logTrace_ $
      pack (show (length batch)) <> " new message(s) on queue"
    pure batch
  where
    -- True whenever the poll result is anything other than a single empty
    -- event list for the one socket that was polled.
    pending = do
      Client socket <- E.getStaticRep
      (/= [[]]) <$> E.unsafeEff_ (Z.poll 0 [Z.Sock socket [Z.In] Nothing])

-- | Interpret the 'Client' effect.
--
-- A @Sub@ socket is obtained from 'withSocket', connected to every entry
-- of the environment's @subEps@ field and subscribed to every entry of its
-- @topics@ field (UTF-8 encoded). The set-up is logged under 'domain';
-- afterwards the wrapped computation runs with that socket as its state.
runClient ::
     forall env es a.
     ( HasField "subEps" env [Text]
     , HasField "topics" env [Text]
     , IOE :> es
     , Log :> es
     , Reader env :> es
     , Resource :> es
     )
  => Eff (Client : es) a
  -> Eff es a
runClient action = do
  socket <- withSocket Z.Sub
  E.evalStaticRep (Client socket) $ do
    -- Only the set-up is scoped to the client log domain; the caller's
    -- action runs after it, outside that scope.
    localDomain domain $ do
      logInfo_ "Initializing ZMQ client"
      env <- ask @env
      let endpoints = env.subEps
          subscriptions = env.topics
      forM_ endpoints (E.unsafeEff_ . Z.connect socket . unpack)
      forM_ subscriptions (E.unsafeEff_ . Z.subscribe socket . encodeUtf8)
      logTrace_ ("Listening to " <> pack (show endpoints))
      logTrace_ ("Subscribed to " <> pack (show subscriptions))
    action