#!/usr/bin/env python
# coding: utf-8
from rknn.api import RKNN
from sys import exit
import onnx
import onnxscript

batch_size = 1
# embed_seq_len = 590
prompt_tokens_list = [15, 17, 21, 25]
encoder_seq_len_list = [577 + p for p in prompt_tokens_list]
decoder_seq_len = 1
vision_size = (768, 768)  # height/width of the vision encoder's pixel_values input

# set current directory to the directory of this file
import os
os.chdir(os.path.dirname(os.path.abspath(__file__)))

import subprocess
import select


def run_python_code(code):
    # Start a child process that executes the given code
    process = subprocess.Popen(
        ['python', '-c', code],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True
    )
    # Stream the child's stdout and stderr in real time
    while True:
        reads = [process.stdout.fileno(), process.stderr.fileno()]
        ret = select.select(reads, [], [])
        for fd in ret[0]:
            if fd == process.stdout.fileno():
                output = process.stdout.readline()
                if output:
                    print(output.strip())
            if fd == process.stderr.fileno():
                err = process.stderr.readline()
                if err:
                    print(f"Error: {err.strip()}")
        if process.poll() is not None:
            break
    # Drain anything still buffered when the child exits
    for line in process.stdout:
        print(line.strip())
    for line in process.stderr:
        print(f"Error: {line.strip()}")


def convert_decoder():
    rknn = RKNN(verbose=True)
    ONNX_MODEL = "decoder_model.onnx"
    RKNN_MODEL = ONNX_MODEL.replace(".onnx", ".rknn")
    DATASET = "dataset.txt"
    QUANTIZE = False

    # One shape set per prompt length:
    # [[batch_size, encoder_seq_len],
    #  [batch_size, encoder_seq_len, 768],
    #  [batch_size, decoder_seq_len, 768]]
    input_shapes = [[[batch_size, encoder_seq_len],
                     [batch_size, encoder_seq_len, 768],
                     [batch_size, decoder_seq_len, 768]]
                    for encoder_seq_len in encoder_seq_len_list]

    # pre-process config
    print('--> Config model')
    rknn.config(quantized_algorithm='normal', quantized_method='channel',
                target_platform='rk3588', optimization_level=3,
                single_core_mode=True, dynamic_input=input_shapes)
    print('done')

    # Load ONNX model
    print('--> Loading model')
    ret = rknn.load_onnx(model=ONNX_MODEL)
    if ret != 0:
        print('Load model failed!')
        exit(ret)
    print('done')

    # Build model
    print('--> Building model')
    ret = rknn.build(do_quantization=QUANTIZE, dataset=DATASET, rknn_batch_size=None)
    if ret != 0:
        print('Build model failed!')
        exit(ret)
    print('done')

    # Export RKNN model
    print('--> Export RKNN model')
    ret = rknn.export_rknn(RKNN_MODEL)
    if ret != 0:
        print('Export RKNN model failed!')
        exit(ret)
    print('done')


def convert_encoder():
    rknn = RKNN(verbose=True)
    ONNX_MODEL = "encoder_model.onnx"
    RKNN_MODEL = ONNX_MODEL.replace(".onnx", ".rknn")
    DATASET = "dataset.txt"
    QUANTIZE = False

    # [[batch_size, encoder_seq_len], [batch_size, encoder_seq_len, 768]]
    input_shapes = [[[batch_size, encoder_seq_len],
                     [batch_size, encoder_seq_len, 768]]
                    for encoder_seq_len in encoder_seq_len_list]

    # pre-process config
    print('--> Config model')
    rknn.config(quantized_algorithm='normal', quantized_method='channel',
                target_platform='rk3588', optimization_level=3,
                single_core_mode=True, dynamic_input=input_shapes)
    print('done')

    # Load ONNX model
    print('--> Loading model')
    ret = rknn.load_onnx(model=ONNX_MODEL)
    if ret != 0:
        print('Load model failed!')
        exit(ret)
    print('done')

    # Build model
    print('--> Building model')
    ret = rknn.build(do_quantization=QUANTIZE, dataset=DATASET, rknn_batch_size=None)
    if ret != 0:
        print('Build model failed!')
        exit(ret)
    print('done')

    # Export RKNN model
    print('--> Export RKNN model')
    ret = rknn.export_rknn(RKNN_MODEL)
    if ret != 0:
        print('Export RKNN model failed!')
        exit(ret)
    print('done')

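
# A minimal sketch (not called by this script) of how a model built with
# dynamic_input is exercised: at inference time the runtime matches the shapes
# of the tensors you feed against the shape sets registered via rknn.config(),
# so the inputs must use one of the encoder_seq_len values above. The dummy
# dtypes are assumptions (token mask as int64, hidden states as float32), and
# init_runtime(target='rk3588') requires a connected board. Call only after
# rknn.build() has succeeded.
def _example_dynamic_inference(rknn):
    import numpy as np
    seq = encoder_seq_len_list[0]  # any registered sequence length works
    dummy_inputs = [
        np.zeros((batch_size, seq), dtype=np.int64),
        np.zeros((batch_size, seq, 768), dtype=np.float32),
        np.zeros((batch_size, decoder_seq_len, 768), dtype=np.float32),
    ]
    if rknn.init_runtime(target='rk3588') != 0:
        raise RuntimeError('init_runtime failed')
    return rknn.inference(inputs=dummy_inputs)
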
def convert_vision():
    rknn = RKNN(verbose=True)
    ONNX_MODEL = "vision_encoder.onnx"
    DATASET = "dataset.txt"
    QUANTIZE = False

    # Split the first transformer block off into a separate model, because the
    # full graph is too large for the RKNN converter to handle in one pass.
    onnx.utils.extract_model(ONNX_MODEL, "vision_encoder_part1.onnx",
                             ['pixel_values'],
                             ['/blocks.0/blocks.0.0/channel_block/channel_attn/Add_output_0'])

    ##### Build stage 1. This build crashes the Python process, so run it in a
    # separate interpreter; we only need the intermediate graph the converter
    # dumps (check3_fuse_ops.onnx), which stage 2 picks up below.
    code = f"""
from rknn.api import RKNN
rknn = RKNN(verbose=True)
ONNX_MODEL = "vision_encoder.onnx"
RKNN_MODEL = ONNX_MODEL.replace(".onnx", ".rknn")
DATASET = "dataset.txt"
QUANTIZE = False
batch_size = {batch_size}

# pre-process config
print('--> Config model')
rknn.config(quantized_algorithm='normal', quantized_method='channel',
            target_platform='rk3588', optimization_level=3, single_core_mode=True)
print('done')

# Load ONNX model
print('--> Loading model')
ret = rknn.load_onnx(model=ONNX_MODEL, inputs=["pixel_values"],
                     input_size_list=[[batch_size, 3, 768, 768]])
if ret != 0:
    print('Load model failed!')
    exit(ret)
print('done')

print('--> Building model stage 1')
ret = rknn.build(do_quantization=QUANTIZE, dataset=DATASET, rknn_batch_size=None)
if ret != 0:
    print('Build model failed!')
    exit(ret)
print('done')
"""
    run_python_code(code)
    print("Build stage 1 done")

    intermediate_model = onnx.load("check3_fuse_ops.onnx")

    # Fuse the Transpose-Reshape-Transpose-Reshape-Transpose chains that the
    # converter chokes on into a Reshape-Transpose-Reshape sequence.
    from onnxscript.rewriter import pattern
    import onnx.numpy_helper as onh
    import numpy as np

    def tp_rs_tp_rs_tp_pattern(op, input1, perm1, shape2, perm3, shape4, perm5):
        i1 = op.Transpose(input1, perm=perm1)
        i2 = op.Reshape(i1, shape2)
        i3 = op.Transpose(i2, perm=perm3)
        i4 = op.Reshape(i3, shape4)
        i5 = op.Transpose(i4, perm=perm5)
        return i5

    def fused_pattern(op, input1, perm1, shape2, perm3, shape4, perm5):
        rs1_shape = op.Constant(value=onh.from_array(np.array(
            [input1.shape[0] * 3, input1.shape[1] // 3,
             input1.shape[2], input1.shape[3]], dtype=np.int64)))
        fi1 = op.Reshape(input1, rs1_shape)
        fi2 = op.Transpose(fi1, perm=[0, 2, 1, 3])
        elems = input1.shape[0] * input1.shape[1] * input1.shape[2] * input1.shape[3]
        rs4_shape = op.Constant(value=onh.from_array(np.array(
            [elems // (32 * 144), 32, 1, 144], dtype=np.int64)))
        fi3 = op.Reshape(fi2, rs4_shape)
        return fi3

    rewrite_rule = pattern.RewriteRule(tp_rs_tp_rs_tp_pattern, fused_pattern)
    rewrite_rule_set = pattern.RewriteRuleSet([rewrite_rule], commute=True)
    fused_model = onnxscript.rewriter.rewrite(
        intermediate_model,
        pattern_rewrite_rules=rewrite_rule_set
    )
    onnx.save(fused_model, "vision_encoder_part2.onnx")
    ONNX_MODEL = "vision_encoder_part2.onnx"
    RKNN_MODEL = ONNX_MODEL.replace(".onnx", ".rknn")
    del intermediate_model
    del fused_model

    rknn = RKNN(verbose=True)

    # pre-process config
    print('--> Config model')
    rknn.config(quantized_algorithm='normal', quantized_method='channel',
                target_platform='rk3588', optimization_level=3, single_core_mode=True)
    print('done')

    # Load the fused graph saved above
    print('--> Loading model')
    ret = rknn.load_onnx(model=ONNX_MODEL,
                         inputs=["/blocks.0/blocks.0.0/channel_block/channel_attn/Add_output_0-rs"],
                         input_size_list=[[batch_size, 128, 1, 36864]])
    if ret != 0:
        print('Load model failed!')
        exit(ret)
    print('done')

    # Build model
    print('--> Building model stage 2')
    ret = rknn.build(do_quantization=QUANTIZE, dataset=DATASET, rknn_batch_size=None)
    if ret != 0:
        print('Build model failed!')
        exit(ret)
    print('done')

    # Export RKNN model
    print('--> Export RKNN model')
    ret = rknn.export_rknn(RKNN_MODEL)
    if ret != 0:
        print('Export RKNN model failed!')
        exit(ret)
    print('done')

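
# A hedged verification sketch, not called by this script. It assumes
# onnxruntime is installed, that the dumped graph contains only standard ONNX
# ops, and that it has the single graph input used in convert_vision() above.
# Feeding the same random tensor through the pre- and post-rewrite graphs and
# comparing outputs is a cheap check that the Transpose/Reshape fusion
# preserved semantics.
def _verify_fusion(atol=1e-5):
    import numpy as np
    import onnxruntime as ort
    ref = ort.InferenceSession("check3_fuse_ops.onnx",
                               providers=["CPUExecutionProvider"])
    fused = ort.InferenceSession("vision_encoder_part2.onnx",
                                 providers=["CPUExecutionProvider"])
    inp = ref.get_inputs()[0]
    # Replace any symbolic dims with 1 so we can build a concrete tensor
    shape = [d if isinstance(d, int) else 1 for d in inp.shape]
    x = np.random.randn(*shape).astype(np.float32)
    for a, b in zip(ref.run(None, {inp.name: x}), fused.run(None, {inp.name: x})):
        assert np.allclose(a, b, atol=atol), "fusion changed model outputs"
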
def check_vision_model():
    rknn = RKNN(verbose=True)
    ONNX_MODEL = "vision_encoder.onnx"
    RKNN_MODEL = ONNX_MODEL.replace(".onnx", ".rknn")
    DATASET = "dataset.txt"
    QUANTIZE = False

    # pre-process config
    print('--> Config model')
    rknn.config(quantized_algorithm='normal', quantized_method='channel',
                target_platform='rk3588', optimization_level=3, single_core_mode=True)
    print('done')

    # Load ONNX model
    print('--> Loading model')
    ret = rknn.load_onnx(model=ONNX_MODEL, inputs=["pixel_values"],
                         input_size_list=[[batch_size, 3, vision_size[0], vision_size[1]]])
    if ret != 0:
        print('Load model failed!')
        exit(ret)
    print('done')

    # Build model
    print('--> Building model')
    ret = rknn.build(do_quantization=QUANTIZE, dataset=DATASET, rknn_batch_size=None)
    if ret != 0:
        print('Build model failed!')
        exit(ret)
    print('done')

    # Export RKNN model
    print('--> Export RKNN model')
    ret = rknn.export_rknn(RKNN_MODEL)
    if ret != 0:
        print('Export RKNN model failed!')
        exit(ret)
    print('done')

    # init runtime
    print('--> Init runtime environment')
    ret = rknn.init_runtime(target='rk3588')
    if ret != 0:
        print('Init runtime environment failed!')
        exit(ret)
    print('done')

    # precision check
    print('--> Precision check')
    ret = rknn.accuracy_analysis(inputs=["lena.png"], target='rk3588')
    if ret != 0:
        print('Precision check failed!')
        exit(ret)
    print('done')


import argparse

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("model", type=str, help="Model to convert: decoder, encoder, vision, or all")
    parser.add_argument("--check", action="store_true", help="Check model accuracy instead of converting (vision only)")
    args = parser.parse_args()
    if args.model == "decoder":
        convert_decoder()
    elif args.model == "encoder":
        convert_encoder()
    # elif args.model == "embed":
    #     # embed is faster on the CPU
    #     convert_embed()
    elif args.model == "vision":
        if args.check:
            check_vision_model()
        else:
            convert_vision()
    elif args.model == "all":
        convert_decoder()
        convert_encoder()
        # convert_embed()
        convert_vision()
    else:
        print("Invalid model; expected decoder, encoder, vision, or all")
        exit(1)
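
# Usage examples (run from this directory):
#   python convert.py decoder          # decoder_model.onnx  -> decoder_model.rknn
#   python convert.py encoder          # encoder_model.onnx  -> encoder_model.rknn
#   python convert.py vision           # two-stage vision encoder conversion
#   python convert.py vision --check   # build the full vision model and run accuracy_analysis
#   python convert.py all              # convert decoder, encoder, and vision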