From ebb88408c1d0884b5ca9b7d68bf76d31c33d2e5b Mon Sep 17 00:00:00 2001 From: Paul Oliver Date: Fri, 17 Jan 2025 14:37:20 -0800 Subject: Allows services to publish different topics --- hsm-command/Hsm/Command/Command.hs | 34 +++++++++++++++++++----- hsm-command/Main.hs | 2 +- hsm-command/hsm-command.cabal | 2 ++ hsm-core/Hsm/Core/Zmq/Client.hs | 19 +++++-------- hsm-core/Hsm/Core/Zmq/Server.hs | 27 +++++++------------ hsm-dummy-fail/Main.hs | 14 ++++++---- hsm-dummy-fail/hsm-dummy-fail.cabal | 1 + hsm-dummy-poller/Main.hs | 8 +++--- hsm-dummy-poller/hsm-dummy-poller.cabal | 1 + hsm-dummy-pulser/Main.hs | 30 +++++++++------------ hsm-dummy-pulser/hsm-dummy-pulser.cabal | 1 + hsm-dummy-receiver/Main.hs | 5 +++- hsm-dummy-receiver/hsm-dummy-receiver.cabal | 1 + hsm-status/Main.hs | 41 ++++++++++++++++------------- hsm-status/hsm-status.cabal | 1 + 15 files changed, 104 insertions(+), 83 deletions(-) diff --git a/hsm-command/Hsm/Command/Command.hs b/hsm-command/Hsm/Command/Command.hs index 53964c4..902d637 100644 --- a/hsm-command/Hsm/Command/Command.hs +++ b/hsm-command/Hsm/Command/Command.hs @@ -1,4 +1,8 @@ +{-# LANGUAGE AllowAmbiguousTypes #-} +{-# LANGUAGE DataKinds #-} {-# LANGUAGE DeriveAnyClass #-} +{-# LANGUAGE OverloadedRecordDot #-} +{-# LANGUAGE OverloadedStrings #-} module Hsm.Command.Command ( Direction(X, Z) @@ -9,12 +13,16 @@ module Hsm.Command.Command ) where import Data.Binary (Binary) +import Data.ByteString (ByteString) import Data.Maybe (fromJust, isJust) -import Data.Text (pack) +import Data.Text (Text, pack) import Effectful (Eff, (:>)) -import Effectful.Log (Log, logAttention_) +import Effectful.Log (Log, logAttention_, logTrace_) +import Effectful.Reader.Static (Reader, ask) import GHC.Generics (Generic) +import GHC.Records (HasField) import Hsm.Command.Readline (Readline, readline) +import Hsm.Core.Message (message) import Streamly.Data.Stream qualified as S import Text.Read (readEither) @@ -39,11 +47,23 @@ data Command | Rotate Angle Speed Int deriving (Binary, Generic, Read, Show) -commandStream :: (Log :> es, Readline :> es) => S.Stream (Eff es) Command +commandStream :: + forall env es. + ( HasField "name" env Text + , Log :> es + , Reader env :> es + , Readline :> es + ) + => S.Stream (Eff es) ByteString commandStream = S.mapMaybeM (parse . fromJust) $ S.takeWhile isJust $ S.repeatM readline where - parse string = - case readEither string of - Left err -> logAttention_ (pack err) >> return Nothing - Right command -> return $ Just command + parse string = do + case readEither @Command string of + Left err -> do + logAttention_ $ pack err + return Nothing + Right command -> do + env <- ask @env + logTrace_ $ "Sending command: " <> pack (show command) + return $ Just $ message env.name command diff --git a/hsm-command/Main.hs b/hsm-command/Main.hs index efcbc6e..d135e53 100644 --- a/hsm-command/Main.hs +++ b/hsm-command/Main.hs @@ -26,7 +26,7 @@ $(deriveFromYaml ''Env) main :: IO () main = launch @Env "command" id $ \env logger level -> - (commandStream & send @Env) + (commandStream @Env & send) & runServer @Env & runLog env.name logger level & runReader env diff --git a/hsm-command/hsm-command.cabal b/hsm-command/hsm-command.cabal index 836bf07..ae83574 100644 --- a/hsm-command/hsm-command.cabal +++ b/hsm-command/hsm-command.cabal @@ -9,6 +9,7 @@ library build-depends: , base , binary + , bytestring , effectful-core , haskeline , hsm-core @@ -28,6 +29,7 @@ executable command build-depends: , base , binary + , bytestring , effectful-core , haskeline , hsm-core diff --git a/hsm-core/Hsm/Core/Zmq/Client.hs b/hsm-core/Hsm/Core/Zmq/Client.hs index 6093e54..97728f7 100644 --- a/hsm-core/Hsm/Core/Zmq/Client.hs +++ b/hsm-core/Hsm/Core/Zmq/Client.hs @@ -13,7 +13,7 @@ module Hsm.Core.Zmq.Client import Control.Monad (forM_) import Control.Monad.Loops (whileM) -import Data.Binary (Binary) +import Data.ByteString (ByteString) import Data.Text (Text, pack, unpack) import Data.Text.Encoding (encodeUtf8) import Effectful (Dispatch(Static), DispatchOf, Eff, IOE, (:>)) @@ -22,7 +22,7 @@ import Effectful.Log (Log, localDomain, logInfo_, logTrace_) import Effectful.Reader.Static (Reader, ask) import Effectful.Resource (Resource) import GHC.Records (HasField) -import Hsm.Core.Message (body, topic) +import Hsm.Core.Message (topic) import Hsm.Core.Zmq (withSocket) import Streamly.Data.Stream (Stream, repeatM) import System.ZMQ4 qualified as Z @@ -37,24 +37,19 @@ newtype instance E.StaticRep Client = domain :: Text domain = "client" -receiver :: - forall es a. (Log :> es, Client :> es, Binary a, Show a) - => Eff es a +receiver :: (Log :> es, Client :> es) => Eff es ByteString receiver = do Client sock <- E.getStaticRep message <- E.unsafeEff_ $ Z.receive sock localDomain domain $ logTrace_ - $ "Message received [" - <> topic message - <> "]: " - <> pack (show $ body @a message) - return $ body message + $ "Message received with topic: " <> topic message + return message -receive :: (Log :> es, Client :> es, Binary a, Show a) => Stream (Eff es) a +receive :: (Log :> es, Client :> es) => Stream (Eff es) ByteString receive = repeatM receiver -poll :: (Log :> es, Client :> es, Binary a, Show a) => Stream (Eff es) [a] +poll :: (Log :> es, Client :> es) => Stream (Eff es) [ByteString] poll = repeatM $ do ms <- whileM newMsg receiver diff --git a/hsm-core/Hsm/Core/Zmq/Server.hs b/hsm-core/Hsm/Core/Zmq/Server.hs index 2e9217b..6ea8aeb 100644 --- a/hsm-core/Hsm/Core/Zmq/Server.hs +++ b/hsm-core/Hsm/Core/Zmq/Server.hs @@ -10,15 +10,15 @@ module Hsm.Core.Zmq.Server , runServer ) where -import Data.Binary (Binary) -import Data.Text (Text, pack, unpack) +import Data.ByteString (ByteString) +import Data.Text (Text, unpack) import Effectful (Dispatch(Static), DispatchOf, Eff, IOE, (:>)) import Effectful.Dispatch.Static qualified as E import Effectful.Log (Log, localDomain, logInfo_, logTrace_) import Effectful.Reader.Static (Reader, ask) import Effectful.Resource (Resource) import GHC.Records (HasField) -import Hsm.Core.Message (message) +import Hsm.Core.Message (topic) import Hsm.Core.Zmq (withSocket) import Streamly.Data.Fold qualified as S (drain) import Streamly.Data.Stream qualified as S (Stream, fold, mapM) @@ -34,24 +34,15 @@ newtype instance E.StaticRep Server = domain :: Text domain = "server" -send :: - forall env es a. - ( HasField "name" env Text - , Log :> es - , Reader env :> es - , Server :> es - , Binary a - , Show a - ) - => S.Stream (Eff es) a - -> Eff es () +send :: (Log :> es, Server :> es) => S.Stream (Eff es) ByteString -> Eff es () send = S.fold S.drain . S.mapM sender where - sender payload = do + sender message = do Server sock <- E.getStaticRep - env <- ask @env - E.unsafeEff_ $ Z.send sock [] $ message env.name payload - localDomain domain $ logTrace_ $ "Message sent: " <> pack (show payload) + E.unsafeEff_ $ Z.send sock [] message + localDomain domain + $ logTrace_ + $ "Message sent with topic: " <> topic message runServer :: forall env es a. diff --git a/hsm-dummy-fail/Main.hs b/hsm-dummy-fail/Main.hs index 4e293c8..b2b8988 100644 --- a/hsm-dummy-fail/Main.hs +++ b/hsm-dummy-fail/Main.hs @@ -2,15 +2,17 @@ {-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE TemplateHaskell #-} +import Data.ByteString (ByteString) import Data.Function ((&)) import Data.Text (Text) import Effectful (Eff, (:>), runEff) import Effectful.Concurrent (Concurrent, runConcurrent, threadDelay) import Effectful.Log (runLog) -import Effectful.Reader.Static (Reader, asks, runReader) +import Effectful.Reader.Static (Reader, ask, runReader) import Effectful.Resource (runResource) import Hsm.Core.App (launch) import Hsm.Core.Env (deriveFromYaml) +import Hsm.Core.Message (message) import Hsm.Core.Zmq.Server (runServer, send) import Hsm.Status.Error (Error(Error)) import Streamly.Data.Stream (Stream, fromEffect) @@ -24,13 +26,15 @@ data Env = Env $(deriveFromYaml ''Env) -singleError :: (Concurrent :> es, Reader Env :> es) => Stream (Eff es) Error +singleError :: + (Concurrent :> es, Reader Env :> es) => Stream (Eff es) ByteString singleError = fromEffect $ do -- Seemingly, the service needs to be alive for a bit for ZMQ comms to -- kick in. - asks alive >>= threadDelay - return $ Error 0 "Sent from dummy-fail service" + env <- ask @Env + threadDelay env.alive + return $ message env.name $ Error 0 "Sent from dummy-fail service" -- Dummy fail service: -- Proof of concept. Publishes a single error that can be catched by a @@ -38,7 +42,7 @@ singleError = main :: IO () main = launch @Env "dummy-fail" withoutInputEcho $ \env logger level -> - (singleError & send @Env) + (singleError & send) & runServer @Env & runConcurrent & runLog env.name logger level diff --git a/hsm-dummy-fail/hsm-dummy-fail.cabal b/hsm-dummy-fail/hsm-dummy-fail.cabal index 269ea9c..ed0941c 100644 --- a/hsm-dummy-fail/hsm-dummy-fail.cabal +++ b/hsm-dummy-fail/hsm-dummy-fail.cabal @@ -8,6 +8,7 @@ version: 0.1.0.0 executable dummy-fail build-depends: , base + , bytestring , echo , effectful , hsm-core diff --git a/hsm-dummy-poller/Main.hs b/hsm-dummy-poller/Main.hs index 9f2fad9..bc60c7c 100644 --- a/hsm-dummy-poller/Main.hs +++ b/hsm-dummy-poller/Main.hs @@ -3,6 +3,7 @@ {-# LANGUAGE TemplateHaskell #-} import Control.Monad (forM_) +import Data.ByteString (ByteString) import Data.Function ((&)) import Data.Text (Text, pack) import Effectful (Eff, (:>), runEff) @@ -12,6 +13,7 @@ import Effectful.Reader.Static (Reader, asks, runReader) import Effectful.Resource (runResource) import Hsm.Core.App (launch) import Hsm.Core.Env (deriveFromYaml) +import Hsm.Core.Message (body) import Hsm.Core.Zmq.Client (poll, runClient) import Streamly.Data.Fold qualified as S (drain) import Streamly.Data.Stream qualified as S (Stream, fold, mapM) @@ -28,10 +30,10 @@ $(deriveFromYaml ''Env) handle :: (Concurrent :> es, Log :> es, Reader Env :> es) - => S.Stream (Eff es) [Int] + => S.Stream (Eff es) [ByteString] -> Eff es () handle = - S.fold S.drain . S.mapM (\p -> asks period >>= threadDelay >> handler p) + S.fold S.drain . S.mapM (\ps -> asks period >>= threadDelay >> handler ps) where receiverDomain = "receiver" handler [] = localDomain receiverDomain $ logInfo_ "No pulse received yet" @@ -39,7 +41,7 @@ handle = forM_ ps $ \p -> localDomain receiverDomain $ logInfo_ - $ "Received pulse #" <> pack (show p) + $ "Received pulse #" <> pack (show $ body @Int p) -- Dummy poller service: -- Proof of concept. Polls for "pulses" through ZMQ at a set interval and diff --git a/hsm-dummy-poller/hsm-dummy-poller.cabal b/hsm-dummy-poller/hsm-dummy-poller.cabal index 801cf68..ad04874 100644 --- a/hsm-dummy-poller/hsm-dummy-poller.cabal +++ b/hsm-dummy-poller/hsm-dummy-poller.cabal @@ -8,6 +8,7 @@ version: 0.1.0.0 executable dummy-poller build-depends: , base + , bytestring , echo , effectful , hsm-core diff --git a/hsm-dummy-pulser/Main.hs b/hsm-dummy-pulser/Main.hs index d15b616..cc16cd4 100644 --- a/hsm-dummy-pulser/Main.hs +++ b/hsm-dummy-pulser/Main.hs @@ -2,6 +2,7 @@ {-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE TemplateHaskell #-} +import Data.ByteString (ByteString) import Data.Function ((&)) import Data.Text (Text, pack) import Effectful (Eff, (:>), runEff) @@ -13,6 +14,7 @@ import Effectful.State.Static.Local (evalState) import Hsm.Core.App (launch) import Hsm.Core.Env (deriveFromYaml) import Hsm.Core.Fsm qualified as F +import Hsm.Core.Message (message) import Hsm.Core.Zmq.Server (runServer, send) import Streamly.Data.Stream (Stream, repeatM) import System.IO.Echo (withoutInputEcho) @@ -29,29 +31,23 @@ $(deriveFromYaml ''Env) pulse :: (Concurrent :> es, Reader Env :> es) => Stream (Eff es) () pulse = repeatM $ asks period >>= threadDelay -stateRun :: F.FsmState () Int Env Int -stateRun = F.FsmState "run" action - where - action _ env sta = - if sta < env.pulses - then next - else exit - where - next = - F.FsmOutput - (Just $ F.FsmResult sta (succ sta) stateRun) - [(LogInfo, "Sending pulse #" <> pack (show sta))] - exit = - F.FsmOutput - Nothing - [(LogAttention, "Reached " <> pack (show env.pulses) <> " pulses")] +stateRun :: F.FsmState () ByteString Env Int +stateRun = + F.FsmState "run" $ \_ env sta -> + if sta < env.pulses + then F.FsmOutput + (Just $ F.FsmResult (message env.name sta) (succ sta) stateRun) + [(LogInfo, "Sending pulse #" <> pack (show sta))] + else F.FsmOutput + Nothing + [(LogAttention, "Sent " <> pack (show env.pulses) <> " pulses")] -- Dummy pulser service: -- Proof of concept. Publishes a "pulse" through ZMQ at a set interval. main :: IO () main = launch @Env "dummy-pulser" withoutInputEcho $ \env logger level -> - (pulse & F.fsm @_ @_ @Env @Int & send @Env @_ @Int) + (pulse & F.fsm @_ @_ @Env @Int & send) & runServer @Env & evalState @Int 1 & evalState stateRun diff --git a/hsm-dummy-pulser/hsm-dummy-pulser.cabal b/hsm-dummy-pulser/hsm-dummy-pulser.cabal index 747f62c..428c139 100644 --- a/hsm-dummy-pulser/hsm-dummy-pulser.cabal +++ b/hsm-dummy-pulser/hsm-dummy-pulser.cabal @@ -8,6 +8,7 @@ version: 0.1.0.0 executable dummy-pulser build-depends: , base + , bytestring , echo , effectful , hsm-core diff --git a/hsm-dummy-receiver/Main.hs b/hsm-dummy-receiver/Main.hs index 451e9c4..6f3db81 100644 --- a/hsm-dummy-receiver/Main.hs +++ b/hsm-dummy-receiver/Main.hs @@ -2,6 +2,7 @@ {-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE TemplateHaskell #-} +import Data.ByteString (ByteString) import Data.Function ((&)) import Data.Text (Text, pack) import Effectful (Eff, (:>), runEff) @@ -10,6 +11,7 @@ import Effectful.Reader.Static (runReader) import Effectful.Resource (runResource) import Hsm.Core.App (launch) import Hsm.Core.Env (deriveFromYaml) +import Hsm.Core.Message (body) import Hsm.Core.Zmq.Client (receive, runClient) import Streamly.Data.Fold qualified as S (drain) import Streamly.Data.Stream qualified as S (Stream, fold, mapM) @@ -23,7 +25,7 @@ data Env = Env $(deriveFromYaml ''Env) -handle :: Log :> es => S.Stream (Eff es) Int -> Eff es () +handle :: Log :> es => S.Stream (Eff es) ByteString -> Eff es () handle = S.fold S.drain . S.mapM handler where handler = @@ -32,6 +34,7 @@ handle = S.fold S.drain . S.mapM handler . mappend "Received pulse #" . pack . show + . body @Int -- Dummy receiver service: -- Proof of concept. Listens for "pulses" through ZMQ and logs each time one diff --git a/hsm-dummy-receiver/hsm-dummy-receiver.cabal b/hsm-dummy-receiver/hsm-dummy-receiver.cabal index 9738bbe..61f24a3 100644 --- a/hsm-dummy-receiver/hsm-dummy-receiver.cabal +++ b/hsm-dummy-receiver/hsm-dummy-receiver.cabal @@ -8,6 +8,7 @@ version: 0.1.0.0 executable dummy-receiver build-depends: , base + , bytestring , echo , effectful-core , hsm-core diff --git a/hsm-status/Main.hs b/hsm-status/Main.hs index 6220474..ec883ff 100644 --- a/hsm-status/Main.hs +++ b/hsm-status/Main.hs @@ -2,6 +2,7 @@ {-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE TemplateHaskell #-} +import Data.ByteString (ByteString) import Data.Function ((&)) import Data.Set (Set, singleton) import Data.Text (Text, pack) @@ -13,6 +14,7 @@ import Effectful.State.Static.Local (evalState) import Hsm.Core.App (launch) import Hsm.Core.Env (deriveFromYaml) import Hsm.Core.Fsm qualified as F +import Hsm.Core.Message (body) import Hsm.Core.Zmq.Client (poll, runClient) import Hsm.GPIO (GPIO, GPIOEffect, runGPIO, toggle) import Hsm.Status.Error (Error(Error)) @@ -33,28 +35,29 @@ $(deriveFromYaml ''Env) result :: Bool - -> F.FsmState [Error] Bool Env () - -> [Error] - -> F.FsmOutput [Error] Bool Env () -result sta next es = - F.FsmOutput (Just $ F.FsmResult sta () next) (logError <$> es) + -> F.FsmState [ByteString] Bool Env () + -> [ByteString] + -> F.FsmOutput [ByteString] Bool Env () +result sta next errs = + F.FsmOutput (Just $ F.FsmResult sta () next) (logError <$> errs) where - logError (Error code msg) = - ( LogAttention - , "Error received with code " - <> pack (show code) - <> " and message: " - <> msg) + logError err = + let Error code msg = body err + in ( LogAttention + , "Error received with code " + <> pack (show code) + <> " and message: " + <> msg) -stateOk :: F.FsmState [Error] Bool Env () +stateOk :: F.FsmState [ByteString] Bool Env () stateOk = - F.FsmState "ok" $ \msg _ _ -> - if null msg - then result True stateOk msg - else result False stateError msg + F.FsmState "ok" $ \errs _ _ -> + if null errs + then result True stateOk errs + else result False stateError errs -stateError :: F.FsmState [Error] Bool Env () -stateError = F.FsmState "error" $ \msg _ _ -> result False stateError msg +stateError :: F.FsmState [ByteString] Bool Env () +stateError = F.FsmState "error" $ \errs _ _ -> result False stateError errs handle :: (GPIOEffect Bool :> es, Log :> es, Reader Env :> es) @@ -75,7 +78,7 @@ mapper env False = singleton env.gpioError main :: IO () main = launch "status" withoutInputEcho $ \env logger level -> - (poll @_ @Error & F.fsm @_ @_ @Env @() & handle) + (poll & F.fsm @_ @_ @Env @() & handle) & runClient @Env & runGPIO (mapper env) & evalState () diff --git a/hsm-status/hsm-status.cabal b/hsm-status/hsm-status.cabal index 64528bd..feeff93 100644 --- a/hsm-status/hsm-status.cabal +++ b/hsm-status/hsm-status.cabal @@ -19,6 +19,7 @@ executable status build-depends: , base , binary + , bytestring , containers , echo , effectful-core -- cgit v1.2.1