Page Menu
Home
Search
Configure Global Search
Log In
Files
F20303
system_hid_mapper.py
Public
Actions
Download File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Authored By
David Gnedt (lxp)
Nov 13 2013, 4:23 PM
Size
17 KB
Subscribers
None
system_hid_mapper.py
View Options
# ##### BEGIN GPL LICENSE BLOCK #####
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software Foundation,
# Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#
# ##### END GPL LICENSE BLOCK #####
# Add-on metadata dictionary (name, description, authors, version, UI
# location, documentation/tracker links and category).
# NOTE(review): "blender" is presumably the minimum supported Blender
# version -- confirm against the add-on guidelines.
bl_info = {
    "name": "HID Mapper",
    "description": "Maps HID devices (e.g. MIDI devices) to custom blender"
                   " functions.",
    "author": "David Gnedt, Manuel Steiner, Christian Voglhuber",
    "version": (0, 1),
    "blender": (2, 61, 0),
    "location": "View Text Editor > Templates > HID Mapping",
    "warning": "",  # used for warning icon and text in addons panel
    "wiki_url": "http://wiki.blender.org/index.php/Extensions:2.6/Py/"
                "Scripts/System/HID_Mapper",
    "tracker_url": "http://projects.blender.org/tracker/index.php?"
                   "func=detail&aid=31680",
    "category": "System"}
import
bpy
import
atexit
as
_atexit
import
functools
as
_functools
import
inspect
as
_inspect
import
threading
as
_threading
import
multiprocessing
as
_multiprocessing
import
subprocess
as
_subprocess
# Registry of created subsystems, keyed by the (subsystem class, device)
# tuple built in Subsystem.create(); emptied again by reset_mapper().
_subsys_instances = {}
# Maps a message type to the list of (mapping class, handler function)
# tuples appended by register_mapping(); emptied again by reset_mapper().
_message_mappings = {}
# Common classes
class MessageType(object):
    """Hashable key describing a kind of message.

    Instances are used as dictionary keys; equality and hashing are
    based solely on the subsystem type stored in ``_subsys_type``.
    """

    # Default subsystem type shared by all instances.
    _subsys_type = (None, None)

    def __eq__(self, other):
        """Return whether both objects carry the same subsystem type."""
        mine = self._subsys_type
        theirs = other._subsys_type
        return mine == theirs

    def __hash__(self):
        """Hash consistently with ``__eq__``."""
        key = self._subsys_type
        return hash(key)

    def set_subsystem_type(self, subsys_type):
        """Accept the subsystem type handed over by register_mapping()."""
        # NOTE(review): this statement binds a function-local name only, so
        # the instance keeps its default ``_subsys_type``. Incoming messages
        # are never given a subsystem type either, so lookups currently
        # match on the default value. Storing the value on ``self`` would
        # require tagging incoming messages as well.
        _subsys_type = subsys_type
class Message(object):
    """Base class for messages; carries the message type it belongs to."""

    # Type object assigned in __init__; None on the class itself.
    msg_type = None

    def get_type(self):
        """Return the message type passed to the constructor."""
        kind = self.msg_type
        return kind

    def __init__(self, msg_type):
        """Remember ``msg_type`` as the type of this message."""
        self.msg_type = msg_type
class SubsystemException(Exception):
    """Error raised by subsystems, e.g. when a device is not available."""
