diff options
| author | Paul Oliver <contact@pauloliver.dev> | 2024-08-24 11:57:18 -0700 | 
|---|---|---|
| committer | Paul Oliver <contact@pauloliver.dev> | 2024-12-01 07:01:30 -0800 | 
| commit | f0854265f7a1b59078308965d33fe2583a5c0f9c (patch) | |
| tree | d8b06110d84fce783f1cc91aa37155351c655b2c /hsm-core/Hsm | |
Initial commit
Diffstat (limited to 'hsm-core/Hsm')
| -rw-r--r-- | hsm-core/Hsm/Core/App.hs | 88 | ||||
| -rw-r--r-- | hsm-core/Hsm/Core/Fsm.hs | 89 | ||||
| -rw-r--r-- | hsm-core/Hsm/Core/Message.hs | 27 | ||||
| -rw-r--r-- | hsm-core/Hsm/Core/Zmq.hs | 175 | 
4 files changed, 379 insertions, 0 deletions
| diff --git a/hsm-core/Hsm/Core/App.hs b/hsm-core/Hsm/Core/App.hs new file mode 100644 index 0000000..1bb3465 --- /dev/null +++ b/hsm-core/Hsm/Core/App.hs @@ -0,0 +1,88 @@ +{-# LANGUAGE ApplicativeDo #-} +{-# LANGUAGE ImportQualifiedPost #-} + +module Hsm.Core.App +  ( launch +  , launchWithEcho +  ) +where + +import Control.Applicative ((<**>)) +import Data.Aeson (FromJSON, Result (Error, Success), Value, fromJSON) +import Data.Aeson.Key (fromString) +import Data.Aeson.KeyMap (KeyMap, (!?)) +import Data.Composition ((.:)) +import Data.Function.Slip (slipl) +import Data.Maybe (fromMaybe) +import Data.Text (pack, unpack) +import Data.Yaml (decodeFileThrow) +import Effectful.Log qualified as L +import Log.Backend.StandardOutput (withStdOutLogger) +import Options.Applicative qualified as P +import System.IO.Echo (withoutInputEcho) + +data Options = Options String L.LogLevel + +type App e a = e -> L.Logger -> L.LogLevel -> IO a + +parser :: P.Parser Options +parser = do +  config <- +    P.strOption $ +      P.help "Path to services config file" +        <> P.short 'c' +        <> P.long "config" +        <> P.metavar "PATH" +        <> P.value "config.yaml" +        <> P.showDefault +  level <- +    P.option (P.eitherReader $ L.readLogLevelEither . pack) $ +      P.help "Log level" +        <> P.short 'l' +        <> P.long "log-level" +        <> P.metavar "LEVEL" +        <> P.value L.LogInfo +        <> P.showDefaultWith (unpack . L.showLogLevel) +  pure $ Options config level + +launchWith +  :: forall e a +   . FromJSON e +  => String +  -> App e a +  -> (IO a -> IO a) +  -> IO a +launchWith name app wrapper = do +  Options path level <- P.execParser info +  returnEnv path >>= runApp level + where +  title :: String +  title = "Launch " <> name <> " service" + +  description :: P.InfoMod Options +  description = P.fullDesc <> P.progDesc title + +  info :: P.ParserInfo Options +  info = P.info (parser <**> P.helper) description + +  err :: String +  err = "Service configuration for " <> name <> " not found" + +  load :: KeyMap Value -> Value +  load configs = fromMaybe (error err) $ configs !? fromString name + +  check :: Result e -> e +  check (Success e) = e +  check (Error str) = error str + +  returnEnv :: String -> IO e +  returnEnv = fmap (check . fromJSON . load) . decodeFileThrow + +  runApp :: L.LogLevel -> e -> IO a +  runApp = wrapper . withStdOutLogger .: slipl app + +launch :: FromJSON e => String -> App e a -> IO a +launch = slipl launchWith withoutInputEcho + +launchWithEcho :: FromJSON e => String -> App e a -> IO a +launchWithEcho = slipl launchWith id diff --git a/hsm-core/Hsm/Core/Fsm.hs b/hsm-core/Hsm/Core/Fsm.hs new file mode 100644 index 0000000..e0f54a3 --- /dev/null +++ b/hsm-core/Hsm/Core/Fsm.hs @@ -0,0 +1,89 @@ +{-# LANGUAGE AllowAmbiguousTypes #-} +{-# LANGUAGE ImportQualifiedPost #-} +{-# LANGUAGE OverloadedStrings #-} + +module Hsm.Core.Fsm +  ( FsmState (FsmState) +  , FsmResult (FsmResult) +  , FsmOutput (FsmOutput) +  , fsm +  , fsmStream +  , pLogAttention +  , pLogInfo +  , pLogTrace +  ) +where + +import Control.Monad (forM_) +import Data.Aeson.Types (emptyObject) +import Data.Composition ((.:)) +import Data.Function.Slip (slipl) +import Data.List (singleton) +import Data.Maybe (fromJust, isJust) +import Data.Text (Text) +import Effectful (Eff, (:>)) +import Effectful.Log qualified as L +import Effectful.Reader.Static (Reader, ask) +import Effectful.State.Static.Local (State, get, put) +import Streamly.Data.Stream (Stream, mapM, takeWhile) +import Prelude hiding (mapM, takeWhile) + +data FsmState i o e s +  = FsmState Text (i -> e -> s -> FsmOutput i o e s) + +data FsmResult i o e s +  = FsmResult o s (FsmState i o e s) + +data FsmOutput i o e s +  = FsmOutput (Maybe (FsmResult i o e s)) [(L.LogLevel, Text)] + +type FsmConstraint i o e s es = +  ( L.Log :> es +  , Reader e :> es +  , State (FsmState i o e s) :> es +  , State s :> es +  ) + +fsm :: forall i o e s es. FsmConstraint i o e s es => i -> Eff es (Maybe o) +fsm input = +  L.localDomain "fsm" $ do +    FsmState name action <- get +    state <- get +    env <- ask +    L.logTrace_ $ "Entering state " <> name +    FsmOutput res logs <- return $ action input env state +    L.localDomain name $ forM_ logs $ uncurry $ slipl L.logMessage emptyObject +    L.logTrace_ $ "Exiting state " <> name +    maybe exit push res + where +  push :: FsmResult i o e s -> Eff es (Maybe o) +  push (FsmResult output state next) = do +    put state +    put next +    return $ Just output + +  exit :: Eff es (Maybe o) +  exit = do +    L.logAttention_ "No state returned, exiting FSM" +    return Nothing + +fsmStream +  :: forall i o e s es +   . FsmConstraint i o e s es +  => Stream (Eff es) i +  -> Stream (Eff es) o +fsmStream = mapM (return . fromJust) . takeWhile isJust . mapM (fsm @_ @_ @e @s) + +type LogList = [(L.LogLevel, Text)] + +pLog :: L.LogLevel -> Text -> LogList +pLog = singleton .: (,) + +pLogAttention :: Text -> LogList +pLogAttention = pLog L.LogAttention + +pLogInfo :: Text -> LogList +pLogInfo = pLog L.LogInfo + +pLogTrace :: Text -> LogList +pLogTrace = pLog L.LogTrace diff --git a/hsm-core/Hsm/Core/Message.hs b/hsm-core/Hsm/Core/Message.hs new file mode 100644 index 0000000..2fb5d3c --- /dev/null +++ b/hsm-core/Hsm/Core/Message.hs @@ -0,0 +1,27 @@ +{-# LANGUAGE ImportQualifiedPost #-} +{-# LANGUAGE OverloadedStrings #-} + +module Hsm.Core.Message +  ( message +  , topic +  , body +  ) +where + +import Data.Binary (Binary, decode, encode) +import Data.ByteString (ByteString, fromStrict, toStrict) +import Data.ByteString.Char8 qualified as B (breakSubstring, drop, length) +import Data.Text (Text) +import Data.Text.Encoding (decodeUtf8, encodeUtf8) + +sep :: ByteString +sep = "//" + +message :: Binary a => Text -> a -> ByteString +message t b = encodeUtf8 t <> sep <> toStrict (encode b) + +topic :: ByteString -> Text +topic = decodeUtf8 . fst . B.breakSubstring sep + +body :: Binary a => ByteString -> a +body = decode . fromStrict . B.drop (B.length sep) . snd . B.breakSubstring sep diff --git a/hsm-core/Hsm/Core/Zmq.hs b/hsm-core/Hsm/Core/Zmq.hs new file mode 100644 index 0000000..fe764eb --- /dev/null +++ b/hsm-core/Hsm/Core/Zmq.hs @@ -0,0 +1,175 @@ +{-# LANGUAGE AllowAmbiguousTypes #-} +{-# LANGUAGE ImportQualifiedPost #-} +{-# LANGUAGE OverloadedRecordDot #-} +{-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE TypeFamilies #-} + +module Hsm.Core.Zmq +  ( Client +  , receive +  , runClient +  , Server +  , send +  , runServer +  ) +where + +import Control.Monad (forM_) +import Data.Aeson.Types (Value, emptyObject) +import Data.Binary (Binary) +import Data.Function ((&)) +import Data.Text (Text, show, unpack) +import Data.Text.Encoding (encodeUtf8) +import Data.Time.Clock (UTCTime, getCurrentTime) +import Effectful (Dispatch (Static), DispatchOf, Eff, Effect, IOE, (:>)) +import Effectful.Dispatch.Static qualified as S +import Effectful.Log qualified as L +import Effectful.Reader.Static (Reader, ask) +import Effectful.Resource (Resource, allocate) +import GHC.Records (HasField) +import Hsm.Core.Message (body, message, topic) +import System.ZMQ4 qualified as Z +import Prelude hiding (show) + +type DefaultLoggerIO = UTCTime -> L.LogLevel -> Text -> Value -> IO () + +type LoggerIO = L.LogLevel -> Text -> IO () + +type ZmqResource t = (Z.Context, Z.Socket t) + +type ZmqConstraint es = (IOE :> es, L.Log :> es, Resource :> es) + +withSocket +  :: forall t es +   . ( Z.SocketType t +     , ZmqConstraint es +     ) +  => t +  -> Eff es (Z.Socket t) +withSocket stype = L.getLoggerIO >>= withResource . logIO + where +  logIO :: DefaultLoggerIO -> LoggerIO +  logIO loggerIO level msg = do +    now <- getCurrentTime +    loggerIO now level msg emptyObject + +  withResource :: LoggerIO -> Eff es (Z.Socket t) +  withResource logger = snd . snd <$> allocate acquire release +   where +    acquire :: IO (ZmqResource t) +    acquire = do +      logger L.LogTrace "Acquiring ZMQ context" +      context <- Z.context +      socket <- Z.socket context stype +      return (context, socket) + +    release :: ZmqResource t -> IO () +    release (context, socket) = do +      logger L.LogTrace "Releasing ZMQ context" +      Z.close socket +      Z.shutdown context + +data Client :: Effect + +type instance DispatchOf Client = Static S.WithSideEffects + +newtype instance S.StaticRep Client = Client (Z.Socket Z.Sub) + +clientDomain :: Text +clientDomain = "client" + +receive +  :: forall a es +   . ( Binary a +     , Client :> es +     , L.Log :> es +     , Show a +     ) +  => Eff es a +receive = do +  Client socket <- S.getStaticRep +  msg <- S.unsafeEff_ $ Z.receive socket +  "Message received [" <> topic msg <> "]: " <> show (body @a msg) +    & L.logTrace_ +    & L.localDomain clientDomain +  return $ body msg + +runClient +  :: forall e es a +   . ( HasField "subEps" e [Text] +     , HasField "topics" e [Text] +     , Reader e :> es +     , ZmqConstraint es +     ) +  => Eff (Client : es) a +  -> Eff es a +runClient action = withSocket Z.Sub >>= runAction + where +  runAction :: Z.Socket Z.Sub -> Eff es a +  runAction socket = S.evalStaticRep (Client socket) $ initialize >> action +   where +    connect :: Text -> Eff (Client : es) () +    connect = S.unsafeEff_ . Z.connect socket . unpack + +    subscribe :: Text -> Eff (Client : es) () +    subscribe = S.unsafeEff_ . Z.subscribe socket . encodeUtf8 + +    initialize :: Eff (Client : es) () +    initialize = +      L.localDomain clientDomain $ do +        L.logInfo_ "Initializing ZMQ client" +        env <- ask @e +        forM_ env.subEps connect +        forM_ env.topics subscribe +        L.logTrace_ $ "Listening to " <> show env.subEps +        L.logTrace_ $ "Subscribed to " <> show env.topics + +data Server :: Effect + +type instance DispatchOf Server = Static S.WithSideEffects + +newtype instance S.StaticRep Server = Server (Z.Socket Z.Pub) + +serverDomain :: Text +serverDomain = "server" + +send +  :: forall a e es +   . ( Binary a +     , HasField "name" e Text +     , L.Log :> es +     , Reader e :> es +     , Server :> es +     , Show a +     ) +  => a +  -> Eff es () +send payload = do +  Server s <- S.getStaticRep +  env <- ask @e +  S.unsafeEff_ $ Z.send s [] $ message env.name payload +  L.localDomain serverDomain $ L.logTrace_ $ "Message sent: " <> show payload + +runServer +  :: forall e es a +   . ( HasField "pubEp" e Text +     , Reader e :> es +     , ZmqConstraint es +     ) +  => Eff (Server : es) a +  -> Eff es a +runServer action = withSocket Z.Pub >>= runAction + where +  runAction :: Z.Socket Z.Pub -> Eff es a +  runAction socket = S.evalStaticRep (Server socket) $ initialize >> action +   where +    bind :: Text -> Eff (Server : es) () +    bind = S.unsafeEff_ . Z.bind socket . unpack + +    initialize :: Eff (Server : es) () +    initialize = +      L.localDomain serverDomain $ do +        L.logInfo_ "Initializing ZMQ server" +        env <- ask @e +        bind env.pubEp +        L.logTrace_ $ "Publishing to " <> env.pubEp | 
