begin implementation of message sending

This commit is contained in:
Trolli Schmittlauch 2020-05-17 01:24:56 +02:00
parent fdd4efe269
commit b8be20b86e
2 changed files with 49 additions and 1 deletions

View file

@ -46,7 +46,7 @@ category: Network
extra-source-files: CHANGELOG.md extra-source-files: CHANGELOG.md
common deps common deps
build-depends: base ^>=4.12.0.0, containers ^>=0.6.0.1, bytestring, utf8-string ^>=1.0.1.1, network ^>=2.8.0.1, time ^>=1.8.0.2, cmdargs ^>= 0.10, cryptonite ^>= 0.25, memory, async, stm, asn1-encoding, asn1-types, asn1-parse, publicsuffix, network-byte-order, safe, iproute build-depends: base ^>=4.12.0.0, containers ^>=0.6.0.1, bytestring, utf8-string ^>=1.0.1.1, network ^>=2.8.0.1, time ^>=1.8.0.2, cmdargs ^>= 0.10, cryptonite ^>= 0.25, memory, async, stm, asn1-encoding, asn1-types, asn1-parse, publicsuffix, network-byte-order, safe, iproute, mtl
ghc-options: -Wall ghc-options: -Wall

View file

@ -23,6 +23,8 @@ import qualified Data.Map as Map
import Data.Time.Clock.POSIX import Data.Time.Clock.POSIX
import Network.Socket hiding (send, sendTo, recv, recvFrom) import Network.Socket hiding (send, sendTo, recv, recvFrom)
import Network.Socket.ByteString import Network.Socket.ByteString
import System.Timeout
import Control.Monad.State.Strict
import Hash2Pub.FediChord import Hash2Pub.FediChord
( NodeID ( NodeID
@ -210,6 +212,52 @@ markCacheEntryAsVerified timestamp = Map.adjust adjustFunc
adjustFunc (ProxyEntry _ (Just entry)) = adjustFunc entry adjustFunc (ProxyEntry _ (Just entry)) = adjustFunc entry
adjustFunc entry = entry adjustFunc entry = entry
-- ====== message send and receive operations ======
requestQueryID :: NodeState -> NodeID -> IO NodeState
-- 1. do a local lookup for the l closest nodes
-- 2. create l sockets
-- 3. send a message async concurrently to all l nodes
-- 4. collect the results, insert them into cache
-- 5. repeat until FOUND (problem: new entries not necessarily already in cache, explicitly compare with closer results)
requestQueryID ns targetID = do
cacheSnapshot <- readIORef $ getNodeCacheRef ns
let localResult = queryLocalCache ns cacheSnapshot (fromMaybe 1 $ getLNumBestNodes ns) targetID
-- FOUND can only be returned if targetID is owned by local node
case localResult of
FOUND thisNode -> return thisNode
FORWARD nodeSet ->
sockets <- mapM (\resultNode -> mkSendSocket (domain result) (dhtPort resultNode)) $ Set.toList nodeSet
-- ToDo: process results immediately instead of waiting for the last one to finish, see https://stackoverflow.com/a/38815224/9198613
responses = mapM
sendRequestTo :: Int -- ^ timeout in seconds
-> Int -- ^ number of retries
-> FediChordMessage -- ^ the message to be sent
-> Socket -- ^ connected socket to use for sending
-> IO (Set.Set FediChordMessage) -- ^ responses
sendRequestTo timeout attempts msg sock = do
let requests = serialiseMessage 1200 msg
-- ToDo: make attempts and timeout configurable
attempts 3 . timeout 5000 $ do
where
-- state reingeben: state = noch nicht geackte messages, result = responses
sendAndAck :: Socket -> StateT (Map.Map Integer BS.ByteString) IO (Set.Set FediChordMessage)
sendAndAck sock = do
remainingSends <- get
sendMany sock $ Map.elems remainingSends
-- timeout pro receive socket, danach catMaybes
-- wichtig: Pakete können dupliziert werden, dh es können mehr ACKs als gesendete parts ankommen
replicateM
-- idea: send all parts at once
-- Set/ Map with unacked parts
-- then recv with timeout for |unackedParts| attempts, receive acked parts from set/ map
-- how to manage individual retries? nested "attempts"
-- | retry an IO action at most *i* times until it delivers a result -- | retry an IO action at most *i* times until it delivers a result
attempts :: Int -- ^ number of retries *i* attempts :: Int -- ^ number of retries *i*
-> IO (Maybe a) -- ^ action to retry -> IO (Maybe a) -- ^ action to retry