/*
 * pipewirestreamwatcher/main.cpp
 *
 * (listing metadata carried over with the source: 309 lines, 7.9 KiB, C++)
 */

#include <cmath>
#include <cstdint>
#include <iostream>
#include <string>

#include <pipewire/context.h>
#include <pipewire/core.h>
#include <pipewire/pipewire.h>
#include <spa/param/audio/format-utils.h>
#include <spa/param/audio/raw.h>

#include "options.h"
#include "log.h"
#include "Socket.h"
#include "spa/support/loop.h"
#define BUFFER_SIZE 2048
#define SENSOR_STRING "bcst: SENSOR TYPE: 254 ID: "
/*
 * Shared state handed as user data to every PipeWire callback in this file.
 *
 * All members carry default initializers so that a default-constructed
 * instance has well-defined (null / zeroed) contents; main() tests
 * sink_node and sink_stream during teardown even when the matching setup
 * function never ran.
 */
struct PwlPriv
{
	/* PipeWire connection objects, created in main() */
	struct pw_context* context = nullptr;
	struct pw_core* core = nullptr;
	struct pw_registry* registry = nullptr;
	struct pw_main_loop* loop = nullptr;
	/* node proxy and its listener hook, set by setup_node_active_monitor() */
	struct pw_node* sink_node = nullptr;
	struct spa_hook sink_listener = {};
	/* capture stream, set by setup_stream_listener() */
	struct pw_stream *sink_stream = nullptr;
	/* format filled in by on_stream_param_changed() */
	struct spa_audio_info stream_format = {};
	/* backing storage for the spa_pod_builder in setup_stream_listener() */
	uint8_t buffer[BUFFER_SIZE] = {};
	/* options filled in by argp_parse() in main() */
	struct Config config;
	/* TCP connection the sensor lines are written to */
	TCPSocket* socket = nullptr;
	/* timer that delays the "off" report, see state_changed()/off_timeout() */
	spa_source* timer_source = nullptr;
	/* delay used when arming timer_source; taken from config.timeout */
	struct timespec timout_timespan = {};
};
/*
 * Write one sensor line ("bcst: SENSOR TYPE: 254 ID: <id> FIELD: <0|1>\n")
 * to the socket.
 *
 * The last reported state is remembered in a function-local static, so a
 * call that repeats the previous state sends nothing. The very first call
 * always sends because the remembered value starts out as -1.
 */
static void send_sensor(TCPSocket* socket, uint8_t id, bool state)
{
	static int8_t previous = -1;
	const int8_t current = state ? 1 : 0;
	if(current == previous)
		return;
	previous = current;

	std::string message = std::string(SENSOR_STRING)
		+ std::to_string(static_cast<unsigned int>(id))
		+ " FIELD: "
		+ std::to_string(static_cast<unsigned int>(state))
		+ '\n';

	Log(Log::DEBUG, false)<<message;
	socket->send(message.c_str(), message.size());
}
static void off_timeout(void *data, uint64_t expirations)
{
PwlPriv *priv = reinterpret_cast<PwlPriv*>(data);
send_sensor(priv->socket, priv->config.id, false);
}
/*
 * React to a change of the observed on/off state.
 *
 * Repeated calls with the same state are ignored (the previous state is kept
 * in a function-local static that starts out false).
 *
 *  - on:  report "on" right away and cancel a pending "off" timer.
 *  - off: arm the timer so that off_timeout() reports "off" once
 *         priv->timout_timespan has elapsed.
 */
