Design a programming solution to the bounded buffer problem using the producer and consumer processes
Producer–Consumer Problem
In this project, you will design a programming solution to the bounded-buffer problem using the producer and consumer processes . For this project, you will use standard counting semaphores for
empty and full and a mutex lock, rather than a binary semaphore, to represent
mutex. Theproducer andconsumer—running as separate threads—willmove
items to and froma buffer that is synchronizedwith the empty, full, andmutex
structures. You can solve this problem using either Pthreads or the Windows
API.
The Buffer
Internally, the buffer will consist of a fixed-size array of type buffer item
(which will be defined using a typedef). The array of buffer item objects
will be manipulated as a circular queue. The definition of buffer item, along
with the size of the buffer, can be stored in a header file such as the following:
/* buffer.h */
typedef int buffer item;
#define BUFFER SIZE 5
The buffer will be manipulated with two functions, insert item() and
remove item(), which are called by the producer and consumer threads,
respectively. A skeleton outlining these functions appears in Figure 5.24.
The insert item() and remove item() functions will synchronize the
producer and consumer using the algorithms outlined in Figures 5.9 and
5.10. The buffer will also require an initialization function that initializes the
mutual-exclusion object mutex along with the empty and full semaphores.
The main() function will initialize the buffer and create the separate
producer and consumer threads. Once it has created the producer and
consumer threads, the main() function will sleep for a period of time and,
upon awakening, will terminate the application. The main() function will be
passed three parameters on the command line:
1. How long to sleep before terminating
2. The number of producer threads
3. The number of consumer threads
#include “buffer.h”
int main(int argc, char *argv[]) { /* 1. Get command line arguments argv[1],argv[2],argv[3] */
/* 2. Initialize buffer */
/* 3. Create producer thread(s) */
/* 4. Create consumer thread(s) */
/* 5. Sleep */
/* 6. Exit */
}
The Producer and Consumer Threads
The producer thread will alternate between sleeping for a random period of
time and inserting a random integer into the buffer. Random numbers will
be produced using the rand() function, which produces random integers
between 0 and RAND MAX. The consumerwill also sleep for a randomperiod
of time and, upon awakening, will attempt to remove an item from the buffer.
You can solve this problem using either Pthreads or the
Windows API. In the following sections,we supplymore information on each
of these choices.
Pthreads Thread Creation and Synchronization
Refer to those sections for specific instructions on Pthreads thread creation and
synchronization.
Windows
Windows Mutex Locks
Mutex locks are a type of dispatcher object. The
following illustrates how to create a mutex lock using the CreateMutex()
function:
#include <windows.h>
HANDLE Mutex;
Mutex = CreateMutex(NULL, FALSE, NULL);
#include <stdlib.h> /* required for rand() */
#include “buffer.h”
void *producer(void *param) { buffer item item;
while (true) { /* sleep for a random period of time */
sleep(…);
/* generate a random number */
item = rand();
if (insert item(item))
fprintf(“report error condition”);
else
printf(“producer produced %d\n”,item);
}
void *consumer(void *param) { buffer item item;
while (true) { /* sleep for a random period of time */
sleep(…);
if (remove item(&item))
fprintf(“report error condition”);
else
printf(“consumer consumed %d\n”,item);
}
The first parameter refers to a security attribute for the mutex lock. By setting
this attribute to NULL, we disallow any children of the process creating this
mutex lock to inherit the handle of the lock. The second parameter indicates
whether the creator of the mutex lock is the lock’s initial owner. Passing a value
of FALSE indicates that the thread creating the mutex is not the initial owner.
(We shall soon see how mutex locks are acquired.) The third parameter allows
us to name the mutex. However, because we provide a value of NULL, we do
not name the mutex. If successful, CreateMutex() returns a HANDLE to the
mutex lock; otherwise, it returns NULL.
We identified dispatcher objects as being either signaled or
nonsignaled. A signaled dispatcher object (such as a mutex lock) is available
for ownership. Once it is acquired, itmoves to the nonsignaled state.When it
is released, it returns to signaled.
Mutex locks are acquired by invoking the WaitForSingleObject() function.
The function is passed the HANDLE to the lock along with a flag indicating
how long to wait. The following code demonstrates how the mutex lock created
above can be acquired:
WaitForSingleObject(Mutex, INFINITE);
The parameter value INFINITE indicates that we will wait an infinite amount
of time for the lock to become available. Other values could be used that would
allow the calling thread to time out if the lock did not become available within
a specified time. If the lock is in a signaled state, WaitForSingleObject()
returns immediately, and the lock becomes nonsignaled. A lock is released
(moves to the signaled state) by invoking ReleaseMutex()—for example, as
follows:
ReleaseMutex(Mutex);
Windows Semaphores
Semaphores in the Windows API are dispatcher objects and thus use the same
signaling mechanism as mutex locks. Semaphores are created as follows:
#include <windows.h>
HANDLE Sem;
Sem = CreateSemaphore(NULL, 1, 5, NULL);
The first and last parameters identify a security attribute and a name for the
semaphore, similar to what we described for mutex locks. The second and third
parameters indicate the initial value and maximum value of the semaphore. In
this instance, the initial value of the semaphore is 1, and its maximum value
is 5. If successful, CreateSemaphore() returns a HANDLE to the mutex lock;
otherwise, it returns NULL.
Semaphores are acquired with the same WaitForSingleObject() function
as mutex locks.We acquire the semaphore Sem created in this example by
using the following statement:
WaitForSingleObject(Semaphore, INFINITE);
If the value of the semaphore is > 0, the semaphore is in the signaled state
and thus is acquired by the calling thread. Otherwise, the calling thread blocks
indefinitely—as we are specifying INFINITE—until the semaphore returns to
the signaled state.
The equivalent of the signal() operation forWindows semaphores is the
ReleaseSemaphore() function. This function is passed three parameters:
1. The HANDLE of the semaphore
2. How much to increase the value of the semaphore
3. A pointer to the previous value of the semaphore
We can use the following statement to increase Sem by 1:
ReleaseSemaphore(Sem, 1, NULL);
Both ReleaseSemaphore() and ReleaseMutex() return a nonzero value if
successful and 0 otherwise.
Answer:
POSIX
buffer.c
Download this file
buffer.c
1 file(s) 2.62 KB
Not a member!
Create a FREE account here to get access and download this file
buffer.h
/* buffer.h*/ typedef int buffer_item; #define BUFFER_SIZE 5 int insert_item(buffer_item item); int remove_item(buffer_item *item);
win32
producerConsumerWin.c
/*This program creates a producer and consumer</span> <span style="color: #000000; font-family: verdana, geneva, sans-serif;">thread.*/ #include <stdio.h> #include <stdlib.h> #include <windows.h> #include "buffer.h" buffer_item buffer[BUFFER_SIZE]; HANDLE Mutex; HANDLE SemEmpty, SemFull; int insertPosition = 0, removePosition = 0, count =0; // define the insert and remove methods int insert_item(buffer_item); int remove_item(buffer_item *); // the code the producer thread runs DWORD WINAPI Producer(LPVOID Param) { buffer_item randomproduced; while (TRUE) { Sleep(2000); randomproduced = rand(); if (insert_item(randomproduced)) { printf("could not insert item"); } } return 0; } // the code the consumer thread runs DWORD WINAPI Consumer(LPVOID Param) { buffer_item random; while (TRUE) { Sleep(2000); if (remove_item(&random)) { printf("could not remove item"); } else { } } return 0; } int main(int argc, char *argv[]) { int SleepTime, ProducerThreads, ConsumerThreads, i; DWORD ThreadId; HANDLE ConsumerThreadHandles [100], ProducerThreadHandles [100], CreatedThread; Mutex = CreateMutex(NULL, FALSE, NULL); SemFull = CreateSemaphore(NULL, 5, 5, NULL); SemEmpty = CreateSemaphore(NULL, 0, 5, NULL); //check for correct number of parameters if (argc != 4) { printf("must pass three positive integer parameters"); return -1; } SleepTime = atoi(argv[1]); ProducerThreads = atoi(argv[2]); ConsumerThreads = atoi(argv[3]); // check for valid parameters if (SleepTime < 0 || ProducerThreads < 0 || ConsumerThreads < 0) { printf("must pass three positive integer parameters"); return -1; } printf("\n"); // create the producer threads and insert into array for (i = 0; i < ProducerThreads; i++) { CreatedThread = CreateThread(NULL, 0, Producer, NULL, 0, &ThreadId); ProducerThreadHandles[i] = CreatedThread; } // create the consumer threads and insert into array for (i = 0; i < ConsumerThreads; i++) { CreatedThread = CreateThread(NULL, 0, Consumer, NULL, 0, &ThreadId); ConsumerThreadHandles[i] = CreatedThread; } // sleep and allow the program to run Sleep(SleepTime); // join on threads WaitForMultipleObjects(ProducerThreads, ProducerThreadHandles, TRUE, 0); WaitForMultipleObjects(ConsumerThreads, ConsumerThreadHandles, TRUE, 0); CloseHandle(ProducerThreadHandles); CloseHandle(ConsumerThreadHandles); return 0; } int insert_item(buffer_item item) { WaitForSingleObject(SemFull, INFINITE); WaitForSingleObject(Mutex, INFINITE); //insert the item ++count; printf("producer produced %d count = %d\n", item, count); buffer[insertPosition] = item; ++insertPosition; if (insertPosition >= 5) { insertPosition = 0; } else if (insertPosition < 0 || count > 5) { // print out error and return -1 printf("error removing item!!!"); return -1; } //release the mutex and semempty to allow remove to run ReleaseMutex(Mutex); ReleaseSemaphore(SemEmpty, 1, NULL); return 0; } int remove_item(buffer_item *item) { WaitForSingleObject(SemEmpty, INFINITE); WaitForSingleObject(Mutex, INFINITE); // remove the item --count; item[0] = buffer[removePosition]; printf(" consumer consumed %d count = %d\n", item[0], count); ++removePosition; if (removePosition >= 5) { removePosition = 0; } else if (removePosition < 0 || count < 0) { // print out error and return -1 printf("error removing item!!!"); return -1; } // release the mutex and semfull to allow insert to run ReleaseMutex(Mutex); ReleaseSemaphore(SemFull, 1, NULL); return 0; }
buffer_1.h
typedef int buffer_item; #define BUFFER_SIZE 5
Leave a reply