class Subsystem(object):
    """Base class of input subsystems.

    Instances are obtained through ``create()``, which keeps one instance
    per (subsystem class, device) pair in ``_subsys_instances``.
    Subclasses override the hook methods below.
    """

    _device = None   # device identifier, set by subclasses
    _status = False  # True while the subsystem is running
    _debug = False   # True when debug output was requested

    @classmethod
    def create(subsys_class, **kwargs):
        """Return the subsystem for the device selected by ``kwargs``.

        An already registered instance is reused; otherwise a new one is
        built and registered. A ``debug`` keyword, when present, is applied
        to the returned instance in both cases.

        Raises SubsystemException when the class reports the device as
        unavailable.
        """
        device = subsys_class.get_device(**kwargs)
        key = (subsys_class, device)
        has_debug = 'debug' in kwargs

        if key in _subsys_instances:
            existing = _subsys_instances[key]
            if has_debug:
                existing._debug = kwargs['debug']
            return existing

        if not subsys_class.is_available(device):
            raise SubsystemException('Subsystem is not available'
                                     ' for this device')

        created = subsys_class(device)
        if has_debug:
            created._debug = kwargs['debug']
        _subsys_instances[key] = created
        return created

    @staticmethod
    def get_device(**kwargs):
        """Hook: resolve the device from keyword arguments (no-op here)."""
        return None

    @staticmethod
    def is_available(device):
        """Hook: tell whether ``device`` can be used (no-op here)."""
        return None

    def get_type(self):
        """Return the (class, device) tuple identifying this subsystem."""
        return (type(self), self._device)

    def is_debug(self):
        """Return the debug flag."""
        return self._debug

    def is_running(self):
        """Return the running flag."""
        return self._status

    def start(self):
        """Hook: start the subsystem (no-op here)."""
        return None

    def stop(self):
        """Hook: stop the subsystem (no-op here)."""
        return None

    def poll(self):
        """Hook: return the next pending message (no-op here)."""
        return None
class ThreadedSubsystem(Subsystem):
    """Subsystem whose ``run()`` method executes in a daemon thread.

    ``run()`` implementations put messages into ``_queue``; ``poll()``
    hands them out one at a time without blocking.
    """

    _thread = None  # worker thread, created by start()
    _queue = None   # message queue, created by __init__()

    def __init__(self):
        """Create the queue used to pass messages out of the worker."""
        self._queue = _multiprocessing.Queue()

    def __del__(self):
        """Stop the subsystem when the object is garbage collected."""
        self.stop()

    def start(self):
        """Launch ``run()`` in a new daemon thread."""
        worker = _threading.Thread(target=self.run)
        worker.daemon = True
        self._thread = worker
        worker.start()

    def stop(self):
        """Wait for the worker thread to finish, if there is one.

        Does nothing when the subsystem was never started (this happens
        when ``__del__`` runs on an unused instance) or when called from
        the worker thread itself, where joining would raise RuntimeError.
        """
        worker = self._thread
        if worker is None or worker is _threading.current_thread():
            return
        worker.join()

    def poll(self):
        """Return the next queued message, or None when nothing is pending."""
        pending = self._queue
        if pending is not None and not pending.empty():
            return pending.get_nowait()
        return None

    def run(self):
        """Hook: body of the worker thread (no-op here)."""
        pass
# MIDI subsystem implementation
class MidiMessageType(MessageType):
    """Message type identified by a status value and a control value."""

    _status = None   # value of the 'status' keyword, if given
    _control = None  # value of the 'control' keyword, if given

    def __init__(self, **kwargs):
        """Store the optional ``status`` and ``control`` keyword values."""
        for option in ('status', 'control'):
            if option in kwargs:
                setattr(self, '_' + option, kwargs[option])

    def __hash__(self):
        """Hash over subsystem type, status and control."""
        key = (self._subsys_type, self._status, self._control)
        return hash(key)

    def __eq__(self, other):
        """Compare subsystem type, status and control, in that order."""
        if not (self._subsys_type == other._subsys_type):
            return False
        if not (self._status == other._status):
            return False
        return self._control == other._control

    def get_status(self):
        """Return the status value."""
        return self._status

    def get_control(self):
        """Return the control value."""
        return self._control
class MidiMessage(Message):
    """Message built from a three element sequence.

    Element 0 becomes the status and element 1 the control of the
    message type; element 2 is kept as the mutable value.
    """

    _value = None  # third element of the data passed to __init__

    def __init__(self, data):
        """Create the message from ``data`` (status, control, value)."""
        status = data[0]
        control = data[1]
        super().__init__(MidiMessageType(status=status, control=control))
        self._value = data[2]

    def __str__(self):
        """Return a readable representation used for debug output."""
        name = __class__.__name__
        status = self.get_type().get_status()
        control = self.get_type().get_control()
        return '%s(status=%s, control=%s, value=%s)' % (
            name, status, control, self._value)

    def get_status(self):
        """Return the status of the message type."""
        msg_type = self.get_type()
        return msg_type.get_status()

    def get_control(self):
        """Return the control of the message type."""
        msg_type = self.get_type()
        return msg_type.get_control()

    def get_value(self):
        """Return the current value."""
        return self._value

    def set_value(self, value):
        """Replace the current value."""
        self._value = value
