From 11b648c3ba5f782b9d0df07adb4681ecd66eeff5 Mon Sep 17 00:00:00 2001 From: Carl Philipp Klemm Date: Wed, 23 Jul 2025 18:25:53 +0200 Subject: [PATCH] initial commit --- CMakeLists.txt | 16 +++ Socket.cpp | 370 +++++++++++++++++++++++++++++++++++++++++++++++++ Socket.h | 326 +++++++++++++++++++++++++++++++++++++++++++ capture.c | 193 ++++++++++++++++++++++++++ capture.cpp | 193 ++++++++++++++++++++++++++ log.cpp | 63 +++++++++ log.h | 64 +++++++++ main.cpp | 308 ++++++++++++++++++++++++++++++++++++++++ options.h | 71 ++++++++++ 9 files changed, 1604 insertions(+) create mode 100644 CMakeLists.txt create mode 100644 Socket.cpp create mode 100644 Socket.h create mode 100644 capture.c create mode 100644 capture.cpp create mode 100644 log.cpp create mode 100644 log.h create mode 100644 main.cpp create mode 100644 options.h diff --git a/CMakeLists.txt b/CMakeLists.txt new file mode 100644 index 0000000..c4e8199 --- /dev/null +++ b/CMakeLists.txt @@ -0,0 +1,16 @@ +cmake_minimum_required(VERSION 3.20) + +project(pipewirestreamwatcher LANGUAGES CXX) +set(CXX_STANDARD 17) + +find_package(PkgConfig REQUIRED) +pkg_check_modules(PIPEWIRE REQUIRED libpipewire-0.3) + +add_executable(${PROJECT_NAME} main.cpp log.cpp Socket.cpp) +add_dependencies(${PROJECT_NAME} ${PROJECT_NAME}) +target_link_libraries(${PROJECT_NAME} ${PIPEWIRE_LIBRARIES}) +target_include_directories(${PROJECT_NAME} PUBLIC ${PIPEWIRE_INCLUDE_DIRS}) +set_target_properties(${PROJECT_NAME} PROPERTIES COMPILE_FLAGS "-Wall -O2 -march=x86-64-v2 -g" LINK_FLAGS "-flto") +install(TARGETS ${PROJECT_NAME} DESTINATION bin) + + diff --git a/Socket.cpp b/Socket.cpp new file mode 100644 index 0000000..429f861 --- /dev/null +++ b/Socket.cpp @@ -0,0 +1,370 @@ + +#include "Socket.h" + + + #include // For data types + #include // For socket(), connect(), send(), and recv() + #include // For gethostbyname() + #include // For inet_addr() + #include // For close() + #include // For sockaddr_in + #include // TCP_KEEPCNT + #include + typedef void raw_type; // Type used for raw data on this platform + +#include // For errno + +using namespace std; + +// SocketException Code + +SocketException::SocketException(const string &message, bool inclSysMsg) + : userMessage(message) { + if (inclSysMsg) { + userMessage.append(": "); + userMessage.append(strerror(errno)); + } +} + +SocketException::~SocketException() noexcept (true) { +} + +const char *SocketException::what(){ + return userMessage.c_str(); +} + +// Function to fill in address structure given an address and port +static void fillAddr(const string &address, unsigned short port, + sockaddr_in &addr) { + memset(&addr, 0, sizeof(addr)); // Zero out address structure + addr.sin_family = AF_INET; // Internet address + + hostent *host; // Resolve name + if ((host = gethostbyname(address.c_str())) == NULL) { + // strerror() will not work for gethostbyname() and hstrerror() + // is supposedly obsolete + throw SocketException("Failed to resolve name (gethostbyname())"); + } + addr.sin_addr.s_addr = *((unsigned long *) host->h_addr_list[0]); + + addr.sin_port = htons(port); // Assign port in network byte order +} + +// Socket Code + +Socket::Socket(int type, int protocol) { + + // Make a new socket + if ((sockDesc = socket(PF_INET, type, protocol)) < 0) { + throw SocketException("Socket creation failed (socket())", true); + } +} + +Socket::Socket(int sockDesc) { + this->sockDesc = sockDesc; +} + +Socket::~Socket() +{ + close(sockDesc); + sockDesc = -1; +} + +string Socket::getLocalAddress() { + sockaddr_in addr; + unsigned int addr_len = sizeof(addr); + + if (getsockname(sockDesc, (sockaddr *) &addr, (socklen_t *) &addr_len) < 0) { + throw SocketException("Fetch of local address failed (getsockname())", true); + } + return inet_ntoa(addr.sin_addr); +} + +unsigned short Socket::getLocalPort() { + sockaddr_in addr; + unsigned int addr_len = sizeof(addr); + + if (getsockname(sockDesc, (sockaddr *) &addr, (socklen_t *) &addr_len) < 0) { + throw SocketException("Fetch of local port failed (getsockname())", true); + } + return ntohs(addr.sin_port); +} + +void Socket::setLocalPort(unsigned short localPort) { + // Bind the socket to its port + sockaddr_in localAddr; + memset(&localAddr, 0, sizeof(localAddr)); + localAddr.sin_family = AF_INET; + localAddr.sin_addr.s_addr = htonl(INADDR_ANY); + localAddr.sin_port = htons(localPort); + + if (bind(sockDesc, (sockaddr *) &localAddr, sizeof(sockaddr_in)) < 0) { + throw SocketException("Set of local port failed (bind())", true); + } +} + +void Socket::setLocalAddressAndPort(const string &localAddress, + unsigned short localPort) { + // Get the address of the requested host + sockaddr_in localAddr; + fillAddr(localAddress, localPort, localAddr); + + if (bind(sockDesc, (sockaddr *) &localAddr, sizeof(sockaddr_in)) < 0) { + throw SocketException("Set of local address and port failed (bind())", true); + } +} + +void Socket::setKeepalive() +{ + int optval = 1; + setsockopt(sockDesc, SOL_SOCKET, SO_KEEPALIVE,&optval, sizeof(optval)); + + optval = 2; + setsockopt(sockDesc, SOL_SOCKET, TCP_KEEPCNT, &optval, sizeof(optval)); + + optval = 10; + setsockopt(sockDesc, SOL_SOCKET, TCP_KEEPIDLE, &optval, sizeof(optval)); + + optval = 5; + setsockopt(sockDesc, SOL_SOCKET, TCP_KEEPINTVL, &optval, sizeof(optval)); +} + +void Socket::setBlocking(bool flag) +{ + int flags = fcntl(sockDesc, F_GETFL, 0); + if( !flag ) flags = flags | O_NONBLOCK; + else flags = flags & ~O_NONBLOCK; + fcntl(sockDesc, F_SETFL, flags); +} + +void Socket::cleanUp() { + +} + +unsigned short Socket::resolveService(const string &service, + const string &protocol) { + struct servent *serv; /* Structure containing service information */ + + if ((serv = getservbyname(service.c_str(), protocol.c_str())) == NULL) + return atoi(service.c_str()); /* Service is port number */ + else + return ntohs(serv->s_port); /* Found port (network byte order) by name */ +} + +// CommunicatingSocket Code + +CommunicatingSocket::CommunicatingSocket(int type, int protocol) + : Socket(type, protocol) { +} + +CommunicatingSocket::CommunicatingSocket(int newConnSD) : Socket(newConnSD) { +} + +void CommunicatingSocket::connect(const string &foreignAddress, + unsigned short foreignPort) { + // Get the address of the requested host + sockaddr_in destAddr; + fillAddr(foreignAddress, foreignPort, destAddr); + + // Try to connect to the given port + if (::connect(sockDesc, (sockaddr *) &destAddr, sizeof(destAddr)) < 0) { + throw SocketException("Connect failed (connect())", true); + } +} + +void CommunicatingSocket::send(const void *buffer, int bufferLen) + { + if (::send(sockDesc, (raw_type *) buffer, bufferLen, 0) < 0) { + throw SocketException("Send failed (send())", true); + } +} + +int CommunicatingSocket::recv(void *buffer, int bufferLen) +{ + int rtn; + if ((rtn = ::recv(sockDesc, (raw_type *) buffer, bufferLen, MSG_DONTWAIT)) < 0 ) + { + if(errno == EWOULDBLOCK || errno == EAGAIN) return -1; + else throw SocketException("Received failed (recv())", true); + } + + return rtn; +} + +string CommunicatingSocket::getForeignAddress() + { + sockaddr_in addr; + unsigned int addr_len = sizeof(addr); + + if (getpeername(sockDesc, (sockaddr *) &addr,(socklen_t *) &addr_len) < 0) { + throw SocketException("Fetch of foreign address failed (getpeername())", true); + } + return inet_ntoa(addr.sin_addr); +} + +unsigned short CommunicatingSocket::getForeignPort() { + sockaddr_in addr; + unsigned int addr_len = sizeof(addr); + + if (getpeername(sockDesc, (sockaddr *) &addr, (socklen_t *) &addr_len) < 0) { + throw SocketException("Fetch of foreign port failed (getpeername())", true); + } + return ntohs(addr.sin_port); +} + +// TCPSocket Code + +TCPSocket::TCPSocket() + : CommunicatingSocket(SOCK_STREAM, + IPPROTO_TCP) { +} + +TCPSocket::TCPSocket(const string &foreignAddress, unsigned short foreignPort, bool keepalive) + : CommunicatingSocket(SOCK_STREAM, IPPROTO_TCP) { + connect(foreignAddress, foreignPort); + if(keepalive) setKeepalive(); +} + +TCPSocket::TCPSocket(int newConnSD) : CommunicatingSocket(newConnSD) { +} + +// TCPServerSocket Code + +TCPServerSocket::TCPServerSocket(unsigned short localPort, int queueLen, bool keepaliveIN) + : Socket(SOCK_STREAM, IPPROTO_TCP) +{ + keepalive = keepaliveIN; + setLocalPort(localPort); + setListen(queueLen); +} + +TCPServerSocket::TCPServerSocket(const string &localAddress, unsigned short localPort, int queueLen, bool keepaliveIN) + : Socket(SOCK_STREAM, IPPROTO_TCP) +{ + keepalive = keepaliveIN; + setLocalAddressAndPort(localAddress, localPort); + setListen(queueLen); +} + +TCPSocket* TCPServerSocket::accept() +{ + int newConnSD = -1; + if ((newConnSD = ::accept(sockDesc, NULL, 0)) < 0 && errno != EAGAIN && errno != EWOULDBLOCK ) + { + throw SocketException("Accept failed (accept())", true); + } + TCPSocket* newSocket = nullptr; + if(newConnSD > 0) + { + newSocket = new TCPSocket(newConnSD); + if(keepalive) newSocket->setKeepalive(); + } + return newSocket; +} + + + +void TCPServerSocket::setListen(int queueLen) { + if (listen(sockDesc, queueLen) < 0) { + throw SocketException("Set listening socket failed (listen())", true); + } +} + +// UDPSocket Code + +UDPSocket::UDPSocket() : CommunicatingSocket(SOCK_DGRAM, + IPPROTO_UDP) { + setBroadcast(); +} + +UDPSocket::UDPSocket(unsigned short localPort) : + CommunicatingSocket(SOCK_DGRAM, IPPROTO_UDP) { + setLocalPort(localPort); + setBroadcast(); +} + +UDPSocket::UDPSocket(const string &localAddress, unsigned short localPort) + : CommunicatingSocket(SOCK_DGRAM, IPPROTO_UDP) { + setLocalAddressAndPort(localAddress, localPort); + setBroadcast(); +} + +void UDPSocket::setBroadcast() { + // If this fails, we'll hear about it when we try to send. This will allow + // system that cannot broadcast to continue if they don't plan to broadcast + int broadcastPermission = 1; + setsockopt(sockDesc, SOL_SOCKET, SO_BROADCAST, + (raw_type *) &broadcastPermission, sizeof(broadcastPermission)); +} + +void UDPSocket::disconnect() { + sockaddr_in nullAddr; + memset(&nullAddr, 0, sizeof(nullAddr)); + nullAddr.sin_family = AF_UNSPEC; + + // Try to disconnect + if (::connect(sockDesc, (sockaddr *) &nullAddr, sizeof(nullAddr)) < 0) { + if (errno != EAFNOSUPPORT) { + throw SocketException("Disconnect failed (connect())", true); + } + } +} + +void UDPSocket::sendTo(const void *buffer, int bufferLen, + const string &foreignAddress, unsigned short foreignPort) + { + sockaddr_in destAddr; + fillAddr(foreignAddress, foreignPort, destAddr); + + // Write out the whole buffer as a single message. + if (sendto(sockDesc, (raw_type *) buffer, bufferLen, 0, + (sockaddr *) &destAddr, sizeof(destAddr)) != bufferLen) { + throw SocketException("Send failed (sendto())", true); + } +} + +int UDPSocket::recvFrom(void *buffer, int bufferLen, string &sourceAddress, + unsigned short &sourcePort) { + sockaddr_in clntAddr; + socklen_t addrLen = sizeof(clntAddr); + int rtn; + if ((rtn = recvfrom(sockDesc, (raw_type *) buffer, bufferLen, MSG_DONTWAIT, (sockaddr *) &clntAddr, (socklen_t *) &addrLen)) < 0) + { + throw SocketException("Receive failed (recvfrom())", true); + } + sourceAddress = inet_ntoa(clntAddr.sin_addr); + sourcePort = ntohs(clntAddr.sin_port); + + return rtn; +} + +void UDPSocket::setMulticastTTL(unsigned char multicastTTL) { + if (setsockopt(sockDesc, IPPROTO_IP, IP_MULTICAST_TTL, + (raw_type *) &multicastTTL, sizeof(multicastTTL)) < 0) { + throw SocketException("Multicast TTL set failed (setsockopt())", true); + } +} + +void UDPSocket::joinGroup(const string &multicastGroup) { + struct ip_mreq multicastRequest; + + multicastRequest.imr_multiaddr.s_addr = inet_addr(multicastGroup.c_str()); + multicastRequest.imr_interface.s_addr = htonl(INADDR_ANY); + if (setsockopt(sockDesc, IPPROTO_IP, IP_ADD_MEMBERSHIP, + (raw_type *) &multicastRequest, + sizeof(multicastRequest)) < 0) { + throw SocketException("Multicast group join failed (setsockopt())", true); + } +} + +void UDPSocket::leaveGroup(const string &multicastGroup) { + struct ip_mreq multicastRequest; + + multicastRequest.imr_multiaddr.s_addr = inet_addr(multicastGroup.c_str()); + multicastRequest.imr_interface.s_addr = htonl(INADDR_ANY); + if (setsockopt(sockDesc, IPPROTO_IP, IP_DROP_MEMBERSHIP, + (raw_type *) &multicastRequest, + sizeof(multicastRequest)) < 0) { + throw SocketException("Multicast group leave failed (setsockopt())", true); + } +} diff --git a/Socket.h b/Socket.h new file mode 100644 index 0000000..cc943d3 --- /dev/null +++ b/Socket.h @@ -0,0 +1,326 @@ +#ifndef __SOCKET_INCLUDED__ +#define __SOCKET_INCLUDED__ + +#include // For std::string +#include // For exception class +#include +#include + +/** + * Signals a problem with the execution of a socket call. + */ +class SocketException : public std::exception { +public: + /** + * Construct a SocketException with a explanatory message. + * @param message explanatory message + * @param incSysMsg true if system message (from strerror(errno)) + * should be postfixed to the user provided message + */ + SocketException(const std::string &message, bool inclSysMsg = false) ; + + /** + * Provided just to guarantee that no exceptions are thrown. + */ + ~SocketException() noexcept (true); + + /** + * Get the exception message + * @return exception message + */ + const char *what(); + +private: + std::string userMessage; // Exception message +}; + +/** + * Base class representing basic communication endpoint + */ +class Socket { +public: + /** + * Close and deallocate this socket + */ + ~Socket(); + + /** + * Get the local address + * @return local address of socket + * @exception SocketException thrown if fetch fails + */ + std::string getLocalAddress(); + + /** + * Get the local port + * @return local port of socket + * @exception SocketException thrown if fetch fails + */ + unsigned short getLocalPort() ; + + /** + * Set the local port to the specified port and the local address + * to any interface + * @param localPort local port + * @exception SocketException thrown if setting local port fails + */ + void setLocalPort(unsigned short localPort) ; + + /** + * Set the local port to the specified port and the local address + * to the specified address. If you omit the port, a random port + * will be selected. + * @param localAddress local address + * @param localPort local port + * @exception SocketException thrown if setting local port or address fails + */ + void setLocalAddressAndPort(const std::string &localAddress, + unsigned short localPort = 0) ; + + /** + * If WinSock, unload the WinSock DLLs; otherwise do nothing. We ignore + * this in our sample client code but include it in the library for + * completeness. If you are running on Windows and you are concerned + * about DLL resource consumption, call this after you are done with all + * Socket instances. If you execute this on Windows while some instance of + * Socket exists, you are toast. For portability of client code, this is + * an empty function on non-Windows platforms so you can always include it. + * @param buffer buffer to receive the data + * @param bufferLen maximum number of bytes to read into buffer + * @return number of bytes read, 0 for EOF, and -1 for error + * @exception SocketException thrown WinSock clean up fails + */ + static void cleanUp() ; + + /** + * Resolve the specified service for the specified protocol to the + * corresponding port number in host byte order + * @param service service to resolve (e.g., "http") + * @param protocol protocol of service to resolve. Default is "tcp". + */ + static unsigned short resolveService(const std::string &service, + const std::string &protocol = "tcp"); + + void setKeepalive(); + void setBlocking(bool flag); + +private: + // Prevent the user from trying to use value semantics on this object + Socket(const Socket &sock); + void operator=(const Socket &sock); + +protected: + int sockDesc; // Socket descriptor + Socket(int type, int protocol) ; + Socket(int sockDesc); +}; + +/** + * Socket which is able to connect, send, and receive + */ +class CommunicatingSocket : public Socket { +public: + /** + * Establish a socket connection with the given foreign + * address and port + * @param foreignAddress foreign address (IP address or name) + * @param foreignPort foreign port + * @exception SocketException thrown if unable to establish connection + */ + void connect(const std::string &foreignAddress, unsigned short foreignPort) + ; + + /** + * Write the given buffer to this socket. Call connect() before + * calling send() + * @param buffer buffer to be written + * @param bufferLen number of bytes from buffer to be written + * @exception SocketException thrown if unable to send data + */ + void send(const void *buffer, int bufferLen) ; + + /** + * Read into the given buffer up to bufferLen bytes data from this + * socket. Call connect() before calling recv() + * @param buffer buffer to receive the data + * @param bufferLen maximum number of bytes to read into buffer + * @return number of bytes read, 0 for EOF, and -1 for error + * @exception SocketException thrown if unable to receive data + */ + int recv(void *buffer, int bufferLen) ; + + /** + * Get the foreign address. Call connect() before calling recv() + * @return foreign address + * @exception SocketException thrown if unable to fetch foreign address + */ + std::string getForeignAddress() ; + + /** + * Get the foreign port. Call connect() before calling recv() + * @return foreign port + * @exception SocketException thrown if unable to fetch foreign port + */ + unsigned short getForeignPort() ; + +protected: + CommunicatingSocket(int type, int protocol) ; + CommunicatingSocket(int newConnSD); +}; + +/** + * TCP socket for communication with other TCP sockets + */ +class TCPSocket : public CommunicatingSocket { +public: + /** + * Construct a TCP socket with no connection + * @exception SocketException thrown if unable to create TCP socket + */ + TCPSocket() ; + + /** + * Construct a TCP socket with a connection to the given foreign address + * and port + * @param foreignAddress foreign address (IP address or name) + * @param foreignPort foreign port + * @exception SocketException thrown if unable to create TCP socket + */ + TCPSocket(const std::string &foreignAddress, unsigned short foreignPort, bool keepalive = false) + ; + + bool isOpen(); + +private: + // Access for TCPServerSocket::accept() connection creation + friend class TCPServerSocket; + TCPSocket(int newConnSD); +}; + +/** + * TCP socket class for servers + */ +class TCPServerSocket : public Socket { +public: + /** + * Construct a TCP socket for use with a server, accepting connections + * on the specified port on any interface + * @param localPort local port of server socket, a value of zero will + * give a system-assigned unused port + * @param queueLen maximum queue length for outstanding + * connection requests (default 5) + * @exception SocketException thrown if unable to create TCP server socket + */ + TCPServerSocket(unsigned short localPort, int queueLen = 5, bool keepaliveIN = false); + + /** + * Construct a TCP socket for use with a server, accepting connections + * on the specified port on the interface specified by the given address + * @param localAddress local interface (address) of server socket + * @param localPort local port of server socket + * @param queueLen maximum queue length for outstanding + * connection requests (default 5) + * @exception SocketException thrown if unable to create TCP server socket + */ + TCPServerSocket(const std::string &localAddress, unsigned short localPort, + int queueLen = 5, bool keepaliveIN = false); + + /** + * Blocks until a new connection is established on this socket or error + * @return new connection socket + * @exception SocketException thrown if attempt to accept a new connection fails + */ + TCPSocket *accept() ; + + +private: + void setListen(int queueLen) ; + bool keepalive; +}; + +/** + * UDP socket class + */ +class UDPSocket : public CommunicatingSocket { +public: + /** + * Construct a UDP socket + * @exception SocketException thrown if unable to create UDP socket + */ + UDPSocket() ; + + /** + * Construct a UDP socket with the given local port + * @param localPort local port + * @exception SocketException thrown if unable to create UDP socket + */ + UDPSocket(unsigned short localPort) ; + + /** + * Construct a UDP socket with the given local port and address + * @param localAddress local address + * @param localPort local port + * @exception SocketException thrown if unable to create UDP socket + */ + UDPSocket(const std::string &localAddress, unsigned short localPort) + ; + + /** + * Unset foreign address and port + * @return true if disassociation is successful + * @exception SocketException thrown if unable to disconnect UDP socket + */ + void disconnect() ; + + /** + * Send the given buffer as a UDP datagram to the + * specified address/port + * @param buffer buffer to be written + * @param bufferLen number of bytes to write + * @param foreignAddress address (IP address or name) to send to + * @param foreignPort port number to send to + * @return true if send is successful + * @exception SocketException thrown if unable to send datagram + */ + void sendTo(const void *buffer, int bufferLen, const std::string &foreignAddress, + unsigned short foreignPort) ; + + /** + * Read read up to bufferLen bytes data from this socket. The given buffer + * is where the data will be placed + * @param buffer buffer to receive data + * @param bufferLen maximum number of bytes to receive + * @param sourceAddress address of datagram source + * @param sourcePort port of data source + * @return number of bytes received and -1 for error + * @exception SocketException thrown if unable to receive datagram + */ + int recvFrom(void *buffer, int bufferLen, std::string &sourceAddress, + unsigned short &sourcePort) ; + + /** + * Set the multicast TTL + * @param multicastTTL multicast TTL + * @exception SocketException thrown if unable to set TTL + */ + void setMulticastTTL(unsigned char multicastTTL) ; + + /** + * Join the specified multicast group + * @param multicastGroup multicast group address to join + * @exception SocketException thrown if unable to join group + */ + void joinGroup(const std::string &multicastGroup) ; + + /** + * Leave the specified multicast group + * @param multicastGroup multicast group address to leave + * @exception SocketException thrown if unable to leave group + */ + void leaveGroup(const std::string &multicastGroup) ; + +private: + void setBroadcast(); +}; + +#endif diff --git a/capture.c b/capture.c new file mode 100644 index 0000000..b47171b --- /dev/null +++ b/capture.c @@ -0,0 +1,193 @@ +/* PipeWire */ +/* SPDX-FileCopyrightText: Copyright © 2022 Wim Taymans */ +/* SPDX-License-Identifier: MIT */ + +/* + [title] + Audio capture using \ref pw_stream "pw_stream". + [title] + */ + +#include +#include +#include +#include + +#include + +#include + +struct data +{ + struct pw_main_loop *loop; + struct pw_stream *stream; + + struct spa_audio_info format; + unsigned move:1; +}; + +/* our data processing function is in general: + * + * struct pw_buffer *b; + * b = pw_stream_dequeue_buffer(stream); + * + * .. consume stuff in the buffer ... + * + * pw_stream_queue_buffer(stream, b); + */ +static void on_process(void *userdata) +{ + struct data *data = userdata; + struct pw_buffer *b; + struct spa_buffer *buf; + float *samples, max; + uint32_t c, n, n_channels, n_samples, peak; + + if ((b = pw_stream_dequeue_buffer(data->stream)) == NULL) + { + pw_log_warn("out of buffers: %m"); + return; + } + + buf = b->buffer; + if ((samples = buf->datas[0].data) == NULL) + return; + + n_channels = data->format.info.raw.channels; + n_samples = buf->datas[0].chunk->size / sizeof(float); + + /* move cursor up */ + if (data->move) + fprintf(stdout, "%c[%dA", 0x1b, n_channels + 1); + fprintf(stdout, "captured %d samples\n", n_samples / n_channels); + for (c = 0; c < data->format.info.raw.channels; c++) + { + max = 0.0f; + for (n = c; n < n_samples; n += n_channels) + max = fmaxf(max, fabsf(samples[n])); + + peak = SPA_CLAMP(max * 30, 0, 39); + + fprintf(stdout, "channel %d: |%*s%*s| peak:%f\n", + c, peak+1, "*", 40 - peak, "", max); + } + data->move = true; + fflush(stdout); + + pw_stream_queue_buffer(data->stream, b); +} + +/* Be notified when the stream param changes. We're only looking at the + * format changes. + */ +static void +on_stream_param_changed(void *_data, uint32_t id, const struct spa_pod *param) +{ + struct data *data = _data; + + /* NULL means to clear the format */ + if (param == NULL || id != SPA_PARAM_Format) + return; + + if (spa_format_parse(param, &data->format.media_type, &data->format.media_subtype) < 0) + return; + + /* only accept raw audio */ + if (data->format.media_type != SPA_MEDIA_TYPE_audio || + data->format.media_subtype != SPA_MEDIA_SUBTYPE_raw) + return; + + /* call a helper function to parse the format for us. */ + spa_format_audio_raw_parse(param, &data->format.info.raw); + + fprintf(stdout, "capturing rate:%d channels:%d\n", + data->format.info.raw.rate, data->format.info.raw.channels); + +} + +static const struct pw_stream_events stream_events = +{ + PW_VERSION_STREAM_EVENTS, + .param_changed = on_stream_param_changed, + .process = on_process, +}; + +static void do_quit(void *userdata, int signal_number) +{ + struct data *data = userdata; + pw_main_loop_quit(data->loop); +} + +int main(int argc, char *argv[]) +{ + struct data data = { 0, }; + const struct spa_pod *params[1]; + uint8_t buffer[1024]; + struct pw_properties *props; + struct spa_pod_builder b = SPA_POD_BUILDER_INIT(buffer, sizeof(buffer)); + + pw_init(&argc, &argv); + + /* make a main loop. If you already have another main loop, you can add + * the fd of this pipewire mainloop to it. */ + data.loop = pw_main_loop_new(NULL); + + pw_loop_add_signal(pw_main_loop_get_loop(data.loop), SIGINT, do_quit, &data); + pw_loop_add_signal(pw_main_loop_get_loop(data.loop), SIGTERM, do_quit, &data); + + /* Create a simple stream, the simple stream manages the core and remote + * objects for you if you don't need to deal with them. + * + * If you plan to autoconnect your stream, you need to provide at least + * media, category and role properties. + * + * Pass your events and a user_data pointer as the last arguments. This + * will inform you about the stream state. The most important event + * you need to listen to is the process event where you need to produce + * the data. + */ + props = pw_properties_new(PW_KEY_MEDIA_TYPE, "Audio", + PW_KEY_MEDIA_CATEGORY, "Capture", + PW_KEY_MEDIA_ROLE, "Music", + NULL); + if (argc > 1) + /* Set stream target if given on command line */ + pw_properties_set(props, PW_KEY_TARGET_OBJECT, argv[1]); + + /* uncomment if you want to capture from the sink monitor ports */ + pw_properties_set(props, PW_KEY_STREAM_CAPTURE_SINK, "true"); + + data.stream = pw_stream_new_simple( + pw_main_loop_get_loop(data.loop), + "audio-capture", + props, + &stream_events, + &data); + + /* Make one parameter with the supported formats. The SPA_PARAM_EnumFormat + * id means that this is a format enumeration (of 1 value). + * We leave the channels and rate empty to accept the native graph + * rate and channels. */ + params[0] = spa_format_audio_raw_build(&b, SPA_PARAM_EnumFormat, + &SPA_AUDIO_INFO_RAW_INIT( + .format = SPA_AUDIO_FORMAT_F32)); + + /* Now connect this stream. We ask that our process function is + * called in a realtime thread. */ + pw_stream_connect(data.stream, + PW_DIRECTION_INPUT, + PW_ID_ANY, + PW_STREAM_FLAG_AUTOCONNECT | + PW_STREAM_FLAG_MAP_BUFFERS | + PW_STREAM_FLAG_RT_PROCESS, + params, 1); + + /* and wait while we let things run */ + pw_main_loop_run(data.loop); + + pw_stream_destroy(data.stream); + pw_main_loop_destroy(data.loop); + pw_deinit(); + + return 0; +} diff --git a/capture.cpp b/capture.cpp new file mode 100644 index 0000000..b47171b --- /dev/null +++ b/capture.cpp @@ -0,0 +1,193 @@ +/* PipeWire */ +/* SPDX-FileCopyrightText: Copyright © 2022 Wim Taymans */ +/* SPDX-License-Identifier: MIT */ + +/* + [title] + Audio capture using \ref pw_stream "pw_stream". + [title] + */ + +#include +#include +#include +#include + +#include + +#include + +struct data +{ + struct pw_main_loop *loop; + struct pw_stream *stream; + + struct spa_audio_info format; + unsigned move:1; +}; + +/* our data processing function is in general: + * + * struct pw_buffer *b; + * b = pw_stream_dequeue_buffer(stream); + * + * .. consume stuff in the buffer ... + * + * pw_stream_queue_buffer(stream, b); + */ +static void on_process(void *userdata) +{ + struct data *data = userdata; + struct pw_buffer *b; + struct spa_buffer *buf; + float *samples, max; + uint32_t c, n, n_channels, n_samples, peak; + + if ((b = pw_stream_dequeue_buffer(data->stream)) == NULL) + { + pw_log_warn("out of buffers: %m"); + return; + } + + buf = b->buffer; + if ((samples = buf->datas[0].data) == NULL) + return; + + n_channels = data->format.info.raw.channels; + n_samples = buf->datas[0].chunk->size / sizeof(float); + + /* move cursor up */ + if (data->move) + fprintf(stdout, "%c[%dA", 0x1b, n_channels + 1); + fprintf(stdout, "captured %d samples\n", n_samples / n_channels); + for (c = 0; c < data->format.info.raw.channels; c++) + { + max = 0.0f; + for (n = c; n < n_samples; n += n_channels) + max = fmaxf(max, fabsf(samples[n])); + + peak = SPA_CLAMP(max * 30, 0, 39); + + fprintf(stdout, "channel %d: |%*s%*s| peak:%f\n", + c, peak+1, "*", 40 - peak, "", max); + } + data->move = true; + fflush(stdout); + + pw_stream_queue_buffer(data->stream, b); +} + +/* Be notified when the stream param changes. We're only looking at the + * format changes. + */ +static void +on_stream_param_changed(void *_data, uint32_t id, const struct spa_pod *param) +{ + struct data *data = _data; + + /* NULL means to clear the format */ + if (param == NULL || id != SPA_PARAM_Format) + return; + + if (spa_format_parse(param, &data->format.media_type, &data->format.media_subtype) < 0) + return; + + /* only accept raw audio */ + if (data->format.media_type != SPA_MEDIA_TYPE_audio || + data->format.media_subtype != SPA_MEDIA_SUBTYPE_raw) + return; + + /* call a helper function to parse the format for us. */ + spa_format_audio_raw_parse(param, &data->format.info.raw); + + fprintf(stdout, "capturing rate:%d channels:%d\n", + data->format.info.raw.rate, data->format.info.raw.channels); + +} + +static const struct pw_stream_events stream_events = +{ + PW_VERSION_STREAM_EVENTS, + .param_changed = on_stream_param_changed, + .process = on_process, +}; + +static void do_quit(void *userdata, int signal_number) +{ + struct data *data = userdata; + pw_main_loop_quit(data->loop); +} + +int main(int argc, char *argv[]) +{ + struct data data = { 0, }; + const struct spa_pod *params[1]; + uint8_t buffer[1024]; + struct pw_properties *props; + struct spa_pod_builder b = SPA_POD_BUILDER_INIT(buffer, sizeof(buffer)); + + pw_init(&argc, &argv); + + /* make a main loop. If you already have another main loop, you can add + * the fd of this pipewire mainloop to it. */ + data.loop = pw_main_loop_new(NULL); + + pw_loop_add_signal(pw_main_loop_get_loop(data.loop), SIGINT, do_quit, &data); + pw_loop_add_signal(pw_main_loop_get_loop(data.loop), SIGTERM, do_quit, &data); + + /* Create a simple stream, the simple stream manages the core and remote + * objects for you if you don't need to deal with them. + * + * If you plan to autoconnect your stream, you need to provide at least + * media, category and role properties. + * + * Pass your events and a user_data pointer as the last arguments. This + * will inform you about the stream state. The most important event + * you need to listen to is the process event where you need to produce + * the data. + */ + props = pw_properties_new(PW_KEY_MEDIA_TYPE, "Audio", + PW_KEY_MEDIA_CATEGORY, "Capture", + PW_KEY_MEDIA_ROLE, "Music", + NULL); + if (argc > 1) + /* Set stream target if given on command line */ + pw_properties_set(props, PW_KEY_TARGET_OBJECT, argv[1]); + + /* uncomment if you want to capture from the sink monitor ports */ + pw_properties_set(props, PW_KEY_STREAM_CAPTURE_SINK, "true"); + + data.stream = pw_stream_new_simple( + pw_main_loop_get_loop(data.loop), + "audio-capture", + props, + &stream_events, + &data); + + /* Make one parameter with the supported formats. The SPA_PARAM_EnumFormat + * id means that this is a format enumeration (of 1 value). + * We leave the channels and rate empty to accept the native graph + * rate and channels. */ + params[0] = spa_format_audio_raw_build(&b, SPA_PARAM_EnumFormat, + &SPA_AUDIO_INFO_RAW_INIT( + .format = SPA_AUDIO_FORMAT_F32)); + + /* Now connect this stream. We ask that our process function is + * called in a realtime thread. */ + pw_stream_connect(data.stream, + PW_DIRECTION_INPUT, + PW_ID_ANY, + PW_STREAM_FLAG_AUTOCONNECT | + PW_STREAM_FLAG_MAP_BUFFERS | + PW_STREAM_FLAG_RT_PROCESS, + params, 1); + + /* and wait while we let things run */ + pw_main_loop_run(data.loop); + + pw_stream_destroy(data.stream); + pw_main_loop_destroy(data.loop); + pw_deinit(); + + return 0; +} diff --git a/log.cpp b/log.cpp new file mode 100644 index 0000000..61fa188 --- /dev/null +++ b/log.cpp @@ -0,0 +1,63 @@ +/** +* Lubricant Detecter +* Copyright (C) 2021 Carl Klemm +* +* This program is free software; you can redistribute it and/or +* modify it under the terms of the GNU General Public License +* version 3 as published by the Free Software Foundation. +* +* This program is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU General Public License for more details. +* +* You should have received a copy of the GNU General Public License +* along with this program; if not, write to the +* Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, +* Boston, MA 02110-1301, USA. +*/ + +#include "log.h" + +Log::Log(Level type, bool endlineI): endline(endlineI) +{ + msglevel = type; + if(headers) + { + operator << ("["+getLabel(type)+"] "); + } +} + +Log::~Log() +{ + if(opened && endline) + { + std::cout<<'\n'; + } + opened = false; +} + + +std::string Log::getLabel(Level level) +{ + std::string label; + switch(level) + { + case DEBUG: + label = "DEBUG"; + break; + case INFO: + label = "INFO "; + break; + case WARN: + label = "WARN "; + break; + case ERROR: + label = "ERROR"; + break; + } + return label; +} + +bool Log::headers = false; +Log::Level Log::level = WARN; diff --git a/log.h b/log.h new file mode 100644 index 0000000..c0d90eb --- /dev/null +++ b/log.h @@ -0,0 +1,64 @@ +/** +* eisgenerator +* Copyright (C) 2021 Carl Klemm +* +* This program is free software; you can redistribute it and/or +* modify it under the terms of the GNU General Public License +* version 3 as published by the Free Software Foundation. +* +* This program is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU General Public License for more details. +* +* You should have received a copy of the GNU General Public License +* along with this program; if not, write to the +* Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, +* Boston, MA 02110-1301, USA. +*/ + +#pragma once +#include +#include + +class Log +{ +public: + + enum Level + { + DEBUG, + INFO, + WARN, + ERROR + }; + +private: + bool opened = false; + Level msglevel = DEBUG; + bool endline = true; + + std::string getLabel(Level level); + +public: + + static bool headers; + static Level level; + + Log() {} + Log(Level type, bool endlineI = true); + ~Log(); + + template Log &operator<<(const T &msg) + { + if(msglevel >= level) + { + if(msglevel == ERROR) + std::cerr< +#include +#include +#include +#include +#include +#include + +#include "options.h" +#include "log.h" +#include "Socket.h" +#include "spa/support/loop.h" + + +#define BUFFER_SIZE 2048 + +#define SENSOR_STRING "bcst: SENSOR TYPE: 254 ID: " + +struct PwlPriv +{ + struct pw_context* context; + struct pw_core* core; + struct pw_registry* registry; + struct pw_main_loop* loop; + struct pw_node* sink_node; + struct spa_hook sink_listener; + + struct pw_stream *sink_stream; + struct spa_audio_info stream_format; + uint8_t buffer[BUFFER_SIZE]; + + struct Config config; + + TCPSocket* socket; + + spa_source* timer_source; + struct timespec timout_timespan; +}; + +static void send_sensor(TCPSocket* socket, uint8_t id, bool state) +{ + static int8_t lastState = -1; + + if(state == lastState) + return; + + lastState = state; + + std::string sendbuf(SENSOR_STRING); + sendbuf.append(std::to_string(static_cast(id))); + sendbuf.append(" FIELD: "); + sendbuf.append(std::to_string(static_cast(state))); + sendbuf.push_back('\n'); + Log(Log::DEBUG, false)<send(sendbuf.c_str(), sendbuf.size()); +} + +static void off_timeout(void *data, uint64_t expirations) +{ + PwlPriv *priv = reinterpret_cast(data); + send_sensor(priv->socket, priv->config.id, false); +} + +static void state_changed(PwlPriv *priv, bool state) +{ + static bool lastState = false; + + if(state == lastState) + return; + + lastState = state; + + if(state) + { + send_sensor(priv->socket, priv->config.id, true); + pw_loop_update_timer(pw_main_loop_get_loop(priv->loop), priv->timer_source, NULL, NULL, false); + } + else + { + pw_loop_update_timer(pw_main_loop_get_loop(priv->loop), priv->timer_source, &priv->timout_timespan, NULL, false); + } +} + +static void on_process(void *data) +{ + PwlPriv *priv = reinterpret_cast(data); + + struct pw_buffer *pw_buf = pw_stream_dequeue_buffer(priv->sink_stream); + if(!pw_buf) + { + Log(Log::WARN)<<"out of buffers"; + return; + } + + struct spa_buffer *spa_buf = pw_buf->buffer; + + uint32_t sample_count = spa_buf->datas[0].chunk->size / sizeof(float); + float* samples = reinterpret_cast(spa_buf->datas[0].data); + + float accum = 0; + for(uint32_t i = 0; i < sample_count; ++i) + accum += std::abs(samples[i]); + accum /= sample_count; + pw_stream_queue_buffer(priv->sink_stream, pw_buf); + + state_changed(priv, accum > 0.001); +} + +static void on_stream_param_changed(void *data, uint32_t id, const struct spa_pod *param) +{ + PwlPriv *priv = reinterpret_cast(data); + + /* NULL means to clear the format */ + if (param == NULL || id != SPA_PARAM_Format) + return; + + if (spa_format_parse(param, &priv->stream_format.media_type, &priv->stream_format.media_subtype) < 0) + return; + + /* only accept raw audio */ + if (priv->stream_format.media_type != SPA_MEDIA_TYPE_audio || + priv->stream_format.media_subtype != SPA_MEDIA_SUBTYPE_raw) + return; + + /* call a helper function to parse the format for us. */ + spa_format_audio_raw_parse(param, &priv->stream_format.info.raw); + + Log(Log::DEBUG)<<"capturing rate: "<stream_format.info.raw.rate + <<" from "<stream_format.info.raw.channels<<" channels"; +} + +static void node_event_info(void* data, const struct pw_node_info *info) +{ + static enum pw_node_state prev_state = PW_NODE_STATE_CREATING; + + PwlPriv *priv = reinterpret_cast(data); + + if(prev_state != info->state) + { + prev_state = info->state; + Log(Log::INFO)<<"node "<id<<" is in state "<state); + + std::string sendbuf(SENSOR_STRING); + switch(info->state) + { + case PW_NODE_STATE_RUNNING: + state_changed(priv, true); + break; + case PW_NODE_STATE_IDLE: + case PW_NODE_STATE_SUSPENDED: + case PW_NODE_STATE_ERROR: + default: + state_changed(priv, false); + break; + } + } +} + +static const struct pw_stream_events stream_events = +{ + PW_VERSION_STREAM_EVENTS, + .param_changed = on_stream_param_changed, + .process = on_process, +}; + +static const struct pw_node_events node_events = { + .version = PW_VERSION_NODE_EVENTS, + .info = node_event_info, +}; + +static void setup_node_active_monitor(uint32_t id, const char* type, PwlPriv* priv) +{ + priv->sink_node = static_cast(pw_registry_bind(priv->registry, id, type, PW_VERSION_CLIENT, 0)); + + if(!priv->sink_node) + { + Log(Log::WARN)<<"could not get node"; + return; + } + pw_node_add_listener(priv->sink_node, &priv->sink_listener, &node_events, priv); +} + +static void setup_stream_listener(const char* serial, PwlPriv *priv) +{ + struct pw_properties* props = pw_properties_new(PW_KEY_MEDIA_TYPE, "Audio", + PW_KEY_MEDIA_CATEGORY, "Capture", + PW_KEY_MEDIA_ROLE, "Music", + PW_KEY_TARGET_OBJECT, serial, + PW_KEY_STREAM_CAPTURE_SINK, "true", + NULL); + + priv->sink_stream = pw_stream_new_simple( + pw_main_loop_get_loop(priv->loop), + "audio-capture", + props, + &stream_events, + priv); + + const struct spa_pod *params[1]; + struct spa_pod_builder b = SPA_POD_BUILDER_INIT(priv->buffer, BUFFER_SIZE); + struct spa_audio_info_raw info_raw = SPA_AUDIO_INFO_RAW_INIT(.format = SPA_AUDIO_FORMAT_F32, .rate = 1000); + params[0] = spa_format_audio_raw_build(&b, SPA_PARAM_EnumFormat, &info_raw); + + enum pw_stream_flags flags = static_cast(PW_STREAM_FLAG_AUTOCONNECT | PW_STREAM_FLAG_MAP_BUFFERS); + pw_stream_connect(priv->sink_stream, PW_DIRECTION_INPUT, PW_ID_ANY, flags, params, 1); +} + +static void registry_event_global(void *data, uint32_t id, + uint32_t permissions, const char *type, uint32_t version, + const struct spa_dict *props) +{ + + PwlPriv* priv = static_cast(data); + + if(std::string(type) == "PipeWire:Interface:Node") + { + Log(Log::DEBUG)<key<<' '<value; + + const struct spa_dict_item *name = spa_dict_lookup_item(props, "node.name"); + if(std::string(name->value) == priv->config.sink_name) + { + + const struct spa_dict_item *serial = spa_dict_lookup_item(props, "object.serial"); + + if(priv->config.use_active) + setup_node_active_monitor(id, type, priv); + else + setup_stream_listener(serial->value, priv); + + pw_proxy_destroy((struct pw_proxy*)priv->registry); + } + } +} + +static const struct pw_registry_events registry_events = { + .version = PW_VERSION_REGISTRY_EVENTS, + .global = registry_event_global, +}; + +int main(int argc, char *argv[]) +{ + Log::level = Log::INFO; + + pw_init(&argc, &argv); + + PwlPriv priv; + + argp_parse(&argp, argc, argv, 0, 0, &priv.config); + + if(priv.config.sink_name.empty()) + { + Log(Log::ERROR)<<"A sink name (-n) must be specified!"; + return 1; + } + + Log(Log::INFO)<<"opening TCP socket to "< +#include +#include +#include +#include "log.h" + +const inline char *argp_program_version = "PipwireAudioWatcher"; +const inline char *argp_program_bug_address = ""; +static char doc[] = "Application Monitors a Pipewire sink for activity and reports it"; +static char args_doc[] = ""; + +static struct argp_option options[] = +{ + {"verbose", 'v', 0, 0, "Show debug messages" }, + {"quiet", 'q', 0, 0, "only output data" }, + {"sink name", 'n', "[NAME]", 0, "Name of the sink to connect to" }, + {"use-active", 'a', 0, 0, "Use active state instead of samples to determine if sink is active" }, + {"host", 'h', "[HOST]", 0, "host to send events to" }, + {"port", 'p', "[PORT]", 0, "port to send events to" }, + {"id", 'i', "[ID]", 0, "the id the sensor associated with the sink should have" }, + {"timeout", 't', "[SECONDS]", 0, "the lenght of time that has to pass with no sounds for the stream to be determined inactive" }, + { 0 } +}; + + +struct Config +{ + std::string sink_name; + bool use_active = false; + std::string host = "localhost"; + unsigned short port = 6856; + uint8_t id = 0; + unsigned int timeout = 60; +}; + +static error_t parse_opt (int key, char *arg, struct argp_state *state) +{ + Config *config = reinterpret_cast(state->input); + + switch (key) + { + case 'q': + Log::level = Log::ERROR; + break; + case 'v': + Log::level = Log::DEBUG; + break; + case 'n': + config->sink_name.assign(arg); + break; + case 'a': + config->use_active = true; + break; + case 'h': + config->host.assign(arg); + break; + case 'p': + config->port = atoi(arg); + break; + case 'i': + config->id = strtoul(arg, NULL, 0); + break; + default: + return ARGP_ERR_UNKNOWN; + } + return 0; +} + +static inline struct argp argp = {options, parse_opt, args_doc, doc}; +