summaryrefslogtreecommitdiff
path: root/hsm-core/Hsm/Core/Zmq.hs
blob: fe764eb08ee56f4847c1b7e0d09105d261814d87 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
{-# LANGUAGE AllowAmbiguousTypes #-}
{-# LANGUAGE ImportQualifiedPost #-}
{-# LANGUAGE OverloadedRecordDot #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE TypeFamilies #-}

module Hsm.Core.Zmq
  ( Client
  , receive
  , runClient
  , Server
  , send
  , runServer
  )
where

import Control.Exception (onException)
import Control.Monad (forM_)
import Data.Aeson.Types (Value, emptyObject)
import Data.Binary (Binary)
import Data.Function ((&))
import Data.Text (Text, show, unpack)
import Data.Text.Encoding (encodeUtf8)
import Data.Time.Clock (UTCTime, getCurrentTime)
import Effectful (Dispatch (Static), DispatchOf, Eff, Effect, IOE, (:>))
import Effectful.Dispatch.Static qualified as S
import Effectful.Log qualified as L
import Effectful.Reader.Static (Reader, ask)
import Effectful.Resource (Resource, allocate)
import GHC.Records (HasField)
import Hsm.Core.Message (body, message, topic)
import System.ZMQ4 qualified as Z
import Prelude hiding (show)

-- | Shape of the logging callback that 'withSocket' obtains from
-- @L.getLoggerIO@: it takes a timestamp, a log level, the message text and a
-- JSON 'Value', and runs in 'IO'.
type DefaultLoggerIO = UTCTime -> L.LogLevel -> Text -> Value -> IO ()

-- | Reduced logging callback taking only a level and a message. 'withSocket'
-- derives one from a 'DefaultLoggerIO' by supplying the current time and an
-- empty JSON object.
type LoggerIO = L.LogLevel -> Text -> IO ()

-- | A ZMQ context paired with a socket of type @t@ created from it. This is
-- the value acquired and released as one unit by 'withSocket'.
type ZmqResource t = (Z.Context, Z.Socket t)

-- | Effects that must be present in the stack @es@ for the socket helpers in
-- this module ('withSocket', 'runClient', 'runServer'): 'IOE', logging and
-- resource management.
type ZmqConstraint es = (IOE :> es, L.Log :> es, Resource :> es)

-- | Create a ZMQ context together with a socket of the given type and register
-- the pair through 'allocate', with @release@ as the cleanup action. Only the
-- socket is returned; the context stays inside the registered resource.
--
-- Acquisition and release are each announced with a trace-level log line,
-- emitted through the logger callback fetched from @L.getLoggerIO@.
withSocket
  :: forall t es
   . ( Z.SocketType t
     , ZmqConstraint es
     )
  => t
  -> Eff es (Z.Socket t)
withSocket stype = L.getLoggerIO >>= withResource . logIO
 where
  -- Adapt the full logger callback to the two-argument form: stamp each
  -- message with the current time and attach an empty JSON object.
  logIO :: DefaultLoggerIO -> LoggerIO
  logIO loggerIO level msg = do
    now <- getCurrentTime
    loggerIO now level msg emptyObject

  -- 'allocate' yields (release key, (context, socket)); keep the socket only.
  withResource :: LoggerIO -> Eff es (Z.Socket t)
  withResource logger = snd . snd <$> allocate acquire release
   where
    acquire :: IO (ZmqResource t)
    acquire = do
      logger L.LogTrace "Acquiring ZMQ context"
      context <- Z.context
      -- 'release' is only registered once 'acquire' has returned, so a
      -- failure while creating the socket would otherwise leak the freshly
      -- created context. No socket exists yet at that point, so the context
      -- is terminated right here before the exception is rethrown.
      socket <- Z.socket context stype `onException` Z.term context
      return (context, socket)

    release :: ZmqResource t -> IO ()
    release (context, socket) = do
      logger L.LogTrace "Releasing ZMQ context"
      Z.close socket
      -- NOTE(review): libzmq documents zmq_ctx_shutdown as optional and says
      -- zmq_ctx_term is still required to free the context. Confirm whether
      -- skipping 'Z.term' here is deliberate (e.g. to avoid blocking on exit).
      Z.shutdown context

-- | Effect tag for the subscribing side of the ZMQ transport; see 'receive'
-- and 'runClient'.
data Client :: Effect

-- The effect is statically dispatched and its operations may perform side
-- effects.
type instance DispatchOf Client = Static S.WithSideEffects

-- Static representation of the effect: the ZMQ Sub socket that 'receive'
-- reads from.
newtype instance S.StaticRep Client = Client (Z.Socket Z.Sub)

-- | Log domain label applied (via @L.localDomain@) to every log line emitted
-- by the client side of this module.
clientDomain :: Text
clientDomain = "client"

-- | Receive the next message from the client's socket, log its topic and
-- payload at trace level under the client log domain, and return the payload.
--
-- The payload type @a@ is chosen by the caller; it is extracted from the raw
-- message with 'body'.
receive
  :: forall a es
   . ( Binary a
     , Client :> es
     , L.Log :> es
     , Show a
     )
  => Eff es a
receive = do
  Client socket <- S.getStaticRep
  msg <- S.unsafeEff_ $ Z.receive socket
  -- Extract the payload once and share it between the log line and the
  -- result, instead of calling 'body' on the same message twice.
  let payload = body @a msg
  "Message received [" <> topic msg <> "]: " <> show payload
    & L.logTrace_
    & L.localDomain clientDomain
  return payload

-- | Run a computation that uses the 'Client' effect.
--
-- A Sub socket is obtained through 'withSocket'. Before the supplied action
-- runs, the socket is connected to every endpoint in the environment's
-- @subEps@ field and subscribed to every topic in its @topics@ field. Setup
-- is logged under the client log domain.
runClient
  :: forall e es a
   . ( HasField "subEps" e [Text]
     , HasField "topics" e [Text]
     , Reader e :> es
     , ZmqConstraint es
     )
  => Eff (Client : es) a
  -> Eff es a
runClient action = do
  sock <- withSocket Z.Sub
  S.evalStaticRep (Client sock) $ do
    setup sock
    action
 where
  -- Connect and subscribe the socket as described by the reader environment.
  setup :: Z.Socket Z.Sub -> Eff (Client : es) ()
  setup sock = L.localDomain clientDomain $ do
    L.logInfo_ "Initializing ZMQ client"
    env <- ask @e
    -- Endpoints are handed to ZMQ as String, topics as UTF-8 bytes.
    forM_ env.subEps $ S.unsafeEff_ . Z.connect sock . unpack
    forM_ env.topics $ S.unsafeEff_ . Z.subscribe sock . encodeUtf8
    L.logTrace_ $ "Listening to " <> show env.subEps
    L.logTrace_ $ "Subscribed to " <> show env.topics

-- | Effect tag for the publishing side of the ZMQ transport; see 'send' and
-- 'runServer'.
data Server :: Effect

-- The effect is statically dispatched and its operations may perform side
-- effects.
type instance DispatchOf Server = Static S.WithSideEffects

-- Static representation of the effect: the ZMQ Pub socket that 'send' writes
-- to.
newtype instance S.StaticRep Server = Server (Z.Socket Z.Pub)

-- | Log domain label applied (via @L.localDomain@) to every log line emitted
-- by the server side of this module.
serverDomain :: Text
serverDomain = "server"

-- | Publish a payload on the server's socket.
--
-- The payload is wrapped with 'message' using the @name@ field of the reader
-- environment, sent with no ZMQ flags, and then reported at trace level
-- under the server log domain.
send
  :: forall a e es
   . ( Binary a
     , HasField "name" e Text
     , L.Log :> es
     , Reader e :> es
     , Server :> es
     , Show a
     )
  => a
  -> Eff es ()
send payload = do
  env <- ask @e
  Server socket <- S.getStaticRep
  S.unsafeEff_ . Z.send socket [] $ message env.name payload
  "Message sent: " <> show payload
    & L.logTrace_
    & L.localDomain serverDomain

-- | Run a computation that uses the 'Server' effect.
--
-- A Pub socket is obtained through 'withSocket' and bound to the endpoint
-- in the environment's @pubEp@ field before the supplied action runs. Setup
-- is logged under the server log domain.
runServer
  :: forall e es a
   . ( HasField "pubEp" e Text
     , Reader e :> es
     , ZmqConstraint es
     )
  => Eff (Server : es) a
  -> Eff es a
runServer action = do
  sock <- withSocket Z.Pub
  S.evalStaticRep (Server sock) $ do
    setup sock
    action
 where
  -- Bind the socket to the endpoint named by the reader environment.
  setup :: Z.Socket Z.Pub -> Eff (Server : es) ()
  setup sock = L.localDomain serverDomain $ do
    L.logInfo_ "Initializing ZMQ server"
    env <- ask @e
    let endpoint = env.pubEp
    -- ZMQ takes the endpoint as a String.
    S.unsafeEff_ $ Z.bind sock (unpack endpoint)
    L.logTrace_ $ "Publishing to " <> endpoint