diff --git a/src/Hash2Pub/DHTProtocol.hs b/src/Hash2Pub/DHTProtocol.hs index 8930edc..13dd434 100644 --- a/src/Hash2Pub/DHTProtocol.hs +++ b/src/Hash2Pub/DHTProtocol.hs @@ -41,13 +41,14 @@ module Hash2Pub.DHTProtocol ) where +import Control.Concurrent import Control.Concurrent.Async import Control.Concurrent.STM import Control.Concurrent.STM.TBQueue import Control.Concurrent.STM.TQueue import Control.Concurrent.STM.TVar import Control.Exception -import Control.Monad (foldM, forM, forM_) +import Control.Monad (foldM, forM, forM_, when) import qualified Data.ByteString as BS import Data.Either (rights) import Data.Foldable (foldl', foldr') @@ -75,10 +76,11 @@ import Hash2Pub.FediChordTypes (CacheEntry (..), LocalNodeState (..), LocalNodeStateSTM, NodeCache, NodeID, NodeState (..), - RealNode (..), + RealNode (..), RealNodeSTM, RemoteNodeState (..), RingEntry (..), RingMap (..), - addRMapEntry, addRMapEntryWith, + Service (..), addRMapEntry, + addRMapEntryWith, cacheGetNodeStateUnvalidated, cacheLookup, cacheLookupPred, cacheLookupSucc, genNodeID, @@ -250,7 +252,8 @@ ackRequest _ _ = Map.empty -- | Dispatch incoming requests to the dedicated handling and response function, and enqueue -- the response to be sent. -handleIncomingRequest :: LocalNodeStateSTM s -- ^ the handling node +handleIncomingRequest :: Service s (RealNodeSTM s) + => LocalNodeStateSTM s -- ^ the handling node -> TQueue (BS.ByteString, SockAddr) -- ^ send queue -> Set.Set FediChordMessage -- ^ all parts of the request to handle -> SockAddr -- ^ source address of the request @@ -422,10 +425,10 @@ respondPing nsSTM msgSet = do -- this modifies node state, so locking and IO seems to be necessary. 
-- Still try to keep as much code as possible pure -respondJoin :: LocalNodeStateSTM s -> Set.Set FediChordMessage -> IO (Map.Map Integer BS.ByteString) +respondJoin :: Service s (RealNodeSTM s) => LocalNodeStateSTM s -> Set.Set FediChordMessage -> IO (Map.Map Integer BS.ByteString) respondJoin nsSTM msgSet = do -- atomically read and modify the node state according to the parsed request - responseMsg <- atomically $ do + (dataMigration, responseMsg) <- atomically $ do nsSnap <- readTVar nsSTM cache <- readTVar $ nodeCacheSTM nsSnap let @@ -455,24 +458,31 @@ respondJoin nsSTM msgSet = do , payload = Just responsePayload } writeTVar nsSTM joinedNS - pure joinResponse + ownService <- nodeService <$> readTVar (parentRealNode nsSnap) + let + serviceDataMigrator = migrateData ownService (getNid nsSnap) lowerKeyBound (getNid senderNS) (getDomain senderNS, fromIntegral $ getServicePort senderNS) + lowerKeyBound = maybe (getNid nsSnap) getNid $ headMay (predecessors nsSnap) + pure (Just serviceDataMigrator, joinResponse) -- otherwise respond with empty payload - else pure Response { + else pure (Nothing, Response { requestID = requestID aRequestPart , senderID = getNid nsSnap , part = if Set.size msgSet == 1 then 1 else fromIntegral $ Set.size msgSet + 1 , isFinalPart = False , action = Join , payload = Nothing - } + }) + -- as DHT response is required immediately, fork the service data migration push + -- into a new thread. That's kind of ugly but the best I can think of so far + when (isJust dataMigration) (forkIO (fromJust dataMigration >> pure ()) >> pure ()) pure $ serialiseMessage sendMessageSize responseMsg -- TODO: notify service layer to copy over data now handled by the new joined node -- ....... request sending ....... 
-- | send a join request and return the joined 'LocalNodeState' including neighbours -requestJoin :: NodeState a => a -- ^ currently responsible node to be contacted +requestJoin :: (NodeState a, Service s (RealNodeSTM s)) => a -- ^ currently responsible node to be contacted -> LocalNodeStateSTM s -- ^ joining NodeState -> IO (Either String (LocalNodeStateSTM s)) -- ^ node after join with all its new information requestJoin toJoinOn ownStateSTM = do @@ -511,12 +521,15 @@ requestJoin toJoinOn ownStateSTM = do pure (cacheInsertQ, newState) -- execute the cache insertions mapM_ (\f -> f joinedState) cacheInsertQ - pure $ if responses == Set.empty - then Left $ "join error: got no response from " <> show (getNid toJoinOn) + if responses == Set.empty + then pure . Left $ "join error: got no response from " <> show (getNid toJoinOn) else if null (predecessors joinedState) && null (successors joinedState) - then Left "join error: no predecessors or successors" + then pure $ Left "join error: no predecessors or successors" -- successful join - else Right ownStateSTM + else do + -- wait for migration data to be completely received + waitForMigrationFrom (nodeService prn) (getNid ownState) + pure $ Right ownStateSTM ) `catch` (\e -> pure . Left $ displayException (e :: IOException)) diff --git a/src/Hash2Pub/FediChord.hs b/src/Hash2Pub/FediChord.hs index c55d94c..f544061 100644 --- a/src/Hash2Pub/FediChord.hs +++ b/src/Hash2Pub/FediChord.hs @@ -152,9 +152,10 @@ nodeStateInit realNodeSTM = do -- | Join a new node into the DHT, using a provided bootstrap node as initial cache seed -- for resolving the new node's position. 
-fediChordBootstrapJoin :: LocalNodeStateSTM s -- ^ the local 'NodeState' - -> (String, PortNumber) -- ^ domain and port of a bootstrapping node - -> IO (Either String (LocalNodeStateSTM s)) -- ^ the joined 'NodeState' after a +fediChordBootstrapJoin :: Service s (RealNodeSTM s) + => LocalNodeStateSTM s -- ^ the local 'NodeState' + -> (String, PortNumber) -- ^ domain and port of a bootstrapping node + -> IO (Either String (LocalNodeStateSTM s)) -- ^ the joined 'NodeState' after a -- successful join, otherwise an error message fediChordBootstrapJoin nsSTM bootstrapNode = do -- can be invoked multiple times with all known bootstrapping nodes until successfully joined @@ -170,7 +171,7 @@ fediChordBootstrapJoin nsSTM bootstrapNode = do -- Periodically lookup own ID through a random bootstrapping node to discover and merge separated DHT clusters. -- Unjoined try joining instead. -convergenceSampleThread :: LocalNodeStateSTM s -> IO () +convergenceSampleThread :: Service s (RealNodeSTM s) => LocalNodeStateSTM s -> IO () convergenceSampleThread nsSTM = forever $ do nsSnap <- readTVarIO nsSTM parentNode <- readTVarIO $ parentRealNode nsSnap @@ -201,7 +202,7 @@ convergenceSampleThread nsSTM = forever $ do -- | Try joining the DHT through any of the bootstrapping nodes until it succeeds. -tryBootstrapJoining :: LocalNodeStateSTM s -> IO (Either String (LocalNodeStateSTM s)) +tryBootstrapJoining :: Service s (RealNodeSTM s) => LocalNodeStateSTM s -> IO (Either String (LocalNodeStateSTM s)) tryBootstrapJoining nsSTM = do bss <- atomically $ do nsSnap <- readTVar nsSTM @@ -249,8 +250,9 @@ bootstrapQueryId nsSTM (bootstrapHost, bootstrapPort) targetID = do -- | join a node to the DHT using the global node cache -- node's position. 
-fediChordVserverJoin :: LocalNodeStateSTM s -- ^ the local 'NodeState' - -> IO (Either String (LocalNodeStateSTM s)) -- ^ the joined 'NodeState' after a +fediChordVserverJoin :: Service s (RealNodeSTM s) + => LocalNodeStateSTM s -- ^ the local 'NodeState' + -> IO (Either String (LocalNodeStateSTM s)) -- ^ the joined 'NodeState' after a -- successful join, otherwise an error message fediChordVserverJoin nsSTM = do ns <- readTVarIO nsSTM @@ -281,10 +283,10 @@ fediChordVserverLeave ns = do (\neighb -> do leaveResponse <- requestLeave ns neighb case leaveResponse of - Left _ -> sendUntilSuccess (i+1) + Left _ -> sendUntilSuccess (i+1) -- return first successfully contacted neighbour, -- so it can be contacted by the service layer for migration - Right _ -> pure $ Right neighb + Right _ -> pure $ Right neighb ) $ atMay (successors ns) i migrateSuccessor :: (MonadError String m, MonadIO m) => m () @@ -293,15 +295,18 @@ fediChordVserverLeave ns = do successorLeave <- liftIO $ sendUntilSuccess 0 -- trigger service data transfer for abandoned key space migrateToNode <- liftEither successorLeave + let lowerKeyBound = maybe (getNid ns) getNid $ headMay (predecessors ns) ownService <- nodeService <$> (liftIO . readTVarIO $ parentRealNode ns) - migrationResult <- liftIO $ migrateData ownService (getNid ns) (getNid migrateToNode) (getDomain migrateToNode, fromIntegral $ getServicePort migrateToNode) + -- previously held data is the one between the immediate predecessor and + -- the own ID + migrationResult <- liftIO $ migrateData ownService (getNid ns) lowerKeyBound (getNid ns) (getDomain migrateToNode, fromIntegral $ getServicePort migrateToNode) liftEither migrationResult -- | Wait for new cache entries to appear and then try joining on them. -- Exits after successful joining. 
-joinOnNewEntriesThread :: LocalNodeStateSTM s -> IO () +joinOnNewEntriesThread :: Service s (RealNodeSTM s) => LocalNodeStateSTM s -> IO () joinOnNewEntriesThread nsSTM = loop where loop = do @@ -576,7 +581,7 @@ sendThread sock sendQ = forever $ do sendAllTo sock packet addr -- | Sets up and manages the main server threads of FediChord -fediMainThreads :: Socket -> LocalNodeStateSTM s -> IO () +fediMainThreads :: Service s (RealNodeSTM s) => Socket -> LocalNodeStateSTM s -> IO () fediMainThreads sock nsSTM = do ns <- readTVarIO nsSTM putStrLn "launching threads" @@ -619,7 +624,8 @@ requestMapPurge mapVar = forever $ do -- | Wait for messages, deserialise them, manage parts and acknowledgement status, -- and pass them to their specific handling function. -fediMessageHandler :: TQueue (BS.ByteString, SockAddr) -- ^ send queue +fediMessageHandler :: Service s (RealNodeSTM s) + => TQueue (BS.ByteString, SockAddr) -- ^ send queue -> TQueue (BS.ByteString, SockAddr) -- ^ receive queue -> LocalNodeStateSTM s -- ^ acting NodeState -> IO () diff --git a/src/Hash2Pub/FediChordTypes.hs b/src/Hash2Pub/FediChordTypes.hs index 214ece2..cbd3a58 100644 --- a/src/Hash2Pub/FediChordTypes.hs +++ b/src/Hash2Pub/FediChordTypes.hs @@ -426,10 +426,13 @@ class Service s d where getListeningPortFromService :: (Integral i) => s d -> i -- | trigger a service data migration of data between the two given keys migrateData :: s d + -> NodeID -- ^ source/ sender node ID -> NodeID -- ^ start key -> NodeID -- ^ end key -> (String, Int) -- ^ hostname and port of target service -> IO (Either String ()) -- ^ success or failure + -- | Wait for an incoming migration from a given node to succeed, may block forever + waitForMigrationFrom :: s d -> NodeID -> IO () instance Hashable.Hashable NodeID where hashWithSalt salt = Hashable.hashWithSalt salt . 
getNodeID diff --git a/src/Hash2Pub/PostService.hs b/src/Hash2Pub/PostService.hs index 71998df..c277327 100644 --- a/src/Hash2Pub/PostService.hs +++ b/src/Hash2Pub/PostService.hs @@ -10,6 +10,7 @@ module Hash2Pub.PostService where import Control.Concurrent import Control.Concurrent.Async +import Control.Concurrent.MVar import Control.Concurrent.STM import Control.Concurrent.STM.TChan import Control.Concurrent.STM.TChan @@ -46,20 +47,21 @@ import Hash2Pub.RingMap data PostService d = PostService - { serviceConf :: ServiceConf + { serviceConf :: ServiceConf -- queues, other data structures - , baseDHT :: (DHT d) => d - , serviceThread :: TVar ThreadId - , subscribers :: TVar RelayTags + , baseDHT :: (DHT d) => d + , serviceThread :: TVar ThreadId + , subscribers :: TVar RelayTags -- ^ for each tag store the subscribers + their queue - , ownSubscriptions :: TVar (HMap.HashMap NodeID POSIXTime) + , ownSubscriptions :: TVar (HMap.HashMap NodeID POSIXTime) -- ^ tags subscribed by the own node have an assigned lease time - , ownPosts :: TVar (HSet.HashSet Txt.Text) + , ownPosts :: TVar (HSet.HashSet Txt.Text) -- ^ just store the existence of posts for saving memory, - , relayInQueue :: TQueue (Hashtag, PostID, PostContent) + , relayInQueue :: TQueue (Hashtag, PostID, PostContent) -- ^ Queue for processing incoming posts of own instance asynchronously - , postFetchQueue :: TQueue PostID - , httpMan :: HTTP.Manager + , postFetchQueue :: TQueue PostID + , migrationsInProgress :: TVar (HMap.HashMap NodeID (MVar ())) + , httpMan :: HTTP.Manager } deriving (Typeable) @@ -86,6 +88,7 @@ instance DHT d => Service PostService d where ownPostVar <- newTVarIO HSet.empty relayInQueue' <- newTQueueIO postFetchQueue' <- newTQueueIO + migrationsInProgress' <- newTVarIO HMap.empty httpMan' <- HTTP.newManager HTTP.defaultManagerSettings let thisService = PostService { @@ -97,6 +100,7 @@ instance DHT d => Service PostService d where , ownPosts = ownPostVar , relayInQueue = relayInQueue' , 
postFetchQueue = postFetchQueue' + , migrationsInProgress = migrationsInProgress' , httpMan = httpMan' } port' = fromIntegral (confServicePort conf) @@ -117,6 +121,17 @@ instance DHT d => Service PostService d where migrateData = clientDeliverSubscriptions + waitForMigrationFrom serv fromID = do + migrationSynchroniser <- atomically $ do + syncPoint <- HMap.lookup fromID <$> readTVar (migrationsInProgress serv) + maybe + -- decision: this function blocks until it gets an incoming migration from given ID + retry + pure + syncPoint + -- block until migration finished + takeMVar migrationSynchroniser + -- | return a WAI application postServiceApplication :: DHT d => PostService d -> Application @@ -136,7 +151,7 @@ placeholderPost = Txt.take 5120 . Txt.repeat $ 'O' -- size 5KiB type PostServiceAPI = "relay" :> "inbox" :> Capture "hashtag" Txt.Text :> ReqBody '[PlainText] Txt.Text :> PutCreated '[PlainText] NoContent -- delivery endpoint of newly published posts of the relay's instance - :<|> "relay" :> "subscribers" :> ReqBody '[PlainText] Txt.Text :> PostNoContent '[PlainText] Txt.Text + :<|> "relay" :> "subscribers" :> Capture "senderID" Integer :> ReqBody '[PlainText] Txt.Text :> PostNoContent '[PlainText] Txt.Text -- endpoint for delivering the subscriptions and outstanding queue :<|> "post" :> Capture "postid" Txt.Text :> Get '[PlainText] Txt.Text -- fetch endpoint for posts, full post ID is http://$domain/post/$postid @@ -194,10 +209,14 @@ newtype UnhandledTagException = UnhandledTagException String instance Exception UnhandledTagException -subscriptionDelivery :: DHT d => PostService d -> Txt.Text -> Handler Txt.Text -subscriptionDelivery serv subList = do +subscriptionDelivery :: DHT d => PostService d -> Integer -> Txt.Text -> Handler Txt.Text +subscriptionDelivery serv senderID subList = do let tagSubs = Txt.lines subList + -- signal that the migration is in progress + syncMVar <- liftIO newEmptyMVar + liftIO . 
atomically $ modifyTVar' (migrationsInProgress serv) $ + HMap.insert (fromInteger senderID) syncMVar -- In favor of having the convenience of rolling back the transaction once a -- not-handled tag occurs, this results in a single large transaction. -- Hopefully the performance isn't too bad. @@ -211,6 +230,12 @@ subscriptionDelivery serv subList = do `catchSTM` (\e -> pure . Left $ show (e :: UnhandledTagException)) -- TODO: potentially log this :: STM (Either String ())) + -- TODO: should this always signal migration finished to avoid deadlocks? + liftIO $ putMVar syncMVar () -- wakes up waiting thread + liftIO $ putMVar syncMVar () -- blocks until waiting thread has resumed + -- delete this migration from ongoing ones + liftIO . atomically $ modifyTVar' (migrationsInProgress serv) $ + HMap.delete (fromInteger senderID) case res of Left err -> throwError err410 {errBody = BSUL.fromString err} Right _ -> pure "" @@ -322,11 +347,12 @@ relayInboxClient :<|> subscriptionDeliveryClient :<|> postFetchClient :<|> postM -- and their outstanding delivery queue to another instance. -- If the transfer succeeds, the transfered subscribers are removed from the local list. 
clientDeliverSubscriptions :: PostService d + -> NodeID -- ^ sender node ID -> NodeID -- ^ fromTag -> NodeID -- ^ toTag -> (String, Int) -- ^ hostname and port of instance to deliver to -> IO (Either String ()) -- Either signals success or failure -clientDeliverSubscriptions serv fromKey toKey (toHost, toPort) = do +clientDeliverSubscriptions serv fromNode fromKey toKey (toHost, toPort) = do -- collect tag intearval intervalTags <- takeRMapSuccessorsFromTo fromKey toKey <$> readTVarIO (subscribers serv) -- returns a [ (TagSubscribersSTM, TChan PostID, Hashtag) ] @@ -350,7 +376,7 @@ clientDeliverSubscriptions serv fromKey toKey (toHost, toPort) = do "" intervalTags -- send subscribers - resp <- runClientM (subscriptionDeliveryClient subscriberData) (mkClientEnv (httpMan serv) (BaseUrl Http toHost (fromIntegral toPort) "")) + resp <- runClientM (subscriptionDeliveryClient (getNodeID fromNode) subscriberData) (mkClientEnv (httpMan serv) (BaseUrl Http toHost (fromIntegral toPort) "")) -- on failure return a Left, otherwise delete subscription entry case resp of Left err -> pure . Left . show $ err