summaryrefslogtreecommitdiff
path: root/hsm-core/Hsm
diff options
context:
space:
mode:
authorPaul Oliver <contact@pauloliver.dev>2024-08-24 11:57:18 -0700
committerPaul Oliver <contact@pauloliver.dev>2024-12-01 07:01:30 -0800
commitf0854265f7a1b59078308965d33fe2583a5c0f9c (patch)
treed8b06110d84fce783f1cc91aa37155351c655b2c /hsm-core/Hsm
Initial commitHEADmaster
Diffstat (limited to 'hsm-core/Hsm')
-rw-r--r--hsm-core/Hsm/Core/App.hs88
-rw-r--r--hsm-core/Hsm/Core/Fsm.hs89
-rw-r--r--hsm-core/Hsm/Core/Message.hs27
-rw-r--r--hsm-core/Hsm/Core/Zmq.hs175
4 files changed, 379 insertions, 0 deletions
diff --git a/hsm-core/Hsm/Core/App.hs b/hsm-core/Hsm/Core/App.hs
new file mode 100644
index 0000000..1bb3465
--- /dev/null
+++ b/hsm-core/Hsm/Core/App.hs
@@ -0,0 +1,88 @@
+{-# LANGUAGE ApplicativeDo #-}
+{-# LANGUAGE ImportQualifiedPost #-}
+
+module Hsm.Core.App
+ ( launch
+ , launchWithEcho
+ )
+where
+
+import Control.Applicative ((<**>))
+import Data.Aeson (FromJSON, Result (Error, Success), Value, fromJSON)
+import Data.Aeson.Key (fromString)
+import Data.Aeson.KeyMap (KeyMap, (!?))
+import Data.Composition ((.:))
+import Data.Function.Slip (slipl)
+import Data.Maybe (fromMaybe)
+import Data.Text (pack, unpack)
+import Data.Yaml (decodeFileThrow)
+import Effectful.Log qualified as L
+import Log.Backend.StandardOutput (withStdOutLogger)
+import Options.Applicative qualified as P
+import System.IO.Echo (withoutInputEcho)
+
+data Options = Options String L.LogLevel
+
+type App e a = e -> L.Logger -> L.LogLevel -> IO a
+
+parser :: P.Parser Options
+parser = do
+ config <-
+ P.strOption $
+ P.help "Path to services config file"
+ <> P.short 'c'
+ <> P.long "config"
+ <> P.metavar "PATH"
+ <> P.value "config.yaml"
+ <> P.showDefault
+ level <-
+ P.option (P.eitherReader $ L.readLogLevelEither . pack) $
+ P.help "Log level"
+ <> P.short 'l'
+ <> P.long "log-level"
+ <> P.metavar "LEVEL"
+ <> P.value L.LogInfo
+ <> P.showDefaultWith (unpack . L.showLogLevel)
+ pure $ Options config level
+
+launchWith
+ :: forall e a
+ . FromJSON e
+ => String
+ -> App e a
+ -> (IO a -> IO a)
+ -> IO a
+launchWith name app wrapper = do
+ Options path level <- P.execParser info
+ returnEnv path >>= runApp level
+ where
+ title :: String
+ title = "Launch " <> name <> " service"
+
+ description :: P.InfoMod Options
+ description = P.fullDesc <> P.progDesc title
+
+ info :: P.ParserInfo Options
+ info = P.info (parser <**> P.helper) description
+
+ err :: String
+ err = "Service configuration for " <> name <> " not found"
+
+ load :: KeyMap Value -> Value
+ load configs = fromMaybe (error err) $ configs !? fromString name
+
+ check :: Result e -> e
+ check (Success e) = e
+ check (Error str) = error str
+
+ returnEnv :: String -> IO e
+ returnEnv = fmap (check . fromJSON . load) . decodeFileThrow
+
+ runApp :: L.LogLevel -> e -> IO a
+ runApp = wrapper . withStdOutLogger .: slipl app
+
+launch :: FromJSON e => String -> App e a -> IO a
+launch = slipl launchWith withoutInputEcho
+
+launchWithEcho :: FromJSON e => String -> App e a -> IO a
+launchWithEcho = slipl launchWith id
diff --git a/hsm-core/Hsm/Core/Fsm.hs b/hsm-core/Hsm/Core/Fsm.hs
new file mode 100644
index 0000000..e0f54a3
--- /dev/null
+++ b/hsm-core/Hsm/Core/Fsm.hs
@@ -0,0 +1,89 @@
+{-# LANGUAGE AllowAmbiguousTypes #-}
+{-# LANGUAGE ImportQualifiedPost #-}
+{-# LANGUAGE OverloadedStrings #-}
+
+module Hsm.Core.Fsm
+ ( FsmState (FsmState)
+ , FsmResult (FsmResult)
+ , FsmOutput (FsmOutput)
+ , fsm
+ , fsmStream
+ , pLogAttention
+ , pLogInfo
+ , pLogTrace
+ )
+where
+
+import Control.Monad (forM_)
+import Data.Aeson.Types (emptyObject)
+import Data.Composition ((.:))
+import Data.Function.Slip (slipl)
+import Data.List (singleton)
+import Data.Maybe (fromJust, isJust)
+import Data.Text (Text)
+import Effectful (Eff, (:>))
+import Effectful.Log qualified as L
+import Effectful.Reader.Static (Reader, ask)
+import Effectful.State.Static.Local (State, get, put)
+import Streamly.Data.Stream (Stream, mapM, takeWhile)
+import Prelude hiding (mapM, takeWhile)
+
+data FsmState i o e s
+ = FsmState Text (i -> e -> s -> FsmOutput i o e s)
+
+data FsmResult i o e s
+ = FsmResult o s (FsmState i o e s)
+
+data FsmOutput i o e s
+ = FsmOutput (Maybe (FsmResult i o e s)) [(L.LogLevel, Text)]
+
+type FsmConstraint i o e s es =
+ ( L.Log :> es
+ , Reader e :> es
+ , State (FsmState i o e s) :> es
+ , State s :> es
+ )
+
+fsm :: forall i o e s es. FsmConstraint i o e s es => i -> Eff es (Maybe o)
+fsm input =
+ L.localDomain "fsm" $ do
+ FsmState name action <- get
+ state <- get
+ env <- ask
+ L.logTrace_ $ "Entering state " <> name
+ FsmOutput res logs <- return $ action input env state
+ L.localDomain name $ forM_ logs $ uncurry $ slipl L.logMessage emptyObject
+ L.logTrace_ $ "Exiting state " <> name
+ maybe exit push res
+ where
+ push :: FsmResult i o e s -> Eff es (Maybe o)
+ push (FsmResult output state next) = do
+ put state
+ put next
+ return $ Just output
+
+ exit :: Eff es (Maybe o)
+ exit = do
+ L.logAttention_ "No state returned, exiting FSM"
+ return Nothing
+
+fsmStream
+ :: forall i o e s es
+ . FsmConstraint i o e s es
+ => Stream (Eff es) i
+ -> Stream (Eff es) o
+fsmStream = mapM (return . fromJust) . takeWhile isJust . mapM (fsm @_ @_ @e @s)
+
+type LogList = [(L.LogLevel, Text)]
+
+pLog :: L.LogLevel -> Text -> LogList
+pLog = singleton .: (,)
+
+pLogAttention :: Text -> LogList
+pLogAttention = pLog L.LogAttention
+
+pLogInfo :: Text -> LogList
+pLogInfo = pLog L.LogInfo
+
+pLogTrace :: Text -> LogList
+pLogTrace = pLog L.LogTrace
diff --git a/hsm-core/Hsm/Core/Message.hs b/hsm-core/Hsm/Core/Message.hs
new file mode 100644
index 0000000..2fb5d3c
--- /dev/null
+++ b/hsm-core/Hsm/Core/Message.hs
@@ -0,0 +1,27 @@
+{-# LANGUAGE ImportQualifiedPost #-}
+{-# LANGUAGE OverloadedStrings #-}
+
+module Hsm.Core.Message
+ ( message
+ , topic
+ , body
+ )
+where
+
+import Data.Binary (Binary, decode, encode)
+import Data.ByteString (ByteString, fromStrict, toStrict)
+import Data.ByteString.Char8 qualified as B (breakSubstring, drop, length)
+import Data.Text (Text)
+import Data.Text.Encoding (decodeUtf8, encodeUtf8)
+
+sep :: ByteString
+sep = "//"
+
+message :: Binary a => Text -> a -> ByteString
+message t b = encodeUtf8 t <> sep <> toStrict (encode b)
+
+topic :: ByteString -> Text
+topic = decodeUtf8 . fst . B.breakSubstring sep
+
+body :: Binary a => ByteString -> a
+body = decode . fromStrict . B.drop (B.length sep) . snd . B.breakSubstring sep
diff --git a/hsm-core/Hsm/Core/Zmq.hs b/hsm-core/Hsm/Core/Zmq.hs
new file mode 100644
index 0000000..fe764eb
--- /dev/null
+++ b/hsm-core/Hsm/Core/Zmq.hs
@@ -0,0 +1,175 @@
+{-# LANGUAGE AllowAmbiguousTypes #-}
+{-# LANGUAGE ImportQualifiedPost #-}
+{-# LANGUAGE OverloadedRecordDot #-}
+{-# LANGUAGE OverloadedStrings #-}
+{-# LANGUAGE TypeFamilies #-}
+
+module Hsm.Core.Zmq
+ ( Client
+ , receive
+ , runClient
+ , Server
+ , send
+ , runServer
+ )
+where
+
+import Control.Monad (forM_)
+import Data.Aeson.Types (Value, emptyObject)
+import Data.Binary (Binary)
+import Data.Function ((&))
+import Data.Text (Text, show, unpack)
+import Data.Text.Encoding (encodeUtf8)
+import Data.Time.Clock (UTCTime, getCurrentTime)
+import Effectful (Dispatch (Static), DispatchOf, Eff, Effect, IOE, (:>))
+import Effectful.Dispatch.Static qualified as S
+import Effectful.Log qualified as L
+import Effectful.Reader.Static (Reader, ask)
+import Effectful.Resource (Resource, allocate)
+import GHC.Records (HasField)
+import Hsm.Core.Message (body, message, topic)
+import System.ZMQ4 qualified as Z
+import Prelude hiding (show)
+
+type DefaultLoggerIO = UTCTime -> L.LogLevel -> Text -> Value -> IO ()
+
+type LoggerIO = L.LogLevel -> Text -> IO ()
+
+type ZmqResource t = (Z.Context, Z.Socket t)
+
+type ZmqConstraint es = (IOE :> es, L.Log :> es, Resource :> es)
+
+withSocket
+ :: forall t es
+ . ( Z.SocketType t
+ , ZmqConstraint es
+ )
+ => t
+ -> Eff es (Z.Socket t)
+withSocket stype = L.getLoggerIO >>= withResource . logIO
+ where
+ logIO :: DefaultLoggerIO -> LoggerIO
+ logIO loggerIO level msg = do
+ now <- getCurrentTime
+ loggerIO now level msg emptyObject
+
+ withResource :: LoggerIO -> Eff es (Z.Socket t)
+ withResource logger = snd . snd <$> allocate acquire release
+ where
+ acquire :: IO (ZmqResource t)
+ acquire = do
+ logger L.LogTrace "Acquiring ZMQ context"
+ context <- Z.context
+ socket <- Z.socket context stype
+ return (context, socket)
+
+ release :: ZmqResource t -> IO ()
+ release (context, socket) = do
+ logger L.LogTrace "Releasing ZMQ context"
+ Z.close socket
+ Z.shutdown context
+
+data Client :: Effect
+
+type instance DispatchOf Client = Static S.WithSideEffects
+
+newtype instance S.StaticRep Client = Client (Z.Socket Z.Sub)
+
+clientDomain :: Text
+clientDomain = "client"
+
+receive
+ :: forall a es
+ . ( Binary a
+ , Client :> es
+ , L.Log :> es
+ , Show a
+ )
+ => Eff es a
+receive = do
+ Client socket <- S.getStaticRep
+ msg <- S.unsafeEff_ $ Z.receive socket
+ "Message received [" <> topic msg <> "]: " <> show (body @a msg)
+ & L.logTrace_
+ & L.localDomain clientDomain
+ return $ body msg
+
+runClient
+ :: forall e es a
+ . ( HasField "subEps" e [Text]
+ , HasField "topics" e [Text]
+ , Reader e :> es
+ , ZmqConstraint es
+ )
+ => Eff (Client : es) a
+ -> Eff es a
+runClient action = withSocket Z.Sub >>= runAction
+ where
+ runAction :: Z.Socket Z.Sub -> Eff es a
+ runAction socket = S.evalStaticRep (Client socket) $ initialize >> action
+ where
+ connect :: Text -> Eff (Client : es) ()
+ connect = S.unsafeEff_ . Z.connect socket . unpack
+
+ subscribe :: Text -> Eff (Client : es) ()
+ subscribe = S.unsafeEff_ . Z.subscribe socket . encodeUtf8
+
+ initialize :: Eff (Client : es) ()
+ initialize =
+ L.localDomain clientDomain $ do
+ L.logInfo_ "Initializing ZMQ client"
+ env <- ask @e
+ forM_ env.subEps connect
+ forM_ env.topics subscribe
+ L.logTrace_ $ "Listening to " <> show env.subEps
+ L.logTrace_ $ "Subscribed to " <> show env.topics
+
+data Server :: Effect
+
+type instance DispatchOf Server = Static S.WithSideEffects
+
+newtype instance S.StaticRep Server = Server (Z.Socket Z.Pub)
+
+serverDomain :: Text
+serverDomain = "server"
+
+send
+ :: forall a e es
+ . ( Binary a
+ , HasField "name" e Text
+ , L.Log :> es
+ , Reader e :> es
+ , Server :> es
+ , Show a
+ )
+ => a
+ -> Eff es ()
+send payload = do
+ Server s <- S.getStaticRep
+ env <- ask @e
+ S.unsafeEff_ $ Z.send s [] $ message env.name payload
+ L.localDomain serverDomain $ L.logTrace_ $ "Message sent: " <> show payload
+
+runServer
+ :: forall e es a
+ . ( HasField "pubEp" e Text
+ , Reader e :> es
+ , ZmqConstraint es
+ )
+ => Eff (Server : es) a
+ -> Eff es a
+runServer action = withSocket Z.Pub >>= runAction
+ where
+ runAction :: Z.Socket Z.Pub -> Eff es a
+ runAction socket = S.evalStaticRep (Server socket) $ initialize >> action
+ where
+ bind :: Text -> Eff (Server : es) ()
+ bind = S.unsafeEff_ . Z.bind socket . unpack
+
+ initialize :: Eff (Server : es) ()
+ initialize =
+ L.localDomain serverDomain $ do
+ L.logInfo_ "Initializing ZMQ server"
+ env <- ask @e
+ bind env.pubEp
+ L.logTrace_ $ "Publishing to " <> env.pubEp