Page MenuHome

system_hid_mapper.py

Authored By
David Gnedt (lxp)
Nov 13 2013, 4:23 PM
Size
17 KB
Subscribers
None

system_hid_mapper.py

# ##### BEGIN GPL LICENSE BLOCK #####
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software Foundation,
# Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#
# ##### END GPL LICENSE BLOCK #####
# Addon metadata dictionary: name, description, authors, addon version,
# minimum Blender version, UI location, documentation/tracker links and
# addon category.
bl_info = {
    "name": "HID Mapper",
    "description": "Maps HID devices (e.g. MIDI devices) to custom blender"
    " functions.",
    "author": "David Gnedt, Manuel Steiner, Christian Voglhuber",
    "version": (0, 1),
    "blender": (2, 61, 0),
    "location": "View Text Editor > Templates > HID Mapping",
    "warning": "", # used for warning icon and text in addons panel
    "wiki_url": "http://wiki.blender.org/index.php/Extensions:2.6/Py/"
    "Scripts/System/HID_Mapper",
    "tracker_url": "http://projects.blender.org/tracker/index.php?"
    "func=detail&aid=31680",
    "category": "System"}
import bpy
import atexit as _atexit
import functools as _functools
import inspect as _inspect
import threading as _threading
import multiprocessing as _multiprocessing
import subprocess as _subprocess
# Cache of created subsystem instances, keyed by (subsystem class, device).
_subsys_instances = {}
# Registered handlers: message type -> list of (mapping class, function).
_message_mappings = {}
# Common classes
class MessageType(object):
    """Hashable key describing a kind of message.

    Instances are used as dictionary keys when handlers are registered and
    when incoming messages are dispatched, so ``__hash__`` and ``__eq__``
    are defined together.
    """

    # (subsystem class, device) pair this message type belongs to.
    _subsys_type = (None, None)

    def __hash__(self):
        return hash(self._subsys_type)

    def __eq__(self, other):
        # Comparing against an unrelated object used to raise
        # AttributeError; returning NotImplemented lets Python fall back
        # to its default (identity) comparison instead.  The exact-type
        # check keeps equality consistent with the per-class hashes.
        if type(other) is not type(self):
            return NotImplemented
        return self._subsys_type == other._subsys_type

    def set_subsystem_type(self, subsys_type):
        """Intended to tag this message type with its owning subsystem.

        NOTE(review): the assignment below binds a local name only, so the
        instance keeps the class default ``(None, None)``.  Message
        dispatch currently depends on that: the types of polled messages
        are never tagged either, so assigning to ``self`` here must be
        done together with tagging incoming messages, otherwise no
        registered mapping would match any more.
        """
        _subsys_type = subsys_type
class Message(object):
    """Base class of all messages; carries the message's type object."""

    # Type of the message, set per instance by the constructor.
    msg_type = None

    def __init__(self, msg_type):
        """Store *msg_type* as the type of this message."""
        self.msg_type = msg_type

    def get_type(self):
        """Return the type object given at construction."""
        return self.msg_type
class SubsystemException(Exception):
    """Raised when a subsystem or its device cannot be provided."""
