Questo thread della coda .NET senza blocco è sicuro?

La mia domanda è, è la class inclusa di seguito per una class di coda single-reader single-writer thread-safe? Questo tipo di coda è chiamato lock-free, anche se bloccherà se la coda è piena. La struttura dei dati è stata ispirata dall’implementazione da parte di Marc Gravell di una coda di blocco qui su StackOverflow.

Il punto della struttura è consentire a un singolo thread di scrivere dati nel buffer e un altro thread per leggere i dati. Tutto ciò deve accadere il più rapidamente ansible.

Una struttura dati simile è descritta in un articolo su DDJ di Herb Sutter , eccetto che l’implementazione è in C ++. Un’altra differenza è che io uso una lista collegata alla vaniglia, io uso una lista collegata di matrici.

Piuttosto che includere solo uno snippet di codice, includo l’intera faccenda con commenti con una licenza open source permissiva (MIT License 1.0) nel caso qualcuno la trovi utile e voglia usarla (così com’è o modificata).

Questo è legato ad altre domande poste su Stack Overflow su come creare un blocco di code concorrenti (vedere Creazione di un blockinq Queue in .NET e implementazione della coda di blocco thread-safe in .NET ).

Ecco il codice:

using System; using System.Collections.Generic; using System.Threading; using System.Diagnostics; namespace CollectionSandbox { /// This is a single reader / singler writer buffered queue implemented /// with (almost) no locks. This implementation will block only if filled /// up. The implementation is a linked-list of arrays. /// It was inspired by the desire to create a non-blocking version /// of the blocking queue implementation in C# by Marc Gravell /// https://stackoverflow.com/questions/530211/creating-a-blocking-queuet-in-net/530228#530228 class SimpleSharedQueue : IStreamBuffer { /// Used to signal things are no longer full ManualResetEvent canWrite = new ManualResetEvent(true); /// This is the size of a buffer const int BUFFER_SIZE = 512; /// This is the maximum number of nodes. const int MAX_NODE_COUNT = 100; /// This marks the location to write new data to. Cursor adder; /// This marks the location to read new data from. Cursor remover; /// Indicates that no more data is going to be written to the node. public bool completed = false; /// A node is an array of data items, a pointer to the next item, /// and in index of the number of occupied items class Node { /// Where the data is stored. public T[] data = new T[BUFFER_SIZE]; /// The number of data items currently stored in the node. public Node next; /// The number of data items currently stored in the node. public int count; /// Default constructor, only used for first node. public Node() { count = 0; } /// Only ever called by the writer to add new Nodes to the scene public Node(T x, Node prev) { data[0] = x; count = 1; // The previous node has to be safely updated to point to this node. // A reader could looking at the point, while we set it, so this should be // atomic. Interlocked.Exchange(ref prev.next, this); } } /// This is used to point to a location within a single node, and can perform /// reads or writers. One cursor will only ever read, and another cursor will only /// ever write. class Cursor { /// Points to the parent Queue public SimpleSharedQueue q; /// The current node public Node node; /// For a writer, this points to the position that the next item will be written to. /// For a reader, this points to the position that the next item will be read from. public int current = 0; /// Creates a new cursor, pointing to the node public Cursor(SimpleSharedQueue q, Node node) { this.q = q; this.node = node; } /// Used to push more data onto the queue public void Write(T x) { Trace.Assert(current == node.count); // Check whether we are at the node limit, and are going to need to allocate a new buffer. if (current == BUFFER_SIZE) { // Check if the queue is full if (q.IsFull()) { // Signal the canWrite event to false q.canWrite.Reset(); // Wait until the canWrite event is signaled q.canWrite.WaitOne(); } // create a new node node = new Node(x, node); current = 1; } else { // If the implementation is correct then the reader will never try to access this // array location while we set it. This is because of the invariant that // if reader and writer are at the same node: // reader.current < node.count // and // writer.current = node.count node.data[current++] = x; // We have to use interlocked, to assure that we incremeent the count // atomicalluy, because the reader could be reading it. Interlocked.Increment(ref node.count); } } /// Pulls data from the queue, returns false only if /// there public bool Read(ref T x) { while (true) { if (current < node.count) { x = node.data[current++]; return true; } else if ((current == BUFFER_SIZE) && (node.next != null)) { // Move the current node to the next one. // We know it is safe to do so. // The old node will have no more references to it it // and will be deleted by the garbage collector. node = node.next; // If there is a writer thread waiting on the Queue, // then release it. // Conceptually there is a "if (q.IsFull)", but we can't place it // because that would lead to a Race condition. q.canWrite.Set(); // point to the first spot current = 0; // One of the invariants is that every node created after the first, // will have at least one item. So the following call is safe x = node.data[current++]; return true; } // If we get here, we have read the most recently added data. // We then check to see if the writer has finished producing data. if (q.completed) return false; // If we get here there is no data waiting, and no flagging of the completed thread. // Wait a millisecond. The system will also context switch. // This will allow the writing thread some additional resources to pump out // more data (especially if it iself is multithreaded) Thread.Sleep(1); } } } /// Returns the number of nodes currently used. private int NodeCount { get { int result = 0; Node cur = null; Interlocked.Exchange(ref cur, remover.node); // Counts all nodes from the remover to the adder // Not efficient, but this is not called often. while (cur != null) { ++result; Interlocked.Exchange(ref cur, cur.next); } return result; } } /// Construct the queue. public SimpleSharedQueue() { Node root = new Node(); adder = new Cursor(this, root); remover = new Cursor(this, root); } /// Indicate to the reader that no more data is going to be written. public void MarkCompleted() { completed = true; } /// Read the next piece of data. Returns false if there is no more data. public bool Read(ref T x) { return remover.Read(ref x); } /// Writes more data. public void Write(T x) { adder.Write(x); } /// Tells us if there are too many nodes, and can't add anymore. private bool IsFull() { return NodeCount == MAX_NODE_COUNT; } } } 

Microsoft Research CHESS dovrebbe dimostrarsi un valido strumento per testare la tua implementazione.

La presenza di Sleep() rende completamente inutile un approccio lock-free. L’unica ragione per affrontare la complessità di un design senza serratura è la necessità di velocità assoluta e di evitare il costo dei semafori. L’uso di Sleep (1) sconfigge totalmente questo scopo.

Dato che non riesco a trovare alcun riferimento al fatto che Interlocked.Exchange legga o scriva blocchi, direi di no. Vorrei anche chiedermi perché vuoi andare senza il lucchetto, dato che raramente offre vantaggi sufficienti per contrastare la sua complessità.

Microsoft ha presentato un’eccellente presentazione al GDC 2009 su questo, e puoi ottenere le diapositive qui .

Attenzione al doppio controllo – schema di blocco singolo (come in un link citato sopra: http://www.yoda.arachsys.com/csharp/singleton.html )

Citando verbatim dal “Modern C ++ Design” di Andrei Alexandrescu

    I programmatori multithread di grande esperienza sanno che anche il modello di blocco a doppio controllo, benché corretto su carta, non è sempre corretto nella pratica. In alcuni ambienti multiprocessore simmetrici (quelli che presentano il cosiddetto modello di memoria rilassata), le scritture sono impegnate nella memoria principale a raffica, piuttosto che una ad una. Le raffiche si verificano in ordine crescente di indirizzi, non in ordine cronologico. A causa di questa riorganizzazione delle scritture, la memoria vista da un processore alla volta potrebbe sembrare che le operazioni non vengano eseguite nell’ordine corretto da un altro processore. Concretamente, l’assegnazione a pInstance_ eseguita da un processore potrebbe verificarsi prima che l’object Singleton sia stato completamente inizializzato! Quindi, purtroppo, il modello di bloccaggio a doppio controllo è noto per essere difettoso per tali sistemi

Sospetto che non sia infallibile: immagina il seguente scenario:

due thread immettono il cursor.Write . cursor.Write . Il primo arriva fino al node = new Node(x, node); riga node = new Node(x, node); nella metà vera if (current == BUFFER_SIZE) (ma assumiamo anche che current == BUFFER_SIZE ) quindi quando 1 viene aggiunto alla current un altro thread in arrivo seguirà l’altro percorso attraverso l’istruzione if. Ora immagina che il thread 1 perda la sua porzione temporale e che il thread 2 lo ottenga, e procede per inserire la dichiarazione if sulla errata convinzione che la condizione sia ancora valida. Dovrebbe essere entrato nell’altro percorso.

Non ho neanche eseguito questo codice, quindi non sono sicuro che le mie ipotesi siano possibili in questo codice, ma se lo sono (cioè inserendo il cursore. Scrittura da più thread quando current == BUFFER_SIZE ), allora potrebbe essere incline agli errori di concorrenza.

Innanzitutto, mi chiedo in merito a questa ipotesi in queste due righe di codice sequenziale:

  node.data[current++] = x; // We have to use interlocked, to assure that we incremeent the count // atomicalluy, because the reader could be reading it. Interlocked.Increment(ref node.count); 

Che dire che il nuovo valore di node.data [] è stato impegnato in questa posizione di memoria? Non è memorizzato in un indirizzo di memoria volatile e quindi può essere memorizzato nella cache se ho capito bene? Questo non porta forse a una lettura “sporca”? Ci possono essere altri posti lo stesso è vero, ma questo si è distinto a colpo d’occhio.

Secondo codice multi-thread che contiene quanto segue:

 Thread.Sleep(int); 

… non è mai un buon segno Se è necessario, il codice è destinato a fallire, se non è richiesto è uno spreco. Spero davvero che rimuovano completamente questa API. Renditi conto che è una richiesta di aspettare almeno quella quantità di tempo. Con il sovraccarico del contesto, il tuo quasi certamente aspetterà più a lungo, molto più a lungo.

In terzo luogo, non comprendo completamente l’uso dell’API Interlock qui. Forse sono stanco e mi manca il punto; ma non riesco a trovare il potenziale conflitto di thread su entrambi i thread di lettura e scrittura sulla stessa variabile? Sembrerebbe che l’unico uso che potrei trovare per lo scambio di interlock sarebbe modificare il contenuto di node.data [] per correggere il # 1 sopra.

Infine, sembrerebbe che l’attuazione sia alquanto complicata. Mi manca il punto dell’intera cosa di Cursor / Node o fondamentalmente sta facendo la stessa cosa di questa class? (Nota: non l’ho ancora provato e non penso che questo sia sicuro anche per il thread, solo cercando di farla franca su ciò che penso che tu stia facendo).

 class ReaderWriterQueue { readonly AutoResetEvent _readComplete; readonly T[] _buffer; readonly int _maxBuffer; int _readerPos, _writerPos; public ReaderWriterQueue(int maxBuffer) { _readComplete = new AutoResetEvent(true); _maxBuffer = maxBuffer; _buffer = new T[_maxBuffer]; _readerPos = _writerPos = 0; } public int Next(int current) { return ++current == _maxBuffer ? 0 : current; } public bool Read(ref T item) { if (_readerPos != _writerPos) { item = _buffer[_readerPos]; _readerPos = Next(_readerPos); return true; } else return false; } public void Write(T item) { int next = Next(_writerPos); while (next == _readerPos) _readComplete.WaitOne(); _buffer[next] = item; _writerPos = next; } } 

Quindi sono del tutto fuori base qui e non riesco a vedere la magia nella class originale?

Devo ammettere una cosa, disprezzo Threading. Ho visto i migliori sviluppatori fallire. Questo articolo fornisce un ottimo esempio di quanto sia difficile ottenere il diritto di threading: http://www.yoda.arachsys.com/csharp/singleton.html