Merge branch 'fix_networking': fix some DHT joins and crashes

several major flaws in the basic DHT communication had been discovered,
see individual commits for details
Trolli Schmittlauch 2020-08-27 00:39:08 +02:00
commit 1b5fc039b3
5 changed files with 118 additions and 90 deletions

@ -47,7 +47,7 @@ extra-source-files:
common deps
build-depends: base ^>=, containers ^>=, bytestring, utf8-string ^>=, network ^>=, time ^>=, cmdargs ^>= 0.10, cryptonite ^>= 0.25, memory, async, stm, asn1-encoding, asn1-types, asn1-parse, publicsuffix, network-byte-order, safe, iproute, mtl, random, servant, servant-server, servant-client, warp, text, unordered-containers, hashable, unicode-transforms, http-client, http-types, unbounded-delays
ghc-options: -Wall
ghc-options: -Wall -Wpartial-fields

@ -1,16 +1,16 @@
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE OverloadedStrings #-}
module Main where
import System.Random
import Control.Concurrent
import Control.Monad (forM_)
import Control.Monad.State.Class
import Control.Monad.State.Strict (evalStateT)
import Control.Monad.IO.Class
import qualified Network.HTTP.Client as HTTP
import Control.Concurrent
import Control.Monad (forM_)
import Control.Monad.IO.Class
import Control.Monad.State.Class
import Control.Monad.State.Strict (evalStateT)
import qualified Network.HTTP.Client as HTTP
import System.Random
import Hash2Pub.PostService (clientPublishPost, Hashtag)
import Hash2Pub.PostService (Hashtag, clientPublishPost)
-- placeholder post data definition