class Subsystem(object):
    """Base class of input subsystems.

    Instances are obtained through :meth:`create`, which keeps one
    instance per (subsystem class, device) pair in ``_subsys_instances``.
    The remaining methods are hooks that subclasses override.
    """

    _device = None   # device identifier this instance is bound to
    _status = False  # value reported by is_running()
    _debug = False   # value reported by is_debug()

    @classmethod
    def create(subsys_class, **kwargs):
        """Return the shared instance for the device selected by *kwargs*.

        The device is resolved with ``get_device(**kwargs)``.  An already
        cached instance is reused; otherwise a new one is constructed and
        cached, provided ``is_available(device)`` is true.  A ``debug``
        keyword, when given, is stored on the returned instance.

        Raises SubsystemException when the device is not available.
        """
        device = subsys_class.get_device(**kwargs)
        key = (subsys_class, device)
        if key not in _subsys_instances:
            if not subsys_class.is_available(device):
                raise SubsystemException('Subsystem is not available'
                                         ' for this device')
            _subsys_instances[key] = subsys_class(device)
        instance = _subsys_instances[key]
        if 'debug' in kwargs:
            instance._debug = kwargs['debug']
        return instance

    @staticmethod
    def get_device(**kwargs):
        """Hook: resolve the device from keyword arguments (base: None)."""
        return None

    @staticmethod
    def is_available(device):
        """Hook: tell whether *device* can be used (base: None)."""
        return None

    def get_type(self):
        """Return the ``(class, device)`` pair identifying this instance."""
        return (type(self), self._device)

    def is_debug(self):
        """Return the debug flag."""
        return self._debug

    def is_running(self):
        """Return the running flag."""
        return self._status

    def start(self):
        """Hook: start the subsystem (base: no-op)."""
        return None

    def stop(self):
        """Hook: stop the subsystem (base: no-op)."""
        return None

    def poll(self):
        """Hook: return the next pending message (base: always None)."""
        return None
class ThreadedSubsystem(Subsystem):
    """Subsystem whose ``run`` method executes in a background thread.

    ``run`` is expected to put messages on ``_queue``; ``poll`` hands them
    out one at a time without blocking.
    """

    _thread = None  # worker thread, created by start()
    _queue = None   # queue filled by run() and drained by poll()

    def __init__(self):
        self._queue = _multiprocessing.Queue()

    def __del__(self):
        self.stop()

    def start(self):
        """Start ``run`` in a new daemon thread."""
        self._thread = _threading.Thread(target=self.run)
        # Daemon thread: it must not keep the interpreter alive on exit.
        self._thread.daemon = True
        self._thread.start()

    def stop(self):
        """Wait for the worker thread to finish, if one was started."""
        # stop() is also called from __del__, possibly on an instance that
        # was never started; there is no thread to join in that case.
        if self._thread is not None:
            self._thread.join()

    def poll(self):
        """Return the next queued message, or None when nothing is queued."""
        if self._queue is not None and not self._queue.empty():
            return self._queue.get_nowait()
        return None

    def run(self):
        """Hook: body of the worker thread (base: no-op)."""
        pass
# MIDI subsystem implementation
class MidiMessageType(MessageType):
    """Message type of a MIDI message, identified by status and control.

    Keyword arguments:
    status -- value returned by get_status() (optional)
    control -- value returned by get_control() (optional)
    """

    _status = None
    _control = None

    def __init__(self, **kwargs):
        if 'status' in kwargs:
            self._status = kwargs['status']
        if 'control' in kwargs:
            self._control = kwargs['control']

    def __hash__(self):
        return hash((self._subsys_type, self._status, self._control))

    def __eq__(self, other):
        # Other kinds of objects have no _status/_control attributes;
        # comparing against them used to raise AttributeError (e.g. in a
        # list membership test that mixes message type classes).
        if not isinstance(other, MidiMessageType):
            return NotImplemented
        return (self._subsys_type == other._subsys_type and
                self._status == other._status and
                self._control == other._control)

    def get_status(self):
        """Return the status value given at construction (or None)."""
        return self._status

    def get_control(self):
        """Return the control value given at construction (or None)."""
        return self._control
class MidiMessage(Message):
    """Message built from a sequence ``(status, control, value)``.

    The first two items form the message's MidiMessageType; the third is
    the mutable value of the message.
    """

    _value = None

    def __init__(self, data):
        msg_type = MidiMessageType(status=data[0], control=data[1])
        super().__init__(msg_type)
        self._value = data[2]

    def __str__(self):
        msg_type = self.get_type()
        fields = (__class__.__name__, msg_type.get_status(),
                  msg_type.get_control(), self._value)
        return '%s(status=%s, control=%s, value=%s)' % fields

    def get_status(self):
        """Return the status of the message's type."""
        msg_type = self.get_type()
        return msg_type.get_status()

    def get_control(self):
        """Return the control of the message's type."""
        msg_type = self.get_type()
        return msg_type.get_control()

    def get_value(self):
        """Return the current value."""
        return self._value

    def set_value(self, value):
        """Replace the current value."""
        self._value = value
