Module rtc

Classes

class AudioStream
Expand source code
class AudioStream:
    def __init__(self) -> None:
        self.queue: asyncio.Queue = asyncio.Queue()

    def __aiter__(self) -> AsyncIterator[PcmAudioFrame]:
        return self

    async def __anext__(self) -> PcmAudioFrame:
        item = await self.queue.get()
        if item is None:
            raise StopAsyncIteration

        return item
class Channel (rtc: RtcEngine, options: RtcOptions)
Expand source code
class Channel():
    def __init__(
        self, rtc: "RtcEngine", options: RtcOptions
    ) -> None:
        self.loop = asyncio.get_event_loop()

        # Create the event emitter
        self.emitter = AsyncIOEventEmitter(self.loop)

        self.options = options
        self.remote_users = dict[int, Any]()
        self.rtc = rtc
        self.chat = Chat(self)
        self.channelId = options.channel_name
        self.uid = options.uid
        self.token = options.build_token(rtc.appid, rtc.appcert) if rtc.appcert else ""
        conn_config = RTCConnConfig(
            client_role_type=ClientRoleType.CLIENT_ROLE_BROADCASTER,
            channel_profile=ChannelProfileType.CHANNEL_PROFILE_LIVE_BROADCASTING,
        )
        self.connection = self.rtc.agora_service.create_rtc_connection(conn_config)

        self.channel_event_observer = ChannelEventObserver(self.emitter,
            options=options,)    
        self.connection.register_observer(self.channel_event_observer)

        self.local_user = self.connection.get_local_user()
        self.local_user.set_playback_audio_frame_before_mixing_parameters(
            options.channels, options.sample_rate
        )
        self.local_user.register_local_user_observer(self.channel_event_observer)
        self.local_user.register_audio_frame_observer(self.channel_event_observer)
        # self.local_user.subscribe_all_audio()

        self.media_node_factory = self.rtc.agora_service.create_media_node_factory()
        self.audio_pcm_data_sender = (
            self.media_node_factory.create_audio_pcm_data_sender()
        )
        self.audio_track = self.rtc.agora_service.create_custom_audio_track_pcm(
            self.audio_pcm_data_sender
        )
        self.audio_track.set_enabled(1)
        self.local_user.publish_audio(self.audio_track)

        self.stream_id = self.connection.create_data_stream(False, False)
        self.received_chunks = {}
        self.waiting_message = None
        self.msg_id = ""
        self.msg_index = ""
        
        self.on("user_joined", lambda agora_rtc_conn, user_id: self.remote_users.update({user_id: True}))
        self.on("user_left", lambda agora_rtc_conn, user_id, reason: self.remote_users.pop(user_id, None))
        
        def handle_audio_subscribe_state_changed(agora_local_user, channel, user_id, old_state, new_state, elapse_since_last_state):
            if new_state == 3:  # Successfully subscribed
                self.channel_event_observer.audio_streams.update({user_id: AudioStream()})
            elif new_state == 0:
                self.channel_event_observer.audio_streams.pop(user_id, None)

        self.on("audio_subscribe_state_changed", handle_audio_subscribe_state_changed)

    
    async def connect(self) -> None:
        """
        Connects to a channel.

        Parameters:
            channelId: The channel ID.
            uid: The user ID.

        Returns:
            Channel: The connected channel.
        """
        future = asyncio.Future()

        def callback(agora_rtc_conn: RTCConnection, conn_info: RTCConnInfo, reason):
            logger.info(f"Connection state changed: {conn_info.state}")
            if conn_info.state == 3:  # Connection successful
                future.set_result(None)
            elif conn_info.state == 5:  # Connection failed
                future.set_exception(Exception(f"Connection failed with state: {conn_info.state}"))

        self.on("connection_state_changed", callback)
        self.connection.connect(self.token, self.channelId, self.uid)
        
        try:
            await future
        except Exception as e:
            raise Exception(f"Failed to connect to channel {self.channelId}: {str(e)}") from e
        finally:
            self.off("connection_state_changed", callback)

    async def disconnect(self) -> None:
        """
        Disconnects the channel.
        """
        disconnected_future = asyncio.Future[None]()
        def callback(agora_rtc_conn: RTCConnection, conn_info: RTCConnInfo, reason):
            self.off("connection_state_changed", callback)
            if conn_info.state == 1:
                disconnected_future.set_result(None)
        self.on("connection_state_changed", callback)
        self.connection.disconnect()
        await disconnected_future

    def get_audio_frames(self, uid: int) -> AudioStream:
        """
        Returns the audio frames from the channel.

        Returns:
            AudioStream: The audio stream.
        """
        return self.channel_event_observer.audio_streams[uid]

    async def push_audio_frame(self, frame: bytes) -> None:
        """
        Pushes an audio frame to the channel.
        
        Parameters:
            frame: The audio frame to push.
        """
        audio_frame = PcmAudioFrame()
        audio_frame.data = bytearray(frame)
        audio_frame.timestamp = 0
        audio_frame.bytes_per_sample = 2
        audio_frame.number_of_channels = self.options.channels
        audio_frame.sample_rate = self.options.sample_rate
        audio_frame.samples_per_channel = int(
            len(frame) / audio_frame.bytes_per_sample / audio_frame.number_of_channels
        )

        self.audio_pcm_data_sender.send_audio_pcm_data(audio_frame)

    async def subscribe_audio(self, uid: int) -> None:
        """
        Subscribes to the audio of a user.

        Parameters:
            uid: The user ID to subscribe to.
        """
        future = asyncio.Future()

        def callback(
            agora_local_user,
            channel,
            user_id,
            old_state,
            new_state,
            elapse_since_last_state,
        ):
            if new_state == 3:  # Successfully subscribed
                future.set_result(None)
            # elif new_state == 1:  # Subscription failed
            #     future.set_exception(
            #         Exception(
            #             f"Failed to subscribe {user_id} audio: state changed from {old_state} to {new_state}"
            #         )
            #     )

        self.on("audio_subscribe_state_changed", callback)
        self.local_user.subscribe_audio(uid)

        try:
            await future
        except Exception as e:
            raise Exception(f"Audio subscription failed for user {uid}: {str(e)}") from e
        finally:
            self.off("audio_subscribe_state_changed", callback)


    async def unsubscribe_audio(self, uid: int) -> None:
        """
        Unsubscribes from the audio of a user.

        Parameters:
            uid: The user ID to unsubscribe from.
        """
        future = asyncio.Future()

        def callback(
            agora_local_user,
            channel,
            user_id,
            old_state,
            new_state,
            elapse_since_last_state,
        ):
            if new_state == 3:  # Successfully unsubscribed
                future.set_result(None)
            else:  # Failed to unsubscribe
                future.set_exception(
                    Exception(
                        f"Failed to unsubscribe {user_id} audio: state changed from {old_state} to {new_state}"
                    )
                )

        self.on("audio_subscribe_state_changed", callback)
        self.local_user.unsubscribe_audio(uid)

        try:
            await future
        except Exception as e:
            raise Exception(f"Audio unsubscription failed for user {uid}: {str(e)}") from e
        finally:
            self.off("audio_subscribe_state_changed", callback)


    def _split_string_into_chunks(self, long_string, msg_id, chunk_size=300) -> list[dict[str: Any]]:
        """
        Splits a long string into chunks of a given size.
        
        Parameters:
            long_string: The string to split.
            msg_id: The message ID.
            chunk_size: The size of each chunk.

        Returns:
            list[dict[str: Any]]: The list of chunks.
        
        """
        total_parts = (len(long_string) + chunk_size - 1) // chunk_size
        json_chunks = []
        for idx in range(total_parts):
            start = idx * chunk_size
            end = min(start + chunk_size, len(long_string))
            chunk = {
                'msg_id': msg_id,
                'part_idx': idx,
                'total_parts': total_parts,
                'content': long_string[start:end]
            }
            json_chunk = json.dumps(chunk, ensure_ascii=False)
            json_chunks.append(json_chunk)    
        return json_chunks

    async def send_stream_message(self, data: str, msg_id: str) -> None: 
        """
        Sends a stream message to the channel.

        Parameters:
            data: The data to send.
            msg_id: The message ID.
        """

        chunks = self._split_string_into_chunks(data, msg_id)    
        for chunk in chunks:
            self.connection.send_stream_message(self.stream_id, chunk)

    def on(self, event_name: str, callback):
        """
        Allows external components to subscribe to events.
        
        Parameters:
            event_name: The name of the event to subscribe to.
            callback: The callback to call when the event is emitted.

        """
        self.emitter.on(event_name, callback)

    def once(self, event_name: str, callback):
        """
        Allows external components to subscribe to events once.
        
        Parameters:
            event_name: The name of the event to subscribe to.
            callback: The callback to call when the event is emitted.
        """
        self.emitter.once(event_name, callback)

    def off(self, event_name: str, callback):
        """
        Allows external components to unsubscribe from events.

        Parameters:
            event_name: The name of the event to unsubscribe from.
            callback: The callback to remove from the event.
        """
        self.emitter.remove_listener(event_name, callback)

