Analogue of Queue.Peek () per BlockingCollection quando si ascolta il consumo di IEnumerable

Sto usando l’implementazione del pattern Pipelines per separare i messaggi dai consumatori di un produttore per evitare problemi di consumo lento.

In caso di qualsiasi eccezione in una fase di elaborazione dei messaggi [1] essa andrà persa e non verrà inviata a un altro servizio / livello [2] . Come posso gestire tale problema in [3] modo che il messaggio non vada perso e che cosa sia importante! l’ordine dei messaggi non sarà confuso, quindi il servizio / livello superiore riceverà i messaggi nell’ordine in cui sono entrati. Ho un’idea che riguarda un’altra Queue intermedia ma sembra complessa? Sfortunatamente BlockingCollection non espone alcun analogo del metodo Queue.Peek() modo che io possa solo leggere il prossimo messaggio disponibile e in caso di elaborazione avvenuta con successo Dequeue()

 private BlockingCollection messagesQueue; // TPL Task does following: // Listen to new messages and as soon as any comes in - process it foreach (var cachedMessage in messagesQueue.GetConsumingEnumerable(cancellation)) { const int maxRetries = 3; int retriesCounter = 0; bool isSent = false; // On this point a message already is removed from messagesQueue while (!isSent && retriesCounter++ <= maxRetries) { try { // [1] Preprocess a message // [2] Dispatch to an other service/layer clientProxyCallback.SendMessage(cachedMessage); isSent = true; } catch(Exception exception) { // [3] // logging if (!isSent && retriesCounter < maxRetries) { Thread.Sleep(NSeconds); } } if (!isSent && retriesCounter == maxRetries) { // just log, message is lost on this stage! } } } 

EDIT : Dimenticato di dire che questo è un servizio WCF ospitato da IIS che invia i messaggi al proxy WCF del client Silverlight tramite il contratto di callback del client.

EDIT2: Di seguito è come lo farei usando Peek() , mi manca qualcosa?

 bool successfullySent = true; try { var item = queue.Peek(); PreProcessItem(item); SendItem(item); } catch(Exception exception) { successfullySent = false; } finally { if (successfullySent) { // just remove already sent item from the queue queue.Dequeue(); } } 

EDIT3: Sicuramente posso usare l’approccio vecchio stile usando while loop, bool flag, Queue e AutoResetEvent , ma mi chiedo solo se lo stesso è ansible usando BlockingCollection e GetConsumingEnumerable() Penso che una funzione come Peek sarebbe molto utile quando si usa insieme al consumo enumerabile , poiché altrimenti tutti gli esempi di implementazione del modello Pipeline, nuove cose come BlockingCollection e GetConsumingEnumerable() non sembrano durevoli e devo tornare al vecchio approccio.

Dovresti considerare la coda intermedia.

BlockingCollection non può “sbirciare” gli oggetti a causa della sua natura – ci può essere più di un consumatore. Uno di loro può sbirciare un object, e un altro può prenderlo – quindi, il primo proverà a prendere l’object, che è già stato preso.

Come dice Dennis nel suo commento, BlockingCollection fornisce un wrapper di blocco a qualsiasi implementor dell’interfaccia IProducerConsumerCollection .

Come puoi vedere, IProducerConsumerCollection , in base alla progettazione, non definisce un Peek o altri metodi necessari per implementare uno. Ciò significa che BlockingCollection non può, così com’è, offrire un’analisi analoga a Peek .

Se si considera, questo riduce notevolmente i problemi di concorrenza creati dal commercio di utilità di un’implementazione Peek . Come si può consumare senza consumare? Per Peek contemporaneamente dovresti bloccare il capo della raccolta fino a quando l’operazione Peek stata completata, che io e i designer di BlockingCollection visto come non ottimale. Penso che sarebbe anche disordinato e difficile da implementare, richiedendo una sorta di contesto di sbocco usa e getta.

Se consumi un messaggio e il suo consumo fallisce, dovrai gestirlo. È ansible aggiungerlo a un’altra coda di errori, aggiungerlo nuovamente alla normale coda di elaborazione per un nuovo tentativo o semplicemente registrare il suo errore per i posteri o, altre azioni appropriate al contesto.

Se non si desidera consumare i messaggi contemporaneamente, non è necessario utilizzare BlockingCollection poiché non è necessario il consumo simultaneo. È ansible utilizzare ConcurrentQueue direttamente, si otterrà comunque la sincronicità su off e si può utilizzare TryPeek modo sicuro poiché si controlla un singolo utente. Se il consumo fallisce, potresti interrompere il consumo con un ciclo infinito di tentativi nel tuo desiderio, anche se, suggerisco, ciò richiede un pensiero progettuale.

BlockingCollection è un wrapper attorno a IProducerConsumerCollection , che è più generico di eg ConcurrentQueue e offre IProducerConsumerCollection la libertà di non dover implementare un metodo (Try)Peek .

Tuttavia, puoi sempre chiamare TryPeek sulla coda sottostante:

 ConcurrentQueue useOnlyForPeeking = new ConcurrentQueue(); BlockingCollection blockingCollection = new BlockingCollection(useOnlyForPeeking); ... useOnlyForPeeking.TryPeek(...) 

Si noti tuttavia che non è necessario modificare la coda tramite useOnlyForPeeking , altrimenti il useOnlyForPeeking verrà confuso e potrebbe generare InvalidOperationException , ma sarei sorpreso se chiamare TryPeek non modificante su questa struttura di dati simultanea sia un problema.

È ansible utilizzare ConcurrentQueue , invece, ha il metodo TryDequeue() .

ConcurrentQueue.TryDequeue(out T result) tenta di rimuovere e restituire l’object all’inizio della coda simultanea, restituisce true se un elemento è stato rimosso e restituito correttamente dall’inizio di ConcurrentQueue.

Quindi, non c’è bisogno di controllare prima una Peek.

TryDequeue() è thread-safe:

ConcurrentQueue gestisce internamente tutte le sincronizzazioni . Se due thread chiamano TryDequeue (T) esattamente nello stesso momento, nessuna operazione viene bloccata.

Per quanto ho capito , restituisce false solo se la coda è vuota :

Se la coda è stata popolata con codice come q.Enqueue (“a”); q.Enqueue ( “b”); q.Enqueue ( “c”); e due thread tentano contemporaneamente di deselezionare un elemento, un thread deselezionerà a e l’altro thread verrà rimosso dalla coda b. Entrambe le chiamate a TryDequeue (T) restituiranno true, poiché entrambi erano in grado di deselezionare un elemento. Se ogni thread ritorna a deselezionare un elemento aggiuntivo, uno dei thread rimuoverà c e restituirà true, mentre l’altro thread troverà la coda vuota e restituirà false.

http://msdn.microsoft.com/en-us/library/dd287208%28v=vs.100%29.aspx

AGGIORNARE

Forse, l’opzione più semplice sarebbe utilizzare la class TaskScheduler . Con esso è ansible avvolgere tutte le attività di elaborazione negli elementi della coda e semplificare l’implementazione della sincronizzazione.