static void state_changed(PwlPriv *priv, bool state)
{
	static bool lastState = false;
	if(state == lastState)
		return;
	lastState = state;

	struct pw_loop *loop = pw_main_loop_get_loop(priv->loop);

	if(state)
	{
		send_sensor(priv->socket, priv->config.id, true);
		/* a NULL value disarms the timer */
		pw_loop_update_timer(loop, priv->timer_source, NULL, NULL, false);
		return;
	}

	/*
	 * An all-zero timespec does not mean "fire now" but disarms the timer,
	 * so with a zero timeout the off state would never be reported.
	 * Report it directly instead.
	 */
	if(priv->timout_timespan.tv_sec == 0 && priv->timout_timespan.tv_nsec == 0)
	{
		send_sensor(priv->socket, priv->config.id, false);
		return;
	}

	if(pw_loop_update_timer(loop, priv->timer_source, &priv->timout_timespan, NULL, false) < 0)
		Log(Log::WARN)<<"could not arm off timer";
}
static void on_process(void *data)
{
PwlPriv *priv = reinterpret_cast<PwlPriv*>(data);
struct pw_buffer *pw_buf = pw_stream_dequeue_buffer(priv->sink_stream);
if(!pw_buf)
{
Log(Log::WARN)<<"out of buffers";
return;
}
struct spa_buffer *spa_buf = pw_buf->buffer;
uint32_t sample_count = spa_buf->datas[0].chunk->size / sizeof(float);
float* samples = reinterpret_cast<float*>(spa_buf->datas[0].data);
float accum = 0;
for(uint32_t i = 0; i < sample_count; ++i)
accum += std::abs(samples[i]);
accum /= sample_count;
pw_stream_queue_buffer(priv->sink_stream, pw_buf);
state_changed(priv, accum > 0.001);
}
/*
 * Stream "param_changed" callback: record the negotiated raw audio format
 * in priv->stream_format and log its rate and channel count.
 *
 * Anything that is not a Format param, not raw audio, or that fails to
 * parse is ignored.
 */
static void on_stream_param_changed(void *data, uint32_t id, const struct spa_pod *param)
{
	PwlPriv *priv = static_cast<PwlPriv*>(data);

	/* NULL means to clear the format */
	if (param == NULL || id != SPA_PARAM_Format)
		return;

	if (spa_format_parse(param, &priv->stream_format.media_type, &priv->stream_format.media_subtype) < 0)
		return;

	/* only accept raw audio */
	if (priv->stream_format.media_type != SPA_MEDIA_TYPE_audio ||
		priv->stream_format.media_subtype != SPA_MEDIA_SUBTYPE_raw)
		return;

	/* do not log format fields when the helper could not parse the pod */
	if (spa_format_audio_raw_parse(param, &priv->stream_format.info.raw) < 0)
	{
		Log(Log::WARN)<<"could not parse raw audio format";
		return;
	}

	Log(Log::DEBUG)<<"capturing rate: "<<priv->stream_format.info.raw.rate
		<<" from "<<priv->stream_format.info.raw.channels<<" channels";
}
/*
 * Node "info" callback: when the node's state differs from the one seen
 * last time, log it and forward it to state_changed().
 *
 * Only PW_NODE_STATE_RUNNING counts as "on"; every other state counts as
 * "off". The previously seen state is kept in a function-local static that
 * starts out as PW_NODE_STATE_CREATING.
 */
static void node_event_info(void* data, const struct pw_node_info *info)
{
	static enum pw_node_state prev_state = PW_NODE_STATE_CREATING;
	if(info->state == prev_state)
		return;
	prev_state = info->state;

	Log(Log::INFO)<<"node "<<info->id<<" is in state "<<pw_node_state_as_string(info->state);

	PwlPriv *priv = static_cast<PwlPriv*>(data);
	state_changed(priv, info->state == PW_NODE_STATE_RUNNING);
}
/*
 * Callback table for the capture stream created in setup_stream_listener().
 * Every member is designated, matching node_events and registry_events;
 * mixing positional and designated initializers is not valid C++20.
 */
static const struct pw_stream_events stream_events =
{
	.version = PW_VERSION_STREAM_EVENTS,
	.param_changed = on_stream_param_changed,
	.process = on_process,
};
/*
 * Callback table for the node proxy bound in setup_node_active_monitor():
 * only the "info" event is handled.
 */
