diff options
Diffstat (limited to 'hsm-core')
-rw-r--r-- | hsm-core/Hsm/Core/Env.hs | 7 | ||||
-rw-r--r-- | hsm-core/Hsm/Core/Fsm.hs | 9 | ||||
-rw-r--r-- | hsm-core/Hsm/Core/Zmq.hs | 25 | ||||
-rw-r--r-- | hsm-core/Hsm/Core/Zmq/Client.hs | 59 | ||||
-rw-r--r-- | hsm-core/Hsm/Core/Zmq/Server.hs | 25 |
5 files changed, 45 insertions, 80 deletions
diff --git a/hsm-core/Hsm/Core/Env.hs b/hsm-core/Hsm/Core/Env.hs index 4e7986f..8ef7464 100644 --- a/hsm-core/Hsm/Core/Env.hs +++ b/hsm-core/Hsm/Core/Env.hs @@ -3,9 +3,9 @@ module Hsm.Core.Env , deriveFromYaml ) where -import Data.Aeson (FromJSON, Result(Error, Success), Value, fromJSON) +import Data.Aeson (FromJSON, Result(Error, Success), fromJSON) import Data.Aeson.Key (fromText) -import Data.Aeson.KeyMap (KeyMap, (!?)) +import Data.Aeson.KeyMap ((!?)) import Data.Aeson.TH (defaultOptions, deriveFromJSON, rejectUnknownFields) import Data.Maybe (fromMaybe) import Data.Text (Text, unpack) @@ -15,13 +15,10 @@ import Language.Haskell.TH (Dec, Name, Q) environment :: FromJSON env => Text -> Text -> IO env environment name = fmap (check . fromJSON . load) . decodeFileThrow . unpack where - load :: KeyMap Value -> Value load keymap = fromMaybe (error $ "Service configuration for " <> unpack name <> " not found)") $ keymap !? fromText name - -- - check :: Result env -> env check (Success env) = env check (Error str) = error str diff --git a/hsm-core/Hsm/Core/Fsm.hs b/hsm-core/Hsm/Core/Fsm.hs index 6f9910e..d1c2f5d 100644 --- a/hsm-core/Hsm/Core/Fsm.hs +++ b/hsm-core/Hsm/Core/Fsm.hs @@ -39,23 +39,18 @@ fsm :: -> S.Stream (Eff es) o fsm = S.mapM (return . fromJust) . S.takeWhile isJust . S.mapM run where - exit :: Eff es (Maybe o) exit = do logAttention_ "No state returned, exiting FSM" return Nothing - -- - push :: FsmResult i o env sta -> Eff es (Maybe o) push (FsmResult out sta next) = do put sta put next return $ Just out - -- - run :: i -> Eff es (Maybe o) run input = localDomain "fsm" $ do FsmState name action <- get - sta <- get - env <- ask + sta <- get @sta + env <- ask @env logTrace_ $ "Entering state " <> name FsmOutput res logs <- return $ action input env sta localDomain name $ mapM_ logTup logs diff --git a/hsm-core/Hsm/Core/Zmq.hs b/hsm-core/Hsm/Core/Zmq.hs index 8c12133..2f70d48 100644 --- a/hsm-core/Hsm/Core/Zmq.hs +++ b/hsm-core/Hsm/Core/Zmq.hs @@ -4,7 +4,6 @@ module Hsm.Core.Zmq ( withSocket ) where -import Data.Text (Text) import Effectful (Eff, IOE, (:>)) import Effectful.Log (Log, LogLevel(LogTrace)) import Effectful.Resource (Resource, allocate) @@ -12,23 +11,19 @@ import Hsm.Core.Log (withLogIO) import System.ZMQ4 qualified as Z withSocket :: - forall t es. (Z.SocketType t, IOE :> es, Log :> es, Resource :> es) + (Z.SocketType t, IOE :> es, Log :> es, Resource :> es) => t -> Eff es (Z.Socket t) withSocket stype = withLogIO >>= bracket where - bracket :: (LogLevel -> Text -> IO ()) -> Eff es (Z.Socket t) bracket logIO = snd . snd <$> allocate acquire release where - acquire :: IO (Z.Context, Z.Socket t) - acquire = do - logIO LogTrace "Acquiring ZMQ context" - cont <- Z.context - sock <- Z.socket cont stype - return (cont, sock) - -- - release :: (Z.Context, Z.Socket t) -> IO () - release (cont, sock) = do - logIO LogTrace "Releasing ZMQ context" - Z.close sock - Z.shutdown cont + acquire = + logIO LogTrace "Acquiring ZMQ context" >> do + cont <- Z.context + sock <- Z.socket cont stype + return (cont, sock) + release (cont, sock) = + logIO LogTrace "Releasing ZMQ context" >> do + Z.close sock + Z.shutdown cont diff --git a/hsm-core/Hsm/Core/Zmq/Client.hs b/hsm-core/Hsm/Core/Zmq/Client.hs index 793841e..6093e54 100644 --- a/hsm-core/Hsm/Core/Zmq/Client.hs +++ b/hsm-core/Hsm/Core/Zmq/Client.hs @@ -11,6 +11,7 @@ module Hsm.Core.Zmq.Client , runClient ) where +import Control.Monad (forM_) import Control.Monad.Loops (whileM) import Data.Binary (Binary) import Data.Text (Text, pack, unpack) @@ -50,30 +51,23 @@ receiver = do <> pack (show $ body @a message) return $ body message -receive :: - forall es a. (Log :> es, Client :> es, Binary a, Show a) - => Stream (Eff es) a +receive :: (Log :> es, Client :> es, Binary a, Show a) => Stream (Eff es) a receive = repeatM receiver -poll :: - forall es a. (Log :> es, Client :> es, Binary a, Show a) - => Stream (Eff es) [a] -poll = repeatM poller +poll :: (Log :> es, Client :> es, Binary a, Show a) => Stream (Eff es) [a] +poll = + repeatM $ do + ms <- whileM newMsg receiver + localDomain domain + $ localDomain "poller" + $ logTrace_ + $ pack (show $ length ms) <> " new message(s) on queue" + return ms where - newMsg :: Eff es Bool newMsg = do Client sock <- E.getStaticRep peek <- E.unsafeEff_ $ Z.poll 0 [Z.Sock sock [Z.In] Nothing] return $ peek /= [[]] - -- - poller :: Eff es [a] - poller = do - ms <- whileM newMsg receiver - localDomain domain - $ localDomain "poller" - $ logTrace_ - $ pack (show $ length ms) <> " new message(s) on queue" - return ms runClient :: forall env es a. @@ -86,23 +80,14 @@ runClient :: ) => Eff (Client : es) a -> Eff es a -runClient action = withSocket Z.Sub >>= run - where - run :: Z.Socket Z.Sub -> Eff es a - run sock = E.evalStaticRep (Client sock) $ initialize >> action - where - connect :: Text -> Eff (Client : es) () - connect = E.unsafeEff_ . Z.connect sock . unpack - -- - subscribe :: Text -> Eff (Client : es) () - subscribe = E.unsafeEff_ . Z.subscribe sock . encodeUtf8 - -- - initialize :: Eff (Client : es) () - initialize = - localDomain domain $ do - logInfo_ "Initializing ZMQ client" - env <- ask @env - mapM_ connect env.subEps - mapM_ subscribe env.topics - logTrace_ $ "Listening to " <> pack (show env.subEps) - logTrace_ $ "Subscribed to " <> pack (show env.topics) +runClient action = + withSocket Z.Sub >>= \sock -> + E.evalStaticRep (Client sock) $ do + localDomain domain $ do + logInfo_ "Initializing ZMQ client" + env <- ask @env + forM_ env.subEps $ E.unsafeEff_ . Z.connect sock . unpack + forM_ env.topics $ E.unsafeEff_ . Z.subscribe sock . encodeUtf8 + logTrace_ $ "Listening to " <> pack (show env.subEps) + logTrace_ $ "Subscribed to " <> pack (show env.topics) + action diff --git a/hsm-core/Hsm/Core/Zmq/Server.hs b/hsm-core/Hsm/Core/Zmq/Server.hs index 5663cd8..2e9217b 100644 --- a/hsm-core/Hsm/Core/Zmq/Server.hs +++ b/hsm-core/Hsm/Core/Zmq/Server.hs @@ -47,7 +47,6 @@ send :: -> Eff es () send = S.fold S.drain . S.mapM sender where - sender :: a -> Eff es () sender payload = do Server sock <- E.getStaticRep env <- ask @env @@ -64,18 +63,12 @@ runServer :: ) => Eff (Server : es) a -> Eff es a -runServer action = withSocket Z.Pub >>= run - where - run :: Z.Socket Z.Pub -> Eff es a - run sock = E.evalStaticRep (Server sock) $ initialize >> action - where - bind :: Text -> Eff (Server : es) () - bind = E.unsafeEff_ . Z.bind sock . unpack - -- - initialize :: Eff (Server : es) () - initialize = - localDomain domain $ do - logInfo_ "Initializing ZMQ server" - env <- ask @env - bind env.pubEp - logTrace_ $ "Publishing to " <> env.pubEp +runServer action = + withSocket Z.Pub >>= \sock -> + E.evalStaticRep (Server sock) $ do + localDomain domain $ do + logInfo_ "Initializing ZMQ server" + env <- ask @env + E.unsafeEff_ $ Z.bind sock $ unpack env.pubEp + logTrace_ $ "Publishing to " <> env.pubEp + action |