diff --git a/process_bigraph/experiments/signal_modulation.py b/process_bigraph/experiments/signal_modulation.py index dc927d0..714fb83 100644 --- a/process_bigraph/experiments/signal_modulation.py +++ b/process_bigraph/experiments/signal_modulation.py @@ -93,8 +93,57 @@ def update(self, state, interval): } +class RingModulationProcess(Process): + config_schema = { + 'duration': { + '_type': 'int', + '_default': 10 + }, + #'input_signal': 'list[float]', + 'mod_freq': 'int', + 'starting_frequency': 'int' + } + + def __init__(self, config=None): + super().__init__(config) + self.starting_frequency = self.config['starting_frequency'] + + def initial_state(self): + starting_pitch = start_sine_wave(self.config['duration'], self.starting_frequency) + return {'output_signal': starting_pitch} + + def schema(self): + return { + 'output_signal': 'list[float]', + } + + def update(self, state, interval): + # create new wave + new_wave_modulated = apply_modulation( + input_wave=np.array(state['output_signal']), + modulation_function=tremolo, + depth=self.config['depth'], + rate=self.config['rate'] + ) + print(new_wave_modulated) + + # write out the file + array_to_wav( + filename=os.path.join( + os.getcwd(), + 'tremolo_' + str(datetime.datetime.utcnow()).replace(':', '').replace(' ', '').replace('.', '') + '.wav' + ), + input_signal=new_wave_modulated + ) + + return { + 'output_signal': new_wave_modulated.tolist() + } + + process_registry.register('medium_distortion', MediumDistortionProcess) process_registry.register('tremolo', TremoloProcess) +process_registry.register('ring_modulation', RingModulationProcess) def apply_modulation(input_wave, modulation_function, **kwargs): @@ -134,6 +183,34 @@ def tremolo(input_wave, rate=5, depth=0.75): return input_wave * modulating_wave +def ring_modulation(input_wave, mod_freq=30): + """ + Apply a ring modulation effect to the waveform. + + :param input_wave: NumPy array, the input waveform. + :param mod_freq: float, the frequency of the modulating wave. + :return: NumPy array, the waveform with ring modulation effect. + """ + t = np.linspace(0, 1, len(input_wave), endpoint=True) + modulating_wave = np.sin(2 * np.pi * mod_freq * t) + return input_wave * modulating_wave + + +def bit_crusher(input_wave, bit_depth=8): + """ + Apply a bit crusher effect to the waveform. + + :param input_wave: NumPy array, the input waveform. + :param bit_depth: int, the target bit depth. + :return: NumPy array, the waveform with bit crusher effect. + """ + max_val = np.max(np.abs(input_wave)) + input_wave_normalized = input_wave / max_val + step = 2 ** bit_depth + crushed_wave = np.round(input_wave_normalized * step) / step + return crushed_wave * max_val + + def array_to_wav(filename, input_signal, sample_rate=44100): """ Writes a NumPy array to a WAV file. @@ -162,7 +239,17 @@ def adjust_pitch(starting_frequency, n_semitones): return starting_frequency * 2 ** (n_semitones / 12) -# array_to_wav(input_signal, sample_rate, 'output.wav') +def run_instance(instance, num_beats=4): + # make the composite + workflow = Composite({ + 'state': instance + }) + + # run + workflow.run(num_beats) + + # gather results + return workflow.gather_results() '''# Example usage @@ -345,6 +432,49 @@ def run_instance(instance, num_beats=stop): #array_to_wav(os.path.join(os.getcwd(), 'result.wav'))''' +def test_ring_mod(): + stop = 4 + frequencies = [262, 294, 330, 349] + + def ring_mod_create_instance(): + return { + 'ring_modulation': { + '_type': 'process', + 'address': 'local:ring_modulation', + 'config': { + 'mod_freq': 30, + 'starting_frequency': frequencies[0] + }, + 'wires': { # this should return that which is in the schema + 'output_signal': ['output_signal_store'], + } + }, + 'emitter': { + '_type': 'step', + 'address': 'local:ram-emitter', + 'config': { + 'ports': { + 'inputs': { + 'output_signal': 'list[float]' + }, + } + }, + 'wires': { + 'inputs': { + 'output_signal': ['output_signal_store'], + } + } + } + } + + measure = [] + starting_signal = start_sine_wave(stop, frequencies[0]) + instance = ring_mod_create_instance() + result = run_instance(instance) + measure.append(result) + + if __name__ == '__main__': - test_tremolo() + test_ring_mod() + # test_tremolo() # test_medium_distortion()