class AMidiSubsystem(ThreadedSubsystem):
    """MIDI subsystem that reads messages from the ``amidi`` command.

    Devices are listed with ``amidi -l``.  While running, a worker thread
    reads the output of ``amidi -p <device> -d`` and queues one
    MidiMessage per parsed message.
    """

    # Popen handle of the ``amidi -d`` process; set by run().
    _proc = None

    @staticmethod
    def _get_devices():
        """Return a dict mapping device ids to device names.

        The first line of the ``amidi -l`` output is skipped; of every
        other line the second field is used as id and the rest as name.
        An empty dict is returned when ``amidi`` fails or cannot be
        executed.
        """
        try:
            output = _subprocess.check_output(['amidi', '-l'])
        except (_subprocess.CalledProcessError, OSError):
            # OSError: the amidi executable is missing or not runnable.
            return {}
        devices = {}
        for line in output.decode('utf-8').split('\n')[1:]:
            fields = line.split(None, 2)
            if len(fields) == 3:
                devices[fields[1]] = fields[2]
        return devices

    @staticmethod
    def get_device(**kwargs):
        """Resolve the device id from ``device=`` or ``name=``.

        ``device`` is returned unchanged; ``name`` is looked up in the
        device list.  Raises SubsystemException when no device matches.
        """
        if 'device' in kwargs:
            return kwargs['device']
        elif 'name' in kwargs:
            for dev, name in AMidiSubsystem._get_devices().items():
                if name == kwargs['name']:
                    return dev
        raise SubsystemException('Device not found')

    @staticmethod
    def is_available(device):
        """Return True when *device* is among the listed device ids."""
        return device in AMidiSubsystem._get_devices()

    def __init__(self, device):
        super().__init__()
        self._device = device

    def start(self):
        """Start the reader thread unless the subsystem already runs."""
        if not self._status:
            self._status = True
            print('HID Mapper: Starting amidi subsystem for %s...' % self._device)
            super().start()

    def stop(self):
        """Stop a running subsystem and wait for its reader thread."""
        if self._status:
            self._status = False
            print('HID Mapper: Stopping amidi subsystem %s...' % self._device)
            # The reader thread may not have spawned amidi yet; in that
            # case it notices the cleared status flag and cleans up itself.
            proc = self._proc
            if proc is not None:
                if proc.poll() is None:
                    proc.terminate()
                proc.wait()
            super().stop()

    def run(self):
        """Thread body: read amidi's dump output and queue MidiMessages."""
        self._proc = _subprocess.Popen(['amidi', '-p', self._device, '-d'],
                                       stdout=_subprocess.PIPE)
        proc = self._proc
        while self._status and proc.poll() is None:
            # Messages are read in chunks of 9 bytes; the first byte is
            # skipped and the rest holds hex numbers separated by spaces.
            line = proc.stdout.read(9)
            if not line:
                # End of stream: amidi closed its output, stop reading
                # instead of reporting parse errors in a tight loop.
                break
            try:
                midi = [int(field, 16) for field in line[1:].split(b' ')]
                if len(midi) < 3:
                    # MidiMessage needs status, control and value.
                    raise ValueError('incomplete MIDI message')
                self._queue.put(MidiMessage(midi))
            except ValueError:
                if self._status:
                    print('HID Mapper: Parsing error in amidi subsystem %s' % self._device)
        if proc.poll() is None:
            proc.terminate()
        # Release our end of the pipe; nothing reads from it any more.
        proc.stdout.close()
        if self._status:
            # The loop ended although nobody asked the subsystem to stop.
            self._status = False
            print('HID Mapper: Unexpected error in amidi subsystem %s' % self._device)
        else:
            print('HID Mapper: Stopped amidi subsystem %s' % self._device)
