start restructuring joinOnNewEntries flow
- overview comment on possible flow - cache query - doesn't compile yet
This commit is contained in:
parent
578cc362b9
commit
9a61c186e3
|
@ -127,9 +127,16 @@ fediChordInit initConf serviceRunner = do
|
||||||
either (\err -> do
|
either (\err -> do
|
||||||
-- handle unsuccessful join
|
-- handle unsuccessful join
|
||||||
putStrLn $ err <> " Error joining, start listening for incoming requests anyways"
|
putStrLn $ err <> " Error joining, start listening for incoming requests anyways"
|
||||||
|
-- add an unjoined placeholder vserver to be able to listen for
|
||||||
|
-- incoming request
|
||||||
|
placeholderVS <- nodeStateInit realNodeSTM 0
|
||||||
|
placeholderVSSTM <- newTVarIO placeholderVS
|
||||||
|
atomically . modifyTVar' realNodeSTM $
|
||||||
|
addVserver (getNid placeholderVS, placeholderVSSTM)
|
||||||
|
-- TODO: on which bootstrap node vserver to join? (#77)
|
||||||
-- launch thread attempting to join on new cache entries
|
-- launch thread attempting to join on new cache entries
|
||||||
-- TODO: adjust joinOnNewEntriesThread to k-choices
|
-- TODO: adjust joinOnNewEntriesThread to k-choices
|
||||||
--_ <- forkIO $ joinOnNewEntriesThread firstVSSTM
|
_ <- forkIO $ joinOnNewEntriesThread realNodeSTM
|
||||||
async (fediMainThreads serverSock realNodeSTM)
|
async (fediMainThreads serverSock realNodeSTM)
|
||||||
)
|
)
|
||||||
(\_ -> do
|
(\_ -> do
|
||||||
|
@ -370,9 +377,8 @@ tryBootstrapJoining nodeSTM = do
|
||||||
joinResult <- tryJoining bss (fediChordBootstrapJoin firstVSSTM)
|
joinResult <- tryJoining bss (fediChordBootstrapJoin firstVSSTM)
|
||||||
either
|
either
|
||||||
(pure . Left)
|
(pure . Left)
|
||||||
(\_ -> fmap Right . atomically . modifyTVar' nodeSTM $ (\rn -> rn
|
(\_ -> fmap Right . atomically . modifyTVar' nodeSTM $
|
||||||
{ vservers = HMap.insert (getNid firstVS) firstVSSTM (vservers rn) }
|
addVserver (getNid firstVS, firstVSSTM)
|
||||||
)
|
|
||||||
)
|
)
|
||||||
(joinResult :: Either String ())
|
(joinResult :: Either String ())
|
||||||
|
|
||||||
|
@ -473,24 +479,44 @@ fediChordVserverLeave ns = do
|
||||||
|
|
||||||
-- | Wait for new cache entries to appear and then try joining on them.
|
-- | Wait for new cache entries to appear and then try joining on them.
|
||||||
-- Exits after successful joining.
|
-- Exits after successful joining.
|
||||||
joinOnNewEntriesThread :: Service s (RealNodeSTM s) => LocalNodeStateSTM s -> IO ()
|
joinOnNewEntriesThread :: Service s (RealNodeSTM s) => RealNodeSTM s -> IO ()
|
||||||
joinOnNewEntriesThread nsSTM = loop
|
joinOnNewEntriesThread nodeSTM = loop
|
||||||
where
|
where
|
||||||
|
-- situation 1: not joined yet -> vservers == empty
|
||||||
|
-- problem: empty vservers means not responding to incoming requests, so
|
||||||
|
-- another node cannot join on us an we not on them (as they're still
|
||||||
|
-- unjoined as well)
|
||||||
|
-- solution: on failure still join a dummy node, also add it as vserver
|
||||||
|
-- problem: once another node has joined on the dummy vserver, we shouldn't
|
||||||
|
-- just delete it again as it now relies on it as a neighbour
|
||||||
|
-- => either trigger a kChoicesNodeJoin with the flag that **not** at least one
|
||||||
|
-- single node needs to be joined (read vservers initially), or do an accelerated
|
||||||
|
-- periodic rebalance
|
||||||
|
-- TODO: document this approach in the docs
|
||||||
loop = do
|
loop = do
|
||||||
nsSnap <- readTVarIO nsSTM
|
lookupResult <- atomically $ do
|
||||||
(lookupResult, parentNode) <- atomically $ do
|
nodeSnap <- readTVar nodeSTM
|
||||||
cache <- readTVar $ nodeCacheSTM nsSnap
|
case headMay (HMap.toList $ vservers nodeSnap) of
|
||||||
parentNode <- readTVar $ parentRealNode nsSnap
|
Nothing -> retry
|
||||||
case queryLocalCache nsSnap cache 1 (getNid nsSnap) of
|
Just vsSTM -> do
|
||||||
-- empty cache, block until cache changes and then retry
|
-- take any active vserver as heuristic for whether this node has
|
||||||
|
-- successfully joined:
|
||||||
|
-- If the node hasn't joined yet, only a single placeholder node
|
||||||
|
-- is active…
|
||||||
|
firstVS <- readTVar vsSTM
|
||||||
|
cache <- readTVar $ globalNodeCacheSTM nodeSnap
|
||||||
|
case queryLocalCache firstVS cache 1 (getNid firstVS) of
|
||||||
|
-- …which, having no neighbours, returns an empty forward list
|
||||||
|
-- -> block until cache changes and then retry
|
||||||
(FORWARD s) | Set.null s -> retry
|
(FORWARD s) | Set.null s -> retry
|
||||||
result -> pure (result, parentNode)
|
result -> pure result
|
||||||
case lookupResult of
|
case lookupResult of
|
||||||
-- already joined
|
-- already joined
|
||||||
FOUND _ ->
|
FOUND _ ->
|
||||||
pure ()
|
pure ()
|
||||||
-- otherwise try joining
|
-- otherwise try joining
|
||||||
FORWARD _ -> do
|
FORWARD _ -> do
|
||||||
|
-- do normal join, but without bootstrap nodes
|
||||||
joinResult <- runExceptT $ fediChordVserverJoin nsSTM
|
joinResult <- runExceptT $ fediChordVserverJoin nsSTM
|
||||||
either
|
either
|
||||||
-- on join failure, sleep and retry
|
-- on join failure, sleep and retry
|
||||||
|
|
|
@ -22,6 +22,7 @@ module Hash2Pub.FediChordTypes
|
||||||
, LoadStats (..)
|
, LoadStats (..)
|
||||||
, emptyLoadStats
|
, emptyLoadStats
|
||||||
, remainingLoadTarget
|
, remainingLoadTarget
|
||||||
|
, addVserver
|
||||||
, SegmentLoadStats (..)
|
, SegmentLoadStats (..)
|
||||||
, setSuccessors
|
, setSuccessors
|
||||||
, setPredecessors
|
, setPredecessors
|
||||||
|
@ -170,6 +171,10 @@ data RealNode s = RealNode
|
||||||
, nodeService :: s (RealNodeSTM s)
|
, nodeService :: s (RealNodeSTM s)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
-- | insert a new vserver mapping into a node
|
||||||
|
addVserver :: (NodeID, LocalNodeStateSTM s) -> RealNode s -> RealNode s
|
||||||
|
addVserver (key, nstate) node = node
|
||||||
|
{ vservers = HMap.insert key nstate (vservers node) }
|
||||||
|
|
||||||
type VSMap s = HashMap NodeID (LocalNodeStateSTM s)
|
type VSMap s = HashMap NodeID (LocalNodeStateSTM s)
|
||||||
type RealNodeSTM s = TVar (RealNode s)
|
type RealNodeSTM s = TVar (RealNode s)
|
||||||
|
|
Loading…
Reference in a new issue