Methods

async def connect(self) ‑> None

Connects to a channel.

Parameters

channelId: The channel ID. uid: The user ID.

Returns

Channel
The connected channel.
async def disconnect(self) ‑> None

Disconnects the channel.

def get_audio_frames(self, uid: int) ‑> AudioStream

Returns the audio frames from the channel.

Returns

AudioStream
The audio stream.
def off(self, event_name: str, callback)

Allows external components to unsubscribe from events.

Parameters

event_name: The name of the event to unsubscribe from. callback: The callback to remove from the event.

def on(self, event_name: str, callback)

Allows external components to subscribe to events.

Parameters

event_name: The name of the event to subscribe to. callback: The callback to call when the event is emitted.

def once(self, event_name: str, callback)

Allows external components to subscribe to events once.

Parameters

event_name: The name of the event to subscribe to. callback: The callback to call when the event is emitted.

async def push_audio_frame(self, frame: bytes) ‑> None

Pushes an audio frame to the channel.

Parameters

frame: The audio frame to push.

async def send_stream_message(self, data: str, msg_id: str) ‑> None

Sends a stream message to the channel.

Parameters

data: The data to send. msg_id: The message ID.

async def subscribe_audio(self, uid: int) ‑> None

Subscribes to the audio of a user.

Parameters

