summaryrefslogtreecommitdiff
path: root/hsm-core/Hsm/Core/Zmq.hs
blob: 69ff59a6ed30078b7f4b29ff6c41df4562ffeffa (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
{-# LANGUAGE ImportQualifiedPost #-}
{-# LANGUAGE OverloadedRecordDot #-}
{-# LANGUAGE OverloadedStrings #-}

-- Module     : Hsm.Core.Zmq
-- Maintainer : contact@pauloliver.dev
module Hsm.Core.Zmq
  ( client
  , server
  )
where

import Control.Exception (onException)
import Control.Monad (forM_, forever)
import Control.Monad.IO.Class (liftIO)
import Control.Monad.Reader (ask)
import Data.Binary (Binary)
import Data.Text (Text, pack, unpack)
import Data.Text.Encoding (encodeUtf8)
import Effectful (IOE, (:>))
import Effectful.Log (Log, LogLevel (LogTrace), localDomain, logInfo_, logTrace_)
import Effectful.Reader.Static (Reader)
import Effectful.Resource (Resource, allocate)
import GHC.Records (HasField)
import Hsm.Core.LogIO (LoggerIO, getLoggerIO)
import Hsm.Core.Message (body, message)
import Hsm.Core.Pipes (Consumer, Producer, Proxy, await, yield)
import System.ZMQ4 qualified as Z

-- | A ZMQ context paired with a socket of type @t@ created from it, kept
-- together so that both can be released as a single resource.
data ZmqResource t = ZmqResource Z.Context (Z.Socket t)

-- | Effects required by every ZMQ helper in this module: 'IOE', 'Log', a
-- 'Reader' over the environment @e@, and 'Resource'.
type ZmqC e es = (IOE :> es, Log :> es, Reader e :> es, Resource :> es)

-- | 'ZmqC' plus the requirement that @t@ is a ZMQ socket type.
type UsingC t e es = (Z.SocketType t, ZmqC e es)

-- | Constraints of 'client': a 'Binary' payload @a@ and an environment @e@
-- exposing @name@, @subEps@ and @topics@ fields, on top of 'ZmqC'.
type ClientC a e es = (Binary a, HasField "name" e Text, HasField "subEps" e [Text], HasField "topics" e [Text], ZmqC e es)

-- | Constraints of 'server': a 'Binary' payload @a@ and an environment @e@
-- exposing @name@ and @pubEp@ fields, on top of 'ZmqC'.
type ServerC a e es = (Binary a, HasField "name" e Text, HasField "pubEp" e Text, ZmqC e es)

-- | Callback run by 'using': it receives the socket and the environment @e@
-- and returns the 'Proxy' computation to execute with them.
type Action t a' a b' b e s es = Z.Socket t -> e -> Proxy a' a b' b e s es ()

-- | Run @action@ with a freshly created ZMQ socket of type @stype@ and the
-- current 'Reader' environment.
--
-- The context and socket are acquired through 'allocate'; the registered
-- release action closes the socket and then shuts the context down. Both
-- steps are reported at 'LogTrace' level through the 'LoggerIO' obtained
-- from 'getLoggerIO'.
using :: forall t a' a b' b e s es. UsingC t e es => t -> Action t a' a b' b e s es -> Proxy a' a b' b e s es ()
using stype action = getLoggerIO >>= safely
 where
  safely :: LoggerIO -> Proxy a' a b' b e s es ()
  safely logger =
    allocate acquire release >>= \(_, ZmqResource _ socket) ->
      ask >>= action socket
   where
    acquire :: IO (ZmqResource t)
    acquire = do
      logger LogTrace "Acquiring ZMQ context"
      context <- Z.context
      -- 'release' is only registered once 'acquire' has returned, so a
      -- failure in 'Z.socket' would otherwise leak the context created just
      -- above. No socket exists on that path, so terminating is immediate.
      socket <- Z.socket context stype `onException` Z.term context
      return $ ZmqResource context socket

    release :: ZmqResource t -> IO ()
    release (ZmqResource context socket) = do
      logger LogTrace "Releasing ZMQ context"
      -- The socket is closed before the context is shut down.
      Z.close socket
      -- NOTE(review): libzmq documents zmq_ctx_shutdown as optional and still
      -- expects zmq_ctx_term to free the context; confirm whether 'Z.term'
      -- should follow here (it may block on lingering messages).
      Z.shutdown context

-- | Subscriber side: a 'Producer' backed by a 'Z.Sub' socket.
--
-- Connects the socket to every endpoint in @subEps@, subscribes it to every
-- topic in @topics@, and then loops forever, receiving one message at a time
-- and yielding the result of applying 'body' to it. All logging happens
-- under the @"client"@ log domain.
client :: ClientC a e es => Producer a e s es ()
client =
  using Z.Sub $ \sock env -> localDomain "client" $ do
    logInfo_ "Initializing ZMQ client"
    -- Connect and subscribe first; the trace lines below report the result.
    liftIO . forM_ env.subEps $ \endpoint -> Z.connect sock (unpack endpoint)
    liftIO . forM_ env.topics $ \topic -> Z.subscribe sock (encodeUtf8 topic)
    logTrace_ ("Listening to " <> pack (show env.subEps))
    logTrace_ ("Subscribed to " <> pack (show env.topics))
    forever $ do
      frame <- liftIO (Z.receive sock)
      yield (body frame)

-- | Publisher side: a 'Consumer' backed by a 'Z.Pub' socket.
--
-- Binds the socket to the @pubEp@ endpoint and then loops forever, awaiting
-- one value at a time, wrapping it with 'message' together with the
-- environment's @name@, and sending the result without any send flags. All
-- logging happens under the @"server"@ log domain.
server :: ServerC a e es => Consumer a e s es ()
server =
  using Z.Pub $ \sock env -> localDomain "server" $ do
    logInfo_ "Initializing ZMQ server"
    liftIO (Z.bind sock (unpack env.pubEp))
    logTrace_ ("Publishing to " <> env.pubEp)
    forever $ do
      payload <- await
      liftIO (Z.send sock [] (message env.name payload))