diff --git a/FediChord.asn1 b/FediChord.asn1 index dda8bdc..7c53cb0 100644 --- a/FediChord.asn1 +++ b/FediChord.asn1 @@ -10,8 +10,8 @@ Request ::= SEQUENCE { action Action, requestID INTEGER, sender NodeState, - parts INTEGER (1..150), -- number of message parts - part INTEGER (1..150), -- part number of this message, starts at 1 + parts INTEGER (0..150), -- number of message parts + part INTEGER (0..150), -- part number of this message, starts at 1 actionPayload CHOICE { queryIDRequestPayload QueryIDRequestPayload, joinRequestPayload JoinRequestPayload, diff --git a/src/Hash2Pub/ASN1Coding.hs b/src/Hash2Pub/ASN1Coding.hs index fbb74cb..01fbb1d 100644 --- a/src/Hash2Pub/ASN1Coding.hs +++ b/src/Hash2Pub/ASN1Coding.hs @@ -3,7 +3,7 @@ module Hash2Pub.ASN1Coding where import Control.Exception (displayException) -import Data.ASN1.BinaryEncoding +import Data.ASN1.BinaryEncoding -- asn1-encoding package import Data.ASN1.Encoding import Data.ASN1.Error () import Data.ASN1.Parse @@ -17,8 +17,8 @@ import Data.Time.Clock.POSIX () import Safe import Hash2Pub.FediChord -import Hash2Pub.ProtocolTypes import Hash2Pub.Utils +import Hash2Pub.ProtocolTypes import Debug.Trace diff --git a/src/Hash2Pub/DHTProtocol.hs b/src/Hash2Pub/DHTProtocol.hs index ecc701c..486721c 100644 --- a/src/Hash2Pub/DHTProtocol.hs +++ b/src/Hash2Pub/DHTProtocol.hs @@ -15,31 +15,29 @@ module Hash2Pub.DHTProtocol ) where -import Control.Concurrent.STM -import Control.Concurrent.STM.TBQueue -import Control.Concurrent.STM.TQueue -import qualified Data.ByteString as BS -import qualified Data.Map as Map -import Data.Maybe (fromMaybe, maybe) -import qualified Data.Set as Set +import Control.Concurrent.STM +import Control.Concurrent.STM.TQueue +import Control.Concurrent.STM.TBQueue +import qualified Data.ByteString as BS +import qualified Data.Map as Map +import Data.Maybe (fromMaybe, maybe) +import qualified Data.Set as Set import Data.Time.Clock.POSIX -import Network.Socket hiding (recv, recvFrom, send, - sendTo) +import Network.Socket hiding (recv, recvFrom, send, sendTo) import Network.Socket.ByteString import System.Timeout import Hash2Pub.ASN1Coding -import Hash2Pub.FediChord (CacheEntry (..), NodeCache, - NodeID, NodeState (..), - cacheGetNodeStateUnvalidated, - cacheLookup, cacheLookupPred, - cacheLookupSucc, - getPredecessors, getSuccessors, - localCompare, putPredecessors, - putSuccessors) +import Hash2Pub.FediChord (CacheEntry (..), NodeCache, NodeID, + NodeState (..), + cacheGetNodeStateUnvalidated, + cacheLookup, cacheLookupPred, + cacheLookupSucc, getPredecessors, + getSuccessors, localCompare, + putPredecessors, putSuccessors) import Hash2Pub.ProtocolTypes -import Debug.Trace (trace) +import Debug.Trace (trace) -- === queries === @@ -123,71 +121,41 @@ markCacheEntryAsVerified timestamp = Map.adjust adjustFunc -- ====== message send and receive operations ====== ---requestQueryID :: NodeState -> NodeID -> IO NodeState ----- 1. do a local lookup for the l closest nodes ----- 2. create l sockets ----- 3. send a message async concurrently to all l nodes ----- 4. collect the results, insert them into cache ----- 5. repeat until FOUND (problem: new entries not necessarily already in cache, explicitly compare with closer results) ---requestQueryID ns targetID = do --- cacheSnapshot <- readIORef $ getNodeCacheRef ns --- let localResult = queryLocalCache ns cacheSnapshot (fromMaybe 1 $ getLNumBestNodes ns) targetID --- -- FOUND can only be returned if targetID is owned by local node --- case localResult of --- FOUND thisNode -> return thisNode --- FORWARD nodeSet -> --- sockets <- mapM (\resultNode -> mkSendSocket (domain result) (dhtPort resultNode)) $ Set.toList nodeSet --- -- ToDo: process results immediately instead of waiting for the last one to finish, see https://stackoverflow.com/a/38815224/9198613 --- responses = mapM +requestQueryID :: NodeState -> NodeID -> IO NodeState +-- 1. do a local lookup for the l closest nodes +-- 2. create l sockets +-- 3. send a message async concurrently to all l nodes +-- 4. collect the results, insert them into cache +-- 5. repeat until FOUND (problem: new entries not necessarily already in cache, explicitly compare with closer results) +requestQueryID ns targetID = do + cacheSnapshot <- readIORef $ getNodeCacheRef ns + let localResult = queryLocalCache ns cacheSnapshot (fromMaybe 1 $ getLNumBestNodes ns) targetID + -- FOUND can only be returned if targetID is owned by local node + case localResult of + FOUND thisNode -> return thisNode + FORWARD nodeSet -> + sockets <- mapM (\resultNode -> mkSendSocket (domain result) (dhtPort resultNode)) $ Set.toList nodeSet + -- ToDo: process results immediately instead of waiting for the last one to finish, see https://stackoverflow.com/a/38815224/9198613 + responses = mapM --- | Generic function for sending a request over a connected socket and collecting the response. --- Serialises the message and tries to deliver its parts for a number of attempts within a specified timeout. sendRequestTo :: Int -- ^ timeout in seconds -> Int -- ^ number of retries -> FediChordMessage -- ^ the message to be sent -> Socket -- ^ connected socket to use for sending -> IO (Set.Set FediChordMessage) -- ^ responses -sendRequestTo timeoutMillis numAttempts msg sock = do +sendRequestTo timeout attempts msg sock = do let requests = serialiseMessage 1200 msg - -- create a queue for passing received response messages back, even after a timeout - responseQ <- newTBQueueIO $ 2*maximumParts -- keep room for duplicate packets - -- start sendAndAck with timeout -- ToDo: make attempts and timeout configurable - attempts numAttempts . timeout timeoutMillis $ sendAndAck responseQ sock requests - -- after timeout, check received responses, delete them from unacked message set/ map and rerun senAndAck with that if necessary. - recvdParts <- atomically $ flushTBQueue responseQ - -- PLACEHOLDER - pure Set.empty + attempts 3 . timeout 5000 $ do where -- state reingeben: state = noch nicht geackte messages, result = responses - sendAndAck :: TBQueue FediChordMessage -- ^ the queue for putting in the received responses - -> Socket -- ^ the socket used for sending and receiving for this particular remote node - -> Map.Map Integer BS.ByteString -- ^ the remaining unacked request parts - -> IO () - sendAndAck responseQueue sock remainingSends = do + sendAndAck :: Socket -> StateT (Map.Map Integer BS.ByteString) IO (Set.Set FediChordMessage) + sendAndAck sock = do + remainingSends <- get sendMany sock $ Map.elems remainingSends - -- if all requests have been acked/ responded to, return prematurely - recvLoop responseQueue remainingSends Set.empty - recvLoop :: TBQueue FediChordMessage -- ^ the queue for putting in the received responses - -> Map.Map Integer BS.ByteString -- ^ the remaining unacked request parts - -> Set.Set Integer -- ^ already received response part numbers - -> IO () - recvLoop responseQueue remainingSends' receivedPartNums = do - -- 65535 is maximum length of UDP packets, as long as - -- no IPv6 jumbograms are used - response <- deserialiseMessage <$> recv sock 65535 - case response of - -- drop errors - Left _ -> recvLoop responseQueue remainingSends' receivedPartNums - Right msg -> do - atomically $ writeTBQueue responseQueue msg - let - newRemaining = Map.delete (part msg) remainingSends' - newReceivedParts = Set.insert (part msg) receivedPartNums - -- ToDo: handle responses with more parts than the request - if Map.null newRemaining && Set.size receivedPartNums == fromIntegral (parts msg) - then pure () - else recvLoop responseQueue newRemaining receivedPartNums + -- timeout pro receive socket, danach catMaybes + -- wichtig: Pakete können dupliziert werden, dh es können mehr ACKs als gesendete parts ankommen + replicateM diff --git a/src/Hash2Pub/FediChord.hs b/src/Hash2Pub/FediChord.hs index 479d90d..78dc711 100644 --- a/src/Hash2Pub/FediChord.hs +++ b/src/Hash2Pub/FediChord.hs @@ -122,43 +122,58 @@ a `localCompare` b -- | represents a node and all its important state -data NodeState = NodeState - { nid :: NodeID - , domain :: String - -- ^ full public domain name the node is reachable under - , ipAddr :: HostAddress6 - -- the node's public IPv6 address - , dhtPort :: PortNumber - -- ^ port of the DHT itself - , apPort :: Maybe PortNumber - -- ^ port of the ActivityPub relay and storage service - , vServerID :: Integer - -- ^ ID of this vserver - , internals :: Maybe InternalNodeState - -- ^ data not present in the representation of remote nodes - } - deriving (Show, Eq) +data NodeState = NodeState { + nid :: NodeID + , domain :: String + -- ^ full public domain name the node is reachable under + , ipAddr :: HostAddress6 + -- the node's public IPv6 address + , dhtPort :: PortNumber + -- ^ port of the DHT itself + , apPort :: Maybe PortNumber + -- ^ port of the ActivityPub relay and storage service + -- might have to be queried first + , vServerID :: Integer + -- ^ ID of this vserver + + -- ==== internal state ==== + , internals :: Maybe InternalNodeState + -- ^ data not present in the representation of remote nodes + -- is put into its own type. + -- This is usually @Nothing@ for all remote nodes. + } deriving (Show, Eq) -- | encapsulates all data and parameters that are not present for remote nodes -data InternalNodeState = InternalNodeState - { nodeCache :: IORef NodeCache - -- ^ EpiChord node cache with expiry times for nodes - , cacheWriteQueue :: TQueue (NodeCache -> NodeCache) - -- ^ cache updates are not written directly to the 'nodeCache' but queued and - , successors :: [NodeID] -- could be a set instead as these are ordered as well - -- ^ successor nodes in ascending order by distance - , predecessors :: [NodeID] - -- ^ predecessor nodes in ascending order by distance - , kNeighbours :: Int - -- ^ desired length of predecessor and successor list - , lNumBestNodes :: Int - -- ^ number of best next hops to provide - , pNumParallelQueries :: Int - -- ^ number of parallel sent queries - , jEntriesPerSlice :: Int - -- ^ number of desired entries per cache slice - } - deriving (Show, Eq) +data InternalNodeState = InternalNodeState { + nodeCache :: IORef NodeCache + -- ^ EpiChord node cache with expiry times for nodes + -- as the map is ordered, lookups for the closes preceding node can be done using @lookupLT@. + -- encapsulated into an IORef for allowing concurrent reads without locking + , cacheWriteQueue :: TQueue (NodeCache -> NodeCache) + -- ^ cache updates are not written directly to the 'nodeCache' but queued and + -- only processed by a single writer thread to prevent lost updates. + -- All nodeCache modifying functions have to be partially applied enough before + -- being put into the queue. + -- + , successors :: [NodeID] -- could be a set instead as these are ordered as well + -- ^ successor nodes in ascending order by distance + , predecessors :: [NodeID] + -- ^ predecessor nodes in ascending order by distance + ----- protocol parameters ----- + -- TODO: evaluate moving these somewhere else + , kNeighbours :: Int + -- ^ desired length of predecessor and successor list + -- needs to be parameterisable for simulation purposes + , lNumBestNodes :: Int + -- ^ number of best next hops to provide + -- needs to be parameterisable for simulation purposes + , pNumParallelQueries :: Int + -- ^ number of parallel sent queries + -- needs to be parameterisable for simulation purposes + , jEntriesPerSlice :: Int + -- ^ number of desired entries per cache slice + -- needs to be parameterisable for simulation purposes + } deriving (Show, Eq) -- | defining Show instances to be able to print NodeState for debug purposes instance Typeable a => Show (IORef a) where @@ -215,8 +230,12 @@ getLNumBestNodes = getInternals_ lNumBestNodes type NodeCache = Map.Map NodeID CacheEntry -- |an entry of the 'nodeCache' can hold 2 different kinds of data -data CacheEntry = NodeEntry Bool NodeState POSIXTime - | ProxyEntry (NodeID, ProxyDirection) (Maybe CacheEntry) +data CacheEntry = + -- | an entry representing its validation status, the node state and its timestamp + NodeEntry Bool NodeState POSIXTime + -- | a proxy field for closing the ring structure, indicating the lookup shall be + -- resumed at the given @NodeID@ unless the @ProxyEntry@ itself holds a @NodeEntry@ + | ProxyEntry (NodeID, ProxyDirection) (Maybe CacheEntry) deriving (Show, Eq) -- | as a compromise, only NodeEntry components are ordered by their NodeID @@ -228,9 +247,7 @@ instance Ord CacheEntry where extractID (NodeEntry _ eState _) = nid eState extractID (ProxyEntry _ _) = error "proxy entries should never appear outside of the NodeCache" -data ProxyDirection = Backwards - | Forwards - deriving (Show, Eq) +data ProxyDirection = Backwards | Forwards deriving (Show, Eq) instance Enum ProxyDirection where toEnum (-1) = Backwards @@ -413,12 +430,11 @@ byteStringToUInteger bs = sum $ parsedBytes 0 bs -- persist them on disk so they can be used for all following bootstraps -- | configuration values used for initialising the FediChord DHT -data FediChordConf = FediChordConf - { confDomain :: String - , confIP :: HostAddress6 - , confDhtPort :: Int - } - deriving (Show, Eq) +data FediChordConf = FediChordConf { + confDomain :: String + , confIP :: HostAddress6 + , confDhtPort :: Int + } deriving (Show, Eq) -- | initialise data structures, compute own IDs and bind to listening socket -- ToDo: load persisted state, thus this function already operates in IO diff --git a/src/Hash2Pub/ProtocolTypes.hs b/src/Hash2Pub/ProtocolTypes.hs index c7453ab..4271174 100644 --- a/src/Hash2Pub/ProtocolTypes.hs +++ b/src/Hash2Pub/ProtocolTypes.hs @@ -1,69 +1,74 @@ module Hash2Pub.ProtocolTypes where -import qualified Data.Set as Set -import Data.Time.Clock.POSIX (POSIXTime) +import qualified Data.Set as Set +import Data.Time.Clock.POSIX (POSIXTime) -import Hash2Pub.FediChord +import Hash2Pub.FediChord -data QueryResponse = FORWARD (Set.Set RemoteCacheEntry) - | FOUND NodeState - deriving (Show, Eq) +data QueryResponse = FORWARD (Set.Set RemoteCacheEntry) -- ^return closest nodes from local cache. + -- whole cache entry is returned for making + -- the entry time stamp available to the + -- protocol serialiser + | FOUND NodeState -- ^node is the responsible node for queried ID + deriving (Show, Eq) -- === protocol serialisation data types -data Action = QueryID - | Join - | Leave - | Stabilise - | Ping - deriving (Show, Eq, Enum) +data Action = + QueryID + | Join + | Leave + | Stabilise + | Ping + deriving (Show, Eq, Enum) -data FediChordMessage = Request - { requestID :: Integer - , sender :: NodeState - , parts :: Integer - , part :: Integer - -- ^ part starts at 1 - , action :: Action - , payload :: Maybe ActionPayload - } - | Response - { responseTo :: Integer - , senderID :: NodeID - , parts :: Integer - , part :: Integer - , action :: Action - , payload :: Maybe ActionPayload - } - deriving (Show, Eq) +data FediChordMessage = + Request { + requestID :: Integer + , sender :: NodeState + , parts :: Integer + , part :: Integer + -- ^ part starts at 0 + , action :: Action + , payload :: Maybe ActionPayload + } + | Response { + responseTo :: Integer + , senderID :: NodeID + , parts :: Integer + , part :: Integer + , action :: Action + , payload :: Maybe ActionPayload + } deriving (Show, Eq) -data ActionPayload = QueryIDRequestPayload - { queryTargetID :: NodeID - , queryLBestNodes :: Integer - } - | JoinRequestPayload - | LeaveRequestPayload - { leaveSuccessors :: [NodeID] - , leavePredecessors :: [NodeID] - } - | StabiliseRequestPayload - | PingRequestPayload - | QueryIDResponsePayload - { queryResult :: QueryResponse - } - | JoinResponsePayload - { joinSuccessors :: [NodeID] - , joinPredecessors :: [NodeID] - , joinCache :: [RemoteCacheEntry] - } - | LeaveResponsePayload - | StabiliseResponsePayload - { stabiliseSuccessors :: [NodeID] - , stabilisePredecessors :: [NodeID] - } - | PingResponsePayload - { pingNodeStates :: [NodeState] - } +data ActionPayload = + QueryIDRequestPayload { + queryTargetID :: NodeID + , queryLBestNodes :: Integer + } + | JoinRequestPayload + | LeaveRequestPayload { + leaveSuccessors :: [NodeID] + , leavePredecessors :: [NodeID] + } + | StabiliseRequestPayload + | PingRequestPayload + | QueryIDResponsePayload { + queryResult :: QueryResponse + } + | JoinResponsePayload { + joinSuccessors :: [NodeID] + , joinPredecessors :: [NodeID] + , joinCache :: [RemoteCacheEntry] + } + | LeaveResponsePayload + | StabiliseResponsePayload { + stabiliseSuccessors :: [NodeID] + , stabilisePredecessors :: [NodeID] + } + | PingResponsePayload { + pingNodeStates :: [NodeState] + } deriving (Show, Eq) -- | global limit of parts per message used when (de)serialising messages.