aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPaul Oliver <contact@pauloliver.dev>2024-12-29 17:05:34 +0000
committerPaul Oliver <contact@pauloliver.dev>2025-01-16 18:30:09 -0800
commitcc639b06c7126fac7b445d8f778455620d7f8f50 (patch)
treea4c5c7c0b0a9cdb5bea0891e198003035065e57d
Initial
-rw-r--r--.gitignore2
-rw-r--r--README.md52
-rw-r--r--hsm-command/Hsm/Command/Command.hs52
-rw-r--r--hsm-command/Hsm/Command/Readline.hs51
-rw-r--r--hsm-command/Main.hs33
-rw-r--r--hsm-command/hsm-command.cabal48
-rw-r--r--hsm-core/Hsm/Core/App.hs21
-rw-r--r--hsm-core/Hsm/Core/Env.hs29
-rw-r--r--hsm-core/Hsm/Core/Fsm.hs61
-rw-r--r--hsm-core/Hsm/Core/Log.hs20
-rw-r--r--hsm-core/Hsm/Core/Message.hs25
-rw-r--r--hsm-core/Hsm/Core/Options.hs40
-rw-r--r--hsm-core/Hsm/Core/Zmq.hs34
-rw-r--r--hsm-core/Hsm/Core/Zmq/Client.hs108
-rw-r--r--hsm-core/Hsm/Core/Zmq/Server.hs81
-rw-r--r--hsm-core/hsm-core.cabal42
-rw-r--r--hsm-dummy-poller/Main.hs56
-rw-r--r--hsm-dummy-poller/hsm-dummy-poller.cabal25
-rw-r--r--hsm-dummy-pulser/Main.hs64
-rw-r--r--hsm-dummy-pulser/hsm-dummy-pulser.cabal25
-rw-r--r--hsm-dummy-receiver/Main.hs44
-rw-r--r--hsm-dummy-receiver/hsm-dummy-receiver.cabal25
-rw-r--r--servconf.yaml21
-rw-r--r--stack.yaml11
-rw-r--r--stack.yaml.lock26
25 files changed, 996 insertions, 0 deletions
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..6a97e52
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,2 @@
+**/.stack-work/
+.hsm_command_history
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..01c9427
--- /dev/null
+++ b/README.md
@@ -0,0 +1,52 @@
+# HsMouse
+Experimental control code for robotics. Tested on Raspberry Pi 5.
+
+## Features
+- [`zeromq4-haskell`](https://hackage.haskell.org/package/zeromq4-haskell)
+library is used for IPC.
+- [`effectful`](https://hackage.haskell.org/package/effectful) library is used
+to control effects within monadic computations.
+- [`streamly`](https://hackage.haskell.org/package/streamly) library is used
+to build pipelines modularly and stream data within pipeline elements. E.g.
+`zmq client & processor & zmq server`.
+
+## Build
+Install [`stack`](https://docs.haskellstack.org/en/stable/). I recommend using
+[`ghcup`](https://www.haskell.org/ghcup/) for this. Run `stack build` to
+compile all libraries and executables. Note: you might need to install some
+system dependencies on your host first (e.g. `libzmq`, etc.)
+
+## Test
+On one terminal, run `stack exec dummy-receiver`. This will initialize a ZMQ
+client that will wait for incoming pulses. On a separate terminal, run
+`stack exec dummy-pulser`. You should be able to see pulses being transmitted
+from server to client. E.g.:
+```
+$> stack exec dummy-receiver
+2025-01-12 21:27:02 INFO receiver/client: Initializing ZMQ client
+2025-01-12 21:27:16 INFO receiver/receiver: Received pulse #1
+2025-01-12 21:27:17 INFO receiver/receiver: Received pulse #2
+2025-01-12 21:27:18 INFO receiver/receiver: Received pulse #3
+2025-01-12 21:27:19 INFO receiver/receiver: Received pulse #4
+2025-01-12 21:27:20 INFO receiver/receiver: Received pulse #5
+2025-01-12 21:27:21 INFO receiver/receiver: Received pulse #6
+2025-01-12 21:27:22 INFO receiver/receiver: Received pulse #7
+2025-01-12 21:27:23 INFO receiver/receiver: Received pulse #8
+2025-01-12 21:27:24 INFO receiver/receiver: Received pulse #9
+```
+
+```
+$> stack exec dummy-pulser
+2025-01-12 21:27:15 INFO pulser/server: Initializing ZMQ server
+2025-01-12 21:27:16 INFO pulser/fsm/run: Sending pulse #1
+2025-01-12 21:27:17 INFO pulser/fsm/run: Sending pulse #2
+2025-01-12 21:27:18 INFO pulser/fsm/run: Sending pulse #3
+2025-01-12 21:27:19 INFO pulser/fsm/run: Sending pulse #4
+2025-01-12 21:27:20 INFO pulser/fsm/run: Sending pulse #5
+2025-01-12 21:27:21 INFO pulser/fsm/run: Sending pulse #6
+2025-01-12 21:27:22 INFO pulser/fsm/run: Sending pulse #7
+2025-01-12 21:27:23 INFO pulser/fsm/run: Sending pulse #8
+2025-01-12 21:27:24 INFO pulser/fsm/run: Sending pulse #9
+2025-01-12 21:27:25 ATTENTION pulser/fsm/run: Reached 10 pulses
+2025-01-12 21:27:25 ATTENTION pulser/fsm: No state returned, exiting FSM
+```
diff --git a/hsm-command/Hsm/Command/Command.hs b/hsm-command/Hsm/Command/Command.hs
new file mode 100644
index 0000000..3b53287
--- /dev/null
+++ b/hsm-command/Hsm/Command/Command.hs
@@ -0,0 +1,52 @@
+{-# LANGUAGE DeriveAnyClass #-}
+
+module Hsm.Command.Command
+ ( Direction(X, Z)
+ , Angle(CW, CCW)
+ , Speed(Slow, Mid, Fast)
+ , Command(Move, Rotate)
+ , commandStream
+ ) where
+
+import Data.Binary (Binary)
+import Data.Maybe (fromJust, isJust)
+import Data.Text (pack)
+import Effectful (Eff, (:>))
+import Effectful.Log (Log, logAttention_)
+import GHC.Generics (Generic)
+import Hsm.Command.Readline (Readline, readline)
+import Streamly.Data.Stream qualified as S
+import Text.Read (readEither)
+
+data Direction
+ = X
+ | Z
+ deriving (Binary, Generic, Read, Show)
+
+data Angle
+ = CW
+ | CCW
+ deriving (Binary, Generic, Read, Show)
+
+data Speed
+ = Slow
+ | Mid
+ | Fast
+ deriving (Binary, Generic, Read, Show)
+
+data Command
+ = Move Direction Speed Int
+ | Rotate Angle Speed Int
+ deriving (Binary, Generic, Read, Show)
+
+commandStream ::
+ forall es. (Log :> es, Readline :> es)
+ => S.Stream (Eff es) Command
+commandStream =
+ S.mapMaybeM (parse . fromJust) $ S.takeWhile isJust $ S.repeatM readline
+ where
+ parse :: String -> Eff es (Maybe Command)
+ parse string =
+ case readEither string of
+ Left err -> logAttention_ (pack err) >> return Nothing
+ Right command -> return $ Just command
diff --git a/hsm-command/Hsm/Command/Readline.hs b/hsm-command/Hsm/Command/Readline.hs
new file mode 100644
index 0000000..3c56453
--- /dev/null
+++ b/hsm-command/Hsm/Command/Readline.hs
@@ -0,0 +1,51 @@
+{-# LANGUAGE DataKinds #-}
+{-# LANGUAGE TypeFamilies #-}
+
+module Hsm.Command.Readline
+ ( Readline
+ , readline
+ , runReadline
+ ) where
+
+import Effectful (Dispatch(Static), DispatchOf, Eff, IOE, (:>))
+import Effectful.Dispatch.Static qualified as S
+import Effectful.Log (Log, getLoggerEnv, leLogger, waitForLogger)
+import Effectful.Resource (Resource, allocate)
+import System.Console.Haskeline qualified as H
+import System.Console.Haskeline.IO qualified as H
+
+data Readline a b
+
+type instance DispatchOf Readline = Static S.WithSideEffects
+
+newtype instance S.StaticRep Readline =
+ Readline H.InputState
+
+readline ::
+ forall es. (Log :> es, Readline :> es)
+ => Eff es (Maybe String)
+readline = do
+ flushLogger
+ Readline hdl <- S.getStaticRep
+ S.unsafeEff_ $ nextLine hdl
+ where
+ flushLogger :: Eff es ()
+ flushLogger = getLoggerEnv >>= S.unsafeEff_ . waitForLogger . leLogger
+ --
+ nextLine :: H.InputState -> IO (Maybe String)
+ nextLine hdl =
+ H.queryInput hdl
+ $ H.handleInterrupt (return Nothing)
+ $ H.withInterrupt
+ $ H.getInputLine "% "
+
+runReadline :: (IOE :> es, Resource :> es) => Eff (Readline : es) a -> Eff es a
+runReadline action = do
+ handle <- snd <$> allocate istate H.cancelInput
+ S.evalStaticRep (Readline handle) action
+ where
+ settings :: H.Settings IO
+ settings = H.defaultSettings {H.historyFile = Just ".hsm_command_history"}
+ --
+ istate :: IO H.InputState
+ istate = H.initializeInput settings
diff --git a/hsm-command/Main.hs b/hsm-command/Main.hs
new file mode 100644
index 0000000..0b24719
--- /dev/null
+++ b/hsm-command/Main.hs
@@ -0,0 +1,33 @@
+{-# LANGUAGE OverloadedRecordDot #-}
+{-# LANGUAGE OverloadedStrings #-}
+{-# LANGUAGE TemplateHaskell #-}
+
+import Data.Function ((&))
+import Data.Text (Text)
+import Effectful (runEff)
+import Effectful.Log (runLog)
+import Effectful.Reader.Static (runReader)
+import Effectful.Resource (runResource)
+import Hsm.Command.Command (commandStream)
+import Hsm.Command.Readline (runReadline)
+import Hsm.Core.App (launch)
+import Hsm.Core.Env (deriveFromYaml)
+import Hsm.Core.Zmq.Server (runServer, send)
+
+data Env = Env
+ { name :: Text
+ , pubEp :: Text
+ }
+
+$(deriveFromYaml ''Env)
+
+main :: IO ()
+main =
+ launch @Env "command" id $ \env logger level ->
+ (commandStream & send @Env)
+ & runServer @Env
+ & runLog env.name logger level
+ & runReader env
+ & runReadline
+ & runResource
+ & runEff
diff --git a/hsm-command/hsm-command.cabal b/hsm-command/hsm-command.cabal
new file mode 100644
index 0000000..766f372
--- /dev/null
+++ b/hsm-command/hsm-command.cabal
@@ -0,0 +1,48 @@
+cabal-version: 3.4
+author: Paul Oliver
+build-type: Simple
+maintainer: contact@pauloliver.dev
+name: hsm-command
+version: 0.1.0.0
+
+library
+ build-depends:
+ , base
+ , binary
+ , effectful-core
+ , haskeline
+ , log-effectful
+ , resourcet-effectful
+ , streamly-core
+ , text
+
+ exposed-modules:
+ Hsm.Command.Command
+ Hsm.Command.Readline
+
+ ghc-options: -Wall -Wunused-packages
+ default-language: GHC2021
+
+executable command
+ build-depends:
+ , base
+ , binary
+ , effectful-core
+ , haskeline
+ , hsm-core
+ , log-effectful
+ , resourcet-effectful
+ , streamly-core
+ , text
+
+ main-is: Main.hs
+ other-modules:
+ Hsm.Command.Command
+ Hsm.Command.Readline
+
+ ghc-options: -Wall -Wunused-packages
+
+ if !arch(x86_64)
+ ghc-options: -optl=-mno-fix-cortex-a53-835769
+
+ default-language: GHC2021
diff --git a/hsm-core/Hsm/Core/App.hs b/hsm-core/Hsm/Core/App.hs
new file mode 100644
index 0000000..11759be
--- /dev/null
+++ b/hsm-core/Hsm/Core/App.hs
@@ -0,0 +1,21 @@
+module Hsm.Core.App
+ ( launch
+ ) where
+
+import Data.Aeson (FromJSON)
+import Data.Text (Text)
+import Effectful.Log (LogLevel, Logger)
+import Hsm.Core.Env (environment)
+import Hsm.Core.Options (Options(Options), options)
+import Log.Backend.StandardOutput (withStdOutLogger)
+
+launch ::
+ FromJSON env
+ => Text
+ -> (IO app -> IO app)
+ -> (env -> Logger -> LogLevel -> IO app)
+ -> IO app
+launch name wrapper app = do
+ Options path level <- options name
+ env <- environment name path
+ wrapper $ withStdOutLogger $ \logger -> app env logger level
diff --git a/hsm-core/Hsm/Core/Env.hs b/hsm-core/Hsm/Core/Env.hs
new file mode 100644
index 0000000..4e7986f
--- /dev/null
+++ b/hsm-core/Hsm/Core/Env.hs
@@ -0,0 +1,29 @@
+module Hsm.Core.Env
+ ( environment
+ , deriveFromYaml
+ ) where
+
+import Data.Aeson (FromJSON, Result(Error, Success), Value, fromJSON)
+import Data.Aeson.Key (fromText)
+import Data.Aeson.KeyMap (KeyMap, (!?))
+import Data.Aeson.TH (defaultOptions, deriveFromJSON, rejectUnknownFields)
+import Data.Maybe (fromMaybe)
+import Data.Text (Text, unpack)
+import Data.Yaml (decodeFileThrow)
+import Language.Haskell.TH (Dec, Name, Q)
+
+environment :: FromJSON env => Text -> Text -> IO env
+environment name = fmap (check . fromJSON . load) . decodeFileThrow . unpack
+ where
+ load :: KeyMap Value -> Value
+ load keymap =
+ fromMaybe
+ (error $ "Service configuration for " <> unpack name <> " not found)")
+ $ keymap !? fromText name
+ --
+ check :: Result env -> env
+ check (Success env) = env
+ check (Error str) = error str
+
+deriveFromYaml :: Name -> Q [Dec]
+deriveFromYaml = deriveFromJSON defaultOptions {rejectUnknownFields = True}
diff --git a/hsm-core/Hsm/Core/Fsm.hs b/hsm-core/Hsm/Core/Fsm.hs
new file mode 100644
index 0000000..43fa497
--- /dev/null
+++ b/hsm-core/Hsm/Core/Fsm.hs
@@ -0,0 +1,61 @@
+{-# LANGUAGE AllowAmbiguousTypes #-}
+{-# LANGUAGE OverloadedStrings #-}
+
+module Hsm.Core.Fsm
+ ( FsmState(FsmState)
+ , FsmOutput(FsmOutput)
+ , FsmResult(FsmResult)
+ , fsm
+ ) where
+
+import Data.Maybe (fromJust, isJust)
+import Data.Text (Text)
+import Effectful (Eff, (:>))
+import Effectful.Log (Log, LogLevel, localDomain, logAttention_, logTrace_)
+import Effectful.Reader.Static (Reader, ask)
+import Effectful.State.Static.Local (State, get, put)
+import Hsm.Core.Log (logTup)
+import Streamly.Data.Stream qualified as S (Stream, mapM, takeWhile)
+
+data FsmState i o env sta =
+ FsmState Text (i -> env -> sta -> FsmOutput i o env sta)
+
+data FsmOutput i o env sta =
+ FsmOutput (Maybe (FsmResult i o env sta)) [(LogLevel, Text)]
+
+data FsmResult i o env sta =
+ FsmResult o sta (FsmState i o env sta)
+
+fsm ::
+ forall i o env sta es.
+ ( Log :> es
+ , Reader env :> es
+ , State (FsmState i o env sta) :> es
+ , State sta :> es
+ )
+ => S.Stream (Eff es) i
+ -> S.Stream (Eff es) o
+fsm = S.mapM (return . fromJust) . S.takeWhile isJust . S.mapM run
+ where
+ exit :: Eff es (Maybe o)
+ exit = do
+ logAttention_ "No state returned, exiting FSM"
+ return Nothing
+ --
+ push :: FsmResult i o env sta -> Eff es (Maybe o)
+ push (FsmResult out sta next) = do
+ put sta
+ put next
+ return $ Just out
+ --
+ run :: i -> Eff es (Maybe o)
+ run input =
+ localDomain "fsm" $ do
+ FsmState name action <- get
+ sta <- get
+ env <- ask
+ logTrace_ $ "Entering state " <> name
+ FsmOutput res logs <- return $ action input env sta
+ localDomain name $ mapM_ logTup logs
+ logTrace_ $ "Exiting state " <> name
+ maybe exit push res
diff --git a/hsm-core/Hsm/Core/Log.hs b/hsm-core/Hsm/Core/Log.hs
new file mode 100644
index 0000000..9f1f357
--- /dev/null
+++ b/hsm-core/Hsm/Core/Log.hs
@@ -0,0 +1,20 @@
+module Hsm.Core.Log
+ ( withLogIO
+ , logTup
+ ) where
+
+import Data.Aeson.Types (emptyObject)
+import Data.Text (Text)
+import Data.Time.Clock (getCurrentTime)
+import Effectful (Eff, (:>))
+import Effectful.Log (Log, LogLevel, getLoggerIO, logMessage)
+
+withLogIO :: Log :> es => Eff es (LogLevel -> Text -> IO ())
+withLogIO = do
+ logIO <- getLoggerIO
+ return $ \level message -> do
+ now <- getCurrentTime
+ logIO now level message emptyObject
+
+logTup :: Log :> es => (LogLevel, Text) -> Eff es ()
+logTup (level, message) = logMessage level message emptyObject
diff --git a/hsm-core/Hsm/Core/Message.hs b/hsm-core/Hsm/Core/Message.hs
new file mode 100644
index 0000000..b2a9f23
--- /dev/null
+++ b/hsm-core/Hsm/Core/Message.hs
@@ -0,0 +1,25 @@
+{-# LANGUAGE OverloadedStrings #-}
+
+module Hsm.Core.Message
+ ( message
+ , topic
+ , body
+ ) where
+
+import Data.Binary (Binary, decode, encode)
+import Data.ByteString (ByteString, fromStrict, toStrict)
+import Data.ByteString.Char8 qualified as B (breakSubstring, drop, length)
+import Data.Text (Text)
+import Data.Text.Encoding (decodeUtf8, encodeUtf8)
+
+sep :: ByteString
+sep = "//"
+
+message :: Binary a => Text -> a -> ByteString
+message t b = encodeUtf8 t <> sep <> toStrict (encode b)
+
+topic :: ByteString -> Text
+topic = decodeUtf8 . fst . B.breakSubstring sep
+
+body :: Binary a => ByteString -> a
+body = decode . fromStrict . B.drop (B.length sep) . snd . B.breakSubstring sep
diff --git a/hsm-core/Hsm/Core/Options.hs b/hsm-core/Hsm/Core/Options.hs
new file mode 100644
index 0000000..29e40a4
--- /dev/null
+++ b/hsm-core/Hsm/Core/Options.hs
@@ -0,0 +1,40 @@
+{-# LANGUAGE OverloadedStrings #-}
+
+module Hsm.Core.Options
+ ( Options(Options)
+ , options
+ ) where
+
+import Control.Applicative ((<**>))
+import Data.Text (Text, pack, unpack)
+import Effectful.Log (LogLevel(LogInfo), readLogLevelEither, showLogLevel)
+import Options.Applicative qualified as P
+import Options.Applicative.Text qualified as P
+
+data Options =
+ Options Text LogLevel
+
+parser :: P.Parser Options
+parser =
+ Options
+ <$> P.textOption
+ (P.help "Path to services config file"
+ <> P.short 'c'
+ <> P.long "config"
+ <> P.metavar "PATH"
+ <> P.value "servconf.yaml"
+ <> P.showDefault)
+ <*> P.option
+ (P.eitherReader $ readLogLevelEither . pack)
+ (P.help "Log level"
+ <> P.short 'l'
+ <> P.long "log-level"
+ <> P.metavar "LEVEL"
+ <> P.value LogInfo
+ <> P.showDefaultWith (unpack . showLogLevel))
+
+options :: Text -> IO Options
+options name =
+ P.customExecParser (P.prefs $ P.columns 100)
+ $ P.info (parser <**> P.helper)
+ $ P.fullDesc <> P.progDesc ("Launch " <> unpack name <> " service")
diff --git a/hsm-core/Hsm/Core/Zmq.hs b/hsm-core/Hsm/Core/Zmq.hs
new file mode 100644
index 0000000..8c12133
--- /dev/null
+++ b/hsm-core/Hsm/Core/Zmq.hs
@@ -0,0 +1,34 @@
+{-# LANGUAGE OverloadedStrings #-}
+
+module Hsm.Core.Zmq
+ ( withSocket
+ ) where
+
+import Data.Text (Text)
+import Effectful (Eff, IOE, (:>))
+import Effectful.Log (Log, LogLevel(LogTrace))
+import Effectful.Resource (Resource, allocate)
+import Hsm.Core.Log (withLogIO)
+import System.ZMQ4 qualified as Z
+
+withSocket ::
+ forall t es. (Z.SocketType t, IOE :> es, Log :> es, Resource :> es)
+ => t
+ -> Eff es (Z.Socket t)
+withSocket stype = withLogIO >>= bracket
+ where
+ bracket :: (LogLevel -> Text -> IO ()) -> Eff es (Z.Socket t)
+ bracket logIO = snd . snd <$> allocate acquire release
+ where
+ acquire :: IO (Z.Context, Z.Socket t)
+ acquire = do
+ logIO LogTrace "Acquiring ZMQ context"
+ cont <- Z.context
+ sock <- Z.socket cont stype
+ return (cont, sock)
+ --
+ release :: (Z.Context, Z.Socket t) -> IO ()
+ release (cont, sock) = do
+ logIO LogTrace "Releasing ZMQ context"
+ Z.close sock
+ Z.shutdown cont
diff --git a/hsm-core/Hsm/Core/Zmq/Client.hs b/hsm-core/Hsm/Core/Zmq/Client.hs
new file mode 100644
index 0000000..793841e
--- /dev/null
+++ b/hsm-core/Hsm/Core/Zmq/Client.hs
@@ -0,0 +1,108 @@
+{-# LANGUAGE AllowAmbiguousTypes #-}
+{-# LANGUAGE DataKinds #-}
+{-# LANGUAGE OverloadedRecordDot #-}
+{-# LANGUAGE OverloadedStrings #-}
+{-# LANGUAGE TypeFamilies #-}
+
+module Hsm.Core.Zmq.Client
+ ( Client
+ , receive
+ , poll
+ , runClient
+ ) where
+
+import Control.Monad.Loops (whileM)
+import Data.Binary (Binary)
+import Data.Text (Text, pack, unpack)
+import Data.Text.Encoding (encodeUtf8)
+import Effectful (Dispatch(Static), DispatchOf, Eff, IOE, (:>))
+import Effectful.Dispatch.Static qualified as E
+import Effectful.Log (Log, localDomain, logInfo_, logTrace_)
+import Effectful.Reader.Static (Reader, ask)
+import Effectful.Resource (Resource)
+import GHC.Records (HasField)
+import Hsm.Core.Message (body, topic)
+import Hsm.Core.Zmq (withSocket)
+import Streamly.Data.Stream (Stream, repeatM)
+import System.ZMQ4 qualified as Z
+
+data Client a b
+
+type instance DispatchOf Client = Static E.WithSideEffects
+
+newtype instance E.StaticRep Client =
+ Client (Z.Socket Z.Sub)
+
+domain :: Text
+domain = "client"
+
+receiver ::
+ forall es a. (Log :> es, Client :> es, Binary a, Show a)
+ => Eff es a
+receiver = do
+ Client sock <- E.getStaticRep
+ message <- E.unsafeEff_ $ Z.receive sock
+ localDomain domain
+ $ logTrace_
+ $ "Message received ["
+ <> topic message
+ <> "]: "
+ <> pack (show $ body @a message)
+ return $ body message
+
+receive ::
+ forall es a. (Log :> es, Client :> es, Binary a, Show a)
+ => Stream (Eff es) a
+receive = repeatM receiver
+
+poll ::
+ forall es a. (Log :> es, Client :> es, Binary a, Show a)
+ => Stream (Eff es) [a]
+poll = repeatM poller
+ where
+ newMsg :: Eff es Bool
+ newMsg = do
+ Client sock <- E.getStaticRep
+ peek <- E.unsafeEff_ $ Z.poll 0 [Z.Sock sock [Z.In] Nothing]
+ return $ peek /= [[]]
+ --
+ poller :: Eff es [a]
+ poller = do
+ ms <- whileM newMsg receiver
+ localDomain domain
+ $ localDomain "poller"
+ $ logTrace_
+ $ pack (show $ length ms) <> " new message(s) on queue"
+ return ms
+
+runClient ::
+ forall env es a.
+ ( HasField "subEps" env [Text]
+ , HasField "topics" env [Text]
+ , IOE :> es
+ , Log :> es
+ , Reader env :> es
+ , Resource :> es
+ )
+ => Eff (Client : es) a
+ -> Eff es a
+runClient action = withSocket Z.Sub >>= run
+ where
+ run :: Z.Socket Z.Sub -> Eff es a
+ run sock = E.evalStaticRep (Client sock) $ initialize >> action
+ where
+ connect :: Text -> Eff (Client : es) ()
+ connect = E.unsafeEff_ . Z.connect sock . unpack
+ --
+ subscribe :: Text -> Eff (Client : es) ()
+ subscribe = E.unsafeEff_ . Z.subscribe sock . encodeUtf8
+ --
+ initialize :: Eff (Client : es) ()
+ initialize =
+ localDomain domain $ do
+ logInfo_ "Initializing ZMQ client"
+ env <- ask @env
+ mapM_ connect env.subEps
+ mapM_ subscribe env.topics
+ logTrace_ $ "Listening to " <> pack (show env.subEps)
+ logTrace_ $ "Subscribed to " <> pack (show env.topics)
diff --git a/hsm-core/Hsm/Core/Zmq/Server.hs b/hsm-core/Hsm/Core/Zmq/Server.hs
new file mode 100644
index 0000000..5663cd8
--- /dev/null
+++ b/hsm-core/Hsm/Core/Zmq/Server.hs
@@ -0,0 +1,81 @@
+{-# LANGUAGE AllowAmbiguousTypes #-}
+{-# LANGUAGE DataKinds #-}
+{-# LANGUAGE OverloadedRecordDot #-}
+{-# LANGUAGE OverloadedStrings #-}
+{-# LANGUAGE TypeFamilies #-}
+
+module Hsm.Core.Zmq.Server
+ ( Server
+ , send
+ , runServer
+ ) where
+
+import Data.Binary (Binary)
+import Data.Text (Text, pack, unpack)
+import Effectful (Dispatch(Static), DispatchOf, Eff, IOE, (:>))
+import Effectful.Dispatch.Static qualified as E
+import Effectful.Log (Log, localDomain, logInfo_, logTrace_)
+import Effectful.Reader.Static (Reader, ask)
+import Effectful.Resource (Resource)
+import GHC.Records (HasField)
+import Hsm.Core.Message (message)
+import Hsm.Core.Zmq (withSocket)
+import Streamly.Data.Fold qualified as S (drain)
+import Streamly.Data.Stream qualified as S (Stream, fold, mapM)
+import System.ZMQ4 qualified as Z
+
+data Server a b
+
+type instance DispatchOf Server = Static E.WithSideEffects
+
+newtype instance E.StaticRep Server =
+ Server (Z.Socket Z.Pub)
+
+domain :: Text
+domain = "server"
+
+send ::
+ forall env es a.
+ ( HasField "name" env Text
+ , Log :> es
+ , Reader env :> es
+ , Server :> es
+ , Binary a
+ , Show a
+ )
+ => S.Stream (Eff es) a
+ -> Eff es ()
+send = S.fold S.drain . S.mapM sender
+ where
+ sender :: a -> Eff es ()
+ sender payload = do
+ Server sock <- E.getStaticRep
+ env <- ask @env
+ E.unsafeEff_ $ Z.send sock [] $ message env.name payload
+ localDomain domain $ logTrace_ $ "Message sent: " <> pack (show payload)
+
+runServer ::
+ forall env es a.
+ ( HasField "pubEp" env Text
+ , IOE :> es
+ , Log :> es
+ , Reader env :> es
+ , Resource :> es
+ )
+ => Eff (Server : es) a
+ -> Eff es a
+runServer action = withSocket Z.Pub >>= run
+ where
+ run :: Z.Socket Z.Pub -> Eff es a
+ run sock = E.evalStaticRep (Server sock) $ initialize >> action
+ where
+ bind :: Text -> Eff (Server : es) ()
+ bind = E.unsafeEff_ . Z.bind sock . unpack
+ --
+ initialize :: Eff (Server : es) ()
+ initialize =
+ localDomain domain $ do
+ logInfo_ "Initializing ZMQ server"
+ env <- ask @env
+ bind env.pubEp
+ logTrace_ $ "Publishing to " <> env.pubEp
diff --git a/hsm-core/hsm-core.cabal b/hsm-core/hsm-core.cabal
new file mode 100644
index 0000000..bbdbbb3
--- /dev/null
+++ b/hsm-core/hsm-core.cabal
@@ -0,0 +1,42 @@
+cabal-version: 3.4
+author: Paul Oliver
+build-type: Simple
+maintainer: contact@pauloliver.dev
+name: hsm-core
+version: 0.1.0.0
+
+library
+ build-depends:
+ , aeson
+ , base
+ , binary
+ , bytestring
+ , effectful-core
+ , log-base
+ , log-effectful
+ , monad-loops
+ , optparse-applicative
+ , optparse-text
+ , resourcet-effectful
+ , streamly-core
+ , template-haskell
+ , text
+ , time
+ , yaml
+ , zeromq4-haskell
+
+ exposed-modules:
+ Hsm.Core.App
+ Hsm.Core.Env
+ Hsm.Core.Fsm
+ Hsm.Core.Log
+ Hsm.Core.Message
+ Hsm.Core.Zmq.Client
+ Hsm.Core.Zmq.Server
+
+ other-modules:
+ Hsm.Core.Options
+ Hsm.Core.Zmq
+
+ ghc-options: -Wall -Wunused-packages
+ default-language: GHC2021
diff --git a/hsm-dummy-poller/Main.hs b/hsm-dummy-poller/Main.hs
new file mode 100644
index 0000000..7a35230
--- /dev/null
+++ b/hsm-dummy-poller/Main.hs
@@ -0,0 +1,56 @@
+{-# LANGUAGE OverloadedRecordDot #-}
+{-# LANGUAGE OverloadedStrings #-}
+{-# LANGUAGE TemplateHaskell #-}
+
+import Control.Monad (forM_)
+import Data.Function ((&))
+import Data.Text (Text, pack)
+import Effectful (Eff, (:>), runEff)
+import Effectful.Concurrent (Concurrent, runConcurrent, threadDelay)
+import Effectful.Log (Log, localDomain, logInfo_, runLog)
+import Effectful.Reader.Static (Reader, asks, runReader)
+import Effectful.Resource (runResource)
+import Hsm.Core.App (launch)
+import Hsm.Core.Env (deriveFromYaml)
+import Hsm.Core.Zmq.Client (poll, runClient)
+import Streamly.Data.Fold qualified as S (drain)
+import Streamly.Data.Stream qualified as S (Stream, fold, mapM)
+import System.IO.Echo (withoutInputEcho)
+
+data Env = Env
+ { name :: Text
+ , subEps :: [Text]
+ , topics :: [Text]
+ , period :: Int
+ }
+
+$(deriveFromYaml ''Env)
+
+handle ::
+ forall es. (Concurrent :> es, Log :> es, Reader Env :> es)
+ => S.Stream (Eff es) [Int]
+ -> Eff es ()
+handle =
+ S.fold S.drain . S.mapM (\p -> asks period >>= threadDelay >> handler p)
+ where
+ receiverDomain :: Text
+ receiverDomain = "receiver"
+ --
+ handler :: [Int] -> Eff es ()
+ handler [] = localDomain receiverDomain $ logInfo_ "No pulse received yet"
+ handler ps =
+ forM_ ps $ \p ->
+ localDomain receiverDomain
+ $ logInfo_
+ $ "Received pulse #" <> pack (show p)
+
+main :: IO ()
+main =
+ launch @Env "dummy-poller" withoutInputEcho $ \env logger level ->
+ (poll & handle)
+ & runClient @Env
+ & runConcurrent
+ & runLog env.name logger level
+ & runReader env
+ & runResource
+ & runEff
diff --git a/hsm-dummy-poller/hsm-dummy-poller.cabal b/hsm-dummy-poller/hsm-dummy-poller.cabal
new file mode 100644
index 0000000..801cf68
--- /dev/null
+++ b/hsm-dummy-poller/hsm-dummy-poller.cabal
@@ -0,0 +1,25 @@
+cabal-version: 3.4
+author: Paul Oliver
+build-type: Simple
+maintainer: contact@pauloliver.dev
+name: hsm-dummy-poller
+version: 0.1.0.0
+
+executable dummy-poller
+ build-depends:
+ , base
+ , echo
+ , effectful
+ , hsm-core
+ , log-effectful
+ , resourcet-effectful
+ , streamly-core
+ , text
+
+ main-is: Main.hs
+ ghc-options: -Wall -Wunused-packages
+
+ if !arch(x86_64)
+ ghc-options: -optl=-mno-fix-cortex-a53-835769
+
+ default-language: GHC2021
diff --git a/hsm-dummy-pulser/Main.hs b/hsm-dummy-pulser/Main.hs
new file mode 100644
index 0000000..3d734ba
--- /dev/null
+++ b/hsm-dummy-pulser/Main.hs
@@ -0,0 +1,64 @@
+{-# LANGUAGE OverloadedRecordDot #-}
+{-# LANGUAGE OverloadedStrings #-}
+{-# LANGUAGE TemplateHaskell #-}
+
+import Data.Function ((&))
+import Data.Text (Text, pack)
+import Effectful (Eff, (:>), runEff)
+import Effectful.Concurrent (Concurrent, runConcurrent, threadDelay)
+import Effectful.Log (LogLevel(LogAttention, LogInfo), runLog)
+import Effectful.Reader.Static (Reader, asks, runReader)
+import Effectful.Resource (runResource)
+import Effectful.State.Static.Local (evalState)
+import Hsm.Core.App (launch)
+import Hsm.Core.Env (deriveFromYaml)
+import Hsm.Core.Fsm qualified as F
+import Hsm.Core.Zmq.Server (runServer, send)
+import Streamly.Data.Stream (Stream, repeatM)
+import System.IO.Echo (withoutInputEcho)
+
+data Env = Env
+ { name :: Text
+ , pubEp :: Text
+ , period :: Int
+ , pulses :: Int
+ }
+
+$(deriveFromYaml ''Env)
+
+pulse :: (Concurrent :> es, Reader Env :> es) => Stream (Eff es) ()
+pulse = repeatM $ asks period >>= threadDelay
+
+stateRun :: F.FsmState () Int Env Int
+stateRun = F.FsmState "run" action
+ where
+ action :: () -> Env -> Int -> F.FsmOutput () Int Env Int
+ action _ env sta =
+ if sta < env.pulses
+ then next
+ else exit
+ where
+ next :: F.FsmOutput () Int Env Int
+ next =
+ F.FsmOutput
+ (Just $ F.FsmResult sta (succ sta) stateRun)
+ [(LogInfo, "Sending pulse #" <> pack (show sta))]
+ --
+ exit :: F.FsmOutput () Int Env Int
+ exit =
+ F.FsmOutput
+ Nothing
+ [(LogAttention, "Reached " <> pack (show env.pulses) <> " pulses")]
+
+main :: IO ()
+main =
+ launch @Env "dummy-pulser" withoutInputEcho $ \env logger level ->
+ (pulse & F.fsm @_ @_ @Env @Int & send @Env @_ @Int)
+ & runServer @Env
+ & evalState @Int 1
+ & evalState stateRun
+ & runConcurrent
+ & runLog env.name logger level
+ & runReader env
+ & runResource
+ & runEff
diff --git a/hsm-dummy-pulser/hsm-dummy-pulser.cabal b/hsm-dummy-pulser/hsm-dummy-pulser.cabal
new file mode 100644
index 0000000..747f62c
--- /dev/null
+++ b/hsm-dummy-pulser/hsm-dummy-pulser.cabal
@@ -0,0 +1,25 @@
+cabal-version: 3.4
+author: Paul Oliver
+build-type: Simple
+maintainer: contact@pauloliver.dev
+name: hsm-dummy-pulser
+version: 0.1.0.0
+
+executable dummy-pulser
+ build-depends:
+ , base
+ , echo
+ , effectful
+ , hsm-core
+ , log-effectful
+ , resourcet-effectful
+ , streamly-core
+ , text
+
+ main-is: Main.hs
+ ghc-options: -Wall -Wunused-packages
+
+ if !arch(x86_64)
+ ghc-options: -optl=-mno-fix-cortex-a53-835769
+
+ default-language: GHC2021
diff --git a/hsm-dummy-receiver/Main.hs b/hsm-dummy-receiver/Main.hs
new file mode 100644
index 0000000..c0aa98a
--- /dev/null
+++ b/hsm-dummy-receiver/Main.hs
@@ -0,0 +1,44 @@
+{-# LANGUAGE OverloadedRecordDot #-}
+{-# LANGUAGE OverloadedStrings #-}
+{-# LANGUAGE TemplateHaskell #-}
+
+import Data.Function ((&))
+import Data.Text (Text, pack)
+import Effectful (Eff, (:>), runEff)
+import Effectful.Log (Log, localDomain, logInfo_, runLog)
+import Effectful.Reader.Static (runReader)
+import Effectful.Resource (runResource)
+import Hsm.Core.App (launch)
+import Hsm.Core.Env (deriveFromYaml)
+import Hsm.Core.Zmq.Client (receive, runClient)
+import Streamly.Data.Fold qualified as S (drain)
+import Streamly.Data.Stream qualified as S (Stream, fold, mapM)
+import System.IO.Echo (withoutInputEcho)
+
+data Env = Env
+ { name :: Text
+ , subEps :: [Text]
+ , topics :: [Text]
+ }
+
+$(deriveFromYaml ''Env)
+
+handle ::
+ forall es. Log :> es
+ => S.Stream (Eff es) Int
+ -> Eff es ()
+handle = S.fold S.drain . S.mapM handler
+ where
+ handler :: Int -> Eff es ()
+ handler p =
+ localDomain "receiver" $ logInfo_ $ "Received pulse #" <> pack (show p)
+
+main :: IO ()
+main =
+ launch @Env "dummy-receiver" withoutInputEcho $ \env logger level ->
+ (receive & handle)
+ & runClient @Env
+ & runLog env.name logger level
+ & runReader env
+ & runResource
+ & runEff
diff --git a/hsm-dummy-receiver/hsm-dummy-receiver.cabal b/hsm-dummy-receiver/hsm-dummy-receiver.cabal
new file mode 100644
index 0000000..9738bbe
--- /dev/null
+++ b/hsm-dummy-receiver/hsm-dummy-receiver.cabal
@@ -0,0 +1,25 @@
+cabal-version: 3.4
+author: Paul Oliver
+build-type: Simple
+maintainer: contact@pauloliver.dev
+name: hsm-dummy-receiver
+version: 0.1.0.0
+
+executable dummy-receiver
+ build-depends:
+ , base
+ , echo
+ , effectful-core
+ , hsm-core
+ , log-effectful
+ , resourcet-effectful
+ , streamly-core
+ , text
+
+ main-is: Main.hs
+ ghc-options: -Wall -Wunused-packages
+
+ if !arch(x86_64)
+ ghc-options: -optl=-mno-fix-cortex-a53-835769
+
+ default-language: GHC2021
diff --git a/servconf.yaml b/servconf.yaml
new file mode 100644
index 0000000..9bf4c63
--- /dev/null
+++ b/servconf.yaml
@@ -0,0 +1,21 @@
+command:
+ name: command
+ pubEp: tcp://0.0.0.0:10000
+dummy-poller:
+ name: poller
+ period: 3000000
+ subEps:
+ - tcp://0.0.0.0:10001
+ topics:
+ - pulser
+dummy-pulser:
+ name: pulser
+ period: 1000000
+ pubEp: tcp://0.0.0.0:10001
+ pulses: 10
+dummy-receiver:
+ name: receiver
+ subEps:
+ - tcp://0.0.0.0:10001
+ topics:
+ - pulser
diff --git a/stack.yaml b/stack.yaml
new file mode 100644
index 0000000..aff8282
--- /dev/null
+++ b/stack.yaml
@@ -0,0 +1,11 @@
+allow-newer: true
+extra-deps:
+ - log-effectful-1.0.1.0
+ - resourcet-effectful-1.0.1.0
+packages:
+ - hsm-command
+ - hsm-core
+ - hsm-dummy-poller
+ - hsm-dummy-pulser
+ - hsm-dummy-receiver
+snapshot: lts-23.3
diff --git a/stack.yaml.lock b/stack.yaml.lock
new file mode 100644
index 0000000..aa298d4
--- /dev/null
+++ b/stack.yaml.lock
@@ -0,0 +1,26 @@
+# This file was autogenerated by Stack.
+# You should not edit this file by hand.
+# For more information, please see the documentation at:
+# https://docs.haskellstack.org/en/stable/topics/lock_files
+
+packages:
+- completed:
+ hackage: log-effectful-1.0.1.0@sha256:79d1c821db1c1d95cf109f813f13a2588e45dbacb5f797eefc200ff7a5984923,2466
+ pantry-tree:
+ sha256: 5ab7c6b553ea50ce7b6218e86db9ff3632b0482dd00aaf749ebece93b968e0ca
+ size: 326
+ original:
+ hackage: log-effectful-1.0.1.0
+- completed:
+ hackage: resourcet-effectful-1.0.1.0@sha256:13f94c9832d0d1573abbabcddc5c3aa3c341973d1d442445795593e355e7803e,2115
+ pantry-tree:
+ sha256: ef0db7bdeca5df1e722958cf5c8f3205ed5bf92b111e0fbc5d1a3c592d1c210e
+ size: 283
+ original:
+ hackage: resourcet-effectful-1.0.1.0
+snapshots:
+- completed:
+ sha256: dd89d2322cb5af74c6ab9d96c0c5f6c8e6653e0c991d619b4bb141a49cb98668
+ size: 679282
+ url: https://raw.githubusercontent.com/commercialhaskell/stackage-snapshots/master/lts/23/3.yaml
+ original: lts-23.3