Skip to content

Commit d9d7e04

Browse files
Manuel Bärenzturion
authored andcommitted
WIP
1 parent 0ed6592 commit d9d7e04

File tree

27 files changed

+513
-182
lines changed

27 files changed

+513
-182
lines changed

automaton/automaton.cabal

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@ common opts
4040
these >=1.1 && <=1.3,
4141
transformers >=0.5,
4242
witherable ^>=0.5,
43+
sop-core ^>=0.5,
44+
free >= 5.1,
4345

4446
if flag(dev)
4547
ghc-options: -Werror
@@ -68,6 +70,7 @@ library
6870
Data.Automaton
6971
Data.Automaton.Filter
7072
Data.Automaton.Recursive
73+
Data.Automaton.Schedule
7174
Data.Automaton.Trans.Accum
7275
Data.Automaton.Trans.Changeset
7376
Data.Automaton.Trans.Except
@@ -100,6 +103,7 @@ test-suite automaton-test
100103
Automaton
101104
Automaton.Except
102105
Automaton.Filter
106+
Automaton.Schedule
103107
Automaton.Trans.Accum
104108
Automaton.Trans.Changeset
105109
Stream

automaton/src/Data/Automaton.hs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -656,7 +656,7 @@ initialised :: (Monad m) => (a -> m b) -> Automaton m a b
656656
initialised = Automaton . Stateful . StreamT.initialised . ReaderT
657657
{-# INLINE initialised #-}
658658

659-
-- | Like 'initialised', but ignores the input.
659+
-- | Like 'initialised_', but ignores the input.
660660
initialised_ :: (Monad m) => m b -> Automaton m a b
661661
initialised_ = initialised . const
662662
{-# INLINE initialised_ #-}
Lines changed: 223 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,223 @@
1+
{-# LANGUAGE DerivingStrategies #-}
2+
{-# LANGUAGE ExistentialQuantification #-}
3+
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
4+
{-# LANGUAGE RankNTypes #-}
5+
6+
-- FIXME haddocks
7+
module Data.Automaton.Schedule where
8+
9+
-- base
10+
import Control.Arrow
11+
import Control.Concurrent (forkIO, newEmptyMVar, putMVar, readMVar, takeMVar, tryTakeMVar)
12+
import Control.Monad (forM_, void)
13+
import Control.Monad.IO.Class (MonadIO)
14+
import Control.Monad.Identity (Identity (..))
15+
import Data.Function ((&))
16+
import Data.Functor ((<&>))
17+
import Data.Functor.Compose (Compose (..))
18+
import Data.Kind (Type)
19+
import Data.List.NonEmpty as N
20+
import Data.Maybe (fromMaybe, maybeToList)
21+
22+
-- base-compat
23+
import Data.Foldable1 (Foldable1 (foldrMap1))
24+
25+
-- transformers
26+
import Control.Monad.Trans.Accum (AccumT (..), runAccumT)
27+
import Control.Monad.Trans.Class (MonadTrans (..))
28+
import Control.Monad.Trans.Except (ExceptT (..))
29+
import Control.Monad.Trans.Maybe (MaybeT (..))
30+
import Control.Monad.Trans.Reader (ReaderT (..))
31+
import Control.Monad.Trans.Writer.CPS qualified as CPS
32+
import Control.Monad.Trans.Writer.Lazy qualified as Lazy
33+
import Control.Monad.Trans.Writer.Strict qualified as Strict
34+
35+
-- sop-core
36+
import Data.SOP (HCollapse (hcollapse), HSequence (htraverse'), I (..), K (..), NP (..), SListI, hmap, hzipWith)
37+
38+
-- free
39+
import Control.Monad.Trans.Free (FreeF (..), FreeT (..), iterT, liftF)
40+
41+
-- automaton
42+
43+
import Control.Monad.Trans.Changeset (ChangesetT (..))
44+
import Data.Automaton (Automaton (..), arrM, constM, feedback, handleAutomaton, initialised_, liftS, reactimate, withAutomaton_)
45+
import Data.Automaton qualified as Automaton
46+
import Data.Automaton.Trans.Except (exceptS)
47+
import Data.Automaton.Trans.Maybe (runMaybeS)
48+
import Data.Automaton.Trans.Reader (readerS, runReaderS)
49+
import Data.Monoid.RightAction (RightAction)
50+
import Data.Stream (StreamT (..), concatS)
51+
import Data.Stream.Optimized (OptimizedStreamT (Stateful), toStreamT)
52+
import Data.Stream.Result
53+
import Data.Tuple (swap)
54+
55+
class MonadSchedule m where
56+
-- | Run a nonempty list of automata concurrently.
57+
schedule :: NonEmpty (Automaton m a b) -> Automaton m a b
58+
59+
-- | Start all streams in the background and send their values to a shared 'MVar'.
60+
instance MonadSchedule IO where
61+
schedule automata = proc a -> do
62+
(output, input) <- initialised_ startStreams -< ()
63+
arrM $ void . tryTakeMVar -< input
64+
arrM $ uncurry putMVar -< (input, a)
65+
arrM takeMVar -< output
66+
where
67+
startStreams = do
68+
output <- newEmptyMVar
69+
input <- newEmptyMVar
70+
forM_ automata $ \automaton -> forkIO $ reactimate $ lastMVarValue input >>> automaton >>> arrM (putMVar output)
71+
return (output, input)
72+
lastMVarValue var = feedback Nothing $ proc ((), aMaybe) -> do
73+
case aMaybe of
74+
Nothing -> do
75+
a <- constM $ readMVar var -< ()
76+
returnA -< (a, Just a)
77+
Just a -> do
78+
aNewMaybe <- constM $ tryTakeMVar var -< ()
79+
let aNew = fromMaybe a aNewMaybe
80+
returnA -< (aNew, aNewMaybe)
81+
82+
instance (Monad m, MonadSchedule m) => MonadSchedule (ReaderT r m) where
83+
schedule =
84+
fmap runReaderS
85+
>>> schedule
86+
>>> readerS
87+
88+
instance (Monad m, MonadSchedule m) => MonadSchedule (ExceptT e m) where
89+
schedule =
90+
fmap exceptS
91+
>>> schedule
92+
>>> withAutomaton_ (fmap sequenceA >>> ExceptT)
93+
94+
instance (Monad m, MonadSchedule m) => MonadSchedule (MaybeT m) where
95+
schedule =
96+
fmap runMaybeS
97+
>>> schedule
98+
>>> withAutomaton_ (fmap sequenceA >>> MaybeT)
99+
100+
instance (Monoid w, Monad m, MonadSchedule m) => MonadSchedule (CPS.WriterT w m) where
101+
schedule =
102+
fmap (withAutomaton_ (CPS.runWriterT >>> fmap (\(Result s a, w) -> Result s (a, w))))
103+
>>> schedule
104+
>>> withAutomaton_ (fmap (\(Result s (a, w)) -> (Result s a, w)) >>> CPS.writerT)
105+
106+
instance (Monoid w, Monad m, MonadSchedule m) => MonadSchedule (Strict.WriterT w m) where
107+
schedule =
108+
fmap (withAutomaton_ (Strict.runWriterT >>> fmap (\(Result s a, w) -> Result s (a, w))))
109+
>>> schedule
110+
>>> withAutomaton_ (fmap (\(Result s (a, w)) -> (Result s a, w)) >>> Strict.WriterT)
111+
112+
instance (Monoid w, Monad m, MonadSchedule m) => MonadSchedule (Lazy.WriterT w m) where
113+
schedule =
114+
fmap (withAutomaton_ (Lazy.runWriterT >>> fmap (\(Result s a, w) -> Result s (a, w))))
115+
>>> schedule
116+
>>> withAutomaton_ (fmap (\(Result s (a, w)) -> (Result s a, w)) >>> Lazy.WriterT)
117+
118+
-- | This will share the accumulated log from the past with all automata
119+
instance (Monoid w, Monad m, MonadSchedule m) => MonadSchedule (AccumT w m) where
120+
schedule =
121+
fmap (withAutomaton_ (runAccumT >>> ReaderT >>> CPS.writerT))
122+
>>> schedule
123+
>>> withAutomaton_ (CPS.runWriterT >>> runReaderT >>> AccumT)
124+
125+
-- | This will share the accumulated state from the past with all automata
126+
instance (Monoid w, RightAction w s, Monad m, MonadSchedule m) => MonadSchedule (ChangesetT s w m) where
127+
schedule =
128+
fmap (withAutomaton_ (getChangesetT >>> ReaderT >>> fmap swap >>> CPS.writerT))
129+
>>> schedule
130+
>>> withAutomaton_ (CPS.runWriterT >>> fmap swap >>> runReaderT >>> ChangesetT)
131+
132+
-- | Cycle through all automata in a round-robin fashion
133+
instance MonadSchedule Identity where
134+
schedule =
135+
fmap (getAutomaton >>> toStreamT)
136+
>>> foldrMap1 buildStreams consStreams
137+
>>> roundRobinStreams
138+
>>> fmap N.toList
139+
>>> concatS
140+
>>> Stateful
141+
>>> Automaton
142+
where
143+
buildStreams :: StreamT m b -> Streams m b
144+
buildStreams StreamT {state, step} =
145+
Streams
146+
{ states = I state :* Nil
147+
, steps = Step (ResultStateT step) :* Nil
148+
}
149+
150+
consStreams :: StreamT m b -> Streams m b -> Streams m b
151+
consStreams StreamT {state, step} Streams {states, steps} =
152+
Streams
153+
{ states = I state :* states
154+
, steps = Step (ResultStateT step) :* steps
155+
}
156+
157+
-- FIXME take care to reverse & test
158+
159+
roundRobinStreams :: (Functor m, Applicative m) => Streams m b -> StreamT m (NonEmpty b)
160+
roundRobinStreams Streams {states, steps} =
161+
StreamT
162+
{ state = states
163+
, step = \s ->
164+
s
165+
& hzipWith (\Step {getStep} (I s) -> getResultStateT getStep s <&> RunningResult & Compose) steps
166+
& htraverse' getCompose
167+
<&> ( \results ->
168+
Result
169+
(results & hmap (getRunningResult >>> resultState >>> I))
170+
(results & hmap (getRunningResult >>> output >>> K) & hnonemptycollapse)
171+
)
172+
}
173+
174+
hnonemptycollapse :: (SListI as) => NP (K b) (a ': as) -> NonEmpty b
175+
hnonemptycollapse (K a :* as) = a :| hcollapse as
176+
177+
-- | A nonempty list of 'StreamT's, unzipped into their states and their steps.
178+
data Streams m b
179+
= forall state (states :: [Type]).
180+
(SListI states) =>
181+
Streams
182+
{ states :: NP I (state ': states)
183+
, steps :: NP (Step m b) (state ': states)
184+
}
185+
186+
-- | One step of a stream, with the state type argument going last, so it is usable with sop-core.
187+
newtype Step m b state = Step {getStep :: ResultStateT state m b}
188+
189+
-- | The result of a stream, with the type arguments swapped, so it's usable with sop-core
190+
newtype RunningResult b state = RunningResult {getRunningResult :: Result state b}
191+
192+
-- * The symbolic effect of skipping one step of an automaton
193+
194+
newtype SkipT m a = SkipT {getSkipT :: FreeT Identity m a}
195+
deriving newtype (Functor, Applicative, Monad, MonadTrans, MonadIO)
196+
197+
type Yield = SkipT Identity
198+
199+
runSkipS :: (Functor m, Monad m) => Automaton (SkipT m) a b -> Automaton m a (Maybe b)
200+
runSkipS = handleAutomaton $ \StreamT {state, step} ->
201+
StreamT
202+
{ state = step state
203+
, step = \s -> ReaderT $ \a -> do
204+
oneTick <- runFreeT $ getSkipT $ runReaderT s a
205+
return $ case oneTick of
206+
Pure (Result s' b) -> Result (step s') (Just b)
207+
Free (Identity cont) -> Result (lift $ SkipT cont) Nothing
208+
}
209+
210+
instance (Monad m, MonadSchedule m) => MonadSchedule (SkipT m) where
211+
schedule = fmap runSkipS >>> schedule >>> fmap maybeToList >>> Automaton.concatS >>> liftS
212+
213+
skip :: (Monad m) => SkipT m ()
214+
skip = SkipT $ liftF $ pure ()
215+
216+
runSkipT :: (Monad m) => SkipT m a -> m a
217+
runSkipT = iterT runIdentity . getSkipT
218+
219+
runSkipTWith :: (Monad m) => m () -> SkipT m a -> m a
220+
runSkipTWith action = iterT (\ima -> action >> runIdentity ima) . getSkipT
221+
222+
runYield :: Yield a -> a
223+
runYield = runIdentity . runSkipT

automaton/src/Data/Stream.hs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -289,6 +289,9 @@ withStreamT f StreamT {state, step} = StreamT state $ fmap f step
289289
This function lets a stream control the speed at which it produces data,
290290
since it can decide to produce any amount of output at every step.
291291
-}
292+
293+
-- FIXME this reverses? doc?
294+
-- FIXME generalise to traversable?
292295
concatS :: (Monad m) => StreamT m [a] -> StreamT m a
293296
concatS StreamT {state, step} =
294297
StreamT

automaton/test/Automaton.hs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import Test.Tasty.HUnit (testCase, (@?=))
2929
-- automaton
3030
import Automaton.Except
3131
import Automaton.Filter
32+
import Automaton.Schedule
3233
import Automaton.Trans.Accum
3334
import Automaton.Trans.Changeset
3435
import Data.Automaton
@@ -80,6 +81,7 @@ tests =
8081
, Automaton.Filter.tests
8182
, Automaton.Trans.Accum.tests
8283
, Automaton.Trans.Changeset.tests
84+
, Automaton.Schedule.tests
8385
]
8486

8587
inMaybe :: Automaton Maybe (Maybe a) a
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
module Automaton.Schedule where
2+
3+
-- base
4+
import Control.Category ((>>>))
5+
import Control.Monad (replicateM)
6+
import Control.Monad.Identity (Identity (runIdentity))
7+
import Data.Functor (($>))
8+
import Data.List.NonEmpty (NonEmpty (..))
9+
10+
-- changeset
11+
12+
import Control.Monad.Changeset.Class (MonadChangeset (..))
13+
import Control.Monad.Trans.Changeset (Count (Increment), changeSingle, runChangeset)
14+
15+
-- tasty
16+
import Test.Tasty (testGroup)
17+
18+
-- tasty-hunit
19+
import Test.Tasty.HUnit (testCase, (@?=))
20+
21+
-- automaton
22+
import Data.Automaton (accumulateWith, constM, embed)
23+
import Data.Automaton.Schedule (runSkipS, runYield, schedule, skip)
24+
25+
tests =
26+
testGroup
27+
"Schedule"
28+
[ testGroup
29+
"SkipT"
30+
[ testCase "SkipT skips an output step" $ do
31+
let output = runIdentity $ embed (runSkipS $ constM (wait 5 $> (5 :: Int)) >>> accumulateWith (+) 0) $ replicate 10 ()
32+
output @?= [Nothing, Nothing, Nothing, Nothing, Just 5, Nothing, Nothing, Nothing, Nothing, Just 10]
33+
, testCase "schedule waits chronologically (mirrored)" $ do
34+
let output = runIdentity $ embed (runSkipS $ constM (wait 3 $> (3 :: Int)) >>> accumulateWith (+) 0) $ replicate 10 ()
35+
output @?= [Nothing, Nothing, Just 3, Nothing, Nothing, Just 6, Nothing, Nothing, Just 9, Nothing]
36+
]
37+
, testGroup
38+
"schedule"
39+
[ testCase "schedule waits chronologically" $ do
40+
let output = runYield $ embed (schedule $ (\n -> constM (wait n $> n) >>> accumulateWith (+) 0) <$> 3 :| [5]) $ replicate 10 ()
41+
output @?= [3, 5, 6, 9, 10, 12, 15, 15, 18, 20]
42+
, testCase "schedule waits chronologically (mirrored)" $ do
43+
let output = runYield $ embed (schedule $ (\n -> constM (wait n $> n) >>> accumulateWith (+) 0) <$> 5 :| [3]) $ replicate 10 ()
44+
output @?= [3, 5, 6, 9, 10, 12, 15, 15, 18, 20]
45+
]
46+
, testGroup
47+
"ChangesetT"
48+
[ testCase "Single automaton is unchanged" $ do
49+
let output = flip runChangeset (0 :: Int) $ flip embed (replicate 5 ()) $ schedule $ pure $ constM $ changeSingle Increment >> current
50+
output @?= ([1, 2, 3, 4, 5], 5)
51+
, testCase "Two automata see global state" $ do
52+
let output = flip runChangeset (0 :: Int) $ flip embed (replicate 10 ()) $ schedule $ constM (changeSingle Increment >> pure (-1)) :| [constM current]
53+
output
54+
@?= (
55+
[ -1
56+
, 0 -- First tick of both automata: Second one doesn't yet see the log of the other
57+
, -1
58+
, 1 -- Second joint tick: Log from the first reaches the second automaton
59+
, -1
60+
, 2
61+
, -1
62+
, 3
63+
, -1
64+
, 4
65+
]
66+
, 5
67+
)
68+
, testCase "Two automata see global state (mirrored)" $ do
69+
let output = flip runChangeset (0 :: Int) $ flip embed (replicate 10 ()) $ schedule $ constM current :| [constM (changeSingle Increment >> pure (-1))]
70+
output
71+
@?= (
72+
[ 0 -- First tick of both automata: Second one doesn't yet see the log of the other
73+
, -1
74+
, 1 -- Second joint tick: Log from the first reaches the second automaton
75+
, -1
76+
, 2
77+
, -1
78+
, 3
79+
, -1
80+
, 4
81+
, -1
82+
]
83+
, 5
84+
)
85+
]
86+
]
87+
where
88+
wait n = replicateM (n - 1) skip

flake.nix

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,6 @@
7070
(hfinal: hprev: lib.optionalAttrs prev.stdenv.isDarwin {
7171
# For custom version: Don't test because tests don't work on Mac (https://github.com/tweag/monad-bayes/issues/368)
7272
monad-bayes = dontCheck hprev.monad-bayes;
73-
monad-schedule = dontCheck hprev.monad-schedule;
7473
})
7574
(hfinal: hprev: lib.optionalAttrs (lib.versionAtLeast hprev.ghc.version "9.10") {
7675
# Remove these after https://github.com/turion/rhine/issues/399

0 commit comments

Comments
 (0)