forked from schmittlauch/Hash2Pub
Compare commits
3 commits
master
...
dhtNetwork
Author | SHA1 | Date | |
---|---|---|---|
Trolli Schmittlauch | 4e62bb08f8 | ||
Trolli Schmittlauch | 8b01ad2f37 | ||
Trolli Schmittlauch | b8be20b86e |
0
Hash2Pub/.gitignore → .gitignore
vendored
0
Hash2Pub/.gitignore → .gitignore
vendored
|
@ -46,7 +46,7 @@ category: Network
|
|||
extra-source-files: CHANGELOG.md
|
||||
|
||||
common deps
|
||||
build-depends: base ^>=4.12.0.0, containers ^>=0.6.0.1, bytestring, utf8-string ^>=1.0.1.1, network ^>=2.8.0.1, time ^>=1.8.0.2, cmdargs ^>= 0.10, cryptonite ^>= 0.25, memory, async, stm, asn1-encoding, asn1-types, asn1-parse, publicsuffix, network-byte-order, safe, iproute
|
||||
build-depends: base ^>=4.12.0.0, containers ^>=0.6.0.1, bytestring, utf8-string ^>=1.0.1.1, network ^>=2.8.0.1, time ^>=1.8.0.2, cmdargs ^>= 0.10, cryptonite ^>= 0.25, memory, async, stm, asn1-encoding, asn1-types, asn1-parse, publicsuffix, network-byte-order, safe, iproute, mtl
|
||||
ghc-options: -Wall
|
||||
|
||||
|
17
Readme.md
Normal file
17
Readme.md
Normal file
|
@ -0,0 +1,17 @@
|
|||
# Hash2Pub
|
||||
|
||||
***This is heavily WIP and does not provide any useful functionality yet***.
|
||||
I aim for always having the master branch at a state where it builds and tests pass.
|
||||
|
||||
A fully-decentralised relay for global hashtag federation in [ActivityPub](https://activitypub.rocks) based on a distributed hash table.
|
||||
It allows querying and subscribing to all posts of a certain hashtag and is implemented in Haskell.
|
||||
|
||||
This is the practical implementation of the concept presented in the paper [Decentralised Hashtag Search and Subscription for Federated Social Networks](https://git.orlives.de/schmittlauch/paper_hashtag_federation). A 30 minutes [introduction talk](https://conf.tube/videos/watch/340eb706-28c0-4a43-9364-700297ca96cb) is available as well.
|
||||
|
||||
The ASN.1 module schema used for DHT messages can be found in `FediChord.asn1`.
|
||||
|
||||
## Building
|
||||
|
||||
The project and its developent environment are built with [Nix](https://nixos.org/nix/).
|
||||
|
||||
The development environment can be entered with `nix-shell`. Then the project can be built with `cabal build` from within the environment, or using `nix-shell --command "cabal build"` to do both steps at once.
|
|
@ -23,6 +23,8 @@ import qualified Data.Map as Map
|
|||
import Data.Time.Clock.POSIX
|
||||
import Network.Socket hiding (send, sendTo, recv, recvFrom)
|
||||
import Network.Socket.ByteString
|
||||
import System.Timeout
|
||||
import Control.Monad.State.Strict
|
||||
|
||||
import Hash2Pub.FediChord
|
||||
( NodeID
|
||||
|
@ -210,6 +212,52 @@ markCacheEntryAsVerified timestamp = Map.adjust adjustFunc
|
|||
adjustFunc (ProxyEntry _ (Just entry)) = adjustFunc entry
|
||||
adjustFunc entry = entry
|
||||
|
||||
-- ====== message send and receive operations ======
|
||||
|
||||
requestQueryID :: NodeState -> NodeID -> IO NodeState
|
||||
-- 1. do a local lookup for the l closest nodes
|
||||
-- 2. create l sockets
|
||||
-- 3. send a message async concurrently to all l nodes
|
||||
-- 4. collect the results, insert them into cache
|
||||
-- 5. repeat until FOUND (problem: new entries not necessarily already in cache, explicitly compare with closer results)
|
||||
requestQueryID ns targetID = do
|
||||
cacheSnapshot <- readIORef $ getNodeCacheRef ns
|
||||
let localResult = queryLocalCache ns cacheSnapshot (fromMaybe 1 $ getLNumBestNodes ns) targetID
|
||||
-- FOUND can only be returned if targetID is owned by local node
|
||||
case localResult of
|
||||
FOUND thisNode -> return thisNode
|
||||
FORWARD nodeSet ->
|
||||
sockets <- mapM (\resultNode -> mkSendSocket (domain result) (dhtPort resultNode)) $ Set.toList nodeSet
|
||||
-- ToDo: process results immediately instead of waiting for the last one to finish, see https://stackoverflow.com/a/38815224/9198613
|
||||
responses = mapM
|
||||
|
||||
sendRequestTo :: Int -- ^ timeout in seconds
|
||||
-> Int -- ^ number of retries
|
||||
-> FediChordMessage -- ^ the message to be sent
|
||||
-> Socket -- ^ connected socket to use for sending
|
||||
-> IO (Set.Set FediChordMessage) -- ^ responses
|
||||
sendRequestTo timeout attempts msg sock = do
|
||||
let requests = serialiseMessage 1200 msg
|
||||
-- ToDo: make attempts and timeout configurable
|
||||
attempts 3 . timeout 5000 $ do
|
||||
where
|
||||
-- state reingeben: state = noch nicht geackte messages, result = responses
|
||||
sendAndAck :: Socket -> StateT (Map.Map Integer BS.ByteString) IO (Set.Set FediChordMessage)
|
||||
sendAndAck sock = do
|
||||
remainingSends <- get
|
||||
sendMany sock $ Map.elems remainingSends
|
||||
-- timeout pro receive socket, danach catMaybes
|
||||
-- wichtig: Pakete können dupliziert werden, dh es können mehr ACKs als gesendete parts ankommen
|
||||
replicateM
|
||||
|
||||
|
||||
|
||||
|
||||
-- idea: send all parts at once
|
||||
-- Set/ Map with unacked parts
|
||||
-- then recv with timeout for |unackedParts| attempts, receive acked parts from set/ map
|
||||
-- how to manage individual retries? nested "attempts"
|
||||
|
||||
-- | retry an IO action at most *i* times until it delivers a result
|
||||
attempts :: Int -- ^ number of retries *i*
|
||||
-> IO (Maybe a) -- ^ action to retry
|
Loading…
Reference in a new issue