class AMidiSubsystem(ThreadedSubsystem):
    """MIDI subsystem that talks to devices through the ``amidi`` tool.

    Devices are enumerated with ``amidi -l``; messages are read from the
    output of ``amidi -p <device> -d`` in the worker thread and queued as
    MidiMessage objects.
    """

    # Child process started by run(); stays None until the worker thread
    # has launched it.
    _proc = None

    @staticmethod
    def _get_devices():
        """Return a dict mapping device identifiers to device names.

        The first line of the ``amidi -l`` output is skipped; every other
        line is split into three whitespace separated columns, of which
        the second is used as key and the third as value.

        Returns an empty dict when ``amidi`` exits with an error or cannot
        be executed at all (e.g. it is not installed).
        """
        try:
            output = _subprocess.check_output(['amidi', '-l'])
        except (_subprocess.CalledProcessError, OSError):
            # OSError covers a missing or non-executable amidi binary.
            return {}
        devices = {}
        for row in output.decode('utf-8').split('\n')[1:]:
            columns = row.split(None, 2)
            if len(columns) == 3:
                devices[columns[1]] = columns[2]
        return devices

    @staticmethod
    def get_device(**kwargs):
        """Resolve the device identifier from ``device`` or ``name``.

        ``device`` is returned unchanged; ``name`` is looked up in the
        device list. Raises SubsystemException when no device is found.
        """
        if 'device' in kwargs:
            return kwargs['device']
        elif 'name' in kwargs:
            wanted = kwargs['name']
            for dev, name in AMidiSubsystem._get_devices().items():
                if name == wanted:
                    return dev
        raise SubsystemException('Device not found')

    @staticmethod
    def is_available(device):
        """Return whether ``device`` is listed by ``amidi -l``."""
        return device in AMidiSubsystem._get_devices()

    def __init__(self, device):
        """Create the subsystem for the given device identifier."""
        super().__init__()
        self._device = device

    def start(self):
        """Start the reader thread unless the subsystem already runs."""
        if not self._status:
            self._status = True
            print('HID Mapper: Starting amidi subsystem for %s...'
                  % self._device)
            super().start()

    def stop(self):
        """Stop the child process and wait for the reader thread."""
        if self._status:
            # Clearing the flag first lets run() tell a requested stop
            # from an unexpected exit of the child process.
            self._status = False
            print('HID Mapper: Stopping amidi subsystem %s...'
                  % self._device)
            proc = self._proc
            # The worker thread may not have launched the process yet.
            if proc is not None:
                if proc.poll() is None:
                    proc.terminate()
                proc.wait()
            super().stop()

    def run(self):
        """Read messages from ``amidi`` until stopped or the process ends."""
        self._proc = _subprocess.Popen(['amidi', '-p', self._device, '-d'],
                                       stdout=_subprocess.PIPE)
        while self._status and self._proc.poll() is None:
            # Nine bytes per message: the first byte is dropped and the
            # rest is parsed as space separated hexadecimal numbers
            # (presumably a newline followed by three two-digit bytes).
            chunk = self._proc.stdout.read(9)
            try:
                fields = [int(token, 16)
                          for token in chunk[1:].split(b' ')]
                self._queue.put(MidiMessage(fields))
            except (ValueError, IndexError):
                # IndexError: fewer than three numbers were read; treat it
                # like any other malformed packet instead of letting the
                # reader thread die with the status flag still set.
                if self._status:
                    print('HID Mapper: Parsing error in amidi subsystem %s'
                          % self._device)
        if self._proc.poll() is None:
            self._proc.terminate()
        if self._status:
            # Nobody asked for a stop, so the process ended on its own.
            self._status = False
            print('HID Mapper: Unexpected error in amidi subsystem %s'
                  % self._device)
        else:
            print('HID Mapper: Stopped amidi subsystem %s' % self._device)