# Blender handler
def _scene_update_callback(context):
    """Drain all subsystems and dispatch their messages to the handlers.

    Consecutive messages of equal type are collected into one batch.  A
    handler whose options contain 'merge_decorated' receives the whole
    batch in one call; any other handler is called once per message.
    """
    for subsystem in _subsys_instances.values():
        current = subsystem.poll()
        while current is not None:
            if subsystem.is_debug():
                print('HID Mapper DEBUG: %s' % current)
            batch = [current]
            msg_type = current.get_type()
            # Extend the batch while the following message has the same type.
            while True:
                upcoming = subsystem.poll()
                if upcoming is None or not (msg_type == upcoming.get_type()):
                    break
                if subsystem.is_debug():
                    print('HID Mapper DEBUG: %s' % upcoming)
                batch.append(upcoming)
            for owner, handler in _message_mappings.get(msg_type, ()):
                if 'merge_decorated' in handler.mapper_options:
                    handler(owner, batch)
                else:
                    for message in batch:
                        handler(owner, message)
            # The first message of a different type starts the next batch.
            current = upcoming
# Functions
def _handle_mapper_options(orig_func, new_func):
    """Make sure *new_func* carries a ``mapper_options`` dictionary.

    When *orig_func* has no options yet, a fresh dictionary is attached to
    *new_func*.  Otherwise the existing dictionary object is shared with
    *new_func* (nothing happens when both are the same function).
    """
    if not hasattr(orig_func, 'mapper_options'):
        new_func.mapper_options = { 'orig_func': orig_func, 'mappings': [] }
        return
    if orig_func != new_func:
        new_func.mapper_options = orig_func.mapper_options
def _is_mapped_function(obj):
    """Return True for plain functions that carry ``mapper_options``."""
    if not _inspect.isfunction(obj):
        return False
    return hasattr(obj, 'mapper_options')
@_atexit.register
def reset_mapper():
    """Remove the scene update handler, stop every subsystem and forget
    all subsystem instances and message mappings.

    Registered with atexit, so it also runs at interpreter shutdown.
    """
    update_handlers = bpy.app.handlers.scene_update_post
    if _scene_update_callback in update_handlers:
        update_handlers.remove(_scene_update_callback)
    for running in _subsys_instances.values():
        running.stop()
    for registry in (_subsys_instances, _message_mappings):
        registry.clear()
# Decorators
def register_mapping(subsys):
    """Class decorator factory binding a mapping class to *subsys*.

    Every mapped function found on the decorated class is recorded in
    ``_message_mappings`` under each of its message types.  The scene
    update handler is installed and the subsystem started when needed.
    The class itself is returned unchanged.
    """
    def register_mapping(c):
        print('HID Mapper: Registering mapping %s' % c.__name__)
        mapped = _inspect.getmembers(c, predicate=_is_mapped_function)
        for fname, f in mapped:
            for msg_type in f.mapper_options['mappings']:
                msg_type.set_subsystem_type(subsys.get_type())
                _message_mappings.setdefault(msg_type, []).append((c, f))
        update_handlers = bpy.app.handlers.scene_update_post
        if _scene_update_callback not in update_handlers:
            update_handlers.append(_scene_update_callback)
        if not subsys.is_running():
            subsys.start()
        return c
    return register_mapping
def map_message(msg_type):
    """Method decorator factory: record *msg_type* as a trigger of the
    decorated function (each message type is recorded once)."""
    def map_message(f):
        _handle_mapper_options(f, f)
        mappings = f.mapper_options['mappings']
        if msg_type not in mappings:
            mappings.append(msg_type)
        return f
    return map_message
def switch_pressed(**kwargs):
    """Method decorator factory: call the handler only for MidiMessages
    whose value equals ``value`` (default 0x7F)."""
    def switch_pressed(f):
        keyvalue = kwargs.get('value', 0x7F)
        def switch_pressed(self, msg):
            # anything but a MidiMessage with the pressed value is ignored
            if isinstance(msg, MidiMessage) and msg.get_value() == keyvalue:
                f(self, msg)
        _handle_mapper_options(f, switch_pressed)
        return switch_pressed
    return switch_pressed
