Friday, March 1, 2024

Python and random task distribution in ThreadPoolExecutor

Not my usual kind of article, but I'll leave this boilerplate here for myself.

So, the task was the following: send multiple PUBLISH messages with SIPP without overloading the SIP proxy, spreading them out in time instead of sending them all at once. Yes, I do know SIPP can do this itself, but I wanted to get the SIPP exit code after sending each message and re-send it in case of issues. The order of sending doesn't matter; the messages just need to be delivered.

Here is the boilerplate Python function that I've used to achieve this:

#!/usr/bin/python
import logging
import time
import random

from concurrent.futures import ThreadPoolExecutor

logging.getLogger('paramiko').setLevel(logging.INFO)

logging.basicConfig(
    format = '[%(asctime)s.%(msecs)03d] %(threadName)s %(name)s %(levelname)s: %(message)s',
    level=logging.INFO,
)

def get_timer_delay():
    '''
    Generator passed to ThreadPoolExecutor.map so that every task gets its own start delay.
    Each delay grows by a random step between 5 and 200 ms; once it reaches 1 sec, the next one resets to 0.
    '''
    num = 0.0
    while True:
        yield num
        if num < 1:
            num += float(random.randrange(5, 200, 3)) / 1000
        else:
            num = 0.0

def process_iterated_value(value, start_delay):
    '''
    Runs in a worker thread, so several values are processed at the same time.
    '''
    time.sleep(start_delay)

    # Call SIPP process here (with exit code control)

    logging.info(f"Processed value {value} with delay {start_delay}")

#### ---------------- Script start ---------------- ########

execute_timeout = get_timer_delay()
iterated_value = range(0, 30)

logging.info("Process start")

with ThreadPoolExecutor(max_workers=5) as executor:
    executor.map(process_iterated_value, iterated_value, execute_timeout)


logging.info("Process end")

The output looks like this. The values are processed in a more or less evenly distributed way, as the timestamps show:

[00:06:43,375.375] MainThread root INFO: Process start
[00:06:43,375.375] ThreadPoolExecutor-0_0 root INFO: Processed value 0 with delay 0.0
[00:06:43,504.504] ThreadPoolExecutor-0_0 root INFO: Processed value 1 with delay 0.128
[00:06:43,620.620] ThreadPoolExecutor-0_1 root INFO: Processed value 2 with delay 0.244
[00:06:43,794.794] ThreadPoolExecutor-0_2 root INFO: Processed value 3 with delay 0.417
[00:06:43,928.928] ThreadPoolExecutor-0_3 root INFO: Processed value 4 with delay 0.5509999999999999
[00:06:44,053.053] ThreadPoolExecutor-0_4 root INFO: Processed value 5 with delay 0.6759999999999999
[00:06:44,334.334] ThreadPoolExecutor-0_0 root INFO: Processed value 6 with delay 0.828
[00:06:44,334.334] ThreadPoolExecutor-0_0 root INFO: Processed value 11 with delay 0.0
[00:06:44,475.475] ThreadPoolExecutor-0_0 root INFO: Processed value 12 with delay 0.14
[00:06:44,590.590] ThreadPoolExecutor-0_1 root INFO: Processed value 7 with delay 0.968
[00:06:44,689.689] ThreadPoolExecutor-0_0 root INFO: Processed value 13 with delay 0.21400000000000002
[00:06:44,769.769] ThreadPoolExecutor-0_2 root INFO: Processed value 8 with delay 0.973
[00:06:44,914.914] ThreadPoolExecutor-0_3 root INFO: Processed value 9 with delay 0.984
[00:06:44,941.941] ThreadPoolExecutor-0_1 root INFO: Processed value 14 with delay 0.35100000000000003
[00:06:45,185.185] ThreadPoolExecutor-0_0 root INFO: Processed value 15 with delay 0.494
[00:06:45,185.185] ThreadPoolExecutor-0_4 root INFO: Processed value 10 with delay 1.13
[00:06:45,413.413] ThreadPoolExecutor-0_2 root INFO: Processed value 16 with delay 0.643
[00:06:45,641.641] ThreadPoolExecutor-0_3 root INFO: Processed value 17 with delay 0.726
[00:06:45,686.686] ThreadPoolExecutor-0_1 root INFO: Processed value 18 with delay 0.743
[00:06:45,686.686] ThreadPoolExecutor-0_1 root INFO: Processed value 23 with delay 0.0
[00:06:45,740.740] ThreadPoolExecutor-0_1 root INFO: Processed value 24 with delay 0.053
[00:06:45,810.810] ThreadPoolExecutor-0_1 root INFO: Processed value 25 with delay 0.07
[00:06:45,886.886] ThreadPoolExecutor-0_1 root INFO: Processed value 26 with delay 0.07500000000000001
[00:06:45,952.952] ThreadPoolExecutor-0_0 root INFO: Processed value 19 with delay 0.766
[00:06:46,021.021] ThreadPoolExecutor-0_1 root INFO: Processed value 27 with delay 0.134
[00:06:46,071.071] ThreadPoolExecutor-0_4 root INFO: Processed value 20 with delay 0.885
[00:06:46,142.142] ThreadPoolExecutor-0_0 root INFO: Processed value 28 with delay 0.19
[00:06:46,354.354] ThreadPoolExecutor-0_1 root INFO: Processed value 29 with delay 0.33299999999999996
[00:06:46,376.376] ThreadPoolExecutor-0_2 root INFO: Processed value 21 with delay 0.962
[00:06:46,721.721] ThreadPoolExecutor-0_3 root INFO: Processed value 22 with delay 1.078
[00:06:46,721.721] MainThread root INFO: Process end
 

Of course, all the values need to be adjusted afterwards, but the idea is there.
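The "Call SIPP process here (with exit code control)" placeholder could be filled with something like the sketch below. It is only an illustration: the function name run_with_retry, the retry count and the pause are made up for this example, and the SIPP command line in the comment is an assumption that has to be replaced with your real one.

```python
import subprocess
import sys
import time


def run_with_retry(command, attempts=3, pause=0.5, timeout=30):
    """Run `command` until it exits with code 0 or the attempts run out.

    Returns True on success, False if every attempt failed.
    """
    for attempt in range(1, attempts + 1):
        try:
            result = subprocess.run(command, timeout=timeout)
            if result.returncode == 0:
                return True
        except subprocess.TimeoutExpired:
            pass  # a hung process counts as a failed attempt
        if attempt < attempts:
            time.sleep(pause)
    return False


if __name__ == "__main__":
    # Stand-in commands so the sketch runs anywhere. In the real script this
    # would be the SIPP call, e.g. (assumed command line, adjust to your scenario):
    #   ["sipp", "-sf", "publish.xml", "-m", "1", "<SIP_PROXY>:5060"]
    ok_cmd = [sys.executable, "-c", "raise SystemExit(0)"]
    bad_cmd = [sys.executable, "-c", "raise SystemExit(1)"]
    print(run_with_retry(ok_cmd, pause=0))
    print(run_with_retry(bad_cmd, attempts=2, pause=0))
```

Inside process_iterated_value this would be called right after time.sleep(start_delay), with the value worked into the command.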

The only thing that bothers me: I'm not 100% sure how the generator value num behaves in a multithreaded environment, given the Python GIL. So here I'm asking anyone with Python knowledge to comment.
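One way to check this is to record which thread actually advances the generator. As far as I can tell from CPython's concurrent.futures, Executor.map pulls the arguments from the iterables in the thread that calls map and only then hands them to the workers, so the generator itself never runs in a worker thread. The names traced_delays and worker below are made up for this test.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


def traced_delays(seen_threads):
    """Endless generator that records which thread asks for each value."""
    while True:
        seen_threads.append(threading.current_thread().name)
        yield 0.0


def worker(value, delay):
    """Return the name of the thread that processes the value."""
    return threading.current_thread().name


caller = threading.current_thread().name
seen = []

with ThreadPoolExecutor(max_workers=5) as executor:
    worker_threads = list(executor.map(worker, range(30), traced_delays(seen)))

# The generator is advanced only by the thread that called map()
print(set(seen) == {caller})
# The work itself runs in the pool threads
print(sorted(set(worker_threads)))
```

If that holds, num is only ever touched by one thread, and the workers just receive already computed float values.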

Tuesday, November 28, 2023

Asterisk and sharing <Custom:> device states

Asterisk has a mechanism for sharing device states across servers using just PJSIP.

But what is interesting is that this mechanism doesn't really apply to Custom: device states. That means if you have 2 servers and want to share a BLF state via a hint and have something like

extensions.conf

...

exten => MY_STATE,hint,Custom:MY_DEVICE


and change it via the Asterisk CLI


> devstate change Custom:MY_DEVICE BUSY


on the local server, it will change the hint on the remote server, but not the actual device state that this hint is connected to:

> devstate list 

---------------------------------------------------------------------
--- Custom Device States --------------------------------------------
---------------------------------------------------------------------
---
--- Name: 'Custom:MY_DEVICE'  State: 'NOT_INUSE'
 

> core show hints

    -= Registered Asterisk Dial Plan Hints =-
MY_STATE@hints: Custom:MY_DEVICE  State:Busy            Presence:not_set         Watchers  0


This situation gave me a really hard time: you see a visual inconsistency on one server while totally forgetting that you have a second, spare server.

For me, the resolution was to sync the actual Custom: device states via an external service that uses AMI to monitor DeviceStateChange events and sets the state explicitly with Setvar.

Just a note in case someone falls into the same pit.
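The core of such a service can be sketched as below. This is not the actual service, just the message handling: it reads AMI events from a text stream and produces the Setvar action for the other server. The function names are made up, and the AMI login and the TCP connections to both servers are left out. In the real thing, stream would be the socket of the first server and send would write to the socket of the second one.

```python
import io


def read_ami_messages(stream):
    """Yield every AMI message from a text stream as a dict of header -> value."""
    message = {}
    for line in stream:
        line = line.rstrip("\r\n")
        if not line:  # an empty line ends the message
            if message:
                yield message
                message = {}
            continue
        key, sep, value = line.partition(":")
        if sep:
            message[key.strip()] = value.strip()
    if message:
        yield message


def build_setvar(device, state):
    """Build the AMI action that sets a device state via DEVICE_STATE()."""
    return (
        "Action: Setvar\r\n"
        f"Variable: DEVICE_STATE({device})\r\n"
        f"Value: {state}\r\n"
        "\r\n"
    )


def forward_custom_states(stream, send):
    """Pass every Custom: DeviceStateChange event on as a Setvar action."""
    for msg in read_ami_messages(stream):
        device = msg.get("Device", "")
        state = msg.get("State")
        if msg.get("Event") == "DeviceStateChange" and device.startswith("Custom:") and state:
            send(build_setvar(device, state))


if __name__ == "__main__":
    sample = (
        "Event: DeviceStateChange\r\n"
        "Device: Custom:MY_DEVICE\r\n"
        "State: BUSY\r\n"
        "\r\n"
    )
    forward_custom_states(io.StringIO(sample), print)
```

One thing to watch out for when syncing in both directions: the Setvar on the second server produces a DeviceStateChange there as well, so the service should skip events whose state it has just set itself.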

Thursday, September 21, 2023

Asterisk queues, local channels and transfers

Here comes the same old story about Asterisk queues and using Local channels within them, but with a PJSIP flavour.

So, it is known that Local channels in Asterisk can be tricky and might require disabling optimizations when used in queues. Yes, the famous /n at the end of the channel name, so that the Queue application can track the status of the channel.

But the drawback of this method is also known: calls transferred by the agents. When an agent does a transfer, the initial channel is not freed.

Or outgoing calls from the agents: if an agent makes an outbound call while still being a member of a queue, the agent is not considered busy.

There are many ways to work around this, and I'm no exception in building my own. It is based on GROUP and GROUP_COUNT, originating from the book Asterisk: The Definitive Guide, with some modifications for the new ways of setting variables on PJSIP channels and for further optimisations of Local channels in Asterisk.


queues.conf

...

[QUEUE_A]

member => Local/AGENT_A@agents

...

extensions.conf

...

[incoming]

exten => QUEUE_A,1,Queue(QUEUE_A)

[agents]

; First, check if there are active calls on this agent

exten => AGENT_A,1,ExecIf($[ ${GROUP_COUNT(${EXTEN}@agents)} >= 1 ]?Congestion())

    ; We need to set GROUP on the outgoing channel, as the current channel will be destroyed upon answer

    same => n,Dial(PJSIP/AGENT_A@trunk,,b(set_group^${EXTEN}^1))

[agents_outgoing] 

; Make sure we're not getting calls to agent when it's on outgoing call

exten => OUT_NUM,1,Gosub(set_group,${CALLERID(num)},1)

    same => n,Dial(PJSIP/${EXTEN}@out_trunk)


[set_group]

exten => AGENT_A,1,Set(GROUP(agents)=${EXTEN})

    same => n,Return()

Just a small warning: this is more of a pseudo-code, only meant to give the overall idea.

Friday, August 18, 2023

Importance of creating dialog in Kamailio right

I know, the things described here might be obvious and considered "for beginners", but everyone can make stupid errors. Even the best of us :)
So, what am I using the Kamailio dialog module for?

  • Keepalives with the ka_timer parameter, mainly because mobile clients are unstable due to the nature of mobile networks
  • Presence PUBLISHes via the pua/pua_dialoginfo modules to a separate presence server

 

With the second one, I've actually run into some "problems" due to configuration.

Most Kamailio config examples that operate with dialogs (at least the ones I've seen) have more or less this structure:

request_route {
   route(REQINIT);

   if (is_method("INVITE")) {
       dlg_manage();
   }

   route(WITHINDLG);

   ...

   route(AUTH);

   ...

}

There is a problem with this code: it creates dialogs for EVERY INVITE, even for unauthenticated ones.

So, the usual flow INVITE - 401 - INVITE (w/auth) - 200 will create 2 dialogs in this case. The first one will not be terminated correctly as far as the pua module is concerned. It will generate a PUBLISH with the Trying state, but on 401 - ACK there will be no corresponding PUBLISH for the terminated state. And as Expires here is usually 3600, you will end up with a lot of "ringing" devices on your presence server.

The answer is to create dialogs only for authenticated (and valid) INVITEs. This also saves some CPU by not creating dialogs for spammers.

request_route {
   route(REQINIT);

   route(WITHINDLG);

   ...

   route(AUTH);

   if (is_method("INVITE")) {
       dlg_manage();
   }

   ...

}

Tuesday, April 4, 2023

Adding a bit of privacy to Kamailio's pua_dialoginfo module

Kamailio is a great product with really good customisation possibilities. One example follows here.

The task is the following: we have a setup with 2 different servers, a proxy and a presence server. The presence server is simple: it just processes SUBSCRIBE/PUBLISH messages and generates NOTIFYs, mainly for BLFs. But users want some privacy, so that not everyone can subscribe to anyone and, more interestingly, monitor who's calling whom. At the same time, some users, especially in boss-assistant scenarios, want full information about calls. For example, the boss's assistant usually should know who's calling the boss's phone and answer (pick up) the call in some cases.

Kamailio allows this scenario, but with a bit of customisation.

The idea is to have 2 kinds of groups.

The first one is an ACL that controls whether a user can monitor another user at all; the second one is an ACL for seeing call details.

Here I'll outline the idea with some code examples for the proxy part. Just to mention: pickup functions are not implemented on the proxy server, so this doesn't break them for me.
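To make the group logic easier to follow, here is the same check as a small Python sketch. It is only a model of the rules, not part of the Kamailio config: every user has a comma-separated list of groups, the default group is "1", group "0" means fully private, and two users can see each other if they share at least one group. The function names and the sample table are made up.

```python
DEFAULT_GROUP = "1"


def parse_groups(value):
    """Turn a comma-separated group list like "1,5,7" into a set."""
    return {group.strip() for group in value.split(",") if group.strip()}


def share_group(groups_a, groups_b):
    """True if two comma-separated group lists have a group in common."""
    return bool(parse_groups(groups_a) & parse_groups(groups_b))


def may_subscribe(view_groups, watcher, target):
    """Decide whether `watcher` is allowed to subscribe to `target`."""
    watcher_groups = view_groups.get(watcher, DEFAULT_GROUP)
    target_groups = view_groups.get(target, DEFAULT_GROUP)
    if target_groups == "0":
        return False  # group "0": nobody can see this user's state
    return share_group(watcher_groups, target_groups)


# Hypothetical table content: extension -> groups
view_groups = {"boss": "10", "assistant": "1,10", "hidden": "0"}

print(may_subscribe(view_groups, "assistant", "boss"))
print(may_subscribe(view_groups, "someone", "boss"))
print(may_subscribe(view_groups, "assistant", "hidden"))
```

The call details ACL works the same way, except that a user missing from the table is treated as private instead of getting a default group.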

Proxy

kamailio.cfg

loadmodule "pua.so"
loadmodule "pua_dialoginfo.so"
...
modparam("pua", "db_url", DBURL)
modparam("pua", "outbound_proxy", "sip:<PRESENCE_SERVER>:5060")
modparam("pua", "db_mode", 0)
...
modparam("pua_dialoginfo", "include_callid", 0)
modparam("pua_dialoginfo", "include_tags", 0)
# we want to have info on who's calling
modparam("pua_dialoginfo", "include_localremote", 1)
... 
# Table that controls who can subscribe to whom. If users share a group, they can subscribe to each other.
# By default everyone shares group 1. Group 0 means nobody can see your state.
# Format is <extension>=><list_of_groups_comma_separated>
modparam("htable", "htable", 'presence_view_grp=>size=15;')
# Table that allows subscribers to view each other's call details. By default it's forbidden.
modparam("htable", "htable", 'presence_view_details_grp=>size=15;')
# You can fill these tables from a database or by any other method you prefer

...
route(PRESENCE_PRIVACY);
route(WITHINDLG);
...

route(AUTH);
route(PRESENCE);
...
# Presence server route
route[PRESENCE] {
    if(!is_method("PUBLISH|SUBSCRIBE")) {
        return;
    }
    # Check if user is registered
    if (is_method("SUBSCRIBE") && !(registered("location", "$fu", 4) == 1)) {
        append_to_reply("Retry-After: 10\r\n");
        send_reply("500", "Retry Later");
        exit;
    }

    $du = "sip:<PRESENCE_SERVER>:5060";

    # By default, allow anyone to subscribe to anyone.
    $var(presence_grp_from) = "1";
    $var(presence_grp_to) = "1";

    if ($sht(presence_view_grp=>$fU) != $null) {
        $var(presence_grp_from) = $sht(presence_view_grp=>$fU);
    }

    if ($sht(presence_view_grp=>$tU) != $null) {
        $var(presence_grp_to) = $sht(presence_view_grp=>$tU);
    }


    if (is_method("SUBSCRIBE")) {
        if ((str)$var(presence_grp_to) == "0") {
            # Presence group "0" means fully private.

            send_reply("404", "Subscriber not exists");
            exit;
        }

        $var(group_1) = $var(presence_grp_to);
        $var(group_2) = $var(presence_grp_from);

        if (!route(CHECK_GROUP_INTERSECTION)) {
            # No common groups for From/To
            send_reply("404", "Subscriber not exists");
            exit;
        }
    }

    route(RELAY);
}

route[PRESENCE_PRIVACY] {
    # We're processing only NOTIFY's with Dialog XML data here
    if (!is_method("NOTIFY") || !has_body("application/dialog-info+xml")) {
        return;
    }

    if ($sht(presence_view_details_grp=>$fU) == $null || $sht(presence_view_details_grp=>$tU) == $null) {
        # By default all call data is private
        route(PRESENCE_CLEAR_CALL_DETAILS);
        return;
    }

    $var(group_1) = $sht(presence_view_details_grp=>$tU);
    $var(group_2) = $sht(presence_view_details_grp=>$fU);

    if (!route(CHECK_GROUP_INTERSECTION)) {
        # No common groups for From/To
        route(PRESENCE_CLEAR_CALL_DETAILS);
        return;
    }

    # At this point all is ok.

}

route[PRESENCE_CLEAR_CALL_DETAILS] {

    # Forming new body of the presence info
    # For replace trick see https://lists.kamailio.org/pipermail/sr-users/2022-February/114185.html
    $xml(body=>doc) = $(rb{s.replace,xmlns=,xyzwq=});
   
    # Forming new XML
    $var(new_body) = "<?xml version=\"1.0\"?>\n";
    $var(new_body) = $var(new_body) + "<dialog-info xmlns=\"" + $xml(body=>xpath:/dialog-info/@xyzwq) + "\"";
    $var(new_body) = $var(new_body) + " version=\"" + $xml(body=>xpath:/dialog-info/@version) + "\"";
    $var(new_body) = $var(new_body) + " state=\"" + $xml(body=>xpath:/dialog-info/@state) + "\"";
    $var(new_body) = $var(new_body) + " entity=\"" + $xml(body=>xpath:/dialog-info/@entity) + "\">\n";
    $var(new_body) = $var(new_body) + "  <dialog id=\"" + $xml(body=>xpath:/dialog-info/dialog/@id) + "\"\n";
    $var(new_body) = $var(new_body) + " direction=\"" + $xml(body=>xpath:/dialog-info/dialog/@direction) + "\">\n";
    $var(new_body) = $var(new_body) + "    <state>" + $xml(body=>xpath:/dialog-info/dialog/state/text()) + "</state>\n";
    $var(new_body) = $var(new_body) + "  </dialog>\n</dialog-info>";

    replace_body_atonce("^.+$", $var(new_body));
}

route[CHECK_GROUP_INTERSECTION] {
    # Checks whether the comma-separated group lists in $var(group_1) and $var(group_2) have at least one group in common
    $var(group_1_param_count) = $(var(group_1){s.count,,});

    while ($var(group_1_param_count) >= 0) {
        $var(group_1_param) = $(var(group_1){s.select,$var(group_1_param_count),,});

        if (in_list("$var(group_1_param)", "$var(group_2)", ",")) {
            return 1;
        }
        $var(group_1_param_count) = $var(group_1_param_count) - 1;
    }
    return -1;
}

The idea here is that we strip the local/remote data from the NOTIFY XML that describes a dialog. This way we still indicate the fact of the call, but don't expose any sensitive data.
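To see what the stripped body looks like, the same transformation can be tried outside Kamailio. The sketch below is a Python model of it, not what runs on the proxy: instead of rebuilding the body from strings, it removes everything except the state element from every dialog and keeps only the id and direction attributes. The sample body and the function name are made up.

```python
import xml.etree.ElementTree as ET

NS = "urn:ietf:params:xml:ns:dialog-info"
# Serialize with a default namespace instead of an "ns0:" prefix
ET.register_namespace("", NS)

SAMPLE = """<?xml version="1.0"?>
<dialog-info xmlns="urn:ietf:params:xml:ns:dialog-info" version="2" state="full" entity="sip:boss@example.com">
  <dialog id="abc123" direction="recipient">
    <state>confirmed</state>
    <remote><identity>sip:caller@example.com</identity></remote>
    <local><identity>sip:boss@example.com</identity></local>
  </dialog>
</dialog-info>"""


def strip_call_details(body):
    """Keep only the dialog state; drop local/remote and any other details."""
    root = ET.fromstring(body)
    for dialog in root.findall(f"{{{NS}}}dialog"):
        # Attributes: keep only id and direction
        for attr in list(dialog.attrib):
            if attr not in ("id", "direction"):
                del dialog.attrib[attr]
        # Children: keep only <state>
        for child in list(dialog):
            if child.tag != f"{{{NS}}}state":
                dialog.remove(child)
    return ET.tostring(root, encoding="unicode")


print(strip_call_details(SAMPLE))
```

The watcher still gets the state of the dialog, so the BLF lamp works, but there is no trace of who is on the other side of the call.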

Monday, December 5, 2022

VOLTS - framework for testing VoIP deployments

Last year I was working on VOLTS, a framework I use for testing VoIP deployments in an automated way.

Right now it combines tools like voip_patrol and sipp into one tool, with some extra features like database write/delete and media file checks.

Rather than repeat the whole README here, two links, GitHub and the following video, will tell it all.



 


Do you test your deployments?

Tuesday, October 25, 2022

Protect Kamailio from TCP/TLS flood

After stress-testing Kamailio with the sipflood tool from the sippts suite (which deserves another article), the outcome was not so good.

With the default CentOS 7 OpenSSL library (1.0.2k-fips) and Kamailio 5.4-5.6 using TLS transport, it's quite easy to get a segfault inside the tls routines. I've found that roughly 10 000 OPTIONS packets sent with 200 threads are enough to ruin the Kamailio process.

Basically, you can DoS the whole server, regardless of its power, with just a single mid-range computer.

The solution was to use Kamailio 5.6 with the tlsa flavour of the TLS module, built against the latest OpenSSL 1.1.x compiled from source.

Turns out it's a really simple process.

As we're going to compile Kamailio anyway, let's assume that all the packages necessary for the build are already on the system.

First, we need to get the OpenSSL sources:

# cd /usr/src

# wget https://www.openssl.org/source/openssl-1.1.<latest>.tar.gz

# tar xvf openssl-1.1.<latest>.tar.gz

# cd openssl-1.1.<latest>

# ./config

# make

(Optionally) Here we can make sure that this release passes its tests:

# yum install perl-Test-Simple

# make test

Next step: point Kamailio to the newly compiled OpenSSL

# cd /usr/src

# wget https://www.kamailio.org/pub/kamailio/5.6.<latest>/src/kamailio-5.6.<latest>_src.tar.gz

# tar xvf kamailio-5.6.<latest>_src.tar.gz

# cd kamailio-5.6.<latest>

# sed -i "s?LIBSSL_STATIC_SRCLIB \?= no?LIBSSL_STATIC_SRCLIB \?= yes?g" ./src/modules/tlsa/Makefile

# sed -i "s?LIBSSL_STATIC_SRCPATH \?= /usr/local/src/openssl?LIBSSL_STATIC_SRCPATH \?= /usr/src/openssl-1.1.<latest>?g" ./src/modules/tlsa/Makefile

...

Then comes your usual Kamailio compilation. Don't forget to replace all mentions of the "tls" module in kamailio.cfg with "tlsa".

The results are much better. But then I found that it's possible to "eat" all TCP connections on the Kamailio server with this type of flood.

First - ulimit. Never underestimate defaults.  

# ulimit -n unlimited

Next step: tune the TCP stack.

Disclaimer: the following options are debatable, were not discovered by me, and need to be adjusted to your case.

kamailio.cfg

...

tcp_connection_lifetime=3605
tcp_max_connections=4096
tls_max_connections=4096
tcp_connect_timeout=5
tcp_async=yes
tcp_keepidle=5
open_files_limit=4096

...

/etc/sysctl.conf

...

# To increase the amount of memory available for socket input/output queues
net.ipv4.tcp_rmem = 4096 25165824 25165824
net.core.rmem_max = 25165824
net.core.rmem_default = 25165824
net.ipv4.tcp_wmem = 4096 65536 25165824
net.core.wmem_max = 25165824
net.core.wmem_default = 65536
net.core.optmem_max = 25165824

# To limit the maximum number of requests queued to a listen socket
net.core.somaxconn = 128

# Tells TCP to instead make decisions that would prefer lower latency.
net.ipv4.tcp_low_latency=1

# Optional (it will increase performance)
net.core.netdev_max_backlog = 1000
net.ipv4.tcp_max_syn_backlog = 128
...

This will help, but not fully (at least in my case; I must have missed something, and comments here are really welcome).

As the second part, I decided to go with Fail2Ban and block the flood at the iptables level.

The setup is quite simple as well.

First, make sure Kamailio logs flood attempts:

kamailio.cfg

...

loadmodule "pike.so"

modparam("pike", "sampling_time_unit", 2)
modparam("pike", "reqs_density_per_unit", 30)
modparam("pike", "remove_latency", 120)

...

if (!pike_check_req()) {
    xlog("L_ALERT", "[SIP-FIREWALL][FAIL2BAN] $si\n");

    $sht(ipban=>$si) = 1;
    if ($proto != 'udp') {
        tcp_close_connection();
    }
    drop;
}

...

Next - install and configure Fail2Ban

# yum install -y fail2ban

 /etc/fail2ban/jail.local

[DEFAULT]
# Ban hosts for one hour:
bantime = 3600

# Override /etc/fail2ban/jail.d/00-firewalld.conf:
banaction = iptables-multiport
action      = %(action_mwl)s

[cernphone-iptables]
enabled  = true
filter   = mypbx
action   = iptables-mypbx[name=mypbx, protocol=tcp, blocktype='REJECT --reject-with tcp-reset']
           sendmail[sender=<sender_addr>, dest=<dest_addr>, sendername=Fail2Ban]
logpath  = <your_kamailio_logfile>
maxretry = 1
bantime  = 3600s
findtime = 10s
 

 /etc/fail2ban/action.d/iptables-mypbx.conf

[INCLUDES]

before = iptables-common.conf

[Definition]

actionstart = <iptables> -N f2b-<name>
              <iptables> -A f2b-<name> -j <returntype>
              <iptables> -I <chain> -p <protocol> -j f2b-<name>

actionstop = <iptables> -D <chain> -p <protocol>  -j f2b-<name>
             <actionflush>
             <iptables> -X f2b-<name>


actioncheck = <iptables> -n -L <chain> | grep -q 'f2b-<name>[ \t]'

actionban = <iptables> -I f2b-<name> 1 -s <ip> -p <protocol> -j <blocktype>

actionunban = <iptables> -D f2b-<name> -s <ip> -p <protocol> -j <blocktype>

 

 /etc/fail2ban/filter.d/mypbx.local

[Definition]
# filter for kamailio messages
failregex = \[SIP-FIREWALL\]\[FAIL2BAN\] <HOST>$
 

# systemctl enable fail2ban

# systemctl start fail2ban

In this case, the offending host gets banned at the iptables level.
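Before relying on the ban, it's worth checking that the filter really matches what Kamailio writes to the log. Fail2Ban ships its own fail2ban-regex tool for that, but the idea can also be shown with a few lines of Python. This is a simplified sketch: <HOST> is replaced here with a plain IPv4 pattern (the real Fail2Ban <HOST> matches more than that), and the sample log line is made up, so take a line from your own Kamailio log instead.

```python
import re

# Simplified stand-in for Fail2Ban's <HOST> tag: IPv4 only
HOST = r"(?P<host>\d{1,3}(?:\.\d{1,3}){3})"
FAILREGEX = r"\[SIP-FIREWALL\]\[FAIL2BAN\] <HOST>$".replace("<HOST>", HOST)


def banned_host(line):
    """Return the IP address the filter would pick from a log line, or None."""
    match = re.search(FAILREGEX, line)
    return match.group("host") if match else None


# Made-up log line in the shape produced by the xlog() call
line = "Oct 25 10:00:00 sip kamailio[1234]: ALERT: <script>: [SIP-FIREWALL][FAIL2BAN] 203.0.113.7"

print(banned_host(line))
print(banned_host("Oct 25 10:00:01 sip kamailio[1234]: INFO: something else"))
```

Note the $ at the end of the regex: if anything is appended to the log line after the IP address, the filter stops matching.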