Compare commits

..

No commits in common. "99a2b0ba09f3fb3e88f9abd020cde3a03a6501a4" and "beffab99a0b11a7a01553541a4d64253845a83ca" have entirely different histories.

6 changed files with 42 additions and 85 deletions

View file

@ -8,7 +8,7 @@ Action ::= ENUMERATED {queryID, join, leave, stabilise, ping}
Request ::= SEQUENCE { Request ::= SEQUENCE {
action Action, action Action,
requestID INTEGER (0..4294967295), -- arbitrarily restricting to an unsigned 32bit integer requestID INTEGER,
sender NodeState, sender NodeState,
parts INTEGER (1..150), -- number of message parts parts INTEGER (1..150), -- number of message parts
part INTEGER (1..150), -- part number of this message, starts at 1 part INTEGER (1..150), -- part number of this message, starts at 1
@ -25,7 +25,7 @@ Request ::= SEQUENCE {
-- request and response instead of explicit flag -- request and response instead of explicit flag
Response ::= SEQUENCE { Response ::= SEQUENCE {
responseTo INTEGER (0..4294967295), -- arbitrarily restricting to an unsigned 32bit integer responseTo INTEGER,
senderID NodeID, senderID NodeID,
parts INTEGER (0..150), parts INTEGER (0..150),
part INTEGER (0..150), part INTEGER (0..150),

View file

@ -46,7 +46,7 @@ category: Network
extra-source-files: CHANGELOG.md extra-source-files: CHANGELOG.md
common deps common deps
build-depends: base ^>=4.12.0.0, containers ^>=0.6.0.1, bytestring, utf8-string ^>=1.0.1.1, network ^>=2.8.0.1, time ^>=1.8.0.2, cmdargs ^>= 0.10, cryptonite ^>= 0.25, memory, async, stm, asn1-encoding, asn1-types, asn1-parse, publicsuffix, network-byte-order, safe, iproute, mtl, random build-depends: base ^>=4.12.0.0, containers ^>=0.6.0.1, bytestring, utf8-string ^>=1.0.1.1, network ^>=2.8.0.1, time ^>=1.8.0.2, cmdargs ^>= 0.10, cryptonite ^>= 0.25, memory, async, stm, asn1-encoding, asn1-types, asn1-parse, publicsuffix, network-byte-order, safe, iproute, mtl
ghc-options: -Wall ghc-options: -Wall

View file

@ -7,7 +7,7 @@ module Hash2Pub.DHTProtocol
, markCacheEntryAsVerified , markCacheEntryAsVerified
, RemoteCacheEntry(..) , RemoteCacheEntry(..)
, toRemoteCacheEntry , toRemoteCacheEntry
, remoteNode , remoteNode_
, Action(..) , Action(..)
, ActionPayload(..) , ActionPayload(..)
, FediChordMessage(..) , FediChordMessage(..)
@ -15,25 +15,17 @@ module Hash2Pub.DHTProtocol
) )
where where
import Control.Concurrent.Async
import Control.Concurrent.STM import Control.Concurrent.STM
import Control.Concurrent.STM.TBQueue import Control.Concurrent.STM.TBQueue
import Control.Concurrent.STM.TQueue import Control.Concurrent.STM.TQueue
import Control.Monad (foldM, forM, forM_)
import qualified Data.ByteString as BS import qualified Data.ByteString as BS
import Data.Either (rights)
import Data.Foldable (foldl')
import Data.IORef
import qualified Data.Map as Map import qualified Data.Map as Map
import Data.Maybe (fromJust, fromMaybe, mapMaybe, import Data.Maybe (fromMaybe, maybe)
maybe)
import qualified Data.Set as Set import qualified Data.Set as Set
import Data.Time.Clock.POSIX import Data.Time.Clock.POSIX
import Network.Socket hiding (recv, recvFrom, send, import Network.Socket hiding (recv, recvFrom, send,
sendTo) sendTo)
import Network.Socket.ByteString import Network.Socket.ByteString
import Safe
import System.Random
import System.Timeout import System.Timeout
import Hash2Pub.ASN1Coding import Hash2Pub.ASN1Coding
@ -42,13 +34,9 @@ import Hash2Pub.FediChord (CacheEntry (..), NodeCache,
cacheGetNodeStateUnvalidated, cacheGetNodeStateUnvalidated,
cacheLookup, cacheLookupPred, cacheLookup, cacheLookupPred,
cacheLookupSucc, cacheLookupSucc,
getCacheWriteQueue,
getLNumBestNodes,
getNodeCacheRef,
getPredecessors, getSuccessors, getPredecessors, getSuccessors,
localCompare, mkSendSocket, localCompare, putPredecessors,
mkServerSocket, putSuccessors)
putPredecessors, putSuccessors)
import Hash2Pub.ProtocolTypes import Hash2Pub.ProtocolTypes
import Debug.Trace (trace) import Debug.Trace (trace)
@ -135,71 +123,36 @@ markCacheEntryAsVerified timestamp = Map.adjust adjustFunc
-- ====== message send and receive operations ====== -- ====== message send and receive operations ======
requestQueryID :: NodeState -> NodeID -> IO NodeState --requestQueryID :: NodeState -> NodeID -> IO NodeState
-- 1. do a local lookup for the l closest nodes ---- 1. do a local lookup for the l closest nodes
-- 2. create l sockets ---- 2. create l sockets
-- 3. send a message async concurrently to all l nodes ---- 3. send a message async concurrently to all l nodes
-- 4. collect the results, insert them into cache ---- 4. collect the results, insert them into cache
-- 5. repeat until FOUND (problem: new entries not necessarily already in cache, explicitly compare with closer results) ---- 5. repeat until FOUND (problem: new entries not necessarily already in cache, explicitly compare with closer results)
requestQueryID ns targetID = do --requestQueryID ns targetID = do
firstCacheSnapshot <- readIORef $ fromJust . getNodeCacheRef $ ns -- cacheSnapshot <- readIORef $ getNodeCacheRef ns
lookupLoop firstCacheSnapshot -- let localResult = queryLocalCache ns cacheSnapshot (fromMaybe 1 $ getLNumBestNodes ns) targetID
where -- -- FOUND can only be returned if targetID is owned by local node
lookupLoop :: NodeCache -> IO NodeState -- case localResult of
lookupLoop cacheSnapshot = do -- FOUND thisNode -> return thisNode
let localResult = queryLocalCache ns cacheSnapshot (fromMaybe 1 $ getLNumBestNodes ns) targetID -- FORWARD nodeSet ->
-- FOUND can only be returned if targetID is owned by local node -- sockets <- mapM (\resultNode -> mkSendSocket (domain result) (dhtPort resultNode)) $ Set.toList nodeSet
case localResult of -- -- ToDo: process results immediately instead of waiting for the last one to finish, see https://stackoverflow.com/a/38815224/9198613
FOUND thisNode -> pure thisNode -- responses = mapM
FORWARD nodeSet -> do
-- create connected sockets to all query targets
sockets <- mapM (\resultNode -> mkSendSocket (domain resultNode) (dhtPort resultNode)) $ remoteNode <$> Set.toList nodeSet
-- ToDo: make attempts and timeout configurable
queryThreads <- mapM (async . sendRequestTo 5000 3 (lookupMessage targetID)) sockets
-- ToDo: process results immediately instead of waiting for the last one to finish, see https://stackoverflow.com/a/38815224/9198613
-- ToDo: exception handling, maybe log them
responses <- (mconcat . fmap Set.elems) . rights <$> mapM waitCatch queryThreads
-- insert new cache entries both into global cache as well as in local copy, to make sure it is already up to date at next lookup
now <- getPOSIXTime
newLCache <- foldM (\oldCache resp -> do
let entriesToInsert = case queryResult <$> payload resp of
Just (FOUND result1) -> [addCacheEntryPure now (RemoteCacheEntry result1 now)]
Just (FORWARD resultset) -> addCacheEntryPure now <$> Set.elems resultset
_ -> []
-- forward entries to global cache
forM_ entriesToInsert $ \entry -> atomically $ writeTQueue (fromJust . getCacheWriteQueue $ ns) entry
-- insert entries into local cache copy
pure $ foldl' (
\oldLCache insertFunc -> insertFunc oldLCache
) oldCache entriesToInsert
) cacheSnapshot responses
-- check for a FOUND and return it
let foundResp = headMay . mapMaybe (\resp -> case queryResult <$> payload resp of
Just (FOUND ns') -> Just ns'
_ -> Nothing
) $ responses
-- if no FOUND, recursively call lookup again
maybe (lookupLoop newLCache) pure foundResp
-- todo: random request ID
lookupMessage targetID rID = Request rID ns 1 1 QueryID (Just $ pl ns targetID)
pl ns' targetID' = QueryIDRequestPayload { queryTargetID = targetID', queryLBestNodes = fromIntegral . fromJust . getLNumBestNodes $ ns }
-- | Generic function for sending a request over a connected socket and collecting the response. -- | Generic function for sending a request over a connected socket and collecting the response.
-- Serialises the message and tries to deliver its parts for a number of attempts within a specified timeout. -- Serialises the message and tries to deliver its parts for a number of attempts within a specified timeout.
sendRequestTo :: Int -- ^ timeout in seconds sendRequestTo :: Int -- ^ timeout in seconds
-> Int -- ^ number of retries -> Int -- ^ number of retries
-> (Integer -> FediChordMessage) -- ^ the message to be sent, still needing a requestID -> FediChordMessage -- ^ the message to be sent
-> Socket -- ^ connected socket to use for sending -> Socket -- ^ connected socket to use for sending
-> IO (Set.Set FediChordMessage) -- ^ responses -> IO (Set.Set FediChordMessage) -- ^ responses
sendRequestTo timeoutMillis numAttempts msgIncomplete sock = do sendRequestTo timeoutMillis numAttempts msg sock = do
-- give the message a random request ID let requests = serialiseMessage 1200 msg
randomID <- randomRIO (0, 2^32-1)
let requests = serialiseMessage 1200 $ msgIncomplete randomID
-- create a queue for passing received response messages back, even after a timeout -- create a queue for passing received response messages back, even after a timeout
responseQ <- newTBQueueIO $ 2*maximumParts -- keep room for duplicate packets responseQ <- newTBQueueIO $ 2*maximumParts -- keep room for duplicate packets
-- start sendAndAck with timeout -- start sendAndAck with timeout
-- ToDo: make attempts and timeout configurable
attempts numAttempts . timeout timeoutMillis $ sendAndAck responseQ sock requests attempts numAttempts . timeout timeoutMillis $ sendAndAck responseQ sock requests
-- after timeout, check received responses, delete them from unacked message set/ map and rerun senAndAck with that if necessary. -- after timeout, check received responses, delete them from unacked message set/ map and rerun senAndAck with that if necessary.
recvdParts <- atomically $ flushTBQueue responseQ recvdParts <- atomically $ flushTBQueue responseQ
@ -237,6 +190,13 @@ sendRequestTo timeoutMillis numAttempts msgIncomplete sock = do
else recvLoop responseQueue newRemaining receivedPartNums else recvLoop responseQueue newRemaining receivedPartNums
-- idea: send all parts at once
-- Set/ Map with unacked parts
-- then recv with timeout for |unackedParts| attempts, receive acked parts from set/ map
-- how to manage individual retries? nested "attempts"
-- | retry an IO action at most *i* times until it delivers a result -- | retry an IO action at most *i* times until it delivers a result
attempts :: Int -- ^ number of retries *i* attempts :: Int -- ^ number of retries *i*
-> IO (Maybe a) -- ^ action to retry -> IO (Maybe a) -- ^ action to retry

View file

@ -1,5 +1,4 @@
{-# LANGUAGE DataKinds #-} {-# LANGUAGE DataKinds #-}
{-# LANGUAGE DerivingStrategies #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-} {-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE OverloadedStrings #-}
{- | {- |
@ -25,7 +24,6 @@ module Hash2Pub.FediChord (
, getPredecessors , getPredecessors
, putPredecessors , putPredecessors
, getLNumBestNodes , getLNumBestNodes
, getCacheWriteQueue
, NodeCache , NodeCache
, CacheEntry(..) , CacheEntry(..)
, cacheGetNodeStateUnvalidated , cacheGetNodeStateUnvalidated
@ -45,7 +43,6 @@ module Hash2Pub.FediChord (
, fediChordInit , fediChordInit
, nodeStateInit , nodeStateInit
, mkServerSocket , mkServerSocket
, mkSendSocket
, resolve , resolve
, cacheWriter , cacheWriter
) where ) where
@ -85,7 +82,7 @@ idBits = 256
-- --
-- for being able to check value bounds, the constructor should not be used directly -- for being able to check value bounds, the constructor should not be used directly
-- and new values are created via @toNodeID@ (newtype constructors cannot be hidden) -- and new values are created via @toNodeID@ (newtype constructors cannot be hidden)
newtype NodeID = NodeID { getNodeID :: Integer } deriving stock (Show, Eq) deriving newtype Enum newtype NodeID = NodeID { getNodeID :: Integer } deriving (Eq, Show, Enum)
-- |smart data constructor for NodeID that throws a runtime exception for out-of-bounds values. -- |smart data constructor for NodeID that throws a runtime exception for out-of-bounds values.
-- When needing a runtime-safe constructor with drawbacks, try @fromInteger@ -- When needing a runtime-safe constructor with drawbacks, try @fromInteger@

View file

@ -84,6 +84,6 @@ toRemoteCacheEntry (NodeEntry _ ns ts) = Just $ RemoteCacheEntry ns ts
toRemoteCacheEntry (ProxyEntry _ (Just entry@NodeEntry{})) = toRemoteCacheEntry entry toRemoteCacheEntry (ProxyEntry _ (Just entry@NodeEntry{})) = toRemoteCacheEntry entry
toRemoteCacheEntry _ = Nothing toRemoteCacheEntry _ = Nothing
-- | extract the 'NodeState' from a 'RemoteCacheEntry' -- helper function for use in tests
remoteNode :: RemoteCacheEntry -> NodeState remoteNode_ :: RemoteCacheEntry -> NodeState
remoteNode (RemoteCacheEntry ns _) = ns remoteNode_ (RemoteCacheEntry ns _) = ns

View file

@ -138,12 +138,12 @@ spec = do
queryLocalCache <$> exampleLocalNode <*> pure emptyCache <*> pure 1 <*> pure (toNodeID 2342) `shouldReturn` FORWARD Set.empty queryLocalCache <$> exampleLocalNode <*> pure emptyCache <*> pure 1 <*> pure (toNodeID 2342) `shouldReturn` FORWARD Set.empty
it "works on a cache with less entries than needed" $ do it "works on a cache with less entries than needed" $ do
(FORWARD nodeset) <- queryLocalCache <$> exampleLocalNode <*> cacheWith2Entries <*> pure 4 <*> pure (toNodeID 2^(9::Integer)+5) (FORWARD nodeset) <- queryLocalCache <$> exampleLocalNode <*> cacheWith2Entries <*> pure 4 <*> pure (toNodeID 2^(9::Integer)+5)
Set.map (nid . remoteNode) nodeset `shouldBe` Set.fromList [ nid1, nid2 ] Set.map (nid . remoteNode_) nodeset `shouldBe` Set.fromList [ nid1, nid2 ]
it "works on a cache with sufficient entries" $ do it "works on a cache with sufficient entries" $ do
(FORWARD nodeset1) <- queryLocalCache <$> exampleLocalNode <*> cacheWith4Entries <*> pure 3 <*> pure (toNodeID 2^(9::Integer)+5) (FORWARD nodeset1) <- queryLocalCache <$> exampleLocalNode <*> cacheWith4Entries <*> pure 3 <*> pure (toNodeID 2^(9::Integer)+5)
(FORWARD nodeset2) <- queryLocalCache <$> exampleLocalNode <*> cacheWith4Entries <*> pure 1 <*> pure (toNodeID 2^(9::Integer)+5) (FORWARD nodeset2) <- queryLocalCache <$> exampleLocalNode <*> cacheWith4Entries <*> pure 1 <*> pure (toNodeID 2^(9::Integer)+5)
Set.map (nid . remoteNode) nodeset1 `shouldBe` Set.fromList [nid4, nid2, nid3] Set.map (nid . remoteNode_) nodeset1 `shouldBe` Set.fromList [nid4, nid2, nid3]
Set.map (nid . remoteNode) nodeset2 `shouldBe` Set.fromList [nid4] Set.map (nid . remoteNode_) nodeset2 `shouldBe` Set.fromList [nid4]
it "recognises the node's own responsibility" $ do it "recognises the node's own responsibility" $ do
FOUND selfQueryRes <- queryLocalCache <$> node1 <*> cacheWith4Entries <*> pure 3 <*> pure nid1 FOUND selfQueryRes <- queryLocalCache <$> node1 <*> cacheWith4Entries <*> pure 3 <*> pure nid1
nid <$> node1 `shouldReturn` nid selfQueryRes nid <$> node1 `shouldReturn` nid selfQueryRes
@ -151,7 +151,7 @@ spec = do
nid <$> node1 `shouldReturn` nid responsibilityResult nid <$> node1 `shouldReturn` nid responsibilityResult
it "does not fail on nodes without neighbours (initial state)" $ do it "does not fail on nodes without neighbours (initial state)" $ do
(FORWARD nodeset) <- queryLocalCache <$> exampleLocalNode <*> cacheWith4Entries <*> pure 3 <*> pure (toNodeID 11) (FORWARD nodeset) <- queryLocalCache <$> exampleLocalNode <*> cacheWith4Entries <*> pure 3 <*> pure (toNodeID 11)
Set.map (nid . remoteNode ) nodeset `shouldBe` Set.fromList [nid4, nid2, nid3] Set.map (nid . remoteNode_ ) nodeset `shouldBe` Set.fromList [nid4, nid2, nid3]
describe "Messages can be encoded to and decoded from ASN.1" $ do describe "Messages can be encoded to and decoded from ASN.1" $ do
-- define test messages -- define test messages