Merge branch 'data_migration': closes #36

This simple implementation still contains minor design issues, for which
tickets have been filed.
This commit is contained in:
Trolli Schmittlauch 2020-08-18 12:05:55 +02:00
commit b48251d29d
8 changed files with 245 additions and 75 deletions

View file

@ -89,8 +89,8 @@ StabiliseResponsePayload ::= SEQUENCE {
LeaveRequestPayload ::= SEQUENCE {
successors SEQUENCE OF NodeState,
predecessors SEQUENCE OF NodeState
-- ToDo: transfer of own data to newly responsible node
predecessors SEQUENCE OF NodeState,
doMigration BOOLEAN
}
LeaveResponsePayload ::= NULL -- just a confirmation
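
Illustration (not part of the commit): assuming the un-shown start of the LeaveRequestPayload encoder mirrors the StabiliseResponsePayload clause further down, the new doMigration BOOLEAN makes an empty-neighbour leave payload encode roughly to this ASN.1 event stream:

import Data.ASN1.Types (ASN1 (..), ASN1ConstructionType (..))

-- hypothetical sketch, not part of this commit: wire events for a
-- LeaveRequestPayload with no neighbours and doMigration = True
leaveEventsSketch :: [ASN1]
leaveEventsSketch =
    [ Start Sequence   -- payload sequence
    , Start Sequence   -- successors (empty here)
    , End Sequence
    , Start Sequence   -- predecessors (empty here)
    , End Sequence
    , Boolean True     -- the new doMigration flag
    , End Sequence
    ]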

View file

@ -38,6 +38,7 @@ splitPayload numParts pl@LeaveRequestPayload{} = [
LeaveRequestPayload {
leaveSuccessors = atDef [] (listInto numParts $ leaveSuccessors pl) (thisPart-1)
, leavePredecessors = atDef [] (listInto numParts $ leavePredecessors pl) (thisPart-1)
, leaveDoMigration = leaveDoMigration pl
} | thisPart <- [1..numParts] ]
splitPayload numParts pl@StabiliseResponsePayload{} = [
StabiliseResponsePayload {
@ -134,9 +135,8 @@ encodePayload payload'@LeaveRequestPayload{} =
<> [End Sequence
, Start Sequence]
<> concatMap encodeNodeState (leavePredecessors payload')
<> [End Sequence
, End Sequence]
-- currently StabiliseResponsePayload and LeaveRequestPayload are equal
<> [End Sequence]
<> [Boolean (leaveDoMigration payload'), End Sequence]
encodePayload payload'@StabiliseResponsePayload{} =
Start Sequence
: Start Sequence
@ -144,8 +144,7 @@ encodePayload payload'@StabiliseResponsePayload{} =
<> [End Sequence
, Start Sequence]
<> concatMap encodeNodeState (stabilisePredecessors payload')
<> [End Sequence
, End Sequence]
<> [End Sequence, End Sequence]
encodePayload payload'@StabiliseRequestPayload = [Null]
encodePayload payload'@QueryIDResponsePayload{} =
let
@ -415,9 +414,11 @@ parseLeaveRequest :: ParseASN1 ActionPayload
parseLeaveRequest = onNextContainer Sequence $ do
succ' <- onNextContainer Sequence (getMany parseNodeState)
pred' <- onNextContainer Sequence (getMany parseNodeState)
doMigration <- parseBool
pure $ LeaveRequestPayload {
leaveSuccessors = succ'
, leavePredecessors = pred'
, leaveDoMigration = doMigration
}
parseLeaveResponse :: ParseASN1 ActionPayload

View file

@ -19,6 +19,7 @@ module Hash2Pub.DHTProtocol
, sendQueryIdMessages
, requestQueryID
, requestJoin
, requestLeave
, requestPing
, requestStabilise
, lookupMessage
@ -40,13 +41,14 @@ module Hash2Pub.DHTProtocol
)
where
import Control.Concurrent
import Control.Concurrent.Async
import Control.Concurrent.STM
import Control.Concurrent.STM.TBQueue
import Control.Concurrent.STM.TQueue
import Control.Concurrent.STM.TVar
import Control.Exception
import Control.Monad (foldM, forM, forM_)
import Control.Monad (foldM, forM, forM_, void, when)
import qualified Data.ByteString as BS
import Data.Either (rights)
import Data.Foldable (foldl', foldr')
@ -74,10 +76,11 @@ import Hash2Pub.FediChordTypes (CacheEntry (..),
LocalNodeState (..),
LocalNodeStateSTM, NodeCache,
NodeID, NodeState (..),
RealNode (..),
RealNode (..), RealNodeSTM,
RemoteNodeState (..),
RingEntry (..), RingMap (..),
addRMapEntry, addRMapEntryWith,
Service (..), addRMapEntry,
addRMapEntryWith,
cacheGetNodeStateUnvalidated,
cacheLookup, cacheLookupPred,
cacheLookupSucc, genNodeID,
@ -249,7 +252,8 @@ ackRequest _ _ = Map.empty
-- | Dispatch incoming requests to the dedicated handling and response function, and enqueue
-- the response to be sent.
handleIncomingRequest :: LocalNodeStateSTM s -- ^ the handling node
handleIncomingRequest :: Service s (RealNodeSTM s)
=> LocalNodeStateSTM s -- ^ the handling node
-> TQueue (BS.ByteString, SockAddr) -- ^ send queue
-> Set.Set FediChordMessage -- ^ all parts of the request to handle
-> SockAddr -- ^ source address of the request
@ -262,6 +266,7 @@ handleIncomingRequest nsSTM sendQ msgSet sourceAddr = do
case headMay . Set.elems $ msgSet of
Nothing -> pure ()
Just aPart -> do
let (SockAddrInet6 _ _ sourceIP _) = sourceAddr
queueAddEntries (Identity $ RemoteCacheEntry (sender aPart) now) ns
-- distinguish on whether and how to respond. If responding, pass message to response generating function and write responses to send queue
maybe (pure ()) (
@ -269,17 +274,36 @@ handleIncomingRequest nsSTM sendQ msgSet sourceAddr = do
)
=<< (case action aPart of
Ping -> Just <$> respondPing nsSTM msgSet
Join -> Just <$> respondJoin nsSTM msgSet
Join -> dropSpoofedIDs sourceIP nsSTM msgSet respondJoin
-- ToDo: figure out what happens if not joined
QueryID -> Just <$> respondQueryID nsSTM msgSet
-- only when joined
Leave -> if isJoined ns then Just <$> respondLeave nsSTM msgSet else pure Nothing
Stabilise -> if isJoined ns then Just <$> respondStabilise nsSTM msgSet else pure Nothing
Leave -> if isJoined ns then dropSpoofedIDs sourceIP nsSTM msgSet respondLeave else pure Nothing
Stabilise -> if isJoined ns then dropSpoofedIDs sourceIP nsSTM msgSet respondStabilise else pure Nothing
)
-- for single part request, response starts with part number 1. For multipart requests, response starts with part number n+1.
-- TODO: determine request type only from first part, but catch RecSelError on each record access when folding, because otherwise different request type parts can make this crash
-- TODO: test case: mixed message types of parts
where
-- | Filter out requests with spoofed node IDs by recomputing the ID using
-- the sender IP.
-- For valid (non-spoofed) sender IDs, the passed responder function is invoked.
dropSpoofedIDs :: HostAddress6 -- msg source address
-> LocalNodeStateSTM s
-> Set.Set FediChordMessage -- message parts of the request
-> (LocalNodeStateSTM s -> Set.Set FediChordMessage -> IO (Map.Map Integer BS.ByteString)) -- responder function to be invoked for valid requests
-> IO (Maybe (Map.Map Integer BS.ByteString))
dropSpoofedIDs addr nsSTM' msgSet' responder =
let
aRequestPart = Set.elemAt 0 msgSet
senderNs = sender aRequestPart
givenSenderID = getNid senderNs
recomputedID = genNodeID addr (getDomain senderNs) (fromInteger $ getVServerID senderNs)
in
if recomputedID == givenSenderID
then Just <$> responder nsSTM' msgSet'
else pure Nothing
-- ....... response sending .......
@ -328,8 +352,7 @@ respondQueryID nsSTM msgSet = do
-- | Respond to a Leave request by removing the leaving node from local data structures
-- and confirming with response.
-- TODO: copy over key data from leaver and confirm
respondLeave :: LocalNodeStateSTM s -> Set.Set FediChordMessage -> IO (Map.Map Integer BS.ByteString)
respondLeave :: Service s (RealNodeSTM s) => LocalNodeStateSTM s -> Set.Set FediChordMessage -> IO (Map.Map Integer BS.ByteString)
respondLeave nsSTM msgSet = do
-- combine payload of all parts
let (requestPreds, requestSuccs) = foldr' (\msg (predAcc, succAcc) ->
@ -338,16 +361,15 @@ respondLeave nsSTM msgSet = do
)
([],[]) msgSet
aRequestPart = Set.elemAt 0 msgSet
senderID = getNid . sender $ aRequestPart
leaveSenderID = getNid . sender $ aRequestPart
responseMsg <- atomically $ do
nsSnap <- readTVar nsSTM
-- remove leaving node from successors, predecessors and NodeCache
writeTQueue (cacheWriteQueue nsSnap) $ deleteCacheEntry senderID
writeTQueue (cacheWriteQueue nsSnap) $ deleteCacheEntry leaveSenderID
writeTVar nsSTM $
-- add predecessors and successors of leaving node to own lists
setPredecessors (filter ((/=) senderID . getNid) $ requestPreds <> predecessors nsSnap)
. setSuccessors (filter ((/=) senderID . getNid) $ requestSuccs <> successors nsSnap) $ nsSnap
-- TODO: handle handover of key data
setPredecessors (filter ((/=) leaveSenderID . getNid) $ requestPreds <> predecessors nsSnap)
. setSuccessors (filter ((/=) leaveSenderID . getNid) $ requestSuccs <> successors nsSnap) $ nsSnap
let leaveResponse = Response {
requestID = requestID aRequestPart
, senderID = getNid nsSnap
@ -357,6 +379,10 @@ respondLeave nsSTM msgSet = do
, payload = Just LeaveResponsePayload
}
pure leaveResponse
-- if awaiting an incoming service data migration, wait for it in a separate thread without blocking this one
when (maybe False leaveDoMigration (payload aRequestPart)) $ do
ownService <- atomically $ nodeService <$> (readTVar nsSTM >>= (readTVar . parentRealNode))
void (forkIO $ waitForMigrationFrom ownService leaveSenderID)
pure $ serialiseMessage sendMessageSize responseMsg
-- | respond to stabilise requests by returning successor and predecessor list
@ -399,12 +425,11 @@ respondPing nsSTM msgSet = do
}
pure $ serialiseMessage sendMessageSize pingResponse
-- this modifies node state, so locking and IO seem to be necessary.
-- Still try to keep as much code as possible pure
respondJoin :: LocalNodeStateSTM s -> Set.Set FediChordMessage -> IO (Map.Map Integer BS.ByteString)
respondJoin :: Service s (RealNodeSTM s) => LocalNodeStateSTM s -> Set.Set FediChordMessage -> IO (Map.Map Integer BS.ByteString)
respondJoin nsSTM msgSet = do
-- atomically read and modify the node state according to the parsed request
responseMsg <- atomically $ do
(dataMigration, responseMsg) <- atomically $ do
nsSnap <- readTVar nsSTM
cache <- readTVar $ nodeCacheSTM nsSnap
let
@ -434,24 +459,31 @@ respondJoin nsSTM msgSet = do
, payload = Just responsePayload
}
writeTVar nsSTM joinedNS
pure joinResponse
ownService <- nodeService <$> readTVar (parentRealNode nsSnap)
let
serviceDataMigrator = migrateData ownService (getNid nsSnap) lowerKeyBound (getNid senderNS) (getDomain senderNS, fromIntegral $ getServicePort senderNS)
lowerKeyBound = maybe (getNid nsSnap) getNid $ headMay (predecessors nsSnap)
pure (Just serviceDataMigrator, joinResponse)
-- otherwise respond with empty payload
else pure Response {
else pure (Nothing, Response {
requestID = requestID aRequestPart
, senderID = getNid nsSnap
, part = if Set.size msgSet == 1 then 1 else fromIntegral $ Set.size msgSet + 1
, isFinalPart = False
, action = Join
, payload = Nothing
}
})
-- as the DHT response is required immediately, fork the service data migration push
-- into a new thread. That's kind of ugly but the best I can think of so far
when (isJust dataMigration) (forkIO (fromJust dataMigration >> pure ()) >> pure ())
pure $ serialiseMessage sendMessageSize responseMsg
-- TODO: notify service layer to copy over data now handled by the new joined node
-- ....... request sending .......
-- | send a join request and return the joined 'LocalNodeState' including neighbours
requestJoin :: NodeState a => a -- ^ currently responsible node to be contacted
requestJoin :: (NodeState a, Service s (RealNodeSTM s)) => a -- ^ currently responsible node to be contacted
-> LocalNodeStateSTM s -- ^ joining NodeState
-> IO (Either String (LocalNodeStateSTM s)) -- ^ node after join with all its new information
requestJoin toJoinOn ownStateSTM = do
@ -490,12 +522,15 @@ requestJoin toJoinOn ownStateSTM = do
pure (cacheInsertQ, newState)
-- execute the cache insertions
mapM_ (\f -> f joinedState) cacheInsertQ
pure $ if responses == Set.empty
then Left $ "join error: got no response from " <> show (getNid toJoinOn)
if responses == Set.empty
then pure . Left $ "join error: got no response from " <> show (getNid toJoinOn)
else if null (predecessors joinedState) && null (successors joinedState)
then Left "join error: no predecessors or successors"
then pure $ Left "join error: no predecessors or successors"
-- successful join
else Right ownStateSTM
else do
-- wait for migration data to be completely received
waitForMigrationFrom (nodeService prn) (getNid ownState)
pure $ Right ownStateSTM
)
`catch` (\e -> pure . Left $ displayException (e :: IOException))
@ -625,6 +660,38 @@ requestStabilise ns neighbour = do
) responses
-- | Send a Leave request to the specified node.
-- Service data transfer needs to be done separately, as not all neighbours
-- that need to know about the leave also take over the service data.
requestLeave :: LocalNodeState s
-> Bool -- whether to migrate service data
-> RemoteNodeState -- target node
-> IO (Either String ()) -- error or success
requestLeave ns doMigration target = do
srcAddr <- confIP . nodeConfig <$> readTVarIO (parentRealNode ns)
let leavePayload = LeaveRequestPayload {
leaveSuccessors = successors ns
, leavePredecessors = predecessors ns
, leaveDoMigration = doMigration
}
responses <- bracket (mkSendSocket srcAddr (getDomain target) (getDhtPort target)) close (fmap Right . sendRequestTo 5000 3 (\rid ->
Request {
requestID = rid
, sender = toRemoteNodeState ns
, part = 1
, isFinalPart = False
, action = Leave
, payload = Just leavePayload
}
)
) `catch` (\e -> pure . Left $ displayException (e :: IOException))
either
-- forward IO error messages
(pure . Left)
-- empty payload, so no processing required
(const . pure . Right $ ())
responses
requestPing :: LocalNodeState s -- ^ sending node
-> RemoteNodeState -- ^ node to be PINGed
-> IO (Either String [RemoteNodeState]) -- ^ all active vServers of the pinged node

View file

@ -3,7 +3,6 @@
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE TypeSynonymInstances #-}
{- |
Module : FediChord
Description : An opinionated implementation of the EpiChord DHT by Leong et al.
@ -40,7 +39,7 @@ module Hash2Pub.FediChord (
, bsAsIpAddr
, FediChordConf(..)
, fediChordInit
, fediChordJoin
, fediChordVserverJoin
, fediChordBootstrapJoin
, tryBootstrapJoining
, fediMainThreads
@ -153,7 +152,8 @@ nodeStateInit realNodeSTM = do
-- | Join a new node into the DHT, using a provided bootstrap node as initial cache seed
-- for resolving the new node's position.
fediChordBootstrapJoin :: LocalNodeStateSTM s -- ^ the local 'NodeState'
fediChordBootstrapJoin :: Service s (RealNodeSTM s)
=> LocalNodeStateSTM s -- ^ the local 'NodeState'
-> (String, PortNumber) -- ^ domain and port of a bootstrapping node
-> IO (Either String (LocalNodeStateSTM s)) -- ^ the joined 'NodeState' after a
-- successful join, otherwise an error message
@ -171,7 +171,7 @@ fediChordBootstrapJoin nsSTM bootstrapNode = do
-- Periodically look up the own ID through a random bootstrapping node to discover and merge separated DHT clusters.
-- Unjoined nodes try joining instead.
convergenceSampleThread :: LocalNodeStateSTM s -> IO ()
convergenceSampleThread :: Service s (RealNodeSTM s) => LocalNodeStateSTM s -> IO ()
convergenceSampleThread nsSTM = forever $ do
nsSnap <- readTVarIO nsSTM
parentNode <- readTVarIO $ parentRealNode nsSnap
@ -202,7 +202,7 @@ convergenceSampleThread nsSTM = forever $ do
-- | Try joining the DHT through any of the bootstrapping nodes until it succeeds.
tryBootstrapJoining :: LocalNodeStateSTM s -> IO (Either String (LocalNodeStateSTM s))
tryBootstrapJoining :: Service s (RealNodeSTM s) => LocalNodeStateSTM s -> IO (Either String (LocalNodeStateSTM s))
tryBootstrapJoining nsSTM = do
bss <- atomically $ do
nsSnap <- readTVar nsSTM
@ -250,10 +250,11 @@ bootstrapQueryId nsSTM (bootstrapHost, bootstrapPort) targetID = do
-- | join a node to the DHT using the global node cache for resolving the new
-- node's position.
fediChordJoin :: LocalNodeStateSTM s -- ^ the local 'NodeState'
fediChordVserverJoin :: Service s (RealNodeSTM s)
=> LocalNodeStateSTM s -- ^ the local 'NodeState'
-> IO (Either String (LocalNodeStateSTM s)) -- ^ the joined 'NodeState' after a
-- successful join, otherwise an error message
fediChordJoin nsSTM = do
fediChordVserverJoin nsSTM = do
ns <- readTVarIO nsSTM
-- 1. get routed to the currently responsible node
currentlyResponsible <- requestQueryID ns $ getNid ns
@ -264,10 +265,48 @@ fediChordJoin nsSTM = do
Left err -> pure . Left $ "Error joining on " <> err
Right joinedNS -> pure . Right $ joinedNS
fediChordVserverLeave :: (MonadError String m, MonadIO m, Service s (RealNodeSTM s)) => LocalNodeState s -> m ()
fediChordVserverLeave ns = do
-- TODO: deal with failure of all successors, e.g. by invoking a stabilise
-- and looking up further successors. So far we just fail here.
_ <- migrateSuccessor
-- then send leave messages to all other neighbours
-- TODO: distinguish between sending error causes on our side and on the
-- network/ target side. The latter cannot be fixed anyway, while the
-- former could be worked around
-- send a leave message to all neighbours
forM_ (predecessors ns <> successors ns) $ liftIO . requestLeave ns False
where
sendUntilSuccess i = maybe
(pure $ Left "Exhausted all successors")
(\neighb -> do
leaveResponse <- requestLeave ns True neighb
case leaveResponse of
Left _ -> sendUntilSuccess (i+1)
-- return first successfully contacted neighbour,
-- so it can be contacted by the service layer for migration
Right _ -> pure $ Right neighb
)
$ atMay (successors ns) i
migrateSuccessor :: (MonadError String m, MonadIO m) => m ()
migrateSuccessor = do
-- send leave message to first responding successor
successorLeave <- liftIO $ sendUntilSuccess 0
-- trigger service data transfer for abandoned key space
migrateToNode <- liftEither successorLeave
let lowerKeyBound = maybe (getNid ns) getNid $ headMay (predecessors ns)
ownService <- nodeService <$> (liftIO . readTVarIO $ parentRealNode ns)
-- previously held data is the one between the immediate predecessor and
-- the own ID
migrationResult <- liftIO $ migrateData ownService (getNid ns) lowerKeyBound (getNid ns) (getDomain migrateToNode, fromIntegral $ getServicePort migrateToNode)
liftEither migrationResult
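
The diff shows no call site for fediChordVserverLeave; a minimal sketch of how it could be driven from IO, where the gracefulLeave hook and the runExceptT wrapper are assumptions (any MonadError String/MonadIO stack would do):

import Control.Concurrent.STM (readTVarIO)
import Control.Monad.Except (runExceptT)

-- hypothetical shutdown hook, not part of this commit
gracefulLeave :: Service s (RealNodeSTM s) => LocalNodeStateSTM s -> IO ()
gracefulLeave nsSTM = do
    ns <- readTVarIO nsSTM
    -- run the MonadError computation down to IO via ExceptT
    leaveResult <- runExceptT $ fediChordVserverLeave ns
    either
        (\err -> putStrLn $ "leave failed: " <> err)
        (const $ putStrLn "left the DHT, service data migrated")
        leaveResult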
-- | Wait for new cache entries to appear and then try joining on them.
-- Exits after successful joining.
joinOnNewEntriesThread :: LocalNodeStateSTM s -> IO ()
joinOnNewEntriesThread :: Service s (RealNodeSTM s) => LocalNodeStateSTM s -> IO ()
joinOnNewEntriesThread nsSTM = loop
where
loop = do
@ -284,14 +323,13 @@ joinOnNewEntriesThread nsSTM = loop
pure ()
-- otherwise try joining
FORWARD _ -> do
joinResult <- fediChordJoin nsSTM
joinResult <- fediChordVserverJoin nsSTM
either
-- on join failure, sleep and retry
-- TODO: make delay configurable
(const $ threadDelay (30 * 10^6) >> loop)
(const $ pure ())
joinResult
emptyset = Set.empty -- because pattern matches don't accept qualified names
-- | cache updater thread that waits for incoming NodeCache update instructions on
@ -427,9 +465,9 @@ checkCacheSliceInvariants ns
-- | Periodically send 'StabiliseRequest's to the closest neighbour nodes, until
-- one responds, and get their neighbours for maintaining the own neighbour lists.
-- If necessary, request new neighbours.
stabiliseThread :: LocalNodeStateSTM s -> IO ()
stabiliseThread :: Service s (RealNodeSTM s) => LocalNodeStateSTM s -> IO ()
stabiliseThread nsSTM = forever $ do
ns <- readTVarIO nsSTM
oldNs <- readTVarIO nsSTM
putStrLn "stabilise run: begin"
@ -440,8 +478,8 @@ stabiliseThread nsSTM = forever $ do
-- don't contact all neighbours unless the previous one failed/ returned a Left
predStabilise <- stabiliseClosestResponder ns predecessors 1 []
succStabilise <- stabiliseClosestResponder ns predecessors 1 []
predStabilise <- stabiliseClosestResponder oldNs predecessors 1 []
succStabilise <- stabiliseClosestResponder oldNs predecessors 1 []
let
(predDeletes, predNeighbours) = either (const ([], [])) id predStabilise
@ -480,12 +518,34 @@ stabiliseThread nsSTM = forever $ do
writeTVar nsSTM $ addSuccessors [nextEntry] latestNs
)
newNs <- readTVarIO nsSTM
let
oldPredecessor = headDef (toRemoteNodeState oldNs) $ predecessors oldNs
newPredecessor = headMay $ predecessors newNs
-- manage need for service data migration:
maybe (pure ()) (\newPredecessor' ->
when (
isJust newPredecessor
&& oldPredecessor /= newPredecessor'
-- case: predecessor has changed in some way => own responsibility has changed in some way
-- case 1: new predecessor is further away => broader responsibility, but new pred needs to push the data
-- If this is due to a node leaving without transferring its data, try getting it from a redundant copy
-- case 2: new predecessor is closer, it takes some of our data but somehow didn't join on us => push data to it
&& isInOwnResponsibilitySlice newPredecessor' oldNs) $ do
ownService <- nodeService <$> (liftIO . readTVarIO $ parentRealNode newNs)
migrationResult <- migrateData ownService (getNid newNs) (getNid oldPredecessor) (getNid newPredecessor') (getDomain newPredecessor', fromIntegral $ getServicePort newPredecessor')
-- TODO: deal with migration failure, e.g retry
pure ()
)
newPredecessor
putStrLn "stabilise run: end"
-- TODO: make delay configurable
threadDelay (60 * 10^6)
where
-- | send a stabilise request to the n-th neighbour
-- (specified by the provided getter function) and on failure retr
-- (specified by the provided getter function) and on failure retry
-- with the n+1-th neighbour.
-- On success, return 2 lists: The failed nodes and the potential neighbours
-- returned by the queried node.
@ -543,7 +603,7 @@ sendThread sock sendQ = forever $ do
sendAllTo sock packet addr
-- | Sets up and manages the main server threads of FediChord
fediMainThreads :: Socket -> LocalNodeStateSTM s -> IO ()
fediMainThreads :: Service s (RealNodeSTM s) => Socket -> LocalNodeStateSTM s -> IO ()
fediMainThreads sock nsSTM = do
ns <- readTVarIO nsSTM
putStrLn "launching threads"
@ -586,7 +646,8 @@ requestMapPurge mapVar = forever $ do
-- | Wait for messages, deserialise them, manage parts and acknowledgement status,
-- and pass them to their specific handling function.
fediMessageHandler :: TQueue (BS.ByteString, SockAddr) -- ^ send queue
fediMessageHandler :: Service s (RealNodeSTM s)
=> TQueue (BS.ByteString, SockAddr) -- ^ send queue
-> TQueue (BS.ByteString, SockAddr) -- ^ receive queue
-> LocalNodeStateSTM s -- ^ acting NodeState
-> IO ()

View file

@ -424,6 +424,15 @@ class Service s d where
-- | run the service
runService :: ServiceConf -> d -> IO (s d)
getListeningPortFromService :: (Integral i) => s d -> i
-- | trigger a migration of service data between the two given keys
migrateData :: s d
-> NodeID -- ^ source/ sender node ID
-> NodeID -- ^ start key
-> NodeID -- ^ end key
-> (String, Int) -- ^ hostname and port of target service
-> IO (Either String ()) -- ^ success or failure
-- | Wait for an incoming migration from a given node to succeed, may block forever
waitForMigrationFrom :: s d -> NodeID -> IO ()
instance Hashable.Hashable NodeID where
hashWithSalt salt = Hashable.hashWithSalt salt . getNodeID
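
A sketch of how the two new class methods pair up from the DHT layer's perspective; the helper names are hypothetical, the actual call sites in this commit are respondJoin, respondLeave and fediChordVserverLeave:

-- hypothetical helpers, not part of this commit

-- sending side: push service data between the two keys to the target
-- instance and report failures as a String
pushKeyRange :: Service s d => s d -> NodeID -> NodeID -> NodeID -> (String, Int) -> IO ()
pushKeyRange serv ownID startKey endKey targetHostPort = do
    result <- migrateData serv ownID startKey endKey targetHostPort
    either (putStrLn . ("data migration failed: " <>)) (const $ pure ()) result

-- receiving side: block until the node with the given ID has delivered its data
awaitIncomingData :: Service s d => s d -> NodeID -> IO ()
awaitIncomingData = waitForMigrationFrom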

View file

@ -10,6 +10,7 @@ module Hash2Pub.PostService where
import Control.Concurrent
import Control.Concurrent.Async
import Control.Concurrent.MVar
import Control.Concurrent.STM
import Control.Concurrent.STM.TChan
import Control.Concurrent.STM.TChan
@ -59,6 +60,7 @@ data PostService d = PostService
, relayInQueue :: TQueue (Hashtag, PostID, PostContent)
-- ^ Queue for processing incoming posts of own instance asynchronously
, postFetchQueue :: TQueue PostID
, migrationsInProgress :: TVar (HMap.HashMap NodeID (MVar ()))
, httpMan :: HTTP.Manager
}
deriving (Typeable)
@ -86,6 +88,7 @@ instance DHT d => Service PostService d where
ownPostVar <- newTVarIO HSet.empty
relayInQueue' <- newTQueueIO
postFetchQueue' <- newTQueueIO
migrationsInProgress' <- newTVarIO HMap.empty
httpMan' <- HTTP.newManager HTTP.defaultManagerSettings
let
thisService = PostService {
@ -97,6 +100,7 @@ instance DHT d => Service PostService d where
, ownPosts = ownPostVar
, relayInQueue = relayInQueue'
, postFetchQueue = postFetchQueue'
, migrationsInProgress = migrationsInProgress'
, httpMan = httpMan'
}
port' = fromIntegral (confServicePort conf)
@ -115,6 +119,19 @@ instance DHT d => Service PostService d where
getListeningPortFromService = fromIntegral . confServicePort . serviceConf
migrateData = clientDeliverSubscriptions
waitForMigrationFrom serv fromID = do
migrationSynchroniser <- atomically $ do
syncPoint <- HMap.lookup fromID <$> readTVar (migrationsInProgress serv)
maybe
-- decision: this function blocks until it gets an incoming migration from given ID
retry
pure
syncPoint
-- block until migration finished
takeMVar migrationSynchroniser
-- | return a WAI application
postServiceApplication :: DHT d => PostService d -> Application
@ -134,7 +151,7 @@ placeholderPost = Txt.take 5120 . Txt.repeat $ 'O' -- size 5KiB
type PostServiceAPI = "relay" :> "inbox" :> Capture "hashtag" Txt.Text :> ReqBody '[PlainText] Txt.Text :> PutCreated '[PlainText] NoContent
-- delivery endpoint of newly published posts of the relay's instance
:<|> "relay" :> "subscribers" :> ReqBody '[PlainText] Txt.Text :> PostNoContent '[PlainText] Txt.Text
:<|> "relay" :> "subscribers" :> Capture "senderID" Integer :> ReqBody '[PlainText] Txt.Text :> PostNoContent '[PlainText] Txt.Text
-- endpoint for delivering the subscriptions and outstanding queue
:<|> "post" :> Capture "postid" Txt.Text :> Get '[PlainText] Txt.Text
-- fetch endpoint for posts, full post ID is http://$domain/post/$postid
@ -192,10 +209,14 @@ newtype UnhandledTagException = UnhandledTagException String
instance Exception UnhandledTagException
subscriptionDelivery :: DHT d => PostService d -> Txt.Text -> Handler Txt.Text
subscriptionDelivery serv subList = do
subscriptionDelivery :: DHT d => PostService d -> Integer -> Txt.Text -> Handler Txt.Text
subscriptionDelivery serv senderID subList = do
let
tagSubs = Txt.lines subList
-- signal that the migration is in progress
syncMVar <- liftIO newEmptyMVar
liftIO . atomically $ modifyTVar' (migrationsInProgress serv) $
HMap.insert (fromInteger senderID) syncMVar
-- For the convenience of being able to roll back the transaction when an
-- unhandled tag occurs, this is done as a single large transaction.
-- Hopefully the performance isn't too bad.
@ -209,6 +230,14 @@ subscriptionDelivery serv subList = do
`catchSTM` (\e -> pure . Left $ show (e :: UnhandledTagException))
-- TODO: potentially log this
:: STM (Either String ()))
-- TODO: should this always signal migration finished to avoid deadlocks?
liftIO $ putMVar syncMVar () -- wakes up waiting thread
-- allow response to be completed independently from waiting thread
_ <- liftIO . forkIO $ do
putMVar syncMVar () -- blocks until waiting thread has resumed
-- delete this migration from ongoing ones
liftIO . atomically $ modifyTVar' (migrationsInProgress serv) $
HMap.delete (fromInteger senderID)
case res of
Left err -> throwError err410 {errBody = BSUL.fromString err}
Right _ -> pure ""
@ -320,13 +349,14 @@ relayInboxClient :<|> subscriptionDeliveryClient :<|> postFetchClient :<|> postM
-- and their outstanding delivery queue to another instance.
-- If the transfer succeeds, the transferred subscribers are removed from the local list.
clientDeliverSubscriptions :: PostService d
-> Hashtag -- ^ fromTag
-> Hashtag -- ^ toTag
-> NodeID -- ^ sender node ID
-> NodeID -- ^ fromTag
-> NodeID -- ^ toTag
-> (String, Int) -- ^ hostname and port of instance to deliver to
-> IO (Either String ()) -- Either signals success or failure
clientDeliverSubscriptions serv fromTag toTag (toHost, toPort) = do
clientDeliverSubscriptions serv fromNode fromKey toKey (toHost, toPort) = do
-- collect tag interval
intervalTags <- takeRMapSuccessorsFromTo (genKeyID $ Txt.unpack fromTag) (genKeyID $ Txt.unpack toTag) <$> readTVarIO (subscribers serv)
intervalTags <- takeRMapSuccessorsFromTo fromKey toKey <$> readTVarIO (subscribers serv)
-- returns a [ (TagSubscribersSTM, TChan PostID, Hashtag) ]
-- extract subscribers and posts
-- no need for extracting as a single atomic operation, as newly incoming posts are supposed to be rejected because of already having re-positioned on the DHT
@ -348,7 +378,7 @@ clientDeliverSubscriptions serv fromTag toTag (toHost, toPort) = do
""
intervalTags
-- send subscribers
resp <- runClientM (subscriptionDeliveryClient subscriberData) (mkClientEnv (httpMan serv) (BaseUrl Http toHost (fromIntegral toPort) ""))
resp <- runClientM (subscriptionDeliveryClient (getNodeID fromNode) subscriberData) (mkClientEnv (httpMan serv) (BaseUrl Http toHost (fromIntegral toPort) ""))
-- on failure return a Left, otherwise delete subscription entry
case resp of
Left err -> pure . Left . show $ err

View file

@ -55,6 +55,7 @@ data ActionPayload = QueryIDRequestPayload
| LeaveRequestPayload
{ leaveSuccessors :: [RemoteNodeState]
, leavePredecessors :: [RemoteNodeState]
, leaveDoMigration :: Bool
}
| StabiliseRequestPayload
| PingRequestPayload

View file

@ -189,6 +189,7 @@ spec = do
lReqPayload = LeaveRequestPayload {
leaveSuccessors = someNodes
, leavePredecessors = someNodes
, leaveDoMigration = True
}
stabReqPayload = StabiliseRequestPayload
pingReqPayload = PingRequestPayload