@@ -323,3 +323,101 @@ def xreader():
                yield sample

    return xreader


def _buf2lines(buf, line_break="\n"):
    # FIXME: line_break should be automatically configured.
    lines = buf.split(line_break)
    return lines[:-1], lines[-1]
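# Illustrative sketch (not part of the change above): _buf2lines returns the
# complete lines in a chunk plus the trailing partial line, which the caller
# keeps and prepends to the next chunk read from the pipe. For example:
#
#   lines, remained = _buf2lines("a b\nc d\ne", "\n")
#   # lines == ["a b", "c d"], remained == "e"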


def pipe_reader(left_cmd,
                parser,
                bufsize=8192,
                file_type="plain",
                cut_lines=True,
                line_break="\n"):
    """
    pipe_reader reads data as a stream from a command: it takes the command's
    stdout into a pipe buffer, redirects it to the parser to parse, and then
    yields the data in your desired format.

    You can use a standard Linux command or call another program to read
    data from HDFS, Ceph, a URL, AWS S3, etc.:

        cmd = "hadoop fs -cat /path/to/some/file"
        cmd = "cat sample_file.tar.gz"
        cmd = "curl http://someurl"
        cmd = "python print_s3_bucket.py"

    A sample parser:

        def sample_parser(lines):
            # parse each line as one sample data,
            # return a list of samples as batches.
            ret = []
            for l in lines:
                ret.append(l.split(" ")[1:5])
            return ret

    :param left_cmd: command to execute to get stdout from.
    :type left_cmd: string
    :param parser: parser function to parse lines of data.
                   If cut_lines is True, the parser will receive a list
                   of lines.
                   If cut_lines is False, the parser will receive a
                   raw buffer each time.
                   The parser should return a list of parsed values.
    :type parser: callable
    :param bufsize: the buffer size used for the stdout pipe.
    :type bufsize: int
    :param file_type: can be plain/gzip, stream buffer data type.
    :type file_type: string
    :param cut_lines: whether to pass lines instead of a raw buffer
                      to the parser.
    :type cut_lines: bool
    :param line_break: line break of the file, like \n or \r.
    :type line_break: string

    :return: the reader generator.
    :rtype: callable
    """
    if not isinstance(left_cmd, str):
        raise TypeError("left_cmd must be a string")
    if not callable(parser):
        raise TypeError("parser must be a callable object")

    process = subprocess.Popen(
        left_cmd.split(" "), bufsize=bufsize, stdout=subprocess.PIPE)
    # TODO(typhoonzero): add a thread to read stderr

    # Always initializing a decompress object here is better than
    # creating one inside the loop.
    dec = zlib.decompressobj(
        32 + zlib.MAX_WBITS)  # offset 32 to skip the gzip header

    def reader():
        remained = ""
        while True:
            buff = process.stdout.read(bufsize)
            if buff:
                if file_type == "gzip":
                    decomp_buff = dec.decompress(buff)
                elif file_type == "plain":
                    decomp_buff = buff
                else:
                    raise TypeError("file_type %s is not allowed" % file_type)

                if cut_lines:
                    # prepend the partial line left over from the previous chunk
                    lines, remained = _buf2lines(''.join(
                        [remained, decomp_buff]), line_break)
                    parsed_list = parser(lines)
                    for ret in parsed_list:
                        yield ret
                else:
                    for ret in parser(decomp_buff):
                        yield ret
            else:
                break

    return reader
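# Hypothetical usage sketch (not part of the change above): stream a plain text
# file through `cat` and parse whitespace-separated fields line by line. The
# file name and the field slice below are illustrative assumptions only.
#
#   def sample_parser(lines):
#       return [l.split(" ")[1:5] for l in lines]
#
#   data_reader = pipe_reader("cat sample_file.txt", sample_parser, bufsize=8192)
#   for sample in data_reader():
#       print(sample)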