uid: The user ID to subscribe to.

async def unsubscribe_audio(self, uid: int) ‑> None

Unsubscribes from the audio of a user.

Parameters

uid: The user ID to unsubscribe from.

class ChannelEventObserver (event_emitter: pyee.asyncio.AsyncIOEventEmitter, options: RtcOptions)
Expand source code
class ChannelEventObserver(IRTCConnectionObserver, IRTCLocalUserObserver, IAudioFrameObserver):
    def __init__(self, event_emitter: AsyncIOEventEmitter, options: RtcOptions) -> None:
        self.loop = asyncio.get_event_loop()
        self.emitter = event_emitter
        self.audio_streams = dict[int, AudioStream]()
        self.options = options



    def emit_event(self, event_name: str, *args):
        """Helper function to emit events."""
        self.loop.call_soon_threadsafe(self.emitter.emit, event_name, *args)

    def on_connected(
        self, agora_rtc_conn: RTCConnection, conn_info: RTCConnInfo, reason
    ):
        logger.info(f"Connected to RTC: {agora_rtc_conn} {conn_info} {reason}")        
        self.emit_event("connection_state_changed", agora_rtc_conn, conn_info, reason)

    def on_disconnected(
        self, agora_rtc_conn: RTCConnection, conn_info: RTCConnInfo, reason
    ):
        logger.info(f"Disconnected from RTC: {agora_rtc_conn} {conn_info} {reason}")
        self.emit_event("connection_state_changed", agora_rtc_conn, conn_info, reason)

    def on_connecting(
        self, agora_rtc_conn: RTCConnection, conn_info: RTCConnInfo, reason
    ):
        logger.info(f"Connecting to RTC: {agora_rtc_conn} {conn_info} {reason}")
        self.emit_event("connection_state_changed", agora_rtc_conn, conn_info, reason)

    def on_connection_failure(self, agora_rtc_conn, conn_info, reason):
        logger.error(f"Connection failure: {agora_rtc_conn} {conn_info} {reason}")
        self.emit_event("connection_state_changed", agora_rtc_conn, conn_info, reason)

    def on_user_joined(self, agora_rtc_conn: RTCConnection, user_id):
        logger.info(f"User joined: {agora_rtc_conn} {user_id}")
        self.emit_event("user_joined", agora_rtc_conn, user_id)

    def on_user_left(self, agora_rtc_conn: RTCConnection, user_id, reason):
        logger.info(f"User left: {agora_rtc_conn} {user_id} {reason}")
        self.emit_event("user_left", agora_rtc_conn, user_id, reason)


    def handle_received_chunk(self, json_chunk):
        chunk = json.loads(json_chunk)
        msg_id = chunk["msg_id"]
        part_idx = chunk["part_idx"]
        total_parts = chunk["total_parts"]
        if msg_id not in self.received_chunks:
            self.received_chunks[msg_id] = {"parts": {}, "total_parts": total_parts}
        if (
            part_idx not in self.received_chunks[msg_id]["parts"]
            and 0 <= part_idx < total_parts
        ):
            self.received_chunks[msg_id]["parts"][part_idx] = chunk
            if len(self.received_chunks[msg_id]["parts"]) == total_parts:
                # all parts received, now recomposing original message and get rid it from dict
                sorted_parts = sorted(
                    self.received_chunks[msg_id]["parts"].values(),
                    key=lambda c: c["part_idx"],
                )
                full_message = "".join(part["content"] for part in sorted_parts)
                del self.received_chunks[msg_id]
                return full_message, msg_id
        return (None, None)

    def on_stream_message(
        self, agora_local_user: LocalUser, user_id, stream_id, data, length
    ):
        # logger.info(f"Stream message", agora_local_user, user_id, stream_id, length)
        (reassembled_message, msg_id) = self.handle_received_chunk(data)
        if reassembled_message is not None:
            logger.info(f"Reassembled message: {msg_id} {reassembled_message}")


    def on_audio_subscribe_state_changed(
        self,
        agora_local_user,
        channel,
        user_id,
        old_state,
        new_state,
        elapse_since_last_state,
    ):
        logger.info(
            f"Audio subscribe state changed: {user_id} {new_state} {elapse_since_last_state}"
        )
        self.emit_event("audio_subscribe_state_changed", agora_local_user, channel, user_id, old_state, new_state, elapse_since_last_state)

    def on_playback_audio_frame_before_mixing(
        self, agora_local_user: LocalUser, channelId, uid, frame: AudioFrame
    ):
        audio_frame = PcmAudioFrame()
        audio_frame.samples_per_channel = frame.samples_per_channel
        audio_frame.bytes_per_sample = frame.bytes_per_sample
        audio_frame.number_of_channels = frame.channels
        audio_frame.sample_rate = self.options.sample_rate
        audio_frame.data = frame.buffer

        # print(
        #     "on_playback_audio_frame_before_mixing",
        #     audio_frame.samples_per_channel,
        #     audio_frame.bytes_per_sample,
        #     audio_frame.number_of_channels,
        #     audio_frame.sample_rate,
        #     len(audio_frame.data),
        # )
        self.loop.call_soon_threadsafe(self.audio_streams[uid].queue.put_nowait, audio_frame)
        return 0

