close

Вход

Забыли?

вход по аккаунту

?

Презентация

код для вставкиСкачать
Сошников Дмитрий Валерьевич
к.ф.-м.н., доцент
dmitryso@microsoft.com
Факультет инноваций и высоких технологий
Московский физико-технический институт
Асинхронные и параллельные
вычисления
2
©2008 Сошников Д.В.
Асинхронные вычисления – вычисления,
которые могут запускаться параллельно с
другими процессами
Асинхронный ввод-вывод
По окончании операции вызывается некоторая
функция
Параллельные вычисления
3
.NET Parallel Extensions /
.NET 4.0
Декларативное
программирование
• Locks, Semaphores, …, MPI
• Parallel.For, …
• Применимо только к
независимым участкам кода
• Parallel LINQ
CCR (Concurrency
Coordination Runtime)
Транзакционная память
©2008 Сошников Д.В.
Программирование
«вручную»
Asynchronous
Workflows
4
©2008 Сошников Д.В.
let task1 = async { return 10+10 }
let task2 = async { return 20+20 }
Async.Run (Async.Parallel [ task1; task2 ])
let map' func items =
let tasks =
seq {
for i in items -> async { return (func i) }
}
Async.Run (Async.Parallel tasks)
;;
List.map (fun x -> fib(x)) [1..30];;
map' (fun x -> fib(x)) [1..30];;
5
©2008 Сошников Д.В.
let rec ffor op f (a: float) (b: float) (h: float) =
if a>=b then f b
else op (f a) (ffor op f (a+h) b h);;
let mutable nsteps = 10000.0;;
let integrate f a b =
let h = (b-a)/nsteps in
ffor (+) (fun x -> h*f(x)) a b h;;
let pintegrate f a b =
let c = (b-a)/2.0 in
let t = Async.Run(Async.Parallel2(
async { return integrate f a c },
async { return integrate f c b })) in
fst t + snd t;;
6
©2008 Сошников Д.В.
let searches =
["Live.com", "http://live.com/";
"Yandex", "http://www.yandex.ru/";
"Google", "http://google.ru"]
let fetchAsync(nm,url:string) =
async {
do printfn "Creating request for %s..." nm
let req = WebRequest.Create(url)
let! resp = req.GetResponseAsync()
do printfn "Getting response stream for %s..." nm
let stream = resp.GetResponseStream()
do printfn "Reading response for %s..." nm
let reader = new StreamReader(stream)
let! html = reader.ReadToEndAsync()
do printfn "Read %d characters for %s..." html.Length nm
};;
for nm,url in searches do Async.Spawn (fetchAsync(nm,url));;
7
public static void ReadInImageCallback(IAsyncResult asyncResult)
public class BulkImageProcAsync
{
{
ImageStateObject state = (ImageStateObject)asyncResult.AsyncState;
public const String ImageBaseName = "tmpImage-";
public static void ProcessImagesInBulk()
Stream stream = state.fs;
public const int numImages = 200;
{
int bytesRead = stream.EndRead(asyncResult);
public const int numPixels = 512 * 512;
Console.WriteLine("Processing images... ");
if (bytesRead != numPixels)
long t0 = Environment.TickCount;
throw new Exception(String.Format
// ProcessImage has a simple O(N) loop, and you can vary
the
number
= numImages;
("In ReadInImageCallback, got the wrong number of "NumImagesToFinish
+
// of times you repeat that loop to make the application
more
CPUAsyncCallback readImageCallback = new
"bytes
from
the image: {0}.", bytesRead));
// bound or more IO-bound.
AsyncCallback(ReadInImageCallback);
ProcessImage(state.pixels, state.imageNum);
public static int processImageRepeats = 20; stream.Close();
for (int i = 0; i < numImages; i++)
{
// Threads must decrement NumImagesToFinish, //
andNow
protect
ImageStateObject state = new ImageStateObject();
write out the image.
// their access to it through a mutex.
// Using asynchronous I/O here appears not to be best practice.state.pixels = new byte[numPixels];
public static int NumImagesToFinish = numImages;
// It ends up swamping the threadpool, because the threadpool state.imageNum = i;
public static Object[] NumImagesMutex = new Object[0];
// threads are blocked on I/O requests that were just queued to// Very large items are read only once, so you can make the
// WaitObject is signalled when all image processing
is done.
// buffer on the FileStream very small to save memory.
// the threadpool.
public static Object[] WaitObject = new Object[0];
FileStream fs = new FileStream(ImageBaseName + state.imageNum +FileStream fs = new FileStream(ImageBaseName + i + ".tmp",
public class ImageStateObject
FileMode.Open, FileAccess.Read, FileShare.Read, 1, true);
".done", FileMode.Create, FileAccess.Write, FileShare.None,
{
state.fs = fs;
4096, false);
public byte[] pixels;
fs.BeginRead(state.pixels, 0, numPixels, readImageCallback,
fs.Write(state.pixels, 0, numPixels);
public int imageNum;
state);
fs.Close();
public FileStream fs;
}
}
// This application model uses too much memory.
// Determine whether all images are done being processed.
// Releasing memory as soon as possible is a good idea,
// If not, block until all are finished.
// especially global state.
bool mustBlock = false;
state.pixels = null;
lock (NumImagesMutex)
fs = null;
{
// Record that an image is finished now.
if (NumImagesToFinish > 0)
lock (NumImagesMutex)
mustBlock = true;
{
}
NumImagesToFinish--;
if (mustBlock)
let ProcessImageAsync () =
if (NumImagesToFinish == 0)
{
{
async { let inStream = File.OpenRead(sprintf
"Image%d.tmp" i)
Console.WriteLine("All worker threads are queued. " +
Monitor.Enter(WaitObject);
let! pixels
= inStream.ReadAsync(numPixels)
" Blocking until they complete. numLeft: {0}",
Monitor.Pulse(WaitObject);
let pixels'
= TransformImage(pixels,i)
NumImagesToFinish);
Monitor.Exit(WaitObject);
Monitor.Enter(WaitObject);
}
let outStream = File.OpenWrite(sprintf
"Image%d.done" i)
Monitor.Wait(WaitObject);
}
do! outStream.WriteAsync(pixels')
Monitor.Exit(WaitObject);
}
do
Console.WriteLine "done!" }
}
long t1 = Environment.TickCount;
Console.WriteLine("Total time processing images: {0}ms",
let ProcessImagesAsyncWorkflow() =
(t1 - t0));
Async.Run (Async.Parallel
}
[ for i in 1 .. numImages -> ProcessImageAsync i ])
}
8
©2008 Сошников Д.В.
using System;
using System.IO;
using System.Threading;
9
Документ
Категория
Презентации
Просмотров
17
Размер файла
3 336 Кб
Теги
1/--страниц
Пожаловаться на содержимое документа