1+ # coding=utf-8
2+
3+ import time
4+ import socket
5+
6+ from pymycobot .generate import CommandGenerator
7+ from pymycobot .common import ProtocolCode , write , read
8+ from pymycobot .error import calibration_parameters
9+
10+
11+ class MercurySocket (CommandGenerator ):
12+ _write = write
13+ _read = read
14+ def __init__ (self , ip , netport = 9000 , debug = False ):
15+ """
16+ Args:
17+ ip: Server ip
18+ netport: Server port(default 9000)
19+ """
20+ super (MercurySocket , self ).__init__ (debug )
21+ self .calibration_parameters = calibration_parameters
22+ self .SERVER_IP = ip
23+ self .SERVER_PORT = netport
24+ self .sock = self .connect_socket ()
25+
26+ def connect_socket (self ):
27+ sock = socket .socket (socket .AF_INET , socket .SOCK_STREAM )
28+ sock .connect ((self .SERVER_IP , self .SERVER_PORT ))
29+ return sock
30+
31+
32+ def _mesg (self , genre , * args , ** kwargs ):
33+ """
34+
35+ Args:
36+ genre: command type (Command)
37+ *args: other data.
38+ It is converted to octal by default.
39+ If the data needs to be encapsulated into hexadecimal,
40+ the array is used to include them. (Data cannot be nested)
41+ **kwargs: support `has_reply`
42+ has_reply: Whether there is a return value to accept.
43+ """
44+ real_command , has_reply = super (MercurySocket , self )._mesg (genre , * args , ** kwargs )
45+ self ._write (self ._flatten (real_command ), "socket" )
46+
47+ if has_reply :
48+ data = self ._read (genre , _class = self .__class__ .__name__ , method = 'socket' )
49+ if genre == ProtocolCode .SET_SSID_PWD :
50+ return None
51+ res = self ._process_received (data , genre , 14 )
52+ if res == []:
53+ return None
54+ if genre in [
55+ ProtocolCode .ROBOT_VERSION ,
56+ ProtocolCode .GET_ROBOT_ID ,
57+ ProtocolCode .IS_POWER_ON ,
58+ ProtocolCode .IS_CONTROLLER_CONNECTED ,
59+ ProtocolCode .IS_PAUSED , # TODO have bug: return b''
60+ ProtocolCode .IS_IN_POSITION ,
61+ ProtocolCode .IS_MOVING ,
62+ ProtocolCode .IS_SERVO_ENABLE ,
63+ ProtocolCode .IS_ALL_SERVO_ENABLE ,
64+ ProtocolCode .GET_SERVO_DATA ,
65+ ProtocolCode .GET_DIGITAL_INPUT ,
66+ ProtocolCode .GET_GRIPPER_VALUE ,
67+ ProtocolCode .IS_GRIPPER_MOVING ,
68+ ProtocolCode .GET_SPEED ,
69+ ProtocolCode .GET_ENCODER ,
70+ ProtocolCode .GET_BASIC_INPUT ,
71+ ProtocolCode .GET_TOF_DISTANCE ,
72+ ProtocolCode .GET_END_TYPE ,
73+ ProtocolCode .GET_MOVEMENT_TYPE ,
74+ ProtocolCode .GET_REFERENCE_FRAME ,
75+ ProtocolCode .GET_FRESH_MODE ,
76+ ProtocolCode .GET_GRIPPER_MODE ,
77+ ProtocolCode .SET_SSID_PWD ,
78+ ProtocolCode .COBOTX_IS_GO_ZERO ,
79+ ProtocolCode .GET_ERROR_DETECT_MODE
80+ ]:
81+ return self ._process_single (res )
82+ elif genre in [ProtocolCode .GET_ANGLES ]:
83+ return [self ._int2angle (angle ) for angle in res ]
84+ elif genre in [
85+ ProtocolCode .GET_COORDS ,
86+ ProtocolCode .MERCURY_GET_BASE_COORDS ,
87+ ProtocolCode .GET_TOOL_REFERENCE ,
88+ ProtocolCode .GET_WORLD_REFERENCE ,
89+ ]:
90+ if res :
91+ r = []
92+ for idx in range (3 ):
93+ r .append (self ._int2coord (res [idx ]))
94+ for idx in range (3 , 6 ):
95+ r .append (self ._int2angle (res [idx ]))
96+ return r
97+ else :
98+ return res
99+ elif genre in [ProtocolCode .GET_SERVO_VOLTAGES ]:
100+ return [self ._int2coord (angle ) for angle in res ]
101+ elif genre in [ProtocolCode .GET_BASIC_VERSION , ProtocolCode .SOFTWARE_VERSION , ProtocolCode .GET_ATOM_VERSION ]:
102+ return self ._int2coord (self ._process_single (res ))
103+ elif genre in [
104+ ProtocolCode .GET_JOINT_MAX_ANGLE ,
105+ ProtocolCode .GET_JOINT_MIN_ANGLE ,
106+ ]:
107+ return self ._int2coord (res [0 ])
108+ elif genre == ProtocolCode .GET_ANGLES_COORDS :
109+ r = []
110+ for index in range (len (res )):
111+ if index < 7 :
112+ r .append (self ._int2angle (res [index ]))
113+ elif index < 10 :
114+ r .append (self ._int2coord (res [index ]))
115+ else :
116+ r .append (self ._int2angle (res [index ]))
117+ return r
118+ elif genre == ProtocolCode .GO_ZERO :
119+ r = []
120+ if res :
121+ if 1 not in res [1 :]:
122+ return res [0 ]
123+ else :
124+ for i in range (1 , len (res )):
125+ if res [i ] == 1 :
126+ r .append (i )
127+ return r
128+ elif genre in [ProtocolCode .COBOTX_GET_ANGLE , ProtocolCode .COBOTX_GET_SOLUTION_ANGLES ]:
129+ return self ._int2angle (res [0 ])
130+ else :
131+ return res
132+ return None
133+
134+ def open (self ):
135+ self ._serial_port .open ()
136+
137+ def close (self ):
138+ self ._serial_port .close ()
139+
140+ def set_solution_angles (self , angle , speed ):
141+ """Set zero space deflection angle value
142+
143+ Args:
144+ angle: Angle of joint 1. The angle range is -90 ~ 90
145+ speed: 1 - 100.
146+ """
147+ self .calibration_parameters (
148+ class_name = self .__class__ .__name__ , speed = speed , solution_angle = angle
149+ )
150+ return self ._mesg (
151+ ProtocolCode .COBOTX_SET_SOLUTION_ANGLES , [self ._angle2int (angle )], speed
152+ )
153+
154+ def get_solution_angles (self ):
155+ """Get zero space deflection angle value"""
156+ return self ._mesg (ProtocolCode .COBOTX_GET_SOLUTION_ANGLES , has_reply = True )
157+
158+ def write_move_c (self , transpoint , endpoint , speed ):
159+ """_summary_
160+
161+ Args:
162+ transpoint (list): Arc passing point coordinates
163+ endpoint (list): Arc end point coordinates
164+ speed (int): 1 ~ 100
165+ """
166+ start = []
167+ end = []
168+ for index in range (6 ):
169+ if index < 3 :
170+ start .append (self ._coord2int (transpoint [index ]))
171+ end .append (self ._coord2int (endpoint [index ]))
172+ else :
173+ start .append (self ._angle2int (transpoint [index ]))
174+ end .append (self ._angle2int (endpoint [index ]))
175+ return self ._mesg (ProtocolCode .WRITE_MOVE_C , start , end , speed )
176+
177+ def focus_all_servos (self ):
178+ """Lock all joints"""
179+ return self ._mesg (ProtocolCode .FOCUS_ALL_SERVOS )
180+
181+ def go_zero (self ):
182+ """Control the machine to return to the zero position.
183+
184+ Return:
185+ 1 : All motors return to zero position.
186+ list : Motor with corresponding ID failed to return to zero.
187+ """
188+ return self ._mesg (ProtocolCode .GO_ZERO , has_reply = True )
189+
190+ def get_angle (self , joint_id ):
191+ """Get single joint angle
192+
193+ Args:
194+ joint_id (int): 1 ~ 7 or 11 ~ 13.
195+ """
196+ self .calibration_parameters (class_name = self .__class__ .__name__ , id = joint_id )
197+ return self ._mesg (ProtocolCode .COBOTX_GET_ANGLE , joint_id , has_reply = True )
198+
199+ def is_gone_zero (self ):
200+ """Check if it has returned to zero
201+
202+ Return:
203+ 1 : Return to zero successfully.
204+ 0 : Returning to zero.
205+ """
206+ return self ._mesg (ProtocolCode .COBOTX_IS_GO_ZERO , has_reply = True )
207+
208+ def set_encoder (self , joint_id , encoder ):
209+ """Set a single joint rotation to the specified potential value.
210+
211+ Args:
212+ joint_id (int): Joint id 1 - 7.
213+ encoder: The value of the set encoder.
214+ """
215+ # TODO Mercury此接口待定
216+ # self.calibration_parameters(
217+ # class_name=self.__class__.__name__, id=joint_id, encoder=encoder
218+ # )
219+ return self ._mesg (ProtocolCode .SET_ENCODER , joint_id , [encoder ])
220+
221+ def servo_restore (self , joint_id ):
222+ """Abnormal recovery of joints
223+
224+ Args:
225+ joint_id (int): Joint ID.
226+ arm : 1 ~ 7
227+ waist : 13
228+ All joints: 254
229+ """
230+ self .calibration_parameters (
231+ class_name = self .__class__ .__name__ , servo_restore = joint_id
232+ )
233+ self ._mesg (ProtocolCode .SERVO_RESTORE , joint_id )
234+
235+ def set_error_detect_mode (self , mode ):
236+ """Set error detection mode. Turn off without saving, default to open state
237+
238+ Return:
239+ mode : 0 - close 1 - open.
240+ """
241+ self .calibration_parameters (
242+ class_name = self .__class__ .__name__ , mode = mode
243+ )
244+ self ._mesg (ProtocolCode .SET_ERROR_DETECT_MODE , mode )
245+
246+ def get_error_detect_mode (self ):
247+ """Set error detection mode"""
248+ return self ._mesg (ProtocolCode .GET_ERROR_DETECT_MODE , has_reply = True )
249+
250+ def sync_send_angles (self , degrees , speed , timeout = 15 ):
251+ """Send the angle in synchronous state and return when the target point is reached
252+
253+ Args:
254+ degrees: a list of degree values(List[float]), length 6.
255+ speed: (int) 0 ~ 100
256+ timeout: default 7s.
257+ """
258+ t = time .time ()
259+ self .send_angles (degrees , speed )
260+ while time .time () - t < timeout :
261+ f = self .is_in_position (degrees , 0 )
262+ if f == 1 :
263+ break
264+ time .sleep (0.1 )
265+ return self
266+
267+ def sync_send_coords (self , coords , speed , mode = None , timeout = 15 ):
268+ """Send the coord in synchronous state and return when the target point is reached
269+
270+ Args:
271+ coords: a list of coord values(List[float])
272+ speed: (int) 0 ~ 100
273+ mode: (int): 0 - angular(default), 1 - linear
274+ timeout: default 7s.
275+ """
276+ t = time .time ()
277+ self .send_coords (coords , speed , mode )
278+ while time .time () - t < timeout :
279+ if self .is_in_position (coords , 1 ) == 1 :
280+ break
281+ time .sleep (0.1 )
282+ return self
283+
284+ def get_base_coords (self ):
285+ """get base coords"""
286+ return self ._mesg (ProtocolCode .MERCURY_GET_BASE_COORDS , has_reply = True )
287+
288+ def send_base_coord (self , axis , coord , speed ):
289+ """_summary_
290+
291+ Args:
292+ axis (_type_): _description_
293+ coord (_type_): _description_
294+ speed (_type_): _description_
295+ """
296+ if axis < 4 :
297+ coord = self ._coord2int (coord )
298+ else :
299+ coord = self ._angle2int (coord )
300+ return self ._mesg (ProtocolCode .MERCURY_SET_BASE_COORD , axis , [coord ], speed )
301+
302+ def send_base_coords (self , coords , speed ):
303+ """_summary_
304+
305+ Args:
306+ coords (_type_): _description_
307+ speed (_type_): _description_
308+ """
309+ coord_list = []
310+ for idx in range (3 ):
311+ coord_list .append (self ._coord2int (coords [idx ]))
312+ for angle in coords [3 :]:
313+ coord_list .append (self ._angle2int (angle ))
314+ return self ._mesg (ProtocolCode .MERCURY_SET_BASE_COORDS , coord_list , speed )
315+
316+ def jog_base_coord (self , axis , direction , speed ):
317+ """_summary_
318+
319+ Args:
320+ axis (_type_): _description_
321+ direction (_type_): _description_
322+ speed (_type_): _description_
323+ """
324+ return self ._mesg (ProtocolCode .MERCURY_JOG_BASE_COORD , axis , direction , speed )
325+
326+ def drag_tech_save (self ):
327+ """Start recording the dragging teaching point. In order to show the best sports effect, the recording time should not exceed 90 seconds."""
328+ return self ._mesg (ProtocolCode .MERCURY_DRAG_TECH_SAVE )
329+
330+ def drag_tech_execute (self ):
331+ """Start dragging the teaching point and only execute it once."""
332+ return self ._mesg (ProtocolCode .MERCURY_DRAG_TECH_EXECUTE )
333+
334+ def drag_tech_pause (self ):
335+ """Pause recording of dragging teaching point"""
336+ self ._mesg (ProtocolCode .MERCURY_DRAG_TECH_PAUSE )
337+
338+ def is_gripper_moving (self , mode = None ):
339+ """Judge whether the gripper is moving or not
340+
341+ Args:
342+ mode: 1 - pro gripper(default) 2 - Parallel gripper
343+
344+ Returns:
345+ 0 - not moving
346+ 1 - is moving
347+ -1- error data
348+ """
349+ if mode :
350+ return self ._mesg (ProtocolCode .IS_GRIPPER_MOVING , mode , has_reply = True )
351+ return self ._mesg (ProtocolCode .IS_GRIPPER_MOVING , has_reply = True )
352+
353+
354+
0 commit comments