def switch_released(**kwargs):
    """Method decorator factory: call the handler only for MidiMessages
    whose value equals ``value`` (default 0x00)."""
    def switch_released(f):
        keyvalue = kwargs.get('value', 0x00)
        def switch_released(self, msg):
            # anything but a MidiMessage with the released value is ignored
            if isinstance(msg, MidiMessage) and msg.get_value() == keyvalue:
                f(self, msg)
        _handle_mapper_options(f, switch_released)
        return switch_released
    return switch_released
def relative_value(**kwargs):
    """Method decorator factory turning large values into negative ones.

    Every MidiMessage (a single one, or each one inside a list) whose
    value is greater than ``break`` (default 0x40) gets ``max`` (default
    0x80) subtracted from its value before the handler is called.
    """
    def relative_value(f):
        maxvalue = kwargs.get('max', 0x80)
        breakvalue = kwargs.get('break', 0x40)
        def relative_value(self, msg):
            if isinstance(msg, MidiMessage):
                targets = [msg]
            elif isinstance(msg, list):
                targets = [m for m in msg if isinstance(m, MidiMessage)]
            else:
                targets = []
            for target in targets:
                if target.get_value() > breakvalue:
                    target.set_value(target.get_value() - maxvalue)
            # the handler always receives the original argument
            f(self, msg)
        _handle_mapper_options(f, relative_value)
        return relative_value
    return relative_value
class MergeType(object):
    """Strategies accepted by the ``merge`` decorator's ``type`` option."""
    # ABSOLUTE_VALUE: only the last message of a batch is delivered.
    # RELATIVE_VALUE: the values of the batch are added up.
    ABSOLUTE_VALUE, RELATIVE_VALUE = range(2)
def merge(**kwargs):
    """Method decorator factory collapsing a batch into one message.

    The decorated handler is marked as 'merge_decorated', so it is called
    with the list of consecutive messages of one type.

    Keyword arguments:
    type -- MergeType.RELATIVE_VALUE: the first message receives the sum
            of all values and is passed on.  Any other value, or no
            ``type`` at all (treated as MergeType.ABSOLUTE_VALUE): only
            the last message of the batch is passed on.
    """
    # The option used to be read with kwargs['type'], which raised KeyError
    # at dispatch time for a plain ``@merge()``.
    merge_type = kwargs.get('type', MergeType.ABSOLUTE_VALUE)
    def merge(f):
        def merge(self, msgs):
            if merge_type == MergeType.RELATIVE_VALUE:
                msg = msgs[0]
                msg.set_value(sum(m.get_value() for m in msgs))
                f(self, msg)
            else:
                f(self, msgs[-1])
        _handle_mapper_options(f, merge)
        merge.mapper_options['merge_decorated'] = True
        return merge
    return merge