Ancestors

  • agora.rtc.rtc_connection_observer.IRTCConnectionObserver
  • agora.rtc.local_user_observer.IRTCLocalUserObserver
  • agora.rtc.audio_frame_observer.IAudioFrameObserver

Methods

def emit_event(self, event_name: str, *args)

Helper function to emit events.

def handle_received_chunk(self, json_chunk)
def on_audio_subscribe_state_changed(self, agora_local_user, channel, user_id, old_state, new_state, elapse_since_last_state)
def on_connected(self, agora_rtc_conn: agora.rtc.rtc_connection.RTCConnection, conn_info: agora.rtc.rtc_connection.RTCConnInfo, reason)
def on_connecting(self, agora_rtc_conn: agora.rtc.rtc_connection.RTCConnection, conn_info: agora.rtc.rtc_connection.RTCConnInfo, reason)
def on_connection_failure(self, agora_rtc_conn, conn_info, reason)
def on_disconnected(self, agora_rtc_conn: agora.rtc.rtc_connection.RTCConnection, conn_info: agora.rtc.rtc_connection.RTCConnInfo, reason)
def on_playback_audio_frame_before_mixing(self, agora_local_user: agora.rtc.local_user.LocalUser, channelId, uid, frame: agora.rtc.agora_base.AudioFrame)
def on_stream_message(self, agora_local_user: agora.rtc.local_user.LocalUser, user_id, stream_id, data, length)
def on_user_joined(self, agora_rtc_conn: agora.rtc.rtc_connection.RTCConnection, user_id)
def on_user_left(self, agora_rtc_conn: agora.rtc.rtc_connection.RTCConnection, user_id, reason)
class Chat (channel: Channel)
Expand source code
class Chat():
    def __init__(self, channel: Channel) -> None:
        self.channel = channel
        self.loop = self.channel.loop
        self.queue = asyncio.Queue()

        def log_exception(t: asyncio.Task[Any]) -> None:
            if not t.cancelled() and t.exception():
                logger.error(
                    "unhandled exception",
                    exc_info=t.exception(),
                )
        asyncio.create_task(self._process_message()).add_done_callback(log_exception)

    async def send_message(self, item: ChatMessage) -> None:
        """
        Sends a message to the channel.

        Parameters:
            item: The message to send.
        """
        await self.queue.put(item)
        # await self.queue.put_nowait(item)

    async def _process_message(self) -> None:
        """
        Processes messages in the queue.
        """

        while True:
            item: ChatMessage = await self.queue.get()
            await self.channel.send_stream_message(item.message, item.msg_id)
            self.queue.task_done()
            # await asyncio.sleep(0)

