What’s Mediastreamer2. Ticker load management

This content originally appeared on Level Up Coding - Medium and was authored by Igor Plastov

Ticker load is calculated as the ratio of the time spent processing all filters of the graph connected to this ticker to the interval between ticks. By default the interval is 10 ms; if you change the sampling rate, the calculation is performed for the value you set. The calculation uses the information accumulated by the ticker during operation. Mediastreamer2 provides a function that returns the average over several measurements, expressed as a percentage:

MS2_PUBLIC float ms_ticker_get_average_load(MSTicker *ticker);
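
As a rough illustration of the definition above, the sketch below computes the load of a single tick and an arithmetic mean over several ticks. The function names and the sample timings are hypothetical, and the averaging that Mediastreamer2 performs internally may differ; this is only a model of the ratio described in the text.

```c
#include <stddef.h>

/* Load of one tick, in percent: time spent running the filters
   divided by the interval between ticks. Hypothetical helper,
   not part of the Mediastreamer2 API. */
static double tick_load_percent(double processing_ms, double interval_ms)
{
    if (interval_ms <= 0.0) return 0.0;
    return 100.0 * processing_ms / interval_ms;
}

/* Arithmetic mean of the per-tick loads over n measurements. */
static double average_load_percent(const double *processing_ms, size_t n,
                                   double interval_ms)
{
    double sum = 0.0;
    size_t i;
    if (n == 0) return 0.0;
    for (i = 0; i < n; i++)
        sum += tick_load_percent(processing_ms[i], interval_ms);
    return sum / (double)n;
}
```

With a 10 ms interval, three ticks that took 2, 4 and 6 ms give per-tick loads of 20%, 40% and 60%, so the average is 40%.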

If the returned value is close to 100%, this means that this ticker is barely able to keep up with its work before the next tick starts. If we have an application that does not need to work in real time (for example, it just writes sound to a file), then it doesn’t really matter for us how long the next ticker call was postponed. But in a real-time application, processing delay can affect the moment the RTP-packet is sent, which in turn can affect the quality of sound or video. In some cases, the effect of the delay of individual packets can be canceled by using a packet buffer at the receiving end (the so-called jitter buffer). In this case, your sound will be played without defects, but with a delay proportional to the buffer length. This can be unacceptable in cases where the audio signal is used to control real-time processes.

You should start taking action when the ticker load crosses the 80% limit. At such a load, the ticker starts some of its ticks late. Ticker lag is the time by which the next ticker run was postponed. If the ticker lag exceeds a certain value, an MSTickerLateEvent event is generated:

Listing 7.1: Structure MSTickerLateEvent

struct _MSTickerLateEvent{
int lateMs; /* The last time delay, in milliseconds. */
uint64_t time; /* The time the event occurred, in milliseconds. */
int current_late_ms; /* Delay at the current tick, in milliseconds. */
};
typedef struct _MSTickerLateEvent MSTickerLateEvent;

When this happens, a warning is printed to the console that looks something like this:

ortp-warning-MSTicker: We are late of 164 miliseconds

Using the function

void ms_ticker_get_last_late_tick(MSTicker *ticker, MSTickerLateEvent *ev);

details of the last such event can be found.
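
The two checks described above (the 80% load limit and the lag of a tick) can be sketched in plain C as follows. The helper names and the lag limit are assumptions made for illustration; only the 80% figure comes from the text. In a real program the inputs would come from ms_ticker_get_average_load() and from the fields of MSTickerLateEvent.

```c
#include <stdint.h>

/* Thresholds for illustration. Only the 80% load limit comes from
   the text; the lag limit is an assumed value. */
#define LOAD_LIMIT_PERCENT 80.0f
#define LATE_LIMIT_MS      5

/* Lag of a tick: how much later than scheduled it actually started.
   Returns 0 when the tick was on time or early. */
static int tick_lag_ms(uint64_t scheduled_ms, uint64_t actual_ms)
{
    if (actual_ms <= scheduled_ms) return 0;
    return (int)(actual_ms - scheduled_ms);
}

/* Returns 1 when the measurements suggest the graph needs attention. */
static int ticker_needs_attention(float average_load, int lag_ms)
{
    return average_load > LOAD_LIMIT_PERCENT || lag_ms > LATE_LIMIT_MS;
}
```

For example, a tick scheduled at 1000 ms that starts at 1164 ms has a lag of 164 ms, which matches the kind of value seen in the warning above.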

7.1 Ways to reduce the load

Here we have two options. The first is to change the priority of the ticker, the second is to transfer part of the ticker’s job to another thread. Let’s consider these options.

7.1.1 Changing the priority

Ticker priority has three grades, which are defined in the enumeration MSTickerPrio:

Listing 7.2: Enumeration MSTickerPrio

enum _MSTickerPrio{
MS_TICKER_PRIO_NORMAL, /* The priority corresponding to the default value for the given OS. */
MS_TICKER_PRIO_HIGH, /* The increased priority is set under Linux/MacOS using setpriority() or sched_setschedparams(), the SCHED_RR policy is set. */
MS_TICKER_PRIO_REALTIME /* The highest priority, for it under Linux the SCHED_FIFO policy is used. */
};
typedef enum _MSTickerPrio MSTickerPrio;
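
To make the comments in listing 7.2 more concrete, here is a small sketch that maps the three grades to POSIX scheduling policies and asks the OS for the highest static priority of each policy. The enum ticker_prio and both helper functions are local stand-ins invented for this example; they are not Mediastreamer2 code, and the library's actual implementation may choose policies and priorities differently. Note that switching a thread to a real-time policy usually requires extra privileges.

```c
#include <sched.h>

/* Local stand-in for MSTickerPrio so the sketch compiles without
   Mediastreamer2 headers. */
typedef enum { PRIO_NORMAL, PRIO_HIGH, PRIO_REALTIME } ticker_prio;

/* Scheduling policy suggested by the comments in listing 7.2. */
static int policy_for_prio(ticker_prio prio)
{
    switch (prio) {
    case PRIO_HIGH:     return SCHED_RR;
    case PRIO_REALTIME: return SCHED_FIFO;
    default:            return SCHED_OTHER;
    }
}

/* Highest static priority the OS allows for that policy. */
static int max_static_priority(ticker_prio prio)
{
    return sched_get_priority_max(policy_for_prio(prio));
}
```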

To experiment with the ticker load, we need a circuit that ramps up the load during operation and stops when the load reaches 99%. The basic circuit is shown in figure 7.1. The load is increased by inserting new signal level controls (filters of type MS_VOLUME) between dtmfgen and voidsink, each with a gain not equal to 1, so that the filter cannot skip its work.

ticker -> voidsource -> dtmfgen -> voidsink
Figure 7.1: Ticker load

The source code is shown in listing 7.3. It is commented, so it should be easy to follow:

Listing 7.3: Variable computational load

/* File mstest13.c Variable computational load. */
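#include <stdio.h>
#include <stdlib.h>   /* atoi(), exit() */
#include <string.h>   /* strcmp(), strncpy() */
#include <signal.h>
#include <pthread.h>  /* pthread_once() */
#include <mediastreamer2/msfilter.h>
#include <mediastreamer2/msticker.h>
#include <mediastreamer2/dtmfgen.h>
#include <mediastreamer2/mssndcard.h>
#include <mediastreamer2/msvolume.h>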
/*----------------------------------------------------------*/
struct _app_vars
{
int step; /* The number of filters to add at a time. */
int limit; /* The number of filters on which to finish the job. */
int ticker_priority; /* Ticker priority. */
char* file_name; /* Output file name. */
FILE *file;
};
typedef struct _app_vars app_vars;
/*----------------------------------------------------------*/
/* Function to convert command line arguments to program settings. */
void scan_args(int argc, char *argv[], app_vars *v)
{
int i; /* int rather than char, to match the type of argc. */
for (i=0; i<argc; i++)
{
if (!strcmp(argv[i], "--help"))
{
char *p=argv[0]; p=p + 2;
printf(" %s computational load\n\n", p);
printf("--help List of options.\n");
printf("--version Version of application.\n");
printf("--step Filters count per step.\n");
printf("--tprio Ticker priority:\n"
" MS_TICKER_PRIO_NORMAL 0\n"
" MS_TICKER_PRIO_HIGH 1\n"
" MS_TICKER_PRIO_REALTIME 2\n");
printf("--limit Filters count limit.\n");
printf("-o Output file name.\n");
exit(0);
}
        if (!strcmp(argv[i], "--version"))
{
printf("0.1\n");
exit(0);
}
        if (!strcmp(argv[i], "--step"))
{
v->step = atoi(argv[i+1]);
printf("step: %i\n", v->step);
}
        if (!strcmp(argv[i], "--tprio"))
{
int prio = atoi(argv[i+1]);
if ((prio >=MS_TICKER_PRIO_NORMAL) && (prio <= MS_TICKER_PRIO_REALTIME))
{
v->ticker_priority = atoi(argv[i+1]);
printf("ticker priority: %i\n", v->ticker_priority);
}
else
{
printf(" Bad ticker priority: %i\n", prio);
exit(1);
}
}
        if (!strcmp(argv[i], "--limit"))
{
v->limit = atoi(argv[i+1]);
printf("limit: %i\n", v->limit);
}
        if (!strcmp(argv[i], "-o"))
{
v->file_name=argv[i+1];
printf("file name: %s\n", v->file_name);
}
}
}
/*----------------------------------------------------------*/
/* Structure for storing program settings. */
app_vars vars;
/*----------------------------------------------------------*/
void saveMyData()
{
// We close the file.
if (vars.file) fclose(vars.file);
exit(0);
}
void signalHandler( int signalNumber )
{
static pthread_once_t semaphore = PTHREAD_ONCE_INIT;
printf("\nsignal %i received.\n", signalNumber);
pthread_once( & semaphore, saveMyData );
}
/*----------------------------------------------------------*/
int main(int argc, char *argv[])
{
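/* We set the default settings in the global structure, so that the
signal handler works with the same data. */
vars.step=100;
vars.limit=100500;
vars.ticker_priority=MS_TICKER_PRIO_NORMAL;
vars.file_name=NULL;
vars.file=NULL;
/* We connect the Ctrl-C handler. */
signal( SIGTERM, signalHandler );
signal( SIGINT, signalHandler );
/* We set the program settings according to the command line arguments. */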
scan_args(argc, argv, &vars);
    if (vars.file_name)
{
vars.file = fopen(vars.file_name, "w");
}
    ms_init();
/* We create instances of filters. */
MSFilter *voidsource=ms_filter_new(MS_VOID_SOURCE_ID);
MSFilter *dtmfgen=ms_filter_new(MS_DTMF_GEN_ID);
    MSSndCard *card_playback=ms_snd_card_manager_get_default_card(ms_snd_card_manager_get());
MSFilter *snd_card_write=ms_snd_card_create_writer(card_playback);
MSFilter *voidsink=ms_filter_new(MS_VOID_SINK_ID);
    MSDtmfGenCustomTone dtmf_cfg;
/* We set the name of our signal, keeping in mind that the array must leave room for the zero that marks the end of the string. */
strncpy(dtmf_cfg.tone_name, "busy", sizeof(dtmf_cfg.tone_name));
dtmf_cfg.duration=1000;
dtmf_cfg.frequencies[0]=440; /* We will generate one tone, set the frequency of the second tone to 0. */
dtmf_cfg.frequencies[1]=0;
dtmf_cfg.amplitude=1.0; /*This sine amplitude should correspond to a measurement result of 0.707.*/
dtmf_cfg.interval=0.;
dtmf_cfg.repeat_count=0.;
    /* We set variables to store the result. */
float load=0.;
float latency=0.;
int filter_count=0;
    /* Create a ticker. */
MSTicker *ticker=ms_ticker_new();
ms_ticker_set_priority(ticker, vars.ticker_priority);
    /* We connect filters in a chain. */
ms_filter_link(voidsource, 0, dtmfgen, 0);
ms_filter_link(dtmfgen, 0, voidsink, 0);
    MSFilter* previous_filter=dtmfgen;
float gain=1.0f; /* Gain in dB; MS_VOLUME_SET_DB_GAIN expects a pointer to float. */
int i;
    printf("# filters load\n");
if (vars.file)
{
fprintf(vars.file, "# filters load\n");
}
    while ((load <= 99.) && (filter_count < vars.limit))
{
// Temporarily disable the packet absorber from the circuit.
ms_filter_unlink(previous_filter, 0, voidsink, 0);
MSFilter *volume;
for (i=0; i<vars.step; i++)
{
volume=ms_filter_new(MS_VOLUME_ID);
ms_filter_call_method(volume, MS_VOLUME_SET_DB_GAIN, &gain);
ms_filter_link(previous_filter, 0, volume, 0);
previous_filter = volume;
}
// We return the "absorber" of packets to the schema.
ms_filter_link(volume, 0, voidsink, 0);
        /* We connect the clock source. */
ms_ticker_attach(ticker,voidsource);
        /* We turn on the sound generator. */
ms_filter_call_method(dtmfgen, MS_DTMF_GEN_PLAY_CUSTOM, (void*)&dtmf_cfg);
/* We give 500 milliseconds to accumulate data for averaging. */
ms_usleep(500000);
        /* We read the measurement result. */
load=ms_ticker_get_average_load(ticker);
        filter_count=filter_count + vars.step;
        /* Disable the clock source. */
ms_ticker_detach(ticker,voidsource);
        printf("%i  %f\n", filter_count, load);
if (vars.file) fprintf(vars.file,"%i %f\n", filter_count, load);
}
if (vars.file) fclose(vars.file);
}

Save it under the name mstest13.c and compile it with the command:

$ gcc mstest13.c -o mstest13 `pkg-config mediastreamer --libs --cflags`

Next, we run our tool three times to estimate the ticker load at each priority, starting with the lowest:

$ ./mstest13 --step 100 --limit 40000 --tprio 0 -o log0.txt
$ ./mstest13 --step 100 --limit 40000 --tprio 1 -o log1.txt
$ ./mstest13 --step 100 --limit 40000 --tprio 2 -o log2.txt

Next, we “feed” the resulting files log0.txt, log1.txt, log2.txt to the excellent gnuplot utility:

$ gnuplot -e "set terminal png; set output 'load.png'; plot 'log0.txt' using 1:2 with lines , 'log1.txt' using 1:2 with lines, 'log2.txt' using 1:2 with lines"

As a result, the file load.png will be created, containing a graph like the one shown in figure 7.2.

Figure 7.2: Ticker load at different priorities

The vertical axis shows the ticker load in percent, the horizontal axis shows the number of added load filters. In this graph we see that, as expected, for priority 2 (blue line) the first noticeable spike appears with 6000 filters connected, while for priorities 0 (purple) and 1 (green) spikes appear earlier, with 1000 and 3000 filters respectively.

To obtain averaged results you would need more program runs, but this chart already shows that the ticker load grows linearly, in proportion to the number of filters.
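
If you want to put a number on this linear growth, one option is to fit a straight line to the contents of a log file. The sketch below is a plain least-squares fit; the function name is hypothetical, and reading the two columns from the file into arrays is left out.

```c
#include <stddef.h>

/* Least-squares fit of y = slope * x + intercept, where x is the
   number of filters and y is the measured load in percent.
   Returns 0 on success, -1 if the fit is not defined. */
static int fit_line(const double *x, const double *y, size_t n,
                    double *slope, double *intercept)
{
    double sx = 0.0, sy = 0.0, sxx = 0.0, sxy = 0.0, denom;
    size_t i;
    if (n < 2) return -1;
    for (i = 0; i < n; i++) {
        sx  += x[i];
        sy  += y[i];
        sxx += x[i] * x[i];
        sxy += x[i] * y[i];
    }
    denom = (double)n * sxx - sx * sx;
    if (denom == 0.0) return -1; /* All x values are identical. */
    *slope = ((double)n * sxy - sx * sy) / denom;
    *intercept = (sy - *slope * sx) / (double)n;
    return 0;
}
```

The slope is then the load added by one filter, in percent, and the intercept approximates the load of the empty circuit. For synthetic points lying exactly on a line, such as (100, 1.5), (200, 2.5), (300, 3.5), (400, 4.5), the fit gives a slope of 0.01 and an intercept of 0.5; these numbers are made up for the example and are not measurement results.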

7.1.2 Transferring part of the job to another thread

If your task can be divided into two or more parallel branches, then everything is simple: create one or more new tickers and attach each branch of the graph to its own ticker. If the task cannot be parallelized, it can be divided “across” by breaking the chain of filters into several segments, each of which works in a separate thread (i.e. with its own ticker). You then need to “stitch” the data streams so that the output of one segment reaches the input of the next. This transfer of data between threads is performed by two special filters, MS_ITC_SINK and MS_ITC_SOURCE, collectively called “intertickers”.

Intertickers

The MS_ITC_SINK filter takes data out of a thread; it has one input and no outputs. The MS_ITC_SOURCE filter brings data into a thread asynchronously; it has one output and no inputs. In Mediastreamer2 terms, this pair of filters makes it possible to transfer data between filters driven by different tickers.

To start data transfer, these filters need to be connected, but not with the ms_filter_link() function as we did with ordinary filters. Instead, the MS_ITC_SINK_CONNECT method of the MS_ITC_SINK filter is used:

ms_filter_call_method (itc_sink, MS_ITC_SINK_CONNECT, itc_src)

The method connects the two filters using an asynchronous queue. There is no method for disconnecting intertickers.
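
To give an idea of what an asynchronous queue between two threads involves, here is a minimal single-producer, single-consumer ring buffer written with C11 atomics. It is only an illustration of the general technique: the type and function names are invented for this example, and Mediastreamer2 uses its own queue implementation inside the intertickers, which this sketch does not reproduce.

```c
#include <stdatomic.h>
#include <stddef.h>

#define QUEUE_SIZE 8 /* The queue holds at most QUEUE_SIZE - 1 items. */

/* Single-producer, single-consumer queue: the "sink" thread only
   writes tail, the "source" thread only writes head. */
typedef struct {
    void *items[QUEUE_SIZE];
    atomic_size_t head; /* Next slot to read. */
    atomic_size_t tail; /* Next slot to write. */
} spsc_queue;

static void queue_init(spsc_queue *q)
{
    atomic_init(&q->head, 0);
    atomic_init(&q->tail, 0);
}

/* Called from the producing thread. Returns 0 if the queue is full. */
static int queue_push(spsc_queue *q, void *item)
{
    size_t tail = atomic_load_explicit(&q->tail, memory_order_relaxed);
    size_t next = (tail + 1) % QUEUE_SIZE;
    if (next == atomic_load_explicit(&q->head, memory_order_acquire))
        return 0;
    q->items[tail] = item;
    /* Publish the item only after it has been stored. */
    atomic_store_explicit(&q->tail, next, memory_order_release);
    return 1;
}

/* Called from the consuming thread. Returns NULL if the queue is empty,
   so NULL itself cannot be used as an item. */
static void *queue_pop(spsc_queue *q)
{
    size_t head = atomic_load_explicit(&q->head, memory_order_relaxed);
    void *item;
    if (head == atomic_load_explicit(&q->tail, memory_order_acquire))
        return NULL;
    item = q->items[head];
    atomic_store_explicit(&q->head, (head + 1) % QUEUE_SIZE,
                          memory_order_release);
    return item;
}
```

In this model the producer never waits for the consumer: if the second thread falls behind, the queue simply fills up and queue_push() reports it. The two threads run at their own pace, which is what "asynchronous" means here.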

Example of using intertickers

Figure 7.3 below shows an example of their use. The triple arrow denotes the asynchronous queue between two threads.

Figure 7.3: Interticker application

Now we modify our test program according to this scheme, so that it runs two threads connected by intertickers, as in the figure. Each thread has an insertion point where new instances of load filters are added, as shown in figure 7.4. The filters added at each step are split in half between the threads.

Figure 7.4: Sharing the load between two threads

The corresponding program code is shown in listing 7.4.

Listing 7.4: Variable computational load with intertickers

/* File mstest14.c Variable computational load with intertickers. */
#include <stdio.h>
#include <stdlib.h>   /* atoi(), exit() */
#include <string.h>   /* strcmp(), strncpy() */
#include <signal.h>
#include <pthread.h>  /* pthread_once() */
#include <mediastreamer2/msfilter.h>
#include <mediastreamer2/msticker.h>
#include <mediastreamer2/dtmfgen.h>
#include <mediastreamer2/mssndcard.h>
#include <mediastreamer2/msvolume.h>
#include <mediastreamer2/msitc.h>
/*----------------------------------------------------------*/
struct _app_vars
{
int step; /* The number of filters to add at a time. */
int limit; /* The number of filters at which to finish the job. */
int ticker_priority; /* Ticker priority. */
char* file_name; /* Output file name. */
FILE *file;
};
typedef struct _app_vars app_vars;
/*----------------------------------------------------------*/
/* Function to convert command line arguments to program settings. */
void scan_args(int argc, char *argv[], app_vars *v)
{
int i; /* int rather than char, to match the type of argc. */
for (i=0; i<argc; i++)
{
if (!strcmp(argv[i], "--help"))
{
char *p=argv[0]; p=p + 2;
printf(" %s computational load divided between two threads.\n\n", p);
printf("--help List of options.\n");
printf("--version Version of application.\n");
printf("--step Filters count per step.\n");
printf("--tprio Ticker priority:\n"
" MS_TICKER_PRIO_NORMAL 0\n"
" MS_TICKER_PRIO_HIGH 1\n"
" MS_TICKER_PRIO_REALTIME 2\n");
printf("--limit Filters count limit.\n");
printf("-o Output file name.\n");
exit(0);
}
        if (!strcmp(argv[i], "--version"))
{
printf("0.1\n");
exit(0);
}
        if (!strcmp(argv[i], "--step"))
{
v->step = atoi(argv[i+1]);
printf("step: %i\n", v->step);
}
        if (!strcmp(argv[i], "--tprio"))
{
int prio = atoi(argv[i+1]);
if ((prio >=MS_TICKER_PRIO_NORMAL) && (prio <= MS_TICKER_PRIO_REALTIME))
{
v->ticker_priority = atoi(argv[i+1]);
printf("ticker priority: %i\n", v->ticker_priority);
}
else
{
printf(" Bad ticker priority: %i\n", prio);
exit(1);
}
}
        if (!strcmp(argv[i], "--limit"))
{
v->limit = atoi(argv[i+1]);
printf("limit: %i\n", v->limit);
}
        if (!strcmp(argv[i], "-o"))
{
v->file_name=argv[i+1];
printf("file name: %s\n", v->file_name);
}
}
}
/*----------------------------------------------------------*/
/* Structure for storing program settings. */
app_vars vars;
/*----------------------------------------------------------*/
void saveMyData()
{
// We close the file.
if (vars.file) fclose(vars.file);
exit(0);
}
void signalHandler( int signalNumber )
{
static pthread_once_t semaphore = PTHREAD_ONCE_INIT;
printf("\nsignal %i received.\n", signalNumber);
pthread_once( & semaphore, saveMyData );
}
/*----------------------------------------------------------*/
int main(int argc, char *argv[])
{
/* We set the defaults in the global structure, so the signal handler sees them. */
vars.step=100;
vars.limit=100500;
vars.ticker_priority=MS_TICKER_PRIO_NORMAL;
vars.file_name=NULL;
vars.file=NULL;
/* We connect the Ctrl-C handler. */
signal( SIGTERM, signalHandler );
signal( SIGINT, signalHandler );
/* We set the program settings according to the command line arguments. */
scan_args(argc, argv, &vars);
    if (vars.file_name)
{
vars.file = fopen(vars.file_name, "w");
}
    ms_init();
    /* Create filter instances for the first thread. */
MSFilter *voidsource = ms_filter_new(MS_VOID_SOURCE_ID);
MSFilter *dtmfgen = ms_filter_new(MS_DTMF_GEN_ID);
MSFilter *itc_sink = ms_filter_new(MS_ITC_SINK_ID);
    MSDtmfGenCustomTone dtmf_cfg;
/* We set the name of our signal, keeping in mind that the array must leave room for the zero that marks the end of the string. */
strncpy(dtmf_cfg.tone_name, "busy", sizeof(dtmf_cfg.tone_name));
dtmf_cfg.duration=1000;
dtmf_cfg.frequencies[0]=440; /* We will generate one tone, set the frequency of the second tone to 0. */
dtmf_cfg.frequencies[1]=0;
dtmf_cfg.amplitude=1.0; /* This sine amplitude should correspond to a measurement result of 0.707. */
dtmf_cfg.interval=0.;
dtmf_cfg.repeat_count=0.;
    /* We set variables to store the result. */
float load=0.;
float latency=0.;
int filter_count=0;
    /*Create a ticker. */
MSTicker *ticker1=ms_ticker_new();
ms_ticker_set_priority(ticker1, vars.ticker_priority);
    /* We connect filters in a chain. */
ms_filter_link(voidsource, 0, dtmfgen, 0);
ms_filter_link(dtmfgen, 0, itc_sink , 0);
    /* Create filter instances for the second thread. */
MSTicker *ticker2=ms_ticker_new();
ms_ticker_set_priority(ticker2, vars.ticker_priority);
MSFilter *itc_src = ms_filter_new(MS_ITC_SOURCE_ID);
MSFilter *voidsink2 = ms_filter_new(MS_VOID_SINK_ID);
ms_filter_call_method (itc_sink, MS_ITC_SINK_CONNECT, itc_src);
ms_filter_link(itc_src, 0, voidsink2, 0);
    MSFilter* previous_filter1=dtmfgen;
MSFilter* previous_filter2=itc_src;
float gain=1.0f; /* Gain in dB; MS_VOLUME_SET_DB_GAIN expects a pointer to float. */
int i;
    printf("# filters load\n");
if (vars.file)
{
fprintf(vars.file, "# filters load\n");
}
while ((load <= 99.) && (filter_count < vars.limit))
{
        // Temporarily disable the packet “absorbers” from the schemes.
ms_filter_unlink(previous_filter1, 0, itc_sink, 0);
ms_filter_unlink(previous_filter2, 0, voidsink2, 0);
MSFilter *volume1, *volume2;
        // We divide the new load filters between the two threads.
int new_filters = vars.step>>1;
for (i=0; i < new_filters; i++)
{
volume1=ms_filter_new(MS_VOLUME_ID);
ms_filter_call_method(volume1, MS_VOLUME_SET_DB_GAIN, &gain);
ms_filter_link(previous_filter1, 0, volume1, 0);
previous_filter1 = volume1;
}
        new_filters = vars.step - new_filters;
for (i=0; i < new_filters; i++)
{
volume2=ms_filter_new(MS_VOLUME_ID);
ms_filter_call_method(volume2, MS_VOLUME_SET_DB_GAIN, &gain);
ms_filter_link(previous_filter2, 0, volume2, 0);
previous_filter2 = volume2;
}
        // We return the packet “absorbers” to the schemas.
ms_filter_link(volume1, 0, itc_sink, 0);
ms_filter_link(volume2, 0, voidsink2, 0);
        /* We connect the clock source. */
ms_ticker_attach(ticker2, itc_src);
ms_ticker_attach(ticker1, voidsource);
        /* We turn on the sound generator. */
ms_filter_call_method(dtmfgen, MS_DTMF_GEN_PLAY_CUSTOM, (void*)&dtmf_cfg);
        /* We give time to accumulate data for averaging. */
ms_usleep(500000);
        /* We read the measurement result. */
load=ms_ticker_get_average_load(ticker1);
        filter_count=filter_count + vars.step;
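/* Disable the clock sources of both threads, so that the graphs
are not modified while they are running. */
ms_ticker_detach(ticker1, voidsource);
ms_ticker_detach(ticker2, itc_src);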
        printf("%i  %f\n", filter_count, load);
if (vars.file) fprintf(vars.file,"%i %f\n", filter_count, load);
}
if (vars.file) fclose(vars.file);
}

Next, we compile our program and run it with the tickers working at the lowest priority:

$ ./mstest14 --step 100 --limit 40000 --tprio 0 -o log4.txt

The measurement result is shown in figure 7.5.

Figure 7.5: Ticker load when using an additional thread

For convenience, curves obtained for the first version of the program have been added to the graph. The orange curve shows the result for the “two-threaded” version of the program. It can be seen from the graph that the ticker load growth rate for the “two-threaded” scheme is lower. The load on the second ticker is not shown.

If necessary, you can connect threads running on different hosts. In that case, use an RTP session instead of intertickers (as we did earlier when creating an intercom). You will also need to take into account that the size of RTP packets is bounded above by the MTU.
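
As a back-of-the-envelope check of the MTU limit, the sketch below estimates how much payload fits into one RTP packet. It assumes IPv4 without options, UDP, and an RTP header without CSRC entries or extensions; the function names are hypothetical, and a real payload format may impose its own rules on how data is split between packets.

```c
/* Header sizes in bytes, assuming IPv4 without options, UDP and
   an RTP header without CSRC entries or extensions. */
enum { IPV4_HEADER = 20, UDP_HEADER = 8, RTP_HEADER = 12 };

/* Largest RTP payload that fits into one packet for a given MTU. */
static int rtp_max_payload(int mtu)
{
    int payload = mtu - IPV4_HEADER - UDP_HEADER - RTP_HEADER;
    return payload > 0 ? payload : 0;
}

/* Number of packets needed to carry data_bytes of media data,
   by plain ceiling division. */
static int rtp_packets_needed(int data_bytes, int mtu)
{
    int payload = rtp_max_payload(mtu);
    if (payload == 0 || data_bytes <= 0) return 0;
    return (data_bytes + payload - 1) / payload;
}
```

With the common Ethernet MTU of 1500 bytes this leaves 1460 bytes of payload per packet, so 3000 bytes of data would need three packets.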

Based on the results of this chapter, we can conclude that changing the priority of a ticker does not affect its performance; increasing the priority only reduces the likelihood of ticker lag. The recipe for improving the performance of the circuit as a whole is to split the processing graph into several clusters, each with its own ticker. The price for reducing the load on each ticker is an increase in the time the data takes to travel from the input to the output of the circuit.


Igor Plastov | Sciencx (2022-05-01T16:01:01+00:00) What’s Mediastreamer2. Ticker load management. Retrieved from https://www.scien.cx/2022/05/01/whats-mediastreamer2-ticker-load-management/
