diff --git a/src/Hash2Pub/DHTProtocol.hs b/src/Hash2Pub/DHTProtocol.hs index 8930edc..f5fcdbd 100644 --- a/src/Hash2Pub/DHTProtocol.hs +++ b/src/Hash2Pub/DHTProtocol.hs @@ -41,13 +41,14 @@ module Hash2Pub.DHTProtocol ) where +import Control.Concurrent import Control.Concurrent.Async import Control.Concurrent.STM import Control.Concurrent.STM.TBQueue import Control.Concurrent.STM.TQueue import Control.Concurrent.STM.TVar import Control.Exception -import Control.Monad (foldM, forM, forM_) +import Control.Monad (foldM, forM, forM_, when) import qualified Data.ByteString as BS import Data.Either (rights) import Data.Foldable (foldl', foldr') @@ -75,10 +76,11 @@ import Hash2Pub.FediChordTypes (CacheEntry (..), LocalNodeState (..), LocalNodeStateSTM, NodeCache, NodeID, NodeState (..), - RealNode (..), + RealNode (..), RealNodeSTM, RemoteNodeState (..), RingEntry (..), RingMap (..), - addRMapEntry, addRMapEntryWith, + Service (..), addRMapEntry, + addRMapEntryWith, cacheGetNodeStateUnvalidated, cacheLookup, cacheLookupPred, cacheLookupSucc, genNodeID, @@ -250,7 +252,8 @@ ackRequest _ _ = Map.empty -- | Dispatch incoming requests to the dedicated handling and response function, and enqueue -- the response to be sent. -handleIncomingRequest :: LocalNodeStateSTM s -- ^ the handling node +handleIncomingRequest :: Service s (RealNodeSTM s) + => LocalNodeStateSTM s -- ^ the handling node -> TQueue (BS.ByteString, SockAddr) -- ^ send queue -> Set.Set FediChordMessage -- ^ all parts of the request to handle -> SockAddr -- ^ source address of the request @@ -422,10 +425,10 @@ respondPing nsSTM msgSet = do -- this modifies node state, so locking and IO seems to be necessary. -- Still try to keep as much code as possible pure -respondJoin :: LocalNodeStateSTM s -> Set.Set FediChordMessage -> IO (Map.Map Integer BS.ByteString) +respondJoin :: Service s (RealNodeSTM s) => LocalNodeStateSTM s -> Set.Set FediChordMessage -> IO (Map.Map Integer BS.ByteString) respondJoin nsSTM msgSet = do -- atomically read and modify the node state according to the parsed request - responseMsg <- atomically $ do + (dataMigration, responseMsg) <- atomically $ do nsSnap <- readTVar nsSTM cache <- readTVar $ nodeCacheSTM nsSnap let @@ -455,17 +458,24 @@ respondJoin nsSTM msgSet = do , payload = Just responsePayload } writeTVar nsSTM joinedNS - pure joinResponse + ownService <- nodeService <$> readTVar (parentRealNode nsSnap) + let + serviceDataMigrator = migrateData ownService lowerKeyBound (getNid senderNS) (getDomain senderNS, fromIntegral $ getServicePort senderNS) + lowerKeyBound = maybe (getNid nsSnap) getNid $ headMay (predecessors nsSnap) + pure (Just serviceDataMigrator, joinResponse) -- otherwise respond with empty payload - else pure Response { + else pure (Nothing, Response { requestID = requestID aRequestPart , senderID = getNid nsSnap , part = if Set.size msgSet == 1 then 1 else fromIntegral $ Set.size msgSet + 1 , isFinalPart = False , action = Join , payload = Nothing - } + }) + -- as DHT response is required immediately, fork the service data migration push + -- into a new thread. That's kind of ugly but the best I can think of so far + when (isJust dataMigration) (forkIO (fromJust dataMigration >> pure ()) >> pure ()) pure $ serialiseMessage sendMessageSize responseMsg -- TODO: notify service layer to copy over data now handled by the new joined node diff --git a/src/Hash2Pub/FediChord.hs b/src/Hash2Pub/FediChord.hs index 44ea80a..f3a482c 100644 --- a/src/Hash2Pub/FediChord.hs +++ b/src/Hash2Pub/FediChord.hs @@ -281,10 +281,10 @@ fediChordVserverLeave ns = do (\neighb -> do leaveResponse <- requestLeave ns neighb case leaveResponse of - Left _ -> sendUntilSuccess (i+1) + Left _ -> sendUntilSuccess (i+1) -- return first successfully contacted neighbour, -- so it can be contacted by the service layer for migration - Right _ -> pure $ Right neighb + Right _ -> pure $ Right neighb ) $ atMay (successors ns) i migrateSuccessor :: (MonadError String m, MonadIO m) => m () @@ -579,7 +579,7 @@ sendThread sock sendQ = forever $ do sendAllTo sock packet addr -- | Sets up and manages the main server threads of FediChord -fediMainThreads :: Socket -> LocalNodeStateSTM s -> IO () +fediMainThreads :: Service s (RealNodeSTM s) => Socket -> LocalNodeStateSTM s -> IO () fediMainThreads sock nsSTM = do ns <- readTVarIO nsSTM putStrLn "launching threads" @@ -622,7 +622,8 @@ requestMapPurge mapVar = forever $ do -- | Wait for messages, deserialise them, manage parts and acknowledgement status, -- and pass them to their specific handling function. -fediMessageHandler :: TQueue (BS.ByteString, SockAddr) -- ^ send queue +fediMessageHandler :: Service s (RealNodeSTM s) + => TQueue (BS.ByteString, SockAddr) -- ^ send queue -> TQueue (BS.ByteString, SockAddr) -- ^ receive queue -> LocalNodeStateSTM s -- ^ acting NodeState -> IO ()