aboutsummaryrefslogtreecommitdiff
path: root/hsm-core/Hsm
diff options
context:
space:
mode:
authorPaul Oliver <contact@pauloliver.dev>2024-12-29 17:05:34 +0000
committerPaul Oliver <contact@pauloliver.dev>2025-01-16 18:30:09 -0800
commitcc639b06c7126fac7b445d8f778455620d7f8f50 (patch)
treea4c5c7c0b0a9cdb5bea0891e198003035065e57d /hsm-core/Hsm
Initial
Diffstat (limited to 'hsm-core/Hsm')
-rw-r--r--hsm-core/Hsm/Core/App.hs21
-rw-r--r--hsm-core/Hsm/Core/Env.hs29
-rw-r--r--hsm-core/Hsm/Core/Fsm.hs61
-rw-r--r--hsm-core/Hsm/Core/Log.hs20
-rw-r--r--hsm-core/Hsm/Core/Message.hs25
-rw-r--r--hsm-core/Hsm/Core/Options.hs40
-rw-r--r--hsm-core/Hsm/Core/Zmq.hs34
-rw-r--r--hsm-core/Hsm/Core/Zmq/Client.hs108
-rw-r--r--hsm-core/Hsm/Core/Zmq/Server.hs81
9 files changed, 419 insertions, 0 deletions
diff --git a/hsm-core/Hsm/Core/App.hs b/hsm-core/Hsm/Core/App.hs
new file mode 100644
index 0000000..11759be
--- /dev/null
+++ b/hsm-core/Hsm/Core/App.hs
@@ -0,0 +1,21 @@
+module Hsm.Core.App
+ ( launch
+ ) where
+
+import Data.Aeson (FromJSON)
+import Data.Text (Text)
+import Effectful.Log (LogLevel, Logger)
+import Hsm.Core.Env (environment)
+import Hsm.Core.Options (Options(Options), options)
+import Log.Backend.StandardOutput (withStdOutLogger)
+
+launch ::
+ FromJSON env
+ => Text
+ -> (IO app -> IO app)
+ -> (env -> Logger -> LogLevel -> IO app)
+ -> IO app
+launch name wrapper app = do
+ Options path level <- options name
+ env <- environment name path
+ wrapper $ withStdOutLogger $ \logger -> app env logger level
diff --git a/hsm-core/Hsm/Core/Env.hs b/hsm-core/Hsm/Core/Env.hs
new file mode 100644
index 0000000..4e7986f
--- /dev/null
+++ b/hsm-core/Hsm/Core/Env.hs
@@ -0,0 +1,29 @@
+module Hsm.Core.Env
+ ( environment
+ , deriveFromYaml
+ ) where
+
+import Data.Aeson (FromJSON, Result(Error, Success), Value, fromJSON)
+import Data.Aeson.Key (fromText)
+import Data.Aeson.KeyMap (KeyMap, (!?))
+import Data.Aeson.TH (defaultOptions, deriveFromJSON, rejectUnknownFields)
+import Data.Maybe (fromMaybe)
+import Data.Text (Text, unpack)
+import Data.Yaml (decodeFileThrow)
+import Language.Haskell.TH (Dec, Name, Q)
+
+environment :: FromJSON env => Text -> Text -> IO env
+environment name = fmap (check . fromJSON . load) . decodeFileThrow . unpack
+ where
+ load :: KeyMap Value -> Value
+ load keymap =
+ fromMaybe
+ (error $ "Service configuration for " <> unpack name <> " not found)")
+ $ keymap !? fromText name
+ --
+ check :: Result env -> env
+ check (Success env) = env
+ check (Error str) = error str
+
+deriveFromYaml :: Name -> Q [Dec]
+deriveFromYaml = deriveFromJSON defaultOptions {rejectUnknownFields = True}
diff --git a/hsm-core/Hsm/Core/Fsm.hs b/hsm-core/Hsm/Core/Fsm.hs
new file mode 100644
index 0000000..43fa497
--- /dev/null
+++ b/hsm-core/Hsm/Core/Fsm.hs
@@ -0,0 +1,61 @@
+{-# LANGUAGE AllowAmbiguousTypes #-}
+{-# LANGUAGE OverloadedStrings #-}
+
+module Hsm.Core.Fsm
+ ( FsmState(FsmState)
+ , FsmOutput(FsmOutput)
+ , FsmResult(FsmResult)
+ , fsm
+ ) where
+
+import Data.Maybe (fromJust, isJust)
+import Data.Text (Text)
+import Effectful (Eff, (:>))
+import Effectful.Log (Log, LogLevel, localDomain, logAttention_, logTrace_)
+import Effectful.Reader.Static (Reader, ask)
+import Effectful.State.Static.Local (State, get, put)
+import Hsm.Core.Log (logTup)
+import Streamly.Data.Stream qualified as S (Stream, mapM, takeWhile)
+
+data FsmState i o env sta =
+ FsmState Text (i -> env -> sta -> FsmOutput i o env sta)
+
+data FsmOutput i o env sta =
+ FsmOutput (Maybe (FsmResult i o env sta)) [(LogLevel, Text)]
+
+data FsmResult i o env sta =
+ FsmResult o sta (FsmState i o env sta)
+
+fsm ::
+ forall i o env sta es.
+ ( Log :> es
+ , Reader env :> es
+ , State (FsmState i o env sta) :> es
+ , State sta :> es
+ )
+ => S.Stream (Eff es) i
+ -> S.Stream (Eff es) o
+fsm = S.mapM (return . fromJust) . S.takeWhile isJust . S.mapM run
+ where
+ exit :: Eff es (Maybe o)
+ exit = do
+ logAttention_ "No state returned, exiting FSM"
+ return Nothing
+ --
+ push :: FsmResult i o env sta -> Eff es (Maybe o)
+ push (FsmResult out sta next) = do
+ put sta
+ put next
+ return $ Just out
+ --
+ run :: i -> Eff es (Maybe o)
+ run input =
+ localDomain "fsm" $ do
+ FsmState name action <- get
+ sta <- get
+ env <- ask
+ logTrace_ $ "Entering state " <> name
+ FsmOutput res logs <- return $ action input env sta
+ localDomain name $ mapM_ logTup logs
+ logTrace_ $ "Exiting state " <> name
+ maybe exit push res
diff --git a/hsm-core/Hsm/Core/Log.hs b/hsm-core/Hsm/Core/Log.hs
new file mode 100644
index 0000000..9f1f357
--- /dev/null
+++ b/hsm-core/Hsm/Core/Log.hs
@@ -0,0 +1,20 @@
+module Hsm.Core.Log
+ ( withLogIO
+ , logTup
+ ) where
+
+import Data.Aeson.Types (emptyObject)
+import Data.Text (Text)
+import Data.Time.Clock (getCurrentTime)
+import Effectful (Eff, (:>))
+import Effectful.Log (Log, LogLevel, getLoggerIO, logMessage)
+
+withLogIO :: Log :> es => Eff es (LogLevel -> Text -> IO ())
+withLogIO = do
+ logIO <- getLoggerIO
+ return $ \level message -> do
+ now <- getCurrentTime
+ logIO now level message emptyObject
+
+logTup :: Log :> es => (LogLevel, Text) -> Eff es ()
+logTup (level, message) = logMessage level message emptyObject
diff --git a/hsm-core/Hsm/Core/Message.hs b/hsm-core/Hsm/Core/Message.hs
new file mode 100644
index 0000000..b2a9f23
--- /dev/null
+++ b/hsm-core/Hsm/Core/Message.hs
@@ -0,0 +1,25 @@
+{-# LANGUAGE OverloadedStrings #-}
+
+module Hsm.Core.Message
+ ( message
+ , topic
+ , body
+ ) where
+
+import Data.Binary (Binary, decode, encode)
+import Data.ByteString (ByteString, fromStrict, toStrict)
+import Data.ByteString.Char8 qualified as B (breakSubstring, drop, length)
+import Data.Text (Text)
+import Data.Text.Encoding (decodeUtf8, encodeUtf8)
+
+sep :: ByteString
+sep = "//"
+
+message :: Binary a => Text -> a -> ByteString
+message t b = encodeUtf8 t <> sep <> toStrict (encode b)
+
+topic :: ByteString -> Text
+topic = decodeUtf8 . fst . B.breakSubstring sep
+
+body :: Binary a => ByteString -> a
+body = decode . fromStrict . B.drop (B.length sep) . snd . B.breakSubstring sep
diff --git a/hsm-core/Hsm/Core/Options.hs b/hsm-core/Hsm/Core/Options.hs
new file mode 100644
index 0000000..29e40a4
--- /dev/null
+++ b/hsm-core/Hsm/Core/Options.hs
@@ -0,0 +1,40 @@
+{-# LANGUAGE OverloadedStrings #-}
+
+module Hsm.Core.Options
+ ( Options(Options)
+ , options
+ ) where
+
+import Control.Applicative ((<**>))
+import Data.Text (Text, pack, unpack)
+import Effectful.Log (LogLevel(LogInfo), readLogLevelEither, showLogLevel)
+import Options.Applicative qualified as P
+import Options.Applicative.Text qualified as P
+
+data Options =
+ Options Text LogLevel
+
+parser :: P.Parser Options
+parser =
+ Options
+ <$> P.textOption
+ (P.help "Path to services config file"
+ <> P.short 'c'
+ <> P.long "config"
+ <> P.metavar "PATH"
+ <> P.value "servconf.yaml"
+ <> P.showDefault)
+ <*> P.option
+ (P.eitherReader $ readLogLevelEither . pack)
+ (P.help "Log level"
+ <> P.short 'l'
+ <> P.long "log-level"
+ <> P.metavar "LEVEL"
+ <> P.value LogInfo
+ <> P.showDefaultWith (unpack . showLogLevel))
+
+options :: Text -> IO Options
+options name =
+ P.customExecParser (P.prefs $ P.columns 100)
+ $ P.info (parser <**> P.helper)
+ $ P.fullDesc <> P.progDesc ("Launch " <> unpack name <> " service")
diff --git a/hsm-core/Hsm/Core/Zmq.hs b/hsm-core/Hsm/Core/Zmq.hs
new file mode 100644
index 0000000..8c12133
--- /dev/null
+++ b/hsm-core/Hsm/Core/Zmq.hs
@@ -0,0 +1,34 @@
+{-# LANGUAGE OverloadedStrings #-}
+
+module Hsm.Core.Zmq
+ ( withSocket
+ ) where
+
+import Data.Text (Text)
+import Effectful (Eff, IOE, (:>))
+import Effectful.Log (Log, LogLevel(LogTrace))
+import Effectful.Resource (Resource, allocate)
+import Hsm.Core.Log (withLogIO)
+import System.ZMQ4 qualified as Z
+
+withSocket ::
+ forall t es. (Z.SocketType t, IOE :> es, Log :> es, Resource :> es)
+ => t
+ -> Eff es (Z.Socket t)
+withSocket stype = withLogIO >>= bracket
+ where
+ bracket :: (LogLevel -> Text -> IO ()) -> Eff es (Z.Socket t)
+ bracket logIO = snd . snd <$> allocate acquire release
+ where
+ acquire :: IO (Z.Context, Z.Socket t)
+ acquire = do
+ logIO LogTrace "Acquiring ZMQ context"
+ cont <- Z.context
+ sock <- Z.socket cont stype
+ return (cont, sock)
+ --
+ release :: (Z.Context, Z.Socket t) -> IO ()
+ release (cont, sock) = do
+ logIO LogTrace "Releasing ZMQ context"
+ Z.close sock
+ Z.shutdown cont
diff --git a/hsm-core/Hsm/Core/Zmq/Client.hs b/hsm-core/Hsm/Core/Zmq/Client.hs
new file mode 100644
index 0000000..793841e
--- /dev/null
+++ b/hsm-core/Hsm/Core/Zmq/Client.hs
@@ -0,0 +1,108 @@
+{-# LANGUAGE AllowAmbiguousTypes #-}
+{-# LANGUAGE DataKinds #-}
+{-# LANGUAGE OverloadedRecordDot #-}
+{-# LANGUAGE OverloadedStrings #-}
+{-# LANGUAGE TypeFamilies #-}
+
+module Hsm.Core.Zmq.Client
+ ( Client
+ , receive
+ , poll
+ , runClient
+ ) where
+
+import Control.Monad.Loops (whileM)
+import Data.Binary (Binary)
+import Data.Text (Text, pack, unpack)
+import Data.Text.Encoding (encodeUtf8)
+import Effectful (Dispatch(Static), DispatchOf, Eff, IOE, (:>))
+import Effectful.Dispatch.Static qualified as E
+import Effectful.Log (Log, localDomain, logInfo_, logTrace_)
+import Effectful.Reader.Static (Reader, ask)
+import Effectful.Resource (Resource)
+import GHC.Records (HasField)
+import Hsm.Core.Message (body, topic)
+import Hsm.Core.Zmq (withSocket)
+import Streamly.Data.Stream (Stream, repeatM)
+import System.ZMQ4 qualified as Z
+
+data Client a b
+
+type instance DispatchOf Client = Static E.WithSideEffects
+
+newtype instance E.StaticRep Client =
+ Client (Z.Socket Z.Sub)
+
+domain :: Text
+domain = "client"
+
+receiver ::
+ forall es a. (Log :> es, Client :> es, Binary a, Show a)
+ => Eff es a
+receiver = do
+ Client sock <- E.getStaticRep
+ message <- E.unsafeEff_ $ Z.receive sock
+ localDomain domain
+ $ logTrace_
+ $ "Message received ["
+ <> topic message
+ <> "]: "
+ <> pack (show $ body @a message)
+ return $ body message
+
+receive ::
+ forall es a. (Log :> es, Client :> es, Binary a, Show a)
+ => Stream (Eff es) a
+receive = repeatM receiver
+
+poll ::
+ forall es a. (Log :> es, Client :> es, Binary a, Show a)
+ => Stream (Eff es) [a]
+poll = repeatM poller
+ where
+ newMsg :: Eff es Bool
+ newMsg = do
+ Client sock <- E.getStaticRep
+ peek <- E.unsafeEff_ $ Z.poll 0 [Z.Sock sock [Z.In] Nothing]
+ return $ peek /= [[]]
+ --
+ poller :: Eff es [a]
+ poller = do
+ ms <- whileM newMsg receiver
+ localDomain domain
+ $ localDomain "poller"
+ $ logTrace_
+ $ pack (show $ length ms) <> " new message(s) on queue"
+ return ms
+
+runClient ::
+ forall env es a.
+ ( HasField "subEps" env [Text]
+ , HasField "topics" env [Text]
+ , IOE :> es
+ , Log :> es
+ , Reader env :> es
+ , Resource :> es
+ )
+ => Eff (Client : es) a
+ -> Eff es a
+runClient action = withSocket Z.Sub >>= run
+ where
+ run :: Z.Socket Z.Sub -> Eff es a
+ run sock = E.evalStaticRep (Client sock) $ initialize >> action
+ where
+ connect :: Text -> Eff (Client : es) ()
+ connect = E.unsafeEff_ . Z.connect sock . unpack
+ --
+ subscribe :: Text -> Eff (Client : es) ()
+ subscribe = E.unsafeEff_ . Z.subscribe sock . encodeUtf8
+ --
+ initialize :: Eff (Client : es) ()
+ initialize =
+ localDomain domain $ do
+ logInfo_ "Initializing ZMQ client"
+ env <- ask @env
+ mapM_ connect env.subEps
+ mapM_ subscribe env.topics
+ logTrace_ $ "Listening to " <> pack (show env.subEps)
+ logTrace_ $ "Subscribed to " <> pack (show env.topics)
diff --git a/hsm-core/Hsm/Core/Zmq/Server.hs b/hsm-core/Hsm/Core/Zmq/Server.hs
new file mode 100644
index 0000000..5663cd8
--- /dev/null
+++ b/hsm-core/Hsm/Core/Zmq/Server.hs
@@ -0,0 +1,81 @@
+{-# LANGUAGE AllowAmbiguousTypes #-}
+{-# LANGUAGE DataKinds #-}
+{-# LANGUAGE OverloadedRecordDot #-}
+{-# LANGUAGE OverloadedStrings #-}
+{-# LANGUAGE TypeFamilies #-}
+
+module Hsm.Core.Zmq.Server
+ ( Server
+ , send
+ , runServer
+ ) where
+
+import Data.Binary (Binary)
+import Data.Text (Text, pack, unpack)
+import Effectful (Dispatch(Static), DispatchOf, Eff, IOE, (:>))
+import Effectful.Dispatch.Static qualified as E
+import Effectful.Log (Log, localDomain, logInfo_, logTrace_)
+import Effectful.Reader.Static (Reader, ask)
+import Effectful.Resource (Resource)
+import GHC.Records (HasField)
+import Hsm.Core.Message (message)
+import Hsm.Core.Zmq (withSocket)
+import Streamly.Data.Fold qualified as S (drain)
+import Streamly.Data.Stream qualified as S (Stream, fold, mapM)
+import System.ZMQ4 qualified as Z
+
+data Server a b
+
+type instance DispatchOf Server = Static E.WithSideEffects
+
+newtype instance E.StaticRep Server =
+ Server (Z.Socket Z.Pub)
+
+domain :: Text
+domain = "server"
+
+send ::
+ forall env es a.
+ ( HasField "name" env Text
+ , Log :> es
+ , Reader env :> es
+ , Server :> es
+ , Binary a
+ , Show a
+ )
+ => S.Stream (Eff es) a
+ -> Eff es ()
+send = S.fold S.drain . S.mapM sender
+ where
+ sender :: a -> Eff es ()
+ sender payload = do
+ Server sock <- E.getStaticRep
+ env <- ask @env
+ E.unsafeEff_ $ Z.send sock [] $ message env.name payload
+ localDomain domain $ logTrace_ $ "Message sent: " <> pack (show payload)
+
+runServer ::
+ forall env es a.
+ ( HasField "pubEp" env Text
+ , IOE :> es
+ , Log :> es
+ , Reader env :> es
+ , Resource :> es
+ )
+ => Eff (Server : es) a
+ -> Eff es a
+runServer action = withSocket Z.Pub >>= run
+ where
+ run :: Z.Socket Z.Pub -> Eff es a
+ run sock = E.evalStaticRep (Server sock) $ initialize >> action
+ where
+ bind :: Text -> Eff (Server : es) ()
+ bind = E.unsafeEff_ . Z.bind sock . unpack
+ --
+ initialize :: Eff (Server : es) ()
+ initialize =
+ localDomain domain $ do
+ logInfo_ "Initializing ZMQ server"
+ env <- ask @env
+ bind env.pubEp
+ logTrace_ $ "Publishing to " <> env.pubEp