# User interface
# Operator 'mapper.template': creates a new text datablock named
# 'hid_mapping.py' filled with an example mapping script and, when the
# operator runs inside a text editor, makes that text the displayed one.
# (Comments are used instead of docstrings here; presumably Blender would
# surface a class docstring in the UI -- TODO confirm.)
class TemplateOperator(bpy.types.Operator):
    bl_idname = 'mapper.template'
    bl_label = 'Add HID Mapping'

    def execute(self, context):
        # New text datablock that receives the template source below.
        text = bpy.data.texts.new('hid_mapping.py')
        text.from_string("""# Functions:
#
# reset_mapper()
#
#
# Constructors/Factories:
#
# AMidiSubsystem.create(device=<Device>, name=<Device Name>, debug=<True/False*>)
# MidiMessageType(status=<Status>, control=<Control>)
#
#
# Mapping class decorators:
#
# register_mapping(<subsystem>)
#
#
# Mapping method decorators:
#
# map_message(<MessageType>)
# switch_pressed(value=<Pressed Value/0x7F*>)
# switch_released(value=<Released Value/0x00*>)
# relative_value(max=<Max Value/0x80*>, break=<Break Point Value/0x40*>)
# merge(type=<MergeType.ABSOLUTE_VALUE*/MergeType.RELATIVE_VALUE>)
#
#
# MIDI message methods:
#
# msg.get_status()
# msg.get_control()
# msg.get_value()
#
import bpy
from system_hid_mapper import *
reset_mapper()
@register_mapping(AMidiSubsystem.create(name='TotalTrack Control MIDI 1', debug=True))
class VSEMapping(object):
@map_message(MidiMessageType(status=144, control=67)) # play left
@switch_pressed()
def play(self, msg):
''' play '''
bpy.ops.screen.animation_play()
@map_message(MidiMessageType(status=144, control=51)) # pause left
@switch_pressed()
def pause(self, msg):
''' pause '''
bpy.ops.screen.animation_cancel(restore_frame=False)
def seek(self, value):
''' move current frame '''
bpy.context.scene.frame_current = bpy.context.scene.frame_current + value
@map_message(MidiMessageType(status=144, control=73)) # loop in left
@switch_pressed()
def seek_prev(self, msg):
''' move to prev frame '''
self.seek(self, -1)
@map_message(MidiMessageType(status=144, control=74)) # loop out left
@switch_pressed()
def seek_next(self, msg):
''' move to next frame '''
self.seek(self, 1)
@map_message(MidiMessageType(status=176, control=25)) # jog left
@relative_value()
@merge(type=MergeType.RELATIVE_VALUE)
def seek_jog(self, msg):
''' move current frame '''
self.seek(self, msg.get_value())
@map_message(MidiMessageType(status=144, control=77)) # loop in right
@switch_pressed()
def jump_keyframe_prev(self, msg):
''' jump to prev keyframe '''
bpy.ops.screen.keyframe_jump(next=False)
@map_message(MidiMessageType(status=144, control=78)) # loop out right
@switch_pressed()
def jump_keyframe_next(self, msg):
''' jump to next keyframe '''
bpy.ops.screen.keyframe_jump()
@map_message(MidiMessageType(status=144, control=75)) # load track left
@switch_pressed()
def strip_toggle_mute(self, msg):
''' toggle mute of selected clip '''
sqe = bpy.context.scene.sequence_editor
if sqe is not None and sqe.active_strip is not None:
sqe.active_strip.mute = not sqe.active_strip.mute
@map_message(MidiMessageType(status=176, control=8)) # volume left
@merge(type=MergeType.ABSOLUTE_VALUE)
def strip_volume_keyframe_insert(self, msg):
''' set key-framed volume of selected sound clip '''
sqe = bpy.context.scene.sequence_editor
if (sqe is not None and sqe.active_strip is not None and
sqe.active_strip.type == 'SOUND'):
sqe.active_strip.volume = msg.get_value() / 127.0
sqe.active_strip.keyframe_insert(data_path='volume')
""")
        # Show the new text only when the current space is a text editor.
        if context.space_data.type == 'TEXT_EDITOR':
            context.space_data.text = text
        return {'FINISHED'}
def _template_menu_draw(self, context):
    """Menu draw callback adding the 'HID Mapping' template entry."""
    layout = self.layout
    layout.operator('mapper.template', text='HID Mapping')
def register():
    """Register this module with Blender and prepend the template entry
    to the text editor's templates menu."""
    print('HID Mapper: Registering addon')
    bpy.utils.register_module(__name__)
    templates_menu = bpy.types.TEXT_MT_templates
    templates_menu.prepend(_template_menu_draw)
def unregister():
    """Reset the mapper, remove the template menu entry and unregister
    this module from Blender."""
    print('HID Mapper: Unregistering addon')
    reset_mapper()
    templates_menu = bpy.types.TEXT_MT_templates
    templates_menu.remove(_template_menu_draw)
    bpy.utils.unregister_module(__name__)
if __name__ == '__main__':
    # Executed as a script: drop a registration possibly left over from an
    # earlier run, then register again.
    try:
        unregister()
    except Exception:
        # Best effort only -- presumably nothing is registered on the first
        # run.  ``Exception`` (instead of a bare except) keeps
        # KeyboardInterrupt and SystemExit propagating.
        pass
    register()

File Metadata

Mime Type
text/x-python
Storage Engine
local-disk
Storage Format
Raw Data
Storage Handle
f6/24/06a9a8165bbba146a3ec8e1d43b3

Event Timeline