Compare commits
3 commits
0e6f126b3b
...
beffab99a0
Author | SHA1 | Date | |
---|---|---|---|
|
beffab99a0 | ||
|
c31baa3635 | ||
|
8d18f952cd |
|
@ -10,8 +10,8 @@ Request ::= SEQUENCE {
|
|||
action Action,
|
||||
requestID INTEGER,
|
||||
sender NodeState,
|
||||
parts INTEGER (0..150), -- number of message parts
|
||||
part INTEGER (0..150), -- part number of this message, starts at 1
|
||||
parts INTEGER (1..150), -- number of message parts
|
||||
part INTEGER (1..150), -- part number of this message, starts at 1
|
||||
actionPayload CHOICE {
|
||||
queryIDRequestPayload QueryIDRequestPayload,
|
||||
joinRequestPayload JoinRequestPayload,
|
||||
|
|
|
@ -3,7 +3,7 @@
|
|||
module Hash2Pub.ASN1Coding where
|
||||
|
||||
import Control.Exception (displayException)
|
||||
import Data.ASN1.BinaryEncoding -- asn1-encoding package
|
||||
import Data.ASN1.BinaryEncoding
|
||||
import Data.ASN1.Encoding
|
||||
import Data.ASN1.Error ()
|
||||
import Data.ASN1.Parse
|
||||
|
@ -17,8 +17,8 @@ import Data.Time.Clock.POSIX ()
|
|||
import Safe
|
||||
|
||||
import Hash2Pub.FediChord
|
||||
import Hash2Pub.Utils
|
||||
import Hash2Pub.ProtocolTypes
|
||||
import Hash2Pub.Utils
|
||||
|
||||
import Debug.Trace
|
||||
|
||||
|
|
|
@ -16,25 +16,27 @@ module Hash2Pub.DHTProtocol
|
|||
where
|
||||
|
||||
import Control.Concurrent.STM
|
||||
import Control.Concurrent.STM.TQueue
|
||||
import Control.Concurrent.STM.TBQueue
|
||||
import Control.Concurrent.STM.TQueue
|
||||
import qualified Data.ByteString as BS
|
||||
import qualified Data.Map as Map
|
||||
import Data.Maybe (fromMaybe, maybe)
|
||||
import qualified Data.Set as Set
|
||||
import Data.Time.Clock.POSIX
|
||||
import Network.Socket hiding (recv, recvFrom, send, sendTo)
|
||||
import Network.Socket hiding (recv, recvFrom, send,
|
||||
sendTo)
|
||||
import Network.Socket.ByteString
|
||||
import System.Timeout
|
||||
|
||||
import Hash2Pub.ASN1Coding
|
||||
import Hash2Pub.FediChord (CacheEntry (..), NodeCache, NodeID,
|
||||
NodeState (..),
|
||||
import Hash2Pub.FediChord (CacheEntry (..), NodeCache,
|
||||
NodeID, NodeState (..),
|
||||
cacheGetNodeStateUnvalidated,
|
||||
cacheLookup, cacheLookupPred,
|
||||
cacheLookupSucc, getPredecessors,
|
||||
getSuccessors, localCompare,
|
||||
putPredecessors, putSuccessors)
|
||||
cacheLookupSucc,
|
||||
getPredecessors, getSuccessors,
|
||||
localCompare, putPredecessors,
|
||||
putSuccessors)
|
||||
import Hash2Pub.ProtocolTypes
|
||||
|
||||
import Debug.Trace (trace)
|
||||
|
@ -121,41 +123,71 @@ markCacheEntryAsVerified timestamp = Map.adjust adjustFunc
|
|||
|
||||
-- ====== message send and receive operations ======
|
||||
|
||||
requestQueryID :: NodeState -> NodeID -> IO NodeState
|
||||
-- 1. do a local lookup for the l closest nodes
|
||||
-- 2. create l sockets
|
||||
-- 3. send a message async concurrently to all l nodes
|
||||
-- 4. collect the results, insert them into cache
|
||||
-- 5. repeat until FOUND (problem: new entries not necessarily already in cache, explicitly compare with closer results)
|
||||
requestQueryID ns targetID = do
|
||||
cacheSnapshot <- readIORef $ getNodeCacheRef ns
|
||||
let localResult = queryLocalCache ns cacheSnapshot (fromMaybe 1 $ getLNumBestNodes ns) targetID
|
||||
-- FOUND can only be returned if targetID is owned by local node
|
||||
case localResult of
|
||||
FOUND thisNode -> return thisNode
|
||||
FORWARD nodeSet ->
|
||||
sockets <- mapM (\resultNode -> mkSendSocket (domain result) (dhtPort resultNode)) $ Set.toList nodeSet
|
||||
-- ToDo: process results immediately instead of waiting for the last one to finish, see https://stackoverflow.com/a/38815224/9198613
|
||||
responses = mapM
|
||||
--requestQueryID :: NodeState -> NodeID -> IO NodeState
|
||||
---- 1. do a local lookup for the l closest nodes
|
||||
---- 2. create l sockets
|
||||
---- 3. send a message async concurrently to all l nodes
|
||||
---- 4. collect the results, insert them into cache
|
||||
---- 5. repeat until FOUND (problem: new entries not necessarily already in cache, explicitly compare with closer results)
|
||||
--requestQueryID ns targetID = do
|
||||
-- cacheSnapshot <- readIORef $ getNodeCacheRef ns
|
||||
-- let localResult = queryLocalCache ns cacheSnapshot (fromMaybe 1 $ getLNumBestNodes ns) targetID
|
||||
-- -- FOUND can only be returned if targetID is owned by local node
|
||||
-- case localResult of
|
||||
-- FOUND thisNode -> return thisNode
|
||||
-- FORWARD nodeSet ->
|
||||
-- sockets <- mapM (\resultNode -> mkSendSocket (domain result) (dhtPort resultNode)) $ Set.toList nodeSet
|
||||
-- -- ToDo: process results immediately instead of waiting for the last one to finish, see https://stackoverflow.com/a/38815224/9198613
|
||||
-- responses = mapM
|
||||
|
||||
-- | Generic function for sending a request over a connected socket and collecting the response.
|
||||
-- Serialises the message and tries to deliver its parts for a number of attempts within a specified timeout.
|
||||
sendRequestTo :: Int -- ^ timeout in seconds
|
||||
-> Int -- ^ number of retries
|
||||
-> FediChordMessage -- ^ the message to be sent
|
||||
-> Socket -- ^ connected socket to use for sending
|
||||
-> IO (Set.Set FediChordMessage) -- ^ responses
|
||||
sendRequestTo timeout attempts msg sock = do
|
||||
sendRequestTo timeoutMillis numAttempts msg sock = do
|
||||
let requests = serialiseMessage 1200 msg
|
||||
-- create a queue for passing received response messages back, even after a timeout
|
||||
responseQ <- newTBQueueIO $ 2*maximumParts -- keep room for duplicate packets
|
||||
-- start sendAndAck with timeout
|
||||
-- ToDo: make attempts and timeout configurable
|
||||
attempts 3 . timeout 5000 $ do
|
||||
attempts numAttempts . timeout timeoutMillis $ sendAndAck responseQ sock requests
|
||||
-- after timeout, check received responses, delete them from unacked message set/ map and rerun senAndAck with that if necessary.
|
||||
recvdParts <- atomically $ flushTBQueue responseQ
|
||||
-- PLACEHOLDER
|
||||
pure Set.empty
|
||||
where
|
||||
-- state reingeben: state = noch nicht geackte messages, result = responses
|
||||
sendAndAck :: Socket -> StateT (Map.Map Integer BS.ByteString) IO (Set.Set FediChordMessage)
|
||||
sendAndAck sock = do
|
||||
remainingSends <- get
|
||||
sendAndAck :: TBQueue FediChordMessage -- ^ the queue for putting in the received responses
|
||||
-> Socket -- ^ the socket used for sending and receiving for this particular remote node
|
||||
-> Map.Map Integer BS.ByteString -- ^ the remaining unacked request parts
|
||||
-> IO ()
|
||||
sendAndAck responseQueue sock remainingSends = do
|
||||
sendMany sock $ Map.elems remainingSends
|
||||
-- timeout pro receive socket, danach catMaybes
|
||||
-- wichtig: Pakete können dupliziert werden, dh es können mehr ACKs als gesendete parts ankommen
|
||||
replicateM
|
||||
-- if all requests have been acked/ responded to, return prematurely
|
||||
recvLoop responseQueue remainingSends Set.empty
|
||||
recvLoop :: TBQueue FediChordMessage -- ^ the queue for putting in the received responses
|
||||
-> Map.Map Integer BS.ByteString -- ^ the remaining unacked request parts
|
||||
-> Set.Set Integer -- ^ already received response part numbers
|
||||
-> IO ()
|
||||
recvLoop responseQueue remainingSends' receivedPartNums = do
|
||||
-- 65535 is maximum length of UDP packets, as long as
|
||||
-- no IPv6 jumbograms are used
|
||||
response <- deserialiseMessage <$> recv sock 65535
|
||||
case response of
|
||||
-- drop errors
|
||||
Left _ -> recvLoop responseQueue remainingSends' receivedPartNums
|
||||
Right msg -> do
|
||||
atomically $ writeTBQueue responseQueue msg
|
||||
let
|
||||
newRemaining = Map.delete (part msg) remainingSends'
|
||||
newReceivedParts = Set.insert (part msg) receivedPartNums
|
||||
-- ToDo: handle responses with more parts than the request
|
||||
if Map.null newRemaining && Set.size receivedPartNums == fromIntegral (parts msg)
|
||||
then pure ()
|
||||
else recvLoop responseQueue newRemaining receivedPartNums
|
||||
|
||||
|
||||
|
||||
|
|
|
@ -122,8 +122,8 @@ a `localCompare` b
|
|||
|
||||
|
||||
-- | represents a node and all its important state
|
||||
data NodeState = NodeState {
|
||||
nid :: NodeID
|
||||
data NodeState = NodeState
|
||||
{ nid :: NodeID
|
||||
, domain :: String
|
||||
-- ^ full public domain name the node is reachable under
|
||||
, ipAddr :: HostAddress6
|
||||
|
@ -132,48 +132,33 @@ data NodeState = NodeState {
|
|||
-- ^ port of the DHT itself
|
||||
, apPort :: Maybe PortNumber
|
||||
-- ^ port of the ActivityPub relay and storage service
|
||||
-- might have to be queried first
|
||||
, vServerID :: Integer
|
||||
-- ^ ID of this vserver
|
||||
|
||||
-- ==== internal state ====
|
||||
, internals :: Maybe InternalNodeState
|
||||
-- ^ data not present in the representation of remote nodes
|
||||
-- is put into its own type.
|
||||
-- This is usually @Nothing@ for all remote nodes.
|
||||
} deriving (Show, Eq)
|
||||
}
|
||||
deriving (Show, Eq)
|
||||
|
||||
-- | encapsulates all data and parameters that are not present for remote nodes
|
||||
data InternalNodeState = InternalNodeState {
|
||||
nodeCache :: IORef NodeCache
|
||||
data InternalNodeState = InternalNodeState
|
||||
{ nodeCache :: IORef NodeCache
|
||||
-- ^ EpiChord node cache with expiry times for nodes
|
||||
-- as the map is ordered, lookups for the closes preceding node can be done using @lookupLT@.
|
||||
-- encapsulated into an IORef for allowing concurrent reads without locking
|
||||
, cacheWriteQueue :: TQueue (NodeCache -> NodeCache)
|
||||
-- ^ cache updates are not written directly to the 'nodeCache' but queued and
|
||||
-- only processed by a single writer thread to prevent lost updates.
|
||||
-- All nodeCache modifying functions have to be partially applied enough before
|
||||
-- being put into the queue.
|
||||
--
|
||||
, successors :: [NodeID] -- could be a set instead as these are ordered as well
|
||||
-- ^ successor nodes in ascending order by distance
|
||||
, predecessors :: [NodeID]
|
||||
-- ^ predecessor nodes in ascending order by distance
|
||||
----- protocol parameters -----
|
||||
-- TODO: evaluate moving these somewhere else
|
||||
, kNeighbours :: Int
|
||||
-- ^ desired length of predecessor and successor list
|
||||
-- needs to be parameterisable for simulation purposes
|
||||
, lNumBestNodes :: Int
|
||||
-- ^ number of best next hops to provide
|
||||
-- needs to be parameterisable for simulation purposes
|
||||
, pNumParallelQueries :: Int
|
||||
-- ^ number of parallel sent queries
|
||||
-- needs to be parameterisable for simulation purposes
|
||||
, jEntriesPerSlice :: Int
|
||||
-- ^ number of desired entries per cache slice
|
||||
-- needs to be parameterisable for simulation purposes
|
||||
} deriving (Show, Eq)
|
||||
}
|
||||
deriving (Show, Eq)
|
||||
|
||||
-- | defining Show instances to be able to print NodeState for debug purposes
|
||||
instance Typeable a => Show (IORef a) where
|
||||
|
@ -230,11 +215,7 @@ getLNumBestNodes = getInternals_ lNumBestNodes
|
|||
type NodeCache = Map.Map NodeID CacheEntry
|
||||
|
||||
-- |an entry of the 'nodeCache' can hold 2 different kinds of data
|
||||
data CacheEntry =
|
||||
-- | an entry representing its validation status, the node state and its timestamp
|
||||
NodeEntry Bool NodeState POSIXTime
|
||||
-- | a proxy field for closing the ring structure, indicating the lookup shall be
|
||||
-- resumed at the given @NodeID@ unless the @ProxyEntry@ itself holds a @NodeEntry@
|
||||
data CacheEntry = NodeEntry Bool NodeState POSIXTime
|
||||
| ProxyEntry (NodeID, ProxyDirection) (Maybe CacheEntry)
|
||||
deriving (Show, Eq)
|
||||
|
||||
|
@ -247,7 +228,9 @@ instance Ord CacheEntry where
|
|||
extractID (NodeEntry _ eState _) = nid eState
|
||||
extractID (ProxyEntry _ _) = error "proxy entries should never appear outside of the NodeCache"
|
||||
|
||||
data ProxyDirection = Backwards | Forwards deriving (Show, Eq)
|
||||
data ProxyDirection = Backwards
|
||||
| Forwards
|
||||
deriving (Show, Eq)
|
||||
|
||||
instance Enum ProxyDirection where
|
||||
toEnum (-1) = Backwards
|
||||
|
@ -430,11 +413,12 @@ byteStringToUInteger bs = sum $ parsedBytes 0 bs
|
|||
-- persist them on disk so they can be used for all following bootstraps
|
||||
|
||||
-- | configuration values used for initialising the FediChord DHT
|
||||
data FediChordConf = FediChordConf {
|
||||
confDomain :: String
|
||||
data FediChordConf = FediChordConf
|
||||
{ confDomain :: String
|
||||
, confIP :: HostAddress6
|
||||
, confDhtPort :: Int
|
||||
} deriving (Show, Eq)
|
||||
}
|
||||
deriving (Show, Eq)
|
||||
|
||||
-- | initialise data structures, compute own IDs and bind to listening socket
|
||||
-- ToDo: load persisted state, thus this function already operates in IO
|
||||
|
|
|
@ -5,69 +5,64 @@ import Data.Time.Clock.POSIX (POSIXTime)
|
|||
|
||||
import Hash2Pub.FediChord
|
||||
|
||||
data QueryResponse = FORWARD (Set.Set RemoteCacheEntry) -- ^return closest nodes from local cache.
|
||||
-- whole cache entry is returned for making
|
||||
-- the entry time stamp available to the
|
||||
-- protocol serialiser
|
||||
| FOUND NodeState -- ^node is the responsible node for queried ID
|
||||
data QueryResponse = FORWARD (Set.Set RemoteCacheEntry)
|
||||
| FOUND NodeState
|
||||
deriving (Show, Eq)
|
||||
|
||||
-- === protocol serialisation data types
|
||||
|
||||
data Action =
|
||||
QueryID
|
||||
data Action = QueryID
|
||||
| Join
|
||||
| Leave
|
||||
| Stabilise
|
||||
| Ping
|
||||
deriving (Show, Eq, Enum)
|
||||
|
||||
data FediChordMessage =
|
||||
Request {
|
||||
requestID :: Integer
|
||||
data FediChordMessage = Request
|
||||
{ requestID :: Integer
|
||||
, sender :: NodeState
|
||||
, parts :: Integer
|
||||
, part :: Integer
|
||||
-- ^ part starts at 0
|
||||
-- ^ part starts at 1
|
||||
, action :: Action
|
||||
, payload :: Maybe ActionPayload
|
||||
}
|
||||
| Response {
|
||||
responseTo :: Integer
|
||||
| Response
|
||||
{ responseTo :: Integer
|
||||
, senderID :: NodeID
|
||||
, parts :: Integer
|
||||
, part :: Integer
|
||||
, action :: Action
|
||||
, payload :: Maybe ActionPayload
|
||||
} deriving (Show, Eq)
|
||||
}
|
||||
deriving (Show, Eq)
|
||||
|
||||
data ActionPayload =
|
||||
QueryIDRequestPayload {
|
||||
queryTargetID :: NodeID
|
||||
data ActionPayload = QueryIDRequestPayload
|
||||
{ queryTargetID :: NodeID
|
||||
, queryLBestNodes :: Integer
|
||||
}
|
||||
| JoinRequestPayload
|
||||
| LeaveRequestPayload {
|
||||
leaveSuccessors :: [NodeID]
|
||||
| LeaveRequestPayload
|
||||
{ leaveSuccessors :: [NodeID]
|
||||
, leavePredecessors :: [NodeID]
|
||||
}
|
||||
| StabiliseRequestPayload
|
||||
| PingRequestPayload
|
||||
| QueryIDResponsePayload {
|
||||
queryResult :: QueryResponse
|
||||
| QueryIDResponsePayload
|
||||
{ queryResult :: QueryResponse
|
||||
}
|
||||
| JoinResponsePayload {
|
||||
joinSuccessors :: [NodeID]
|
||||
| JoinResponsePayload
|
||||
{ joinSuccessors :: [NodeID]
|
||||
, joinPredecessors :: [NodeID]
|
||||
, joinCache :: [RemoteCacheEntry]
|
||||
}
|
||||
| LeaveResponsePayload
|
||||
| StabiliseResponsePayload {
|
||||
stabiliseSuccessors :: [NodeID]
|
||||
| StabiliseResponsePayload
|
||||
{ stabiliseSuccessors :: [NodeID]
|
||||
, stabilisePredecessors :: [NodeID]
|
||||
}
|
||||
| PingResponsePayload {
|
||||
pingNodeStates :: [NodeState]
|
||||
| PingResponsePayload
|
||||
{ pingNodeStates :: [NodeState]
|
||||
}
|
||||
deriving (Show, Eq)
|
||||
|
||||
|
|
Loading…
Reference in a new issue