Compare commits

...

3 commits

5 changed files with 179 additions and 168 deletions

View file

@ -10,8 +10,8 @@ Request ::= SEQUENCE {
action Action, action Action,
requestID INTEGER, requestID INTEGER,
sender NodeState, sender NodeState,
parts INTEGER (0..150), -- number of message parts parts INTEGER (1..150), -- number of message parts
part INTEGER (0..150), -- part number of this message, starts at 1 part INTEGER (1..150), -- part number of this message, starts at 1
actionPayload CHOICE { actionPayload CHOICE {
queryIDRequestPayload QueryIDRequestPayload, queryIDRequestPayload QueryIDRequestPayload,
joinRequestPayload JoinRequestPayload, joinRequestPayload JoinRequestPayload,

View file

@ -3,7 +3,7 @@
module Hash2Pub.ASN1Coding where module Hash2Pub.ASN1Coding where
import Control.Exception (displayException) import Control.Exception (displayException)
import Data.ASN1.BinaryEncoding -- asn1-encoding package import Data.ASN1.BinaryEncoding
import Data.ASN1.Encoding import Data.ASN1.Encoding
import Data.ASN1.Error () import Data.ASN1.Error ()
import Data.ASN1.Parse import Data.ASN1.Parse
@ -17,8 +17,8 @@ import Data.Time.Clock.POSIX ()
import Safe import Safe
import Hash2Pub.FediChord import Hash2Pub.FediChord
import Hash2Pub.Utils
import Hash2Pub.ProtocolTypes import Hash2Pub.ProtocolTypes
import Hash2Pub.Utils
import Debug.Trace import Debug.Trace

View file

@ -15,29 +15,31 @@ module Hash2Pub.DHTProtocol
) )
where where
import Control.Concurrent.STM import Control.Concurrent.STM
import Control.Concurrent.STM.TQueue import Control.Concurrent.STM.TBQueue
import Control.Concurrent.STM.TBQueue import Control.Concurrent.STM.TQueue
import qualified Data.ByteString as BS import qualified Data.ByteString as BS
import qualified Data.Map as Map import qualified Data.Map as Map
import Data.Maybe (fromMaybe, maybe) import Data.Maybe (fromMaybe, maybe)
import qualified Data.Set as Set import qualified Data.Set as Set
import Data.Time.Clock.POSIX import Data.Time.Clock.POSIX
import Network.Socket hiding (recv, recvFrom, send, sendTo) import Network.Socket hiding (recv, recvFrom, send,
sendTo)
import Network.Socket.ByteString import Network.Socket.ByteString
import System.Timeout import System.Timeout
import Hash2Pub.ASN1Coding import Hash2Pub.ASN1Coding
import Hash2Pub.FediChord (CacheEntry (..), NodeCache, NodeID, import Hash2Pub.FediChord (CacheEntry (..), NodeCache,
NodeState (..), NodeID, NodeState (..),
cacheGetNodeStateUnvalidated, cacheGetNodeStateUnvalidated,
cacheLookup, cacheLookupPred, cacheLookup, cacheLookupPred,
cacheLookupSucc, getPredecessors, cacheLookupSucc,
getSuccessors, localCompare, getPredecessors, getSuccessors,
putPredecessors, putSuccessors) localCompare, putPredecessors,
putSuccessors)
import Hash2Pub.ProtocolTypes import Hash2Pub.ProtocolTypes
import Debug.Trace (trace) import Debug.Trace (trace)
-- === queries === -- === queries ===
@ -121,41 +123,71 @@ markCacheEntryAsVerified timestamp = Map.adjust adjustFunc
-- ====== message send and receive operations ====== -- ====== message send and receive operations ======
requestQueryID :: NodeState -> NodeID -> IO NodeState --requestQueryID :: NodeState -> NodeID -> IO NodeState
-- 1. do a local lookup for the l closest nodes ---- 1. do a local lookup for the l closest nodes
-- 2. create l sockets ---- 2. create l sockets
-- 3. send a message async concurrently to all l nodes ---- 3. send a message async concurrently to all l nodes
-- 4. collect the results, insert them into cache ---- 4. collect the results, insert them into cache
-- 5. repeat until FOUND (problem: new entries not necessarily already in cache, explicitly compare with closer results) ---- 5. repeat until FOUND (problem: new entries not necessarily already in cache, explicitly compare with closer results)
requestQueryID ns targetID = do --requestQueryID ns targetID = do
cacheSnapshot <- readIORef $ getNodeCacheRef ns -- cacheSnapshot <- readIORef $ getNodeCacheRef ns
let localResult = queryLocalCache ns cacheSnapshot (fromMaybe 1 $ getLNumBestNodes ns) targetID -- let localResult = queryLocalCache ns cacheSnapshot (fromMaybe 1 $ getLNumBestNodes ns) targetID
-- FOUND can only be returned if targetID is owned by local node -- -- FOUND can only be returned if targetID is owned by local node
case localResult of -- case localResult of
FOUND thisNode -> return thisNode -- FOUND thisNode -> return thisNode
FORWARD nodeSet -> -- FORWARD nodeSet ->
sockets <- mapM (\resultNode -> mkSendSocket (domain result) (dhtPort resultNode)) $ Set.toList nodeSet -- sockets <- mapM (\resultNode -> mkSendSocket (domain result) (dhtPort resultNode)) $ Set.toList nodeSet
-- ToDo: process results immediately instead of waiting for the last one to finish, see https://stackoverflow.com/a/38815224/9198613 -- -- ToDo: process results immediately instead of waiting for the last one to finish, see https://stackoverflow.com/a/38815224/9198613
responses = mapM -- responses = mapM
-- | Generic function for sending a request over a connected socket and collecting the response.
-- Serialises the message and tries to deliver its parts for a number of attempts within a specified timeout.
sendRequestTo :: Int -- ^ timeout in seconds sendRequestTo :: Int -- ^ timeout in seconds
-> Int -- ^ number of retries -> Int -- ^ number of retries
-> FediChordMessage -- ^ the message to be sent -> FediChordMessage -- ^ the message to be sent
-> Socket -- ^ connected socket to use for sending -> Socket -- ^ connected socket to use for sending
-> IO (Set.Set FediChordMessage) -- ^ responses -> IO (Set.Set FediChordMessage) -- ^ responses
sendRequestTo timeout attempts msg sock = do sendRequestTo timeoutMillis numAttempts msg sock = do
let requests = serialiseMessage 1200 msg let requests = serialiseMessage 1200 msg
-- create a queue for passing received response messages back, even after a timeout
responseQ <- newTBQueueIO $ 2*maximumParts -- keep room for duplicate packets
-- start sendAndAck with timeout
-- ToDo: make attempts and timeout configurable -- ToDo: make attempts and timeout configurable
attempts 3 . timeout 5000 $ do attempts numAttempts . timeout timeoutMillis $ sendAndAck responseQ sock requests
-- after timeout, check received responses, delete them from unacked message set/ map and rerun sendAndAck with that if necessary.
recvdParts <- atomically $ flushTBQueue responseQ
-- PLACEHOLDER
pure Set.empty
where where
-- pass in state: state = not yet acked messages, result = responses -- pass in state: state = not yet acked messages, result = responses
sendAndAck :: Socket -> StateT (Map.Map Integer BS.ByteString) IO (Set.Set FediChordMessage) sendAndAck :: TBQueue FediChordMessage -- ^ the queue for putting in the received responses
sendAndAck sock = do -> Socket -- ^ the socket used for sending and receiving for this particular remote node
remainingSends <- get -> Map.Map Integer BS.ByteString -- ^ the remaining unacked request parts
-> IO ()
sendAndAck responseQueue sock remainingSends = do
sendMany sock $ Map.elems remainingSends sendMany sock $ Map.elems remainingSends
-- timeout pro receive socket, danach catMaybes -- if all requests have been acked/ responded to, return prematurely
-- wichtig: Pakete können dupliziert werden, dh es können mehr ACKs als gesendete parts ankommen recvLoop responseQueue remainingSends Set.empty
replicateM recvLoop :: TBQueue FediChordMessage -- ^ the queue for putting in the received responses
-> Map.Map Integer BS.ByteString -- ^ the remaining unacked request parts
-> Set.Set Integer -- ^ already received response part numbers
-> IO ()
-- Receive loop: reads datagrams from @sock@, forwards every successfully
-- deserialised message to @responseQueue@ and keeps track of
--
--   * which request parts are still unacknowledged (@remainingSends'@), and
--   * which response part numbers have been seen so far (@receivedPartNums@).
--
-- The loop ends once no request part remains unacknowledged and the number
-- of distinct received part numbers equals the @parts@ field of the message
-- just received. Because part numbers are collected in a 'Set.Set',
-- duplicated packets do not inflate that count.
recvLoop responseQueue remainingSends' receivedPartNums = do
    -- 65535 is maximum length of UDP packets, as long as
    -- no IPv6 jumbograms are used
    response <- deserialiseMessage <$> recv sock 65535
    case response of
        -- drop messages that fail to deserialise and keep listening
        -- with unchanged bookkeeping
        Left _ -> recvLoop responseQueue remainingSends' receivedPartNums
        Right msg -> do
            -- hand the message to the consumer before updating the bookkeeping
            atomically $ writeTBQueue responseQueue msg
            let
                newRemaining = Map.delete (part msg) remainingSends'
                newReceivedParts = Set.insert (part msg) receivedPartNums
            -- ToDo: handle responses with more parts than the request
            -- The check and the recursion must both use the *updated* set of
            -- received parts; using the stale set would never count the part
            -- that has just arrived and the loop could not terminate.
            if Map.null newRemaining && Set.size newReceivedParts == fromIntegral (parts msg)
                then pure ()
                else recvLoop responseQueue newRemaining newReceivedParts

View file

@ -122,58 +122,43 @@ a `localCompare` b
-- | represents a node and all its important state -- | represents a node and all its important state
data NodeState = NodeState { data NodeState = NodeState
nid :: NodeID { nid :: NodeID
, domain :: String , domain :: String
-- ^ full public domain name the node is reachable under -- ^ full public domain name the node is reachable under
, ipAddr :: HostAddress6 , ipAddr :: HostAddress6
-- the node's public IPv6 address -- the node's public IPv6 address
, dhtPort :: PortNumber , dhtPort :: PortNumber
-- ^ port of the DHT itself -- ^ port of the DHT itself
, apPort :: Maybe PortNumber , apPort :: Maybe PortNumber
-- ^ port of the ActivityPub relay and storage service -- ^ port of the ActivityPub relay and storage service
-- might have to be queried first , vServerID :: Integer
, vServerID :: Integer -- ^ ID of this vserver
-- ^ ID of this vserver , internals :: Maybe InternalNodeState
-- ^ data not present in the representation of remote nodes
-- ==== internal state ==== }
, internals :: Maybe InternalNodeState deriving (Show, Eq)
-- ^ data not present in the representation of remote nodes
-- is put into its own type.
-- This is usually @Nothing@ for all remote nodes.
} deriving (Show, Eq)
-- | encapsulates all data and parameters that are not present for remote nodes -- | encapsulates all data and parameters that are not present for remote nodes
data InternalNodeState = InternalNodeState { data InternalNodeState = InternalNodeState
nodeCache :: IORef NodeCache { nodeCache :: IORef NodeCache
-- ^ EpiChord node cache with expiry times for nodes -- ^ EpiChord node cache with expiry times for nodes
-- as the map is ordered, lookups for the closes preceding node can be done using @lookupLT@. , cacheWriteQueue :: TQueue (NodeCache -> NodeCache)
-- encapsulated into an IORef for allowing concurrent reads without locking -- ^ cache updates are not written directly to the 'nodeCache' but queued and
, cacheWriteQueue :: TQueue (NodeCache -> NodeCache) , successors :: [NodeID] -- could be a set instead as these are ordered as well
-- ^ cache updates are not written directly to the 'nodeCache' but queued and -- ^ successor nodes in ascending order by distance
-- only processed by a single writer thread to prevent lost updates. , predecessors :: [NodeID]
-- All nodeCache modifying functions have to be partially applied enough before -- ^ predecessor nodes in ascending order by distance
-- being put into the queue. , kNeighbours :: Int
-- -- ^ desired length of predecessor and successor list
, successors :: [NodeID] -- could be a set instead as these are ordered as well , lNumBestNodes :: Int
-- ^ successor nodes in ascending order by distance -- ^ number of best next hops to provide
, predecessors :: [NodeID] , pNumParallelQueries :: Int
-- ^ predecessor nodes in ascending order by distance -- ^ number of parallel sent queries
----- protocol parameters ----- , jEntriesPerSlice :: Int
-- TODO: evaluate moving these somewhere else -- ^ number of desired entries per cache slice
, kNeighbours :: Int }
-- ^ desired length of predecessor and successor list deriving (Show, Eq)
-- needs to be parameterisable for simulation purposes
, lNumBestNodes :: Int
-- ^ number of best next hops to provide
-- needs to be parameterisable for simulation purposes
, pNumParallelQueries :: Int
-- ^ number of parallel sent queries
-- needs to be parameterisable for simulation purposes
, jEntriesPerSlice :: Int
-- ^ number of desired entries per cache slice
-- needs to be parameterisable for simulation purposes
} deriving (Show, Eq)
-- | defining Show instances to be able to print NodeState for debug purposes -- | defining Show instances to be able to print NodeState for debug purposes
instance Typeable a => Show (IORef a) where instance Typeable a => Show (IORef a) where
@ -230,12 +215,8 @@ getLNumBestNodes = getInternals_ lNumBestNodes
type NodeCache = Map.Map NodeID CacheEntry type NodeCache = Map.Map NodeID CacheEntry
-- |an entry of the 'nodeCache' can hold 2 different kinds of data -- |an entry of the 'nodeCache' can hold 2 different kinds of data
data CacheEntry = data CacheEntry = NodeEntry Bool NodeState POSIXTime
-- | an entry representing its validation status, the node state and its timestamp | ProxyEntry (NodeID, ProxyDirection) (Maybe CacheEntry)
NodeEntry Bool NodeState POSIXTime
-- | a proxy field for closing the ring structure, indicating the lookup shall be
-- resumed at the given @NodeID@ unless the @ProxyEntry@ itself holds a @NodeEntry@
| ProxyEntry (NodeID, ProxyDirection) (Maybe CacheEntry)
deriving (Show, Eq) deriving (Show, Eq)
-- | as a compromise, only NodeEntry components are ordered by their NodeID -- | as a compromise, only NodeEntry components are ordered by their NodeID
@ -247,7 +228,9 @@ instance Ord CacheEntry where
extractID (NodeEntry _ eState _) = nid eState extractID (NodeEntry _ eState _) = nid eState
extractID (ProxyEntry _ _) = error "proxy entries should never appear outside of the NodeCache" extractID (ProxyEntry _ _) = error "proxy entries should never appear outside of the NodeCache"
data ProxyDirection = Backwards | Forwards deriving (Show, Eq) data ProxyDirection = Backwards
| Forwards
deriving (Show, Eq)
instance Enum ProxyDirection where instance Enum ProxyDirection where
toEnum (-1) = Backwards toEnum (-1) = Backwards
@ -430,11 +413,12 @@ byteStringToUInteger bs = sum $ parsedBytes 0 bs
-- persist them on disk so they can be used for all following bootstraps -- persist them on disk so they can be used for all following bootstraps
-- | configuration values used for initialising the FediChord DHT -- | configuration values used for initialising the FediChord DHT
data FediChordConf = FediChordConf { data FediChordConf = FediChordConf
confDomain :: String { confDomain :: String
, confIP :: HostAddress6 , confIP :: HostAddress6
, confDhtPort :: Int , confDhtPort :: Int
} deriving (Show, Eq) }
deriving (Show, Eq)
-- | initialise data structures, compute own IDs and bind to listening socket -- | initialise data structures, compute own IDs and bind to listening socket
-- ToDo: load persisted state, thus this function already operates in IO -- ToDo: load persisted state, thus this function already operates in IO

View file

@ -1,74 +1,69 @@
module Hash2Pub.ProtocolTypes where module Hash2Pub.ProtocolTypes where
import qualified Data.Set as Set import qualified Data.Set as Set
import Data.Time.Clock.POSIX (POSIXTime) import Data.Time.Clock.POSIX (POSIXTime)
import Hash2Pub.FediChord import Hash2Pub.FediChord
data QueryResponse = FORWARD (Set.Set RemoteCacheEntry) -- ^return closest nodes from local cache. data QueryResponse = FORWARD (Set.Set RemoteCacheEntry)
-- whole cache entry is returned for making | FOUND NodeState
-- the entry time stamp available to the deriving (Show, Eq)
-- protocol serialiser
| FOUND NodeState -- ^node is the responsible node for queried ID
deriving (Show, Eq)
-- === protocol serialisation data types -- === protocol serialisation data types
data Action = data Action = QueryID
QueryID | Join
| Join | Leave
| Leave | Stabilise
| Stabilise | Ping
| Ping deriving (Show, Eq, Enum)
deriving (Show, Eq, Enum)
data FediChordMessage = data FediChordMessage = Request
Request { { requestID :: Integer
requestID :: Integer , sender :: NodeState
, sender :: NodeState , parts :: Integer
, parts :: Integer , part :: Integer
, part :: Integer -- ^ part starts at 1
-- ^ part starts at 0 , action :: Action
, action :: Action , payload :: Maybe ActionPayload
, payload :: Maybe ActionPayload }
} | Response
| Response { { responseTo :: Integer
responseTo :: Integer , senderID :: NodeID
, senderID :: NodeID , parts :: Integer
, parts :: Integer , part :: Integer
, part :: Integer , action :: Action
, action :: Action , payload :: Maybe ActionPayload
, payload :: Maybe ActionPayload }
} deriving (Show, Eq) deriving (Show, Eq)
data ActionPayload = data ActionPayload = QueryIDRequestPayload
QueryIDRequestPayload { { queryTargetID :: NodeID
queryTargetID :: NodeID , queryLBestNodes :: Integer
, queryLBestNodes :: Integer }
} | JoinRequestPayload
| JoinRequestPayload | LeaveRequestPayload
| LeaveRequestPayload { { leaveSuccessors :: [NodeID]
leaveSuccessors :: [NodeID] , leavePredecessors :: [NodeID]
, leavePredecessors :: [NodeID] }
} | StabiliseRequestPayload
| StabiliseRequestPayload | PingRequestPayload
| PingRequestPayload | QueryIDResponsePayload
| QueryIDResponsePayload { { queryResult :: QueryResponse
queryResult :: QueryResponse }
} | JoinResponsePayload
| JoinResponsePayload { { joinSuccessors :: [NodeID]
joinSuccessors :: [NodeID] , joinPredecessors :: [NodeID]
, joinPredecessors :: [NodeID] , joinCache :: [RemoteCacheEntry]
, joinCache :: [RemoteCacheEntry] }
} | LeaveResponsePayload
| LeaveResponsePayload | StabiliseResponsePayload
| StabiliseResponsePayload { { stabiliseSuccessors :: [NodeID]
stabiliseSuccessors :: [NodeID] , stabilisePredecessors :: [NodeID]
, stabilisePredecessors :: [NodeID] }
} | PingResponsePayload
| PingResponsePayload { { pingNodeStates :: [NodeState]
pingNodeStates :: [NodeState] }
}
deriving (Show, Eq) deriving (Show, Eq)
-- | global limit of parts per message used when (de)serialising messages. -- | global limit of parts per message used when (de)serialising messages.