diff options
Diffstat (limited to 'hsm-core/Hsm/Core')
| -rw-r--r-- | hsm-core/Hsm/Core/Env.hs | 7 | ||||
| -rw-r--r-- | hsm-core/Hsm/Core/Fsm.hs | 9 | ||||
| -rw-r--r-- | hsm-core/Hsm/Core/Zmq.hs | 25 | ||||
| -rw-r--r-- | hsm-core/Hsm/Core/Zmq/Client.hs | 59 | ||||
| -rw-r--r-- | hsm-core/Hsm/Core/Zmq/Server.hs | 25 | 
5 files changed, 45 insertions, 80 deletions
| diff --git a/hsm-core/Hsm/Core/Env.hs b/hsm-core/Hsm/Core/Env.hs index 4e7986f..8ef7464 100644 --- a/hsm-core/Hsm/Core/Env.hs +++ b/hsm-core/Hsm/Core/Env.hs @@ -3,9 +3,9 @@ module Hsm.Core.Env    , deriveFromYaml    ) where -import Data.Aeson (FromJSON, Result(Error, Success), Value, fromJSON) +import Data.Aeson (FromJSON, Result(Error, Success), fromJSON)  import Data.Aeson.Key (fromText) -import Data.Aeson.KeyMap (KeyMap, (!?)) +import Data.Aeson.KeyMap ((!?))  import Data.Aeson.TH (defaultOptions, deriveFromJSON, rejectUnknownFields)  import Data.Maybe (fromMaybe)  import Data.Text (Text, unpack) @@ -15,13 +15,10 @@ import Language.Haskell.TH (Dec, Name, Q)  environment :: FromJSON env => Text -> Text -> IO env  environment name = fmap (check . fromJSON . load) . decodeFileThrow . unpack    where -    load :: KeyMap Value -> Value      load keymap =        fromMaybe          (error $ "Service configuration for " <> unpack name <> " not found)")          $ keymap !? fromText name -    -- -    check :: Result env -> env      check (Success env) = env      check (Error str) = error str diff --git a/hsm-core/Hsm/Core/Fsm.hs b/hsm-core/Hsm/Core/Fsm.hs index 6f9910e..d1c2f5d 100644 --- a/hsm-core/Hsm/Core/Fsm.hs +++ b/hsm-core/Hsm/Core/Fsm.hs @@ -39,23 +39,18 @@ fsm ::    -> S.Stream (Eff es) o  fsm = S.mapM (return . fromJust) . S.takeWhile isJust . S.mapM run    where -    exit :: Eff es (Maybe o)      exit = do        logAttention_ "No state returned, exiting FSM"        return Nothing -    -- -    push :: FsmResult i o env sta -> Eff es (Maybe o)      push (FsmResult out sta next) = do        put sta        put next        return $ Just out -    -- -    run :: i -> Eff es (Maybe o)      run input =        localDomain "fsm" $ do          FsmState name action <- get -        sta <- get -        env <- ask +        sta <- get @sta +        env <- ask @env          logTrace_ $ "Entering state " <> name          FsmOutput res logs <- return $ action input env sta          localDomain name $ mapM_ logTup logs diff --git a/hsm-core/Hsm/Core/Zmq.hs b/hsm-core/Hsm/Core/Zmq.hs index 8c12133..2f70d48 100644 --- a/hsm-core/Hsm/Core/Zmq.hs +++ b/hsm-core/Hsm/Core/Zmq.hs @@ -4,7 +4,6 @@ module Hsm.Core.Zmq    ( withSocket    ) where -import Data.Text (Text)  import Effectful (Eff, IOE, (:>))  import Effectful.Log (Log, LogLevel(LogTrace))  import Effectful.Resource (Resource, allocate) @@ -12,23 +11,19 @@ import Hsm.Core.Log (withLogIO)  import System.ZMQ4 qualified as Z  withSocket :: -     forall t es. (Z.SocketType t, IOE :> es, Log :> es, Resource :> es) +     (Z.SocketType t, IOE :> es, Log :> es, Resource :> es)    => t    -> Eff es (Z.Socket t)  withSocket stype = withLogIO >>= bracket    where -    bracket :: (LogLevel -> Text -> IO ()) -> Eff es (Z.Socket t)      bracket logIO = snd . snd <$> allocate acquire release        where -        acquire :: IO (Z.Context, Z.Socket t) -        acquire = do -          logIO LogTrace "Acquiring ZMQ context" -          cont <- Z.context -          sock <- Z.socket cont stype -          return (cont, sock) -        -- -        release :: (Z.Context, Z.Socket t) -> IO () -        release (cont, sock) = do -          logIO LogTrace "Releasing ZMQ context" -          Z.close sock -          Z.shutdown cont +        acquire = +          logIO LogTrace "Acquiring ZMQ context" >> do +            cont <- Z.context +            sock <- Z.socket cont stype +            return (cont, sock) +        release (cont, sock) = +          logIO LogTrace "Releasing ZMQ context" >> do +            Z.close sock +            Z.shutdown cont diff --git a/hsm-core/Hsm/Core/Zmq/Client.hs b/hsm-core/Hsm/Core/Zmq/Client.hs index 793841e..6093e54 100644 --- a/hsm-core/Hsm/Core/Zmq/Client.hs +++ b/hsm-core/Hsm/Core/Zmq/Client.hs @@ -11,6 +11,7 @@ module Hsm.Core.Zmq.Client    , runClient    ) where +import Control.Monad (forM_)  import Control.Monad.Loops (whileM)  import Data.Binary (Binary)  import Data.Text (Text, pack, unpack) @@ -50,30 +51,23 @@ receiver = do          <> pack (show $ body @a message)    return $ body message -receive :: -     forall es a. (Log :> es, Client :> es, Binary a, Show a) -  => Stream (Eff es) a +receive :: (Log :> es, Client :> es, Binary a, Show a) => Stream (Eff es) a  receive = repeatM receiver -poll :: -     forall es a. (Log :> es, Client :> es, Binary a, Show a) -  => Stream (Eff es) [a] -poll = repeatM poller +poll :: (Log :> es, Client :> es, Binary a, Show a) => Stream (Eff es) [a] +poll = +  repeatM $ do +    ms <- whileM newMsg receiver +    localDomain domain +      $ localDomain "poller" +      $ logTrace_ +      $ pack (show $ length ms) <> " new message(s) on queue" +    return ms    where -    newMsg :: Eff es Bool      newMsg = do        Client sock <- E.getStaticRep        peek <- E.unsafeEff_ $ Z.poll 0 [Z.Sock sock [Z.In] Nothing]        return $ peek /= [[]] -    -- -    poller :: Eff es [a] -    poller = do -      ms <- whileM newMsg receiver -      localDomain domain -        $ localDomain "poller" -        $ logTrace_ -        $ pack (show $ length ms) <> " new message(s) on queue" -      return ms  runClient ::       forall env es a. @@ -86,23 +80,14 @@ runClient ::       )    => Eff (Client : es) a    -> Eff es a -runClient action = withSocket Z.Sub >>= run -  where -    run :: Z.Socket Z.Sub -> Eff es a -    run sock = E.evalStaticRep (Client sock) $ initialize >> action -      where -        connect :: Text -> Eff (Client : es) () -        connect = E.unsafeEff_ . Z.connect sock . unpack -        -- -        subscribe :: Text -> Eff (Client : es) () -        subscribe = E.unsafeEff_ . Z.subscribe sock . encodeUtf8 -        -- -        initialize :: Eff (Client : es) () -        initialize = -          localDomain domain $ do -            logInfo_ "Initializing ZMQ client" -            env <- ask @env -            mapM_ connect env.subEps -            mapM_ subscribe env.topics -            logTrace_ $ "Listening to " <> pack (show env.subEps) -            logTrace_ $ "Subscribed to " <> pack (show env.topics) +runClient action = +  withSocket Z.Sub >>= \sock -> +    E.evalStaticRep (Client sock) $ do +      localDomain domain $ do +        logInfo_ "Initializing ZMQ client" +        env <- ask @env +        forM_ env.subEps $ E.unsafeEff_ . Z.connect sock . unpack +        forM_ env.topics $ E.unsafeEff_ . Z.subscribe sock . encodeUtf8 +        logTrace_ $ "Listening to " <> pack (show env.subEps) +        logTrace_ $ "Subscribed to " <> pack (show env.topics) +      action diff --git a/hsm-core/Hsm/Core/Zmq/Server.hs b/hsm-core/Hsm/Core/Zmq/Server.hs index 5663cd8..2e9217b 100644 --- a/hsm-core/Hsm/Core/Zmq/Server.hs +++ b/hsm-core/Hsm/Core/Zmq/Server.hs @@ -47,7 +47,6 @@ send ::    -> Eff es ()  send = S.fold S.drain . S.mapM sender    where -    sender :: a -> Eff es ()      sender payload = do        Server sock <- E.getStaticRep        env <- ask @env @@ -64,18 +63,12 @@ runServer ::       )    => Eff (Server : es) a    -> Eff es a -runServer action = withSocket Z.Pub >>= run -  where -    run :: Z.Socket Z.Pub -> Eff es a -    run sock = E.evalStaticRep (Server sock) $ initialize >> action -      where -        bind :: Text -> Eff (Server : es) () -        bind = E.unsafeEff_ . Z.bind sock . unpack -        -- -        initialize :: Eff (Server : es) () -        initialize = -          localDomain domain $ do -            logInfo_ "Initializing ZMQ server" -            env <- ask @env -            bind env.pubEp -            logTrace_ $ "Publishing to " <> env.pubEp +runServer action = +  withSocket Z.Pub >>= \sock -> +    E.evalStaticRep (Server sock) $ do +      localDomain domain $ do +        logInfo_ "Initializing ZMQ server" +        env <- ask @env +        E.unsafeEff_ $ Z.bind sock $ unpack env.pubEp +        logTrace_ $ "Publishing to " <> env.pubEp +      action | 
