Compare commits

beffab99a0b11a7a01553541a4d64253845a83ca...0e6f126b3ba69f8211b7135bbdce3afdd0b63c2e

No commits in common. "beffab99a0b11a7a01553541a4d64253845a83ca" and "0e6f126b3ba69f8211b7135bbdce3afdd0b63c2e" have entirely different histories.

5 changed files with 168 additions and 179 deletions

View file

@@ -10,8 +10,8 @@ Request ::= SEQUENCE {
     action          Action,
     requestID       INTEGER,
     sender          NodeState,
-    parts           INTEGER (1..150), -- number of message parts
-    part            INTEGER (1..150), -- part number of this message, starts at 1
+    parts           INTEGER (0..150), -- number of message parts
+    part            INTEGER (0..150), -- part number of this message, starts at 1
     actionPayload   CHOICE {
         queryIDRequestPayload   QueryIDRequestPayload,
         joinRequestPayload      JoinRequestPayload,
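The only substantive change in this hunk is the lower bound of the two part counters: (1..150) on the left, (0..150) on the right. A minimal, purely illustrative Haskell sketch of what each bound admits (withinBounds is a made-up helper, not part of the schema tooling):

-- Illustrative only: the left-hand bound rejects 0, the right-hand bound accepts it.
withinBounds :: (Integer, Integer) -> Integer -> Bool
withinBounds (lower, upper) n = n >= lower && n <= upper

acceptsZeroLeft, acceptsZeroRight :: Bool
acceptsZeroLeft  = withinBounds (1, 150) 0  -- False: part numbering starts at 1
acceptsZeroRight = withinBounds (0, 150) 0  -- True: 0 is a legal value here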

View file

@@ -3,7 +3,7 @@
 module Hash2Pub.ASN1Coding where
 import Control.Exception (displayException)
-import Data.ASN1.BinaryEncoding
+import Data.ASN1.BinaryEncoding -- asn1-encoding package
 import Data.ASN1.Encoding
 import Data.ASN1.Error ()
 import Data.ASN1.Parse
@@ -17,8 +17,8 @@ import Data.Time.Clock.POSIX ()
 import Safe
 import Hash2Pub.FediChord
-import Hash2Pub.ProtocolTypes
 import Hash2Pub.Utils
+import Hash2Pub.ProtocolTypes
 import Debug.Trace
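The right-hand side only annotates that Data.ASN1.BinaryEncoding comes from the asn1-encoding package. For orientation, a small standalone round-trip through that package; it is unrelated to the Hash2Pub serialisers and purely illustrative:

-- Sketch: DER-encode a trivial ASN.1 sequence and decode it again, using the
-- asn1-encoding / asn1-types packages referenced in the comment above.
import Data.ASN1.BinaryEncoding (DER (..))
import Data.ASN1.Encoding (decodeASN1', encodeASN1')
import Data.ASN1.Types (ASN1 (..), ASN1ConstructionType (..))

main :: IO ()
main = do
    let value = [Start Sequence, IntVal 42, End Sequence]
        bytes = encodeASN1' DER value          -- strict ByteString
    -- decoding yields Either ASN1Error [ASN1]
    print $ decodeASN1' DER bytes == Right value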

View file

@@ -15,31 +15,29 @@ module Hash2Pub.DHTProtocol
     )
     where
 import Control.Concurrent.STM
-import Control.Concurrent.STM.TBQueue
 import Control.Concurrent.STM.TQueue
+import Control.Concurrent.STM.TBQueue
 import qualified Data.ByteString as BS
 import qualified Data.Map as Map
 import Data.Maybe (fromMaybe, maybe)
 import qualified Data.Set as Set
 import Data.Time.Clock.POSIX
-import Network.Socket hiding (recv, recvFrom, send,
-                              sendTo)
+import Network.Socket hiding (recv, recvFrom, send, sendTo)
 import Network.Socket.ByteString
 import System.Timeout
 import Hash2Pub.ASN1Coding
-import Hash2Pub.FediChord (CacheEntry (..), NodeCache,
-                           NodeID, NodeState (..),
-                           cacheGetNodeStateUnvalidated,
-                           cacheLookup, cacheLookupPred,
-                           cacheLookupSucc,
-                           getPredecessors, getSuccessors,
-                           localCompare, putPredecessors,
-                           putSuccessors)
+import Hash2Pub.FediChord (CacheEntry (..), NodeCache, NodeID,
+                           NodeState (..),
+                           cacheGetNodeStateUnvalidated,
+                           cacheLookup, cacheLookupPred,
+                           cacheLookupSucc, getPredecessors,
+                           getSuccessors, localCompare,
+                           putPredecessors, putSuccessors)
 import Hash2Pub.ProtocolTypes
 import Debug.Trace (trace)
 -- === queries ===
@@ -123,71 +121,41 @@ markCacheEntryAsVerified timestamp = Map.adjust adjustFunc
 -- ====== message send and receive operations ======
---requestQueryID :: NodeState -> NodeID -> IO NodeState
----- 1. do a local lookup for the l closest nodes
----- 2. create l sockets
----- 3. send a message async concurrently to all l nodes
----- 4. collect the results, insert them into cache
----- 5. repeat until FOUND (problem: new entries not necessarily already in cache, explicitly compare with closer results)
---requestQueryID ns targetID = do
---    cacheSnapshot <- readIORef $ getNodeCacheRef ns
---    let localResult = queryLocalCache ns cacheSnapshot (fromMaybe 1 $ getLNumBestNodes ns) targetID
---    -- FOUND can only be returned if targetID is owned by local node
---    case localResult of
---        FOUND thisNode -> return thisNode
---        FORWARD nodeSet ->
---            sockets <- mapM (\resultNode -> mkSendSocket (domain result) (dhtPort resultNode)) $ Set.toList nodeSet
---            -- ToDo: process results immediately instead of waiting for the last one to finish, see https://stackoverflow.com/a/38815224/9198613
---            responses = mapM
--- | Generic function for sending a request over a connected socket and collecting the response.
--- Serialises the message and tries to deliver its parts for a number of attempts within a specified timeout.
+requestQueryID :: NodeState -> NodeID -> IO NodeState
+-- 1. do a local lookup for the l closest nodes
+-- 2. create l sockets
+-- 3. send a message async concurrently to all l nodes
+-- 4. collect the results, insert them into cache
+-- 5. repeat until FOUND (problem: new entries not necessarily already in cache, explicitly compare with closer results)
+requestQueryID ns targetID = do
+    cacheSnapshot <- readIORef $ getNodeCacheRef ns
+    let localResult = queryLocalCache ns cacheSnapshot (fromMaybe 1 $ getLNumBestNodes ns) targetID
+    -- FOUND can only be returned if targetID is owned by local node
+    case localResult of
+        FOUND thisNode -> return thisNode
+        FORWARD nodeSet ->
+            sockets <- mapM (\resultNode -> mkSendSocket (domain result) (dhtPort resultNode)) $ Set.toList nodeSet
+            -- ToDo: process results immediately instead of waiting for the last one to finish, see https://stackoverflow.com/a/38815224/9198613
+            responses = mapM
 sendRequestTo :: Int -- ^ timeout in seconds
               -> Int -- ^ number of retries
               -> FediChordMessage -- ^ the message to be sent
               -> Socket -- ^ connected socket to use for sending
               -> IO (Set.Set FediChordMessage) -- ^ responses
-sendRequestTo timeoutMillis numAttempts msg sock = do
+sendRequestTo timeout attempts msg sock = do
     let requests = serialiseMessage 1200 msg
-    -- create a queue for passing received response messages back, even after a timeout
-    responseQ <- newTBQueueIO $ 2*maximumParts -- keep room for duplicate packets
-    -- start sendAndAck with timeout
     -- ToDo: make attempts and timeout configurable
-    attempts numAttempts . timeout timeoutMillis $ sendAndAck responseQ sock requests
-    -- after timeout, check received responses, delete them from unacked message set/ map and rerun sendAndAck with that if necessary.
-    recvdParts <- atomically $ flushTBQueue responseQ
-    -- PLACEHOLDER
-    pure Set.empty
+    attempts 3 . timeout 5000 $ do
     where
         -- pass in the state: state = messages not yet acked, result = responses
-        sendAndAck :: TBQueue FediChordMessage -- ^ the queue for putting in the received responses
-                   -> Socket -- ^ the socket used for sending and receiving for this particular remote node
-                   -> Map.Map Integer BS.ByteString -- ^ the remaining unacked request parts
-                   -> IO ()
-        sendAndAck responseQueue sock remainingSends = do
+        sendAndAck :: Socket -> StateT (Map.Map Integer BS.ByteString) IO (Set.Set FediChordMessage)
+        sendAndAck sock = do
+            remainingSends <- get
             sendMany sock $ Map.elems remainingSends
-            -- if all requests have been acked/ responded to, return prematurely
-            recvLoop responseQueue remainingSends Set.empty
-        recvLoop :: TBQueue FediChordMessage -- ^ the queue for putting in the received responses
-                 -> Map.Map Integer BS.ByteString -- ^ the remaining unacked request parts
-                 -> Set.Set Integer -- ^ already received response part numbers
-                 -> IO ()
-        recvLoop responseQueue remainingSends' receivedPartNums = do
-            -- 65535 is maximum length of UDP packets, as long as
-            -- no IPv6 jumbograms are used
-            response <- deserialiseMessage <$> recv sock 65535
-            case response of
-                -- drop errors
-                Left _ -> recvLoop responseQueue remainingSends' receivedPartNums
-                Right msg -> do
-                    atomically $ writeTBQueue responseQueue msg
-                    let
-                        newRemaining = Map.delete (part msg) remainingSends'
-                        newReceivedParts = Set.insert (part msg) receivedPartNums
-                    -- ToDo: handle responses with more parts than the request
-                    if Map.null newRemaining && Set.size receivedPartNums == fromIntegral (parts msg)
-                        then pure ()
-                        else recvLoop responseQueue newRemaining receivedPartNums
+            -- timeout per receive socket, afterwards catMaybes
+            -- important: packets can be duplicated, i.e. more ACKs than sent parts may arrive
+            replicateM
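The right-hand requestQueryID is still a stub, but its five numbered comments describe the intended lookup loop. A hedged sketch of how those steps could fit together, reusing names that appear in the diff (queryLocalCache, getNodeCacheRef, getLNumBestNodes, FOUND/FORWARD); queryRemoteNode and insertIntoCache are hypothetical helpers, not functions of this code base, the module imports are assumptions, and the sends here are sequential rather than the asynchronous ones the comments call for:

import Data.IORef (readIORef)
import Data.Maybe (fromMaybe)
import qualified Data.Set as Set

import Hash2Pub.FediChord     -- NodeID, NodeState; queryLocalCache etc. assumed in scope
import Hash2Pub.ProtocolTypes -- QueryResponse (FOUND / FORWARD), RemoteCacheEntry

-- Hypothetical helper: send a QueryID request to one cached node and return the
-- RemoteCacheEntry values carried by its responses.
queryRemoteNode :: NodeID -> RemoteCacheEntry -> IO [RemoteCacheEntry]
queryRemoteNode = undefined

-- Hypothetical helper: insert a received entry into the local node cache.
insertIntoCache :: NodeState -> RemoteCacheEntry -> IO ()
insertIntoCache = undefined

requestQueryIDSketch :: NodeState -> NodeID -> IO NodeState
requestQueryIDSketch ns targetID = loop
  where
    loop = do
        -- 1. local lookup for the l closest nodes
        cacheSnapshot <- readIORef $ getNodeCacheRef ns
        let localResult = queryLocalCache ns cacheSnapshot
                              (fromMaybe 1 $ getLNumBestNodes ns) targetID
        case localResult of
            -- FOUND can only be returned if targetID is owned by the local node
            FOUND thisNode  -> pure thisNode
            FORWARD nodeSet -> do
                -- 2./3. contact the forwarded nodes (sequentially in this sketch)
                results <- mapM (queryRemoteNode targetID) (Set.toList nodeSet)
                -- 4. insert the results into the cache
                mapM_ (insertIntoCache ns) (concat results)
                -- 5. repeat until FOUND
                loop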

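Both sides of sendRequestTo wrap the send in an attempts ... . timeout ... combination, but the retry helper itself is not part of this diff. A sketch of the shape such a combinator could have; attemptsSketch is an assumption about that shape, not the project's actual attempts function:

import System.Timeout (timeout)

-- Retry an action that signals failure by returning Nothing (for example because
-- the surrounding 'timeout' expired) up to n times, keeping the first success.
attemptsSketch :: Int -> IO (Maybe a) -> IO (Maybe a)
attemptsSketch n action
    | n <= 0    = pure Nothing
    | otherwise = do
        result <- action
        case result of
            Just _  -> pure result
            Nothing -> attemptsSketch (n - 1) action

-- usage mirroring the call sites above:
--   attemptsSketch 3 . timeout 5000 $ sendAndAckAction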
View file

@@ -122,43 +122,58 @@ a `localCompare` b
 -- | represents a node and all its important state
-data NodeState = NodeState
-    { nid :: NodeID
+data NodeState = NodeState {
+    nid :: NodeID
     , domain :: String
     -- ^ full public domain name the node is reachable under
     , ipAddr :: HostAddress6
     -- the node's public IPv6 address
     , dhtPort :: PortNumber
     -- ^ port of the DHT itself
     , apPort :: Maybe PortNumber
     -- ^ port of the ActivityPub relay and storage service
-    , vServerID :: Integer
-    -- ^ ID of this vserver
-    , internals :: Maybe InternalNodeState
-    -- ^ data not present in the representation of remote nodes
-    }
-    deriving (Show, Eq)
+    -- might have to be queried first
+    , vServerID :: Integer
+    -- ^ ID of this vserver
+    -- ==== internal state ====
+    , internals :: Maybe InternalNodeState
+    -- ^ data not present in the representation of remote nodes
+    -- is put into its own type.
+    -- This is usually @Nothing@ for all remote nodes.
+    } deriving (Show, Eq)
 -- | encapsulates all data and parameters that are not present for remote nodes
-data InternalNodeState = InternalNodeState
-    { nodeCache :: IORef NodeCache
+data InternalNodeState = InternalNodeState {
+    nodeCache :: IORef NodeCache
     -- ^ EpiChord node cache with expiry times for nodes
-    , cacheWriteQueue :: TQueue (NodeCache -> NodeCache)
-    -- ^ cache updates are not written directly to the 'nodeCache' but queued and
-    , successors :: [NodeID] -- could be a set instead as these are ordered as well
-    -- ^ successor nodes in ascending order by distance
-    , predecessors :: [NodeID]
-    -- ^ predecessor nodes in ascending order by distance
-    , kNeighbours :: Int
-    -- ^ desired length of predecessor and successor list
-    , lNumBestNodes :: Int
-    -- ^ number of best next hops to provide
-    , pNumParallelQueries :: Int
-    -- ^ number of parallel sent queries
-    , jEntriesPerSlice :: Int
-    -- ^ number of desired entries per cache slice
-    }
-    deriving (Show, Eq)
+    -- as the map is ordered, lookups for the closest preceding node can be done using @lookupLT@.
+    -- encapsulated into an IORef for allowing concurrent reads without locking
+    , cacheWriteQueue :: TQueue (NodeCache -> NodeCache)
+    -- ^ cache updates are not written directly to the 'nodeCache' but queued and
+    -- only processed by a single writer thread to prevent lost updates.
+    -- All nodeCache modifying functions have to be partially applied enough before
+    -- being put into the queue.
+    --
+    , successors :: [NodeID] -- could be a set instead as these are ordered as well
+    -- ^ successor nodes in ascending order by distance
+    , predecessors :: [NodeID]
+    -- ^ predecessor nodes in ascending order by distance
+    ----- protocol parameters -----
+    -- TODO: evaluate moving these somewhere else
+    , kNeighbours :: Int
+    -- ^ desired length of predecessor and successor list
+    -- needs to be parameterisable for simulation purposes
+    , lNumBestNodes :: Int
+    -- ^ number of best next hops to provide
+    -- needs to be parameterisable for simulation purposes
+    , pNumParallelQueries :: Int
+    -- ^ number of parallel sent queries
+    -- needs to be parameterisable for simulation purposes
+    , jEntriesPerSlice :: Int
+    -- ^ number of desired entries per cache slice
+    -- needs to be parameterisable for simulation purposes
+    } deriving (Show, Eq)
 -- | defining Show instances to be able to print NodeState for debug purposes
 instance Typeable a => Show (IORef a) where
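The right-hand comments describe the cacheWriteQueue discipline: updates are enqueued as partially applied NodeCache -> NodeCache functions and folded into the IORef by a single writer thread, while readers use plain readIORef without locking. A generic sketch of that single-writer pattern; cacheWriter is an illustrative name, not necessarily what Hash2Pub calls it:

import Control.Concurrent.STM (atomically)
import Control.Concurrent.STM.TQueue (TQueue, readTQueue)
import Control.Monad (forever)
import Data.IORef (IORef, atomicModifyIORef')

-- The single writer: block on the queue and apply each queued modification in turn,
-- so no update can be lost to a concurrent read-modify-write.
cacheWriter :: TQueue (cache -> cache) -> IORef cache -> IO ()
cacheWriter queue cacheRef = forever $ do
    modification <- atomically $ readTQueue queue
    atomicModifyIORef' cacheRef (\cache -> (modification cache, ()))

-- Producers only enqueue partially applied functions, e.g.
--   atomically $ writeTQueue queue (Map.insert someID someEntry)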
@@ -215,8 +230,12 @@ getLNumBestNodes = getInternals_ lNumBestNodes
 type NodeCache = Map.Map NodeID CacheEntry
 -- |an entry of the 'nodeCache' can hold 2 different kinds of data
-data CacheEntry = NodeEntry Bool NodeState POSIXTime
-                | ProxyEntry (NodeID, ProxyDirection) (Maybe CacheEntry)
+data CacheEntry =
+    -- | an entry representing its validation status, the node state and its timestamp
+    NodeEntry Bool NodeState POSIXTime
+    -- | a proxy field for closing the ring structure, indicating the lookup shall be
+    -- resumed at the given @NodeID@ unless the @ProxyEntry@ itself holds a @NodeEntry@
+    | ProxyEntry (NodeID, ProxyDirection) (Maybe CacheEntry)
     deriving (Show, Eq)
 -- | as a compromise, only NodeEntry components are ordered by their NodeID
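The new comments say a ProxyEntry closes the ring: a lookup resumes at the stored NodeID unless the proxy itself wraps a NodeEntry. A rough sketch of that reading; followEntry is hypothetical, and the real cacheLookup/cacheLookupPred/cacheLookupSucc functions may handle this differently:

import qualified Data.Map as Map

import Hash2Pub.FediChord (CacheEntry (..), NodeCache, NodeID)

followEntry :: NodeCache -> CacheEntry -> Maybe CacheEntry
followEntry _ entry@NodeEntry {} = Just entry
followEntry nodeCache (ProxyEntry (wrapTarget, _) inner) =
    case inner of
        Just nodeEntry -> Just nodeEntry                    -- proxy already carries a NodeEntry
        Nothing        -> Map.lookup wrapTarget nodeCache   -- resume the lookup at the stored NodeID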
@@ -228,9 +247,7 @@ instance Ord CacheEntry where
         extractID (NodeEntry _ eState _) = nid eState
         extractID (ProxyEntry _ _) = error "proxy entries should never appear outside of the NodeCache"
-data ProxyDirection = Backwards
-                    | Forwards
-                    deriving (Show, Eq)
+data ProxyDirection = Backwards | Forwards deriving (Show, Eq)
 instance Enum ProxyDirection where
     toEnum (-1) = Backwards
@@ -413,12 +430,11 @@ byteStringToUInteger bs = sum $ parsedBytes 0 bs
 -- persist them on disk so they can be used for all following bootstraps
 -- | configuration values used for initialising the FediChord DHT
-data FediChordConf = FediChordConf
-    { confDomain :: String
+data FediChordConf = FediChordConf {
+    confDomain :: String
     , confIP :: HostAddress6
     , confDhtPort :: Int
-    }
-    deriving (Show, Eq)
+    } deriving (Show, Eq)
 -- | initialise data structures, compute own IDs and bind to listening socket
 -- ToDo: load persisted state, thus this function already operates in IO
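Both sides define the same three configuration fields; only the record layout changes. A hedged example of constructing a value; the domain, address and port shown are made-up placeholders, not project defaults:

import Network.Socket (tupleToHostAddress6)

import Hash2Pub.FediChord

exampleConf :: FediChordConf
exampleConf = FediChordConf
    { confDomain  = "node1.example.org"
    , confIP      = tupleToHostAddress6 (0x2001, 0xdb8, 0, 0, 0, 0, 0, 1)
    , confDhtPort = 2342
    }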

View file

@@ -1,69 +1,74 @@
 module Hash2Pub.ProtocolTypes where
 import qualified Data.Set as Set
 import Data.Time.Clock.POSIX (POSIXTime)
 import Hash2Pub.FediChord
-data QueryResponse = FORWARD (Set.Set RemoteCacheEntry)
-                   | FOUND NodeState
-                   deriving (Show, Eq)
+data QueryResponse = FORWARD (Set.Set RemoteCacheEntry) -- ^ return closest nodes from local cache.
+                                                        --   whole cache entry is returned for making
+                                                        --   the entry time stamp available to the
+                                                        --   protocol serialiser
+                   | FOUND NodeState                    -- ^ node is the responsible node for queried ID
+                   deriving (Show, Eq)
 -- === protocol serialisation data types
-data Action = QueryID
-            | Join
-            | Leave
-            | Stabilise
-            | Ping
-            deriving (Show, Eq, Enum)
+data Action =
+      QueryID
+    | Join
+    | Leave
+    | Stabilise
+    | Ping
+    deriving (Show, Eq, Enum)
-data FediChordMessage = Request
-    { requestID :: Integer
-    , sender :: NodeState
-    , parts :: Integer
-    , part :: Integer
-    -- ^ part starts at 1
-    , action :: Action
-    , payload :: Maybe ActionPayload
-    }
-    | Response
-    { responseTo :: Integer
-    , senderID :: NodeID
-    , parts :: Integer
-    , part :: Integer
-    , action :: Action
-    , payload :: Maybe ActionPayload
-    }
-    deriving (Show, Eq)
+data FediChordMessage =
+    Request {
+        requestID :: Integer
+        , sender :: NodeState
+        , parts :: Integer
+        , part :: Integer
+        -- ^ part starts at 0
+        , action :: Action
+        , payload :: Maybe ActionPayload
+        }
+    | Response {
+        responseTo :: Integer
+        , senderID :: NodeID
+        , parts :: Integer
+        , part :: Integer
+        , action :: Action
+        , payload :: Maybe ActionPayload
+        } deriving (Show, Eq)
-data ActionPayload = QueryIDRequestPayload
-    { queryTargetID :: NodeID
-    , queryLBestNodes :: Integer
-    }
-    | JoinRequestPayload
-    | LeaveRequestPayload
-    { leaveSuccessors :: [NodeID]
-    , leavePredecessors :: [NodeID]
-    }
-    | StabiliseRequestPayload
-    | PingRequestPayload
-    | QueryIDResponsePayload
-    { queryResult :: QueryResponse
-    }
-    | JoinResponsePayload
-    { joinSuccessors :: [NodeID]
-    , joinPredecessors :: [NodeID]
-    , joinCache :: [RemoteCacheEntry]
-    }
-    | LeaveResponsePayload
-    | StabiliseResponsePayload
-    { stabiliseSuccessors :: [NodeID]
-    , stabilisePredecessors :: [NodeID]
-    }
-    | PingResponsePayload
-    { pingNodeStates :: [NodeState]
-    }
+data ActionPayload =
+    QueryIDRequestPayload {
+        queryTargetID :: NodeID
+        , queryLBestNodes :: Integer
+        }
+    | JoinRequestPayload
+    | LeaveRequestPayload {
+        leaveSuccessors :: [NodeID]
+        , leavePredecessors :: [NodeID]
+        }
+    | StabiliseRequestPayload
+    | PingRequestPayload
+    | QueryIDResponsePayload {
+        queryResult :: QueryResponse
+        }
+    | JoinResponsePayload {
+        joinSuccessors :: [NodeID]
+        , joinPredecessors :: [NodeID]
+        , joinCache :: [RemoteCacheEntry]
+        }
+    | LeaveResponsePayload
+    | StabiliseResponsePayload {
+        stabiliseSuccessors :: [NodeID]
+        , stabilisePredecessors :: [NodeID]
+        }
+    | PingResponsePayload {
+        pingNodeStates :: [NodeState]
+        }
     deriving (Show, Eq)
 -- | global limit of parts per message used when (de)serialising messages.
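As a hedged illustration of the records above, a single-part QueryID request; the numeric values are placeholders, the imports are assumptions, and the right-hand convention of numbering parts from 0 is used (the left-hand side starts at 1):

import Hash2Pub.FediChord (NodeID, NodeState)
import Hash2Pub.ProtocolTypes

-- Illustrative only: a request consisting of exactly one part.
exampleRequest :: NodeState -> NodeID -> FediChordMessage
exampleRequest someSender someTarget = Request
    { requestID = 2342                      -- placeholder ID
    , sender    = someSender
    , parts     = 1
    , part      = 0                         -- first part: 0 on the right-hand side, 1 on the left
    , action    = QueryID
    , payload   = Just QueryIDRequestPayload
        { queryTargetID   = someTarget
        , queryLBestNodes = 3
        }
    }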