Pregunta Duplicar excepciones con BroadcastBlock en TPL Dataflow


Estoy intentando utilizar TPL Dataflow para crear una canalización. Todo está funcionando bien hasta el momento, con mi canalización definida de la siguiente manera (aunque mi problema es solo con la emisora, submissionSucceeded, submissionFailed):

// Define tasks
var productListingBatchBuffer = new BufferBlock<PostSubmissionState>();
var splitFile = new TransformBlock<PostSubmissionState, PostSubmissionState>(s => SplitFile(s));
var saveFile = new TransformBlock<PostSubmissionState, PostSubmissionState>(s => SaveFile(s));
var postSubmission = new TransformBlock<PostSubmissionState, PostSubmissionState>(s => PostSubmission(s));
var broadcaster = new BroadcastBlock<PostSubmissionState>(state => state);
var submissionSucceeded = new ActionBlock<PostSubmissionState>(s => SubmissionSucceeded(s));
var submissionFailed = new ActionBlock<PostSubmissionState>(s => SubmissionFailed(s));

// Link em up
productListingBatchBuffer.LinkTo(splitFile, new DataflowLinkOptions() { PropagateCompletion = true });
splitFile.LinkTo(saveFile, new DataflowLinkOptions() { PropagateCompletion = true });
saveFile.LinkTo(postSubmission, new DataflowLinkOptions() { PropagateCompletion = true });
postSubmission.LinkTo(broadcaster, new DataflowLinkOptions() { PropagateCompletion = true });
broadcaster.LinkTo(submissionSucceeded, new DataflowLinkOptions() { PropagateCompletion = true }, state => state.PostSucceeded);
broadcaster.LinkTo(submissionFailed, new DataflowLinkOptions() { PropagateCompletion = true }, state => !state.PostSucceeded);

El problema que tengo es con la propagación de Excepciones. Debido a que mi BroadcastBlock propaga su terminación (y por lo tanto cualquier falla) a dos bloques, si se produce una excepción, se propaga a ambos bloques. Así cuando lo hago

Task.WaitAll(submissionSucceeded.Completion, submissionFailed.Completion);

Termino con una excepción agregada que contiene dos excepciones. En este momento, lo mejor que puedo hacer es filtrar estos, es decir:

try
{
    Task.WaitAll(submissionSucceeded.Completion, submissionFailed.Completion);
}
catch (AggregateException ex)
{
    var uniqueExceptions = new AggregateException(ex.Flatten().InnerExceptions.Distinct());
    Console.WriteLine("An exception was thrown.\n{0}", uniqueExceptions.Flatten());
}

pero me pregunto si hay una mejor manera de hacer esto. es decir, si solo se produce una excepción, solo quiero que se haga una excepción. Soy nuevo en Dataflow, así que descubro todas las convenciones.


32
2018-02-03 17:47


origen


Respuestas:


He escrito un ejemplo de TPL DataFlow (https://github.com/squideyes/PodFetch) que tiene un enfoque ligeramente diferente a la finalización y el manejo de errores. Aquí está el código relevante de Line 171 a 201 de Program.cs:

    scraper.LinkTo(fetcher, link => link != null);
    scraper.LinkTo(DataflowBlock.NullTarget<Link>());

    scraper.HandleCompletion(fetcher);

    Status.Info.Log("Fetching APOD's archive list");

    links.ForEach(link => scraper.Post(link));

    scraper.Complete();

    try
    {
        await fetcher.Completion;

        Status.Finished.Log("Fetched: {0:N0}, Skipped: {1:N0}, Errors: {2:N0}, Seconds: {3:N2}",
            fetched, skipped, errored, (DateTime.UtcNow - startedOn).TotalMilliseconds / 1000.0);
    }
    catch (AggregateException errors)
    {
        foreach (var error in errors.InnerExceptions)
            Status.Failure.Log(error.Message);
    }
    catch (TaskCanceledException)
    {
        Status.Cancelled.Log("The process was manually cancelled!");
    }
    catch (Exception error)
    {
        Status.Failure.Log(error.Message);
    }

Como puede ver, conecto un par de bloques TPL y luego me preparo para manejar la finalización usando un método de extensión HandleCompletion:

    public static void HandleCompletion(
        this IDataflowBlock source, params IDataflowBlock[] targets)
    {
        source.Completion.ContinueWith(
            task =>
            {
                foreach (var target in targets)
                {
                    if (task.IsFaulted)
                        target.Fault(task.Exception);
                    else
                        target.Complete();
                }
            });
    }

Muy importante, llamo scraper.Complete () cuando termino de pasar objetos al primer bloque de la cadena. Con eso, el método de extensión HandleCompletion se ocupa luego de la continuación. Y, dado que estoy esperando en el buscador (el último bloque de la cadena en completarse), es fácil detectar los errores resultantes en un try / catch.


1
2017-10-09 19:27