# Blender handler
def _scene_update_callback(context):
    """Dispatch pending subsystem messages to the registered handlers.

    For every subsystem, consecutive messages of the same type are
    collected into one batch. Handlers whose options contain
    'merge_decorated' receive the whole batch; all other handlers are
    called once per message.
    """
    for source in _subsys_instances.values():
        current = source.poll()
        while current is not None:
            if source.is_debug():
                print('HID Mapper DEBUG: %s' % current)
            batch = [current]
            kind = current.get_type()

            # Gather the run of follow-up messages sharing this type.
            while True:
                pending = source.poll()
                if pending is None or not (kind == pending.get_type()):
                    break
                if source.is_debug():
                    print('HID Mapper DEBUG: %s' % pending)
                batch.append(pending)

            if kind in _message_mappings:
                for owner, handler in _message_mappings[kind]:
                    if 'merge_decorated' in handler.mapper_options:
                        handler(owner, batch)
                    else:
                        for item in batch:
                            handler(owner, item)

            # The message that ended the run starts the next batch.
            current = pending
# Functions
def
_handle_mapper_options
(
orig_func
,
new_func
):
if
hasattr
(
orig_func
,
'mapper_options'
):
if
orig_func
!=
new_func
:
setattr
(
new_func
,
'mapper_options'
,
orig_func
.
mapper_options
)
else
:
setattr
(
new_func
,
'mapper_options'
,
{
'orig_func'
:
orig_func
,
'mappings'
:
[]
})
def
_is_mapped_function
(
obj
):
return
_inspect
.
isfunction
(
obj
)
and
hasattr
(
obj
,
'mapper_options'
)
@_atexit.register
def reset_mapper():
    """Remove the scene handler, stop all subsystems and drop all mappings.

    Also registered to run at interpreter exit.
    """
    handlers = bpy.app.handlers.scene_update_post
    if _scene_update_callback in handlers:
        handlers.remove(_scene_update_callback)

    for running in _subsys_instances.values():
        running.stop()

    for registry in (_subsys_instances, _message_mappings):
        registry.clear()
# Decorators
def register_mapping(subsys):
    """Class decorator binding the mapped methods of a class to ``subsys``.

    Every message type collected on the class' mapped functions is
    registered in ``_message_mappings`` together with the class and the
    function. The scene update handler is installed and the subsystem is
    started if necessary. The decorated class is returned unchanged.
    """
    def decorator(c):
        print('HID Mapper: Registering mapping %s' % c.__name__)

        mapped = _inspect.getmembers(c, predicate=_is_mapped_function)
        for _name, func in mapped:
            for msg_type in func.mapper_options['mappings']:
                msg_type.set_subsystem_type(subsys.get_type())
                _message_mappings.setdefault(msg_type, []).append((c, func))

        handlers = bpy.app.handlers.scene_update_post
        if _scene_update_callback not in handlers:
            handlers.append(_scene_update_callback)

        if not subsys.is_running():
            subsys.start()
        return c

    return decorator
def map_message(msg_type):
    """Method decorator that records ``msg_type`` on the decorated function.

    The type is appended to the function's ``mapper_options['mappings']``
    list once; the function itself is returned unchanged.
    """
    def decorator(f):
        _handle_mapper_options(f, f)
        known = f.mapper_options['mappings']
        if msg_type not in known:
            known.append(msg_type)
        return f

    return decorator
def switch_pressed(**kwargs):
    """Method decorator: call the method only for the "pressed" value.

    The optional ``value`` keyword selects the value to react to and
    defaults to 0x7F. Messages that are not MidiMessage instances are
    ignored.
    """
    def decorator(f):
        keyvalue = kwargs.get('value', 0x7F)

        def switch_pressed(self, msg):
            # Run the wrapped method only when the button was hit.
            if isinstance(msg, MidiMessage) and msg.get_value() == keyvalue:
                f(self, msg)

        _handle_mapper_options(f, switch_pressed)
        return switch_pressed

    return decorator
