一、环境说明
当前环境说明:
- python3.8.6
- ansible2.10.5
ansible版本差异说明:
- ansible2.0版本前后有较大改变,鉴于当前已经到了2.10版本,不再过多说明历史2.0版本的变动。可参考文章:链接
- ansible2.4:Inventory 改为 InventoryManager,同时 InventoryManager、VariableManager 等类的使用方式和导入路径都发生了改变。可参考文章:链接
- ansible2.8:选项的初始化方式有了改变。2.7 使用 Python 标准库里的命名元组(namedtuple)来初始化选项,而 2.8 改用 Ansible 自己封装的 ImmutableDict,并且需要与 context 结合使用(赋值给 context.CLIARGS)。
tip:pip3 install ansible的方式安装,ansible.cfg文件会在site-packages/ansible/galaxy/data/container/tests下。
二、官方代码解析
2.1、官方示例(Ad-hoc)
1 #!/usr/bin/env python 2 3 from __future__ import(absolute_import, division, print_function) 4 __metaclass__ =type 5 6 importjson 7 importshutil 8 9 importansible.constants as C 10 from ansible.executor.task_queue_manager importTaskQueueManager 11 from ansible.module_utils.common.collections importImmutableDict 12 from ansible.inventory.manager importInventoryManager 13 from ansible.parsing.dataloader importDataLoader 14 from ansible.playbook.play importPlay 15 from ansible.plugins.callback importCallbackBase 16 from ansible.vars.manager importVariableManager 17 from ansible importcontext 18 19 20 #Create a callback plugin so we can capture the output 21 classResultsCollectorJSONCallback(CallbackBase): 22 """A sample callback plugin used for performing an action as results come in. 23 24 If you want to collect all results into a single object for processing at 25 the end of the execution, look into utilizing the ``json`` callback plugin 26 or writing your own custom callback plugin. 27 """ 28 29 def __init__(self, *args, **kwargs): 30 super(ResultsCollectorJSONCallback, self).__init__(*args, **kwargs) 31 self.host_ok ={} 32 self.host_unreachable ={} 33 self.host_failed ={} 34 35 defv2_runner_on_unreachable(self, result): 36 host =result._host 37 self.host_unreachable[host.get_name()] =result 38 39 def v2_runner_on_ok(self, result, *args, **kwargs): 40 """Print a json representation of the result. 
41 42 Also, store the result in an instance attribute for retrieval later 43 """ 44 host =result._host 45 self.host_ok[host.get_name()] =result 46 print(json.dumps({host.name: result._result}, indent=4)) 47 48 def v2_runner_on_failed(self, result, *args, **kwargs): 49 host =result._host 50 self.host_failed[host.get_name()] =result 51 52 53 defmain(): 54 host_list = ['localhost', 'www.example.com', 'www.google.com'] 55 #since the API is constructed for CLI it expects certain options to always be set in the context object 56 context.CLIARGS = ImmutableDict(connection='smart', module_path=['/to/mymodules', '/usr/share/ansible'], forks=10, become=None, 57 become_method=None, become_user=None, check=False, diff=False) 58 #required for 59 #https://github.com/ansible/ansible/blob/devel/lib/ansible/inventory/manager.py#L204 60 sources = ','.join(host_list) 61 if len(host_list) == 1: 62 sources += ',' 63 64 #initialize needed objects 65 loader = DataLoader() #Takes care of finding and reading yaml, json and ini files 66 passwords = dict(vault_pass='secret') 67 68 #Instantiate our ResultsCollectorJSONCallback for handling results as they come in. Ansible expects this to be one of its main display outlets 69 results_callback =ResultsCollectorJSONCallback() 70 71 #create inventory, use path to host config file as source or hosts in a comma separated string 72 inventory = InventoryManager(loader=loader, sources=sources) 73 74 #variable manager takes care of merging all the different sources to give you a unified view of variables available in each context 75 variable_manager = VariableManager(loader=loader, inventory=inventory) 76 77 #instantiate task queue manager, which takes care of forking and setting up all objects to iterate over host list and tasks 78 #IMPORTANT: This also adds library dirs paths to the module loader 79 #IMPORTANT: and so it must be initialized before calling `Play.load()`. 
80 tqm =TaskQueueManager( 81 inventory=inventory, 82 variable_manager=variable_manager, 83 loader=loader, 84 passwords=passwords, 85 stdout_callback=results_callback, #Use our custom callback instead of the ``default`` callback plugin, which prints to stdout 86 ) 87 88 #create data structure that represents our play, including tasks, this is basically what our YAML loader does internally. 89 play_source =dict( 90 name="Ansible Play", 91 hosts=host_list, 92 gather_facts='no', 93 tasks=[ 94 dict(action=dict(module='shell', args='ls'), register='shell_out'), 95 dict(action=dict(module='debug', args=dict(msg='{{shell_out.stdout}}'))), 96 dict(action=dict(module='command', args=dict(cmd='/usr/bin/uptime'))), 97 ] 98 ) 99 100 #Create play object, playbook objects use .load instead of init or new methods, 101 #this will also automatically create the task objects from the info provided in play_source 102 play = Play().load(play_source, variable_manager=variable_manager, loader=loader) 103 104 #Actually run it 105 try: 106 result = tqm.run(play) #most interesting data for a play is actually sent to the callback's methods 107 finally: 108 #we always need to cleanup child procs and the structures we use to communicate with them 109 tqm.cleanup() 110 ifloader: 111 loader.cleanup_all_tmp_files() 112 113 #Remove ansible tmpdir 114 shutil.rmtree(C.DEFAULT_LOCAL_TMP, True) 115 116 print("UP ***********") 117 for host, result inresults_callback.host_ok.items(): 118 print('{0} >>> {1}'.format(host, result._result['stdout'])) 119 120 print("FAILED *******") 121 for host, result inresults_callback.host_failed.items(): 122 print('{0} >>> {1}'.format(host, result._result['msg'])) 123 124 print("DOWN *********") 125 for host, result inresults_callback.host_unreachable.items(): 126 print('{0} >>> {1}'.format(host, result._result['msg'])) 127 128 129 if __name__ == '__main__': 130 main()
2.2、拆分说明
- 导入模块详解
# Holds Ansible's initial option values, e.g. remote_user=None
from ansible.module_utils.common.collections import ImmutableDict
# Context object; it receives the ImmutableDict instance
from ansible import context
# Data loader: reads yaml/json/ini files, mainly used to read the inventory file
from ansible.parsing.dataloader import DataLoader
# Variable manager: host, group and extra variables
from ansible.vars.manager import VariableManager
# Creates and manages the inventory; imports the inventory file
from ansible.inventory.manager import InventoryManager
# Class used to run an Ad-hoc play
from ansible.playbook.play import Play
# Task queue that Ansible uses underneath for Ad-hoc execution
from ansible.executor.task_queue_manager import TaskQueueManager
# Callback base class, used to define callback events such as failure/success reporting
from ansible.plugins.callback import CallbackBase
# Runs playbooks
from ansible.executor.playbook_executor import PlaybookExecutor
# Operates on a single host, e.g. to add variables to it
from ansible.inventory.host import Host
# Operates on a single host group, e.g. to add variables to it
from ansible.inventory.group import Group
# Used to locate the temporary files Ansible produces
import ansible.constants as C
- 回调类
class ResultsCollectorJSONCallback(CallbackBase):
    """Callback plugin that collects task results per host.

    A callback plugin is simply a class. This one keeps each result in an
    instance attribute so it can be retrieved later (as JSON or through a
    custom plugin). Rewrite it to get whatever result format you need.
    """

    def __init__(self, *args, **kwargs):
        """Create the three per-host result stores (ok / unreachable / failed)."""
        super(ResultsCollectorJSONCallback, self).__init__(*args, **kwargs)
        self.host_ok = {}
        self.host_unreachable = {}
        self.host_failed = {}

    def v2_runner_on_unreachable(self, result):
        """Store ``result`` under its host name in ``host_unreachable``."""
        host = result._host
        self.host_unreachable[host.get_name()] = result

    def v2_runner_on_ok(self, result, *args, **kwargs):
        """Print the result as JSON and store it for later retrieval.

        ``indent=4`` only pretty-prints the JSON so it is easier to read.
        """
        host = result._host
        self.host_ok[host.get_name()] = result
        print(json.dumps({host.name: result._result}, indent=4))

    def v2_runner_on_failed(self, result, *args, **kwargs):
        """Store ``result`` under its host name in ``host_failed``."""
        host = result._host
        self.host_failed[host.get_name()] = result
- 初始化选项和source
# Build the initial options; they are set on the context object
context.CLIARGS = ImmutableDict(connection='smart', module_path=['/to/mymodules', '/usr/share/ansible'], forks=10, become=None,
                                become_method=None, become_user=None, check=False, diff=False)
# `sources` is the inventory data. By default it refers to /etc/ansible/hosts; it can also be
# a yaml/json/ini file, or a string of host names separated by commas.
host_list = ['localhost', 'www.example.com', 'www.google.com']
sources = ','.join(host_list)
# A single host needs a trailing comma (same rule as the official example).
if len(host_list) == 1:
    sources += ','
- 实例化对象
# Initialize the set of objects we need.
# Data loader: reads yaml/json/ini files, mainly used to read the inventory file
loader = DataLoader()
# The passwords argument is required; when hosts are trusted through public keys
# an empty dict also works: dict()
passwords = dict(vault_pass='secret')

# Callback object, used to query the execution results
results_callback = ResultsCollectorJSONCallback()

# Inventory manager: creates the inventory object from `sources` defined above.
# Alternatives: sources='/etc/ansible/hosts' or sources='host1,host2,...'
inventory = InventoryManager(loader=loader, sources=sources)

# Variable manager: manages variables
variable_manager = VariableManager(loader=loader, inventory=inventory)
- 任务队列管理器
# Instantiate the task queue manager.
# tip: this also loads the library directories into the loader
# tip: the queue instance must be created before calling `Play.load()`
tqm = TaskQueueManager(
    inventory=inventory,
    variable_manager=variable_manager,
    loader=loader,
    passwords=passwords,
    stdout_callback=results_callback,  # callback instance
)
- ansible命令模块和参数
# Data structure describing the play: which module to run, with which arguments, on which hosts.
play_source = dict(
    name="Ansible Play",  # name, only there for readability
    hosts=host_list,  # list of hosts to run against
    gather_facts='no',
    tasks=[
        dict(action=dict(module='shell', args='ls'), register='shell_out'),
        dict(action=dict(module='debug', args=dict(msg='{{shell_out.stdout}}'))),
        dict(action=dict(module='command', args=dict(cmd='/usr/bin/uptime'))),
    ]
)
- 执行
# Define the play object
play = Play().load(play_source, variable_manager=variable_manager, loader=loader)

# Run the play object in the queue
try:
    result = tqm.run(play)  # return code of the run; 0 means success
finally:
    # Always clean up the child processes and the structures used to communicate with them.
    tqm.cleanup()
    if loader:
        loader.cleanup_all_tmp_files()

# Finally remove the temporary directory created by ansible;
# it lives under ~/.ansible/tmp/
shutil.rmtree(C.DEFAULT_LOCAL_TMP, True)
- 查询结果
# Print what the callback collected, grouped by outcome.
print("UP ***********")
for host, result in results_callback.host_ok.items():
    print('{0} >>> {1}'.format(host, result._result['stdout']))

print("FAILED *******")
for host, result in results_callback.host_failed.items():
    print('{0} >>> {1}'.format(host, result._result['msg']))

print("DOWN *********")
for host, result in results_callback.host_unreachable.items():
    print('{0} >>> {1}'.format(host, result._result['msg']))
2.3、调用playbook
from ansible.executor.playbook_executor import PlaybookExecutor

playbook = PlaybookExecutor(
    playbooks=['/root/test.yml'],  # tip: this is a list
    inventory=inventory,
    variable_manager=variable_manager,
    loader=loader,
    passwords=passwords,
)

# Use our callback: swap it into the executor's internal task queue manager
playbook._tqm._stdout_callback = results_callback

result = playbook.run()

# A separate loop variable keeps the run result above from being overwritten.
# NOTE(review): the ['result']['stdout'] path depends on what the tasks in
# /root/test.yml return -- confirm against the actual playbook.
for host, host_result in results_callback.host_ok.items():
    print("主机{}, 执行结果{}".format(host, host_result._result['result']['stdout']))
三、二次开发
# -*- coding:utf-8 -*-
import json
import shutil
from ansible.module_utils.common.collections import ImmutableDict
from ansible import context
from ansible.parsing.dataloader import DataLoader
from ansible.vars.manager import VariableManager
from ansible.inventory.manager import InventoryManager
from ansible.playbook.play import Play
from ansible.executor.playbook_executor import PlaybookExecutor
from ansible.executor.task_queue_manager import TaskQueueManager
from ansible.plugins.callback import CallbackBase
import ansible.constants as C


class ResultCallback(CallbackBase):
    """Override part of CallbackBase to keep the latest result per host.

    Results are stored in three dicts keyed by host name: ``host_ok``,
    ``host_unreachable`` and ``host_failed``. A later result for the same
    host and outcome replaces the earlier one.
    """

    def __init__(self, *args, **kwargs):
        # Python 2 compatible form; on Python 3 a bare
        # super().__init__(*args, **kwargs) is also valid.
        super(ResultCallback, self).__init__(*args, **kwargs)
        self.host_ok = {}
        self.host_unreachable = {}
        self.host_failed = {}

    def v2_runner_on_unreachable(self, result):
        """Record ``result`` for a host that could not be reached."""
        self.host_unreachable[result._host.get_name()] = result

    def v2_runner_on_ok(self, result, **kwargs):
        """Record ``result`` for a task that succeeded."""
        self.host_ok[result._host.get_name()] = result

    def v2_runner_on_failed(self, result, **kwargs):
        """Record ``result`` for a task that failed."""
        self.host_failed[result._host.get_name()] = result


class MyAnsiable(object):
    """Small wrapper around the Ansible Python API for ad-hoc tasks and playbooks."""

    def __init__(self,
                 connection='local',
                 module_path=None,
                 remote_user=None,
                 ack_pass=None,
                 sudo=None,
                 sudo_user=None,
                 ask_sudo_pass=None,
                 become=None,
                 become_method=None,
                 become_user=None,
                 listhosts=None,
                 listtasks=None,
                 listtags=None,
                 verbosity=3,
                 syntax=None,
                 start_at_task=None,
                 check=False,
                 diff=False,
                 inventory=None,
                 passwords=None):
        """Set the option values and build the helper objects.

        Every argument except ``inventory`` and ``passwords`` is stored
        unchanged in ``context.CLIARGS``; pass a value to override its default.
        ``context.CLIARGS`` is an attribute of the ``context`` module, so the
        most recently created instance decides the options in effect.

        ``ack_pass`` keeps its original spelling for backward compatibility
        (presumably ``ask_pass`` was intended -- confirm before renaming).

        :param inventory: inventory source (file path or comma separated
            host string); falls back to ``"localhost,"`` when empty.
        :param passwords: passwords dict; falls back to ``{}`` when empty.
            The argument itself is mandatory for the Ansible objects below.
        """
        context.CLIARGS = ImmutableDict(
            connection=connection,
            module_path=module_path,
            remote_user=remote_user,
            ack_pass=ack_pass,
            sudo=sudo,
            sudo_user=sudo_user,
            ask_sudo_pass=ask_sudo_pass,
            become=become,
            become_method=become_method,
            become_user=become_user,
            listhosts=listhosts,
            listtasks=listtasks,
            listtags=listtags,
            verbosity=verbosity,
            syntax=syntax,
            start_at_task=start_at_task,
            check=check,
            diff=diff
        )

        # Inventory source; use "localhost," when none was given.
        self.inventory = inventory if inventory else "localhost,"

        # Passwords may be an empty dict, but the argument must exist.
        self.passwords = passwords if passwords else {}

        # Data loader (parses yaml/json/ini files)
        self.loader = DataLoader()

        # Inventory object
        self.inventory_manager_obj = InventoryManager(loader=self.loader, sources=self.inventory)

        # Variable manager
        self.variable_manager_obj = VariableManager(self.loader, self.inventory_manager_obj)

        # Callback plugin object that collects the results
        self.results_callback = ResultCallback()

    def adhoc(self, name='Ad-hoc', hosts='localhost', gather_facts="no", module="ping", args=''):
        """Run a single-task ad-hoc play and return the value of ``tqm.run()``.

        :param name: play name.
        :param hosts: host pattern (host name or inventory group).
        :param gather_facts: passed through as the play's ``gather_facts``.
        :param module: module to execute.
        :param args: arguments for the module.
        :return: whatever ``TaskQueueManager.run`` returns for the play.
        """
        play_source = dict(
            name=name,
            hosts=hosts,
            gather_facts=gather_facts,
            tasks=[
                # Each task is one element of this list, as a nested dict.
                # Tasks could also be passed in as a parameter; kept simple here.
                {"action": {"module": module, "args": args}},
            ])

        tqm = None
        try:
            # The task queue manager adds the library dirs to the module loader,
            # so it must exist before Play.load() is called.
            tqm = TaskQueueManager(
                inventory=self.inventory_manager_obj,
                variable_manager=self.variable_manager_obj,
                loader=self.loader,
                passwords=self.passwords,
                stdout_callback=self.results_callback
            )
            play = Play().load(play_source, variable_manager=self.variable_manager_obj, loader=self.loader)
            return tqm.run(play)
        finally:
            if tqm is not None:
                tqm.cleanup()
            # Same cleanup as the official example: drop the loader's temp files too.
            if self.loader:
                self.loader.cleanup_all_tmp_files()
            shutil.rmtree(C.DEFAULT_LOCAL_TMP, True)

    def playbook(self, playbooks):
        """Run the given playbooks and return the value of ``PlaybookExecutor.run()``.

        :param playbooks: list of playbook file paths.
        """
        executor = PlaybookExecutor(playbooks=playbooks,  # Tip: this is a list
                                    inventory=self.inventory_manager_obj,
                                    variable_manager=self.variable_manager_obj,
                                    loader=self.loader,
                                    passwords=self.passwords
                                    )

        # Route results to our callback. The executor may have no task queue
        # manager (listing / syntax-check options), in which case nothing is set.
        tqm = getattr(executor, '_tqm', None)
        if tqm is not None:
            tqm._stdout_callback = self.results_callback
        # Execute
        return executor.run()

    def get_result(self):
        """Print the collected results as indented JSON and return them.

        :return: dict with the keys ``success``, ``failed`` and
            ``unreachable``, each mapping host name to that host's ``_result``.
        """
        result_raw = {'success': {}, 'failed': {}, 'unreachable': {}}
        for host, result in self.results_callback.host_ok.items():
            result_raw['success'][host] = result._result

        for host, result in self.results_callback.host_failed.items():
            result_raw['failed'][host] = result._result

        for host, result in self.results_callback.host_unreachable.items():
            result_raw['unreachable'][host] = result._result

        # Print the result, formatted as JSON
        print(json.dumps(result_raw, indent=4))
        return result_raw
四、运行示例
4.1、执行默认ad-hoc
# -*- coding:utf-8 -*-
from MyAnsible import MyAnsiable

# Defaults: local connection, "localhost," inventory, `ping` module
ansible_obj = MyAnsiable()
ansible_obj.adhoc()
ansible_obj.get_result()
结果:
{ "success": { "localhost": { "ping": "pong", "invocation": { "module_args": { "data": "pong"} }, "ansible_facts": { "discovered_interpreter_python": "/usr/bin/python"}, "_ansible_no_log": false, "changed": false } }, "failed": {}, "unreachable": {} }
4.2、远程执行命令
# File: /root/ansible-playbook/api/hosts (inventory file, not Python)
#   [tapi]
#   10.96.0.59

# File: ansible03.py
# -*- coding:utf-8 -*-
from MyAnsible import MyAnsiable

# Point at the inventory file and use the ssh-based remote connection
ansible_obj = MyAnsiable(inventory='/root/ansible-playbook/api/hosts', connection='smart')

# The target is the group name defined in the hosts file
ansible_obj.adhoc(hosts='tapi', module='shell', args='ip a |grep "inet"')
ansible_obj.get_result()
结果:
{ "failed": {}, "success": { "10.96.0.59": { "stderr_lines": [], "cmd": "ip a |grep \"inet\"", "end": "2021-01-28 15:41:25.972706", "_ansible_no_log": false, "stdout": "inet 127.0.0.1/8 scope host lo\n inet 10.96.0.59/24 brd 10.96.0.255 scope global dynamic eth0", "changed": true, "rc": 0, "start": "2021-01-28 15:41:25.960580", "stderr": "", "delta": "0:00:00.012126", "invocation": { "module_args": { "creates": null, "executable": null, "_uses_shell": true, "strip_empty_ends": true, "_raw_params": "ip a |grep \"inet\"", "removes": null, "argv": null, "warn": true, "chdir": null, "stdin_add_newline": true, "stdin": null } }, "stdout_lines": [ "inet 127.0.0.1/8 scope host lo", "inet 10.96.0.59/24 brd 10.96.0.255 scope global dynamic eth0"], "ansible_facts": { "discovered_interpreter_python": "/usr/bin/python"} } }, "unreachable": {} }
4.3、调用playbook
研究过程中,发现不能传递额外的系统变量,后面的实例就懒得写了。看完我前面的代码,执行 playbook 应该不成问题。
参考资料
- 二开:https://zhuanlan.zhihu.com/p/118411015
- http://www.linuxboy.net/ansiblejc/143275.html
- ansible2.4版本前后模块差异:https://blog.csdn.net/qq_29832217/article/details/97005626
- 常用模块使用:https://blog.csdn.net/yyy72999/article/details/81184720
- 官网2.7 https://docs.ansible.com/ansible/2.7/dev_guide/developing_api.html
- 官网2.8 https://docs.ansible.com/ansible/2.8/dev_guide/developing_api.html
- 官网2.10 https://docs.ansible.com/ansible/latest/dev_guide/developing_api.html