static const struct pw_node_events node_events =
{
	.version = PW_VERSION_NODE_EVENTS,
	.info = node_event_info,
};
static void setup_node_active_monitor(uint32_t id, const char* type, PwlPriv* priv)
{
priv->sink_node = static_cast<struct pw_node*>(pw_registry_bind(priv->registry, id, type, PW_VERSION_CLIENT, 0));
if(!priv->sink_node)
{
Log(Log::WARN)<<"could not get node";
return;
}
pw_node_add_listener(priv->sink_node, &priv->sink_listener, &node_events, priv);
}
static void setup_stream_listener(const char* serial, PwlPriv *priv)
{
struct pw_properties* props = pw_properties_new(PW_KEY_MEDIA_TYPE, "Audio",
PW_KEY_MEDIA_CATEGORY, "Capture",
PW_KEY_MEDIA_ROLE, "Music",
PW_KEY_TARGET_OBJECT, serial,
PW_KEY_STREAM_CAPTURE_SINK, "true",
NULL);
priv->sink_stream = pw_stream_new_simple(
pw_main_loop_get_loop(priv->loop),
"audio-capture",
props,
&stream_events,
priv);
const struct spa_pod *params[1];
struct spa_pod_builder b = SPA_POD_BUILDER_INIT(priv->buffer, BUFFER_SIZE);
struct spa_audio_info_raw info_raw = SPA_AUDIO_INFO_RAW_INIT(.format = SPA_AUDIO_FORMAT_F32, .rate = 1000);
params[0] = spa_format_audio_raw_build(&b, SPA_PARAM_EnumFormat, &info_raw);
enum pw_stream_flags flags = static_cast<enum pw_stream_flags>(PW_STREAM_FLAG_AUTOCONNECT | PW_STREAM_FLAG_MAP_BUFFERS);
pw_stream_connect(priv->sink_stream, PW_DIRECTION_INPUT, PW_ID_ANY, flags, params, 1);
}
static void registry_event_global(void *data, uint32_t id,
uint32_t permissions, const char *type, uint32_t version,
const struct spa_dict *props)
{
PwlPriv* priv = static_cast<PwlPriv*>(data);
if(std::string(type) == "PipeWire:Interface:Node")
{
Log(Log::DEBUG)<<id<<' '<<type<<' '<<version;
const struct spa_dict_item* item;
spa_dict_for_each(item, props)
Log(Log::DEBUG)<<'\t'<<item->key<<' '<<item->value;
const struct spa_dict_item *name = spa_dict_lookup_item(props, "node.name");
if(std::string(name->value) == priv->config.sink_name)
{
const struct spa_dict_item *serial = spa_dict_lookup_item(props, "object.serial");
if(priv->config.use_active)
setup_node_active_monitor(id, type, priv);
else
setup_stream_listener(serial->value, priv);
pw_proxy_destroy((struct pw_proxy*)priv->registry);
}
}
}
/*
 * Callback table for the registry listener added in main(): only the
 * "global" event is handled.
 */
static const struct pw_registry_events registry_events =
{
	.version = PW_VERSION_REGISTRY_EVENTS,
	.global = registry_event_global,
};
int main(int argc, char *argv[])
{
Log::level = Log::INFO;
pw_init(&argc, &argv);
PwlPriv priv;
argp_parse(&argp, argc, argv, 0, 0, &priv.config);
if(priv.config.sink_name.empty())
{
Log(Log::ERROR)<<"A sink name (-n) must be specified!";
return 1;
}
Log(Log::INFO)<<"opening TCP socket to "<<priv.config.host<<" on port "<<priv.config.port;
try
{
priv.socket = new TCPSocket(priv.config.host, priv.config.port);
}
catch(SocketException &e)
{
Log(Log::ERROR)<<e.what();
return 1;
}
priv.loop = pw_main_loop_new(NULL);
if(!priv.loop)
return 1;
priv.timout_timespan.tv_sec = priv.config.timeout;
priv.timout_timespan.tv_nsec = 0;
priv.timer_source = pw_loop_add_timer(pw_main_loop_get_loop(priv.loop), off_timeout, &priv);
priv.context = pw_context_new(pw_main_loop_get_loop(priv.loop), NULL, 0);
if(!priv.context)
return 1;
priv.core = pw_context_connect(priv.context, NULL, 0);
if(!priv.core)
return 1;
priv.registry = pw_core_get_registry(priv.core, PW_VERSION_REGISTRY, 0 );
if(!priv.registry)
return 1;
struct spa_hook registry_listener;
spa_zero(registry_listener);
pw_registry_add_listener(priv.registry, &registry_listener,
&registry_events, &priv);
pw_main_loop_run(priv.loop);
if(priv.sink_node)
pw_proxy_destroy((struct pw_proxy*)priv.sink_node);
if(priv.sink_stream)
pw_stream_destroy(priv.sink_stream);
pw_core_disconnect(priv.core);
pw_context_destroy(priv.context);
pw_main_loop_destroy(priv.loop);
delete priv.socket;
return 0;
}