trigger service data migration at join

This commit is contained in:
Trolli Schmittlauch 2020-08-16 17:53:48 +02:00
parent 470ce6f39a
commit 581757965a
2 changed files with 24 additions and 13 deletions

View file

@ -41,13 +41,14 @@ module Hash2Pub.DHTProtocol
)
where
import Control.Concurrent
import Control.Concurrent.Async
import Control.Concurrent.STM
import Control.Concurrent.STM.TBQueue
import Control.Concurrent.STM.TQueue
import Control.Concurrent.STM.TVar
import Control.Exception
import Control.Monad (foldM, forM, forM_)
import Control.Monad (foldM, forM, forM_, when)
import qualified Data.ByteString as BS
import Data.Either (rights)
import Data.Foldable (foldl', foldr')
@ -75,10 +76,11 @@ import Hash2Pub.FediChordTypes (CacheEntry (..),
LocalNodeState (..),
LocalNodeStateSTM, NodeCache,
NodeID, NodeState (..),
RealNode (..),
RealNode (..), RealNodeSTM,
RemoteNodeState (..),
RingEntry (..), RingMap (..),
addRMapEntry, addRMapEntryWith,
Service (..), addRMapEntry,
addRMapEntryWith,
cacheGetNodeStateUnvalidated,
cacheLookup, cacheLookupPred,
cacheLookupSucc, genNodeID,
@ -250,7 +252,8 @@ ackRequest _ _ = Map.empty
-- | Dispatch incoming requests to the dedicated handling and response function, and enqueue
-- the response to be sent.
handleIncomingRequest :: LocalNodeStateSTM s -- ^ the handling node
handleIncomingRequest :: Service s (RealNodeSTM s)
=> LocalNodeStateSTM s -- ^ the handling node
-> TQueue (BS.ByteString, SockAddr) -- ^ send queue
-> Set.Set FediChordMessage -- ^ all parts of the request to handle
-> SockAddr -- ^ source address of the request
@ -422,10 +425,10 @@ respondPing nsSTM msgSet = do
-- this modifies node state, so locking and IO seems to be necessary.
-- Still try to keep as much code as possible pure
respondJoin :: LocalNodeStateSTM s -> Set.Set FediChordMessage -> IO (Map.Map Integer BS.ByteString)
respondJoin :: Service s (RealNodeSTM s) => LocalNodeStateSTM s -> Set.Set FediChordMessage -> IO (Map.Map Integer BS.ByteString)
respondJoin nsSTM msgSet = do
-- atomically read and modify the node state according to the parsed request
responseMsg <- atomically $ do
(dataMigration, responseMsg) <- atomically $ do
nsSnap <- readTVar nsSTM
cache <- readTVar $ nodeCacheSTM nsSnap
let
@ -455,17 +458,24 @@ respondJoin nsSTM msgSet = do
, payload = Just responsePayload
}
writeTVar nsSTM joinedNS
pure joinResponse
ownService <- nodeService <$> readTVar (parentRealNode nsSnap)
let
serviceDataMigrator = migrateData ownService lowerKeyBound (getNid senderNS) (getDomain senderNS, fromIntegral $ getServicePort senderNS)
lowerKeyBound = maybe (getNid nsSnap) getNid $ headMay (predecessors nsSnap)
pure (Just serviceDataMigrator, joinResponse)
-- otherwise respond with empty payload
else pure Response {
else pure (Nothing, Response {
requestID = requestID aRequestPart
, senderID = getNid nsSnap
, part = if Set.size msgSet == 1 then 1 else fromIntegral $ Set.size msgSet + 1
, isFinalPart = False
, action = Join
, payload = Nothing
}
})
-- as DHT response is required immediately, fork the service data migration push
-- into a new thread. That's kind of ugly but the best I can think of so far
when (isJust dataMigration) (forkIO (fromJust dataMigration >> pure ()) >> pure ())
pure $ serialiseMessage sendMessageSize responseMsg
-- TODO: notify service layer to copy over data now handled by the new joined node

View file

@ -579,7 +579,7 @@ sendThread sock sendQ = forever $ do
sendAllTo sock packet addr
-- | Sets up and manages the main server threads of FediChord
fediMainThreads :: Socket -> LocalNodeStateSTM s -> IO ()
fediMainThreads :: Service s (RealNodeSTM s) => Socket -> LocalNodeStateSTM s -> IO ()
fediMainThreads sock nsSTM = do
ns <- readTVarIO nsSTM
putStrLn "launching threads"
@ -622,7 +622,8 @@ requestMapPurge mapVar = forever $ do
-- | Wait for messages, deserialise them, manage parts and acknowledgement status,
-- and pass them to their specific handling function.
fediMessageHandler :: TQueue (BS.ByteString, SockAddr) -- ^ send queue
fediMessageHandler :: Service s (RealNodeSTM s)
=> TQueue (BS.ByteString, SockAddr) -- ^ send queue
-> TQueue (BS.ByteString, SockAddr) -- ^ receive queue
-> LocalNodeStateSTM s -- ^ acting NodeState
-> IO ()