def switch_released(**kwargs):
    """Method decorator: call the method only for the "released" value.

    The optional ``value`` keyword selects the value to react to and
    defaults to 0x00. Messages that are not MidiMessage instances are
    ignored.
    """
    def decorator(f):
        keyvalue = kwargs.get('value', 0x00)

        def switch_released(self, msg):
            # Run the wrapped method only when the button was released.
            if isinstance(msg, MidiMessage) and msg.get_value() == keyvalue:
                f(self, msg)

        _handle_mapper_options(f, switch_released)
        return switch_released

    return decorator
def relative_value(**kwargs):
    """Method decorator turning large values into negative ones.

    Every MidiMessage value greater than the break point (option 'break',
    default 0x40) is reduced by the maximum (option 'max', default 0x80)
    before the wrapped method is called. Accepts a single message or a
    list of messages; the wrapped method receives whatever was passed in.
    """
    # NOTE(review): 'break' is a reserved word, so this option can only be
    # supplied through dictionary unpacking, e.g.
    # relative_value(**{'break': 0x20}).
    def decorator(f):
        maxvalue = kwargs.get('max', 0x80)
        breakvalue = kwargs.get('break', 0x40)

        def relative_value(self, msg):
            if isinstance(msg, MidiMessage):
                candidates = (msg,)
            elif isinstance(msg, list):
                candidates = msg
            else:
                candidates = ()

            for item in candidates:
                if (isinstance(item, MidiMessage)
                        and item.get_value() > breakvalue):
                    item.set_value(item.get_value() - maxvalue)

            f(self, msg)

        _handle_mapper_options(f, relative_value)
        return relative_value

    return decorator
class MergeType(object):
    """Strategies accepted by the ``merge`` decorator."""

    # ABSOLUTE_VALUE is 0, RELATIVE_VALUE is 1.
    ABSOLUTE_VALUE, RELATIVE_VALUE = range(2)
def merge(**kwargs):
    """Method decorator collapsing a batch of messages into one call.

    The wrapped method is marked with the 'merge_decorated' option, so the
    scene update handler passes it the whole list of consecutive messages
    of one type.

    Keyword ``type`` selects the strategy:

    * MergeType.RELATIVE_VALUE -- the values of all messages are summed,
      stored in the first message and that message is passed on.
    * anything else (default MergeType.ABSOLUTE_VALUE) -- only the last
      message is passed on.
    """
    # The usage text shipped with the add-on marks ABSOLUTE_VALUE as the
    # default, so a bare ``@merge()`` must not fail with KeyError.
    merge_type = kwargs.get('type', MergeType.ABSOLUTE_VALUE)

    def decorator(f):
        def merge(self, msgs):
            if merge_type == MergeType.RELATIVE_VALUE:
                first = msgs[0]
                first.set_value(sum(m.get_value() for m in msgs))
                f(self, first)
            else:
                f(self, msgs[-1])

        _handle_mapper_options(f, merge)
        merge.mapper_options['merge_decorated'] = True
        return merge

    return decorator
