Compare commits
3 commits
e3bfa26ddb
...
702684b1a9
Author | SHA1 | Date | |
---|---|---|---|
|
702684b1a9 | ||
|
43eb04dfea | ||
|
ad1465c5fe |
|
@ -203,14 +203,14 @@ queryIdLookupLoop cacheSnapshot ns targetID = do
|
|||
now <- getPOSIXTime
|
||||
newLCache <- foldM (\oldCache resp -> do
|
||||
let entriesToInsert = case queryResult <$> payload resp of
|
||||
Just (FOUND result1) -> [addCacheEntryPure now (RemoteCacheEntry result1 now)]
|
||||
Just (FORWARD resultset) -> addCacheEntryPure now <$> Set.elems resultset
|
||||
Just (FOUND result1) -> [RemoteCacheEntry result1 now]
|
||||
Just (FORWARD resultset) -> Set.elems resultset
|
||||
_ -> []
|
||||
-- forward entries to global cache
|
||||
forM_ entriesToInsert $ \entry -> atomically $ writeTQueue (cacheWriteQueue ns) entry
|
||||
queueAddEntries entriesToInsert ns
|
||||
-- insert entries into local cache copy
|
||||
pure $ foldl' (
|
||||
\oldLCache insertFunc -> insertFunc oldLCache
|
||||
pure $ foldr' (
|
||||
addCacheEntryPure now
|
||||
) oldCache entriesToInsert
|
||||
) cacheSnapshot responses
|
||||
|
||||
|
@ -283,7 +283,7 @@ sendRequestTo timeoutMillis numAttempts msgIncomplete sock = do
|
|||
|
||||
|
||||
-- | enqueue a list of RemoteCacheEntries to be added to the global NodeCache
|
||||
queueAddEntries :: [RemoteCacheEntry]
|
||||
queueAddEntries :: Foldable c => c RemoteCacheEntry
|
||||
-> LocalNodeState
|
||||
-> IO ()
|
||||
queueAddEntries entries ns = do
|
||||
|
@ -334,4 +334,5 @@ mkSendSocket dest destPort = do
|
|||
destAddr <- addrAddress <$> resolve (Just dest) (Just destPort)
|
||||
sendSock <- socket AF_INET6 Datagram defaultProtocol
|
||||
setSocketOption sendSock IPv6Only 1
|
||||
connect sendSock destAddr
|
||||
pure sendSock
|
||||
|
|
|
@ -37,6 +37,8 @@ module Hash2Pub.FediChord (
|
|||
, bsAsIpAddr
|
||||
, FediChordConf(..)
|
||||
, fediChordInit
|
||||
, fediChordJoin
|
||||
, fediChordBootstrapJoin
|
||||
, nodeStateInit
|
||||
, mkServerSocket
|
||||
, mkSendSocket
|
||||
|
@ -109,11 +111,13 @@ nodeStateInit conf = do
|
|||
}
|
||||
pure initialState
|
||||
|
||||
fediChordJoin :: LocalNodeState -- ^ the local 'NodeState'
|
||||
-- | Join a new node into the DHT, using a provided bootstrap node as initial cache seed
|
||||
-- for resolving the new node's position.
|
||||
fediChordBootstrapJoin :: LocalNodeState -- ^ the local 'NodeState'
|
||||
-> (String, PortNumber) -- ^ domain and port of a bootstrapping node
|
||||
-> IO (Either String LocalNodeState) -- ^ the joined 'NodeState' after a
|
||||
-- successful join, otherwise an error message
|
||||
fediChordJoin ns (joinHost, joinPort) = do
|
||||
fediChordBootstrapJoin ns (joinHost, joinPort) = do
|
||||
-- can be invoked multiple times with all known bootstrapping nodes until successfully joined
|
||||
sock <- mkSendSocket joinHost joinPort
|
||||
-- 1. get routed to placement of own ID until FOUND:
|
||||
|
@ -132,20 +136,26 @@ fediChordJoin ns (joinHost, joinPort) = do
|
|||
Just (FORWARD resultset) -> foldr' (addCacheEntryPure now) cacheAcc resultset
|
||||
)
|
||||
initCache bootstrapResponse
|
||||
fediChordJoin bootstrapCache ns
|
||||
|
||||
-- | join a node to the DHT, using the provided cache snapshot for resolving the new
|
||||
-- node's position.
|
||||
fediChordJoin :: NodeCache -- ^ a snapshot of the NodeCache to
|
||||
-- use for ID lookup
|
||||
-> LocalNodeState -- ^ the local 'NodeState'
|
||||
-> IO (Either String LocalNodeState) -- ^ the joined 'NodeState' after a
|
||||
-- successful join, otherwise an error message
|
||||
fediChordJoin cacheSnapshot ns = do
|
||||
-- get routed to the currently responsible node, based on the response
|
||||
-- from the bootstrapping node
|
||||
currentlyResponsible <- queryIdLookupLoop bootstrapCache ns $ getNid ns
|
||||
-- do actual join
|
||||
currentlyResponsible <- queryIdLookupLoop cacheSnapshot ns $ getNid ns
|
||||
-- 2. then send a join to the currently responsible node
|
||||
joinResult <- requestJoin currentlyResponsible ns
|
||||
case joinResult of
|
||||
Nothing -> pure . Left $ "Error joining on " <> show currentlyResponsible
|
||||
Just joinedNS -> pure . Right $ joinedNS
|
||||
|
||||
|
||||
-- 2. then send a join to the currently responsible node
|
||||
-- after successful join, finally add own node to the cache
|
||||
|
||||
|
||||
-- | cache updater thread that waits for incoming NodeCache update instructions on
|
||||
-- the node's cacheWriteQueue and then modifies the NodeCache as the single writer.
|
||||
cacheWriter :: LocalNodeState -> IO ()
|
||||
|
|
|
@ -422,6 +422,7 @@ data FediChordConf = FediChordConf
|
|||
{ confDomain :: String
|
||||
, confIP :: HostAddress6
|
||||
, confDhtPort :: Int
|
||||
, confBootstrapNodes :: [(String, PortNumber)]
|
||||
}
|
||||
deriving (Show, Eq)
|
||||
|
||||
|
|
|
@ -18,15 +18,17 @@ main = do
|
|||
-- currently no masking is necessary, as there is nothing to clean up
|
||||
cacheWriterThread <- forkIO $ cacheWriter thisNode
|
||||
-- idea: list of bootstrapping nodes, try joining within a timeout
|
||||
joinedState <- fediChordBootstrapJoin thisNode $ head . confBootstrapNodes $ conf
|
||||
-- stop main thread from terminating during development
|
||||
getChar
|
||||
pure ()
|
||||
|
||||
readConfig :: IO FediChordConf
|
||||
readConfig = do
|
||||
confDomainString : ipString : portString : _ <- getArgs
|
||||
confDomainString : ipString : portString : bootstrapHost : bootstrapPortString : _ <- getArgs
|
||||
pure $ FediChordConf {
|
||||
confDomain = confDomainString
|
||||
, confIP = toHostAddress6 . read $ ipString
|
||||
, confDhtPort = read portString
|
||||
, confBootstrapNodes = [(bootstrapHost, read bootstrapPortString)]
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue