1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
|
{-# LANGUAGE ImportQualifiedPost #-}
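-- |
-- Module      : Hsm.Core.Pipes
-- Maintainer  : contact@pauloliver.dev
--
-- Streaming primitives in the style of @pipes@, with @Eff@ fixed as the
-- base monad of every proxy.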
module Hsm.Core.Pipes
( Proxy
, (>->)
, await
, yield
, runEffect
, Producer
, Pipe
, Consumer
)
where
import Control.Exception.Safe (MonadCatch, MonadThrow)
import Control.Monad.Reader (MonadReader, ask, local)
import Control.Monad.State (MonadState, get, put)
import Control.Monad.Trans.Resource (MonadResource, liftResourceT)
import Data.Composition ((.:.))
import Effectful (Eff, IOE, (:>))
import Effectful.Log (Log, MonadLog, getLoggerEnv, localData, localDomain, localMaxLogLevel, logMessage)
import Effectful.Reader.Static qualified as E (Reader, ask, local)
import Effectful.Resource (Resource)
import Effectful.State.Static.Local qualified as E (State, get, put)
import Pipes (MonadIO, X, hoist, lift, liftIO)
import Pipes qualified as P (Proxy, await, runEffect, yield, (>->))
-- | A @Pipes.Proxy@ whose base monad is fixed to @Eff es@.
--
-- This gives composable streaming while letting each pipeline component
-- declare, through @es@, exactly which effects it is allowed to use.
--
-- The @e@ and @s@ parameters do not occur in the wrapped value; they only
-- appear in the type so that the 'MonadReader' and 'MonadState' instances
-- for this type can name their environment and state types.
newtype Proxy a' a b' b e s es r
  = Proxy (P.Proxy a' a b' b (Eff es) r)
  deriving (Functor, Applicative, Monad, MonadThrow, MonadCatch)
-- Same fixity as @Pipes.>->@. Without this declaration the operator would
-- default to @infixl 9@, which cannot be mixed with @.@ (@infixr 9@) and
-- binds tighter than the library operator it wraps.
infixl 7 >->

-- | Connects the output of the first proxy to the input of the second by
-- unwrapping both, composing them with @Pipes.>->@ and re-wrapping the
-- result.
--
-- Both sides must share the same effect stack @es@, the same phantom @e@
-- and @s@ parameters, and the same result type @r@.
(>->) :: Proxy a' a () b e s es r -> Proxy () b c' c e s es r -> Proxy a' a c' c e s es r
Proxy upstream >-> Proxy downstream = Proxy $ upstream P.>-> downstream
-- | @Pipes.await@ wrapped in the 'Proxy' newtype: the result of the action
-- is a value of the proxy's input type @a@.
await :: Proxy () a y' y e s es a
await = Proxy P.await
-- | @Pipes.yield@ wrapped in the 'Proxy' newtype: hands the given value to
-- the proxy's output and returns unit.
yield :: a -> Proxy x' x () a e s es ()
yield item = Proxy (P.yield item)
-- | Runs a proxy whose two ends are both closed (@X@), producing the
-- underlying @Eff es@ computation via @Pipes.runEffect@.
runEffect :: Proxy X () () X e s es r -> Eff es r
runEffect (Proxy pipeline) = P.runEffect pipeline
-- | Logging is delegated to the @Log@ effect of the underlying @Eff es@.
--
-- Plain actions are lifted into the proxy; the @local*@ scoping
-- combinators are applied to the proxy's base monad with 'hoist'.
instance Log :> es => MonadLog (Proxy a' a b' b e s es) where
  -- 'logMessage' takes three arguments, hence the three-argument
  -- composition operator.
  logMessage = (Proxy . lift) .:. logMessage
  localData pairs (Proxy inner) = Proxy (hoist (localData pairs) inner)
  localDomain name (Proxy inner) = Proxy (hoist (localDomain name) inner)
  localMaxLogLevel maxLevel (Proxy inner) = Proxy (hoist (localMaxLogLevel maxLevel) inner)
  getLoggerEnv = Proxy (lift getLoggerEnv)
-- | Reader access is delegated to the static @Reader e@ effect of the
-- underlying @Eff es@. The environment type is the proxy's @e@ parameter.
instance E.Reader e :> es => MonadReader e (Proxy a' a b' b e s es) where
  ask = Proxy (lift E.ask)
  -- The environment change is applied to the proxy's base monad.
  local adjust (Proxy inner) = Proxy (hoist (E.local adjust) inner)
-- | Resource actions are run in the underlying @Eff es@ (which must
-- provide both @IOE@ and @Resource@) and then lifted into the proxy.
instance (IOE :> es, Resource :> es) => MonadResource (Proxy a' a b' b e s es) where
  liftResourceT resourceAction = Proxy (lift (liftResourceT resourceAction))
-- | State access is delegated to the local static @State s@ effect of the
-- underlying @Eff es@. The state type is the proxy's @s@ parameter.
instance E.State s :> es => MonadState s (Proxy a' a b' b e s es) where
  get = Proxy (lift E.get)
  put newState = Proxy (lift (E.put newState))
-- | @IO@ actions are first lifted into @Eff es@ (which requires @IOE@) and
-- then into the proxy.
instance IOE :> es => MonadIO (Proxy a' a b' b e s es) where
  liftIO ioAction = Proxy (lift (liftIO ioAction))
-- | A 'Proxy' with the port layout @X () () b@, i.e. the same layout as
-- @Pipes.Producer@: its output carries values of type @b@.
type Producer b e s es = Proxy X () () b e s es
-- | A 'Proxy' with the port layout @() a () b@, i.e. the same layout as
-- @Pipes.Pipe@: its input carries @a@ and its output carries @b@.
type Pipe a b e s es = Proxy () a () b e s es
-- | A 'Proxy' with the port layout @() a () X@, i.e. the same layout as
-- @Pipes.Consumer@: its input carries values of type @a@.
type Consumer a e s es = Proxy () a () X e s es
|