C# - Dataflow
- 닷넷프레임워크 4.5에 추가
- NuGet으로 입수 가능 https://www.nuget.org/packages/Microsoft.Tpl.Dataflow
- MSDN에 문서는 있지만 정식 닷넷프레임워크에는 추가 되지 않은 상태
- MSDN: http://msdn.microsoft.com/en-us/library/hh228603.aspx
- 일어 MSDN: http://msdn.microsoft.com/ja-jp/library/vstudio/system.threading.tasks.dataflow.aspx
- 번역기로 돌리면 영어보다 보기 편함
- 비동기 프로그래밍은 쉽지는 않지만 의외로 기존의 동기식 코드를 손쉽게 비동기 코드로 바꿀 수 있는 부분도 있다.
- 이름 공간: System.Threading.Tasks.Dataflow
- 어셈블리: System.Threading.Tasks.Dataflow (System.Threading.Tasks.Dataflow.dll 내)
BufferBlock
- 범용적인 비동기 메시징 구조체를 나타낸다.
- 복수의 소스에서 쓰는 것이 가능하고 도 복수의 타겟에서 읽을 수도 있다.
- FIFO 큐에 메시지를 저장한다.
- 타겟이 메시지를 큐에서 가져가면 그 메시지는 큐에서 삭제된다.
- 복수의 메시지를 다른 컴포넌트에 전달하고, 그 컴포넌트에서 메시지를 수신하고 싶을 때 편리하다.
- 아래 예는 BufferBlock
오브젝트에 Int32 타입의 복수의 값을 post하고 그 오브젝트에서 값을 읽어들인다.
// Create a BufferBlock<int> object.
var bufferBlock = new BufferBlock<int>();
// Post several messages to the block.
for (int i = 0; i < 3; i++)
{
bufferBlock.Post(i);
}
// Receive the messages back from the block.
for (int i = 0; i < 3; i++)
{
Console.WriteLine(bufferBlock.Receive());
}
Output: 0 1 2
- TryReceive
- 정기적으로 폴링할 때 사용하면 좋다.
- 이 메소드를 호출한 스레드를 블럭 시키지 않는다.
// Post more messages to the block.
for (int i = 0; i < 3; i++)
{
bufferBlock.Post(i);
}
// Receive the messages back from the block.
int value;
while (bufferBlock.TryReceive(out value))
{
Console.WriteLine(value);
}
Output: 0 1 2
- Post 예제
- post2와 post1은 병렬로 실행하므로 순서 보장은 못함.
- 아래 예제의 task의 완료 순서는 post2, post1, receive
// Write to and read from the message block concurrently.
var post01 = Task.Run(() =>
{
bufferBlock.Post(0);
bufferBlock.Post(1);
});
var receive = Task.Run(() =>
{
for (int i = 0; i < 3; i++)
{
Console.WriteLine(bufferBlock.Receive());
}
});
var post2 = Task.Run(() =>
{
bufferBlock.Post(2);
});
Task.WaitAll(post01, receive, post2);
Sample output: 2 0 1
- 비동기로 쓰고, 읽기
// Post more messages to the block asynchronously.
for (int i = 0; i < 3; i++)
{
await bufferBlock.SendAsync(i);
}
// Asynchronously receive the messages back from the block.
for (int i = 0; i < 3; i++)
{
Console.WriteLine(await bufferBlock.ReceiveAsync());
}
Output: 0 1 2
- 통합 예제
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
// Demonstrates a how to write to and read from a dataflow block.
class DataflowReadWrite
{
// Demonstrates asynchronous dataflow operations.
static async Task AsyncSendReceive(BufferBlock<int> bufferBlock)
{
// Post more messages to the block asynchronously.
for (int i = 0; i < 3; i++)
{
await bufferBlock.SendAsync(i);
}
// Asynchronously receive the messages back from the block.
for (int i = 0; i < 3; i++)
{
Console.WriteLine(await bufferBlock.ReceiveAsync());
}
/*Output:
0
1
2
*/
}
static void Main(string[] args)
{
// Create a BufferBlock<int> object.
var bufferBlock = new BufferBlock<int>();
// Post several messages to the block.
for (int i = 0; i < 3; i++)
{
bufferBlock.Post(i);
}
// Receive the messages back from the block.
for (int i = 0; i < 3; i++)
{
Console.WriteLine(bufferBlock.Receive());
}
/* Output:
0
1
2
*/
// Post more messages to the block.
for (int i = 0; i < 3; i++)
{
bufferBlock.Post(i);
}
// Receive the messages back from the block.
int value;
while (bufferBlock.TryReceive(out value))
{
Console.WriteLine(value);
}
/* Output:
0
1
2
*/
// Write to and read from the message block concurrently.
var post01 = Task.Run(() =>
{
bufferBlock.Post(0);
bufferBlock.Post(1);
});
var receive = Task.Run(() =>
{
for (int i = 0; i < 3; i++)
{
Console.WriteLine(bufferBlock.Receive());
}
});
var post2 = Task.Run(() =>
{
bufferBlock.Post(2);
});
Task.WaitAll(post01, receive, post2);
// Demonstrate asynchronous dataflow operations.
AsyncSendReceive(bufferBlock).Wait();
}
}
Sample output: 2 0 1
BroadcastBlock
- 다른 컴포넌트에 복수의 메시지를 전달해도 가장 최신의 값만 사용하는 경우에 편리하다.
- 즉 최신 메시지가 이전 메시지를 덮어버린다.
- 메시지를 읽어도 삭제되지 않는다.
// Create a BroadcastBlock<double> object.
var broadcastBlock = new BroadcastBlock<double>(null);
// Post a message to the block.
broadcastBlock.Post(Math.PI);
// Receive the messages back from the block several times.
for (int i = 0; i < 3; i++)
{
Console.WriteLine(broadcastBlock.Receive());
}
Output: 3.14159265358979 3.14159265358979 3.14159265358979
WriteOnceBlock
- BroadcastBlock
와 비슷하지만 1회만 WriteOnceBlock 를 기술할 수 있다. - 읽기 전용에 비슷
- 메시지를 읽어도 메시지는 삭제 되지 않는다.
- 복수의 타겟에서 메시지를 복사하여 수신한다.
- 복수의 메시지의 선두만 전달할 때 편리하다.
// Create a WriteOnceBlock<string> object.
var writeOnceBlock = new WriteOnceBlock<string>(null);
// Post several messages to the block in parallel. The first
// message to be received is written to the block.
// Subsequent messages are discarded.
Parallel.Invoke(
() => writeOnceBlock.Post("Message 1"),
() => writeOnceBlock.Post("Message 2"),
() => writeOnceBlock.Post("Message 3"));
// Receive the message from the block.
Console.WriteLine(writeOnceBlock.Receive());
Sample output: Message 2
ActionBlock
- 모든 수신 데이터 요소의 Action
(혹은 Func )로 지정된 델리게이트를 호출하는 데이터 흐름 블럭을 제공 - 아래 예제는 ActionBlock
클래스를 사용하여 데이터 흐름 블럭을 사용해서 복수의 계산을 실행하고, 계산에 소요된 시간을 반환한다.
// Performs several computations by using dataflow and returns the elapsed
// time required to perform the computations.
static TimeSpan TimeDataflowComputations(int maxDegreeOfParallelism,
int messageCount)
{
var workerBlock = new ActionBlock<int>(
millisecondsTimeout => Thread.Sleep(millisecondsTimeout),
// 몇개를 병렬로 실행할지 정한다.
new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = maxDegreeOfParallelism
});
Stopwatch stopwatch = new Stopwatch();
stopwatch.Start();
for (int i = 0; i < messageCount; i++)
{
// 처리할 작업을 요청한다. ActionBlock을 생성할 때 인자로 넣은 함수가 실행된다.
workerBlock.Post(1000);
}
// 더 이상 처리할 작업을 받지 않음을 통보
workerBlock.Complete();
// Completion 멤버는 Task를 반환한다. task가 완료될 때까지 대기한다
workerBlock.Completion.Wait();
// Stop the timer and return the elapsed number of milliseconds.
stopwatch.Stop();
return stopwatch.Elapsed;
}
static void Main(string[] args)
{
int processorCount = Environment.ProcessorCount;
int messageCount = processorCount;
// Print the number of processors on this computer.
Console.WriteLine("Processor count = {0}.", processorCount);
TimeSpan elapsed;
// Perform two dataflow computations and print the elapsed
// time required for each.
// This call specifies a maximum degree of parallelism of 1.
// This causes the dataflow block to process messages serially.
elapsed = TimeDataflowComputations(1, messageCount);
Console.WriteLine("Degree of parallelism = {0}; message count = {1}; " +
"elapsed time = {2}ms.", 1, messageCount, (int)elapsed.TotalMilliseconds);
// Perform the computations again. This time, specify the number of
// processors as the maximum degree of parallelism. This causes
// multiple messages to be processed in parallel.
elapsed = TimeDataflowComputations(processorCount, messageCount);
Console.WriteLine("Degree of parallelism = {0}; message count = {1}; " +
"elapsed time = {2}ms.", processorCount, messageCount, (int)elapsed.TotalMilliseconds);
}
ActionBlock 예외 처리
// Create an ActionBlock<int> object that prints its input
// and throws ArgumentOutOfRangeException if the input
// is less than zero.
var throwIfNegative = new ActionBlock<int>(n =>
{
Console.WriteLine("n = {0}", n);
if (n < 0)
{
throw new ArgumentOutOfRangeException();
}
});
// Post values to the block.
throwIfNegative.Post(0);
throwIfNegative.Post(-1);
throwIfNegative.Post(1);
throwIfNegative.Post(-2);
throwIfNegative.Complete();
// Wait for completion in a try/catch block.
try
{
throwIfNegative.Completion.Wait();
}
catch (AggregateException ae)
{
// If an unhandled exception occurs during dataflow processing, all
// exceptions are propagated through an AggregateException object.
ae.Handle(e =>
{
Console.WriteLine("Encountered {0}: {1}",
e.GetType().Name, e.Message);
return true;
});
}
Output: n = 0 n = -1 Encountered ArgumentOutOfRangeException: Specified argument was out of the range of valid values.
- 예외가 발생하면 발생 이후의 요청 작업은 모두 취소 된다.
ActionBlock과 연결되는 task
- ContinueWith를 사용하여 ActionBlock이 끝난 후에 처리할 작업을 지정할 수 있다.
- 예외가 발생하여 중간에 ActionBlock이 끝나더라도 호출된다.
// Create an ActionBlock<int> object that prints its input
// and throws ArgumentOutOfRangeException if the input
// is less than zero.
var throwIfNegative = new ActionBlock<int>(n =>
{
Console.WriteLine("n = {0}", n);
if (n < 0)
{
throw new ArgumentOutOfRangeException();
}
});
// Create a continuation task that prints the overall
// task status to the console when the block finishes.
throwIfNegative.Completion.ContinueWith(task =>
{
Console.WriteLine("The status of the completion task is '{0}'.",
task.Status);
});
// Post values to the block.
throwIfNegative.Post(0);
throwIfNegative.Post(-1);
throwIfNegative.Post(1);
throwIfNegative.Post(-2);
throwIfNegative.Complete();
// Wait for completion in a try/catch block.
try
{
throwIfNegative.Completion.Wait();
}
catch (AggregateException ae)
{
// If an unhandled exception occurs during dataflow processing, all
// exceptions are propagated through an AggregateException object.
ae.Handle(e =>
{
Console.WriteLine("Encountered {0}: {1}",
e.GetType().Name, e.Message);
return true;
});
}
Output: n = 0 n = -1 The status of the completion task is 'Faulted'. Encountered ArgumentOutOfRangeException: Specified argument was out of the range of valid values.
TransformBlock<TInput, TOutput>
- ActionBlock
와 비슷하지만 소스에서 타겟으로 메시지를 전달한다. - System.Func<TInput, TOutput> 또는 System.Func<TInput, Task>
- System.Func<TInput, TOutput> 을 사용하는 TransformBlock<TInput, TOutput> 오브젝트를 사용하면 각 입력 요소를 처리하는 델리게이트를 반환
- System.Func<TInput, Task
> 을 사용하는 TransformBlock<TInput, TOutput> 오브젝트를 사용하면 각 입력 요소를 처리하는 Task 를 반환 - ActionBlock
과 비슷하게 각 입력 요소를 동기 및 비동기로 조작할 수 있다
// Create a TransformBlock<int, double> object that
// computes the square root of its input.
var transformBlock = new TransformBlock<int, double>(n => Math.Sqrt(n));
// Post several messages to the block.
transformBlock.Post(10);
transformBlock.Post(20);
transformBlock.Post(30);
// Read the output messages from the block.
for (int i = 0; i < 3; i++)
{
Console.WriteLine(transformBlock.Receive());
}
Output: 3.16227766016838 4.47213595499958 5.47722557505166
TransformManyBlock<TInput, TOutput>
- TransformBlock과 비슷하지만 1종류의 출력값만 아닌 복수의 출력 값을 저장
- System.Func<TInput, IEnumerable
> 또는 System.Func<TInput, Task<IEnumerable >>
// Create a TransformManyBlock<string, char> object that splits
// a string into its individual characters.
var transformManyBlock = new TransformManyBlock<string, char>(
s => s.ToCharArray());
// Post two messages to the first block.
transformManyBlock.Post("Hello");
transformManyBlock.Post("World");
// Receive all output values from the block.
for (int i = 0; i < ("Hello" + "World").Length; i++)
{
Console.WriteLine(transformManyBlock.Receive());
}
Output: H e l l o W o r l d
ActionBlock, TransformBlock<TInput, TOutput>, TransformManyBlock<TInput, TOutput> 에 사용하는 델리게이트 타입
| Type | Synchronous Delegate Type | Asynchronous Delegate Type |
|——–|——–|——–|
|ActionBlock
BatchBlock
- 출력 데이터 배열에 배치라고 부르는 입력 데이터 셋을 결합
// Create a BatchBlock<int> object that holds ten
// elements per batch.
var batchBlock = new BatchBlock<int>(10);
// Post several values to the block.
for (int i = 0; i < 13; i++)
{
batchBlock.Post(i);
}
// Set the block to the completed state. This causes
// the block to propagate out any any remaining
// values as a final batch.
batchBlock.Complete();
// Print the sum of both batches.
Console.WriteLine("The sum of the elements in batch 1 is {0}.",
batchBlock.Receive().Sum());
// 위에서 공간은 10개를 잡았는데 13개를 입력해서 여기서는 위에서 처리하지 못한 나머지 3개를 처리한다
Console.WriteLine("The sum of the elements in batch 2 is {0}.",
batchBlock.Receive().Sum());
Output: The sum of the elements in batch 1 is 45. The sum of the elements in batch 2 is 33.
JoinBlock<T1, T2>
// Create a JoinBlock<int, int, char> object that requires
// two numbers and an operator.
var joinBlock = new JoinBlock<int, int, char>();
// Post two values to each target of the join.
joinBlock.Target1.Post(3);
joinBlock.Target1.Post(6);
joinBlock.Target2.Post(5);
joinBlock.Target2.Post(4);
joinBlock.Target3.Post('+');
joinBlock.Target3.Post('-');
// Receive each group of values and apply the operator part
// to the number parts.
for (int i = 0; i < 2; i++)
{
var data = joinBlock.Receive();
switch (data.Item3)
{
case '+':
Console.WriteLine("{0} + {1} = {2}",
data.Item1, data.Item2, data.Item1 + data.Item2);
break;
case '-':
Console.WriteLine("{0} - {1} = {2}",
data.Item1, data.Item2, data.Item1 - data.Item2);
break;
default:
Console.WriteLine("Unknown operator '{0}'.", data.Item3);
break;
}
}
Output: 3 + 5 = 8 6 - 4 = 2
BatchedJoinBlock
// For demonstration, create a Func<int, int> that
// returns its argument, or throws ArgumentOutOfRangeException
// if the argument is less than zero.
Func<int, int> DoWork = n =>
{
if (n < 0)
throw new ArgumentOutOfRangeException();
return n;
};
// Create a BatchedJoinBlock<int, Exception> object that holds
// seven elements per batch.
var batchedJoinBlock = new BatchedJoinBlock<int, Exception>(7);
// Post several items to the block.
foreach (int i in new int[] { 5, 6, -7, -22, 13, 55, 0 })
{
try
{
// Post the result of the worker to the
// first target of the block.
batchedJoinBlock.Target1.Post(DoWork(i));
}
catch (ArgumentOutOfRangeException e)
{
// If an error occurred, post the Exception to the
// second target of the block.
batchedJoinBlock.Target2.Post(e);
}
}
// Read the results from the block.
var results = batchedJoinBlock.Receive();
// Print the results to the console.
// Print the results.
foreach (int n in results.Item1)
{
Console.WriteLine(n);
}
// Print failures.
foreach (Exception e in results.Item2)
{
Console.WriteLine(e.Message);
}
Output: 5 6 13 55 0 Specified argument was out of the range of valid values. Specified argument was out of the range of valid values.
Producer-Consumer Dataflow Pattern
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
// Demonstrates a basic producer and consumer pattern that uses dataflow.
class DataflowProducerConsumer
{
// Demonstrates the production end of the producer and consumer pattern.
static void Produce(ITargetBlock<byte[]> target)
{
// Create a Random object to generate random data.
Random rand = new Random();
// In a loop, fill a buffer with random data and
// post the buffer to the target block.
for (int i = 0; i < 100; i++)
{
// Create an array to hold random byte data.
byte[] buffer = new byte[1024];
// Fill the buffer with random bytes.
rand.NextBytes(buffer);
// Post the result to the message block.
target.Post(buffer);
}
// Set the target to the completed state to signal to the consumer
// that no more data will be available.
target.Complete();
}
// Demonstrates the consumption end of the producer and consumer pattern.
static async Task<int> ConsumeAsync(ISourceBlock<byte[]> source)
{
// Initialize a counter to track the number of bytes that are processed.
int bytesProcessed = 0;
// Read from the source buffer until the source buffer has no
// available output data.
while (await source.OutputAvailableAsync())
{
byte[] data = source.Receive();
// Increment the count of bytes received.
bytesProcessed += data.Length;
}
return bytesProcessed;
}
static void Main(string[] args)
{
// Create a BufferBlock<byte[]> object. This object serves as the
// target block for the producer and the source block for the consumer.
var buffer = new BufferBlock<byte[]>();
// Start the consumer. The Consume method runs asynchronously.
var consumer = ConsumeAsync(buffer);
// Post source data to the dataflow block.
Produce(buffer);
// Wait for the consumer to process all data.
consumer.Wait();
// Print the count of bytes processed to the console.
Console.WriteLine("Processed {0} bytes.", consumer.Result);
}
}
Output: Processed 102400 bytes.
- 신뢰성 높이기
- 복수의 consumer가 있는 경우 TryReceive를 사용하는 것이 좋다. 만약 Receive를 사용하면 다른 consumer가 읽으면 다른 consumer는 블럭이 된다.
// Demonstrates the consumption end of the producer and consumer pattern.
static async Task<int> ConsumeAsync(IReceivableSourceBlock<byte[]> source)
{
// Initialize a counter to track the number of bytes that are processed.
int bytesProcessed = 0;
// Read from the source buffer until the source buffer has no
// available output data.
while (await source.OutputAvailableAsync())
{
byte[] data;
while (source.TryReceive(out data))
{
// Increment the count of bytes received.
bytesProcessed += data.Length;
}
}
return bytesProcessed;
}
데이터 블럭으로 데이터를 수신 했을 때 지정된 동작 실행
using System;
using System.IO;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
// Demonstrates how to provide delegates to exectution dataflow blocks.
class DataflowExecutionBlocks
{
// Computes the number of zero bytes that the provided file
// contains.
static int CountBytes(string path)
{
byte[] buffer = new byte[1024];
int totalZeroBytesRead = 0;
using (var fileStream = File.OpenRead(path))
{
int bytesRead = 0;
do
{
bytesRead = fileStream.Read(buffer, 0, buffer.Length);
totalZeroBytesRead += buffer.Count(b => b == 0);
} while (bytesRead > 0);
}
return totalZeroBytesRead;
}
static void Main(string[] args)
{
// Create a temporary file on disk.
string tempFile = Path.GetTempFileName();
// Write random data to the temporary file.
using (var fileStream = File.OpenWrite(tempFile))
{
Random rand = new Random();
byte[] buffer = new byte[1024];
for (int i = 0; i < 512; i++)
{
rand.NextBytes(buffer);
fileStream.Write(buffer, 0, buffer.Length);
}
}
// Create an ActionBlock<int> object that prints to the console
// the number of bytes read.
var printResult = new ActionBlock<int>(zeroBytesRead =>
{
Console.WriteLine("{0} contains {1} zero bytes.",
Path.GetFileName(tempFile), zeroBytesRead);
});
// Create a TransformBlock<string, int> object that calls the
// CountBytes function and returns its result.
var countBytes = new TransformBlock<string, int>(
new Func<string, int>(CountBytes));
// Link the TransformBlock<string, int> object to the
// ActionBlock<int> object.
countBytes.LinkTo(printResult);
// Create a continuation task that completes the ActionBlock<int>
// object when the TransformBlock<string, int> finishes.
countBytes.Completion.ContinueWith(delegate { printResult.Complete(); });
// Post the path to the temporary file to the
// TransformBlock<string, int> object.
countBytes.Post(tempFile);
// Requests completion of the TransformBlock<string, int> object.
countBytes.Complete();
// Wait for the ActionBlock<int> object to print the message.
printResult.Completion.Wait();
// Delete the temporary file.
File.Delete(tempFile);
}
}
Sample output: tmp4FBE.tmp contains 2081 zero bytes.
- 신뢰성 높이기
// Asynchronously computes the number of zero bytes that the provided file
// contains.
static async Task<int> CountBytesAsync(string path)
{
byte[] buffer = new byte[1024];
int totalZeroBytesRead = 0;
using (var fileStream = new FileStream(
path, FileMode.Open, FileAccess.Read, FileShare.Read, 0x1000, true))
{
int bytesRead = 0;
do
{
// Asynchronously read from the file stream.
bytesRead = await fileStream.ReadAsync(buffer, 0, buffer.Length);
totalZeroBytesRead += buffer.Count(b => b == 0);
} while (bytesRead > 0);
}
return totalZeroBytesRead;
}
// Create a TransformBlock<string, int> object that calls the
// CountBytes function and returns its result.
var countBytesAsync = new TransformBlock<string, int>(async path =>
{
byte[] buffer = new byte[1024];
int totalZeroBytesRead = 0;
using (var fileStream = new FileStream(
path, FileMode.Open, FileAccess.Read, FileShare.Read, 0x1000, true))
{
int bytesRead = 0;
do
{
// Asynchronously read from the file stream.
bytesRead = await fileStream.ReadAsync(buffer, 0, buffer.Length);
totalZeroBytesRead += buffer.Count(b => b == 0);
} while (bytesRead > 0);
}
return totalZeroBytesRead;
});
Creating a Dataflow Pipeline
- http://msdn.microsoft.com/en-us/library/hh228604.aspx
How to: Unlink Dataflow Blocks
- http://msdn.microsoft.com/en-us/library/hh228600.aspx
How to: Cancel a Dataflow Block
- http://msdn.microsoft.com/en-us/library/hh228611.aspx
Creating a Custom Dataflow Block Type
- http://msdn.microsoft.com/en-us/library/hh228606.aspx
###How to: Use JoinBlock to Read Data From Multiple Sources
- http://msdn.microsoft.com/en-us/library/hh228610.aspx
Dataflow Block 병렬 실행 수 지정
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks.Dataflow;
// Demonstrates how to specify the maximum degree of parallelism
// when using dataflow.
class Program
{
// Performs several computations by using dataflow and returns the elapsed
// time required to perform the computations.
static TimeSpan TimeDataflowComputations(int maxDegreeOfParallelism,
int messageCount)
{
// Create an ActionBlock<int> that performs some work.
var workerBlock = new ActionBlock<int>(
// Simulate work by suspending the current thread.
millisecondsTimeout => Thread.Sleep(millisecondsTimeout),
// Specify a maximum degree of parallelism.
new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = maxDegreeOfParallelism
});
// Compute the time that it takes for several messages to
// flow through the dataflow block.
Stopwatch stopwatch = new Stopwatch();
stopwatch.Start();
for (int i = 0; i < messageCount; i++)
{
workerBlock.Post(1000);
}
workerBlock.Complete();
// Wait for all messages to propagate through the network.
workerBlock.Completion.Wait();
// Stop the timer and return the elapsed number of milliseconds.
stopwatch.Stop();
return stopwatch.Elapsed;
}
static void Main(string[] args)
{
int processorCount = Environment.ProcessorCount;
int messageCount = processorCount;
// Print the number of processors on this computer.
Console.WriteLine("Processor count = {0}.", processorCount);
TimeSpan elapsed;
// Perform two dataflow computations and print the elapsed
// time required for each.
// This call specifies a maximum degree of parallelism of 1.
// This causes the dataflow block to process messages serially.
elapsed = TimeDataflowComputations(1, messageCount);
Console.WriteLine("Degree of parallelism = {0}; message count = {1}; " +
"elapsed time = {2}ms.", 1, messageCount, (int)elapsed.TotalMilliseconds);
// Perform the computations again. This time, specify the number of
// processors as the maximum degree of parallelism. This causes
// multiple messages to be processed in parallel.
elapsed = TimeDataflowComputations(processorCount, messageCount);
Console.WriteLine("Degree of parallelism = {0}; message count = {1}; " +
"elapsed time = {2}ms.", processorCount, messageCount, (int)elapsed.TotalMilliseconds);
}
}
Sample output: Processor count = 4. Degree of parallelism = 1; message count = 4; elapsed time = 4032ms. Degree of parallelism = 4; message count = 4; elapsed time = 1001ms.
How to: Specify a Task Scheduler in a Dataflow Block
- http://msdn.microsoft.com/en-us/library/hh228599.aspx
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using System.Windows.Forms;
namespace WriterReadersWinForms
{
public partial class Form1 : Form
{
// Broadcasts values to an ActionBlock<int> object that is associated
// with each check box.
BroadcastBlock<int> broadcaster = new BroadcastBlock<int>(null);
public Form1()
{
InitializeComponent();
// Create an ActionBlock<CheckBox> object that toggles the state
// of CheckBox objects.
// Specifying the current synchronization context enables the
// action to run on the user-interface thread.
var toggleCheckBox = new ActionBlock<CheckBox>(checkBox =>
{
checkBox.Checked = !checkBox.Checked;
},
new ExecutionDataflowBlockOptions
{
TaskScheduler = TaskScheduler.FromCurrentSynchronizationContext() // UI 스레드에서 ActionBlock을 실행되도록 설정
});
// Create a ConcurrentExclusiveSchedulerPair object.
// Readers will run on the concurrent part of the scheduler pair.
// The writer will run on the exclusive part of the scheduler pair.
var taskSchedulerPair = new ConcurrentExclusiveSchedulerPair();
// Create an ActionBlock<int> object for each reader CheckBox object.
// Each ActionBlock<int> object represents an action that can read
// from a resource in parallel to other readers.
// Specifying the concurrent part of the scheduler pair enables the
// reader to run in parallel to other actions that are managed by
// that scheduler.
var readerActions =
from checkBox in new CheckBox[] {checkBox1, checkBox2, checkBox3}
select new ActionBlock<int>(milliseconds =>
{
// Toggle the check box to the checked state.
toggleCheckBox.Post(checkBox);
// Perform the read action. For demonstration, suspend the current
// thread to simulate a lengthy read operation.
Thread.Sleep(milliseconds);
// Toggle the check box to the unchecked state.
toggleCheckBox.Post(checkBox);
},
new ExecutionDataflowBlockOptions
{
TaskScheduler = taskSchedulerPair.ConcurrentScheduler // 현재의 스레드에서 실행되도록
});
// Create an ActionBlock<int> object for the writer CheckBox object.
// This ActionBlock<int> object represents an action that writes to
// a resource, but cannot run in parallel to readers.
// Specifying the exclusive part of the scheduler pair enables the
// writer to run in exclusively with respect to other actions that are
// managed by the scheduler pair.
var writerAction = new ActionBlock<int>(milliseconds =>
{
// Toggle the check box to the checked state.
toggleCheckBox.Post(checkBox4);
// Perform the write action. For demonstration, suspend the current
// thread to simulate a lengthy write operation.
Thread.Sleep(milliseconds);
// Toggle the check box to the unchecked state.
toggleCheckBox.Post(checkBox4);
},
new ExecutionDataflowBlockOptions
{
TaskScheduler = taskSchedulerPair.ExclusiveScheduler // 다른 스레드에서 실행되도록
});
// Link the broadcaster to each reader and writer block.
// The BroadcastBlock<T> class propagates values that it
// receives to all connected targets.
foreach (var readerAction in readerActions)
{
broadcaster.LinkTo(readerAction);
}
broadcaster.LinkTo(writerAction);
// Start the timer.
timer1.Start();
}
// Event handler for the timer.
private void timer1_Tick(object sender, EventArgs e)
{
// Post a value to the broadcaster. The broadcaster
// sends this message to each target.
broadcaster.Post(1000);
}
}
}
테스트: ActionBlock에서 Complete 메소드 호출 영향
- workerBlock.Post 호출과 동시에 스레드에서 작업을 한다.
- 즉 Complete를 늦게 호출하거나 호출하지 않아도 ActionBlock에 정의된 작업은 처리된다.
- 만약 Complete을 호출하지 않으면 workerBlock.Completion.Wait()에 무한 대기 상태에 들어간다.
- 즉 Complete 호출하지 않는다면 Completion.Wait()도 호출하면 안된다.
static TimeSpan TimeDataflowComputations(int maxDegreeOfParallelism, int messageCount)
{
var workerBlock = new ActionBlock<int>(
millisecondsTimeout => { Thread.Sleep(millisecondsTimeout);
Console.WriteLine("process workerBlock : {0}.", DateTime.Now.Ticks); },
// 몇개를 병렬로 실행할지 정한다.
new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = maxDegreeOfParallelism
});
Stopwatch stopwatch = new Stopwatch();
stopwatch.Start();
for (int i = 0; i < messageCount; i++)
{
// 처리할 작업을 요청한다. ActionBlock을 생성할 때 인자로 넣은 함수가 실행된다.
Console.WriteLine("Call workerBlock.Post : {0}.", DateTime.Now.Ticks);
workerBlock.Post(1000);
}
int DelayMillisecond = 2000;
Console.WriteLine("딜레이 {0}밀리세컨드 : {1}.", DelayMillisecond, DateTime.Now.Ticks);
Thread.Sleep(DelayMillisecond);
// 더 이상 처리할 작업을 받지 않음을 통보
Console.WriteLine("Call workerBlock.Complete : {0}.", DateTime.Now.Ticks);
workerBlock.Complete();
// Completion 멤버는 Task를 반환한다. task가 완료될 때까지 대기한다
Console.WriteLine("Call workerBlock.Completion : {0}.", DateTime.Now.Ticks);
workerBlock.Completion.Wait();
// Stop the timer and return the elapsed number of milliseconds.
stopwatch.Stop();
return stopwatch.Elapsed;
}
static void Main(string[] args)
{
int processorCount = Environment.ProcessorCount;
int messageCount = processorCount;
Console.WriteLine("Processor count = {0}.", processorCount);
TimeSpan elapsed;
elapsed = TimeDataflowComputations(1, messageCount);
Console.WriteLine("Degree of parallelism = {0}; message count = {1}; " +
"elapsed time = {2}ms.", 1, messageCount, (int)elapsed.TotalMilliseconds);
}
- 만약 Complete() 호출 후 또 Post 호출을 한다면?
- 다시 Post()를 하면 Post 호출 결과가 실패가 되면서 정의된 작업이 처리 되지 않는다.
static TimeSpan TimeDataflowComputations(int maxDegreeOfParallelism, int messageCount)
{
var workerBlock = new ActionBlock<int>(
millisecondsTimeout => { Thread.Sleep(millisecondsTimeout);
Console.WriteLine("process workerBlock : {0}.", DateTime.Now.Ticks); },
// 몇개를 병렬로 실행할지 정한다.
new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = maxDegreeOfParallelism
});
Stopwatch stopwatch = new Stopwatch();
stopwatch.Start();
for (int i = 0; i < messageCount; i++)
{
var bResult = workerBlock.Post(1000);
Console.WriteLine("Call workerBlock.Post : {0}, Result:{1}", DateTime.Now.Ticks, bResult);
}
int DelayMillisecond = 2000;
Console.WriteLine("딜레이 {0}밀리세컨드 : {1}.", DelayMillisecond, DateTime.Now.Ticks);
Thread.Sleep(DelayMillisecond);
// 더 이상 처리할 작업을 받지 않음을 통보
Console.WriteLine("Call workerBlock.Complete : {0}.", DateTime.Now.Ticks);
workerBlock.Complete();
// Completion 멤버는 Task를 반환한다. task가 완료될 때까지 대기한다
Console.WriteLine("Call workerBlock.Completion : {0}.", DateTime.Now.Ticks);
workerBlock.Completion.Wait();
Console.WriteLine("End workerBlock.Completion : {0}.", DateTime.Now.Ticks);
// 다시 한번 더
Console.WriteLine("");
Console.WriteLine("다시 한번 더 !!! : {0}.", DateTime.Now.Ticks);
for (int i = 0; i < messageCount; i++)
{
var bResult = workerBlock.Post(1000);
Console.WriteLine("Call workerBlock.Post : {0}, Result:{1}", DateTime.Now.Ticks, bResult);
}
DelayMillisecond = 2000;
Console.WriteLine("딜레이 {0}밀리세컨드 : {1}.", DelayMillisecond, DateTime.Now.Ticks);
Thread.Sleep(DelayMillisecond);
// 더 이상 처리할 작업을 받지 않음을 통보
Console.WriteLine("Call workerBlock.Complete : {0}.", DateTime.Now.Ticks);
workerBlock.Complete();
// Completion 멤버는 Task를 반환한다. task가 완료될 때까지 대기한다
Console.WriteLine("Call workerBlock.Completion : {0}.", DateTime.Now.Ticks);
workerBlock.Completion.Wait();
Console.WriteLine("End workerBlock.Completion : {0}.", DateTime.Now.Ticks);
stopwatch.Stop();
return stopwatch.Elapsed;
}
static void Main(string[] args)
{
int processorCount = Environment.ProcessorCount;
int messageCount = processorCount;
Console.WriteLine("Processor count = {0}.", processorCount);
TimeSpan elapsed;
elapsed = TimeDataflowComputations(1, messageCount);
Console.WriteLine("Degree of parallelism = {0}; message count = {1}; " +
"elapsed time = {2}ms.", 1, messageCount, (int)elapsed.TotalMilliseconds);
}
TPL Dataflow 샘플
http://qiita.com/st450/items/7ff6a7fc18213175d215
TPL Dataflow - Post와 SendAsync의 차이
http://qiita.com/Temarin_PITA/items/a5c939c924f7dcf15054
TPL Dataflow - Greedy의 동작
http://qiita.com/Temarin_PITA/items/10f3f47761d08348f8e2
TPL DataFlow를 사용하여 처리의 진행 상황을 표시하기
http://qiita.com/kiichi54321/items/f2cb852b89c8f79b1fb0
TPL Dataflow를 사용하여 복수의 스레드에서의 결과를 하나의 파일에 쓰기
http://qiita.com/kiichi54321/items/f0f427055b3b55427a23
이 글은 2019-03-11에 작성되었습니다.