Compare commits

..

3 commits

5 changed files with 179 additions and 168 deletions

View file

@ -10,8 +10,8 @@ Request ::= SEQUENCE {
action Action,
requestID INTEGER,
sender NodeState,
parts INTEGER (0..150), -- number of message parts
part INTEGER (0..150), -- part number of this message, starts at 1
parts INTEGER (1..150), -- number of message parts
part INTEGER (1..150), -- part number of this message, starts at 1
actionPayload CHOICE {
queryIDRequestPayload QueryIDRequestPayload,
joinRequestPayload JoinRequestPayload,

View file

@ -3,7 +3,7 @@
module Hash2Pub.ASN1Coding where
import Control.Exception (displayException)
import Data.ASN1.BinaryEncoding -- asn1-encoding package
import Data.ASN1.BinaryEncoding
import Data.ASN1.Encoding
import Data.ASN1.Error ()
import Data.ASN1.Parse
@ -17,8 +17,8 @@ import Data.Time.Clock.POSIX ()
import Safe
import Hash2Pub.FediChord
import Hash2Pub.Utils
import Hash2Pub.ProtocolTypes
import Hash2Pub.Utils
import Debug.Trace

View file

@ -16,25 +16,27 @@ module Hash2Pub.DHTProtocol
where
import Control.Concurrent.STM
import Control.Concurrent.STM.TQueue
import Control.Concurrent.STM.TBQueue
import Control.Concurrent.STM.TQueue
import qualified Data.ByteString as BS
import qualified Data.Map as Map
import Data.Maybe (fromMaybe, maybe)
import qualified Data.Set as Set
import Data.Time.Clock.POSIX
import Network.Socket hiding (recv, recvFrom, send, sendTo)
import Network.Socket hiding (recv, recvFrom, send,
sendTo)
import Network.Socket.ByteString
import System.Timeout
import Hash2Pub.ASN1Coding
import Hash2Pub.FediChord (CacheEntry (..), NodeCache, NodeID,
NodeState (..),
import Hash2Pub.FediChord (CacheEntry (..), NodeCache,
NodeID, NodeState (..),
cacheGetNodeStateUnvalidated,
cacheLookup, cacheLookupPred,
cacheLookupSucc, getPredecessors,
getSuccessors, localCompare,
putPredecessors, putSuccessors)
cacheLookupSucc,
getPredecessors, getSuccessors,
localCompare, putPredecessors,
putSuccessors)
import Hash2Pub.ProtocolTypes
import Debug.Trace (trace)
@ -121,41 +123,71 @@ markCacheEntryAsVerified timestamp = Map.adjust adjustFunc
-- ====== message send and receive operations ======
requestQueryID :: NodeState -> NodeID -> IO NodeState
-- 1. do a local lookup for the l closest nodes
-- 2. create l sockets
-- 3. send a message async concurrently to all l nodes
-- 4. collect the results, insert them into cache
-- 5. repeat until FOUND (problem: new entries not necessarily already in cache, explicitly compare with closer results)
requestQueryID ns targetID = do
cacheSnapshot <- readIORef $ getNodeCacheRef ns
let localResult = queryLocalCache ns cacheSnapshot (fromMaybe 1 $ getLNumBestNodes ns) targetID
-- FOUND can only be returned if targetID is owned by local node
case localResult of
FOUND thisNode -> return thisNode
FORWARD nodeSet ->
sockets <- mapM (\resultNode -> mkSendSocket (domain result) (dhtPort resultNode)) $ Set.toList nodeSet
-- ToDo: process results immediately instead of waiting for the last one to finish, see https://stackoverflow.com/a/38815224/9198613
responses = mapM
--requestQueryID :: NodeState -> NodeID -> IO NodeState
---- 1. do a local lookup for the l closest nodes
---- 2. create l sockets
---- 3. send a message async concurrently to all l nodes
---- 4. collect the results, insert them into cache
---- 5. repeat until FOUND (problem: new entries not necessarily already in cache, explicitly compare with closer results)
--requestQueryID ns targetID = do
-- cacheSnapshot <- readIORef $ getNodeCacheRef ns
-- let localResult = queryLocalCache ns cacheSnapshot (fromMaybe 1 $ getLNumBestNodes ns) targetID
-- -- FOUND can only be returned if targetID is owned by local node
-- case localResult of
-- FOUND thisNode -> return thisNode
-- FORWARD nodeSet ->
-- sockets <- mapM (\resultNode -> mkSendSocket (domain result) (dhtPort resultNode)) $ Set.toList nodeSet
-- -- ToDo: process results immediately instead of waiting for the last one to finish, see https://stackoverflow.com/a/38815224/9198613
-- responses = mapM
-- | Generic function for sending a request over a connected socket and collecting the response.
-- Serialises the message and tries to deliver its parts for a number of attempts within a specified timeout.
sendRequestTo :: Int -- ^ timeout in seconds
-> Int -- ^ number of retries
-> FediChordMessage -- ^ the message to be sent
-> Socket -- ^ connected socket to use for sending
-> IO (Set.Set FediChordMessage) -- ^ responses
sendRequestTo timeout attempts msg sock = do
sendRequestTo timeoutMillis numAttempts msg sock = do
let requests = serialiseMessage 1200 msg
-- create a queue for passing received response messages back, even after a timeout
responseQ <- newTBQueueIO $ 2*maximumParts -- keep room for duplicate packets
-- start sendAndAck with timeout
-- ToDo: make attempts and timeout configurable
attempts 3 . timeout 5000 $ do
attempts numAttempts . timeout timeoutMillis $ sendAndAck responseQ sock requests
-- after timeout, check received responses, delete them from unacked message set/ map and rerun senAndAck with that if necessary.
recvdParts <- atomically $ flushTBQueue responseQ
-- PLACEHOLDER
pure Set.empty
where
-- state reingeben: state = noch nicht geackte messages, result = responses
sendAndAck :: Socket -> StateT (Map.Map Integer BS.ByteString) IO (Set.Set FediChordMessage)
sendAndAck sock = do
remainingSends <- get
sendAndAck :: TBQueue FediChordMessage -- ^ the queue for putting in the received responses
-> Socket -- ^ the socket used for sending and receiving for this particular remote node
-> Map.Map Integer BS.ByteString -- ^ the remaining unacked request parts
-> IO ()
sendAndAck responseQueue sock remainingSends = do
sendMany sock $ Map.elems remainingSends
-- timeout pro receive socket, danach catMaybes
-- wichtig: Pakete können dupliziert werden, dh es können mehr ACKs als gesendete parts ankommen
replicateM
-- if all requests have been acked/ responded to, return prematurely
recvLoop responseQueue remainingSends Set.empty
recvLoop :: TBQueue FediChordMessage -- ^ the queue for putting in the received responses
-> Map.Map Integer BS.ByteString -- ^ the remaining unacked request parts
-> Set.Set Integer -- ^ already received response part numbers
-> IO ()
recvLoop responseQueue remainingSends' receivedPartNums = do
-- 65535 is maximum length of UDP packets, as long as
-- no IPv6 jumbograms are used
response <- deserialiseMessage <$> recv sock 65535
case response of
-- drop errors
Left _ -> recvLoop responseQueue remainingSends' receivedPartNums
Right msg -> do
atomically $ writeTBQueue responseQueue msg
let
newRemaining = Map.delete (part msg) remainingSends'
newReceivedParts = Set.insert (part msg) receivedPartNums
-- ToDo: handle responses with more parts than the request
if Map.null newRemaining && Set.size receivedPartNums == fromIntegral (parts msg)
then pure ()
else recvLoop responseQueue newRemaining receivedPartNums

View file

@ -122,8 +122,8 @@ a `localCompare` b
-- | represents a node and all its important state
data NodeState = NodeState {
nid :: NodeID
data NodeState = NodeState
{ nid :: NodeID
, domain :: String
-- ^ full public domain name the node is reachable under
, ipAddr :: HostAddress6
@ -132,48 +132,33 @@ data NodeState = NodeState {
-- ^ port of the DHT itself
, apPort :: Maybe PortNumber
-- ^ port of the ActivityPub relay and storage service
-- might have to be queried first
, vServerID :: Integer
-- ^ ID of this vserver
-- ==== internal state ====
, internals :: Maybe InternalNodeState
-- ^ data not present in the representation of remote nodes
-- is put into its own type.
-- This is usually @Nothing@ for all remote nodes.
} deriving (Show, Eq)
}
deriving (Show, Eq)
-- | encapsulates all data and parameters that are not present for remote nodes
data InternalNodeState = InternalNodeState {
nodeCache :: IORef NodeCache
data InternalNodeState = InternalNodeState
{ nodeCache :: IORef NodeCache
-- ^ EpiChord node cache with expiry times for nodes
-- as the map is ordered, lookups for the closes preceding node can be done using @lookupLT@.
-- encapsulated into an IORef for allowing concurrent reads without locking
, cacheWriteQueue :: TQueue (NodeCache -> NodeCache)
-- ^ cache updates are not written directly to the 'nodeCache' but queued and
-- only processed by a single writer thread to prevent lost updates.
-- All nodeCache modifying functions have to be partially applied enough before
-- being put into the queue.
--
, successors :: [NodeID] -- could be a set instead as these are ordered as well
-- ^ successor nodes in ascending order by distance
, predecessors :: [NodeID]
-- ^ predecessor nodes in ascending order by distance
----- protocol parameters -----
-- TODO: evaluate moving these somewhere else
, kNeighbours :: Int
-- ^ desired length of predecessor and successor list
-- needs to be parameterisable for simulation purposes
, lNumBestNodes :: Int
-- ^ number of best next hops to provide
-- needs to be parameterisable for simulation purposes
, pNumParallelQueries :: Int
-- ^ number of parallel sent queries
-- needs to be parameterisable for simulation purposes
, jEntriesPerSlice :: Int
-- ^ number of desired entries per cache slice
-- needs to be parameterisable for simulation purposes
} deriving (Show, Eq)
}
deriving (Show, Eq)
-- | defining Show instances to be able to print NodeState for debug purposes
instance Typeable a => Show (IORef a) where
@ -230,11 +215,7 @@ getLNumBestNodes = getInternals_ lNumBestNodes
type NodeCache = Map.Map NodeID CacheEntry
-- |an entry of the 'nodeCache' can hold 2 different kinds of data
data CacheEntry =
-- | an entry representing its validation status, the node state and its timestamp
NodeEntry Bool NodeState POSIXTime
-- | a proxy field for closing the ring structure, indicating the lookup shall be
-- resumed at the given @NodeID@ unless the @ProxyEntry@ itself holds a @NodeEntry@
data CacheEntry = NodeEntry Bool NodeState POSIXTime
| ProxyEntry (NodeID, ProxyDirection) (Maybe CacheEntry)
deriving (Show, Eq)
@ -247,7 +228,9 @@ instance Ord CacheEntry where
extractID (NodeEntry _ eState _) = nid eState
extractID (ProxyEntry _ _) = error "proxy entries should never appear outside of the NodeCache"
data ProxyDirection = Backwards | Forwards deriving (Show, Eq)
data ProxyDirection = Backwards
| Forwards
deriving (Show, Eq)
instance Enum ProxyDirection where
toEnum (-1) = Backwards
@ -430,11 +413,12 @@ byteStringToUInteger bs = sum $ parsedBytes 0 bs
-- persist them on disk so they can be used for all following bootstraps
-- | configuration values used for initialising the FediChord DHT
data FediChordConf = FediChordConf {
confDomain :: String
data FediChordConf = FediChordConf
{ confDomain :: String
, confIP :: HostAddress6
, confDhtPort :: Int
} deriving (Show, Eq)
}
deriving (Show, Eq)
-- | initialise data structures, compute own IDs and bind to listening socket
-- ToDo: load persisted state, thus this function already operates in IO

View file

@ -5,69 +5,64 @@ import Data.Time.Clock.POSIX (POSIXTime)
import Hash2Pub.FediChord
data QueryResponse = FORWARD (Set.Set RemoteCacheEntry) -- ^return closest nodes from local cache.
-- whole cache entry is returned for making
-- the entry time stamp available to the
-- protocol serialiser
| FOUND NodeState -- ^node is the responsible node for queried ID
data QueryResponse = FORWARD (Set.Set RemoteCacheEntry)
| FOUND NodeState
deriving (Show, Eq)
-- === protocol serialisation data types
data Action =
QueryID
data Action = QueryID
| Join
| Leave
| Stabilise
| Ping
deriving (Show, Eq, Enum)
data FediChordMessage =
Request {
requestID :: Integer
data FediChordMessage = Request
{ requestID :: Integer
, sender :: NodeState
, parts :: Integer
, part :: Integer
-- ^ part starts at 0
-- ^ part starts at 1
, action :: Action
, payload :: Maybe ActionPayload
}
| Response {
responseTo :: Integer
| Response
{ responseTo :: Integer
, senderID :: NodeID
, parts :: Integer
, part :: Integer
, action :: Action
, payload :: Maybe ActionPayload
} deriving (Show, Eq)
}
deriving (Show, Eq)
data ActionPayload =
QueryIDRequestPayload {
queryTargetID :: NodeID
data ActionPayload = QueryIDRequestPayload
{ queryTargetID :: NodeID
, queryLBestNodes :: Integer
}
| JoinRequestPayload
| LeaveRequestPayload {
leaveSuccessors :: [NodeID]
| LeaveRequestPayload
{ leaveSuccessors :: [NodeID]
, leavePredecessors :: [NodeID]
}
| StabiliseRequestPayload
| PingRequestPayload
| QueryIDResponsePayload {
queryResult :: QueryResponse
| QueryIDResponsePayload
{ queryResult :: QueryResponse
}
| JoinResponsePayload {
joinSuccessors :: [NodeID]
| JoinResponsePayload
{ joinSuccessors :: [NodeID]
, joinPredecessors :: [NodeID]
, joinCache :: [RemoteCacheEntry]
}
| LeaveResponsePayload
| StabiliseResponsePayload {
stabiliseSuccessors :: [NodeID]
| StabiliseResponsePayload
{ stabiliseSuccessors :: [NodeID]
, stabilisePredecessors :: [NodeID]
}
| PingResponsePayload {
pingNodeStates :: [NodeState]
| PingResponsePayload
{ pingNodeStates :: [NodeState]
}
deriving (Show, Eq)