@ -49,9 +49,11 @@ import Control.Concurrent.STM.TQueue
import Control.Concurrent.STM.TVar
import Control.Exception
import Control.Monad (foldM, forM, forM_, void, when)
import Control.Monad.Except (MonadError (..), runExceptT)
import Control.Monad.IO.Class (MonadIO (..))
import qualified Data.ByteString as BS
import Data.Either (rights)
import Data.Foldable (foldl', foldr', foldrM)
import Data.Foldable (foldl', foldr', foldrM)
import Data.Functor.Identity
import Data.IP (IPv6, fromHostAddress6,
@ -514,28 +516,28 @@ requestJoin toJoinOn ownStateSTM = do
([], Set.empty, Set.empty)
-- sort, slice and set the accumulated successors and predecessors
newState = setSuccessors (Set.elems succAccSet) . setPredecessors (Set.elems predAccSet) $ stateSnap
-- the contacted node itself is a successor as well and, with few
-- nodes, can be a predecessor as well
newState = setSuccessors (toRemoteNodeState toJoinOn:Set.elems succAccSet) . setPredecessors (toRemoteNodeState toJoinOn:Set.elems predAccSet) $ stateSnap
writeTVar ownStateSTM newState
pure (cacheInsertQ, newState)
-- execute the cache insertions
mapM_ (\f -> f joinedState) cacheInsertQ
if responses == Set.empty
then pure . Left $ "join error: got no response from " <> show (getNid toJoinOn)
else if null (predecessors joinedState) && null (successors joinedState)
then pure $ Left "join error: no predecessors or successors"
-- successful join
else do
-- wait for migration data to be completely received
waitForMigrationFrom (nodeService prn) (getNid ownState)
pure $ Right ownStateSTM
else do
-- wait for migration data to be completely received
waitForMigrationFrom (nodeService prn) (getNid toJoinOn)
pure $ Right ownStateSTM
`catch` (\e -> pure . Left $ displayException (e :: IOException))
-- | Send a 'QueryID' 'Request' for getting the node that handles a certain key ID.
requestQueryID :: LocalNodeState s -- ^ NodeState of the querying node
requestQueryID :: (MonadIO m, MonadError String m)
=> LocalNodeState s -- ^ NodeState of the querying node
-> NodeID -- ^ target key ID to look up
-> IO RemoteNodeState -- ^ the node responsible for handling that key
-> m RemoteNodeState -- ^ the node responsible for handling that key
-- 1. do a local lookup for the l closest nodes
-- 2. create l sockets
-- 3. send a message async concurrently to all l nodes
@ -543,23 +545,23 @@ requestQueryID :: LocalNodeState s -- ^ NodeState of the querying node
-- 5. repeat until FOUND (problem: new entries not necessarily already in cache, explicitly compare with closer results)
-- TODO: deal with lookup failures
requestQueryID ns targetID = do
firstCacheSnapshot <- readTVarIO . nodeCacheSTM $ ns
firstCacheSnapshot <- liftIO . readTVarIO . nodeCacheSTM $ ns
-- TODO: make maxAttempts configurable
queryIdLookupLoop firstCacheSnapshot ns 50 targetID
-- | like 'requestQueryID, but allows passing of a custom cache, e.g. for joining
queryIdLookupLoop :: NodeCache -> LocalNodeState s -> Int -> NodeID -> IO RemoteNodeState
queryIdLookupLoop :: (MonadIO m, MonadError String m) => NodeCache -> LocalNodeState s -> Int -> NodeID -> m RemoteNodeState
-- return node itself as default fallback value against infinite recursion.
-- TODO: consider using an Either instead of a default value
queryIdLookupLoop _ ns 0 _ = pure $ toRemoteNodeState ns
queryIdLookupLoop _ ns 0 _ = throwError "exhausted maximum lookup attempts"
queryIdLookupLoop cacheSnapshot ns maxAttempts targetID = do
let localResult = queryLocalCache ns cacheSnapshot (lNumBestNodes ns) targetID
-- FOUND can only be returned if targetID is owned by local node
case localResult of
FOUND thisNode -> pure thisNode
FORWARD nodeSet -> do
responseEntries <- sendQueryIdMessages targetID ns Nothing (remoteNode <$> Set.elems nodeSet)
now <- getPOSIXTime
responseEntries <- liftIO $ sendQueryIdMessages targetID ns Nothing (remoteNode <$> Set.elems nodeSet)
now <- liftIO getPOSIXTime
-- check for a FOUND and return it
case responseEntries of
FOUND foundNode -> pure foundNode
@ -593,8 +595,10 @@ sendQueryIdMessages targetID ns lParam targets = do
-- insert new cache entries both into global cache as well as return accumulated QueryResponses for further processing
now <- getPOSIXTime
-- collect cache entries from all responses
foldM (\acc resp -> do
let entrySet = case queryResult <$> payload resp of
foldrM (\resp acc -> do
responseResult = queryResult <$> payload resp
entrySet = case responseResult of
Just (FOUND result1) -> Set.singleton (RemoteCacheEntry result1 now)
Just (FORWARD resultset) -> resultset
_ -> Set.empty
@ -604,10 +608,15 @@ sendQueryIdMessages targetID ns lParam targets = do
-- return accumulated QueryResult
pure $ case acc of
-- once a FOUND as been encountered, return this as a result
isFound@FOUND{} -> isFound
FORWARD accSet -> FORWARD $ entrySet `Set.union` accSet
FOUND{} -> acc
| maybe False isFound responseResult -> fromJust responseResult
| otherwise -> FORWARD $ entrySet `Set.union` accSet
) (FORWARD Set.empty) responses
isFound FOUND{} = True
isFound _ = False
-- | Create a QueryID message to be supplied to 'sendRequestTo'
lookupMessage :: Integral i
@ -649,8 +658,7 @@ requestStabilise ns neighbour = do
([],[]) respSet
-- update successfully responded neighbour in cache
now <- getPOSIXTime
maybe (pure ()) (\p -> queueAddEntries (Identity $ RemoteCacheEntry (sender p) now) ns) $ headMay (Set.elems respSet)
maybe (pure ()) (\p -> queueUpdateVerifieds (Identity $ senderID p) ns) $ headMay (Set.elems respSet)
pure $ if null responsePreds && null responseSuccs
then Left "no neighbours returned"
else Right (responsePreds, responseSuccs)
@ -766,14 +774,14 @@ sendRequestToWithParams timeoutMillis numAttempts msgIncomplete sock = do
-> Map.Map Integer BS.ByteString -- ^ the remaining unacked request parts
-> IO ()
sendAndAck responseQueue sock' remainingSends = do
sendMany sock $ Map.elems remainingSends
sendMany sock' $ Map.elems remainingSends
-- if all requests have been acked/ responded to, return prematurely
recvLoop sock' responseQueue remainingSends Set.empty Nothing
recvLoop :: Socket
-> TBQueue FediChordMessage -- ^ the queue for putting in the received responses
-> Map.Map Integer BS.ByteString -- ^ the remaining unacked request parts
-> Set.Set Integer -- ^ already received response part numbers
-> Maybe Integer -- ^ total number of response parts if already known
-> Maybe Integer -- ^ total number of response parts if already known
-> IO ()
recvLoop sock' responseQueue remainingSends' receivedPartNums totalParts = do
-- 65535 is maximum length of UDP packets, as long as
@ -786,10 +794,11 @@ sendRequestToWithParams timeoutMillis numAttempts msgIncomplete sock = do
newTotalParts = if isFinalPart msg then Just (part msg) else totalParts
newRemaining = Map.delete (part msg) remainingSends'
newReceivedParts = Set.insert (part msg) receivedPartNums
if Map.null newRemaining && maybe False (\p -> Set.size receivedPartNums == fromIntegral p) newTotalParts
if Map.null newRemaining && maybe False (\p -> Set.size newReceivedParts == fromIntegral p) newTotalParts
then pure ()
else recvLoop sock' responseQueue newRemaining receivedPartNums newTotalParts
else recvLoop sock' responseQueue newRemaining newReceivedParts newTotalParts
-- drop errors and invalid messages
Right Request{} -> pure () -- expecting a response, not a request
Left _ -> recvLoop sock' responseQueue remainingSends' receivedPartNums totalParts
@ -816,6 +825,18 @@ queueDeleteEntry :: NodeID
-> IO ()
queueDeleteEntry toDelete = queueDeleteEntries $ Identity toDelete
-- | enqueue the timestamp update and verification marking of an entry in the
-- global 'NodeCache'.
queueUpdateVerifieds :: Foldable c
=> c NodeID
-> LocalNodeState s
-> IO ()
queueUpdateVerifieds nIds ns = do
now <- getPOSIXTime
forM_ nIds $ \nid' -> atomically $ writeTQueue (cacheWriteQueue ns) $
markCacheEntryAsVerified (Just now) nid'
-- | retry an IO action at most *i* times until it delivers a result
attempts :: Int -- ^ number of retries *i*
-> IO (Maybe a) -- ^ action to retry

@ -166,6 +166,7 @@ fediChordBootstrapJoin nsSTM bootstrapNode = do
currentlyResponsible <- liftEither lookupResp
liftIO . putStrLn $ "Trying to join on " <> show (getNid currentlyResponsible)
-- 2. then send a join to the currently responsible node
liftIO $ putStrLn "send a bootstrap Join"
joinResult <- liftIO $ requestJoin currentlyResponsible nsSTM
liftEither joinResult
@ -244,26 +245,24 @@ bootstrapQueryId nsSTM (bootstrapHost, bootstrapPort) targetID = do
Just (FORWARD resultset) -> foldr' (addCacheEntryPure now) cacheAcc resultset
initCache resp
currentlyResponsible <- queryIdLookupLoop bootstrapCache ns 50 $ getNid ns
pure $ Right currentlyResponsible
currentlyResponsible <- runExceptT $ queryIdLookupLoop bootstrapCache ns 50 $ getNid ns
pure currentlyResponsible
-- | join a node to the DHT using the global node cache
-- node's position.
fediChordVserverJoin :: Service s (RealNodeSTM s)
fediChordVserverJoin :: (MonadError String m, MonadIO m, Service s (RealNodeSTM s))
=> LocalNodeStateSTM s -- ^ the local 'NodeState'
-> IO (Either String (LocalNodeStateSTM s)) -- ^ the joined 'NodeState' after a
-> m (LocalNodeStateSTM s) -- ^ the joined 'NodeState' after a
-- successful join, otherwise an error message
fediChordVserverJoin nsSTM = do
ns <- readTVarIO nsSTM
ns <- liftIO $ readTVarIO nsSTM
-- 1. get routed to the currently responsible node
currentlyResponsible <- requestQueryID ns $ getNid ns
putStrLn $ "Trying to join on " <> show (getNid currentlyResponsible)
liftIO . putStrLn $ "Trying to join on " <> show (getNid currentlyResponsible)
-- 2. then send a join to the currently responsible node
joinResult <- requestJoin currentlyResponsible nsSTM
case joinResult of
Left err -> pure . Left $ "Error joining on " <> err
Right joinedNS -> pure . Right $ joinedNS
joinResult <- liftIO $ requestJoin currentlyResponsible nsSTM
liftEither joinResult
fediChordVserverLeave :: (MonadError String m, MonadIO m, Service s (RealNodeSTM s)) => LocalNodeState s -> m ()
fediChordVserverLeave ns = do
@ -323,7 +322,7 @@ joinOnNewEntriesThread nsSTM = loop
pure ()
-- otherwise try joining
FORWARD _ -> do
joinResult <- fediChordVserverJoin nsSTM
joinResult <- runExceptT $ fediChordVserverJoin nsSTM
-- on join failure, sleep and retry
-- TODO: make delay configurable
@ -504,18 +503,26 @@ stabiliseThread nsSTM = forever $ do
-- try looking up additional neighbours if list too short
forM_ [(length $ predecessors updatedNs)..(kNeighbours updatedNs)] (\_ -> do
ns' <- readTVarIO nsSTM
nextEntry <- requestQueryID ns' $ pred . getNid $ lastDef (toRemoteNodeState ns') (predecessors ns')
atomically $ do
latestNs <- readTVar nsSTM
writeTVar nsSTM $ addPredecessors [nextEntry] latestNs
nextEntry <- runExceptT . requestQueryID ns' $ pred . getNid $ lastDef (toRemoteNodeState ns') (predecessors ns')
(const $ pure ())
(\entry -> atomically $ do
latestNs <- readTVar nsSTM
writeTVar nsSTM $ addPredecessors [entry] latestNs
forM_ [(length $ successors updatedNs)..(kNeighbours updatedNs)] (\_ -> do
ns' <- readTVarIO nsSTM
nextEntry <- requestQueryID ns' $ succ . getNid $ lastDef (toRemoteNodeState ns') (successors ns')
atomically $ do
latestNs <- readTVar nsSTM
writeTVar nsSTM $ addSuccessors [nextEntry] latestNs
nextEntry <- runExceptT . requestQueryID ns' $ succ . getNid $ lastDef (toRemoteNodeState ns') (successors ns')
(const $ pure ())
(\entry -> atomically $ do
latestNs <- readTVar nsSTM
writeTVar nsSTM $ addSuccessors [entry] latestNs
newNs <- readTVarIO nsSTM
@ -638,7 +645,7 @@ requestMapPurge :: MVar RequestMap -> IO ()
requestMapPurge mapVar = forever $ do
rMapState <- takeMVar mapVar
now <- getPOSIXTime
putMVar mapVar $ Map.filter (\entry@(RequestMapEntry _ _ ts) ->
putMVar mapVar $ Map.filter (\(RequestMapEntry _ _ ts) ->
now - ts < responsePurgeAge
) rMapState
threadDelay $ round responsePurgeAge * 2 * 10^6
@ -757,7 +764,7 @@ getKeyResponsibility nodeSTM lookupKey = do
-- new entry.
-- If no vserver is active in the DHT, 'Nothing' is returned.
updateLookupCache :: RealNodeSTM s -> NodeID -> IO (Maybe (String, PortNumber))
updateLookupCache nodeSTM lookupKey = do
updateLookupCache nodeSTM keyToLookup = do
(node, lookupSource) <- atomically $ do
node <- readTVar nodeSTM
let firstVs = headMay (vservers node)
@ -767,18 +774,25 @@ updateLookupCache nodeSTM lookupKey = do
pure (node, lookupSource)
maybe (do
-- if no local node available, delete cache entry and return Nothing
atomically $ modifyTVar' (lookupCacheSTM node) $ Map.delete lookupKey
atomically $ modifyTVar' (lookupCacheSTM node) $ Map.delete keyToLookup
pure Nothing
(\n -> do
-- start a lookup from the node, update the cache with the lookup result and return it
newResponsible <- requestQueryID n lookupKey
let newEntry = (getDomain newResponsible, getServicePort newResponsible)
now <- getPOSIXTime
-- atomic update against lost updates
atomically $ modifyTVar' (lookupCacheSTM node) $
Map.insert lookupKey (CacheEntry False newEntry now)
pure $ Just newEntry
-- TODO: better retry management, because having no vserver joined yet should
-- be treated differently than other reasons for not getting a result.
newResponsible <- runExceptT $ requestQueryID n keyToLookup
(const $ pure Nothing)
(\result -> do
let newEntry = (getDomain result, getServicePort result)
now <- getPOSIXTime
-- atomic update against lost updates
atomically $ modifyTVar' (lookupCacheSTM node) $
Map.insert keyToLookup (CacheEntry False newEntry now)
pure $ Just newEntry
) lookupSource

@ -10,37 +10,30 @@ module Hash2Pub.PostService where
import Control.Concurrent
import Control.Concurrent.Async
import Control.Concurrent.MVar
import Control.Concurrent.STM
import Control.Concurrent.STM.TChan
import Control.Concurrent.STM.TChan
import Control.Concurrent.STM.TQueue
import Control.Concurrent.STM.TVar
import Control.Exception (Exception (..), try)
import Control.Monad (foldM, forM, forM_, forever, when, void)
import Control.Monad.IO.Class (liftIO)
import Control.Monad.STM
import Control.Exception (Exception (..), try)
import Control.Monad (foldM, forM, forM_, forever, void,
import Control.Monad.IO.Class (liftIO)
import Data.Bifunctor
import qualified Data.ByteString.Lazy.UTF8 as BSUL
import qualified Data.ByteString.UTF8 as BSU
import qualified Data.HashMap.Strict as HMap
import qualified Data.HashSet as HSet
import Data.Maybe (fromMaybe, isJust)
import Data.String (fromString)
import qualified Data.Text.Lazy as Txt
import Data.Text.Normalize (NormalizationMode (NFC),
import qualified Data.ByteString.Lazy.UTF8 as BSUL
import qualified Data.ByteString.UTF8 as BSU
import qualified Data.HashMap.Strict as HMap
import qualified Data.HashSet as HSet
import Data.Maybe (fromMaybe, isJust)
import Data.String (fromString)
import qualified Data.Text.Lazy as Txt
import Data.Text.Normalize (NormalizationMode (NFC), normalize)
import Data.Time.Clock.POSIX
import Data.Typeable (Typeable)
import qualified Network.HTTP.Client as HTTP
import qualified Network.HTTP.Types as HTTPT
import Data.Typeable (Typeable)
import qualified Network.HTTP.Client as HTTP
import qualified Network.HTTP.Types as HTTPT
import System.Random
import Text.Read (readEither)
import Text.Read (readEither)
import qualified Network.Wai.Handler.Warp as Warp
import qualified Network.Wai.Handler.Warp as Warp
import Servant
import Servant.Client
import Servant.Server
import Hash2Pub.FediChordTypes
import Hash2Pub.RingMap
@ -355,7 +348,7 @@ clientDeliverSubscriptions :: PostService d
-> (String, Int) -- ^ hostname and port of instance to deliver to
-> IO (Either String ()) -- Either signals success or failure
clientDeliverSubscriptions serv fromNode fromKey toKey (toHost, toPort) = do
-- collect tag intearval
-- collect tag interval
intervalTags <- takeRMapSuccessorsFromTo fromKey toKey <$> readTVarIO (subscribers serv)
-- returns a [ (TagSubscribersSTM, TChan PostID, Hashtag) ]
-- extract subscribers and posts