|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
#include "libavutil/avassert.h" |
|
#include "libavutil/avstring.h" |
|
#include "libavutil/frame.h" |
|
#include "libavutil/threadmessage.h" |
|
#include "libavutil/thread.h" |
|
|
|
struct sender_data { |
|
int id; |
|
pthread_t tid; |
|
int workload; |
|
AVThreadMessageQueue *queue; |
|
}; |
|
|
|
|
|
struct receiver_data { |
|
pthread_t tid; |
|
int workload; |
|
int id; |
|
AVThreadMessageQueue *queue; |
|
}; |
|
|
|
struct message { |
|
AVFrame *frame; |
|
|
|
|
|
int magic; |
|
}; |
|
|
|
#define MAGIC 0xdeadc0de |
|
|
|
static void free_frame(void *arg) |
|
{ |
|
struct message *msg = arg; |
|
av_assert0(msg->magic == MAGIC); |
|
av_frame_free(&msg->frame); |
|
} |
|
|
|
static void *sender_thread(void *arg) |
|
{ |
|
int i, ret = 0; |
|
struct sender_data *wd = arg; |
|
|
|
av_log(NULL, AV_LOG_INFO, "sender #%d: workload=%d\n", wd->id, wd->workload); |
|
for (i = 0; i < wd->workload; i++) { |
|
if (rand() % wd->workload < wd->workload / 10) { |
|
av_log(NULL, AV_LOG_INFO, "sender #%d: flushing the queue\n", wd->id); |
|
av_thread_message_flush(wd->queue); |
|
} else { |
|
char *val; |
|
AVDictionary *meta = NULL; |
|
struct message msg = { |
|
.magic = MAGIC, |
|
.frame = av_frame_alloc(), |
|
}; |
|
|
|
if (!msg.frame) { |
|
ret = AVERROR(ENOMEM); |
|
break; |
|
} |
|
|
|
|
|
val = av_asprintf("frame %d/%d from sender %d", |
|
i + 1, wd->workload, wd->id); |
|
if (!val) { |
|
av_frame_free(&msg.frame); |
|
ret = AVERROR(ENOMEM); |
|
break; |
|
} |
|
ret = av_dict_set(&meta, "sig", val, AV_DICT_DONT_STRDUP_VAL); |
|
if (ret < 0) { |
|
av_frame_free(&msg.frame); |
|
break; |
|
} |
|
msg.frame->metadata = meta; |
|
|
|
|
|
msg.frame->format = AV_PIX_FMT_RGBA; |
|
msg.frame->width = 320; |
|
msg.frame->height = 240; |
|
ret = av_frame_get_buffer(msg.frame, 0); |
|
if (ret < 0) { |
|
av_frame_free(&msg.frame); |
|
break; |
|
} |
|
|
|
|
|
av_log(NULL, AV_LOG_INFO, "sender #%d: sending my work (%d/%d frame:%p)\n", |
|
wd->id, i + 1, wd->workload, msg.frame); |
|
ret = av_thread_message_queue_send(wd->queue, &msg, 0); |
|
if (ret < 0) { |
|
av_frame_free(&msg.frame); |
|
break; |
|
} |
|
} |
|
} |
|
av_log(NULL, AV_LOG_INFO, "sender #%d: my work is done here (%s)\n", |
|
wd->id, av_err2str(ret)); |
|
av_thread_message_queue_set_err_recv(wd->queue, ret < 0 ? ret : AVERROR_EOF); |
|
return NULL; |
|
} |
|
|
|
static void *receiver_thread(void *arg) |
|
{ |
|
int i, ret = 0; |
|
struct receiver_data *rd = arg; |
|
|
|
for (i = 0; i < rd->workload; i++) { |
|
if (rand() % rd->workload < rd->workload / 10) { |
|
av_log(NULL, AV_LOG_INFO, "receiver #%d: flushing the queue, " |
|
"discarding %d message(s)\n", rd->id, |
|
av_thread_message_queue_nb_elems(rd->queue)); |
|
av_thread_message_flush(rd->queue); |
|
} else { |
|
struct message msg; |
|
AVDictionary *meta; |
|
AVDictionaryEntry *e; |
|
|
|
ret = av_thread_message_queue_recv(rd->queue, &msg, 0); |
|
if (ret < 0) |
|
break; |
|
av_assert0(msg.magic == MAGIC); |
|
meta = msg.frame->metadata; |
|
e = av_dict_get(meta, "sig", NULL, 0); |
|
av_log(NULL, AV_LOG_INFO, "got \"%s\" (%p)\n", e->value, msg.frame); |
|
av_frame_free(&msg.frame); |
|
} |
|
} |
|
|
|
av_log(NULL, AV_LOG_INFO, "consumed enough (%d), stop\n", i); |
|
av_thread_message_queue_set_err_send(rd->queue, ret < 0 ? ret : AVERROR_EOF); |
|
|
|
return NULL; |
|
} |
|
|
|
static int get_workload(int minv, int maxv) |
|
{ |
|
return maxv == minv ? maxv : rand() % (maxv - minv) + minv; |
|
} |
|
|
|
int main(int ac, char **av) |
|
{ |
|
int i, ret = 0; |
|
int max_queue_size; |
|
int nb_senders, sender_min_load, sender_max_load; |
|
int nb_receivers, receiver_min_load, receiver_max_load; |
|
struct sender_data *senders; |
|
struct receiver_data *receivers; |
|
AVThreadMessageQueue *queue = NULL; |
|
|
|
if (ac != 8) { |
|
av_log(NULL, AV_LOG_ERROR, "%s <max_queue_size> " |
|
"<nb_senders> <sender_min_send> <sender_max_send> " |
|
"<nb_receivers> <receiver_min_recv> <receiver_max_recv>\n", av[0]); |
|
return 1; |
|
} |
|
|
|
max_queue_size = atoi(av[1]); |
|
nb_senders = atoi(av[2]); |
|
sender_min_load = atoi(av[3]); |
|
sender_max_load = atoi(av[4]); |
|
nb_receivers = atoi(av[5]); |
|
receiver_min_load = atoi(av[6]); |
|
receiver_max_load = atoi(av[7]); |
|
|
|
if (max_queue_size <= 0 || |
|
nb_senders <= 0 || sender_min_load <= 0 || sender_max_load <= 0 || |
|
nb_receivers <= 0 || receiver_min_load <= 0 || receiver_max_load <= 0) { |
|
av_log(NULL, AV_LOG_ERROR, "negative values not allowed\n"); |
|
return 1; |
|
} |
|
|
|
av_log(NULL, AV_LOG_INFO, "qsize:%d / %d senders sending [%d-%d] / " |
|
"%d receivers receiving [%d-%d]\n", max_queue_size, |
|
nb_senders, sender_min_load, sender_max_load, |
|
nb_receivers, receiver_min_load, receiver_max_load); |
|
|
|
senders = av_calloc(nb_senders, sizeof(*senders)); |
|
receivers = av_calloc(nb_receivers, sizeof(*receivers)); |
|
if (!senders || !receivers) { |
|
ret = AVERROR(ENOMEM); |
|
goto end; |
|
} |
|
|
|
ret = av_thread_message_queue_alloc(&queue, max_queue_size, sizeof(struct message)); |
|
if (ret < 0) |
|
goto end; |
|
|
|
av_thread_message_queue_set_free_func(queue, free_frame); |
|
|
|
#define SPAWN_THREADS(type) do { \ |
|
for (i = 0; i < nb_##type##s; i++) { \ |
|
struct type##_data *td = &type##s[i]; \ |
|
\ |
|
td->id = i; \ |
|
td->queue = queue; \ |
|
td->workload = get_workload(type##_min_load, type##_max_load); \ |
|
\ |
|
ret = pthread_create(&td->tid, NULL, type##_thread, td); \ |
|
if (ret) { \ |
|
const int err = AVERROR(ret); \ |
|
av_log(NULL, AV_LOG_ERROR, "Unable to start " AV_STRINGIFY(type) \ |
|
" thread: %s\n", av_err2str(err)); \ |
|
goto end; \ |
|
} \ |
|
} \ |
|
} while (0) |
|
|
|
#define WAIT_THREADS(type) do { \ |
|
for (i = 0; i < nb_##type##s; i++) { \ |
|
struct type##_data *td = &type##s[i]; \ |
|
\ |
|
ret = pthread_join(td->tid, NULL); \ |
|
if (ret) { \ |
|
const int err = AVERROR(ret); \ |
|
av_log(NULL, AV_LOG_ERROR, "Unable to join " AV_STRINGIFY(type) \ |
|
" thread: %s\n", av_err2str(err)); \ |
|
goto end; \ |
|
} \ |
|
} \ |
|
} while (0) |
|
|
|
SPAWN_THREADS(receiver); |
|
SPAWN_THREADS(sender); |
|
|
|
WAIT_THREADS(sender); |
|
WAIT_THREADS(receiver); |
|
|
|
end: |
|
av_thread_message_queue_free(&queue); |
|
av_freep(&senders); |
|
av_freep(&receivers); |
|
|
|
if (ret < 0 && ret != AVERROR_EOF) { |
|
av_log(NULL, AV_LOG_ERROR, "Error: %s\n", av_err2str(ret)); |
|
return 1; |
|
} |
|
return 0; |
|
} |
|
|