# User interface
# Operator that creates a new text datablock named 'hid_mapping.py',
# fills it with the example mapping script below and, when invoked from a
# text editor, shows it there.
class TemplateOperator(bpy.types.Operator):
    bl_idname = 'mapper.template'
    bl_label = 'Add HID Mapping'

    def execute(self, context):
        text = bpy.data.texts.new('hid_mapping.py')
        # NOTE(review): the template text appears to have lost its
        # indentation and blank lines in this copy of the file -- confirm
        # against the original before relying on it.
        text.from_string("""# Functions:
#
# reset_mapper()
#
#
# Constructors/Factories:
#
# AMidiSubsystem.create(device=<Device>, name=<Device Name>, debug=<True/False*>)
# MidiMessageType(status=<Status>, control=<Control>)
#
#
# Mapping class decorators:
#
# register_mapping(<subsystem>)
#
#
# Mapping method decorators:
#
# map_message(<MessageType>)
# switch_pressed(value=<Pressed Value/0x7F*>)
# switch_released(value=<Released Value/0x00*>)
# relative_value(max=<Max Value/0x80*>, break=<Break Point Value/0x40*>)
# merge(type=<MergeType.ABSOLUTE_VALUE*/MergeType.RELATIVE_VALUE>)
#
#
# MIDI message methods:
#
# msg.get_status()
# msg.get_control()
# msg.get_value()
#
import bpy
from system_hid_mapper import *
reset_mapper()
@register_mapping(AMidiSubsystem.create(name='TotalTrack Control MIDI 1', debug=True))
class VSEMapping(object):
@map_message(MidiMessageType(status=144, control=67)) # play left
@switch_pressed()
def play(self, msg):
''' play '''
bpy.ops.screen.animation_play()
@map_message(MidiMessageType(status=144, control=51)) # pause left
@switch_pressed()
def pause(self, msg):
''' pause '''
bpy.ops.screen.animation_cancel(restore_frame=False)
def seek(self, value):
''' move current frame '''
bpy.context.scene.frame_current = bpy.context.scene.frame_current + value
@map_message(MidiMessageType(status=144, control=73)) # loop in left
@switch_pressed()
def seek_prev(self, msg):
''' move to prev frame '''
self.seek(self, -1)
@map_message(MidiMessageType(status=144, control=74)) # loop out left
@switch_pressed()
def seek_next(self, msg):
''' move to next frame '''
self.seek(self, 1)
@map_message(MidiMessageType(status=176, control=25)) # jog left
@relative_value()
@merge(type=MergeType.RELATIVE_VALUE)
def seek_jog(self, msg):
''' move current frame '''
self.seek(self, msg.get_value())
@map_message(MidiMessageType(status=144, control=77)) # loop in right
@switch_pressed()
def jump_keyframe_prev(self, msg):
''' jump to prev keyframe '''
bpy.ops.screen.keyframe_jump(next=False)
@map_message(MidiMessageType(status=144, control=78)) # loop out right
@switch_pressed()
def jump_keyframe_next(self, msg):
''' jump to next keyframe '''
bpy.ops.screen.keyframe_jump()
@map_message(MidiMessageType(status=144, control=75)) # load track left
@switch_pressed()
def strip_toggle_mute(self, msg):
''' toggle mute of selected clip '''
sqe = bpy.context.scene.sequence_editor
if sqe is not None and sqe.active_strip is not None:
sqe.active_strip.mute = not sqe.active_strip.mute
@map_message(MidiMessageType(status=176, control=8)) # volume left
@merge(type=MergeType.ABSOLUTE_VALUE)
def strip_volume_keyframe_insert(self, msg):
''' set key-framed volume of selected sound clip '''
sqe = bpy.context.scene.sequence_editor
if (sqe is not None and sqe.active_strip is not None and
sqe.active_strip.type == 'SOUND'):
sqe.active_strip.volume = msg.get_value() / 127.0
sqe.active_strip.keyframe_insert(data_path='volume')
"""
        )
        # Only a text editor space has a text to switch to the new block.
        if context.space_data.type == 'TEXT_EDITOR':
            context.space_data.text = text
        return {'FINISHED'}
def
_template_menu_draw
(
self
,
context
):
self
.
layout
.
operator
(
'mapper.template'
,
text
=
'HID Mapping'
)
def register():
    """Register the add-on's classes and add the template menu entry."""
    print('HID Mapper: Registering addon')
    module_name = __name__
    bpy.utils.register_module(module_name)
    templates_menu = bpy.types.TEXT_MT_templates
    templates_menu.prepend(_template_menu_draw)
def unregister():
    """Reset the mapper, remove the menu entry and unregister the classes."""
    print('HID Mapper: Unregistering addon')
    reset_mapper()
    templates_menu = bpy.types.TEXT_MT_templates
    templates_menu.remove(_template_menu_draw)
    module_name = __name__
    bpy.utils.unregister_module(module_name)
if __name__ == '__main__':
    # When the file is run directly, drop any earlier registration before
    # registering again.
    try:
        unregister()
    except Exception:
        # Best effort: unregistering fails when nothing was registered yet.
        # Only ordinary errors are ignored, so KeyboardInterrupt and
        # SystemExit still propagate.
        pass
    register()
File Metadata
Details
Mime Type
text/x-python
Storage Engine
local-disk
Storage Format
Raw Data
Storage Handle
f6/24/06a9a8165bbba146a3ec8e1d43b3
Event Timeline
Log In to Comment