diff --git a/.gitignore b/.gitignore index 1220d7df..cac24aae 100644 --- a/.gitignore +++ b/.gitignore @@ -2,4 +2,5 @@ cmake-build-debug/ .idea/ .vscode/ build/ -_packages \ No newline at end of file +_packages +.vs diff --git a/CMakeLists.txt b/CMakeLists.txt index a8940280..8a550bf3 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1,6 +1,6 @@ cmake_minimum_required(VERSION 3.18) project(mod_audio_stream - VERSION 1.0.0 + VERSION 1.1.0 DESCRIPTION "Audio streaming module for FreeSWITCH." HOMEPAGE_URL "https://github.com/amigniter/mod_audio_stream") diff --git a/README.md b/README.md index 631dac98..f61ec4d0 100644 --- a/README.md +++ b/README.md @@ -123,7 +123,7 @@ Defaults to `false`, which enforces hostname match with the peer certificate. The freeswitch module exposes the following API commands: ``` -uuid_audio_stream start +uuid_audio_stream start ``` Attaches a media bug and starts streaming audio (in L16 format) to the websocket server. FS default is 8k. If sampling-rate is other than 8k it will be resampled. - `uuid` - Freeswitch channel unique id @@ -136,6 +136,11 @@ Attaches a media bug and starts streaming audio (in L16 format) to the websocket - "8k" = 8000 Hz sample rate will be generated - "16k" = 16000 Hz sample rate will be generated - `metadata` - (optional) a valid `utf-8` text to send. It will be sent the first before audio streaming starts. +- `audio-format` - (optional) specify if audio sent out is binary or base64 encoded. + - "base64" = audio will be base64 encoded before sending it. + - "binary" = anything other that 'base64' will default to sending binary audio. +- `audio-head` - (optional) if sending base64, audio-head and audio-tail will wrap the audio. Useful for sending base64 audio in a json format. +- `audio-tail` - (optional) if sending base64, audio-head and audio-tail will wrap the audio. Useful for sending base64 audio in a json format. ``` uuid_audio_stream send_text diff --git a/audio_streamer_glue.cpp b/audio_streamer_glue.cpp index bfdbc647..2935aa04 100644 --- a/audio_streamer_glue.cpp +++ b/audio_streamer_glue.cpp @@ -1,4 +1,5 @@ #include +#include #include #include "mod_audio_stream.h" #include "WebSocketClient.h" @@ -43,7 +44,7 @@ class AudioStreamer { } bool isConnected() { - return client.isConnected(); + return client.isConnected() && m_isConnected.load(std::memory_order_acquire); } void writeBinary(uint8_t* buffer, size_t len) { @@ -273,17 +274,21 @@ class AudioStreamer { switch (event) { case CONNECT_SUCCESS: + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(psession), SWITCH_LOG_INFO, "connection succeeded\n"); send_initial_metadata(psession); + m_isConnected.store(true, std::memory_order_release); m_notify(psession, EVENT_CONNECT, msg.c_str()); break; case CONNECTION_DROPPED: switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(psession), SWITCH_LOG_INFO, "connection closed\n"); + m_isConnected.store(false, std::memory_order_release); m_notify(psession, EVENT_DISCONNECT, msg.c_str()); break; case CONNECT_ERROR: switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(psession), SWITCH_LOG_INFO, "connection error\n"); + m_isConnected.store(false, std::memory_order_release); m_notify(psession, EVENT_ERROR, msg.c_str()); media_bug_close(psession); break; @@ -436,6 +441,7 @@ class AudioStreamer { int m_playFile; std::unordered_set m_Files; std::atomic m_cleanedUp{false}; + std::atomic m_isConnected{ false }; std::mutex m_stateMutex; }; @@ -443,7 +449,8 @@ class AudioStreamer { namespace { switch_status_t stream_data_init(private_t *tech_pvt, switch_core_session_t *session, char *wsUri, - uint32_t sampling, int desiredSampling, int channels, char *metadata, responseHandler_t responseHandler, + uint32_t sampling, int desiredSampling, int channels, char *metadata, + int sendAsBinary, char *jsonHead, char *jsonTail, responseHandler_t responseHandler, int deflate, int heart_beat, bool suppressLog, int rtp_packets, const char* extra_headers, const char *tls_cafile, const char *tls_keyfile, const char *tls_certfile, bool tls_disable_hostname_validation) @@ -461,8 +468,11 @@ namespace { tech_pvt->rtp_packets = rtp_packets; tech_pvt->channels = channels; tech_pvt->audio_paused = 0; + tech_pvt->send_binary_audio = sendAsBinary; if (metadata) strncpy(tech_pvt->initialMetadata, metadata, MAX_METADATA_LEN); + if (jsonHead) strncpy(tech_pvt->audioJsonHead, jsonHead, MAX_JSON_HEAD_LEN); + if (jsonTail) strncpy(tech_pvt->audioJsonTail, jsonTail, MAX_JSON_TAIL_LEN); //size_t buflen = (FRAME_SIZE_8000 * desiredSampling / 8000 * channels * 1000 / RTP_PERIOD * BUFFERED_SEC); const size_t buflen = (FRAME_SIZE_8000 * desiredSampling / 8000 * channels * rtp_packets); @@ -493,7 +503,10 @@ namespace { switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "(%s) no resampling needed for this call\n", tech_pvt->sessionId); } - switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "(%s) stream_data_init\n", tech_pvt->sessionId); + if (tech_pvt->send_binary_audio) + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "(%s) stream_data_init: sendAsBinary=TRUE\n", tech_pvt->sessionId); + else + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "(%s) stream_data_init: sendAsBinary=FALSE\n", tech_pvt->sessionId); return SWITCH_STATUS_SUCCESS; } @@ -647,6 +660,9 @@ extern "C" { int sampling, int channels, char* metadata, + int sendAsBinary, + char *jsonHead, + char *jsonTail, void **ppUserData) { int deflate, heart_beat; @@ -706,7 +722,7 @@ extern "C" { return SWITCH_STATUS_FALSE; } if (SWITCH_STATUS_SUCCESS != stream_data_init(tech_pvt, session, wsUri, samples_per_second, sampling, channels, - metadata, responseHandler, deflate, heart_beat, suppressLog, rtp_packets, + metadata, sendAsBinary, jsonHead, jsonTail, responseHandler, deflate, heart_beat, suppressLog, rtp_packets, extra_headers, tls_cafile, tls_keyfile, tls_certfile, tls_disable_hostname_validation)) { destroy_tech_pvt(tech_pvt); return SWITCH_STATUS_FALSE; @@ -859,7 +875,19 @@ extern "C" { for (auto &chunk : pending_send) { if (!chunk.empty()) { - streamer->writeBinary(chunk.data(), chunk.size()); + if (tech_pvt->send_binary_audio) { + streamer->writeBinary(chunk.data(), chunk.size()); + } + else { + std::stringstream ss; + + ss << tech_pvt->audioJsonHead; + ss << base64_encode(chunk.data(), chunk.size(), false); + ss << tech_pvt->audioJsonTail; + + auto jsonStr = ss.str(); + streamer->writeText(jsonStr.c_str()); + } } } diff --git a/audio_streamer_glue.h b/audio_streamer_glue.h index cbcc7689..9eee79b0 100644 --- a/audio_streamer_glue.h +++ b/audio_streamer_glue.h @@ -7,7 +7,8 @@ switch_status_t is_valid_utf8(const char *str); switch_status_t stream_session_send_text(switch_core_session_t *session, char* text); switch_status_t stream_session_pauseresume(switch_core_session_t *session, int pause); switch_status_t stream_session_init(switch_core_session_t *session, responseHandler_t responseHandler, - uint32_t samples_per_second, char *wsUri, int sampling, int channels, char* metadata, void **ppUserData); + uint32_t samples_per_second, char *wsUri, int sampling, int channels, char* metadata, + int sendAsBinary, char *jsonHead, char *jsonTail, void **ppUserData); switch_bool_t stream_frame(switch_media_bug_t *bug); switch_status_t stream_session_cleanup(switch_core_session_t *session, char* text, int channelIsClosing); diff --git a/mod_audio_stream.c b/mod_audio_stream.c index 2ea0a8b0..79e58790 100644 --- a/mod_audio_stream.c +++ b/mod_audio_stream.c @@ -57,7 +57,10 @@ static switch_status_t start_capture(switch_core_session_t *session, switch_media_bug_flag_t flags, char* wsUri, int sampling, - char* metadata) + char* metadata, + int sendAsBinary, + char* jsonHead, + char* jsonTail) { switch_channel_t *channel = switch_core_session_get_channel(session); switch_media_bug_t *bug; @@ -81,7 +84,7 @@ static switch_status_t start_capture(switch_core_session_t *session, switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_DEBUG, "calling stream_session_init.\n"); if (SWITCH_STATUS_FALSE == stream_session_init(session, responseHandler, read_codec->implementation->actual_samples_per_second, - wsUri, sampling, channels, metadata, &pUserData)) { + wsUri, sampling, channels, metadata, sendAsBinary, jsonHead, jsonTail, &pUserData)) { switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "Error initializing mod_audio_stream session.\n"); return SWITCH_STATUS_FALSE; } @@ -139,7 +142,7 @@ static switch_status_t send_text(switch_core_session_t *session, char* text) { #define STREAM_API_SYNTAX " [start | stop | send_text | pause | resume | graceful-shutdown ] [wss-url | path] [mono | mixed | stereo] [8000 | 16000] [metadata]" SWITCH_STANDARD_API(stream_function) { - char *mycmd = NULL, *argv[6] = { 0 }; + char *mycmd = NULL, *argv[9] = { 0 }; int argc = 0; switch_status_t status = SWITCH_STATUS_FALSE; @@ -188,8 +191,9 @@ SWITCH_STANDARD_API(stream_function) char wsUri[MAX_WS_URI]; int sampling = 8000; switch_media_bug_flag_t flags = SMBF_READ_STREAM; + int sendAsBinary = 1; char *metadata = argc > 5 ? argv[5] : NULL; - if(metadata && (is_valid_utf8(argv[2]) != SWITCH_STATUS_SUCCESS)) { + if(metadata && (is_valid_utf8(metadata) != SWITCH_STATUS_SUCCESS)) { switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "%s contains invalid utf8 characters\n", argv[2]); switch_core_session_rwunlock(lsession); @@ -215,6 +219,25 @@ SWITCH_STANDARD_API(stream_function) sampling = atoi(argv[4]); } } + if (argc > 6) { + if (0 == strcmp(argv[6], "base64")) { + sendAsBinary = 0; + } + } + char *jsonHead = (!sendAsBinary && argc > 8) ? argv[7] : NULL; + char *jsonTail = (!sendAsBinary && argc > 8) ? argv[8] : NULL; + if (jsonHead && (is_valid_utf8(jsonHead) != SWITCH_STATUS_SUCCESS)) { + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, + "%s contains invalid utf8 characters\n", argv[2]); + switch_core_session_rwunlock(lsession); + goto done; + } + if (jsonTail && (is_valid_utf8(jsonTail) != SWITCH_STATUS_SUCCESS)) { + switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, + "%s contains invalid utf8 characters\n", argv[2]); + switch_core_session_rwunlock(lsession); + goto done; + } if (!validate_ws_uri(argv[2], &wsUri[0])) { switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "invalid websocket uri: %s\n", argv[2]); @@ -222,7 +245,7 @@ SWITCH_STANDARD_API(stream_function) switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "invalid sample rate: %s\n", argv[4]); } else { - status = start_capture(lsession, flags, wsUri, sampling, metadata); + status = start_capture(lsession, flags, wsUri, sampling, metadata, sendAsBinary, jsonHead, jsonTail); } } else { switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, diff --git a/mod_audio_stream.h b/mod_audio_stream.h index cb10f720..fa6c7dbf 100644 --- a/mod_audio_stream.h +++ b/mod_audio_stream.h @@ -8,6 +8,8 @@ #define MAX_SESSION_ID (256) #define MAX_WS_URI (4096) #define MAX_METADATA_LEN (8192) +#define MAX_JSON_HEAD_LEN (256) +#define MAX_JSON_TAIL_LEN (256) #define EVENT_CONNECT "mod_audio_stream::connect" #define EVENT_DISCONNECT "mod_audio_stream::disconnect" @@ -29,7 +31,10 @@ struct private_data { int audio_paused:1; int close_requested:1; int cleanup_started:1; + int send_binary_audio:1; char initialMetadata[8192]; + char audioJsonHead[256]; + char audioJsonTail[256]; switch_buffer_t *sbuffer; int rtp_packets; };