Methods

async def send_message(self, item: ChatMessage) ‑> None

Sends a message to the channel.

Parameters

item: The message to send.

class ChatMessage (message: str, msg_id: str)
Expand source code
class ChatMessage():
    def __init__(self, message: str, msg_id: str) -> None:
        self.message = message
        self.msg_id = msg_id
class RtcEngine (appid: str, appcert: str)
Expand source code
class RtcEngine:
    def __init__(self, appid: str, appcert: str):
        self.appid = appid
        self.appcert = appcert
        
        if not appid:
            raise Exception("App ID is required)")
        
        config = AgoraServiceConfig()
        config.audio_scenario = AudioScenarioType.AUDIO_SCENARIO_CHORUS
        config.appid = appid
        config.log_path = os.path.join(
            os.path.dirname(
                os.path.dirname(
                    os.path.dirname(os.path.join(os.path.abspath(__file__)))
                )
            ),
            "agorasdk.log",
        )
        self.agora_service = AgoraService()
        self.agora_service.initialize(config)

    def create_channel(self, options: RtcOptions) -> Channel:
        """
        Creates a channel.

        Parameters:
            channelId: The channel ID.
            uid: The user ID.

        Returns:
            Channel: The created channel.
        """
        return Channel(self, options)

    def destroy(self) -> None:
        """
        Destroys the RTC engine.
        """
        self.agora_service.release()

Methods

def create_channel(self, options: RtcOptions) ‑> Channel

Creates a channel.

Parameters

channelId: The channel ID. uid: The user ID.

Returns

Channel
The created channel.
def destroy(self) ‑> None

Destroys the RTC engine.

class RtcOptions (*, channel_name: str = None, uid: int = 0, sample_rate: int = 24000, channels: int = 1)
Expand source code
class RtcOptions:
    def __init__(
        self,
        *,
        channel_name: str = None,
        uid: int = 0,
        sample_rate: int = 24000,
        channels: int = 1,):
        self.channel_name = channel_name
        self.uid = uid
        self.sample_rate = sample_rate
        self.channels = channels
    
    def build_token(self, appid: str, appcert: str) -> str:
        return RealtimekitTokenBuilder.build_token(appid, appcert, self.channel_name, self.uid)

Methods

def build_token(self, appid: str, appcert: str) ‑> str