aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPaul Oliver <contact@pauloliver.dev>2025-01-17 14:37:20 -0800
committerPaul Oliver <contact@pauloliver.dev>2025-01-17 19:16:43 -0800
commitebb88408c1d0884b5ca9b7d68bf76d31c33d2e5b (patch)
treec7c2c6b636e8eb89f2d4c6accf77a8c671b8ab9f
parentdc6bf1472c930ff1448c419d3205148bce1b787e (diff)
Allows services to publish different topicsHEADmaster
-rw-r--r--hsm-command/Hsm/Command/Command.hs34
-rw-r--r--hsm-command/Main.hs2
-rw-r--r--hsm-command/hsm-command.cabal2
-rw-r--r--hsm-core/Hsm/Core/Zmq/Client.hs19
-rw-r--r--hsm-core/Hsm/Core/Zmq/Server.hs27
-rw-r--r--hsm-dummy-fail/Main.hs14
-rw-r--r--hsm-dummy-fail/hsm-dummy-fail.cabal1
-rw-r--r--hsm-dummy-poller/Main.hs8
-rw-r--r--hsm-dummy-poller/hsm-dummy-poller.cabal1
-rw-r--r--hsm-dummy-pulser/Main.hs30
-rw-r--r--hsm-dummy-pulser/hsm-dummy-pulser.cabal1
-rw-r--r--hsm-dummy-receiver/Main.hs5
-rw-r--r--hsm-dummy-receiver/hsm-dummy-receiver.cabal1
-rw-r--r--hsm-status/Main.hs41
-rw-r--r--hsm-status/hsm-status.cabal1
15 files changed, 104 insertions, 83 deletions
diff --git a/hsm-command/Hsm/Command/Command.hs b/hsm-command/Hsm/Command/Command.hs
index 53964c4..902d637 100644
--- a/hsm-command/Hsm/Command/Command.hs
+++ b/hsm-command/Hsm/Command/Command.hs
@@ -1,4 +1,8 @@
+{-# LANGUAGE AllowAmbiguousTypes #-}
+{-# LANGUAGE DataKinds #-}
{-# LANGUAGE DeriveAnyClass #-}
+{-# LANGUAGE OverloadedRecordDot #-}
+{-# LANGUAGE OverloadedStrings #-}
module Hsm.Command.Command
( Direction(X, Z)
@@ -9,12 +13,16 @@ module Hsm.Command.Command
) where
import Data.Binary (Binary)
+import Data.ByteString (ByteString)
import Data.Maybe (fromJust, isJust)
-import Data.Text (pack)
+import Data.Text (Text, pack)
import Effectful (Eff, (:>))
-import Effectful.Log (Log, logAttention_)
+import Effectful.Log (Log, logAttention_, logTrace_)
+import Effectful.Reader.Static (Reader, ask)
import GHC.Generics (Generic)
+import GHC.Records (HasField)
import Hsm.Command.Readline (Readline, readline)
+import Hsm.Core.Message (message)
import Streamly.Data.Stream qualified as S
import Text.Read (readEither)
@@ -39,11 +47,23 @@ data Command
| Rotate Angle Speed Int
deriving (Binary, Generic, Read, Show)
-commandStream :: (Log :> es, Readline :> es) => S.Stream (Eff es) Command
+commandStream ::
+ forall env es.
+ ( HasField "name" env Text
+ , Log :> es
+ , Reader env :> es
+ , Readline :> es
+ )
+ => S.Stream (Eff es) ByteString
commandStream =
S.mapMaybeM (parse . fromJust) $ S.takeWhile isJust $ S.repeatM readline
where
- parse string =
- case readEither string of
- Left err -> logAttention_ (pack err) >> return Nothing
- Right command -> return $ Just command
+ parse string = do
+ case readEither @Command string of
+ Left err -> do
+ logAttention_ $ pack err
+ return Nothing
+ Right command -> do
+ env <- ask @env
+ logTrace_ $ "Sending command: " <> pack (show command)
+ return $ Just $ message env.name command
diff --git a/hsm-command/Main.hs b/hsm-command/Main.hs
index efcbc6e..d135e53 100644
--- a/hsm-command/Main.hs
+++ b/hsm-command/Main.hs
@@ -26,7 +26,7 @@ $(deriveFromYaml ''Env)
main :: IO ()
main =
launch @Env "command" id $ \env logger level ->
- (commandStream & send @Env)
+ (commandStream @Env & send)
& runServer @Env
& runLog env.name logger level
& runReader env
diff --git a/hsm-command/hsm-command.cabal b/hsm-command/hsm-command.cabal
index 836bf07..ae83574 100644
--- a/hsm-command/hsm-command.cabal
+++ b/hsm-command/hsm-command.cabal
@@ -9,6 +9,7 @@ library
build-depends:
, base
, binary
+ , bytestring
, effectful-core
, haskeline
, hsm-core
@@ -28,6 +29,7 @@ executable command
build-depends:
, base
, binary
+ , bytestring
, effectful-core
, haskeline
, hsm-core
diff --git a/hsm-core/Hsm/Core/Zmq/Client.hs b/hsm-core/Hsm/Core/Zmq/Client.hs
index 6093e54..97728f7 100644
--- a/hsm-core/Hsm/Core/Zmq/Client.hs
+++ b/hsm-core/Hsm/Core/Zmq/Client.hs
@@ -13,7 +13,7 @@ module Hsm.Core.Zmq.Client
import Control.Monad (forM_)
import Control.Monad.Loops (whileM)
-import Data.Binary (Binary)
+import Data.ByteString (ByteString)
import Data.Text (Text, pack, unpack)
import Data.Text.Encoding (encodeUtf8)
import Effectful (Dispatch(Static), DispatchOf, Eff, IOE, (:>))
@@ -22,7 +22,7 @@ import Effectful.Log (Log, localDomain, logInfo_, logTrace_)
import Effectful.Reader.Static (Reader, ask)
import Effectful.Resource (Resource)
import GHC.Records (HasField)
-import Hsm.Core.Message (body, topic)
+import Hsm.Core.Message (topic)
import Hsm.Core.Zmq (withSocket)
import Streamly.Data.Stream (Stream, repeatM)
import System.ZMQ4 qualified as Z
@@ -37,24 +37,19 @@ newtype instance E.StaticRep Client =
domain :: Text
domain = "client"
-receiver ::
- forall es a. (Log :> es, Client :> es, Binary a, Show a)
- => Eff es a
+receiver :: (Log :> es, Client :> es) => Eff es ByteString
receiver = do
Client sock <- E.getStaticRep
message <- E.unsafeEff_ $ Z.receive sock
localDomain domain
$ logTrace_
- $ "Message received ["
- <> topic message
- <> "]: "
- <> pack (show $ body @a message)
- return $ body message
+ $ "Message received with topic: " <> topic message
+ return message
-receive :: (Log :> es, Client :> es, Binary a, Show a) => Stream (Eff es) a
+receive :: (Log :> es, Client :> es) => Stream (Eff es) ByteString
receive = repeatM receiver
-poll :: (Log :> es, Client :> es, Binary a, Show a) => Stream (Eff es) [a]
+poll :: (Log :> es, Client :> es) => Stream (Eff es) [ByteString]
poll =
repeatM $ do
ms <- whileM newMsg receiver
diff --git a/hsm-core/Hsm/Core/Zmq/Server.hs b/hsm-core/Hsm/Core/Zmq/Server.hs
index 2e9217b..6ea8aeb 100644
--- a/hsm-core/Hsm/Core/Zmq/Server.hs
+++ b/hsm-core/Hsm/Core/Zmq/Server.hs
@@ -10,15 +10,15 @@ module Hsm.Core.Zmq.Server
, runServer
) where
-import Data.Binary (Binary)
-import Data.Text (Text, pack, unpack)
+import Data.ByteString (ByteString)
+import Data.Text (Text, unpack)
import Effectful (Dispatch(Static), DispatchOf, Eff, IOE, (:>))
import Effectful.Dispatch.Static qualified as E
import Effectful.Log (Log, localDomain, logInfo_, logTrace_)
import Effectful.Reader.Static (Reader, ask)
import Effectful.Resource (Resource)
import GHC.Records (HasField)
-import Hsm.Core.Message (message)
+import Hsm.Core.Message (topic)
import Hsm.Core.Zmq (withSocket)
import Streamly.Data.Fold qualified as S (drain)
import Streamly.Data.Stream qualified as S (Stream, fold, mapM)
@@ -34,24 +34,15 @@ newtype instance E.StaticRep Server =
domain :: Text
domain = "server"
-send ::
- forall env es a.
- ( HasField "name" env Text
- , Log :> es
- , Reader env :> es
- , Server :> es
- , Binary a
- , Show a
- )
- => S.Stream (Eff es) a
- -> Eff es ()
+send :: (Log :> es, Server :> es) => S.Stream (Eff es) ByteString -> Eff es ()
send = S.fold S.drain . S.mapM sender
where
- sender payload = do
+ sender message = do
Server sock <- E.getStaticRep
- env <- ask @env
- E.unsafeEff_ $ Z.send sock [] $ message env.name payload
- localDomain domain $ logTrace_ $ "Message sent: " <> pack (show payload)
+ E.unsafeEff_ $ Z.send sock [] message
+ localDomain domain
+ $ logTrace_
+ $ "Message sent with topic: " <> topic message
runServer ::
forall env es a.
diff --git a/hsm-dummy-fail/Main.hs b/hsm-dummy-fail/Main.hs
index 4e293c8..b2b8988 100644
--- a/hsm-dummy-fail/Main.hs
+++ b/hsm-dummy-fail/Main.hs
@@ -2,15 +2,17 @@
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE TemplateHaskell #-}
+import Data.ByteString (ByteString)
import Data.Function ((&))
import Data.Text (Text)
import Effectful (Eff, (:>), runEff)
import Effectful.Concurrent (Concurrent, runConcurrent, threadDelay)
import Effectful.Log (runLog)
-import Effectful.Reader.Static (Reader, asks, runReader)
+import Effectful.Reader.Static (Reader, ask, runReader)
import Effectful.Resource (runResource)
import Hsm.Core.App (launch)
import Hsm.Core.Env (deriveFromYaml)
+import Hsm.Core.Message (message)
import Hsm.Core.Zmq.Server (runServer, send)
import Hsm.Status.Error (Error(Error))
import Streamly.Data.Stream (Stream, fromEffect)
@@ -24,13 +26,15 @@ data Env = Env
$(deriveFromYaml ''Env)
-singleError :: (Concurrent :> es, Reader Env :> es) => Stream (Eff es) Error
+singleError ::
+ (Concurrent :> es, Reader Env :> es) => Stream (Eff es) ByteString
singleError =
fromEffect $ do
-- Seemingly, the service needs to be alive for a bit for ZMQ comms to
-- kick in.
- asks alive >>= threadDelay
- return $ Error 0 "Sent from dummy-fail service"
+ env <- ask @Env
+ threadDelay env.alive
+ return $ message env.name $ Error 0 "Sent from dummy-fail service"
-- Dummy fail service:
-- Proof of concept. Publishes a single error that can be catched by a
@@ -38,7 +42,7 @@ singleError =
main :: IO ()
main =
launch @Env "dummy-fail" withoutInputEcho $ \env logger level ->
- (singleError & send @Env)
+ (singleError & send)
& runServer @Env
& runConcurrent
& runLog env.name logger level
diff --git a/hsm-dummy-fail/hsm-dummy-fail.cabal b/hsm-dummy-fail/hsm-dummy-fail.cabal
index 269ea9c..ed0941c 100644
--- a/hsm-dummy-fail/hsm-dummy-fail.cabal
+++ b/hsm-dummy-fail/hsm-dummy-fail.cabal
@@ -8,6 +8,7 @@ version: 0.1.0.0
executable dummy-fail
build-depends:
, base
+ , bytestring
, echo
, effectful
, hsm-core
diff --git a/hsm-dummy-poller/Main.hs b/hsm-dummy-poller/Main.hs
index 9f2fad9..bc60c7c 100644
--- a/hsm-dummy-poller/Main.hs
+++ b/hsm-dummy-poller/Main.hs
@@ -3,6 +3,7 @@
{-# LANGUAGE TemplateHaskell #-}
import Control.Monad (forM_)
+import Data.ByteString (ByteString)
import Data.Function ((&))
import Data.Text (Text, pack)
import Effectful (Eff, (:>), runEff)
@@ -12,6 +13,7 @@ import Effectful.Reader.Static (Reader, asks, runReader)
import Effectful.Resource (runResource)
import Hsm.Core.App (launch)
import Hsm.Core.Env (deriveFromYaml)
+import Hsm.Core.Message (body)
import Hsm.Core.Zmq.Client (poll, runClient)
import Streamly.Data.Fold qualified as S (drain)
import Streamly.Data.Stream qualified as S (Stream, fold, mapM)
@@ -28,10 +30,10 @@ $(deriveFromYaml ''Env)
handle ::
(Concurrent :> es, Log :> es, Reader Env :> es)
- => S.Stream (Eff es) [Int]
+ => S.Stream (Eff es) [ByteString]
-> Eff es ()
handle =
- S.fold S.drain . S.mapM (\p -> asks period >>= threadDelay >> handler p)
+ S.fold S.drain . S.mapM (\ps -> asks period >>= threadDelay >> handler ps)
where
receiverDomain = "receiver"
handler [] = localDomain receiverDomain $ logInfo_ "No pulse received yet"
@@ -39,7 +41,7 @@ handle =
forM_ ps $ \p ->
localDomain receiverDomain
$ logInfo_
- $ "Received pulse #" <> pack (show p)
+ $ "Received pulse #" <> pack (show $ body @Int p)
-- Dummy poller service:
-- Proof of concept. Polls for "pulses" through ZMQ at a set interval and
diff --git a/hsm-dummy-poller/hsm-dummy-poller.cabal b/hsm-dummy-poller/hsm-dummy-poller.cabal
index 801cf68..ad04874 100644
--- a/hsm-dummy-poller/hsm-dummy-poller.cabal
+++ b/hsm-dummy-poller/hsm-dummy-poller.cabal
@@ -8,6 +8,7 @@ version: 0.1.0.0
executable dummy-poller
build-depends:
, base
+ , bytestring
, echo
, effectful
, hsm-core
diff --git a/hsm-dummy-pulser/Main.hs b/hsm-dummy-pulser/Main.hs
index d15b616..cc16cd4 100644
--- a/hsm-dummy-pulser/Main.hs
+++ b/hsm-dummy-pulser/Main.hs
@@ -2,6 +2,7 @@
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE TemplateHaskell #-}
+import Data.ByteString (ByteString)
import Data.Function ((&))
import Data.Text (Text, pack)
import Effectful (Eff, (:>), runEff)
@@ -13,6 +14,7 @@ import Effectful.State.Static.Local (evalState)
import Hsm.Core.App (launch)
import Hsm.Core.Env (deriveFromYaml)
import Hsm.Core.Fsm qualified as F
+import Hsm.Core.Message (message)
import Hsm.Core.Zmq.Server (runServer, send)
import Streamly.Data.Stream (Stream, repeatM)
import System.IO.Echo (withoutInputEcho)
@@ -29,29 +31,23 @@ $(deriveFromYaml ''Env)
pulse :: (Concurrent :> es, Reader Env :> es) => Stream (Eff es) ()
pulse = repeatM $ asks period >>= threadDelay
-stateRun :: F.FsmState () Int Env Int
-stateRun = F.FsmState "run" action
- where
- action _ env sta =
- if sta < env.pulses
- then next
- else exit
- where
- next =
- F.FsmOutput
- (Just $ F.FsmResult sta (succ sta) stateRun)
- [(LogInfo, "Sending pulse #" <> pack (show sta))]
- exit =
- F.FsmOutput
- Nothing
- [(LogAttention, "Reached " <> pack (show env.pulses) <> " pulses")]
+stateRun :: F.FsmState () ByteString Env Int
+stateRun =
+ F.FsmState "run" $ \_ env sta ->
+ if sta < env.pulses
+ then F.FsmOutput
+ (Just $ F.FsmResult (message env.name sta) (succ sta) stateRun)
+ [(LogInfo, "Sending pulse #" <> pack (show sta))]
+ else F.FsmOutput
+ Nothing
+ [(LogAttention, "Sent " <> pack (show env.pulses) <> " pulses")]
-- Dummy pulser service:
-- Proof of concept. Publishes a "pulse" through ZMQ at a set interval.
main :: IO ()
main =
launch @Env "dummy-pulser" withoutInputEcho $ \env logger level ->
- (pulse & F.fsm @_ @_ @Env @Int & send @Env @_ @Int)
+ (pulse & F.fsm @_ @_ @Env @Int & send)
& runServer @Env
& evalState @Int 1
& evalState stateRun
diff --git a/hsm-dummy-pulser/hsm-dummy-pulser.cabal b/hsm-dummy-pulser/hsm-dummy-pulser.cabal
index 747f62c..428c139 100644
--- a/hsm-dummy-pulser/hsm-dummy-pulser.cabal
+++ b/hsm-dummy-pulser/hsm-dummy-pulser.cabal
@@ -8,6 +8,7 @@ version: 0.1.0.0
executable dummy-pulser
build-depends:
, base
+ , bytestring
, echo
, effectful
, hsm-core
diff --git a/hsm-dummy-receiver/Main.hs b/hsm-dummy-receiver/Main.hs
index 451e9c4..6f3db81 100644
--- a/hsm-dummy-receiver/Main.hs
+++ b/hsm-dummy-receiver/Main.hs
@@ -2,6 +2,7 @@
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE TemplateHaskell #-}
+import Data.ByteString (ByteString)
import Data.Function ((&))
import Data.Text (Text, pack)
import Effectful (Eff, (:>), runEff)
@@ -10,6 +11,7 @@ import Effectful.Reader.Static (runReader)
import Effectful.Resource (runResource)
import Hsm.Core.App (launch)
import Hsm.Core.Env (deriveFromYaml)
+import Hsm.Core.Message (body)
import Hsm.Core.Zmq.Client (receive, runClient)
import Streamly.Data.Fold qualified as S (drain)
import Streamly.Data.Stream qualified as S (Stream, fold, mapM)
@@ -23,7 +25,7 @@ data Env = Env
$(deriveFromYaml ''Env)
-handle :: Log :> es => S.Stream (Eff es) Int -> Eff es ()
+handle :: Log :> es => S.Stream (Eff es) ByteString -> Eff es ()
handle = S.fold S.drain . S.mapM handler
where
handler =
@@ -32,6 +34,7 @@ handle = S.fold S.drain . S.mapM handler
. mappend "Received pulse #"
. pack
. show
+ . body @Int
-- Dummy receiver service:
-- Proof of concept. Listens for "pulses" through ZMQ and logs each time one
diff --git a/hsm-dummy-receiver/hsm-dummy-receiver.cabal b/hsm-dummy-receiver/hsm-dummy-receiver.cabal
index 9738bbe..61f24a3 100644
--- a/hsm-dummy-receiver/hsm-dummy-receiver.cabal
+++ b/hsm-dummy-receiver/hsm-dummy-receiver.cabal
@@ -8,6 +8,7 @@ version: 0.1.0.0
executable dummy-receiver
build-depends:
, base
+ , bytestring
, echo
, effectful-core
, hsm-core
diff --git a/hsm-status/Main.hs b/hsm-status/Main.hs
index 6220474..ec883ff 100644
--- a/hsm-status/Main.hs
+++ b/hsm-status/Main.hs
@@ -2,6 +2,7 @@
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE TemplateHaskell #-}
+import Data.ByteString (ByteString)
import Data.Function ((&))
import Data.Set (Set, singleton)
import Data.Text (Text, pack)
@@ -13,6 +14,7 @@ import Effectful.State.Static.Local (evalState)
import Hsm.Core.App (launch)
import Hsm.Core.Env (deriveFromYaml)
import Hsm.Core.Fsm qualified as F
+import Hsm.Core.Message (body)
import Hsm.Core.Zmq.Client (poll, runClient)
import Hsm.GPIO (GPIO, GPIOEffect, runGPIO, toggle)
import Hsm.Status.Error (Error(Error))
@@ -33,28 +35,29 @@ $(deriveFromYaml ''Env)
result ::
Bool
- -> F.FsmState [Error] Bool Env ()
- -> [Error]
- -> F.FsmOutput [Error] Bool Env ()
-result sta next es =
- F.FsmOutput (Just $ F.FsmResult sta () next) (logError <$> es)
+ -> F.FsmState [ByteString] Bool Env ()
+ -> [ByteString]
+ -> F.FsmOutput [ByteString] Bool Env ()
+result sta next errs =
+ F.FsmOutput (Just $ F.FsmResult sta () next) (logError <$> errs)
where
- logError (Error code msg) =
- ( LogAttention
- , "Error received with code "
- <> pack (show code)
- <> " and message: "
- <> msg)
+ logError err =
+ let Error code msg = body err
+ in ( LogAttention
+ , "Error received with code "
+ <> pack (show code)
+ <> " and message: "
+ <> msg)
-stateOk :: F.FsmState [Error] Bool Env ()
+stateOk :: F.FsmState [ByteString] Bool Env ()
stateOk =
- F.FsmState "ok" $ \msg _ _ ->
- if null msg
- then result True stateOk msg
- else result False stateError msg
+ F.FsmState "ok" $ \errs _ _ ->
+ if null errs
+ then result True stateOk errs
+ else result False stateError errs
-stateError :: F.FsmState [Error] Bool Env ()
-stateError = F.FsmState "error" $ \msg _ _ -> result False stateError msg
+stateError :: F.FsmState [ByteString] Bool Env ()
+stateError = F.FsmState "error" $ \errs _ _ -> result False stateError errs
handle ::
(GPIOEffect Bool :> es, Log :> es, Reader Env :> es)
@@ -75,7 +78,7 @@ mapper env False = singleton env.gpioError
main :: IO ()
main =
launch "status" withoutInputEcho $ \env logger level ->
- (poll @_ @Error & F.fsm @_ @_ @Env @() & handle)
+ (poll & F.fsm @_ @_ @Env @() & handle)
& runClient @Env
& runGPIO (mapper env)
& evalState ()
diff --git a/hsm-status/hsm-status.cabal b/hsm-status/hsm-status.cabal
index 64528bd..feeff93 100644
--- a/hsm-status/hsm-status.cabal
+++ b/hsm-status/hsm-status.cabal
@@ -19,6 +19,7 @@ executable status
build-depends:
, base
, binary
+ , bytestring
, containers
, echo
, effectful-core