Come può un blocco TPL Dataflow downstream ottenere dati prodotti da una fonte?

Sto elaborando immagini usando TPL Dataflow. Ricevo una richiesta di elaborazione, leggo un’immagine da uno stream, applico diverse trasformazioni, quindi scrivo l’immagine risultante in un altro stream:

Request -> Stream -> Image -> Image ... -> Stream 

Per quello io uso i blocchi:

 BufferBlock TransformBlock TransformBlock TransformBlock TransformBlock ... writerBlock = new ActionBlock 

Il problema è la Request iniziale è ciò che contiene alcuni dati necessari per creare il Stream risultante insieme ad alcune informazioni aggiuntive di cui ho bisogno a quel punto. Devo passare la Request originale (o qualche altro object di contesto) lungo la linea verso il writerBlock su tutti gli altri blocchi come questo:

 TransformBlock<Request,Tuple> TransformBlock<Tuple,Tuple> TransformBlock<Tuple,Tuple> ... 

(che è brutto), o c’è un modo per colbind il primo blocco all’ultimo (o, generalizzando, a quelli che hanno bisogno dei dati aggiuntivi)?

Sì, hai praticamente bisogno di fare ciò che hai descritto, passando i dati aggiuntivi da ogni blocco a quello successivo.

Ma usando un paio di metodi di supporto, puoi renderlo molto più semplice:

 public static IPropagatorBlock> CreateExtendedSource(Func transform) { return new TransformBlock>( input => Tuple.Create(transform(input), input)); } public static IPropagatorBlock, Tuple> CreateExtendedTransform(Func transform) { return new TransformBlock, Tuple>( tuple => Tuple.Create(transform(tuple.Item1), tuple.Item2)); } 

Le firme sembrano scoraggianti, ma in realtà non sono poi così male.

Inoltre, è ansible aggiungere sovraccarichi che passano le opzioni al blocco creato o sovraccarichi che richiedono delegati asincroni.

Ad esempio, se si desidera eseguire alcune operazioni su un numero utilizzando blocchi separati, mentre si passa il numero originale lungo la strada, è ansible fare qualcosa come:

 var source = new BufferBlock(); var divided = CreateExtendedSource(i => i / 2.0); var formatted = CreateExtendedTransform(d => d.ToString("0.0")); var writer = new ActionBlock>(tuple => Console.WriteLine(tuple)); source.LinkTo(divided); divided.LinkTo(formatted); formatted.LinkTo(writer); for (int i = 0; i < 10; i++) source.Post(i); 

Come puoi vedere, i tuoi lambda (tranne l'ultimo) occupano solo il valore "corrente" ( int , double o string , a seconda dello stadio della pipeline), il valore "originale" (sempre int ) viene passato automaticamente . In qualsiasi momento, puoi utilizzare il blocco creato usando il normale costruttore per accedere a entrambi i valori (come l' ActionBlock finale nell'esempio).

(Che BufferBlock non è effettivamente necessario, ma l'ho aggiunto per adattarlo maggiormente al tuo design.)

Potrei andare fuori di testa da quando sto solo iniziando a giocare con TPL Dataflow. Ma credo che tu possa realizzare ciò usando un BroadcastBlock come intermediario tra la tua fonte e il tuo primo objective.

BroadcastBlock può offrire il messaggio a molti bersagli, quindi lo usi per offrire al tuo objective, e anche a un JoinBlock , alla fine che unirà il risultato con il messaggio originale.

 source -> Broadcast ->-----------------------------------------> JoinBlock  -> Transformation1 -> Transformation 'n' -> 

Per esempio:

 var source = new BufferBlock(); var transformation = new TransformBlock(i => i * 100); var broadCast = new BroadcastBlock(null); source.LinkTo(broadCast); broadCast.LinkTo(transformation); var jb = new JoinBlock(); broadCast.LinkTo(jb.Target1); transformation.LinkTo(jb.Target2); jb.LinkTo(new ActionBlock>( c => Console.WriteLine("Source:{0}, Target Result: {1}", c.Item1, c.Item2))); source.Post(1); source.Post(2); source.Complete(); 

i rendimenti …

Fonte: 1, risultato objective: 100

Fonte: 2, Risultato objective: 200

Non sono troppo sicuro di come si comporterebbe in un ambiente asincrono.