""" SoundSystem module prototype, based on Jason Tribbeck's spec. See: http://autobuild.tribbeck.com/audioconductor/index.php """ from riscos.modules.pymodules import PyModule from riscos.errors import RISCOSSyntheticError, RISCOSError from riscos.readunsigned import readunsigned class SoundSystemConstants(object): # SWI numbers SWI_SoundSystem_RegisterDevice = 0x5a080 SWI_SoundSystem_DeregisterDevice = 0x5a081 SWI_SoundSystem_EnumerateDevices = 0x5a082 SWI_SoundSystem_EnumerateDeviceFormats = 0x5a083 SWI_SoundSystem_EnumerateDeviceSampleRates = 0x5a084 SWI_SoundSystem_SetFormat = 0x5a085 SWI_SoundSystem_6 = 0x5a086 SWI_SoundSystem_SetSampleRate = 0x5a087 SWI_SoundSystem_AttachPlaybackHandler = 0x5a088 SWI_SoundSystem_AttachRecordingHandler = 0x5a089 SWI_SoundSystem_DetachPlaybackHandler = 0x5a08A SWI_SoundSystem_DetachRecordingHandler = 0x5a08B SWI_SoundSystem_SetVolume = 0x5a08C SWI_SoundSystem_Configure = 0x5a08D SWI_SoundSystem_RegisterExclusiveAccess = 0x5a08E SWI_SoundSystem_DeregisterExclusiveAccess = 0x5a08F # Services Service_SoundSystemInitialised = 0x81140 Service_SoundSystemInitialised_Initialise = 0 Service_SoundSystemInitialised_Finalise = 1 Service_SoundSystemHardwareAdded = 0x81141 Service_SoundSystemHardwareRemoved = 0x81142 # Device control entry point values Control_Initialise = 0 Control_SampleFormat = 1 Control_SampleRate = 2 Control_OutputVolume = 3 Control_InputVolume = 4 Control_Enable = 5 Control_Enable_Playback = (1<<0) Control_Enable_Recording = (1<<1) Capability_Playback = (1<<0) # Device supports playback Capability_Recording = (1<<1) # Device supports recording Capability_SupportsLPCM = (1<<2) # Device supports LPCM playback / recording Capability_SupportsNonLPCM = (1<<3) # Device supports non-LPCM playback / recording # bits 4 - 7 Reserved (0) Capability_InputMicrophone = (1<<8) # Device supports microphone input Capability_InputLine = (1<<9) # Device supports line input Capability_InputDigital = (1<<10) # Device supports digital input # bits 11 - 15 Reserved (0) Capability_ChannelLeft = (1<<16) # Left front mixer is present Capability_ChannelRight = (1<<17) # Right front mixer is present Capability_ChannelCentre = (1<<18) # Centre front mixer is present Capability_ChannelLFE = (1<<19) # Low frequency enhancement mixer is present Capability_ChannelLeftSound = (1<<20) # Left surround mixer is present Capability_ChannelRightSound = (1<<21) # Right surround mixer is present Capability_ChannelLeftCentre = (1<<22) # Left of centre mixer is present Capability_ChannelRightCentre = (1<<23) # Right of centre mixer is present Capability_ChannelSurround = (1<<24) # Surround mixer is present Capability_ChannelSideLeft = (1<<25) # Side left mixer is present Capability_ChannelSideRight = (1<<26) # Side right mixer is present Capability_ChannelTop = (1<<27) # Top mixer is present # bits 28 Reserved (0) Capability_InputMixerMicrophone = (1<<29) # Microphone input mixer is present Capability_InputMixerLine = (1<<30) # Line input mixer is present Capability_InputMixerDigital = (1<<31) # Digital input mixer is present class SoundSystemDevice(object): def __init__(self, ro, device_name, device_id, dib, dib_base): self.ro = ro self.name = device_name self.id = device_id self.capabilities = dib[0].word self.channels = dib[4].word formats = ro.memory[dib[8].word + dib_base] # FIXME: Haven't implemented processing of formats yet self.control_code = dib[12].word + dib_base self.dib_copy = self.ro.kernel.da_rma.allocate(16) self.dib_copy[0].word = self.capabilities self.dib_copy[4].word = self.channels self.dib_copy[8].word = 0 # FIXME: Haven't implemented the copying of the formats self.dib_copy[12].word = self.control_code self.name_copy = self.ro.kernel.da_rma.strdup(self.name) self.id_copy = self.ro.kernel.da_rma.strdup(self.id) def __repr__(self): return "<{}(name={}, id={}, channels={})>".format(self.__class__.__name__, self.name, self.id, self.channels) def shutdown(self): self.dib_copy.free() self.name_copy.free() self.id_copy.free() # FIXME: Do we need to call the driver to tell it that it's no longer being used? class SoundSystem(PyModule): version = '0.01' date = '19 Oct 2020' swi_base = 0x5A080 swi_prefix = "SoundSystem" swi_names = [ "RegisterDevice", "DeregisterDevice", "EnumerateDevices", "EnumerateDeviceFormats", "EnumerateDeviceSampleRates", "SetFormat", "6" "SetSampleRate", "AttachPlaybackHandler", "AttachRecordingHandler", "DetachPlaybackHandler", "DetachRecordingHandler", "SetVolume", # &5A09x "Configure", # &5A0Bx "RegisterExclusiveAccess", # &5A08x "DeregisterExclusiveAccess", # &5A08x ] entrypoint_names = [ 'init_callback_handler', 'device_request_playback_data_handler', 'device_supply_recording_data_handler', ] commands = ( ('SoundSystemDevices', '*SoundSystemDevices lists the devices registered with SoundSystem.', 0x00000000, 'Syntax: *SoundSystemDevices'), ) def __init__(self, ro, module): super(SoundSystem, self).__init__(ro, module) self.swi_dispatch = { 0: self.swi_soundsystem_register_device, 1: self.swi_soundsystem_deregister_device, 2: self.swi_soundsystem_enumerate_devices, } # Devices in the order of registration self.devices = [] # Devices by name self.device_ids = {} self.pwp = None self.debug_soundsystem = False self.ro.debug_register_ivar('soundsystem', self) def initialise(self, arguments, pwp): # Must announce our existance on a callback, as SWIs not available during initialise self.pwp = pwp self.ro.kernel.api.os_addcallback(self.module.entrypoints['init_callback_handler'].address, self.pwp) def init_callback_handler(self, regs): self.announce_initialise(SoundSystemConstants.Service_SoundSystemInitialised_Initialise) def finalise(self, pwp): # Remove any announcement self.ro.kernel.api.os_removecallback(self.module.entrypoints['init_callback_handler'].address, self.pwp) # Issue the finalise first so that clients can know to stop calling us. self.announce_initialise(SoundSystemConstants.Service_SoundSystemInitialised_Finalise) # FIXME: Set a flag here that we're shutting down in case people didn't pay attention to the # finalise service call? for device_id, ssdevice in list(self.device_ids.items()): if device_id in self.device_ids: # Be careful because the shutdown of a device may cause the driver to remove # other devices. del self.device_ids[device_id] self.devices.remove(ssdevice) ssdevice.shutdown() def announce_initialise(self, state): """ Announce the initialisation/finalisation state. @param state: The initialise/finalise state of the module """ if self.debug_soundsystem: print("SoundSystem: Announcing initialised/finalised: state={}".format(state)) self.ro.kernel.api.os_servicecall(SoundSystemConstants.Service_SoundSystemInitialised, state) def announce_device(self, device_id, added): """ Announce the device state. @param state: The initialise/finalise state of the module """ if self.debug_soundsystem: print("SoundSystem: Announcing device (added={}): device_id={})".format(added, device_id)) if added: service = SoundSystemConstants.Service_SoundSystemHardwareAdded else: service = SoundSystemConstants.Service_SoundSystemHardwareRemoved with self.ro.kernel.da_rma.strdup(device_id) as devid: self.ro.kernel.api.os_servicecall(service, devid) def swi(self, offset, regs): func = self.swi_dispatch.get(offset, None) if func: return func(regs) return False def _lookup_ssdevice(self, ssid): ssdevice = self.device_ids.get(ssid, None) if not ssdevice: raise RISCOSSyntheticError(self.ro, "Invalid device identifer for SoundSystem") return ssdevice def swi_soundsystem_register_device(self, regs): """ SoundSystem_RegisterDevice - register a device => R0 = pointer to human readable device name R1 = pointer to device identifier R2 = pointer to device information block: +0 Bit flags of capabilities +4 Maximum number of channels +8 Pointer or offset to list of formats supported (zero if the hardware driver only supports 2-channel 32-bit LPCM formatted data) +12 Pointer or offset to control code R3 = pointer to start of module if information block contains offsets; or 0 if information block contains poiners <= R0 - R3 = previous values """ device_name = self.ro.memory[regs[0]].string device_id = self.ro.memory[regs[1]].string dib = self.ro.memory[regs[2]] dib_base = regs[3] if device_id in self.device_ids: return RISCOSSyntheticError(self.ro, "Device already registered") ssdevice = SoundSystemDevice(self.ro, device_name, device_id, dib, dib_base) if self.debug_soundsystem: print("SoundSystem_RegisterDevice: device={}".format(ssdevice)) self.devices.append(ssdevice) self.device_ids[device_id] = ssdevice # Device has registered - announce it self.announce_device(device_id=device_id, added=True) return True def swi_soundsystem_deregister_device(self, regs): """ SoundSystem_DeregisterDevice - deregister a device => R1 = pointer to device identifier <= R1 preserved """ device_id = self.ro.memory[regs[1]].string if self.debug_soundsystem: print("SoundSystem_DeregisterDevice: device_id={}".format(device_id)) ssdevice = self._lookup_ssdevice(device_id) del self.device_ids[device_id] self.devices.remove(ssdevice) ssdevice.shutdown() # Device has been deregistered - announce it self.announce_device(device_id=device_id, added=False) return True def swi_soundsystem_enumerate_devices(self, regs): """ SoundSystem_EnumerateDevices => R1 = pointer to device identifier, or 0 to start <= R0 = pointer to the device name, or not valid if R1 = 0 R1 = pointer to device identifier, or 0 for end of the list R2 = pointer to device information block, or not valid if R1 = 0 """ last_device_id = self.ro.memory[regs[1]].string if regs[1] != 0 else None for ssdevice in self.devices: return_next = last_device_id is None if return_next: # We have found the device they want. regs[0] = ssdevice.name_copy.address regs[1] = ssdevice.id_copy.address regs[2] = ssdevice.dib_copy.address return True if ssdevice.id == last_device_id: last_device_id = None # Force return of the next device ID if last_device_id: # We went through the whole enumeration but didn't see the device_id they # supplied. raise RISCOSSyntheticError(self.ro, "Invalid device ID supplied to SoundSystem_EnumerateDevices") # That was the last item; so we terminate the enumeration regs[1] = 0 return True def cmd_soundsystemdevices(self, args): """ Syntax: *SoundSystemDevices """ self.ro.kernel.writeln("{:12} | {:24} | {:8}".format("DeviceID", "Name", "Channels")) for ssdevice in self.devices: self.ro.kernel.writeln("{:12} | {:24} | {:8}".format(ssdevice.id, ssdevice.name, ssdevice.channels)) def device_request_playback_data_handler(self, regs): """ Device is requesting playback data. => R0 = pointer to buffer to populate R1 = length of buffer R12 = private word (in our case, a driver identification) <= R0 = pointer after last byte filled """ # FIXME: Locate ssdevice this is from ssdevice = None playback_buffer = self.ro.memory[regs[0]] playback_length = regs[2] if self.debug_soundsystem: print("Device {} requested {} bytes of playback data".format(ssdevice, playback_length)) # FIXME: Populate data def device_supply_recording_data_handler(self, regs): """ Device is supplying recorded data. Note: All data must be consumed. => R0 = pointer to buffer to containing data R1 = length of buffer R12 = private word (in our case, a driver identification) """ ssdevice = None playback_buffer = self.ro.memory[regs[0]] playback_length = regs[2] if self.debug_soundsystem: print("Device {} supplied {} bytes of recorded data".format(ssdevice, playback_length)) # FIXME: Store data? ################################################################################################################# class SoundPyromaniac(PyModule): version = '0.01' date = '19 Oct 2020' entrypoint_names = [ 'control_handler', ] def __init__(self, ro, module): super(SoundPyromaniac, self).__init__(ro, module) self.registered = False self.my_name = "SoundPyromaniac" self.my_deviceid = "Device0" self.my_channels = 2 self.my_formats = 0 # FIXME: Not implemented yet self.my_capabilities = 0 self.handler_playback = None self.handler_recording = None self.handler_workspace = None self.output_enable = False self.input_enable = False self.sample_rate = 44.1 * 1000 * 1024 self.sample_format = 0 self.output_volume = 255 self.input_volume = 255 self.debug_soundpyromaniac = False self.ro.debug_register_ivar('soundpyromaniac', self) def initialise(self, args, pwp): self.register() def finalise(self, pwp): self.deregister() def register(self): if not self.registered: with self.ro.kernel.da_rma.strdup(self.my_name) as name, \ self.ro.kernel.da_rma.strdup(self.my_deviceid) as deviceid, \ self.ro.kernel.da_rma.allocate(16) as dib: dib[0].word = self.my_capabilities dib[4].word = self.my_channels dib[8].word = 0 # FIXME: Not supported formats yet dib[12].word = self.module.entrypoints['control_handler'].address try: self.ro.kernel.api.swi(SoundSystemConstants.SWI_SoundSystem_RegisterDevice, regs={0: name.address, 1: deviceid.address, 2: dib.address}) self.registered = True if self.debug_soundpyromaniac: print("SoundPyromaniac: Registered OK") except RISCOSError as exc: if self.debug_soundpyromaniac: print("Error registering SoundPyromaniac: {}".format(exc)) def deregister(self): if self.registered: # FIXME: Actually disable the callbacks self.output_enable = False self.input_enable = False with self.ro.kernel.da_rma.strdup(self.my_deviceid) as deviceid: try: self.ro.kernel.api.swi(SoundSystemConstants.SWI_SoundSystem_DeregisterDevice, regs={1: deviceid.address}) except RISCOSError as exc: if self.debug_soundpyromaniac: print("Error deregistering SoundPyromaniac: {}".format(exc)) # Regardless of what it is, we ignore self.registered = False self.handler_playback = None self.handler_recording = None self.handler_workspace = None def service(self, service_number, regs): if service_number == SoundSystemConstants.Service_SoundSystemInitialised: state = regs[0] if state == SoundSystemConstants.Service_SoundSystemInitialised_Initialise: # The SoundSystem has initialised. self.register() elif state == SoundSystemConstants.Service_SoundSystemInitialised_Finalise: # The SoundSystem has finalised; we need to shutdown # Note: Documentation says not to deregister #self.deregister() self.registered = False def control_handler(self, regs): code = regs[0] if self.debug_soundpyromaniac: print("SoundPyromaniac: Control {} called".format(code)) regs.dump() if code == SoundSystemConstants.Control_Initialise: # We've been supplied with the handler details, so we can remember them self.handler_playback = regs[1] self.handler_recording = regs[2] self.handler_workspace = regs[3] # We start out in the disabled state so that we can be configured. self.enabled = False if self.debug_soundpyromaniac: print(" Configured SoundSystem: &{:08x}/&{:08x} workspace &{:08x}".format(self.handler_playback, self.handler_recording, self.handler_workspace)) elif code == SoundSystemConstants.Control_SampleFormat: new_sample_format = regs[1] old_sample_format = self.sample_format if new_sample_format != -1: # FIXME: Validate? # FIXME: How do we say no? self.sample_format = new_sample_format if self.debug_soundpyromaniac: print(" Set sample format to {}".format(self.sample_format)) regs[1] = old_sample_format elif code == SoundSystemConstants.Control_SampleRate: new_sample_rate = regs[1] old_sample_rate = self.sample_rate if new_sample_rate != -1: # FIXME: Validate? # FIXME: How do we say no? self.sample_rate = new_sample_rate if self.debug_soundpyromaniac: print(" Set sample rate to {} Hz".format(self.sample_rate / 1024.0)) regs[1] = old_sample_rate elif code == SoundSystemConstants.Control_OutputVolume: channel = regs[1] # FIXME: Validate the channel selected # How do we report invalid channels? new_volume = regs[2] # FIXME: We only support a master volume old_volume = self.output_volume # FIXME: Presumably we just quantise this as necessary and they'll call again to # read what was actually set? if new_volume != -1: self.output_volume = new_volume if self.debug_soundpyromaniac: print(" Set output volume for channel {} to {} ({5.1f}%".format(channel, self.output_volume, self.output_volume * 100 / 255.0)) regs[1] = old_volume elif code == SoundSystemConstants.Control_InputVolume: channel = regs[1] # FIXME: Validate the channel selected # How do we report invalid channels? new_volume = regs[2] # FIXME: We only support a master volume old_volume = self.input_volume # FIXME: Presumably we just quantise this as necessary and they'll call again to # read what was actually set? if new_volume != -1: self.input_volume = new_volume if self.debug_soundpyromaniac: print(" Set output volume for channel {} to {} ({:5.1f}%".format(channel, self.input_volume, self.input_volume * 100 / 255.0)) regs[1] = old_volume elif code == SoundSystemConstants.Control_Enable: flags = regs[1] new_output_enable = bool(flags & SoundSystemConstants.Control_Enable_Playback) new_input_enable = bool(flags & SoundSystemConstants.Control_Enable_Recording) if new_output_enable != self.output_enable: if self.debug_soundpyromaniac: print(" Output state change: {}".format(new_output_enable)) self.output_enable = new_output_enable # FIXME: Actually start the callbacks if new_input_enable != self.input_enable: if self.debug_soundpyromaniac: print(" Input state change: {}".format(new_input_enable)) self.input_enable = new_input_enable # FIXME: Actually start the callbacks