diff options
Diffstat (limited to 'hsm-core/Hsm/Core/Zmq/Client.hs')
-rw-r--r-- | hsm-core/Hsm/Core/Zmq/Client.hs | 108 |
1 files changed, 108 insertions, 0 deletions
diff --git a/hsm-core/Hsm/Core/Zmq/Client.hs b/hsm-core/Hsm/Core/Zmq/Client.hs new file mode 100644 index 0000000..793841e --- /dev/null +++ b/hsm-core/Hsm/Core/Zmq/Client.hs @@ -0,0 +1,108 @@ +{-# LANGUAGE AllowAmbiguousTypes #-} +{-# LANGUAGE DataKinds #-} +{-# LANGUAGE OverloadedRecordDot #-} +{-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE TypeFamilies #-} + +module Hsm.Core.Zmq.Client + ( Client + , receive + , poll + , runClient + ) where + +import Control.Monad.Loops (whileM) +import Data.Binary (Binary) +import Data.Text (Text, pack, unpack) +import Data.Text.Encoding (encodeUtf8) +import Effectful (Dispatch(Static), DispatchOf, Eff, IOE, (:>)) +import Effectful.Dispatch.Static qualified as E +import Effectful.Log (Log, localDomain, logInfo_, logTrace_) +import Effectful.Reader.Static (Reader, ask) +import Effectful.Resource (Resource) +import GHC.Records (HasField) +import Hsm.Core.Message (body, topic) +import Hsm.Core.Zmq (withSocket) +import Streamly.Data.Stream (Stream, repeatM) +import System.ZMQ4 qualified as Z + +data Client a b + +type instance DispatchOf Client = Static E.WithSideEffects + +newtype instance E.StaticRep Client = + Client (Z.Socket Z.Sub) + +domain :: Text +domain = "client" + +receiver :: + forall es a. (Log :> es, Client :> es, Binary a, Show a) + => Eff es a +receiver = do + Client sock <- E.getStaticRep + message <- E.unsafeEff_ $ Z.receive sock + localDomain domain + $ logTrace_ + $ "Message received [" + <> topic message + <> "]: " + <> pack (show $ body @a message) + return $ body message + +receive :: + forall es a. (Log :> es, Client :> es, Binary a, Show a) + => Stream (Eff es) a +receive = repeatM receiver + +poll :: + forall es a. (Log :> es, Client :> es, Binary a, Show a) + => Stream (Eff es) [a] +poll = repeatM poller + where + newMsg :: Eff es Bool + newMsg = do + Client sock <- E.getStaticRep + peek <- E.unsafeEff_ $ Z.poll 0 [Z.Sock sock [Z.In] Nothing] + return $ peek /= [[]] + -- + poller :: Eff es [a] + poller = do + ms <- whileM newMsg receiver + localDomain domain + $ localDomain "poller" + $ logTrace_ + $ pack (show $ length ms) <> " new message(s) on queue" + return ms + +runClient :: + forall env es a. + ( HasField "subEps" env [Text] + , HasField "topics" env [Text] + , IOE :> es + , Log :> es + , Reader env :> es + , Resource :> es + ) + => Eff (Client : es) a + -> Eff es a +runClient action = withSocket Z.Sub >>= run + where + run :: Z.Socket Z.Sub -> Eff es a + run sock = E.evalStaticRep (Client sock) $ initialize >> action + where + connect :: Text -> Eff (Client : es) () + connect = E.unsafeEff_ . Z.connect sock . unpack + -- + subscribe :: Text -> Eff (Client : es) () + subscribe = E.unsafeEff_ . Z.subscribe sock . encodeUtf8 + -- + initialize :: Eff (Client : es) () + initialize = + localDomain domain $ do + logInfo_ "Initializing ZMQ client" + env <- ask @env + mapM_ connect env.subEps + mapM_ subscribe env.topics + logTrace_ $ "Listening to " <> pack (show env.subEps) + logTrace_ $ "Subscribed to " <> pack (show env.topics) |